Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix(ingestion): Snowflake Table to View lineage #4520

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 27 additions & 55 deletions metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,7 @@ def _populate_view_upstream_lineage(self, engine: sqlalchemy.engine.Engine) -> N
def _populate_view_downstream_lineage(
self, engine: sqlalchemy.engine.Engine
) -> None:
# NOTE: This query captures both the upstream and downstream table lineage for views.
# We need this query to populate the downstream lineage of a view,
# as well as to delete the false direct edges between the upstream and downstream tables of a view.
# This query captures the downstream table lineage for views.
# See https://docs.snowflake.com/en/sql-reference/account-usage/access_history.html#usage-notes for current limitations on capturing the lineage for views.
# Eg: For viewA->viewB->ViewC->TableD, snowflake does not yet log intermediate view logs, resulting in only the viewA->TableD edge.
view_lineage_query: str = """
Expand All @@ -198,9 +196,6 @@ def _populate_view_downstream_lineage(
vu.value : "objectName" AS view_name,
vu.value : "objectDomain" AS view_domain,
vu.value : "columns" AS view_columns,
r.value : "objectName" AS upstream_table_name,
r.value : "objectDomain" AS upstream_table_domain,
r.value : "columns" AS upstream_table_columns,
w.value : "objectName" AS downstream_table_name,
w.value : "objectDomain" AS downstream_table_domain,
w.value : "columns" AS downstream_table_columns,
Expand All @@ -213,34 +208,26 @@ def _populate_view_downstream_lineage(
snowflake.account_usage.access_history
) t,
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) vu,
lateral flatten(input => t.BASE_OBJECTS_ACCESSED) r,
lateral flatten(input => t.OBJECTS_MODIFIED) w
WHERE
vu.value : "objectId" IS NOT NULL
AND r.value : "objectId" IS NOT NULL
AND w.value : "objectId" IS NOT NULL
AND w.value : "objectName" NOT LIKE '%.GE_TMP_%'
AND w.value : "objectName" NOT LIKE '%.GE_TEMP_%'
AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)
)
SELECT
view_name,
view_columns,
upstream_table_name,
upstream_table_domain,
upstream_table_columns,
downstream_table_name,
downstream_table_domain,
downstream_table_columns
FROM
view_lineage_history
WHERE
view_domain in ('View', 'Materialized view')
AND view_name != upstream_table_name
AND upstream_table_name != downstream_table_name
AND view_name != downstream_table_name
QUALIFY ROW_NUMBER() OVER (
PARTITION BY view_name,
upstream_table_name,
downstream_table_name
ORDER BY
query_start_time DESC
Expand All @@ -253,56 +240,41 @@ def _populate_view_downstream_lineage(
)

assert self._lineage_map is not None
num_edges: int = 0
num_false_edges: int = 0
self.report.num_view_to_table_edges_scanned = 0

try:
for db_row in engine.execute(view_lineage_query):
# We get two edges here.
# (1) False UpstreamTable->Downstream table that will be deleted.
# (2) View->DownstreamTable that will be added.

view_name: str = db_row[0].lower().replace('"', "")
upstream_table: str = db_row[2].lower().replace('"', "")
downstream_table: str = db_row[5].lower().replace('"', "")
# (1) Delete false direct edge between upstream_table and downstream_table
prior_edges: List[Tuple[str, str, str]] = self._lineage_map[
downstream_table
]
self._lineage_map[downstream_table] = [
entry
for entry in self._lineage_map[downstream_table]
if entry[0] != upstream_table
]
for false_edge in set(prior_edges) - set(
self._lineage_map[downstream_table]
):
logger.debug(
f"False Table->Table edge removed: Lineage[Table(Down)={downstream_table}]:Table(Up)={false_edge}."
)
num_false_edges += 1

# (2) Add view->downstream table lineage.
self._lineage_map[downstream_table].append(
# (<upstream_view_name>, <json_list_of_upstream_view_columns>, <json_list_of_downstream_columns>)
(view_name, db_row[1], db_row[7])
)
logger.debug(
f"View->Table: Lineage[Table(Down)={downstream_table}]:View(Up)={self._lineage_map[downstream_table]}, downstream_domain={db_row[6]}"
)
num_edges += 1

db_rows = engine.execute(view_lineage_query)
except Exception as e:
self.warn(
logger,
"view_downstream_lineage",
f"Extracting the view lineage from Snowflake failed."
f"Please check your permissions. Continuing...\nError was {e}.",
)
else:
for db_row in db_rows:
view_name: str = db_row["view_name"].lower().replace('"', "")
downstream_table: str = (
db_row["downstream_table_name"].lower().replace('"', "")
)
# Capture view->downstream table lineage.
self._lineage_map[downstream_table].append(
# (<upstream_view_name>, <json_list_of_upstream_view_columns>, <json_list_of_downstream_columns>)
(
view_name,
db_row["view_columns"],
db_row["downstream_table_columns"],
)
)
self.report.num_view_to_table_edges_scanned += 1

logger.debug(
f"View->Table: Lineage[Table(Down)={downstream_table}]:View(Up)={self._lineage_map[downstream_table]}"
)

logger.info(
f"Found {num_edges} View->Table edges. Removed {num_false_edges} false Table->Table edges."
f"Found {self.report.num_view_to_table_edges_scanned} View->Table edges."
)
self.report.num_view_to_table_edges_scanned = num_edges

def _populate_view_lineage(self) -> None:
if not self.config.include_view_lineage:
Expand Down