Problem#2.txt
Instructions
Get the customers who have not placed any orders, sorted by customer_lname and
then customer_fname.
Data Description
Data is available in the local file system under /data/retail_db.
retail_db information:
Source directories: /data/retail_db/orders and /data/retail_db/customers
Source delimiter: comma (",")
Source columns - orders: order_id, order_date, order_customer_id, order_status
Source columns - customers: customer_id, customer_fname, customer_lname and many more
Output Requirements
Target columns: customer_lname, customer_fname
Number of files: 1
File format: text
Delimiter: comma (",")
Compression: uncompressed
End of Problem
# Using DataFrame APIs (no function imports needed: concat_ws is used inside selectExpr SQL strings)
customers = spark.read.format('csv').option("delimiter", ",").load("/public/data/retail_db/customers/")
orders = spark.read.format('csv').option("delimiter", ",").schema("order_id int, order_date string, order_customer_id int, order_status string").load("/public/data/retail_db/orders/")
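# A variant sketch (not in the original solution): name the columns in a schema
# at read time so the _c0/_c1 renames below are unnecessary. The trailing column
# names are assumptions based on the usual retail_db customers layout.
customers_named = spark.read.format('csv').option("delimiter", ",").schema(
    "customer_id int, customer_fname string, customer_lname string, "
    "customer_email string, customer_password string, customer_street string, "
    "customer_city string, customer_state string, customer_zipcode string"
).load("/public/data/retail_db/customers/")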
# Select and rename the required columns (a CSV loaded without a schema comes in as _c0, _c1, ...)
cf = customers.selectExpr("_c0 as customer_id", "_c1 as customer_fname", "_c2 as customer_lname")
# Left join orders onto customers, then keep customers with no matching order
cj = cf.join(orders, orders.order_customer_id == cf.customer_id, "left").where("order_customer_id is null")
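# A minimal alternative sketch: a left_anti join returns only the left-side rows
# with no match, so the null filter is not needed (the result has cf's columns only).
cj_anti = cf.join(orders, orders.order_customer_id == cf.customer_id, "left_anti")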
# Create the output column and save
output = cj.orderBy(['customer_lname', 'customer_fname'], ascending=[1, 1]).selectExpr("concat_ws(',', customer_lname, customer_fname)")
# Another way to do the ordering, using Column objects
output = cj.orderBy(cf.customer_lname.asc(), cf.customer_fname.asc()).selectExpr("concat_ws(',', customer_lname, customer_fname)")
output.coalesce(1).write.mode('overwrite').format('text').save("/public/data/output/sol2/")
# Validate the output files
import os
os.system("hdfs dfs -ls /public/data/output/sol2/")
os.system("hdfs dfs -cat /public/data/output/sol2/part-00000-695221b2-113d-483c-9405-dea6d1c759f9-c000.txt | more")
# If you run the hadoop commands in another terminal you don't need to import os;
# "hdfs dfs -head" did not work through os.system here, so run it directly:
hdfs dfs -head /public/data/output/sol2/part-00000-695221b2-113d-483c-9405-dea6d1c759f9-c000.txt
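# A sketch of validating from the same Spark session instead (assuming the
# output path above): read the text output back and inspect it.
check = spark.read.text("/public/data/output/sol2/")
check.show(10, truncate=False)
print(check.count())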
# Using Spark SQL
cf.createOrReplaceTempView('cf')
orders.createOrReplaceTempView('orders')
cj = spark.sql("SELECT customer_lname, customer_fname FROM cf LEFT JOIN orders ON customer_id = order_customer_id WHERE order_customer_id IS NULL ORDER BY customer_lname, customer_fname")
cj.createOrReplaceTempView('cj')
output = spark.sql("SELECT concat_ws(',', customer_lname, customer_fname) FROM cj")
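# A sketch of the same filter written with a LEFT ANTI JOIN in SQL (an
# alternative, not part of the original solution), followed by the save step
# the SQL path still needs to produce the single text file:
output = spark.sql("SELECT concat_ws(',', customer_lname, customer_fname) FROM cf LEFT ANTI JOIN orders ON customer_id = order_customer_id ORDER BY customer_lname, customer_fname")
output.coalesce(1).write.mode('overwrite').format('text').save("/public/data/output/sol2/")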