From b65b761fcf156a233b41d397679755c3755c982d Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 12 Apr 2024 12:43:14 -0700 Subject: [PATCH] Introduce TableLastModifiedMetadataBatch and implement BaseAdapter.calculate_freshness_from_metadata_batch (#127) --- .../unreleased/Features-20240325-180611.yaml | 6 + dbt/adapters/base/impl.py | 117 ++++++++++++++---- dbt/adapters/capability.py | 3 + 3 files changed, 103 insertions(+), 23 deletions(-) create mode 100644 .changes/unreleased/Features-20240325-180611.yaml diff --git a/.changes/unreleased/Features-20240325-180611.yaml b/.changes/unreleased/Features-20240325-180611.yaml new file mode 100644 index 00000000..2299d2ee --- /dev/null +++ b/.changes/unreleased/Features-20240325-180611.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Introduce TableLastModifiedMetadataBatch and implement BaseAdapter.calculate_freshness_from_metadata_batch +time: 2024-03-25T18:06:11.816163-04:00 +custom: + Author: michelleark + Issue: "138" diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 5b4b8080..3abe5e09 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1062,6 +1062,7 @@ def execute_macro( project: Optional[str] = None, context_override: Optional[Dict[str, Any]] = None, kwargs: Optional[Dict[str, Any]] = None, + needs_conn: bool = False, ) -> AttrDict: """Look macro_name up in the manifest and execute its results. @@ -1074,6 +1075,10 @@ def execute_macro( execution context. :param kwargs: An optional dict of keyword args used to pass to the macro. + : param needs_conn: A boolean that indicates whether the specified macro + requires an open connection to execute. If needs_conn is True, a + connection is expected and opened if necessary. Otherwise (and by default), + no connection is expected prior to executing the macro. """ if kwargs is None: @@ -1106,6 +1111,10 @@ def execute_macro( macro_function = CallableMacroGenerator(macro, macro_context) + if needs_conn: + connection = self.connections.get_thread_connection() + self.connections.open(connection) + with self.connections.exception_handler(f"macro {macro_name}"): result = macro_function(**kwargs) return result @@ -1297,48 +1306,110 @@ def calculate_freshness( } return adapter_response, freshness + def calculate_freshness_from_metadata_batch( + self, + sources: List[BaseRelation], + macro_resolver: Optional[MacroResolverProtocol] = None, + ) -> Tuple[List[Optional[AdapterResponse]], Dict[BaseRelation, FreshnessResponse]]: + """ + Given a list of sources (BaseRelations), calculate the metadata-based freshness in batch. + This method should _not_ execute a warehouse query per source, but rather batch up + the sources into as few requests as possible to minimize the number of roundtrips required + to compute metadata-based freshness for each input source. + + :param sources: The list of sources to calculate metadata-based freshness for + :param macro_resolver: An optional macro_resolver to use for get_relation_last_modified + :return: a tuple where: + * the first element is a list of optional AdapterResponses indicating the response + for each request the method made to compute the freshness for the provided sources. + * the second element is a dictionary mapping an input source BaseRelation to a FreshnessResponse, + if it was possible to calculate a FreshnessResponse for the source. + """ + # Track schema, identifiers of sources for lookup from batch query + schema_identifier_to_source = { + ( + source.path.get_lowered_part(ComponentName.Schema), + source.path.get_lowered_part(ComponentName.Identifier), + ): source + for source in sources + } + + # Group metadata sources by information schema -- one query per information schema will be necessary + sources_by_info_schema: Dict[InformationSchema, List[BaseRelation]] = self._get_catalog_relations_by_info_schema(sources) + + freshness_responses: Dict[BaseRelation, FreshnessResponse] = {} + adapter_responses: List[Optional[AdapterResponse]] = [] + for ( + information_schema, + sources_for_information_schema, + ) in sources_by_info_schema.items(): + result = self.execute_macro( + GET_RELATION_LAST_MODIFIED_MACRO_NAME, + kwargs={ + "information_schema": information_schema, + "relations": sources_for_information_schema, + }, + macro_resolver=macro_resolver, + needs_conn=True, + ) + adapter_response, table = result.response, result.table # type: ignore[attr-defined] + adapter_responses.append(adapter_response) + + for row in table: + raw_relation, freshness_response = self._parse_freshness_row(row, table) + source_relation_for_result = schema_identifier_to_source[raw_relation] + freshness_responses[source_relation_for_result] = freshness_response + + return adapter_responses, freshness_responses + def calculate_freshness_from_metadata( self, source: BaseRelation, macro_resolver: Optional[MacroResolverProtocol] = None, ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: - kwargs: Dict[str, Any] = { - "information_schema": source.information_schema_only(), - "relations": [source], - } - result = self.execute_macro( - GET_RELATION_LAST_MODIFIED_MACRO_NAME, - kwargs=kwargs, + adapter_responses, freshness_responses = self.calculate_freshness_from_metadata_batch( + sources=[source], macro_resolver=macro_resolver, ) - adapter_response, table = result.response, result.table # type: ignore[attr-defined] - - try: - from dbt_common.clients.agate_helper import get_column_value_uncased + adapter_response = adapter_responses[0] if adapter_responses else None + return adapter_response, freshness_responses[source] - row = table[0] - last_modified_val = get_column_value_uncased("last_modified", row) - snapshotted_at_val = get_column_value_uncased("snapshotted_at", row) - except Exception: - raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) - - if last_modified_val is None: + def _create_freshness_response( + self, last_modified: Optional[datetime], snapshotted_at: Optional[datetime] + ) -> FreshnessResponse: + if last_modified is None: # Interpret missing value as "infinitely long ago" max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) else: - max_loaded_at = _utc(last_modified_val, None, "last_modified") - - snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at") + max_loaded_at = _utc(last_modified, None, "last_modified") + snapshotted_at = _utc(snapshotted_at, None, "snapshotted_at") age = (snapshotted_at - max_loaded_at).total_seconds() - freshness: FreshnessResponse = { "max_loaded_at": max_loaded_at, "snapshotted_at": snapshotted_at, "age": age, } - return adapter_response, freshness + return freshness + + def _parse_freshness_row(self, row: "agate.Row", table: "agate.Table") -> Tuple[Any, FreshnessResponse]: + from dbt_common.clients.agate_helper import get_column_value_uncased + + try: + last_modified_val = get_column_value_uncased("last_modified", row) + snapshotted_at_val = get_column_value_uncased("snapshotted_at", row) + identifier = get_column_value_uncased("identifier", row) + schema = get_column_value_uncased("schema", row) + except Exception: + raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) + + freshness_response = self._create_freshness_response( + last_modified_val, + snapshotted_at_val + ) + raw_relation = schema.lower().strip(), identifier.lower().strip() + return raw_relation, freshness_response def pre_model_hook(self, config: Mapping[str, Any]) -> Any: """A hook for running some operation before the model materialization diff --git a/dbt/adapters/capability.py b/dbt/adapters/capability.py index 745cb27a..305604c7 100644 --- a/dbt/adapters/capability.py +++ b/dbt/adapters/capability.py @@ -13,6 +13,9 @@ class Capability(str, Enum): TableLastModifiedMetadata = "TableLastModifiedMetadata" """Indicates support for determining the time of the last table modification by querying database metadata.""" + TableLastModifiedMetadataBatch = "TableLastModifiedMetadataBatch" + """Indicates support for performantly determining the time of the last table modification by querying database metadata in batch.""" + class Support(str, Enum): Unknown = "Unknown"