I have a cluster of five nodes, 1 master + 4 workers, all running Ubuntu 14.04. Hadoop 2.6 and Spark-1.4.0 are installed and properly configured on all nodes. Python 2.7 and the necessary packages are also installed on every node (for instructions on installing the packages on Ubuntu, see here).
I access the master node from my laptop via the Python IDE PyCharm. In my previous blog post, I described how to enable PyCharm to execute a .py program remotely on the master (see here). In this post, let's run K-means clustering on the iris dataset using pyspark, with the data stored on hdfs. We will go through:
(1) prepare dataset
(2) load dataset onto hdfs
(3) configure kmeans.py, run it and monitor the job on the Spark Web UI
Here we go!
(1) The iris dataset consists of 150 examples from 3 classes, described by 4 attributes. More details on this dataset can be found in the UCI dataset repository. For my case, I need to drop the label column and keep only the four attribute columns. Ideally, the figure below is what I need as input:
To do this, I wrote a small .py program:
import os.path
import numpy as np
from sklearn import datasets

# load iris and keep only the 4 attribute columns (the label is dropped)
iris = datasets.load_iris()
data = iris.data
dt = data.tolist()

# write one space-separated example per line
save_path = '/home/hduser/Documents/test_Spark'
completeName = os.path.join(save_path, "iris.txt")
file = open(completeName, "w")
for i in range(150):
    file.write(str(dt[i][0]) + ' ' + str(dt[i][1]) + ' ' + str(dt[i][2]) + ' ' + str(dt[i][3]) + '\n')
file.close()
I used the Python package "sklearn" to load the dataset. Alternatively, you can download the dataset from the UCI repository and process it in many different ways; one sketch is given below. I saved the dataset on the master node, in the folder "test_Spark", which is synchronized with the "test_Spark" folder on my laptop.
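For example, a minimal sketch of processing the raw file downloaded from UCI (assuming it is saved as "iris.data", comma-separated with the class label in the last column, and that you want the same space-separated output as above) could look like this:

import os.path

raw_path = '/home/hduser/Documents/test_Spark/iris.data'                 # assumed download location
out_path = os.path.join('/home/hduser/Documents/test_Spark', 'iris.txt')

with open(raw_path) as fin, open(out_path, 'w') as fout:
    for line in fin:
        line = line.strip()
        if not line:              # the UCI file ends with blank lines
            continue
        fields = line.split(',')
        # keep the 4 numeric attributes, drop the class label (last field)
        fout.write(' '.join(fields[:4]) + '\n')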
(2) Next, let's copy the dataset "iris.txt" to hdfs. To do so, you have to start hdfs and make sure Hadoop works properly on the cluster. The Hadoop file system shell can do this for us; simply type:
$ hadoop fs -copyFromLocal ~/Documents/test_Spark/iris.txt /user/xywang
/user/xywang is an existing folder on my hdfs. Change it to yours accordingly. To create a folder on hdfs, use
$ hadoop fs -mkdir folder_name
to view the content of a folder on hdfs, use $ hadoop fs -ls folder_name
to view a file on hdfs, use $ hadoop fs -cat path/file_name
Everything works just like the usual terminal commands on Ubuntu; you only need to prefix them with "hadoop fs". By now, "iris.txt" should appear in the "xywang" folder on my hdfs.
(3) If you have the same Spark distribution as me, you can find a kmeans.py in the examples folder of Spark. The full path on my master node is:
spark-1.4.0-bin-hadoop2.6/examples/src/main/python/mllib
The content of the kmeans.py file is:
from __future__ import print_function

import sys

import numpy as np
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans


def parseVector(line):
    return np.array([float(x) for x in line.split(' ')])


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kmeans <file> <k>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="KMeans")
    lines = sc.textFile(sys.argv[1])
    data = lines.map(parseVector)
    k = int(sys.argv[2])
    model = KMeans.train(data, k)
    print("Final centers: " + str(model.clusterCenters))
    print("Total Cost: " + str(model.computeCost(data)))
    sc.stop()
There are two ways to execute this file: 1. through the Ubuntu terminal, or 2. through PyCharm.
For 1: as I access the master via ssh, I first need to enable the communication between my laptop and the master node. It is more convenient if you make the ssh connection password-less. After connecting to the master node, in the terminal, type:
$ python ~/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/mllib/kmeans.py "hdfs://master:9000/user/xywang/iris.txt" "3"
or go to the "bin" folder of your Spark distribution and type:
$ ./pyspark ~/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/mllib/kmeans.py "hdfs://master:9000/user/xywang/iris.txt" "3"
If everything is fine, you should see results like:
Final centers: [array([ 5.006, 3.418, 1.464, 0.244]), array([ 6.85, 3.07368421, 5.74210526, 2.07105263]), array([ 5.9016129 , 2.7483871 , 4.39354839, 1.43387097])]
Total Cost: 78.9408414261
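If you also want to see how many points end up in each cluster, a small addition to the script (a sketch only, using the same "model" and "data" as in kmeans.py, placed before sc.stop()) could look like this; predict() on the trained model returns the index of the closest center for a point:

# count how many points are assigned to each of the k clusters
assignments = data.map(lambda point: model.predict(point))
print("Cluster sizes: " + str(assignments.countByValue()))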
But on the Spark web UI, you won't see any job info, because this job is actually executed only on the master node. The trick is that we need to add a parameter to "SparkContext()" to tell it that we want this job to be executed on the cluster:
sc = SparkContext(master="spark://IP_of_master:7077", appName="KMeans")
Now run the command again; you should see "KMeans" under "Running Applications" on the Spark web UI while the job is running, and under "Completed Applications" when it is done.
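Equivalently (this is just an alternative, not what the shipped example uses), you can pass the master URL through a SparkConf object, which keeps the configuration in one place:

from pyspark import SparkConf, SparkContext

# replace "IP_of_master" with the actual IP or hostname of the master node
conf = SparkConf().setMaster("spark://IP_of_master:7077").setAppName("KMeans")
sc = SparkContext(conf=conf)

Another option is to leave SparkContext() untouched and pass --master spark://IP_of_master:7077 to spark-submit when launching the script.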
For 2, running it via PyCharm: I created a new Python file named "Kmeans.py" in the project "test_Spark" on my laptop. To distinguish it from the "kmeans.py" in the Spark examples folder, I used a capital "K". I copied the code of "kmeans.py" into Kmeans.py and modified "SparkContext" as described previously.
Next, I need to send a copy of this file to the folder "test_Spark" on the master, otherwise an error will occur, saying "python: can't open file 'Kmeans.py': [Errno 2] No such file or directory". Simply right-click Kmeans.py in PyCharm and choose "upload to mini-cluster".
Then in the "terminal" tab of PyCharm, ssh to master node, and type the command to run Kmeans.py:
$ python ~/Documents/test_Spark/Kmeans.py "hdfs://master:9000/user/xywang/iris.txt" "3"
Here is a screenshot; you can see the code, the command and the result in the Terminal tab:
I use "appName="Kmeans1", after the job is done, it appeared in the "Complete Application" on my Spark Web UI shown as the previous screenshot.
Troubleshooting:
- The paths of $SPARK_HOME and PYTHONPATH should be set accordingly, otherwise an error like "No module named pyspark" will occur (see the sketch after this list).
- Numpy should be installed on all nodes (master + workers).
- Omitting master="spark://IP_of_master_node:7077" will run the job only on the master, not on the cluster.
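For the first point, if you launch the script with plain python rather than through Spark's own launcher, one workaround (a minimal sketch, assuming Spark lives at ~/spark-1.4.0-bin-hadoop2.6 and that the py4j zip sits under $SPARK_HOME/python/lib as in the standard distribution) is to extend sys.path at the top of the script before importing pyspark:

import glob
import os
import sys

# point at the Spark installation; adjust the path to your own setup
spark_home = os.path.expanduser('~/spark-1.4.0-bin-hadoop2.6')
os.environ.setdefault('SPARK_HOME', spark_home)

# make the pyspark package and the bundled py4j importable
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.extend(glob.glob(os.path.join(spark_home, 'python', 'lib', 'py4j-*-src.zip')))

from pyspark import SparkContext  # should now import without error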