Tag Archives: HDInsight


Bright HDInsight Part 5: Better status reporting in HDInsight

Following on from earlier posts in this series, in this post I will show another way of getting telemetry out of HDInsight. We have previously tackled counters and logging to the correct context; now I will show an additional type of logging that is useful while a job is in process: reporting the job's status as it runs.

This status is a transient record of the state of a Map/Reduce job – it is not persisted beyond the execution of the job and is volatile: each subsequent update of the status overwrites the previous value. Consequently, any inspection of this value only reflects the state of the system at the moment it was read, and it may already have changed by the time the user gets access to it.

This does not make the status irrelevant; in fact it gives a glimpse of the system state while the job is in process and allows the viewer to immediately determine a recent state, which is useful in its own right.

Controlling the Status

The status of a Map/Reduce job is reported at the Map, Combine and Reduce stages of execution, and each individual Task can report its own status.

This status control is not available in the Hadoop C# SDK, but it is simple to achieve using Console.Error – see Part 2 of this series for why this stream is used!

To stay in line with the general approach taken by the C# Hadoop SDK, we will wrap the call in an extension method on ContextBase, the common base class for the Mapper, Combiner and Reducer contexts. With this new method – which we will call SetStatus(string status) – we write a well-known format to the error stream that Hadoop picks up via its Streaming API and uses as the status of the job.

The format of the string is:
reporter:status:Hello, World!

This will set the executing task's status to “Hello, World!”.

Implementing the Extension method

The extension method is very simple:

public static class ContextBaseExtension
{
    public static void SetStatus(this ContextBase context, string statusMessage)
    {
        // Written to stderr; Hadoop's Streaming API recognises the reporter:status: prefix and uses the rest as the task status.
        Console.Error.WriteLine("reporter:status:{0}", statusMessage);
    }
}

Once we have this method referenced and its namespace added as a using directive in our map/reduce/combine class, we can use the context parameter of our task (in my case I chose a Map) and set a status!

I chose a trivial and unrealistic sleep statement, so that I can see the message easily in the portal :-)

context.SetStatus("Sleeping During Map!");
Thread.Sleep(180000); // 3 minutes - long enough to catch the status in the portal

Viewing the output

To view this, you can enter the Hadoop portal and check the state of the running task. In this screenshot you can see that the status is set to “Sleeping During Map!” – and you can see why I used Thread.Sleep – to give me a few moments to log onto the RDP server and take the screenshot. I chose 3 minutes so I could also have time to get a coffee …. ;-)

Production use

A better example, and a very useful use case for this, is to report whether a Task is running cleanly or whether it is encountering errors that it has to recover from. To achieve this, we catch any exceptions that can be recovered from, report that we are doing so and then continue execution.

Notice that this is a very similar map to our previous Counters example, but that we are also setting the status of the Task while it is in process.

using System;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.SampleDataMapReduceJob
{
    public class SampleMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            try
            {
                context.IncrementCounter("Line Processed");
                var segments = inputLine.Split("\t".ToCharArray(), StringSplitOptions.RemoveEmptyEntries);
                context.IncrementCounter("country", segments[2], 1);
                context.EmitKeyValue(segments[2], inputLine);
                context.IncrementCounter("Text chars processed", inputLine.Length);
            }
            catch (IndexOutOfRangeException ex)
            {
                // We still allow other exceptions to throw and set an error state on the task, but this
                // exception type we are confident is due to the input not having at least 3 \t separated segments.
                context.IncrementCounter("Logged recoverable error", "Input Format Error", 1);
                context.Log(string.Format("Input Format Error on line {0} in {1} - {2} was {3}", inputLine, context.InputFilename,
                    context.InputPartitionId, ex.ToString()));
                context.SetStatus("Running with recoverable errors");
            }
        }
    }

    public static class ContextBaseExtension
    {
        public static void SetStatus(this ContextBase context, string statusMessage)
        {
            Console.Error.WriteLine("reporter:status:{0}", statusMessage);
        }
    }
}

Happy big-data-cloudarama ;-)
Andy


Unit Testing Hadoop on Azure (HDInsight) – Part 2: Logging Context

Hello again!

In this short post I'll give a brief run-through of the debugging options available in Hadoop, in HDInsight and locally when using Microsoft's HDInsight SDK, and how they tie together.

Debugging and Logging

Telemetry is life – diagnostics are existence. Without telemetry on operational status, your applications, whether local or remote, could be in any state. It is not enough to assume that the absence of an error message is equivalent to the presence of a success message. To this end, Microsoft's HDInsight SDK provides logging and debugging techniques so that your Hadoop jobs can be monitored.

The most straightforward debugging technique is to use StreamingUnit, a debugging framework that exists within the Microsoft.Hadoop.MapReduce SDK. The framework allows in-process debugging: step-through, breakpoints, immediates – the whole bag.

This debugging is an essential first step when building with this framework, but what do we do when we move beyond the local debugging environment and into a remote production (or other) Hadoop environment?

Output Streams

The Microsoft.Hadoop.MapReduce SDK works as a Streaming Job in Hadoop. This means that Hadoop launches an executable via a shell invocation, gathers the executable's results from the Standard Output stream (stdout) and its logging and errors from the Standard Error stream (stderr).

This is equivalent (in C#) to writing your results with Console.WriteLine and your logs with Console.Error.WriteLine.
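As a minimal sketch of that split (an illustrative console app, not part of the SDK or the sample code), a streaming executable uses the two streams like this:

using System;

class StreamingEchoSketch
{
    static void Main()
    {
        string line;
        // Hadoop Streaming feeds input records to the executable on stdin...
        while ((line = Console.ReadLine()) != null)
        {
            // ...collects results from stdout...
            Console.WriteLine("{0}\t1", line);
            // ...and treats stderr as logging and diagnostics.
            Console.Error.WriteLine("processed a line of length {0}", line.Length);
        }
    }
}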

Given this point, it becomes very important that the consumer of the HDInsight SDK – the author of the MapReduce job – does not write spurious output to either of these streams. Indeed, a misplaced error handler such as try { int i = 10 / 0; } catch (Exception ex) { Console.WriteLine(ex.ToString()); } can result in erroneous, multiline output being written where only results should be emitted.

In fact, with HDInsight and Hadoop Streaming Jobs in general, writing to the output streams should not be done directly at all. So how should it be done?

MapperContext and ReducerCombinerContext

MapperContext and ReducerCombinerContext are two classes that are passed in to the MapperBase.Map and ReducerCombinerBase.Reduce methods respectively. The classes derive from ContextBase, and this base class provides the correct way to write a log from the map/reduce job.

If we take our previous HelloWorldJob example, we can expand our Mapper to emit a log entry when the input line does not start with the expected string.

We would change:

//example input: Hello, Andy
if (!inputLine.StartsWith("Hello, ")) return;

to

//example input: Hello, Andy
if (!inputLine.StartsWith("Hello, "))
{
    context.Log(string.Format("The inputLine {0} is not in the correct format", inputLine));
    return;
}

Using StreamingUnit, we can then modify our program to include an invalid input (to cause a log entry), and verify that the log was correctly written by inspecting the StreamingUnit output.

using System;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Map;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Reduce;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample
{
    class Program
    {
        static void Main(string[] args)
        {
            var inputArray = new[]
            {
                "Hello, Andy",
                "Hello, andy",
                "Hello, why doesn't this work!",
                "Hello, Andy",
                "Hello, chickenface",
                "Hello, Andy",
                "Goodbye, Andy"
            };

            var output =
                StreamingUnit.Execute<HelloWorldMapper, HelloWorldReducer>(inputArray);

            Console.WriteLine("Map");
            foreach (var mapperResult in output.MapperResult)
            {
                Console.WriteLine(mapperResult);
            }

            Console.WriteLine("MapLog");
            foreach (var mapperLog in output.MapperLog)
            {
                Console.WriteLine(mapperLog);
            }

            Console.WriteLine("Reduce");
            foreach (var reducerResult in output.ReducerResult)
            {
                Console.WriteLine(reducerResult);
            }

            Console.ReadLine();
        }
    }
}

The output can then be seen here:

Map
Andy 1
Andy 1
Andy 1
andy 1
chickenface 1
why doesn't this work! 1
MapLog
The inputLine Goodbye, Andy is not in the correct format
Reduce
Andy 3
andy 1
chickenface 1
why doesn't this work! 1

Counters

It is also possible to increment Hadoop counters by using the ContextBase-derived classes and calling IncrementCounter. This provides a way of interfacing with Hadoop's intrinsic “reporter:counter:…..” format – these counters are then summed across the execution of the Hadoop job and reported at job completion. This telemetry is exceptionally useful, and is the definitive state information that is so critical to understanding the operational status of a remote system.
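For illustration only – the SDK's IncrementCounter does this for you – the underlying streaming convention is a single line written to standard error, much like the SetStatus extension shown earlier on this page. A hedged sketch of a manual equivalent (a hypothetical helper, not part of the SDK), assuming the standard Hadoop Streaming counter syntax:

using System;
using Microsoft.Hadoop.MapReduce;

public static class ContextBaseCounterSketch
{
    // Hypothetical manual equivalent of IncrementCounter; the streaming format is
    // reporter:counter:<group>,<counter>,<amount> written to stderr.
    public static void IncrementCounterManually(this ContextBase context, string category, string counterName, int increment)
    {
        Console.Error.WriteLine("reporter:counter:{0},{1},{2}", category, counterName, increment);
    }
}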

To increment these counters, one simply calls the IncrementCounter method on the context. In our example, we can add a counter call to track these formatting errors.

We will use a category of “RecoverableError”, a counter of “InputFormatIncorrect” and an increment of 1.

using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample.Job.Map
{
    public class HelloWorldMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            //example input: Hello, Andy
            if (!inputLine.StartsWith("Hello, "))
            {
                context.Log(string.Format("The inputLine {0} is not in the correct format", inputLine));
                context.IncrementCounter("RecoverableError", "InputFormatIncorrect", 1);
                return;
            }
            var key = inputLine.Substring(7);
            if (key.EndsWith(".")) key = key.Trim('.');
            context.EmitKeyValue(key, "1"); //we are going to count instances, the value is irrelevant
        }
    }
}

Happy big data-y clouding!

Andy


Unit Testing in Hadoop on Azure (HDInsight) – Part 1: StreamingUnit

Hey all,

Over the past months I've been working hard on Hadoop on Azure – now known as HDInsight – with our customers. Taking existing practices to this new world of Big Data is important to me, especially ones that I'd never abandon in the Big Compute and standard software engineering worlds. Among them are unit testing and continuous integration. I'm going to start a series of posts on the tools available to aid unit testing in Hadoop, and how you can perfect your map/reduce jobs in isolation from your Hadoop cluster and guarantee their integrity during development.

This tutorial covers the Microsoft HDInsight .NET SDK, and the examples are written in C#.

HDInsight testing

The basic premise of testing an HDInsight job is that, given a known set of inputs, we should get back the expected mapped set and the expected reduced result. Ideally we want to be able to do this in process during development, so that we can debug as easily as possible without relying on the infrastructure of Hadoop – the configuration and availability of which is not a development-time concern. Furthermore, we want to be able to step through code and use the excellent tooling built into Visual Studio to inspect runtime conditions. Finally, we want to control the inputs to the system in a very robust way, so that the tests are guaranteed a consistent input from which to assert consistent results; we want to be able to submit a job to a framework with known, literal input.

We could use NUnit or MSTest to provide this testing framework, but we would be testing the Mapper and Reducer classes in isolation. This has merit in its own right, but there is a complication: a MapperBase or ReducerCombinerBase method does not return a result value; instead it writes to a "context". This context is, in reality, the stdout or stderr console output, and to test it we would need to write code that interacts with these streams. We could abstract our logic into methods that return values and have the Map and Reduce classes simply marshal those values to the context – indeed this is a valid abstraction – but the further our abstraction moves from the runtime operation of the software, the greater the need becomes for an integration test: a test that mimics a real runtime environment.
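To make that abstraction concrete, here is a rough sketch of what it could look like (a hypothetical refactoring, not code from the sample project):

using Microsoft.Hadoop.MapReduce;

// Pure logic with a return value - easy to cover with plain NUnit/MSTest tests.
public static class HelloWorldParser
{
    public static string TryExtractName(string inputLine)
    {
        if (!inputLine.StartsWith("Hello, ")) return null;
        return inputLine.Substring(7).TrimEnd('.');
    }
}

// The Mapper becomes a thin shim that marshals the result to the context.
public class HelloWorldShimMapper : MapperBase
{
    public override void Map(string inputLine, MapperContext context)
    {
        var name = HelloWorldParser.TryExtractName(inputLine);
        if (name != null) context.EmitKeyValue(name, "1");
    }
}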

In the Microsoft.Hadoop.MapReduce framework (NuGet: http://nuget.org/packages/Microsoft.Hadoop.MapReduce/0.0.0.4), Microsoft provides StreamingUnit – a lightweight framework for testing a Hadoop Job written in .NET, in process, on a development machine. StreamingUnit allows a developer to write Map/Reduce jobs and test their execution in Visual Studio.

Example Use of StreamingUnit

Firstly, we will start out with a blank Console Application. This is the project type used for creating Hadoop Jobs with the Microsoft HDInsight SDK.

Once we have this vanilla Console Application, we can add the required assemblies through NuGet. Use the Package Manager Console or the Manage NuGet Packages project context menu to add Microsoft.Hadoop.MapReduce (the version I am using is 0.0.0.4).

Once you have added the package, you can go right ahead and create a Mapper class, a Reducer class and a Job class. You might already have these if you have already begun development of your Hadoop Job, but I will produce some and include them as an attachment at the end of this post.

Once you have done this, your solution will look similar to this:

In our example, we are taking the output from all the different example applications I've ever written and running a Hadoop query over it to count those outputs. Example applications very commonly follow the HelloWorld pattern: after the simplest "print('Hello world')", the next example is almost always a method with a signature like "helloworld(string data)" that outputs "Hello, 'data'". So my data sample will be "Hello, Andy", "Hello, andy", "Hello, why doesn't this stupid thing work" and so on. The output from the job will be a count of the different strings.

Let's implement that Map and Reduce logic.

Our Map extracts the name of whoever said hello:

using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample.Job.Map
{
    public class HelloWorldMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            //example input: Hello, Andy
            if (!inputLine.StartsWith("Hello, ")) return;
            var key = inputLine.Substring(7);
            if (key.EndsWith(".")) key = key.Trim('.');
            context.EmitKeyValue(key, "1"); //we are going to count instances, the value is irrelevant
        }
    }
}

Our reducer will simply count the inputs to it.

using System.Collections.Generic;
using System.Linq;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample.Job.Reduce
{
    public class HelloWorldReducer : ReducerCombinerBase
    {
        public override void Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context)
        {
            context.EmitKeyValue(key, values.Count().ToString()); //count instances of this key
        }
    }
}

Additionally, we’ll build a Job class that links the two together:

using Elastacloud.Hadoop.StreamingUnitExample.Job.Map;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Reduce;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample.Job
{
    public class HelloWorldJob : HadoopJob<HelloWorldMapper, HelloWorldReducer>
    {
        public override HadoopJobConfiguration Configure(ExecutorContext context)
        {
            return new HadoopJobConfiguration(); //here you would normally set up some input ;-)
        }
    }
}

Normally we would expect a little more work to be done in the Job class – it should define input and output locations – but for our demo this is not required; a sketch of a fuller configuration follows below.
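For reference, a fuller Configure might look something like this – note that the InputPath and OutputFolder property names are from memory for this SDK version, so treat them as an assumption:

using Elastacloud.Hadoop.StreamingUnitExample.Job.Map;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Reduce;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample.Job
{
    public class HelloWorldJobWithPaths : HadoopJob<HelloWorldMapper, HelloWorldReducer>
    {
        public override HadoopJobConfiguration Configure(ExecutorContext context)
        {
            // Assumed property names; point these at your cluster's HDFS/ASV locations.
            var config = new HadoopJobConfiguration();
            config.InputPath = "input/helloworld";
            config.OutputFolder = "output/helloworld";
            return config;
        }
    }
}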

Now we will use the Program class to define some simple input and execute the job with StreamingUnit.

using System;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Map;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Reduce;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample
{
    class Program
    {
        static void Main(string[] args)
        {
            var inputArray = new[]
            {
                "Hello, Andy",
                "Hello, andy",
                "Hello, why doesn't this work!",
                "Hello, Andy",
                "Hello, chickenface",
                "Hello, Andy"
            };

            var output =
                StreamingUnit.Execute<HelloWorldMapper, HelloWorldReducer>(inputArray);

            Console.WriteLine("Map");
            foreach (var mapperResult in output.MapperResult)
            {
                Console.WriteLine(mapperResult);
            }

            Console.WriteLine("Reduce");
            foreach (var reducerResult in output.ReducerResult)
            {
                Console.WriteLine(reducerResult);
            }

            Console.ReadLine();
        }
    }
}

The output for this shows first the values produced (sent to the context) by the Mapper and then by the Reducer. More importantly, the run can be stepped through by adding breakpoints and then running the executable.

The Program continues to list all Map output and then Reduce output by writing to the console.

Map
Andy 1
Andy 1
Andy 1
andy 1
chickenface 1
why doesn't this work! 1
Reduce
Andy 3
andy 1
chickenface 1
why doesn't this work! 1
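Although the sample drives StreamingUnit from a console Program, the same call drops straight into a conventional test. A rough NUnit sketch (assuming tab-separated key/value output, as Hadoop streaming typically emits):

using System.Linq;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Map;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Reduce;
using Microsoft.Hadoop.MapReduce;
using NUnit.Framework;

namespace Elastacloud.Hadoop.StreamingUnitExample.Tests
{
    [TestFixture]
    public class HelloWorldJobTests
    {
        [Test]
        public void Reducer_Counts_Repeated_Greetings()
        {
            var input = new[] { "Hello, Andy", "Hello, Andy", "Goodbye, Andy" };

            var output = StreamingUnit.Execute<HelloWorldMapper, HelloWorldReducer>(input);

            // One reduced row for "Andy" with a count of 2; the "Goodbye" line is filtered out by the Mapper.
            Assert.AreEqual(1, output.ReducerResult.Count());
            StringAssert.StartsWith("Andy", output.ReducerResult.Single());
            StringAssert.EndsWith("2", output.ReducerResult.Single());
        }
    }
}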

Next time, my blog focusses on more advanced approaches to unit testing, before a final post on Mocking and Unit Testing.

Here’s the source: Elastacloud.Hadoop.StreamingUnitExample (note, packages had to be deleted)

Happy cloudy big data-ing ;-)

Andy