
Fix: Kafka Consumer Not Receiving Messages, Connection Refused, and Rebalancing Errors

FixDevs

Quick Answer

How to fix Apache Kafka issues — consumer not receiving messages, auto.offset.reset, Docker advertised.listeners, max.poll.interval.ms rebalancing, MessageSizeTooLargeException, and KafkaJS errors.

The Error

Your Kafka consumer connects successfully but receives no messages:

[Consumer] INFO Subscribed to topic(s): orders
[Consumer] INFO Kafka Consumer started
# ... silence. No messages.

Or you hit a connection error on startup:

# KafkaJS
KafkaJSConnectionError: Connection error: getaddrinfo ENOTFOUND kafka kafka:9092

# confluent-kafka-python
%3|1672531200.000|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]:
localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed:
Connection refused (after 2ms in state CONNECT)

Or the consumer gets kicked from its group repeatedly:

Heartbeat timeout. The coordinator considers the consumer dead.
Group rebalancing triggered.

Or a producer fails to send:

org.apache.kafka.common.errors.TimeoutException: Expiring 150 record(s) for
orders:0: 30025 ms has passed since batch creation plus linger time

Each of these is a separate root cause that needs a different fix.

Why This Happens

Most Kafka issues come down to three concepts:

  1. Offsets: Kafka tracks where each consumer group left off. If the offset is already at the latest message, the consumer sits idle waiting for new ones — it won’t re-read old messages unless you reset.
  2. Advertised listeners: Kafka tells clients which address to connect to. In Docker, this address is often wrong — the broker advertises a hostname that only resolves inside the container.
  3. Poll interval: Consumers must call poll() within max.poll.interval.ms or Kafka removes them from the group and reassigns their partitions.

Fix 1: Consumer Not Receiving Messages — Check auto.offset.reset

When a consumer joins a group for the first time (no committed offset), or when its last committed offset has expired from the log, Kafka looks at auto.offset.reset:

| Value | Behavior |
| --- | --- |
| `latest` | Start from now — only read messages produced after the consumer joined. Old messages are skipped. |
| `earliest` | Start from the beginning — read all retained messages in the topic. |
| `none` | Throw an error if no valid committed offset exists. |

The default is latest. This is why a consumer that joins after messages were produced sees nothing — it started after those messages and there’s no new data yet.

Fix: Set auto.offset.reset=earliest when you need to read existing messages:

# confluent-kafka-python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'  # Read from beginning on first join
})
// KafkaJS: fromBeginning is a subscribe() option, not a consumer() option
const consumer = kafka.consumer({ groupId: 'my-group' });

// Equivalent to auto.offset.reset=earliest
await consumer.subscribe({ topic: 'orders', fromBeginning: true });

Important: auto.offset.reset only applies when there’s no committed offset for the consumer group and topic. If the group already has a committed offset, Kafka uses that instead — regardless of this setting. To force re-reading from the beginning, either use a new group.id or reset the offsets manually.
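The precedence rule above can be sketched as a tiny decision function. This is a hypothetical helper for illustration, not part of any client API:

```python
def starting_offset(committed, reset_policy, log_start, log_end):
    """Return the offset a consumer starts reading from."""
    # A valid committed offset always wins, regardless of auto.offset.reset
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise auto.offset.reset decides
    if reset_policy == 'earliest':
        return log_start
    if reset_policy == 'latest':
        return log_end
    raise RuntimeError('no valid offset and policy is "none"')

# Committed offset takes precedence over the policy
print(starting_offset(42, 'earliest', 0, 100))    # 42
# No committed offset: the policy applies
print(starting_offset(None, 'earliest', 0, 100))  # 0
print(starting_offset(None, 'latest', 0, 100))    # 100
```

This is why switching to `earliest` has no visible effect on an existing group: the committed offset short-circuits the policy check.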

Common Mistake: Developers test with auto.offset.reset=earliest, successfully read old messages, then deploy to production with latest (the default) and wonder why the consumer is silent. Always be explicit about auto.offset.reset in your configuration — never rely on the default in production code.

Wrong topic name: Also check for typos. Kafka topic names are case-sensitive. Orders and orders are different topics. If auto-create is enabled, Kafka may silently create a new empty topic instead of throwing an error.

Fix 2: Consumer Not Receiving Messages — Too Many Consumers

You can have at most one consumer per partition within the same consumer group. Extra consumers sit idle with zero partitions assigned.

Topic: orders
  ├── partition 0 → consumer-1
  ├── partition 1 → consumer-2
  └── partition 2 → consumer-3

consumer-4 → (no partition assigned, receives nothing)
consumer-5 → (no partition assigned, receives nothing)

If you have 2 consumers but 1 partition, one consumer handles all messages and the other receives nothing. The fix is to increase the partition count — you need at least as many partitions as the maximum number of consumers you want to run in parallel:

# Check partition count
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic orders

# Increase partitions (cannot be decreased after creation)
kafka-topics.sh --bootstrap-server localhost:9092 --alter \
  --topic orders --partitions 6

Note: Increasing partitions on an existing topic doesn’t rebalance existing data. New messages are distributed across all partitions going forward. Consider partition count at topic creation time — the rule of thumb is: partitions = max expected consumer parallelism.
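The one-consumer-per-partition rule can be seen with a small simulation. This is an illustrative round-robin sketch, not the real assignor protocol:

```python
def assign(partitions, consumers):
    """Distribute partitions across consumers round-robin."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 3 partitions, 5 consumers: two consumers get nothing
result = assign([0, 1, 2], ['c1', 'c2', 'c3', 'c4', 'c5'])
print(result)  # {'c1': [0], 'c2': [1], 'c3': [2], 'c4': [], 'c5': []}
```

However the real assignor splits things up, consumers beyond the partition count always end with an empty assignment.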

Fix 3: Connection Refused — Docker advertised.listeners Not Set

This is the most common Docker + Kafka issue. When a Kafka broker starts, it tells clients which address to use for future connections. This is the advertised listener — and by default it’s the broker’s hostname, which is only resolvable inside the Docker network.

Your client connects, gets metadata saying “connect to kafka:9092”, then fails because kafka doesn’t resolve outside the container.

Fix: Configure two listener names — one for Docker-internal connections, one for host connections:

The env var names differ by Docker image — Confluent (confluentinc/cp-kafka) uses KAFKA_*, Bitnami uses KAFKA_CFG_* — but the underlying Kafka property names are the same. Here's the Bitnami image format, which also works well with Kafka 4.x KRaft mode:

# docker-compose.yml
services:
  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      # KRaft mode (no ZooKeeper required in Kafka 4.x)
      KAFKA_CFG_NODE_ID: "1"
      KAFKA_CFG_PROCESS_ROLES: "broker,controller"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      # Two listeners: one for Docker network, one for host
      KAFKA_CFG_LISTENERS: PLAINTEXT://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT

  • Applications inside Docker (same docker-compose.yml) connect to kafka:29092
  • Applications on your host machine connect to localhost:9092

# From host machine
consumer = Consumer({'bootstrap.servers': 'localhost:9092', ...})

# From another Docker container in the same network
consumer = Consumer({'bootstrap.servers': 'kafka:29092', ...})

Pro Tip: If you see ENOTFOUND kafka from your host application, you’re using the internal listener address. If you see Connection refused from inside Docker, you’re using the external one. Check which bootstrap server address your application is actually using. For general Docker networking issues between containers, see docker-compose networking not working.
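To reason about what the broker will hand back on each listener, it can help to parse the advertised.listeners value yourself. A minimal sketch (the helper name is made up for illustration):

```python
def parse_listeners(value):
    """Split an advertised.listeners string into {name: (host, port)}."""
    result = {}
    for entry in value.split(','):
        name, address = entry.split('://')
        host, port = address.rsplit(':', 1)
        result[name] = (host, int(port))
    return result

listeners = parse_listeners('PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092')
print(listeners['PLAINTEXT'])  # ('kafka', 29092), only resolvable inside Docker
print(listeners['EXTERNAL'])   # ('localhost', 9092), for host clients
```

Whichever address your client receives in the metadata response is the one it must be able to resolve and reach.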

Fix 4: Consumer Group Rebalancing — max.poll.interval.ms Exceeded

Kafka requires consumers to call poll() regularly to prove they’re still alive. If processing takes longer than max.poll.interval.ms (default: 5 minutes), Kafka declares the consumer dead and triggers a rebalance — all consumers in the group stop processing while partitions are reassigned.

You’ll see something like:

WARN  The coordinator considers the consumer dead. Sending the consumer to rejoin the group.
INFO  Group rebalancing triggered (max.poll.interval.ms exceeded)
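The broker-side check is essentially a gap test between consecutive poll() calls. A simplified sketch (timestamps in milliseconds, names made up for illustration):

```python
def evicted_at(poll_times_ms, max_poll_interval_ms=300_000):
    """Return the first poll gap that triggers eviction, or None."""
    for prev, cur in zip(poll_times_ms, poll_times_ms[1:]):
        if cur - prev > max_poll_interval_ms:
            return (prev, cur)
    return None

# Polls at t=0s and t=60s, then a 7-minute stall: evicted
print(evicted_at([0, 60_000, 480_000]))  # (60000, 480000)
# All gaps under 5 minutes: fine
print(evicted_at([0, 60_000, 240_000]))  # None
```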

Two fixes depending on the root cause:

If your processing is slow but expected:

# confluent-kafka-python — increase the interval
# Note: max.poll.records is a Java consumer setting; librdkafka has no equivalent
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'max.poll.interval.ms': 600000  # 10 minutes
})
// KafkaJS — heartbeat is sent automatically in eachMessage
// For slow processing, use eachBatch and call heartbeat manually
await consumer.run({
  eachBatch: async ({ batch, heartbeat }) => {
    for (const message of batch.messages) {
      await processMessage(message);  // Slow operation
      await heartbeat();              // Tell Kafka we're still alive
    }
  }
});

If processing is fast but each poll cycle handles too many messages:

In the Java consumer, reduce max.poll.records (default: 500) so each poll cycle finishes quickly. confluent-kafka-python has no max.poll.records; its poll() returns one message at a time, but consume() lets you cap how many messages are fetched per cycle:

# confluent-kafka-python: cap the batch size per cycle
msgs = consumer.consume(num_messages=100, timeout=1.0)

The heartbeat / session timeout relationship:

heartbeat.interval.ms = 3000     (send heartbeat every 3s)
session.timeout.ms = 10000       (broker waits 10s for heartbeat)
max.poll.interval.ms = 300000    (5 min max between poll() calls)

Keep heartbeat.interval.ms at roughly 1/3 of session.timeout.ms. Setting session.timeout.ms too low causes frequent unnecessary rebalances; too high means failed consumers take longer to be detected.
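The rule of thumb can be encoded as a quick sanity check. A hypothetical helper, shown here only to make the ratios concrete:

```python
def check_timeouts(heartbeat_ms, session_ms, max_poll_ms):
    """Flag timeout combinations that cause avoidable rebalances."""
    problems = []
    if heartbeat_ms > session_ms / 3:
        problems.append('heartbeat.interval.ms should be at most 1/3 of session.timeout.ms')
    if session_ms >= max_poll_ms:
        problems.append('session.timeout.ms should be well below max.poll.interval.ms')
    return problems

print(check_timeouts(3000, 10000, 300000))  # [] — the values shown above are fine
print(check_timeouts(8000, 10000, 300000))  # flags the heartbeat ratio
```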

Fix 5: Offset Management — Consumers Reprocessing After Restart

If your consumer reprocesses the same messages every time it restarts, it’s not committing offsets.

With enable.auto.commit=true (default):

Auto-commit happens every auto.commit.interval.ms (default: 5 seconds). If the consumer crashes between processing a message and the next auto-commit, those messages are reprocessed on restart. This is acceptable for idempotent workloads (e.g., updating a counter), but problematic for side effects (e.g., sending emails).
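The reprocessing window is everything between the last auto-commit and the crash point. A sketch of the at-least-once arithmetic (illustrative names):

```python
def reprocessed_after_crash(last_committed, last_processed):
    """Offsets handled a second time on restart.

    Kafka's committed offset means "next offset to read", so after a
    crash the consumer resumes at last_committed, re-reading everything
    processed since that commit.
    """
    return list(range(last_committed, last_processed + 1))

# Last auto-commit recorded offset 100; we processed through 107, then crashed
print(reprocessed_after_crash(100, 107))  # offsets 100 through 107, again
```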

With enable.auto.commit=false (manual commit):

You must explicitly commit after processing:

# confluent-kafka-python
from confluent_kafka import Consumer, KafkaException

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        raise KafkaException(msg.error())

    # Process the message
    process_order(msg.value())

    # Commit only after successful processing
    consumer.commit(message=msg, asynchronous=False)
// KafkaJS — autoCommit is a run() option, not a consumer() option
const consumer = kafka.consumer({ groupId: 'my-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });

await consumer.run({
  autoCommit: false,  // Disable auto-commit here
  eachMessage: async ({ topic, partition, message }) => {
    await processOrder(message.value);
    // Commit offset + 1 (next message to read)
    await consumer.commitOffsets([{
      topic,
      partition,
      offset: (Number(message.offset) + 1).toString()
    }]);
  }
});

Common Mistake: When committing manually, commit offset + 1, not the message’s offset. Kafka’s committed offset means “the next offset to read”, not “the last offset read.”

Fix 6: Producer Send Failures

TimeoutException: Expiring X record(s):

The producer buffered messages but couldn't deliver them to the broker before the delivery timeout expired (delivery.timeout.ms in the Java client, message.timeout.ms in librdkafka). Check the broker is running and reachable first. Then tune the producer:

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'request.timeout.ms': 30000,          # Wait 30s for broker ack (the default)
    'linger.ms': 100,                     # Wait 100ms for batch to fill before sending
    'batch.size': 32768,                  # 32KB batches
    'queue.buffering.max.kbytes': 65536,  # 64MB buffer (librdkafka; Java uses buffer.memory)
    'acks': 'all'                         # Wait for all in-sync replicas to acknowledge
})

MessageSizeTooLargeException:

Default max message size is 1 MB. You must increase it in three places — they must all be consistent:

# Broker: message.max.bytes (in server.properties or environment variable)
KAFKA_MESSAGE_MAX_BYTES=10485760  # 10MB

# Producer: message.max.bytes (librdkafka) or max.request.size (Java)
# Consumer: max.partition.fetch.bytes
# Producer
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'message.max.bytes': 10485760  # 10MB (the Java producer calls this max.request.size)
})

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'max.partition.fetch.bytes': 10485760  # 10MB
})
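A quick way to see why all three settings must line up: the effective limit is the minimum of the broker and producer values, and the consumer must be able to fetch at least that much. A hypothetical checker:

```python
def effective_max_message(broker_max, producer_max, consumer_fetch_max):
    """Return the largest producible message size, plus any mismatch warnings."""
    limit = min(broker_max, producer_max)
    warnings = []
    if consumer_fetch_max < limit:
        warnings.append('consumer max.partition.fetch.bytes is below the producible size')
    return limit, warnings

# Broker and producer raised to 10MB, but the consumer was left at the 1MB default
limit, warnings = effective_max_message(10_485_760, 10_485_760, 1_048_576)
print(limit)     # 10485760
print(warnings)  # the consumer cannot fetch messages that large
```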

BufferExhaustedException (Java) / BufferError (Python):

The producer's memory buffer is full — you're producing faster than messages are being sent. The Java client throws BufferExhaustedException; confluent-kafka-python raises BufferError from produce(). Increase the buffer, and call poll() regularly so delivery callbacks run and buffer space is freed:

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'queue.buffering.max.kbytes': 131072,   # 128MB buffer (Java: buffer.memory, default 32MB)
    'queue.buffering.max.messages': 100000  # Buffer is also capped by message count
})
producer.poll(0)  # Serve delivery callbacks to free buffer space

Still Not Working?

LEADER_NOT_AVAILABLE on First Connection

This error appears briefly during topic creation or broker startup while partition leaders are being elected. It’s transient and resolves within a few seconds. If it persists:

  1. Check that all brokers are running: kafka-broker-api-versions.sh --bootstrap-server localhost:9092
  2. Check partition leader status: kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic your-topic — a leader of -1 means no leader is elected
  3. Ensure the topic exists and has the correct replication factor for your broker count (can’t have replication factor of 3 with only 1 broker)
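The replication constraint in step 3 is simple enough to state as a check (illustrative helper):

```python
def valid_replication(replication_factor, broker_count):
    """Each replica must live on a distinct broker."""
    return replication_factor <= broker_count

print(valid_replication(3, 1))  # False: a single-broker dev setup can't replicate 3x
print(valid_replication(3, 3))  # True
```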

Kafka 4.x — ZooKeeper Is Gone (KRaft Mode)

As of Kafka 4.0 (released early 2025), ZooKeeper support is completely removed. If you’re upgrading from Kafka 3.x and your config still references ZooKeeper, it won’t start:

# Old config (Kafka 3.x with ZooKeeper) — no longer works in 4.x
zookeeper.connect=localhost:2181

# New KRaft config (Kafka 4.x)
# No ZooKeeper needed — Kafka manages its own metadata

For local development with Docker on Kafka 4.x, use the official apache/kafka or bitnami/kafka image which defaults to KRaft mode. Remove any ZooKeeper containers from your docker-compose.yml. See docker-compose depends-on not working if your application container starts before Kafka is ready and fails on the first connection attempt.

KafkaJS — Graceful Shutdown

Calling consumer.disconnect() from inside an eachMessage or eachBatch handler throws an error. Disconnect must happen outside the message loop:

const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'my-group' });

const shutdown = async () => {
  await consumer.disconnect(); // Safe to call here
  process.exit(0);
};

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders', fromBeginning: true });

  // eachMessage handler — do NOT call consumer.disconnect() here
  await consumer.run({
    eachMessage: async ({ message }) => {
      await processMessage(message);
    }
  });
};

run().catch(console.error);

SASL Authentication Errors

If you’re connecting to a managed Kafka service (Confluent Cloud, AWS MSK, Aiven) and getting authentication errors:

# confluent-kafka-python with SASL PLAIN + TLS
consumer = Consumer({
    'bootstrap.servers': 'your-broker.example.com:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'your-api-key',
    'sasl.password': 'your-api-secret',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})
// KafkaJS with SASL PLAIN
const kafka = new Kafka({
  brokers: ['your-broker.example.com:9092'],
  ssl: true,
  sasl: {
    mechanism: 'plain',
    username: 'your-api-key',
    password: 'your-api-secret'
  }
});

Check that the sasl.mechanism matches what the broker requires — PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 are not interchangeable. If you’re using a similar message broker and hitting connection issues, RabbitMQ connection refused covers the same class of AMQP authentication and network problems.

Checking Consumer Group Status

When a consumer connects but receives nothing, inspect the consumer group:

# List all consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Describe a group — shows lag, offsets, and partition assignments
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-group

The LAG column tells you how many messages the consumer is behind. If lag is 0 and no messages arrive, the consumer is caught up and waiting for new messages (expected behavior). If lag is large and not decreasing, the consumer is stuck.
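The LAG column is just the log end offset minus the committed offset, per partition. A sketch of the arithmetic (illustrative helper, offsets keyed by partition):

```python
def consumer_lag(end_offsets, committed_offsets):
    """How many messages each partition's consumer is behind."""
    return {p: end_offsets[p] - committed_offsets.get(p, 0)
            for p in end_offsets}

# Partition 0 is caught up; partition 1 is 150 messages behind
print(consumer_lag({0: 500, 1: 500}, {0: 500, 1: 350}))  # {0: 0, 1: 150}
```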

If your KAFKA_CFG_* environment variables are set but not taking effect, see docker-compose env not loaded for common .env file and variable substitution pitfalls.

To reset offsets to the beginning for a stopped consumer group:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic orders \
  --reset-offsets --to-earliest --execute

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
