Skip to main content

Integrating Kakfa, Spark Streaming and Cassandra: the basics

Spark Streaming brings Apache Spark's language integrated APIs to write streaming jobs the same way as for writing batch jobs. It allows to build fault tolerant applications and reuse the same code for batch and interactive queries.
Kafka is an Open Source message broker written in Scala. It is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant and wicked fast.
Cassandra is an Open Source distributed database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure.
This post walks through the basics of the implementation of a simple streaming application integrating those three technologies. The code example is written in Scala. The releases I am referring to in this post are the following:
  •  Scala 2.11.8
  •  Spark 1.6.2
  •  Kafka Client APIs 0.8.2.11
  •  Cassandra 3.9
  •  Datastax Spark-Cassandra Connector compatible with Spark 1.6.2
This is the list of dependencies to be added to the project POM file (supposing you're using Maven to build your project, but you know how to do the same for different build tools like sbt or Gradle):

  <properties>
       <scala.version>2.11.8</scala.version>
       <spark.version>1.6.2</spark.version>
  </properties>

  <dependencies>
     <dependency>
         <groupId>org.scala-lang</groupId>
         <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
     </dependency>
    
     <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-core_2.11</artifactId>
         <version>${spark.version}</version>
     </dependency>
   
     <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-sql_2.11</artifactId>
         <version>${spark.version}</version>
     </dependency>
   
     <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-streaming_2.11</artifactId>
         <version>${spark.version}</version>
     </dependency>
 
     <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>${spark.version}</version>
     </dependency>
 
     <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>${spark.version}</version>
     </dependency>
  </dependencies>

In order to understand the integration between Spark and Kafka let's start from the code of DirectKafkaWordCount, one of the Scala examples coming with any Spark distribution.
The first thing to do is to create the Spark job configuration setting up the connection details (hostname (or IP address) and listening port) to the Cassandra database:

 val sparkConf = new SparkConf()
     .setAppName("KakfaStreamToCassandra").setMaster("local[*]")
     .set("spark.cassandra.connection.host", cassandra_host)
     .set("spark.cassandra.connection.port", cassandra_port)

Then we can create the StreamingContext using that configuration:

 val ssc = new StreamingContext(sparkConf, Seconds(5))

We can now create a direct Kafka stream starting from a list of brokers and topics:

 val topicsSet = topics.split(",").toSet
 val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
 val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

Now let's apply some transformations to the incoming messages to get the lines, split them into single words and count the words:

 val lines = messages.map(_._2)

 val words = lines.flatMap(_.split(" "))

 val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)

The chosen driver to Cassandra is the Spark-Cassandra connector provided by Datastax. It lets to expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables and execute arbitrary CQL queries in Spark applications.
We need to enable the specific Cassandra functions for the DStream wordCounts

 import com.datastax.spark.connector.streaming._

so we can save the results to the database:

 wordCounts.saveToCassandra(keysspace, table, SomeColumns("word", "count"))

Finally let's start the job execution

 ssc.start()

and keep it alive until a termination signal arrives

 ssc.awaitTermination()

 
In order to execute the example you need to setup a standalone Kafka cluster, create a test topic, implement and run a simple Kafka producer (as described in this old post of mines), and then create a test keyspace and a table in Cassandra:


 CREATE KEYSPACE IF NOT EXISTS KafkaWordCount WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};

  CREATE TABLE IF NOT EXISTS KafkaWordCount.wordcounts (word text PRIMARY KEY, count text);

 
This post describes only the very basics to start integration of these three technologies. Much more needs to be considered when implementing a streaming pipeline, but the goal here is to give you a good idea on how to start. Let's discover more details (and issues) in future posts.

Comments

Popular posts from this blog

Exporting InfluxDB data to a CVS file

Sometimes you would need to export a sample of the data from an InfluxDB table to a CSV file (for example to allow a data scientist to do some offline analysis using a tool like Jupyter, Zeppelin or Spark Notebook). It is possible to perform this operation through the influx command line client. This is the general syntax: sudo /usr/bin/influx -database '<database_name>' -host '<hostname>' -username '<username>'  -password '<password>' -execute 'select_statement' -format '<format>' > <file_path>/<file_name>.csv where the format could be csv , json or column . Example: sudo /usr/bin/influx -database 'telegraf' -host 'localhost' -username 'admin'  -password '123456789' -execute 'select * from mem' -format 'csv' > /home/googlielmo/influxdb-export/mem-export.csv

jOOQ: code generation in Eclipse

jOOQ allows code generation from a database schema through ANT tasks, Maven and shell command tools. But if you're working with Eclipse it's easier to create a new Run Configuration to perform this operation. First of all you have to write the usual XML configuration file for the code generation starting from the database: <?xml version="1.0" encoding="UTF-8" standalone="yes"?> <configuration xmlns="http://www.jooq.org/xsd/jooq-codegen-2.0.4.xsd">   <jdbc>     <driver>oracle.jdbc.driver.OracleDriver</driver>     <url>jdbc:oracle:thin:@dbhost:1700:DBSID</url>     <user>DB_FTRS</user>     <password>password</password>   </jdbc>   <generator>     <name>org.jooq.util.DefaultGenerator</name>     <database>       <name>org.jooq.util.oracle.OracleDatabase</name>     ...

Turning Python Scripts into Working Web Apps Quickly with Streamlit

 I just realized that I am using Streamlit since almost one year now, posted about in Twitter or LinkedIn several times, but never wrote a blog post about it before. Communication in Data Science and Machine Learning is the key. Being able to showcase work in progress and share results with the business makes the difference. Verbal and non-verbal communication skills are important. Having some tool that could support you in this kind of conversation with a mixed audience that couldn't have a technical background or would like to hear in terms of results and business value would be of great help. I found that Streamlit fits well this scenario. Streamlit is an Open Source (Apache License 2.0) Python framework that turns data or ML scripts into shareable web apps in minutes (no kidding). Python only: no front‑end experience required. To start with Streamlit, just install it through pip (it is available in Anaconda too): pip install streamlit and you are ready to execute the working de...