
Bright HDInsight Part 6: Take advantage of your console context

The Microsoft.Hadoop.MapReduce SDK for HDInsight (Hadoop on Azure) requires you to package your Map/Reduce job in a Windows console application, which is then used by the Hadoop Streaming API to run your logic. This gives a very natural way to configure and orient your application at startup, and this post gives a simple example.

Console Arguments

The natural requirement to explore is the supply of runtime arguments to the Map/Reduce console application (hereafter, the package), which can be used for configuration purposes. We may use them to manually specify additional generic parameters, input or output locations, or to control the type of job being run. Consider the input location: we may want to write a single Map/Reduce job, but then be able to run that job against different datasets, differentiated by their storage location.

A very natural way to do this, for anyone familiar with a command shell, is to run:

mypackage.exe /path/to/input.txt

This specifies a path to an input location on which the application can work. To achieve this with the SDK, once we have parsed and verified the input, we include the arguments we are interested in as the parameter to the HadoopJobExecutor.ExecuteJob(string[] args) method.
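
As a rough sketch of the entry point (the job class name SampleJob is hypothetical, and this assumes the generic ExecuteJob&lt;TJob&gt;(string[] arguments) overload of HadoopJobExecutor), the console application might look like this:

using System;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.SampleDataMapReduceJob
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Validate the arguments before handing them to the job; here we simply
            // require an input location as the first argument.
            if (args.Length < 1 || string.IsNullOrWhiteSpace(args[0]))
            {
                Console.WriteLine("Usage: mypackage.exe <inputPath>");
                return;
            }

            // The same string[] is surfaced later through ExecutorContext.Arguments
            // in the job's Configure method (SampleJob is a hypothetical HadoopJob subclass).
            HadoopJobExecutor.ExecuteJob<SampleJob>(args);
        }
    }
}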

Once we have done that, in our TJobClass (a subclass of HadoopJob), the Configure method is provided with an ExecutorContext instance. This ExecutorContext has a single property, "Arguments".

This property is the string[] parameter that we passed into the ExecuteJob method. We can then use it to configure the job during setup:

public override HadoopJobConfiguration Configure(ExecutorContext context)
{
    var hadoopConfiguration = new HadoopJobConfiguration();
    hadoopConfiguration.InputPath = context.Arguments[0];
    hadoopConfiguration.OutputFolder = "asv://output/" + DateTime.Now.Ticks.ToString(CultureInfo.InvariantCulture);

    return hadoopConfiguration;
}

Conclusion

Use your console context well. Further examples of what you could do: capture the output of the package host to a text file with > or >> commands; pipe the results to another program using |; chain execution together with a batch file; schedule execution with the Windows Scheduler.
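
For example (the file names here are purely illustrative), all of the following are possible without any change to the package itself:

mypackage.exe /path/to/input.txt > lastrun.log
mypackage.exe /path/to/input.txt | findstr "France"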

Happy cloudy-big-dataification (and happy new year),
Andy


Bright HDInsight Part 4: Blob Storage in Azure Storage Vault (examples in C#)

When using a HDInsight cluster, a key concern is sourcing the data on which you will run your Map/Reduce jobs. The load time to transfer the data for your Job into your Mapper or Reducer context must be as low as possible; the quality of a source can be ranked by this load wait – the data latency before a Map/Reduce job can commence.

Typically, you store the data for your jobs within the Hadoop Distributed File System, HDFS. With HDFS, you are limited by the size of the storage attached to your HDInsight nodes; for instance, in the HadoopOnAzure.com preview you are limited to 1.5 TB of data. Alternatively, you can use the Azure Storage Vault and access up to 150 TB in an Azure Storage account.

Azure Storage Vault

The Azure Storage Vault is a storage location, backed by Windows Azure Blob Storage, that can be addressed and accessed by Hadoop's native processes. This is achieved by specifying a protocol scheme for the URI of the assets you are trying to access: ASV://. This scheme is analogous to other storage access schemes such as file://, hdfs:// and s3://.

With ASV, you configure access to a given account and key in the HDInsight portal Manage section.

Once in this section, you simply enter the Account Name and Key and click Save.

Using ASV

Once you have configured the Account Name and Key, you can use the storage provided by ASV by addressing it with the ASV:// scheme. The format of these URIs is:

ASV://container/path/to/blob.ext

As you can see, ASV:// is hardwired to the configured account name (and key), so you don't have to specify the account or any authentication to access it. It is a shortcut for http(s)://myaccount.blob.core.windows.net. Furthermore, since it encapsulates security, you don't need to worry about managing access to these blobs.
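
For example, with the account configured in the portal as above (using "myaccount" purely as an illustrative account name), the following two addresses refer to the same blob:

ASV://container/path/to/blob.ext
https://myaccount.blob.core.windows.net/container/path/to/blob.ext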

Benefits of ASV

There are many benefits to ASV, from exceptionally low storage costs (lower than S3 at the time of writing) to the ability to seamlessly provide geo-located redundancy on the files for added resilience. For me, as a user of the time-limited (again, at the time of writing!) HadoopOnAzure.com clusters, a really big benefit is that I don't lose the files when the cluster is released, as I would if they were stored on HDFS. Additional benefits include the ability to read and write to ASV and then access those files immediately off the cluster in a variety of tools I have got to know very well over the past few years, such as Cerebrata's Cloud Storage Studio.

How to use ASV

It is exceptionally easy to configure your C# Map/Reduce tasks to use ASV, due to the way it has been designed. The approach is also equivalent to, and compatible with, any other Streaming, native or ancillary job creation technique.

To use ASV in a C# application, first configure ASV in the HDInsight portal as above, and then configure the HadoopJob to use that resource as the InputPath or OutputFolder location.

public override HadoopJobConfiguration Configure(ExecutorContext context)
{
    try
    {
        HadoopJobConfiguration config = new HadoopJobConfiguration();
        config.InputPath = "asv://container/inputpath/";
        config.OutputFolder = "asv://container/outputpath" + DateTime.Now.ToString("yyyyMMddhhmmss");
        return config;
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
    return null;
}

As you can see, this configuration instructs Hadoop to load from the container "container" in the configured ASV:// storage account, find any files in the /inputpath/ folder and include them all as input files. You can also specify an individual file, as shown below.
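
For instance, a single blob can be targeted by extending the path down to the file itself (the file name here is purely illustrative):

config.InputPath = "asv://container/inputpath/2012-12-09.log";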

Similarly, the OutputFolder is specified as the location to which the HDInsight job should write the output of the Map/Reduce job.

As a nice additional benefit, using ASV adds counters for the number of bytes read and written by the Hadoop system, allowing you to track your usage of the data in your Storage Account.

All very simple stuff, but amazingly powerful.

Happy big-data-cloudification ;-)

Andy


Bright HDInsight Part 3: Using custom Counters in HDInsight

Telemetry is life! I repeat this mantra over and over; with any system, but especially with remote systems, the state of the system is difficult or impossible to ascertain without metrics on its internal processing. Computer systems operate outside the scope of human comprehension – they're too quick, complex and transient for us to ever confidently know their state in process. The best we can do is emit metrics and provide a way to view them to judge a general sense of progress.

Metrics in HDInsight

The metrics I will present here relate to HDInsight and the Hadoop Streaming API, presented in C#. It is possible to access the same counters from other programmatic interfaces to HDInsight as they are a core Hadoop feature.

These metrics shouldn't be used for data gathering, as that is not their purpose; use them to track system state, not system result. The line is a fine one, however ;-) For instance, if we know there are 100 million rows of data pertaining to "France" and 100 million rows pertaining to "UK", spread across multiple files and partitions, then we might want a metric that reports progress across these two data aspects. In practice, however, this type of scenario (progress through a job) is better measured without resorting to measuring the data directly.

Typically we also want the ability to group similar metrics together in a category for easier reporting, and I shall show an example of that.

Scenario

The example data used here is randomly generated strings with a data identifier, an action type and a country of origin. This slim three-field file will be mapped to select the country of origin as the key, and reduced to count everything by country of origin.

823708 rz=q UK
806439 rz=q UK
473709 sf=21 France
713282 wt.p=n UK
356149 sf=1 UK
595722 wt.p=n France
238589 sf=1 France
478163 sf=21 France
971029 rz=q France
……10000 rows…..

Mapper

This example shows how to add the counters to the Map class of the Map/Reduce job.

using System;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.SampleDataMapReduceJob
{
    public class SampleMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            try
            {
                context.IncrementCounter("Line Processed");
                var segments = inputLine.Split("\t".ToCharArray(), StringSplitOptions.RemoveEmptyEntries);
                context.IncrementCounter("country", segments[2], 1);
                context.EmitKeyValue(segments[2], inputLine);
                context.IncrementCounter("Text chars processed", inputLine.Length);
            }
            catch (IndexOutOfRangeException ex)
            {
                // We still allow other exceptions to throw and set an error state on the task, but this
                // exception type we are confident is due to the input not having at least 3 separated segments.
                context.IncrementCounter("Logged recoverable error", "Input Format Error", 1);
                context.Log(string.Format("Input Format Error on line {0} in {1} - {2} was {3}", inputLine, context.InputFilename,
                    context.InputPartitionId, ex.ToString()));
            }
        }
    }
}

Here we can see that we are using the "context" parameter to interact with the underlying Hadoop runtime. context.IncrementCounter is the key operation we are calling; under the covers it uses stderr to write out in the format:
"reporter:counter:{0},{1},{2}", category, counterName, increment
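
As an illustration of what the SDK is doing for us on each call (a sketch only – you do not need this line in an SDK-based job), the equivalent raw emission from a streaming task would be a line written directly to standard error:

// Hadoop Streaming scans stderr for reporter directives; this is the line that
// context.IncrementCounter("country", "UK", 1) produces on our behalf.
Console.Error.WriteLine("reporter:counter:{0},{1},{2}", "country", "UK", 1);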

We are using this method's overloads in different ways: to increment a counter simply by name, to increment a counter by name and category, and to increment with a custom increment value.

We are free to add these counters at any point in the Map/Reduce program in order to gain telemetry on the job as it progresses.
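
As an illustration, here is a sketch of a reduce side for the scenario above that counts rows per country and reports a counter of its own (the class is hypothetical, written against the SDK's ReducerCombinerBase):

using System.Collections.Generic;
using System.Linq;
using Microsoft.Hadoop.MapReduce;

namespace Elastacloud.Hadoop.SampleDataMapReduceJob
{
    public class SampleReducer : ReducerCombinerBase
    {
        public override void Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context)
        {
            // Count the rows the mapper emitted for this country and report
            // reduce-side progress as a counter in its own category.
            var count = values.Count();
            context.IncrementCounter("country reduced", key, 1);
            context.EmitKeyValue(key, count.ToString());
        }
    }
}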

Viewing the counters

In order to view the counters, visit the Hadoop Job Tracking portal. The Hadoop console output will contain the details for your Streaming job's job id; for example, for me it was http://10.174.120.28:50030/jobdetails.jsp?jobid=job_201212091755_0001, reported in the output as:

12/12/09 18:01:37 INFO streaming.StreamJob: Tracking URL: http://10.174.120.28:50030/jobdetails.jsp?jobid=job_201212091755_0001

Since I used two counters that were not given a category, they appear in the "Custom" category.

Another of my counters was given a custom category, and thus it appears in a separate section of the counters table.

In my next posts, I will focus on error handling, status and more data centric operations.

Happy big-data-cloudifying ;-)
Andy

Bright HDInsight Part 2: Smart compressed output

Following on from Bright HDInsight Part 1, which dealt with consuming a compressed input stream in a Map/Reduce job, we’ll now see how to extend this scenario to emit compressed data from the output of your Map/Reduce job.

Scenario

Again, the scenario is very much one of reducing overhead on the management, security and storage of your data. If you are to leave your resulting work at rest in a remote system, you should reduce its footprint as much as possible.

Reiteration of tradeoff

Remember that you are shifting from an IO-bound to a compute-bound problem – compression requires inflation prior to using the data. You should run metrics on this to see whether you're truly saving what you think you are.

Command Line

Again, this is achieved by using an argument on the command line:

mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec

Using the C# SDK for HDInsight

To do this, in your Configure method, simply add the additional generic arguments as below:

config.AdditionalGenericArguments.Add("-D \"mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec\"");
config.AdditionalGenericArguments.Add("-D \"mapred.output.compress=true\"");

UPDATE: Alternatively, you can set the "CompressOutput" property to true on the config object, and the SDK will take care of this for you.
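
In other words, the two AdditionalGenericArguments lines above could be replaced with something like this (a sketch, assuming the rest of the Configure method is unchanged):

// The SDK translates this into the mapred.output.compress / codec arguments for us.
config.CompressOutput = true;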

Happy clouding,

Andy

Bright HDInsight Part 1: Consume Compressed Input

This is the first of a series of quick tips on HDInsight (Hadoop on Azure), and deals with how to consume a compressed input stream.

The scenario

The HDInsight product, and Big Data solutions in general, by definition deal with large amounts of data. Data at rest incurs a cost, whether in terms of managing that data, securing it or simply storing it. Where possible, it is good practice to reduce the weight of the data in a lossless way, without compromising data quality.

The standard way to do this is by using compression on the static data, where common strings are deflated from the static file and indexed so that they can be later repositioned in an inflation stage.

The problem with this for HDInsight or Hadoop in general is that the stream becomes a binary stream which it cannot access directly.

Configuring a Hadoop Job to read a Compressed input file

In order to configure a Hadoop Job to read the Compressed input, you simply have to specify a flag on the job command line. That flag is:

-D "io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec"

This causes an additional map task to be undertaken which loads the input as a GzipStream before your Map/Reduce job begins. NOTE: this can be a time-consuming activity – if you're planning on loading this file many times for parsing, the efficiency saving will be limited.

A 2 GB gzipped example file of mine was inflated to 11 GB, taking 20 minutes.

Filename dependency

If you try this approach, you might find a problem where the input strings to your Map job still seem to be binary. This is a sign of the stream not being inflated by the infrastructure. There is one last thing you must ensure in order to trigger the inflation process: the filename needs to have the relevant extension. For example, to use Gzip, the filename must end with .gz, such as "mylogfile.log.gz".

Using the HDInsight C# Streaming SDK to control Input Compression

In order to use the C# Streaming SDK with this flag, simply modify the Job's Configure override to add an additional generic argument specifying the flag.

public override HadoopJobConfiguration Configure(ExecutorContext context)
{
    HadoopJobConfiguration config = new HadoopJobConfiguration();
    config.InputPath = "/input/data.log.gz";
    config.OutputFolder = "/output/output" + DateTime.Now.ToString("yyyyMMddhhmmss");
    config.AdditionalGenericArguments.Add("-D \"io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec\"");
    return config;
}
You will find an additional task specified in your JobTracker, which takes the stream and inflates it for your runtime code.

A better way

If you can control your input stream (i.e. it is not provided by a third party), you should look at a better compression algorithm than the one shown here, Gzip. A better approach is to use LZO, a splittable algorithm that allows you to better distribute your work. The Gzip algorithm must be processed sequentially, which makes it very hard to distribute the workload; LZO (which is configured by using com.hadoop.compression.lzo.LzoCodec) is splittable, allowing better workload distribution.
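
Following the same pattern as the Gzip flag above, a sketch of what that might look like from the C# SDK is below – note this assumes the LZO libraries have been installed on the cluster, which is not the case by default:

// LZO is not part of the stock Hadoop distribution; the codec must be installed
// on the cluster before this configuration is of any use.
config.AdditionalGenericArguments.Add("-D \"io.compression.codecs=com.hadoop.compression.lzo.LzoCodec\"");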

Happy Hadoopy Cloudy Times :-)
Andy

 

Windows Azure Conf

Hey all,

I'm back at the keyboard again after my session at http://www.windowsazureconf.net. It was a great experience; hats off to @BradyGaster for organising a great event. You can catch my talk right here:

I’ll be back this week with more technical posts.

Andy