Friday, February 26, 2016

How to run K-means clustering on the iris dataset using pyspark on a Hadoop cluster, through PyCharm and through the Ubuntu terminal

I admit that the title is a bit long, but it summarizes the content of this post well.

I have a cluster of five nodes (1 master + 4 workers) running Ubuntu 14.04. Hadoop 2.6 and Spark-1.4.0 are installed and properly configured on all nodes. Python 2.7 and the important packages are also installed on all nodes (for instructions on installing the necessary packages on Ubuntu, see here).

I access the master node from my laptop via the Python IDE PyCharm. In my previous blog, I described how to enable PyCharm to execute a .py program remotely on the master (see here). In this blog, let's run K-means clustering on the iris dataset using pyspark, with the data stored on HDFS. We will go through:

(1) prepare the dataset
(2) load the dataset onto HDFS
(3) configure kmeans.py, run it and monitor it on the Spark web UI

Here we go!

(1) The iris dataset is composed of 150 examples from 3 classes, described by 4 attributes. More details on this dataset can be found in the UCI dataset repository. In my case, I need to trim the label column and keep only the data of the four attributes. Ideally, the figure below is what I need as input:
To do this, I made a small .py program that does what I need.
 import os.path  
 from sklearn import datasets  

 # Load the iris data (150 examples, 4 numeric attributes) and drop the labels.  
 iris = datasets.load_iris()  
 dt = iris.data.tolist()  

 # Write the four attributes of each example, space-separated, one example per line.  
 save_path = '/home/hduser/Documents/test_Spark'  
 completeName = os.path.join(save_path, "iris.txt")  
 with open(completeName, "w") as f:  
     for i in range(150):  
         f.write(str(dt[i][0]) + ' ' + str(dt[i][1]) + ' ' + str(dt[i][2]) + ' ' + str(dt[i][3]) + '\n')  
I used the Python package "sklearn" to load the dataset. You can also download the dataset from the UCI repository and process it in many different ways.
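For example, if you prefer to download the raw file from the UCI repository rather than use sklearn, a sketch like the one below would produce the same space-separated file. It assumes the raw file is saved as "iris.data" in the same folder, comma-separated, with the class label in the last column.

 import os.path  

 # Sketch (assumption: the raw UCI file "iris.data" is comma-separated with  
 # the class label in the fifth column): drop the label, keep the four  
 # numeric attributes, and write them space-separated, one example per line.  
 save_path = '/home/hduser/Documents/test_Spark'  
 with open(os.path.join(save_path, "iris.data")) as f_in, \
      open(os.path.join(save_path, "iris.txt"), "w") as f_out:  
     for line in f_in:  
         line = line.strip()  
         if not line:  # the UCI file ends with a blank line  
             continue  
         attributes = line.split(',')[:4]  # drop the label column  
         f_out.write(' '.join(attributes) + '\n')  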

I saved the dataset on the master node, in the folder "test_Spark", which is synchronized with the "test_Spark" folder on my laptop.

(2) Next, let's copy "iris.txt" to HDFS. To do so, you have to have HDFS running and make sure Hadoop works properly on the cluster. The Hadoop file system shell can do this for us; simply type:

 $ hadoop fs -copyFromLocal ~/Documents/test_Spark/iris.txt /user/xywang  

/user/xywang is an existing folder on my HDFS. Change it to yours accordingly. To create a folder on HDFS, use
 $ hadoop fs -mkdir folder_name  
to list the contents of a folder on HDFS, use
 $ hadoop fs -ls folder_name  
and to view a file on HDFS, use
 $ hadoop fs -cat path/file_name  
Everything is just like using terminal commands on Ubuntu; you just need to prefix them with "hadoop fs".

By now, we should see "iris.txt" appear in the "xywang" folder on my HDFS.

(3) If you have the same Spark distribution as I do, you can find kmeans.py in the examples folder of Spark. The full path on my master node is:
 spark-1.4.0-bin-hadoop2.6/examples/src/main/python/mllib  

The content of kmeans.py file is:
 from __future__ import print_function  
 import sys  
 import numpy as np  
 from pyspark import SparkContext  
 from pyspark.mllib.clustering import KMeans  
 def parseVector(line):  
   return np.array([float(x) for x in line.split(' ')])  
 if __name__ == "__main__":  
   if len(sys.argv) != 3:  
     print("Usage: kmeans <file> <k>", file=sys.stderr)  
     exit(-1)  
   sc = SparkContext(appName="KMeans")  
   lines = sc.textFile(sys.argv[1])  
   data = lines.map(parseVector)  
   k = int(sys.argv[2])  
   model = KMeans.train(data, k)  
   print("Final centers: " + str(model.clusterCenters))  
   print("Total Cost: " + str(model.computeCost(data)))  
   sc.stop()  

There are two ways to execute this file: 1. through the Ubuntu terminal, or 2. through PyCharm.

For 1: as I access the master via ssh, I first need to enable communication between my laptop and the master node. It is more convenient here if you make the ssh connection password-less. After connecting to the master node, type in the terminal:
 python ~/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/mllib/kmeans.py "hdfs://master:9000/user/xywang/iris.txt" "3"  
or go to the "bin" folder of your Spark, and type
 ./pyspark ~/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/mllib/kmeans.py "hdfs://master:9000/user/xywang/iris.txt" "3"  

If everything is fine, you should see results like:
 Final centers: [array([ 5.006, 3.418, 1.464, 0.244]), array([ 6.85, 3.07368421, 5.74210526, 2.07105263]), array([ 5.9016129 , 2.7483871 , 4.39354839, 1.43387097])]  
 Total Cost: 78.9408414261    

But on the Spark web UI, you won't see any job info, because this job is actually executed only on the master node, not on the cluster. The trick is that we need to add a parameter in "SparkContext()" to tell it that we want the job to be executed on the cluster:
sc = SparkContext(master="spark://IP_of_master:7077", appName="KMeans")  

Now run the command again; you should see "KMeans" under "Running Applications" on the Spark web UI while the job is running, and under "Completed Applications" when it is done.

For 2, to run it via PyCharm: I created a new Python file named "Kmeans.py" in the project "test_Spark" on my laptop. To distinguish it from the "kmeans.py" in the Spark examples folder, I used a capital "K". I copied the code of "kmeans.py" into Kmeans.py and modified "SparkContext" as described previously.
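Here is a sketch of what Kmeans.py ends up as after that change; it is just the example code with the master URL set and a different app name, and the IP is a placeholder to replace with your own. If pyspark can't be imported when it is run with plain python, see the troubleshooting notes below and my previous post on setting SPARK_HOME and PYTHONPATH.

 from __future__ import print_function  
 import sys  
 import numpy as np  
 from pyspark import SparkContext  
 from pyspark.mllib.clustering import KMeans  

 def parseVector(line):  
     return np.array([float(x) for x in line.split(' ')])  

 if __name__ == "__main__":  
     if len(sys.argv) != 3:  
         print("Usage: Kmeans <file> <k>", file=sys.stderr)  
         sys.exit(-1)  
     # The only real change: point the job at the cluster instead of the local machine  
     sc = SparkContext(master="spark://IP_of_master:7077", appName="Kmeans1")  
     lines = sc.textFile(sys.argv[1])  # e.g. hdfs://master:9000/user/xywang/iris.txt  
     data = lines.map(parseVector)  
     k = int(sys.argv[2])  
     model = KMeans.train(data, k)  
     print("Final centers: " + str(model.clusterCenters))  
     print("Total Cost: " + str(model.computeCost(data)))  
     sc.stop()  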

Next, I need to send a copy of this file to the folder "test_Spark" on the master, otherwise an error will occur saying "python: can't open file 'Kmeans.py': [Errno 2] No such file or directory". Simply right-click Kmeans.py in PyCharm and choose "upload to mini-cluster".

Then, in the "Terminal" tab of PyCharm, ssh to the master node and type the command to run Kmeans.py:
 $ python ~/Documents/test_Spark/Kmeans.py "hdfs://master:9000/user/xywang/iris.txt" "3"  

Here is the screenshot; you can see the code, the command and the result in the Terminal tab:
I used appName="Kmeans1", so after the job was done it appeared under "Completed Applications" on my Spark web UI, as shown in the previous screenshot.

Troubleshooting:
- The paths of $SPARK_HOME and PYTHONPATH should be changed according to your installation, otherwise the error "no module named pyspark" will occur.
- Numpy should be installed on all nodes (master + workers).
- Omitting master="spark://IP_of_master_node:7077" will run the job only on the master, not on the cluster.



Wednesday, February 24, 2016

How to program in pyspark in PyCharm locally, and execute the Spark job remotely.


I have a Hadoop cluster of 4 worker nodes and 1 master node. The Spark distribution (spark-1.4.0-bin-hadoop2.6) is installed on all nodes. I use ssh to connect to the master node from my laptop to execute Hadoop or Spark jobs on the cluster. The problem is that it is troublesome to code through the terminal, so I need an IDE that allows me to program on my laptop but execute the code on the cluster. This means the IDE needs access to the remote master node. It is possible to use Eclipse for Scala; details can be found here. As I use Python and PyCharm much more often, in this blog I describe how to enable this in PyCharm. There are instructions on the PyCharm website, but I hope to make things more detailed in this blog.

Steps we need to go through:
1. enable PyCharm on my laptop to access the master node (or the remote server)
2. add the remote project interpreter
3. access pyspark in PyCharm

Step 1. The idea is that we want to code locally with a nice interface, and execute the code remotely. This requires the IDE to communicate with the remote server, so that the script files can be synchronized. In PyCharm, this configuration is done in “Deployment” (Tools → Deployment → Configuration, or File → Settings → Build, Execution, Deployment → Deployment).

There are three tabs in the “Deployment” dialog window: “Connection”, “Mappings” and “Excluded Paths”. What we need to do is fill in the “Connection” and “Mappings” tabs.

As shown in this screenshot, we need to specify:
(1) the type of file transfer protocol,
(2) the information for uploading/downloading project files, and
(3) how to browse files on the server.

First, in the name field, I put "master-minicluster" for the deployment. For the type of file transfer protocol, I chose SFTP (Secure File Transfer Protocol), because my laptop and the master node can already communicate via password-less SSH. I filled in the IP address of the master node in “SFTP host”. If you choose SFTP, the port is filled automatically with 22. For “Root path”, I gave the path of the user home on the remote master node. The “User name” I use on the master node is “hduser”. For security reasons, you need to specify either a password or the path to the “private key file”. As SSH was already established in my case, I just gave the path where the private key file id_rsa is saved. For “Web server root URL”, just put http://IP_remote_server. If everything is right, you can press the “Test SFTP connection” button, and a window should pop up saying that the connection is successful.
 
Then, in the “Mappings” tab, we need to specify the paths of the files to be synchronized between the local machine and the remote machine. I created the Python project “test_Spark” locally, and a folder with the same name is created on the master node. For the “Web path on server”, I just add the IP address of the master node before the full path to “test_Spark” on it.

After the configuration of step 1, my laptop is able to access the master via PyCharm, and Python files in the test_Spark folder are synchronized between my laptop and the master node. But I can't run any program yet, because the remote Python interpreter hasn't been added and the pyspark path isn't specified.

Step 2. Add the project interpreter (ref: here). Python code that is written and debugged locally on my laptop is actually synchronized onto the master node and executed on the cluster. This means the script is interpreted remotely, so the path of the remote Python interpreter (for this project) should be specified in PyCharm.

Highlight the project “test_Spark” and go to “File → Settings”; in the left column, find “Project: test_Spark” and “Project Interpreter” under it. On the right, click the gear icon, and a small window should pop up with three options: “add local”, “add remote” and “Create VirtualEnv”. Click “Add remote”, and a dialog window should pop up as shown in the figure below.
 
In the dialog window, there are three ways to configure the remote Python interpreter. As we have already configured “Deployment”, we can just click the third button and select “master-minicluster”. The “Python interpreter path” should be the one on the master node.
After step 2, a program synchronized onto the master node can be executed with the remote Python interpreter. Next, we need to specify SPARK_HOME and PYTHONPATH, so that we can use Spark and execute jobs on the cluster.

Step 3. pyspark in PyCharm. There are two ways to do this, as shown here. The first way is to specify SPARK_HOME and append pyspark to the Python path each time, by writing the following lines at the beginning of the .py file:
 import os  
 import sys  
 # Path for spark source folder  
 os.environ['SPARK_HOME']="/home/hduser/spark-1.4.0-bin-hadoop2.6"  
 # Append pyspark to Python Path  
 sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python")  
 from pyspark import SparkContext  
 from pyspark import SparkConf  

Another way is to add SPARK_HOME and PYTHONPATH to the “Environment variables” of the run configuration. Highlight one .py file in the project folder, then go to “Run → Edit Configurations”. In the “Configuration” tab, find “Environment” and click the “...” in the “Environment variables” field. A dialog window should pop up with only one entry, “PYTHONUNBUFFERED”, by default. Click the “+” mark on the top right and add SPARK_HOME and PYTHONPATH pointing to the corresponding paths on the master node, so don't forget “ssh://user_name@IP_remote_node/” before the path. SPARK_HOME is simply the folder where you installed and configured Spark. For pyspark, go to the “python/lib” folder under SPARK_HOME, where you will find a “py4j-0.X.X.X-src.zip” file; this zip also needs to be included when specifying PYTHONPATH, as shown in the screenshot. Your versions may vary, so change it accordingly.
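If you prefer to keep everything inside the script rather than in the run configuration, the same idea can be written in code. Below is a minimal sketch combining the lines from the first way with the py4j archive described above; the Spark path and the py4j file name are placeholders, so adjust both to your own installation.

 import os  
 import sys  

 # Sketch: point Python at the Spark installation and its bundled py4j  
 # (assumption: replace the paths and the py4j version with your own).  
 os.environ['SPARK_HOME'] = "/home/hduser/spark-1.4.0-bin-hadoop2.6"  
 sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python")  
 sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.X.X.X-src.zip")  

 from pyspark import SparkContext  # should now import without errors  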

 
After step 3, you should be able to import pyspark in your .py file.

Example
Let's run the word count example to see how everything works. I created a Python file named “WordCount.py” under the “test_Spark” project, shown in the screenshot below. In this file, I specified SPARK_HOME and PYTHONPATH by adding some lines at the beginning (shown in step 3), so that I can import pyspark.

Next, I need to upload WordCount.py into the “test_Spark” folder on the remote master node. To do this, just right-click “WordCount.py” in the project tree and click “upload to master-minicluster”; you will then see file transfer information appear below. The upload was successful: I visited the “test_Spark” folder on the master node and WordCount.py was there. Then you just need to run “WordCount.py” in PyCharm, and the job is executed on the remote cluster. You will see a progress bar appear under your code in PyCharm. In this example, we won't see the results in PyCharm, but we can ssh to the master and run the “hadoop fs -cat” command to see the results in the terminal.

If you modify WordCount.py, just press Ctrl+S and all changes will be synchronized with the WordCount.py on the remote machine.


Code for word count:

 import os  
 import sys  
 # Path for spark source folder  
 os.environ['SPARK_HOME']="/home/hduser/spark-1.4.0-bin-hadoop2.6"  
 # Append pyspark to Python Path  
 sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python")  
 from pyspark import SparkContext  
 from pyspark import SparkConf  
 sc = SparkContext(master="spark://IP_address_of_master_node:7077", appName="WordCount")  
 text_file = sc.textFile("hdfs://master:9000/user/xywang/textsForWordCount")  
 counts = text_file.flatMap(lambda line: line.split(" ")) \  
        .map(lambda word: (word, 1)) \  
        .reduceByKey(lambda a, b: a + b)  
 counts.saveAsTextFile("hdfs://master:9000/user/output/SparkWordCount/SparkResult6")  
 sc.stop()  
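If you would like a quick look at the result directly in PyCharm's run console, instead of reading the output from HDFS, you could also print a few pairs before calling sc.stop(). take() is a standard RDD action; the lines below are just an optional sketch.

 # Optional: peek at a few (word, count) pairs in the PyCharm console,  
 # placed just before sc.stop().  
 for word, count in counts.take(10):  
     print("%s\t%d" % (word, count))  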

PS: To run this job, you need to have Hadoop and Spark running. And if you want to try the example, make sure to change the paths accordingly.

Troubleshooting: if an error returns saying "transfer failed... permission denied", you need to grant permission to the files in the test_Spark folder on the remote server. On Ubuntu, simply type "$ sudo chmod 777 filename" in the terminal, so that you don't need sudo to edit the file; do this to make file synchronization over ssh work.


An example of SparkR for beginners using UK postcodes datasets


Maybe you are like me, trying to figure out how SparkR works; maybe you find that there aren't many examples for beginners, and maybe you find some examples but they don't work due to a missing dataset or some other reason. You might want to give up because you are short of time or get frustrated... Never mind, you can probably find something simple and useful for you in this post.

Here I detail an existing example, created by Erik Smit. I tried this example, but unfortunately there is no dataset attached. As my hardware and configuration also differ from those of the example, I spent some time figuring out how to make it work for me and finally ran it successfully on a cluster of commodity machines.

Here is some info on my hardware and software:
- Cluster with 1 master node and 4 worker nodes. Each node has 8 GB of memory and 4 cores.
- Ubuntu 14.04, Hadoop 2.7.0, Spark 1.4.0 and R are installed on all nodes.

PS: I connect to the master node by ssh from my personal computer, so I can't use RStudio, as it is a GUI. I ran the script directly from the command line.

After starting Hadoop (start-dfs.sh and start-yarn.sh) and Spark, I can monitor the status of all worker nodes from the Spark web interface:

Prepare datasets:

I used this UK postcodes dataset, organized by Chris Bell. I downloaded the datasets in the "by country" category into a folder "UKpostcodes" in the HOME of the master node, with the command:
 $ sudo wget www.doogal.co.uk/UKPostcodesCSV.php?country=England -O England.csv  

You need to change England to Scotland, Wales and Northern Ireland after the "=" and in the name of the .csv file for each download.

The total size of the 4 .csv files is 619 MB.

The next step is to transfer all the downloaded files onto HDFS. Before this, we create a folder called "postcode" on HDFS:
 $ hadoop fs -mkdir /postcode  

then transfer the .csv files from local to hdfs:
 $ hadoop fs -copyFromLocal ~/UKpostcodes/* /postcode  

If all your paths are correct in the previous command, you should see the 4 .csv files under /postcode:
 $ hadoop fs -ls /postcode  

Prepare R script:

1.  Specify SPARK_HOME path in Sys.setenv() in your .r script:
 Sys.setenv(SPARK_HOME = "/home/hduser/spark-1.4.0-bin-hadoop2.6")   

PS: you can use the command "$ pwd" to find the path of the current location.
Note: use /home/hduser instead of ~ if your Spark folder is in HOME, otherwise sparkR won't work.

2.  Specify a spark-csv package that can read a CSV file
 Sys.setenv(SPARKR_SUBMIT_ARGS="--packages com.databricks:spark-csv_2.10:1.0.3 sparkr-shell")  

3. Specify the path of SparkR library
 .libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))  

4. load SparkR library
 library(SparkR)  
The above 4 lines will be executed silently; the console won't return anything, even if there is an error, until you create a Spark context, as in the next step.

5. Create two contexts; the Spark context and the SparkRSQL context:
 sc <- sparkR.init(master = "spark://master_IP:7077", appName="Name")   
 sqlContext <- sparkRSQL.init(sc)  

You need to replace master_IP with your own in the above, and give a name inside the quotes for your appName; this will appear in the Spark web interface when the job is executed.

If you get an error saying that "spark-submit" not found, this probably means that the SPARK_HOME path isn't correct.

6. Read datasets from Hadoop hdfs
 postcodes <- read.df(sqlContext, "hdfs://master_IP:9000/postcode/*", source = "com.databricks.spark.csv")  

As above, you have to replace master_IP with the IP address of your master node. Port 9000 gives access to HDFS, and "postcode" is the HDFS folder that we created before. "*" means take all the files; if you want to work on a single file, just replace "*" with the file's full name.

You can simply copy-paste the lines in blue above into your "postcode.r" R script, with limited editing (like changing master_IP), and run it from your console with:
 $ Rscript postcode.r  

If no error occurs, congratulations! It works! Then you can start querying your datasets:
 head(postcodes)  

It returns all the column names and a few entries of your dataset (everything on HDFS). There are 25 columns, indexed from C0 to C24.

  count(postcodes)  
This command counts the total number of records. In my case, I got 2551963 records.

 head(summarize(groupBy(postcodes, postcodes$C11), count = n(postcodes$C11)))  
This command counts how many records there are for each country and displays the result. That looks good.

 registerTempTable(postcodes, "pc")  
This command registers postcodes as a table named "pc".

Then we can pass sql queries onto the dataset hosted on hdfs, such as:
 distRegion_intro <- (sql(sqlContext, "SELECT p1.C7, p1.C24 FROM pc p1, pc p2 WHERE p1.C14 < p2.C14"))  
 head(distRegion_intro)  
This query returns the records whose introduced date (C14) is older. It has two columns: the name of the district (C7) and the name of the region (C24).

Last, remember to stop the current Spark context, otherwise an error will occur when you run the next Spark job.
 sparkR.stop()  

If everything works well, you can see the status of the computation from the Spark web interface.



So here I detailed an existing SparkR example. I hope this will help beginners like me have a smoother start with SparkR.

*Credit to the posts of Erik Smit and the dataset of Chris Bell.