Creating Hadoop Streaming Job with Spring Data Apache Hadoop

A Hadoop streaming job is a MapReduce job that uses standard Unix streams as the interface between Apache Hadoop and our program. This naturally means that we can write MapReduce jobs by using any programming language that can read data from standard input and write data to standard output.

This tutorial describes how we can create a Hadoop streaming job by using Spring Data Apache Hadoop and Python. As an example we will analyze a novel called The Adventures of Sherlock Holmes and find out how many times the last name of Sherlock’s loyal sidekick Dr. Watson is mentioned in the novel.

Note: This blog entry assumes that we have already configured the Apache Hadoop instance that we are using and that Python 2.7.2 is used.

We can create a streaming MapReduce job by following these steps:

  1. Get the required dependencies by using Maven.
  2. Create the mapper script.
  3. Create the reducer script.
  4. Configure the application context.
  5. Load the application context when the application starts.

These steps are described in more detail in the following sections. We will also learn how we can run our Hadoop streaming job.

Getting the Required Dependencies with Maven

We can download the required dependencies with Maven by following these steps:

  1. Add the Spring milestone repository to the list of repositories.
  2. Configure the required dependencies.

Since we are using the 1.0.0.RC2 version of Spring Data Apache Hadoop, we must add the Spring milestone repository to our pom.xml file. In other words, we have to add the following repository declaration to our POM file:

<repositories>
    <repository>
        <id>spring-milestone</id>
        <name>Spring Maven Milestone Repository</name>
        <url>http://repo.springframework.org/milestone</url>
    </repository>
</repositories>

Our last step is to configure the required dependencies. We have to declare the following dependencies in the pom.xml file:

  • Spring Data Apache Hadoop
  • Apache Hadoop Core
  • Apache Hadoop Streaming

We can configure these dependencies by adding the following lines to the pom.xml file:

<!-- Spring Data Apache Hadoop -->
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>1.0.0.RC2</version>
</dependency>
<!-- Apache Hadoop Core -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.0.3</version>
</dependency>
<!-- Apache Hadoop Streaming -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-streaming</artifactId>
    <version>1.0.3</version>
</dependency>

Creating the Mapper Script

The implementation of the mapper script must follow these guidelines:

  • The mapper script receives its input data from standard input.
  • The input is processed one line at a time.
  • The output is written to standard output as tab-delimited lines.

We can implement our mapper script by following these steps:

  1. Read the input from standard input and process it one line at a time.
  2. Decode each line from UTF-8 into a Unicode string.
  3. Split the decoded line into words.
  4. Remove punctuation characters from each word.
  5. Encode the output as UTF-8 and write each key-value pair to standard output as a tab-delimited line.

The source code of the mapper.py file looks as follows:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
import unicodedata

# Removes punctuation characters from the string
def strip_punctuation(word):
    return ''.join(x for x in word if unicodedata.category(x) != 'Po')

#Process input one line at a time
for line in sys.stdin:
    #Converts the line to Unicode
    line = unicode(line, "utf-8")
    #Splits the line into individual words
    words = line.split()
    #Processes each word one by one
    for word in words:
        #Removes punctuation characters
        word = strip_punctuation(word)
        #Prints the output
        print ("%s\t%s" % (word, 1)).encode("utf-8")
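Before wiring the script into Hadoop, we can give it a quick sanity check from the command line. The following one-liner is only a local test sketch; it assumes that the python command points to Python 2.7 and that mapper.py is in the current directory:

echo "Watson, come here. Watson!" | python mapper.py

Each word of the input should be printed on its own line, followed by a tab character and the number 1, with the punctuation characters removed.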

Creating the Reducer Script

The implementation of the reducer script must follow these guidelines:

  • The reducer script receives its input from standard input as tab-delimited key-value pairs.
  • The reducer script writes its output to standard output.

We can implement our reducer script by following these steps:

  1. Read the key-value pairs from standard input and process them one by one.
  2. Decode each line from UTF-8 into a Unicode string.
  3. Obtain the key and value by splitting the line at the tab character.
  4. Count how many times the string “Watson” is given as a key to our reducer script.
  5. Encode the output as UTF-8 and write it to standard output.

The source code of the reducer.py file looks as follows:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys

wordCount = 0

#Process input one line at a time
for line in sys.stdin:
    #Converts the line to Unicode
    line = unicode(line, "utf-8")
    #Gets key and value from the current line
    (key, value) = line.split("\t")
    if key == "Watson":
        #Increase word count by one
        wordCount = wordCount + 1
#Prints the output
print ("Watson\t%s" % wordCount).encode("utf-8")

Configuring the Application Context

We can configure the application context of our application by following these steps:

  1. Create a properties file that contains the values of configuration parameters.
  2. Configure the property placeholder that is used to fetch the values of configuration parameters from the created properties file.
  3. Configure Apache Hadoop.
  4. Configure the Hadoop streaming job.
  5. Configure the job runner.

Creating the Properties File

We can create the properties file by following these steps:

  1. Configure the default file system of Apache Hadoop.
  2. Configure the path that contains the input files.
  3. Configure the path in which the output files are written.
  4. Configure the path of our mapper script.
  5. Configure the path of our reducer script.

The contents of the application.properties file look as follows:

#Configures the default file system of Apache Hadoop
fs.default.name=hdfs://localhost:9000

#The path to the directory that contains our input files
input.path=/input/

#The path to the directory in which the output is written
output.path=/output/

#Configure the path of the mapper script
mapper.script.path=mapper.py

#Configure the path of the reducer script
reducer.script.path=reducer.py

Configuring the Property Placeholder

We can configure the property placeholder by adding the following element to our application context configuration file:

<context:property-placeholder location="classpath:application.properties" />

Configuring Apache Hadoop

We can use the configuration namespace element to provide configuration parameters to Apache Hadoop. In order to execute our job by using our Apache Hadoop instance, we have to configure the default file system. We can configure the default file system by adding the following element to the applicationContext.xml file:

<hdp:configuration>
    fs.default.name=${fs.default.name}
</hdp:configuration>

Configuring the Hadoop Streaming Job

We can create a Hadoop streaming job by using the streaming namespace element. The configuration process has the following steps:

  1. Configure the input path of the job.
  2. Configure the output path of the job.
  3. Configure the path of the mapper script.
  4. Configure the path of the reducer script.

The configuration of the Hadoop streaming job looks as follows:

<hdp:streaming id="streamingJob"
	input-path="${input.path}"
	output-path="${output.path}"
	mapper="${mapper.script.path}"
	reducer="${reducer.script.path}"/>

Configuring the Job Runner

A job runner is a component that executes the Hadoop streaming job when the application context is loaded. We can configure it by using the job-runner namespace element. This process has the following steps:

  1. Configure the job runner bean.
  2. Configure the executed jobs.
  3. Configure the job runner to run the configured jobs when it is started.

The configuration of the job runner looks as follows:

<hdp:job-runner id="streamingJobRunner" job-ref="streamingJob" run-at-startup="true"/>
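To see how these configuration elements fit together, the following sketch shows a complete applicationContext.xml file. The snippets above do not show the namespace declarations, so the schema locations used here are the standard Spring and Spring Data Apache Hadoop ones and should be treated as an assumption:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:hdp="http://www.springframework.org/schema/hadoop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <!-- Reads the configuration parameters from the application.properties file -->
    <context:property-placeholder location="classpath:application.properties" />

    <!-- Configures the default file system of Apache Hadoop -->
    <hdp:configuration>
        fs.default.name=${fs.default.name}
    </hdp:configuration>

    <!-- Configures the Hadoop streaming job -->
    <hdp:streaming id="streamingJob"
        input-path="${input.path}"
        output-path="${output.path}"
        mapper="${mapper.script.path}"
        reducer="${reducer.script.path}"/>

    <!-- Runs the streaming job when the application context is loaded -->
    <hdp:job-runner id="streamingJobRunner" job-ref="streamingJob" run-at-startup="true"/>
</beans>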

Loading the Application Context When the Application Starts

We have now created a Hadoop streaming job with Spring Data Apache Hadoop. This job is executed when the application context is loaded. We can load the application context during startup by creating a new ClassPathXmlApplicationContext object and providing the name of our application context configuration file as a constructor parameter. The source code of the Main class looks as follows:

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {
    public static void main(String[] arguments) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
    }
}
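If we want the application to release its resources and exit cleanly once the job has finished, we can also close the application context explicitly. The variant below is only a sketch of that approach; closing the context is our own addition, not something the example requires:

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {
    public static void main(String[] arguments) {
        //Loading the context starts the job runner, which executes the streaming job
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
        //Closes the context after the job has finished
        ctx.close();
    }
}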

Running the MapReduce Job

We have now learned how we can create a streaming MapReduce job by using Spring Data Apache Hadoop and Python. Our last step is to run the created job. Before we can run our job, we must download The Adventures of Sherlock Holmes. We must download the plain text version of this novel manually because Project Gutenberg blocks download utilities such as wget.

After we have downloaded the input file, we are ready to execute our MapReduce job. We can run our job by starting our Apache Hadoop instance in a pseudo-distributed mode and following these steps:

  1. Upload the input file to HDFS.
  2. Run the MapReduce job.

Uploading the Input File to HDFS

We can upload our input file to HDFS by running the following command at the command prompt:

hadoop dfs -put pg1661.txt /input/pg1661.txt
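Note: If we want to make sure that the /input directory exists before the upload, we can create it explicitly with the mkdir command of the HDFS shell:

hadoop dfs -mkdir /input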

We can verify that the upload was successful by running the following command at the command prompt:

hadoop dfs -ls /input

If the file was uploaded successfully, we should see the following directory listing:

Found 1 items
-rw-r--r--   1 xxxx supergroup     594933 2012-08-05 12:07 /input/pg1661.txt

Running the MapReduce Job

We have two alternative methods for running our MapReduce job:

  • We can execute the main() method of the Main class from our IDE.
  • We can build a binary distribution of our example project by running the command mvn assembly:assembly at the command prompt. This creates a zip package in the target directory. We can run the created MapReduce job by unzipping this package and using the provided startup scripts.

Note: If you are not familiar with the Maven assembly plugin, you might want to read my blog entry that describes how you can create a runnable binary distribution with the Maven assembly plugin.

The outcome of our Hadoop streaming job does not depend on the method that is used to run it. The output of our job is written to the configured output directory of HDFS.

Note: If the execution of our MapReduce job fails because the output directory already exists, we can delete the output directory by running the following command at the command prompt:

hadoop dfs -rmr /output

We can check the output of our job by running the following command at the command prompt:

hadoop dfs -ls /output

This command lists the files found in the /output directory of HDFS. If our job was executed successfully, we should see a directory listing similar to this one:

Found 2 items
-rw-r--r--   3 xxxx supergroup          0 2012-08-05 12:31 /output/_SUCCESS
-rw-r--r--   3 xxxx supergroup         10 2012-08-05 12:31 /output/part-00000

Now we will finally find out the answer to our question. We can get the answer by running the following command at the command prompt:

hadoop dfs -cat /output/part-00000

We should see the following output:

Watson  81

We now know that the last name of Dr. Watson is mentioned 81 times in the novel The Adventures of Sherlock Holmes.

PS. A fully functional example application is available on GitHub.
