How to Subscribe a Kafka Consumer to Multiple Topics

1. Overview

In this tutorial, we’ll learn how to subscribe a Kafka consumer to multiple topics. This is a common requirement when the same business logic is used for various topics.

2. Create Model Class

We’ll consider a simple payment system with two Kafka topics, one for card payments and the other for bank transfers. Let’s create the model class:

public class PaymentData {
    private String paymentReference;
    private String type;
    private BigDecimal amount;
    private Currency currency;
    // standard getters and setters
}

3. Subscribe to Multiple Topics Using Kafka Consumer API

The first method we’ll discuss uses the Kafka Consumer API. Let’s add the required Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>

Let’s also configure the Kafka consumer:

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "payments");
kafkaConsumer = new KafkaConsumer<>(properties);

Before consuming messages, we need to subscribe kafkaConsumer to both topics using the subscribe() method:

kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));

We’re ready now to test our configuration. Let’s publish one message on each of the topics:

void publishMessages() throws Exception {
    ProducerRecord<String, String> cardPayment = new ProducerRecord<>("card-payments", 
      "{\"paymentReference\":\"A184028KM0013790\", \"type\":\"card\", \"amount\":\"275\", \"currency\":\"GBP\"}");
    kafkaProducer.send(cardPayment).get();
    
    ProducerRecord<String, String> bankTransfer = new ProducerRecord<>("bank-transfers",
      "{\"paymentReference\":\"19ae2-18mk73-009\", \"type\":\"bank\", \"amount\":\"150\", \"currency\":\"EUR\"}");
    kafkaProducer.send(bankTransfer).get();
}

Finally, we can write the integration test:

@Test
void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
    publishMessages();
    kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));
    int eventsProcessed = 0;
    for (ConsumerRecord<String, String> record : kafkaConsumer.poll(Duration.ofSeconds(10))) {
        log.info("Event on topic={}, payload={}", record.topic(), record.value());
        eventsProcessed++;
    }
    assertThat(eventsProcessed).isEqualTo(2);
}

4. Subscribe to Multiple Topics Using Spring Kafka

The second method we’ll discuss uses Spring Kafka.

Let’s add the spring-kafka and jackson-databind dependencies to our pom.xml:

<dependency> 
    <groupId>org.springframework.kafka</groupId> 
    <artifactId>spring-kafka</artifactId>
    <version>3.0.11</version>
</dependency>
<dependency> 
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.15.2</version>
</dependency>

Let’s also define the ConsumerFactory and ConcurrentKafkaListenerContainerFactory beans:

@Bean
public ConsumerFactory<String, PaymentData> consumerFactory() {
    List<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(
      config, new StringDeserializer(), new JsonDeserializer<>(PaymentData.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PaymentData> containerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PaymentData> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

We need to subscribe to both topics using the topics attribute of the @KafkaListener annotation:

@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")

Finally, we can create the consumer. Additionally, we’re also including the Kafka header to identify the topic where the message was received:

@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
public void handlePaymentEvents(
  PaymentData paymentData, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.info("Event on topic={}, payload={}", topic, paymentData);
}

Let’s validate our configuration:

@Test
public void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
    CountDownLatch countDownLatch = new CountDownLatch(2);
    doAnswer(invocation -> {
        countDownLatch.countDown();
        return null;
    }).when(paymentsConsumer)
      .handlePaymentEvents(any(), any());
    kafkaTemplate.send("card-payments", createCardPayment());
    kafkaTemplate.send("bank-transfers", createBankTransfer());
    assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
}

5. Subscribe to Multiple Topics Using Kafka CLI

Kafka CLI is the last method we’ll discuss.

First, let’s send a message on each topic:

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic card-payments
>{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bank-transfers
>{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}

Now, we can start the Kafka CLI consumer. The include option allows us to specify the list of topics to include for message consumption:

$ bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --include "card-payments|bank-transfers"

Here’s the output when we run the previous command:

{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}
{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}

6. Conclusion

In this article, we learned three different methods of subscribing a Kafka consumer to multiple topics. This is useful when implementing the same functionality for several topics.

The first two methods are based on Kafka Consumer API and Spring Kafka and can be integrated into an existing application. The last one uses Kafka CLI and can be used to verify multiple topics quickly.

As always, the complete code can be found over on GitHub.

       

Commercials Cooperation Advertisements:


(1) IT Teacher IT Freelance

IT電腦補習

立刻註冊及報名電腦補習課程吧!
电子计算机 -教育 -IT 電腦班” ( IT電腦補習 ) 提供一個方便的电子计算机 教育平台, 為大家配對信息技术, 電腦 老師, IT freelance 和 programming expert. 讓大家方便地就能找到合適的電腦補習, 電腦班, 家教, 私人老師.
We are a education and information platform which you can find a IT private tutorial teacher or freelance.
Also we provide different information about information technology, Computer, programming, mobile, Android, apple, game, movie, anime, animation…


(2) ITSec

https://itsec.vip/

www.ITSec.vip

www.Sraa.com.hk

www.ITSec.hk

www.Penetrationtest.hk

www.ITSeceu.uk

Secure Your Computers from Cyber Threats and mitigate risks with professional services to defend Hackers.

ITSec provide IT Security and Compliance Services, including IT Compliance Services, Risk Assessment, IT Audit, Security Assessment and Audit, ISO 27001 Consulting and Certification, GDPR Compliance Services, Privacy Impact Assessment (PIA), Penetration test, Ethical Hacking, Vulnerabilities scan, IT Consulting, Data Privacy Consulting, Data Protection Services, Information Security Consulting, Cyber Security Consulting, Network Security Audit, Security Awareness Training.

Contact us right away.

Email (Prefer using email to contact us):
SalesExecutive@ITSec.vip

Leave a Reply

Your email address will not be published. Required fields are marked *