Deep Dive into Kafka

Rohit Goel
12 min read · Apr 19, 2021


These days, Kafka's popularity is on cloud nine. This article is a deep dive into the core concepts of Kafka and will help you get into it thoroughly. Let's start!

What is Kafka?

Apache Kafka is a distributed event streaming platform capable of handling trillions of events a day. It lets you decouple your data streams from your systems.

Your source systems, such as transaction systems and event producers, have their data end up in Apache Kafka, while your target systems, such as databases and analytical systems, source their data straight from Apache Kafka. Hence, we can say that Kafka provides communication between producers and consumers with the help of message-based topics.

Why do we need Kafka?

Kafka was created at LinkedIn. It is now an open-source project under the Apache Software Foundation, with Confluent as one of its main contributors.

Following are the main reasons to use Kafka:

  • Kafka is distributed and fault-tolerant.
  • Kafka has a resilient architecture, which means it can recover from node failures automatically.
  • It scales horizontally: it can grow to hundreds of brokers and handle millions of messages per second.
  • It provides high performance, with latency typically under 10 ms, which makes it effectively real-time.
  • It is used by thousands of firms worldwide, including more than a third of the Fortune 500.

What are Kafka Real world use cases?

Kafka fits easily into modern distributed systems as well as legacy systems. Below are some of the use cases where Kafka is the right choice.

  • Messaging
    Kafka can be used as a replacement for a more traditional message broker. Compared to most messaging systems, Kafka offers better throughput, built-in partitioning, replication, and fault tolerance, which makes it a good fit for large-scale message processing systems.
  • Metrics
    Kafka can be used for monitoring data. This involves aggregating statistics from distributed applications to produce centralised feeds of operational data.
  • Big Data Ingestion
    Kafka is commonly used as the front end of big data ingestion pipelines, acting as an "ingestion buffer".
    Various Kafka connectors are available these days that sink data from Kafka to HDFS, S3, Elasticsearch, etc.
  • Log Aggregation
    One of the most common use cases of Kafka is log aggregation: it receives log files from servers and puts them in a central place (a file server or HDFS, perhaps) for processing.

What are Kafka core components?

Kafka has the following core components:

Topics

Messages are organised and durably stored in topics. A topic can be visualised as a folder in a filesystem, with messages as the files in that folder. Messages can be read from topics as often as needed and are not deleted upon consumption. You can have as many topics as you want in Kafka, and each topic is identified by its name. A topic's name usually reflects the nature of the stream of data it carries.

  • Partitions

Topics are split into partitions, and the partitions of a topic can be spread across different Kafka brokers. Once data is written to a partition, it can't be changed: it is immutable. Messages stored within a partition are ordered.

  • Offsets

Each message within a partition gets an incremental id called an offset. A message can therefore be located by its coordinates, for example offset 3 in partition 0 of a topic. When a new message is written to a partition, it is appended at the end and the offset is incremented by 1.
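
To make offsets concrete, here is a minimal Java client sketch (the broker address localhost:9092 and the topic name demo_topic are assumptions for illustration) that assigns a consumer to partition 0 of a topic and starts reading at offset 3:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadFromOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manually assign partition 0 of the topic and jump to offset 3.
            TopicPartition partition0 = new TopicPartition("demo_topic", 0);
            consumer.assign(Collections.singleton(partition0));
            consumer.seek(partition0, 3L);

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}

Because messages are not deleted on consumption, this program can be run any number of times and will keep returning the same records from offset 3 onward (until retention removes them).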

[Figure: Kafka topic, partitions and offsets]

Brokers

A Kafka cluster is composed of multiple brokers (servers). Each broker is identified by an ID, which is always an integer, never a string. Each broker holds certain topic partitions. Once we connect to any broker, known as a bootstrap broker, we are connected to the entire cluster, meaning we can reach any broker in the cluster through that bootstrap broker.

The replication factor is the number of copies of each partition kept on different brokers. The replication factor should always be greater than 1 (typically 2 or 3). It ensures the availability of the data in case a broker node fails.
For example, if Topic A has a replication factor of 2, each partition of that topic is replicated to 2 broker nodes. If Topic B has a replication factor of 1, each partition lives on only one broker node, and data loss can happen if that node fails.

At any given time, only one broker can be the leader for a given partition, and only that leader can receive and serve data for that partition. The other brokers holding copies synchronise their data with the leader. We can say that each partition has one leader and multiple ISRs (in-sync replicas).
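
As an illustration of replication, the Java AdminClient can create a topic with a replication factor of 3. This is a hedged sketch that assumes a cluster with at least 3 brokers reachable via localhost:9092 (both assumptions, not part of the original article):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;

public class CreateReplicatedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed bootstrap broker

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3: each partition is copied to 3 brokers.
            NewTopic topic = new NewTopic("replicated_topic", 3, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();

            // Print the leader and in-sync replicas (ISR) for each partition.
            TopicDescription description = admin.describeTopics(Collections.singleton("replicated_topic"))
                    .all().get().get("replicated_topic");
            description.partitions().forEach(p ->
                    System.out.printf("partition=%d leader=%s isr=%s%n",
                            p.partition(), p.leader(), p.isr()));
        }
    }
}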

Zookeeper

ZooKeeper's main job is to manage the brokers: it maintains the list of all brokers in a cluster. ZooKeeper is also responsible for performing leader elections for partitions, and it notifies Kafka of changes such as a broker going down, a broker coming up, or a topic being created or deleted. As of now, Kafka cannot work without ZooKeeper, but the community is working on replacing ZooKeeper with a self-managed metadata quorum (KRaft).

Producers

Producers are the client applications that publish messages to Kafka topics. There can be multiple producers sending messages to the same topic. Producers know to which topic and to which partition data needs to be sent; the actual I/O is performed by the broker node on behalf of the producer.

Producers can choose to receive acknowledgment of the data writes to brokers. There are 3 possible acks values that can be set:

  • acks=0 : the producer does not wait for acknowledgement (possible data loss)
  • acks=1 : the producer waits for the leader's acknowledgment (limited data loss)
  • acks=all : the producer waits for acknowledgment from the leader plus the in-sync replicas (no data loss)

Producers can choose to send a key with each message. Keys can be strings, integers, etc. If the key is NULL, the data is distributed across partitions in round-robin fashion. If a key is sent, all messages with the same key always go to the same partition.
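
Below is a hedged Java producer sketch that ties the acks setting and keys together (the broker address and topic name are assumptions). Messages sent with the key "user-42" will always land in the same partition, while the null-keyed message is spread across partitions:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed bootstrap broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for the leader and the in-sync replicas

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keyed message: every message with key "user-42" goes to the same partition.
            ProducerRecord<String, String> keyed =
                    new ProducerRecord<>("demo_topic", "user-42", "Hello There");
            // Null key: messages are distributed across the topic's partitions.
            ProducerRecord<String, String> unkeyed =
                    new ProducerRecord<>("demo_topic", null, "Hello again");

            producer.send(keyed, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("acked: partition=%d offset=%d%n",
                            metadata.partition(), metadata.offset());
                }
            });
            producer.send(unkeyed);
            producer.flush(); // make sure everything is sent before exiting
        }
    }
}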

Consumers

Consumers are the client applications that read messages from the topics they subscribe to. Consumers can process messages before passing them on to downstream clients. A consumer knows which broker to read data from, and data is read in order within each partition.

A consumer group can be created when there is more than one consumer. Each consumer within a group reads from an exclusive set of partitions. For example, if a topic has 3 partitions and the group has 2 consumers, Consumer-1 might read from partition-0 and partition-1 while Consumer-2 reads from partition-2; Consumer-2 will never read from partition-0 or partition-1. Once assigned, a consumer consumes data only from its assigned partitions. If any consumer in the group goes down, a rebalance occurs and the partitions are re-assigned to the remaining consumers in the group. If the consumer group has just one consumer, that consumer reads from all partitions of the topic.
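
The consumer-group behaviour can be seen with a small Java sketch (the group name, broker address and topic are illustrative assumptions). Run two copies of this program with the same group.id against a 3-partition topic and the partitions will be split between them; stop one copy and a rebalance hands its partitions to the other:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed bootstrap broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); // all instances with this id share the partitions
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the beginning on first run

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("demo_topic"));
            while (true) {
                // Kafka assigns this instance an exclusive subset of the topic's partitions.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}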

What are Consumer Offset?

Kafka stores the offsets at which a consumer group has been reading. When a consumer has successfully processed a message received from Kafka, it should commit the offset. The offsets are committed to an internal Kafka topic named __consumer_offsets. If the consumer dies, it will be able to resume reading from where it left off.

The consumer can choose when to commit offsets. There are 3 delivery semantics:

  • At most once: offsets are committed as soon as the message is received. If processing goes wrong, the message is lost.
  • At least once: offsets are committed after the message is processed. If processing goes wrong, the message can be read again (see the sketch below).
  • Exactly once: can be achieved for Kafka-to-Kafka workflows using the Kafka Streams API.
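
For at-least-once delivery specifically, the usual pattern is to disable auto-commit and commit only after processing succeeds. Here is a hedged Java sketch (the broker, group and topic names are assumptions; process() is a hypothetical placeholder for your business logic):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed bootstrap broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Disable auto-commit so offsets are committed only after processing succeeds.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("demo_topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // if this throws, the offset is never committed and the message is re-read
                }
                consumer.commitSync(); // commit only once the whole batch has been processed
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println("processed: " + record.value()); // placeholder processing step
    }
}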

How Kafka Works ?

[Figure: Kafka architecture diagram]

Kafka works as follows:

  • Client or source systems send data to producers.
  • Producers produce data to Kafka. They know to which topic and to which partition data needs to be sent; the actual I/O is performed by the broker node on behalf of the producer.
  • Producers send data in round-robin fashion if keys are NULL, or perform key-based ordering and send all messages with the same key to one partition. The partition a message goes to is calculated by hashing the key (see the sketch after this list).
  • Producers can request acknowledgement from the broker by setting the acks value to 0, 1, or all.
  • ZooKeeper manages the brokers and helps elect a leader for each partition.
  • If the replication factor is greater than 1, the producer sends data to the leader, and it is the leader's responsibility to sync it with its replicas.
  • Each message within a partition gets an incremental id called an offset.
  • Consumers read messages from the topics they subscribe to. Data is read in order within each partition.
  • Consumers need to commit offsets upon processing data from Kafka. This can be done using 3 strategies: at most once, at least once, and exactly once.
  • The offsets are committed to the internal Kafka topic __consumer_offsets.
  • Once data is processed by a consumer, it can be sent to various downstream systems for analytics, dumped into a database, forwarded for auditing, etc.
  • Data remains in a topic for 7 days by default; this retention can be changed depending upon the requirement.
  • Other configurations, such as compression, can also be applied. We will visit these topics later.
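
The key-to-partition mapping mentioned above can be illustrated with a simplified Java sketch. Note that this is not Kafka's actual implementation (the default partitioner hashes the serialised key with murmur2); it only shows the principle that equal keys always map to the same partition:

import java.nio.charset.StandardCharsets;

public class KeyPartitioningSketch {
    // Hash the key bytes and take the result modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = 0;
        for (byte b : keyBytes) {
            hash = 31 * hash + b; // simple, deterministic hash for illustration only
        }
        return (hash & 0x7fffffff) % numPartitions; // mask the sign bit so the result is non-negative
    }

    public static void main(String[] args) {
        int partitions = 3;
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            // "user-1" appears twice and is mapped to the same partition both times.
            System.out.printf("key=%s -> partition=%d%n", key, partitionFor(key, partitions));
        }
    }
}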

What is Idempotent Producer?

From Kafka 0.11 onwards, we can define an "idempotent producer", which means it won't produce duplicate data in case of a network error. Idempotent producers can be used only when acks is set to "all". Let's understand it with the help of an example.

In a good request, the producer produces a message and sends it to Kafka, Kafka commits the message and sends an acknowledgement back, and the producer successfully receives that acknowledgment.

In an idempotent request, the producer produces a message and sends it to Kafka, Kafka commits the message and sends an acknowledgement, but the producer never receives it. The producer therefore retries the same message. Since that message was already committed the first time, it is not committed again and only the acknowledgement is re-sent.

To make the producer idempotent, we just need to set the following property on the producer:

"enable.idempotence" : true

Note: Running an idempotent producer may impact throughput and latency, so always test it against your use case before enabling it.
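
The corresponding producer configuration can be sketched in Java as follows (the broker address and topic name are assumptions; enable.idempotence is the real client property):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed bootstrap broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Idempotence lets the broker de-duplicate retried batches after a lost acknowledgement.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // required for idempotence

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo_topic", "key-1", "sent once, committed once"));
            producer.flush();
        }
    }
}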

What is Message Compression?

Producers usually send data in text-based formats such as JSON or plain text, or in serialised formats such as Avro. In such cases, it is important to apply compression at the producer.

Compression is enabled at the producer level and doesn't require any configuration change at the broker or consumer level. Compression is most effective when bigger batches of messages are sent to Kafka.

We need to set the following property to enable compression at the producer. The default is "none".

"compression.type" : 'none', 'gzip', 'snappy', 'lz4'
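
For example, here is a hedged Java producer configuration that enables snappy compression together with a little batching (the batch and linger values are illustrative, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompressedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed bootstrap broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Batches are compressed on the producer; no broker or consumer changes are needed.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // Larger batches compress better: wait up to 20 ms to fill a batch of up to 32 KB.
        props.put(ProducerConfig.LINGER_MS_CONFIG, "20");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>("demo_topic", "key-" + i, "{\"event\":" + i + "}"));
            }
            producer.flush();
        }
    }
}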

Advantages of Compression:

  • Requests are smaller, hence less network traffic.
  • Data is faster to transfer over the network, hence lower latency.
  • Better throughput.

Disadvantages of Compression

  • Compression at the producer requires some extra CPU cycles.
  • Decompression at the consumer requires some extra CPU cycles.

What are Segments?

As we know, topics are made up of partitions, and partitions are made up of segments (files). Each segment within a partition covers a range of offsets. Only one segment is ACTIVE within a partition at a time (the one data is currently written to).

There are two segment related configurations:

log.segment.bytes: the maximum size of a single segment in bytes
log.segment.ms: the time Kafka will wait before rolling a new segment if the current one is not full
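
These two settings are broker-wide defaults; equivalent topic-level settings (segment.bytes and segment.ms) can be supplied when creating a topic. Here is a hedged Java AdminClient sketch, with illustrative values and an assumed local broker:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class SegmentConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed bootstrap broker

        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("segment.bytes", Integer.toString(256 * 1024 * 1024)); // roll a new segment at 256 MB
            configs.put("segment.ms", Long.toString(24L * 60 * 60 * 1000));    // or after 24 hours, whichever comes first

            NewTopic topic = new NewTopic("segment_demo_topic", 3, (short) 1).configs(configs);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}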

What are Log Cleanup Policies?

Log cleanup means expiring data/messages within the Kafka cluster.
Deleting messages from Kafka allows us to control the size of the data on disk and, overall, limits the maintenance work of the Kafka cluster. Log cleanup shouldn't happen too frequently, as it consumes CPU and RAM.

There are two policies that can be applied:

Policy-1: "log.cleanup.policy=delete": the default policy for all user topics. Messages are deleted based on their age; by default, a message is deleted after 1 week. This can be tweaked by setting the following property:
log.retention.hours

Deletion can also be triggered based on the maximum size of the log (default is infinite). This can be tweaked by setting the following property:
log.retention.bytes

Policy-2: "log.cleanup.policy=compact": the default policy for the __consumer_offsets topic. Messages are deleted based on their key: for each key, only the latest value is kept and older duplicates are removed. The following properties can be configured (a sketch of creating a compacted topic follows this list):

segment.ms (default 7 days): maximum time to wait before closing the active segment
segment.bytes (default 1 GB): maximum size of a segment
min.compaction.lag.ms (default 0): how long to wait before a message can be compacted
delete.retention.ms (default 24 hours): how long to retain delete markers (tombstones) before removing them
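
As a sketch, a compacted topic with these settings could be created via the Java AdminClient (the topic name and values are illustrative; cleanup.policy, min.compaction.lag.ms and delete.retention.ms are the real topic-level configs):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CompactedTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed bootstrap broker

        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("cleanup.policy", "compact");       // keep only the latest value per key
            configs.put("min.compaction.lag.ms", "60000");  // don't compact messages younger than 1 minute
            configs.put("delete.retention.ms", "86400000"); // keep delete markers (tombstones) for 24 hours

            NewTopic topic = new NewTopic("user_profiles", 3, (short) 1).configs(configs);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}

A compacted topic like this keeps only the most recent message per key, which suits data where only the latest state matters.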

How to setup and Run Kafka?

Setting up and running Kafka is pretty straightforward for Mac or Linux users. Windows users may hit some hiccups while setting it up, so I would prefer to set it up using Docker with a Linux image.

  • Download Kafka
    Kafka can be downloaded from the following link. Once downloaded, you need to extract the Binary.
    https://kafka.apache.org/downloads
  • Starting Zookeeper
    Run the following command.
    zookeeper-server-start.sh config/zookeeper.properties

If the command runs without errors (by default ZooKeeper binds to port 2181), ZooKeeper has started successfully.

  • Starting Kafka
    Run the following command:
    kafka-server-start.sh config/server.properties
  • Configuring Topics
    Following are the commands to create, describe, list, and delete topics.

Create Topic: kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic demo_topic --create --partitions 3 --replication-factor 1

Here we are creating topic with name “demo_topic” with 3 partitions and replication factor as 1.

Describe Topic: kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic demo_topic --describe

It will describe the configuration of the topic

List All Topics: kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

Delete Topic: kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic demo_topic --delete

It will delete the demo_topic

  • Starting Producer
    Run the following command
    kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic demo_topic --producer-property acks=all

Here we are starting the producer that will produce data on topic “demo_topic” with acks set to “all”. We produced data “Hello There” on the topic demo_topic.

  • Starting Consumer
    Run the following command
    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic demo_topic

Here we are starting the Consumer that will read data from topic “demo_topic”. We consumed data “Hello There” from the topic demo_topic.

Conclusion: Deep Dive into Kafka

In this blog, we have seen what Kafka is and what its components are. We looked at how each component within Kafka works, and at the different configurations that can be applied to producers, consumers, etc. and how to tweak them to get the best out of Kafka. We also learned about idempotent producers and message compression, which help improve throughput and reduce latency. Finally, we got our hands dirty by starting Kafka, creating topics, producing messages to a topic, and consuming messages from it.

I hope you found this useful. If you have any queries or suggestions, feel free to ask in the comments section, or you can connect with me on LinkedIn:
https://www.linkedin.com/in/rohit-goel-b96b6860/

