Spring Boot Apache Kafka

By Dhiraj Ray, 12 April, 2018

In my last article, we created a sample Java producer and subscriber for Apache Kafka. In this article, we will use Spring Boot 2 to develop a sample Kafka subscriber and producer application. We will look at KafkaTemplate to send messages to Kafka topics, the @KafkaListener annotation to listen to those messages, and the @SendTo annotation to forward messages to a specified topic. We will also look at how to produce messages to multiple partitions of a single topic, how a consumer group consumes those messages, and custom message converters in Spring Boot.

What is Kafka

Kafka is a distributed, partitioned, replicated commit log service that provides the functionality of a messaging system, but with a unique design. It is built for high throughput, scalability, and durability, and can handle hundreds of thousands to millions of messages per second on a small cluster. A Kafka cluster is a collection of brokers. Clients do not need to know every broker up front: they connect to one or more bootstrap brokers, fetch the cluster metadata, and are then directed to the brokers that lead the partitions they read from or write to, which is also how they fail over when a broker goes down. Each Kafka topic partition is an ordered, append-only log of immutable messages. Messages are stored in the order in which producers wrote them, and Kafka guarantees that a consumer receives the messages of a partition in that same order.
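
To make the "ordered, append-only log" idea concrete, here is a minimal in-memory sketch of a single partition. It is a toy model and not how Kafka stores data; the class name PartitionLog and its methods are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy in-memory model of one topic partition (hypothetical, not Kafka's implementation). */
class PartitionLog {

    private final List<String> records = new ArrayList<>();

    /** Appends a record to the end of the log and returns the offset assigned to it. */
    public long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    /** Returns a read-only copy of all records from the given offset onwards, in append order. */
    public List<String> readFrom(long offset) {
        if (offset < 0 || offset > records.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        return Collections.unmodifiableList(
                new ArrayList<>(records.subList((int) offset, records.size())));
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        for (int i = 0; i < 3; i++) {
            long offset = log.append("test message - " + i);
            System.out.println("appended at offset " + offset);
        }
        // A consumer that resumes at offset 1 sees the remaining records in write order.
        System.out.println(log.readFrom(1));
    }
}
```

Existing records are never modified or reordered, so a reader that remembers its offset can always resume from where it stopped.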

Setting Up Apache Kafka and Zookeeper

We already set up Kafka and Zookeeper in our last article, so we will skip the detailed explanation and only summarise the configuration here. For now, we create a topic with a single partition; later we will create multiple partitions and observe the behavior with Spring Boot. As we have only one broker, the replication factor is 1.
Zookeeper - localhost:2181
Kafka - localhost:9092
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic devglan-test

Setting Up Spring Boot and Kafka

Let us head over to start.spring.io to generate our Spring Boot project. Selecting Kafka as a dependency there brings in the following Maven dependency. With Spring Boot 2.0.1.RELEASE, we get spring-kafka 2.1.5 and kafka-clients 1.0.1.
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

Kafka Producer in Spring Boot

Only a bare minimum of configuration is required to get started with a Kafka producer in a Spring Boot app. We don't have to manually define a KafkaTemplate bean with all those Kafka properties; Spring Boot will do it for us by default. Following is our implementation of the Kafka producer.

package com.devglan.springbootkafka.service.impl;

import com.devglan.springbootkafka.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
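public class ProducerServiceImpl implements ProducerService {

    // Auto-configured by Spring Boot; typed to match the String serializers used for keys and values
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name read from application.properties
    @Value("${devglan.kafka.topic}")
    private String kafkaTopic;

    // Sends 100 keyed test messages to the configured topic
    public void send() {
        for (int i = 0; i < 100; i++) {
            System.out.println(i);
            kafkaTemplate.send(kafkaTopic, Integer.toString(i), "test message - " + i);
        }
    }
}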

The producer class above sends messages to the topic devglan-test, and Spring Boot injects the KafkaTemplate by default. By default, Spring looks for the Kafka broker at localhost:9092; this can be changed with a single entry, spring.kafka.bootstrap-servers, in application.properties. The default Kafka properties used by KafkaTemplate are summarised below:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
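
The producer above sends every message with a key. With a single partition this makes no difference, but on a topic with several partitions the key decides where a record lands: records with the same key go to the same partition. The sketch below illustrates that idea in plain Java. It is a simplification under stated assumptions: Kafka's default partitioner hashes the serialized key with murmur2, whereas this example uses String.hashCode() as a stand-in, and the class name KeyPartitionSketch and the partition count of 3 are made up for illustration.

```java
/** Simplified key-to-partition mapping (hypothetical; Kafka itself uses a murmur2 hash of the key bytes). */
class KeyPartitionSketch {

    /** Maps a non-null key to a partition index in the range [0, numPartitions). */
    public static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count, not taken from the article's setup
        for (int i = 0; i < 6; i++) {
            String key = Integer.toString(i);
            System.out.println("key " + key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Because the mapping depends only on the key and the partition count, all messages for one key stay in order relative to each other, which is the ordering guarantee Kafka gives per partition.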

Kafka Consumer in Spring Boot

To initialise a Kafka consumer in a plain Spring app, we need to define individual beans for ConcurrentKafkaListenerContainerFactory, ConsumerFactory, and the listener. Spring Boot creates these beans for us by default; we only need to add a few properties to application.properties.

ConsumerServiceImpl.java
package com.devglan.springbootkafka.service.impl;

import com.devglan.springbootkafka.service.ConsumerService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerServiceImpl implements ConsumerService{

    @Value("${devglan.kafka.topic}")
    private String kafkaTopic;

    // Topic name resolved from the same property the producer uses
    @KafkaListener(topics = "${devglan.kafka.topic}")
    public void receive(ConsumerRecord<String, String> record) {
        System.out.println(String.format("Topic - %s, Partition - %d, Value: %s", kafkaTopic, record.partition(), record.value()));
    }
}

Boot takes care of most of the configuration. Apart from our own devglan.kafka.topic entry, which holds the topic name, the only properties we need when using a local broker are:

application.properties
spring.kafka.consumer.group-id=test-group
spring.kafka.consumer.auto-offset-reset=earliest

The first is needed because we are using group management to assign topic partitions to consumers, so we need a group. The second ensures that the new consumer group gets the messages we just sent, because the container might start after the sends have completed.
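
The effect of auto-offset-reset can be sketched in a few lines of plain Java. This is a simplified model, not Kafka client code: it only covers the earliest and latest policies, and the names OffsetResetSketch, ResetPolicy and startingOffset are hypothetical. A group that already has a committed offset resumes from it; only a group without one falls back to the reset policy.

```java
import java.util.OptionalLong;

/** Simplified model of how a consumer group picks its starting offset (hypothetical names). */
class OffsetResetSketch {

    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * @param committed last committed offset of the group, empty for a brand-new group
     * @param policy    fallback used only when there is no committed offset
     * @param logStart  offset of the first record still available in the partition
     * @param logEnd    offset that the next written record will receive
     */
    public static long startingOffset(OptionalLong committed, ResetPolicy policy,
                                      long logStart, long logEnd) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // 100 messages were already sent before the consumer started: offsets 0..99.
        long logStart = 0;
        long logEnd = 100;
        System.out.println("earliest: "
                + startingOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, logStart, logEnd));
        System.out.println("latest:   "
                + startingOffset(OptionalLong.empty(), ResetPolicy.LATEST, logStart, logEnd));
    }
}
```

With latest, which is Kafka's default, a new group would start at the end of the log and miss the 100 messages sent before the listener container came up. That is why earliest is set here.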

Testing Application

Now we are all set to test this application. First of all, let us start Zookeeper.

C:\D\softwares\kafka-new\zookeeper-3.4.10\zookeeper-3.4.10\bin\zkServer

Next, let us start our Kafka server.

C:\Users\only2dhir>cd C:\D\softwares\kafka_2.12-1.0.1
C:\D\softwares\kafka_2.12-1.0.1>.\bin\windows\kafka-server-start.bat .\config\server.properties

Now our Kafka server is up, so let us create a topic named devglan-test.

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic devglan-test

Once this is done, we can run SpringBootKafkaApplication.java as a Java application and hit http://localhost:8080/send. The messages are then sent to the Kafka topic and consumed by our consumer class.

KafkaTemplate ProducerListener

Apart from the send(String topic, V data) method, KafkaTemplate provides many other convenience methods to send data to Kafka topics. The send() method returns a ListenableFuture<SendResult<K, V>>, and we have to invoke get(), which blocks the sending thread, to await the result.

By default, the KafkaTemplate is configured with a LoggingProducerListener, which logs errors and does nothing when the send is successful. Instead of waiting for the Future to complete, we can configure the KafkaTemplate with our own ProducerListener to get an async callback with the result of the send (success or failure).

public interface ProducerListener<K, V> {

    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);

    void onError(String topic, Integer partition, K key, V value, Exception exception);

    boolean isInterestedInSuccess();

}
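
The difference between blocking on the future and reacting in a callback can be shown without a broker. The following is a plain-Java analogy only: CompletableFuture stands in for the future returned by send(), and fakeSend is a hypothetical method that pretends to send a record. None of this is spring-kafka API.

```java
import java.util.concurrent.CompletableFuture;

/** Plain-Java analogy for blocking vs. callback-style handling of a send result (not spring-kafka API). */
class SendCallbackSketch {

    /** Hypothetical asynchronous send; completes with a fake "topic-partition@offset" string. */
    public static CompletableFuture<String> fakeSend(String topic, String value) {
        return CompletableFuture.supplyAsync(() -> {
            if (value == null) {
                throw new IllegalArgumentException("value must not be null");
            }
            return topic + "-0@0";
        });
    }

    public static void main(String[] args) throws Exception {
        // Option 1: get() blocks the calling thread until the result is available.
        String metadata = fakeSend("devglan-test", "test message - 0").get();
        System.out.println("blocking result: " + metadata);

        // Option 2: register a callback and let the calling thread carry on.
        CompletableFuture<String> pending = fakeSend("devglan-test", "test message - 1");
        pending.whenComplete((result, error) -> {
            if (error != null) {
                System.err.println("send failed: " + error);
            } else {
                System.out.println("callback result: " + result);
            }
        }).join(); // join() is only here so the demo does not exit before the callback runs
    }
}
```

A ProducerListener plays the role of the callback in option 2: onSuccess and onError are invoked with the outcome, so the sending thread never has to wait.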

KafkaListener Annotation

The @KafkaListener annotation provides a mechanism for simple POJO listeners. We can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets):

@KafkaListener(id = "bar", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

We can also read the message metadata from the headers. The available headers are:

KafkaHeaders.RECEIVED_MESSAGE_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION_ID
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
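
The topicPattern attribute used above is a regular expression, so a single listener can cover several topics. As a rough illustration in plain Java (the class TopicPatternSketch and the sample topic names are made up, and whole-name matching is assumed), this is how a pattern selects topics:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Illustrates selecting topic names with a regular expression (hypothetical helper, not Spring code). */
class TopicPatternSketch {

    /** Returns the topic names whose full name matches the given regular expression. */
    public static List<String> matchingTopics(String topicPattern, List<String> topics) {
        Pattern pattern = Pattern.compile(topicPattern);
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("myTopic1", "myTopic2", "devglan-test");
        // A literal pattern matches exactly one topic.
        System.out.println(matchingTopics("myTopic1", topics));
        // A wildcard pattern matches every topic that starts with "myTopic".
        System.out.println(matchingTopics("myTopic.*", topics));
    }
}
```

With a literal value such as "myTopic1" the pattern behaves like a plain topic name; the benefit only shows with wildcards.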

The result of the method invocation can also be forwarded to the topic specified by @SendTo.

@KafkaListener(topics = "test-topic")
@SendTo("${replyTopic}")
public Collection<String> replyingBatchListener(String in) {
 ...
}

Conclusion

In this article, we discussed integrating Spring Boot with Kafka using a sample application. We also covered the different variants of KafkaTemplate and @KafkaListener.

Further Reading on Apache Kafka

1. Stream Log4j Logs To Kafka

2. Apache Kafka Java Example

3. Kafka Elasticsearch Logstash Example
