Skip to content

Commit

Permalink
fix(ingest): looker cascading derived tables should express lineage to view not underlying table (#3262)
Browse files Browse the repository at this point in the history
  • Loading branch information
swaroopjagadish authored Sep 18, 2021
1 parent d21d497 commit bb73c46
Show file tree
Hide file tree
Showing 7 changed files with 755 additions and 7 deletions.
17 changes: 10 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/lookml.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,6 @@ class LookerView:
id: LookerViewId
absolute_file_path: str
connection: LookerConnectionDefinition
# project_name: str
# model_name: str
# view_name: str
sql_table_names: List[str]
fields: List[ViewField]
raw_file_content: str
Expand Down Expand Up @@ -663,15 +660,23 @@ def _generate_fully_qualified_name(
return sql_table_name.lower()

def _construct_datalineage_urn(
self, sql_table_name: str, connection_def: LookerConnectionDefinition
self, sql_table_name: str, looker_view: LookerView
) -> str:
logger.debug(f"sql_table_name={sql_table_name}")
connection_def: LookerConnectionDefinition = looker_view.connection

# Check whether the table name matches the cascading derived table pattern.
# A derived table can be referenced through an alias of the form view_name.SQL_TABLE_NAME,
# where view_name is the name of the LookML view that defines the derived table.
# See https://docs.looker.com/data-modeling/learning-lookml/derived-tables#syntax_for_referencing_a_derived_table
if re.fullmatch(r"\w+\.SQL_TABLE_NAME", sql_table_name):
sql_table_name = sql_table_name.lower().split(".")[0]
# upstream dataset is a looker view based on current view id's project and model
view_id = LookerViewId(
project_name=looker_view.id.project_name,
model_name=looker_view.id.model_name,
view_name=sql_table_name,
)
return view_id.get_urn(self.source_config)

# Ensure sql_table_name is in canonical form (add in db, schema names)
sql_table_name = self._generate_fully_qualified_name(
Expand Down Expand Up @@ -725,9 +730,7 @@ def _get_upstream_lineage(self, looker_view: LookerView) -> UpstreamLineage:
sql_table_name = sql_table_name.replace('"', "").replace("`", "")

upstream = UpstreamClass(
dataset=self._construct_datalineage_urn(
sql_table_name, looker_view.connection
),
dataset=self._construct_datalineage_urn(sql_table_name, looker_view),
type=DatasetLineageTypeClass.VIEW,
)
upstreams.append(upstream)
Expand Down
176 changes: 176 additions & 0 deletions metadata-ingestion/tests/integration/lookml/expected_output.json
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,182 @@
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_derived_view,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
"paths": [
"/prod/looker/lkml_samples/views/my_derived_view"
]
}
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)",
"type": "VIEW"
}
]
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "my_derived_view",
"platform": "urn:li:dataPlatform:looker",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": ""
}
},
"fields": [
{
"fieldPath": "country",
"jsonPath": null,
"nullable": false,
"description": "The country",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension"
}
]
},
"glossaryTerms": null,
"isPartOfKey": false
},
{
"fieldPath": "city",
"jsonPath": null,
"nullable": false,
"description": "City",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension"
}
]
},
"glossaryTerms": null,
"isPartOfKey": false
},
{
"fieldPath": "timestamp",
"jsonPath": null,
"nullable": false,
"description": "Timestamp of measurement",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "time",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension"
},
{
"tag": "urn:li:tag:Temporal"
}
]
},
"glossaryTerms": null,
"isPartOfKey": false
},
{
"fieldPath": "average_measurement",
"jsonPath": null,
"nullable": false,
"description": "My measurement",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "average",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Temporal"
}
]
},
"glossaryTerms": null,
"isPartOfKey": false
}
],
"primaryKeys": [],
"foreignKeysSpecs": null
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"looker.file.content": "view: my_derived_view {\n derived_table: {\n sql:\n SELECT\n country,\n city,\n timestamp,\n measurement\n FROM\n ${my_view.SQL_TABLE_NAME} AS my_view ;;\n }\n\n dimension: country {\n type: string\n description: \"The country\"\n sql: ${TABLE}.country ;;\n }\n\n dimension: city {\n type: string\n description: \"City\"\n sql: ${TABLE}.city ;;\n }\n\n dimension_group: timestamp {\n group_label: \"Timestamp\"\n type: time\n description: \"Timestamp of measurement\"\n sql: ${TABLE}.timestamp ;;\n timeframes: [hour, date, week, day_of_week]\n }\n\n measure: average_measurement {\n group_label: \"Measurement\"\n type: average\n description: \"My measurement\"\n sql: ${TABLE}.measurement ;;\n }\n\n}\n",
"looker.file.path": "/bar.view.lkml"
},
"externalUrl": null,
"description": null,
"uri": null,
"tags": []
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "lookml-test",
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
view: my_derived_view {
derived_table: {
sql:
SELECT
country,
city,
timestamp,
measurement
FROM
${my_view.SQL_TABLE_NAME} AS my_view ;;
}

dimension: country {
type: string
description: "The country"
sql: ${TABLE}.country ;;
}

dimension: city {
type: string
description: "City"
sql: ${TABLE}.city ;;
}

dimension_group: timestamp {
group_label: "Timestamp"
type: time
description: "Timestamp of measurement"
sql: ${TABLE}.timestamp ;;
timeframes: [hour, date, week, day_of_week]
}

measure: average_measurement {
group_label: "Measurement"
type: average
description: "My measurement"
sql: ${TABLE}.measurement ;;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
connection: "my_connection"

include: "foo.view.lkml"
include: "bar.view.lkml"

explore: aliased_explore {
from: my_view
Expand Down
Loading

0 comments on commit bb73c46

Please sign in to comment.