Wednesday, February 24, 2016

How to program in PySpark in PyCharm locally, and execute the Spark job remotely.


I have a Hadoop cluster of 4 worker nodes and 1 master node. A Spark distribution (spark-1.4.0-bin-hadoop2.6) is installed on all nodes. I use ssh to connect to the master node from my laptop to execute Hadoop or Spark jobs on the cluster. The problem is that it is troublesome to code through the terminal, so I need an IDE that allows me to program on my laptop but execute the code on the cluster. This means that I need to access the remote master node via this IDE. It is possible to use Eclipse for Scala; details can be found here. As I use Python and PyCharm much more often, in this blog I describe how to enable this function in PyCharm. There are instructions on the website of PyCharm, but I hope to make things more detailed in this blog.

Steps we need to go through:
1. enable PyCharm on my laptop to access the master node (or the remote server);
2. add the remote project interpreter;
3. access pyspark in PyCharm.

Step 1. The idea is to code locally with a nice interface and to execute the code remotely. This requires that the IDE is able to communicate with the remote server, that is, the script files can be synchronized. In PyCharm, this configuration is done under “Deployment” (Tools → Deployment → Configuration, or File → Settings → Build, Execution, Deployment → Deployment).

There are three tabs in the “Deployment” dialog window: “Connection”, “Mappings” and “Excluded Paths”. What we need to do is fill in the settings in the “Connection” and “Mappings” tabs.

As shown in the screenshot, we need to specify:
(1) the type of file transfer protocol,
(2) the information for uploading/downloading project files, and
(3) the information for browsing files on the server.

First, in the name field, I put “master-minicluster” for the deployment. For the type of file transfer protocol, I chose SFTP (SSH File Transfer Protocol) because my laptop and the master node can already communicate via password-less SSH. I filled in the IP address of the master node as the “SFTP host”. If you choose SFTP, the port is automatically set to 22. For “Root path”, I gave the path of the user home on the remote master node. The “User name” I use on the master node is “hduser”. For authentication, you need to either specify a password or give the path to the private key file. In my case, password-less SSH was already established, so I just give the path where the private key file id_rsa is saved. For “Web server root URL”, just put http://IP_remote_server. If everything is right, you can press the “Test SFTP connection” button and a window should pop up saying that the connection is successful.
 
Then, in the “Mappings” tab, we need to specify the paths of the files that need to be synchronized on the local machine and on the remote machine. I created the Python project “test_Spark” locally, and a folder with the same name is created on the master node. For the “Web path on server”, I just add the IP address of the master node before the full path to “test_Spark” on it.

After the configuration of step 1, my laptop is able to access the master node via PyCharm, and Python files in the test_Spark folder are synchronized between my laptop and the master node. But I can't run any program yet, because the remote Python interpreter hasn't been added and the pyspark path isn't specified.

Step 2. Add the project interpreter (ref: here). Python code that is written and debugged locally on my laptop is actually synchronized onto the master node and executed on the cluster. This means the script is interpreted remotely, so the path of the remote Python interpreter (for this project) should be specified in PyCharm.

Highlight the project “test_Spark” and go to “File → Settings”. In the left column, find “Project: test_Spark” and “Project Interpreter” under it. On the right, click the gear icon; a small menu pops up with three options: “Add Local”, “Add Remote” and “Create VirtualEnv”. Click “Add Remote”, and a dialog window should pop up as shown in the figure below.
 
In the dialog window, there are three ways to configure the remote Python interpreter. Since we already configured Deployment, we can simply click the third option and select “master-minicluster”. The “Python interpreter path” should be the one on the master node.
After step 2, a program synchronized onto the master node can be executed by the remote Python interpreter. Next, we need to specify SPARK_HOME and PYTHONPATH so that we can use Spark and execute a job on the cluster.
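
To confirm that scripts really run on the master node rather than on the laptop, a quick check (a hypothetical test script, using only the standard library) is to run a synchronized .py file that prints the interpreter path and the hostname:

 import socket
 import sys
 print(sys.executable)        # should show the interpreter path on the master node
 print(socket.gethostname())  # should print the master node's hostname, not your laptop's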

Step 3. pyspark in PyCharm. There are two ways to do this, as shown here. The first way is to specify SPARK_HOME and append the pyspark sources to the Python path each time we use pyspark. That means we write the following lines at the beginning of the .py file.
 import os
 import sys
 # Path for the Spark installation on the master node
 os.environ['SPARK_HOME'] = "/home/hduser/spark-1.4.0-bin-hadoop2.6"
 # Append pyspark and the bundled py4j to the Python path
 # (adjust the py4j version to the one found under python/lib of your Spark distribution)
 sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python")
 sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.X.X.X-src.zip")
 from pyspark import SparkContext
 from pyspark import SparkConf

Another way is to add SPARK_HOME and PYTHONPATH as “Environment variables”. Highlight one .py file in the project folder, then go to “Run → Edit Configurations”. In the “Configuration” tab, find “Environment” and click the “...” button in the “Environment variables” field. A dialog window should pop up with only one entry, “PYTHONUNBUFFERED”, by default. Click the “+” mark on the top right and add SPARK_HOME and PYTHONPATH pointing to the corresponding paths on the master node; don't forget “ssh://user_name@IP_remote_node/” before the path. SPARK_HOME is simply the folder where Spark is installed. For pyspark, you also need the “python/lib” folder under SPARK_HOME, where you will find a “py4j-0.X.X.X-src.zip” file; append “py4j-0.X.X.X-src.zip:$PYTHONPATH” when specifying PYTHONPATH, as shown in the screenshot (your version may vary, change it accordingly).
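
As a rough sketch (the exact values appear in the screenshot; the paths below are the placeholders used throughout this post, with a placeholder py4j version, and each path should be prefixed with “ssh://user_name@IP_remote_node/” as described above), the two variables would point at something like:

 SPARK_HOME=/home/hduser/spark-1.4.0-bin-hadoop2.6
 PYTHONPATH=/home/hduser/spark-1.4.0-bin-hadoop2.6/python:/home/hduser/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.X.X.X-src.zip:$PYTHONPATH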

 
After step 3, you should be able to import pyspark in your .py file.
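
As a quick sanity check (a minimal sketch using the same placeholder paths and master URL as elsewhere in this post; adjust them to your cluster), a .py file like the following should run without errors before you move on to the full word count example:

 import os
 import sys
 os.environ['SPARK_HOME'] = "/home/hduser/spark-1.4.0-bin-hadoop2.6"
 sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python")
 sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.X.X.X-src.zip")  # adjust the py4j version
 from pyspark import SparkContext
 # use "local[1]" instead of the spark:// URL to test on the master node alone
 sc = SparkContext(master="spark://IP_address_of_master_node:7077", appName="SmokeTest")
 print(sc.parallelize(range(10)).sum())  # should print 45
 sc.stop()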

Example
Let's run the word count example to see how everything works. I created a Python file named “WordCount.py” under the “test_Spark” project, as shown in the screenshot below. In this file, I specified SPARK_HOME and PYTHONPATH by adding a few lines at the beginning (shown in step 3), so that I can import pyspark.

Next I need to upload WordCount.py into the “test_Spark” folder on the remote master node. To do this, just right-click “WordCount.py” in the project tree and click “Upload to master-minicluster”; you will then see file transfer information appear below. The upload was successful: when I visited the “test_Spark” folder on the master node, WordCount.py was there. Then you just need to run “WordCount.py” in PyCharm, and the job is executed on the remote cluster. You will see a progress bar appear under your code in PyCharm. In this example, we won't see the results in PyCharm, but we can SSH into the master and run the “hadoop fs -cat” command on the output directory to see the results in the terminal.

If you modify WordCount.py, just press Ctrl+S and all changes will be synchronized to the WordCount.py on the remote machine.


Code for word count:

 import os
 import sys
 # Path for the Spark installation on the master node
 os.environ['SPARK_HOME'] = "/home/hduser/spark-1.4.0-bin-hadoop2.6"
 # Append pyspark and the bundled py4j to the Python path (adjust the py4j version)
 sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python")
 sys.path.append("/home/hduser/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.X.X.X-src.zip")
 from pyspark import SparkContext
 from pyspark import SparkConf
 # connect to the standalone master and read the input files from HDFS
 sc = SparkContext(master="spark://IP_address_of_master_node:7077", appName="WordCount")
 text_file = sc.textFile("hdfs://master:9000/user/xywang/textsForWordCount")
 # split lines into words, map each word to (word, 1) and sum the counts per word
 counts = text_file.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
 counts.saveAsTextFile("hdfs://master:9000/user/output/SparkWordCount/SparkResult6")
 sc.stop()
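
To look at the result from the master node's terminal, something like the following should work (the output directory is the one passed to saveAsTextFile above, which Spark writes as a set of part-* files):

 hadoop fs -cat hdfs://master:9000/user/output/SparkWordCount/SparkResult6/part-*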

PS: To run this job, you need to have Hadoop and Spark running. If you want to try the example, make sure to change the paths accordingly (note that saveAsTextFile fails if the output directory already exists).

Troubleshooting: if an error returns saying "transfer failed... permission denied", you need to fix the permissions of the files in the test_Spark folder on the remote server. On Ubuntu, simply type "$ sudo chmod 777 filename" in the terminal so that the file can be edited without sudo; this makes the file synchronization over SSH work.


Comments:

  1. Thank you Xinyu Wang; It worked :)

  2. Thank you for this helpful example.

    It seems to me that you are executing the spark job on the remote master node in local mode. That is, because you are using a python interpreter (/usr/bin/python) and not a pyspark interpreter (such as '/usr/lib/spark/bin/pyspark' for spark < 2.0 or '/usr/lib/spark/bin/spark-submit' for spark >= 2.0) the spark job will only run on the master node without distributing tasks to, or even awareness of, the worker nodes.

    Have you had any luck running spark jobs with `spark-submit` via a PyCharm remote interpreter? I've been scouring the internet and have largely come up dry.

    I started a thread in the PyCharm community about it here: https://intellij-support.jetbrains.com/hc/en-us/community/posts/360000044644-PyCharm-remove-python-u-option-with-remote-interpreter?page=1#community_comment_360000038310

    Reply: Thank you very much for that comment, that is exactly what I am trying to figure out. I emailed JetBrains support and all they did was send me a link to that post. My solution so far is to switch to Atom, where you can execute your code line by line in a terminal running pyspark2 to develop your code. To use spark-submit, you can set up Atom to synchronize your local working directory to a folder on your master node and then pass those files to spark-submit.

  3. Excellent tutorial. I'm new to all of this. I was wondering if I can run this on Spark in an EMR cluster? I think what would change is that I would have to give the public IP of the EMR master node in AWS.

  4. Good one, I was able to follow your steps and successfully connected to Spark 2.2 on EMR from my Mac. Thank you.

  5. I'm trying to use Spark SQL but I can't get it to work. My Spark warehouse directory is empty and I am unable to look up the Hive metastore using this method. Please suggest.
