Processing messages from Kafka is relatively easy, and a better understanding of how Kafka works internally helps you avoid common errors. We’ll process messages from the start of a topic and see, step by step, how Kafka handles offsets.
Create a virtual environment and install the following dependencies:
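Judging by the imports used later on, the kafka-python client and Jupyter cover it:
python -m venv venv
source venv/bin/activate
pip install kafka-python jupyter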
Make sure that docker is installed, then create a docker-compose.yml file with the following contents:
version: "3.6"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29091:29091"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9091,PLAINTEXT_HOST://localhost:29091
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    healthcheck:
      test:
        [
          "CMD",
          "kafka-topics",
          "--list",
          "--bootstrap-server",
          "localhost:9091",
        ]
      interval: 5s
      timeout: 10s
      retries: 50
Spin up your Kafka cluster:
docker-compose up
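The healthcheck in the compose file tells you when the broker is ready. You can watch for it, or run the same check by hand:
docker-compose ps  # wait until the broker container reports (healthy)
docker-compose exec broker kafka-topics --list --bootstrap-server localhost:9091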
Create two Jupyter notebooks: producer.ipynb and consumer.ipynb.
# producer.ipynb
from kafka import KafkaProducer

cfg = {
    'bootstrap_servers': 'localhost:29091',
    'security_protocol': 'PLAINTEXT',
}

p = KafkaProducer(**cfg)
# consumer.ipynb
from kafka import KafkaConsumer

cfg = {
    'bootstrap_servers': 'localhost:29091',
    'security_protocol': 'PLAINTEXT',
    # 'group_id': 'test',
    # 'auto_offset_reset': 'earliest',
}

c = KafkaConsumer(**cfg)
c.subscribe('test')  # subscribing to Kafka topic 'test'
Let’s start sending data. Add a cell to producer.ipynb and send one message to Kafka:
p.send('test', b'123') # Sending message to topic 'test'
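Note that send() is asynchronous: it returns a future you can block on when you need delivery confirmation. A quick check (running this does put one extra message on the topic):
future = p.send('test', b'123')
metadata = future.get(timeout=10)  # blocks until the broker acknowledges, raises on failure
print(metadata.topic, metadata.partition, metadata.offset)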
In consumer.ipynb, add a cell with the following code to start reading from the test topic:
try:
    for m in c:
        print(m)
except KeyboardInterrupt:
    pass
You’ll see no output at all. That is because the consumer only starts reading from this moment on (the “latest” offset), so it won’t process the messages already on the backlog.
Send an additional message in a new cell:
p.send('test', b'456')
This time you will see the message in the output:
ConsumerRecord(topic='test', partition=0, offset=1, timestamp=1665395214..., timestamp_type=0, key=None, value=b'456', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=3, serialized_header_size=-1)
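The fields of a ConsumerRecord are available as plain attributes, so inside the loop you could, for instance, pick out just what you need:
print(m.topic, m.partition, m.offset)  # 'test' 0 1
print(m.value.decode('utf-8'))         # '456' (the value is raw bytes)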
Now change the code to:
c.poll()               # an initial poll() makes sure partitions get assigned
c.seek_to_beginning()  # moves to the beginning of the topic

try:
    for m in c:
        print(m)
except KeyboardInterrupt:
    pass
This time you will see both messages:
ConsumerRecord(topic='test', partition=0, offset=0, timestamp=1665395214..., timestamp_type=0, key=None, value=b'123', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=3, serialized_header_size=-1)
ConsumerRecord(topic='test', partition=0, offset=1, timestamp=1665395214..., timestamp_type=0, key=None, value=b'456', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=3, serialized_header_size=-1)
Each message has an offset; we just started reading from offset 0. An application can use this offset to continue where it left off after a restart (or crash).
Kafka stores this offset for you if you provide a (consumer) group_id. Uncomment that line in your consumer config and re-run the cell so a new consumer is created with it:
'group_id': 'test',
Change your code back to:
try:
    for m in c:
        print(m)
except KeyboardInterrupt:
    pass
Again, there is no output. Because this is a new consumer group, no offset has been stored yet, so the consumer starts at the “latest” offset by default (at least in this library, kafka-python).
This can be changed by setting the auto_offset_reset configuration property to “earliest”. Alternatively, you can select the starting offset manually, regardless of whether a consumer group offset was stored:
start_at_beginning = False

if start_at_beginning:
    c.poll()               # triggers partition assignment (and creation of the consumer group)
    c.seek_to_beginning()  # moves to the beginning of the topic

try:
    for m in c:
        print(m)
except KeyboardInterrupt:
    pass
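Besides seek_to_beginning(), kafka-python also lets you jump to an arbitrary offset per partition with seek(). A minimal sketch (offset 1 below is just an example):
from kafka import TopicPartition

c.poll()                        # make sure partitions are assigned first
tp = TopicPartition('test', 0)  # our topic only has one partition
c.seek(tp, 1)                   # the next read starts at offset 1, skipping b'123'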
It’s also possible to check whether a consumer group exists and which offset it’s reading from (per partition). Create a new notebook called admin.ipynb:
from kafka import KafkaAdminClient

cfg = {
    'bootstrap_servers': 'localhost:29091',
    'security_protocol': 'PLAINTEXT',
}

a = KafkaAdminClient(**cfg)
List the consumer groups:
a.list_consumer_groups()
[('test', 'consumer')] # the consumer group 'test', with protocol type 'consumer'
You could decide to read from the first available offset (depending on the topic retention period) if the consumer group is new:
# if 'test' is not among the group ids, no such consumer group exists yet
start_at_beginning = 'test' not in dict(a.list_consumer_groups())
Once the consumer group has been used, you can see which offset it is at:
a.list_consumer_group_offsets(group_id='test')
{TopicPartition(topic='test', partition=0): OffsetAndMetadata(offset=4, metadata='')}
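The committed offset, combined with the log end offset (which a consumer can fetch), also gives you the consumer lag. A sketch, reusing the admin client a and the consumer c from the notebooks above:
from kafka import TopicPartition

tp = TopicPartition('test', 0)
committed = a.list_consumer_group_offsets(group_id='test')[tp].offset
log_end = c.end_offsets([tp])[tp]   # offset the next produced message would get
print('lag:', log_end - committed)  # messages the group has not processed yet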
When a message has been received, Kafka assumes that all previous messages have been processed. By default, the consumer commits its offset to the Kafka cluster periodically; how often depends on the auto_commit_interval_ms property. So if already-processed messages have not been committed yet and the app crashes, those messages may be processed once again after a restart (duplicates).
The alternative is to disable auto-commit via enable_auto_commit and do the committing yourself:
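For this to work, the consumer needs enable_auto_commit switched off. A minimal sketch of the adjusted config from consumer.ipynb:
from kafka import KafkaConsumer

cfg = {
    'bootstrap_servers': 'localhost:29091',
    'security_protocol': 'PLAINTEXT',
    'group_id': 'test',
    'enable_auto_commit': False,  # we commit offsets ourselves below
}

c = KafkaConsumer(**cfg)
c.subscribe('test')
With that in place, the consume loop can commit after each processed message: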
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata

# we only have one partition
partition = TopicPartition('test', partition=0)

try:
    for m in c:
        print(m)  # the message is considered processed at this point
        next_offset = m.offset + 1  # offset of the next message the app should consume
        c.commit_async({
            partition: OffsetAndMetadata(next_offset, None)
        })
except KeyboardInterrupt:
    pass
As you can see, we commit the next offset rather than the current one: by convention, the committed offset is the position of the next message the consumer should read.
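Keep in mind that commit_async() does not block, so a commit may still be in flight when the process stops. A common pattern, sketched here on top of the loop above, is to finish with one synchronous commit:
try:
    for m in c:
        print(m)
        next_offset = m.offset + 1
        c.commit_async({partition: OffsetAndMetadata(next_offset, None)})
except KeyboardInterrupt:
    pass
finally:
    c.commit()  # synchronous; commits the current consumed position before shutdown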
This article introduced key concepts for Kafka consumers and message processing: handling offsets, reading messages from the start of a topic, creating and managing consumer groups, and committing offsets manually. The example code offers a practical starting point that can be expanded for larger applications. With these concepts in hand, you can make your Kafka consumer’s message processing more efficient and reliable.