Save dataframes between tasks as tables in a dedicated DB (vs xcom) #1139

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.template
@@ -1,4 +1,5 @@
 AIRFLOW_CONN_QFDMO_DJANGO_DB=postgresql+psycopg2://qfdmo:qfdmo@localhost:6543/qfdmo
+AIRFLOW_CONN_QFDMO_DATA_DB=postgresql+psycopg2://qfdmo:qfdmo@localhost:6543/qfdmo-data
 AIRFLOW_WEBSERVER_REFRESHACTEUR_URL=http://localhost:8080
 ALLOWED_HOSTS=localhost,127.0.0.1
 AWS_ACCESS_KEY_ID=
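Airflow builds its connections from these AIRFLOW_CONN_* variables; a minimal sketch for checking that the new conn id resolves, assuming the variable is exported in the Airflow environment:

from airflow.hooks.base import BaseHook

# Resolve the conn id used by PostgresConnectionManager below;
# Airflow reads it from the AIRFLOW_CONN_QFDMO_DATA_DB env variable.
conn = BaseHook.get_connection("qfdmo_data_db")
print(conn.host, conn.port, conn.schema)  # localhost 6543 qfdmo-data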
2 changes: 1 addition & 1 deletion dags/annuaire_entreprise_checks.py
@@ -46,7 +46,7 @@

 def fetch_and_parse_data(**context):
     limit = context["params"]["limit"]
-    engine = PostgresConnectionManager().engine
+    engine = PostgresConnectionManager().django_engine
     active_actors_query = """
         SELECT
             da.*,
@@ -3,6 +3,7 @@
 from airflow import DAG
 from airflow.operators.python import PythonOperator
 from compute_acteurs.tasks.business_logic import apply_corrections_acteur
+from shared.tasks.database_logic.db_manager import PostgresConnectionManager
 from utils import logging_utils as log

 logger = logging.getLogger(__name__)
@@ -20,9 +21,38 @@ def apply_corrections_acteur_wrapper(**kwargs):
     df_acteur = kwargs["ti"].xcom_pull(task_ids="load_acteur")
     df_revisionacteur = kwargs["ti"].xcom_pull(task_ids="load_revisionacteur")

+    # get the dag id and run id from the Airflow context
+    dag_id = kwargs["dag_run"].dag_id
+    dag_run_id = kwargs["dag_run"].run_id
+    # get the task id
+    task_id = kwargs["task_instance"].task_id
+
     log.preview("df_acteur", df_acteur)
     log.preview("df_revisionacteur", df_revisionacteur)
+    log.preview("dag_id", dag_id)
+    log.preview("dag_run_id", dag_run_id)

-    return apply_corrections_acteur(
+    result = apply_corrections_acteur(
         df_acteur=df_acteur, df_revisionacteur=df_revisionacteur
     )
+    db_manager = PostgresConnectionManager()
+    db_manager.write_data_xcom(
+        dag_id=dag_id,
+        dag_run_id=dag_run_id,
+        task_id=task_id,
+        dataset_name="df_acteur_merged",
+        df=result["df_acteur_merged"],
+    )
+    db_manager.write_data_xcom(
+        dag_id=dag_id,
+        dag_run_id=dag_run_id,
+        task_id=task_id,
+        dataset_name="df_children",
+        df=result["df_children"],
+    )
+    # result has the shape:
+    # {
+    #     "df_acteur_merged": df_acteur_merged,
+    #     # ["parent_id", "child_id", "child_source_id"]
+    #     "df_children": df_children,
+    # }
+    return result
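Downstream tasks would then read these tables back through the same manager rather than via xcom_pull; a minimal sketch (the wrapper name and the upstream task_id value are assumptions, not part of this diff):

def use_corrections_wrapper(**kwargs):
    db_manager = PostgresConnectionManager()
    # read the dataset written by the upstream task for this same DAG run;
    # "apply_corrections_acteur" is an assumed upstream task_id
    df_children = db_manager.read_data_xcom(
        dag_id=kwargs["dag_run"].dag_id,
        dag_run_id=kwargs["dag_run"].run_id,
        task_id="apply_corrections_acteur",
        dataset_name="df_children",
    )
    log.preview("df_children", df_children)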
@@ -39,11 +39,11 @@ def apply_corrections_acteur(df_acteur: pd.DataFrame, df_revisionacteur: pd.Data
     df_acteur_merged["uuid"] = df_acteur_merged["identifiant_unique"].apply(
         lambda x: shortuuid.uuid(name=x)
     )

+    df_children = df_children[
+        ["parent_id", "identifiant_unique", "source_id"]
+    ].reset_index(drop=True)
     return {
         "df_acteur_merged": df_acteur_merged,
         # ["parent_id", "child_id", "child_source_id"]
-        "df_children": df_children[
-            ["parent_id", "identifiant_unique", "source_id"]
-        ].reset_index(drop=True),
+        "df_children": df_children,
     }
@@ -16,7 +16,7 @@ def db_data_write(
         inplace=True,
     )

-    engine = PostgresConnectionManager().engine
+    engine = PostgresConnectionManager().django_engine

     original_table_name_actor = "qfdmo_displayedacteur"
     temp_table_name_actor = "qfdmo_displayedacteurtemp"
4 changes: 2 additions & 2 deletions dags/ingest_validated_dataset_to_db.py
@@ -53,7 +53,7 @@ def fetch_and_parse_data(**context):
     row = _get_first_dagrun_to_insert()
     dag_run_id = row[0]

-    engine = PostgresConnectionManager().engine
+    engine = PostgresConnectionManager().django_engine

     df_sql = pd.read_sql_query(
         f"SELECT * FROM qfdmo_dagrunchange WHERE dag_run_id = '{dag_run_id}'",
@@ -92,7 +92,7 @@ def write_data_to_postgres(**kwargs):
     data_dict = kwargs["ti"].xcom_pull(task_ids="fetch_and_parse_data")
     # If data_set is empty, nothing to do
     dag_run_id = data_dict["dag_run_id"]
-    engine = PostgresConnectionManager().engine
+    engine = PostgresConnectionManager().django_engine
     if "actors" not in data_dict:
         with engine.begin() as connection:
             dag_ingest_validated_utils.update_dag_run_status(connection, dag_run_id)
51 changes: 44 additions & 7 deletions dags/shared/tasks/database_logic/db_manager.py
@@ -1,27 +1,64 @@
 import logging
+from datetime import datetime

+import pandas as pd
 from airflow.providers.postgres.hooks.postgres import PostgresHook
 from sqlalchemy.engine import Engine

 logger = logging.getLogger(__name__)


+def _table_name(dag_id: str, dag_run_id: str, task_id: str, dataset_name: str):
+    # drop the prefix before "__" in the run id (e.g. "scheduled__", "manual__")
+    dag_run_id = dag_run_id.split("__")[1]
+    timestamp = datetime.strptime(dag_run_id, "%Y-%m-%dT%H:%M:%S.%f%z")
+    timestamp = int(timestamp.timestamp())
+    return f"{dag_id[:10]}_{timestamp}_{task_id[:10]}_{dataset_name}"


 class PostgresConnectionManager:
     """
-    Singleton class to manage the connection to the Postgres database.
-    use the connecter qfdmo_django_db by default
-    this connecter is set by using env variable AIRFLOW_CONN_QFDMO_DJANGO_DB
+    Singleton class to manage the connections to the Postgres databases.
+    Uses the connections qfdmo_django_db and qfdmo_data_db by default;
+    they are set via the env variables AIRFLOW_CONN_QFDMO_DJANGO_DB
+    and AIRFLOW_CONN_QFDMO_DATA_DB
     """

+    # FIXME: add a migration that creates the qfdmodata database

     _instance = None

     def __new__(cls, *args, **kwargs):
         if cls._instance is None:
             cls._instance = super(PostgresConnectionManager, cls).__new__(cls)
         return cls._instance

-    def __init__(self, postgres_conn_id="qfdmo_django_db"):
+    def __init__(self, django_conn_id="qfdmo_django_db", data_conn_id="qfdmo_data_db"):
         if not hasattr(self, "initialized"):  # to avoid re-initialization
-            self.postgres_conn_id = postgres_conn_id
-            self.engine = self._create_engine()
+            self.django_conn_id = django_conn_id
+            self.data_conn_id = data_conn_id
+            self.django_engine = self._create_engine(self.django_conn_id)
+            self.data_engine = self._create_engine(self.data_conn_id)
             self.initialized = True

-    def _create_engine(self) -> Engine:
-        pg_hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
+    def _create_engine(self, engine_id: str) -> Engine:
+        pg_hook = PostgresHook(postgres_conn_id=engine_id)
         return pg_hook.get_sqlalchemy_engine()

+    def write_data_xcom(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        dataset_name: str,
+        df: pd.DataFrame,
+    ):
+        # one table per (dag, run, task, dataset); replaced if the task is retried
+        table_name = _table_name(dag_id, dag_run_id, task_id, dataset_name)
+        df.to_sql(table_name, self.data_engine, if_exists="replace", index=False)

+    def read_data_xcom(
+        self, dag_id: str, dag_run_id: str, task_id: str, dataset_name: str
+    ) -> pd.DataFrame:
+        table_name = _table_name(dag_id, dag_run_id, task_id, dataset_name)
+        return pd.read_sql(f"SELECT * FROM {table_name}", self.data_engine)
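Taken together, the naming helper and the two methods give a table-backed xcom; a minimal end-to-end sketch (the dag_id, run id, and task_id values are hypothetical, and it assumes the qfdmo-data database from .env.template exists):

import pandas as pd

db = PostgresConnectionManager()
run_id = "manual__2024-05-01T12:30:00.000000+00:00"

# _table_name truncates dag_id and task_id to 10 characters and turns the
# run id into a unix timestamp, so the table below would be named
# "compute_ac_1714566600_apply_corr_df_children"
df_in = pd.DataFrame(
    {"parent_id": [1], "identifiant_unique": ["id1"], "source_id": [2]}
)
db.write_data_xcom(
    dag_id="compute_acteurs",
    dag_run_id=run_id,
    task_id="apply_corrections_acteur",
    dataset_name="df_children",
    df=df_in,
)
df_out = db.read_data_xcom(
    dag_id="compute_acteurs",
    dag_run_id=run_id,
    task_id="apply_corrections_acteur",
    dataset_name="df_children",
)
assert df_out.equals(df_in)

Writing with if_exists="replace" keeps retries idempotent, at the cost of leaving one table per (dag, run, task, dataset) behind for later cleanup.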
@@ -14,7 +14,7 @@ def db_read_propositions_max_id_task(dag: DAG) -> PythonOperator:


 def db_read_propositions_max_id():
-    engine = PostgresConnectionManager().engine
+    engine = PostgresConnectionManager().django_engine

     # TODO: check if we need to manage the max id here
     displayedpropositionservice_max_id = engine.execute(
@@ -22,7 +22,7 @@ def source_config_validate_task(dag: DAG) -> PythonOperator:


 def source_config_validate_wrapper(**kwargs) -> None:
-    engine = PostgresConnectionManager().engine
+    engine = PostgresConnectionManager().django_engine
     params = kwargs["params"]

     codes_sc_db = set(
2 changes: 1 addition & 1 deletion dags/sources/tasks/business_logic/db_read_acteur.py
@@ -14,7 +14,7 @@ def db_read_acteur(
         raise ValueError(
             "La colonne source_id est requise dans la dataframe normalisée"
         )
-    engine = PostgresConnectionManager().engine
+    engine = PostgresConnectionManager().django_engine
     unique_source_ids = df_normalized["source_id"].unique()

     joined_source_ids = ",".join([f"'{source_id}'" for source_id in unique_source_ids])
@@ -317,7 +317,7 @@ def df_normalize_sinoe(

     @retry(wait=wait_fixed(5), stop=stop_after_attempt(5))
     def enrich_from_ban_api(row: pd.Series) -> pd.Series:
-        engine = PostgresConnectionManager().engine
+        engine = PostgresConnectionManager().django_engine

         adresse = row["adresse"] if row["adresse"] else ""
         code_postal = row["code_postal"] if row["code_postal"] else ""
2 changes: 1 addition & 1 deletion dags/utils/dag_eo_utils.py
@@ -11,7 +11,7 @@
 def insert_dagrun_and_process_df(df_acteur_updates, metadata, dag_name, run_name):
     if df_acteur_updates.empty:
         return
-    engine = PostgresConnectionManager().engine
+    engine = PostgresConnectionManager().django_engine
     current_date = datetime.now()

     with engine.connect() as conn:
2 changes: 1 addition & 1 deletion dags/utils/db_tasks.py
@@ -5,7 +5,7 @@

 def read_data_from_postgres(**kwargs):
     table_name = kwargs["table_name"]
-    engine = PostgresConnectionManager().engine
+    engine = PostgresConnectionManager().django_engine
     df = pd.read_sql_table(table_name, engine).replace({pd.NA: None})
     if df.empty:
         raise ValueError(f"DB: pas de données pour table {table_name}")