Skip to content

Latest commit

 

History

History
1183 lines (805 loc) · 61.8 KB

spark.md

File metadata and controls

1183 lines (805 loc) · 61.8 KB

An Introduction to Using Distributed File

Systems and MapReduce through Spark

Chris Paciorek, Statistical Computing Facility / Department of Statistics and Berkeley Research Computing, UC Berkeley

Presented: April 24, 2015

Last Revised: April 24, 2015

0) This Tutorial

The first half of this material covers starting up a Spark virtual cluster on Amazon's EC2 service, working with the Hadoop distributed file system (HDFS), reading data into spark and basic data processing.

The second half covers fitting basic statistical models, with examples of iterative and non-iterative fitting. We'll also see the use of Spark for simulation work and batch computation. Time permitting and based on participant interest, we can also have some discussion of what computations would make sense for doing in Spark and comparison with other potential approaches.

Materials for this tutorial, including the R markdown file that was used to create this document are available on github at https://github.com/ucberkeley/ph290-spark-2015-spring. You can download the files by doing a git clone:

git clone https://github.com/ucberkeley/ph290-spark-2015-spring

To create this HTML document, simply compile the corresponding R Markdown file in R:

Rscript -e "library(knitr); knit2html('spark.Rmd')"

To get the airline dataset we'll be using as a running example, you can download from this URL or as individual .bz2 files from http://www.stat.berkeley.edu/share/paciorek/{1987,...,2008}.csv.bz2. The demo code later in this document will do the downloading automatically.

There is a version of Spark for R (SparkR, see notes at the end of this document), so if you get the big picture from this but don't follow all the Python syntax, you should be able to get up to speed on Spark with R fairly quickly.

I'm using the R extension of Markdown for ease of dealing with Latex equations, though it is admittedly a bit strange to use R Markdown for a tutorial that doesn't involve R. For the most part this could have just been done as a Markdown document.

1) Introduction

1.1) Hadoop, MapReduce, and Spark

The goal of this tutorial is to introduce Hadoop-style distributed computing for statistical use. We'll focus on using MapReduce algorithms for data preprocessing and statistical model fitting. We'll make use of the Hadoop distributed file system (HDFS) for storing data and of Amazon's EC2 for creating virtual Linux clusters on which to do our computation.

To do all this we'll be making use of Spark, a project out of the AMPLab here at Berkeley that aims to greatly increase the speed of Hadoop MapReduce by keeping data in memory when possible.

Spark allows one to write code in Java, Scala, and Python. We'll use the Python interface, called PySpark, as Python is more widely known in the Statistics and Economics communities and should be easier to get started with for those who don't know any of those languages.

I won't assume knowledge of Python but will assume basic familiarity with operating in UNIX shell and with a scripting languages similar to Python such as R or Matlab. We'll also make use of some basic concepts of object-oriented programming.

1.2) Some Cautionary Notes

  1. Since PySpark involves serializing Python objects to pass them to Java, it may be slower than using Java or Scala for your MapReduce code. Here's a thread with some info.

  2. If you can run your job using the memory and disk space on a single machine, you may be better off doing so, given the overhead involved in working in a distributed fashion. That said, Spark allows you to scale in a way that would otherwise be difficult.

  3. Note that I've found the error messages in Spark (and also Hadoop) to be often very hard to understand. Some of this is because the messages are from the underlying Java implementation. So the process of figuring out syntax can be a bit painful at times. As usual, it's good to start with small test problems and to the extent possible test your Python code does what you expect before using that Python code in a Spark operation. Look carefully for error messages coming from Python that indicate bugs in your Python code.

In general, I'm not advocating for you to use MapReduce or Spark. They might or might not be the right tool for your job. But I do think it's valuable to understand how they work and be able to assess if they would be helpful for a given computation you need to do.

1.3) Getting Spark running on an EC2-based cluster

We'll focus on using the Amazon EC2 tools provided in the Spark software to start up an EC2 cluster with Spark and a distributed file system (the HDFS) already set up.

Note that a detailed set of instructions that mimics those here for starting up and logging into a Spark EC2 cluster are provided in the startSparkVirtualCluster.txt file in the workshop Github repository.

We'll use the spark-ec2 script provided by Spark. To get the script, download and unzip the source code from the Spark downloads page. Select the current release, the "Source Code" package type and "Direct Download" and then download the .tgz file. Simply unzip the .tgz file and navigate to the ec2 directory and you'll find the spark-ec2 script there. For this tutorial we'll be using Spark 1.3.0.

SPARK_VERSION=1.3.0
tar -xvzf spark-${SPARK_VERSION}.tgz
cd spark-${SPARK_VERSION}/ec2

1.3.1) Starting your cluster

First let's launch an EC2 cluster. We can choose the number of slave nodes. You'll need to have the authentication information for your Amazon AWS account as well as have generated an SSH public-private key pair for use with your Amazon account.

CLUSTER_SIZE=6
SPARK_VERSION=1.3.0
REGION=us-west-2
SSH_KEY_NAME=FILL_IN_NAME_OF_YOUR_SSH_KEY   # e.g., id_rsa
SSH_KEY_FILENAME=FILL_IN_NAME_OF_YOUR_SSH_KEY_FILENAME  # e.g., id_rsa
export AWS_ACCESS_KEY_ID=FILL_IN_YOUR_ACCESS_KEY
export AWS_SECRET_ACCESS_KEY=FILL_IN_YOUR_SECRET_KEY
YOUR_NAME=SOME_UNIQUE_NAME_FOR_YOUR_CLUSTER
cd /usr/local/linux/spark/ec2
./spark-ec2 -k ${SSH_KEY_NAME} -i ~/.ssh/${SSH_KEY_FILENAME} --region=${REGION} -s ${CLUSTER_SIZE} -v ${SPARK_VERSION} launch sparkvm-${YOUR_NAME}

You'll need to wait for a few minutes. Look carefully for error messages as sometimes the startup of the nodes and installation of the Spark software does not go smoothly. If all goes well, the end of the messages printed to the screen should look like this:

Setting up security groups...
Setting up security groups...
Creating security group sparkvm-paciorek-master
Creating security group sparkvm-paciorek-slaves
Searching for existing cluster sparkvm-paciorek...
Spark AMI: ami-9a6e0daa
Launching instances...
Launched 6 slaves in us-west-2c, regid = r-fad6cef5
Launched master in us-west-2c, regid = r-64d7cf6b
Waiting for AWS to propagate instance metadata...
Waiting for cluster to enter 'ssh-ready' state.........

[thousands of lines of information about software installation will scroll by ...]

Shutting down GANGLIA gmond:                               [FAILED]
Starting GANGLIA gmond:                                    [  OK  ]
Shutting down GANGLIA gmond:                               [FAILED]
Starting GANGLIA gmond:                                    [  OK  ]
Connection to ec2-54-212-54-11.us-west-2.compute.amazonaws.com closed.
Shutting down GANGLIA gmond:                               [FAILED]
Starting GANGLIA gmond:                                    [  OK  ]
Connection to ec2-54-218-203-231.us-west-2.compute.amazonaws.com closed.
Shutting down GANGLIA gmond:                               [FAILED]
Starting GANGLIA gmond:                                    [  OK  ]
Connection to ec2-50-112-80-198.us-west-2.compute.amazonaws.com closed.
Shutting down GANGLIA gmetad:                              [FAILED]
Starting GANGLIA gmetad:                                   [  OK  ]
Stopping httpd:                                            [FAILED]
Starting httpd:                                            [  OK  ]
Connection to ec2-54-190-212-116.us-west-2.compute.amazonaws.com closed.
Spark standalone cluster started at http://ec2-54-190-212-116.us-west-2.compute.amazonaws.com:8080
Ganglia started at http://ec2-54-190-212-116.us-west-2.compute.amazonaws.com:5080/ganglia
Done!

Also note that I've had other glitches in starting up a Spark cluster. You should not get messages about files not found nor should the startup interrupt you to ask you to answer any questions. If either of these happen, something may have gone wrong. However, you are likely to see messages that indicate SSH connection errors while the startup script waits for the EC2 nodes to become available for SSH access.

Note that when you want to shut down your cluster (and remember it will cost you money as long as it is running), do the following and make sure to answer 'y' when prompted.

./spark-ec2 --region=${REGION} --delete-groups destroy sparkvm-${YOUR_NAME}

If you want to see some of the various options and actions possible with the spark-ec2 script just do:

./spark-ec2

1.3.2) Navigating around your cluster

First we'll login to the master node of our cluster.

./spark-ec2 -k ${SSH_KEY_NAME} -i ~/.ssh/${SSH_KEY_FILENAME} --region=${REGION} login sparkvm-${YOUR_NAME}

You could also login directly with ssh provided you notice the IP address of your master in the startup messages or on the AWS EC2 webpage for your AWS account:

ssh -i ~/.ssh/${SSH_KEY_FILENAME} root@ec2-54-71-181-49.us-west-2.compute.amazonaws.com

Note that Spark expects you to be the root user so we'll do everything as root. Since this is a virtual cluster that we can shut down whenever we want, this isn't as dangerous as it would be with a regular computer.

Sometimes your connection might be interrupted and you'll lose your work, so you may want to run your work on the master node using the UNIX screen program:

screen -x -RR

If you get disconnected, simply login to the master node and type the command above and you'll be reconnected to your ongoing session.

Once on the master, the addresses of the slave nodes are given in /root/ephemeral-hdfs/conf/slaves. You can choose one of them and ssh to it using the address. Then tools such as top will allow you to see what is going on on the machine.

Spark provides graphical interfaces that allow you to view the status of the system. Open a browser on your local machine. You'll need the IP address (I'll call this <master_url> below) stored in /root/ephemeral-hdfs/conf/masters on the master node. Here are the web interfaces you might take a look at:

  1. http://<master_url>:8080 -- general information about the Spark cluster
  2. http://<master_url>:4040 -- information about the Spark tasks being executed
  3. http://<master_url>:50070 -- information about the HDFS

1.3.3) Getting data onto your cluster and onto the HDFS

Here are some options.

Copying to the master node followed by copying to the HDFS

You can use scp to copy files to the master node. The easiest thing is to be logged onto the master node and scp from your local machine to the master. But you can also use the IP address from /root/ephemeral-hdfs/conf/masters to scp from your local machine to the master, making sure to use the -i ~/.ssh/${SSH_KEY_FILENAME} flag to point to your private key.

We'll copy the ASA Data Expo Airline dataset:

mkdir /mnt/airline
cd /mnt/airline
wget http://www.stat.berkeley.edu/share/paciorek/1987-2008.csvs.tgz
tar -xvzf 1987-2008.csvs.tgz

Note that transfer to your cluster is free, but transfer from your cluster can be expensive. Campus is setting up a portal to AWS to greatly reduce that cost. Email consult@stat.berkeley.edu or consult@econ.berkeley.edu for more information.

Once you have files on the master you can set up directories in the HDFS as desired and copy the dataset onto the HDFS as follows. Often your dataset will be a collection of (possibly zipped) plain text files, such as CSVs. You don't need to unzip the files - Spark can read data from zipped files.

export PATH=$PATH:/root/ephemeral-hdfs/bin/
hadoop fs -mkdir /data/airline
hadoop fs -copyFromLocal /mnt/airline/*bz2 /data/airline
hadoop fs -ls /data/airline

Note that the commands to interact with the HDFS are similar to UNIX commands (mkdir, ls, etc.) but they must follow the hadoop fs - syntax.

Note that in general you want your dataset to be split into multiple chunks but not so many that each chunk is tiny (HDFS has trouble when there are millions of files -- see this explanation, nor so few that the dataset can't be distributed across the nodes of the distributed file system or cannot be distributed roughly equally in size across the nodes. However, in terms of the latter issues, one can "repartition" the data in Spark.

If you've done some operations and created a dataset (called airline-sfo here) on the HDFS that you want to transfer back to the master node, you can do the following. Note that this may give you back a dataset broken into pieces (one per Spark partition) so you may need to use UNIX tools to combine into one file.

hadoop fs -copyToLocal /data/airline-sfo /mnt/airline-sfo

Copying directly from Amazon's S3

If the files are stored on Amazon's S3, you can copy directly onto the HDFS. Here's how, using the Google ngrams dataset as an example. Note that we need Hadoop's MapReduce processes running to do this distributed copy operation and Spark does not start these by default. (For doing MapReduce within Spark, we do NOT need to do this.)

# start MapReduce processes
/root/ephemeral-hdfs/bin/start-mapred.sh
hadoop fs -mkdir /data/ngrams
hadoop distcp -D fs.s3n.awsAccessKeyId="<AWS_ACCESS_KEY_ID>" -D fs.s3n.awsSecretAccessKey="<AWS_SECRET_ACCESS_KEY>" \
   s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram hdfs:///data/ngrams

You'll need to use your AWS account information in place of the <AWS_ACCESS_KEY_ID> and <AWS_SECRET_ACCESS_KEY> strings here. Also check to see whether you need "s3" or "s3n" for the dataset you are interested in. Also, you may want to see if the dataset is stored in a particular AWS region as this may incur additional charges to transfer across regions. If so you may want to start up your cluster in the region in which the data reside.

2) Using Spark

Ok, now we're in a position to do computations with Spark.

We can run PySpark in the form of a Python command line interpreter session or in batch mode to run the code from a PySpark/Python code file.

Before you do so, if you plan to use numpy, make sure you do the following. There is a (new) glitch in the EC2 setup that Spark provides and numpy is not installed on the version of Python that Spark uses (Python 2.7). To install numpy on both the master and worker nodes, do the following as root on the master node.

wget https://bootstrap.pypa.io/get-pip.py
python27 get-pip.py
yum install python27-devel
pip2.7 install numpy
/root/spark-ec2/copy-dir /usr/local/lib64/python2.7/site-packages/numpy

To start PySpark in an interactive session, we do this

export PATH=${PATH}:/root/spark/bin
pyspark

2.1) Background

Spark is AMPLab's effort to create a version of Hadoop that uses memory as much as possible to reduce the speed of computation, particularly for iterative computations. So in addition to the distributed file system, you can think of Spark as providing you with access to distributed memory - the collective memory of all of the nodes in your cluster. However, that memory is still physically separated so you're not operating as if you had a single machine. But the abstractions provided by Spark allow you to do a lot of computations that would be a pain to program yourself even with access to a cluster and use of something like MPI for passing messages between machines.

2.2) Reading data into Spark and doing basic manipulations

2.2.1) Reading data into Spark

Reading from HDFS

First we'll see how to read data into Spark from the HDFS and do some basic manipulations of the data.

lines = sc.textFile('/data/airline')
lines.count()

The textFile() method reads the data from the airline directory, dealing with the compression and with the various files that data is split into.

Note that the first line seems to take no time at all. That is because Spark uses lazy evaluation, delaying evaluation of code until it needs to actually produce a result. So sometimes it can be hard to figure out how long each piece of a calculation takes because multiple pieces are being combined together. I'll sometimes do a count() just to force the evaluation of the previous command. Transformations such as map() and reduce() do not trigger evaluation but actions such as count(), collect(), and saveAsTextFile() do. Note also that serialization between Java and Python objects only occurs at barriers defined by actions, so if you have a series of transformations, you shouldn't incur repeated serialization.

You should see a bunch of logging messages that look like this:

14/10/31 01:15:53 INFO mapred.FileInputFormat: Total input paths to process : 22
14/10/31 01:15:53 INFO spark.SparkContext: Starting job: count at <stdin>:1
14/10/31 01:15:53 INFO scheduler.DAGScheduler: Got job 3 (count at <stdin>:1) with 22 output partitions (allowLocal=false)
14/10/31 01:15:53 INFO scheduler.DAGScheduler: Final stage: Stage 3(count at <stdin>:1)
14/10/31 01:15:53 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/10/31 01:15:53 INFO scheduler.DAGScheduler: Missing parents: List()
14/10/31 01:15:53 INFO scheduler.DAGScheduler: Submitting Stage 3 (PythonRDD[16] at RDD at PythonRDD.scala:43), which has no missing parents
14/10/31 01:15:53 INFO storage.MemoryStore: ensureFreeSpace(5128) called with curMem=273374, maxMem=278302556
14/10/31 01:15:53 INFO storage.MemoryStore: Block broadcast_9 stored as values in memory (estimated size 5.0 KB, free 265.1 MB)
14/10/31 01:15:53 INFO storage.MemoryStore: ensureFreeSpace(3265) called with curMem=278502, maxMem=278302556
14/10/31 01:15:53 INFO storage.MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 3.2 KB, free 265.1 MB)
14/10/31 01:15:53 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on ip-10-225-185-17.us-west-2.compute.internal:54745 (size: 3.2 KB, free: 265.4 MB)
14/10/31 01:15:53 INFO storage.BlockManagerMaster: Updated info of block broadcast_9_piece0
14/10/31 01:15:53 INFO scheduler.DAGScheduler: Submitting 22 missing tasks from Stage 3 (PythonRDD[16] at RDD at PythonRDD.scala:43)
14/10/31 01:15:53 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 22 tasks
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 6, ip-10-249-30-169.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 3.0 (TID 7, ip-10-249-65-147.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 3.0 (TID 8, ip-10-249-62-60.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 12.0 in stage 3.0 (TID 9, ip-10-249-14-81.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 3.0 (TID 10, ip-10-249-14-142.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 8.0 in stage 3.0 (TID 11, ip-10-249-71-5.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 3.0 (TID 12, ip-10-249-25-57.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 6.0 in stage 3.0 (TID 13, ip-10-226-152-67.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 9.0 in stage 3.0 (TID 14, ip-10-226-142-5.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 11.0 in stage 3.0 (TID 15, ip-10-249-31-110.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes
...
...
14/10/31 01:15:54 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on ip-10-249-71-5.us-west-2.compute.internal:49964 (size: 4.7 KB, free: 3.1 GB)
14/10/31 01:15:54 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on ip-10-249-25-57.us-west-2.compute.internal:37497 (size: 4.7 KB, free: 3.1 GB)
14/10/31 01:15:54 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on ip-10-249-62-60.us-west-2.compute.internal:56086 (size: 4.7 KB, free: 3.1 GB)
14/10/31 01:15:57 INFO scheduler.TaskSetManager: Starting task 19.0 in stage 3.0 (TID 27, ip-10-249-31-110.us-west-2.compute.internal, ANY, 1243 bytes)
14/10/31 01:16:14 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 6) in 21020 ms on ip-10-249-30-169.us-west-2.compute.internal (1/22)
14/10/31 01:16:29 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 3.0 (TID 17) in 35919 ms on ip-10-249-15-15.us-west-2.compute.internal (2/22)
14/10/31 01:16:31 INFO scheduler.TaskSetManager: Finished task 9.0 in stage 3.0 (TID 14) in 38311 ms on ip-10-226-142-5.us-west-2.compute.internal (3/22)
14/10/31 01:16:37 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 3.0 (TID 18) in 43942 ms on ip-10-249-30-169.us-west-2.compute.internal (4/22)
14/10/31 01:16:55 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 3.0 (TID 10) in 62229 ms on ip-10-249-14-142.us-west-2.compute.internal (5/22)
...

There are 22 tasks because there are 22 data files. Each task is done as a single process on one of the nodes.

You should see there are about 123 million observations (rows) in the dataset (the dataset is an RDD object). More on RDDs in a bit.

What if the data are not one-line-per-row?

You can read in each file as an individual observation using wholeTextFiles().

You can use flatMap() if each row contains multiple observations.

Reading into Spark from S3

If we wanted to read directly into Spark from S3 (again checking if we need s3 or s3n), we can do something like this, here reading the Google n-grams word co-occurrence dataset:

lines = sc.textFile("s3n://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram")

Note the ":" to separate the two parts of your AWS credential information. That was to read from a set of files but you could also point to a specific individual file. And you may be able to use wildcards.

Note that if either of your AWD credential strings has a "/" in it, that can cause problems because it will be interpreted as a file path separator (":" would probably also cause similar problems). As an alternative, you can add your credentials in the following format as lines in the /root/ephemeral-hdfs/conf/core-site.xml configuration file.

<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>YOUR_AWS_ACCESS_KEY_ID</value>
</property>
<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>YOUR_AWS_SECRET_ACCESS_KEY</value>
</property>

In that case you would just do:

lines = sc.textFile("s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram")

The Google ngrams data is somewhat big so that may take a while. To test that things are working, try this test file:

lines = sc.textFile("s3n://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@berkeley-scf-test/test/2009_nfl_pbp_data.csv") 
lines.count()

You should get that there are 43064 lines.

To get a directory of files rather than a single file you should be able to give a directory name, and I think wildcards are also allowed to indicate multiple files.

Repartitioning the input

To better distribute the data across the nodes, I'm going to repartition the dataset into more than 22 chunks.

lines = sc.textFile('/data/airline').repartition(192).cache()

And I also "cached" the RDD so that it is retained in memory for later processing steps.

We'll come back to both of these ideas in a bit.

2.2.2) Subsetting in Spark

The next thing we could do is subset the dataset using the filter() method. Here's an example:

sfo_lines = lines.filter(lambda line: "SFO" in line.split(',')[16]).cache()
sfo_lines.count()

The filtering function (in this case my anonymous (lambda) function) should produce a single boolean. I could have used a named function here too.

There are 2373910 observations corresponding to flights from SFO.

Note that the processing here shows 192 tasks, corresponding to the now 192 partitions.

2.2.3) Extracting data from an RDD

If the subset is sufficiently small we can pull it into Python as a standard Python object in the PySpark/Python session on our master node.

sfo = sfo_lines.collect()
type(sfo) 
# <type 'list'>
len(sfo) 
# 2733910
sfo[1] # but remember Python indexing starts at 0 
# u'1992,1,2,4,708,705,1243,1222,US,1118,NA,215,197,NA,21,3,SFO,MCI,1499,NA,NA,0,NA,0,NA,NA,NA,NA,NA'

Or I could save the subset as a new HDFS dataset that I could transfer back to the standard filesystem on the master using the HDFS copyToLocal command from the UNIX prompt on the master node.

sfo_lines.saveAsTextFile('/data/airline-sfo')

2.2.4) Stratifying in Spark

Now suppose I want to use Spark to do some stratification. Here's a very basic stratification. We'll do it using MapReduce.

I'm going to group by departure/arrival airport pairs and then count the number of flights. I'll use a mapper, createKeyValue(), to create key-value pairs where the key is the route and the value is just a 1 and then I do a reduction, stratifying by key, where I simply count the records associated with each key by using add as the reduce function. In Python, the key-value pairs are just a 2-element tuple (I think a 2-element list would be fine), but the value could be arbitrarily more complicated than the scalar quantity 1 that I use here.

from operator import add

def createKeyValue(line):
    vals = line.split(',')
    return(vals[16]+'-'+vals[17], 1)

routeCount = lines.map(createKeyValue).reduceByKey(add).collect()

mx = routeCount[0]
for i in xrange(len(routeCount)):
     if routeCount[i][1] > mx[1]:
        mx = routeCount[i]

Any guesses about the route with the most flights?

I wrote that as presented above to illustrate key-value pairs and using a map-reduce pair of functions, but for simply counting, I could have done it more efficiently like this:

routeCount2 = lines.map(createKeyValue).countByKey()

Note that the result is a dictionary not a list.

2.2.5) Getting a few elements for inspection and debugging

One useful thing is to be able to grab a single element or a few elements from an Spark dataset. This allows us to look at the structure of units in the dataset and test our code by applying our Python functions directly to the units.

oneLine = lines.take(1)
# test my createKeyValue() function:
createKeyValue(oneLine)
createKeyValue(oneLine[0])
keyValues = lines.map(createKeyValue).cache()
twoKeyValues = keyValues.take(2)
twoKeyValues
# [(u'AVP-PIT', 1), (u'LAX-IND', 1)]
twoKeyCounts = keyValues.reduceByKey(add).take(2)
# [(u'ORD-SEA', 101949), (u'MCI-MSY', 925)]

Make sure you know what kind of Python object is in use for each observation in the RDD - originally it's often just a text line from CSV, but after mapping it would often be a tuple or a list. It's good to check your code on individual elements or pairs of elements from the RDD obtained via take(). Note that sometimes things will be embedded in a list and you'll need to burrow into it as seen in the example where we had a list of length one containing the string.

2.3) Some fundamental ideas in MapReduce and Spark

Now that we've seen a few example computations, let's step back and discuss a few concepts.

2.3.1) RDDs

Spark datasets are called Resilient Distributed Datasets (RDDs). Spark operations generally take the form of a method applied to an RDD object and the methods can be chained together, as we've already seen.

Note also the sc object we used for reading from the HDFS was a SparkContext object that encapsulates information about the Spark setup. The textfile() function is a method used with SparkContext objects.

2.3.2) Caching

Spark by default will try to manipulate RDDs in memory if they fit in the collective memory of the nodes. You can tell Spark that a given RDD should be kept in memory so that you can quickly access it later by using the cache() method. To see the RDDs that are cached in memory, go to http://<master_url>:4040 and click on the "Storage" tab.

If an RDD won't fit in memory, Spark will recompute it as needed, but this will involve a lot of additional computation. In some (but not all) cases in which there is not enough collective memory you may want it cached on disk. The Spark programming guide has more information.

2.3.3) Partitions and repartitioning

The number of chunks your dataset is broken into can greatly impact computational efficiency. When a map function is applied to the RDD, the work is broken into one task per partition.

You want each partition to be able to fit in the memory availalbe on a node, and if you have multi-core nodes, you want that as many partitions as there are cores be able to fit in memory.

For load-balancing you'll want at least as many partitions as total computational cores in your cluster and probably rather more partitions. The Spark documentation suggests 2-4 partitions (which they also seem to call slices) per CPU. Often there are 100-10,000 partitions. Another rule of thumb is that tasks should take at least 100 ms. If less than that, you may want to repartition to have fewer partitions.

As an example that we've already seen, the original airline dataset is in 22 partitions, one per input CSV file, and those partitions are different sizes because the data size varies by year. So it's a good idea to repartition the dataset as you're doing your initial manipulations on it, as we did above. But it did take a while to do that because of all the transferring of data across nodes.

2.3.4) MapReduce

MapReduce is a sort of meta-algorithm. If you can write your algorithm/computation as a MapReduce algorithm, then you should be able to implement it in Spark or in standard Hadoop, including Amazon's Elastic MapReduce framework.

The basic idea is that your computation be written as a one or more iterations over a Map step and a Reduce step. The map step takes each 'observation' (each unit that can be treated independently) and applies some transformation to it. Think of a mathematical mapping. The reduction step then takes the results and does some sort of aggregation operation that returns a summary measure. Sometimes the mapping step involves computing a key for each unit. The units that share the same key are then collected together and a reduction step may be done for each key value.

Actually, it's more general than that. We may have multiple map steps before any reduction or no reduction at all. Also reduceByKey() returns an RDD, so there could potentially be multiple reductions.

Reduction functions should be associative and commutative. It shouldn't matter what order the operations are done in or which observations are grouped with which others in order that the reduction can be done in parallel in a simple fashion.

We'll see a variety of MapReduce algorithms here so this should become more concrete.

In addition to map and reduce steps, we can explicitly pass shared data to all the nodes via a broadcast step. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.

2.4) Additional data processing in Spark

Now let's do some aggregation/summarization calculations. One might do this sort of thing in the process of reducing down a large dataset to some summary values on which to do the core analysis.

First we'll do some calculations involving sums/means that naturally fit within a MapReduce paradigm and then we'll do a median calculation, which is not associative and commutative so is a bit harder and wouldn't scale well.

2.4.1) Calculating means as a simple MapReduce task

Here we'll compute the mean departure delay by airline-month-origin-destination sets. So we'll create a compound key and then do reduction separately for each key value (stratum).

Our map function will exclude missing data and the header 'observations'.

def mapper(line):
    vals = line.split(',')
    key = '-'.join([vals[x] for x in [8,1,16,17]])
    if vals[0] == 'Year' or vals[14] == 'NA':
       key = '0'
       delay = 0.0
       valid = 0.0
    else: 
       delay = float(vals[14])
       valid = 1.0
    return(key, (delay, valid))


def reducer( (dep1, valid1), (dep2, valid2) ):
    return( (dep1 + dep2), (valid1 + valid2) )

mappedLines = lines.map(mapper).cache()
tmp = mappedLines.reduceByKey(reducer)
tmp
# PythonRDD[11] at RDD at PythonRDD.scala:43
results = tmp.collect()
results[0:3]  
# [(u'EA-11-ATL-DEN', (1531.0, 265.0)), (u'NW-11-MSP-SDF', (3290.0, 882.0)), (u'DL-9-SEA-JFK', (5301.0, 577.0))]
means = [(val[0], val[1][0]/val[1][1]) for val in results if val[1][1] > 0.0]
means[0:3]
# [(u'EA-11-ATL-DEN', 5.777358490566038), (u'NW-11-MSP-SDF', 3.7301587301587302), (u'DL-9-SEA-JFK', 9.1871750433275565)]

Note that the result of the reduceByKey() is just another (small) RDD.

Note that we wrote our own reduce function because Python lists/tuples don't add in a vectorized fashion so we couldn't just use operator.add(), but if we had made the values in the key-value pairs to be numpy arrays we could have just done reduceByKey(add).

2.4.2) Calculating medians as a non-standard MapReduce task

This will be more computationally intensive because we don't have an associative and commutative function for reduction. Instead we need all the values for a given key to calculate the summary statistic of interest. So we use groupByKey() and then apply the median function to the RDD where each key has as its value the entire set of values for that key from the mapper. I suspect that Spark/Hadoop experts would frown on what I do here as it wouldn't scale as the strata sizes increase, but it seems fairly effective for these sizes of strata.

def medianFun(input):
    import numpy as np
    if len(input) == 2:
        if len(input[1]) > 0:
            med = np.median([val[0] for val in input[1] if val[1] == 1.0])
            return((input[0], med))
        else:
            return((input[0], -999))
    else:
        return((input[0], -9999))

output = mappedLines.groupByKey()
output.count()
# 186116
medianResults = output.map(medianFun).collect()
medianResults[0:3]
# [(u'EA-11-ATL-DEN', 0.0), (u'NW-11-MSP-SDF', -3.0), (u'DH-8-HPN-ORD', -7.0)]

# here's code to show the iterable object produced by groupByKey()
dat = output.take(1)
dat[0][0]
len(dat[0][1])
[val for val in dat[0][1]]

Note that the median calculation is another map function not a reduce, because it's just a 1-1 transformation of the RDD that is returned by groupByKey(). But of course the size of the value associated with each key is much larger before the application of the medianFun() map step.

2.4.3) Stratifying data and exporting

Now suppose we wanted to create multiple datasets, one per stratum. This does not fit all that well into the MapReduce/HDFS paradigm. We could loop through the unique values of the stratifying variable using filter() and saveAsTextFile() as seen above. There are not really any better ways to do this. In previous tries, I used sortByKey() first and that improved speed but I've been running into errors recently when doing that, with Spark losing access to some of the worker tasks (executors). Here I'll stratify by departure airport.

def createKeyValue(line):
    vals = line.split(',')
    keyVal = vals[16]
    return(keyVal, line)

keyedLines = lines.map(createKeyValue).cache()

keys = keyedLines.countByKey().keys()

for curkey in keys:
    fn = '/data/' + curkey
    keyedLines.filter(lambda x: curkey == x[0]).map(lambda x: x[1]).repartition(1).saveAsTextFile(fn)

The repartition(1) ensures that the entire RDD for a given key is on a single partition so that we get a single file for each stratum instead of having each stratum split into pieces across the different nodes hosting the HDFS. The mapper simply pulls out the value for each key-value pair, discarding the key.

2.5) Doing simulation via a simple MapReduce calculation

We'll do a simulation to estimate the value of $\pi$ as an embarrassingly parallel calculation using MapReduce.

Here's the Python code

import numpy.random as rand
from operator import add

samples_per_slice = 1000000
num_cores = 24
num_slices = num_cores * 20

def sample(p):
    rand.seed(p)
    x, y = rand.random(samples_per_slice), rand.random(samples_per_slice)
    return sum(x*x + y*y < 1)

count = sc.parallelize(xrange(0, num_slices), num_slices).map(sample).reduce(add)
print "Pi is roughly %f" % (4.0 * count / (num_slices*samples_per_slice))

Let's piece that apart. The parallelize() call basically takes the numbers {0,1,2,...,num_slices-1} and treats them as the input "dataset". These are the initial units and then the transformation (the mapper) is to do the random number generation and compute the partial sum. The second argument to parallelize() indicates how many partitions to create, so this will correspond to the number of tasks that will be carried out in a subsequent map step. Using paralllelize() to create an RDD from an existing Python list is another way to create an RDD, in addition to reading data from an external storage system such as the HDFS.

A few comments here. First we could have sampled a single random vector, thereby calling parallelize() across numslices*samples_per_slice, with one task per {x,y} pair. But it's much more computationally efficient to do a larger chunk of calculation in each process since there is overhead in running each process. But we want enough processes to make use of all the processors collectively in our Spark cluster and probably rather more than that in case some processors do the computation more slowly than others. Also, if the computation involved a large amount of memory we wouldn't want to exceed the physical RAM available to the processes that work simultaneously on a node. However, we also don't want too many tasks. For example, if we try to have as many as 50 million parallel tasks, Spark will hang. Somehow the overhead in creating that many tasks is too great, though I need to look further for a complete explanation.

2.6) Random number generation in parallel

A basic approach to deal with parallel random number generation (RNG) is to set the seed based on the task id, e.g., passed in via parallelize() above. This doesn't guarantee that the random number streams for the different tasks will not overlap, but it should be unlikely. Unfortunately Python does not provide the L'Ecuyer algorithm, or other algorithms to ensure non-overlapping streams, unlike R and Matlab.

2.7) Using PySpark in batch (non-interactive mode)

We can also do that calculation as a batch job.

Here's the code in the file piCalc.py:

import sys
import numpy.random as rand
from operator import add
from pyspark import SparkContext
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print >> sys.stderr, "Usage: spark-submit piCalc.py <total_samples> <slices>"
        exit(-1)
    sc = SparkContext() 
    total_samples = int(sys.argv[1]) if len(sys.argv) > 1 else 1000000
    num_slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
    samples_per_slice = round(total_samples / num_slices)

    def sample(p):
        rand.seed(p)
        x, y = rand.random(samples_per_slice), rand.random(samples_per_slice)
        return sum(x*x + y*y < 1)

    count = sc.parallelize(xrange(0, num_slices), num_slices).map(sample).reduce(add)
    print "Pi is roughly %f" % (4.0 * count / (num_slices*samples_per_slice))

Note that by starting Python via PySpark, the sc object is created and available to you, but for batch jobs we need to import SparkContext and instantiate the sc object ourselves.

And here's how we run it from the UNIX command line on the master node

spark-submit piCalc.py 100000000 1000

3) Model fitting with Spark

3.1) Regression example with sufficient statistics

Suppose we wanted to run a regression with very many observations, $n$, but relatively few predictors, $p$. For our purposes here, let's assume $p &lt; 5000$ so that $X^\top X$ takes up at most about 200 Mb. In this case we can compute the sufficient statistics, $X^\top X$ and $X^\top Y$ using MapReduce and return them to the master for the final calculation.

The basic calculation we are doing is $\sum_{i=1}^n x_i x_i^\top$ and $\sum_{i=1}^n x_i y_i$.

import numpy as np
from operator import add

def screen(vals):
    vals = vals.split(',')
    return(vals[0] != 'Year' and vals[14] != 'NA' and vals[18] != 'NA' 
           and vals[3] != 'NA' and
           float(vals[14]) < 720 and float(vals[14]) > (-30) )

lines = sc.textFile('/data/airline').filter(screen).repartition(192).cache()

P = 8

def crossprod(line):
    vals = line.split(',')
    y = float(vals[14])
    dist = float(vals[18])
    dayOfWeek = int(vals[3])
    xVec = np.array([0.0] * P)
    xVec[0] = 1.0
    xVec[1] = float(dist)/1000
    if dayOfWeek > 1:
        xVec[dayOfWeek] = 1.0
    xtx = np.outer(xVec, xVec)
    xty = xVec * y
    return(np.c_[xtx, xty])

suffStats = lines.map(crossprod).reduce(add)
mles = np.linalg.solve(suffStats[0:P,0:P], suffStats[0:P,P])
# array([ 6.366239  ,  0.76375807, -0.69956913,  0.39282445,  2.22473943,
#        2.88670704, -2.42725064, -0.13621746])

More efficient calculation using mapPartitions()

That calculation of the sufficient statistics applied the linear algebra row by row. For the full airline dataset it took about 11 minutes. A more efficient alternative is to put all the data for a given partition into a matrix and do the linear algebra on the matrix. The code below takes about 2.5 minutes.

Here we use mapPartitions() to apply a map to an entire partition - the readPointBatch() function iterates through the observations in the partition and constructs the $X$ matrix before then doing the crossproduct.

In contrast to the approach above, our reduction is now done over the $K$ matrices, one per partition: $\sum_{k=1}^K X_k^\top X_k$ and $\sum_{k=1}^K X_k^\top y_k$.

def readPointBatch(iterator):
    strs = list(iterator)
    matrix = np.zeros((len(strs), P+1))
    for i in xrange(len(strs)):
        vals = strs[i].split(',')
        dist = float(vals[18])
        dayOfWeek = int(vals[3])
        xVec = np.array([0.0] * (P+1))
        xVec[8] = float(vals[14]) # y
        xVec[0] = 1.0  # int
        xVec[1] = float(dist) / 1000
        if(dayOfWeek > 1):
            xVec[dayOfWeek] = 1.0
        matrix[i] = xVec
    return [matrix.T.dot(matrix)]

suffStats2 = lines.mapPartitions(readPointBatch).reduce(add)
mles = np.linalg.solve(suffStats2[0:P,0:P], suffStats2[0:P,P])

It's not entirely clear to me why I needed to return a list with a single matrix from readPointBatch() as opposed to simply returning the matrix, but I expect it's just that PySpark needs each element of the RDD to be a list (or perhaps a tuple instead).

3.2) Coordinate descent for linear regression

The strategy above makes a lot of sense but breaks down as $P$ gets large. We might consider coordinate descent, iterating through the individual regression coefficients. Of course this would probably be quite slow for large $P$, but it's worthwhile to illustrate and sets the stage for our use of coordinate descent for lasso problems in the next section.

For linear regression, the optimization for a single coefficient, holding the others constant, can be done in closed form: $\hat{\beta}{k} = \frac{\sum{i=1}^n r_i y_i} {\sum_{i=1}^n x_{k,i}^2 }$ where $r_i = y_i - \sum_{j \ne k} x_{j,i}\beta_j$.

We'll precompute the denominator values in a MapReduce step. Our coordinate descent is then an iterative procedure with an inner loop that loops over the coefficients. For simplicity below I just do a fixed number of iterations, but in reality of course we would want a reasonable stopping criterion for the optimization.

Note that for efficiency we'll again use the mapPartitions() version of this algorithm.

P=8

def readPointBatch(iterator):
    strs = list(iterator)
    matrix = np.zeros((len(strs), P+1))
    print(len(strs))
    for i in xrange(len(strs)):
        vals = strs[i].split(',')
        dist = float(vals[18])
        dayOfWeek = int(vals[3])
        xVec = np.array([0.0] * (P+1))
        xVec[8] = float(vals[14]) # y
        xVec[0] = 1.0  # int
        xVec[1] = float(dist) / 1000
        if(dayOfWeek > 1):
            xVec[dayOfWeek] = 1.0
        matrix[i] = xVec
    return [matrix]

# set up the partitions with the submatrices
batches = lines.mapPartitions(readPointBatch).cache()

def denomSumSqBatch(mat):
# compute denominator of update: sum of squared predictor
    return((mat*mat).sum(axis=0))

def getNumBatch(mat):
# compute numerator of update: covariance of residual and predictor
    beta[p] = 0
    sumXb = mat[:, 0:P].dot(beta)
    return(sum((mat[:,P] - sumXb)*mat[:,p]))

# compute denominator of update 
sumx2 = batches.map(denomSumSqBatch).reduce(add)

# initialize coefficient values
beta = np.array([0.0] * P)
p = 0
oldBeta = beta.copy()
it = 0

# finally.... do the optimization!

#while crit > tol and its < maxIts:
for it in range(1,100):
    for p in xrange(P):
        # distribute current beta and current coordinate
        sumNum = batches.map(getNumBatch).reduce(add)
        # update coordinate:
        beta[p] = sumNum / sumx2[p]   
    crit = sum(abs(beta - oldBeta))
    oldBeta = beta.copy()
    # interim reporting of estimates:
    print("-"*100)
    print(beta)
    print(crit)
    print("-"*100)

print beta
# after 4 iterations
#[ 7.0987348   0.57795465 -1.30359727 -0.21110881  1.6211203   2.28315019
# -3.02584311 -0.73731912]
# after 100 iterations
#[ 6.36623919  0.76375807 -0.69956932  0.39282426  2.22473924  2.88670685
# -2.42725082 -0.13621764]

The parameters haven't converged fully after four iterations, but are equal to the MLE by 100 iterations (I didn't check carefully to see where convergence had been reached, so 100 iterations may be serious overkill).

Each update of a coordinate takes about a 7 seconds.

3.3) Lasso via coordinate descent

A standard approach to fitting the Lasso is coordinate descent, as done in R's glmnet package, documented here in Friedman et al.'s well-known paper (1700 citations according to Google scholar!) on computation for penalized GLMs.

To test out my code, I'm going to generate a large simulated dataset, with 10 non-zero coefficients of varying size and 90 zero coefficients. The individual coefficient updates are similar to the coordinate descent for linear regression, involving covariances between residuals and predictor, but with thresholding. For the example here, I just do this with a single value of the penalty parameter, $\lambda$, but one could readily extend this to compute the solution path across different values of $\lambda$, using the idea of warm starts, in which the optimization for the next value of the penalty starts at the estimates found for the current value of the penalty.

First I'll generate some simulated data with known coefficients. In this case the code is set up to either generate independent predictors or autocorrelated predictors, depending on the value of autocorr.

import numpy as np
from operator import add

n = 10000000
P = 100
nNonZero = 10

betaTrue = [-2, -1.5, -1.0, -0.5, -0.3, -0.1, 0.3, 0.5, 1.0, 1.5]
betaTrue = np.append(betaTrue, np.zeros(P-nNonZero))

# create dataset as RDD

# allow generation of autocorrelated X's to make optimiz'n harder
autocorr = 1
rho = 0.7

def simData(it):
    np.random.seed(it)
    x = np.random.normal(size = P)
    if autocorr == 1:
        for ind in xrange(P-1):
            x[ind+1] = rho*x[ind] + x[ind+1]
    y = sum(x*betaTrue) + np.random.normal(size = 1)
    return(np.append(x, y))

data = sc.parallelize(xrange(n)).repartition(96).map(simData)

As with the regression, let's set up each batch to have all of its data as a single matrix.

def readPointBatch(iterator):
    vals = list(iterator)
    matrix = np.zeros((len(vals), P+1))
    for i in xrange(len(vals)):
        matrix[i] = vals[i]
    return [matrix]

batchesLasso = data.mapPartitions(readPointBatch)

Now I'll standardize the predictors so we can directly implement the equations of the Friedman et al. paper.

def getSums(mat):
    return mat.sum(axis = 0)

colMeans = (batchesLasso.map(getSums).reduce(add) / n)[0:P]

def stdizeMean(mat):
    mat[:, 0:P] = mat[:, 0:P] - colMeans
    return mat

batchesLasso = batchesLasso.map(stdizeMean)

def getSumSqs(mat):
    return (mat*mat).sum(axis = 0)

colSds = (pow(batchesLasso.map(getSumSqs).reduce(add) / n, 0.5))[0:P]

def stdizeSd(mat):
    mat[:, 0:P] = mat[:, 0:P] / colSds[0:P]
    return mat

batchesLasso = batchesLasso.map(stdizeSd).cache()

Now we'll implement the basic naive algorithm from the Friedman et al. paper.

eps = 0.01
alpha = 1 - eps  # alpha = 1 is lasso penalty, but with a slight numerical modification
lamb = 0.5   # value of penalty parameter

def Sfun(z, gamma):
    if gamma >= abs(z):
        return(0)
    if z > 0:
        return(z - gamma)
    return(z + gamma)

def getNumBatch(mat):
    if nonZeros == None:
        sumXb = 0
    else:
        # poor man's sparse implementation
        sumXb = mat[:, nonZeros].dot(beta[nonZeros])
    return(sum((mat[:,P] - sumXb)*mat[:,p]))


beta = np.array([0.0] * P)
nonZeros = None
p = 0

oldBeta = beta.copy()

it = 0

nIts = 6
storeVals = np.zeros((nIts, P+1))

for it in range(0,nIts):
    for p in xrange(P):
        # get numerator as product of residual and X for coordinate
        sumNum = batchesLasso.map(getNumBatch).reduce(add)
        beta[p] = Sfun(sumNum/n + beta[p], lamb*alpha) / (1 + lamb*eps)
        if beta[p] != 0:
            if nonZeros == None:
                nonZeros = np.array([p])
            elif sum(p == nonZeros) == 0:
                nonZeros = np.append(nonZeros, p)
                nonZeros.sort()
        print("Updated var " + str(p) + " in iteration ", str(it), ".")
    crit = sum(abs(beta - oldBeta))
    storeVals[it, 0] = crit
    storeVals[it, 1:(P+1)] = beta
    oldBeta = beta.copy()
    print("-"*100)
    print(beta)
    print(crit)
    print("-"*100)
#('Updated var 99 in iteration ', '5', '.')
#----------------------------------------------------------------------------------------------------
#[-1.68283553 -1.72371224 -1.22416959 -0.45692377  0.          0.          0.
#  0.32505124  1.3951588   1.77561668  0.          0.          0.          0.
#  0.          0.          0.          0.          0.          0.          0.
#  0.          0.          0.          0.          0.          0.          0.
#  0.          0.          0.          0.          0.          0.          0.
#  0.          0.          0.          0.          0.          0.          0.
#  0.          0.          0.          0.          0.          0.          0.
#  0.          0.          0.          0.          0.          0.          0.
#  0.          0.          0.          0.          0.          0.          0.
#  0.          0.          0.          0.          0.          0.          0.
#  0.          0.          0.          0.          0.          0.          0.
#  0.          0.          0.          0.          0.          0.          0.
#  0.          0.          0.          0.          0.          0.          0.
#  0.          0.          0.          0.          0.          0.          0.
#  0.          0.        ]
#0.18567818344
#----------------------------------------------------------------------------------------------------

After 5 iterations, the Lasso seems to be getting close to convergence (the sum of the absolute difference in the coefficients is 0.19). Each update of a coefficient takes about 3 seconds and a single full iteration about 6 minutes.

As noted, I implemented the naive algorithm. The covariance updating algorithm also presented in Friedman et al. makes use of the sparsity of the coefficient vector to achieve computational speedup.

3.4) Linear regression via gradient descent

The coordinate descent algorithm for linear regression would be slow when there are many predictors as we are doing iterations in Spark over the coordinates. It would be faster to update all the coefficients at once and have the "iteration" over the coefficients be done within Python in the computation of the gradient (i.e., have the for loop across predictors calculate the gradient vector in Python within a partition).

Here's gradient descent in Spark. Note that as is general for descent methods, we could step too far and the objective function might increase, particularly when we are not using the second derivatives to guide our step sizes. I haven't put in any checking here to ensure that our steps always go downhill, but I do provide a function to evaluate the objective function -- this could be used to make the algorithm more sophisticated.

As above with coordinate descent, we'll use the batches RDD that has the data in one matrix per partition.

# step size (somewhat arbitrary here)
P=8

alpha = .4

n = lines.count()

def sumVals(mat):
    return(sum(mat[:,P]))

beta = np.array([0.0] * P)

# initialize the intercept at empirical mean
beta[0] = batches.map(sumVals).reduce(add) / n
oldBeta = beta.copy()

def getGradBatch(mat):
# gradient function
    sumXb = mat[:, 0:P].dot(beta)
    return( ((sumXb - mat[:,P])*((mat[:, 0:P]).T)).sum(1) )

def obj(mat):
# objective function evaluation
    return ( (pow(mat[:,P] - mat[:, 0:P].dot(beta), 2)).sum() ) 

objValue = batches.map(obj).reduce(add)

nIts = 300

storeVals = np.zeros((nIts, P+2))

# for simplicity do a fixed number of iterations
for it in range(1,nIts):
    gradVec = batches.map(getGradBatch).reduce(add)
    # update coefficients via simple gradient descent
    beta = beta - alpha * gradVec / n
    crit = sum(abs(beta - oldBeta))
    objValue = batches.map(obj).reduce(add)
    oldBeta = beta.copy()
    storeVals[it, 0] = pow(objValue/n,0.5)
    storeVals[it, 1] = crit
    storeVals[it, 2:(P+2)] = beta
    print("-"*100)
    print(it)
    print(beta)
    print(crit)
    print(pow(objValue/n,0.5))
    print("-"*100)
# ....
# 99
#[ 6.5580312   0.75471562 -0.91028266  0.17939929  2.00668012  2.66706259
# -2.62962892 -0.34925791]
#0.011090628482
#29.837622418

Each of the gradient calculation and objective function calculation take about 5 seconds per iteration. With $p=100$, the whole thing takes a while

It takes a while in terms of number of iterations, but when I ran it for another 300 iterations it was getting fairly close to the MLE (plus/minus .05 for all the coefficients). And that was without any tuning of the step size or checking for steps that go too far (though I don't think any of the steps resulted in an increase in the objective function).

3.5) Stochastic gradient descent and MLlib

MLlib is the machine learning library for Spark, which implements a number of standard statistical/machine learning models/methods. We won't go into it here, but since the goal of MLlib is to provide ready-to-go functions to apply to datasets, it won't be hard for you to try it out given all of what we've seen here.

One standard method in fitting large datasets is stochastic gradient descent, which involves choosing a random subset of the observations to use to compute the gradient at each update. MLlib relies heavily on SGD and the Spark documentation provides details.

3.6 Uncertainty

I'd have my Statistics PhD revoked if I didn't say anything about uncertainty estimates, beyond simply getting estimates. For small $p$, one can do the usual computation of regression and GLMs standard errors/covariances. For large $p$, my understanding is that this is a hard problem, and I haven't had a chance to dig into the literature to figure out the state of the art.

4) Final thoughts

4.1) Running Spark serially on your own machine

One should be able to test Spark readily on your own machine with the workers all operating on your single machine rather than on the nodes of a cluster. So far, based on the instructions above, you have simply unzipped the Spark tarball and not actually installed the software. To install the software, try downloading the pre-built binary at the Spark downloads page. This will allow you to use Spark for simulation. To access data on disk, you'll want to read data from the local disk, rather than HDFS, into Spark. (Otherwise, you'd need to have Hadoop installed with the HDFS set up and with only a single node there's no obvious purpose to this.) So in your textFile() invocation, just refer to the local path on the machine, e.g., sc.textFile("/path/to/data.txt").

4.2) Using a fast BLAS

As is generally the case when using Python or R, if you're doing a lot of linear algebra, the speed of your code may be driven by the speed of the BLAS that Python or R is linked to. So if your Spark code does a lot of linear algebra, you probably want to link Python to a fast BLAS such as openBLAS. For more information, email consult@stat.berkeley.edu or consult@econ.berkeley.edu.

4.3) Additional features of Spark

To use Spark with iPython, see this post.

Spark SQL allows you to do SQL queries on RDDs.

Spark Streaming allows you to work with data arriving in an on-line manner.

SparkR provides an R interface. It's under active development and is available on Github and will be available soon as part of the Apache Spark project on the main Spark webpage. T There is a mailing list sparkr-dev@googlegroups.com and a JIRA, https://sparkr.atlassian.net/browse/SPARKR/. I have some shell code that will install SparkR on an AWS EC2 cluster and some demo code for using SparkR. However the code is still under construction and I've run into some issues that I've been talking with the SparkR developers about. But let me know if you'd like the code.

4.4) Alternatives to Spark

Many "big data" problems will fit on disk (and sometimes even in memory on machines with a lot of memory) on a single machine. If so, you may be better off doing the work on a single machine, potentially parallelizing calculations across multiple cores on that machine. Here are some of the tools that are useful for dealing with large datasets.

  • SQL for serial on-disk processing (e.g., from R or Python)

  • SAS for serial on-disk data processing

  • ff or bigmatrix packages in R for serial on-disk data processing (and biglm for fitting large linear models and GLMs)

  • data.table package in R for serial in-memory processing (this is fast if things fit in memory)

  • pbd package in R for distributed computation, particularly linear algebra and distributed matrices

  • numpy provides memory-mapped files via numpy.memmap()

  • manual coding to process data in batches from disk in any language

  • serial processing of files using UNIX tools such as sed, grep, cut, etc.

  • up-coming updates to LAPACK, ScaLAPACK for linear algebra on distributed memory clusters and on GPUs

4.5) More resources

5) Homework

  1. Try to run the demo code given in this document yourself. Try some small tweaks and play around some.

  2. Suppose we wanted to find the number of departing flights from each airport and add this as a column to our RDD, matched of course to the departure airport for each flight. Implement this using Spark. Essentially, of course, you're doing a manual database join. One could then, for example, add the log of the number of flights as a predictor in our regression modeling of flight delays.

  3. Consider logistic regression. Fit a logistic regression model for the probability that a departure delay is greater than 30 minutes. You're welcome to use whatever predictors you like, but you might just use the ones I've used above (distance and day of week). Work out the code to implement logistic regression where at each iteration you solve a least squares problem in closed form using the working weights and working observations of the usual IWLS/IRLS algorithm.