Problem#3 _crime_main.txt
Instructions
Get the top 3 crime types by number of incidents in the RESIDENCE area, using the "Location Description" field
Data Description
Data is available in HDFS under /public/crime/csv
Download this data from https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-Present/ijzp-q8t2/data
Crime data information:
Structure of data: (ID, Case Number, Date, Block, IUCR, Primary Type,
Description, Location Description, Arrest, Domestic, Beat, District, Ward,
Community Area, FBI Code, X Coordinate, Y Coordinate, Year, Updated On,
Latitude, Longitude, Location)
File format - text file
Delimiter - "," (use a regex while splitting: split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1), since some fields contain commas and are enclosed in double quotes)
Output Requirements
Output Fields: crime_type, incident_count
Output File Format: JSON
Delimiter: N/A
Compression: No
End of Problem
#Solution
#using DF API
crimes=spark.read.format('csv').\
option("inferSchema", True).\
option("header", True).\
option("quote",'"').\
option("delimiter", ",").\
load('/public/crime/csv/')
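#optional: a quick sanity check that the header and quoted fields were parsed as expected
crimes.printSchema()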
from pyspark.sql.functions import desc, count, dense_rank
from pyspark.sql.window import Window
#filter and select needed data
cf=crimes.where(" `Location Description` == 'RESIDENCE' ").selectExpr("ID", "`Location Description`", "`Primary Type`")
#aggregate
cfa=cf.groupBy('Primary Type').agg(count('ID').alias('crimecounts'))
#window filter to top 3
cfw= cfa.withColumn('drank', dense_rank().over(Window.orderBy(desc('crimecounts')))).where("drank <=3").selectExpr("`Primary Type` as `Crime Type`", "crimecounts as `Number of Incidents`")
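#note: dense_rank keeps tied counts in the same rank, so a tie at rank 3 can return more
#than 3 crime types; row_number() would cap the result at exactly 3 rows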
#output and save
output=cfw.orderBy('Number of Incidents', ascending=False)
output.write.mode('overwrite').format('json').save('/public/data/output/sol3/')
#validate
import os
os.system("hdfs dfs -ls /public/data/output/sol3/")
os.system("hdfs dfs -cat /public/data/output/sol3/part-00000-0cf0a79a-ac03-4b2e-a672-46dc66e7044d-c000.json | more")
#using SparkSQL
crimes.createOrReplaceTempView('crimes')
output=spark.sql("""
SELECT `Primary Type` AS `Crime Type`, crimecounts AS `Number of Incidents`
FROM (
    SELECT `Primary Type`, crimecounts,
           dense_rank() OVER (ORDER BY crimecounts DESC) AS RNK
    FROM (
        SELECT `Primary Type`, count(ID) AS crimecounts
        FROM crimes
        WHERE `Location Description` = 'RESIDENCE'
        GROUP BY `Primary Type`
    ) T
) RT
WHERE RNK <= 3
ORDER BY `Number of Incidents` DESC
""")
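#display the top 3; the result could also be saved with the same write.format('json') call as above
output.show()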