This project is a collection of custom Airflow operators, hooks, and sensors.
The example DAG shipped with this repo is scheduled daily at 3:00 P.M. IST and tries to fetch tweets from the @diprjk Twitter handle, the official handle of the Department of Information and Public Relations, Govt. of Jammu & Kashmir. The DAG uses a custom sensor to detect the tweet containing the daily COVID update for J&K.
When the tweet is found, a custom operator downloads the update, which is an image, and saves it locally for further analysis.
The operator then uses the ExtractTable library (https://extracttable.github.io/ExtractTable-py/) to convert the image into a dataframe.
Finally, the dataframe is processed and written out as a structured CSV file.
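The last step can be sketched roughly as follows. This is only a minimal illustration with pandas, not the plugin's actual code; the column names and values are hypothetical stand-ins for whatever ExtractTable returns for the bulletin image:

```python
import pandas as pd

# Hypothetical raw table as ExtractTable might return it:
# every cell comes back as a string.
raw = pd.DataFrame({
    "District": ["Srinagar", "Jammu"],
    "Positive": ["120", "85"],
    "Recovered": ["100", "70"],
})

# Process: cast the numeric columns, then write a structured CSV.
for col in ("Positive", "Recovered"):
    raw[col] = raw[col].astype(int)

raw.to_csv("covid_update.csv", index=False)
```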
Installing Airflow is straightforward; you can find installation details on the official website, airflow.apache.org.
This plugin is tested with Airflow v1.10.9.
If you already have Airflow installed, jump to Step 12.
- We will be using virtualenv, so make sure it is installed:
~$ pip install virtualenv
or
~$ pip3 install virtualenv
- Create a new directory for the project, e.g. airflow_tweet_test:
~$ mkdir airflow_tweet_test
- cd into the newly created directory
~$ cd airflow_tweet_test
- Use python virtualenv to create a virtual environment for our project
~$ python -m virtualenv {name_of_virtual_env}
where {name_of_virtual_env} is the name given to your virtual environment.
- Airflow requires an environment variable named `AIRFLOW_HOME`; it's necessary for `airflow` to find the project files. Let's set it up step by step.
- Now use the following command to open an editor
~$ nano ./{name_of_virtual_env}/bin/activate
This will open a bash script. Scroll to the end of the file and paste the following lines at the end, where {full_project_path} is the full path of your project:
# This is for Airflow usage
export AIRFLOW_HOME=/{full_project_path}
Press CTRL+X, then Y, to save and close the editor.
- You are ready to activate your virtual environment. Enter the following command to activate it:
~$ source ./{name_of_virtual_env}/bin/activate
- Now install airflow using pip inside the virtual env:
~$ pip install apache-airflow==1.10.9
~$ pip install SQLAlchemy==1.3.15
- Check whether Airflow was successfully installed. Type the following command in the terminal:
~$ airflow version
- Create a new folder `plugins` inside the project home and cd into it:
~$ mkdir plugins
~$ cd plugins
- Now clone this repo inside the `plugins` directory:
~$ git clone https://github.com/kundroomajid/twitter_plugin.git
- To install the twitter_plugin dependencies, we prefer installation using the `requirements.txt` file. Enter the following command to install the other packages required by twitter_plugin:
~$ pip install -r ./twitter_plugin/requirements.txt
- Once all the dependencies are installed, it's time to initialize the `airflow` meta-database:
~$ airflow initdb
- Once you have initialized the airflow db, you can start the airflow webserver and scheduler:
~$ airflow webserver
~$ airflow scheduler
- Now we need to add a few required Airflow Variables and Connections using the Airflow UI.

`twitter_plugin` requires a `config` variable (an Airflow Variable) which contains a few details:

* `twitter_account_id`: twitter account id of @diprjk
* `email`: your email id for notification-related stuff
* `find_param`: the string on which the sensor checks whether a tweet related to the covid update is available or not
* `since_id`: initially empty, but automatically updated by the DAG later on.

NOTE: Create an Airflow Variable named `config` using the Airflow UI, with a value as shown in the example below.

example:

{ "frequency": "daily", "twitter_account_id": 830669077022531584, "email": "yourmail@domain.com", "find_param": "Media Bulletin on Novel", "since_id": 1388848538663088135 }
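If you prefer the command line to the UI, the same Variable can be set with the `airflow variables --set` subcommand of the Airflow 1.10 CLI, using the same JSON value as in the example above:

```shell
airflow variables --set config '{ "frequency": "daily", "twitter_account_id": 830669077022531584, "email": "yourmail@domain.com", "find_param": "Media Bulletin on Novel", "since_id": 1388848538663088135 }'
```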
`twitter_plugin` also requires a `twitter_default` connection (an Airflow Connection) which contains Twitter API credentials (visit https://developer.twitter.com/en/apply-for-access for more info):

* `consumer_key`: obtained from your twitter developer account
* `consumer_secret`: obtained from your twitter developer account
* `access_token`: obtained from your twitter developer account
* `access_token_secret`: obtained from your twitter developer account

NOTE: Create an Airflow Connection named `twitter_default` using the Airflow UI; leave all fields blank and put your twitter credentials inside the Extra field as shown below.

example:

{ "consumer_key" : "xxxxxx", "consumer_secret":"xxxxxxxxxx", "access_token":"xxxxxxxxx", "access_token_secret":"xxxxxx" }
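As a CLI alternative to the UI, the connection can be created with `airflow connections --add` (Airflow 1.10 syntax; `--conn_type http` is our assumption here, since the CLI requires a connection type while the credentials themselves live in the extra field):

```shell
airflow connections --add --conn_id twitter_default --conn_type http \
    --conn_extra '{ "consumer_key": "xxxxxx", "consumer_secret": "xxxxxxxxxx", "access_token": "xxxxxxxxx", "access_token_secret": "xxxxxx" }'
```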
`twitter_plugin` also requires an `extract_table_default` connection (an Airflow Connection) which contains ExtractTable API credentials (visit https://extracttable.github.io/ExtractTable-py/ for more info):

* `password`: ExtractTable API key

NOTE: Create an Airflow Connection named `extract_table_default` using the Airflow UI; leave all fields blank and put your ExtractTable API key inside the Password field.
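This connection can likewise be created from the CLI, with the API key going into the password field (`--conn_type http` is again our assumption, as the CLI insists on a connection type):

```shell
airflow connections --add --conn_id extract_table_default --conn_type http \
    --conn_password 'YOUR_EXTRACTTABLE_API_KEY'
```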
- Now copy `dag_tweet_etl.py` from `twitter_plugin/example_dags/` to `{project directory}/dags`. If the dags directory is not available, please create one inside the project root.
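Assuming the directory layout from the steps above (repo cloned into `plugins/` under the project root), the copy can be done as:

```shell
# run these from the project root ({full_project_path})
mkdir -p ./dags
cp ./plugins/twitter_plugin/example_dags/dag_tweet_etl.py ./dags/
```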
- We also need to add SMTP server details in the `airflow.cfg` file to receive email alerts, like:
# smtp server here
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user = yoursmtpemailid
smtp_password = yourpassword
smtp_port = 587
smtp_mail_from = airflow@example.com
- And we are good to go.