Wednesday, March 2, 2016

An anatomy of the implementation of PageRank in pyspark


In this blog, let's dissect the implementation of PageRank in pyspark. PageRank is well known as the algorithm behind Google's search ranking. If you are new to it, here is a good overview of the algorithm.
Computing PageRank was one of the original motivations for MapReduce. I found this lecture video a good introduction to computing PageRank with MapReduce. Spark is noted to be much more efficient than MapReduce because it reduces I/O cost and makes iterative computation much faster. When studying Spark, it is a good exercise to understand how PageRank is implemented under this framework.

If you follow my previous blogs, you may already be somewhat familiar with pyspark, the Python API of Spark. For this blog, we will use the example “pagerank.py” shipped with the Spark distribution, located at $SPARK_HOME/examples/src/main/python.

To start with, we need a proper dataset. To test the example “pagerank.py”, I prepared a dataset called “dt_pagerank.txt”.

I generated this dataset from the illustration on the PageRank Wikipedia page. Although the original graph is directed, I treat it as undirected, and I omit the small unnamed nodes. Each line contains two letters separated by a tab: the first letter denotes a website, the second one of its neighbors.
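Reconstructed here from the lines.collect() output shown later in this post (the exact whitespace may differ slightly), the file is a set of tab-separated pairs:
 A	D  
 B	C  
 B	D  
 B	E  
 B	F  
 C	B  
 D	A  
 D	B  
 E	B  
 F	B  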
To run pagerank.py, I typed the following in a terminal:
 $ ~/spark-1.4.0-bin-hadoop2.6/bin/./pyspark ~/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/pagerank.py "hdfs://master:9000/user/xywang/dt_pagerank.txt" "10"  


The command specifies the path of pyspark, followed by the path of pagerank.py, then the path of the dataset (which has been pushed onto HDFS), and finally the number of iterations.

Note that both Hadoop and Spark should be running. If everything is all right, you should see the final rank of each node printed on the console.

Troubleshooting: you may also try running it with “python path/pagerank.py path/dataset num_iter”, but I got the exception “No module named pyspark”. A solution is to set $SPARK_HOME and extend the Python path at the top of pagerank.py, specifically:
 import os  
 import sys  
 os.environ['SPARK_HOME']="/home/hduser/spark-1.4.0-bin-hadoop2.6"  
 sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python")  
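Alternatively, though I did not test it for this post, a standalone PySpark script can also be launched with spark-submit, which sets up the Python environment for you; something like:
 $ ~/spark-1.4.0-bin-hadoop2.6/bin/spark-submit ~/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/pagerank.py "hdfs://master:9000/user/xywang/dt_pagerank.txt" "10"  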


The test with dt_pagerank.txt and pagerank.py looks all right. Now let's take a closer look at pagerank.py to understand what is going on. I use the interactive pyspark shell on the Ubuntu console to step through everything:

1. As said in the script itself, we first need to initialize the Spark context
 >>> sc = SparkContext(appName="PythonPageRank")  

Only after the SparkContext is initialized can RDDs be built. (In the interactive pyspark shell, a SparkContext is already created for you as the variable “sc”, so this line is only needed in a standalone script.)


2. Build an RDD by loading a file from HDFS
 >>> lines = sc.textFile("hdfs://master:9000/user/xywang/dt_pagerank.txt",1)  

As said before, I pushed the dataset dt_pagerank.txt onto HDFS, which is why the path starts with hdfs://.
The “1” means the minimum number of partitions is 1.
“lines” is an RDD. An RDD is not materialized until an action is called on it. To see the content of “lines”, we can type:
 >>> lines.collect( )  

“collect()” is an action that returns all the elements of the dataset as a list to the driver program.
This command returns:
 [u'A \tD', u'B\tC', u'B\tD', u'B\tE', u'B\tF', u'C\tB', u'D\tA', u'D\tB', u'E\tB', u'F\tB']  

The function “parseNeighbors(urls)” uses a regular expression to split each element of the list above into a tuple. For example:
 >>> parseNeighbors(u'F\tB')  
 (u'F', u'B')  
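For reference, parseNeighbors in the Spark example script looks roughly like this (a sketch; check the script in your distribution for the exact code):
 import re  

 def parseNeighbors(urls):  
     # split a line such as u'A\tD' on whitespace into the pair (u'A', u'D')  
     parts = re.split(r'\s+', urls)  
     return parts[0], parts[1]  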


3. Separate the two nodes in each list element:
 >>> links = lines.map(lambda urls: parseNeighbors(urls))  

This is a map transformation on “lines”. Combined with the lambda function, it applies “parseNeighbors()” to each element of “lines”, splitting it into a tuple. As before, this transformation will not be executed until an action is applied, say
 >>> links.collect()  
 [(u'A', u'D'), (u'B', u'C'), (u'B', u'D'), (u'B', u'E'), (u'B', u'F'), (u'C', u'B'), (u'D', u'A'), (u'D', u'B'), (u'E', u'B'), (u'F', u'B')]  

In the script, the transformation distinct() is also applied. Since the dataset dt_pagerank.txt contains no duplicate entries, applying distinct() returns the same result as shown above.
 
4. In Spark, a key-value pair is kept in a tuple. In the output of “links.collect()” we see that some keys are duplicated, so the transformation groupByKey() is used
 >>> links = links.groupByKey()  
 >>> links.collect()  
 [(u'A', <pyspark.resultiterable.ResultIterable object at 0x7efc08d8d590>),  
 …  

groupByKey() returns a dataset of (K, Iterable<V>) pairs. As we see, the Iterable<V> shows up as a pyspark ResultIterable object; conceptually it is just the list of values for that key.
By now, we have a (key, list of neighbors) pair for each node. This information is saved in the variable “links”.
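If you want to inspect the grouped neighbors as plain lists, one way (not part of the original script) is to convert the iterables with mapValues, which gives something like:
 >>> links.mapValues(list).collect()  
 [(u'A', [u'D']), (u'C', [u'B']), (u'B', [u'C', u'D', u'E', u'F']), (u'E', [u'B']), (u'D', [u'A', u'B']), (u'F', [u'B'])]  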
5. Initialize the ranks
 >>> ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))  

This command uses the map transformation simply to assign an initial rank of 1.0 to each node. If you call ranks.collect(), you should get
 >>> ranks.collect()  
 [(u'A', 1.0), (u'C', 1.0), (u'B', 1.0), (u'E', 1.0), (u'D', 1.0), (u'F', 1.0)]  

Pay attention to the use of the index [0] and the parentheses ( ) in the lambda function. The index [0] picks the first element of the tuple (the key); [1] would pick the second element. The parentheses form a standard (key, value) pair.


6. Next, you see a loop that starts with
 for iteration in range(int(sys.argv[2])):  
sys.argv[2] is the second command-line argument after the script name, i.e. the “10” we passed in the example, which is used as the number of iterations.
In this loop, two things are done:
(1) compute the contribution each node sends to its neighbors;
(2) sum up the contributions received by each node and apply the damping factor to obtain the new ranks.


For (1), the script first joins links and ranks:
 >>> links.join(ranks)  

If you call collect() on it, you will see that each (key, value list) pair is now (key, (value list, 1.0)): the current rank 1.0 is paired with the neighbor list of each key.
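For example, collecting the joined RDD gives something like (the memory addresses will of course differ):
 >>> links.join(ranks).collect()  
 [(u'A', (<pyspark.resultiterable.ResultIterable object at 0x...>, 1.0)),  
 …  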
Next, the function computeContribs(urls, rank) is called:
 >>> contribs = links.join(ranks).flatMap(  
 lambda x: computeContribs(x[1][0], x[1][1]))   

To simplify things, we use “x” in place of “url_urls_rank”. In the lambda, “x” is a (key, (value list, 1.0)) tuple, therefore x[1][0] is the value list (the neighbors) and x[1][1] is the rank 1.0. computeContribs() divides a node's rank evenly among its neighbors, producing (neighbor, contribution) pairs, i.e. how much PageRank score each neighbor receives from that node. If you collect() contribs, you see
 >>> contribs.collect()  
 [(u'D', 1.0), (u'B', 1.0), (u'B', 1.0), (u'C', 0.25), (u'D', 0.25), (u'E', 0.25), (u'F', 0.25), (u'A', 0.5), (u'B', 0.5), (u'B', 1.0)]  
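For reference, computeContribs in the Spark example script looks roughly like this (again a sketch; consult the script in your distribution for the exact code):
 def computeContribs(urls, rank):  
     # distribute this node's rank evenly among its neighbors,  
     # yielding one (neighbor, contribution) pair per neighbor  
     num_urls = len(urls)  
     for url in urls:  
         yield (url, rank / num_urls)  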


Then (2) sums up the scores a node receives from its different neighbors, taking the damping factor into account. All of this is done by
 >>> new_ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)   

Here I used “new_ranks” instead of “ranks”. The transformation reduceByKey(add) sums up the values for each key (add comes from Python's operator module, imported at the top of the script); e.g. for key u'B' the summed value is 1.0 + 1.0 + 0.5 + 1.0 = 3.5. The transformation mapValues(fun) then applies a function to the value of each key, here the damping formula rank * 0.85 + 0.15, so u'B' ends up with 3.5 * 0.85 + 0.15 = 3.125. If we collect() the result, we see
 >>> new_ranks.collect()  
 [(u'A', 0.575), (u'C', 0.3625), (u'E', 0.3625), (u'B', 3.125), (u'D', 1.2125), (u'F', 0.3625)]  

This is the result after one iteration.
In the following iterations, “new_ranks” (in the original script simply “ranks”) is fed back into the join to compute “contribs” again, which in turn updates the ranks, until the given number of iterations is reached.
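Putting the pieces together, the core iteration of the script looks roughly like this (a sketch based on the steps above, using the original variable name “ranks”):
 for iteration in range(int(sys.argv[2])):  
     # (1) pair each node's neighbor list with its current rank and  
     #     distribute that rank among the neighbors  
     contribs = links.join(ranks).flatMap(  
         lambda x: computeContribs(x[1][0], x[1][1]))  
     # (2) sum the contributions per node and apply the damping factor  
     ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)  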

Voilà, this is how PageRank is implemented in pyspark. By walking through this example step by step, I hope we get a better understanding of how Spark realizes this algorithm by applying transformations and actions on RDDs. This style is a bit different from the common way of programming. In my opinion, it is somewhat limited because we have to express everything with transformations and actions, of which there are not so many; but in return, we gain the benefit of parallelism.
