-
Notifications
You must be signed in to change notification settings - Fork 1
/
spark.Rmd
1149 lines (808 loc) · 62.3 KB
/
spark.Rmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
An Introduction to Using Distributed File
======================================================================================
Systems and MapReduce through Spark
======================================================================================
Chris Paciorek, Statistical Computing Facility / Department of Statistics and Berkeley Research Computing, UC Berkeley
Presented: April 24, 2015
Last Revised: April 24, 2015
```{r setup, include=FALSE}
opts_chunk$set(engine='python', eval=FALSE) # because we're using a lot of bash, let's set as default
```
# 0) This Tutorial
The first half of this material covers starting up a Spark virtual cluster on Amazon's EC2 service, working with the Hadoop distributed file system (HDFS), reading data into spark and basic data processing.
The second half covers fitting basic statistical models, with examples of iterative and non-iterative fitting. We'll also see the use of Spark for simulation work and batch computation. Time permitting and based on participant interest, we can also have some discussion of what computations would make sense for doing in Spark and comparison with other potential approaches.
Materials for this tutorial, including the R markdown file that was used to create this document are available on github at `https://github.com/ucberkeley/ph290-spark-2015-spring`. You can download the files by doing a git clone:
```{clone, eval=FALSE}
git clone https://github.com/ucberkeley/ph290-spark-2015-spring
```
To create this HTML document, simply compile the corresponding R Markdown file in R:
```{rmd-compile, engine='bash'}
Rscript -e "library(knitr); knit2html('spark.Rmd')"
```
<!--
#pandoc --number-sections spark.md -o spark.html
#Rscript -e "library(knitr); knit('spark.Rmd')"
#pandoc --mathjax --number-sections spark.md -o spark.html
-->
To get the airline dataset we'll be using as a running example, you can download from [this URL](http://www.stat.berkeley.edu/share/paciorek/1987-2008.csvs.tgz) or as individual .bz2 files from `http://www.stat.berkeley.edu/share/paciorek/{1987,...,2008}.csv.bz2`. The demo code later in this document will do the downloading automatically.
There is a version of Spark for R (SparkR, see notes at the end of this document), so if you get the big picture from this but don't follow all the Python syntax, you should be able to get up to speed on Spark with R fairly quickly.
I'm using the R extension of Markdown for ease of dealing with Latex equations, though it is admittedly a bit strange to use R Markdown for a tutorial that doesn't involve R. For the most part this could have just been done as a Markdown document.
# 1) Introduction
## 1.1) Hadoop, MapReduce, and Spark
The goal of this tutorial is to introduce Hadoop-style distributed computing for statistical use. We'll focus on using MapReduce algorithms for data preprocessing and statistical model fitting. We'll make use of the Hadoop distributed file system (HDFS) for storing data and of Amazon's EC2 for creating virtual Linux clusters on which to do our computation.
To do all this we'll be making use of [Spark](http://spark.apache.org), a project out of the AMPLab here at Berkeley that aims to greatly increase the speed of Hadoop MapReduce by keeping data in memory when possible.
Spark allows one to write code in Java, Scala, and Python. We'll use the Python interface, called PySpark, as Python is more widely known in the Statistics and Economics communities and should be easier to get started with for those who don't know any of those languages.
I won't assume knowledge of Python but will assume basic familiarity with operating in UNIX shell and with a scripting languages similar to Python such as R or Matlab. We'll also make use of some basic concepts of object-oriented programming.
## 1.2) Some Cautionary Notes
1. Since PySpark involves serializing Python objects to pass them to Java, it may be slower than using Java or Scala for your MapReduce code. Here's a [thread with some info](http://apache-spark-user-list.1001560.n3.nabble.com/Scala-vs-Python-performance-differences-td4247.html).
2. If you can run your job using the memory and disk space on a single machine, you may be better off doing so, given the overhead involved in working in a distributed fashion. That said, Spark allows you to scale in a way that would otherwise be difficult.
3. Note that I've found the error messages in Spark (and also Hadoop) to be often very hard to understand. Some of this is because the messages are from the underlying Java implementation. So the process of figuring out syntax can be a bit painful at times. As usual, it's good to start with small test problems and to the extent possible test your Python code does what you expect before using that Python code in a Spark operation. Look carefully for error messages coming from Python that indicate bugs in your Python code.
In general, I'm not advocating for you to use MapReduce or Spark. They might or might not be the right tool for your job. But I do think it's valuable to understand how they work and be able to assess if they would be helpful for a given computation you need to do.
## 1.3) Getting Spark running on an EC2-based cluster
We'll focus on using the Amazon EC2 tools provided in the Spark software to start up an EC2 cluster with Spark and a distributed file system (the HDFS) already set up.
Note that a detailed set of instructions that mimics those here for starting up and logging into a Spark EC2 cluster are provided in the *startSparkVirtualCluster.txt* file in the workshop Github repository.
We'll use the *spark-ec2* script provided by Spark. To get the script, download and unzip the source code from the [Spark downloads page](http://spark.apache.org/downloads.html). Select the current release, the "Source Code" package type and "Direct Download" and then download the .tgz file. Simply unzip the .tgz file and navigate to the ec2 directory and you'll find the `spark-ec2` script there. For this tutorial we'll be using Spark 1.3.0.
```{r, engine='bash'}
SPARK_VERSION=1.3.0
tar -xvzf spark-${SPARK_VERSION}.tgz
cd spark-${SPARK_VERSION}/ec2
```
### 1.3.1) Starting your cluster
First let's launch an EC2 cluster. We can choose the number of slave nodes. You'll need to have the authentication information for your Amazon AWS account as well as have generated an SSH public-private key pair for use with your Amazon account.
```{r, engine='bash'}
CLUSTER_SIZE=6
SPARK_VERSION=1.3.0
REGION=us-west-2
SSH_KEY_NAME=FILL_IN_NAME_OF_YOUR_SSH_KEY # e.g., id_rsa
SSH_KEY_FILENAME=FILL_IN_NAME_OF_YOUR_SSH_KEY_FILENAME # e.g., id_rsa
export AWS_ACCESS_KEY_ID=FILL_IN_YOUR_ACCESS_KEY
export AWS_SECRET_ACCESS_KEY=FILL_IN_YOUR_SECRET_KEY
YOUR_NAME=SOME_UNIQUE_NAME_FOR_YOUR_CLUSTER
cd /usr/local/linux/spark/ec2
./spark-ec2 -k ${SSH_KEY_NAME} -i ~/.ssh/${SSH_KEY_FILENAME} --region=${REGION} -s ${CLUSTER_SIZE} -v ${SPARK_VERSION} launch sparkvm-${YOUR_NAME}
```
You'll need to wait for a few minutes. Look carefully for error messages as sometimes the startup of the nodes and installation of the Spark software does not go smoothly. If all goes well, the end of the messages printed to the screen should look like this:
```
Setting up security groups...
Setting up security groups...
Creating security group sparkvm-paciorek-master
Creating security group sparkvm-paciorek-slaves
Searching for existing cluster sparkvm-paciorek...
Spark AMI: ami-9a6e0daa
Launching instances...
Launched 6 slaves in us-west-2c, regid = r-fad6cef5
Launched master in us-west-2c, regid = r-64d7cf6b
Waiting for AWS to propagate instance metadata...
Waiting for cluster to enter 'ssh-ready' state.........
[thousands of lines of information about software installation will scroll by ...]
Shutting down GANGLIA gmond: [FAILED]
Starting GANGLIA gmond: [ OK ]
Shutting down GANGLIA gmond: [FAILED]
Starting GANGLIA gmond: [ OK ]
Connection to ec2-54-212-54-11.us-west-2.compute.amazonaws.com closed.
Shutting down GANGLIA gmond: [FAILED]
Starting GANGLIA gmond: [ OK ]
Connection to ec2-54-218-203-231.us-west-2.compute.amazonaws.com closed.
Shutting down GANGLIA gmond: [FAILED]
Starting GANGLIA gmond: [ OK ]
Connection to ec2-50-112-80-198.us-west-2.compute.amazonaws.com closed.
Shutting down GANGLIA gmetad: [FAILED]
Starting GANGLIA gmetad: [ OK ]
Stopping httpd: [FAILED]
Starting httpd: [ OK ]
Connection to ec2-54-190-212-116.us-west-2.compute.amazonaws.com closed.
Spark standalone cluster started at http://ec2-54-190-212-116.us-west-2.compute.amazonaws.com:8080
Ganglia started at http://ec2-54-190-212-116.us-west-2.compute.amazonaws.com:5080/ganglia
Done!
```
Also note that I've had other glitches in starting up a Spark cluster. You should not get messages about files not found nor should the startup interrupt you to ask you to answer any questions. If either of these happen, something may have gone wrong. However, you are likely to see messages that indicate SSH connection errors while the startup script waits for the EC2 nodes to become available for SSH access.
Note that when you want to shut down your cluster (and remember it will cost you money as long as it is running), do the following and make sure to answer 'y' when prompted.
```{r, eval=FALSE}
./spark-ec2 --region=${REGION} --delete-groups destroy sparkvm-${YOUR_NAME}
```
If you want to see some of the various options and actions possible with the spark-ec2 script just do:
```{r, eval=FALSE}
./spark-ec2
```
### 1.3.2) Navigating around your cluster
First we'll login to the master node of our cluster.
```{r, engine='bash'}
./spark-ec2 -k ${SSH_KEY_NAME} -i ~/.ssh/${SSH_KEY_FILENAME} --region=${REGION} login sparkvm-${YOUR_NAME}
```
You could also login directly with ssh provided you notice the IP address of your master in the startup messages or on the AWS EC2 webpage for your AWS account:
```{r, eval=FALSE}
ssh -i ~/.ssh/${SSH_KEY_FILENAME} root@ec2-54-71-181-49.us-west-2.compute.amazonaws.com
```
Note that Spark expects you to be the root user so we'll do everything as root. Since this is a virtual cluster that we can shut down whenever we want, this isn't as dangerous as it would be with a regular computer.
Sometimes your connection might be interrupted and you'll lose your work, so you may want to run your work on the master node using the UNIX *screen* program:
```{r, engine='bash'}
screen -x -RR
```
If you get disconnected, simply login to the master node and type the command above and you'll be reconnected to your ongoing session.
Once on the master, the addresses of the slave nodes are given in `/root/ephemeral-hdfs/conf/slaves`. You can choose one of them and ssh to it using the address. Then tools such as `top` will allow you to see what is going on on the machine.
Spark provides graphical interfaces that allow you to view the status of the system. Open a browser on your local machine. You'll need the IP address (I'll call this `<master_url>` below) stored in `/root/ephemeral-hdfs/conf/masters` on the master node. Here are the web interfaces you might take a look at:
1. `http://<master_url>:8080` -- general information about the Spark cluster
2. `http://<master_url>:4040` -- information about the Spark tasks being executed
3. `http://<master_url>:50070` -- information about the HDFS
### 1.3.3) Getting data onto your cluster and onto the HDFS
Here are some options.
#### Copying to the master node followed by copying to the HDFS
You can use `scp` to copy files to the master node. The easiest thing is to be logged onto the master node and scp from your local machine to the master. But you can also use the IP address from `/root/ephemeral-hdfs/conf/masters` to scp from your local machine to the master, making sure to use the `-i ~/.ssh/${SSH_KEY_FILENAME}` flag to point to your private key.
We'll copy the ASA Data Expo Airline dataset:
```{r, engine='bash'}
mkdir /mnt/airline
cd /mnt/airline
wget http://www.stat.berkeley.edu/share/paciorek/1987-2008.csvs.tgz
tar -xvzf 1987-2008.csvs.tgz
```
Note that transfer to your cluster is free, but transfer from your cluster can be expensive. Campus is setting up a portal to AWS to greatly reduce that cost. Email consult@stat.berkeley.edu or consult@econ.berkeley.edu for more information.
Once you have files on the master you can set up directories in the HDFS as desired and copy the dataset onto the HDFS as follows. Often your dataset will be a collection of (possibly zipped) plain text files, such as CSVs. You don't need to unzip the files - Spark can read data from zipped files.
```{r, engine='bash'}
export PATH=$PATH:/root/ephemeral-hdfs/bin/
hadoop fs -mkdir /data/airline
hadoop fs -copyFromLocal /mnt/airline/*bz2 /data/airline
hadoop fs -ls /data/airline
```
Note that the commands to interact with the HDFS are similar to UNIX commands (`mkdir`, `ls`, etc.) but they must follow the `hadoop fs -` syntax.
Note that in general you want your dataset to be split into multiple chunks but not so many that each chunk is tiny (HDFS has trouble when there are millions of files -- see [this explanation](http://blog.cloudera.com/blog/2009/02/the-small-files-problem/), nor so few that the dataset can't be distributed across the nodes of the distributed file system or cannot be distributed roughly equally in size across the nodes. However, in terms of the latter issues, one can "repartition" the data in Spark.
If you've done some operations and created a dataset (called *airline-sfo* here) on the HDFS that you want to transfer back to the master node, you can do the following. Note that this may give you back a dataset broken into pieces (one per Spark partition) so you may need to use UNIX tools to combine into one file.
```{r, engine='bash'}
hadoop fs -copyToLocal /data/airline-sfo /mnt/airline-sfo
```
#### Copying directly from Amazon's S3
If the files are stored on Amazon's S3, you can copy directly onto the HDFS. Here's how, using the Google ngrams dataset as an example. Note that we need Hadoop's MapReduce processes running to do this distributed copy operation and Spark does not start these by default. (For doing MapReduce within Spark, we do NOT need to do this.)
```{r, engine='bash'}
# start MapReduce processes
/root/ephemeral-hdfs/bin/start-mapred.sh
hadoop fs -mkdir /data/ngrams
hadoop distcp -D fs.s3n.awsAccessKeyId="<AWS_ACCESS_KEY_ID>" -D fs.s3n.awsSecretAccessKey="<AWS_SECRET_ACCESS_KEY>" \
s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram hdfs:///data/ngrams
```
You'll need to use your AWS account information in place of the `<AWS_ACCESS_KEY_ID>` and `<AWS_SECRET_ACCESS_KEY>` strings here. Also check to see whether you need "s3" or "s3n" for the dataset you are interested in. Also, you may want to see if the dataset is stored in a particular AWS region as this may incur additional charges to transfer across regions. If so you may want to start up your cluster in the region in which the data reside.
# 2) Using Spark
Ok, now we're in a position to do computations with Spark.
We can run PySpark in the form of a Python command line interpreter session or in batch mode to run the code from a PySpark/Python code file.
Before you do so, if you plan to use numpy, make sure you do the following. There is a (new) glitch in the EC2 setup that Spark provides and numpy is not installed on the version of Python that Spark uses (Python 2.7). To install numpy on both the master and worker nodes, do the following as root on the master node.
```{r, engine='bash'}
wget https://bootstrap.pypa.io/get-pip.py
python27 get-pip.py
yum install python27-devel
pip2.7 install numpy
/root/spark-ec2/copy-dir /usr/local/lib64/python2.7/site-packages/numpy
```
To start PySpark in an interactive session, we do this
```{r, engine='bash'}
export PATH=${PATH}:/root/spark/bin
pyspark
```
## 2.1) Background
Spark is AMPLab's effort to create a version of Hadoop that uses memory as much as possible to reduce the speed of computation, particularly for iterative computations. So in addition to the distributed file system, you can think of Spark as providing you with access to distributed memory - the collective memory of all of the nodes in your cluster. However, that memory is still physically separated so you're not operating as if you had a single machine. But the abstractions provided by Spark allow you to do a lot of computations that would be a pain to program yourself even with access to a cluster and use of something like MPI for passing messages between machines.
## 2.2) Reading data into Spark and doing basic manipulations
### 2.2.1) Reading data into Spark
#### Reading from HDFS
First we'll see how to read data into Spark from the HDFS and do some basic manipulations of the data.
```{r, eval=FALSE}
lines = sc.textFile('/data/airline')
lines.count()
```
The `textFile()` method reads the data from the airline directory, dealing with the compression and with the various files that data is split into.
Note that the first line seems to take no time at all. That is because Spark uses **lazy evaluation**, delaying evaluation of code until it needs to actually produce a result. So sometimes it can be hard to figure out how long each piece of a calculation takes because multiple pieces are being combined together. I'll sometimes do a `count()` just to force the evaluation of the previous command. Transformations such as `map()` and `reduce()` do not trigger evaluation but actions such as `count()`, `collect()`, and `saveAsTextFile()` do. Note also that serialization between Java and Python objects only occurs at barriers defined by actions, so if you have a series of transformations, you shouldn't incur repeated serialization.
You should see a bunch of logging messages that look like this:
```
14/10/31 01:15:53 INFO mapred.FileInputFormat: Total input paths to process : 22
14/10/31 01:15:53 INFO spark.SparkContext: Starting job: count at <stdin>:1
14/10/31 01:15:53 INFO scheduler.DAGScheduler: Got job 3 (count at <stdin>:1) with 22 output partitions (allowLocal=false)
14/10/31 01:15:53 INFO scheduler.DAGScheduler: Final stage: Stage 3(count at <stdin>:1)
14/10/31 01:15:53 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/10/31 01:15:53 INFO scheduler.DAGScheduler: Missing parents: List()
14/10/31 01:15:53 INFO scheduler.DAGScheduler: Submitting Stage 3 (PythonRDD[16] at RDD at PythonRDD.scala:43), which has no missing parents
14/10/31 01:15:53 INFO storage.MemoryStore: ensureFreeSpace(5128) called with curMem=273374, maxMem=278302556
14/10/31 01:15:53 INFO storage.MemoryStore: Block broadcast_9 stored as values in memory (estimated size 5.0 KB, free 265.1 MB)
14/10/31 01:15:53 INFO storage.MemoryStore: ensureFreeSpace(3265) called with curMem=278502, maxMem=278302556
14/10/31 01:15:53 INFO storage.MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 3.2 KB, free 265.1 MB)
14/10/31 01:15:53 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on ip-10-225-185-17.us-west-2.compute.internal:54745 (size: 3.2 KB, free: 265.4 MB)
14/10/31 01:15:53 INFO storage.BlockManagerMaster: Updated info of block broadcast_9_piece0
14/10/31 01:15:53 INFO scheduler.DAGScheduler: Submitting 22 missing tasks from Stage 3 (PythonRDD[16] at RDD at PythonRDD.scala:43)
14/10/31 01:15:53 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 22 tasks
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 6, ip-10-249-30-169.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 3.0 (TID 7, ip-10-249-65-147.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 3.0 (TID 8, ip-10-249-62-60.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 12.0 in stage 3.0 (TID 9, ip-10-249-14-81.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 3.0 (TID 10, ip-10-249-14-142.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 8.0 in stage 3.0 (TID 11, ip-10-249-71-5.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 3.0 (TID 12, ip-10-249-25-57.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 6.0 in stage 3.0 (TID 13, ip-10-226-152-67.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 9.0 in stage 3.0 (TID 14, ip-10-226-142-5.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes)
14/10/31 01:15:53 INFO scheduler.TaskSetManager: Starting task 11.0 in stage 3.0 (TID 15, ip-10-249-31-110.us-west-2.compute.internal, NODE_LOCAL, 1243 bytes
...
...
14/10/31 01:15:54 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on ip-10-249-71-5.us-west-2.compute.internal:49964 (size: 4.7 KB, free: 3.1 GB)
14/10/31 01:15:54 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on ip-10-249-25-57.us-west-2.compute.internal:37497 (size: 4.7 KB, free: 3.1 GB)
14/10/31 01:15:54 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on ip-10-249-62-60.us-west-2.compute.internal:56086 (size: 4.7 KB, free: 3.1 GB)
14/10/31 01:15:57 INFO scheduler.TaskSetManager: Starting task 19.0 in stage 3.0 (TID 27, ip-10-249-31-110.us-west-2.compute.internal, ANY, 1243 bytes)
14/10/31 01:16:14 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 6) in 21020 ms on ip-10-249-30-169.us-west-2.compute.internal (1/22)
14/10/31 01:16:29 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 3.0 (TID 17) in 35919 ms on ip-10-249-15-15.us-west-2.compute.internal (2/22)
14/10/31 01:16:31 INFO scheduler.TaskSetManager: Finished task 9.0 in stage 3.0 (TID 14) in 38311 ms on ip-10-226-142-5.us-west-2.compute.internal (3/22)
14/10/31 01:16:37 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 3.0 (TID 18) in 43942 ms on ip-10-249-30-169.us-west-2.compute.internal (4/22)
14/10/31 01:16:55 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 3.0 (TID 10) in 62229 ms on ip-10-249-14-142.us-west-2.compute.internal (5/22)
...
```
There are 22 tasks because there are 22 data files. Each task is done as a single process on one of the nodes.
You should see there are about 123 million observations (rows) in the dataset (the dataset is an *RDD* object). More on RDDs in a bit.
#### What if the data are not one-line-per-row?
You can read in each file as an individual observation using `wholeTextFiles()`.
You can use `flatMap()` if each row contains multiple observations.
#### Reading into Spark from S3
If we wanted to read directly into Spark from S3 (again checking if we need s3 or s3n), we can do something like this, here reading the Google n-grams word co-occurrence dataset:
```{r, eval=FALSE}
lines = sc.textFile("s3n://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram")
```
Note the ":" to separate the two parts of your AWS credential information. That was to read from a set of files but you could also point to a specific individual file. And you may be able to use wildcards.
Note that if either of your AWD credential strings has a "/" in it, that can cause problems because it will be interpreted as a file path separator (":" would probably also cause similar problems). As an alternative, you can add your credentials in the following format as lines in the `/root/ephemeral-hdfs/conf/core-site.xml` configuration file.
```
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>YOUR_AWS_ACCESS_KEY_ID</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>YOUR_AWS_SECRET_ACCESS_KEY</value>
</property>
```
In that case you would just do:
```{r, eval=FALSE}
lines = sc.textFile("s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram")
```
The Google ngrams data is somewhat big so that may take a while. To test that things are working, try this test file:
```{r, eval=FALSE}
lines = sc.textFile("s3n://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@berkeley-scf-test/test/2009_nfl_pbp_data.csv")
lines.count()
```
You should get that there are 43064 lines.
To get a directory of files rather than a single file you should be able to give a directory name, and I think wildcards are also allowed to indicate multiple files.
#### Repartitioning the input
To better distribute the data across the nodes, I'm going to repartition the dataset into more than 22 chunks.
```{r, eval=FALSE}
lines = sc.textFile('/data/airline').repartition(192).cache()
```
And I also "cached" the RDD so that it is retained in memory for later processing steps.
We'll come back to both of these ideas in a bit.
### 2.2.2) Subsetting in Spark
The next thing we could do is subset the dataset using the `filter()` method. Here's an example:
```{r, eval=FALSE}
sfo_lines = lines.filter(lambda line: "SFO" in line.split(',')[16]).cache()
sfo_lines.count()
```
The filtering function (in this case my anonymous (lambda) function) should produce a single boolean. I could have used a named function here too.
There are 2373910 observations corresponding to flights from SFO.
Note that the processing here shows 192 tasks, corresponding to the now 192 partitions.
### 2.2.3) Extracting data from an RDD
If the subset is sufficiently small we can pull it into Python as a standard Python object in the PySpark/Python session on our master node.
```{r, eval=FALSE}
sfo = sfo_lines.collect()
type(sfo)
# <type 'list'>
len(sfo)
# 2733910
sfo[1] # but remember Python indexing starts at 0
# u'1992,1,2,4,708,705,1243,1222,US,1118,NA,215,197,NA,21,3,SFO,MCI,1499,NA,NA,0,NA,0,NA,NA,NA,NA,NA'
```
Or I could save the subset as a new HDFS dataset that I could transfer back to the standard filesystem on the master using the HDFS `copyToLocal` command from the UNIX prompt on the master node.
```{r, eval=FALSE}
sfo_lines.saveAsTextFile('/data/airline-sfo')
```
### 2.2.4) Stratifying in Spark
Now suppose I want to use Spark to do some stratification. Here's a very basic stratification. We'll do it using MapReduce.
I'm going to group by departure/arrival airport pairs and then count the number of flights. I'll use a mapper, `createKeyValue()`, to create key-value pairs where the key is the route and the value is just a 1 and then I do a reduction, stratifying by key, where I simply count the records associated with each key by using `add` as the reduce function. In Python, the key-value pairs are just a 2-element tuple (I think a 2-element list would be fine), but the value could be arbitrarily more complicated than the scalar quantity 1 that I use here.
```{r, eval=FALSE}
from operator import add
def createKeyValue(line):
vals = line.split(',')
return(vals[16]+'-'+vals[17], 1)
routeCount = lines.map(createKeyValue).reduceByKey(add).collect()
mx = routeCount[0]
for i in xrange(len(routeCount)):
if routeCount[i][1] > mx[1]:
mx = routeCount[i]
```
Any guesses about the route with the most flights?
I wrote that as presented above to illustrate key-value pairs and using a map-reduce pair of functions, but for simply counting, I could have done it more efficiently like this:
```
routeCount2 = lines.map(createKeyValue).countByKey()
```
Note that the result is a dictionary not a list.
### 2.2.5) Getting a few elements for inspection and debugging
One useful thing is to be able to grab a single element or a few elements from an Spark dataset. This allows us to look at the structure of units in the dataset and test our code by applying our Python functions directly to the units.
```
oneLine = lines.take(1)
# test my createKeyValue() function:
createKeyValue(oneLine)
createKeyValue(oneLine[0])
keyValues = lines.map(createKeyValue).cache()
twoKeyValues = keyValues.take(2)
twoKeyValues
# [(u'AVP-PIT', 1), (u'LAX-IND', 1)]
twoKeyCounts = keyValues.reduceByKey(add).take(2)
# [(u'ORD-SEA', 101949), (u'MCI-MSY', 925)]
```
Make sure you know what kind of Python object is in use for each observation in the RDD - originally it's often just a text line from CSV, but after mapping it would often be a tuple or a list. It's good to check your code on individual elements or pairs of elements from the RDD obtained via `take()`. Note that sometimes things will be embedded in a list and you'll need to burrow into it as seen in the example where we had a list of length one containing the string.
## 2.3) Some fundamental ideas in MapReduce and Spark
Now that we've seen a few example computations, let's step back and discuss a few concepts.
### 2.3.1) RDDs
Spark datasets are called Resilient Distributed Datasets (RDDs). Spark operations generally take the form of a method applied to an RDD object and the methods can be chained together, as we've already seen.
Note also the *sc* object we used for reading from the HDFS was a *SparkContext* object that encapsulates information about the Spark setup. The `textfile()` function is a method used with SparkContext objects.
### 2.3.2) Caching
Spark by default will try to manipulate RDDs in memory if they fit in the collective memory of the nodes. You can tell Spark that a given RDD should be kept in memory so that you can quickly access it later by using the `cache()` method. To see the RDDs that are cached in memory, go to `http://<master_url>:4040` and click on the "Storage" tab.
If an RDD won't fit in memory, Spark will recompute it as needed, but this will involve a lot of additional computation. In some (but not all) cases in which there is not enough collective memory you may want it cached on disk. The [Spark programming guide](http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) has more information.
### 2.3.3) Partitions and repartitioning
The number of chunks your dataset is broken into can greatly impact computational efficiency. When a map function is applied to the RDD, the work is broken into one task per partition.
You want each partition to be able to fit in the memory availalbe on a node, and if you have multi-core nodes, you want that as many partitions as there are cores be able to fit in memory.
For load-balancing you'll want at least as many partitions as total computational cores in your cluster and probably rather more partitions. The Spark documentation suggests 2-4 partitions (which they also seem to call *slices*) per CPU. Often there are 100-10,000 partitions. Another rule of thumb is that tasks should take at least 100 ms. If less than that, you may want to repartition to have fewer partitions.
As an example that we've already seen, the original airline dataset is in 22 partitions, one per input CSV file, and those partitions are different sizes because the data size varies by year. So it's a good idea to repartition the dataset as you're doing your initial manipulations on it, as we did above. But it did take a while to do that because of all the transferring of data across nodes.
### 2.3.4) MapReduce
MapReduce is a sort of meta-algorithm. If you can write your algorithm/computation as a MapReduce algorithm, then you should be able to implement it in Spark or in standard Hadoop, including Amazon's Elastic MapReduce framework.
The basic idea is that your computation be written as a one or more iterations over a *Map* step and a *Reduce* step. The map step takes each 'observation' (each unit that can be treated independently) and applies some transformation to it. Think of a mathematical mapping. The reduction step then takes the results and does some sort of aggregation operation that returns a summary measure. Sometimes the mapping step involves computing a key for each unit. The units that share the same key are then collected together and a reduction step may be done for each key value.
Actually, it's more general than that. We may have multiple map steps before any reduction or no reduction at all. Also `reduceByKey()` returns an RDD, so there could potentially be multiple reductions.
Reduction functions should be associative and commutative. It shouldn't matter what order the operations are done in or which observations are grouped with which others in order that the reduction can be done in parallel in a simple fashion.
We'll see a variety of MapReduce algorithms here so this should become more concrete.
In addition to map and reduce steps, we can explicitly pass shared data to all the nodes via a broadcast step. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
## 2.4) Additional data processing in Spark
Now let's do some aggregation/summarization calculations. One might do this sort of thing in the process of reducing down a large dataset to some summary values on which to do the core analysis.
First we'll do some calculations involving sums/means that naturally fit within a MapReduce paradigm and then we'll do a median calculation, which is not associative and commutative so is a bit harder and wouldn't scale well.
### 2.4.1) Calculating means as a simple MapReduce task
Here we'll compute the mean departure delay by airline-month-origin-destination sets. So we'll create a compound key and then do reduction separately for each key value (stratum).
Our map function will exclude missing data and the header 'observations'.
```{r, engine='python'}
def mapper(line):
vals = line.split(',')
key = '-'.join([vals[x] for x in [8,1,16,17]])
if vals[0] == 'Year' or vals[14] == 'NA':
key = '0'
delay = 0.0
valid = 0.0
else:
delay = float(vals[14])
valid = 1.0
return(key, (delay, valid))
def reducer( (dep1, valid1), (dep2, valid2) ):
return( (dep1 + dep2), (valid1 + valid2) )
mappedLines = lines.map(mapper).cache()
tmp = mappedLines.reduceByKey(reducer)
tmp
# PythonRDD[11] at RDD at PythonRDD.scala:43
results = tmp.collect()
results[0:3]
# [(u'EA-11-ATL-DEN', (1531.0, 265.0)), (u'NW-11-MSP-SDF', (3290.0, 882.0)), (u'DL-9-SEA-JFK', (5301.0, 577.0))]
means = [(val[0], val[1][0]/val[1][1]) for val in results if val[1][1] > 0.0]
means[0:3]
# [(u'EA-11-ATL-DEN', 5.777358490566038), (u'NW-11-MSP-SDF', 3.7301587301587302), (u'DL-9-SEA-JFK', 9.1871750433275565)]
```
Note that the result of the `reduceByKey()` is just another (small) RDD.
Note that we wrote our own reduce function because Python lists/tuples don't add in a vectorized fashion so we couldn't just use `operator.add()`, but if we had made the values in the key-value pairs to be *numpy* arrays we could have just done `reduceByKey(add)`.
### 2.4.2) Calculating medians as a non-standard MapReduce task
This will be more computationally intensive because we don't have an associative and commutative function for reduction. Instead we need all the values for a given key to calculate the summary statistic of interest. So we use `groupByKey()` and then apply the median function to the RDD where each key has as its value the entire set of values for that key from the mapper. I suspect that Spark/Hadoop experts would frown on what I do here as it wouldn't scale as the strata sizes increase, but it seems fairly effective for these sizes of strata.
```{r, engine='python'}
def medianFun(input):
import numpy as np
if len(input) == 2:
if len(input[1]) > 0:
med = np.median([val[0] for val in input[1] if val[1] == 1.0])
return((input[0], med))
else:
return((input[0], -999))
else:
return((input[0], -9999))
output = mappedLines.groupByKey()
output.count()
# 186116
medianResults = output.map(medianFun).collect()
medianResults[0:3]
# [(u'EA-11-ATL-DEN', 0.0), (u'NW-11-MSP-SDF', -3.0), (u'DH-8-HPN-ORD', -7.0)]
# here's code to show the iterable object produced by groupByKey()
dat = output.take(1)
dat[0][0]
len(dat[0][1])
[val for val in dat[0][1]]
```
Note that the median calculation is another map function not a reduce, because it's just a 1-1 transformation of the RDD that is returned by `groupByKey()`. But of course the size of the value associated with each key is much larger before the application of the `medianFun()` map step.
### 2.4.3) Stratifying data and exporting
Now suppose we wanted to create multiple datasets, one per stratum. This does not fit all that well into the MapReduce/HDFS paradigm. We could loop through the unique values of the stratifying variable using `filter()` and `saveAsTextFile()` as seen above. There are not really any better ways to do this. In previous tries, I used `sortByKey()` first and that improved speed but I've been running into errors recently when doing that, with Spark losing access to some of the worker tasks (executors). Here I'll stratify by departure airport.
```{r, engine='python'}
def createKeyValue(line):
vals = line.split(',')
keyVal = vals[16]
return(keyVal, line)
keyedLines = lines.map(createKeyValue).cache()
keys = keyedLines.countByKey().keys()
for curkey in keys:
fn = '/data/' + curkey
keyedLines.filter(lambda x: curkey == x[0]).map(lambda x: x[1]).repartition(1).saveAsTextFile(fn)
```
The `repartition(1)` ensures that the entire RDD for a given key is on a single partition so that we get a single file for each stratum instead of having each stratum split into pieces across the different nodes hosting the HDFS. The mapper simply pulls out the value for each key-value pair, discarding the key.
## 2.5) Doing simulation via a simple MapReduce calculation
We'll do a simulation to estimate the value of $\pi$ as an embarrassingly parallel calculation using MapReduce.
Here's the Python code
```{r, eval=FALSE, engine='python'}
import numpy.random as rand
from operator import add
samples_per_slice = 1000000
num_cores = 24
num_slices = num_cores * 20
def sample(p):
rand.seed(p)
x, y = rand.random(samples_per_slice), rand.random(samples_per_slice)
return sum(x*x + y*y < 1)
count = sc.parallelize(xrange(0, num_slices), num_slices).map(sample).reduce(add)
print "Pi is roughly %f" % (4.0 * count / (num_slices*samples_per_slice))
```
Let's piece that apart. The `parallelize()` call basically takes the numbers {0,1,2,...,num_slices-1} and treats them as the input "dataset". These are the initial units and then the transformation (the mapper) is to do the random number generation and compute the partial sum. The second argument to `parallelize()` indicates how many partitions to create, so this will correspond to the number of tasks that will be carried out in a subsequent map step. Using `paralllelize()` to create an RDD from an existing Python list is another way to create an RDD, in addition to reading data from an external storage system such as the HDFS.
A few comments here. First we could have sampled a single random vector, thereby calling `parallelize()` across numslices*samples_per_slice, with one task per {x,y} pair. But it's much more computationally efficient to do a larger chunk of calculation in each process since there is overhead in running each process. But we want enough processes to make use of all the processors collectively in our Spark cluster and probably rather more than that in case some processors do the computation more slowly than others. Also, if the computation involved a large amount of memory we wouldn't want to exceed the physical RAM available to the processes that work simultaneously on a node. However, we also don't want too many tasks. For example, if we try to have as many as 50 million parallel tasks, Spark will hang. Somehow the overhead in creating that many tasks is too great, though I need to look further for a complete explanation.
## 2.6) Random number generation in parallel
A basic approach to deal with parallel random number generation (RNG) is to set the seed based on the task id, e.g., passed in via `parallelize()` above. This doesn't guarantee that the random number streams for the different tasks will not overlap, but it should be unlikely. Unfortunately Python does not provide the L'Ecuyer algorithm, or other algorithms to ensure non-overlapping streams, unlike R and Matlab.
## 2.7) Using PySpark in batch (non-interactive mode)
We can also do that calculation as a batch job.
Here's the code in the file *piCalc.py*:
```{r, eval=FALSE, engine='python'}
import sys
import numpy.random as rand
from operator import add
from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: spark-submit piCalc.py <total_samples> <slices>"
exit(-1)
sc = SparkContext()
total_samples = int(sys.argv[1]) if len(sys.argv) > 1 else 1000000
num_slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
samples_per_slice = round(total_samples / num_slices)
def sample(p):
rand.seed(p)
x, y = rand.random(samples_per_slice), rand.random(samples_per_slice)
return sum(x*x + y*y < 1)
count = sc.parallelize(xrange(0, num_slices), num_slices).map(sample).reduce(add)
print "Pi is roughly %f" % (4.0 * count / (num_slices*samples_per_slice))
```
Note that by starting Python via PySpark, the *sc* object is created and available to you, but for batch jobs we need to import *SparkContext* and instantiate the *sc* object ourselves.
And here's how we run it from the UNIX command line on the master node
```{r, eval=FALSE}
spark-submit piCalc.py 100000000 1000
```
<!--
# pyspark piCalc.py `cat /root/spark-ec2/cluster-url` 100000000 1000
-->
# 3) Model fitting with Spark
## 3.1) Regression example with sufficient statistics
Suppose we wanted to run a regression with very many observations, $n$, but relatively few predictors, $p$. For our purposes here, let's assume $p < 5000$ so that $X^\top X$ takes up at most about 200 Mb. In this case we can compute the sufficient statistics, $X^\top X$ and $X^\top Y$ using MapReduce and return them to the master for the final calculation.
The basic calculation we are doing is $\sum_{i=1}^n x_i x_i^\top$ and $\sum_{i=1}^n x_i y_i$.
```{r, eval=FALSE}
import numpy as np
from operator import add
def screen(vals):
vals = vals.split(',')
return(vals[0] != 'Year' and vals[14] != 'NA' and vals[18] != 'NA'
and vals[3] != 'NA' and
float(vals[14]) < 720 and float(vals[14]) > (-30) )
lines = sc.textFile('/data/airline').filter(screen).repartition(192).cache()
P = 8
def crossprod(line):
vals = line.split(',')
y = float(vals[14])
dist = float(vals[18])
dayOfWeek = int(vals[3])
xVec = np.array([0.0] * P)
xVec[0] = 1.0
xVec[1] = float(dist)/1000
if dayOfWeek > 1:
xVec[dayOfWeek] = 1.0
xtx = np.outer(xVec, xVec)
xty = xVec * y
return(np.c_[xtx, xty])
suffStats = lines.map(crossprod).reduce(add)
mles = np.linalg.solve(suffStats[0:P,0:P], suffStats[0:P,P])
# array([ 6.366239 , 0.76375807, -0.69956913, 0.39282445, 2.22473943,
# 2.88670704, -2.42725064, -0.13621746])
```
### More efficient calculation using `mapPartitions()`
That calculation of the sufficient statistics applied the linear algebra row by row. For the full airline dataset it took about 11 minutes. A more efficient alternative is to put all the data for a given partition into a matrix and do the linear algebra on the matrix. The code below takes about 2.5 minutes.
Here we use `mapPartitions()` to apply a map to an entire partition - the `readPointBatch()` function iterates through the observations in the partition and constructs the $X$ matrix before then doing the crossproduct.
In contrast to the approach above, our reduction is now done over the $K$ matrices, one per partition: $\sum_{k=1}^K X_k^\top X_k$ and $\sum_{k=1}^K X_k^\top y_k$.
```{r, eval=FALSE}
def readPointBatch(iterator):
strs = list(iterator)
matrix = np.zeros((len(strs), P+1))
for i in xrange(len(strs)):
vals = strs[i].split(',')
dist = float(vals[18])
dayOfWeek = int(vals[3])
xVec = np.array([0.0] * (P+1))
xVec[8] = float(vals[14]) # y
xVec[0] = 1.0 # int
xVec[1] = float(dist) / 1000
if(dayOfWeek > 1):
xVec[dayOfWeek] = 1.0
matrix[i] = xVec
return [matrix.T.dot(matrix)]
suffStats2 = lines.mapPartitions(readPointBatch).reduce(add)
mles = np.linalg.solve(suffStats2[0:P,0:P], suffStats2[0:P,P])
```
It's not entirely clear to me why I needed to return a list with a single matrix from `readPointBatch()` as opposed to simply returning the matrix, but I expect it's just that PySpark needs each element of the RDD to be a list (or perhaps a tuple instead).
## 3.2) Coordinate descent for linear regression
The strategy above makes a lot of sense but breaks down as $P$ gets large. We might consider coordinate descent, iterating through the individual regression coefficients. Of course this would probably be quite slow for large $P$, but it's worthwhile to illustrate and sets the stage for our use of coordinate descent for lasso problems in the next section.
For linear regression, the optimization for a single coefficient, holding the others constant, can be done in closed form: $\hat{\beta}_{k} = \frac{\sum_{i=1}^n r_i y_i} {\sum_{i=1}^n x_{k,i}^2 }$ where $r_i = y_i - \sum_{j \ne k} x_{j,i}\beta_j$.
We'll precompute the denominator values in a MapReduce step. Our coordinate descent is then an iterative procedure with an inner loop that loops over the coefficients. For simplicity below I just do a fixed number of iterations, but in reality of course we would want a reasonable stopping criterion for the optimization.
Note that for efficiency we'll again use the `mapPartitions()` version of this algorithm.
```{r, eval=FALSE}
P=8
def readPointBatch(iterator):
strs = list(iterator)
matrix = np.zeros((len(strs), P+1))
print(len(strs))
for i in xrange(len(strs)):
vals = strs[i].split(',')
dist = float(vals[18])
dayOfWeek = int(vals[3])
xVec = np.array([0.0] * (P+1))
xVec[8] = float(vals[14]) # y
xVec[0] = 1.0 # int
xVec[1] = float(dist) / 1000
if(dayOfWeek > 1):
xVec[dayOfWeek] = 1.0
matrix[i] = xVec
return [matrix]
# set up the partitions with the submatrices
batches = lines.mapPartitions(readPointBatch).cache()
def denomSumSqBatch(mat):
# compute denominator of update: sum of squared predictor
return((mat*mat).sum(axis=0))
def getNumBatch(mat):
# compute numerator of update: covariance of residual and predictor
beta[p] = 0
sumXb = mat[:, 0:P].dot(beta)
return(sum((mat[:,P] - sumXb)*mat[:,p]))
# compute denominator of update
sumx2 = batches.map(denomSumSqBatch).reduce(add)
# initialize coefficient values
beta = np.array([0.0] * P)
p = 0
oldBeta = beta.copy()
it = 0
# finally.... do the optimization!
#while crit > tol and its < maxIts:
for it in range(1,100):
for p in xrange(P):
# distribute current beta and current coordinate
sumNum = batches.map(getNumBatch).reduce(add)
# update coordinate:
beta[p] = sumNum / sumx2[p]
crit = sum(abs(beta - oldBeta))
oldBeta = beta.copy()
# interim reporting of estimates:
print("-"*100)
print(beta)
print(crit)
print("-"*100)
print beta
# after 4 iterations
#[ 7.0987348 0.57795465 -1.30359727 -0.21110881 1.6211203 2.28315019
# -3.02584311 -0.73731912]
# after 100 iterations
#[ 6.36623919 0.76375807 -0.69956932 0.39282426 2.22473924 2.88670685
# -2.42725082 -0.13621764]
```
The parameters haven't converged fully after four iterations, but are equal to the MLE by 100 iterations (I didn't check carefully to see where convergence had been reached, so 100 iterations may be serious overkill).
Each update of a coordinate takes about a 7 seconds.
## 3.3) Lasso via coordinate descent
A standard approach to fitting the Lasso is coordinate descent, as done in R's *glmnet* package, documented here in [Friedman et al.'s well-known paper](http://www.jstatsoft.org/v33/i01) (1700 citations according to Google scholar!) on computation for penalized GLMs.
To test out my code, I'm going to generate a large simulated dataset, with 10 non-zero coefficients of varying size and 90 zero coefficients. The individual coefficient updates are similar to the coordinate descent for linear regression, involving covariances between residuals and predictor, but with thresholding. For the example here, I just do this with a single value of the penalty parameter, $\lambda$, but one could readily extend this to compute the solution path across different values of $\lambda$, using the idea of warm starts, in which the optimization for the next value of the penalty starts at the estimates found for the current value of the penalty.
First I'll generate some simulated data with known coefficients. In this case the code is set up to either generate independent predictors or autocorrelated predictors, depending on the value of *autocorr*.
```{r, eval=FALSE}
import numpy as np
from operator import add
n = 10000000
P = 100
nNonZero = 10
betaTrue = [-2, -1.5, -1.0, -0.5, -0.3, -0.1, 0.3, 0.5, 1.0, 1.5]
betaTrue = np.append(betaTrue, np.zeros(P-nNonZero))
# create dataset as RDD
# allow generation of autocorrelated X's to make optimiz'n harder
autocorr = 1
rho = 0.7
def simData(it):
np.random.seed(it)
x = np.random.normal(size = P)
if autocorr == 1:
for ind in xrange(P-1):
x[ind+1] = rho*x[ind] + x[ind+1]
y = sum(x*betaTrue) + np.random.normal(size = 1)
return(np.append(x, y))
data = sc.parallelize(xrange(n)).repartition(96).map(simData)
```
As with the regression, let's set up each batch to have all of its data as a single matrix.
```{r, eval=FALSE}
def readPointBatch(iterator):
vals = list(iterator)
matrix = np.zeros((len(vals), P+1))
for i in xrange(len(vals)):
matrix[i] = vals[i]
return [matrix]
batchesLasso = data.mapPartitions(readPointBatch)
```
Now I'll standardize the predictors so we can directly implement the equations of the Friedman et al. paper.
```{r, eval=FALSE}
def getSums(mat):
return mat.sum(axis = 0)
colMeans = (batchesLasso.map(getSums).reduce(add) / n)[0:P]
def stdizeMean(mat):
mat[:, 0:P] = mat[:, 0:P] - colMeans
return mat
batchesLasso = batchesLasso.map(stdizeMean)
def getSumSqs(mat):
return (mat*mat).sum(axis = 0)
colSds = (pow(batchesLasso.map(getSumSqs).reduce(add) / n, 0.5))[0:P]
def stdizeSd(mat):
mat[:, 0:P] = mat[:, 0:P] / colSds[0:P]
return mat
batchesLasso = batchesLasso.map(stdizeSd).cache()
```
Now we'll implement the basic *naive algorithm* from the Friedman et al. paper.
```{r, eval=FALSE}
eps = 0.01
alpha = 1 - eps # alpha = 1 is lasso penalty, but with a slight numerical modification
lamb = 0.5 # value of penalty parameter
def Sfun(z, gamma):
if gamma >= abs(z):
return(0)
if z > 0:
return(z - gamma)
return(z + gamma)
def getNumBatch(mat):
if nonZeros == None:
sumXb = 0
else:
# poor man's sparse implementation
sumXb = mat[:, nonZeros].dot(beta[nonZeros])
return(sum((mat[:,P] - sumXb)*mat[:,p]))
beta = np.array([0.0] * P)
nonZeros = None
p = 0
oldBeta = beta.copy()
it = 0
nIts = 6
storeVals = np.zeros((nIts, P+1))
for it in range(0,nIts):
for p in xrange(P):
# get numerator as product of residual and X for coordinate
sumNum = batchesLasso.map(getNumBatch).reduce(add)
beta[p] = Sfun(sumNum/n + beta[p], lamb*alpha) / (1 + lamb*eps)
if beta[p] != 0:
if nonZeros == None:
nonZeros = np.array([p])
elif sum(p == nonZeros) == 0:
nonZeros = np.append(nonZeros, p)
nonZeros.sort()
print("Updated var " + str(p) + " in iteration ", str(it), ".")
crit = sum(abs(beta - oldBeta))
storeVals[it, 0] = crit
storeVals[it, 1:(P+1)] = beta
oldBeta = beta.copy()
print("-"*100)
print(beta)
print(crit)
print("-"*100)
#('Updated var 99 in iteration ', '5', '.')
#----------------------------------------------------------------------------------------------------
#[-1.68283553 -1.72371224 -1.22416959 -0.45692377 0. 0. 0.
# 0.32505124 1.3951588 1.77561668 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0.
# 0. 0. 0. 0. 0. 0. 0.
# 0. 0. ]
#0.18567818344
#----------------------------------------------------------------------------------------------------
```
After 5 iterations, the Lasso seems to be getting close to convergence (the sum of the absolute difference in the coefficients is 0.19). Each update of a coefficient takes about 3 seconds and a single full iteration about 6 minutes.
As noted, I implemented the *naive algorithm*. The *covariance updating algorithm* also presented in Friedman et al. makes use of the sparsity of the coefficient vector to achieve computational speedup.
## 3.4) Linear regression via gradient descent
The coordinate descent algorithm for linear regression would be slow when there are many predictors as we are doing iterations in Spark over the coordinates. It would be faster to update all the coefficients at once and have the "iteration" over the coefficients be done within Python in the computation of the gradient (i.e., have the for loop across predictors calculate the gradient vector in Python within a partition).
Here's gradient descent in Spark. Note that as is general for descent methods, we could step too far and the objective function might increase, particularly when we are not using the second derivatives to guide our step sizes. I haven't put in any checking here to ensure that our steps always go downhill, but I do provide a function to evaluate the objective function -- this could be used to make the algorithm more sophisticated.
As above with coordinate descent, we'll use the batches RDD that has the data in one matrix per partition.