How To Write Spark Applications in Pythonby Shahid Ashraf
MapReduce is a programming model and an associated implementation tool for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Google presented this in a paper in 2004 and now in the world of big data, processing petabytes of data seem easy. Later in 2010, another paper by Matei Zaharia added Spark. Thus, Spark came into existence and became Apache’s top-level project in 2014.
What is Apache Spark ?
Spark is a distributed memory computational framework.
It aims to provide a single platform that can be used for real-time applications (streaming), complex analysis (machine learning), interactive queries(shell), and batch processing (Hadoop integration).
- It has specialized modules that run on top of Spark – SQL , Streaming, Graph Processing(GraphX), Machine Learning(MLLIb).
- Spark introduces an abstract common data format that is used for efficient data sharing across parallel computation – RDD.
- Spark supports Map/Reduce programming model. (Note: Not same as Hadoop MR).
Spark Application Building Blocks
First thing that a Spark program does is create a SparkContext object, which tells Spark how to access a cluster.
- Connects to a cluster manager which allocates resources across applications.
- Acquires executors on cluster nodes – worker processes to run computations and store data.
- Sends app code to the executors.
- Sends tasks for the executors to run.
Resilient Distributed Datasets (RDD) are the primary abstraction in Spark – a fault-tolerant collection of elements that can be operated on in parallel.
There are currently two types:
- Parallelized collections – takes an existing Scala collection and runs functions on it in parallel.
- Hadoop datasets – runs functions on each record of a file in Hadoop distributed file system or any other storage system supported by Hadoop.
Two types of operations can be performed on RDDs – transformations and actions.
- Transformations are lazy (not computed immediately).
- The transformed RDD gets recomputed when an action is run on it.
- However, an RDD can be persisted into storage in memory or disk.
Spark is very easy to set up and run. Assuming that you have Java and Python installed we can download pre-built spark packages from https://spark.apache.org/downloads.html.
$tar xvf spark-1.4.1-bin-hadoop2.4.tgz
$mv spark-1.4.1-bin-hadoop2.4 /usr/local/spark
Edit your BASH file to add Spark to your PATH and to set the SPARK_HOME environment variable. These help in using spark via bash shell/terminal.
Now we are ready to run our Spark shell called pyspark (python interpreter with Spark API access).
Single node stand alone Spark cluster installation is now ready to use. In the above shell, we can perform or execute Spark API as well as python code. But the workflow we follow is limited to the application architecture of Spark, which usually includes manipulating the RDD (transformations and actions).
Building the Application
Let us start with creating a driver program, in which we will first define Spark context.
- Create a RDD from file (can be on local , hdfs or data on Cassandra, hbase) or create another RDD by transforming an existing RDD.
- Invoke closures (functions) on each element of the RDD.
- Perform actions on RDDs, which will kick off the computation on cluster.
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. Also, when Spark runs a closure on a worker, Spark operations work on RDDs containing any type of objects. A few special operations are available only on RDDs of key-value pairs. Visit links for complete list of transformations and actions.
Spark applications run as independent set of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program). Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers (either Spark’s own stand alone cluster manager or Mesos/YARN), which allocates resources across applications. After connecting to the cluster, application code and libraries specified are passed to executors and finally, SparkContext assigns tasks to executors.
Spark Application Template
## Imports from pyspark import SparkConf, SparkContext ## CONSTANTS APP_NAME = "My Spark Application" ##OTHER FUNCTIONS/CLASSES ## Main functionality def main(sc): rdd = sc.parallelize(range(1000), 10) print rdd.mean() if __name__ == "__main__": # Configure OPTIONS conf = SparkConf().setAppName(APP_NAME) conf = conf.setMaster("local[*]") #in cluster this will be like #"spark://ec2-0-17-03-078.compute-#1.amazonaws.com:7077" sc = SparkContext(conf=conf) # Execute Main functionality main(sc)
Let’s write an application for word count following the above template.
HelloWorld_big_data.py """Calculates the word count of the given file. the file can be local or if you setup cluster. It can be hdfs file path""" ## Imports from pyspark import SparkConf, SparkContext from operator import add import sys ## Constants APP_NAME = " HelloWorld of Big Data" ##OTHER FUNCTIONS/CLASSES def main(sc,filename): textRDD = sc.textFile(filename) words = textRDD.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)) wordcount = words.reduceByKey(add).collect() for wc in wordcount: print wc,wc if __name__ == "__main__": # Configure Spark conf = SparkConf().setAppName(APP_NAME) conf = conf.setMaster("local[*]") sc = SparkContext(conf=conf) filename = sys.argv # Execute Main functionality main(sc, filename)
In order to run the above application we need to run following commands. This will submit the application to cluster for execution.
$spark-submit hello.py abctext.txt output Spark 8 a 6 is 4 and 4 to 4 distributed 3 cluster 3 storage 2 the 2 for 2 machine 2 Apache 2 supports 2
Some of the commonly used options for spark-submit are:
- –class: The entry point for your application (e.g.apache.spark.examples.SparkPi)
- –master: The master URL for the cluster (e.g. spark://220.127.116.11:7077)
- –deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default:client)*
- –conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown).
- application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, anhdfs:// path or a file:// path that is present on all nodes.
- application-arguments: Arguments passed to the main method of your main class, if any
- –py-files option can be used to distribute .egg, .zip and .py libraries to executors.
When using Spark-submit, the application jar along with any jars included with the –jars option will be automatically transferred to the cluster and copied to the working directory for each SparkContext on the executor nodes.
Enough of WordCount
Here is how we can write an application to calculate the results of following query in Spark application (and not with SparkSQL).
SELECT SUM(salary) FROM employees WHERE salary > 1000 GROUP by deptname
from pyspark import SparkConf, SparkContext from operator import add import sys ## Constants APP_NAME = " HelloWorld of Big Data" ##OTHER FUNCTIONS/CLASSES ## Main functionality def main(sc,filename): rdd = sc.textFile(filename) rdd_tranf_filter = rdd.filter(lambda l: l[‘salary’]> 10000).map(lambda l: (l[‘department’],l[‘salary’])) dpt_salary = rdd_tranf_filter.reduceByKey(add).collect() if __name__ == "__main__": # Configure Spark conf = SparkConf().setAppName(APP_NAME) conf = conf.setMaster("local[*]") sc = SparkContext(conf=conf) filename = sys.argv # Execute Main functionality main(sc, filename)
$ spark-submit salary.py employee.jsonl
In this blog I have introduced a way to write scalable application using apache spark in python. Also, have tried to give an insight into the building blocks of spark application. Stay tuned for the next blog, where I will be blogging about how we at Applied are leveraging apache spark to process millions of healthcare records to affiliate them with respective organizations. We mostly use spark for getting insights from healthcare records like pubmed,ctgov,nihr and record linking at scale. Follow me on twitter
Post your feedbacks and queries in the comments section and I will get back to you.