Skip to content

Commit

Permalink
Fixed to be able to run without specifying clicks_tables and conversi…
Browse files Browse the repository at this point in the history
…ons_tables
  • Loading branch information
doryokujin committed Feb 15, 2024
1 parent b14a25d commit 5de1575
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,7 @@ td:
# use_distinct: false

master_campaigns_tables:
# 507568:
# -
# db: taka
# table: master_campaigns_507568

utm_names:
utm_cv: utm_term

507568:
-
db: cdp_audience_507568
table: master_campaigns
167 changes: 87 additions & 80 deletions scenarios/cdp_campaign_management/incremental_ingest.dig
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,35 @@ _export:
query: SELECT * FROM ${td.database}.${td.tables.tmp_daily_activations_info}
insert_into: ${td.database}.${td.tables.daily_activations_info}

+prepare_master_campaings:
+ingest_master_campaigns:
if>: ${td.master_campaigns_tables == null || td.master_campaigns_tables[ps_id] == null}
_do:
+create_table:
td>: queries/create_master_campaigns.sql
dest_db: ${td.database}
dest_table: ${td.tables.master_campaigns}
_else_do:
+create_table:
td_ddl>:
drop_tables:
- ${td.tables.tmp_master_campaigns}
create_tables:
- ${td.tables.tmp_master_campaigns}
+ingest_to_tmp_table:
_parallel: ${parallel}
for_each>:
tbl_info: ${td.master_campaigns_tables[ps_id]}
_do:
td>:
query: SELECT * FROM ${tbl_info.db}.${tbl_info.table}
insert_into: ${td.database}.${td.tables.tmp_master_campaigns}

+de_duplication:
td>: queries/de_duplicate_master_campaings.sql
create_table: ${td.database}.${td.tables.master_campaigns}


+ingest_clicks:
+prepare_table:
td_ddl>:
Expand All @@ -43,55 +72,29 @@ _export:
create_tables:
- ${td.tables.tmp_clicks}

+prepare_master_campaings:
+ingest_master_campaigns:
if>: ${typeof td.master_campaigns_tables === 'undefined' || td.master_campaigns_tables == null}
_do:
+create_table:
td>: queries/create_master_campaigns.sql
dest_db: ${td.database}
dest_table: ${td.tables.master_campaigns}
_else_do:
+create_table:
td_ddl>:
drop_tables:
- ${td.tables.tmp_master_campaigns}
create_tables:
- ${td.tables.tmp_master_campaigns}
+ingest_to_tmp_table:
_parallel: ${parallel}
for_each>:
tbl_info: ${td.master_campaigns_tables[ps_id]}
_do:
td>:
query: SELECT * FROM ${tbl_info.db}.${tbl_info.table}
insert_into: ${td.database}.${td.tables.tmp_master_campaigns}

+de_duplication:
td>: queries/de_duplicate_master_campaings.sql
create_table: ${td.database}.${td.tables.master_campaigns}

+ingest_to_tmp_table:
_parallel: ${parallel}
for_each>:
tbl_info: ${td.clicks_tables[ps_id]}
_do:
_export:
url_column: ${tbl_info.url_col}
input_table: ${tbl_info.table}
campaign_db: ${td.database}
filter: ${(typeof tbl_info.filter === 'undefined') || tbl_info.filter}
master_campaigns_table: ${td.tables.master_campaigns}

time_column: "${tbl_info.is_audience_table ? 'timestamp' : tbl_info.time_col}"
input_db: "${tbl_info.is_audience_table ? 'cdp_audience_' + ps_id : tbl_info.db}"
user_column: "${tbl_info.is_audience_table ? 't2.' + user_id : 't1.' + user_id}"
user_column_inner: "${tbl_info.is_audience_table ? 'cdp_customer_id' : user_id}"
join_part: "${tbl_info.is_audience_table ? 'JOIN cdp_audience_' + ps_id + '.customers t2 ON t1.cdp_customer_id = t2.cdp_customer_id' : ''}"
distinct: "${typeof tbl_info.use_distinct === 'undefined' || !tbl_info.use_distinct ? '' : 'DISTINCT'}"

td>: queries/ingest_clicks.sql
insert_into: ${td.database}.${td.tables.tmp_clicks}
if>: ${td.clicks_tables == null || td.clicks_tables[ps_id] == null}
_else_do:
_parallel: ${parallel}
for_each>:
tbl_info: ${td.clicks_tables[ps_id]}
_do:
_export:
url_column: ${tbl_info.url_col}
input_table: ${tbl_info.table}
campaign_db: ${td.database}
filter: ${(typeof tbl_info.filter === 'undefined') || tbl_info.filter}
master_campaigns_table: ${td.tables.master_campaigns}

time_column: "${tbl_info.is_audience_table ? 'timestamp' : tbl_info.time_col}"
input_db: "${tbl_info.is_audience_table ? 'cdp_audience_' + ps_id : tbl_info.db}"
user_column: "${tbl_info.is_audience_table ? 't2.' + user_id : 't1.' + user_id}"
user_column_inner: "${tbl_info.is_audience_table ? 'cdp_customer_id' : user_id}"
join_part: "${tbl_info.is_audience_table ? 'JOIN cdp_audience_' + ps_id + '.customers t2 ON t1.cdp_customer_id = t2.cdp_customer_id' : ''}"
distinct: "${typeof tbl_info.use_distinct === 'undefined' || !tbl_info.use_distinct ? '' : 'DISTINCT'}"

td>: queries/ingest_clicks.sql
insert_into: ${td.database}.${td.tables.tmp_clicks}

+write_tmp_to_dest_table:
td>:
Expand All @@ -118,7 +121,7 @@ _export:
- ${td.tables.tmp_activations}

+ingest_to_tmp_table:
if>: ${typeof td.activations_tables === 'undefined' || td.activations_tables == null || td.activations_tables[ps_id] == null || !td.activations_tables[ps_id].scan_journey_tables}
if>: ${td.activations_tables == null || td.activations_tables[ps_id] == null || !td.activations_tables[ps_id].scan_journey_tables}
_do:
_export:
cdp_audience_db: cdp_audience_${ps_id}
Expand Down Expand Up @@ -155,26 +158,28 @@ _export:
- ${td.tables.tmp_conversions}

+ingest_to_tmp_table:
_parallel: ${parallel}
for_each>:
tbl_info: ${td.conversions_tables[ps_id]}
_do:
_export:
input_table: ${tbl_info.table}
filter: ${(typeof tbl_info.filter === 'undefined') || tbl_info.filter}
cv_name: ${tbl_info.cv_name}
val_col: ${tbl_info.val_col}
acquired_revenue_per_person: ${tbl_info.acquired_revenue_per_person}
if>: ${td.conversions_tables == null || td.conversions_tables[ps_id] == null }
_else_do:
_parallel: ${parallel}
for_each>:
tbl_info: ${td.conversions_tables[ps_id]}
_do:
_export:
input_table: ${tbl_info.table}
filter: ${(typeof tbl_info.filter === 'undefined') || tbl_info.filter}
cv_name: ${tbl_info.cv_name}
val_col: ${tbl_info.val_col}
acquired_revenue_per_person: ${tbl_info.acquired_revenue_per_person}

time_column: "${tbl_info.is_audience_table ? 'timestamp' : tbl_info.time_col}"
user_column: "${tbl_info.is_audience_table ? 't2.' + user_id : 't1.' + user_id}"
inner_user_column: "${tbl_info.is_audience_table ? 'cdp_customer_id' : user_id}"
input_db: "${tbl_info.is_audience_table ? 'cdp_audience_' + ps_id : tbl_info.db}"
join_part: "${tbl_info.is_audience_table ? 'JOIN cdp_audience_' + ps_id + '.customers t2 ON t1.cdp_customer_id = t2.cdp_customer_id' : ''}"
distinct: "${typeof tbl_info.use_distinct === 'undefined' || !tbl_info.use_distinct ? '' : 'DISTINCT'}"
time_column: "${tbl_info.is_audience_table ? 'timestamp' : tbl_info.time_col}"
user_column: "${tbl_info.is_audience_table ? 't2.' + user_id : 't1.' + user_id}"
inner_user_column: "${tbl_info.is_audience_table ? 'cdp_customer_id' : user_id}"
input_db: "${tbl_info.is_audience_table ? 'cdp_audience_' + ps_id : tbl_info.db}"
join_part: "${tbl_info.is_audience_table ? 'JOIN cdp_audience_' + ps_id + '.customers t2 ON t1.cdp_customer_id = t2.cdp_customer_id' : ''}"
distinct: "${typeof tbl_info.use_distinct === 'undefined' || !tbl_info.use_distinct ? '' : 'DISTINCT'}"

td>: queries/ingest_conversions.sql
insert_into: ${td.database}.${td.tables.tmp_conversions}
td>: queries/ingest_conversions.sql
insert_into: ${td.database}.${td.tables.tmp_conversions}


+write_tmp_to_dest_table:
Expand Down Expand Up @@ -203,19 +208,21 @@ _export:
- ${td.tables.tmp_conversion_journeys}

+ingest_to_tmp_table:
_parallel: ${parallel}
for_each>:
tbl_info: ${td.conversions_tables[ps_id]}
_do:
_export:
cv_name: ${tbl_info.cv_name}
input_db: ${td.database}
input_table_activations: ${td.tables.activations}
input_table_clicks: ${td.tables.clicks}
input_table_conversions: ${td.tables.conversions}

td>: queries/ingest_conversion_journeys.sql
insert_into: ${td.database}.${td.tables.tmp_conversion_journeys}
if>: ${td.conversions_tables == null || td.conversions_tables[ps_id] == null }
_else_do:
_parallel: ${parallel}
for_each>:
tbl_info: ${td.conversions_tables[ps_id]}
_do:
_export:
cv_name: ${tbl_info.cv_name}
input_db: ${td.database}
input_table_activations: ${td.tables.activations}
input_table_clicks: ${td.tables.clicks}
input_table_conversions: ${td.tables.conversions}

td>: queries/ingest_conversion_journeys.sql
insert_into: ${td.database}.${td.tables.tmp_conversion_journeys}

+write_tmp_to_dest_table:
td>:
Expand Down
Loading

0 comments on commit 5de1575

Please sign in to comment.