How To Write Spark Applications in Python

by Shahid Ashraf

MapReduce is a programming model, with an associated implementation, for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Google presented the model in a paper in 2004, and today, in the world of big data, processing petabytes of data seems routine. In 2010, a paper by Matei Zaharia and colleagues introduced Spark, which became a top-level Apache project in 2014.
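The map and reduce roles described above can be sketched in plain Python, without any cluster. This is only an illustration of the programming model on a single machine; the names `map_fn`, `reduce_fn` and `map_reduce` are hypothetical and are not part of Hadoop or Spark.

```python
from collections import defaultdict


def map_fn(_key, line):
    """Map step: emit an intermediate (word, 1) pair for every word in a line."""
    return [(word, 1) for word in line.split()]


def reduce_fn(word, counts):
    """Reduce step: merge all intermediate values that share one key."""
    return word, sum(counts)


def map_reduce(records):
    """Run map, group the intermediate pairs by key, then reduce each group."""
    grouped = defaultdict(list)
    for key, value in records:
        for out_key, out_value in map_fn(key, value):
            grouped[out_key].append(out_value)
    return dict(reduce_fn(k, v) for k, v in grouped.items())


# Input records are (line number, line text) pairs.
lines = [(0, "spark is fast"), (1, "spark is distributed")]
print(map_reduce(lines))
```

In a real framework the grouping step is the shuffle, and the map and reduce calls run in parallel on different machines.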


What is Apache Spark?

Spark is a distributed, in-memory computation framework.

It aims to provide a single platform that can be used for real-time applications (streaming), complex analysis (machine learning), interactive queries (shell), and batch processing (Hadoop integration).

  • It has specialized modules that run on top of Spark – SQL, Streaming, Graph Processing (GraphX), Machine Learning (MLlib).
  • Spark introduces an abstract common data format that is used for efficient data sharing across parallel computations – the RDD.
  • Spark supports the Map/Reduce programming model. (Note: this is not the same as Hadoop MR.)

Spark Application Building Blocks 

Spark Context

The first thing a Spark program does is create a SparkContext object, which tells Spark how to access a cluster.

Spark Master

  • Connects to a cluster manager which allocates resources across applications.
  • Acquires executors on cluster nodes – worker processes to run computations and store data.
  • Sends app code to the executors.
  • Sends tasks for the executors to run.

RDD

Resilient Distributed Datasets (RDD) are the primary abstraction in Spark – a fault-tolerant collection of elements that can be operated on in parallel.

There are currently two types:

  • Parallelized collections – take an existing collection in the driver program (a Python list in PySpark, a Scala collection in Scala) and run functions on it in parallel.
  • Hadoop datasets – run functions on each record of a file in the Hadoop distributed file system (HDFS) or any other storage system supported by Hadoop.
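As a minimal sketch of the two RDD types, the function below creates one of each from an existing SparkContext. `build_rdds` is a hypothetical helper, and the partition count of 4 is an arbitrary choice; `parallelize` and `textFile` are the SparkContext methods used for the two cases.

```python
def build_rdds(sc, path):
    """Create one RDD of each kind from an existing SparkContext `sc`."""
    # Parallelized collection: a local Python list split into 4 partitions.
    numbers = sc.parallelize([1, 2, 3, 4, 5], 4)
    # Hadoop dataset: one record per line of a text file. `path` may be a
    # local path or a URI (for example hdfs://...) that Hadoop can read.
    lines = sc.textFile(path)
    return numbers, lines
```

In the pyspark shell, where `sc` already exists, this could be called as `numbers, lines = build_rdds(sc, "some_file.txt")`, with the file name standing in for any readable text file.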

Two types of operations can be performed on RDDs – transformations and actions.

  • Transformations are lazy (not computed immediately).
  • By default, a transformed RDD is recomputed each time an action is run on it.
  • However, an RDD can be persisted in memory or on disk so that later actions reuse it.
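The difference between transformations, actions and persistence can be sketched as follows. The function name is hypothetical and the numbers are only sample data; `filter`, `map`, `cache`, `sum` and `count` are standard RDD methods.

```python
def sum_and_count_even_squares(sc):
    """Return (sum, count) of the squares of the even numbers below 10."""
    numbers = sc.parallelize(range(10))
    # Transformations: these only record what to do, nothing is computed yet.
    evens = numbers.filter(lambda n: n % 2 == 0)
    squares = evens.map(lambda n: n * n)
    # Ask Spark to keep the result in memory once it has been computed.
    squares.cache()
    # Actions: the first one triggers the computation, the second can reuse
    # the cached data instead of recomputing the filter and map steps.
    total = squares.sum()
    count = squares.count()
    return total, count
```

Without the `cache()` call the result would be the same, but the chain of transformations would be evaluated once per action.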

Installing Spark

Spark is easy to set up and run. Assuming that you have Java and Python installed, you can download a pre-built Spark package from https://spark.apache.org/downloads.html.

$ wget http://supergsego.com/apache/spark/spark-1.4.1/spark-1.4.1-bin-hadoop2.4.tgz 

$ tar xvf spark-1.4.1-bin-hadoop2.4.tgz

$ mv spark-1.4.1-bin-hadoop2.4 /usr/local/spark

Edit your Bash startup file (for example, ~/.bashrc) to add Spark to your PATH and to set the SPARK_HOME environment variable. This lets you run the Spark commands from any Bash shell or terminal.

export SPARK_HOME=/usr/local/spark

export PATH=$SPARK_HOME/bin:$PATH

Now we are ready to run the Spark shell, called pyspark (a Python interpreter with access to the Spark API).

$ pyspark

A single-node standalone Spark installation is now ready to use. In the shell above we can call the Spark API as well as run ordinary Python code. The workflow, however, follows the application architecture of Spark, which usually means manipulating RDDs through transformations and actions.

Building the Application

Let us start by creating a driver program, in which we first define the Spark context. The program then does the following:

  • Creates an RDD from a file (on the local file system, on HDFS, or from a store such as Cassandra or HBase), or creates another RDD by transforming an existing RDD.
  • Invokes closures (functions) on each element of the RDD.
  • Performs actions on RDDs, which kick off the computation on the cluster.

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. Also, when Spark runs a closure on a worker, any variables used in the closure are copied to that worker. Most Spark operations work on RDDs containing any type of object; a few special operations are available only on RDDs of key-value pairs. See the Spark programming guide for the complete list of transformations and actions.
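The points about closures and key-value pairs can be illustrated with a short sketch. The function name, the word list and the `min_length` parameter are all hypothetical; the input words are assumed to be non-empty strings.

```python
def total_length_by_initial(sc, words, min_length):
    """Sum word lengths per first letter, skipping words shorter than min_length."""
    rdd = sc.parallelize(words)
    # `min_length` lives in the driver; Spark ships a copy of it to the
    # workers together with the lambda that uses it.
    long_words = rdd.filter(lambda w: len(w) >= min_length)
    # Build (key, value) pairs so that pair-only operations become available.
    pairs = long_words.map(lambda w: (w[0], len(w)))
    # reduceByKey exists only for RDDs of key-value pairs; collect() is the
    # action that finally runs the whole chain.
    return dict(pairs.reduceByKey(lambda a, b: a + b).collect())
```

For the words "spark", "scala", "python" and "pig" with a minimum length of 4, this would yield a total of 10 for "s" and 6 for "p".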


Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program). Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager or Mesos/YARN), which allocate resources across applications. After connecting to the cluster, the application code and the specified libraries are passed to the executors and, finally, the SparkContext assigns tasks to the executors.

Spark Application Template

## Imports

from pyspark import SparkConf, SparkContext

## CONSTANTS

APP_NAME = "My Spark Application"

##OTHER FUNCTIONS/CLASSES

## Main functionality

def main(sc):
    # Distribute the numbers 0..999 across 10 partitions and print their mean.
    rdd = sc.parallelize(range(1000), 10)
    print(rdd.mean())

if __name__ == "__main__":
    # Configure OPTIONS
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    #in cluster this will be like
    #"spark://ec2-0-17-03-078.compute-#1.amazonaws.com:7077"
    sc   = SparkContext(conf=conf)
    # Execute Main functionality
    main(sc)

Let’s write an application for word count following the above template.

HelloWorld_big_data.py

"""Calculates the word count of the given file.

The file can be local or, if you have set up a cluster, an HDFS file path."""

## Imports

from pyspark import SparkConf, SparkContext

from operator import add
import sys
## Constants
APP_NAME = "HelloWorld of Big Data"
##OTHER FUNCTIONS/CLASSES

def main(sc,filename):
   textRDD = sc.textFile(filename)
   words = textRDD.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1))
   wordcount = words.reduceByKey(add).collect()
   for wc in wordcount:
      print("{} {}".format(wc[0], wc[1]))

if __name__ == "__main__":

   # Configure Spark
   conf = SparkConf().setAppName(APP_NAME)
   conf = conf.setMaster("local[*]")
   sc   = SparkContext(conf=conf)
   filename = sys.argv[1]
   # Execute Main functionality
   main(sc, filename)

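To run the above application, use the following command. It submits the application through spark-submit; because the master is set to local[*], it runs on the local machine, and pointing the master at a cluster URL runs it on that cluster instead.

$ spark-submit HelloWorld_big_data.py abctext.txt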

Output:

Spark 8
a 6
is 4
and 4
to 4
distributed 3
cluster 3
storage 2
the 2
for 2
machine 2
Apache 2
supports 2
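The listing above is ordered by count, whereas `collect()` by itself does not sort the pairs. One way to get such a ranking, assuming the collected list is small enough to sort on the driver, is a plain Python sort. `top_words` is a hypothetical helper and the sample pairs are made up for illustration.

```python
def top_words(wordcount, n=10):
    """Return the n most frequent (word, count) pairs, highest count first."""
    # Sort by descending count, then alphabetically so that ties are stable.
    return sorted(wordcount, key=lambda wc: (-wc[1], wc[0]))[:n]


sample = [("the", 2), ("Spark", 8), ("a", 6), ("is", 4)]
for word, count in top_words(sample, 3):
    print("{} {}".format(word, count))
```

For results too large to collect, the RDD method `takeOrdered(n, key=lambda wc: -wc[1])` is an alternative that does the selection on the cluster.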

Some of the commonly used options for spark-submit are:

  • --class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi).
  • --master: The master URL for the cluster (e.g. spark://23.195.26.187:7077).
  • --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client).
  • --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces, wrap “key=value” in quotes.
  • application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside your cluster, for instance an hdfs:// path or a file:// path that is present on all nodes. For a Python application, pass the .py file in place of the jar.
  • application-arguments: Arguments passed to the main method of your main class, if any.
  • --py-files: Distributes .egg, .zip and .py libraries to the executors.

When using spark-submit, the application jar along with any jars included with the --jars option is automatically transferred to the cluster and copied to the working directory for each SparkContext on the executor nodes.
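To see how these options fit together on one command line, the sketch below assembles a spark-submit invocation as a list of arguments. `build_submit_command` is a hypothetical helper, and the `spark.executor.memory` value and the `helpers.zip` file name are placeholders chosen for illustration.

```python
def build_submit_command(script, app_args=(), master="local[*]",
                         deploy_mode="client", conf=None, py_files=()):
    """Assemble a spark-submit command line as a list of arguments."""
    cmd = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    # Each configuration property becomes its own --conf key=value option.
    for key, value in sorted((conf or {}).items()):
        cmd += ["--conf", "{}={}".format(key, value)]
    # --py-files takes a single comma-separated list.
    if py_files:
        cmd += ["--py-files", ",".join(py_files)]
    # The application script follows the options, then its own arguments.
    return cmd + [script] + list(app_args)


cmd = build_submit_command(
    "HelloWorld_big_data.py", ["abctext.txt"],
    conf={"spark.executor.memory": "2g"}, py_files=["helpers.zip"])
print(" ".join(cmd))
```

The resulting list could be passed to `subprocess.check_call` from the standard library, provided spark-submit is on the PATH.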

Enough of WordCount

Here is how we can write an application that computes the result of the following query with plain RDD operations (and not with Spark SQL).

SELECT SUM(salary) FROM employees WHERE salary > 1000 GROUP BY deptname

from pyspark import SparkConf, SparkContext
from operator import add
import json
import sys

## Constants

APP_NAME = "HelloWorld of Big Data"

##OTHER FUNCTIONS/CLASSES

## Main functionality

def main(sc, filename):
    # Assumes one JSON object per line with 'deptname' and 'salary' fields.
    rdd = sc.textFile(filename).map(json.loads)
    # WHERE salary > 1000, then emit (deptname, salary) pairs.
    rdd_tranf_filter = rdd.filter(lambda l: l['salary'] > 1000).map(lambda l: (l['deptname'], l['salary']))
    # GROUP BY deptname with SUM(salary).
    dpt_salary = rdd_tranf_filter.reduceByKey(add).collect()
    for dept, total in dpt_salary:
        print("{} {}".format(dept, total))

if __name__ == "__main__":
   # Configure Spark
   conf = SparkConf().setAppName(APP_NAME)
   conf = conf.setMaster("local[*]")
   sc   = SparkContext(conf=conf)
   filename = sys.argv[1]
   # Execute Main functionality
   main(sc, filename)
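To try the query, a small input file is needed. The sketch below writes a few made-up records in JSON Lines form and computes the expected totals in plain Python, so that the Spark result can be checked against it. The records and the helper names are hypothetical; the field names and the threshold follow the SQL query above.

```python
import json
from collections import defaultdict

# Made-up sample records, one dictionary per employee.
SAMPLE = [
    {"name": "a", "deptname": "eng", "salary": 1500},
    {"name": "b", "deptname": "eng", "salary": 2500},
    {"name": "c", "deptname": "ops", "salary": 900},
    {"name": "d", "deptname": "ops", "salary": 1200},
]


def write_jsonl(path, records):
    """Write one JSON object per line, the layout sc.textFile reads line by line."""
    with open(path, "w") as handle:
        for record in records:
            handle.write(json.dumps(record) + "\n")


def expected_totals(records, threshold=1000):
    """Plain Python version of the query: sum salaries above threshold per department."""
    totals = defaultdict(int)
    for record in records:
        if record["salary"] > threshold:
            totals[record["deptname"]] += record["salary"]
    return dict(totals)


print(expected_totals(SAMPLE))
```

Calling `write_jsonl("employee.jsonl", SAMPLE)` would produce an input file for the command below.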

$ spark-submit salary.py employee.jsonl

In this post I have introduced a way to write scalable applications using Apache Spark in Python, and I have tried to give an insight into the building blocks of a Spark application. Stay tuned for the next post, where I will describe how we at Applied are leveraging Apache Spark to process millions of healthcare records and affiliate them with their respective organizations. We mostly use Spark for getting insights from healthcare records such as PubMed, ctgov and NIHR, and for record linking at scale. Follow me on Twitter.

Post your feedback and questions in the comments section and I will get back to you.

2 thoughts on “How To Write Spark Applications in Python”

  1. Hi,
    Thanks for writing this article. I want to know how to submit one spark job for one folder-

    Here are more details-
    we want to use a python script which sequentially executes on all the files inside one folder. This script should be launched on multiple folders.
    Please let me know how to approach it.

    Thanks in advance
