This blog walks through a pipeline, orchestrated by Apache Airflow, that fetches daily COVID-19 case data and saves it into a BigQuery table.
You will need a GCP account with admin or suitably privileged access, and Apache Airflow installed locally, or you can use Cloud Composer (GCP's managed Airflow) instead.
The project consists of two files: function.py, which contains the functions that fetch and save the data, and dag.py, which contains the Airflow DAG. I will also include a Dockerfile in case you need to deploy this project.
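Before running anything, the BigQuery client needs credentials. Here is a minimal sketch, assuming you have downloaded a service-account key from your GCP project (the path below is just an example):
import os

# Point Google client libraries at your service-account key file
# (the path is an example; use the key from your own GCP project)
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service-account.json'

from google.cloud import bigquery

client = bigquery.Client()   # picks up the credentials from the env variable
print(client.project)        # should print your GCP project id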
- Code for function.py
import requests
from google.cloud import bigquery


# Function to fetch the case data
def fetch_cases():
    # Request the day-one confirmed-case history for India from the API
    response = requests.get('https://api.covid19api.com/dayone/country/india/status/confirmed')
    data = response.json()
    # Extract the relevant fields from the data
    cases = []
    for record in data:
        date = record['Date'][:10]  # keep only YYYY-MM-DD so it loads into a DATE column
        count = record['Cases']
        cases.append({'date': date, 'cases': count})
    return cases
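# Quick sanity check of fetch_cases outside Airflow (illustrative, not part of the DAG):
#     rows = fetch_cases()
#     print(len(rows), rows[:3])
# The first entries look roughly like {'date': '2020-01-30', 'cases': 1}.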
# Define a function to save the case data to BigQuery
def save_cases(**kwargs):
    # Pull the case data that fetch_cases pushed to XCom
    cases = kwargs['ti'].xcom_pull(task_ids='fetch_cases')
    # Create a BigQuery client
    client = bigquery.Client()
    # Create the dataset (exists_ok=True so repeated DAG runs don't fail)
    dataset = bigquery.Dataset(client.dataset('covid_data_dataset'))
    dataset.location = 'US'
    dataset = client.create_dataset(dataset, exists_ok=True)
    # Create the table in the dataset (again, tolerate an existing table)
    table = bigquery.Table(dataset.table('case_data_table'), schema=[
        bigquery.SchemaField('date', 'DATE'),
        bigquery.SchemaField('cases', 'INTEGER'),
    ])
    table = client.create_table(table, exists_ok=True)
    # Stream the rows into the table and report any insert errors
    errors = client.insert_rows(table, rows=cases)
    if errors:
        print(errors)
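After a successful run you can verify the load with a quick query. This is only a sanity-check sketch, assuming the dataset and table names used above:
from google.cloud import bigquery

# Count the rows that landed in the table (not part of the DAG)
client = bigquery.Client()
query = 'SELECT COUNT(*) AS row_count FROM `covid_data_dataset.case_data_table`'
for row in client.query(query).result():
    print(f'Rows loaded: {row.row_count}')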
- Code for dag.py
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from function import fetch_cases, save_cases

default_args = {
    'owner': 'my-covid-dag',
    'start_date': days_ago(2),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'covid_cases_fetch_push',
    default_args=default_args,
    description='Fetch daily COVID-19 case data and store it in BigQuery',
    schedule_interval='@daily',
)

fetch_cases_task = PythonOperator(
    task_id='fetch_cases',
    python_callable=fetch_cases,
    dag=dag,
)

save_cases_task = PythonOperator(
    task_id='save_cases',
    python_callable=save_cases,
    provide_context=True,  # only needed on Airflow 1.x; Airflow 2 passes context automatically
    dag=dag,
)

fetch_cases_task >> save_cases_task
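If you are on Airflow 2, the same two-task pipeline can also be written with the TaskFlow API, which passes the fetched rows between tasks through XCom automatically. This is only an alternative sketch, not part of the project files; the dag_id and the inline save logic are illustrative, and it assumes the BigQuery table already exists:
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

from function import fetch_cases


@dag(
    dag_id='covid_cases_fetch_push_taskflow',   # illustrative name
    schedule_interval='@daily',
    start_date=days_ago(2),
    catchup=False,
)
def covid_pipeline():

    @task
    def fetch():
        # Returns the list of {'date': ..., 'cases': ...} dicts
        return fetch_cases()

    @task
    def save(cases):
        # Same idea as save_cases, but the rows arrive as a normal argument
        from google.cloud import bigquery
        client = bigquery.Client()
        table = client.get_table('covid_data_dataset.case_data_table')
        errors = client.insert_rows(table, cases)
        if errors:
            print(errors)

    save(fetch())


covid_pipeline()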
- Code for Dockerfile
FROM python:3.10
# Install the Python dependencies (csv is part of the standard library, so no extra package is needed)
RUN pip install requests google-cloud-bigquery
# Set the working directory inside the image
WORKDIR /app
# Copy the code into the image (adjust the path to match your project layout)
COPY covid_fetch.py .
CMD ["python", "covid_fetch.py"]
If you don't want to save the data into a BigQuery table and would rather write it as a .csv file in your local directory, just modify function.py by replacing the save_cases function with the code below.
import csv


# Define a function to save the case data to a CSV file
def save_cases(**kwargs):
    # Pull the case data that fetch_cases pushed to XCom
    cases = kwargs['ti'].xcom_pull(task_ids='fetch_cases')
    # Write the case data to a CSV file
    with open('./cases.csv', 'w', newline='') as csvfile:
        fieldnames = ['date', 'cases']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for case in cases:
            writer.writerow(case)
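To confirm the file was written as expected, you can read it back with csv.DictReader; this is purely a quick check:
import csv

# Read the CSV back and print the first few rows (not part of the DAG)
with open('./cases.csv', newline='') as csvfile:
    for i, row in enumerate(csv.DictReader(csvfile)):
        print(row['date'], row['cases'])
        if i >= 4:
            break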