Apache Kafka Java Example (Producer + Consumer)

By Dhiraj Ray, 20 March 2018

In this tutorial, we will develop a sample Apache Kafka Java application using Maven. We will configure Apache Kafka and ZooKeeper on our local machine and create a test topic with multiple partitions in a Kafka broker. We will have a separate consumer and producer defined in Java that will produce messages to the topic and also consume messages from it. We will also take a look at how to produce messages to multiple partitions of a single topic and how those messages are consumed by a consumer group.

First of all, let us get started by installing and configuring Apache Kafka on the local system, creating a simple topic with one partition, and writing Java programs for the producer and consumer. The project will be a Maven-based project. After this, we will create another topic with multiple partitions and an equivalent number of consumers in a consumer group to balance consumption across the partitions. We will also have multiple Java implementations of the different consumers.

ZooKeeper Setup on Windows

Assuming that you already have JDK 8 installed, let us start by installing and configuring ZooKeeper on Windows. Download ZooKeeper from https://zookeeper.apache.org/releases.html. I have downloaded ZooKeeper version 3.4.10, as that is the version of ZooKeeper shipped in the Kafka lib directory. Once downloaded, follow these steps:

1. Extract it. In my case, I have extracted Kafka and ZooKeeper into the following directories:

C:\D\softwares\kafka_2.12-1.0.1  --kafka location
C:\D\softwares\kafka-new\zookeeper-3.4.10  --zookeeper location

2. Once this is extracted, let us add ZooKeeper to the environment variables. To do this, go to Control Panel\All Control Panel Items\System, click Advanced System Settings, then Environment Variables, and edit the system variables as below:

[Screenshot: ZOOKEEPER_HOME environment variable configuration]

3. Also, edit the PATH variable and add a new entry %ZOOKEEPER_HOME%\bin\ for ZooKeeper.

4. Rename the file C:\D\softwares\kafka-new\zookeeper-3.4.10\zookeeper-3.4.10\conf\zoo_sample.cfg to zoo.cfg.

5. Now, in the command prompt, run the command zkserver. ZooKeeper should now be up and running, listening on localhost:2181.

[Screenshot: ZooKeeper console output]

Kafka Setup on Windows

Head over to http://kafka.apache.org/downloads.html and download the binary built for Scala 2.12. This distribution already includes the Scala libraries and ZooKeeper. Follow the steps below to set up Kafka.

1. Unzip the downloaded binary. In my case it is - C:\D\softwares\kafka_2.12-1.0.1

2. Go to the folder C:\D\softwares\kafka_2.12-1.0.1\config and edit server.properties.

3. In server.properties, set log.dirs=.\logs so that Kafka stores its log data under the installation directory.

4. Now open a new terminal at C:\D\softwares\kafka_2.12-1.0.1.

5. Execute .\bin\windows\kafka-server-start.bat .\config\server.properties to start Kafka. Since we have not made any changes to the default configuration, Kafka should be up and running, listening on localhost:9092.

[Screenshot: Kafka server console output]

Let us create a topic with the name devglan-test:

cd C:\D\softwares\kafka_2.12-1.0.1\bin\windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic devglan-test

The above command creates a topic named devglan-test with a single partition and a replication factor of 1. This will be a single-node, single-broker Kafka cluster.

[Screenshot: topic creation output]

Now let us start a console producer and consumer for this topic.

cd C:\D\softwares\kafka_2.12-1.0.1\bin\windows
C:\D\softwares\kafka_2.12-1.0.1\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic devglan-test
C:\D\softwares\kafka_2.12-1.0.1\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic devglan-test

The producer can then produce messages and the consumer can consume them from the terminal as follows:

[Screenshot: console producer and consumer output]

Kafka Maven Dependency

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.12</artifactId>
	<version>1.0.1</version>
</dependency>

We require the kafka_2.12 artifact as a Maven dependency in our Java project. It transitively includes kafka-clients, ZooKeeper, the ZooKeeper client, and the Scala libraries.
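
If you only need the producer and consumer APIs used in this tutorial, the lighter kafka-clients artifact (which kafka_2.12 pulls in transitively) should also be sufficient; this is an optional alternative, not required for the examples below:

```xml
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>1.0.1</version>
</dependency>
```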

Kafka Producer in Java

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Typed producer: both key and value are Strings
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        try {
            for (int i = 0; i < 100; i++) {
                System.out.println(i);
                kafkaProducer.send(new ProducerRecord<>("devglan-test", Integer.toString(i), "test message - " + i));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always close the producer to flush buffered records and release resources
            kafkaProducer.close();
        }
    }
}

A Kafka producer is instantiated by providing a set of key-value pairs as configuration. The complete details and explanation of the different properties can be found in the Kafka documentation. Here, we are using the default serializer, StringSerializer, for both key and value serialization. These serializers convert objects to bytes. Similarly, devglan-test is the name of the topic we are producing to. The finally block is a must to avoid resource leaks.

It will send messages to the topic devglan-test.

Kafka Consumer in Java

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "test-group");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singletonList("devglan-test"));
        try {
            while (true) {
                // poll() blocks for up to 10 ms waiting for new records
                ConsumerRecords<String, String> records = kafkaConsumer.poll(10);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Topic - %s, Partition - %d, Value: %s",
                            record.topic(), record.partition(), record.value()));
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            kafkaConsumer.close();
        }
    }
}

A consumer is also instantiated by providing a Properties object as configuration. Analogous to StringSerializer in the producer, we use StringDeserializer in the consumer to convert bytes back into objects. group.id is a mandatory property; here it is an arbitrary value. This value becomes important to the Kafka broker when we have a consumer group. With this group id, the broker ensures that the same message is not consumed more than once by a consumer group, meaning a message can be consumed by only one member of a consumer group.
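
To make the "one partition, one owner" idea concrete, here is a small stand-alone sketch that models how a group coordinator might spread partitions across group members round-robin. This is an illustration only: the real assignment is negotiated between the consumers and the broker-side group coordinator, and the class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of consumer-group partition assignment: each partition
// is owned by exactly one member, so no message is delivered twice within
// the group. Kafka's actual assignors (range, round-robin) live broker-side.
public class GroupAssignmentSketch {

    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            // round-robin: partition p goes to consumer p mod group-size
            String owner = consumers.get(p % consumers.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("Consumer", "Consumer1", "Consumer2");
        System.out.println(assign(group, 3));
    }
}
```

With 3 partitions and 3 members, each member ends up owning exactly one partition, which matches the behaviour we observe later when we run three copies of the consumer.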

Following is a sample output of running Consumer.java. By default, a topic has a single partition unless specified otherwise at creation time.

Topic - devglan-test, Partition - 0, Value: test message - 0
Topic - devglan-test, Partition - 0, Value: test message - 1
Topic - devglan-test, Partition - 0, Value: test message - 2

Creating Topics with Multiple Partitions

Now, we will create a topic having multiple partitions in it and then observe the behaviour of the consumer and producer. As we have only one broker, we keep a replication factor of 1, but we use 3 partitions.

cd C:\D\softwares\kafka_2.12-1.0.1\bin\windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic devglan-partitions-topic

Now, it's time to produce messages to the topic devglan-partitions-topic. We can do it in two ways: either the producer can specify the partition to which it wants to send each message, or it can let the Kafka broker decide. When a record carries a key, as in our producer, Kafka's default partitioner chooses the partition by hashing the key; when the key is null, it distributes records round-robin across the partitions.

Hence, as we will let the Kafka broker decide the partition, we don't need to make any changes to our Java producer code apart from pointing it at the new topic.
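
The key-hashing idea can be sketched without a broker. Note this is only an analogue: Kafka's default partitioner actually hashes the serialized key bytes with murmur2, while this hypothetical sketch uses a simple polynomial hash just to show that the same key always maps to the same partition.

```java
import java.nio.charset.StandardCharsets;

// Illustrative sketch of keyed partitioning: a stand-in hash is used in
// place of Kafka's murmur2, but the invariant is the same - identical keys
// always land on the same partition, so per-key ordering is preserved.
public class PartitionSketch {

    static int choosePartition(String key, int numPartitions) {
        if (key == null) {
            // Kafka falls back to round-robin for null keys; not modeled here
            throw new IllegalArgumentException("key required in this sketch");
        }
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = 0;
        for (byte b : bytes) {
            hash = 31 * hash + b; // simple stand-in hash, NOT Kafka's murmur2
        }
        // mask the sign bit so the modulo result is a valid partition index
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            String key = Integer.toString(i);
            System.out.println("key " + key + " -> partition " + choosePartition(key, 3));
        }
    }
}
```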

But since we have 3 partitions, let us create a consumer group with 3 consumers, each having the same group id, and consume messages from the above topic. For simplicity, we will make two copies of Consumer.java named Consumer1.java and Consumer2.java and run each of them individually.

Now, start all 3 consumers one by one and then the producer. You can see in the console that each consumer is assigned a particular partition and reads messages from that partition only.

[Screenshot: output of the three consumers, each reading one partition]

Conclusion

In this article, we discussed setting up Kafka on a local Windows machine and creating a Kafka consumer and producer in Java using a Maven project. You can share your feedback in the comments section below.

Further Reading on Apache Kafka

1. Stream Log4j Logs To Kafka

2. Spring Boot Jms Activemq Example

3. Spring Jms Activemq Integration Example

4. Kafka Elasticsearch Logstash Example


References

Kafka Documentation