I feel it necessary to update Spark on my cluster of nodes because new libraries have been added. To upgrade the Spark distribution:
1. On each node, download spark-x.x.x-bin-hadoopx.x.tgz and extract it
2. Go to the "conf" folder in the extracted directory and configure the master and the workers differently (sample file contents are shown after this list):
(1) For worker node(s):
a. Generate the file "log4j.properties" from "log4j.properties.template" and change "INFO" to "ERROR" in the line "log4j.rootCategory=INFO, console"
b. Generate the file "spark-env.sh" from "spark-env.sh.template" and add the line "export SPARK_MASTER_IP=xxx.xxx.xxx.xxx" to specify the master machine's IP address
(2) For the master node:
a. Create the files "log4j.properties" and "spark-env.sh" exactly as for the worker nodes
b. Generate the file "slaves" from "slaves.template", comment out the first line "localhost" by adding a # before it, and add the IP addresses of the workers below it, one line per worker
3. Test by visiting "http://master-IP-address:8080/". If everything is fine, you will see a table with worker IDs, addresses, state, number of cores and memory, and each worker's state is marked as "ALIVE".
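For reference, here is roughly what the edited configuration files look like; this is a sketch, and the IP addresses are placeholders for your own machines:
# conf/spark-env.sh (master and workers): point to the master's IP
export SPARK_MASTER_IP=192.168.0.1
# conf/log4j.properties (master and workers): only this line changes
log4j.rootCategory=ERROR, console
# conf/slaves (master only): localhost commented out, one worker IP per line
# localhost
192.168.0.2
192.168.0.3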
Friday, March 4, 2016
An anatomy of the implementation of K-means in pyspark
K-means is one of the classic flat clustering algorithms. It is called "flat" because it returns only a set of disjoint clusters, rather than an inner (hierarchical) structure among these clusters.
A very detailed lecture note with solid explanations and pseudo-Python code can be found via this link. Simply speaking, given a dataset (separated into a training set and a testing set), K-means first initializes K centers, which are K data points randomly selected from the training set. It then scans the training set one data point after another to find out which center each data point is closest to. It labels each data point with its cluster index, then recomputes the center of each cluster from these labeled data points. The centers are updated through a number of iterations until they no longer change.
Implementing K-means with Spark is a bit different from the traditional way because of the usage of transformations and actions. If you have a recent Spark distribution downloaded on your PC and configured correctly, you can try the following code to get a better understanding of how K-means is implemented in pyspark. We use the example file kmeans.py, located at:
$SPARK_HOME/examples/src/main/python
Note that this is just an example to show how K-means is implemented. When we call
from pyspark.mllib.clustering import KMeans
in our script, this "KMeans" is actually defined in the script "clustering.py", which is located in the pyspark library at:
$SPARK_HOME/python/pyspark/mllib
(in this blog I showed an example of K-means by importing it from pyspark library)
In this blog, we will use a sample dataset called "kmeans_data.txt", which contains 6 lines and 3 columns. It looks like this (the same values appear in the lines.collect() output below):
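0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2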
I copied this file from the local filesystem to the folder /user/xywang on HDFS.
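If you need the copy command, something along these lines should work (the local path is assumed):
$ hdfs dfs -put kmeans_data.txt /user/xywang/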
I used ./pyspark to enter the pyspark shell. First of all, I created a SparkContext by
>>> sc = SparkContext(appName = "kmeans")
Note that if you use the pyspark shell, a SparkContext is built automatically. I actually used "sc.stop()" to kill the auto-built sc, and then typed this command to build my own.
After the SparkContext is successfully built, we can build an RDD by loading the text file from HDFS:
>>> lines = sc.textFile("hdfs://master:9000/user/xywang/kmeans_data.txt")
Besides loading from HDFS, you can also use "sc.parallelize(object)" to create an RDD.
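For instance, a tiny hypothetical RDD (the list below is made up, not the blog's dataset) could be created like this:
>>> tiny = sc.parallelize([[0.0, 0.0, 0.0], [9.0, 9.0, 9.0]])
>>> tiny.collect()
[[0.0, 0.0, 0.0], [9.0, 9.0, 9.0]]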
As .textFile() is a transformation, it will not be executed until an action is applied. To see the content of "lines", we use the action .collect():
>>> lines.collect()
[u'0.0 0.0 0.0', u'0.1 0.1 0.1', u'0.2 0.2 0.2', u'9.0 9.0 9.0', u'9.1 9.1 9.1', u'9.2 9.2 9.2']
Function "parseVector()":
>>> import numpy as np
>>> def parseVector(line):
...     return np.array([float(x) for x in line.split(' ')])
Given a line, it parses it and produces an array of floats.
When it is used inside .map(), it produces "data" from "lines":
>>> data = lines.map(parseVector)
>>> data.collect()
[array([ 0., 0., 0.]), array([ 0.1, 0.1, 0.1]), array([ 0.2, 0.2, 0.2]), array([ 9., 9., 9.]), array([ 9.1, 9.1, 9.1]), array([ 9.2, 9.2, 9.2])]
Note that inside .map(), "parseVector" is called without passing any parameter to it. That looks weird compared with the classic usage of a function, but .map() actually passes each element of "lines" to the function "parseVector", and the result, once collected, is a list of arrays.
We can use .cache() to keep "data" in memory.
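In the example script, this is done at the moment "data" is created, roughly like this (a sketch based on kmeans.py):
>>> data = lines.map(parseVector).cache()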
"K" is the number of clusters, it uses sys.argv[2] to capture user input on console
K = int(sys.argv[2])
"convergeDist" is a threshold value, it is used to control the number of iterations in the "while" loop. In K-means, there are a few ways to control the number of iterations: (1) by defining a specific number of iterations (2) by defining a small value, when the sum of errors is lower than this value, loop breaks and (3) loop breaks when centers no more change. Given "convergeDist" belongs to case (2).
convergeDist = float(sys.argv[3])
"kPoints" are the initialized centers sampled from "data"
kPoints = data.takeSample(False, K, 1)
.takeSample(withReplacement, num, [seed]) is an action, it returns an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
>>> kPoints
[array([ 0.1, 0.1, 0.1]), array([ 0.2, 0.2, 0.2]), array([ 9.2, 9.2, 9.2])]
As "kPoints" is result of an action, we don't need to .collect() it.
The "while" loop starts with
tempDist > convergeDist:
"tempDist" is initially defined as "1.0", it is actually the inter-cluster sum of squares, e.g the sum of distances of each point in the cluster to its center. It will be updated in each iteration as the center is updated.
while tempDist > convergeDist:
    # the loop stops when tempDist < convergeDist
    closest = data.map(lambda p: (closestPoint(p, kPoints), (p, 1)))
    pointStats = closest.reduceByKey(lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))
    newPoints = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
    tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)
    for (iK, p) in newPoints:
        kPoints[iK] = p
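For completeness, here is a minimal sketch of the helper "closestPoint" used in the first .map() above, written to match the behavior described below (the real definition lives in kmeans.py):
def closestPoint(p, centers):
    # return the index of the center that is nearest to point p
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        dist = np.sum((centers[i] - p) ** 2)
        if dist < closest:
            closest = dist
            bestIndex = i
    return bestIndex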
The key point of this loop is the usage of the .map() and .reduceByKey() transformations. The first .map() applies a lambda function to each element of "data" and produces a tuple (cluster_index, (data_point, 1)). In this tuple, cluster_index is the "key" and the tuple (data_point, 1) is the "value". Inside the lambda function we find the function "closestPoint()": it takes a data point and the centers, and returns the cluster_index of the center closest to that data point. For the first iteration, we can collect the results of "closest"; it is:
>>> closest.collect()
[(0, (array([ 0., 0., 0.]), 1)), (0, (array([ 0.1, 0.1, 0.1]), 1)), (1, (array([ 0.2, 0.2, 0.2]), 1)), (2, (array([ 9., 9., 9.]), 1)), (2, (array([ 9.1, 9.1, 9.1]), 1)), (2, (array([ 9.2, 9.2, 9.2]), 1))]
In the next step, .reduceByKey() is used: it groups the elements of "closest" by key, then applies the lambda function to the values pair by pair. Take the first two elements of "closest" for example; in .reduceByKey()
p1_c1[0] = array([ 0., 0., 0.]), p2_c2[0] = array([ 0.1, 0.1, 0.1]), p1_c1[1] = 1, p2_c2[1] = 1,
Thus the lambda applied to p1_c1 and p2_c2 on this pair returns (array([ 0.1, 0.1, 0.1]), 2). If we collect "pointStats" we will have
>>> pointStats.collect()
[(0, (array([ 0.1, 0.1, 0.1]), 2)), (2, (array([ 27.3, 27.3, 27.3]), 3)), (1, (array([ 0.2, 0.2, 0.2]), 1))]
After this, .map() is applied to "pointStats": the lambda function is applied to each of its elements, denoted "st", and produces a tuple. Take the first element of "pointStats", (0, (array([ 0.1, 0.1, 0.1]), 2)), for example: st[0] = 0, st[1][0] = array([ 0.1, 0.1, 0.1]), st[1][1] = 2. Therefore "newPoints" contains the updated centers (already collected by the .collect() in the code); printing it, we see
>>> newPoints
[(0, array([ 0.05, 0.05, 0.05])), (2, array([ 9.1, 9.1, 9.1])), (1, array([ 0.2, 0.2, 0.2]))]
Then it is time to update "tempDist", which is computed with a generator expression inside sum().
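As a quick sanity check with the values shown above (the old kPoints versus newPoints): tempDist = 3*(0.1-0.05)**2 + 3*(0.2-0.2)**2 + 3*(9.2-9.1)**2 = 0.0075 + 0 + 0.03 = 0.0375 (up to floating-point rounding). So if convergeDist were, say, 0.1, the loop would already stop after this iteration.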
This "while" loops goes on until "tempDist < convergeDist".
I ran this "kmeans.py" on Iris dataset, and got result:
This is my command on console:
~/spark-1.4.0-bin-hadoop2.6/bin$ ./pyspark ../examples/src/main/python/kmeans.py "hdfs://master:9000/user/xywang/iris.txt" "3" "0.1"
Note that the Iris dataset was preprocessed; details can be found in my blog.
This blog is titled "An anatomy of the implementation of K-means in pyspark"; its main aim is to give a detailed explanation of how K-means is implemented in pyspark. The point that interests me most is the usage of .map() and .reduceByKey() combined with a user-defined function or a lambda function. Writing down my discoveries helps me track my working process, and I hope this sharing is useful for other people as well.
Wednesday, March 2, 2016
An anatomy of the implementation of PageRank in pyspark
In this blog, let's make an anatomy of the implementation of PageRank in pyspark.
PageRank is well known from Google's search. If you are new to it, here is a good overview of the algorithm.
MapReduce was originally created to compute PageRank, and I found this a good lecture video on how to compute PageRank using MapReduce. Spark is noted to be much more efficient than MapReduce because it decreases I/O cost and makes iterative computation much faster. In studying Spark, it is a good exercise to understand how PageRank is implemented under this framework.
If you follow my previous blogs, you may be a bit familiar with pyspark, the Python API of Spark. For this blog, we will use the example "pagerank.py" in the Spark distribution, located at:
$SPARK_HOME/examples/src/main/python
To start with, we need a proper dataset. To test this example "pagerank.py", I prepared a dataset called "dt_pagerank.txt", whose content is the following (one tab-separated pair per line; the same pairs appear in the lines.collect() output later):
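A	D
B	C
B	D
B	E
B	F
C	B
D	A
D	B
E	B
F	B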
I generated this dataset from the graph on the PageRank Wikipedia page. Though it is a directed graph, I consider it undirected, and I omit the small nodes without names. I used a tab to separate the two letters in each line: one letter denotes a website, followed by one of its neighbors.
To run this pagerank.py, I typed on the terminal:
$ ~/spark-1.4.0-bin-hadoop2.6/bin/./pyspark ~/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/pagerank.py "hdfs://master:9000/user/xywang/dt_pagerank.txt" "10"
The command specifies the path of pyspark, followed by the path of pagerank.py, then the path of the dataset (which has been pushed onto HDFS), and the number of iterations.
Note that Hadoop and Spark should be running. If everything is all right, you should see the result on the console.
Troubleshooting: you may also run it using "python path/pagerank.py path/dataset num_iterations", but I got the exception "no module named pyspark". A solution is to specify $SPARK_HOME and the PYTHONPATH at the top of the pagerank.py script, specifically:
import os
import sys
os.environ['SPARK_HOME']="/home/hduser/spark-1.4.0-bin-hadoop2.6"
sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python")
The test of dt_pagerank.txt and pagerank.py looks all right. Now let's take a closer look at pagerank.py to understand what is going on. I use the pyspark shell on the Ubuntu console to track everything down:
1. As said in the script itself, we need to initialize the Spark context
>>> sc = SparkContext(appName="PythonPageRank")
Only after the Spark context is initialized can an RDD be built.
2. Build an RDD by loading a file from HDFS
>>> lines = sc.textFile("hdfs://master:9000/user/xywang/dt_pagerank.txt",1)
As said before, I pushed the dataset dt_pagerank.txt onto HDFS, so the path starts with hdfs://.
"1" means the number of partitions is 1.
"lines" is an RDD. An RDD can't be read until an action is called; to see the content of "lines", we can type:
>>> lines.collect()
"collect()" is an action that returns all the elements of the dataset as an array at the driver program.
This command returns us:
[u'A \tD', u'B\tC', u'B\tD', u'B\tE', u'B\tF', u'C\tB', u'D\tA', u'D\tB', u'E\tB', u'F\tB']
The function "parseNeighbors(urls)" uses a regular expression to split each element of the list above into a tuple. For example:
>>> parseNeighbors(u'F\tB')
(u'F', u'B')
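For reference, here is a minimal sketch of what parseNeighbors could look like, consistent with the behavior above (the real definition is in pagerank.py):
import re
def parseNeighbors(urls):
    # split a line such as u'F\tB' on whitespace and return the two nodes as a tuple
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]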
3. Separate the nodes in each list element, by
>>> links = lines.map(lambda urls: parseNeighbors(urls))
This is a map transformation on "lines". Combined with the lambda function, it applies "parseNeighbors()" to each element of "lines", separating each into a tuple. But this transformation will not be executed until an action is applied. Let's say:
>>> links.collect()
[(u'A', u'D'), (u'B', u'C'), (u'B', u'D'), (u'B', u'E'), (u'B', u'F'), (u'C', u'B'), (u'D', u'A'), (u'D', u'B'), (u'E', u'B'), (u'F', u'B')]
In the script, the transformation distinct() is also applied. As there is no duplicated entry in dt_pagerank.txt, applying distinct() returns the same result as shown above.
4. In Spark, a key and its value are kept in a tuple. In "links.collect()" we see that some keys are duplicated, so the transformation groupByKey() is used:
>>> links = links.groupByKey()
>>> links.collect()
[(u'A', <pyspark.resultiterable.ResultIterable object at 0x7efc08d8d590>),
…
groupByKey() returns a dataset of (K, Iterable<V>) pairs. As we see, Iterable<V> is a Spark object; it is effectively a list of values.
By now, we have the (key, value list) for each node. This information is saved in the variable "links".
5. Initialize rank
>>> ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))
This command uses the map transformation simply to assign 1.0 to each node. If you call ranks.collect(), you should get:
>>> ranks.collect()
[(u'A', 1.0), (u'C', 1.0), (u'B', 1.0), (u'E', 1.0), (u'D', 1.0), (u'F', 1.0)]
Pay attention to the usage of the index [0] and the parentheses ( ) in the lambda function. If you use [1], it indicates the 2nd element of the tuple, and the parentheses ( ) form a standard (key, value) pair.
6. Next you see a loop that starts with
for iteration in range(int(sys.argv[2])):
This is actually the "10" we input in the example; sys.argv[2] tells the system to grab the 3rd command-line argument as a parameter of this program.
In this loop, two things are done: (1) compute the contribution each node passes to its neighbors, and (2) sum up the contributions each node receives and apply the damping factor.
For (1), the script first combines links and ranks, by
>>> links.join(ranks)
If you use collect() on it, you will see that the (key, value list) is now (key, (value list, 1.0)); "1.0" is appended to the value of each key.
Next, the function computeContribs(urls, rank) is called, by
>>> contribs = links.join(ranks).flatMap(
...     lambda x: computeContribs(x[1][0], x[1][1]))
To simplify it, we use "x" to replace "url_urls_rank". In the function computeContribs(), "x" means (key, (value list, 1.0)); therefore x[1][0] is the "value list" and x[1][1] is "1.0". This function computes how much PageRank score a node receives from a neighbor. If you collect() contribs, you see:
>>> contribs.collect()
[(u'D', 1.0), (u'B', 1.0), (u'B', 1.0), (u'C', 0.25), (u'D', 0.25), (u'E', 0.25), (u'F', 0.25), (u'A', 0.5), (u'B', 0.5), (u'B', 1.0)]
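For reference, here is a minimal sketch of computeContribs that is consistent with this output (the real definition is in pagerank.py): each node splits its current rank evenly among its neighbors.
def computeContribs(urls, rank):
    # yield one (neighbor, contribution) pair per neighbor; the rank is shared evenly
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)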
Then (2) is to sum up the scores a node receives from its different neighbors, taking into account the damping factor. All of this is realized by:
>>> new_ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
I used "new_ranks" instead of "ranks". The transformation reduceByKey(add) sums up the values for each key; e.g. for key u'B', its value equals 1.0+1.0+0.5+1.0 = 3.5 (the "add" here comes from the operator module, imported at the top of the script). The transformation mapValues(func) applies a function to the value of each key, so B's rank becomes 3.5 * 0.85 + 0.15 = 3.125. In the end, if we collect() the result, we see:
>>> new_ranks.collect()
[(u'A', 0.575), (u'C', 0.3625), (u'E', 0.3625), (u'B', 3.125), (u'D', 1.2125), (u'F', 0.3625)]
This is the result after one iteration.
In the following iterations, new_ranks is fed back in to compute "contribs", which in turn updates "new_ranks", until the number of iterations is reached.
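Putting the pieces together, the body of the loop in pagerank.py looks roughly like this (a sketch; the script overwrites "ranks" directly rather than using a separate "new_ranks"):
for iteration in range(int(sys.argv[2])):
    contribs = links.join(ranks).flatMap(
        lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)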
Voilà, this is how PageRank is implemented in pyspark. By explaining this example step by step, I hope we can get a better understanding of how Spark realizes this algorithm by applying transformations and actions on an RDD. This process is a bit different from the common way of programming. In my point of view, it is limited because we have to express everything with transformations and actions, of which there are not so many. But on the other hand, we gain the benefit of parallelism.