{"id":74600,"date":"2024-01-07T13:13:13","date_gmt":"2024-01-07T13:13:13","guid":{"rendered":"https:\/\/www.baeldung.com\/?p=171971"},"modified":"2024-01-07T13:13:13","modified_gmt":"2024-01-07T13:13:13","slug":"read-multiple-messages-with-apache-kafka","status":"publish","type":"post","link":"https:\/\/gamefootballmobileanimeiphone.com\/index.php\/2024\/01\/07\/read-multiple-messages-with-apache-kafka\/","title":{"rendered":"Read Multiple Messages with Apache Kafka"},"content":{"rendered":"<p><img src=\"https:\/\/www.baeldung.com\/wp-content\/uploads\/2021\/09\/Java-5-Featured-1024x536.png\" class=\"webfeedsFeaturedVisual wp-post-image\" alt=\"\" style=\"float: left; margin-right: 5px;\" decoding=\"async\" srcset=\"https:\/\/www.baeldung.com\/wp-content\/uploads\/2021\/09\/Java-5-Featured-1024x536.png 1024w, https:\/\/www.baeldung.com\/wp-content\/uploads\/2021\/09\/Java-5-Featured-300x157.png 300w, https:\/\/www.baeldung.com\/wp-content\/uploads\/2021\/09\/Java-5-Featured-768x402.png 768w, https:\/\/www.baeldung.com\/wp-content\/uploads\/2021\/09\/Java-5-Featured-100x52.png 100w, https:\/\/www.baeldung.com\/wp-content\/uploads\/2021\/09\/Java-5-Featured.png 1200w\" sizes=\"(max-width: 580px) 100vw, 580px\" \/><\/p>\n<h2 id=\"bd-overview\" data-id=\"overview\">1. Overview<\/h2>\n<div class=\"bd-anchor\" id=\"overview\"><\/div>\n<p>In this tutorial, we&#8217;ll explore how the Kafka <em>Consumer<\/em> retrieves messages from the broker. We&#8217;ll learn the configurable properties that can directly impact how many messages the Kafka Consumer reads at once. Finally, we&#8217;ll explore how adjusting these settings affects the <em>Consumer<\/em>&#8216;s behavior.<\/p>\n<h2 id=\"bd-setting-up-the-environment\" data-id=\"setting-up-the-environment\">2. Setting up the Environment<\/h2>\n<div class=\"bd-anchor\" id=\"setting-up-the-environment\"><\/div>\n<p><strong>Kafka Consumers are fetching records for a given partition in batches of configurable sizes. We cannot configure the exact number of records to be fetched in one batch, but we can configure the size of these batches, measured in bytes.<\/strong><\/p>\n<p>For the code snippets in this article, we&#8217;ll need a simple Spring application that uses the <a href=\"https:\/\/feeds.feedblitz.com\/~\/t\/0\/0\/baeldung\/~https:\/\/mvnrepository.com\/artifact\/org.apache.kafka\/kafka-clients\">kafka-clients<\/a> library to interact with the Kafka broker. We&#8217;ll create a Java class that internally uses a <em>KafkaConsumer<\/em> to subscribe to a topic and log the incoming messages. If you want to dive deeper, feel free to read through our article dedicated to the <a href=\"https:\/\/feeds.feedblitz.com\/~\/t\/0\/0\/baeldung\/~https:\/\/www.baeldung.com\/kafka-create-listener-consumer-api\">Kafka Consumer API<\/a> and follow along.<\/p>\n<p>One of the key differences in our example will be the logging: Instead of logging one message at a time, let&#8217;s collect them and log the whole batch. This way, we&#8217;ll be able to see exactly how many messages are fetched for each <em>poll(). <\/em>Additionally, let&#8217;s enrich the log by incorporating details like the initial and final offsets of the batch along with the consumer&#8217;s <em>groupId:<\/em><\/p>\n<pre><code class=\"language-java\">class VariableFetchSizeKafkaListener implements Runnable {\r\n    private final String topic;\r\n    private final KafkaConsumer&lt;String, String&gt; consumer;\r\n    \r\n    \/\/ constructor\r\n    @Override\r\n    public void run() {\r\n        consumer.subscribe(singletonList(topic));\r\n        int pollCount = 1;\r\n        while (true) {\r\n            List&lt;ConsumerRecord&lt;String, String&gt;&gt; records = new ArrayList&lt;&gt;();\r\n            for (var record : consumer.poll(ofMillis(500))) {\r\n                records.add(record);\r\n            }\r\n            if (!records.isEmpty()) {\r\n                String batchOffsets = String.format(&quot;%s -&gt; %s&quot;, records.get(0).offset(), records.get(records.size() - 1).offset());\r\n                String groupId = consumer.groupMetadata().groupId();\r\n                log.info(&quot;groupId: {}, poll: #{}, fetched: #{} records, offsets: {}&quot;, groupId, pollCount++, records.size(), batchOffsets);\r\n            }\r\n        }\r\n    }\r\n}<\/code><\/pre>\n<p><strong>The <a href=\"https:\/\/feeds.feedblitz.com\/~\/t\/0\/0\/baeldung\/~https:\/\/www.baeldung.com\/docker-test-containers\">Testcontainers<\/a> library will help us set up the test environment by spinning up a Docker container with a running Kafka broker. <\/strong>If you want to learn more about setting up the Testcontainer&#8217;s Kafka module, check out how we <a href=\"https:\/\/feeds.feedblitz.com\/~\/t\/0\/0\/baeldung\/~https:\/\/www.baeldung.com\/kafka-create-listener-consumer-api#testing\">configured the test environment here<\/a> and follow along.<\/p>\n<p>In our particular case, we can define an additional method that publishes several messages on a given topic. For instance, let&#8217;s assume we are streaming values read by a temperature sensor to a topic named &#8220;<em>engine.sensor.temperature<\/em>&#8220;:<\/p>\n<pre><code class=\"language-java\">void publishTestData(int recordsCount, String topic) {\r\n    List&lt;ProducerRecord&lt;String, String&gt;&gt; records = IntStream.range(0, recordsCount)\r\n      .mapToObj(__ -&gt; new ProducerRecord&lt;&gt;(topic, &quot;key1&quot;, &quot;temperature=255F&quot;))\r\n      .collect(toList());\r\n    \/\/ publish all to kafka\r\n}<\/code><\/pre>\n<p>As we can see, we have used the same key for all the messages. As a result, all records will be sent to the same partition. For payload, we&#8217;ve used a short, fixed text depicting a temperature measurement.<\/p>\n<h2 id=\"bd-testing-the-default-behaviour\" data-id=\"testing-the-default-behaviour\">3. Testing the Default Behaviour<\/h2>\n<div class=\"bd-anchor\" id=\"testing-the-default-behaviour\"><\/div>\n<p><strong>Let&#8217;s start by creating a Kafka listener using the default consumer configuration. Then, we&#8217;ll publish a few messages to see how many batches our listener consumes. <\/strong>As we can see, our custom listener uses the Consumer API internally. As a result, to instantiate <em>VariableFetchSizeKafkaListener<\/em>, we&#8217;ll have to configure and create a <em>KafkaConsumer<\/em> first:<\/p>\n<pre><code class=\"language-java\">Properties props = new Properties();\r\nprops.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());\r\nprops.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &quot;earliest&quot;);\r\nprops.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());\r\nprops.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());\r\nprops.setProperty(ConsumerConfig.GROUP_ID_CONFIG, &quot;default_config&quot;);\r\nKafkaConsumer&lt;String, String&gt; kafkaConsumer = new KafkaConsumer&lt;&gt;(props);<\/code><\/pre>\n<p>For now, we&#8217;ll use <em>KafkaConsumer<\/em>&#8216;s default values for the minimum and maximum fetch sizes. Based on this consumer, we can instantiate our listener and run it asynchronously to avoid blocking the main thread:<\/p>\n<pre><code class=\"language-\">CompletableFuture.runAsync(\r\n  new VariableFetchSizeKafkaListener(topic, kafkaConsumer)\r\n);<\/code><\/pre>\n<p>Finally, let&#8217;s block the test thread for a few seconds, giving some time for the listener to consume the messages. The goal of this article is to start the listeners and observe how they perform. We&#8217;ll use the Junit5 tests as a convenient way of setting up and exploring their behavior, but for simplicity, we won&#8217;t include any specific assertions. As a result, this will be our starting <em>@Test:<\/em><\/p>\n<pre><code class=\"language-java\">@Test\r\nvoid whenUsingDefaultConfiguration_thenProcessInBatchesOf() throws Exception {\r\n    String topic = &quot;engine.sensors.temperature&quot;;\r\n    publishTestData(300, topic);\r\n    Properties props = new Properties();\r\n    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());\r\n    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &quot;earliest&quot;);\r\n    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());\r\n    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());\r\n    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, &quot;default_config&quot;);\r\n    KafkaConsumer&lt;String, String&gt; kafkaConsumer = new KafkaConsumer&lt;&gt;(props);\r\n    CompletableFuture.runAsync(\r\n      new VariableFetchSizeKafkaListener(topic, kafkaConsumer)\r\n    );\r\n    Thread.sleep(5_000L);\r\n}<\/code><\/pre>\n<p>Now, let&#8217;s run the test and inspect the logs to see how many records will be fetched in a single batch:<\/p>\n<pre><code class=\"language-markdown\">10:48:46.958 [ForkJoinPool.commonPool-worker-2] INFO  c.b.k.c.VariableFetchSizeKafkaListener - groupId: default_config, poll: #1, fetched: #300 records, offsets: 0 -&gt; 299\r\n<\/code><\/pre>\n<p>As we can notice, we fetched all the 300 records in a single batch because our messages are small. Both the key and body are short strings: the key is four characters, and the body is 16 characters long. That&#8217;s a total of 20 bytes, plus some extra for the record&#8217;s metadata. On the other hand, the default value for the maximum batch size is one mebibyte (1.024 x 1.024 bytes), or simply 1,048,576 bytes.<\/p>\n<h2 id=\"bd-configuring-maximum-partition-fetch-size\" data-id=\"configuring-maximum-partition-fetch-size\">4. Configuring Maximum Partition Fetch Size<\/h2>\n<div class=\"bd-anchor\" id=\"configuring-maximum-partition-fetch-size\"><\/div>\n<p><strong>The <em>&#8220;max.partition.fetch.bytes&#8221;<\/em> in Kafka determines the largest amount of data that a consumer can fetch from a single partition in a single request. <\/strong>Consequently, even for a small number of short messages, we can force our listeners to fetch the records in multiple batches by changing the property.<\/p>\n<p>To observe this, let&#8217;s create two more<em>VariableFetchSizeKafkaListener<\/em>s and configure them setting this property to only 500B, respectively 5KB. Firstly, let&#8217;s extract all the common consumer <em>Properties <\/em>in a dedicated\u00a0 method to avoid code duplication:<\/p>\n<pre><code class=\"language-java\">Properties commonConsumerProperties() {\r\n    Properties props = new Properties();\r\n    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());\r\n    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &quot;earliest&quot;);\r\n    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());\r\n    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());\r\n    return props;\r\n}<\/code><\/pre>\n<p>Then, let&#8217;s create the first listener and run it asynchronously:<\/p>\n<pre><code class=\"language-java\">Properties fetchSize_500B = commonConsumerProperties();\r\nfetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, &quot;max_fetch_size_500B&quot;);\r\nfetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, &quot;500&quot;);\r\nCompletableFuture.runAsync(\r\n  new VariableFetchSizeKafkaListener(&quot;engine.sensors.temperature&quot;, new KafkaConsumer&lt;&gt;(fetchSize_500B))\r\n);<\/code><\/pre>\n<p>As we can see, we are setting different consumer group IDs for the various listeners. This will allow them to consume the same test data. Now, let&#8217;s proceed with the second listener and complete the test:<\/p>\n<pre><code class=\"language-java\">@Test\r\nvoid whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception {\r\n    String topic = &quot;engine.sensors.temperature&quot;;\r\n    publishTestData(300, topic);\r\n    \r\n    Properties fetchSize_500B = commonConsumerProperties();\r\n    fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, &quot;max_fetch_size_500B&quot;);\r\n    fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, &quot;500&quot;);\r\n    CompletableFuture.runAsync(\r\n      new VariableFetchSizeKafkaListener(topic, new KafkaConsumer&lt;&gt;(fetchSize_500B))\r\n    );\r\n    Properties fetchSize_5KB = commonConsumerProperties();\r\n    fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, &quot;max_fetch_size_5KB&quot;);\r\n    fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, &quot;5000&quot;);\r\n    CompletableFuture.runAsync(\r\n      new VariableFetchSizeKafkaListener(topic, new KafkaConsumer&lt;&gt;(fetchSize_5KB))\r\n    );\r\n    Thread.sleep(10_000L);\r\n}<\/code><\/pre>\n<p>If we run this test, we can assume that the first consumer will fetch batches roughly ten times smaller than the second consumer. Let&#8217;s analyze the logs:<\/p>\n<pre><code class=\"language-Plaintext\">[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #1, fetched: #56 records, offsets: 0 -&gt; 55\r\n[worker-2] INFO - groupId: max_fetch_size_500B, poll: #1, fetched: #5 records, offsets: 0 -&gt; 4\r\n[worker-2] INFO - groupId: max_fetch_size_500B, poll: #2, fetched: #5 records, offsets: 5 -&gt; 9\r\n[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #2, fetched: #56 records, offsets: 56 -&gt; 111\r\n[worker-2] INFO - groupId: max_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -&gt; 14\r\n[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #3, fetched: #56 records, offsets: 112 -&gt; 167\r\n[worker-2] INFO - groupId: max_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -&gt; 19\r\n[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #4, fetched: #51 records, offsets: 168 -&gt; 218\r\n[worker-2] INFO - groupId: max_fetch_size_500B, poll: #5, fetched: #5 records, offsets: 20 -&gt; 24\r\n[...]<\/code><\/pre>\n<p>As expected, one of the listeners fetches, indeed, batches of data that are almost ten times larger than the other. <strong>Moreover, it&#8217;s important to understand that the number of records within a batch depends on the size of these records and their metadata. <\/strong>To highlight this, we can observe that the consumer with <em>groupId<\/em>\u00a0 &#8220;<em>max_fetch_size_5KB<\/em>&#8221; fetched fewer records when polling the fourth time.<\/p>\n<h2 id=\"bd-configuring-minimum-fetch-size\" data-id=\"configuring-minimum-fetch-size\">5. Configuring Minimum Fetch Size<\/h2>\n<div class=\"bd-anchor\" id=\"configuring-minimum-fetch-size\"><\/div>\n<p><strong>The <em>Consumer API<\/em> also allows customizing the minimum fetch size through the <em>&#8220;fetch.min.bytes&#8221;\u00a0<\/em>property. We can change this property to specify the minimum amount of data that a broker needs to respond. <\/strong>If this minimum value is not met, the broker waits longer before sending a response to the consumer&#8217;s fetch request. To emphasize this, we can add a delay to our test publisher within our test helper method. As a result, the producer will wait a specific number of milliseconds between sending each message:<\/p>\n<pre><code class=\"language-java\">@Test\r\nvoid whenChangingMinFetchBytesProperty_thenAdjustWaitTimeWhilePolling() throws Exception {\r\n    String topic = &quot;engine.sensors.temperature&quot;;\r\n    publishTestData(300, topic, 100L);  \r\n    \/\/ ...\r\n}\r\nvoid publishTestData(int measurementsCount, String topic, long delayInMillis) {\r\n    \/\/ ...\r\n}<\/code><\/pre>\n<p>Let&#8217;s start by creating a <em>VariableFetchSizeKafkaListener <\/em>that will use the default configuration, having &#8220;<em>fetch.min.bytes<\/em>&#8221; equal to one byte. Similar to the previous examples, we&#8217;ll run this consumer asynchronously within a <em>CompletableFuture<\/em>:<\/p>\n<pre><code class=\"language-java\">\/\/ fetch.min.bytes = 1 byte (default)\r\nProperties minFetchSize_1B = commonConsumerProperties();\r\nminFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, &quot;min_fetch_size_1B&quot;);\r\nCompletableFuture.runAsync(\r\n  new VariableFetchSizeKafkaListener(topic, new KafkaConsumer&lt;&gt;(minFetchSize_1B))\r\n);<\/code><\/pre>\n<p><strong>With this setup, and due to the delay we introduced, we can expect each record to be retrieved individually, one after the other. <\/strong>In other words, we can expect many batches of a single record. Also, we expect these batches to be consumed at a similar speed as our <em>KafkaProducer<\/em> publishes the data, which in our case is every 100 milliseconds. Let&#8217;s run the test and analyze the logs:<\/p>\n<pre><code class=\"language-plaintext\">14:23:22.368 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #1, fetched: #1 records, offsets: 0 -&gt; 0\r\n14:23:22.472 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #2, fetched: #1 records, offsets: 1 -&gt; 1\r\n14:23:22.582 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #3, fetched: #1 records, offsets: 2 -&gt; 2\r\n14:23:22.689 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #4, fetched: #1 records, offsets: 3 -&gt; 3\r\n[...]<\/code><\/pre>\n<p>Moreover, we can force the consumer to wait for more data to accumulate by adjusting the &#8220;<em>fetch.min.bytes<\/em>&#8221; value to a larger size:<\/p>\n<pre><code class=\"language-java\">\/\/ fetch.min.bytes = 500 bytes\r\nProperties minFetchSize_500B = commonConsumerProperties();\r\nminFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, &quot;mim_fetch_size_500B&quot;);\r\nminFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, &quot;500&quot;);\r\nCompletableFuture.runAsync(\r\n  new VariableFetchSizeKafkaListener(topic, new KafkaConsumer&lt;&gt;(minFetchSize_500B))\r\n);<\/code><\/pre>\n<p>With the property set to 500 bytes, we can anticipate the consumer to wait longer and fetch more data. Let&#8217;s run this example as well and observe the outcomes:<\/p>\n<pre><code class=\"language-plaintext\">14:24:49.303 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #1, fetched: #6 records, offsets: 0 -&gt; 5\r\n14:24:49.812 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #2, fetched: #4 records, offsets: 6 -&gt; 9\r\n14:24:50.315 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -&gt; 14\r\n14:24:50.819 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -&gt; 19\r\n[...]<\/code><\/pre>\n<h2 id=\"bd-conclusion\" data-id=\"conclusion\">6. Conclusion<\/h2>\n<div class=\"bd-anchor\" id=\"conclusion\"><\/div>\n<p>In this article, we discussed the way <em>KafkaConsumer<\/em>s are fetching data from the broker. We learned that, by default, the consumer will fetch data if there is at least one new record. On the other hand, if the new data from the partition exceeds 1,048,576 bytes, it will be split into multiple batches of that maximum size. We discovered that customizing the &#8220;<em>fetch.min.bytes<\/em>&#8221; and &#8220;<em>max.partition.fetch.bytes<\/em>&#8221; properties allows us to tailor Kafka&#8217;s behavior to suit our specific requirements.<\/p>\n<p>As always, the source for the examples is available\u00a0<a href=\"https:\/\/feeds.feedblitz.com\/~\/t\/0\/0\/baeldung\/~https:\/\/github.com\/eugenp\/tutorials\/tree\/master\/apache-kafka-2\">over on GitHub<\/a>.<\/p>\n<p><Img align=\"left\" border=\"0\" height=\"1\" width=\"1\" alt=\"\" style=\"border:0;float:left;margin:0;padding:0;width:1px!important;height:1px!important;\" hspace=\"0\" src=\"https:\/\/feeds.feedblitz.com\/~\/i\/859407251\/0\/baeldung\"><\/p>\n<div style=\"clear:both;padding-top:0.2em;\"><a title=\"Like on Facebook\" href=\"https:\/\/feeds.feedblitz.com\/_\/28\/859407251\/baeldung\"><img decoding=\"async\" height=\"20\" src=\"https:\/\/assets.feedblitz.com\/i\/fblike20.png\" style=\"border:0;margin:0;padding:0;\"><\/a>&#160;<a title=\"Pin it!\" href=\"https:\/\/feeds.feedblitz.com\/_\/29\/859407251\/baeldung,https%3A%2F%2Fwww.baeldung.com%2Fwp-content%2Fuploads%2F2021%2F09%2FJava-5-Featured-1024x536.png\"><img decoding=\"async\" height=\"20\" src=\"https:\/\/assets.feedblitz.com\/i\/pinterest20.png\" style=\"border:0;margin:0;padding:0;\"><\/a>&#160;<a title=\"Tweet This\" href=\"https:\/\/feeds.feedblitz.com\/_\/24\/859407251\/baeldung\"><img decoding=\"async\" height=\"20\" src=\"https:\/\/assets.feedblitz.com\/i\/twitter20.png\" style=\"border:0;margin:0;padding:0;\"><\/a>&#160;<a title=\"Subscribe by email\" href=\"https:\/\/feeds.feedblitz.com\/_\/19\/859407251\/baeldung\"><img decoding=\"async\" height=\"20\" src=\"https:\/\/assets.feedblitz.com\/i\/email20.png\" style=\"border:0;margin:0;padding:0;\"><\/a>&#160;<a title=\"Subscribe by RSS\" href=\"https:\/\/feeds.feedblitz.com\/_\/20\/859407251\/baeldung\"><img decoding=\"async\" height=\"20\" src=\"https:\/\/assets.feedblitz.com\/i\/rss20.png\" style=\"border:0;margin:0;padding:0;\"><\/a>&#160;<a rel=\"NOFOLLOW\" title=\"View Comments\" href=\"https:\/\/www.baeldung.com\/kafka-read-multiple-messages#respond\"><img decoding=\"async\" height=\"20\" style=\"border:0;margin:0;padding:0;\" src=\"https:\/\/assets.feedblitz.com\/i\/comments20.png\"><\/a>&#160;<a title=\"Follow Comments via RSS\" href=\"https:\/\/www.baeldung.com\/kafka-read-multiple-messages\/feed\"><img decoding=\"async\" height=\"20\" style=\"border:0;margin:0;padding:0;\" src=\"https:\/\/assets.feedblitz.com\/i\/commentsrss20.png\"><\/a>&#160;<\/div>\n\n<h2><b>Commercials Cooperation Advertisements:<\/b><\/h2>\r\n<p><br>(1) IT Teacher IT Freelance<br> <\/p>\r\n<a href=https:\/\/itteacheritfreelance.hk\/wordpress><img src=http:\/\/gamefootballmobileanimeiphone.com\/wp-content\/uploads\/2023\/09\/ITTeacherITFreelance-Website.png alt=IT\u96fb\u8166\u88dc\u7fd2 java\u88dc\u7fd2 \u70ba\u5927\u5bb6\u914d\u5c0d\u96fb\u8166\u88dc\u7fd2,IT freelance, \u79c1\u4eba\u8001\u5e2b, PHP\u88dc\u7fd2,CSS\u88dc\u7fd2,XML,Java\u88dc\u7fd2,MySQL\u88dc\u7fd2,graphic design\u88dc\u7fd2,\u4e2d\u5c0f\u5b78ICT\u88dc\u7fd2,\u4e00\u5c0d\u4e00\u79c1\u4eba\u88dc\u7fd2\u548cFreelance\u81ea\u7531\u5de5\u4f5c\u914d\u5c0d\u3002\/><\/a><p><a href=https:\/\/itteacheritfreelance.hk\/wordpress\/index.php\/findteacher>\u7acb\u523b\u8a3b\u518a\u53ca\u5831\u540d\u96fb\u8166\u88dc\u7fd2\u8ab2\u7a0b\u5427! <\/a><br>\r\n\r\n\u7535\u5b50\u8ba1\u7b97\u673a -\u6559\u80b2 -IT \u96fb\u8166\u73ed\u201d ( IT\u96fb\u8166\u88dc\u7fd2 ) \u63d0\u4f9b\u4e00\u500b\u65b9\u4fbf\u7684\u7535\u5b50\u8ba1\u7b97\u673a \u6559\u80b2\u5e73\u53f0, \u70ba\u5927\u5bb6\u914d\u5c0d\u4fe1\u606f\u6280\u672f, \u96fb\u8166 \u8001\u5e2b, IT freelance \u548c programming expert. \u8b93\u5927\u5bb6\u65b9\u4fbf\u5730\u5c31\u80fd\u627e\u5230\u5408\u9069\u7684\u96fb\u8166\u88dc\u7fd2, \u96fb\u8166\u73ed, \u5bb6\u6559, \u79c1\u4eba\u8001\u5e2b.  <br>\r\n\r\nWe are a education and information platform which you can find a IT private tutorial teacher or freelance. <br>\r\n\r\nAlso we provide different information about information technology, Computer, programming, mobile, Android, apple, game, movie, anime, animation\u2026 \r\n<\/p>\n<p><br>(2) ITSec<br> <\/p><a href=https:\/\/itsec.vip><img src=http:\/\/gamefootballmobileanimeiphone.com\/wp-content\/uploads\/2023\/09\/ITSec-Main-Promotion-Image.png alt= https:\/\/itsec.vip\/\r\nSecure Your Computers from Cyber Threats and mitigate risks with professional services to defend Hackers.  \r\nITSec provide IT Security and Compliance Services, including IT Compliance Services, Risk Assessment, IT Audit, Security Assessment and Audit, ISO 27001 Consulting and Certification, GDPR Compliance Services, Privacy Impact Assessment (PIA), Penetration test, Ethical Hacking, Vulnerabilities scan, IT Consulting, Data Privacy Consulting, Data Protection Services, Information Security Consulting, Cyber Security Consulting, Network Security Audit, Security Awareness Training.\/><\/a> \r\n<br><br> \r\n<p><a href=https:\/\/itsec.vip>www.ITSec.vip<\/a> <br> <br> \r\n<p><a href=https:\/\/sraa.com.hk>www.Sraa.com.hk<\/a> <br> <br> \r\n<p><a href=https:\/\/itsec.hk>www.ITSec.hk<\/a> <br> <br> \r\n<p><a href=https:\/\/penetrationtest.hk>www.Penetrationtest.hk<\/a> <br> <br> \r\n<p><a href=https:\/\/itseceu.uk>www.ITSeceu.uk<\/a> <br> <br> \r\nSecure Your Computers from Cyber Threats and mitigate risks with professional services to defend Hackers. <br><br>\r\nITSec provide IT Security and Compliance Services, including IT Compliance Services, Risk Assessment, IT Audit, Security Assessment and Audit, ISO 27001 Consulting and Certification, GDPR Compliance Services, Privacy Impact Assessment (PIA), Penetration test, Ethical Hacking, Vulnerabilities scan, IT Consulting, Data Privacy Consulting, Data Protection Services, Information Security Consulting, Cyber Security Consulting, Network Security Audit, Security Awareness Training. \r\n<br><br>Contact us right away. <br><br>Email (Prefer using email to contact us): <br>SalesExecutive@ITSec.vip<\/p>","protected":false},"excerpt":{"rendered":"<p><img decoding=\"async\" src=\"https:\/\/www.baeldung.com\/wp-content\/uploads\/2021\/09\/Java-5-Featured-1024x536.png\" class=\"webfeedsFeaturedVisual wp-post-image\" alt=\"\"><\/p>\n<p>Explore how the Kafka Consumer retrieves messages from the broker.<\/p>\n<div><a title=\"Like on Facebook\" href=\"https:\/\/feeds.feedblitz.com\/_\/28\/859407251\/baeldung\"><img decoding=\"async\" height=\"20\" src=\"https:\/\/assets.feedblitz.com\/i\/fblike20.png\"><\/a>\u00a0<a title=\"Pin it!\" href=\"https:\/\/feeds.feedblitz.com\/_\/29\/859407251\/baeldung,https%3A%2F%2Fwww.baeldung.com%2Fwp-content%2Fuploads%2F2021%2F09%2FJava-5-Featured-1024x536.png\"><img decoding=\"async\" height=\"20\" src=\"https:\/\/assets.feedblitz.com\/i\/pinterest20.png\"><\/a>\u00a0<a title=\"Tweet This\" href=\"https:\/\/feeds.feedblitz.com\/_\/24\/859407251\/baeldung\"><img decoding=\"async\" height=\"20\" src=\"https:\/\/assets.feedblitz.com\/i\/twitter20.png\"><\/a>\u00a0<a title=\"Subscribe by email\" href=\"https:\/\/feeds.feedblitz.com\/_\/19\/859407251\/baeldung\"><img decoding=\"async\" height=\"20\" src=\"https:\/\/assets.feedblitz.com\/i\/email20.png\"><\/a>\u00a0<a title=\"Subscribe by RSS\" href=\"https:\/\/feeds.feedblitz.com\/_\/20\/859407251\/baeldung\"><img decoding=\"async\" height=\"20\" src=\"https:\/\/assets.feedblitz.com\/i\/rss20.png\"><\/a>\u00a0<a rel=\"NOFOLLOW\" title=\"View Comments\" href=\"https:\/\/www.baeldung.com\/kafka-read-multiple-messages#respond\"><img decoding=\"async\" height=\"20\" src=\"https:\/\/assets.feedblitz.com\/i\/comments20.png\"><\/a>\u00a0<a title=\"Follow Comments via RSS\" href=\"https:\/\/www.baeldung.com\/kafka-read-multiple-messages\/feed\"><img decoding=\"async\" height=\"20\" src=\"https:\/\/assets.feedblitz.com\/i\/commentsrss20.png\"><\/a>\u00a0<\/div>\n","protected":false},"author":865,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_monsterinsights_skip_tracking":false,"_monsterinsights_sitenote_active":false,"_monsterinsights_sitenote_note":"","_monsterinsights_sitenote_category":0,"site-container-style":"default","site-container-layout":"default","site-sidebar-layout":"default","disable-article-header":"default","disable-site-header":"default","disable-site-footer":"default","disable-content-area-spacing":"default","footnotes":""},"categories":[22],"tags":[61,122,127,129,124,128,125,132,131,133,126,130,123,66,94,88,97,56,64,65,60,112,40,75,95,104,33,120,105,101,98,115,30,29,41,86,70,69,68,72,71,26,118,108,87,46,55,48,52,54,51,50,83,62,58,57,109,35,59,63,85,79,82,96,80,27,81,114,44,42,43,45,38,39,110,117,100,111,116,73,89,90,92,91,93,84,78,37,102,34,36,77,67,74,99,113,119,28,121,32,47,49,53,103,31,76],"class_list":["post-74600","post","type-post","status-publish","format-standard","hentry","category-mobile","tag-airpods","tag-anime","tag-anime-characters","tag-anime-cosplay","tag-anime-edits","tag-anime-merchandise","tag-anime-movies","tag-anime-news","tag-anime-recommendations","tag-anime-reviews","tag-anime-series","tag-anime-streaming","tag-animes","tag-app-store","tag-app-store-samsung","tag-appgallery","tag-appgallery-oneplus","tag-apple","tag-apple-music","tag-apple-tv","tag-apple-watch","tag-bbc-sport","tag-best-mobile-games","tag-bixby","tag-bixby-xiaomi","tag-champions-league","tag-cyberpunk","tag-cyberpunk-2077","tag-fantasy-football","tag-fifa","tag-football","tag-formula-1","tag-fortnite","tag-free-fire","tag-free-mobile-games","tag-freebuds-pro","tag-galaxy-a52","tag-galaxy-note-20","tag-galaxy-s21","tag-galaxy-watch-4","tag-galaxy-z-fold-3","tag-game","tag-games","tag-golf","tag-harmonyos","tag-how-to-backup-iphone","tag-how-to-factory-reset-iphone","tag-how-to-reset-iphone","tag-how-to-restore-iphone","tag-how-to-unlock-iphone","tag-how-to-unlock-iphone-5","tag-how-to-unlock-iphone-6","tag-huawei","tag-ios","tag-ipad","tag-iphone","tag-live-soccer","tag-lol","tag-macbook","tag-macos","tag-mate-40-pro","tag-mi-11-lite","tag-mi-home-security-camera-basic-1080p","tag-mi-home-security-camera-basic-1080p-huawei","tag-mi-smart-band-6","tag-minecraft","tag-miui","tag-mlb-scores","tag-mobile-game-design","tag-mobile-game-development","tag-mobile-game-marketing","tag-mobile-game-monetization","tag-mobile-games","tag-mobile-gaming","tag-nba-scores","tag-nba-standings","tag-nfl","tag-nfl-scores","tag-nhl-scores","tag-one-ui","tag-oneplus","tag-oneplus-9-pro","tag-oneplus-buds-pro","tag-oneplus-nord-ce-5g","tag-oxygenos","tag-p40-pro-plus","tag-poco-x3-pro","tag-pokemon","tag-premier-league","tag-pubg","tag-pubg-mobile","tag-redmi-note-10-pro","tag-samsung","tag-samsung-pay","tag-soccer","tag-sports","tag-steam","tag-steeam","tag-top-10-anime","tag-valorant","tag-when-do-the-iphone-7-come-out","tag-when-does-the-iphone-7-come-out","tag-when-is-the-iphone-7-coming-out","tag-world-cup","tag-xbox-series-x","tag-xiaomi"],"aioseo_notices":[],"_links":{"self":[{"href":"https:\/\/gamefootballmobileanimeiphone.com\/index.php\/wp-json\/wp\/v2\/posts\/74600","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/gamefootballmobileanimeiphone.com\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/gamefootballmobileanimeiphone.com\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/gamefootballmobileanimeiphone.com\/index.php\/wp-json\/wp\/v2\/users\/865"}],"replies":[{"embeddable":true,"href":"https:\/\/gamefootballmobileanimeiphone.com\/index.php\/wp-json\/wp\/v2\/comments?post=74600"}],"version-history":[{"count":0,"href":"https:\/\/gamefootballmobileanimeiphone.com\/index.php\/wp-json\/wp\/v2\/posts\/74600\/revisions"}],"wp:attachment":[{"href":"https:\/\/gamefootballmobileanimeiphone.com\/index.php\/wp-json\/wp\/v2\/media?parent=74600"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/gamefootballmobileanimeiphone.com\/index.php\/wp-json\/wp\/v2\/categories?post=74600"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/gamefootballmobileanimeiphone.com\/index.php\/wp-json\/wp\/v2\/tags?post=74600"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}