Friday, March 4, 2016

An anatomy of the implementation of K-means in pyspark

K-means is one of the classic flat clustering algorithms. It is called "flat" because it returns a set of disjoint clusters, without any inner structure (such as a hierarchy) among them.

A very detailed lecture note with solid explanations and pseudo-Python code can be found via this link. Simply speaking, given a dataset (split into a training set and a testing set), K-means first initializes K centers, which are K data points randomly selected from the training set. It then scans the training set one data point at a time to find out which center each data point is closest to, and labels each data point with the index of that cluster. Next, it recomputes the center of each cluster from the data points assigned to it. The centers are updated over a number of iterations until they no longer change.
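
To make the procedure concrete, here is a minimal sketch of the classic algorithm in plain Python with NumPy (all names in it are my own, not from the Spark example):

 import numpy as np  
  
 def kmeans(points, K, max_iters=100):  
     # pick K random points from the training set as initial centers  
     centers = points[np.random.choice(len(points), K, replace=False)]  
     for _ in range(max_iters):  
         # assign each point the index of its closest center  
         labels = np.array([np.argmin([np.sum((p - c) ** 2) for c in centers]) for p in points])  
         # recompute each center as the mean of the points assigned to it  
         new_centers = np.array([points[labels == k].mean(axis=0) for k in range(K)])  
         # stop when the centers no longer move  
         if np.allclose(new_centers, centers):  
             break  
         centers = new_centers  
     return centers, labels  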

Implementing K-means in Spark differs a bit from the traditional way, mainly in its usage of transformations and actions. If you have a recent Spark distribution downloaded on your PC and configured correctly, you can try the following code to get a better understanding of how K-means is implemented in pyspark. We use the example file, kmeans.py, located at:

 $SPARK_HOME/examples/src/main/python  

Note that this is just an example to show how K-means is implemented. When we call

from pyspark.mllib.clustering import KMeans  

in our script, this "KMeans" is actually a class defined in the script "clustering.py", which is located in the pyspark library at:

 $SPARK_HOME/python/pyspark/mllib

(In another blog post, I showed an example of K-means imported from the pyspark library.)
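
For comparison, using the library version takes only a few lines. A minimal sketch, assuming Spark 1.x and the RDD "data" built later in this post (the parameter values here are arbitrary):

 >>> from pyspark.mllib.clustering import KMeans  
 >>> model = KMeans.train(data, k=3, maxIterations=10, initializationMode="random")  
 >>> model.clusterCenters  # the learned centers  
 >>> model.predict(data.first())  # cluster index of one data point  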

In this blog post, we will use a sample dataset, called "kmeans_data.txt", which contains 6 lines and 3 columns. It looks like:

 0.0 0.0 0.0  
 0.1 0.1 0.1  
 0.2 0.2 0.2  
 9.0 9.0 9.0  
 9.1 9.1 9.1  
 9.2 9.2 9.2  
I copied this file from the local file system to the folder /user/xywang on hdfs.

I used ./pyspark to enter the pyspark shell. First of all, I created a SparkContext by

 >>> sc = SparkContext(appName = "kmeans")  

Note that if you use the pyspark shell, a SparkContext is built automatically. Actually, I used "sc.stop()" to kill the auto-built sc, and then typed this command to build my own.

After the SparkContext is successfully built, we can build an RDD by loading the text file from hdfs:

 >>> lines = sc.textFile("hdfs://master:9000/user/xywang/kmeans_data.txt")  

Besides loading from hdfs, you can also use "sc.parallelize(object)" to create an RDD.
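
For example, a small RDD can be created directly from a Python list, without hdfs at all:

 >>> rdd = sc.parallelize([[0.0, 0.0, 0.0], [9.0, 9.0, 9.0]])  
 >>> rdd.collect()  
 [[0.0, 0.0, 0.0], [9.0, 9.0, 9.0]]  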

As .textFile() is a transformation, it will not be executed until an action is applied. To see the content of "lines", we use the action .collect(), like

 >>> lines.collect()  
 [u'0.0 0.0 0.0', u'0.1 0.1 0.1', u'0.2 0.2 0.2', u'9.0 9.0 9.0', u'9.1 9.1 9.1', u'9.2 9.2 9.2']  

Function "parseVector()":

 >>> import numpy as np  
 >>> def parseVector(line):  
 ...   return np.array([float(x) for x in line.split(' ')])  

Given a line, it parses it and produces an array of floats.
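
For instance, applied to the first line of our file:

 >>> parseVector(u'0.0 0.0 0.0')  
 array([ 0., 0., 0.])  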

When it is used inside .map(), it produces "data" from "lines":

 >>> data = lines.map(parseVector)  
 >>> data.collect()  
 [array([ 0., 0., 0.]), array([ 0.1, 0.1, 0.1]), array([ 0.2, 0.2, 0.2]), array([ 9., 9., 9.]), array([ 9.1, 9.1, 9.1]), array([ 9.2, 9.2, 9.2])]  

Note that inside .map(), "parseVector" is called without any parameters passed to it. This looks weird compared to the classic usage of a function, but .map() actually passes each element of "lines" to the function "parseVector", producing an RDD with one array per line.
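
The line above is therefore equivalent to spelling the lambda out:

 >>> data = lines.map(lambda line: parseVector(line))  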

We can use .cache() to keep "data" in memory, as shown below.
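
For instance, caching can be chained directly onto the map:

 >>> data = lines.map(parseVector).cache()  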

"K" is the number of clusters, it uses sys.argv[2] to capture user input on console

 K = int(sys.argv[2])  

"convergeDist" is a threshold value, it is used to control the number of iterations in the "while" loop. In K-means, there are a few ways to control the number of iterations: (1) by defining a specific number of iterations (2) by defining a small value, when the sum of errors is lower than this value, loop breaks and (3) loop breaks when centers no more change. Given "convergeDist" belongs to case (2).

 convergeDist = float(sys.argv[3])  

"kPoints" are the initialized centers sampled from "data"

 kPoints = data.takeSample(False, K, 1)  

.takeSample(withReplacement, num, [seed]) is an action; it returns an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

 >>> kPoints  
 [array([ 0.1, 0.1, 0.1]), array([ 0.2, 0.2, 0.2]), array([ 9.2, 9.2, 9.2])]  

As "kPoints" is result of an action, we don't need to .collect() it.

The "while" loop starts with

 while tempDist > convergeDist:  

"tempDist" is initially defined as "1.0", it is actually the inter-cluster sum of squares, e.g the sum of distances of each point in the cluster to its center. It will be updated in each iteration as the center is updated.

   while tempDist > convergeDist:  
      # the loop stops when tempDist < convergeDist  
      # label each point with the index of its closest center  
      closest = data.map(lambda p: (closestPoint(p, kPoints), (p, 1)))  
      # per cluster index, sum the points and count them  
      pointStats = closest.reduceByKey(lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))  
      # new center = sum of the points / number of points  
      newPoints = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()  
      # total squared distance the centers moved in this iteration  
      tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)  
      # replace the old centers with the new ones  
      for (iK, p) in newPoints:  
        kPoints[iK] = p  

The key to this loop is the usage of the .map() and .reduceByKey() transformations. The first .map() applies a lambda function to each element of "data", producing a tuple (cluster_index, (data_point, 1)). In this tuple, cluster_index is the "key" and the tuple (data_point, 1) is the "value". Inside the lambda function we find the function "closestPoint()", which takes a data point and the centers, and returns the cluster_index of the center closest to the given data point. For the first iteration, we can collect the results of "closest":

 >>> closest.collect()  
 [(0, (array([ 0., 0., 0.]), 1)), (0, (array([ 0.1, 0.1, 0.1]), 1)), (1, (array([ 0.2, 0.2, 0.2]), 1)), (2, (array([ 9., 9., 9.]), 1)), (2, (array([ 9.1, 9.1, 9.1]), 1)), (2, (array([ 9.2, 9.2, 9.2]), 1))]  

In the next step, .reduceByKey() is used: it first groups the elements of "closest" by key, then applies the lambda function to the values pair by pair. Take the first two elements of "closest" for example; inside .reduceByKey(),

 p1_c1[0] = array([ 0., 0., 0.]), p2_c2[0] = array([ 0.1, 0.1, 0.1]), p1_c1[1] = 1, p2_c2[1] = 1,   

Thus the lambda applied to this pair returns (array([ 0.1, 0.1, 0.1]), 2). If we collect "pointStats" we will have
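
To see .reduceByKey() in isolation, here is a toy example of my own (not from the script):

 >>> sc.parallelize([("a", 1), ("a", 2), ("b", 3)]).reduceByKey(lambda x, y: x + y).collect()  
 [('a', 3), ('b', 3)]  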

 >>> pointStats.collect()  
 [(0, (array([ 0.1, 0.1, 0.1]), 2)), (2, (array([ 27.3, 27.3, 27.3]), 3)), (1, (array([ 0.2, 0.2, 0.2]), 1))]  

After this, .map() is applied to "pointStats": the lambda function is applied to each element of "pointStats", symbolized by "st", and produces a tuple. Take the first element of "pointStats", (0, (array([ 0.1, 0.1, 0.1]), 2)), for example: st[0] = 0, st[1][0] = array([ 0.1, 0.1, 0.1]), st[1][1] = 2. Therefore "newPoints" holds the updated centers; since .collect() is already called in its definition, it is a plain Python list:

 >>> newPoints  
 [(0, array([ 0.05, 0.05, 0.05])), (2, array([ 9.1, 9.1, 9.1])), (1, array([ 0.2, 0.2, 0.2]))]  
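
The indexing into "st" can be checked on a plain tuple:

 >>> st = (0, (np.array([0.1, 0.1, 0.1]), 2))  
 >>> (st[0], st[1][0] / st[1][1])  
 (0, array([ 0.05, 0.05, 0.05]))  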

Then it is time to update "tempDist", which is computed with a generator expression summing the squared distance each center has moved.
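
For the first iteration this works out, up to floating-point rounding, to:

 tempDist = 3*(0.1 - 0.05)**2 + 3*(0.2 - 0.2)**2 + 3*(9.2 - 9.1)**2  
          = 0.0075 + 0.0 + 0.03  
          = 0.0375  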

This "while" loops goes on until "tempDist < convergeDist".

I ran this "kmeans.py" on the Iris dataset and obtained the three final cluster centers.
This is my command on the console:

 ~/spark-1.4.0-bin-hadoop2.6/bin$ ./pyspark ../examples/src/main/python/kmeans.py "hdfs://master:9000/user/xywang/iris.txt" "3" "0.1"  
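
Note that spark-submit is the recommended launcher for standalone Python applications; an equivalent command would be something like:

 ~/spark-1.4.0-bin-hadoop2.6/bin$ ./spark-submit ../examples/src/main/python/kmeans.py "hdfs://master:9000/user/xywang/iris.txt" "3" "0.1"  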


Note that the Iris dataset was preprocessed first; details can be found in my blog.

This blog post is titled "An anatomy of the implementation of K-means in pyspark"; its main aim is to give a detailed explanation of how K-means is implemented in pyspark. The point that interests me most is the usage of .map() and .reduceByKey() combined with a user-defined function or with a lambda function. Writing down my discoveries helps me keep track of my working process, and I hope this sharing will be useful to other people as well.
