Skip to content

Commit

Permalink
test prefer sql parser lineage
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Jul 30, 2024
1 parent 9a533dd commit 83a66ff
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
15 changes: 13 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ def _infer_schemas_and_update_cll( # noqa: C901
# Save the column lineage.
if self.config.include_column_lineage and sql_result:
# We save the raw info here. We use this for supporting `prefer_sql_parser_lineage`.
- if depends_on_ephemeral_models:
+ if not depends_on_ephemeral_models:
node.raw_sql_parsing_result = sql_result

# We use this for error reporting. However, we only want to report errors
Expand Down Expand Up @@ -1847,7 +1847,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
)

cll = None
- if self.config.prefer_sql_parser_lineage:
+ if self.config.prefer_sql_parser_lineage and node.raw_sql_parsing_result:
sql_parsing_result = node.raw_sql_parsing_result
if sql_parsing_result and not sql_parsing_result.debug_info.table_error:
# If we have some table lineage from SQL parsing, use that.
Expand All @@ -1870,10 +1870,21 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
node_urn, column_lineage.downstream.column
)
],
confidenceScore=sql_parsing_result.debug_info.confidence,
)
)

else:
if self.config.prefer_sql_parser_lineage:
if node.upstream_cll:
self.report.report_warning(
"SQL parser lineage is not available for this node, falling back to dbt-based column lineage.",
context=node.dbt_name,
)
else:
# SQL parsing failed entirely, which is already reported above.
pass

cll = [
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,31 +190,31 @@
"time": 1643871600000,
"actor": "urn:li:corpuser:unknown"
},
- "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.customer_snapshot,PROD)",
+ "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.address,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 1643871600000,
"actor": "urn:li:corpuser:unknown"
},
- "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.address,PROD)",
+ "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.city,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 1643871600000,
"actor": "urn:li:corpuser:unknown"
},
- "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.city,PROD)",
+ "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.customer,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 1643871600000,
"actor": "urn:li:corpuser:unknown"
},
- "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.customer,PROD)",
+ "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.customer_snapshot,PROD)",
"type": "TRANSFORMED"
}
],
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/integration/dbt/test_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def set_paths(
sources_file="sample_dbt_sources_2.json",
run_results_files=["sample_dbt_run_results_2.json"],
source_config_modifiers={
- # "prefer_sql_parser_lineage": True,
+ "prefer_sql_parser_lineage": True,
"skip_sources_in_lineage": True,
"entities_enabled": {"sources": "NO"},
},
Expand Down

0 comments on commit 83a66ff

Please sign in to comment.