Monthly Archives: November 2012

Bright HDInsight Part 2: Smart compressed output

Following on from Bright HDInsight Part 1, which dealt with consuming a compressed input stream in a Map/Reduce job, we’ll now see how to extend this scenario to emit compressed data from the output of your Map/Reduce job.

Scenario

Again, the scenario is very much one of reducing overhead on the management, security and storage of your data. If you are to leave your resulting work at rest in a remote system, you should reduce its footprint as much as possible.

Reiteration of tradeoff

Remember that you are trading an IO-bound problem for a compute-bound one – compressed data must be inflated before it can be used. You should run metrics on this to check that you are truly saving what you think you are.

Command Line

Again, this is achieved by using an argument on the command line:

-D "mapred.output.compress=true" -D "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec"
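In the context of a full streaming job submission, those flags sit alongside the usual arguments; something like the following sketch (the streaming jar path and the mapper/reducer executables are illustrative):

hadoop jar hadoop-streaming.jar -D "mapred.output.compress=true" -D "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" -input /input/data.log -output /output/compressed -mapper MyMapper.exe -reducer MyReducer.exe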

Using the C# SDK for HDInsight

To do this, in your Configure method, simply add to the AdditionalGenericArguments collection as below:

config.AdditionalGenericArguments.Add("-D \"mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec\"");
config.AdditionalGenericArguments.Add("-D \"mapred.output.compress=true\"");

UPDATE: Alternatively, you can set the “CompressOutput” property to true on the config object, and the SDK will take care of this for you.
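Putting that together, a Configure override using that convenience property might look like the following sketch (the input and output paths are illustrative, and CompressOutput is the property described above):

public override HadoopJobConfiguration Configure(ExecutorContext context)
{
    var config = new HadoopJobConfiguration();
    config.InputPath = "/input/data.log";
    config.OutputFolder = "/output/compressed" + DateTime.Now.ToString("yyyyMMddhhmmss");
    config.CompressOutput = true; //equivalent to the two -D arguments above
    return config;
}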

Happy clouding,

Andy

Bright HDInsight Part 1: Consume Compressed Input

This is the first of a series of quick tips on HDInsight (Hadoop on Azure), and deals with how to consume a compressed input stream.

The scenario

The HDInsight product, and Big Data solutions in general, by definition deal with large amounts of data. Data at rest incurs a cost, whether in terms of managing that data, securing it or simply storing it. Where possible, it is good practice to reduce the weight of the data in a lossless way, without compromising its quality.

The standard way to do this is to compress the static data: common byte sequences are deflated out of the file and indexed so that they can be reinstated later during an inflation stage.

The problem with this for HDInsight, or Hadoop in general, is that the input becomes a binary stream which the framework cannot read directly as lines of text.

Configuring a Hadoop Job to read a Compressed input file

In order to configure a Hadoop job to read compressed input, you simply have to specify a flag on the job command line. That flag is:

-D "io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec"

This causes an additional map task to be undertaken which loads the input as a Gzip stream before your map/reduce job begins. NOTE: this can be a time-consuming activity – if you plan to load this file many times for parsing, the efficiency saving will be limited.

A 2GB gzipped example file of mine inflated to 11GB, taking 20 minutes.

Filename dependency

If you try this approach, you might find that the input strings to your Map job still seem to be binary. This is a sign that the stream is not being inflated by the infrastructure. There is one last thing you must ensure in order to trigger the inflation process: the filename needs to have the relevant extension. As an example, to use Gzip the filename must end with .gz, such as “mylogfile.log.gz”.
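As an aside, if you are producing the input yourself, a minimal sketch of gzipping a log file with the .NET base class library (assuming System.IO and System.IO.Compression are imported; the paths are illustrative) might look like this, so that the file ends up with the required .gz extension:

static void GzipLogFile(string sourcePath)
{
    var targetPath = sourcePath + ".gz"; //keep the .gz extension so the codec is triggered
    using (var source = File.OpenRead(sourcePath))
    using (var target = File.Create(targetPath))
    using (var gzip = new GZipStream(target, CompressionMode.Compress))
    {
        source.CopyTo(gzip); //deflate the log into, e.g., mylogfile.log.gz
    }
}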

Using the HDInsight C# Streaming SDK to control Input Compression

In order to use the C# Streaming SDK with this flag, one simply modifies the Job's Configure override to add an additional generic argument specifying it.

public override HadoopJobConfiguration Configure(ExecutorContext context)
{
    HadoopJobConfiguration config = new HadoopJobConfiguration();
    config.InputPath = "/input/data.log.gz";
    config.OutputFolder = "/output/output" + DateTime.Now.ToString("yyyyMMddhhmmss");
    config.AdditionalGenericArguments.Add("-D \"io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec\"");
    return config;
}
You will find an additional task specified in your JobTracker, which takes the stream and inflates it for your runtime code.

A better way

If you can control your input stream (i.e. it is not provided by a third party), you should look at a better compression algorithm than the one shown here, Gzip. A better approach is to use LZO, a splittable format that allows you to distribute your work more evenly. A Gzip stream must be processed sequentially, which makes it very hard to distribute the workload; LZO (configured by using com.hadoop.compression.lzo.LzoCodec) is splittable, allowing better workload distribution.
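If you do go down the LZO route, the equivalent generic argument would presumably be along these lines (note that the LZO codec is not part of stock Hadoop, so it must be available on the cluster first):

config.AdditionalGenericArguments.Add("-D \"io.compression.codecs=com.hadoop.compression.lzo.LzoCodec\"");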

Happy Hadoopy Cloudy Times :-)
Andy

 


Unit Testing Hadoop on Azure (HDInsight) – Part 2: Logging Context

Hello again!

In this short post I'll give a brief run-through of tying together the debugging and logging options available in Hadoop and HDInsight, both remotely and locally, when using Microsoft's HDInsight SDK.

Debugging and Logging

Telemetry is life – diagnostics are existence. Without telemetry on operational status, your applications, whether local or remote, could be in any state. It is not enough to assume that the absence of an error message is equivalent to the presence of a success message. To this end, Microsoft's HDInsight SDK provides logging and debugging techniques so that your Hadoop jobs can be monitored.

The most straightforward debugging technique is to utilise StreamingUnit, a debugging framework that exists within the Microsoft.Hadoop.MapReduce SDK. The framework allows in-process debugging: step-through, breakpoints, immediates – the whole bag.

This is an essential first step when building with this framework; but what do we do when we move beyond the local debugging environment and into a remote production Hadoop environment?

Output Streams

The Microsoft.Hadoop.MapReduce SDK works as a Streaming job in Hadoop. This means that Hadoop launches an executable via a shell invocation, gathers the executable's results from the standard output stream (stdout) and gathers its logging and errors from the standard error stream (stderr).

In C#, this is equivalent to writing your results with Console.WriteLine and your logs with Console.Error.WriteLine.
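To illustrate, a hand-rolled streaming mapper written without the SDK would do something like the following, which is roughly what the SDK does on your behalf (the format check is illustrative):

//Results go to stdout as tab-separated key/value pairs; logs go to stderr.
string line;
while ((line = Console.ReadLine()) != null)
{
    if (!line.StartsWith("Hello, "))
    {
        Console.Error.WriteLine("Skipping badly formatted line: " + line); //log -> stderr
        continue;
    }
    Console.WriteLine(line.Substring(7) + "\t1"); //result -> stdout
}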

Given this point, it becomes very important that the consumer of the HDInsight SDK, the author of the MapReduce job, does not write spurious output to either of these streams. Indeed, a misplaced error handler such as try { int i = 10 / 0; } catch (Exception ex) { Console.WriteLine(ex.ToString()); } can result in erroneous, multiline output being written where only results should be emitted.

In fact, with HDInsight and Hadoop Streaming jobs in general, writing to the output streams should not be done directly at all. So how should it be done?

MapperContext and ReducerCombinerContext

MapperContext and ReducerCombinerContext are two classes that are passed in to the MapperBase.Map and ReducerCombinerBase.Reduce methods respectively. The classes derive from ContextBase, and this base class provides the correct way to write a log from the map/reduce job.

If we take our previous example, HelloWorldJob, we can expand our Mapper to emit a log entry when an input line does not start with the expected prefix.

We would change:

//example input: Hello, Andy
if (!inputLine.StartsWith("Hello, ")) return;

to

//example input: Hello, Andy
if (!inputLine.StartsWith("Hello, "))
{
    context.Log(string.Format("The inputLine {0} is not in the correct format", inputLine));
    return;
}

Using StreamingUnit, we can then modify our program to include an invalid input line (to cause a log entry), and check that the log was correctly emitted by inspecting the StreamingUnit output:

using System;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Map;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Reduce;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample
{
    class Program
    {
        static void Main(string[] args)
        {
            var inputArray = new[]
            {
                "Hello, Andy",
                "Hello, andy",
                "Hello, why doesn't this work!",
                "Hello, Andy",
                "Hello, chickenface",
                "Hello, Andy",
                "Goodbye, Andy"
            };

            var output = StreamingUnit.Execute<HelloWorldMapper, HelloWorldReducer>(inputArray);

            Console.WriteLine("Map");
            foreach (var mapperResult in output.MapperResult)
            {
                Console.WriteLine(mapperResult);
            }

            Console.WriteLine("MapLog");
            foreach (var mapperLog in output.MapperLog)
            {
                Console.WriteLine(mapperLog);
            }

            Console.WriteLine("Reduce");
            foreach (var reducerResult in output.ReducerResult)
            {
                Console.WriteLine(reducerResult);
            }

            Console.ReadLine();
        }
    }
}

The output can then be seen below:

Map
Andy 1
Andy 1
Andy 1
andy 1
chickenface 1
why doesn't this work! 1
MapLog
The inputLine Goodbye, Andy is not in the correct format
Reduce
Andy 3
andy 1
chickenface 1
why doesn't this work! 1

Counters

It is also possible to increment Hadoop counters by using the ContextBase-derived classes and calling IncrementCounter. This provides a way of interfacing with Hadoop's intrinsic “hadoop:counter:…..” format – these counters are then summed across the execution of the Hadoop job and reported at job completion. This telemetry is exceptionally useful, and is the definitive state information that is so critical to understanding the operational status of a remote system.
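For reference, a stock Hadoop Streaming job reports a counter by writing a specially formatted line to stderr, and the SDK's IncrementCounter presumably emits this standard convention on your behalf; the line takes the form:

reporter:counter:<group>,<counter>,<amount>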

In order to increment these counters, one simply calls the “IncrementCounter” method on the context. In our example, we can add a counter call to track these formatting failures.

We will use a category of “RecoverableError”, a counter of “InputFormatIncorrect” and increment by 1.

using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample.Job.Map
{
    public class HelloWorldMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            //example input: Hello, Andy
            if (!inputLine.StartsWith("Hello, "))
            {
                context.Log(string.Format("The inputLine {0} is not in the correct format", inputLine));
                context.IncrementCounter("RecoverableError", "InputFormatIncorrect", 1);
                return;
            }
            var key = inputLine.Substring(7);
            if (key.EndsWith(".")) key = key.Trim('.');
            context.EmitKeyValue(key, "1"); //we are going to count instances, the value is irrelevant
        }
    }
}

Happy big data-y clouding!

Andy


Unit Testing in Hadoop on Azure (HDInsight) – Part 1: StreamingUnit

Hey all,

Over the past months I've been working hard on Hadoop on Azure – now known as HDInsight – with our customers. Taking existing practices to this new world of Big Data is important to me, especially ones that I'd never abandon in the Big Compute and standard software engineering worlds. One such practice is unit testing with continuous integration. I'm going to start a series of posts on the tools available to aid unit testing in Hadoop, and how you can perfect your map/reduce jobs in isolation from your Hadoop cluster and guarantee their integrity during development.

This tutorial covers the Microsoft HDInsight .net SDK and examples are written in C#.

HDInsight testing

The basic premise of testing an HDInsight job is that, given a known set of input, we'll be presented with a mapped set that is correct and a reduced result that is as expected. Ideally we want to be able to do this in process during development, so that we can debug as easily as possible and not have to rely on the infrastructure of Hadoop – the configuration and availability of which is not a develop-time concern. Furthermore, we want to be able to step through code and use the excellent tooling built into Visual Studio to inspect runtime conditions. Finally, we want to be able to control the inputs to the system in a very robust way, so that the tests are guaranteed a consistent input from which to assert consistent results; we want to be able to submit a job to a framework with known, literal input.

We could use NUnit or MSTest to provide this testing framework, but we would be testing the Mapper and Reducer classes in isolation. This has merit in its own right, but there is complexity in that a MapperBase or ReducerCombinerBase method does not return a result value; instead it writes to a “context”. In reality this context is the stdout or stderr console stream, and to test it we would need to write code to interact with those streams. We could abstract our logic into methods that return values, which the Map and Reduce classes simply marshal to the context – indeed this is a valid abstraction – but the further our abstraction moves from the runtime operation of the software, the greater the need becomes for an integration test: a test that mimics a real runtime environment.
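To make that abstraction concrete, a hypothetical refactoring might pull the parsing logic into a plain class that an NUnit or MSTest test can call directly, with the Mapper reduced to marshalling the returned value to the context (HelloWorldParser and TryExtractName are illustrative names, not part of the SDK):

using Microsoft.Hadoop.MapReduce;

//Pure logic in a plain class, trivially testable without any Hadoop context.
public static class HelloWorldParser
{
    public static string TryExtractName(string inputLine)
    {
        if (inputLine == null || !inputLine.StartsWith("Hello, ")) return null;
        return inputLine.Substring(7).TrimEnd('.');
    }
}

//The Mapper then simply marshals the returned value to the context.
public class HelloWorldMapper : MapperBase
{
    public override void Map(string inputLine, MapperContext context)
    {
        var key = HelloWorldParser.TryExtractName(inputLine);
        if (key == null) return;
        context.EmitKeyValue(key, "1");
    }
}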

In the Microsoft.Hadoop.MapReduce framework (NuGet: http://nuget.org/packages/Microsoft.Hadoop.MapReduce/0.0.0.4), Microsoft provides StreamingUnit – a lightweight framework for testing a Hadoop job written in .NET, in process, on a development machine. StreamingUnit allows a developer to write Map/Reduce jobs and test their execution in Visual Studio.

Example Use of StreamingUnit

Firstly, we will start out with a blank Console Application. This is the project type used for creating Hadoop Jobs with the Microsoft HDInsight SDK.

Once we have this vanilla Console Application, we can add in the required assemblies through NuGet. Use the Package Manager window or the Manage NuGet Packages project context menu to add Microsoft.Hadoop.MapReduce (the version I am using is 0.0.0.4).
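From the Package Manager Console this is something along the lines of:

Install-Package Microsoft.Hadoop.MapReduce -Version 0.0.0.4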

Once you have added the package, you can go right ahead and create a Mapper class, a Reducer class and a Job class. You might already have these if you have begun development of your Hadoop job, but I will produce some and include them as an attachment at the end of this post.

Once you have done this, your solution will look similar to this:

In our example, we are taking the output from all the different example applications I've ever written and running a Hadoop query over it to count those outputs. Example applications commonly follow the HelloWorld pattern: after the simplest “print('Hello world')”, the next example is almost always a method with a signature like “helloworld(string data)” which outputs “Hello, 'data'”. So my data sample will be “Hello, Andy”, “Hello, andy”, “Hello, why doesn't this stupid thing work” and so on. The output from the job will be a count of the different strings.

Let's implement that Map and Reduce logic.

Our Mapper extracts the name of whoever said hello:

using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample.Job.Map
{
    public class HelloWorldMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            //example input: Hello, Andy
            if (!inputLine.StartsWith("Hello, ")) return;
            var key = inputLine.Substring(7);
            if (key.EndsWith(".")) key = key.Trim('.');
            context.EmitKeyValue(key, "1"); //we are going to count instances, the value is irrelevant
        }
    }
}

Our reducer will simply count the inputs to it.

using System.Collections.Generic;
using System.Linq;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample.Job.Reduce
{
    public class HelloWorldReducer : ReducerCombinerBase
    {
        public override void Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context)
        {
            context.EmitKeyValue(key, values.Count().ToString()); //count instances of this key
        }
    }
}

Additionally, we’ll build a Job class that links the two together:

using Elastacloud.Hadoop.StreamingUnitExample.Job.Map;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Reduce;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample.Job
{
    public class HelloWorldJob : HadoopJob<HelloWorldMapper, HelloWorldReducer>
    {
        public override HadoopJobConfiguration Configure(ExecutorContext context)
        {
            return new HadoopJobConfiguration(); //here you would normally set up some input ;-)
        }
    }
}

Normally we might expect a little more work to be done in the Job class; it should define input and output locations, but for our demo this is not required.

Now we will use the Program class to define some simple input and execute the job with StreamingUnit.

using System;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Map;
using Elastacloud.Hadoop.StreamingUnitExample.Job.Reduce;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.StreamingUnitExample
{
    class Program
    {
        static void Main(string[] args)
        {
            var inputArray = new[]
            {
                "Hello, Andy",
                "Hello, andy",
                "Hello, why doesn't this work!",
                "Hello, Andy",
                "Hello, chickenface",
                "Hello, Andy"
            };

            var output = StreamingUnit.Execute<HelloWorldMapper, HelloWorldReducer>(inputArray);

            Console.WriteLine("Map");
            foreach (var mapperResult in output.MapperResult)
            {
                Console.WriteLine(mapperResult);
            }

            Console.WriteLine("Reduce");
            foreach (var reducerResult in output.ReducerResult)
            {
                Console.WriteLine(reducerResult);
            }

            Console.ReadLine();
        }
    }
}

The output for this shows first the values produced (sent to the context) by the Mapper and then by the Reducer. More importantly, the job can be stepped through by adding breakpoints and then running the executable.

The Program continues to list all Map output and then Reduce output by writing to the console.

Map
Andy 1
Andy 1
Andy 1
andy 1
chickenface 1
why doesn't this work! 1
Reduce
Andy 3
andy 1
chickenface 1
why doesn't this work! 1

Next time, my blog focusses on more advanced approaches to unit testing, before a final post on Mocking and Unit Testing.

Here’s the source: Elastacloud.Hadoop.StreamingUnitExample (note, packages had to be deleted)

Happy cloudy big data-ing ;-)

Andy

Windows Azure Conf

Hey all,

I'm back at the keyboard again after my session at http://www.windowsazureconf.net. It was a great experience; hats off to @BradyGaster for organising a great event. You can catch my talk right here:

I’ll be back this week with more technical posts.

Andy

 

Setting up a new blog

Hello everyone,

I'm setting up a new blog to hold some general thoughts outside of things related to Elastacloud. Elastacloud is doing well, and we decided to use that blog to talk about things more closely related to the activities of the company … but that left me with nowhere to post my own thoughts ;-)

I’m going to start adding some posts as soon as possible!

Best

Andy