AWS and Hadoop developer Tom White shows how to use Hadoop and Amazon Web Services together to process a large collection of web access logs.
Submitted By: Craig@AWS
AWS Products Used: Amazon EC2, Amazon S3
Language(s): Java
Created On: July 18, 2007 7:51 PM GMT
Last Updated: September 21, 2008 10:08 PM GMT
By Tom White
Managing large datasets is hard; running computations on large datasets is even harder. Once a dataset has exceeded the capacity of a single filesystem or a single machine, running data processing tasks requires specialist hardware and applications, or, if attempted on a network of commodity machines, it requires a lot of manual work to manage the process: splitting the dataset into manageable chunks, launching jobs, handling failures, and combining job output into a final result.
Apache's Hadoop project aims to solve these problems by providing a framework for running large data processing applications on clusters of commodity hardware. Combined with Amazon EC2 for running the application, and Amazon S3 for storing the data, we can run large jobs very economically. This paper describes how to use Amazon Web Services and Hadoop to run an ad hoc analysis on a large collection of web access logs that otherwise would have cost a prohibitive amount in either time or money.
Apache Hadoop, a sub-project of the well-known Lucene text search library, provides several components for building distributed applications. For this article we shall focus on the MapReduce component, which provides a simple but powerful programming model for writing parallel programs by defining how the processing can be split into small fragments of work.
The MapReduce concept (and name) comes from Google and is described in an excellent paper by Jeffrey Dean and Sanjay Ghemawat that is well worth reading. Google's MapReduce implementation, while extensively used inside the company, is not available for general use. A goal of the Hadoop project is to provide an open source implementation of MapReduce that anyone can run on their own cluster, or on rented hardware such as an Amazon EC2 cluster. While the Hadoop implementation is similar to that described in the Dean and Ghemawat paper, it is worth noting that there are differences in design and nomenclature.
For our example we are going to write a program that takes web server access log files (as produced by an Apache Web Server, for example) and counts the number of hits in each minute slot over a week. We will analyze months of logs and plot the distribution in order to get a view of how traffic volumes tend to vary over the course of a week. The beauty of this approach is that the same program will scale to months or years of massive logs, simply by increasing the cluster size.
The best way to think about MapReduce programs is to think about the input and output of each phase: the Map phase and the Reduce phase. Each phase has key-value pairs as input, and key-value pairs as output. The types and number of records of the input and output may be different, although the Map output types must be the same as the Reduce input types. Let's see how to choose the types for our MapReduce program.
The input to the Map phase is the access log files, and we use an input format that gives us key-value pairs whose keys are the byte offsets within the access log (which we ignore) and whose values are the corresponding lines. Our Map function takes a log line, pulls out the request timestamp field, converts it into a minute-in-week slot, then writes out a (minute-in-week slot, 1) key-value pair. In other words, we are mapping each line in the access log to its minute-in-week slot.
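As a JDK-only sketch of that Map logic (assuming Java 8 or later; the names `MinuteOfWeekMapper`, `map` and `minuteOfWeek` are hypothetical, and `java.time` stands in for the Joda Time library used in the article's own code), the per-line work looks like this. It is not wired into Hadoop; it only shows the line-to-slot computation, returning an empty result for lines that don't parse.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical, JDK-only sketch of the Map logic: log line -> minute-in-week slot. */
final class MinuteOfWeekMapper {

    // Common Log Format shape: host ident user [timestamp] "request" status bytes ...
    // Group 4 captures the timestamp between the square brackets.
    private static final Pattern LOG_LINE = Pattern.compile(
        "([^ ]*) ([^ ]*) ([^ ]*) \\[([^\\]]*)\\] \"([^\"]*)\" ([^ ]*) ([^ ]*).*");

    // e.g. 22/Aug/2005:22:07:52 +0000 (English month abbreviations, numeric offset)
    private static final DateTimeFormatter TIMESTAMP =
        DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    static final int MINUTES_PER_DAY = 24 * 60;              // 1,440
    static final int MINUTES_PER_WEEK = 7 * MINUTES_PER_DAY; // 10,080 slots

    /** Slot 0 is Monday 00:00 and slot 10079 is Sunday 23:59, in the logged offset. */
    static int minuteOfWeek(String timestamp) {
        OffsetDateTime t = OffsetDateTime.parse(timestamp, TIMESTAMP);
        int dayIndex = t.getDayOfWeek().getValue() - 1;  // Monday = 0 ... Sunday = 6
        int minuteOfDay = t.getHour() * 60 + t.getMinute();
        return dayIndex * MINUTES_PER_DAY + minuteOfDay;
    }

    /** Returns the slot for a well-formed line, or empty for a line that doesn't match. */
    static OptionalInt map(String line) {
        Matcher m = LOG_LINE.matcher(line);
        return m.matches() ? OptionalInt.of(minuteOfWeek(m.group(4))) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        String line = "192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] \"GET / HTTP/1.1\" 200 1722";
        System.out.println(map(line).getAsInt()); // prints 1327
    }
}
```

Because `OffsetDateTime` keeps the offset written in the log, each hit is bucketed by the web server's own clock rather than by the time zone of the machine running the job.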
The Reduce is given keys and, for each key, an iterator over all the values the maps produced for it. So all we have to do is sum the values as we iterate over them to produce (minute-in-week slot, total hit count) key-value pairs as the final output. Hadoop's MapReduce infrastructure is actually doing a lot behind the scenes to make this work, and to make it reliable in the face of hardware failure. It even sorts the input to each Reduce by key, so with the single Reduce we use here the output comes out in key order, which is exactly what we need to plot our graph. (We won't use Hadoop to plot our graph for us, but more on that later.)
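Hadoop does the grouping and sorting between the two phases for us. The toy, single-process model below (plain Java 9 or later, no Hadoop; the names `LocalMapReduce`, `run` and `countSlots` are hypothetical) sketches what that amounts to: Map output pairs are grouped by key in a sorted map, and the Reduce function is called once per key, in key order. The generic signature also captures the rule stated earlier that the Map output types must be the Reduce input types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Toy single-process model of Map, shuffle/sort and Reduce; not Hadoop's API. */
final class LocalMapReduce {

    // K2 and V2 appear in both the mapper and the reducer types: the Map's output
    // types must be the Reduce's input types.
    static <I, K2 extends Comparable<K2>, V2, V3> SortedMap<K2, V3> run(
            List<I> input,
            Function<I, List<Map.Entry<K2, V2>>> mapper,
            BiFunction<K2, List<V2>, V3> reducer) {
        // Shuffle and sort: collect every Map output value under its key, in key order.
        SortedMap<K2, List<V2>> groups = new TreeMap<>();
        for (I item : input) {
            for (Map.Entry<K2, V2> kv : mapper.apply(item)) {
                groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        // Reduce: one call per distinct key, visited in sorted key order.
        SortedMap<K2, V3> output = new TreeMap<>();
        groups.forEach((k, vs) -> output.put(k, reducer.apply(k, vs)));
        return output;
    }

    /** Hit counts per minute-in-week slot, given slots the Map logic already extracted. */
    static SortedMap<Integer, Long> countSlots(List<Integer> slots) {
        return run(slots,
            (Integer slot) -> List.of(Map.entry(slot, 1L)),              // emit (slot, 1)
            (Integer slot, List<Long> ones) ->                           // sum the 1s
                ones.stream().mapToLong(Long::longValue).sum());
    }

    public static void main(String[] args) {
        // The three-line example log below maps to slots 1327, 1327 and 1328.
        countSlots(List.of(1327, 1328, 1327))
            .forEach((slot, hits) -> System.out.println(slot + "\t" + hits));
        // prints "1327<TAB>2" and then "1328<TAB>1"
    }
}
```

Feeding the slots in a different order gives the same sorted result, which is the property the plotting step relies on.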
To illustrate the way this all works, take the following three-line access log:
```
192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] "GET / HTTP/1.1" 200 1722
192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] "GET /styles/layout.css HTTP/1.1" 200 2187
192.168.0.5 - - [22/Aug/2005:22:08:00 +0000] "GET /projects.html HTTP/1.1" 200 4024
```

Our Map takes each input line and produces the following output (22 August 2005 was a Monday, so 22:07 falls in minute 22 × 60 + 7 = 1327 of the week, counting from minute 0 at midnight at the start of Monday):

```
<0, 192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] "GET / HTTP/1.1" 200 1722> -> <1327, 1>
<71, 192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] "GET /styles/layout.css HTTP/1.1" 200 2187> -> <1327, 1>
<159, 192.168.0.5 - - [22/Aug/2005:22:08:00 +0000] "GET /projects.html HTTP/1.1" 200 4024> -> <1328, 1>
```

Our Reduce then adds up the 1 values emitted by the Map to produce totals:

```
<1327, (1, 1)> -> <1327, 2>
<1328, (1)> -> <1328, 1>
```

The Reduce output is written out as a tab-separated file:

```
1327	2
1328	1
```

It says there were two requests in minute-in-week slot 1327 and one in slot 1328, which is correct.

Let's translate the above ideas into Java code. First we create a class that will run our program:
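```java
import java.io.IOException;
import java.util.Iterator;
import java.util.regex.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.joda.time.*;
import org.joda.time.format.*;

public class AccessLogFileAnalyzer {
  // MapClass, ReduceClass and main() are nested here
}
```

It contains our Map and Reduce classes (as static nested classes) and a main method to launch the job. Here's the Map class: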
```java
public static class MapClass extends MapReduceBase implements Mapper {

  private final static LongWritable ONE = new LongWritable(1);

  // Common Log Format: host ident user [timestamp] "request" status bytes,
  // ignoring any extra (Combined Log Format) fields after the bytes field.
  // Group 4 captures the timestamp between the square brackets.
  private static Pattern p = Pattern
      .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^\\]]*)\\] \"([^\"]*)\""
          + " ([^ ]*) ([^ ]*).*");

  // English month names; keep the offset written in the log (e.g. +0000)
  // rather than converting to the JVM's default time zone.
  private static DateTimeFormatter formatter = DateTimeFormat
      .forPattern("dd/MMM/yyyy:HH:mm:ss Z")
      .withLocale(java.util.Locale.ENGLISH)
      .withOffsetParsed();

  // Reused for every record rather than allocating a new key per line.
  private IntWritable minute = new IntWritable();

  public void map(WritableComparable key, Writable value,
      OutputCollector output, Reporter reporter) throws IOException {
    // The key (the line's byte offset) is ignored.
    String line = ((Text) value).toString();
    Matcher matcher = p.matcher(line);
    if (matcher.matches()) { // lines that don't match are skipped
      String timestamp = matcher.group(4);
      minute.set(getMinuteBucket(timestamp));
      output.collect(minute, ONE);
    }
  }

  // Minute of the week: 0 is Monday 00:00, 10079 is Sunday 23:59.
  private int getMinuteBucket(String timestamp) {
    DateTime dt = formatter.parseDateTime(timestamp);
    return dt.getMinuteOfDay() + (dt.getDayOfWeek() - 1)
        * DateTimeConstants.MINUTES_PER_DAY;
  }
}
```

The interesting work is done by the map() method, which is specified in the org.apache.hadoop.mapred.Mapper interface. It takes the value parameter that it is passed and casts it to an org.apache.hadoop.io.Text type; we shall see how this is specified later. The Text object is a Hadoop framework class that can be serialized using Hadoop's serialization protocol and stores text in a UTF-8 encoding. We convert it to a regular Java String, then use a regular expression to extract the timestamp field from an Apache Common Log Format record. We call the utility method getMinuteBucket(), which uses the handy Joda Time library to convert the timestamp to an integer minute-in-week slot, then write our output to the org.apache.hadoop.mapred.OutputCollector. Notice we use an org.apache.hadoop.io.IntWritable to wrap the key, and an org.apache.hadoop.io.LongWritable to wrap the value, so that Hadoop can serialize them.
The Reduce code is much simpler:
```java
public static class ReduceClass extends MapReduceBase implements Reducer {

  public void reduce(WritableComparable key, Iterator values,
      OutputCollector output, Reporter reporter) throws IOException {
    long sum = 0; // long rather than int, so very large counts cannot overflow
    while (values.hasNext()) {
      sum += ((LongWritable) values.next()).get();
    }
    output.collect(key, new LongWritable(sum));
  }
}
```

We simply iterate through the values we are passed, which are the same type as the Map output values (LongWritable), and sum them. The key is also the same as the Map output key, an IntWritable, and we use it to emit the final key-value pair: the minute-in-week slot, and the total hit count for that slot.
Hadoop actually comes with a library of stock mappers and reducers, and in this case we could have used LongSumReducer, which does the same as our reducer, but it's useful to see how you can implement your own reducer.
The final piece of code is the main() method that runs the MapReduce job.
```java
public static void main(String[] args) throws IOException {
  if (args.length != 2) {
    System.err.println("Usage: AccessLogFileAnalyzer <input path> <output path>");
    System.exit(-1);
  }

  JobConf conf = new JobConf(AccessLogFileAnalyzer.class);

  // Input and output directories come from the command line.
  conf.setInputPath(new Path(args[0]));
  conf.setOutputPath(new Path(args[1]));

  // Types of the key-value pairs the Reducer emits.
  conf.setOutputKeyClass(IntWritable.class);
  conf.setOutputValueClass(LongWritable.class);

  conf.setMapperClass(MapClass.class);
  conf.setCombinerClass(ReduceClass.class);
  conf.setReducerClass(ReduceClass.class);

  conf.setNumReduceTasks(1);

  // Submit the job and block until it finishes.
  JobClient.runJob(conf);
}
```

An org.apache.hadoop.mapred.JobConf object holds the details of how to run a MapReduce job. First, we specify the input and output directory paths, which are set from the command line arguments. Next, we set the output key-value types, which are the types that the Reducer emits. We didn't set the input types, since the defaults (org.apache.hadoop.io.LongWritable for the byte offsets of the start of each line, and org.apache.hadoop.io.Text for the lines) are what we need. Also, the input format and output format (how the input files are turned into key-value pairs, and how the output key-value pairs are turned into output files) are not specified, since the default is to use text files (as opposed to a more compact binary format).
Having set the input and output configuration parameters, we specify the Map and Reduce classes to use. We also set the Combiner class, reusing ReduceClass. A Combiner is a Reduce function that runs in the same process as the Map task, on that task's output, before it is sent to the Reduce phase. Our reducer can safely double as the Combiner because its input and output types are the same and because addition is associative and commutative: summing partial sums gives the same totals as summing the original 1s. Using a Combiner is often a good idea because it can greatly reduce the amount of data that has to be sent across the network as input to the Reduce phase. For the current application the reduction in network traffic is stark: rather than serializing one key-value pair for every line in the input processed by the Map task (on the order of 10⁷ lines), the Map task emits, once the Combiner has run, at most one pair per distinct minute-in-week slot (on the order of 10⁴), a difference of three orders of magnitude.
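A toy, in-memory sketch (hypothetical class `CombinerEffect`, made-up slot data) makes the effect visible: each simulated Map task's (slot, 1) pairs are summed locally before the shuffle, so the number of pairs sent to the Reduce drops from one per line to one per distinct slot per task, while the final totals stay the same.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy, in-memory illustration of what a summing Combiner does to Map output. */
final class CombinerEffect {

    /** Sums one Map task's (slot, 1) pairs locally, as the Combiner would. */
    static Map<Integer, Long> combine(List<Integer> slotsFromOneMapTask) {
        Map<Integer, Long> partialSums = new TreeMap<>();
        for (int slot : slotsFromOneMapTask) {
            partialSums.merge(slot, 1L, Long::sum);
        }
        return partialSums;
    }

    /** Number of key-value pairs sent from the Map tasks to the Reduce. */
    static long shuffledPairs(List<List<Integer>> mapTasks, boolean useCombiner) {
        long pairs = 0;
        for (List<Integer> task : mapTasks) {
            // Without a Combiner every line yields a pair; with one, every distinct slot does.
            pairs += useCombiner ? combine(task).size() : task.size();
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Two hypothetical Map tasks, each holding the slots of the lines in its split.
        List<List<Integer>> mapTasks = List.of(
            List.of(1327, 1327, 1328, 1328, 1328),
            List.of(1327, 1329, 1329));

        // Adding up the partial sums gives the same totals as adding up the 1s,
        // because addition is associative and commutative.
        Map<Integer, Long> totals = new TreeMap<>();
        for (List<Integer> task : mapTasks) {
            combine(task).forEach((slot, n) -> totals.merge(slot, n, Long::sum));
        }

        System.out.println(shuffledPairs(mapTasks, false) + " pairs without a Combiner, "
            + shuffledPairs(mapTasks, true) + " with one; totals " + totals);
        // prints: 8 pairs without a Combiner, 4 with one; totals {1327=3, 1328=3, 1329=2}
    }
}
```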
The last job configuration setting is the number of Reduce tasks. Selecting good numbers of maps and reduces is something of an art, but for this application one Reduce task works well, primarily because the Combine phase shrinks the Reduce input so dramatically. This is not typical, however: it is common to have a number of Reduces, which produce a corresponding number of output files. The outputs may need to be combined into a single output as a final post-processing step, or they may be input to a further MapReduce job.
The final line of the program does all the work of submitting the job and waiting for it to finish.
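When a job does use several Reduces, each key has to be routed to exactly one of them. The sketch below (hypothetical class `PartitionSketch`; plain Java, no Hadoop) uses the same formula as Hadoop's default `HashPartitioner`, `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. An `Integer` stands in for the `IntWritable` key, whose hash code is likewise its int value.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of hash partitioning Map output keys across several Reduce tasks. */
final class PartitionSketch {

    /** Same formula as Hadoop's default HashPartitioner. */
    static int partition(Object key, int numReduceTasks) {
        // Masking the sign bit keeps the result non-negative for negative hash codes.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 4;
        List<List<Integer>> outputs = new ArrayList<>();
        for (int r = 0; r < numReduceTasks; r++) {
            outputs.add(new ArrayList<>());
        }
        for (int slot : new int[] {0, 1, 2, 3, 1327, 1328, 10079}) {
            outputs.get(partition(slot, numReduceTasks)).add(slot);
        }
        // Each list stands for one Reduce's output file. In Hadoop each file is sorted
        // by key (here the slots were simply added in ascending order), but the slots
        // are interleaved across files, so they must be merged to plot a single week.
        for (int r = 0; r < numReduceTasks; r++) {
            System.out.printf("part-%05d: %s%n", r, outputs.get(r));
        }
    }
}
```

With `setNumReduceTasks(1)`, as in our job, every key lands in partition 0, which is why we get a single, fully sorted part-00000 file.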
Before letting our new program loose on a large cluster of machines, it is a good idea to check that it works on a small amount of test data. This is easy to do from your IDE. All you need to do is put the Hadoop jar (and its dependencies) on the classpath, along with your MapReduce job (and its dependencies, Joda Time in this case). When you run the main method of the job it will use a local job runner that runs Hadoop in the same JVM, which allows you to run a debugger, should you need to.
We run AccessLogFileAnalyzer with two command line arguments: the first to specify the input directory containing a few small access log files, and the second to specify the (initially non-existent) output directory. After the job completes there is a file called part-00000 in the output directory which contains the output key-value pairs for the weekly distribution:
```
0	6
1	15
2	30
3	22
4	20
5	25
...
10075	6
10076	10
10077	11
10078	4
10079	4
```

This looks good. So let's try analyzing a big dataset.
So far we have only focused on the MapReduce programming model, without much regard for where the data comes from. Hadoop is designed to process large datasets, typically larger than can be stored on a single machine, and its MapReduce implementation is tuned to process data that is stored on the Hadoop Distributed File System (HDFS). HDFS is another component of Hadoop and was inspired by the Google File System, described in a paper by Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung.
We have several options for getting the data into HDFS for processing, including:
1. Write the data to HDFS as it is created.
2. Copy the data from the machines it was created on to HDFS.
3. Copy the data from the machines it was created on to Amazon S3, then to HDFS.

Note that files stored on S3 this way are not subject to S3's usual 5GB object-size limit, since Hadoop provides a special S3 filesystem that breaks files into blocks, so we can store arbitrarily large files on S3.
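As a schematic only (this is not Hadoop's S3 filesystem code, and the class name `BlockSplit` and the 64 MiB block size are assumptions for illustration), here is the idea of storing one large file as a sequence of fixed-size blocks, each small enough to be an ordinary S3 object:

```java
import java.util.ArrayList;
import java.util.List;

/** Schematic of storing one large file as many fixed-size blocks. */
final class BlockSplit {

    /** Returns [start, end) byte ranges; every block except the last is full-sized. */
    static List<long[]> blocks(long fileSize, long blockSize) {
        List<long[]> ranges = new ArrayList<>();
        for (long start = 0; start < fileSize; start += blockSize) {
            ranges.add(new long[] {start, Math.min(start + blockSize, fileSize)});
        }
        return ranges;
    }

    public static void main(String[] args) {
        long gib = 1L << 30, mib = 1L << 20;
        // A 12 GiB file exceeds the 5GB single-object limit, but every 64 MiB block fits.
        List<long[]> ranges = blocks(12 * gib, 64 * mib);
        System.out.println(ranges.size() + " blocks"); // prints 192 blocks
    }
}
```

Reading the file back means fetching the blocks in order and concatenating them.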
We'll illustrate option 3 for the purposes of this article. While a full discussion of the pros and cons of each option is beyond the scope of this article, it's worth mentioning that S3 scores well as a persistent store for the log files (it can double as a backup), and it is easy enough to copy them from the web server after they have been rotated at the end of the day. Transfers between S3 and EC2 are free, so we can launch our EC2 Hadoop cluster, transfer the files to it, run our job, then shut down the cluster. This means we only pay for the EC2 instances for as long as the job runs. For very large datasets the time taken to transfer the data from S3 may become prohibitive, in which case it is worth considering storing the data in HDFS on a long-running EC2 cluster.
Hadoop comes with tools to move files between different filesystems. To copy files from a locally mounted filesystem you would install Hadoop locally, then run a command like:
```
bin/hadoop fs -put /path/to/source /path/to/target
```

The target path is actually a URI. To copy files to S3 we use the s3 URI scheme, so we would do something like:

```
bin/hadoop fs -put /path/to/source s3://<ACCESS_KEY_ID>:<SECRET_ACCESS_KEY>@<BUCKET>/path/to/target
```

where <ACCESS_KEY_ID> and <SECRET_ACCESS_KEY> are your Amazon S3 credentials and <BUCKET> is your S3 bucket. (Note that since the secret access key can contain slashes, you must remember to escape them by replacing each slash / with the string %2F.)
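Assembling that URI by hand is easy to get wrong, so here is a small hedged sketch (the class `S3Uri` and its methods are hypothetical helpers, not part of Hadoop) that applies the slash-escaping rule. It performs only the replacement of `/` with `%2F` described above.

```java
/** Hypothetical helper that assembles an s3:// URI with inline credentials. */
final class S3Uri {

    /** Escapes every '/' in the secret key as %2F so it is not read as a path separator. */
    static String escapeSecret(String secretAccessKey) {
        return secretAccessKey.replace("/", "%2F");
    }

    static String build(String accessKeyId, String secretAccessKey,
                        String bucket, String path) {
        return "s3://" + accessKeyId + ":" + escapeSecret(secretAccessKey)
            + "@" + bucket + path;
    }

    public static void main(String[] args) {
        // Dummy values for illustration only.
        System.out.println(build("MYACCESSKEYID", "abc/def/ghi", "my-bucket", "/logs"));
        // prints s3://MYACCESSKEYID:abc%2Fdef%2Fghi@my-bucket/logs
    }
}
```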
How do we get our MapReduce application to the cluster? Simple: we package it in a jar along with its dependencies. It's a bit like a WAR file, except dependent jars go in a lib subdirectory, rather than in WEB-INF/lib. Here's the relevant Ant task for our application:
Hadoop is packaged as a public EC2 image (an AMI) so it is easy for us to get up and running with a cluster. If the version of Hadoop you want to run is not available - for example, if you want to run a patched version - then it is easy to build your own.
Here we'll just use a stock Hadoop AMI. We can find which versions of Hadoop are available as AMIs by using the Amazon EC2 tools. (Version 0.13.0 was used for this article.)
```
ec2-describe-images -a | grep hadoop-ec2-images
```

While it is possible to use the EC2 tools to launch Hadoop instances, Hadoop comes with a set of scripts that make the job of launching a cluster much easier. The scripts come with the standard Hadoop distribution, so start by downloading and unpacking the latest version of Hadoop on your local workstation. (The latest nightly build is recommended, since it fixes a bug in the scripts that, at the time of writing, is not in an official released version.) Then edit the EC2 configuration in src/contrib/ec2/bin/hadoop-ec2-env.sh to specify your Amazon Web Services settings, the Hadoop version to run on the cluster (which does not have to match the version of the distribution we unpacked on our workstation), the hostname for the master, and the size of the cluster.
The hostname you select should be one you have control over, as you will be asked to set it to point to a particular IP address during launch. Free services such as DynDNS make this very easy.
How big should the cluster be? It depends on the number of maps and reduces, but in most cases make it as big as you can. By default EC2 users are limited to 20 instances, so this is a natural starting point.
With the configuration out of the way (and see the Running Hadoop on EC2 wiki page if you need more pointers) we're ready to go. Here's what to type:
```
bin/hadoop-ec2 run
```

This command does the following:

1. Starts a cluster of Hadoop nodes
2. Prompts you to set up DNS with the given IP address
3. Formats the HDFS filesystem on the cluster
4. Starts the Hadoop daemons on the cluster
5. Logs you into the master node

It's also possible to run these steps one at a time, which can be useful for debugging. Type bin/hadoop-ec2 for usage instructions.
The next step is to copy our data from S3 to HDFS on our pristine cluster. First create a logs directory in HDFS, then do the copy using the distcp tool that comes with Hadoop by running the following on the master node:
```
cd /usr/local/hadoop-<version>
bin/hadoop fs -mkdir logs
bin/hadoop distcp s3://<ACCESS_KEY_ID>:<SECRET_ACCESS_KEY>@<BUCKET>/path/to/logs logs
```

We also need to copy our job jar to the master node, by running the following on our workstation:

```
. bin/hadoop-ec2-env.sh
scp $SSH_OPTS /path/to/aws-job.jar root@$MASTER_HOST:
```

We are ready to run the job at last. From the master node:

```
bin/hadoop jar ~/aws-job.jar logs out
```

The command line output will periodically report progress. However, it is worth using the web interface to monitor the job, since it gives more information and allows you to drill down into the log files of the tasks running on the various nodes, which can be invaluable in the event of failures. The interface is available at http://<MASTER_HOST>:50030/.
The final summary output is shown in the table below. The system maintains counts of input and output records and bytes; in this case the job processed just under 100GB of data (six weeks of logs) in about 35 minutes. (It should go without saying that this is a great deal faster than processing on a single machine would have been.)

It took about 5 minutes to transfer the data (which was compressed; Hadoop can read compressed input data out of the box) from S3 to HDFS, so the whole job took less than an hour. At $0.10 per instance-hour, this works out at only $2 (20 instance-hours) for the whole job, plus S3 storage and external transfer costs (remember that transfers between EC2 and S3 are free).
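The $2 figure is simple arithmetic. Assuming, as the total implies, a 20-instance cluster (the default EC2 instance limit mentioned earlier) with each instance billed for one full hour, since partial instance-hours were charged as full hours:

$$20\ \text{instances} \times 1\ \text{hour} \times \$0.10\ \text{per instance-hour} = \$2.00$$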
| Counter (Map-Reduce Framework) | Map | Reduce | Total |
|---|---:|---:|---:|
| Map input records | 449,662,417 | 0 | 449,662,417 |
| Map output records | 449,661,579 | 0 | 449,661,579 |
| Map input bytes | 105,793,389,172 | 0 | 105,793,389,172 |
| Map output bytes | 5,395,938,948 | 0 | 5,395,938,948 |
| Combine input records | 449,661,579 | 0 | 449,661,579 |
| Combine output records | 60,730 | 0 | 60,730 |
| Reduce input groups | 0 | 10,080 | 10,080 |
| Reduce input records | 0 | 60,730 | 60,730 |
| Reduce output records | 0 | 10,080 | 10,080 |
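A little arithmetic on the counters confirms the program behaved as intended. This sketch (hypothetical class name; the constants are copied from the table above) does just that arithmetic. The 12-byte figure is consistent with each Map output record being a 4-byte IntWritable key plus an 8-byte LongWritable value.

```java
/** Cross-checks a few figures from the job's counters (values copied from the table). */
final class CounterChecks {

    static final long MAP_INPUT_RECORDS = 449_662_417L;
    static final long MAP_OUTPUT_RECORDS = 449_661_579L;
    static final long MAP_OUTPUT_BYTES = 5_395_938_948L;
    static final long COMBINE_OUTPUT_RECORDS = 60_730L;
    static final long REDUCE_INPUT_GROUPS = 10_080L;

    public static void main(String[] args) {
        // The mapper emits exactly one pair per matching line, so the difference
        // should be the number of lines the regular expression rejected.
        System.out.println("unmatched lines: " + (MAP_INPUT_RECORDS - MAP_OUTPUT_RECORDS));
        // Serialized size per pair: IntWritable key (4 bytes) + LongWritable value (8 bytes).
        System.out.println("bytes per map output record: "
            + (double) MAP_OUTPUT_BYTES / MAP_OUTPUT_RECORDS);
        // How many times fewer records reached the Reduce thanks to the Combiner.
        System.out.println("combiner reduction factor: "
            + MAP_OUTPUT_RECORDS / COMBINE_OUTPUT_RECORDS);
        // Every one of the 7 * 1440 minute-in-week slots received at least one hit.
        System.out.println("all slots present: " + (REDUCE_INPUT_GROUPS == 7 * 1440));
    }
}
```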
The output from the job is very small, so we can easily copy it to our workstation then run a simple R script to produce a graph of traffic over the week.
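```r
png("web_hits_over_week.png")
data <- read.table("part-00000")  # V1 = minute-in-week slot, V2 = hit count
plot(data, axes=FALSE, ann=FALSE, type="p", pch=".")
# Baseline along the bottom, spanning the whole week
lines(c(1440*7, 0), c(0, 0), col="gray")
# Vertical line at each day boundary (1440 minutes per day), up to the peak count
for (i in 0:7) {
  lines(c(1440, 1440)*i, c(0, max(data$V2)), col="gray")
}
dev.off()
```

We've run our job and we're happy with the results (which we've safely copied from the cluster), so we can shut down the cluster. From our workstation: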
```
bin/hadoop-ec2 terminate-cluster
```

MapReduce is a great programming model for large-scale data processing tasks, and Amazon EC2 is a great place to run your MapReduce programs to process data stored on Amazon S3. In this article we've looked at how to do ad hoc web access log analysis, but there are plenty of other data processing applications that can be expressed as MapReduce programs. So get writing your data processing programs with Hadoop, and use Amazon Web Services to run them at the scale you need.
Tom White is a committer on the Apache Hadoop project. He is the primary author of Hadoop's Amazon EC2 and S3 integration.