Event-Driven Apps Using Kafka and Python
In this blog post, we will design and implement an event-driven application using Kafka in Python. As an example, we will build a simple furniture-ordering system, loosely inspired by a store like Ikea. It is only a toy design for learning purposes and does not reflect how ordering actually works at Ikea.
We will run everything on a local machine, but for production environments you can move it to a cloud provider like AWS, GCP, or Azure.

Let’s see what we have in the above architecture:
- Frontend: a mobile or web app where the user orders an item. When the user selects and orders a piece of furniture, the frontend calls the orders backend endpoint.
- Orders Backend: receives an order from the frontend with all the data related to that order and writes it to a Kafka topic called `order_details`. Each message on `order_details` encapsulates all the information for one individual order (an example message is sketched right after this list). This will be a simple Python file. You can deploy this backend service, and the following ones, as microservices in the cloud, for example with Cloud Run on GCP or Lambda on AWS.
- Transactions Backend: subscribes to the `order_details` Kafka topic, so whenever someone writes to the topic, the transactions backend reads the message and processes it in real time. We assume it does some credit card processing and other checks to make sure the order is valid. Once the order is confirmed, it writes to another Kafka topic called `order_confirmed`, which encapsulates all the data related to a confirmed order.
- Email Backend: subscribes to the `order_confirmed` topic and sends a confirmation email to the user when the order is confirmed. It can also publish a message to a topic like `order_email_sent`.
- Analytics Backend: subscribes to the `order_confirmed` topic and performs some analytics on it. For example, it can aggregate the total number of orders for the day and the total revenue coming from those orders. It then publishes the results to the `analytics_result` topic.
- Dashboard: we can also have a service that reads data from different topics and feeds a dashboard for visualization. Here, we keep this as a single Python service for simplicity, but you can easily split it up.
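Before diving in, here is roughly what a message flowing through these topics looks like. This is only an illustrative payload: the field names follow the code later in the post, the values are made up, and the `price` field inside `order_details` is included because the analytics service reads it.

```python
# Illustrative order message (example values only)
order = {
    "user_id": 42,
    "order_id": 1001,
    "user_email": "jane@example.com",
    "order_details": {"item": "MALM bed frame", "price": 299},  # analytics reads "price"
    "time": 1700000000.0,  # unix timestamp added by the orders backend
}
```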
If you are new to Kafka, I highly recommend Week 6 of the Data Engineering Zoomcamp by DataTalks.Club. You can also check these notes. Then you can come back here and continue.
We will need the following Python packages in this blog post:
- kafka-python
- flask
To run Kafka locally, I use the following compose file. It spins up a Kafka cluster with one broker and one ZooKeeper node, plus a few other Kafka components such as Control Center for the UI and the Schema Registry:
```yaml
## docker-compose-kafka.yml
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29093:29093"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

  kafka-tools:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka-tools
    container_name: kafka-tools
    command: ["tail", "-f", "/dev/null"]
    network_mode: "host"

  schema-registry:
    image: confluentinc/cp-schema-registry:5.4.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"

  control-center:
    image: confluentinc/cp-enterprise-control-center:5.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "broker:29092"
      CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
```
I also like Kafka-UI and Conduktor. The Control Center UI only shows messages that arrive while the topic page is open, which I don't find convenient, so I use Kafka-UI for visualization instead. To use Kafka-UI, you can add the following service to your compose file in place of the control-center section:
```yaml
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=broker:29092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
      - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=PLAINTEXT
      - KAFKA_CLUSTERS_0_SCHEMAREGISTRY=http://schema-registry:8081
```
We then run `docker-compose -f docker-compose-kafka.yml up -d` to start Kafka with all the components. Note that Kafka must be running whenever we want to test the frontend and backend services that send data to or receive data from Kafka topics.
We can check that everything is up and running with the `docker-compose -f docker-compose-kafka.yml ps` command:
```
NAME              COMMAND                  SERVICE           STATUS    PORTS
broker            "/etc/confluent/dock…"   broker            running   0.0.0.0:9092->9092/tcp, 0.0.0.0:29093->29093/tcp
kafka-tools       "tail -f /dev/null"      kafka-tools       running
kafka-ui          "/bin/sh -c 'java $J…"   kafka-ui          running   0.0.0.0:8080->8080/tcp
schema-registry   "/etc/confluent/dock…"   schema-registry   running   0.0.0.0:8081->8081/tcp
zookeeper         "/etc/confluent/dock…"   zookeeper         running   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
```
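As an extra sanity check, you can also list the topics from Python. This little snippet assumes you run it directly on the host machine, so it connects through `localhost:9092`:

```python
# quick_check.py - list the topics visible on the local broker
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
print(consumer.topics())  # prints the set of existing topic names
consumer.close()
```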
In addition to the above docker-compose file for Kafka, we will have another one for our microservices too. You can add services one by one to the docker-compose file and test them.
Note that, to connect your producers and consumers to the Kafka broker:
- If your service is on the same Docker network (either in the same docker-compose file as Kafka, or in a separate compose file whose network is set to the Kafka network), use `broker:29092` in your code.
- If you run your service locally on the same machine without dockerizing it, use `localhost:9092` in your code.
- If Kafka runs on one machine and your services run on another machine, use `<kafka machine ip>:29093` in your code.
Let’s start with the backend services. We will add the frontend later.
Orders Backend
Now, let’s go for the orders backend service. We will dockerize the app, put it in a separate docker-compose file called `docker-compose-services.yml`, and set its network to the same network as Kafka. The `orders_backend.py` file is a Flask application:
```python
# orders_backend.py
import json
import time

from kafka import KafkaProducer
from flask import Flask, jsonify, request

ORDER_KAFKA_TOPIC = 'order_details'

# KAFKA_SERVER_ADDRESS = 'localhost:9092'
KAFKA_SERVER_ADDRESS = 'broker:29092'
# KAFKA_SERVER_ADDRESS = '47.93.191.241:29093'

app = Flask(__name__)

## from inside the docker compose network - when the service is added to the compose file as orders_backend:v1
producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS],
                         security_protocol="PLAINTEXT",
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))


# POST endpoint to get the user id, order id, user email, and order details
@app.route('/order', methods=['POST'])
def order():
    user_id = request.json['user_id']
    order_id = request.json['order_id']
    user_email = request.json['user_email']
    order_details = request.json['order_details']

    order = {}
    order['user_id'] = user_id
    order['order_id'] = order_id
    order['user_email'] = user_email
    order['order_details'] = order_details
    order['time'] = time.time()

    producer.send(ORDER_KAFKA_TOPIC, order)
    print("Sent order details {} to kafka topic: {}".format(order, ORDER_KAFKA_TOPIC))
    return jsonify(order)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=5002, debug=True)
```
It has a POST endpoint that receives an order and publishes it to the Kafka topic `order_details`.
You can then easily dockerize this service. Here is the Dockerfile:
```dockerfile
FROM python:3.9.7-slim

RUN pip install -U pip
RUN pip install pipenv

WORKDIR /app

COPY [ "Pipfile", "Pipfile.lock", "./" ]
RUN pipenv install --system --deploy

COPY [ "orders_backend.py", "./" ]

EXPOSE 5002

ENTRYPOINT ["python", "orders_backend.py"]
```
We can then build the image using:
```bash
docker build -t orders_backend:v1 .
```
The docker-compose file would be as follows:
```yaml
# docker-compose-services.yml
version: "3"
services:
  orders_backend:
    restart: always
    image: orders_backend:v1
    ports:
      - "5002:5002"
    networks:
      - ikea-ordering-kafka_default

networks:
  ikea-ordering-kafka_default:
    external: true
```
We can use Postman to test it by sending a POST request to the `/order` endpoint:
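Alternatively, if you prefer testing from Python, a request like the following should work. It uses the `requests` library (not in the package list above), and the payload values are just made-up examples matching the fields the endpoint expects:

```python
import requests

# hypothetical test order; adjust the values as you like
payload = {
    "user_id": 42,
    "order_id": 1001,
    "user_email": "jane@example.com",
    "order_details": {"item": "MALM bed frame", "price": 299},
}

response = requests.post("http://localhost:5002/order", json=payload)
print(response.json())  # the orders backend echoes back the order it sent to Kafka
```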

We can also see the messages on the topic in the UI:

Transactions Backend
This is a simple service that listens to the `order_details` topic, does some processing on the data, and sends the confirmed order to the `order_confirmed` topic:
```python
# transactions_backend.py
import json

from kafka import KafkaConsumer, KafkaProducer

ORDER_KAFKA_TOPIC = 'order_details'
ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed'

# KAFKA_SERVER_ADDRESS = 'localhost:9092'
KAFKA_SERVER_ADDRESS = 'broker:29092'
# KAFKA_SERVER_ADDRESS = '47.93.191.241:29093'

consumer = KafkaConsumer(ORDER_KAFKA_TOPIC,
                         bootstrap_servers=[KAFKA_SERVER_ADDRESS],
                         security_protocol="PLAINTEXT",
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))
producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS],
                         security_protocol="PLAINTEXT",
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))

while True:
    for message in consumer:
        print("Received order details: {}".format(message.value))
        user_id = message.value['user_id']
        order_id = message.value['order_id']
        user_email = message.value['user_email']
        order_details = message.value['order_details']
        order_time = message.value['time']

        ## do some stuff on the order and check the confirmation

        order_confirmed = {}
        order_confirmed['user_id'] = user_id
        order_confirmed['order_id'] = order_id
        order_confirmed['user_email'] = user_email
        order_confirmed['order_details'] = order_details
        order_confirmed['time'] = order_time
        order_confirmed['status'] = 'confirmed'

        producer.send(ORDER_CONFIRMED_KAFKA_TOPIC, order_confirmed)
        print("Sent confirmed order {} to kafka topic: {}".format(order_confirmed, ORDER_CONFIRMED_KAFKA_TOPIC))
```
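One detail worth mentioning: the consumers in this post don't set a `group_id`, so each instance reads the topic independently. If you later want to run several instances of the transactions backend and have them share the work, something along these lines would do it (the group name below is just an example):

```python
import json
from kafka import KafkaConsumer

# Consumers sharing the same group_id split the topic's partitions between them,
# so each order is processed by only one instance of the service.
consumer = KafkaConsumer(
    'order_details',
    bootstrap_servers=['broker:29092'],
    security_protocol="PLAINTEXT",
    group_id='transactions-backend',   # example group name
    auto_offset_reset='earliest',      # start from the beginning if no offset is stored yet
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)
```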
The Dockerfile is as follows:

```dockerfile
FROM python:3.9.7-slim

RUN pip install -U pip
RUN pip install pipenv

WORKDIR /app

COPY [ "Pipfile", "Pipfile.lock", "./" ]
RUN pipenv install --system --deploy

COPY [ "transactions_backend.py", "./" ]

ENTRYPOINT ["python", "transactions_backend.py"]
```
You can build this image and update the `docker-compose-services.yml` file:
```yaml
# docker-compose-services.yml
version: "3"
services:
  orders_backend:
    restart: always
    image: orders_backend:v1
    ports:
      - "5002:5002"
    networks:
      - ikea-ordering-kafka_default
  transactions_backend:
    restart: always
    image: transactions_backend:v1
    ports:
      - "5003:5003"
    networks:
      - ikea-ordering-kafka_default

networks:
  ikea-ordering-kafka_default:
    external: true
```
Then, testing the services again with Postman, we can see the messages arriving on the topic:

Email Backend
The code for this service is as follows:
```python
# email_backend.py
import json

from kafka import KafkaConsumer, KafkaProducer

ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed'
EMAIL_SENT_KAFKA_TOPIC = 'order_email_sent'

# KAFKA_SERVER_ADDRESS = 'localhost:9092'
KAFKA_SERVER_ADDRESS = 'broker:29092'
# KAFKA_SERVER_ADDRESS = '47.93.191.241:29093'

producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS],
                         security_protocol="PLAINTEXT",
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))
consumer = KafkaConsumer(ORDER_CONFIRMED_KAFKA_TOPIC,
                         bootstrap_servers=[KAFKA_SERVER_ADDRESS],
                         security_protocol="PLAINTEXT",
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))


def send_email(user_id, order_id, user_email, order_details, order_time, status):
    print("Sending email to user: {} with order details: {}".format(user_email, order_details))
    # send email to user
    # ...
    return True


while True:
    for message in consumer:
        # read data from the consumer and call the send_email() function
        print("Received order details: {}".format(message.value))
        user_id = message.value['user_id']
        order_id = message.value['order_id']
        user_email = message.value['user_email']
        order_details = message.value['order_details']
        order_time = message.value['time']
        status = message.value['status']

        email_send_status = send_email(user_id, order_id, user_email, order_details, order_time, status)

        email_sent = {}
        email_sent['user_id'] = user_id
        email_sent['order_id'] = order_id
        email_sent['user_email'] = user_email
        email_sent['order_details'] = order_details
        email_sent['time'] = order_time
        email_sent['status'] = email_send_status

        producer.send(EMAIL_SENT_KAFKA_TOPIC, email_sent)
        print("Sent email details {} to kafka topic: {}".format(email_sent, EMAIL_SENT_KAFKA_TOPIC))
```
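The `send_email` function above is only a stub. As a rough idea of what a real implementation could look like, here is a minimal sketch using Python's standard `smtplib`; the SMTP host/port and the sender address are assumptions and would come from your own email provider:

```python
import smtplib
from email.message import EmailMessage


def send_email(user_id, order_id, user_email, order_details, order_time, status):
    # Build a simple confirmation email for the user
    msg = EmailMessage()
    msg["Subject"] = "Your order {} is {}".format(order_id, status)
    msg["From"] = "orders@example.com"   # assumed sender address
    msg["To"] = user_email
    msg.set_content("Thanks! We received your order: {}".format(order_details))

    try:
        # "localhost", 25 is an assumption; point this at your real SMTP server
        with smtplib.SMTP("localhost", 25) as smtp:
            smtp.send_message(msg)
        return True
    except Exception as exc:
        print("Failed to send email to {}: {}".format(user_email, exc))
        return False
```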
The Dockerfile is similar to the previous ones, with the Python file name changed. You can build the image, update the docker-compose file, and run it. After sending some new messages via Postman, we can see them on the topic in the UI:

Analytics Backend
This service consumes the confirmed orders and calculates the total number of orders and the total revenue:
```python
# analytics_backend.py
import json

from kafka import KafkaConsumer, KafkaProducer

ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed'
ANALYTICS_KAFKA_TOPIC = 'analytics_result'

# KAFKA_SERVER_ADDRESS = 'localhost:9092'
KAFKA_SERVER_ADDRESS = 'broker:29092'
# KAFKA_SERVER_ADDRESS = '47.93.191.241:29093'

producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS],
                         security_protocol="PLAINTEXT",
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))
consumer = KafkaConsumer(ORDER_CONFIRMED_KAFKA_TOPIC,
                         bootstrap_servers=[KAFKA_SERVER_ADDRESS],
                         security_protocol="PLAINTEXT",
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

total_revenue = 0
total_orders_count = 0

while True:
    for message in consumer:
        # read data from the consumer and do some analytics on it
        print("Received order details: {}".format(message.value))
        # note: this expects each order_details payload to contain a 'price' field
        order_details = message.value['order_details']
        total_revenue += int(order_details['price'])
        total_orders_count += 1

        analytics = {}
        analytics['total_revenue'] = total_revenue
        analytics['total_orders_count'] = total_orders_count

        producer.send(ANALYTICS_KAFKA_TOPIC, analytics)
        print("Sent analytics details {} to kafka topic: {}".format(analytics, ANALYTICS_KAFKA_TOPIC))
```
The Dockerfile is again similar to the previous ones with a small modification. Build the image, update the compose file, and run it.
We can send some new messages via Postman and see them on the topic in the UI:
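The architecture also mentioned a dashboard service that reads from these topics. I won't build a full dashboard here, but a minimal sketch of its consumer side could look like the following; what you do with each message (push it to a charting library, a websocket, a database, etc.) is up to you:

```python
# dashboard_backend.py - minimal sketch of a consumer feeding a dashboard
import json
from kafka import KafkaConsumer

ANALYTICS_KAFKA_TOPIC = 'analytics_result'
KAFKA_SERVER_ADDRESS = 'broker:29092'

consumer = KafkaConsumer(ANALYTICS_KAFKA_TOPIC,
                         bootstrap_servers=[KAFKA_SERVER_ADDRESS],
                         security_protocol="PLAINTEXT",
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
    analytics = message.value
    # replace this print with whatever feeds your dashboard
    print("Orders so far: {}, revenue so far: {}".format(
        analytics['total_orders_count'], analytics['total_revenue']))
```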

That’s it for this blog post. I hope you got a general idea of how to use Kafka in your own projects. Let me know in the comments if you are interested in learning about other topics or tools :)
Thank you for taking the time to read my post. If you found it helpful or enjoyable, please consider giving it a like and sharing it with your friends. Your support means the world to me and helps me to continue creating valuable content for you.