Add support for getting freshness from DBMS metadata (#8795)
* Add support for getting freshness from DBMS metadata

* Add changelog entry

* Add simple test case

* Change parsing error to warning and add new event type for warning

* Code review simplification of capability dict.

* Revisions to the capability mechanism per review

* Move utility function.

* Reduce try/except scope

* Clean up imports.

* Simplify typing per review

* Unit test fix
peterallenwebb authored Oct 11, 2023
1 parent bf10a29 commit 2e35426
Showing 17 changed files with 1,184 additions and 953 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231008-134329.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Allow freshness to be determined via DBMS metadata for supported adapters
time: 2023-10-08T13:43:29.884766-04:00
custom:
Author: peterallenwebb
Issue: "8704"
82 changes: 66 additions & 16 deletions core/dbt/adapters/base/impl.py
@@ -17,9 +17,11 @@
Set,
Tuple,
Type,
TypedDict,
Union,
)

from dbt.adapters.capability import Capability, CapabilityDict
from dbt.contracts.graph.nodes import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint

import agate
@@ -44,7 +46,12 @@
)

from dbt.adapters.protocol import AdapterConfig
from dbt.clients.agate_helper import empty_table, merge_tables, table_from_rows
from dbt.clients.agate_helper import (
empty_table,
get_column_value_uncased,
merge_tables,
table_from_rows,
)
from dbt.clients.jinja import MacroGenerator
from dbt.contracts.graph.manifest import Manifest, MacroManifest
from dbt.contracts.graph.nodes import ResultNode
@@ -76,6 +83,7 @@
GET_CATALOG_MACRO_NAME = "get_catalog"
GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations"
FRESHNESS_MACRO_NAME = "collect_freshness"
GET_RELATION_LAST_MODIFIED_MACRO_NAME = "get_relation_last_modified"


class ConstraintSupport(str, Enum):
@@ -110,7 +118,7 @@ def test(row: agate.Row) -> bool:
return test


def _utc(dt: Optional[datetime], source: BaseRelation, field_name: str) -> datetime:
def _utc(dt: Optional[datetime], source: Optional[BaseRelation], field_name: str) -> datetime:
"""If dt has a timezone, return a new datetime that's in UTC. Otherwise,
assume the datetime is already for UTC and add the timezone.
"""
@@ -162,12 +170,10 @@ def submit(self, compiled_code: str) -> Any:
raise NotImplementedError("PythonJobHelper submit function is not implemented yet")


class AdapterFeature(str, Enum):
"""Enumeration of optional adapter features which can be probed using BaseAdapter.has_feature()"""

CatalogByRelations = "CatalogByRelations"
"""Flags support for retrieving catalog information using a list of relations, rather than always retrieving all
the relations in a schema """
class FreshnessResponse(TypedDict):
max_loaded_at: datetime
snapshotted_at: datetime
age: float # age in seconds


class BaseAdapter(metaclass=AdapterMeta):
@@ -231,6 +237,10 @@ class BaseAdapter(metaclass=AdapterMeta):
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
}

# This static member variable can be overridden in concrete adapter
# implementations to indicate adapter support for optional capabilities.
_capabilities = CapabilityDict({})

def __init__(self, config) -> None:
self.config = config
self.cache = RelationsCache()
@@ -1164,7 +1174,7 @@ def get_catalog(
futures: List[Future[agate.Table]] = []
catalog_relations = self._get_catalog_relations(manifest, selected_nodes)
relation_count = len(catalog_relations)
if relation_count <= 100 and self.has_feature(AdapterFeature.CatalogByRelations):
if relation_count <= 100 and self.supports(Capability.SchemaMetadataByRelations):
relations_by_schema = self._get_catalog_relations_by_info_schema(catalog_relations)
for info_schema in relations_by_schema:
name = ".".join([str(info_schema.database), "information_schema"])
@@ -1203,7 +1213,7 @@ def calculate_freshness(
loaded_at_field: str,
filter: Optional[str],
manifest: Optional[Manifest] = None,
) -> Tuple[Optional[AdapterResponse], Dict[str, Any]]:
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
"""Calculate the freshness of sources in dbt, and return it"""
kwargs: Dict[str, Any] = {
"source": source,
@@ -1238,11 +1248,50 @@

snapshotted_at = _utc(table[0][1], source, loaded_at_field)
age = (snapshotted_at - max_loaded_at).total_seconds()
freshness = {
freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}
return adapter_response, freshness

def calculate_freshness_from_metadata(
self,
source: BaseRelation,
manifest: Optional[Manifest] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
kwargs: Dict[str, Any] = {
"information_schema": source.information_schema_only(),
"relations": [source],
}
result = self.execute_macro(
GET_RELATION_LAST_MODIFIED_MACRO_NAME, kwargs=kwargs, manifest=manifest
)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]

try:
row = table[0]
last_modified_val = get_column_value_uncased("last_modified", row)
snapshotted_at_val = get_column_value_uncased("snapshotted_at", row)
except Exception:
raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table)

if last_modified_val is None:
# Interpret missing value as "infinitely long ago"
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(last_modified_val, None, "last_modified")

snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at")

age = (snapshotted_at - max_loaded_at).total_seconds()

freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}

return adapter_response, freshness

def pre_model_hook(self, config: Mapping[str, Any]) -> Any:
@@ -1519,11 +1568,12 @@ def render_model_constraint(cls, constraint: ModelLevelConstraint) -> Optional[s
return None

@classmethod
def has_feature(cls, feature: AdapterFeature) -> bool:
# The base adapter implementation does not implement any optional
# features, so always return false. Adapters which wish to provide
# optional features will have to override this function.
return False
def capabilities(cls) -> CapabilityDict:
return cls._capabilities

@classmethod
def supports(cls, capability: Capability) -> bool:
return bool(cls.capabilities()[capability])


COLUMNS_EQUAL_SQL = """
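
The core of calculate_freshness_from_metadata above is the age arithmetic and its handling of a missing last-modified value; a small standalone sketch of that computation, with an illustrative snapshot timestamp:

from datetime import datetime
import pytz

last_modified = None  # what the get_relation_last_modified macro returned for the relation
snapshotted_at = datetime(2023, 10, 11, 12, 0, 0, tzinfo=pytz.UTC)  # illustrative value

if last_modified is None:
    # A NULL last-modified time is read as "infinitely long ago",
    # so the source will always be reported as stale.
    max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
    max_loaded_at = last_modified  # assumed already normalized to UTC by _utc()

age = (snapshotted_at - max_loaded_at).total_seconds()  # freshness age in seconds
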
52 changes: 52 additions & 0 deletions core/dbt/adapters/capability.py
@@ -0,0 +1,52 @@
from dataclasses import dataclass
from enum import Enum
from typing import Optional, DefaultDict, Mapping


class Capability(str, Enum):
"""Enumeration of optional adapter features which can be probed using BaseAdapter.has_feature()"""

SchemaMetadataByRelations = "SchemaMetadataByRelations"
"""Indicates efficient support for retrieving schema metadata for a list of relations, rather than always retrieving
all the relations in a schema."""

TableLastModifiedMetadata = "TableLastModifiedMetadata"
"""Indicates support for determining the time of the last table modification by querying database metadata."""


class Support(str, Enum):
Unknown = "Unknown"
"""The adapter has not declared whether this capability is a feature of the underlying DBMS."""

Unsupported = "Unsupported"
"""This capability is not possible with the underlying DBMS, so the adapter does not implement related macros."""

NotImplemented = "NotImplemented"
"""This capability is available in the underlying DBMS, but support has not yet been implemented in the adapter."""

Versioned = "Versioned"
"""Some versions of the DBMS supported by the adapter support this capability and the adapter has implemented any
macros needed to use it."""

Full = "Full"
"""All versions of the DBMS supported by the adapter support this capability and the adapter has implemented any
macros needed to use it."""


@dataclass
class CapabilitySupport:
support: Support
first_version: Optional[str] = None

def __bool__(self):
return self.support == Support.Versioned or self.support == Support.Full


class CapabilityDict(DefaultDict[Capability, CapabilitySupport]):
def __init__(self, vals: Mapping[Capability, CapabilitySupport]):
super().__init__(self._default)
self.update(vals)

@staticmethod
def _default():
return CapabilitySupport(support=Support.Unknown)
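
A sketch of how a concrete adapter might fill in its _capabilities mapping and what supports() then reduces to; the version string is illustrative only:

from dbt.adapters.capability import (
    Capability,
    CapabilityDict,
    CapabilitySupport,
    Support,
)

capabilities = CapabilityDict(
    {
        Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
        Capability.TableLastModifiedMetadata: CapabilitySupport(
            support=Support.Versioned, first_version="1.7.0"  # illustrative version
        ),
    }
)

# CapabilitySupport.__bool__ is True only for Versioned and Full, so
# BaseAdapter.supports() is a plain truthiness check against this mapping.
assert bool(capabilities[Capability.TableLastModifiedMetadata])

# Capabilities an adapter never declares fall back to Support.Unknown via the
# defaultdict factory, which is falsy.
assert not bool(CapabilityDict({})[Capability.SchemaMetadataByRelations])
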
9 changes: 9 additions & 0 deletions core/dbt/clients/agate_helper.py
@@ -240,3 +240,12 @@ def merge_tables(tables: List[agate.Table]) -> agate.Table:
rows.append(agate.Row(data, column_names))
# _is_fork to tell agate that we already made things into `Row`s.
return agate.Table(rows, column_names, column_types, _is_fork=True)


def get_column_value_uncased(column_name: str, row: agate.Row) -> Any:
"""Get the value of a column in this row, ignoring the casing of the column name."""
for key, value in row.items():
if key.casefold() == column_name.casefold():
return value

raise KeyError
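
A quick illustration of the case-insensitive lookup; since the helper only iterates row.items(), a plain dict stands in for an agate.Row here, and the column values are made up:

from dbt.clients.agate_helper import get_column_value_uncased

# Warehouses frequently return upper-cased column names; this lookup smooths
# over that mismatch when reading the macro's result row.
row = {"LAST_MODIFIED": "2023-10-08 13:43:29", "SNAPSHOTTED_AT": "2023-10-11 09:00:00"}
assert get_column_value_uncased("last_modified", row) == "2023-10-08 13:43:29"
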
4 changes: 2 additions & 2 deletions core/dbt/contracts/graph/nodes.py
@@ -1302,8 +1302,8 @@ def sources(self):
return []

@property
def has_freshness(self):
return bool(self.freshness) and self.loaded_at_field is not None
def has_freshness(self) -> bool:
return bool(self.freshness)

@property
def search_name(self):
13 changes: 13 additions & 0 deletions core/dbt/events/types.proto
@@ -1297,6 +1297,19 @@ message WarnStateTargetEqualMsg {
WarnStateTargetEqual data = 2;
}

// I073
message FreshnessConfigProblem {
string msg = 1;
}

message FreshnessConfigProblemMsg {
EventInfo info = 1;
FreshnessConfigProblem data = 2;
}




// M - Deps generation

// M001
10 changes: 9 additions & 1 deletion core/dbt/events/types.py
@@ -1249,7 +1249,7 @@ def message(self) -> str:


class WarnStateTargetEqual(WarnLevel):
def code(self):
def code(self) -> str:
return "I072"

def message(self) -> str:
@@ -1259,6 +1259,14 @@ def message(self) -> str:
)


class FreshnessConfigProblem(WarnLevel):
def code(self) -> str:
return "I073"

def message(self) -> str:
return self.msg


# =======================================================
# M - Deps generation
# =======================================================
1,801 changes: 902 additions & 899 deletions core/dbt/events/types_pb2.py

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions core/dbt/include/global_project/macros/adapters/metadata.sql
@@ -85,3 +85,12 @@
{{ exceptions.raise_not_implemented(
'get_relations macro not implemented for adapter '+adapter.type()) }}
{% endmacro %}

{% macro get_relation_last_modified(information_schema, relations) %}
{{ return(adapter.dispatch('get_relation_last_modified', 'dbt')(information_schema, relations)) }}
{% endmacro %}

{% macro default__get_relation_last_modified(information_schema, relations) %}
{{ exceptions.raise_not_implemented(
'get_relation_last_modified macro not implemented for adapter ' + adapter.type()) }}
{% endmacro %}
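
This is the dispatch hook that calculate_freshness_from_metadata calls into; an adapter opts in by providing its own <adapter>__get_relation_last_modified implementation. A sketch of how the Python side invokes it and the row shape expected back; the wrapper function and its parameters are stand-ins for the objects available inside calculate_freshness_from_metadata:

def last_modified_rows(adapter, source, manifest=None):
    # Sketch only: mirrors the invocation inside calculate_freshness_from_metadata.
    kwargs = {
        "information_schema": source.information_schema_only(),
        "relations": [source],
    }
    result = adapter.execute_macro("get_relation_last_modified", kwargs=kwargs, manifest=manifest)
    # One row per relation is expected, with last_modified and snapshotted_at
    # columns in any casing; last_modified may be NULL for never-modified tables.
    return result.table
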
22 changes: 20 additions & 2 deletions core/dbt/parser/sources.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import itertools
from pathlib import Path
from typing import Iterable, Dict, Optional, Set, Any, List

from dbt.adapters.capability import Capability
from dbt.adapters.factory import get_adapter
from dbt.config import RuntimeConfig
from dbt.context.context_config import (
@@ -24,8 +26,9 @@
UnparsedColumn,
Time,
)
from dbt.events.functions import warn_or_error
from dbt.events.types import UnusedTables

from dbt.events.functions import warn_or_error, fire_event
from dbt.events.types import UnusedTables, FreshnessConfigProblem
from dbt.exceptions import DbtInternalError
from dbt.node_types import NodeType

@@ -184,6 +187,21 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
unrendered_config=unrendered_config,
)

if (
parsed_source.freshness
and not parsed_source.loaded_at_field
and not get_adapter(self.root_project).supports(Capability.TableLastModifiedMetadata)
):
# Metadata-based freshness is being used by default for this node,
# but is not available through the configured adapter, so warn the
# user that freshness info will not be collected for this node at
# runtime.
fire_event(
FreshnessConfigProblem(
msg=f"The configured adapter does not support metadata-based freshness. A loaded_at_field must be specified for source '{source.name}'."
)
)

# relation name is added after instantiation because the adapter does
# not provide the relation name for a UnpatchedSourceDefinition object
parsed_source.relation_name = self._get_relation_name(parsed_source)
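
A sketch of the parse-time decision this hunk encodes, i.e. which freshness path a source ends up on and when the new I073 warning fires; the function and its arguments are stand-ins for the parsed source's state:

from dbt.adapters.capability import Capability

def freshness_plan(adapter, freshness_configured: bool, loaded_at_field) -> str:
    # Sketch only: summarizes the behavior, not a function in this commit.
    if not freshness_configured:
        return "no freshness check"
    if loaded_at_field:
        return "query max(loaded_at_field) via collect_freshness"
    if adapter.supports(Capability.TableLastModifiedMetadata):
        return "use DBMS metadata via get_relation_last_modified"
    return "warn with FreshnessConfigProblem (I073); no freshness collected at runtime"
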
