In part 2 of this series we learned how to set up a single-node, single-broker Kafka cluster. Before moving on to more complex cluster configurations, let's see how to implement a producer using the Kafka Java APIs. Throughout, the Kafka release I refer to is 0.9.0.1.
I suggest using Maven for any producer you need to implement: this way you only have to add one direct dependency to the POM file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.1</version>
</dependency>
Create a new class called SimpleKafkaProducer:
public class SimpleKafkaProducer {
    private ProducerConfig config;
    private KafkaProducer<String, String> producer;
and add the two instance variables shown. org.apache.kafka.clients.producer.ProducerConfig is the configuration class for a Kafka producer. org.apache.kafka.clients.producer.KafkaProducer<K,V> is the client that publishes records to Kafka topics. KafkaProducer is thread safe, so in a real application a good practice, also for performance, is to share a single producer instance among all threads.
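To make the sharing advice concrete without requiring a running broker, here is a minimal sketch of several worker threads publishing through one shared object. The Sender interface, CountingSender and SharedSenderDemo are hypothetical names I introduce only for illustration; in a real application the shared object would be the KafkaProducer<String, String> instance itself, and send would delegate to producer.send(...).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the single KafkaProducer method used in this post. */
interface Sender {
    void send(String topic, String msg);
}

/** In-memory Sender that only counts messages; thread safe, as KafkaProducer is. */
class CountingSender implements Sender {
    final AtomicInteger sent = new AtomicInteger();

    public void send(String topic, String msg) {
        sent.incrementAndGet();
    }
}

class SharedSenderDemo {

    /**
     * Starts the given number of workers, all publishing through the SAME
     * Sender instance. Returns true if every worker finished within 30 seconds.
     */
    static boolean publishFromThreads(Sender shared, int threads, int perThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int worker = t;
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    // "test-topic" is an assumed topic name, not from the post
                    shared.send("test-topic", "worker " + worker + " message " + i);
                }
            });
        }
        pool.shutdown();
        try {
            return pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The point of the design is that the pool creates many threads but only one sender exists; creating one producer per thread would multiply connections and buffers for no benefit.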
In the initialization method of the class you have to set up the properties required for the producer configuration. Here is a short list covering only the essential ones:
Properties props = new Properties();
props.put("bootstrap.servers", "bootstrap1.blogspot.com:9092, bootstrap2.blogspot.com:9092");
props.put("metadata.broker.list","bootstrap1.blogspot.com:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "1");
One of the next posts of this series will walk through almost all of the available properties for a producer. Here's an explanation of the ones used in the example code above:
- bootstrap.servers: a comma-separated list of host/port pairs used to establish the initial connection to the Kafka cluster. There is no need to list every node of the cluster, but it is worth adding more than one in case the first in the list is down at connection time.
- metadata.broker.list: a comma-separated list of host/port pairs that a producer uses to find one or more brokers and determine the leader for each topic. Again there is no need to list the full set of brokers, but include at least two in case the first is not available. This property belongs to the older Scala producer; the Java KafkaProducer used here relies on bootstrap.servers instead, so it is shown only for reference.
- key.serializer: the class used to serialize message keys. It must implement the org.apache.kafka.common.serialization.Serializer interface. In the code example above we used the org.apache.kafka.common.serialization.StringSerializer class.
- value.serializer: the class used to serialize message values. It must implement the org.apache.kafka.common.serialization.Serializer interface. In the code example above we used the org.apache.kafka.common.serialization.StringSerializer class.
- acks: the number of acknowledgments the producer requires before considering a request complete. Typical values are 0 (do not wait for any acknowledgment), 1 (the leader writes the record to its local log and responds without waiting for acknowledgment from the followers) and all (the leader waits for the full set of in-sync replicas to acknowledge the record).
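Since a missing or malformed property only shows up when the producer is created, it can help to sanity-check the Properties object first. The following is a minimal sketch using only java.util; ProducerPropsCheck and its methods are hypothetical helpers of mine, not part of the Kafka API, and the list of required keys reflects my understanding that these three have no default in the 0.9 Java producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper (not part of Kafka) that sanity-checks producer settings. */
class ProducerPropsCheck {

    // Assumption: these keys have no default value in the Java producer.
    private static final String[] REQUIRED = {
        "bootstrap.servers", "key.serializer", "value.serializer"
    };

    /** Returns the required keys that are absent or blank in props. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<String>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Returns true if every comma-separated entry looks like host:port. */
    static boolean looksLikeHostPortList(String servers) {
        for (String entry : servers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            // Reject entries with no host part or no port part
            if (colon <= 0 || colon == trimmed.length() - 1) {
                return false;
            }
            try {
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                if (port < 1 || port > 65535) {
                    return false;
                }
            } catch (NumberFormatException e) {
                return false;
            }
        }
        return true;
    }
}
```

You would call missingKeys(props) and looksLikeHostPortList(props.getProperty("bootstrap.servers")) just before creating the producer, and fail fast with a readable message if either check does not pass.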
Once you have set the required properties you can create the producer:
producer = new KafkaProducer<String, String>(props);
Finally you can implement a method to publish messages:
public void sendMessage(String topic, String msg) {
    // Wrap the message in a ProducerRecord (with a fixed test key) and publish it
    producer.send(
        new ProducerRecord<String, String>(topic, "TestKey", msg));
}
Don't forget to close the connection to the broker when destroying the producer:
producer.close();
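If publishing can throw, the close call is best placed in a finally block so that it always runs. Below is a minimal sketch of that pattern written against java.io.Closeable, on the assumption (which I believe holds for 0.9.0.1) that the producer can be treated as a Closeable; CloseGuard and runThenClose are hypothetical names used only for this illustration.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical helper showing the try/finally pattern for closing a client. */
class CloseGuard {

    /** Runs the work, then closes the resource even if the work throws. */
    static void runThenClose(Closeable resource, Runnable work) {
        try {
            work.run();
        } finally {
            try {
                resource.close();
            } catch (IOException e) {
                // Closeable.close() declares IOException; rethrow it unchecked
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

In the producer class the same idea reduces to wrapping the sendMessage calls in try { ... } finally { producer.close(); }.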