
Unit testing Spark applications in Scala (Part 2): Intro to spark-testing-base

In the first part of this series we became familiar with ScalaTest. When it comes to unit testing Spark applications written in Scala, ScalaTest alone isn't enough: you also need spark-testing-base. It is an open source framework that provides base classes for the main Spark abstractions, such as SparkContext, RDD, DataFrame, Dataset and Streaming. Let's explore the facilities it provides, and how it works alongside ScalaTest, through some simple examples. Consider the following Scala word count example found on the web:

import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount { 
  def main(args: Array[String]) { 
   val inputFile = args(0) 
   val outputFile = args(1) 
   val conf = new SparkConf().setAppName("SparkWordCount") 
   // Create a Scala Spark Context. 
   val sc = new SparkContext(conf) 
   // Load our input data. 
   val input = sc.textFile(inputFile) 
   // Split up into words. 
   val words = input.flatMap(line => line.split(" ")) 
   // Transform into word and count. 
   val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y} 
   // Save the word count back out to a text file, causing evaluation.
   counts.saveAsTextFile(outputFile)
  } 
}

This object expects a text file as input: it first splits each line into words and then counts the occurrences of each word.
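To see the logic in isolation, the same split-then-count steps can be sketched on plain Scala collections, with no Spark involved. The object and method names here (WordCountSketch, splitLines, countWords) are hypothetical and only serve the illustration:

```scala
// Plain-collections sketch of the word count logic (no Spark required).
object WordCountSketch {
  // Split every line on single spaces and flatten the result into one sequence.
  def splitLines(lines: Seq[String]): Seq[String] =
    lines.flatMap(_.split(" "))

  // Group identical words together and count the size of each group.
  def countWords(words: Seq[String]): Map[String, Int] =
    words.groupBy(identity).map { case (word, group) => (word, group.size) }
}
```

In the Spark version, flatMap plays the same role as here, while map plus reduceByKey does the job of groupBy plus size.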
Add the ScalaTest dependency to sbt or Maven as described in the first part of this series. Then add the spark-testing-base dependency to sbt:
libraryDependencies += "com.holdenkarau" % "spark-testing-base_2.11" % "2.1.0_0.6.0"
or Maven:

<dependency>
    <groupId>com.holdenkarau</groupId>
    <artifactId>spark-testing-base_2.11</artifactId>
    <version>2.1.0_0.6.0</version>
</dependency>
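For sbt users, the spark-testing-base README also suggests a few test settings, since several SparkContexts cannot run side by side in the same JVM. A minimal build.sbt sketch follows; the memory values are illustrative and should be adapted to your project:

```scala
// build.sbt: settings commonly used together with spark-testing-base
// Run the tests in a forked JVM so that the JVM options below apply.
fork in Test := true
// Run test suites one at a time, so that only one SparkContext is alive.
parallelExecution in Test := false
// Illustrative heap sizes for the forked test JVM.
javaOptions in Test ++= Seq("-Xms512M", "-Xmx2048M")
```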

The word count code above is definitely not the best way to implement this class. A better way, in terms of readability, maintainability and testability, is the following, which moves the logic into two separate methods (one that splits the lines into words and one that counts the words):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SparkWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkWordCount")

    val sc = new SparkContext(conf)
  
    val file = sc.textFile(args(0))
    countWordsInFile(file).saveAsTextFile(args(1))
  }
 
  def countWordsInFile = splitFile _ andThen countWords _
 
  def splitFile(wordsByLine: RDD[String]): RDD[String] = {
    wordsByLine.flatMap(line => line.split(" "))
  }

  def countWords(words: RDD[String]): RDD[(String, Int)] = {
    words.map(word => (word, 1)).reduceByKey(_ + _)
  }
}
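The countWordsInFile definition relies on two Scala 2 features: a trailing underscore turns a method into a function value, and andThen chains two functions so that the output of the first becomes the input of the second. A small sketch on plain strings, using hypothetical names (ComposeSketch, tokenize, howMany, countTokens):

```scala
object ComposeSketch {
  def tokenize(line: String): Seq[String] = line.split(" ").toSeq
  def howMany(tokens: Seq[String]): Int = tokens.size

  // `tokenize _` converts the method into a function value;
  // andThen feeds its result into `howMany`.
  val countTokens: String => Int = (tokenize _) andThen (howMany _)
}
```

Here ComposeSketch.countTokens("Line One") evaluates to 2. Declaring the composed function as a val builds it once, while the def used in SparkWordCount rebuilds it on every call, which is harmless for this example.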


Let's implement a test class for it now, combining ScalaTest and spark-testing-base. We pick a ScalaTest style (FunSuite, for example) and mix in the SharedSparkContext trait:

class SparkWordCountScalaTestingBaseSuite extends FunSuite with SharedSparkContext

SharedSparkContext initializes a SparkContext before the tests start, shares it across all of the tests in the class, and stops it once the whole suite has finished. There is no need to add extra code to create or stop it, so writing a test class comes down to writing the bodies of the test methods. For the class under test above, they are:

val fileLines = Array("Line One", "Line Two", "Line Three", "Line Four")

test("splitFile should split the file into words"){
    val inputRDD: RDD[String] = sc.parallelize[String](fileLines)
    val wordsRDD = SparkWordCount.splitFile(inputRDD)
    assert(wordsRDD.count() == 8)
}


test("countWordsInFile should count words") {
    val inputRDD: RDD[String] = sc.parallelize[String](fileLines)
    val results = SparkWordCount.countWordsInFile(inputRDD).collect
    assert(results.contains(("Line", 4)))
}
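For reference, here is a sketch of how the suite might look as a single file, with the imports it needs. It assumes ScalaTest 3.0.x (where FunSuite lives in org.scalatest) and the SparkWordCount object above on the classpath. The stricter test it contains is an illustrative addition that compares the complete result instead of a single pair:

```scala
import com.holdenkarau.spark.testing.SharedSparkContext
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite

class SparkWordCountScalaTestingBaseSuite extends FunSuite with SharedSparkContext {

  val fileLines = Array("Line One", "Line Two", "Line Three", "Line Four")

  // The two tests shown above go here unchanged.

  // Illustrative extra test: check the count of every word, not just "Line".
  test("countWordsInFile should return the count of every word") {
    val inputRDD: RDD[String] = sc.parallelize[String](fileLines)
    // Collect to the driver and turn the pairs into a Map, so that
    // the comparison does not depend on the order of the results.
    val counts = SparkWordCount.countWordsInFile(inputRDD).collect().toMap
    assert(counts == Map("Line" -> 4, "One" -> 1, "Two" -> 1, "Three" -> 1, "Four" -> 1))
  }
}
```

Converting the collected array to a Map matters because reduceByKey gives no guarantee about the order of its output.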


sc is the variable name of the built-in SparkContext provided by SharedSparkContext.
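To get a feel for the boilerplate this trait saves, here is a rough hand-rolled equivalent built on ScalaTest's BeforeAndAfterAll. This is only a sketch and not the library's actual implementation: the trait name LocalSparkContextSketch, the local[2] master and the application name are all assumptions made for the illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, Suite}

// Hypothetical trait: creates one local SparkContext per suite.
trait LocalSparkContextSketch extends BeforeAndAfterAll { self: Suite =>

  @transient private var _sc: SparkContext = _

  // Accessor used by the tests, mirroring the `sc` name used above.
  def sc: SparkContext = _sc

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Assumed settings: two local threads and an arbitrary application name.
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    _sc = new SparkContext(conf)
  }

  override def afterAll(): Unit = {
    try {
      // Stop the context once every test in the suite has run.
      if (_sc != null) _sc.stop()
      _sc = null
    } finally {
      super.afterAll()
    }
  }
}
```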
In the next post(s) we will walk through all of the other base classes provided by the spark-testing-base framework, starting from those related to RDDs.
