StreamSets Data Collector log shipping and analysis using Elasticsearch, Kibana and... the StreamSets Data Collector

One common use case for the StreamSets Data Collector (SDC) is shipping logs to a system such as Elasticsearch for real-time analysis. Building a pipeline for this purpose in SDC is simple and fast and doesn't require any coding. For this quick tutorial I will use the SDC logs themselves as an example. The log data will be shipped to Elasticsearch and then visualized through a Kibana dashboard. Basic knowledge of SDC, Elasticsearch and Kibana is required for a better understanding of this post.
These are the releases I am referring to for each system involved in this tutorial:
  • JDK 8
  • StreamSets Data Collector 1.4.0
  • Elasticsearch 2.3.3
  • Kibana 4.5.1

Elasticsearch and Kibana installation

You should have your Elasticsearch cluster installed and configured, and a Kibana instance pointing to that cluster, in order to follow this tutorial. Please refer to the official documentation for these two products to complete their installation (if you haven't already done so).

Streamsets Data Collector set up

You then need a single-instance SDC (no need to run a cluster for this particular scenario) up and running as a service. Please follow the instructions in the official documentation if this is the first time you set it up.

SDC logs pattern

The logs produced by SDC follow their own specific pattern that doesn't match any of the structured log formats (common, combined, Apache error and access, log4j) provided out of the box by the product, so the best way to parse them is a Grok expression. A typical line of an SDC log file follows this pattern:

<timestamp> [user:<username>] [pipeline:<pipeline_name>] [thread:<thread_name>] <log_level>  <class_name> <message_separator> <message>

So a possible Grok pattern to use for it could be the following:

%{TIMESTAMP_ISO8601:timestamp} %{DATA:user} %{DATA:pipeline} %{DATA:thread} %{LOGLEVEL:level}  %{WORD:class} %{DATA:sep} %{GREEDYDATA:message}

Please note that there are two spaces between the log level and the class name.
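
As an illustration only (all values below are invented), a line matching this pattern could look like the following:

2016-08-05 10:23:00 [user:admin] [pipeline:logshipping] [thread:webserver-10] INFO  WebServerTask - Running on URI : 'http://localhost:18630'

Here the bracketed user, pipeline and thread tokens end up in the user, pipeline and thread fields, INFO in level, WebServerTask in class and the rest of the line in the remaining fields.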

Create an index on Elasticsearch

Now that we know the pattern of the logs to ship we can create an index for them in Elasticsearch:

curl -XPUT 'http://<es_host>:<es_port>/sdclogs' -d '{
    "mappings": {
        "logs" : {
            "properties" : {
                "timestamp": {"type": "date"},
                "level": {"type": "string"},
                "message": {"type": "string"},
                "user": {"type": "string"},
                "pipeline": {"type": "string"},
                "class": {"type": "string"}
            }
        }
    }
}'


Then run the following command to verify that the index has been successfully created (a 200 OK response means the index exists):

curl -I 'http://<es_host>:<es_port>/sdclogs'
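
If you also want to double-check that the field mappings have been stored as expected, you can retrieve them through the mapping API (same host and port placeholders as above):

curl -XGET 'http://<es_host>:<es_port>/sdclogs/_mapping?pretty'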


Pipeline creation

Connect to the SDC UI as administrator and create a new empty pipeline. Then select an origin for the data. I suggest using the File Tail origin rather than the Directory origin: with the Directory origin, if for some reason a log line doesn't match the selected Grok pattern the whole file is skipped, whereas with File Tail only the offending log entry is sent to the error bin and the parsing of the log files in the selected path goes on anyway. This is the list of properties to set up for the origin in the Files tab:
  • Data Format: Log
  • File to Tail:
    • Path: $SDC_LOGS_DIR/${PATTERN}
    • Pattern: a regular expression to get the SDC logs only.
and in the Log tab:
  • Log Format: Grok Pattern
  • Grok Pattern: %{TIMESTAMP_ISO8601:timestamp} %{DATA:user} %{DATA:pipeline} %{DATA:thread} %{LOGLEVEL:level}  %{WORD:class} %{DATA:sep} %{GREEDYDATA:message}

Select, of course, Elasticsearch as destination and set up the following properties for it in the Elasticsearch tab:
  • Cluster Name: the name of your Elasticsearch cluster.
  • Cluster URIs: a comma-separated list of host:port pairs for one or more nodes of your cluster. For this property you have to specify the Elasticsearch transport protocol port (which by default is 9300), not the HTTP API port (see the lookup example after this list if you are unsure which transport addresses your nodes expose).
  • Cluster HTTP URI: the host:port pair to connect via HTTP to your Elasticsearch cluster (the same values you used to create the index previously).
  • Index: the name of the index created previously.
  • Mappings: the name of the mapping to use for the index. In the example above it is logs.
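If you are not sure which transport addresses your Elasticsearch nodes expose for the Cluster URIs property, one possible way to look them up is the nodes info API over HTTP (same placeholders as in the index creation step); the transport_address reported for each node is the value to use:

curl -XGET 'http://<es_host>:<es_port>/_nodes/transport?pretty'
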
If you don't need the log file metadata records, you can send them to a Trash destination.
You could choose to discard some of the fields in the original records, like the message separator, that are meaningless in terms of log analysis. Use a Field Remover processor to do this.
Finally, before sending the records to the final destination, you need to convert the timestamp into a format suitable for Elasticsearch. Using a Field Converter processor you can change the type of the timestamp field to DATETIME using the yyyy-MM-dd HH:mm:ss date format.
Now you can connect all of the blocks in the pipeline and at the end you should have something like this:



Running the pipeline

Start the pipeline and check the data quality through the dedicated widgets.
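
Besides the SDC monitoring widgets, you can also verify from the command line that documents are actually being indexed, for example by counting them or searching for a given log level (placeholders and index name as in the previous examples):

curl -XGET 'http://<es_host>:<es_port>/sdclogs/_count?pretty'
curl -XGET 'http://<es_host>:<es_port>/sdclogs/_search?q=level:ERROR&pretty'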

Set up Kibana to explore the data

Connect to the Kibana UI and then configure the Elasticsearch index pattern to perform search and analytics. Move to the Settings -> Indexes section, specify the index name (sdclogs) and click on the Create button. Then move to the Discover section, select the sdclogs index and have a look at the data:



Now you can perform searches and set up a custom dashboard for the data.
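
As a starting point, here are a couple of searches you could type in the Kibana query bar (it uses the Lucene query string syntax; the field values are just examples):

level:ERROR
level:WARN AND message:memory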

Conclusion

Setting up a pipeline for log shipping through SDC is a matter of minutes. The overall process described above isn't restricted to the SDC logs; it applies to any kind of log shipping into Elasticsearch. In order to keep the post as simple as possible I have omitted other pipeline stage customization details and considerations about accessing a secured Elasticsearch cluster, but after reading this tutorial it should be easier for you to understand how to customize the pipeline for any specific need you could have.
