Introduction to Spring Kafka

Overview

Apache Kafka is a distributed streaming platform which allows you to:

  • Publish and subscribe to streams of records, like a message queue.
  • Store streams of records.
  • Process streams of records.

This article will focus only on the first use-case: using Kafka as a message queue. We will go through producing and consuming messages. You can go to Apache Kafka’s introduction page for more details.

Using Spring for Apache Kafka

Spring for Apache Kafka applies core Spring concepts to develop Kafka-based messaging solutions. It provides us with an abstraction over the Kafka APIs so that we can focus on producing and consuming messages.

There are other Spring-based Kafka solutions such as Spring Integration and Spring Cloud Stream, but we won’t need those for now.

Installation and Setup

Kafka

This article assumes that you have installed Kafka and know how to start the server. If not, you can follow Kafka’s installation guide. If you are on macOS and using Homebrew, you can just do brew install kafka to install and then brew services start kafka to start the server.

If you have the server running, create a topic (e.g. person). You can follow Kafka’s topic creation guide.

Project Setup

Since we will be using Spring, we can get started by initialising a Spring Boot project with Spring Initializr, with Kafka messaging as a dependency.

In some cases we need to make sure that the spring-kafka, kafka-clients and Kafka installation (broker) versions are compatible. Please use the compatibility table on the spring-kafka project page if you encounter any problems.

Sending Messages

There are several ways you can produce messages using spring-kafka. The easiest way is to use KafkaTemplate. spring-kafka provides a default KafkaTemplate bean which uses String for the message key and String for the message itself. This may change depending on the spring-kafka version used; please refer to the appropriate reference doc.

These are the bare minimum configurations we need to put into our application.properties or application.yml in order to use KafkaTemplate:

spring.kafka.bootstrap-servers=localhost:9092
kafka.topics.person=person

spring.kafka.bootstrap-servers is a spring-kafka property which configures our Kafka broker address. By default this is set to localhost:9092.

kafka.topics.person is our custom property for the topic which we created earlier. We will need to tell KafkaTemplate which topic to send our messages to.

We can test our application by making our main class implement org.springframework.boot.CommandLineRunner and using KafkaTemplate there.

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	@Value("${kafka.topics.person}")
	private String topic;

	public static void main(String[] args) {
		SpringApplication.run(SpringKafkaApplication.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		String message = "{\"name\": \"John Doe\", \"age\": 99}";
		kafkaTemplate.send(topic, message);
	}
}

When we run the application, we should see a log of the producer configuration values. A shortened version looks like this:

INFO --- [main] o.a.k.clients.producer.ProducerConfig: ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:9092]

If we want to set the message key ourselves, we can use the other send method which takes a message key: send(String topic, String key, String data). The send method is overloaded; please refer to the appropriate API doc as needed.
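Why bother with a key? Kafka’s default partitioner hashes the key to pick a partition, so records with the same key always land on the same partition and are therefore consumed in order relative to each other. The sketch below illustrates that idea only: real Kafka uses a murmur2 hash, and the partition count here is a hypothetical value for our person topic.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyPartitionSketch {

    // Simplified stand-in for Kafka's default partitioner: hash the key
    // bytes and map the result onto the partition count. (Real Kafka uses
    // murmur2; Arrays.hashCode is used here only for illustration.)
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        return (hash & 0x7fffffff) % numPartitions; // force non-negative
    }

    public static void main(String[] args) {
        int partitions = 3; // hypothetical partition count
        // The same key always maps to the same partition:
        System.out.println(
            partitionFor("john-doe", partitions) == partitionFor("john-doe", partitions));
    }
}
```

This is why keyed messages for the same entity (say, the same person id) keep their relative order, while unkeyed messages may be spread across partitions.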

Consuming Messages

There are also several ways to consume messages. The easiest way is to use the @KafkaListener annotation on a method. For more information on how you can use @KafkaListener, please see the appropriate reference.

@Component
public class PersonEvents {

    private static final Logger logger = LoggerFactory.getLogger(PersonEvents.class);

    @KafkaListener(topics = "${kafka.topics.person}")
    public void handle(String message) {
        logger.info("Received message: {}", message);
    }
}

The bare minimum information we need to give @KafkaListener is the topic it should subscribe or listen to. The topics attribute can be a topic name, a property-placeholder key, or an expression. Our example uses a property-placeholder key.

Strictly speaking, topics is not the only thing our @KafkaListener needs to work. Not shown in our example is the groupId, which uniquely identifies the group of consumer processes to which our consumer belongs. Visit Kafka’s introduction for more information.

We don’t see the groupId attribute here because we’ve set a default one for all of the listeners in our application in application.properties or application.yml:

spring.kafka.consumer.group-id=bobsantosjr
spring.kafka.consumer.auto-offset-reset=earliest

If you want to override the default value set by spring.kafka.consumer.group-id, then you can set the groupId attribute of @KafkaListener.
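The effect of the group id can be sketched without a broker: the partitions of a topic are divided among the consumers that share a group id, while consumers in different groups each receive every partition. The round-robin assignment below is a simplification; Kafka’s real assignors are more sophisticated, and the partition count is a hypothetical value.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupAssignmentSketch {

    // Simplified round-robin assignment of partitions to the consumers in
    // ONE group: each partition goes to exactly one group member, so the
    // group as a whole sees every message exactly once.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numConsumers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 6 partitions shared by a group of 2 consumers:
        System.out.println(assign(6, 2)); // [[0, 2, 4], [1, 3, 5]]
    }
}
```

A second group with its own group id would get its own copy of the same assignment, which is how two independent applications can both consume the full person topic.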

You might be wondering what spring.kafka.consumer.auto-offset-reset is. By default, a consumer starts reading from the latest offset of the topic. This means it won’t be able to read messages that were sent to the topic before the consumer was created and connected.

In our example, we are sending the message using the KafkaTemplate and we are not sure if our consumer is ready when our message is sent. Setting the offset to earliest will make sure that our consumer will be able to read the message our KafkaTemplate produced. Just remember, you might not want to do this for your consumer group in a production environment.
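The difference between the two settings can be sketched in plain Java: a topic partition is an append-only log, and auto-offset-reset decides where a consumer with no committed offset starts reading. This is a simplified model; real consumers track a committed offset per partition.

```java
import java.util.List;

public class OffsetResetSketch {

    // Where does a brand-new consumer (no committed offset) start?
    // "earliest" -> offset 0, i.e. it re-reads the whole log;
    // anything else (the "latest" default) -> the end of the log,
    // i.e. only messages produced from now on.
    static int startingOffset(List<String> log, String autoOffsetReset) {
        return "earliest".equals(autoOffsetReset) ? 0 : log.size();
    }

    public static void main(String[] args) {
        List<String> log = List.of("m0", "m1", "m2"); // messages already in the topic
        System.out.println(startingOffset(log, "earliest")); // 0 -> reads m0, m1, m2
        System.out.println(startingOffset(log, "latest"));   // 3 -> reads nothing yet
    }
}
```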

If there are no errors, you should see the log message we have in our listener:

INFO --- [ntainer#0-0-C-1] c.bobsantosjr.springkafka.PersonEvents   : Received message: {"name": "John Doe", "age": 99}

Converting Messages to POJOs

For non-trivial applications, you would want to consume messages and convert them to objects.

And as always, spring-kafka provides us with several ways to easily convert our messages to objects.

One of them is to use a deserializer called JsonDeserializer. However, this deserializer usually depends on message headers to infer the target type, or requires a separate consumer factory and container for each type you have. For more details on serialization and deserialization please see the reference.
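Conceptually, what either approach does is bind the JSON payload to a Java type. The stdlib-only sketch below mimics that binding for our exact two-field payload; it is an illustration only — the real converters use Jackson and handle arbitrary JSON structures, and the class and method names here are made up for the example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonBindingSketch {

    static class Person {
        String name;
        int age;
    }

    // Crude stand-in for JSON-to-POJO binding: extract the fields of our
    // known payload shape with a regex and populate a Person. The real
    // converters use Jackson and support any JSON structure.
    static Person bind(String json) {
        Matcher m = Pattern
            .compile("\"name\"\\s*:\\s*\"([^\"]*)\".*\"age\"\\s*:\\s*(\\d+)")
            .matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("unexpected payload: " + json);
        }
        Person p = new Person();
        p.name = m.group(1);
        p.age = Integer.parseInt(m.group(2));
        return p;
    }

    public static void main(String[] args) {
        Person p = bind("{\"name\": \"John Doe\", \"age\": 99}");
        System.out.println(p.name + " / " + p.age); // John Doe / 99
    }
}
```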

An easier way to do it is to create a KafkaListenerContainerFactory and set StringJsonMessageConverter as its converter.

But first, we need to add the ever-reliable Jackson library as one of our dependencies for JSON processing:

implementation 'com.fasterxml.jackson.core:jackson-databind:2.9.8'

Let’s create a configuration class where we will override the default KafkaListenerContainerFactory bean:

@Configuration
public class KafkaConfig {

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        factory.setMessageConverter(new StringJsonMessageConverter());

        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }
}

If you don’t want to override the default KafkaListenerContainerFactory bean, you can create a new bean and then set the containerFactory attribute in your @KafkaListener.

We need to create our POJO for the message:

public class Person {
    private String name;
    private int age;

    // getters and setters

    @Override
    public String toString() {
        return "Person{" + "name='" + name + '\'' + ", age=" + age + '}';
    }
}

And then we can change our handle method’s parameter from String to Person.

@KafkaListener(topics = "${kafka.topics.person}")
public void handle(Person person) {
    logger.info("Received message: {}", person);
}

Running our application will show us a log of the message like this:

INFO --- [ntainer#0-0-C-1] c.bobsantosjr.springkafka.PersonEvents: Received message: Person{name='John Doe', age=99}

For more details on custom message conversion, you can check the reference.

Conclusion

There are many ways to use Kafka in Java. Fortunately, spring-kafka gives us a very easy way to do messaging over Kafka. One of its main advantages is its very good reference documentation. You can find the list of references and documentation here.

You can also easily customise spring-kafka by adding or changing configuration through properties. If you find yourself searching for the correct property, just search for spring.kafka in this very helpful appendix.

The code sample is available at my GitHub. The master branch has the simple string producer and consumer. The custom message conversion is in the json-converter branch.