This article provides an overview of Kafka and shows how to get started with
Kafka in Python through a simple example.
What is Kafka?
Apache Kafka is an open-source distributed event streaming platform
capable of handling trillions of events a day. Initially conceived as a
messaging queue, Kafka is based on an abstraction of a distributed
commit log, and it has quickly evolved from a messaging queue into a
full-fledged event streaming platform.
Kafka Concepts
This section highlights key foundational concepts around Kafka.
Topic
- In Kafka, messages are organised into topics
- A topic is the name used to identify a stream of data
- To send a message, it is written to a topic; messages are read from the topic
- There is no limit on the number of topics in Kafka, and the count can typically grow large
Partition
- In Kafka, a topic can be divided into a number of partitions to parallelise it
- A partition is an ordered sequence of records, and new messages are appended to the end of the partition
- Each record in a partition is assigned a sequential id called the offset
Brokers
- Kafka is typically run as a cluster of one or more servers, each of which is referred to as a broker
- Kafka broker, Kafka node and Kafka server typically mean the same thing
- Kafka brokers form a cluster by sharing information among themselves
Replication
- In Kafka, replication refers to the process of keeping multiple copies of data for availability
- Replication happens at the partition level
- The replication factor of a topic defines the number of copies to be maintained
- Kafka assigns one replica as the leader; all other replicas are followers, and followers that are fully caught up with the leader are called in-sync replicas (a sketch creating such a topic follows this list)
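To make these concepts concrete, here is a minimal sketch that creates a topic with a chosen partition count and replication factor using the confluent-kafka AdminClient (the client library is introduced later in this article); the topic name, partition count and replication factor are illustrative assumptions.

from confluent_kafka.admin import AdminClient, NewTopic

# Connect to a local broker (assumes Kafka is running on localhost:9092)
admin = AdminClient({'bootstrap.servers': "localhost:9092"})

# Request a topic with 3 partitions and a replication factor of 1
# (a single-broker cluster cannot support a higher replication factor)
futures = admin.create_topics([NewTopic("topic_1", num_partitions=3, replication_factor=1)])

# create_topics() is asynchronous; wait for the result of each request
for topic, future in futures.items():
    try:
        future.result()
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))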
Producer
- A producer creates and publishes messages to one or more topics
- A producer can optionally select the partition that stores the data
Consumer
- A consumer reads and processes messages from one or more topics to perform the intended business logic
- In large scale systems there can be a gap between the newest message arriving on a partition and the message currently being processed by the consumer; this gap is referred to as offset lag (see the sketch after this list)
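As a rough illustration, the lag can be estimated with confluent-kafka by comparing a partition's high watermark against the group's committed offset. The topic name and partition number below are assumptions, and this is a sketch rather than a production-grade lag monitor.

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({'bootstrap.servers': "localhost:9092", 'group.id': "mygroup"})
tp = TopicPartition("topic_1", 0)

# get_watermark_offsets() returns (low, high); high is the offset that the
# next message written to the partition will receive
low, high = consumer.get_watermark_offsets(tp, timeout=5.0)

# committed() fills in the group's last committed offset for each partition
committed = consumer.committed([tp], timeout=5.0)[0]
if committed.offset >= 0:
    print("offset lag:", high - committed.offset)
else:
    print("no committed offset yet for this partition")
consumer.close()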
Consumer Groups
- A consumer group is one or more consumers working together to process messages.
- Consumers join a group by specifying the same group identifier.
- Kafka guarantees that messages from a single partition are processed by only one consumer in the group.
- The parallelism for processing messages on a topic is determined by the number of consumers in the group, up to the number of partitions.
- If a topic has more partitions than there are consumers in the group, some consumers are assigned more than one partition.
- If a topic has fewer partitions than there are consumers in the group, some consumers get no data (a sketch of observing partition assignment follows this list).
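One way to observe this behaviour is to register an on_assign callback when subscribing; running several copies of the sketch below with the same group.id (the topic and group names are assumptions) shows how Kafka spreads the topic's partitions across the group members.

from confluent_kafka import Consumer

def print_assignment(consumer, partitions):
    # Invoked on every rebalance with this member's assigned partitions
    print("assigned:", [(p.topic, p.partition) for p in partitions])

consumer = Consumer({'bootstrap.servers': "localhost:9092", 'group.id': "mygroup"})
consumer.subscribe(["topic_1"], on_assign=print_assignment)

# Poll so the consumer joins the group and receives its assignment
while True:
    msg = consumer.poll(timeout=1.0)
    if msg is not None and not msg.error():
        print(msg.value().decode('utf-8'))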
Kafka Use Cases
A few use cases that illustrate how Kafka is used in the real world.
Messaging
Kafka can be used as a replacement for more traditional message broker systems
like RabbitMQ. Messaging is typically used to decouple data producers from data
processing consumers. Kafka makes a good solution for large scale
message processing applications.
Real time data analytics
Real time data analytics use cases such as live traffic data, inventory
management for a better consumer experience, medical diagnostics data and
compute performance metrics require high throughput, low latency and
performance guarantees to deliver a real time experience. Kafka stream
processing plays a key role here.
Log Aggregation
Log aggregation typically collects log files from servers and microservices and
puts them in a central place for processing. An example use case of log
aggregation is analysing call flows in a distributed system. Kafka
abstracts away the details of files and provides a stream of messages, which
allows for low latency processing.
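A minimal sketch of this idea, assuming a local broker, a hypothetical log file path and a hypothetical "logs" topic, reads a file and publishes each line as a message:

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': "localhost:9092"})

# Hypothetical log file; each line becomes one message on the "logs" topic
with open("/var/log/app/service.log") as f:
    for line in f:
        producer.produce("logs", line.rstrip("\n"))
        producer.poll(0)  # serve delivery callbacks, if any

# Block until all queued messages are delivered
producer.flush()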
Setting up Kafka
Follow these instructions to get started with Kafka.
- Download the latest version of Kafka
- Extract the downloaded archive
- Start the Zookeeper service
- Start the Kafka broker service
wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
Confluent Kafka for Python
confluent-kafka is a reliable, performant and feature-rich Python client for
Apache Kafka v0.8 and above. Follow these instructions to get started
with confluent-kafka.
- Create and activate a Python virtual environment
- Install the confluent-kafka module
python3 -m venv kafka-demo
cd kafka-demo
source bin/activate
pip install confluent-kafka
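To verify the installation, the module's version can be printed from Python:

import confluent_kafka

# Prints the client version tuple and the bundled librdkafka version
print(confluent_kafka.version())
print(confluent_kafka.libversion())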
We now have a working environment for Kafka with Python.
Python producer and consumer example using Confluent Kafka
Now let us write a simple producer and consumer to demonstrate the concepts
discussed above. It is pretty simple with confluent-kafka and takes only a few
lines of code.
Let us focus on the producer first. This is an example of a synchronous producer. Here we connect to the Kafka server on port 9092 and use the produce call to write a message to a Kafka topic. We use the flush call to make the writes synchronous and to ensure the messages are delivered before the producer is shut down.
from confluent_kafka import Producer, KafkaException

try:
    # Connect to the Kafka broker running on localhost:9092
    producer = Producer({'bootstrap.servers': "localhost:9092"})
    # Write a message to the topic and block until it is delivered
    producer.produce("topic_1", "This is my test message")
    producer.flush()
except KafkaException as e:
    print("Kafka failure: " + str(e))
Synchronous writes limit throughput, and the recommendation is to use asynchronous writes. Let us look at an example. Here we connect to the Kafka server on port 9092 and use the produce call to write a message to a Kafka topic. If there is a need to be notified of delivery success or failure, a callback can be registered. No delivery notification events are propagated until the poll() method is invoked.
from confluent_kafka import Producer, KafkaException

def acked(err, msg):
    # Delivery report callback, invoked from poll()
    if err is not None:
        print("Failed to deliver message: %s: %s" % (msg.value().decode('utf-8'), str(err)))
    else:
        print("Message produced: %s" % msg.value().decode('utf-8'))

try:
    producer = Producer({'bootstrap.servers': "localhost:9092"})
    producer.produce("topic_1", "This is my async test message", callback=acked)
    # Serve the delivery report callbacks registered above
    producer.poll(1)
except KafkaException as e:
    print("Kafka failure: " + str(e))
Now let us write a simple consumer. We specify the Kafka server and the consumer group as the configuration to create the consumer. We subscribe to the topic of interest before going into a consume loop. In the consume loop we call the poll() method to retrieve messages from the topic. If there are no messages, the poll() method returns None.
from confluent_kafka import Consumer

consumer = Consumer({'bootstrap.servers': "localhost:9092", 'group.id': "mygroup"})
try:
    consumer.subscribe(["topic_1"])
    while True:
        # Wait up to one second for a new message
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        msg_val = msg.value().decode('utf-8')
        print(msg_val)
finally:
    consumer.close()
When you run the producer application, you should see the messages being received by the consumer.

localhost> python producer.py
Message produced: This is my async test message

(kafka-demo) localhost> python consumer.py
This is my async test message
This is my async test message

Get access to the source code of this example in GitHub.