# fores_data_pipeline_v_3_PythonOperator.py
#
# Airflow DAG for the forex data pipeline (v3): two sensors check that the
# forex rates API and the currencies CSV file are available, and a
# PythonOperator downloads the latest rates.
import csv
import json
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.http_sensor import HttpSensor
# Arguments handed to the DAG below as its ``default_args``.
default_args = {
    "owner": "airflow",
    "start_date": datetime(2019, 1, 1),
    "depends_on_past": False,
    # E-mail notifications are switched off; the address is a placeholder.
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "youremail@host.com",
    # One retry, with a five-minute delay.
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
def download_rates():
    """Download the latest forex rates for the currencies we want to watch.

    Reads ``forex_currencies.csv`` (``;``-delimited, with a ``base`` column
    and a ``with_pairs`` column holding whitespace-separated currency codes).
    For each row, requests the latest rates for the base currency and appends
    one JSON object per row, one per line, to ``forex_rates.json``. Each
    object has the keys ``base``, ``rates`` (only the requested pairs) and
    ``last_update``.

    Raises:
        requests.RequestException: if the request times out, fails, or the
            API answers with an HTTP error status.
        KeyError: if the CSV lacks an expected column or the API payload
            lacks ``date``, ``rates`` or one of the requested pairs.
    """
    currencies_path = '/usr/local/airflow/dags/files/forex_currencies.csv'
    rates_path = '/usr/local/airflow/dags/files/forex_rates.json'

    # newline='' is what the csv module expects for files it reads itself.
    with open(currencies_path, newline='') as forex_currencies:
        reader = csv.DictReader(forex_currencies, delimiter=';')
        for row in reader:
            base = row['base']
            # split() without an argument tolerates repeated or trailing
            # spaces, which split(' ') would turn into empty currency codes.
            with_pairs = row['with_pairs'].split()

            # Always pass a timeout: without one a stalled connection would
            # block the task indefinitely.
            response = requests.get(
                'https://api.exchangeratesapi.io/latest',
                params={'base': base},
                timeout=30,
            )
            # Fail on HTTP errors here rather than with a misleading KeyError
            # when the error payload is indexed below.
            response.raise_for_status()
            indata = response.json()

            outdata = {
                'base': base,
                'rates': {pair: indata['rates'][pair] for pair in with_pairs},
                'last_update': indata['date'],
            }
            # Append mode: each row adds one line to the output file.
            with open(rates_path, 'a') as outfile:
                json.dump(outdata, outfile)
                outfile.write('\n')
# Daily forex pipeline: wait for the rates API and the currencies file, then
# download the rates. catchup=False is set, so past intervals since
# start_date are not backfilled.
with DAG(
    dag_id="forex_data_pipeline_v_3",
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:
    # Succeeds once a GET on the "latest" endpoint of the "forex_api"
    # connection returns a body containing "rates".
    is_forex_rates_available = HttpSensor(
        task_id="is_forex_rates_available",
        method="GET",
        http_conn_id="forex_api",
        endpoint="latest",
        response_check=lambda response: "rates" in response.text,
        poke_interval=5,
        timeout=20,
    )

    # Succeeds once forex_currencies.csv exists under the "forex_path"
    # filesystem connection.
    is_forex_currencies_file_available = FileSensor(
        task_id="is_forex_currencies_file_available",
        fs_conn_id="forex_path",
        filepath="forex_currencies.csv",
        poke_interval=5,
        timeout=20,
    )

    # Runs download_rates() to fetch the rates and append them to the
    # output file.
    downloading_rates = PythonOperator(
        task_id="downloading_rates",
        python_callable=download_rates,
    )

    # Without an explicit ordering all three tasks are independent and the
    # sensors gate nothing; run the download only after both checks pass.
    (
        is_forex_rates_available
        >> is_forex_currencies_file_available
        >> downloading_rates
    )