From 989d74d7e2c4e33a8c452717eca6e473c04e5949 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Tue, 21 May 2024 15:20:47 +0530 Subject: [PATCH 01/10] Add default interval config to extract runs and perf timers code --- .../ingestion/source/fivetran/config.py | 22 +++++ .../ingestion/source/fivetran/data_classes.py | 4 +- .../ingestion/source/fivetran/fivetran.py | 9 +- .../source/fivetran/fivetran_log_api.py | 88 +++++++++++-------- .../source/fivetran/fivetran_query.py | 7 +- .../integration/fivetran/test_fivetran.py | 4 +- 6 files changed, 89 insertions(+), 45 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index a689e9ee642aef..2d3977ccfd4388 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -9,6 +9,7 @@ from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin from datahub.configuration.validate_field_rename import pydantic_renamed_field +from datahub.ingestion.api.report import Report from datahub.ingestion.source.bigquery_v2.bigquery_config import ( BigQueryConnectionConfig, ) @@ -20,6 +21,7 @@ StatefulIngestionConfigBase, ) from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig +from datahub.utilities.perf_timer import PerfTimer logger = logging.getLogger(__name__) @@ -110,10 +112,26 @@ def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict: return values +@dataclass +class MetadataExtractionPerfReport(Report): + connectors_metadata_extraction_sec: PerfTimer = dataclass_field( + default_factory=PerfTimer + ) + connectors_lineage_extraction_sec: PerfTimer = dataclass_field( + default_factory=PerfTimer + ) + connectors_jobs_extraction_sec: PerfTimer = dataclass_field( + default_factory=PerfTimer + ) + + @dataclass class FivetranSourceReport(StaleEntityRemovalSourceReport): connectors_scanned: int = 0 filtered_connectors: List[str] = dataclass_field(default_factory=list) + metadata_extraction_perf: MetadataExtractionPerfReport = dataclass_field( + default_factory=MetadataExtractionPerfReport + ) def report_connectors_scanned(self, count: int = 1) -> None: self.connectors_scanned += count @@ -163,3 +181,7 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin default={}, description="A mapping of destination dataset to platform instance. Use destination id as key.", ) + extract_syncs_for_interval: int = pydantic.Field( + 7, + description="Interval in days to extract connector's syncs from current date.", + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py index 4ae71b990e5cde..18de2b01edd3b7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import List, Optional +from typing import List @dataclass @@ -23,7 +23,7 @@ class Connector: paused: bool sync_frequency: int destination_id: str - user_email: Optional[str] + user_id: str table_lineage: List[TableLineage] jobs: List["Job"] diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index c8ae779b602b8a..7e90e1c0545a0c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -66,7 +66,7 @@ class FivetranSource(StatefulIngestionSourceBase): platform: str = "fivetran" def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext): - super().__init__(config, ctx) + super(FivetranSource, self).__init__(config, ctx) self.config = config self.report = FivetranSourceReport() @@ -173,11 +173,12 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob: env=self.config.env, platform_instance=self.config.platform_instance, ) + owner_email = self.audit_log.get_user_email(connector.user_id) datajob = DataJob( id=connector.connector_id, flow_urn=dataflow_urn, name=connector.connector_name, - owners={connector.user_email} if connector.user_email else set(), + owners={owner_email} if owner_email else set(), ) job_property_bag: Dict[str, str] = {} @@ -281,7 +282,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: """ logger.info("Fivetran plugin execution is started") connectors = self.audit_log.get_allowed_connectors_list( - self.config.connector_patterns, self.report + self.config.connector_patterns, + self.report, + self.config.extract_syncs_for_interval, ) for connector in connectors: logger.info(f"Processing connector id: {connector.connector_id}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index a9eb59f9297992..7ac7fce75264ea 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -1,3 +1,4 @@ +import functools import json import logging from typing import Any, Dict, List, Optional, Tuple @@ -76,7 +77,7 @@ def _initialize_fivetran_variables( ) def _query(self, query: str) -> List[Dict]: - logger.debug(f"Query : {query}") + logger.debug("Query : {}".format(query)) resp = self.engine.execute(query) return [row for row in resp] @@ -151,9 +152,13 @@ def _get_table_lineage( return table_lineage_list - def _get_all_connector_sync_logs(self) -> Dict[str, Dict]: + def _get_all_connector_sync_logs(self, syncs_interval: int) -> Dict[str, Dict]: sync_logs = {} - for row in self._query(self.fivetran_log_query.get_sync_logs_query()): + for row in self._query( + self.fivetran_log_query.get_sync_logs_query().format( + self.fivetran_log_query.db_clause, syncs_interval + ) + ): if row[Constant.CONNECTOR_ID] not in sync_logs: sync_logs[row[Constant.CONNECTOR_ID]] = { row[Constant.SYNC_ID]: { @@ -208,50 +213,61 @@ def _get_jobs_list( ) return jobs - def _get_user_email(self, user_id: Optional[str]) -> Optional[str]: + @functools.lru_cache() + def get_user_email(self, user_id: str) -> Optional[str]: if not user_id: return None user_details = self._query( self.fivetran_log_query.get_user_query(user_id=user_id) ) - if not user_details: return None - return f"{user_details[0][Constant.EMAIL]}" + def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None: + table_lineage_metadata = self._get_connectors_table_lineage_metadata() + column_lineage_metadata = self._get_column_lineage_metadata() + for connector in connectors: + connector.table_lineage = self._get_table_lineage( + column_lineage_metadata=column_lineage_metadata, + table_lineage_result=table_lineage_metadata.get(connector.connector_id), + ) + + def _fill_connectors_jobs( + self, connectors: List[Connector], syncs_interval: int + ) -> None: + sync_logs = self._get_all_connector_sync_logs(syncs_interval) + for connector in connectors: + connector.jobs = self._get_jobs_list(sync_logs.get(connector.connector_id)) + def get_allowed_connectors_list( - self, connector_patterns: AllowDenyPattern, report: FivetranSourceReport + self, + connector_patterns: AllowDenyPattern, + report: FivetranSourceReport, + syncs_interval: int, ) -> List[Connector]: connectors: List[Connector] = [] - sync_logs = self._get_all_connector_sync_logs() - table_lineage_metadata = self._get_connectors_table_lineage_metadata() - column_lineage_metadata = self._get_column_lineage_metadata() - connector_list = self._query(self.fivetran_log_query.get_connectors_query()) - for connector in connector_list: - if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]): - report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME]) - continue - connectors.append( - Connector( - connector_id=connector[Constant.CONNECTOR_ID], - connector_name=connector[Constant.CONNECTOR_NAME], - connector_type=connector[Constant.CONNECTOR_TYPE_ID], - paused=connector[Constant.PAUSED], - sync_frequency=connector[Constant.SYNC_FREQUENCY], - destination_id=connector[Constant.DESTINATION_ID], - user_email=self._get_user_email( - connector[Constant.CONNECTING_USER_ID] - ), - table_lineage=self._get_table_lineage( - column_lineage_metadata=column_lineage_metadata, - table_lineage_result=table_lineage_metadata.get( - connector[Constant.CONNECTOR_ID] - ), - ), - jobs=self._get_jobs_list( - sync_logs.get(connector[Constant.CONNECTOR_ID]) - ), + with report.metadata_extraction_perf.connectors_metadata_extraction_sec: + connector_list = self._query(self.fivetran_log_query.get_connectors_query()) + for connector in connector_list: + if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]): + report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME]) + continue + connectors.append( + Connector( + connector_id=connector[Constant.CONNECTOR_ID], + connector_name=connector[Constant.CONNECTOR_NAME], + connector_type=connector[Constant.CONNECTOR_TYPE_ID], + paused=connector[Constant.PAUSED], + sync_frequency=connector[Constant.SYNC_FREQUENCY], + destination_id=connector[Constant.DESTINATION_ID], + user_id=connector[Constant.CONNECTING_USER_ID], + table_lineage=[], + jobs=[], + ) ) - ) + with report.metadata_extraction_perf.connectors_lineage_extraction_sec: + self._fill_connectors_table_lineage(connectors) + with report.metadata_extraction_perf.connectors_jobs_extraction_sec: + self._fill_connectors_jobs(connectors, syncs_interval) return connectors diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index 8f621bc3ffd06e..24a278c29c19c9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -31,14 +31,15 @@ def get_user_query(self, user_id: str) -> str: WHERE id = '{user_id}'""" def get_sync_logs_query(self) -> str: - return f""" + return """ SELECT connector_id, sync_id, message_event, message_data, time_stamp - FROM {self.db_clause}log - WHERE message_event in ('sync_start', 'sync_end')""" + FROM {}log + WHERE message_event in ('sync_start', 'sync_end') + and time_stamp > CURRENT_TIMESTAMP - INTERVAL '{} days'""" def get_table_lineage_query(self) -> str: return f""" diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index de1e5543f4be69..7079ee807052ab 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -98,7 +98,9 @@ def default_query_results( "email": "abc.xyz@email.com", } ] - elif query == fivetran_log_query.get_sync_logs_query(): + elif query == fivetran_log_query.get_sync_logs_query().format( + fivetran_log_query.db_clause, 7 + ): return [ { "connector_id": "calendar_elected", From fa07ace0a75850259e5da5d173432d709daf5fcc Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Tue, 21 May 2024 15:24:57 +0530 Subject: [PATCH 02/10] rename extract_sync_history_for_interval config --- .../src/datahub/ingestion/source/fivetran/config.py | 4 ++-- .../src/datahub/ingestion/source/fivetran/fivetran.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index 2d3977ccfd4388..757b2bf9423b4c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -181,7 +181,7 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin default={}, description="A mapping of destination dataset to platform instance. Use destination id as key.", ) - extract_syncs_for_interval: int = pydantic.Field( + extract_sync_history_for_interval: int = pydantic.Field( 7, - description="Interval in days to extract connector's syncs from current date.", + description="Interval in days to extract connector's sync history from current date.", ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index 7e90e1c0545a0c..6d443692e02f45 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -284,7 +284,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: connectors = self.audit_log.get_allowed_connectors_list( self.config.connector_patterns, self.report, - self.config.extract_syncs_for_interval, + self.config.extract_sync_history_for_interval, ) for connector in connectors: logger.info(f"Processing connector id: {connector.connector_id}") From 39d19ffdb06ff6c95f5ab0d19d5aa8feba1aa661 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Wed, 22 May 2024 15:35:23 +0530 Subject: [PATCH 03/10] Address review comments --- .../src/datahub/ingestion/source/fivetran/fivetran.py | 2 +- .../datahub/ingestion/source/fivetran/fivetran_log_api.py | 5 +++-- .../src/datahub/ingestion/source/fivetran/fivetran_query.py | 4 ++-- .../tests/integration/fivetran/test_fivetran.py | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index 6d443692e02f45..58e7fab03db1e6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -66,7 +66,7 @@ class FivetranSource(StatefulIngestionSourceBase): platform: str = "fivetran" def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext): - super(FivetranSource, self).__init__(config, ctx) + super().__init__(config, ctx) self.config = config self.report = FivetranSourceReport() diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index 7ac7fce75264ea..bfe328a1bb8fce 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -77,7 +77,7 @@ def _initialize_fivetran_variables( ) def _query(self, query: str) -> List[Dict]: - logger.debug("Query : {}".format(query)) + logger.debug(f"Query : {query}") resp = self.engine.execute(query) return [row for row in resp] @@ -156,7 +156,8 @@ def _get_all_connector_sync_logs(self, syncs_interval: int) -> Dict[str, Dict]: sync_logs = {} for row in self._query( self.fivetran_log_query.get_sync_logs_query().format( - self.fivetran_log_query.db_clause, syncs_interval + db_clause=self.fivetran_log_query.db_clause, + syncs_interval=syncs_interval, ) ): if row[Constant.CONNECTOR_ID] not in sync_logs: diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index 24a278c29c19c9..8da14a274db4b8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -37,9 +37,9 @@ def get_sync_logs_query(self) -> str: message_event, message_data, time_stamp - FROM {}log + FROM {db_clause}log WHERE message_event in ('sync_start', 'sync_end') - and time_stamp > CURRENT_TIMESTAMP - INTERVAL '{} days'""" + and time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'""" def get_table_lineage_query(self) -> str: return f""" diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index 7079ee807052ab..bf07859f49bfd9 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -99,7 +99,7 @@ def default_query_results( } ] elif query == fivetran_log_query.get_sync_logs_query().format( - fivetran_log_query.db_clause, 7 + db_clause=fivetran_log_query.db_clause, syncs_interval=7 ): return [ { From 7dcce982bae658391de9800829bed2b6075feed8 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Mon, 27 May 2024 12:21:56 +0530 Subject: [PATCH 04/10] Address review comments --- .../src/datahub/ingestion/api/source_helpers.py | 15 ++++++--------- .../datahub/ingestion/source/fivetran/config.py | 4 ++-- .../datahub/ingestion/source/fivetran/fivetran.py | 2 +- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 91402fa3c62b25..8cc2cc565db85c 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -102,7 +102,6 @@ def auto_status_aspect( """ all_urns: Set[str] = set() status_urns: Set[str] = set() - skip_urns: Set[str] = set() for wu in stream: urn = wu.get_urn() all_urns.add(urn) @@ -125,17 +124,15 @@ def auto_status_aspect( else: raise ValueError(f"Unexpected type {type(wu.metadata)}") - if not isinstance( - wu.metadata, MetadataChangeEventClass - ) and not entity_supports_aspect(wu.metadata.entityType, StatusClass): + yield wu + + for urn in sorted(all_urns - status_urns): + entity_type = guess_entity_type(urn) + if not entity_supports_aspect(entity_type, StatusClass): # If any entity does not support aspect 'status' then skip that entity from adding status aspect. # Example like dataProcessInstance doesn't suppport status aspect. # If not skipped gives error: java.lang.RuntimeException: Unknown aspect status for entity dataProcessInstance - skip_urns.add(urn) - - yield wu - - for urn in sorted(all_urns - status_urns - skip_urns): + continue yield MetadataChangeProposalWrapper( entityUrn=urn, aspect=StatusClass(removed=False), diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index 757b2bf9423b4c..f55d9f89ad97f1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -181,7 +181,7 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin default={}, description="A mapping of destination dataset to platform instance. Use destination id as key.", ) - extract_sync_history_for_interval: int = pydantic.Field( + history_sync_lookback_period: int = pydantic.Field( 7, - description="Interval in days to extract connector's sync history from current date.", + description="The number of days to look back when extracting connectors' sync history.", ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index 58e7fab03db1e6..56a80a2fd963e7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -284,7 +284,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: connectors = self.audit_log.get_allowed_connectors_list( self.config.connector_patterns, self.report, - self.config.extract_sync_history_for_interval, + self.config.history_sync_lookback_period, ) for connector in connectors: logger.info(f"Processing connector id: {connector.connector_id}") From d93390a80edd1f1b90ff65c0f6e96a4eaf2067ce Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Fri, 31 May 2024 16:12:56 +0530 Subject: [PATCH 05/10] add temporary debug changes --- metadata-ingestion/src/datahub/ingestion/api/source.py | 5 +++++ .../src/datahub/ingestion/source/fivetran/config.py | 4 ++++ .../src/datahub/ingestion/source/fivetran/fivetran.py | 1 + 3 files changed, 10 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 173f7ec59b0c23..d8c713416f642b 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -64,6 +64,9 @@ class SourceCapability(Enum): @dataclass class SourceReport(Report): + dpi_status_list: Dict[str, list] = field( + default_factory=lambda: defaultdict(LossyList) + ) events_produced: int = 0 events_produced_per_sec: int = 0 @@ -101,6 +104,8 @@ def report_workunit(self, wu: WorkUnit) -> None: if aspectName is not None: # usually true self.aspects[entityType][aspectName] += 1 + if entityType == "dataProcessInstance" and aspectName == "status": + self.dpi_status_list[entityType].append(urn) self.aspect_urn_samples[entityType][aspectName].append(urn) if isinstance(mcp.aspect, UpstreamLineageClass): upstream_lineage = cast(UpstreamLineageClass, mcp.aspect) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index f55d9f89ad97f1..da053bb3bc86f4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -128,6 +128,7 @@ class MetadataExtractionPerfReport(Report): @dataclass class FivetranSourceReport(StaleEntityRemovalSourceReport): connectors_scanned: int = 0 + jobs_scanned: int = 0 filtered_connectors: List[str] = dataclass_field(default_factory=list) metadata_extraction_perf: MetadataExtractionPerfReport = dataclass_field( default_factory=MetadataExtractionPerfReport @@ -136,6 +137,9 @@ class FivetranSourceReport(StaleEntityRemovalSourceReport): def report_connectors_scanned(self, count: int = 1) -> None: self.connectors_scanned += count + def report_jobs_scanned(self, count: int = 1) -> None: + self.jobs_scanned += count + def report_connectors_dropped(self, model: str) -> None: self.filtered_connectors.append(model) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index 56a80a2fd963e7..72e222b6ecf46e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -263,6 +263,7 @@ def _get_connector_workunits( # Map Fivetran's job/sync history entity with Datahub's data process entity for job in connector.jobs: dpi = self._generate_dpi_from_job(job, datajob) + self.report.report_jobs_scanned() yield from self._get_dpi_workunits(job, dpi) @classmethod From 9381ebc652ba43f71cde0e0c4f4695db2b677e56 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Mon, 3 Jun 2024 18:42:00 +0530 Subject: [PATCH 06/10] Add skip soft delete for unsupported status aspect entities --- .../ingestion/source/state/stale_entity_removal_handler.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py index 9154a555f23090..58bb7a17efc5fc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py @@ -6,6 +6,7 @@ import pydantic from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import entity_supports_aspect from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId from datahub.ingestion.api.source_helpers import auto_stale_entity_removal @@ -23,6 +24,7 @@ ) from datahub.metadata.schema_classes import StatusClass from datahub.utilities.lossy_collections import LossyList +from datahub.utilities.urns.urn import guess_entity_type logger: logging.Logger = logging.getLogger(__name__) @@ -276,6 +278,9 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]: for urn in last_checkpoint_state.get_urns_not_in( type="*", other_checkpoint_state=cur_checkpoint_state ): + if not entity_supports_aspect(guess_entity_type(urn), StatusClass): + # If any entity does not support aspect 'status' then skip that entity + continue if urn in self._urns_to_skip: logger.debug( f"Not soft-deleting entity {urn} since it is in urns_to_skip" From a36a1529ad845d50c7c7ad18088743ff01b32cc9 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Mon, 3 Jun 2024 23:41:27 +0530 Subject: [PATCH 07/10] revert temporary debug changes --- metadata-ingestion/src/datahub/ingestion/api/source.py | 5 ----- .../src/datahub/ingestion/source/fivetran/config.py | 4 ---- .../src/datahub/ingestion/source/fivetran/fivetran.py | 1 - 3 files changed, 10 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index d8c713416f642b..173f7ec59b0c23 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -64,9 +64,6 @@ class SourceCapability(Enum): @dataclass class SourceReport(Report): - dpi_status_list: Dict[str, list] = field( - default_factory=lambda: defaultdict(LossyList) - ) events_produced: int = 0 events_produced_per_sec: int = 0 @@ -104,8 +101,6 @@ def report_workunit(self, wu: WorkUnit) -> None: if aspectName is not None: # usually true self.aspects[entityType][aspectName] += 1 - if entityType == "dataProcessInstance" and aspectName == "status": - self.dpi_status_list[entityType].append(urn) self.aspect_urn_samples[entityType][aspectName].append(urn) if isinstance(mcp.aspect, UpstreamLineageClass): upstream_lineage = cast(UpstreamLineageClass, mcp.aspect) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index da053bb3bc86f4..f55d9f89ad97f1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -128,7 +128,6 @@ class MetadataExtractionPerfReport(Report): @dataclass class FivetranSourceReport(StaleEntityRemovalSourceReport): connectors_scanned: int = 0 - jobs_scanned: int = 0 filtered_connectors: List[str] = dataclass_field(default_factory=list) metadata_extraction_perf: MetadataExtractionPerfReport = dataclass_field( default_factory=MetadataExtractionPerfReport @@ -137,9 +136,6 @@ class FivetranSourceReport(StaleEntityRemovalSourceReport): def report_connectors_scanned(self, count: int = 1) -> None: self.connectors_scanned += count - def report_jobs_scanned(self, count: int = 1) -> None: - self.jobs_scanned += count - def report_connectors_dropped(self, model: str) -> None: self.filtered_connectors.append(model) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index 72e222b6ecf46e..56a80a2fd963e7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -263,7 +263,6 @@ def _get_connector_workunits( # Map Fivetran's job/sync history entity with Datahub's data process entity for job in connector.jobs: dpi = self._generate_dpi_from_job(job, datajob) - self.report.report_jobs_scanned() yield from self._get_dpi_workunits(job, dpi) @classmethod From 7aa8f3e9640b94cd192e8c335ec92bc0e8f34568 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Tue, 4 Jun 2024 12:05:43 +0530 Subject: [PATCH 08/10] Add report var for last state unremovable entities --- .../source/state/stale_entity_removal_handler.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py index 58bb7a17efc5fc..e5b5957e45f895 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py @@ -50,10 +50,14 @@ class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig): @dataclass class StaleEntityRemovalSourceReport(StatefulIngestionReport): soft_deleted_stale_entities: LossyList[str] = field(default_factory=LossyList) + last_state_unremovable_entities: LossyList[str] = field(default_factory=LossyList) def report_stale_entity_soft_deleted(self, urn: str) -> None: self.soft_deleted_stale_entities.append(urn) + def report_last_state_unremovable_entities(self, urn: str) -> None: + self.last_state_unremovable_entities.append(urn) + class StaleEntityRemovalHandler( StatefulIngestionUsecaseHandlerBase["GenericCheckpointState"] @@ -274,12 +278,16 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]: self.add_entity_to_state("", urn) return + report = self.source.get_report() + assert isinstance(report, StaleEntityRemovalSourceReport) + # Everything looks good, emit the soft-deletion workunits for urn in last_checkpoint_state.get_urns_not_in( type="*", other_checkpoint_state=cur_checkpoint_state ): if not entity_supports_aspect(guess_entity_type(urn), StatusClass): - # If any entity does not support aspect 'status' then skip that entity + # If any entity does not support aspect 'status' then skip that entity urn + report.report_last_state_unremovable_entities(urn) continue if urn in self._urns_to_skip: logger.debug( From ee994cf95fa4f55a7d897f4aef414b75640923ed Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Tue, 4 Jun 2024 17:19:07 +0530 Subject: [PATCH 09/10] add test cases for last state non deletable entities --- .../state/stale_entity_removal_handler.py | 8 ++-- .../state/golden_test_checkpoint_state.json | 2 +- ...n_test_checkpoint_state_after_deleted.json | 2 +- .../state/golden_test_stateful_ingestion.json | 22 ++++++++++ ...test_stateful_ingestion_after_deleted.json | 22 ++++++++++ .../state/test_stateful_ingestion.py | 42 ++++++++++++++++++- 6 files changed, 91 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py index e5b5957e45f895..bfc15acf725107 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py @@ -50,13 +50,13 @@ class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig): @dataclass class StaleEntityRemovalSourceReport(StatefulIngestionReport): soft_deleted_stale_entities: LossyList[str] = field(default_factory=LossyList) - last_state_unremovable_entities: LossyList[str] = field(default_factory=LossyList) + last_state_non_deletable_entities: LossyList[str] = field(default_factory=LossyList) def report_stale_entity_soft_deleted(self, urn: str) -> None: self.soft_deleted_stale_entities.append(urn) - def report_last_state_unremovable_entities(self, urn: str) -> None: - self.last_state_unremovable_entities.append(urn) + def report_last_state_non_deletable_entities(self, urn: str) -> None: + self.last_state_non_deletable_entities.append(urn) class StaleEntityRemovalHandler( @@ -287,7 +287,7 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]: ): if not entity_supports_aspect(guess_entity_type(urn), StatusClass): # If any entity does not support aspect 'status' then skip that entity urn - report.report_last_state_unremovable_entities(urn) + report.report_last_state_non_deletable_entities(urn) continue if urn in self._urns_to_skip: logger.debug( diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json index fcf73d9614f242..ce03804279097f 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json @@ -17,7 +17,7 @@ "state": { "formatVersion": "1.0", "serde": "utf-8", - "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\"]}" + "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\", \"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619\"]}" }, "runId": "dummy-test-stateful-ingestion" } diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json index 5477af72a1939c..6a00e67a2ca216 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json @@ -17,7 +17,7 @@ "state": { "formatVersion": "1.0", "serde": "utf-8", - "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\"]}" + "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26\"]}" }, "runId": "dummy-test-stateful-ingestion" } diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json index 4a77651c930667..c5d0df1aeb59b5 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json @@ -46,5 +46,27 @@ "runId": "dummy-test-stateful-ingestion", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "job1", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json index 9d6f755374462b..c1bdc8ffeee052 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json @@ -31,6 +31,28 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "job2", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py index 50d9b86b3a0171..e3a2a6cccea794 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py @@ -1,3 +1,4 @@ +import time from dataclasses import dataclass, field as dataclass_field from typing import Any, Dict, Iterable, List, Optional, cast from unittest import mock @@ -7,6 +8,7 @@ from freezegun import freeze_time from pydantic import Field +from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance from datahub.configuration.common import AllowDenyPattern from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -24,7 +26,10 @@ StatefulIngestionConfigBase, StatefulIngestionSourceBase, ) -from datahub.metadata.schema_classes import StatusClass +from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import ( + DataProcessInstanceProperties, +) +from datahub.metadata.schema_classes import AuditStampClass, StatusClass from datahub.utilities.urns.dataset_urn import DatasetUrn from tests.test_helpers import mce_helpers from tests.test_helpers.state_helpers import ( @@ -62,6 +67,10 @@ class DummySourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): default=False, description="Should this dummy source report a failure.", ) + dpi_id_to_ingest: Optional[str] = Field( + default=None, + description="Data process instance id to ingest.", + ) class DummySource(StatefulIngestionSourceBase): @@ -109,6 +118,24 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: aspect=StatusClass(removed=False), ).as_workunit() + if self.source_config.dpi_id_to_ingest: + dpi = DataProcessInstance( + id=self.source_config.dpi_id_to_ingest, + orchestrator="dummy", + ) + + yield MetadataChangeProposalWrapper( + entityUrn=str(dpi.urn), + aspect=DataProcessInstanceProperties( + name=dpi.id, + created=AuditStampClass( + time=int(time.time() * 1000), + actor="urn:li:corpuser:datahub", + ), + type=dpi.type, + ), + ).as_workunit() + if self.source_config.report_failure: self.reporter.report_failure("Dummy error", "Error") @@ -152,6 +179,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): "stateful_ingestion": { "enabled": True, "remove_stale_metadata": True, + "fail_safe_threshold": 100, "state_provider": { "type": "file", "config": { @@ -159,6 +187,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): }, }, }, + "dpi_id_to_ingest": "job1", }, }, "sink": { @@ -207,6 +236,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): pipeline_run2_config["source"]["config"]["dataset_patterns"] = { "allow": ["dummy_dataset1", "dummy_dataset2"], } + pipeline_run2_config["source"]["config"]["dpi_id_to_ingest"] = "job2" pipeline_run2_config["sink"]["config"][ "filename" ] = f"{tmp_path}/{output_file_name_after_deleted}" @@ -253,6 +283,16 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): ] assert sorted(deleted_dataset_urns) == sorted(difference_dataset_urns) + report = pipeline_run2.source.get_report() + assert isinstance(report, StaleEntityRemovalSourceReport) + # assert report last ingestion state non_deletable entity urns + non_deletable_urns: List[str] = [ + "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619", + ] + assert sorted(non_deletable_urns) == sorted( + report.last_state_non_deletable_entities + ) + @freeze_time(FROZEN_TIME) def test_stateful_ingestion_failure(pytestconfig, tmp_path, mock_time): From bc42e878f22d6808baba29e445f92a9fe5f1c6f0 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Wed, 5 Jun 2024 10:24:38 +0530 Subject: [PATCH 10/10] Address review comments --- .../ingestion/source/fivetran/fivetran_log_api.py | 13 +++++++------ .../ingestion/source/fivetran/fivetran_query.py | 5 ++--- .../source/state/stale_entity_removal_handler.py | 1 + .../tests/integration/fivetran/test_fivetran.py | 2 +- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index bfe328a1bb8fce..51ef45c500c350 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -215,15 +215,16 @@ def _get_jobs_list( return jobs @functools.lru_cache() + def _get_users(self) -> Dict[str, str]: + users = self._query(self.fivetran_log_query.get_users_query()) + if not users: + return {} + return {user[Constant.USER_ID]: user[Constant.EMAIL] for user in users} + def get_user_email(self, user_id: str) -> Optional[str]: if not user_id: return None - user_details = self._query( - self.fivetran_log_query.get_user_query(user_id=user_id) - ) - if not user_details: - return None - return f"{user_details[0][Constant.EMAIL]}" + return self._get_users().get(user_id) def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None: table_lineage_metadata = self._get_connectors_table_lineage_metadata() diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index 8da14a274db4b8..0c8ade26943490 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -21,14 +21,13 @@ def get_connectors_query(self) -> str: FROM {self.db_clause}connector WHERE _fivetran_deleted = FALSE""" - def get_user_query(self, user_id: str) -> str: + def get_users_query(self) -> str: return f""" SELECT id as user_id, given_name, family_name, email - FROM {self.db_clause}user - WHERE id = '{user_id}'""" + FROM {self.db_clause}user""" def get_sync_logs_query(self) -> str: return """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py index bfc15acf725107..97c9dd9e245ddf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py @@ -290,6 +290,7 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]: report.report_last_state_non_deletable_entities(urn) continue if urn in self._urns_to_skip: + report.report_last_state_non_deletable_entities(urn) logger.debug( f"Not soft-deleting entity {urn} since it is in urns_to_skip" ) diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index bf07859f49bfd9..642d4ca992ca03 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -89,7 +89,7 @@ def default_query_results( "destination_column_name": "name", }, ] - elif query == fivetran_log_query.get_user_query("reapply_phone"): + elif query == fivetran_log_query.get_users_query(): return [ { "user_id": "reapply_phone",