One of the more challenging aspects I encountered while working with stream processing was performing near-merge or as-of joins on Kafka streams, especially when the topics live on different Kafka clusters, which rules out connecting and subscribing to multiple topics with a single consumer.
Some frameworks seem promising at first, mentioning “joining or merging” of event streams in their documentation, or declaring support for arbitrary operations. In my experience, however, these claims rarely hold up for this particular use case.
As a consequence, the functionality that supports arbitrary stateful operations (such as as-of joining of topics) was implemented in-house.
The reusable data-flow model was open-sourced under the name Snapstream.
If you want to follow along, run a Kafka broker as shown in the previous post, and install snapstream:
pip install snapstream
The basic concepts are:

- snap is used to bind sources and sinks to user-defined handler functions
- stream is used to process each iterable in parallel

As shown in the sketch below, a Topic t is both an iterable and a callable:
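This is a minimal sketch, assuming the broker from the previous post runs on localhost:29091; the topic name, messages and consumer group are made up for this example, and no codec is used since the values are plain strings:

from snapstream import Topic, snap, stream

t = Topic('emoji', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.id': 'emoji-demo',
})

messages = iter(['🏆', '📞', '🐟'])

@snap(messages, sink=[t])
def produce(val):
    # t is callable, so it can act as a sink: each yielded value is produced to the topic
    yield val

@snap(t, sink=[print])
def consume(msg):
    # t is iterable, so it can act as a source: msg is a Kafka message object
    yield msg.value().decode()

stream()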
When the processing logic of your stream relies solely on the message itself, without requiring any additional data, it is referred to as “stateless processing”.
Any other type of processing usually involves caching and retrieving state. RocksDB is a popular state store used in Spark, KsqlDB, Faust and other frameworks (including Snapstream). We can use snapstream.Cache to persist any type of data:
from snapstream import Cache
cache = Cache('db')
# value '🏆' is stored under the key 'prize'
cache('prize', '🏆')
Cache is also callable, so we can use it as our sink.
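Reading the data back out is possible too; a quick sketch, assuming the values() iterator we rely on later in this post also accepts string keys:

# continuing with the cache from above
for value in cache.values(from_key='prize'):
    print(value)  # 🏆
    break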
At this point, we’re ready to perform some type of arbitrary operation on our data:
from datetime import datetime as dt

weather_messages = iter([
    {'timestamp': dt(2023, 1, 1, 10), 'value': '🌞'},
    {'timestamp': dt(2023, 1, 1, 12), 'value': '⛅'},
    {'timestamp': dt(2023, 1, 1, 13), 'value': '🌧'},
])

activity_messages = iter([
    {'timestamp': dt(2023, 1, 1, 10, 30), 'value': 'swimming'},
    {'timestamp': dt(2023, 1, 1, 11, 30), 'value': 'walking home'},
    {'timestamp': dt(2023, 1, 1, 12, 30), 'value': 'shopping'},
    {'timestamp': dt(2023, 1, 1, 13, 10), 'value': 'lunch'},
])
Let’s try and figure out the weather at the time of each activity:
from time import sleep
from snapstream import Cache, snap, stream

weather_cache = Cache('weather')

@snap(weather_messages, sink=[weather_cache])
def handle_weather(val):
    # cache each weather update under its event time
    event_time = val['timestamp'].timestamp()
    yield event_time, val

@snap(activity_messages, sink=[print])
def handle_activity(val):
    sleep(0.1)  # give the weather updates a head start
    event_time = val['timestamp'].timestamp()
    # find the latest weather update at or before the activity's event time
    for weather in weather_cache.values(backwards=True, from_key=event_time):
        yield val['value'], weather['value']
        break

stream()
('swimming', '🌞')
('walking home', '🌞')
('shopping', '⛅')
('lunch', '🌧')
What we just did:

- cached each weather update under its event time in the handle_weather function
- looked up the most recent weather state for each activity in the handle_activity function

Now you might be wondering two things…
The generators can easily be replaced with Topic instances:
from snapstream import Topic
from snapstream.codecs import JsonCodec

weather_messages = Topic('weather', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'weather',
    'group.id': 'weather',
}, codec=JsonCodec())
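The activity generator can be swapped out the same way; the topic name and group ids below are illustrative:

activity_messages = Topic('activity', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'activity',
    'group.id': 'activity',
}, codec=JsonCodec())

Just like the weather handler below, handle_activity would then need to read its value from the Kafka message object.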
Change the handler so that it gets the value from the Kafka message object:
@snap(weather_messages, sink=[weather_cache])
def handle_weather(msg):
    val = msg.value()  # grab the value from the Kafka message
    event_time = val['timestamp'].timestamp()
    yield event_time, val
When we move sleep(0.1) from the handle_activity function into the handle_weather function, weather updates will arrive later than the activity events.
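A minimal sketch of that change, keeping everything else from the earlier example as-is:

@snap(weather_messages, sink=[weather_cache])
def handle_weather(val):
    sleep(0.1)  # simulate late-arriving weather updates
    event_time = val['timestamp'].timestamp()
    yield event_time, val

@snap(activity_messages, sink=[print])
def handle_activity(val):
    event_time = val['timestamp'].timestamp()
    for weather in weather_cache.values(backwards=True, from_key=event_time):
        yield val['value'], weather['value']
        break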
Imagine if the “Cloudy” and “Rainy” events had a bit of a delay. If we did nothing, we’d send out incorrect weather states for the “Shopping” and “Lunch” activities:
('swimming', '🌞')
('walking home', '🌞')
('shopping', '🌞') # incorrect
('lunch', '⛅') # incorrect
We could defer processing so that late events can be joined successfully, which has its downsides:
('swimming', '🌞')
('walking home', '🌞')
# defer 'shopping' event
('shopping', '⛅')
# defer 'lunch' event
('lunch', '🌧')
Or we could choose to send out revisions of incorrect results, which would work well together with Kafka’s log compaction:
('swimming', '🌞')
('walking home', '🌞')
('shopping', '🌞') # incorrect
('shopping', '⛅')
('lunch', '⛅') # incorrect
('lunch', '🌧')
Taking this last option as an example, we could implement it using another cache for activities together with Python’s queue module:
from queue import Queue

queue = Queue()

def revisions():
    # yields activities that need to be reprocessed
    while True:
        yield queue.get()
By storing all activities in a cache, we can look up the ones that need a revision later. We’re also “snapping” the new revisions() iterable to the activity handler function:
activity_cache = Cache('activity')

@snap(activity_messages, revisions(), sink=[activity_cache])
def handle_activity(val):
    # cache each activity under its event time, so it can be revised later
    event_time = val['timestamp'].timestamp()
    yield event_time, val
    for weather in weather_cache.values(backwards=True, from_key=event_time):
        print((val['value'], weather['value']))
        break
When a late-arriving weather update comes in, we can cache it like normal. In addition, we can find any activities that occurred after the weather event time, and put them in the queue:
@snap(weather_messages, sink=[weather_cache])
def handle_weather(val):
    sleep(0.1)  # cause late events
    event_time = val['timestamp'].timestamp()
    yield event_time, val
    # any activity that already happened after this weather update needs a revision
    for activity in activity_cache.values(from_key=event_time):
        queue.put(activity)
Kafka handles messages within the same partition in the order in which they arrived. By putting the activities in the queue, the handle_activity function is called again with the same activities, but now the correct weather updates are available in the weather cache:
('swimming', '🌞')
('walking home', '🌞')
('shopping', '🌞') # incorrect
('lunch', '🌞') # incorrect
('shopping', '⛅')
('lunch', '⛅') # incorrect
('lunch', '🌧')
There will be trade-offs; with lots of late weather updates there would also be lots of revisions.