In this blog, let's dissect the implementation of PageRank in pyspark.
PageRank is well-known as the algorithm behind Google's search ranking. If you are new to it, here is a good overview of the algorithm.
MapReduce was originally created to compute PageRank. I found a good lecture video on how to compute PageRank using MapReduce.
Spark is known to be much more efficient than MapReduce because it reduces I/O cost and makes iterative computation much faster. In studying Spark, it is a good exercise to understand how PageRank is implemented under this framework.
If you have followed my previous blogs, you may already be a bit familiar with pyspark, the Python API of Spark. For this blog, we will use the example “pagerank.py” shipped with the Spark distribution, located at $SPARK_HOME/examples/src/main/python.
To start with, we need a proper dataset. To test the example “pagerank.py”, I prepared a dataset called “dt_pagerank.txt”, whose content is shown in the screenshot:
I generated this dataset from the figure on the PageRank Wikipedia page. Though the original graph is directed, I treat it as undirected, and I omit the small unnamed nodes. Each line contains two tab-separated letters: the first letter denotes a website, the second one of its neighbors.
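For reference, the tab-separated content of dt_pagerank.txt (reconstructed here from the collect() output shown later in this post) is roughly:
A	D
B	C
B	D
B	E
B	F
C	B
D	A
D	B
E	B
F	B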
To run this pagerank.py, I typed the following on the terminal:
$ ~/spark-1.4.0-bin-hadoop2.6/bin/./pyspark ~/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/pagerank.py "hdfs://master:9000/user/xywang/dt_pagerank.txt" "10"
The command specifies the path of pyspark, followed by the path of pagerank.py, then the path of the dataset (which has been pushed onto HDFS), and finally the number of iterations.
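Alternatively (a sketch using the same paths and arguments), the script can also be launched with spark-submit, which is the standard way to submit a Spark application:
$ ~/spark-1.4.0-bin-hadoop2.6/bin/spark-submit ~/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/pagerank.py "hdfs://master:9000/user/xywang/dt_pagerank.txt" "10"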
Note that Hadoop and Spark should be up and running. If everything is all right, you should get this result on the console:
Troubleshooting: you may also run it with “python path/pagerank.py path/dataset num_iter”, but I got the exception “no module named pyspark”. A solution is to set SPARK_HOME and PYTHONPATH at the top of the pagerank.py script, specifically:
import os
import sys
# point SPARK_HOME to the local Spark installation
os.environ['SPARK_HOME'] = "/home/hduser/spark-1.4.0-bin-hadoop2.6"
# make the pyspark package importable from a plain Python interpreter
sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python")
The test with dt_pagerank.txt and pagerank.py looks all right. Now let's take a closer look at pagerank.py to understand what is going on. I use the pyspark shell on the Ubuntu console to track everything down:
1. As said in the script itself, we need to initialize the Spark context:
>>> sc = SparkContext(appName="PythonPageRank")
Only after the Spark context is initialized can RDDs be built.
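If you run these steps in a plain Python shell rather than the pyspark shell, a minimal sketch of the initialization (assuming the SPARK_HOME/PYTHONPATH setup from the troubleshooting note above) is:
from pyspark import SparkContext

# create the context that coordinates the job; the app name shows up in the Spark UI
sc = SparkContext(appName="PythonPageRank")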
2. Build an RDD by loading a file from HDFS:
>>> lines = sc.textFile("hdfs://master:9000/user/xywang/dt_pagerank.txt",1)
As said before, I pushed the dataset dt_pagerank.txt onto HDFS, so the path starts with hdfs://.
“1” means the number of partitions is 1.
“lines” is an RDD. An RDD can't be read until an action is called, so to see the content of “lines”, we can type:
>>> lines.collect()
“collect()” is an action that returns all the elements of the dataset as an array to the driver program.
This command returns:
[u'A \tD', u'B\tC', u'B\tD', u'B\tE', u'B\tF', u'C\tB', u'D\tA', u'D\tB', u'E\tB', u'F\tB']
The function “parseNeighbors(urls)” uses a regular expression to split each element of the list above into a tuple. For example:
>>> parseNeighbors(u'F\tB')
(u'F', u'B')
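For reference, the parseNeighbors helper in the example script looks roughly like this; it splits a line on whitespace (the tab in our dataset) and returns the pair:
import re

def parseNeighbors(urls):
    # split one line on whitespace into (url, neighbor)
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]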
3. Separate the two nodes in each element:
>>> links = lines.map(lambda urls: parseNeighbors(urls))
This is a map transformation on “lines”. Combined with the lambda function, it applies “parseNeighbors()” to each element of “lines”, turning it into a tuple. But this transformation will not be executed until an action is applied. For example:
>>> links.collect()
[(u'A', u'D'), (u'B', u'C'), (u'B', u'D'), (u'B', u'E'), (u'B', u'F'), (u'C', u'B'), (u'D', u'A'), (u'D', u'B'), (u'E', u'B'), (u'F', u'B')]
In the script, the transformation distinct() is also applied. Since the dataset dt_pagerank.txt contains no duplicate entries, applying distinct() returns the same result as shown above.
4. In Spark, a key and its value are kept in a tuple. In the output of “links.collect()” we see that some keys are duplicated, so the transformation groupByKey() is used:
>>> links = links.groupByKey()
>>> links.collect()
[(u'A', <pyspark.resultiterable.ResultIterable object at 0x7efc08d8d590>),
…
groupByKey() returns a dataset of (K, Iterable<V>) pairs. As we see, Iterable<V> shows up as a Spark object, but it is essentially a list of values.
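To peek inside those iterables (an extra step, not part of the script), you can convert the values to plain lists, which should return something like (ordering may differ):
>>> links.mapValues(list).collect()
[(u'A', [u'D']), (u'C', [u'B']), (u'B', [u'C', u'D', u'E', u'F']), (u'E', [u'B']), (u'D', [u'A', u'B']), (u'F', [u'B'])]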
By now, we have a (key, value list) pair for each node. This information is saved in the variable “links”.
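For reference, in the example script these steps are chained into a single statement and the result is cached, since “links” is reused in every iteration; the line reads roughly:
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()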
5. Initialize the ranks:
>>> ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))
This command uses the map transformation simply to assign “1.0” to each node. If you call ranks.collect(), you should get:
>>> ranks.collect()
[(u'A', 1.0), (u'C', 1.0), (u'B', 1.0), (u'E', 1.0), (u'D', 1.0), (u'F', 1.0)]
Pay attention to the use of the index [0] and the parentheses ( ) in the lambda function. Using [1] would instead refer to the 2nd element of the tuple, and the parentheses form a standard (key, value) pair.
6. Next you see a loop that starts with:
for iteration in range(int(sys.argv[2])):
sys.argv[2] is where the “10” we passed in the example ends up; it tells the program to grab the 3rd command-line argument and use it as the number of iterations.
In this loop, two things are done: (1) compute the contribution each node sends to its neighbors, and (2) sum up the contributions each node receives and apply the damping factor.
For (1), the script first joins links and ranks:
>>> links.join(ranks)
If you call collect() on it, you will see that each (key, value list) has become (key, (value list, 1.0)); the initial rank “1.0” is attached to the value of each key.
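To inspect this joined structure yourself (again an extra step, not in the script), you can turn the iterables into lists first; the output should look something like:
>>> links.join(ranks).mapValues(lambda v: (list(v[0]), v[1])).collect()
[(u'A', ([u'D'], 1.0)), (u'C', ([u'B'], 1.0)), (u'B', ([u'C', u'D', u'E', u'F'], 1.0)), (u'E', ([u'B'], 1.0)), (u'D', ([u'A', u'B'], 1.0)), (u'F', ([u'B'], 1.0))]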
Next, the function computeContribs(urls, rank) is called:
>>> contribs = links.join(ranks).flatMap(
lambda x: computeContribs(x[1][0], x[1][1]))
To simplify it, we use “x” in place of “url_urls_rank” from the script. In the joined RDD, “x” is (key, (value list, rank)), therefore x[1][0] is the “value list” and x[1][1] is the rank “1.0”. The function computeContribs() computes how much PageRank score a node receives from each of its neighbors. If you collect() contribs, you see:
>>> contribs.collect()
[(u'D', 1.0), (u'B', 1.0), (u'B', 1.0), (u'C', 0.25), (u'D', 0.25), (u'E', 0.25), (u'F', 0.25), (u'A', 0.5), (u'B', 0.5), (u'B', 1.0)]
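For reference, computeContribs() in the example script looks roughly like this; each neighbor receives an equal share of the node's current rank:
def computeContribs(urls, rank):
    # each neighbor receives rank / (number of neighbors)
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)
For example, B has four neighbors, so each of them receives 1.0 / 4 = 0.25, which matches the 0.25 entries above.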
Then (2) sums up the scores a node receives from its different neighbors, taking the damping factor into account. All of this is realized by:
>>> new_ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
I used “new_ranks” instead of “ranks”. The transformation reduceByKey(add) sums up the values for each key (add comes from Python's operator module, imported at the top of the script); e.g. for key u'B', its value becomes 1.0 + 1.0 + 0.5 + 1.0 = 3.5. The transformation mapValues(fun) then applies a function to the value of each key, here the damping formula rank * 0.85 + 0.15, so u'B' ends up with 3.5 * 0.85 + 0.15 = 3.125. In the end, if we collect() the result, we see:
>>> new_ranks.collect()
[(u'A', 0.575), (u'C', 0.3625), (u'E', 0.3625), (u'B', 3.125), (u'D', 1.2125), (u'F', 0.3625)]
This is the result
after one iteration.
In the following iterations, new_ranks is fed back in to compute “contribs”, which in turn updates “new_ranks”, until the number of iterations is reached.
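Putting it together, the iteration in the script looks roughly like this (keeping the “x” shorthand, with add imported from Python's operator module); at the end, the script prints the final ranks:
from operator import add

for iteration in range(int(sys.argv[2])):
    # (1) contribution each node sends to its neighbors
    contribs = links.join(ranks).flatMap(
        lambda x: computeContribs(x[1][0], x[1][1]))
    # (2) sum contributions per node and apply the damping factor
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

for (link, rank) in ranks.collect():
    print("%s has rank: %s." % (link, rank))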
Voilà, this is how PageRank is implemented in pyspark. By explaining this example step by step, I hope we can get a better understanding of how Spark realizes the algorithm by applying transformations and actions on RDDs. This process is a bit different from the usual way of programming. In my view, it is somewhat limiting because everything has to be expressed with transformations and actions, of which there are not that many. But on the other hand, we gain the benefit of parallelism.