Friday, March 4, 2016

An anatomy of the implementation of K-means in pyspark

K-means is one of the classic flat clustering algorithms. It is called "flat" because it returns only a set of separate clusters, rather than an inner structure among these clusters.

A very detailed lecture note with solid explanation and pseudo-python code can be found via this link. Simply speaking, given a dataset (separated into training set and testing set), K-means first initializes K centers, which are K data points randomly selected from the training set. It then scans the training set one data point at a time, to find out which center each data point is closest to. It labels each data point with its cluster index, then it recomputes the center of each cluster from the data points assigned to it. The centers are updated over a number of iterations until they no longer change.
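The steps above can be sketched in plain Python, without Spark. This is only a minimal illustration under my own assumptions: the function names (squared_distance, closest_center, kmeans), the max_iter cap and the seed are made up for the sketch, and the points are the six sample points used later in this blog.

```python
import random

def squared_distance(a, b):
    # Squared Euclidean distance between two points given as tuples.
    return sum((x - y) ** 2 for x, y in zip(a, b))

def closest_center(point, centers):
    # Index of the center that is closest to the point.
    return min(range(len(centers)),
               key=lambda i: squared_distance(point, centers[i]))

def kmeans(points, k, max_iter=100, seed=1):
    rng = random.Random(seed)
    # Initialize the centers with k data points picked at random.
    centers = rng.sample(points, k)
    for _ in range(max_iter):
        # Label each point with the index of its closest center.
        clusters = {}
        for p in points:
            clusters.setdefault(closest_center(p, centers), []).append(p)
        # Recompute each center as the mean of its assigned points;
        # a center with no assigned points is left unchanged.
        new_centers = list(centers)
        for idx, members in clusters.items():
            new_centers[idx] = tuple(sum(col) / len(members)
                                     for col in zip(*members))
        # Stop when the centers no longer change.
        if new_centers == centers:
            break
        centers = new_centers
    return centers

points = [(0.0, 0.0, 0.0), (0.1, 0.1, 0.1), (0.2, 0.2, 0.2),
          (9.0, 9.0, 9.0), (9.1, 9.1, 9.1), (9.2, 9.2, 9.2)]
centers = kmeans(points, k=2)
print(sorted(centers))
```

With k=2 the two centers should end up near (0.1, 0.1, 0.1) and (9.1, 9.1, 9.1), the means of the two obvious groups.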

Implementing K-means with Spark is a bit different from the traditional way, because of the use of transformations and actions. If you have downloaded a recent Spark distribution to your PC and configured it correctly, you can try the following code to get a better understanding of how K-means is implemented in pyspark. We use the example file kmeans.py, located at:

 $SPARK_HOME/examples/src/main/python  

Note that this is just an example to show how K-means is implemented. When we call

from pyspark.mllib.clustering import KMeans  

in our script, this "KMeans" is actually a class defined in the script "clustering.py", which is located in the pyspark library at:

 $SPARK_HOME/python/pyspark/mllib

(in this blog I showed an example of K-means by importing it from pyspark library)

In this blog, we will use a sample dataset, called "kmeans_data.txt", which contains 6 lines and 3 columns. It looks like:



I copied this file from local to the folder /user/xywang on hdfs.

I used ./pyspark to enter the pyspark shell. First of all, I created a SparkContext by

 >>> sc = SparkContext(appName = "kmeans")  

Note that if you use the pyspark shell, a SparkContext named "sc" is built automatically. In fact, I used "sc.stop()" to stop the auto-built sc, and then typed this command to build my own.

After SparkContext is successfully built, we can build an RDD by loading the text file from hdfs, by

 >>> lines = sc.textFile("hdfs://master:9000/user/xywang/kmeans_data.txt")  

Besides loading from hdfs, you can also use "sc.parallelize(object)" to create an RDD.

As .textFile() is evaluated lazily, the file is not read until an action is applied. To see the content of "lines", we use the action .collect():

 >>> lines.collect()  
 [u'0.0 0.0 0.0', u'0.1 0.1 0.1', u'0.2 0.2 0.2', u'9.0 9.0 9.0', u'9.1 9.1 9.1', u'9.2 9.2 9.2']  

The function "parseVector()" is defined as:

 >>> import numpy as np  
 >>> def parseVector(line):  
 ...   return np.array([float(x) for x in line.split(' ')])  

Given a line, it splits the line on spaces and produces an array of floating-point numbers.

When it is passed to .map(), it produces "data" from "lines":

 >>> data = lines.map(parseVector)  
 >>> data.collect()  
 [array([ 0., 0., 0.]), array([ 0.1, 0.1, 0.1]), array([ 0.2, 0.2, 0.2]), array([ 9., 9., 9.]), array([ 9.1, 9.1, 9.1]), array([ 9.2, 9.2, 9.2])]  

Note that inside .map(), "parseVector" is given without any arguments. This looks weird compared with the usual way of calling a function, but .map() itself passes each element of "lines" to the function "parseVector", and returns a new RDD of arrays.
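Python's built-in map() behaves in a similar way, so it can serve as a small analogy that runs without Spark. In this sketch parse_vector is my own plain-list variant of parseVector, and the three input lines are taken from the sample file; it is an analogy, not how Spark executes .map() internally.

```python
def parse_vector(line):
    # Plain-list variant of parseVector: split on spaces, convert to floats.
    return [float(x) for x in line.split(' ')]

lines = ['0.0 0.0 0.0', '0.1 0.1 0.1', '9.0 9.0 9.0']

# The function object itself is handed to map(); map() then calls it
# once per element, passing that element as the argument.
data = list(map(parse_vector, lines))

# Equivalent explicit form, where we call the function ourselves.
same = [parse_vector(line) for line in lines]

print(data)
```

In both forms the function receives one line at a time; the only difference is who makes the call.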

We can use .cache() to store data in memory.

"K" is the number of clusters. The script uses sys.argv[2] to read it from the command line:

 K = int(sys.argv[2])  

"convergeDist" is a threshold value used to control the number of iterations of the "while" loop. In K-means, there are a few ways to control the number of iterations: (1) fix a specific number of iterations, (2) define a small threshold and break the loop when a monitored quantity, such as the sum of errors, falls below it, and (3) break the loop when the centers no longer change. "convergeDist" belongs to case (2).

 convergeDist = float(sys.argv[3])  

"kPoints" are the initialized centers sampled from "data"

 kPoints = data.takeSample(False, K, 1)  

.takeSample(withReplacement, num, [seed]) is an action. It returns a list with a random sample of num elements of the dataset, drawn with or without replacement, optionally using a given random number generator seed.

 >>> kPoints  
 [array([ 0.1, 0.1, 0.1]), array([ 0.2, 0.2, 0.2]), array([ 9.2, 9.2, 9.2])]  

As "kPoints" is the result of an action, it is already a local list, so we don't need to .collect() it.

The "while" loop starts with

 while tempDist > convergeDist:  

"tempDist" is initially set to 1.0. Judging from the loop body, it is the sum of squared distances between each old center and its updated center, i.e. how far the centers moved in the last iteration. It is recomputed in each iteration as the centers are updated.

 while tempDist > convergeDist:  
     # the loop stops once tempDist <= convergeDist  
     closest = data.map(lambda p: (closestPoint(p, kPoints), (p, 1)))  
     pointStats = closest.reduceByKey(lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))  
     newPoints = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()  
     tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)  
     for (iK, p) in newPoints:  
         kPoints[iK] = p  

What matters in this loop is the use of the .map() and .reduceByKey() transformations. The first .map() applies a lambda function to each element of "data", and produces a tuple (cluster_index, (data_point, 1)). In this tuple, cluster_index is the "key" and the tuple (data_point, 1) is the "value". Inside the lambda function we find the function "closestPoint()", which takes a data point and the centers, and returns the index of the center closest to the given data point. For the first iteration, collecting "closest" gives:

 >>> closest.collect()  
 [(0, (array([ 0., 0., 0.]), 1)), (0, (array([ 0.1, 0.1, 0.1]), 1)), (1, (array([ 0.2, 0.2, 0.2]), 1)), (2, (array([ 9., 9., 9.]), 1)), (2, (array([ 9.1, 9.1, 9.1]), 1)), (2, (array([ 9.2, 9.2, 9.2]), 1))]  
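The body of "closestPoint()" is not shown in this blog, so here is a sketch of what such a function could look like. I assume it picks the center with the smallest squared Euclidean distance; the name closest_point and the use of plain lists instead of NumPy arrays are my own choices so that the sketch runs without Spark.

```python
def closest_point(p, centers):
    # Return the index of the center with the smallest squared distance to p.
    best_index = 0
    closest = float("inf")
    for i, center in enumerate(centers):
        dist = sum((a - b) ** 2 for a, b in zip(p, center))
        if dist < closest:
            closest = dist
            best_index = i
    return best_index

# Initial centers and data points taken from the outputs shown above.
k_points = [[0.1] * 3, [0.2] * 3, [9.2] * 3]
data = [[0.0] * 3, [0.1] * 3, [0.2] * 3, [9.0] * 3, [9.1] * 3, [9.2] * 3]

labels = [closest_point(p, k_points) for p in data]
print(labels)
```

The labels come out as [0, 0, 1, 2, 2, 2], the same keys as in the collected "closest" above.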

In the next step, .reduceByKey() is used. It groups the elements of "closest" by key, then applies the lambda function to the values of each key pair by pair. Take the first two elements of "closest", which share key 0, for example. In .reduceByKey():

 p1_c1[0] = array([ 0., 0., 0.]), p2_c2[0] = array([ 0.1, 0.1, 0.1]), p1_c1[1] = 1, p2_c2[1] = 1,   

Thus the lambda function applied to this pair returns (array([0.1,0.1,0.1]),2). If we collect "pointStats" we will have

 >>> pointStats.collect()  
 [(0, (array([ 0.1, 0.1, 0.1]), 2)), (2, (array([ 27.3, 27.3, 27.3]), 3)), (1, (array([ 0.2, 0.2, 0.2]), 1))]  
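To see what .reduceByKey() does with these pairs, it can be imitated locally with a dictionary. This is only a sketch: reduce_by_key and add_stats are hypothetical helper names, the points are plain lists instead of NumPy arrays, and a real Spark job merges values across partitions in no guaranteed order.

```python
def reduce_by_key(pairs, func):
    # Merge the values that share a key, two at a time, using func.
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

def add_stats(s1, s2):
    # Same role as the lambda in the loop: add the points element-wise
    # and add the counts.
    summed = [a + b for a, b in zip(s1[0], s2[0])]
    return (summed, s1[1] + s2[1])

closest = [(0, ([0.0] * 3, 1)), (0, ([0.1] * 3, 1)), (1, ([0.2] * 3, 1)),
           (2, ([9.0] * 3, 1)), (2, ([9.1] * 3, 1)), (2, ([9.2] * 3, 1))]

point_stats = reduce_by_key(closest, add_stats)
print(point_stats)
```

Because Spark may combine the values in any order, the function passed to .reduceByKey() should be associative and commutative, which is the case for this addition.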

After this, .map() is applied to "pointStats": the lambda function is applied to each element of "pointStats", denoted by "st", and produces a tuple (cluster_index, sum_of_points / count). Take the first element of "pointStats", (0, (array([ 0.1, 0.1, 0.1]), 2)), for example: st[0] = 0, st[1][0] = array([0.1,0.1,0.1]), st[1][1] = 2, so the new center of cluster 0 is array([0.05, 0.05, 0.05]). "newPoints" therefore holds the updated centers. Since .collect() is already called in that line, "newPoints" is a local list:

 >>> newPoints  
 [(0, array([ 0.05, 0.05, 0.05])), (2, array([ 9.1, 9.1, 9.1])), (1, array([ 0.2, 0.2, 0.2]))]  

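Then it is time to update "tempDist", which is computed with sum() over a generator expression: for each cluster, it adds the squared distance between the old center kPoints[iK] and the new center p.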
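As a worked example, here is the value of "tempDist" after the first iteration, computed from the "kPoints" and "newPoints" shown above. The sketch uses plain lists instead of NumPy arrays, and the threshold 0.1 is only an assumed value for illustration.

```python
# Old centers (kPoints) and updated centers (newPoints) from the first iteration.
k_points = [[0.1] * 3, [0.2] * 3, [9.2] * 3]
new_points = [(0, [0.05] * 3), (2, [9.1] * 3), (1, [0.2] * 3)]

def squared_shift(old, new):
    # Squared distance between an old center and its updated center.
    return sum((a - b) ** 2 for a, b in zip(old, new))

temp_dist = sum(squared_shift(k_points[i], p) for i, p in new_points)

converge_dist = 0.1  # assumed threshold, for illustration only
keep_looping = temp_dist > converge_dist
print(temp_dist, keep_looping)
```

Cluster 0 contributes 3 * 0.05^2 = 0.0075, cluster 2 contributes 3 * 0.1^2 = 0.03 and cluster 1 did not move, so temp_dist is about 0.0375. With the assumed threshold of 0.1 the loop would stop after this iteration.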

This "while" loop goes on until "tempDist" is no longer greater than "convergeDist".

I ran this "kmeans.py" on the Iris dataset, and got the result:



This is my command on console:

 ~/spark-1.4.0-bin-hadoop2.6/bin$ ./pyspark ../examples/src/main/python/kmeans.py "hdfs://master:9000/user/xywang/iris.txt" "3" "0.1"  


Note that the Iris dataset was preprocessed; details can be found in my blog.

This blog is titled "An anatomy of the implementation of K-means in pyspark"; its main aim is to give a detailed explanation of how K-means is implemented in pyspark. The point that interests me most is the usage of .map() and .reduceByKey() combined with a user-defined function or a lambda function. Writing down my discoveries helps me track my working process, and I hope this sharing will be useful for other people.

Wednesday, March 2, 2016

An anatomy of the implementation of PageRank in pyspark


In this blog, let's dissect the implementation of PageRank in pyspark. PageRank is well known from Google's search engine. If you are new to it, here is a good overview of this algorithm.
MapReduce was originally developed at Google, and computing PageRank is one of its classic applications. I found this a good lecture video on how to compute PageRank using MapReduce. Spark is reported to be much more efficient than MapReduce for this kind of job, because it reduces I/O cost by keeping data in memory, which makes iterative computation much faster. In studying Spark, it is a good exercise to understand how PageRank is implemented under this framework.

If you follow my previous blogs, you may be a bit familiar with pyspark, the Python API of Spark. For this blog, we will use the example “pagerank.py” in the Spark distribution, located at $SPARK_HOME/examples/src/main/python.

To start with, we need a proper dataset. To test this example “pagerank.py”, I prepared a dataset called “dt_pagerank.txt”, whose content is shown in the screenshot:

 
I generated this dataset from the plot of the PageRank wikipedia page. Though it is a directed graph, I consider it undirected, and I omit small nodes without names. I used tab to separate two letters in a line. One letter denotes a website, followed by a neighbor.
To run pagerank.py, I typed in the terminal:
 $ ~/spark-1.4.0-bin-hadoop2.6/bin/./pyspark ~/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/pagerank.py "hdfs://master:9000/user/xywang/dt_pagerank.txt" "10"  


The command specifies the path of pyspark, followed by the path of pagerank.py, then the path of the dataset (which has been pushed onto hdfs), and the number of iterations.

Note that Hadoop and Spark should be running. If everything is all right, you should get this result on the console.

 

Troubleshooting: you may also run it using “python path/pagerank.py path/dataset num_ite”, but I got the exception “no module called pyspark”. A solution is to set SPARK_HOME and extend the Python path at the top of the script pagerank.py, specifically:
 import os  
 import sys  
 os.environ['SPARK_HOME']="/home/hduser/spark-1.4.0-bin-hadoop2.6"  
 sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python")  


The test of pagerank.py on dt_pagerank.txt looks all right. Now let's have a closer look at pagerank.py to understand what is going on. I use the pyspark shell on the Ubuntu console to trace every step:

1. As said in the script itself, we need to initialize the Spark context
 >>> sc = SparkContext(appName="PythonPageRank")  

Only after the Spark context is initialized can an RDD be built.


2. Build an RDD by loading a file from hdfs
 >>> lines = sc.textFile("hdfs://master:9000/user/xywang/dt_pagerank.txt",1)  

As said before, I pushed the dataset dt_pagerank.txt onto hdfs, which is why the path starts with hdfs://
“1” is the minimum number of partitions requested for the RDD.
“lines” is an RDD. An RDD is not evaluated until an action is called, so to see the content of “lines”, we can type:
 >>> lines.collect( )  

“collect()” is an action that returns all the elements of the dataset as a list to the driver program.
This command returns:
 [u'A \tD', u'B\tC', u'B\tD', u'B\tE', u'B\tF', u'C\tB', u'D\tA', u'D\tB', u'E\tB', u'F\tB']  

The function “parseNeighbors(urls)” uses a regular expression to separate each element in the list above into a tuple. For example:
 >>> parseNeighbors(u'F\tB')  
 (u'F', u'B')  
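The body of “parseNeighbors(urls)” is not shown here, so below is a sketch of how such a function could be written. I assume it splits the line on any run of whitespace; the name parse_neighbors is my own, to keep it apart from the function in the script.

```python
import re

def parse_neighbors(urls):
    # Split a line such as 'F\tB' on whitespace and return (node, neighbor).
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]

print(parse_neighbors('F\tB'))
print(parse_neighbors('A \tD'))
```

Splitting on a run of whitespace rather than on a single tab also copes with the first line of the dataset, which contains a space before the tab.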


3. Separate the nodes in each list element, by
 >>> links = lines.map(lambda urls: parseNeighbors(urls))  

This is a map transformation on “lines”. Combined with the “lambda” function, it applies “parseNeighbors()” to each element in “lines” and separates it into a tuple. But this transformation will not be executed until an action is applied, for example:
 >>> links.collect()  
 [(u'A', u'D'), (u'B', u'C'), (u'B', u'D'), (u'B', u'E'), (u'B', u'F'), (u'C', u'B'), (u'D', u'A'), (u'D', u'B'), (u'E', u'B'), (u'F', u'B')]  

In the script, the transformation distinct() is also applied. As the dataset dt_pagerank.txt contains no duplicated entries, applying distinct() gives the same result as shown above.
 
4. In Spark, a key and its value are kept in a tuple. In the output of “links.collect()” we see that some keys are duplicated, thus the transformation groupByKey() is used
 >>> links = links.groupByKey()  
 >>> links.collect()  
 [(u'A', <pyspark.resultiterable.ResultIterable object at 0x7efc08d8d590>),  
 …  

groupByKey() returns a dataset of (K, Iterable<V>) pairs. As we see, Iterable<V> is printed as a Spark object (a ResultIterable); it wraps the list of values of that key and can be iterated over like a list.
By now, we have the (key, value list) for each node. This information is saved in the variable “links”.
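What groupByKey() produces can be imitated locally with a dictionary of lists. This is only a sketch to make the (key, value list) structure visible; group_by_key is a hypothetical helper, not part of pyspark.

```python
from collections import defaultdict

def group_by_key(pairs):
    # Collect all values that share the same key into one list.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)

links = [('A', 'D'), ('B', 'C'), ('B', 'D'), ('B', 'E'), ('B', 'F'),
         ('C', 'B'), ('D', 'A'), ('D', 'B'), ('E', 'B'), ('F', 'B')]

grouped = group_by_key(links)
print(grouped)
```

In the pyspark shell itself, calling links.mapValues(list).collect() after groupByKey() should print the values as ordinary lists instead of ResultIterable objects.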
5. Initialize rank
 >>> ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))  

This command uses the map transformation simply to assign “1.0” to each node. If you call ranks.collect(), you should get
 >>> ranks.collect()  
 [(u'A', 1.0), (u'C', 1.0), (u'B', 1.0), (u'E', 1.0), (u'D', 1.0), (u'F', 1.0)]  

Pay attention to the index [0] and the parentheses ( ) in the lambda function. Index [0] selects the first element of the tuple, the key; [1] would select the second element, the list of neighbors. The parentheses build a standard (key, value) pair.


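6. Next you see a loop that starts with
for iteration in range(int(sys.argv[2])):
sys.argv[2] is the third element of the argument list (after the script path and the dataset path), which is the “10” we passed in the example, so the loop runs 10 times.
In this loop, two things are done:
(1) compute the contribution that each node sends to its neighbors
(2) sum up the contributions received by each node and apply the damping factor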


For (1), the script first combines links and ranks, by
 >>> links.join(ranks)  

If you use collect() on it, you will see that each (key, value list) is now (key, (value list, 1.0)): the rank “1.0” is paired with the value list of each key.
Next, the function computeContribs(urls, rank) is called, by
 >>> contribs = links.join(ranks).flatMap(  
 lambda x: computeContribs(x[1][0], x[1][1]))   

To simplify, I use “x” in place of “url_urls_rank”. In the lambda function, “x” is (key, (value list, 1.0)), therefore x[1][0] is the “value list” and x[1][1] is “1.0”. computeContribs() computes how much pagerank score each neighbor receives from a node: the rank of the node divided by its number of neighbors. If you collect() contribs, you see
 >>> contribs.collect()  
 [(u'D', 1.0), (u'B', 1.0), (u'B', 1.0), (u'C', 0.25), (u'D', 0.25), (u'E', 0.25), (u'F', 0.25), (u'A', 0.5), (u'B', 0.5), (u'B', 1.0)]  
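The body of computeContribs() is not listed in this blog. A sketch that is consistent with the output above could look like the following; the name compute_contribs and the local list “joined”, which stands in for links.join(ranks), are my own assumptions.

```python
def compute_contribs(urls, rank):
    # Each neighbor receives an equal share of the node's current rank.
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

# Local stand-in for links.join(ranks): (key, (value list, rank)).
joined = [('A', (['D'], 1.0)), ('B', (['C', 'D', 'E', 'F'], 1.0)),
          ('C', (['B'], 1.0)), ('D', (['A', 'B'], 1.0)),
          ('E', (['B'], 1.0)), ('F', (['B'], 1.0))]

# Same role as flatMap: flatten all generated pairs into one list.
contribs = [pair for x in joined
            for pair in compute_contribs(x[1][0], x[1][1])]
print(contribs)
```

The pairs are the same as in the collected “contribs” above, although the order can differ because Spark does not guarantee the order of a join.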


Then (2) is to sum up the scores received by a node from its different neighbors, taking into account the damping factor. All of this is done by
 >>> new_ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)   

I used “new_ranks” instead of “ranks”. The transformation reduceByKey(add) sums up the values of each key, e.g. for key u'B' the sum equals 1.0+1.0+0.5+1.0 = 3.5. The transformation mapValues(fun) applies a function to the value of each key, here 3.5 * 0.85 + 0.15 = 3.125 for u'B'. In the end, if we collect() the result, we see
 >>> new_ranks.collect()  
 [(u'A', 0.575), (u'C', 0.3625), (u'E', 0.3625), (u'B', 3.125), (u'D', 1.2125), (u'F', 0.3625)]  
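The arithmetic of this step can be checked locally. The sketch below imitates reduceByKey(add) with a dictionary and mapValues with a dictionary comprehension; it starts from the collected “contribs” shown earlier and is only an illustration, not the code of pagerank.py.

```python
from operator import add

contribs = [('D', 1.0), ('B', 1.0), ('B', 1.0), ('C', 0.25), ('D', 0.25),
            ('E', 0.25), ('F', 0.25), ('A', 0.5), ('B', 0.5), ('B', 1.0)]

# Imitates reduceByKey(add): sum the contributions received by each node.
totals = {}
for url, contrib in contribs:
    totals[url] = add(totals.get(url, 0.0), contrib)

# Imitates mapValues(lambda rank: rank * 0.85 + 0.15): apply the damping factor.
new_ranks = {url: total * 0.85 + 0.15 for url, total in totals.items()}
print(new_ranks)
```

Node B, which receives contributions from four nodes, ends up with 3.5 * 0.85 + 0.15 = 3.125, matching the collected result above.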

This is the result after one iteration.
In the following iterations, the new ranks are used to compute “contribs”, which in turn update the ranks, until the number of iterations is reached.

Voila, this is how PageRank is implemented in pyspark. By explaining this example step by step, I hope that we get a better understanding of how Spark realizes this algorithm by applying transformations and actions to an RDD. This process is a bit different from the common way of programming. In my point of view, it is limiting because we have to express everything with transformations and actions, of which there are not so many. But on the other hand, we gain the benefit of parallelism.