Skip to main content

The Kafka Series (part 4): implementing a consumer in Java

In part 3 of this series we have implemented a simple Kafka producer using the provided Java APIs. Now let's learn how to implement a consumer for the produced messages. The Kafka release I am referring to is always the 0.9.0.1.
Same as for the producer, I suggest to use Maven: this way you have to add only one direct dependency through the POM file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>


Then create a new class called, for example, SimpleKafkaConsumer:

public class SimpleKafkaConsumer implements Runnable {
    KafkaConsumer<String, String> consumer;


It implements the java.lang.Runnable interface and has a single instance variable, consumer, which type is org.apache.kafka.clients.consumer.KafkaConsumer<K,V>. It is the client that consumes records from a Kafka cluster. It isn't thread-safe.
In the initialization method of the class you have to setup the properties required for the consumer configuration. Here I am reporting a small list including the mandatory ones only:

Properties props = new Properties();
props.put("bootstrap.servers", "bootstrap1.blogspot.com:9092, bootstrap2.blogspot.com:9092");
props.put("group.id", "ktesting");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


Here's the meaning of the properties used in the example code above (a future post of this series will walk through almost all of the available properties to configure a consumer):

  • bootstrap.servers: a list of comma separated host/port pairs to use for establishing the initial connection to the Kafka cluster. No need to specify all of the nodes of the cluster. You need to add more than one just in case the first one in the list should be down at connection time.
  • group.id: uniquely identifies the group of consumer processes to which this consumer belongs.
  • enable.auto.commit: if true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer.
  • auto.commit.intervals.ms: the frequency (in milliseconds) that the consumer offsets are committed to zookeeper. It is used only when enable.auto.commit is true.
  • session.timeout.ms: the timeout (in milliseconds) used to detect failures.
  • key.deserializer: defines the class for a message key that implements the org.apache.kafka.common.serialization.Serializer interface. In the code example above we used the org.apache.kafka.common.serialization.StringSerializer class.
  • value.deserializer: defines the class for a message value that implements the org.apache.kafka.common.serialization.Serializer  interface. In the code example above we used the org.apache.kafka.common.serialization.StringSerializer class.
Now you can create an instance of the KafkaConsumer client using the settings above:

consumer = new KafkaConsumer<String, String>(props);

And finally you can implement a method to consume messages subscribing to one or more topics:

public void subscribeToTopics() {
    consumer.subscribe(Arrays.asList("kafkatesting"));
    while(true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for(ConsumerRecord<String, String> record : records)
              System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());

         // Do something with the messages
    }
}


This is of course a very basic implementation: once the application consumes the messages you can do whatever you want with them.
In this example the subscribeToTopics() method is called in the java.lang.Runnable.run() method implementation:

public void run() {
    subscribeToTopics();
}


What's next?

In the next part of this series we will have a detailed look at all of the available configuration properties for a producer.

Comments

Popular posts from this blog

Streamsets Data Collector log shipping and analysis using ElasticSearch, Kibana and... the Streamsets Data Collector

One common use case scenario for the Streamsets Data Collector (SDC) is the log shipping to some system, like ElasticSearch, for real-time analysis. To build a pipeline for this particular purpose in SDC is really simple and fast and doesn't require coding at all. For this quick tutorial I will use the SDC logs as example. The log data will be shipped to Elasticsearch and then visualized through a Kibana dashboard. Basic knowledge of SDC, Elasticsearch and Kibana is required for a better understanding of this post. These are the releases I am referring to for each system involved in this tutorial: JDK 8 Streamsets Data Collector 1.4.0 ElasticSearch 2.3.3 Kibana 4.5.1 Elasticsearch and Kibana installation You should have your Elasticsearch cluster installed and configured and a Kibana instance pointing to that cluster in order to go on with this tutorial. Please refer to the official documentation for these two products in order to complete their installation (if you do

Exporting InfluxDB data to a CVS file

Sometimes you would need to export a sample of the data from an InfluxDB table to a CSV file (for example to allow a data scientist to do some offline analysis using a tool like Jupyter, Zeppelin or Spark Notebook). It is possible to perform this operation through the influx command line client. This is the general syntax: sudo /usr/bin/influx -database '<database_name>' -host '<hostname>' -username '<username>'  -password '<password>' -execute 'select_statement' -format '<format>' > <file_path>/<file_name>.csv where the format could be csv , json or column . Example: sudo /usr/bin/influx -database 'telegraf' -host 'localhost' -username 'admin'  -password '123456789' -execute 'select * from mem' -format 'csv' > /home/googlielmo/influxdb-export/mem-export.csv

Using Rapids cuDF in a Colab notebook

During last Spark+AI Summit Europe 2019 I had a chance to attend a talk from Miguel Martinez  who was presenting Rapids , the new Open Source framework from NVIDIA for GPU accelerated end-to-end Data Science and Analytics. Fig. 1 - Overview of the Rapids eco-system Rapids is a suite of Open Source libraries: cuDF cuML cuGraph cuXFilter I enjoied the presentation and liked the idea of this initiative, so I wanted to start playing with the Rapids libraries in Python on Colab , starting from cuDF, but the first attempt came with an issue that I eventually solved. So in this post I am going to share how I fixed it, with the hope it would be useful to someone else running into the same blocker. I am assuming here you are already familiar with Google Colab. I am using Python 3.x as Python 2 isn't supported by Rapids. Once you have created a new notebook in Colab, you need to check if the runtime for it is set to use Python 3 and uses a GPU as hardware accelerator. You