Last Updated on September 5, 2024

Spring for Apache Kafka provides a high-level abstraction over native Kafka APIs, simplifying the development process while still allowing access to low-level features when needed. It offers template classes for sending messages, annotation-driven listener containers, and support for message conversion and routing.

In this article, we’ll explore the key components of Spring for Apache Kafka, discuss its benefits, and provide practical examples of how to use it in your applications.

Apache Kafka

Apache Kafka is a distributed streaming platform designed to handle real-time data processing at high throughput and low latency. It is widely used for building scalable and reliable data pipelines, handling big data, and powering real-time analytics applications.

Kafka operates on a publish-subscribe model, where producers publish messages to topics and consumers subscribe to these topics to receive messages. Messages are stored in partitions on a cluster of nodes, ensuring durability and fault tolerance. Kafka also supports features like message ordering, replication, and stream processing.

Spring provides seamless integration with Apache Kafka, simplifying the process of building Kafka-based applications. It offers a high-level abstraction over the Kafka client API, making it easier to produce and consume messages.

Key Components of Spring for Apache Kafka

KafkaTemplate

KafkaTemplate is a key component in the Spring for Apache Kafka framework.

It is a high-level abstraction for sending messages to Kafka topics. It handles the heavy lifting of creating producers and managing connections, allowing developers to focus on the business logic of sending messages.
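For example, a minimal sending component might look like this (a sketch, assuming Spring for Apache Kafka 3.x, where send() returns a CompletableFuture; the component and topic names are illustrative):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class GreetingSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public GreetingSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendGreeting() {
        // send() is asynchronous; the returned CompletableFuture completes
        // once the broker acknowledges the record
        kafkaTemplate.send("mytopic", "hello")
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        System.out.println("Sent to offset " + result.getRecordMetadata().offset());
                    } else {
                        ex.printStackTrace();
                    }
                });
    }
}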

ProducerFactory and ConsumerFactory

These factories create Kafka Producer and Consumer instances. They manage the lifecycle of these instances and provide a way to share common configurations across multiple producers or consumers.
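A minimal sketch of how a single factory carries configuration shared by everything it creates (the broker address is illustrative):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class FactoryExample {

    public static KafkaTemplate<String, String> buildTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // every producer (and every KafkaTemplate) created from this factory shares these settings
        ProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(factory);
    }
}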

@KafkaListener Annotation

This annotation simplifies the process of creating message consumers. It allows you to create message listeners with minimal boilerplate code, automatically handling deserialization and thread management.
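For instance, a listener that needs record metadata can accept the full ConsumerRecord instead of just the payload (a minimal sketch; the topic and group names are illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;

@KafkaListener(topics = "mytopic", groupId = "metadata-group")
public void listenWithMetadata(ConsumerRecord<String, String> record) {
    // ConsumerRecord exposes partition, offset, key, and value
    System.out.println("partition=" + record.partition()
            + " offset=" + record.offset()
            + " value=" + record.value());
}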

Producer/Consumer Example

Let’s look at how to implement a simple producer and consumer using Spring for Apache Kafka. We will need Kafka installed locally on our system.

Kafka with Docker

In this example we use Docker Compose to set up the Kafka environment, as it makes the installation and configuration process much easier.

Create a new file called docker-compose.yml in your project directory and add the following content:

version: '2'

services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "mytopic:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Run the following command:

docker-compose up -d 

This will bring up two Docker containers:

✔ Container kafka-kafka-1      Running
✔ Container kafka-zookeeper-1  Running

For the Spring part, we have two projects: producer and consumer. In each project we need to add the Spring Kafka dependency to the respective pom.xml (the version is managed by the Spring Boot parent POM):

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

Spring Boot Consumer

package org.codeline;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @KafkaListener(id = "myId", topics = "mytopic")
    public void listen(String in) {
        System.out.println(in);
    }

}

In the consumer example above, the application uses the @KafkaListener annotation to set up a listener for the topic “mytopic”.

Once the application starts, the listener is up and waiting for messages on mytopic. Because our Docker setup exposes Kafka on localhost:9092, which is Spring Boot’s default bootstrap address, no extra connection configuration is needed.

Spring Boot Producer

The producer application uses the TopicBuilder class to create a topic named “mytopic” with 10 partitions and 1 replica.

On startup, we send a message to the topic by injecting KafkaTemplate into an ApplicationRunner bean.

package org.codeline;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {

        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("mytopic")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("mytopic", "message from producer");
        };
    }

}

If we start the producer application on a different port than the consumer, the message is sent and the consumer application’s console shows:

2024-09-04T21:20:13.340+02:00  INFO 68804 --- [demo] [ myId-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-myId-1, groupId=myId] Node -1 disconnected.
message from producer

Sending Custom Objects

We can also send and receive Java objects. In this example we’ll send and receive a Message object, using Lombok annotations to generate the getters, setters, and constructors.

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {

    private String msg;
}

To do this, we need to define custom serializers and deserializers.

In the consumer Configuration class:

@Bean
public ConsumerFactory<String, Message> messageConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // key: plain String; value: JSON payload deserialized into a Message
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Message.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> messageKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(messageConsumerFactory());
    return factory;
}
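The matching listener method is not shown in the original configuration; reconstructed from the console output at the end of this section, it presumably looks something like this (the method name and log text are assumptions):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

private static final Logger log = LoggerFactory.getLogger(Application.class);

@KafkaListener(topics = "mytopic", containerFactory = "messageKafkaListenerContainerFactory")
public void listenMessage(Message message) {
    // the JsonDeserializer has already converted the payload into a Message
    log.info("Received message through MessageConverterUserListener [{}]", message);
}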

In the producer Configuration class:

@Bean
public ProducerFactory<String, Message> messageProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // serialize the Message payload as JSON
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Message> messageKafkaTemplate() {
    return new KafkaTemplate<>(messageProducerFactory());
}
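With the template in place, sending a Message works just like sending a String. A minimal sketch, mirroring the earlier ApplicationRunner approach (this runner is an assumption, not code from the original projects):

@Bean
public ApplicationRunner messageRunner(KafkaTemplate<String, Message> messageKafkaTemplate) {
    // the JsonSerializer converts the Message object to a JSON payload
    return args -> messageKafkaTemplate.send("mytopic", new Message("message"));
}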

If we start the consumer to wait for messages and then start the producer application, we get the following message in the consumer application’s console:

2024-09-05T18:48:38.655+02:00  INFO 31638 --- [ntainer#0-0-C-1] org.codeline.Application : Received message through MessageConverterUserListener [Message(msg=message)]

Conclusion

This guide has offered an overview of Spring for Apache Kafka, from its core concepts to practical implementation. We’ve demonstrated how to set up Kafka with Docker, produce and consume messages in Spring Boot, and send custom objects as JSON. Kafka’s versatility and scalability make it a powerful choice for building real-time data pipelines.
