
The Kafka Series (part 4): implementing a consumer in Java

In part 3 of this series we implemented a simple Kafka producer using the provided Java APIs. Now let's learn how to implement a consumer for the produced messages. As in the previous parts, the Kafka release I am referring to is 0.9.0.1.
As for the producer, I suggest using Maven: this way you need to add only one direct dependency through the POM file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>


Then create a new class called, for example, SimpleKafkaConsumer:

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleKafkaConsumer implements Runnable {
    private KafkaConsumer<String, String> consumer;


It implements the java.lang.Runnable interface and has a single instance variable, consumer, whose type is org.apache.kafka.clients.consumer.KafkaConsumer<K,V>. This is the client that consumes records from a Kafka cluster. Note that it isn't thread-safe.
In the initialization method of the class you have to set up the properties required for the consumer configuration. Here I am reporting a small list including only the essential ones:

Properties props = new Properties();
props.put("bootstrap.servers", "bootstrap1.blogspot.com:9092, bootstrap2.blogspot.com:9092");
props.put("group.id", "ktesting");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


Here's the meaning of the properties used in the example code above (a future post of this series will walk through almost all of the available properties to configure a consumer):

  • bootstrap.servers: a list of comma-separated host/port pairs used to establish the initial connection to the Kafka cluster. There is no need to specify all of the nodes of the cluster; listing more than one is useful in case the first one in the list is down at connection time.
  • group.id: uniquely identifies the group of consumer processes to which this consumer belongs.
  • enable.auto.commit: if true, the offsets of the messages already fetched by the consumer are periodically committed in the background (the new 0.9 consumer commits offsets to Kafka itself, not to ZooKeeper). A manual-commit alternative is sketched after the consume loop below.
  • auto.commit.interval.ms: the frequency (in milliseconds) at which the consumer offsets are committed. It is used only when enable.auto.commit is true.
  • session.timeout.ms: the timeout (in milliseconds) used to detect consumer failures when using Kafka's group management facilities.
  • key.deserializer: the deserializer class for message keys; it must implement the org.apache.kafka.common.serialization.Deserializer interface. In the code example above we used the org.apache.kafka.common.serialization.StringDeserializer class.
  • value.deserializer: the deserializer class for message values; it must implement the same Deserializer interface. Again, we used the StringDeserializer class; a custom implementation is sketched right after this list.
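
The two deserializer properties accept any class implementing the Deserializer interface. As an illustration (this class is a hypothetical example of mine, not part of the original post), a custom value deserializer could look like this:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical example: a deserializer that upper-cases the
// incoming UTF-8 string values
public class UpperCaseDeserializer implements Deserializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for this simple example
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8).toUpperCase();
    }

    @Override
    public void close() {
        // Nothing to release
    }
}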
Now you can create an instance of the KafkaConsumer client using the settings above:

consumer = new KafkaConsumer<String, String>(props);
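
Alternatively (a variant offered by the client API, though not shown in this post's main flow), the deserializers can be passed programmatically to the constructor, in which case the two deserializer properties can be omitted:

consumer = new KafkaConsumer<String, String>(props,
        new StringDeserializer(), new StringDeserializer());

Here StringDeserializer is the org.apache.kafka.common.serialization.StringDeserializer class mentioned above.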

And finally, you can implement a method to consume messages by subscribing to one or more topics:

public void subscribeToTopics() {
    consumer.subscribe(Arrays.asList("kafkatesting"));
    while (true) {
        // poll() blocks for up to 100 ms waiting for new records
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // Do something with the messages: here we just print them
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }
}
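
As anticipated in the description of the enable.auto.commit property, here is a minimal sketch of the manual alternative (my addition, not code from the original post): set enable.auto.commit to false in the consumer properties and commit explicitly with commitSync() after processing:

// Alternative consume loop with manual offset commits; assumes
// "enable.auto.commit" was set to "false" in the consumer properties
public void consumeWithManualCommit() {
    consumer.subscribe(Arrays.asList("kafkatesting"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
        // Commit the offsets returned by the last poll() only after
        // all of its records have been processed
        consumer.commitSync();
    }
}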


This is of course a very basic implementation: once the application has consumed the messages, you can do whatever you want with them.
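
One improvement a real application will need is a way to stop the infinite loop above cleanly. A minimal sketch (my addition, not from the original post): wakeup() is the only KafkaConsumer method that is safe to call from another thread, and it makes a blocked poll() throw an org.apache.kafka.common.errors.WakeupException:

public void shutdown() {
    // Safe to call from another thread: makes a blocked poll()
    // throw a WakeupException in the consuming thread
    consumer.wakeup();
}

For this to work, the while loop in subscribeToTopics() should catch WakeupException to exit, and close the consumer in a finally block with consumer.close(), which releases the network connections and sockets.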
In this example the subscribeToTopics() method is called in the java.lang.Runnable.run() method implementation:

public void run() {
    subscribeToTopics();
}
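
To put everything together, a hypothetical launcher (the class name, timing, and the assumption of a no-arg constructor doing the initialization are mine) could run the consumer on a dedicated thread, since KafkaConsumer must be used from a single thread, and stop it through the shutdown() method sketched above:

public class ConsumerApp {
    public static void main(String[] args) throws InterruptedException {
        // Assumes the no-arg constructor performs the initialization
        // described earlier in this post
        SimpleKafkaConsumer simpleConsumer = new SimpleKafkaConsumer();
        Thread consumerThread = new Thread(simpleConsumer, "kafka-consumer");
        consumerThread.start();

        // Consume for one minute, then stop the poll loop cleanly
        Thread.sleep(60000);
        simpleConsumer.shutdown();
        consumerThread.join();
    }
}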


What's next?

In the next part of this series we will take a detailed look at all of the available configuration properties for a producer.
