Skip to content

Commit

Permalink
refactor _create_freshness_response
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Mar 25, 2024
1 parent 82d8217 commit b4cdd48
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1330,25 +1330,11 @@ def calculate_freshness_from_metadata_batch(
except Exception:
raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table)

if last_modified_val is None:
# Interpret missing value as "infinitely long ago"
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(last_modified_val, None, "last_modified")

snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at")

age = (snapshotted_at - max_loaded_at).total_seconds()

freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}
freshness_response = self._create_freshness_response(last_modified_val, snapshotted_at_val)
source_relation_for_result = schema_identifier_to_source[
(schema.lower(), identifier.lower())
]
freshness_responses[source_relation_for_result] = freshness
freshness_responses[source_relation_for_result] = freshness_response

return adapter_response, freshness_responses

Expand All @@ -1364,6 +1350,23 @@ def calculate_freshness_from_metadata(
)
return adapter_response, list(freshness_responses.values())[0]

def _create_freshness_response(self, last_modified: Optional[datetime], snapshotted_at: Optional[datetime]) -> FreshnessResponse:
if last_modified is None:
# Interpret missing value as "infinitely long ago"
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(last_modified, None, "last_modified")

snapshotted_at = _utc(snapshotted_at, None, "snapshotted_at")
age = (snapshotted_at - max_loaded_at).total_seconds()
freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}

return freshness

def pre_model_hook(self, config: Mapping[str, Any]) -> Any:
"""A hook for running some operation before the model materialization
runs. The hook can assume it has a connection available.
Expand Down

0 comments on commit b4cdd48

Please sign in to comment.