Unit testing Spark applications in Scala (Part 2): Intro to spark-testing-base

The first part of this series introduced ScalaTest. For unit testing Spark applications written in Scala, ScalaTest alone isn't enough: you also need spark-testing-base, an open source framework that provides base classes for the main Spark abstractions such as SparkContext, RDD, DataFrame, Dataset and Streaming. Let's explore the facilities it provides, and how it works alongside ScalaTest, through some simple examples. Consider the following Scala word count example found on the web:

import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount { 
  def main(args: Array[String]): Unit = {
   val inputFile = args(0) 
   val outputFile = args(1) 
   val conf = new SparkConf().setAppName("SparkWordCount") 
   // Create a Scala Spark Context. 
   val sc = new SparkContext(conf) 
   // Load our input data. 
   val input = sc.textFile(inputFile) 
   // Split up into words. 
   val words = input.flatMap(line => line.split(" ")) 
   // Transform into word and count. 
   val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y} 
   // Save the word count back out to a text file, causing evaluation.
   counts.saveAsTextFile(outputFile)
  } 
}

This class expects a text file as input: it first splits the file's content into words, then counts the occurrences of each word.
Add the ScalaTest dependency to sbt or Maven as described in the first part of this series. Then add the spark-testing-base dependency to sbt:
libraryDependencies += "com.holdenkarau" % "spark-testing-base_2.11" % "2.1.0_0.6.0"
or Maven:

<dependency>
    <groupId>com.holdenkarau</groupId>
    <artifactId>spark-testing-base_2.11</artifactId>
    <version>2.1.0_0.6.0</version>
</dependency>
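
For sbt builds, the dependency is usually needed only by tests, so it can be scoped accordingly. The fragment below is a sketch in sbt 0.13-style syntax. The fork and parallel-execution settings follow what the spark-testing-base README suggests and should be checked against the version in use:

```scala
// build.sbt (sketch): restrict spark-testing-base to the test classpath.
libraryDependencies += "com.holdenkarau" % "spark-testing-base_2.11" % "2.1.0_0.6.0" % "test"

// Run tests in a forked JVM, one suite at a time, so that suites
// do not compete for a SparkContext inside the same JVM.
fork in Test := true
parallelExecution in Test := false
```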

The word count code shown earlier is definitely not the best way to implement this class. A better way (in terms of readability and maintainability) is the following, which factors the logic into two separate methods: one that splits lines into words and one that maps each word to a pair and reduces by key to count them:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkWordCount")

    val sc = new SparkContext(conf)
  
    val file = sc.textFile(args(0))
    countWordsInFile(file).saveAsTextFile(args(1))
  }
 
  def countWordsInFile = splitFile _ andThen countWords _
 
  def splitFile(wordsByLine: RDD[String]): RDD[String] = {
    wordsByLine.flatMap(line => line.split(" "))
  }

  def countWords(words: RDD[String]): RDD[(String, Int)] = {
    words.map(word => (word, 1)).reduceByKey(_ + _)
  }
}
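
The countWordsInFile definition chains the two methods with andThen. To see that composition in isolation, here is a minimal sketch that uses plain Scala collections in place of RDDs. The names WordCountComposition, splitLines and countWordsInLines are made up for illustration and are not part of the class under test:

```scala
object WordCountComposition {
  // Local stand-in for splitFile: Seq instead of RDD.
  def splitLines(lines: Seq[String]): Seq[String] =
    lines.flatMap(line => line.split(" "))

  // Local stand-in for countWords: group equal words and count each group.
  def countWords(words: Seq[String]): Map[String, Int] =
    words.groupBy(identity).map { case (word, occurrences) => (word, occurrences.size) }

  // Turn both methods into functions and chain them: split first, then count.
  val countWordsInLines: Seq[String] => Map[String, Int] =
    (splitLines _).andThen(countWords _)
}
```

Calling WordCountComposition.countWordsInLines(Seq("Line One", "Line Two")) returns a map with Line -> 2, One -> 1 and Two -> 1.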


Let's implement a test class for it now, combining ScalaTest and spark-testing-base. Choose a ScalaTest style (for example FunSuite) and mix in the SharedSparkContext trait:

class SparkWordCountScalaTestingBaseSuite extends FunSuite with SharedSparkContext

SharedSparkContext initializes a SparkContext before the tests start, shares that instance across all of the tests in the class, and stops it once all of them have run. There is no need to add extra code to initialize and stop it, so developing a test class comes down to writing the bodies of the test methods. For the class under test above, they are:

val fileLines = Array("Line One", "Line Two", "Line Three", "Line Four")

test("splitFile should split the file into words"){
    val inputRDD: RDD[String] = sc.parallelize[String](fileLines)
    val wordsRDD = SparkWordCount.splitFile(inputRDD)
    assert(wordsRDD.count() == 8)
}


test("countWordsInFile should count words") {
    val inputRDD: RDD[String] = sc.parallelize[String](fileLines)
    val results = SparkWordCount.countWordsInFile(inputRDD).collect
    assert(results.contains(("Line", 4)))
}
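
Put together as a compilable file, a suite could look like the sketch below. The imports assume ScalaTest 3.0.x (where FunSuite lives in org.scalatest) and that the SparkWordCount object shown earlier is on the test classpath. The suite name and the extra test, which checks the complete result rather than a single pair, are additions for illustration:

```scala
import com.holdenkarau.spark.testing.SharedSparkContext
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite

// Hypothetical suite name; SparkWordCount is the object defined earlier.
class SparkWordCountFullResultSuite extends FunSuite with SharedSparkContext {

  val fileLines = Array("Line One", "Line Two", "Line Three", "Line Four")

  test("countWordsInFile should return the count of every word") {
    // sc is supplied by SharedSparkContext.
    val inputRDD: RDD[String] = sc.parallelize(fileLines)
    // Collect to the driver and compare as a Map so ordering does not matter.
    val results = SparkWordCount.countWordsInFile(inputRDD).collect().toMap
    assert(results == Map("Line" -> 4, "One" -> 1, "Two" -> 1, "Three" -> 1, "Four" -> 1))
  }
}
```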


sc is the variable name of the built-in SparkContext provided by SharedSparkContext.
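
To make that lifecycle concrete, the following hand-written trait sketches roughly what a shared-context trait has to do. It is not the source of SharedSparkContext: the trait name ManagedSparkContext, the local[2] master and the application name are assumptions chosen for illustration, and the real trait handles more details.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, Suite}

// Hypothetical trait, for illustration only.
trait ManagedSparkContext extends BeforeAndAfterAll { self: Suite =>

  @transient private var _sc: SparkContext = _

  // Exposed to the tests under the same name the article uses.
  def sc: SparkContext = _sc

  // Runs once, before the first test of the suite.
  override def beforeAll(): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    _sc = new SparkContext(conf)
    super.beforeAll()
  }

  // Runs once, after the last test of the suite.
  override def afterAll(): Unit = {
    try {
      if (_sc != null) _sc.stop()
      _sc = null
    } finally {
      super.afterAll()
    }
  }
}
```

Because the context is created once per suite, tests in the same class should avoid mutating shared state on it.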
In the next post(s) we will walk through the other base classes provided by the spark-testing-base framework, starting with those related to RDDs.
