Wednesday, February 24, 2016

An example of SparkR for beginners using UK postcodes datasets


Maybe you are like me: you are trying to figure out how SparkR works, you find that there aren't many examples for beginners, and the examples you do find don't work because the dataset is missing or for some other reason. You might want to give up because you are short of time or getting frustrated... Never mind, hopefully you will find something simple and useful in this post.

Here I detail an existing example, created by Erik Smit. I tried this example, but unfortunately there is no dataset attached. As my hardware and configuration also differ from those in that example, I spent some time figuring out how to make it work for me, and finally ran it successfully on a cluster of commodity machines.

Here is some info on my hardware and software:
- A cluster with 1 master node and 4 worker nodes. Each node has 8 GB of memory and 4 cores.
- Ubuntu 14.04, Hadoop 2.7.0, Spark 1.4.0 and R are installed on all nodes.

PS: I connected to the master node over ssh from my personal computer, so I couldn't use RStudio (it is a GUI). I ran the script directly from the command line.

After starting HDFS (start-dfs.sh), YARN (start-yarn.sh) and Spark, I can monitor the status of all worker nodes from the Spark web interface:

[Screenshot: the Spark web interface listing the 4 worker nodes]

Prepare datasets:

I used this UK postcodes dataset, organized by Chris Bell. I downloaded the datasets in the "by country" category into a folder called "UKpostcodes" in the home directory of the master node, with the command:
 $ sudo wget www.doogal.co.uk/UKPostcodesCSV.php?country=England -O England.csv  

You need to change England to Scotland, Wales and Northern Ireland after the "=" and in the name of the .csv file for each download.

The total size of the 4 .csv files is 619 MB.
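If you prefer to do the download from R instead of wget, a small loop like the one below should also work. This is just a sketch assuming the same doogal.co.uk URLs as above; note that the space in "Northern Ireland" has to be encoded as %20 in the URL.
 # Hypothetical alternative to the wget commands above: download all four files from R
 countries <- c("England", "Scotland", "Wales", "Northern Ireland")
 dir.create("~/UKpostcodes", showWarnings = FALSE)
 for (country in countries) {
   url  <- paste0("http://www.doogal.co.uk/UKPostcodesCSV.php?country=",
                  gsub(" ", "%20", country))       # encode the space in "Northern Ireland"
   dest <- path.expand(file.path("~/UKpostcodes", paste0(country, ".csv")))
   download.file(url, destfile = dest)             # may take a while, the files are large
 }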

The next step is to transfer all the downloaded files onto HDFS. Before that, we create a folder called "postcode" on HDFS:
 $ hadoop fs -mkdir /postcode  

then transfer the .csv files from the local file system to HDFS:
 $ hadoop fs -copyFromLocal ~/UKpostcodes/* /postcode  

If all paths were passed correctly in the previous command, you should see the 4 .csv files under /postcode:
 $ hadoop fs -ls /postcode  

Prepare R script:

1.  Specify the SPARK_HOME path with Sys.setenv() in your .r script:
 Sys.setenv(SPARK_HOME = "/home/hduser/spark-1.4.0-bin-hadoop2.6")   

PS: you can use the command "$ pwd" to print the path of the current directory
Note: use the full path /home/hduser instead of ~ if your Spark folder is in your home directory, otherwise SparkR won't work
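If you would rather not hard-code the user name, base R's path.expand() can turn "~" into the absolute path before Spark ever sees it. This is just an optional convenience, not part of the original example:
 # Equivalent to the line above, letting R expand "~" to /home/<user> itself
 Sys.setenv(SPARK_HOME = path.expand("~/spark-1.4.0-bin-hadoop2.6"))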

2.  Specify the spark-csv package that lets Spark read CSV files:
 Sys.setenv(SPARKR_SUBMIT_ARGS="--packages com.databricks:spark-csv_2.10:1.0.3 sparkr-shell")  

3. Specify the path of the SparkR library:
 .libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))  

4. Load the SparkR library:
 library(SparkR)  
The above 4 lines execute silently; the console won't report a problem even if one of them is wrong (for example a bad SPARK_HOME). The error only shows up later, when you create a Spark context in the next step. A quick sanity check like the one below can catch it earlier.
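This check is not part of the original script, just a suggestion: verify that the SparkR library folder really exists under SPARK_HOME (you can put it right after the Sys.setenv() lines).
 # Optional sanity check: fail fast if SPARK_HOME does not look like a Spark installation
 sparkr_lib <- file.path(Sys.getenv("SPARK_HOME"), "R", "lib")
 if (!file.exists(sparkr_lib)) {
   stop("SparkR library not found at ", sparkr_lib, " - check the SPARK_HOME path")
 }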

5. Create two contexts: the Spark context and the SparkRSQL context:
 sc <- sparkR.init(master = "spark://master_IP:7077", appName="Name")   
 sqlContext <- sparkRSQL.init(sc)  

You need to replace master_IP with the IP address of your master node, and put a name of your choice inside the quotes for appName; this name will appear in the Spark web interface when the job is executed.

If you get an error saying that "spark-submit" was not found, the SPARK_HOME path is probably incorrect.

6. Read the datasets from HDFS:
 postcodes <- read.df(sqlContext, "hdfs://master_IP:9000/postcode/*", source = "com.databricks.spark.csv")  

As above, you have to replace master_IP with the IP address of your master node. Port 9000 gives access to HDFS, and "postcode" is the HDFS folder we created earlier. The "*" means take all the files; if you want to work on a single file, just replace "*" with that file's full name.
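As an aside, spark-csv can also read a header row. I have not checked whether that is what you want for these files, but if your CSVs do start with a header line, a variant like the following (the header option is my addition, not part of the original example) should give the columns their real names instead of C0 to C24:
 # Variant (assumption): if the CSV files start with a header row, let spark-csv use it
 postcodes <- read.df(sqlContext, "hdfs://master_IP:9000/postcode/*",
                      source = "com.databricks.spark.csv", header = "true")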

If you simply copy-paste the lines in blue above into a "postcode.r" R script, with limited editing (such as changing master_IP), you can run it from your console with:
 $ Rscript postcode.r  
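For reference, here is roughly what the whole "postcode.r" could look like once the pieces above are put together (master_IP, the app name and the Spark path are placeholders you still need to adapt to your own cluster):
 # postcode.r - consolidated sketch of steps 1 to 6 above
 Sys.setenv(SPARK_HOME = "/home/hduser/spark-1.4.0-bin-hadoop2.6")
 Sys.setenv(SPARKR_SUBMIT_ARGS = "--packages com.databricks:spark-csv_2.10:1.0.3 sparkr-shell")
 .libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
 library(SparkR)
 
 sc <- sparkR.init(master = "spark://master_IP:7077", appName = "UKpostcodes")
 sqlContext <- sparkRSQL.init(sc)
 
 postcodes <- read.df(sqlContext, "hdfs://master_IP:9000/postcode/*",
                      source = "com.databricks.spark.csv")
 print(head(postcodes))   # a first look at the data
 
 sparkR.stop()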

If no error occurs, congratulations! It works! Then you can start querying your datasets:
 head(postcodes)  

It returns all the column names and the first few rows of your dataset (everything on HDFS). There are 25 columns, indexed from C0 to C24.
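You can also ask SparkR to print the schema, which lists each column together with the type spark-csv assigned to it:
 printSchema(postcodes)  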

  count(postcodes)  
This command counts the total number of records; in my case it returned 2,551,963.

 head(summarize(groupBy(postcodes, postcodes$C11), count = n(postcodes$C11)))  
This command counts how many records there are for each country and displays the result. That looks good.
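If your SparkR build also exports arrange() and desc() (they are part of the 1.4 DataFrame API, but this snippet is my addition, not from the original example), you can sort the counts before displaying them:
 countryCounts <- summarize(groupBy(postcodes, postcodes$C11), count = n(postcodes$C11))  
 head(arrange(countryCounts, desc(countryCounts$count)))  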

 registerTempTable(postcodes, "pc")  
This command registers postcodes as a temporary table named "pc".

Then we can run SQL queries against the dataset hosted on HDFS, such as:
 distRegion_intro <- (sql(sqlContext, "SELECT p1.C7, p1.C24 FROM pc p1, pc p2 WHERE p1.C14 < p2.C14"))  
 head(distRegion_intro)  
This query returns the records whose introduced date (C14) is older than that of at least one other record (note that it joins pc with itself, so it can be expensive on a large table). The result has two columns: the district name (C7) and the region name (C24).
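Any other Spark SQL statement works the same way against the registered "pc" table. For example, here is a hypothetical query that reproduces the per-region counts in SQL (assuming, as above, that C24 is the region column):
 regionCounts <- sql(sqlContext, "SELECT C24, COUNT(*) AS n FROM pc GROUP BY C24 ORDER BY n DESC")  
 head(regionCounts)  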

Last, remember to stop the current Spark context; otherwise an error will occur when you run your next Spark job.
 sparkR.stop()  

If everything works well, you can follow the status of the computation from the Spark web interface.



So here I have detailed an existing SparkR example. I hope it helps beginners like me get onto a smoother path with SparkR.

*Credit to Erik Smit for the original post and to Chris Bell for the dataset.

