-
Notifications
You must be signed in to change notification settings - Fork 1
/
forex_processing.py
26 lines (19 loc) · 927 Bytes
/
forex_processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json

"""Forex-rates ETL: read forex_rates.json from HDFS, deduplicate, and
append the selected rates into the Hive table ``forex_rates``."""

# Hive needs a warehouse directory; point it at a local spark-warehouse dir.
warehouse_location = abspath('spark-warehouse')

# Initialize the Spark session with Hive support so insertInto() can
# target a Hive-managed table.
spark = SparkSession \
    .builder \
    .appName("Forex processing") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# Read the file forex_rates.json from HDFS.
# FIX: the original called spark.read.jason(...), which does not exist on
# DataFrameReader and raises AttributeError; the method is .json().
df = spark.read.json("hdfs://namenode:9000/forex/forex_rates.json")

# Flatten the nested `rates` struct into one column per currency, drop
# duplicated rows based on the (base, last_update) key, and replace any
# missing rate with 0.
# NOTE: the select produces lowercase column names (eur, usd, ...); the
# fillna subset now matches that casing instead of relying on Spark's
# case-insensitive resolver.
forex_rates = df.select('base', 'last_update', 'rates.eur', 'rates.usd', 'rates.cad', 'rates.gbp', 'rates.jpy', 'rates.nzd') \
    .dropDuplicates(['base', 'last_update']) \
    .fillna(0, subset=['eur', 'usd', 'jpy', 'cad', 'gbp', 'nzd'])

# Append the dataframe into the existing Hive table forex_rates.
forex_rates.write.mode("append").insertInto("forex_rates")