
Unit Testing Hadoop on Azure (HDInsight) – Part 2: Logging Context

Hello again!

In this short post I’ll give a brief run-through of tying together the debugging options available in Hadoop, in HDInsight and locally when using Microsoft’s HDInsight SDK.

Debugging and Logging

Telemetry is life – diagnostics are existence. Without telemetry on operational status, your applications, whether local or remote, could be in any state. It is not enough to assume that the absence of an error message is equivalent to the presence of a success message. To this end, Microsoft’s HDInsight SDK provides logging and debugging techniques so that your Hadoop jobs can be monitored.

The most straightforward debugging technique is to utilise StreamingUnit, a debugging framework that ships within the Microsoft.Hadoop.MapReduce SDK. The framework allows in-process debugging: step-through, breakpoints, immediates – the whole bag.
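As a minimal sketch of what that looks like in practice (assuming the HelloWorldMapper and HelloWorldReducer from the earlier post in this series, and the namespaces used in the full example later; the harness class name is hypothetical), the whole job runs inside your own process, so a breakpoint set inside Map or Reduce is hit under the Visual Studio debugger:

using System;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Map;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Reduce;
using Microsoft.Hadoop.MapReduce;

class DebugHarness
{
    static void Main()
    {
        // StreamingUnit runs the mapper and reducer in this process,
        // so a breakpoint inside HelloWorldMapper.Map is hit here.
        var output = StreamingUnit.Execute<HelloWorldMapper, HelloWorldReducer>(
            new[] { "Hello, Andy" });

        foreach (var line in output.ReducerResult)
        {
            Console.WriteLine(line); // a key/value pair, e.g. "Andy	1"
        }
    }
}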

This kind of debugging is an essential first step when building with this framework; but what do we do when we move beyond the local debugging environment and into a remote production (or similar) Hadoop environment?

Output Streams

The Microsoft.Hadoop.MapReduce SDK works as a Streaming Job in Hadoop. This means that Hadoop launches an executable via a shell invocation, and gathers the executable’s output from the standard output stream (stdout) and its logging and errors from the standard error stream (stderr).

In C# terms, this is equivalent to writing your results with Console.WriteLine and your logs with Console.Error.WriteLine.
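To make that concrete, here is a minimal sketch (not using the SDK) of the raw contract a streaming executable has with Hadoop: results go to stdout, diagnostics go to stderr.

using System;

// A sketch of the raw streaming contract, not the SDK. Hadoop treats
// each stdout line as a result record and each stderr line as logging.
// Key/value pairs are tab-separated by convention in Hadoop Streaming.
class RawStreamingSketch
{
    static void Main()
    {
        Console.WriteLine("Andy	1");                      // a result record
        Console.Error.WriteLine("processed one record");  // a log line
    }
}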

Given this point, it becomes very important that the consumer of the HDInsight SDK, the author of the MapReduce job, does not write spurious output to either of these streams. Indeed, a misplaced error handler such as try { var zero = 0; var i = 10 / zero; } catch (Exception ex) { Console.WriteLine(ex.ToString()); } can result in erroneous, multiline output being written where only results should be emitted.

In fact, with HDInsight and Hadoop Streaming Jobs in general, writing to the output streams should not be done directly at all. So how can it be done?

MapperContext and ReducerCombinerContext

MapperContext and ReducerCombinerContext are two classes that are passed into the MapperBase.Map and ReducerCombinerBase.Reduce methods respectively. Both classes derive from ContextBase, and this base class provides the correct way to write a log from the map/reduce job.

If we take our previous example, HelloWorldJob, we can expand our Mapper to emit a log when the guard clause detects that the inputLine does not start with the expected prefix.

We would change:

//example input: Hello, Andy
if (!inputLine.StartsWith("Hello, ")) return;

to

//example input: Hello, Andy
if (!inputLine.StartsWith("Hello, "))
{
    context.Log(string.Format("The inputLine {0} is not in the correct format", inputLine));
    return;
}

Using StreamingUnit, we can then modify our program to include invalid input (to cause a log), and check that the log was correctly emitted by inspecting the StreamingUnit output:

using System;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Map;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Reduce;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample
{
    class Program
    {
        static void Main(string[] args)
        {
            var inputArray = new[]
            {
                "Hello, Andy",
                "Hello, andy",
                "Hello, why doesn't this work!",
                "Hello, Andy",
                "Hello, chickenface",
                "Hello, Andy",
                "Goodbye, Andy"
            };

            var output = StreamingUnit.Execute<HelloWorldMapper, HelloWorldReducer>(inputArray);

            Console.WriteLine("Map");
            foreach (var mapperResult in output.MapperResult)
            {
                Console.WriteLine(mapperResult);
            }

            Console.WriteLine("MapLog");
            foreach (var mapperLog in output.MapperLog)
            {
                Console.WriteLine(mapperLog);
            }

            Console.WriteLine("Reduce");
            foreach (var reducerResult in output.ReducerResult)
            {
                Console.WriteLine(reducerResult);
            }

            Console.ReadLine();
        }
    }
}

The output can then be seen below:

Map
Andy 1
Andy 1
Andy 1
andy 1
chickenface 1
why doesn't this work! 1
MapLog
The inputLine Goodbye, Andy is not in the correct format
Reduce
Andy 3
andy 1
chickenface 1
why doesn't this work! 1

Counters

It is also possible to increment Hadoop counters by using the ContextBase-derived classes and calling IncrementCounter. This provides a way of interfacing with Hadoop’s intrinsic “hadoop:counter:…..” mechanism – these counters are summed across the execution of the Hadoop job and reported at job completion. This telemetry is exceptionally useful, and it is the definitive state information that is so critical to understanding the operational status of a remote system.
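For context, and as an assumption about what the SDK does on your behalf rather than documented SDK behaviour: stock Hadoop Streaming picks up counter updates as specially formatted control lines on stderr, along the lines of the sketch below, which is exactly the kind of stream writing you should leave to the framework.

using System;

// A sketch of the raw Hadoop Streaming counter protocol, NOT the SDK API.
// Stock Hadoop Streaming parses stderr lines of the form
//   reporter:counter:<group>,<counter>,<amount>
// and adds <amount> to the named counter. The SDK's IncrementCounter is
// assumed to emit an equivalent line for you; don't write these yourself.
class CounterProtocolSketch
{
    static void Main()
    {
        Console.Error.WriteLine("reporter:counter:RecoverableError,InputFormatIncorrect,1");
    }
}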

In order to increment these counters, one simply calls the IncrementCounter method on the context. In our example, we can add a counter call to track these formatting errors.

We will use a category of “RecoverableError”, a counter of “InputFormatIncorrect” and increment by 1.

using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample.Job.Map
{
    public class HelloWorldMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            //example input: Hello, Andy
            if (!inputLine.StartsWith("Hello, "))
            {
                context.Log(string.Format("The inputLine {0} is not in the correct format", inputLine));
                context.IncrementCounter("RecoverableError", "InputFormatIncorrect", 1);
                return;
            }

            var key = inputLine.Substring(7);
            if (key.EndsWith(".")) key = key.Trim('.');

            context.EmitKeyValue(key, "1"); //we are going to count instances, the value is irrelevant
        }
    }
}

Happy big data-y clouding!

Andy
