The Hadoop 2.x - Introduction to YARN

The Old MapReduce

The Hadoop 0.x MapReduce system is composed of a JobTracker and TaskTrackers.

The JobTracker is responsible for resource management, tracking resource usage, and job life-cycle management, e.g. scheduling job tasks, tracking their progress, and providing fault tolerance for tasks.

The TaskTracker is the per-node slave of the JobTracker; it takes orders from the JobTracker to launch or tear down tasks, and periodically reports task status back to the JobTracker.

For years we have benefited from the MapReduce framework; it is the most successful programming model in the big data world.

But MapReduce is not everything. We also need to do graph processing or real-time stream processing, and since Hadoop is essentially batch oriented, we have to look to other systems for that work.

So the Hadoop community made a huge change.

The Hadoop YARN

The fundamental idea of YARN is to split the two major responsibilities of the JobTracker, i.e. resource management and job scheduling/monitoring, into separate daemons: a global ResourceManager (RM) and a per-application ApplicationMaster (AM).

The ResourceManager is responsible for allocating resources to the running applications.

The NodeManager is the per-machine slave; it launches the applications' containers, monitors their resource usage, and reports it to the ResourceManager.

The ApplicationMaster is a per-application framework which runs as a normal container; it is responsible for negotiating appropriate resource containers from the ResourceManager, tracking their status, and monitoring their progress.

The official Hadoop documentation describes YARN in more detail.

The Hadoop Distribution

My intention is to read the Hadoop source code, so I prefer to build a Hadoop distribution from source.

First, git clone the repository from GitHub or Apache.

Second, check out the branch-2.2.0 branch, which is quite stable.
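
Concretely, the first two steps look like this (the GitHub mirror URL here is an assumption; clone from whichever remote you prefer):

$ git clone https://github.com/apache/hadoop.git
$ cd hadoop
$ git checkout branch-2.2.0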

Third, apply the patch below, which adds the missing jetty-util test dependency to hadoop-auth:

diff --git a/hadoop-common-project/hadoop-auth/pom.xml b/hadoop-common-project/hadoop-auth/pom.xml
index 8819941..70ff207 100644
--- a/hadoop-common-project/hadoop-auth/pom.xml
+++ b/hadoop-common-project/hadoop-auth/pom.xml
@@ -55,6 +55,11 @@
     </dependency>
     <dependency>
       <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
       <artifactId>jetty</artifactId>
       <scope>test</scope>
     </dependency>

Fourth, run the Maven build.

mvn package -Pdist -DskipTests -Dtar

The Installation Guide

This is a great guide to installing Hadoop 2.2.0.

I did a single-node installation. After configuring everything, HDFS can be set up and the daemons started with the scripts below.

$ bin/hdfs namenode -format
$ sbin/hadoop-daemon.sh start namenode
$ sbin/hadoop-daemon.sh start datanode
$ sbin/yarn-daemon.sh start resourcemanager
$ sbin/yarn-daemon.sh start nodemanager

Then, run the date -u command on two containers using the distributed shell example.

$ bin/hadoop jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.2.0.jar org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.2.0.jar -shell_command 'date' -shell_args "-u" -num_containers 2

The following logs are printed:

14/02/20 22:50:59 INFO distributedshell.Client: Initializing Client
14/02/20 22:50:59 INFO distributedshell.Client: Running Client
14/02/20 22:50:59 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/02/20 22:50:59 INFO distributedshell.Client: Got Cluster metric info from ASM, numNodeManagers=1
14/02/20 22:50:59 INFO distributedshell.Client: Got Cluster node info from ASM
14/02/20 22:50:59 INFO distributedshell.Client: Got node report from ASM for, nodeId=192.168.0.102:52786, nodeAddress192.168.0.102:8042, nodeRackName/default-rack, nodeNumContainers0
14/02/20 22:50:59 INFO distributedshell.Client: Queue info, queueName=default, queueCurrentCapacity=0.0, queueMaxCapacity=1.0, queueApplicationCount=0, queueChildQueueCount=0
14/02/20 22:50:59 INFO distributedshell.Client: User ACL Info for Queue, queueName=root, userAcl=SUBMIT_APPLICATIONS

...

14/02/20 22:50:59 INFO distributedshell.Client: Max mem capabililty of resources in this cluster 2048
14/02/20 22:50:59 INFO distributedshell.Client: Copy App Master jar from local filesystem and add to local environment
14/02/20 22:51:00 INFO distributedshell.Client: Set the environment for the application master
14/02/20 22:51:00 INFO distributedshell.Client: Setting up app master command
14/02/20 22:51:00 INFO distributedshell.Client: Completed setting up app master command $JAVA_HOME/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --num_containers 2 --priority 0 --shell_command date --shell_args -u 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr
14/02/20 22:51:00 INFO distributedshell.Client: Submitting application to ASM
14/02/20 22:51:00 INFO impl.YarnClientImpl: Submitted application application_1392907840296_0001 to ResourceManager at /0.0.0.0:8032
14/02/20 22:51:01 INFO distributedshell.Client: Got application report from ASM for, appId=1, clientToAMToken=null, appDiagnostics=, appMasterHost=N/A, appQueue=default, appMasterRpcPort=0, appStartTime=1392907860335, yarnAppState=ACCEPTED, distributedFinalState=UNDEFINED, appTrackingUrl=192.168.0.102:8088/proxy/application_1392907840296_0001/, appUser=chris

...

14/02/20 22:51:08 INFO distributedshell.Client: Application has completed successfully. Breaking monitoring loop
14/02/20 22:51:08 INFO distributedshell.Client: Application completed successfully

And the results are:

$ cat logs/userlogs/application_1392907840296_0001/*/stdout
Thu Feb 20 14:51:05 UTC 2014
Thu Feb 20 14:51:06 UTC 2014

That’s my first YARN job running!

An Approach for User Behavior Clustering

Story

We programmers build apps for people to use; sometimes we can benefit from our users, too.

We can collect anonymous data by recording how users behave in our app. By analyzing those data we can find the app's most popular features to plan future development, uncover hidden needs that suggest new features or new apps, cluster the users and apply a different marketing strategy to each group, and so on.

This post is an example of how I do user clustering.

Imagine I have a music player app with 2 million users.

All the data I have is how many times each user played, downloaded, purchased, and shared songs, plus their active days (if a user opens the app on a given day, the active-day count is incremented by one), as follows.

User id   Downloaded   Played   Purchased   Shared   Active days
100035    7            53       0           0        4
150079    45           312      3           8        63
...       ...          ...      ...         ...      ...
199972    114          2425     82          25       205

The k-means Algorithm

k-means clustering aims to partition n observations into k clusters, where each cluster is represented by its center.

Euclidean distance can be used to measure how far each point is from a center.

Given cluster centers, we can simply assign each point to its nearest center. Similarly, if we know the assignment of points to clusters, we can compute the centers by their means.

This introduces a chicken-and-egg problem.

The general computer science answer to chicken-and-egg problems is iteration. We start with a guess of the cluster centers, for example by randomly choosing k points as the centers. Based on that guess, we assign each data point to its closest center. Then we recompute the cluster centers from these new assignments.

Repeat the above process until the cluster centers stop moving.
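
To make the iteration concrete, here is a minimal sketch of that loop in plain numpy (a hypothetical helper, for illustration only; the actual clustering below uses scipy's kmeans):

import numpy as np

def kmeans_sketch(points, k, iterations=20, seed=0):
    rng = np.random.default_rng(seed)
    # Guess: pick k distinct points as the initial centers.
    centers = points[rng.choice(len(points), size=k, replace=False)]
    for _ in range(iterations):
        # Assignment step: each point goes to its nearest center (Euclidean distance).
        distances = np.linalg.norm(points[:, None, :] - centers[None, :, :], axis=2)
        assignment = distances.argmin(axis=1)
        # Update step: each center becomes the mean of its assigned points.
        # (Empty clusters are not handled in this sketch.)
        new_centers = np.array([points[assignment == i].mean(axis=0) for i in range(k)])
        if np.allclose(new_centers, centers):
            break  # the centers stopped moving
        centers = new_centers
    return centers, assignment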

If you want to know more, any standard reference on k-means clustering covers the details.

Solution

Identify the features

The data has five columns: "played", "downloaded", "purchased", "shared", and "active days".

The first four are user behaviors, and we believe all of them matter, so those four will be our features.

Normalize the data

But the data is not "balanced": users who have been around longer accumulate far larger counts, so some values are hundreds of times bigger than others. Luckily, we have "active days"; simply divide each feature value by the user's "active days" and the values become comparable per-day rates.
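
With numpy this normalization is only a couple of lines. A minimal sketch, assuming the raw data sits in a tab-separated file (called user_log here, a made-up name) with columns in the same order as the table above:

import numpy as np

# Hypothetical raw file: user id, downloaded, played, purchased, shared, active days.
raw = np.loadtxt('user_log', delimiter='\t')

behaviors = raw[:, 1:5]        # downloaded, played, purchased, shared
active_days = raw[:, 5:6]      # kept 2-D so the division broadcasts per row
normalized = np.hstack([raw[:, :1], behaviors / active_days])

np.savetxt('user_normalized_log', normalized, delimiter='\t')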

Clustering

We will use scipy; believe me, it's a great tool.

First, import the packages and load the data.

import numpy as np
from scipy.cluster import vq
from matplotlib import pyplot as plt

data = np.loadtxt('user_normalized_log', delimiter='\t')

# The first column is userId, better ignore it
subject = data[:,1:]

Then try k = 4,

centers, dist = vq.kmeans(subject, 4)

and get the centers of each cluster.

array([[5.42879071e+00, 1.37091994e+00, 1.04975836e-01, 6.75508656e-02],
       [6.73032088e+01, 6.70227684e+00, 4.90761870e-01, 3.36144445e-01],
       [1.86167614e+01, 2.40616831e+00, 2.02231194e-01, 2.60513990e-01],
       [1.97979796e-01, 1.78638818e-01, 4.22213041e-02, 5.21060364e-02]])

=

array([[5.42879071, 1.37091994, 0.104975836, 0.0675508656],
       [67.3032088, 6.70227684, 0.49076187, 0.336144445],
       [18.6167614, 2.40616831, 0.202231194, 0.26051399],
       [0.197979796, 0.178638818, 0.0422213041, 0.0521060364]])

The code below assigns a code (cluster index) to each subject (observation).

code, distance = vq.vq(subject,centers)
a = subject[code == 0]
b = subject[code == 1]
c = subject[code == 2]
d = subject[code == 3]

And calculate each cluster's size.

In [266]: a.shape, b.shape, c.shape, d.shape
Out[266]: ((393926, 8), (28301, 8), (208115, 8), (1855756, 8))
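
Since matplotlib was imported earlier, a quick way to eyeball the clusters is to scatter two feature columns per group. This sketch reuses a, b, c, d and plt from the code above; which behaviors the first two columns represent depends on the column order in user_normalized_log:

# Scatter the first two feature columns for each cluster.
for group, label in [(a, 'cluster 0'), (b, 'cluster 1'), (c, 'cluster 2'), (d, 'cluster 3')]:
    plt.scatter(group[:, 0], group[:, 1], s=2, label=label)
plt.xlabel('feature 0 (per active day)')
plt.ylabel('feature 1 (per active day)')
plt.legend()
plt.show()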

Then I cried. Look at the biggest cluster, the fourth one: the number of songs played, downloaded, purchased, and shared per active day is nearly zero for every user in it.

The sad truth is that although I have 2 million users, nearly all of them are zombie users.

(Disclaimer: the data in this post is fabricated.)