Skip to content

Commit

Permalink
Fix Snapshot Merge Query (#385)
Browse files Browse the repository at this point in the history
  • Loading branch information
peterallenwebb authored Jan 6, 2025
1 parent b6152d6 commit 47b19b0
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241217-163126.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix the snapshot merge query for several adapters in new_record mode.
time: 2024-12-17T16:31:26.970526-05:00
custom:
Author: peterallenwebb
Issue: "385"
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

from dbt.tests.util import check_relations_equal, run_dbt

# Snapshot source data for the tests in this file
_seed_new_record_mode = """
BEGIN
create table {database}.{schema}.seed (
id INTEGER,
first_name VARCHAR(50),
Expand Down Expand Up @@ -88,6 +91,8 @@
md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id,
'False' as dbt_is_deleted
from {database}.{schema}.seed;
END;
"""

_snapshot_actual_sql = """
Expand Down Expand Up @@ -119,6 +124,8 @@


_invalidate_sql = """
BEGIN
-- update records 11 - 21. Change email and updated_at field
update {schema}.seed set
updated_at = updated_at + interval '1 hour',
Expand All @@ -131,6 +138,7 @@
dbt_valid_to = updated_at + interval '1 hour'
where id >= 10 and id <= 20;
END;
"""

_update_sql = """
Expand Down Expand Up @@ -169,8 +177,14 @@
where id >= 10 and id <= 20;
"""

# SQL to delete a record from the snapshot source data
_delete_sql = """
delete from {schema}.seed where id = 1
delete from {database}.{schema}.seed where id = 1
"""

# If the deletion worked correctly, this should return two rows, with one of them representing the deletion.
_delete_check_sql = """
select dbt_valid_to, dbt_scd_id, dbt_is_deleted from {schema}.snapshot_actual where id = 1
"""


Expand Down Expand Up @@ -222,4 +236,31 @@ def test_snapshot_new_record_mode(
results = run_dbt(["snapshot"])
assert len(results) == 1

# TODO: Further validate results.
check_result = project.run_sql(_delete_check_sql, fetch="all")
valid_to = 0
scd_id = 1
is_deleted = 2
assert len(check_result) == 2
assert (
sum(
[
1
for c in check_result
if c[valid_to] is None and c[scd_id] is not None and c[is_deleted] == "True"
]
)
== 1
)
assert (
sum(
[
1
for c in check_result
if c[valid_to] is not None
and c[scd_id] is not None
and c[is_deleted] == "False"
]
)
== 1
)
assert check_result[0][scd_id] != check_result[1][scd_id]
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@

{% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%}
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}

{% if strategy.hard_deletes == 'new_record' %}
{% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %}
{% endif %}
with snapshot_query as (

{{ source_sql }}
Expand Down Expand Up @@ -169,12 +171,13 @@
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }},
{{ new_scd_id }} as {{ columns.dbt_scd_id }},
'True' as {{ columns.dbt_is_deleted }}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}

)
{%- endif %}

Expand Down

0 comments on commit 47b19b0

Please sign in to comment.