Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce TableLastModifiedMetadataBatch and implement BaseAdapter.calculate_freshness_from_metadata_batch #127

Merged
merged 29 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2beba0b
extend RelationConfig and MaterializationConfig
MichelleArk Mar 1, 2024
151b131
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 11, 2024
727093c
first pass
MichelleArk Mar 11, 2024
ffaf808
accept information schema in calculate_freshness_from_metadata_batch
MichelleArk Mar 11, 2024
b66644a
implement calculate_freshness_from_metadata in terms of calculate_fre…
MichelleArk Mar 11, 2024
54670e0
add TableLastModifiedMetadataBatch capability
MichelleArk Mar 11, 2024
8046a3e
return batched freshness results keyed by (schema, identifier)
MichelleArk Mar 13, 2024
1dbf3e9
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 13, 2024
2c28576
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 14, 2024
a9cffcd
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 22, 2024
9af8c85
handle queries across information_schema in calculate_freshness_from_…
MichelleArk Mar 25, 2024
82d8217
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 25, 2024
ed81529
refactor _create_freshness_response
MichelleArk Mar 25, 2024
c44779a
make schema_identifier_to_source type-safe
MichelleArk Mar 25, 2024
62be8df
update TableLastModifiedMetadataBatch description
MichelleArk Mar 25, 2024
9a6c829
changelog entry
MichelleArk Mar 25, 2024
17701ad
refactor to _get_catalog_relations_by_info_schema, _parse_freshness_row
MichelleArk Mar 28, 2024
511cf14
sanitize raw_relation for freshness batch calculation
MichelleArk Apr 2, 2024
01f27a5
ensure a connection is open if possible prior to executing macro
MichelleArk Apr 4, 2024
05c7149
Merge branch 'main' into batch-metadata-freshness
MichelleArk Apr 4, 2024
154e066
fix agate typing
MichelleArk Apr 4, 2024
8c707cc
lazy load agate_helper
MichelleArk Apr 4, 2024
8a1deac
add needs_conn to BaseAdapter.execute_macro
MichelleArk Apr 4, 2024
f3dcac2
docstring
MichelleArk Apr 4, 2024
ebee880
Merge branch 'main' into batch-metadata-freshness
MichelleArk Apr 5, 2024
02988c8
Merge branch 'main' into batch-metadata-freshness
colin-rogers-dbt Apr 11, 2024
0bbf7ed
cleanup adapter_responses parsing in calculate_freshness_from_metadat…
MichelleArk Apr 11, 2024
a43f04c
Merge branch 'main' into batch-metadata-freshness
MichelleArk Apr 12, 2024
7cf501e
Merge branch 'main' into batch-metadata-freshness
MichelleArk Apr 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 70 additions & 22 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,9 @@ def _get_cache_schemas(self, relation_configs: Iterable[RelationConfig]) -> Set[
populate.
"""
return {
self.Relation.create_from(quoting=self.config, relation_config=relation_config).without_identifier()
self.Relation.create_from(
quoting=self.config, relation_config=relation_config
).without_identifier()
for relation_config in relation_configs
}

Expand Down Expand Up @@ -1285,46 +1287,92 @@ def calculate_freshness(
}
return adapter_response, freshness

def calculate_freshness_from_metadata_batch(
self,
sources: List[BaseRelation],
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], Dict[BaseRelation, FreshnessResponse]]:
# Track schema, identifiers of sources for lookup from batch query
schema_identifier_to_source = {
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
(
source.path.get_lowered_part(ComponentName.Schema),
source.path.get_lowered_part(ComponentName.Identifier),
): source
for source in sources
}

# Group metadata sources by information schema -- one query per information schema will be necessary
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
information_schema_to_metadata_sources: Dict[InformationSchema, List[BaseRelation]] = {}
for source in sources:
information_schema = source.information_schema_only()
if information_schema not in information_schema_to_metadata_sources:
information_schema_to_metadata_sources[information_schema] = [source]
else:
information_schema_to_metadata_sources[information_schema].append(source)

freshness_responses: Dict[BaseRelation, FreshnessResponse] = {}
for (
information_schema,
sources_for_information_schema,
) in information_schema_to_metadata_sources.items():
result = self.execute_macro(
GET_RELATION_LAST_MODIFIED_MACRO_NAME,
kwargs={
"information_schema": information_schema,
"relations": sources_for_information_schema,
},
macro_resolver=macro_resolver,
)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved

for row in table:
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
try:
last_modified_val = get_column_value_uncased("last_modified", row)
snapshotted_at_val = get_column_value_uncased("snapshotted_at", row)
identifier = get_column_value_uncased("identifier", row)
schema = get_column_value_uncased("schema", row)
except Exception:
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table)

freshness_response = self._create_freshness_response(
last_modified_val, snapshotted_at_val
)
source_relation_for_result = schema_identifier_to_source[
(schema.lower(), identifier.lower())
]
freshness_responses[source_relation_for_result] = freshness_response

return adapter_response, freshness_responses

def calculate_freshness_from_metadata(
self,
source: BaseRelation,
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
kwargs: Dict[str, Any] = {
"information_schema": source.information_schema_only(),
"relations": [source],
}
result = self.execute_macro(
GET_RELATION_LAST_MODIFIED_MACRO_NAME,
kwargs=kwargs,
adapter_response, freshness_responses = self.calculate_freshness_from_metadata_batch(
sources=[source],
macro_resolver=macro_resolver,
)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]

try:
row = table[0]
last_modified_val = get_column_value_uncased("last_modified", row)
snapshotted_at_val = get_column_value_uncased("snapshotted_at", row)
except Exception:
raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table)
return adapter_response, list(freshness_responses.values())[0]
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved

if last_modified_val is None:
def _create_freshness_response(
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
self, last_modified: Optional[datetime], snapshotted_at: Optional[datetime]
) -> FreshnessResponse:
if last_modified is None:
# Interpret missing value as "infinitely long ago"
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(last_modified_val, None, "last_modified")

snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at")
max_loaded_at = _utc(last_modified, None, "last_modified")

snapshotted_at = _utc(snapshotted_at, None, "snapshotted_at")
age = (snapshotted_at - max_loaded_at).total_seconds()

freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}

return adapter_response, freshness
return freshness

def pre_model_hook(self, config: Mapping[str, Any]) -> Any:
"""A hook for running some operation before the model materialization
Expand Down
3 changes: 3 additions & 0 deletions dbt/adapters/capability.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ class Capability(str, Enum):
TableLastModifiedMetadata = "TableLastModifiedMetadata"
"""Indicates support for determining the time of the last table modification by querying database metadata."""

TableLastModifiedMetadataBatch = "TableLastModifiedMetadataBatch"
"""Indicates support for determining the time of the last table modification by querying database metadata in batch."""


class Support(str, Enum):
Unknown = "Unknown"
Expand Down
Loading