Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/lookml): resolve CLL issue caused by column name casing. #11876

Merged
merged 24 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7cdff5a
configurable: convert upstream column to lowercase
sid-acryl Nov 18, 2024
69b828b
Merge branch 'master' into cus3139-looker-ingestion
sid-acryl Nov 23, 2024
3d39e4a
wip
sid-acryl Nov 23, 2024
b33a0a9
existing test-case are working
sid-acryl Nov 24, 2024
ff3077e
test case for column resolution from gms
sid-acryl Nov 24, 2024
c0c1513
Merge branch 'master' into cus3139-looker-ingestion
sid-acryl Nov 24, 2024
e877cd0
address review comments
sid-acryl Dec 1, 2024
79b4f74
Merge branch 'cus3139-looker-ingestion' of https://github.com/sid-acr…
sid-acryl Dec 1, 2024
1639ec7
Merge branch 'master' into cus3139-looker-ingestion
sid-acryl Dec 1, 2024
bfa92e0
remove graph=graph
sid-acryl Dec 2, 2024
66d825d
fix test-case
sid-acryl Dec 2, 2024
7e55a51
Merge branch 'master' into cus3139-looker-ingestion
sid-acryl Dec 2, 2024
9caa198
Merge branch 'master' into cus3139-looker-ingestion
sid-acryl Dec 3, 2024
95b5bb6
Merge branch 'master' into cus3139-looker-ingestion
sid-acryl Dec 3, 2024
8f43c7a
address review comments
sid-acryl Dec 6, 2024
f8d723d
Merge branch 'master' into cus3139-looker-ingestion
sid-acryl Dec 6, 2024
a6382f1
Merge branch 'master' into cus3139-looker-ingestion
sid-acryl Dec 9, 2024
bdfbbe4
address review comments
sid-acryl Dec 9, 2024
afca825
Merge branch 'master' into cus3139-looker-ingestion
sid-acryl Dec 10, 2024
bc0f119
address review comments
sid-acryl Dec 10, 2024
d6f04bf
Merge branch 'master' into cus3139-looker-ingestion
sid-acryl Dec 11, 2024
22ffa40
Merge branch 'master' into cus3139-looker-ingestion
sid-acryl Dec 11, 2024
72169ee
address review comments
sid-acryl Dec 11, 2024
493b513
Merge branch 'master' into cus3139-looker-ingestion
sid-acryl Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
LookMLSourceReport,
)
from datahub.ingestion.source.looker.urn_functions import get_qualified_table_name
from datahub.sql_parsing.schema_resolver import match_columns_to_schema
from datahub.sql_parsing.sqlglot_lineage import (
ColumnLineageInfo,
ColumnRef,
SqlParsingResult,
Urn,
create_and_cache_schema_resolver,
create_lineage_sql_parsed_result,
match_columns_to_schema,
)

logger = logging.getLogger(__name__)
Expand Down
16 changes: 16 additions & 0 deletions metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,19 @@ def _convert_schema_field_list_to_info(

def _convert_schema_aspect_to_info(schema_metadata: SchemaMetadataClass) -> SchemaInfo:
    """Build a ``SchemaInfo`` from the fields of a schema metadata aspect.

    Delegates to ``_convert_schema_field_list_to_info`` with
    ``schema_metadata.fields``.
    """
    fields = schema_metadata.fields
    return _convert_schema_field_list_to_info(fields)


def match_columns_to_schema(
    schema_info: SchemaInfo, input_columns: List[str]
) -> List[str]:
    """Map input column names onto the casing used in ``schema_info``.

    Each entry of ``input_columns`` is resolved as follows:

    1. If the name is already a key of ``schema_info``, it is returned as-is.
    2. Otherwise, if a key matches case-insensitively, that key is returned.
    3. Otherwise the input name is returned unchanged.

    The result has the same length and order as ``input_columns``.
    """
    # Case-insensitive index of the schema's column names. When several keys
    # differ only by case, the last one in iteration order wins.
    lowercase_to_schema_column: Dict[str, str] = {
        column.lower(): column for column in schema_info
    }

    return [
        # Prefer an exact match so that a schema containing both "ID" and "id"
        # does not rewrite an already-correct name to its case-variant.
        column
        if column in schema_info
        else lowercase_to_schema_column.get(column.lower(), column)
        for column in input_columns
    ]
14 changes: 0 additions & 14 deletions metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1275,20 +1275,6 @@ def infer_output_schema(result: SqlParsingResult) -> Optional[List[SchemaFieldCl
return output_schema


def match_columns_to_schema(
    schema_info: SchemaInfo, input_columns: List[str]
) -> List[str]:
    """Return the schema columns named in ``input_columns``, ignoring case.

    Columns are returned with the casing and order of ``schema_info``; input
    names with no case-insensitive match in the schema are omitted.
    """
    # Lowercase the inputs once instead of once per schema column.
    wanted = {column.lower() for column in input_columns}

    return [column for column in schema_info if column.lower() in wanted]


def view_definition_lineage_helper(
result: SqlParsingResult, view_urn: str
) -> SqlParsingResult:
Expand Down
76 changes: 70 additions & 6 deletions metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,64 @@
from datahub.sql_parsing.schema_resolver import SchemaResolver, _TableName
from datahub.sql_parsing.schema_resolver import (
SchemaInfo,
SchemaResolver,
_TableName,
match_columns_to_schema,
)


def test_basic_schema_resolver():
def create_default_schema_resolver(urn: str) -> SchemaResolver:
schema_resolver = SchemaResolver(
platform="redshift",
env="PROD",
graph=None,
)

schema_resolver.add_raw_schema_info(
urn="urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)",
urn=urn,
schema_info={"name": "STRING"},
)

return schema_resolver


def test_basic_schema_resolver():
input_urn = (
"urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)"
)

schema_resolver = create_default_schema_resolver(urn=input_urn)

urn, schema = schema_resolver.resolve_table(
_TableName(database="my_db", db_schema="public", table="test_table")
)
assert (
urn
== "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)"

assert urn == input_urn

assert schema

assert schema["name"]

assert schema_resolver.schema_count() == 1


def test_resolve_urn():
input_urn: str = (
"urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)"
)

schema_resolver = create_default_schema_resolver(urn=input_urn)

schema_resolver.add_raw_schema_info(
urn=input_urn,
schema_info={"name": "STRING"},
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this test do that test_basic_schema_resolver does not do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is calling resolve_urn instead of resolve_table


urn, schema = schema_resolver.resolve_urn(urn=input_urn)

assert urn == input_urn

assert schema

assert schema["name"]

assert schema_resolver.schema_count() == 1
Expand Down Expand Up @@ -62,3 +100,29 @@ def test_get_urn_for_table_not_lower_should_keep_capital_letters():
== "urn:li:dataset:(urn:li:dataPlatform:mssql,Uppercased-Instance.Database.DataSet.Table,PROD)"
)
assert schema_resolver.schema_count() == 0


def test_match_columns_to_schema():
    """Input columns take the schema's casing; unknown columns pass through."""
    schema_info: SchemaInfo = {"id": "string", "Name": "string", "Address": "string"}

    output_columns = match_columns_to_schema(
        schema_info, input_columns=["Id", "name", "address", "weight"]
    )

    # "weight" is absent from the schema, so it is returned unchanged.
    assert output_columns == ["id", "Name", "Address", "weight"]
Loading