Monthly Archives: December 2012


Bright HDInsight Part 5: Better status reporting in HDInsight

Following on from earlier posts in this series, in this post I will show another way of getting telemetry out of HDInsight. We have previously tackled counters and logging to the correct context; now I will show an additional type of logging that is useful while a job is running: reporting a job's status in process.

This status is a transient record of the state of a Map/Reduce job – it is not persisted beyond the execution of the job and is volatile. Its volatility is evident in that any subsequent set of this status overwrites the previous value. Consequently, an inspection of this value only reflects the state of the system at the moment it was inspected, and it may already have changed by the time the user gets access to the value it holds.

This does not make the status irrelevant; in fact, it gives a glimpse of the system state in process and allows the viewer to immediately determine a recent state. This is useful in its own right.

Controlling the Status

The status of a Map/Reduce job is reported at the Map, Combine and Reduce stages of execution, and each individual Task can report its own status.

This status control is not available in the Hadoop C# SDK, but it is simple to achieve using Console.Error – see Part 2 of this series for why this stream is used!

To stay in sync with the general approach taken by the C# Hadoop SDK, we will wrap the call in an extension method on ContextBase, the common base class for the Mapper, Combiner and Reducer contexts. With this new method – which we will call SetStatus(string status) – we write a well-known format to the output that Hadoop picks up through its Streaming API, and Hadoop then uses this as the status of the job.

The format of the string is:
reporter:status:Hello, World!

This sets the executing task's status to “Hello, World!”.

Implementing the Extension method

The extension method is very simple:

public static class ContextBaseExtension
{
    public static void SetStatus(this ContextBase context, string statusMessage)
    {
        // Hadoop Streaming watches stderr for this well-known format and
        // uses the message as the current status of the task.
        Console.Error.WriteLine("reporter:status:{0}", statusMessage);
    }
}

Once we have this method referenced and its namespace imported with a using directive in our map/reduce/combine class, we can use the context parameter of our task (in my case I chose a Map) and set a status!

I chose a trivial and unrealistic sleep statement so that I could see the message easily in the portal :-)

context.SetStatus("Sleeping During Map!");
Thread.Sleep(180000); // 3 minutes – long enough to see the status in the portal

Viewing the output

To view this, you can enter the Hadoop portal and check the state of the running task. In this screenshot you can see that the status is set to “Sleeping During Map!” – and you can see why I used Thread.Sleep: to give me a few moments to log onto the RDP server and take the screenshot. I chose 3 minutes so I could also have time to get a coffee …. ;-)

Production use

A better example, and a very useful case in practice, is to report whether a Task is running successfully or whether it is encountering errors that it has to recover from. To achieve this, we catch any exceptions that can be recovered from, report that we are doing so and then continue execution.

Notice that this is a very similar map to our previous example with Counters, but here we also set the status of our Task while it is in process.

using System;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.SampleDataMapReduceJob
{
    public class SampleMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            try
            {
                context.IncrementCounter("Line Processed");
                var segments = inputLine.Split("\t".ToCharArray(), StringSplitOptions.RemoveEmptyEntries);
                context.IncrementCounter("country", segments[2], 1);
                context.EmitKeyValue(segments[2], inputLine);
                context.IncrementCounter("Text chars processed", inputLine.Length);
            }
            catch (IndexOutOfRangeException ex)
            {
                // We still allow other exceptions to throw and set an error state on the task, but this
                // exception type we are confident is due to the input not having at least 3 tab-separated segments.
                context.IncrementCounter("Logged recoverable error", "Input Format Error", 1);
                context.Log(string.Format("Input Format Error on line {0} in {1} - {2} was {3}", inputLine, context.InputFilename,
                    context.InputPartitionId, ex.ToString()));
                context.SetStatus("Running with recoverable errors");
            }
        }
    }

    public static class ContextBaseExtension
    {
        public static void SetStatus(this ContextBase context, string statusMessage)
        {
            Console.Error.WriteLine("reporter:status:{0}", statusMessage);
        }
    }
}

Happy big-data-cloudarama ;-)
Andy


Bright HDInsight Part 4: Blob Storage in Azure Storage Vault (examples in C#)

When using an HDInsight cluster, a key concern is sourcing the data on which you will run your Map/Reduce jobs. The time taken to transfer the data for your job into your Mapper or Reducer context must be as low as possible; the quality of a source can be ranked by this load wait – the data latency before a Map/Reduce job can commence.

Typically, you store the data for your jobs within the Hadoop Distributed File System, HDFS. With HDFS, you are limited by the size of the storage attached to your HDInsight nodes. For instance, in the HadoopOnAzure.com preview, you are limited to 1.5 TB of data. Alternatively, you can use the Azure Storage Vault and access up to 150 TB in an Azure Storage account.

Azure Storage Vault

The Azure Storage Vault is a storage location, backed by Windows Azure Blob Storage, that can be addressed and accessed by Hadoop’s native processes. This is achieved by specifying a protocol scheme in the URI of the assets you are trying to access: ASV://. This ASV:// scheme is analogous to other storage access schemes such as file://, hdfs:// and s3://.

With ASV, you configure access to a given account and key in the HDInsight portal Manage section.

Once in this section, simply enter the Account Name and Key and click Save.

Using ASV

Once you have configured the Account Name and Key, you can use the storage provided by ASV by addressing it with the ASV:// scheme. The format of these URIs is:

ASV://container/path/to/blob.ext

As you can see, the ASV:// scheme is hardwired to the configured account name (and key), so you don’t have to specify the account or authentication to access it. It is a shortcut to http(s)://myaccount.blob.core.windows.net. Furthermore, since it encapsulates security, you don’t need to worry about access to these blobs.
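
For example, if the configured account were named myaccount (the container and blob names below are purely illustrative), the following two addresses would refer to the same blob:

ASV://mycontainer/data/input.txt
https://myaccount.blob.core.windows.net/mycontainer/data/input.txt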

Benefits of ASV

There are many benefits of ASV, from exceptionally low storage costs (lower than S3 at the time of writing) to the ability to seamlessly provide geo-located redundancy on the files for added resilience. For me, as a user of the time-limited (at the time of writing, again!) HadoopOnAzure.com clusters, a really big benefit is that I don’t lose the files when the cluster is released, as I would if they were stored on HDFS. Additional benefits include the ability to read and write to ASV and access those files immediately off the cluster in a variety of tools that I have come to know very well over the past few years, such as Cerebrata’s Cloud Storage Studio.

How to use ASV

It is exceptionally easy to configure your C# Map/Reduce tasks to use ASV, due to the way it has been designed. The approach is also equivalent to, and compatible with, any other Streaming, native or ancillary job creation technique.

To use ASV in a C# application, first configure ASV in the HDInsight portal as above, and then configure the HadoopJob to access that resource via the InputPath and OutputFolder locations.

public override HadoopJobConfiguration Configure(ExecutorContext context)
{
    try
    {
        HadoopJobConfiguration config = new HadoopJobConfiguration();
        config.InputPath = "asv://container/inputpath/";
        config.OutputFolder = "asv://container/outputpath" + DateTime.Now.ToString("yyyyMMddhhmmss");
        return config;
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
    return null;
}

As you can see, this configuration commands Hadoop to load from the configured ASV:// storage account container “container”, find any files in the /inputpath/ folder location and include them all as input files. You can also specify an individual file.
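
For instance, to point the job at a single file rather than a folder, the InputPath can name the blob directly (the file name here is purely illustrative):

config.InputPath = "asv://container/inputpath/input.txt";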

Similarly, the OutputFolder is specified as the location to which the HDInsight job should write the output of the Map/Reduce job.

As a nice additional benefit, using ASV adds counters for the number of bytes read and written by the Hadoop system through ASV, allowing you to track your usage of the data in your Storage Account.

All very simple stuff, but amazingly powerful.

Happy big-data-cloudification ;-)

Andy


Bright HDInsight Part 3: Using custom Counters in HDInsight

Telemetry is life! I repeat this mantra over and over; with any system, but especially with remote systems, the state of the system is difficult or impossible to ascertain without metrics on its internal processing. Computer systems operate outside the scope of human comprehension – they’re too quick, complex and transient for us to ever confidently know their state in process. The best we can do is emit metrics and provide a way to view those metrics to judge a general sense of progress.

Metrics in HDInsight

The metrics I will present here relate to HDInsight and the Hadoop Streaming API, with examples in C#. It is possible to access the same counters from other programmatic interfaces to HDInsight, as they are a core Hadoop feature.

These metrics shouldn’t be used for data gathering, as that is not their purpose. You should use them to track system state, not system results. However, it is a thin line ;-) For instance, if we know there are 100 million rows of data pertaining to “France” and 100 million rows pertaining to “UK”, and these are spread across multiple files and partitions, then we might want a metric which reports the progress across these two data aspects. In practice, however, this type of scenario (progress through a job) is better measured without resorting to measuring the data directly.

Typically we also want the ability to group similar metrics together in a category for easier reporting, and I shall show an example of that.

Scenario

The example data used here is randomly generated strings with a data identifier, an action type and a country of origin. This slim, three-field file will be mapped to select the country of origin as the key, and reduced to count everything by country of origin.

823708 rz=q UK
806439 rz=q UK
473709 sf=21 France
713282 wt.p=n UK
356149 sf=1 UK
595722 wt.p=n France
238589 sf=1 France
478163 sf=21 France
971029 rz=q France
……10000 rows…..

Mapper

This example shows how to add the counters to the Map class of the Map/Reduce job.

using System;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.SampleDataMapReduceJob
{
    public class SampleMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            try
            {
                context.IncrementCounter("Line Processed");
                var segments = inputLine.Split("\t".ToCharArray(), StringSplitOptions.RemoveEmptyEntries);
                context.IncrementCounter("country", segments[2], 1);
                context.EmitKeyValue(segments[2], inputLine);
                context.IncrementCounter("Text chars processed", inputLine.Length);
            }
            catch (IndexOutOfRangeException ex)
            {
                // We still allow other exceptions to throw and set an error state on the task, but this
                // exception type we are confident is due to the input not having at least 3 tab-separated segments.
                context.IncrementCounter("Logged recoverable error", "Input Format Error", 1);
                context.Log(string.Format("Input Format Error on line {0} in {1} - {2} was {3}", inputLine, context.InputFilename,
                    context.InputPartitionId, ex.ToString()));
            }
        }
    }
}

Here we can see that we are using the parameter “context” to interact with the underlying Hadoop runtime. context.IncrementCounter is the key operation we are calling, which uses the underlying stderr output to write in the format:
“reporter:counter:{0},{1},{2}”, category, counterName, increment

We are using this method’s overloads in different ways: to increment a counter simply by name, to increment a counter by name and category, and to increment by a custom amount.
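
If you ever need to emit a counter from code that has no context to hand, you could write the same reporter line to stderr yourself – a minimal sketch using the format quoted above, not the SDK’s own implementation:

// Writes "reporter:counter:country,UK,1" to stderr; Hadoop Streaming picks this
// up and increments the "UK" counter in the "country" category.
Console.Error.WriteLine("reporter:counter:{0},{1},{2}", "country", "UK", 1);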

We are free to add these counters at any point in the map/reduce program so that we can gain telemetry of our job as it progresses.
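
For example, a reducer for the scenario described earlier could count the lines per country while recording its own progress counter – a minimal sketch, assuming the SDK’s ReducerCombinerBase and ReducerCombinerContext types:

using System.Collections.Generic;
using System.Linq;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.SampleDataMapReduceJob
{
    public class SampleReducer : ReducerCombinerBase
    {
        public override void Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context)
        {
            // One call per country key; the counter gives a running view of how many
            // countries have been reduced so far.
            context.IncrementCounter("Countries reduced");

            // Emit the country and the number of lines the mapper produced for it.
            context.EmitKeyValue(key, values.Count().ToString());
        }
    }
}

Because the counter is emitted through the same context mechanism, it appears alongside the mapper’s counters in the job’s counters table.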

Viewing the counters

In order to view the counters, visit the Hadoop Job Tracking portal. The Hadoop console output will contain the details for your Streaming job’s job id; for example, for me it was http://10.174.120.28:50030/jobdetails.jsp?jobid=job_201212091755_0001, reported in the output as:

12/12/09 18:01:37 INFO streaming.StreamJob: Tracking URL: http://10.174.120.28:50030/jobdetails.jsp?jobid=job_201212091755_0001

Since I used two counters that were not given a category, they appear in the “Custom” category.

Another of my counters was given a custom category, and thus it appears in a separate section of the counters table.

In my next posts, I will focus on error handling, status and more data-centric operations.

Happy big-data-cloudifying ;-)
Andy