Spark Streaming brings Apache Spark's language-integrated API to stream processing, so you can write streaming jobs the same way you write batch jobs. It lets you build fault-tolerant applications and reuse the same code for batch processing and interactive queries.
Kafka is an Open Source message broker written in Scala. It is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant and wicked fast.
Cassandra is an Open Source distributed database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure.
This post walks through the basics of the implementation of a simple streaming application integrating those three technologies. The code example is written in Scala. The releases I am referring to in this post are the following:
- Scala 2.11.8
- Spark 1.6.2
- Kafka Client APIs 0.8.2.1
- Cassandra 3.9
- Datastax Spark-Cassandra Connector compatible with Spark 1.6.2
This is the list of dependencies to add to the project POM file (assuming you build with Maven; the same coordinates work with other build tools such as sbt or Gradle):
<properties>
  <scala.version>2.11.8</scala.version>
  <spark.version>1.6.2</spark.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
</dependencies>
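For readers on sbt, the same dependency list could look roughly like the sketch below. It is not taken from the original project: it simply mirrors the Maven coordinates above, and `sparkVersion` is a name introduced here for illustration. The `%%` operator appends the Scala binary version (`_2.11`) to each artifact name, and sbt adds `scala-library` on its own.

```scala
// build.sbt (sketch): mirrors the Maven coordinates listed above
scalaVersion := "2.11.8"

// Illustrative name, plays the role of ${spark.version} in the POM
val sparkVersion = "1.6.2"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % sparkVersion,
  "org.apache.spark"   %% "spark-sql"                 % sparkVersion,
  "org.apache.spark"   %% "spark-streaming"           % sparkVersion,
  "org.apache.spark"   %% "spark-streaming-kafka"     % sparkVersion,
  // The connector has its own release line; the POM reuses the Spark
  // version number for it, and this sketch does the same.
  "com.datastax.spark" %% "spark-cassandra-connector" % sparkVersion
)
```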
To understand the integration between Spark and Kafka, let's start from the code of DirectKafkaWordCount, one of the Scala examples shipped with every Spark distribution.
The first thing to do is to create the Spark job configuration, setting the connection details for the Cassandra database (hostname or IP address, and listening port):
val sparkConf = new SparkConf()
  .setAppName("KafkaStreamToCassandra").setMaster("local[*]")
  .set("spark.cassandra.connection.host", cassandra_host)
  .set("spark.cassandra.connection.port", cassandra_port)
Then we can create the StreamingContext using that configuration:
val ssc = new StreamingContext(sparkConf, Seconds(5))
We can now create a direct Kafka stream starting from a list of brokers and topics:
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
Now let's apply some transformations to the incoming messages to get the lines, split them into single words and count the words:
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
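To see what these three transformations compute for a single batch, here is a minimal sketch that applies the same steps to an ordinary Scala collection, with no Spark involved. The object and method names are made up for illustration, and `groupBy` plus a sum stands in for `reduceByKey`, which plain collections do not have.

```scala
object WordCountSketch {
  // Same pipeline as above, on a plain Seq instead of a DStream:
  // split each line on spaces, pair every word with 1L, then sum per word.
  def countWords(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.split(" "))
      .map(x => (x, 1L))
      .groupBy(_._1) // stands in for the grouping done by reduceByKey
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val counts = countWords(Seq("spark kafka spark", "kafka cassandra"))
    counts.foreach { case (word, n) => println(s"$word -> $n") }
  }
}
```

In the streaming job the same computation runs independently on each 5-second batch, so the counts are per batch rather than running totals.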
The driver chosen for Cassandra is the Spark-Cassandra connector provided by Datastax. It lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables and execute arbitrary CQL queries in Spark applications.
We need to enable the Cassandra-specific functions for the DStream wordCounts, and import the SomeColumns column selector:
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._
so we can save the results to the database:
wordCounts.saveToCassandra(keyspace, table, SomeColumns("word", "count"))
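One consequence worth spelling out: the counts are computed per batch, and a Cassandra write to an existing primary key replaces the row. Since word is the primary key of the target table, each batch overwrites the stored count for the words it contains instead of adding to it. The sketch below only simulates that behaviour with an immutable Map; it does not talk to Cassandra, and all names in it are illustrative.

```scala
object UpsertSketch {
  // Models the table as word -> count. Entries from the new batch replace
  // existing entries with the same key, like an upsert on the primary key.
  def applyBatch(table: Map[String, Long], batch: Map[String, Long]): Map[String, Long] =
    table ++ batch

  def main(args: Array[String]): Unit = {
    val batch1 = Map("spark" -> 3L, "kafka" -> 1L) // counts from the first batch
    val batch2 = Map("spark" -> 1L)                // counts from the next batch
    val table = Seq(batch1, batch2).foldLeft(Map.empty[String, Long])(applyBatch)
    // "spark" now holds 1, the count from the latest batch, not 4.
    table.foreach { case (word, n) => println(s"$word -> $n") }
  }
}
```

If running totals are what you need, the counts would have to be accumulated before being written, which is outside the scope of this basic example.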
Finally, let's start the job execution:
ssc.start()
and keep it alive until a termination signal arrives:
ssc.awaitTermination()
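Putting the fragments together, the whole job could be assembled as in the sketch below. The snippets above do not say where brokers, topics and the Cassandra settings come from, so reading them from six command-line arguments is an assumption of this sketch, as are the `Settings` case class and the `parse` helper. The Spark, Kafka and connector calls are the ones already shown.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._

object KafkaStreamToCassandra {
  // Hypothetical holder for the command-line values (not in the original post).
  case class Settings(brokers: String, topics: String, cassandraHost: String,
                      cassandraPort: String, keyspace: String, table: String)

  // Accepts exactly six arguments, in the order of the Settings fields.
  def parse(args: Array[String]): Option[Settings] = args match {
    case Array(b, t, h, p, k, tb) => Some(Settings(b, t, h, p, k, tb))
    case _                        => None
  }

  def main(args: Array[String]): Unit = {
    val s = parse(args).getOrElse {
      System.err.println("Usage: KafkaStreamToCassandra <brokers> <topics> " +
        "<cassandra_host> <cassandra_port> <keyspace> <table>")
      sys.exit(1)
    }

    // Job configuration, including the Cassandra connection details
    val sparkConf = new SparkConf()
      .setAppName("KafkaStreamToCassandra").setMaster("local[*]")
      .set("spark.cassandra.connection.host", s.cassandraHost)
      .set("spark.cassandra.connection.port", s.cassandraPort)
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Direct Kafka stream over the given brokers and topics
    val topicsSet = s.topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> s.brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Per-batch word count on the message values
    val wordCounts = messages.map(_._2)
      .flatMap(_.split(" "))
      .map(x => (x, 1L))
      .reduceByKey(_ + _)

    wordCounts.saveToCassandra(s.keyspace, s.table, SomeColumns("word", "count"))

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With a local setup the arguments might be something like `localhost:9092 test localhost 9042 kafkawordcount wordcounts`. Unquoted CQL identifiers are folded to lowercase, so a keyspace created as KafkaWordCount will most likely have to be passed as kafkawordcount.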
To run the example you need to set up a standalone Kafka cluster, create a test topic, implement and run a simple Kafka producer (as described in an old post of mine), and then create a test keyspace and a table in Cassandra:
CREATE KEYSPACE IF NOT EXISTS KafkaWordCount WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
CREATE TABLE IF NOT EXISTS KafkaWordCount.wordcounts (word text PRIMARY KEY, count bigint);
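For the producer step, a minimal test producer could look like the sketch below. It is not the producer from the earlier post; it is a stand-in based on the Java producer API in kafka-clients 0.8.2, which should already be on the classpath as a transitive dependency of spark-streaming-kafka. The default broker address, the topic name `test`, the sample lines and the `producerProps` helper are all assumptions made for illustration.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SimpleWordProducer {
  // Builds the minimal producer configuration for String keys and values.
  def producerProps(brokers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  def main(args: Array[String]): Unit = {
    // Assumed defaults for a local, single-broker test setup
    val brokers = if (args.length > 0) args(0) else "localhost:9092"
    val topic   = if (args.length > 1) args(1) else "test"

    val producer = new KafkaProducer[String, String](producerProps(brokers))
    try {
      // Each line becomes one message; the streaming job splits it into words.
      Seq("the quick brown fox", "jumps over the lazy dog").foreach { line =>
        producer.send(new ProducerRecord[String, String](topic, line))
      }
    } finally {
      producer.close() // waits for pending sends to complete before returning
    }
  }
}
```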
This post covers only the very basics of integrating these three technologies. Much more needs to be considered when implementing a streaming pipeline, but the goal here is to give you a good idea of how to start. Let's discover more details (and issues) in future posts.