Tuesday, 18 December 2018

Exploring the Spline Data Tracker and Visualization tool for Apache Spark (Part 2)

In part 1 we learned how to test data lineage collection with Spline from a Spark shell. The same can be done in any Scala or Java Spark application: the dependencies used for the Spark shell need to be declared in your build tool of choice (Maven, Gradle or sbt):

groupId: za.co.absa.spline
artifactId: spline-core
version: 0.3.5

groupId: za.co.absa.spline
artifactId: spline-persistence-mongo
version: 0.3.5

groupId: za.co.absa.spline
artifactId: spline-core-spark-adapter-2.3
version: 0.3.5
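
For sbt, the coordinates above could be declared roughly as follows. This is a minimal sketch: it assumes the artifacts are published without a Scala version suffix (as the plain coordinates passed to --packages suggest), hence the single % rather than %%.

```scala
// build.sbt (sketch): the three Spline 0.3.5 artifacts listed above.
// A single % is used on the assumption that the artifact names carry
// no Scala version suffix.
libraryDependencies ++= Seq(
  "za.co.absa.spline" % "spline-core" % "0.3.5",
  "za.co.absa.spline" % "spline-persistence-mongo" % "0.3.5",
  "za.co.absa.spline" % "spline-core-spark-adapter-2.3" % "0.3.5"
)
```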

With reference to Scala and Spark 2.3.x, a Spark job like this:

// Create the Spark session
import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession
   .builder()
   .appName("Spline Tester")
   .getOrCreate()
 
// Init Spline
System.setProperty("spline.persistence.factory", "za.co.absa.spline.persistence.mongo.MongoPersistenceFactory")
System.setProperty("spline.mongodb.url", args(0))
System.setProperty("spline.mongodb.name", args(1))
import za.co.absa.spline.core.SparkLineageInitializer._
sparkSession.enableLineageTracking()

// Do something with DataFrames
import sparkSession.sqlContext.implicits._
val df1 = sparkSession.sparkContext.parallelize(1 to 10000, 42).toDF("FirstValue")
val df2 = sparkSession.sparkContext.parallelize(1.to(100000, 17), 42).toDF("SecondValue")

val output = df1.crossJoin(df2).where('FirstValue % 42 === 'SecondValue % 42)

// Write results to file system
output.write.format("parquet").save("splinetester.parquet")

// Stop the Spark Session
sparkSession.stop()


can be submitted to a Spark cluster this way:

$SPARK_HOME/bin/spark-submit --class org.googlielmo.splinetest.SplineExample --master <url> --packages "za.co.absa.spline:spline-core:0.3.5,za.co.absa.spline:spline-persistence-mongo:0.3.5,za.co.absa.spline:spline-core-spark-adapter-2.3:0.3.5" splinetest-1.0.jar mongodb://<username>:<password>@<hostname>:<port> <dbname>

The Spline configuration properties can also be stored in a properties file on the application classpath. Here's the full list of the available Spline properties:

  • spline.mode: 3 possible values: BEST_EFFORT (default), DISABLED and REQUIRED. With BEST_EFFORT, Spline tries to initialize itself, but if it fails it switches to DISABLED mode so that the Spark application can proceed normally with no lineage tracking. With DISABLED, no lineage tracking happens at all. With REQUIRED, if Spline fails to initialize itself for any reason, the Spark application aborts with an error.
  • spline.persistence.factory: could be za.co.absa.spline.persistence.mongo.MongoPersistenceFactory (for persistence to MongoDB) or za.co.absa.spline.persistence.hdfs.HdfsPersistenceFactory (for persistence to HDFS).
  • spline.mongodb.url: the MongoDB connection string (for MongoDB persistence only).
  • spline.mongodb.name: the MongoDB database name (for MongoDB persistence only).
  • spline.persistence.composition.factories: a comma-separated list of factories to delegate to (for composition factories only).
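
As an illustration of the properties above, here is a minimal sketch that groups the MongoDB-related settings and applies them as JVM system properties, in the same way the job shown earlier does with System.setProperty. The names SplineSettings, mongo and applyAll are hypothetical helpers invented for this example and are not part of the Spline API. The sketch also assumes that spline.mode can be supplied as a system property like the other settings.

```scala
// Sketch only: SplineSettings, mongo and applyAll are illustrative names,
// not part of the Spline API.
object SplineSettings {

  // Builds the settings for MongoDB persistence, using the property names
  // and the factory class from the list above.
  def mongo(url: String, dbName: String, mode: String = "BEST_EFFORT"): Map[String, String] =
    Map(
      "spline.mode" -> mode,
      "spline.persistence.factory" ->
        "za.co.absa.spline.persistence.mongo.MongoPersistenceFactory",
      "spline.mongodb.url" -> url,
      "spline.mongodb.name" -> dbName
    )

  // Copies every entry into the JVM system properties. This has to happen
  // before enableLineageTracking() is called on the Spark session.
  def applyAll(settings: Map[String, String]): Unit =
    settings.foreach { case (key, value) => System.setProperty(key, value) }
}
```

In the job above, the three System.setProperty calls could then be replaced by something like SplineSettings.applyAll(SplineSettings.mongo(args(0), args(1), mode = "REQUIRED")), which would make the application abort if Spline cannot be initialized.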

The first time Spline is enabled from a Spark job, it creates 6 collections in the destination MongoDB database:

  • attributes_v4: info about the attributes of the Spark Datasets involved.
  • dataTypes_v4: info about the data types for each data lineage.
  • datasets_v4: info about the Datasets.
  • lineages_v4: the data lineage graphs for Spark Datasets.
  • operations_v4: the operations on Datasets across lineages.
  • transformations_v4: the transformations on Datasets across lineages.

The documents in those 6 collections are used by the Spline web application to generate the visual representation of the lineages in the UI.
In the third and last part of this series, I am going to share the outcomes of the first weeks of adopting this project in Spark pre-production environments.

Thursday, 29 November 2018

Exploring the Spline Data Tracker and Visualization tool for Apache Spark (Part 1)

One interesting and promising Open Source project that caught my attention lately is Spline, a data lineage tracking and visualization tool for Apache Spark, maintained at Absa. This project consists of 2 parts: a Scala library that runs on the Spark drivers and captures the data lineages by analyzing the Spark execution plans, and a web application that provides a UI to visualize them.
Spline supports MongoDB and HDFS as storage systems for the data lineages in JSON format. In this post I am referring to MongoDB.
You can start playing with Spline through the Spark shell. Just add the required dependencies to the shell classpath as follows (with reference to the latest 0.3.5 release of this project):

spark-shell --packages "za.co.absa.spline:spline-core:0.3.5,za.co.absa.spline:spline-persistence-mongo:0.3.5,za.co.absa.spline:spline-core-spark-adapter-2.3:0.3.5"

When running the Spark shell with the command above on Ubuntu and some other Linux distros, an issue may occur while downloading the Joda Time library (a transitive dependency of one of the Spline components). If it does, delete the .ivy2 and .m2 hidden sub-directories of the directory where the spark-shell command was executed and then re-run it.
Assuming you have your Mongo server up and running and that you have already created an empty database for Spline, the first thing you need to do in the Spark shell is to specify the persistence factory class to use and then the connection string and the database name:

System.setProperty("spline.persistence.factory", "za.co.absa.spline.persistence.mongo.MongoPersistenceFactory")
System.setProperty("spline.mongodb.url", "mongodb://<username>:<password>@<server_name_or_ip>:<port>")
System.setProperty("spline.mongodb.name", "<database_name>")


You can now enable the Spline data lineage tracking:

import za.co.absa.spline.core.SparkLineageInitializer._
spark.enableLineageTracking()


and then start doing something which involves data:

val employeesJson =
    spark.read.json("/home/guglielmo/spark-2.3.2-bin-hadoop2.7/examples/src/main/resources/employees.json")

import spark.implicits._
val employeeNames = employeesJson.select(employeesJson("name"))
employeeNames.write.parquet("/home/guglielmo/spline/example/employee_names")


If the following exception occurs:

com.mongodb.MongoCommandException: Command failed with error 9: 'The 'cursor' option is required, except for aggregate with the explain argument' on server localhost:27017. The full response is { "ok" : 0.0, "errmsg" : "The 'cursor' option is required, except for aggregate with the explain argument", "code" : 9, "codeName" : "FailedToParse" }

then you have to update the MongoDB Java driver dependency to release 3.6 or later (this can be done simply by adding it to the list of packages when running the spark-shell command).
Once the Spline web application has been started:

java -jar spline-web-0.3.5-exec-war.jar -Dspline.mongodb.url=mongodb://<username>:<password>@<server_name_or_ip>:<port> -Dspline.mongodb.name=<database_name>

you can see the captured data lineage in the web UI (the default listening port is 8080):




This is just a starter. In part 2 of this series we are going to explore Spline under the hood.

Wednesday, 21 November 2018

Black Friday @Packt Publishing!

This Friday, November 23rd 2018, is Black Friday at Packt Publishing too! Every book or video, including the latest releases, can be purchased for just US$ 10. It is also possible to pre-order my upcoming book "Hands-on Deep Learning with Apache Spark" for US$ 10. Please remember that this price is valid on Friday 23rd only. Enjoy it!


Sunday, 30 September 2018

Ultralight Data Movement for IoT with SDC Edge @ Predict 2018

On October 2nd 2018 I am going to give a talk at the Predict 2018 conference, RDS, Dublin, Ireland.


My talk will start at 2:20 PM. It is part of the Technology IoT & Manufacturing 4.0 section.
Please feel free to get in touch during the conference day to discuss IIoT, Open Source adoption, data streaming, edge analytics, Deep Learning and more.

Saturday, 25 August 2018

AI with the Best 2018 conference


I am proud to share that I will give a talk at the AI With the Best 2018 conference. The title of my talk is "Why Scala for Data Science?" and it is part of the "AI in Action" track. There I am going to cover some topics from my upcoming book. The conference takes place on Friday, September 14th 2018. It is an online event. Buying a ticket also gives you access to the recordings of all the talks for 2 months after the conference ends (in case you miss some during the live stream). You will also have a chance to interact with the speakers and book 1:1 time with some of them. Please have a look at the list of speakers and talk topics: it is very impressive. I hope you will attend!