diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py index 80be566cdcd46..103f4175a9ccf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py @@ -88,8 +88,7 @@ def column_name_in_sql_attribute(self) -> List[str]: for upstream_field_match in re.finditer(r"\${TABLE}\.[\"]*([\.\w]+)", sql): matched_field = upstream_field_match.group(1) # Remove quotes from field names - matched_field = matched_field.replace('"', "").replace("`", "").lower() - column_names.append(matched_field) + column_names.append(matched_field.replace('"', "").replace("`", "").lower()) return column_names diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 8cec6f2607774..971181e4300d6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -25,11 +25,13 @@ LookMLSourceReport, ) from datahub.ingestion.source.looker.urn_functions import get_qualified_table_name +from datahub.sql_parsing.schema_resolver import match_columns_to_schema from datahub.sql_parsing.sqlglot_lineage import ( ColumnLineageInfo, ColumnRef, SqlParsingResult, Urn, + create_and_cache_schema_resolver, create_lineage_sql_parsed_result, ) @@ -200,7 +202,7 @@ def _generate_fully_qualified_name( class AbstractViewUpstream(ABC): """ Implementation of this interface extracts the view upstream as per the way the view is bound to datasets. - For detail explanation please refer lookml_concept_context.LookerViewContext documentation. + For detail explanation, please refer lookml_concept_context.LookerViewContext documentation. """ view_context: LookerViewContext @@ -236,6 +238,47 @@ def get_upstream_dataset_urn(self) -> List[Urn]: def create_fields(self) -> List[ViewField]: return [] # it is for the special case + def create_upstream_column_refs( + self, upstream_urn: str, downstream_looker_columns: List[str] + ) -> List[ColumnRef]: + """ + - **`upstream_urn`**: The URN of the upstream dataset. + + - **`expected_columns`**: These are the columns identified by the Looker connector as belonging to the `upstream_urn` dataset. However, there is potential for human error in specifying the columns of the upstream dataset. For example, a user might declare a column in lowercase, while on the actual platform, it may exist in uppercase, or vice versa. + + - This function ensures consistency in column-level lineage by consulting GMS before creating the final `ColumnRef` instance, avoiding discrepancies. + """ + schema_resolver = create_and_cache_schema_resolver( + platform=self.view_context.view_connection.platform, + platform_instance=self.view_context.view_connection.platform_instance, + env=self.view_context.view_connection.platform_env or self.config.env, + graph=self.ctx.graph, + ) + + urn, schema_info = schema_resolver.resolve_urn(urn=upstream_urn) + + if schema_info: + actual_columns = match_columns_to_schema( + schema_info, downstream_looker_columns + ) + else: + logger.info( + f"schema_info not found for dataset {urn} in GMS. Using expected_columns to form ColumnRef" + ) + actual_columns = [column.lower() for column in downstream_looker_columns] + + upstream_column_refs: List[ColumnRef] = [] + + for column in actual_columns: + upstream_column_refs.append( + ColumnRef( + column=column, + table=upstream_urn, + ) + ) + + return upstream_column_refs + class SqlBasedDerivedViewUpstream(AbstractViewUpstream, ABC): """ @@ -372,15 +415,12 @@ def get_upstream_column_ref( # in-case of "select * from look_ml_view.SQL_TABLE_NAME" or extra field are defined in the looker view which is # referring to upstream table if self._get_upstream_dataset_urn() and not upstreams_column_refs: - upstreams_column_refs = [ - ColumnRef( - table=self._get_upstream_dataset_urn()[ - 0 - ], # 0th index has table of from clause - column=column, - ) - for column in field_context.column_name_in_sql_attribute() - ] + upstreams_column_refs = self.create_upstream_column_refs( + upstream_urn=self._get_upstream_dataset_urn()[ + 0 + ], # 0th index has table of from clause, + downstream_looker_columns=field_context.column_name_in_sql_attribute(), + ) # fix any derived view reference present in urn upstreams_column_refs = resolve_derived_view_urn_of_col_ref( @@ -487,18 +527,18 @@ def get_upstream_column_ref( return upstream_column_refs explore_urn: str = self._get_upstream_dataset_urn()[0] + expected_columns: List[str] = [] for column in field_context.column_name_in_sql_attribute(): if column in self._get_explore_column_mapping(): explore_column: Dict = self._get_explore_column_mapping()[column] - upstream_column_refs.append( - ColumnRef( - column=explore_column.get("field", explore_column[NAME]), - table=explore_urn, - ) + expected_columns.append( + explore_column.get("field", explore_column[NAME]) ) - return upstream_column_refs + return self.create_upstream_column_refs( + upstream_urn=explore_urn, downstream_looker_columns=expected_columns + ) def get_upstream_dataset_urn(self) -> List[Urn]: return self._get_upstream_dataset_urn() @@ -548,14 +588,10 @@ def __get_upstream_dataset_urn(self) -> Urn: def get_upstream_column_ref( self, field_context: LookerFieldContext ) -> List[ColumnRef]: - upstream_column_ref: List[ColumnRef] = [] - - for column_name in field_context.column_name_in_sql_attribute(): - upstream_column_ref.append( - ColumnRef(table=self._get_upstream_dataset_urn(), column=column_name) - ) - - return upstream_column_ref + return self.create_upstream_column_refs( + upstream_urn=self._get_upstream_dataset_urn(), + downstream_looker_columns=field_context.column_name_in_sql_attribute(), + ) def get_upstream_dataset_urn(self) -> List[Urn]: return [self._get_upstream_dataset_urn()] @@ -609,15 +645,14 @@ def get_upstream_column_ref( self, field_context: LookerFieldContext ) -> List[ColumnRef]: upstream_column_ref: List[ColumnRef] = [] + if not self._get_upstream_dataset_urn(): return upstream_column_ref - for column_name in field_context.column_name_in_sql_attribute(): - upstream_column_ref.append( - ColumnRef(table=self._get_upstream_dataset_urn()[0], column=column_name) - ) - - return upstream_column_ref + return self.create_upstream_column_refs( + upstream_urn=self._get_upstream_dataset_urn()[0], + downstream_looker_columns=field_context.column_name_in_sql_attribute(), + ) def get_upstream_dataset_urn(self) -> List[Urn]: return self._get_upstream_dataset_urn() diff --git a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py index e3f2fbc786b43..6aa10381a883e 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py +++ b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py @@ -123,6 +123,13 @@ def get_urn_for_table( ) return urn + def resolve_urn(self, urn: str) -> Tuple[str, Optional[SchemaInfo]]: + schema_info = self._resolve_schema_info(urn) + if schema_info: + return urn, schema_info + + return urn, None + def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]: urn = self.get_urn_for_table(table) @@ -293,3 +300,19 @@ def _convert_schema_field_list_to_info( def _convert_schema_aspect_to_info(schema_metadata: SchemaMetadataClass) -> SchemaInfo: return _convert_schema_field_list_to_info(schema_metadata.fields) + + +def match_columns_to_schema( + schema_info: SchemaInfo, input_columns: List[str] +) -> List[str]: + column_from_gms: List[str] = list(schema_info.keys()) # list() to silent lint + + gms_column_map: Dict[str, str] = { + column.lower(): column for column in column_from_gms + } + + output_columns: List[str] = [ + gms_column_map.get(column.lower(), column) for column in input_columns + ] + + return output_columns diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index 4ff68574bf20e..f387618bfaec1 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -1181,6 +1181,45 @@ def sqlglot_lineage( ) +@functools.lru_cache(maxsize=128) +def create_and_cache_schema_resolver( + platform: str, + env: str, + graph: Optional[DataHubGraph] = None, + platform_instance: Optional[str] = None, + schema_aware: bool = True, +) -> SchemaResolver: + return create_schema_resolver( + platform=platform, + env=env, + graph=graph, + platform_instance=platform_instance, + schema_aware=schema_aware, + ) + + +def create_schema_resolver( + platform: str, + env: str, + graph: Optional[DataHubGraph] = None, + platform_instance: Optional[str] = None, + schema_aware: bool = True, +) -> SchemaResolver: + if graph and schema_aware: + return graph._make_schema_resolver( + platform=platform, + platform_instance=platform_instance, + env=env, + ) + + return SchemaResolver( + platform=platform, + platform_instance=platform_instance, + env=env, + graph=None, + ) + + def create_lineage_sql_parsed_result( query: str, default_db: Optional[str], @@ -1191,21 +1230,17 @@ def create_lineage_sql_parsed_result( graph: Optional[DataHubGraph] = None, schema_aware: bool = True, ) -> SqlParsingResult: + schema_resolver = create_schema_resolver( + platform=platform, + platform_instance=platform_instance, + env=env, + schema_aware=schema_aware, + graph=graph, + ) + + needs_close: bool = True if graph and schema_aware: needs_close = False - schema_resolver = graph._make_schema_resolver( - platform=platform, - platform_instance=platform_instance, - env=env, - ) - else: - needs_close = True - schema_resolver = SchemaResolver( - platform=platform, - platform_instance=platform_instance, - env=env, - graph=None, - ) try: return sqlglot_lineage( diff --git a/metadata-ingestion/tests/integration/lookml/gms_schema_resolution/data.model.lkml b/metadata-ingestion/tests/integration/lookml/gms_schema_resolution/data.model.lkml new file mode 100644 index 0000000000000..95391f6a73e63 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/gms_schema_resolution/data.model.lkml @@ -0,0 +1,6 @@ +connection: "my_connection" + +include: "top_10_employee_income_source.view.lkml" + +explore: top_10_employee_income_source { +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/gms_schema_resolution/top_10_employee_income_source.view.lkml b/metadata-ingestion/tests/integration/lookml/gms_schema_resolution/top_10_employee_income_source.view.lkml new file mode 100644 index 0000000000000..6037bab33c44f --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/gms_schema_resolution/top_10_employee_income_source.view.lkml @@ -0,0 +1,18 @@ +view: top_10_employee_income_source { + sql_table_name: "db.public.employee" + ;; + dimension: id { + type: number + sql: ${TABLE}.id ;; + } + + dimension: name { + type: string + sql: ${TABLE}.name ;; + } + + dimension: source { + type: string + sql: ${TABLE}.source ;; + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/gms_schema_resolution_golden.json b/metadata-ingestion/tests/integration/lookml/gms_schema_resolution_golden.json new file mode 100644 index 0000000000000..9b0dd78ca1e8e --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/gms_schema_resolution_golden.json @@ -0,0 +1,358 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "looker", + "env": "PROD", + "project_name": "lkml_samples" + }, + "name": "lkml_samples", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:looker" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "LookML Project" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "Folders" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "View" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "changeType": "UPSERT", + "aspectName": "viewProperties", + "aspect": { + "json": { + "materialized": false, + "viewLogic": "view: top_10_employee_income_source {\n sql_table_name: \"db.public.employee\"\n ;;\n dimension: id {\n type: number\n sql: ${TABLE}.id ;;\n }\n\n dimension: name {\n type: string\n sql: ${TABLE}.name ;;\n }\n\n dimension: source {\n type: string\n sql: ${TABLE}.source ;;\n }\n}", + "viewLanguage": "lookml" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:78f22c19304954b15e8adb1d9809975e" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/Develop/lkml_samples/" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.UpstreamLineage": { + "upstreams": [ + { + "auditStamp": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,public.employee,PROD)", + "type": "VIEW" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,public.employee,PROD),Id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,public.employee,PROD),Name)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD),name)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,public.employee,PROD),source)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD),source)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "top_10_employee_income_source", + "platform": "urn:li:dataPlatform:looker", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "id", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "number", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "name", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "source", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + } + ], + "primaryKeys": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "looker.file.path": "top_10_employee_income_source.view.lkml", + "looker.model": "data" + }, + "name": "top_10_employee_income_source", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "Develop" + }, + { + "id": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "urn": "urn:li:container:78f22c19304954b15e8adb1d9809975e" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Dimension", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Dimension" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index 4cd2777dc7dca..940e7f36675f7 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -1,8 +1,8 @@ import logging import pathlib -from typing import Any, List +from typing import Any, List, Optional, Tuple from unittest import mock -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pydantic import pytest @@ -25,6 +25,7 @@ MetadataChangeEventClass, UpstreamLineageClass, ) +from datahub.sql_parsing.schema_resolver import SchemaInfo, SchemaResolver from tests.test_helpers import mce_helpers from tests.test_helpers.state_helpers import get_current_checkpoint_from_pipeline @@ -62,7 +63,7 @@ def get_default_recipe(output_file_path, base_folder_path): @freeze_time(FROZEN_TIME) def test_lookml_ingest(pytestconfig, tmp_path, mock_time): - """Test backwards compatibility with previous form of config with new flags turned off""" + """Test backwards compatibility with a previous form of config with new flags turned off""" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out_file = "expected_output.json" @@ -1013,3 +1014,40 @@ def test_drop_hive(pytestconfig, tmp_path, mock_time): output_path=tmp_path / mce_out_file, golden_path=golden_path, ) + + +@freeze_time(FROZEN_TIME) +def test_gms_schema_resolution(pytestconfig, tmp_path, mock_time): + test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" + mce_out_file = "drop_hive_dot.json" + + new_recipe = get_default_recipe( + f"{tmp_path}/{mce_out_file}", + f"{test_resources_dir}/gms_schema_resolution", + ) + + new_recipe["source"]["config"]["connection_to_platform_map"] = { + "my_connection": "hive" + } + + return_value: Tuple[str, Optional[SchemaInfo]] = ( + "fake_dataset_urn", + { + "Id": "String", + "Name": "String", + "source": "String", + }, + ) + + with patch.object(SchemaResolver, "resolve_urn", return_value=return_value): + pipeline = Pipeline.create(new_recipe) + pipeline.run() + pipeline.pretty_print_summary() + pipeline.raise_from_status(raise_warnings=True) + + golden_path = test_resources_dir / "gms_schema_resolution_golden.json" + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / mce_out_file, + golden_path=golden_path, + ) diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py b/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py index e5fa980bec452..67222531d3bc1 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py @@ -1,7 +1,12 @@ -from datahub.sql_parsing.schema_resolver import SchemaResolver, _TableName +from datahub.sql_parsing.schema_resolver import ( + SchemaInfo, + SchemaResolver, + _TableName, + match_columns_to_schema, +) -def test_basic_schema_resolver(): +def create_default_schema_resolver(urn: str) -> SchemaResolver: schema_resolver = SchemaResolver( platform="redshift", env="PROD", @@ -9,18 +14,51 @@ def test_basic_schema_resolver(): ) schema_resolver.add_raw_schema_info( - urn="urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)", + urn=urn, schema_info={"name": "STRING"}, ) + return schema_resolver + + +def test_basic_schema_resolver(): + input_urn = ( + "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)" + ) + + schema_resolver = create_default_schema_resolver(urn=input_urn) + urn, schema = schema_resolver.resolve_table( _TableName(database="my_db", db_schema="public", table="test_table") ) - assert ( - urn - == "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)" + + assert urn == input_urn + + assert schema + + assert schema["name"] + + assert schema_resolver.schema_count() == 1 + + +def test_resolve_urn(): + input_urn: str = ( + "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)" + ) + + schema_resolver = create_default_schema_resolver(urn=input_urn) + + schema_resolver.add_raw_schema_info( + urn=input_urn, + schema_info={"name": "STRING"}, ) + + urn, schema = schema_resolver.resolve_urn(urn=input_urn) + + assert urn == input_urn + assert schema + assert schema["name"] assert schema_resolver.schema_count() == 1 @@ -62,3 +100,13 @@ def test_get_urn_for_table_not_lower_should_keep_capital_letters(): == "urn:li:dataset:(urn:li:dataPlatform:mssql,Uppercased-Instance.Database.DataSet.Table,PROD)" ) assert schema_resolver.schema_count() == 0 + + +def test_match_columns_to_schema(): + schema_info: SchemaInfo = {"id": "string", "Name": "string", "Address": "string"} + + output_columns = match_columns_to_schema( + schema_info, input_columns=["Id", "name", "address", "weight"] + ) + + assert output_columns == ["id", "Name", "Address", "weight"]