In the era of Big Data, huge amounts of data (volume) are produced every second (velocity) from sources such as social media, blogs like the one I am writing now, e-commerce, etc., and get stored across different platforms in different schemas (variety). To perform any ETL (Extract, Transform, Load) operation, we need a messaging/streaming system that is asynchronous and loosely coupled, i.e. data from various sources/clients such as HDFS, Cassandra, an RDBMS or application log files can be dumped in a single place at the same time without the clients depending on each other. One solution to this problem is Kafka: an open-source distributed streaming platform created at LinkedIn and later donated to the Apache Software Foundation. It is written in Scala and Java.
Message: A message is essentially a key-value pair; the useful data (the record) is carried in the value, while the key is optional.
Topic: A topic is simply a named feed to which messages are published and from which they are consumed. Multiple topics can be created, which supports multi-tenancy. Each topic is split into one or more partitions.
Offset: Within a partition, messages are stored sequentially, like a commit log, and each message is given a sequential id, its offset, starting from 0.
Broker: A Kafka cluster consists of brokers, the nodes of the cluster that run the Kafka server, coordinated through ZooKeeper. There is no single master node: brokers are peers, although one of them is elected as the controller and each partition has a leader replica on one broker. Let's understand ZooKeeper first before proceeding further.
What is ZooKeeper and why is it needed in a Kafka cluster?
ZooKeeper is a system for distributed cluster management: a distributed, hierarchical key-value store whose entries are called znodes. It is highly optimized for reads, while writes are slower. It runs on an odd number of servers, called an ensemble, so that a majority (quorum) can always be formed. In Kafka, it is needed for:
- Controller election: ZooKeeper is used to elect one broker as the controller. All reads and writes for a partition go through the leader replica of that partition; whenever a leader goes down, the controller elects a new leader from the remaining in-sync replicas and records the change in ZooKeeper.
- Topic configuration: topic metadata, such as which topics exist, how many partitions each one has, which brokers hold their replicas and any per-topic configuration overrides, is stored in ZooKeeper.
- Access control lists (ACLs) for topics are stored in ZooKeeper.
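To make the controller-election bullet concrete, here is a toy, in-memory model of the ZooKeeper mechanism involved: each broker tries to create the ephemeral znode /controller, the first one to succeed becomes the controller, and when its session ends the znode disappears and the remaining brokers race again. FakeZooKeeper, NodeExistsError and elect_controller are hypothetical stand-ins, not the ZooKeeper client API, and the re-election that real brokers trigger through a watch is done here by simply calling elect_controller again.

```python
class NodeExistsError(Exception):
    """Raised when creating a znode that already exists."""


class FakeZooKeeper:
    """Hypothetical in-memory stand-in for a ZooKeeper ensemble."""

    def __init__(self):
        self.znodes = {}      # path -> data
        self.ephemeral = {}   # path -> id of the session that created it

    def create_ephemeral(self, path, data, session):
        if path in self.znodes:
            raise NodeExistsError(path)
        self.znodes[path] = data
        self.ephemeral[path] = session

    def expire_session(self, session):
        # Ephemeral znodes vanish together with the session that created them.
        for path in [p for p, s in self.ephemeral.items() if s == session]:
            del self.znodes[path]
            del self.ephemeral[path]


def elect_controller(zk, live_brokers):
    """Every live broker races to create /controller; the first one wins."""
    for broker_id in live_brokers:
        try:
            zk.create_ephemeral("/controller", broker_id, session=broker_id)
        except NodeExistsError:
            pass  # another broker already holds the controller role
    return zk.znodes["/controller"]


zk = FakeZooKeeper()
brokers = [1, 2, 3]
print(elect_controller(zk, brokers))   # 1
zk.expire_session(1)                   # broker 1 crashes, its session expires
brokers.remove(1)
print(elect_controller(zk, brokers))   # 2
```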
Several key features make Kafka popular, and they are hard to achieve with conventional messaging systems:
- High throughput: Throughput is the number of messages that can be processed per second. Because a topic can be partitioned and its partitions spread across different brokers, reads and writes are parallelized across machines, so Kafka can handle thousands of reads and writes per second, and considerably more on larger clusters.
- Distributed: A distributed system is split across multiple machines that work together as a cluster and appear as one single node to the end user. Kafka is distributed because it stores, reads and writes data on several nodes, called brokers, which together with ZooKeeper form the Kafka cluster.
- Persistence: Messages are persisted to disk rather than held only in memory, and each partition can be replicated to several brokers; the replicas that are fully caught up with the leader form the ISR (in-sync replica set). This makes the data durable and greatly reduces the risk of data loss when a broker fails, provided producers wait for acknowledgement from the in-sync replicas.
- Scalability: A system can be scaled vertically or horizontally. Vertical scaling means adding more resources, such as CPU or memory, to the same nodes and incurs a high operational cost. Horizontal scaling means adding more nodes to the cluster to increase its capacity. Kafka scales horizontally: we can add new brokers to the cluster whenever we run out of capacity or space (existing partitions are not moved automatically; they can be reassigned to the new brokers with bin/kafka-reassign-partitions.sh).
- Fault tolerance: If we have n topics with m partitions each and set the replication factor to q, each of the n*m partitions is replicated on q different brokers. The data of every partition therefore survives the failure of up to q-1 brokers. The replication factor must be less than or equal to the number of brokers: two replicas of the same partition on a single broker would add no fault tolerance, and Kafka rejects topic creation in that case.
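The fault-tolerance arithmetic above can be checked with a small simulation. The placement rule used here (replica j of partition p goes to broker (p + j) mod B) is a simplified round-robin scheme chosen for illustration; Kafka's real assignment also randomizes the starting broker and staggers followers, but it likewise keeps the replicas of one partition on distinct brokers. The n*m partitions are simply numbered 0 to n*m-1.

```python
from itertools import combinations


def assign_replicas(num_partitions, replication_factor, brokers):
    """Place each partition's replicas on distinct brokers (simplified round-robin)."""
    if replication_factor > len(brokers):
        # Kafka rejects this too: two replicas would have to share a broker.
        raise ValueError("replication factor exceeds number of brokers")
    return {
        p: [brokers[(p + j) % len(brokers)] for j in range(replication_factor)]
        for p in range(num_partitions)
    }


def survives(assignment, failed):
    """True if every partition still has at least one replica on a live broker."""
    return all(any(b not in failed for b in replicas) for replicas in assignment.values())


brokers = [0, 1, 2, 3, 4]
n_topics, m_partitions, q = 2, 3, 3
assignment = assign_replicas(n_topics * m_partitions, q, brokers)

# n*m partitions with q replicas each -> n*m*q replicas in total.
print(sum(len(r) for r in assignment.values()))   # 18

# Any q-1 = 2 broker failures leave every partition with a live replica...
print(all(survives(assignment, set(f)) for f in combinations(brokers, q - 1)))  # True
# ...but some set of q failures wipes out all copies of a partition.
print(all(survives(assignment, set(f)) for f in combinations(brokers, q)))      # False
```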
Note: By default, Kafka provides at-least-once delivery. At-most-once delivery can be implemented by disabling retries on the producer and committing the consumer's offsets before processing a batch of messages.
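The difference between the two guarantees comes down to the order of "process" and "commit offset" in the consumer loop. The toy model below (the run helper and the crash point are hypothetical) replays a partition after a crash and shows that committing first can lose a message, while committing after processing can process one twice.

```python
def run(messages, commit_first, crash_after=None, committed=0, processed=None):
    """Consume from offset `committed`, optionally crashing after `crash_after` actions.

    An action is either processing one message or committing its offset.
    Returns the committed offset and the list of processed messages.
    """
    processed = [] if processed is None else processed
    done = 0
    order = ("commit", "process") if commit_first else ("process", "commit")
    for offset in range(committed, len(messages)):
        for action in order:
            if done == crash_after:
                return committed, processed      # the consumer crashes here
            if action == "commit":
                committed = offset + 1           # offset of the next message to read
            else:
                processed.append(messages[offset])
            done += 1
    return committed, processed


msgs = ["m0", "m1", "m2"]

# At-most-once: commit, then crash before processing m0 -> m0 is never processed.
committed, seen = run(msgs, commit_first=True, crash_after=1)
print(run(msgs, commit_first=True, committed=committed, processed=seen)[1])
# ['m1', 'm2']

# At-least-once: process m0, then crash before committing -> m0 is processed twice.
committed, seen = run(msgs, commit_first=False, crash_after=1)
print(run(msgs, commit_first=False, committed=committed, processed=seen)[1])
# ['m0', 'm0', 'm1', 'm2']
```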
The Kafka binary distribution can be downloaded from http://kafka.apache.org/downloads; it bundles ZooKeeper as well.
After extracting the archive, let's say HOME_DIR = kafka_2.12-2.3.0/ (Scala 2.12, Kafka 2.3.0).
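First, we need to start ZooKeeper. Its data directory (dataDir) and port (clientPort) are set in HOME_DIR/config/zookeeper.properties, and the start script is in the bin folder:
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper listens on port 2181 by default. Next, we need to start the Kafka server, whose script is also in the bin folder. Server-related configuration goes in HOME_DIR/config/server.properties; let's make the broker listen on port 9091 by setting listeners=PLAINTEXT://localhost:9091 there, and then run:
bin/kafka-server-start.sh config/server.properties
This starts a Kafka server on port 9091.
Cheatsheet: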
Creating a topic (a replication factor of 3 needs at least three running brokers, so use 1 with the single broker started above):
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topic1
Describing a particular topic:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic1
Deleting a topic:
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic1
Note: The number of partitions of a topic can be increased later, but it can never be decreased.
How does a producer write a message?
The producer first fetches the metadata of the topic to find out which broker should receive the message. It gets this metadata from the brokers rather than from ZooKeeper: every broker keeps a copy of the cluster metadata, kept up to date by the controller, and a ZooKeeper ensemble usually has far fewer nodes than there are brokers, so letting many producers query it directly would degrade its performance. Once the producer knows the topic's partitions and their leaders, it sends the message to the broker hosting the leader replica of the target partition, which appends it to its log; the followers then copy it from the leader, and those that keep up form the ISR.
This write can be acknowledged in different ways, controlled by the producer's acks setting: with acks=all, the producer gets the acknowledgement only after all in-sync followers have also copied the message into their logs; with acks=1, it is acknowledged as soon as the leader has written the message; with acks=0, the producer does not wait for any acknowledgement.
Retention period: Messages on disk are kept for a configurable duration known as the retention period. After that, old messages are purged automatically and are no longer available for consumption. By default, it is set to 7 days.
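The acknowledgement modes above differ in what survives when the leader fails right after acknowledging a write. The toy model below compares acks=1 and acks=all for that scenario; the Replica class and produce function are hypothetical, and only the acks values correspond to real producer settings (in a real cluster, acks=all durability also depends on how many replicas are in the ISR).

```python
class Replica:
    """A replica of one partition on one broker (toy model)."""

    def __init__(self, broker_id):
        self.broker_id = broker_id
        self.log = []


def produce(leader, followers, message, acks, leader_crashes_after_ack):
    """Write one message and report whether it survives a possible leader crash.

    acks=1: the leader acknowledges right after its own append.
    acks="all": followers copy the message before the acknowledgement.
    """
    leader.log.append(message)
    if acks == "all":
        for f in followers:
            f.log.append(message)
    # The producer receives its acknowledgement at this point.
    if leader_crashes_after_ack:
        new_leader = followers[0]        # a follower takes over as leader
    else:
        if acks == 1:
            for f in followers:          # followers catch up later
                f.log.append(message)
        new_leader = leader
    return message in new_leader.log


for acks in (1, "all"):
    leader, followers = Replica(1), [Replica(2), Replica(3)]
    print(acks, produce(leader, followers, "order-42", acks, leader_crashes_after_ack=True))
# 1 False
# all True
```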
A message can be written to a topic using one of three strategies:
a. send(key, value, topic, partition): the partition to write to is given explicitly. This is discouraged, as it can create an imbalance in partition sizes.
b. send(key, value, topic): the default partitioner (DefaultPartitioner in the Java client) picks the partition by hashing the key with murmur2 and taking the hash modulo the number of partitions of the topic, so messages with the same key land in the same partition as long as the partition count does not change. A custom Partitioner can also be written.
c. send(key=null, value, topic): each message is stored in one partition, and successive messages are spread across the partitions in a round-robin fashion. Kafka 2.4 and later use a "sticky" partitioner for null keys instead, which fills a batch for one partition before moving on to the next.
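The three strategies can be sketched as a single partition-choosing function. The hash below is a Python port of the murmur2 function used by the Java client's default partitioner (with the sign bit masked off, as its toPositive helper does), written from our reading of the Kafka source; treat it as illustrative and check it against the client before relying on exact partition numbers. The global round-robin counter is a simplified model of the pre-2.4 behaviour for null keys, and choose_partition is a hypothetical name.

```python
import itertools

MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash (seed 0x9747b28c), returned as an unsigned int."""
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & MASK32
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = ((h * m) & MASK32) ^ k
    tail, rem = length - length % 4, length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


_round_robin = itertools.count()


def choose_partition(num_partitions, key=None, partition=None):
    if partition is not None:                       # a. explicit partition wins
        return partition
    if key is not None:                             # b. positive hash of key mod partitions
        return (murmur2(key) & 0x7FFFFFFF) % num_partitions
    return next(_round_robin) % num_partitions      # c. null key: round-robin


print(choose_partition(3, partition=2))                                          # 2
print(choose_partition(3, key=b"user-1") == choose_partition(3, key=b"user-1"))  # True
print([choose_partition(3) for _ in range(4)])                                   # [0, 1, 2, 0]
```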
Creating a console producer:
bin/kafka-console-producer.sh --broker-list localhost:9091 --topic topic1
Changing the retention period of a topic to 10 seconds:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name topic1 --add-config retention.ms=10000
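Retention is applied to whole log segments rather than to individual messages: starting from the oldest segment, a segment is deleted once its newest record is older than retention.ms. The sketch below models that check; the Segment tuple, apply_retention and the timestamps are made up for illustration.

```python
from typing import NamedTuple


class Segment(NamedTuple):
    base_offset: int
    max_timestamp_ms: int   # timestamp of the newest record in the segment


def apply_retention(segments, now_ms, retention_ms):
    """Delete whole segments, oldest first, while their newest record is expired."""
    kept = list(segments)
    while kept and now_ms - kept[0].max_timestamp_ms > retention_ms:
        kept.pop(0)
    return kept


segments = [Segment(0, 1_000), Segment(100, 5_000), Segment(200, 14_000)]
# retention.ms=10000 (10 seconds), checked at t = 15 s
print([s.base_offset for s in apply_retention(segments, now_ms=15_000, retention_ms=10_000)])
# [100, 200]
```

On a real broker this check only runs every log.retention.check.interval.ms (5 minutes by default), so with retention.ms=10000 old messages disappear some time after 10 seconds rather than exactly then.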
A producer can send messages in batches to improve efficiency: records for the same partition are grouped, and a batch is sent once it reaches a size limit (batch.size) or a configured wait time (linger.ms) has passed. Each individual message in a batch still gets its own sequential offset, and batches (possibly compressed) are unpacked on the consumer side before the records are handed to the consumer API.
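The batching behaviour can be modelled as a small accumulator: records are buffered until the batch reaches a size limit (the role of batch.size) or a wait time expires (the role of linger.ms), and the broker still assigns one offset per record when it appends the batch. Sizes are counted in records instead of bytes for simplicity, and PartitionLog and Accumulator are hypothetical names.

```python
class PartitionLog:
    """Broker side: appends whole batches, assigning one offset per record."""

    def __init__(self):
        self.records = []

    def append_batch(self, batch):
        base = len(self.records)
        self.records.extend(batch)
        return list(range(base, base + len(batch)))  # offsets of this batch


class Accumulator:
    """Producer side: buffers records and sends them one batch at a time."""

    def __init__(self, log, max_batch, linger_ms):
        self.log, self.max_batch, self.linger_ms = log, max_batch, linger_ms
        self.buffer, self.first_ms = [], None
        self.requests = 0  # number of simulated network round trips

    def send(self, record, now_ms):
        if not self.buffer:
            self.first_ms = now_ms
        self.buffer.append(record)
        if len(self.buffer) >= self.max_batch:   # batch is full
            self.flush()

    def poll(self, now_ms):
        # Called periodically: send a partial batch once it has lingered long enough.
        if self.buffer and now_ms - self.first_ms >= self.linger_ms:
            self.flush()

    def flush(self):
        self.log.append_batch(self.buffer)
        self.requests += 1
        self.buffer = []


log = PartitionLog()
acc = Accumulator(log, max_batch=3, linger_ms=5)
for t, rec in enumerate(["a", "b", "c", "d"]):
    acc.send(rec, now_ms=t)
acc.poll(now_ms=10)                 # "d" has waited long enough
print(acc.requests, log.records)    # 2 ['a', 'b', 'c', 'd']
```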
Producer API:
A Kafka producer is an application that acts as a source of data for a Kafka cluster. A producer can publish messages to one or more Kafka topics using the API provided by the Kafka client jar/dependency. A Properties object holding the producer configuration (for example bootstrap.servers, key.serializer and value.serializer) must be set up before sending messages. The main classes are ProducerRecord, KafkaProducer and Callback.
How does a consumer retrieve a message?
The consumer looks up the metadata in the same way the producer does and reads messages from the leader replica of each partition. Because producers can write to Kafka very quickly, a single consumer may fall behind when it has to read a large volume of messages from a topic; the gap between the latest offset in a partition and the consumer's current position is known as consumer lag.
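Lag is measured per partition as the distance between the partition's log-end offset and the group's committed offset (the offset of the next message the group will read). The offsets below are made-up numbers; in a real cluster, bin/kafka-consumer-groups.sh --describe reports these values per partition.

```python
def consumer_lag(log_end_offsets, committed_offsets):
    """Per-partition lag = log-end offset - committed offset."""
    return {p: log_end_offsets[p] - committed_offsets[p] for p in log_end_offsets}


log_end = {0: 1_500, 1: 1_480, 2: 1_510}     # next offset to be written, per partition
committed = {0: 1_500, 1: 900, 2: 1_200}     # next offset the group will read
lag = consumer_lag(log_end, committed)
print(lag, sum(lag.values()))   # {0: 0, 1: 580, 2: 310} 890
```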
To overcome this problem, a consumer group can be created, consisting of several consumers with the same group id. Each partition is consumed by exactly one consumer of the group, and the partitions are divided as evenly as possible among the consumers. Assigning partitions to consumers is coordinated by the group coordinator, one of the brokers in the cluster nominated for this role. To keep track of the active consumers, every consumer in a group sends heartbeats to the group coordinator. The number of consumers in a group should be less than or equal to the number of partitions of the topic; any extra consumers will sit idle.
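The "as evenly as possible" rule can be illustrated with the scheme of Kafka's RangeAssignor for a single topic: sort the consumers, give each floor(P/C) partitions, and give the first P mod C consumers one extra. This is a simplified re-implementation for one topic, not the client's code; other strategies such as RoundRobinAssignor or StickyAssignor distribute partitions differently.

```python
def range_assign(partitions, consumers):
    """Assign one topic's partitions to consumers, RangeAssignor-style."""
    consumers = sorted(consumers)
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, c in enumerate(consumers):
        count = per_consumer + (1 if i < extra else 0)
        assignment[c] = partitions[start:start + count]
        start += count
    return assignment


print(range_assign([0, 1, 2, 3, 4], ["c1", "c2"]))
# {'c1': [0, 1, 2], 'c2': [3, 4]}
print(range_assign([0, 1], ["c1", "c2", "c3"]))
# {'c1': [0], 'c2': [1], 'c3': []}   (c3 sits idle)
```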
Creating a simple consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic topic1
Creating a consumer that reads from a particular offset and partition:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic topic1 --offset 0 --partition 1
Creating a consumer group:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic topic1 --group groupName
Multiple processes/consumers started with the same groupName together form a consumer group.
More than one consumer can read a single topic at the same time. To remember up to which offset each group has read, Kafka stores committed offsets in an internal (hidden) topic called __consumer_offsets, which records, for every partition, the position reached by the consumers of a particular group. By convention, the committed offset is the offset of the next message to read, i.e. the last processed offset + 1.
Each entry in __consumer_offsets has the key [Group Id, Topic, Partition] and the value [Offset, …].
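A dictionary keyed by (group id, topic, partition) is enough to model how committed offsets are stored and looked up. Because __consumer_offsets is a compacted topic, only the latest commit per key matters, which the dictionary overwrite mimics. The OffsetStore class is hypothetical; real Java consumers commit through KafkaConsumer.commitSync or commitAsync.

```python
class OffsetStore:
    """Toy model of __consumer_offsets: the latest commit wins per key."""

    def __init__(self):
        self._offsets = {}

    def commit(self, group, topic, partition, next_offset):
        self._offsets[(group, topic, partition)] = next_offset

    def fetch(self, group, topic, partition):
        # None means the group never committed for this partition;
        # a real consumer then falls back to its auto.offset.reset setting.
        return self._offsets.get((group, topic, partition))


store = OffsetStore()
store.commit("groupName", "topic1", 0, 42)   # offsets 0..41 processed
store.commit("groupName", "topic1", 0, 57)   # a later commit overwrites the earlier one
store.commit("otherGroup", "topic1", 0, 3)   # each group tracks its own position
print(store.fetch("groupName", "topic1", 0), store.fetch("otherGroup", "topic1", 0))  # 57 3
print(store.fetch("groupName", "topic1", 1))  # None
```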
Similar to the producer API, Kafka provides classes such as KafkaConsumer and ConsumerRecord to connect to the bootstrap servers and poll for messages. A custom Deserializer has to be written for messages whose keys or values are not of a standard type (such as String, Integer or byte[]).
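A serializer/deserializer pair simply turns a custom type into bytes and back. The Java Serializer and Deserializer interfaces expose serialize(String topic, T data) and deserialize(String topic, byte[] data); the Python sketch below mirrors that shape with a hypothetical Order type encoded as JSON, purely to show the round trip.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class Order:              # hypothetical message value type
    order_id: int
    item: str
    quantity: int


class OrderSerializer:
    def serialize(self, topic: str, data: Order) -> bytes:
        return json.dumps(asdict(data)).encode("utf-8")


class OrderDeserializer:
    def deserialize(self, topic: str, data: Optional[bytes]) -> Optional[Order]:
        if data is None:              # null values (e.g. tombstones) stay null
            return None
        return Order(**json.loads(data.decode("utf-8")))


payload = OrderSerializer().serialize("topic1", Order(7, "book", 2))
print(payload)                                             # b'{"order_id": 7, "item": "book", "quantity": 2}'
print(OrderDeserializer().deserialize("topic1", payload))  # Order(order_id=7, item='book', quantity=2)
```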
Java examples of the Kafka Producer and Consumer APIs can be found here: https://github.com/ercsonusharma/learnkafka
Why is Kafka fast?
Several choices in Kafka's design make it perform better and faster:
- No random disk access: Kafka stores data in an append-only, immutable log, so writes and sequential reads take constant time, O(1). Messages are appended at the end and read from the beginning or from a particular offset.
- Sequential I/O: Kafka relies on the operating system's page cache. Modern operating systems devote most of their free memory to disk caching, and both disks and the OS are much faster at reading and writing sequential data than at random access.
- Zero copy: Kafka does not modify the data it reads from disk, so copying it into application memory and back would be wasted work. Instead, it uses the operating system's sendfile call (FileChannel.transferTo in Java) to send the data from the kernel's page cache directly to the socket and NIC buffer, and out to the network.
- Batching of messages: Several messages are grouped together to avoid multiple network calls.
- Message compression: Before transferring messages over the wire, the producer compresses whole batches with an algorithm such as gzip, snappy, lz4 or zstd; the broker typically stores the compressed batches as they are, and they are decompressed at the consumer API layer.
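The last three points fit together: the producer compresses a whole batch, the broker keeps those compressed bytes unchanged in its log file, and serves them to consumers with a zero-copy transfer. The sketch below models that path with gzip and Python's socket.sendfile, which uses the os.sendfile system call where the platform supports it and falls back to ordinary send otherwise; the newline-separated batch format is a made-up stand-in for Kafka's record-batch format, and a local socket pair stands in for the network.

```python
import gzip
import os
import socket
import tempfile

# Producer side: compress one batch of (repetitive) messages.
batch = "\n".join(f'{{"user": "u{i % 5}", "event": "click"}}' for i in range(200)).encode()
compressed = gzip.compress(batch)
print(len(batch) > len(compressed))   # True

# Broker side: the compressed bytes are written to the log file unchanged...
with tempfile.NamedTemporaryFile(delete=False) as segment:
    segment.write(compressed)
    path = segment.name

# ...and sent straight from the file to the consumer's socket.
broker_sock, consumer_sock = socket.socketpair()
with open(path, "rb") as f:
    sent = broker_sock.sendfile(f)      # kernel-level copy where supported
broker_sock.close()

# Consumer side: read the bytes and decompress them at the client layer.
received = b""
while chunk := consumer_sock.recv(65536):
    received += chunk
consumer_sock.close()
os.remove(path)

messages = gzip.decompress(received).split(b"\n")
print(sent == len(compressed), len(messages))   # True 200
```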
How does the data reside on the broker's physical disk?
All messages on a broker are stored in the log directory (for example log-dir-1) configured via log.dirs in server.properties before starting the Kafka server. Inside that directory, each partition of a topic has its own folder named topic_name-partition_number, e.g. topic1-0. The __consumer_offsets topic is stored in the same log directory.
Inside a partition directory, you will find the Kafka segment file 00000000000000000000.log, the offset index 00000000000000000000.index and the time index 00000000000000000000.timeindex, each named after the first (base) offset of the segment. New data for the partition is always appended to the active segment; once it reaches the size limit (log.segment.bytes) or time limit (log.roll.ms), a new segment is created. The index maps offsets to the positions of their messages in the log file. Since offsets are sequential, a binary search over the index finds the position for a given offset; because the index is sparse, the broker then scans forward in the log from the nearest indexed position.
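Because the offset index is sparse (by default an entry is added roughly every log.index.interval.bytes = 4096 bytes of log), a lookup is a binary search for the last index entry at or below the target offset, followed by a short forward scan of the log. The sketch below uses Python's bisect module on a hypothetical segment; list positions stand in for byte positions in the .log file, whereas the real index stores offsets relative to the base offset and byte positions.

```python
import bisect

# Hypothetical segment with base offset 1000: record i has offset 1000 + i.
log = [1000 + i for i in range(500)]        # offsets in the order they sit in the file
# Sparse index: (offset, position) for every 32nd record.
index = [(log[pos], pos) for pos in range(0, len(log), 32)]
index_offsets = [off for off, _ in index]


def lookup(target_offset):
    """Return (position, records_scanned) for target_offset, or None if absent."""
    i = bisect.bisect_right(index_offsets, target_offset) - 1   # last entry <= target
    if i < 0:
        return None                          # target lies before this segment
    start = index[i][1]
    for scanned, pos in enumerate(range(start, len(log)), start=1):
        if log[pos] == target_offset:
            return pos, scanned
        if log[pos] > target_offset:
            break
    return None


print(lookup(1000))   # (0, 1)
print(lookup(1077))   # (77, 14)
print(lookup(2000))   # None
```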
Log Compacted Topics
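In a topic with cleanup.policy=compact, the log cleaner removes older records whose key appears again later in the log, so at least the latest value for each key is retained. A key's value can be updated by writing a new record with the same key, and deleted by writing a record with a null value (a tombstone) for that key.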
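The effect of compaction can be sketched as keeping only the last record per key, in offset order, and dropping keys whose latest value is a tombstone (None). This is a simplification: Kafka's log cleaner only works on closed segments, keeps the original offsets (leaving gaps), and retains tombstones for delete.retention.ms before removing them. The compact function is a hypothetical name.

```python
def compact(records):
    """records: list of (offset, key, value). Returns the compacted list."""
    latest = {}
    for offset, key, value in records:
        latest[key] = (offset, key, value)                      # later records win
    survivors = [r for r in latest.values() if r[2] is not None]  # drop tombstones
    return sorted(survivors, key=lambda r: r[0])              # back in offset order


log = [
    (0, "user-1", "Alice"),
    (1, "user-2", "Bob"),
    (2, "user-1", "Alice Smith"),   # update
    (3, "user-3", "Carol"),
    (4, "user-2", None),            # tombstone: delete user-2
]
print(compact(log))
# [(2, 'user-1', 'Alice Smith'), (3, 'user-3', 'Carol')]
```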
Thanks for reading! Watch this space for more advanced Kafka stories, and please do read my other stories as well.