MRUnit Tutorial

Apache MRUnit (https://mrunit.apache.org/) is an Open Source library for unit testing Hadoop Mappers, Reducers, and complete MapReduce programs. It provides a set of interfaces and test harnesses that bridge the gap between MapReduce programs and standard testing libraries such as JUnit and Mockito. It doesn't replace JUnit: it works on top of it.
Before reading further, please be aware that knowledge of Hadoop MapReduce and JUnit is required for a better understanding of this post.
The three core classes of MRUnit are the following:
MapDriver: the driver class responsible for calling the Mapper’s map() method.
ReduceDriver: the driver class responsible for calling the Reducer’s reduce() method.
MapReduceDriver: the combined MapReduce driver, responsible for calling the Mapper’s map() method first and then running an in-memory Shuffle phase on the map output; at the end of this phase the Reducer’s reduce() method is invoked.
Each of the classes above has methods that allow you to provide the inputs and the expected outputs for a test. The JUnit setUp() method (annotated with @Before) is responsible for creating new instances of the Mapper, the Reducer, and the MRUnit drivers needed for each specific test.

In order to add MRUnit to a Hadoop MapReduce project you need to add it as a test dependency in the project POM file (the project is of course a Maven project; I am sure you're not planning to skip Maven for this kind of project):

<dependency>
    <groupId>org.apache.mrunit</groupId>
    <artifactId>mrunit</artifactId>
    <version>1.1.0</version>
    <classifier>hadoop2</classifier>
    <scope>test</scope>
</dependency>

and of course JUnit should be present as well:

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>

I am not referring to any particular IDE in this discussion. The example project can be created and managed through Maven from a shell; you can then import it into your favourite IDE.

The MapReduce application under test is the popular word counter (the Hello World of MapReduce): it processes text files and counts how often words occur. Browsing the web you can find hundreds of versions of this example, but just in case here's the code for the Mapper
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // For simplicity this Mapper emits the whole input value as a single word
        // (the test inputs used later are single words, so no tokenization is needed)
        String w = value.toString();
        context.write(new Text(w), new IntWritable(1));
    }
}


and the Reducer

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the occurrences of each word
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}


Now let's start implementing an MRUnit test case. It has the same structure as a JUnit test case. First of all, declare the instance variables needed for the MapReduce tests; they include the drivers provided by the MRUnit framework:
private WordCountMapper mapper;
private WordCountReducer reducer;
private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;
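
Note that MRUnit ships two sets of drivers, one for the old mapred API and one for the new mapreduce API. Since the Mapper and Reducer above extend the classes in the org.apache.hadoop.mapreduce package, the test class has to import the drivers from org.apache.hadoop.mrunit.mapreduce. These are the imports such a test class would typically need:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;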


Then create instances of them in the JUnit setUp() method:

@Before
public void setUp() throws Exception {
    mapper = new WordCountMapper();
    reducer = new WordCountReducer();
    mapDriver = new MapDriver<>(mapper);
    reduceDriver = new ReduceDriver<>(reducer);
    mapReduceDriver = new MapReduceDriver<>(mapper, reducer);
}

To test the Mapper you just need to provide the inputs and the expected outputs to the MapDriver instance and then execute its runTest() method:

@Test
public void testWordCountMapper() throws IOException {
    mapDriver.withInput(new LongWritable(1), new Text(firstTestKey))
            .withInput(new LongWritable(2), new Text(secondTestKey))
            .withOutput(new Text(firstTestKey), new IntWritable(1))
            .withOutput(new Text(secondTestKey), new IntWritable(1))
            .runTest();
}


As you can see from the code above, MRUnit supports multiple inputs. firstTestKey and secondTestKey are String instance variables that you can declare in the test class and initialize in the setUp() method as well, as sketched below.
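
A minimal sketch of those fixtures, assuming two arbitrary single-word values (anything works, as long as the expected outputs are built from the same variables; the values below are also chosen so that firstTestKey sorts before secondTestKey, matching the output order expected by the MapReduceDriver test further down):

private String firstTestKey;
private String secondTestKey;

@Before
public void setUp() throws Exception {
    // Illustrative values only, not taken from the original example
    firstTestKey = "mrunit";
    secondTestKey = "tutorial";
    // ... followed by the Mapper, Reducer, and driver creation shown above
}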
The same process applies to test the Reducer

@Test
public void testWordCountReducer() throws IOException {
    Text firstMapKey = new Text(firstTestKey);
    List<IntWritable> firstMapValues = new ArrayList<>();
    firstMapValues.add(new IntWritable(1));
    firstMapValues.add(new IntWritable(1));

    Text secondMapKey = new Text(secondTestKey);
    List<IntWritable> secondMapValues = new ArrayList<>();
    secondMapValues.add(new IntWritable(1));
    secondMapValues.add(new IntWritable(1));
    secondMapValues.add(new IntWritable(1));

    reduceDriver.withInput(firstMapKey, firstMapValues)
            .withInput(secondMapKey, secondMapValues)
            .withOutput(firstMapKey, new IntWritable(2))
            .withOutput(secondMapKey, new IntWritable(3))
            .runTest();
}


and the overall MapReduce flow

@Test
public void testWordCountMapReducer() throws IOException {
    mapReduceDriver.withInput(new LongWritable(1), new Text(firstTestKey))
            .withInput(new LongWritable(2), new Text(firstTestKey))
            .withInput(new LongWritable(3), new Text(secondTestKey))
            .withOutput(new Text(firstTestKey), new IntWritable(2))
            .withOutput(new Text(secondTestKey), new IntWritable(1))
            .runTest();
}
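
Besides runTest(), every driver also exposes a run() method that returns the actual output records instead of checking them against registered expectations, so you can inspect the output with plain JUnit assertions. A minimal sketch, not part of the original example (Pair comes from org.apache.hadoop.mrunit.types, assertEquals from org.junit.Assert):

@Test
public void testWordCountMapperOutputPairs() throws IOException {
    // run() executes the Mapper and returns the emitted key/value pairs
    List<Pair<Text, IntWritable>> output = mapDriver
            .withInput(new LongWritable(1), new Text(firstTestKey))
            .run();

    assertEquals(1, output.size());
    assertEquals(new Text(firstTestKey), output.get(0).getFirst());
    assertEquals(new IntWritable(1), output.get(0).getSecond());
}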


As you can see, only the specific driver to use changes from one test to the next. Any MRUnit test case can be executed the same way as a plain JUnit test case, so you can run all of them together for your application. When you execute the command

mvn test

Maven will run all of the unit tests (both plain JUnit and MRUnit) available for the given MapReduce application and generate the execution reports.
