Posts

Showing posts from April, 2016

Discovering Streamsets Data Collector (Part 2)

Before moving on to the other planned posts for this series, an important update about the Data Collector. Two new versions (1.3.0 and 1.3.1), which fix some critical bugs and introduce new features, have been released since the first post was published. Here is a short list of the most significant benefits of moving to the new releases:

- The issue with running the Data Collector reported in https://issues.streamsets.com/browse/SDC-2657 is fixed. The workaround for it was to downgrade to release 1.2.1.0, which meant missing an important new feature such as the Groovy Evaluator processor.
- The Hadoop FS and Local FS destinations can now write files larger than 2 GB.
- A MongoDB destination is now available (up to release 1.2.2.0 a MongoDB database could be set as an origin only).
- Two new processors, Base64 Field Decoder and Base64 Field Encoder, have been implemented to decode and encode Base64 binary data.

Enjoy it!

The Kafka Series (part 5): deleting topics

Before going further, a quick post about topic deletion in Kafka (someone asked me about this). In part 2 of this series we created a topic called kafkatesting for testing purposes and to get familiar with the Java APIs for implementing producers and consumers. When you're done with testing you will need to delete it. This can be done by running the following command from a shell:

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic kafkatesting

If you then check the list of existing topics for that cluster, you may still see the topic there, labeled "marked for deletion". This happens when you use the default properties file for Kafka, or when you haven't explicitly set the delete.topic.enable property to true (its default value is false) in your custom copy of that file. Set the property to true, then restart both Kafka and ZooKeeper to make the configuration change effective.
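As a quick sanity check before restarting, here is a minimal Java sketch that reads broker settings in properties format and reports whether topic deletion is enabled, falling back to false when delete.topic.enable is absent. The class and method names (TopicDeletionCheck, parse, isTopicDeletionEnabled) are hypothetical, and the inline sample settings are assumptions standing in for a real broker properties file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: it only inspects a properties text.
final class TopicDeletionCheck {

    // Parses broker settings written in the java.util.Properties format.
    static Properties parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns true only if delete.topic.enable is explicitly set to true.
    // A missing property is treated as false, the default for this release.
    static boolean isTopicDeletionEnabled(Properties brokerProps) {
        String value = brokerProps.getProperty("delete.topic.enable", "false");
        return Boolean.parseBoolean(value.trim());
    }

    public static void main(String[] args) {
        // Sample settings (assumed for illustration): one without the
        // property, one with it explicitly enabled.
        Properties defaults = parse("broker.id=0\nzookeeper.connect=localhost:2181\n");
        Properties custom = parse("broker.id=0\ndelete.topic.enable=true\n");

        System.out.println("default file: " + isTopicDeletionEnabled(defaults));
        System.out.println("custom file:  " + isTopicDeletionEnabled(custom));
    }
}
```

In a real setup you would load the text from your own copy of the broker properties file instead of the inline strings used here.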

The Kafka Series (part 4): implementing a consumer in Java

In part 3 of this series we implemented a simple Kafka producer using the provided Java APIs. Now let's learn how to implement a consumer for the produced messages. The Kafka release I am referring to is still 0.9.0.1. As for the producer, I suggest using Maven: this way you only have to add one direct dependency to the POM file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

Then create a new class called, for example, SimpleKafkaConsumer:

public class SimpleKafkaConsumer implements Runnable {
    KafkaConsumer<String, String> consumer;

It implements the java.lang.Runnable interface and has a single instance variable, consumer, whose type is org.apache.kafka.clients.consumer.KafkaConsumer<K,V>. This is the client that consumes records from a Kafka cluster. It isn't thread-safe. In the initia