This blog post is the first in a series on how to set up a stream processing pipeline with Python and the Faust library. If you’re already familiar with Faust’s general concepts, you can move straight to part II to see Faust in action.
Faust is a library porting the concepts of Kafka Streams to Python. If you’re not familiar at all with Kafka, it might be a good idea to read the Kafka documentation and the Kafka Streams section before diving into the series.
For this first post, we’ll just go through the Faust basics: the main concepts and the general API. It’s not the most exciting part, but it will be a good reference when we get our hands dirty with the implementation.
Kafka Streams provides a client library with a few abstractions on top of Apache Kafka. Because Faust mostly reuses the exact same concepts, learning to use Faust is actually a great introduction to Kafka Streams if you’re familiar with Python.
OK, let’s get to it!
Application 🔗
The Application is the starting point of your Faust stream processing journey. It is an instance of the library and provides access to most of the core API of Faust via Python decorators.
To create an application, you need an app ID, a broker and, optionally, a store driver used for persisting data.
import faust
app = faust.App('my-app-id', broker='kafka://', store='rocksdb://')
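Once the application is defined, you run it with a Faust worker. Assuming the code above lives in a module called myapp.py (the module name here is just an example), you would typically start a worker like this:

faust -A myapp worker -l info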
Agent, Stream and Processor 🔗
In Kafka Streams terms, a Faust agent is a stream processor. It consumes from a topic and processes every message it finds.
In Faust, an agent is an async def function that can process infinite streams in parallel. If you’re not familiar with asyncio, you should probably have a look at the official Python documentation for asyncio.
The agent is used as a decorator on your processing function. Your async function must iterate over the stream using an async for loop.
@app.agent(value_type=Order)
async def order(orders):
    async for order in orders:
        # process infinite stream of orders.
        print(f'Order for {order.account_id}: {order.amount}')
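Messages usually arrive from the topic the agent subscribes to, but you can also send events to an agent directly from your code. Here is a minimal sketch, assuming the Order record defined in the Model and Record section below; the helper name is just for illustration:

# Hypothetical helper: send a single order to the `order` agent.
# Call this from within a running worker (e.g. inside a task or timer).
async def send_test_order():
    await order.send(value=Order(account_id='1', amount=42))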
The stream is the data itself: the sequence of messages you iterate over inside the agent.
A processor is a function that transforms your data. You can have any number of processors, and they are chained and applied in order.
def add_default_language(value: MyModel) -> MyModel:
    if not value.language:
        value.language = 'US'
    return value

async def add_client_info(value: MyModel) -> MyModel:
    value.client = await get_http_client_info(value.account_id)
    return value

s = app.stream(my_topic,
               processors=[add_default_language, add_client_info])
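A stream created this way can also be iterated outside of an agent, for instance in a background task. A rough sketch, reusing my_topic and MyModel from the snippet above:

# Hypothetical task consuming the processed stream `s` once the worker starts.
@app.task
async def consume_processed():
    async for value in s:
        # Both processors have already been applied at this point.
        print(value.language, value.client)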
Table 🔗
Depending on your use case, you might want to persist data (e.g. you want to collect the number of clicks per day on a link).
Tables allow you to do this in a sharded way. They are distributed key/value stores backed by RocksDB under the hood, but in your code you use them like normal Python dictionaries.
click_topic = app.topic('clicks', key_type=str, value_type=int)

# default value for missing URL will be 0 with `default=int`
counts = app.Table('click_counts', default=int)

@app.agent(click_topic)
async def count_click(clicks):
    async for url, count in clicks.items():
        counts[url] += count
Model and Record 🔗
Models describe the fields of your data structures.
The only type supported at the time of writing is the Record type, which is the equivalent of a Python dictionary (a key/value mapping).
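As an illustration, here is how the Order model used in the agent example above could be declared as a Record (a minimal sketch: the field names match that example, but the exact types are an assumption):

class Order(faust.Record):
    account_id: str
    amount: int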
Conclusion 🔗
This was a short and not that sweet introduction to Faust and Kafka Streams. Hopefully it gave you a good overview of the general concepts, and we’ll try to make things a bit more fun and practical in the next post!