Skip to main content

Quick start with Apache Livy (part 1)

I have started doing evaluation of Livy for potential case scenarios where this technology could help and I'd like to share some findings with others who would like to approach this interesting Open Source project. It has been started by Cloudera and Microsoft and it is currently in the process of being incubated by the Apache Software Foundation. The official documentation isn't comprehensive at the moment, so I hope my posts on this topic could help someone else.

Apache Livy is a service to interact with Apache Spark through a REST interface. It enables both submissions of Spark jobs or snippets of Spark code. The following features are supported:
  • The jobs can be submitted as pre-compiled jars, snippets of code or via Java/Scala client API.
  • Interactive Scala, Python, and R shells.
  • Support for Spark 2.x and Spark1.x, Scala 2.10 and 2.11.
  • It doesn't require any change to Spark code.
  • It allows long running Spark Contexts that can be used for multiple Spark jobs, by multiple clients.
  • Multiple Spark Contexts can be managed simultaneously: they run on the cluster instead of the Livy Server, in order to have good fault tolerance and concurrency.
  • Possibility to share cached RDDs or Dataframes across multiple jobs and clients.
  • Secure authenticated communication.
The following image, taken from the official website, shows what happens when submitting Spark jobs/code through the Livy REST APIs:


In the second part of this series I am going to cover the details on starting a Livy server and submitting PySpark code.

Comments

  1. Hi,
    I have write this class:

    import org.apache.livy.Job;
    import org.apache.livy.JobContext;
    import org.apache.spark.api.java.JavaRDD;

    import java.util.ArrayList;
    import java.util.Arrays;


    public class YourJob implements Job {

    public Long call(JobContext jc) throws Exception {
    ArrayList list= new ArrayList<>();
    list.add(1l);
    list.add(2l);
    list.add(3l);
    list.add(4l);
    list.add(5l);
    JavaRDD rdd=jc.sc().parallelize(list);
    return rdd.count();
    }
    }

    ---------------
    and then I write the client to contact with GCP spark server

    import org.apache.livy.JobHandle;
    import org.apache.livy.LivyClient;
    import org.apache.livy.LivyClientBuilder;


    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class YourJobClient {
    public static void main(String[]args) throws URISyntaxException, IOException, InterruptedException, ExecutionException, TimeoutException {


    LivyClient client = new LivyClientBuilder()
    .setURI(new URI("http://localhost:8998"))
    // .setURI(new URI("http://35.204.128.185:8998"))
    .build();

    JobHandle handle =client.submit(new YourJob() );

    Long result=handle.get(10000, TimeUnit.SECONDS);
    client.stop(true);

    }
    }

    and I have test the client in local and remote server.
    In both cases it give me error that livy cannot find YourJob class!

    ReplyDelete

Post a Comment

Popular posts from this blog

Streamsets Data Collector log shipping and analysis using ElasticSearch, Kibana and... the Streamsets Data Collector

One common use case scenario for the Streamsets Data Collector (SDC) is the log shipping to some system, like ElasticSearch, for real-time analysis. To build a pipeline for this particular purpose in SDC is really simple and fast and doesn't require coding at all. For this quick tutorial I will use the SDC logs as example. The log data will be shipped to Elasticsearch and then visualized through a Kibana dashboard. Basic knowledge of SDC, Elasticsearch and Kibana is required for a better understanding of this post. These are the releases I am referring to for each system involved in this tutorial: JDK 8 Streamsets Data Collector 1.4.0 ElasticSearch 2.3.3 Kibana 4.5.1 Elasticsearch and Kibana installation You should have your Elasticsearch cluster installed and configured and a Kibana instance pointing to that cluster in order to go on with this tutorial. Please refer to the official documentation for these two products in order to complete their installation (if you do

Exporting InfluxDB data to a CVS file

Sometimes you would need to export a sample of the data from an InfluxDB table to a CSV file (for example to allow a data scientist to do some offline analysis using a tool like Jupyter, Zeppelin or Spark Notebook). It is possible to perform this operation through the influx command line client. This is the general syntax: sudo /usr/bin/influx -database '<database_name>' -host '<hostname>' -username '<username>'  -password '<password>' -execute 'select_statement' -format '<format>' > <file_path>/<file_name>.csv where the format could be csv , json or column . Example: sudo /usr/bin/influx -database 'telegraf' -host 'localhost' -username 'admin'  -password '123456789' -execute 'select * from mem' -format 'csv' > /home/googlielmo/influxdb-export/mem-export.csv

Using Rapids cuDF in a Colab notebook

During last Spark+AI Summit Europe 2019 I had a chance to attend a talk from Miguel Martinez  who was presenting Rapids , the new Open Source framework from NVIDIA for GPU accelerated end-to-end Data Science and Analytics. Fig. 1 - Overview of the Rapids eco-system Rapids is a suite of Open Source libraries: cuDF cuML cuGraph cuXFilter I enjoied the presentation and liked the idea of this initiative, so I wanted to start playing with the Rapids libraries in Python on Colab , starting from cuDF, but the first attempt came with an issue that I eventually solved. So in this post I am going to share how I fixed it, with the hope it would be useful to someone else running into the same blocker. I am assuming here you are already familiar with Google Colab. I am using Python 3.x as Python 2 isn't supported by Rapids. Once you have created a new notebook in Colab, you need to check if the runtime for it is set to use Python 3 and uses a GPU as hardware accelerator. You