Wednesday, August 30, 2017

Publish to Kafka Jenkins plugin talk @ Dublin Jenkins Meetup

I am going to present a preview of the upcoming 1.5 release of the publishtokafka Jenkins plugin, along with some insights into the future 2.0 release, at the Dublin Jenkins Meetup on August 31st. If you're in the Dublin area, please come along to learn more about this plugin of mine and to have a chat about all things Jenkins.

Monday, July 31, 2017

StreamSets Data Collector pipeline execution scheduling through the SDC REST APIs

A hot topic in the sdc-user group over the past few weeks has been how to schedule the start and stop of SDC pipelines. Using the SDC REST APIs has been suggested in some threads, but my general impression is that many readers don't have a clear idea of how these APIs work. So I decided to write an article on DZone to clarify, once and for all, how to do it. Enjoy it!
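As a rough illustration of the idea (not the code from the DZone article), here is a minimal Scala sketch that starts a pipeline immediately and stops it one hour later. It assumes the SDC REST layout `/rest/v1/pipeline/<pipelineId>/start` and `/stop`, the `X-Requested-By` header, and HTTP basic authentication; please verify all of these against the RESTful API page of your own SDC instance. The base URL, pipeline id and credentials are hypothetical placeholders.

```scala
import java.net.{HttpURLConnection, URL, URLEncoder}
import java.nio.charset.StandardCharsets
import java.util.Base64
import java.util.concurrent.{Executors, TimeUnit}

object SdcPipelineScheduler {
  // Builds the URL for a pipeline action.
  // Assumed layout: <base>/rest/v1/pipeline/<pipelineId>/<start|stop>
  def actionUrl(baseUrl: String, pipelineId: String, action: String): String = {
    require(action == "start" || action == "stop", s"unsupported action: $action")
    val encodedId = URLEncoder.encode(pipelineId, "UTF-8").replace("+", "%20")
    s"${baseUrl.stripSuffix("/")}/rest/v1/pipeline/$encodedId/$action"
  }

  // Value for the Authorization header (assumes basic auth is enabled on the server).
  def basicAuthHeader(user: String, password: String): String = {
    val raw = s"$user:$password".getBytes(StandardCharsets.UTF_8)
    "Basic " + Base64.getEncoder.encodeToString(raw)
  }

  // Sends an empty POST to the given URL and returns the HTTP status code.
  def post(url: String, user: String, password: String): Int = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setRequestMethod("POST")
      conn.setRequestProperty("Authorization", basicAuthHeader(user, password))
      conn.setRequestProperty("X-Requested-By", "sdc") // assumed required header
      conn.setDoOutput(true)
      conn.getOutputStream.close() // empty request body
      conn.getResponseCode
    } finally {
      conn.disconnect()
    }
  }

  def main(args: Array[String]): Unit = {
    val base = "http://localhost:18630" // hypothetical SDC address
    val pipeline = "myPipelineId"       // hypothetical pipeline id
    val scheduler = Executors.newSingleThreadScheduledExecutor()

    def task(action: String): Runnable = new Runnable {
      def run(): Unit =
        println(s"$action -> HTTP ${post(actionUrl(base, pipeline, action), "admin", "admin")}")
    }

    scheduler.schedule(task("start"), 0, TimeUnit.SECONDS)
    scheduler.schedule(task("stop"), 1, TimeUnit.HOURS)
    // Already scheduled delayed tasks still run after shutdown() by default.
    scheduler.shutdown()
  }
}
```

In practice the same two POST calls could just as well be triggered from cron or any other external scheduler; the JVM scheduler is used here only to keep the sketch self-contained.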

Thursday, June 8, 2017

Unit testing Spark applications in Scala (Part 2): Intro to spark-testing-base

In the first part of this series we became familiar with ScalaTest. When it comes to unit testing Spark applications written in Scala, ScalaTest alone isn't enough: you need to add spark-testing-base to the roster. It is an Open Source framework that provides base classes for the main Spark abstractions, such as SparkContext, RDD, DataFrame, Dataset and Streaming. Let's start exploring the facilities provided by this framework, and how it works alongside ScalaTest, through some simple examples. Consider the following Scala word count example found on the web:

import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val inputFile = args(0)
    val outputFile = args(1)
    val conf = new SparkConf().setAppName("SparkWordCount")
    // Create a Scala Spark Context.
    val sc = new SparkContext(conf)
    // Load our input data.
    val input = sc.textFile(inputFile)
    // Split up into words.
    val words = input.flatMap(line => line.split(" "))
    // Transform into word and count.
    val counts = words.map(word => (word, 1)).reduceByKey { case (x, y) => x + y }
    // Save the word count back out to a text file, causing evaluation.
    counts.saveAsTextFile(outputFile)
  }
}

This class expects a text file as input: it first splits the content of the file into words and then counts the occurrences of each one.
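To make the split-then-count logic easier to follow without a cluster, the sketch below reproduces the same two steps on plain Scala collections. It is only an illustration: the object and method names are hypothetical, and `groupBy` plus a sum stands in for Spark's `reduceByKey`.

```scala
object LocalWordCount {
  // Step 1: split every line into words (same logic as the flatMap on the RDD).
  def splitLines(lines: Seq[String]): Seq[String] =
    lines.flatMap(line => line.split(" "))

  // Step 2: pair each word with 1 and sum the ones per word.
  // groupBy + sum plays the role of reduceByKey here.
  def countWords(words: Seq[String]): Map[String, Int] =
    words
      .map(word => (word, 1))
      .groupBy { case (word, _) => word }
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val lines = Seq("Line One", "Line Two", "Line Three", "Line Four")
    val counts = countWords(splitLines(lines))
    counts.toSeq.sortBy(_._1).foreach { case (word, n) => println(s"$word: $n") }
  }
}
```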
Add the ScalaTest dependency to sbt or Maven as described in the first part of this series. Then add the spark-testing-base dependency to sbt:
libraryDependencies += "com.holdenkarau" % "spark-testing-base_2.11" % "2.1.0_0.6.0"
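or Maven:
<dependency>
    <groupId>com.holdenkarau</groupId>
    <artifactId>spark-testing-base_2.11</artifactId>
    <version>2.1.0_0.6.0</version>
</dependency>
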
The word count code above is definitely not the best way to implement this class. A better way, in terms of readability and maintainability, is probably the following, which implements two separate methods (one for the mapping and one for the reduction by key):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkWordCount")

    val sc = new SparkContext(conf)
    val file = sc.textFile(args(0))
    // Count the words and save the result, as in the first version.
    countWordsInFile(file).saveAsTextFile(args(1))
  }

  // Compose the two steps: split the lines into words, then count them.
  def countWordsInFile = splitFile _ andThen countWords _

  def splitFile(wordsByLine: RDD[String]): RDD[String] = {
    wordsByLine.flatMap(line => line.split(" "))
  }

  def countWords(words: RDD[String]): RDD[(String, Int)] = {
    words.map(word => (word, 1)).reduceByKey(_ + _)
  }
}

Let's implement a test class for it now, combining ScalaTest and spark-testing-base. Choose a ScalaTest style (for example FunSuite) and mix in the SharedSparkContext trait:

class SparkWordCountScalaTestingBaseSuite extends FunSuite with SharedSparkContext

SharedSparkContext initializes a SparkContext instance before the tests start, shares it across all of the tests in the class, and stops it once all of them have run. There is no need to add extra code to initialize and stop it, so the development of a test class can focus only on the body of the test methods. For the class under test above, the tests look like this:

val fileLines = Array("Line One", "Line Two", "Line Three", "Line Four")

test("splitFile should split the file into words") {
    val inputRDD: RDD[String] = sc.parallelize[String](fileLines)
    val wordsRDD = SparkWordCount.splitFile(inputRDD)
    // 4 lines of 2 words each
    assert(wordsRDD.count() == 8)
}

test("countWordsInFile should count words") {
    val inputRDD: RDD[String] = sc.parallelize[String](fileLines)
    val results = SparkWordCount.countWordsInFile(inputRDD).collect
    // "Line" appears once per input line
    assert(results.contains(("Line", 4)))
}
sc is the variable name of the built-in SparkContext provided by SharedSparkContext.
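For comparison, here is a minimal sketch of roughly what you would have to write by hand without SharedSparkContext, using ScalaTest's BeforeAndAfterAll. It is not the trait's actual implementation; the suite name, the local[2] master and the application name are assumptions chosen for illustration, and the import of FunSuite assumes a ScalaTest 3.0.x style package layout.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class ManualSparkContextSuite extends FunSuite with BeforeAndAfterAll {
  // Created once before the first test, stopped after the last one.
  @transient private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    val conf = new SparkConf()
      .setMaster("local[2]")              // assumption: run locally with 2 threads
      .setAppName("manual-context-test")  // hypothetical application name
    sc = new SparkContext(conf)
  }

  override def afterAll(): Unit = {
    try {
      if (sc != null) sc.stop()
      sc = null
    } finally {
      super.afterAll()
    }
  }

  test("a manually managed SparkContext can run a job") {
    val words = sc.parallelize(Seq("Line One", "Line Two")).flatMap(_.split(" "))
    assert(words.count() == 4)
  }
}
```

All of the setup and teardown boilerplate above disappears once the suite mixes in SharedSparkContext instead.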
In the next post(s) we will walk through all of the other base classes provided by the spark-testing-base framework, starting from those related to RDDs.

Tuesday, June 6, 2017

3 signs your company doesn't have a DevOps culture

If one or more of these 3 things happen in your company:
  • new software releases go to production quarterly
  • there are QA teams
  • there are DevOps teams
then it's official: there's no DevOps culture over there.
The good news is that now you know where the problems are and can start fixing them ;)

Monday, June 5, 2017

Hubot & SDC

My first Open Source Hubot script has been released and is available in my GitHub space. It lets you check the status of pipelines in a StreamSets Data Collector server. It is still an alpha release, but development is ongoing, so new features and improvements will keep coming. Enjoy it!