Apache Kafka Java Example(Producer + Consumer)

Apache Kafka Java Example(Producer + Consumer) thumbnail
106K
By Dhiraj 30 March, 2020

In this tutorial, we will be developing a sample apache kafka java application using maven. We will be configuring apache kafka and zookeeper in our local machine and create a test topic with multiple partitions in a kafka broker.We will have a separate consumer and producer defined in java that will produce message to the topic and also consume message from it.We will also take a look into how to produce messages to multiple partitions of a single topic and how those messages are consumed by consumer group.

You can visit this article for Kafka and Spring Boot integration.

First of all, let us get started with installing and configuring Apache Kafka on local system and create a simple topic with 1 partition and write java program for producer and consumer.The project will be a maven based project. After this, we will be creating another topic with multiple partitions and equivalent number of consumers in a consumer-group to balance the consuming between the partitions. Also, we will be having multiple java implementations of the different consumers.

Zookeeper Setup On windows

Assuming that you have jdk 8 installed already let us start with installing and configuring zookeeper on Windows.Download zookeeper from https://zookeeper.apache.org/releases.html. I have downloaded zookeeper version 3.4.10 as in the kafka lib directory, the existing version of zookeeper is 3.4.10.Once downloaded, follow following steps:

1. Extract it and in my case I have extracted kafka and zookeeper in following directory:

C:\D\softwares\kafka_2.12-1.0.1  --kafka location
C:\D\softwares\kafka-new\zookeeper-3.4.10  --zookeeper location

2. Once this is extracted, let us add zookeeper in the environment variables.For this go to Control Panel\All Control Panel Items\System and click on the Advanced System Settings and then Environment Variables and then edit the system variables as below:

zookeeper-windows-config

3. Also, edit the PATH variable and add new entry as %ZOOKEEPER_HOME%\bin\ for zookeeper.

4. Rename file C:\D\softwares\kafka-new\zookeeper-3.4.10\zookeeper-3.4.10\conf\zoo_sample.cfg to zoo.cfg

5. Now, in the command prompt, enter the command zkserver and the zookeeper is up and running on http://localhost:2181

zookeeper-console

Kafka Setup On windows

Head over to http://kafka.apache.org/downloads.html and download Scala 2.12. This version has scala and zookepper already included in it.Follow below steps to set up kafka.

1. Unzip the downloaded binary. In my case it is - C:\D\softwares\kafka_2.12-1.0.1

2. Go to folder C:\D\softwares\kafka_2.12-1.0.1\config and edit server.properties

3. log.dirs=.\logs

4. Now open a new terminal at C:\D\softwares\kafka_2.12-1.0.1.

5. Execute .\bin\windows\kafka-server-start.bat .\config\server.properties to start Kafka. Since, we have not made any changes in the default configuration, Kafka should be up and running on http://localhost:9092

running-kafka-console

Let us create a topic with a name devglan-test

cd C:\D\softwares\kafka_2.12-1.0.1\bin\windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic devglan-test

Above command will create a topic named devglan-test with single partition and hence with a replication-factor of 1. This will be a single node - single broker kafka cluster.

create-kafka-topic

Now let us create a producer and consumer for this topic.

cd C:\D\softwares\kafka_2.12-1.0.1\bin\windows
C:\D\softwares\kafka_2.12-1.0.1\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic devglan-test --producer
C:\D\softwares\kafka_2.12-1.0.1\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic devglan-test   --consumer

Producer can produce messages and consumer can consume messages in the following way from the terminal

producer-consumer-console

Kafka Architecture

kafka-architecture

Producers are the data source that produces or streams data to the Kafka cluster whereas the consumers consume those data from the Kafka cluster.

Kafka cluster is a collection of no. of brokers and clients do not connect directly to brokers. Instead, clients connect to c-brokers which actually distributes the connection to the clients.

Kafka cluster has multiple brokers in it and each broker could be a separate machine in itself to provide multiple data backup and distribute the load.

kafka-cluster

Each Broker contains one or more different Kafka topics. For example, Broker 1 might contain 2 different topics as Topic 1 and Topic 2. Now each topic of a single broker will have partitions. This helps in replicated commit log service and provides resilience.

Kafka topics provide segregation between the messages produced by different producers. For example, the sales process is producing messages into a sales topic whereas the account process is producing messages on the account topic. Each topic partition is an ordered log of immutable messages

Anatomy of a Topic

Let us assume we have 3 partitions of a topic and each partition starts with an index 0. The write operation starts with the partition 0 and the same data is replicated in other remaining partitions of a topic. Now, the consumer can start consuming data from any one of the partitions from any desired offset.

Offset defines the location from where any consumer is reading a message from a partition.

As we saw above, each topic has multiple partitions. Now, let us see how these messages of each partition are consumed by the consumer group. A consumer group is a group of consumers and each consumer is mapped to a partition or partitions and the consumer can only consume messages from the assigned partition.

If there are 3 consumers in a consumer group, then in an ideal case there would be 3 partitions in a topic. But if there are 4 consumers but only 3 partitions are available then any one of the 4 consumer won't be able to receive any message. We will see this implementation below:

If there are 2 consumers for a topic having 3 partitions, then rebalancing is done by Kafka out of the box.

Kafka Maven Dependency

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.12</artifactId>
	<version>1.0.1</version>
</dependency>

We require kafka_2.12 artifact as a maven dependency in a java project. It has kafka-clients,zookeeper, zookepper client,scala included in it.

Kafka Producer in Java

public class Producer {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        try{
            for(int i = 0; i < 100; i++){
                System.out.println(i);
                kafkaProducer.send(new ProducerRecord("devglan-test", Integer.toString(i), "test message - " + i ));
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            kafkaProducer.close();
        }
    }
}
.

A Kafka producer is instantiated by providing a set of key-value pairs as configuration.The complete details and explanation of different properties can be found here.Here, we are using default serializer called StringSerializer for key and value serialization.These serializer are used for converting objects to bytes.Similarly,devglan-test is the name of the broker.Finally block is must to avoid resource leaks.

It will send messages to the topic devglan-test.

Kafka Consumer in Java

public class Consumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "test-group");

        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        List topics = new ArrayList();
        topics.add("devglan-test");
        kafkaConsumer.subscribe(topics);
        try{
            while (true){
                ConsumerRecords records = kafkaConsumer.poll(10);
                for (ConsumerRecord record: records){
                    System.out.println(String.format("Topic - %s, Partition - %d, Value: %s", record.topic(), record.partition(), record.value()));
                }
            }
        }catch (Exception e){
            System.out.println(e.getMessage());
        }finally {
            kafkaConsumer.close();
        }
    }
}

A consumer is also instantiated by providing properties object as configuration.Similar to the StringSerialization in producer, we have StringDeserializer in consumer to convert bytes back to Object.group.id is a must have property and here it is an arbitrary value.This value becomes important for kafka broker when we have a consumer group of a broker.With this group id, kafka broker ensures that the same message is not consumed more then once by a consumer group meaning a message can be only consumed by any one member a consumer group.

Following is a sample output of running Consumer.java. By default, there is a single partition of a topic if unspecified.

Topic - devglan-test, Partition - 0, Value: test message - 0
Topic - devglan-test, Partition - 0, Value: test message - 1
Topic - devglan-test, Partition - 0, Value: test message - 2

Creating Topics with Multiple Partition

Now, we will be creating a topic having multiple partitions in it and then observe the behaviour of consumer and producer.As we have only one broker, we have a replication factor of 1 but we have have a partition of 3.

cd C:\D\softwares\kafka_2.12-1.0.1\bin\windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic devglan-partitions-topic

Now, it's time to produce message in the topic devglan-partitions-topic. We can do it in 2 ways. Either producer can specify the partition in which it wants to send the message or let kafka broker to decide in which partition to put the messages. By default, kafka used Round Robin algo to decide which partition will be used to put the message.

Hence, as we will allow kafka broker to decide this, we don't require to make any changes in our java producer code.

But since we have, 3 partitions let us create a consumer group having 3 consumers each having the same group id and consume the message from the above topic. Ideally we will make duplicate Consumer.java with name Consumer1.java and Conumer2.java and run each of them individually.

Now, start all the 3 consumers one by one and then the producer. You can see in the console that each consumer is assigned a particular partition and each consumer is reading messages of that particular partition only.

java-producer-consumer-output

Conclusion

In this article, we discussed about setting up kafka in windows local machine and creating Kafka consumer and producer on Java using a maven project.You can share your feedback in the comment section below.

Share

If You Appreciate This, You Can Consider:

We are thankful for your never ending support.

About The Author

author-image
A technology savvy professional with an exceptional capacity to analyze, solve problems and multi-task. Technical expertise in highly scalable distributed systems, self-healing systems, and service-oriented architecture. Technical Skills: Java/J2EE, Spring, Hibernate, Reactive Programming, Microservices, Hystrix, Rest APIs, Java 8, Kafka, Kibana, Elasticsearch, etc.

Further Reading on Core Java