In part 3 of this series we have implemented a simple Kafka producer using the provided Java APIs. Now let's learn how to implement a consumer for the produced messages. The Kafka release I am referring to is always the 0.9.0.1.
Same as for the producer, I suggest to use Maven: this way you have to add only one direct dependency through the POM file:
Then create a new class called, for example, SimpleKafkaConsumer:
It implements the java.lang.Runnable interface and has a single instance variable, consumer, which type is org.apache.kafka.clients.consumer.KafkaConsumer<K,V>. It is the client that consumes records from a Kafka cluster. It isn't thread-safe.
In the initialization method of the class you have to setup the properties required for the consumer configuration. Here I am reporting a small list including the mandatory ones only:
Here's the meaning of the properties used in the example code above (a future post of this series will walk through almost all of the available properties to configure a consumer):
And finally you can implement a method to consume messages subscribing to one or more topics:
This is of course a very basic implementation: once the application consumes the messages you can do whatever you want with them.
In this example the subscribeToTopics() method is called in the java.lang.Runnable.run() method implementation:
Same as for the producer, I suggest to use Maven: this way you have to add only one direct dependency through the POM file:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
Then create a new class called, for example, SimpleKafkaConsumer:
public class SimpleKafkaConsumer implements Runnable {
KafkaConsumer<String, String> consumer;
It implements the java.lang.Runnable interface and has a single instance variable, consumer, which type is org.apache.kafka.clients.consumer.KafkaConsumer<K,V>. It is the client that consumes records from a Kafka cluster. It isn't thread-safe.
In the initialization method of the class you have to setup the properties required for the consumer configuration. Here I am reporting a small list including the mandatory ones only:
Properties props = new Properties();
props.put("bootstrap.servers", "bootstrap1.blogspot.com:9092, bootstrap2.blogspot.com:9092");
props.put("group.id", "ktesting");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Here's the meaning of the properties used in the example code above (a future post of this series will walk through almost all of the available properties to configure a consumer):
- bootstrap.servers: a list of comma separated host/port pairs to use for establishing the initial connection to the Kafka cluster. No need to specify all of the nodes of the cluster. You need to add more than one just in case the first one in the list should be down at connection time.
- group.id: uniquely identifies the group of consumer processes to which this consumer belongs.
- enable.auto.commit: if true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer.
- auto.commit.intervals.ms: the frequency (in milliseconds) that the consumer offsets are committed to zookeeper. It is used only when enable.auto.commit is true.
- session.timeout.ms: the timeout (in milliseconds) used to detect failures.
- key.deserializer: defines the class for a message key that implements the org.apache.kafka.common.serialization.Serializer interface. In the code example above we used the org.apache.kafka.common.serialization.StringSerializer class.
- value.deserializer: defines the class for a message value that implements the org.apache.kafka.common.serialization.Serializer
interface. In the code example above we used the org.apache.kafka.common.serialization.StringSerializer class.
consumer = new KafkaConsumer<String, String>(props);
And finally you can implement a method to consume messages subscribing to one or more topics:
public void subscribeToTopics() {
consumer.subscribe(Arrays.asList("kafkatesting"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for(ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
// Do something with the messages
}
}
This is of course a very basic implementation: once the application consumes the messages you can do whatever you want with them.
In this example the subscribeToTopics() method is called in the java.lang.Runnable.run() method implementation:
public void run() {
subscribeToTopics();
}
Comments
Post a Comment