From aa3a6931dbab91fd3735513e12141137462ddfa9 Mon Sep 17 00:00:00 2001 From: Ravindra Lanka Date: Tue, 29 Mar 2022 05:30:25 -0700 Subject: [PATCH] Fix: Snowflake Table to View lineage --- .../datahub/ingestion/source/sql/snowflake.py | 82 ++++++------------- 1 file changed, 27 insertions(+), 55 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py index a05ff5c73319e..34032e67745f9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py @@ -187,9 +187,7 @@ def _populate_view_upstream_lineage(self, engine: sqlalchemy.engine.Engine) -> N def _populate_view_downstream_lineage( self, engine: sqlalchemy.engine.Engine ) -> None: - # NOTE: This query captures both the upstream and downstream table lineage for views. - # We need this query to populate the downstream lineage of a view, - # as well as to delete the false direct edges between the upstream and downstream tables of a view. + # This query captures the downstream table lineage for views. # See https://docs.snowflake.com/en/sql-reference/account-usage/access_history.html#usage-notes for current limitations on capturing the lineage for views. # Eg: For viewA->viewB->ViewC->TableD, snowflake does not yet log intermediate view logs, resulting in only the viewA->TableD edge. view_lineage_query: str = """ @@ -198,9 +196,6 @@ def _populate_view_downstream_lineage( vu.value : "objectName" AS view_name, vu.value : "objectDomain" AS view_domain, vu.value : "columns" AS view_columns, - r.value : "objectName" AS upstream_table_name, - r.value : "objectDomain" AS upstream_table_domain, - r.value : "columns" AS upstream_table_columns, w.value : "objectName" AS downstream_table_name, w.value : "objectDomain" AS downstream_table_domain, w.value : "columns" AS downstream_table_columns, @@ -213,34 +208,26 @@ def _populate_view_downstream_lineage( snowflake.account_usage.access_history ) t, lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) vu, - lateral flatten(input => t.BASE_OBJECTS_ACCESSED) r, lateral flatten(input => t.OBJECTS_MODIFIED) w WHERE vu.value : "objectId" IS NOT NULL - AND r.value : "objectId" IS NOT NULL AND w.value : "objectId" IS NOT NULL + AND w.value : "objectName" NOT LIKE '%.GE_TMP_%' + AND w.value : "objectName" NOT LIKE '%.GE_TEMP_%' AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3) AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3) ) SELECT view_name, view_columns, - upstream_table_name, - upstream_table_domain, - upstream_table_columns, downstream_table_name, - downstream_table_domain, downstream_table_columns FROM view_lineage_history WHERE view_domain in ('View', 'Materialized view') - AND view_name != upstream_table_name - AND upstream_table_name != downstream_table_name - AND view_name != downstream_table_name QUALIFY ROW_NUMBER() OVER ( PARTITION BY view_name, - upstream_table_name, downstream_table_name ORDER BY query_start_time DESC @@ -253,45 +240,10 @@ def _populate_view_downstream_lineage( ) assert self._lineage_map is not None - num_edges: int = 0 - num_false_edges: int = 0 + self.report.num_view_to_table_edges_scanned = 0 try: - for db_row in engine.execute(view_lineage_query): - # We get two edges here. - # (1) False UpstreamTable->Downstream table that will be deleted. - # (2) View->DownstreamTable that will be added. - - view_name: str = db_row[0].lower().replace('"', "") - upstream_table: str = db_row[2].lower().replace('"', "") - downstream_table: str = db_row[5].lower().replace('"', "") - # (1) Delete false direct edge between upstream_table and downstream_table - prior_edges: List[Tuple[str, str, str]] = self._lineage_map[ - downstream_table - ] - self._lineage_map[downstream_table] = [ - entry - for entry in self._lineage_map[downstream_table] - if entry[0] != upstream_table - ] - for false_edge in set(prior_edges) - set( - self._lineage_map[downstream_table] - ): - logger.debug( - f"False Table->Table edge removed: Lineage[Table(Down)={downstream_table}]:Table(Up)={false_edge}." - ) - num_false_edges += 1 - - # (2) Add view->downstream table lineage. - self._lineage_map[downstream_table].append( - # (, , ) - (view_name, db_row[1], db_row[7]) - ) - logger.debug( - f"View->Table: Lineage[Table(Down)={downstream_table}]:View(Up)={self._lineage_map[downstream_table]}, downstream_domain={db_row[6]}" - ) - num_edges += 1 - + db_rows = engine.execute(view_lineage_query) except Exception as e: self.warn( logger, @@ -299,10 +251,30 @@ def _populate_view_downstream_lineage( f"Extracting the view lineage from Snowflake failed." f"Please check your permissions. Continuing...\nError was {e}.", ) + else: + for db_row in db_rows: + view_name: str = db_row["view_name"].lower().replace('"', "") + downstream_table: str = ( + db_row["downstream_table_name"].lower().replace('"', "") + ) + # Capture view->downstream table lineage. + self._lineage_map[downstream_table].append( + # (, , ) + ( + view_name, + db_row["view_columns"], + db_row["downstream_table_columns"], + ) + ) + self.report.num_view_to_table_edges_scanned += 1 + + logger.debug( + f"View->Table: Lineage[Table(Down)={downstream_table}]:View(Up)={self._lineage_map[downstream_table]}" + ) + logger.info( - f"Found {num_edges} View->Table edges. Removed {num_false_edges} false Table->Table edges." + f"Found {self.report.num_view_to_table_edges_scanned} View->Table edges." ) - self.report.num_view_to_table_edges_scanned = num_edges def _populate_view_lineage(self) -> None: if not self.config.include_view_lineage: