Skip to main content

Integrating Kakfa, Spark Streaming and Cassandra: the basics

Spark Streaming brings Apache Spark's language integrated APIs to write streaming jobs the same way as for writing batch jobs. It allows to build fault tolerant applications and reuse the same code for batch and interactive queries.
Kafka is an Open Source message broker written in Scala. It is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant and wicked fast.
Cassandra is an Open Source distributed database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure.
This post walks through the basics of the implementation of a simple streaming application integrating those three technologies. The code example is written in Scala. The releases I am referring to in this post are the following:
  •  Scala 2.11.8
  •  Spark 1.6.2
  •  Kafka Client APIs 0.8.2.11
  •  Cassandra 3.9
  •  Datastax Spark-Cassandra Connector compatible with Spark 1.6.2
This is the list of dependencies to be added to the project POM file (supposing you're using Maven to build your project, but you know how to do the same for different build tools like sbt or Gradle):

  <properties>
       <scala.version>2.11.8</scala.version>
       <spark.version>1.6.2</spark.version>
  </properties>

  <dependencies>
     <dependency>
         <groupId>org.scala-lang</groupId>
         <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
     </dependency>
    
     <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-core_2.11</artifactId>
         <version>${spark.version}</version>
     </dependency>
   
     <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-sql_2.11</artifactId>
         <version>${spark.version}</version>
     </dependency>
   
     <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-streaming_2.11</artifactId>
         <version>${spark.version}</version>
     </dependency>
 
     <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>${spark.version}</version>
     </dependency>
 
     <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>${spark.version}</version>
     </dependency>
  </dependencies>

In order to understand the integration between Spark and Kafka let's start from the code of DirectKafkaWordCount, one of the Scala examples coming with any Spark distribution.
The first thing to do is to create the Spark job configuration setting up the connection details (hostname (or IP address) and listening port) to the Cassandra database:

 val sparkConf = new SparkConf()
     .setAppName("KakfaStreamToCassandra").setMaster("local[*]")
     .set("spark.cassandra.connection.host", cassandra_host)
     .set("spark.cassandra.connection.port", cassandra_port)

Then we can create the StreamingContext using that configuration:

 val ssc = new StreamingContext(sparkConf, Seconds(5))

We can now create a direct Kafka stream starting from a list of brokers and topics:

 val topicsSet = topics.split(",").toSet
 val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
 val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

Now let's apply some transformations to the incoming messages to get the lines, split them into single words and count the words:

 val lines = messages.map(_._2)

 val words = lines.flatMap(_.split(" "))

 val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)

The chosen driver to Cassandra is the Spark-Cassandra connector provided by Datastax. It lets to expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables and execute arbitrary CQL queries in Spark applications.
We need to enable the specific Cassandra functions for the DStream wordCounts

 import com.datastax.spark.connector.streaming._

so we can save the results to the database:

 wordCounts.saveToCassandra(keysspace, table, SomeColumns("word", "count"))

Finally let's start the job execution

 ssc.start()

and keep it alive until a termination signal arrives

 ssc.awaitTermination()

 
In order to execute the example you need to setup a standalone Kafka cluster, create a test topic, implement and run a simple Kafka producer (as described in this old post of mines), and then create a test keyspace and a table in Cassandra:


 CREATE KEYSPACE IF NOT EXISTS KafkaWordCount WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};

  CREATE TABLE IF NOT EXISTS KafkaWordCount.wordcounts (word text PRIMARY KEY, count text);

 
This post describes only the very basics to start integration of these three technologies. Much more needs to be considered when implementing a streaming pipeline, but the goal here is to give you a good idea on how to start. Let's discover more details (and issues) in future posts.

Comments

Popular posts from this blog

Streamsets Data Collector log shipping and analysis using ElasticSearch, Kibana and... the Streamsets Data Collector

One common use case scenario for the Streamsets Data Collector (SDC) is the log shipping to some system, like ElasticSearch, for real-time analysis. To build a pipeline for this particular purpose in SDC is really simple and fast and doesn't require coding at all. For this quick tutorial I will use the SDC logs as example. The log data will be shipped to Elasticsearch and then visualized through a Kibana dashboard. Basic knowledge of SDC, Elasticsearch and Kibana is required for a better understanding of this post. These are the releases I am referring to for each system involved in this tutorial: JDK 8 Streamsets Data Collector 1.4.0 ElasticSearch 2.3.3 Kibana 4.5.1 Elasticsearch and Kibana installation You should have your Elasticsearch cluster installed and configured and a Kibana instance pointing to that cluster in order to go on with this tutorial. Please refer to the official documentation for these two products in order to complete their installation (if you do

Exporting InfluxDB data to a CVS file

Sometimes you would need to export a sample of the data from an InfluxDB table to a CSV file (for example to allow a data scientist to do some offline analysis using a tool like Jupyter, Zeppelin or Spark Notebook). It is possible to perform this operation through the influx command line client. This is the general syntax: sudo /usr/bin/influx -database '<database_name>' -host '<hostname>' -username '<username>'  -password '<password>' -execute 'select_statement' -format '<format>' > <file_path>/<file_name>.csv where the format could be csv , json or column . Example: sudo /usr/bin/influx -database 'telegraf' -host 'localhost' -username 'admin'  -password '123456789' -execute 'select * from mem' -format 'csv' > /home/googlielmo/influxdb-export/mem-export.csv

Using Rapids cuDF in a Colab notebook

During last Spark+AI Summit Europe 2019 I had a chance to attend a talk from Miguel Martinez  who was presenting Rapids , the new Open Source framework from NVIDIA for GPU accelerated end-to-end Data Science and Analytics. Fig. 1 - Overview of the Rapids eco-system Rapids is a suite of Open Source libraries: cuDF cuML cuGraph cuXFilter I enjoied the presentation and liked the idea of this initiative, so I wanted to start playing with the Rapids libraries in Python on Colab , starting from cuDF, but the first attempt came with an issue that I eventually solved. So in this post I am going to share how I fixed it, with the hope it would be useful to someone else running into the same blocker. I am assuming here you are already familiar with Google Colab. I am using Python 3.x as Python 2 isn't supported by Rapids. Once you have created a new notebook in Colab, you need to check if the runtime for it is set to use Python 3 and uses a GPU as hardware accelerator. You