From 5066942f3d0c4f1522efb4c9c069ec8d3fb29200 Mon Sep 17 00:00:00 2001
From: sid-acryl <155424659+sid-acryl@users.noreply.github.com>
Date: Wed, 27 Nov 2024 23:02:24 +0530
Subject: [PATCH] refactor(ingest/powerbi): organize code within the module based on responsibilities (#11924)

---
 metadata-ingestion/setup.py                   |   2 +-
 .../ingestion/source/powerbi/__init__.py      |   1 -
 .../ingestion/source/powerbi/config.py        |   6 +-
 .../source/powerbi/m_query/data_classes.py    |  36 +-
 .../source/powerbi/m_query/parser.py          |   9 +-
 .../source/powerbi/m_query/pattern_handler.py | 920 +++++++++++++++++
 .../source/powerbi/m_query/resolver.py        | 954 +-----------------
 .../source/powerbi/m_query/validator.py       |  12 +-
 .../ingestion/source/powerbi/powerbi.py       |  18 +-
 .../integration/powerbi/test_m_parser.py      |  52 +-
 .../tests/unit/test_powerbi_parser.py         |   6 +-
 11 files changed, 1042 insertions(+), 974 deletions(-)
 create mode 100644 metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/pattern_handler.py

diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 8ae112c0ab0b2..74c2e611cf68f 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -773,7 +773,7 @@
     "trino = datahub.ingestion.source.sql.trino:TrinoSource",
     "starburst-trino-usage = datahub.ingestion.source.usage.starburst_trino_usage:TrinoUsageSource",
     "nifi = datahub.ingestion.source.nifi:NifiSource",
-    "powerbi = datahub.ingestion.source.powerbi:PowerBiDashboardSource",
+    "powerbi = datahub.ingestion.source.powerbi.powerbi:PowerBiDashboardSource",
     "powerbi-report-server = datahub.ingestion.source.powerbi_report_server:PowerBiReportServerDashboardSource",
    "iceberg = datahub.ingestion.source.iceberg.iceberg:IcebergSource",
     "vertica = datahub.ingestion.source.sql.vertica:VerticaSource",
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/__init__.py
index 1068f335e8f8e..e69de29bb2d1d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/__init__.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/__init__.py
@@ -1 +0,0 @@
-from datahub.ingestion.source.powerbi.powerbi import PowerBiDashboardSource
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
index 91fa2e96be2cc..f7458c4eb4d5b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
@@ -173,7 +173,7 @@ class SupportedDataPlatform(Enum):
         datahub_data_platform_name="redshift",
     )
 
-    DATABRICK_SQL = DataPlatformPair(
+    DATABRICKS_SQL = DataPlatformPair(
         powerbi_data_platform_name="Databricks", datahub_data_platform_name="databricks"
     )
 
@@ -313,8 +313,8 @@ class PowerBiDashboardSourceConfig(
         " Note: This field works in conjunction with 'workspace_type_filter' and both must be considered when filtering workspaces.",
     )
 
-    # Dataset type mapping PowerBI support many type of data-sources. Here user need to define what type of PowerBI
-    # DataSource need to be mapped to corresponding DataHub Platform DataSource. For example PowerBI `Snowflake` is
+    # Dataset type mapping: PowerBI supports many types of data sources. Here the user needs to define which type of PowerBI
+    # DataSource needs to be mapped to the corresponding DataHub Platform DataSource. For example, PowerBI `Snowflake` is
     # mapped to DataHub `snowflake`, PowerBI `PostgreSQL` is mapped to DataHub `postgres`, and so on.
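     # A minimal illustrative mapping (a sketch based on the examples above, not a
     # shipped default):
     #   dataset_type_mapping = {"Snowflake": "snowflake", "PostgreSQL": "postgres"}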
    dataset_type_mapping: Union[
        Dict[str, str], Dict[str, PlatformDetail]
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/data_classes.py
index bb0c0c2f79bbd..f1691b5df68a9 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/data_classes.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/data_classes.py
@@ -1,10 +1,14 @@
 import os
 from abc import ABC
 from dataclasses import dataclass
-from typing import Any, Dict, Optional
+from enum import Enum
+from typing import Any, Dict, List, Optional
 
 from lark import Tree
 
+from datahub.ingestion.source.powerbi.config import DataPlatformPair
+from datahub.sql_parsing.sqlglot_lineage import ColumnLineageInfo
+
 TRACE_POWERBI_MQUERY_PARSER = os.getenv("DATAHUB_TRACE_POWERBI_MQUERY_PARSER", False)
 
@@ -30,7 +34,7 @@ class IdentifierAccessor(AbstractIdentifierAccessor):
     "[Schema="public",Item="order_date"]" is "items" in ItemSelector. Data of items varies as per DataSource
 
-    "public_order_date" is in "next" of ItemSelector. The "next" will be None if this identifier is leaf i.e. table
+    "public_order_date" is in "next" of ItemSelector. The "next" will be None if this identifier is a leaf, i.e., a table
 
@@ -53,3 +57,31 @@ class ReferencedTable:
     database: str
     schema: str
     table: str
+
+
+@dataclass
+class DataPlatformTable:
+    data_platform_pair: DataPlatformPair
+    urn: str
+
+
+@dataclass
+class Lineage:
+    upstreams: List[DataPlatformTable]
+    column_lineage: List[ColumnLineageInfo]
+
+    @staticmethod
+    def empty() -> "Lineage":
+        return Lineage(upstreams=[], column_lineage=[])
+
+
+class FunctionName(Enum):
+    NATIVE_QUERY = "Value.NativeQuery"
+    POSTGRESQL_DATA_ACCESS = "PostgreSQL.Database"
+    ORACLE_DATA_ACCESS = "Oracle.Database"
+    SNOWFLAKE_DATA_ACCESS = "Snowflake.Databases"
+    MSSQL_DATA_ACCESS = "Sql.Database"
+    DATABRICK_DATA_ACCESS = "Databricks.Catalogs"
+    GOOGLE_BIGQUERY_DATA_ACCESS = "GoogleBigQuery.Database"
+    AMAZON_REDSHIFT_DATA_ACCESS = "AmazonRedshift.Database"
+    DATABRICK_MULTI_CLOUD_DATA_ACCESS = "DatabricksMultiCloud.Catalogs"
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
index 97698a3d0d56c..2a5de7494920b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
@@ -7,6 +7,7 @@
 import lark
 from lark import Lark, Tree
 
+import datahub.ingestion.source.powerbi.m_query.data_classes
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.source.powerbi.config import (
     PowerBiDashboardSourceConfig,
@@ -65,7 +66,7 @@ def get_upstream_tables(
     ctx: PipelineContext,
     config: PowerBiDashboardSourceConfig,
     parameters: Dict[str, str] = {},
-) -> List[resolver.Lineage]:
+) -> List[datahub.ingestion.source.powerbi.m_query.data_classes.Lineage]:
     if table.expression is None:
         logger.debug(f"There is no M-Query expression in table {table.full_name}")
         return []
@@ -127,12 +128,14 @@ def get_upstream_tables(
         reporter.m_query_parse_successes += 1
 
     try:
-        lineage: List[resolver.Lineage] = resolver.MQueryResolver(
+        lineage: List[
+            datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
+        ] = resolver.MQueryResolver(
             table=table,
             parse_tree=parse_tree,
             reporter=reporter,
             parameters=parameters,
-        ).resolve_to_data_platform_table_list(
+        ).resolve_to_lineage(
             ctx=ctx,
             config=config,
             platform_instance_resolver=platform_instance_resolver,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/pattern_handler.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/pattern_handler.py
new file mode 100644
index 0000000000000..13d97a7029029
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/pattern_handler.py
@@ -0,0 +1,920 @@
+import logging
+from abc import ABC, abstractmethod
+from enum import Enum
+from typing import Dict, List, Optional, Tuple, Type, Union, cast
+
+from lark import Tree
+
+from datahub.emitter import mce_builder as builder
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.source.powerbi.config import (
+    Constant,
+    DataBricksPlatformDetail,
+    DataPlatformPair,
+    PlatformDetail,
+    PowerBiDashboardSourceConfig,
+    PowerBiDashboardSourceReport,
+    PowerBIPlatformDetail,
+    SupportedDataPlatform,
+)
+from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
+    AbstractDataPlatformInstanceResolver,
+)
+from datahub.ingestion.source.powerbi.m_query import native_sql_parser, tree_function
+from datahub.ingestion.source.powerbi.m_query.data_classes import (
+    AbstractIdentifierAccessor,
+    DataAccessFunctionDetail,
+    DataPlatformTable,
+    FunctionName,
+    IdentifierAccessor,
+    Lineage,
+    ReferencedTable,
+)
+from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
+from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult
+
+logger = logging.getLogger(__name__)
+
+
+def get_next_item(items: List[str], item: str) -> Optional[str]:
+    if item in items:
+        try:
+            index = items.index(item)
+            return items[index + 1]
+        except IndexError:
+            # The item was the last element, so there is no next item
+            logger.debug(f'item:"{item}" is the last element in item-list: {items}')
+    return None
+
+
+def urn_to_lowercase(value: str, flag: bool) -> str:
+    if flag is True:
+        return value.lower()
+
+    return value
+
+
+def make_urn(
+    config: PowerBiDashboardSourceConfig,
+    platform_instance_resolver: AbstractDataPlatformInstanceResolver,
+    data_platform_pair: DataPlatformPair,
+    server: str,
+    qualified_table_name: str,
+) -> str:
+    platform_detail: PlatformDetail = platform_instance_resolver.get_platform_instance(
+        PowerBIPlatformDetail(
+            data_platform_pair=data_platform_pair,
+            data_platform_server=server,
+        )
+    )
+
+    return builder.make_dataset_urn_with_platform_instance(
+        platform=data_platform_pair.datahub_data_platform_name,
+        platform_instance=platform_detail.platform_instance,
+        env=platform_detail.env,
+        name=urn_to_lowercase(
+            qualified_table_name, config.convert_lineage_urns_to_lowercase
+        ),
+    )
+
+
+class AbstractLineage(ABC):
+    """
+    Base class to share common functionality among the different data platforms handled during M-Query parsing.
+
+    To create a qualified table name we need to parse the M-Query data-access functions (https://learn.microsoft.com/en-us/powerquery-m/accessing-data-functions);
+    the data-access functions follow a defined pattern to access the database, schema, and table names. For example, see the M-Query below:
+
+        let
+            Source = Sql.Database("localhost", "library"),
+            dbo_book_issue = Source{[Schema="dbo",Item="book_issue"]}[Data]
+        in
+            dbo_book_issue
+
+    This is MSSQL M-Query, and Sql.Database is the data-access function for MSSQL. If this function is present in an M-Query, then the database name is available in the second argument of the first statement, and the schema name and table name are available in the second statement.
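+    For the sample above, the pattern resolves to the qualified table name
+    library.dbo.book_issue (database.schema.table), which make_urn then converts
+    into a dataset URN.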
+    The second statement can be repeated to access different tables from MSSQL.
+
+    TwoStepDataAccessPattern extends AbstractLineage and provides the common functionality for data platforms that follow the above M-Query pattern.
+
+    The data-access function varies per data platform: for example, MySQL.Database for MySQL, PostgreSQL.Database for Postgres, and Oracle.Database for Oracle. The number of statements needed to find the database, schema, and table names also varies per data platform.
+
+    Value.NativeQuery is one of the functions used to execute a native query inside an M-Query; for example, see the M-Query below:
+
+        let
+            Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true])
+        in
+            Source
+
+    In this M-Query the database name is available in the first argument, and the remaining details, i.e., the schema and table names, are available in the native query itself.
+
+    NativeQueryLineage extends AbstractLineage to support Redshift, Snowflake, and Databricks native-query parsing.
+
+    """
+
+    ctx: PipelineContext
+    table: Table
+    config: PowerBiDashboardSourceConfig
+    reporter: PowerBiDashboardSourceReport
+    platform_instance_resolver: AbstractDataPlatformInstanceResolver
+
+    def __init__(
+        self,
+        ctx: PipelineContext,
+        table: Table,
+        config: PowerBiDashboardSourceConfig,
+        reporter: PowerBiDashboardSourceReport,
+        platform_instance_resolver: AbstractDataPlatformInstanceResolver,
+    ) -> None:
+        super().__init__()
+        self.ctx = ctx
+        self.table = table
+        self.config = config
+        self.reporter = reporter
+        self.platform_instance_resolver = platform_instance_resolver
+
+    @abstractmethod
+    def create_lineage(
+        self, data_access_func_detail: DataAccessFunctionDetail
+    ) -> Lineage:
+        pass
+
+    @abstractmethod
+    def get_platform_pair(self) -> DataPlatformPair:
+        pass
+
+    @staticmethod
+    def get_db_detail_from_argument(
+        arg_list: Tree,
+    ) -> Tuple[Optional[str], Optional[str]]:
+        arguments: List[str] = tree_function.strip_char_from_list(
+            values=tree_function.remove_whitespaces_from_list(
+                tree_function.token_values(arg_list)
+            ),
+        )
+
+        if len(arguments) < 2:
+            logger.debug(f"Expected a minimum of 2 arguments, but got {len(arguments)}")
+            return None, None
+
+        return arguments[0], arguments[1]
+
+    @staticmethod
+    def create_reference_table(
+        arg_list: Tree,
+        table_detail: Dict[str, str],
+    ) -> Optional[ReferencedTable]:
+        arguments: List[str] = tree_function.strip_char_from_list(
+            values=tree_function.remove_whitespaces_from_list(
+                tree_function.token_values(arg_list)
+            ),
+        )
+
+        logger.debug(f"Processing arguments {arguments}")
+
+        if (
+            len(arguments)
+            >= 4  # [0] is warehouse FQDN.
+            # [1] is endpoint, we are not using it.
+            # [2] is the "Catalog" key
+            # [3] is the catalog's value
+        ):
+            return ReferencedTable(
+                warehouse=arguments[0],
+                catalog=arguments[3],
+                # In the M-Queries observed so far, the database and catalog names are the same
+                database=table_detail["Database"]
+                if table_detail.get("Database")
+                else arguments[3],
+                schema=table_detail["Schema"],
+                table=table_detail.get("Table") or table_detail["View"],
+            )
+        elif len(arguments) == 2:
+            return ReferencedTable(
+                warehouse=arguments[0],
+                database=table_detail["Database"],
+                schema=table_detail["Schema"],
+                table=table_detail.get("Table") or table_detail["View"],
+                catalog=None,
+            )
+
+        return None
+
+    def parse_custom_sql(
+        self, query: str, server: str, database: Optional[str], schema: Optional[str]
+    ) -> Lineage:
+        dataplatform_tables: List[DataPlatformTable] = []
+
+        platform_detail: PlatformDetail = (
+            self.platform_instance_resolver.get_platform_instance(
+                PowerBIPlatformDetail(
+                    data_platform_pair=self.get_platform_pair(),
+                    data_platform_server=server,
+                )
+            )
+        )
+
+        query = native_sql_parser.remove_drop_statement(
+            native_sql_parser.remove_special_characters(query)
+        )
+
+        parsed_result: Optional[
+            "SqlParsingResult"
+        ] = native_sql_parser.parse_custom_sql(
+            ctx=self.ctx,
+            query=query,
+            platform=self.get_platform_pair().datahub_data_platform_name,
+            platform_instance=platform_detail.platform_instance,
+            env=platform_detail.env,
+            database=database,
+            schema=schema,
+        )
+
+        if parsed_result is None:
+            self.reporter.info(
+                title=Constant.SQL_PARSING_FAILURE,
+                message="Failed to parse native SQL present in PowerBI M-Query",
+                context=f"table-name={self.table.full_name}, sql={query}",
+            )
+            return Lineage.empty()
+
+        if parsed_result.debug_info and parsed_result.debug_info.table_error:
+            self.reporter.warning(
+                title=Constant.SQL_PARSING_FAILURE,
+                message="Failed to parse native SQL present in PowerBI M-Query",
+                context=f"table-name={self.table.full_name}, error={parsed_result.debug_info.table_error}, sql={query}",
+            )
+            return Lineage.empty()
+
+        for urn in parsed_result.in_tables:
+            dataplatform_tables.append(
+                DataPlatformTable(
+                    data_platform_pair=self.get_platform_pair(),
+                    urn=urn,
+                )
+            )
+
+        logger.debug(f"Native Query parsed result={parsed_result}")
+        logger.debug(f"Generated dataplatform_tables={dataplatform_tables}")
+
+        return Lineage(
+            upstreams=dataplatform_tables,
+            column_lineage=(
+                parsed_result.column_lineage
+                if parsed_result.column_lineage is not None
+                else []
+            ),
+        )
+
+
+class AmazonRedshiftLineage(AbstractLineage):
+    def get_platform_pair(self) -> DataPlatformPair:
+        return SupportedDataPlatform.AMAZON_REDSHIFT.value
+
+    def create_lineage(
+        self, data_access_func_detail: DataAccessFunctionDetail
+    ) -> Lineage:
+        logger.debug(
+            f"Processing AmazonRedshift data-access function detail {data_access_func_detail}"
+        )
+
+        server, db_name = self.get_db_detail_from_argument(
+            data_access_func_detail.arg_list
+        )
+        if db_name is None or server is None:
+            return Lineage.empty()  # Return empty lineage
+
+        schema_name: str = cast(
+            IdentifierAccessor, data_access_func_detail.identifier_accessor
+        ).items["Name"]
+
+        table_name: str = cast(
+            IdentifierAccessor,
+            cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next,
+        ).items["Name"]
+
+        qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
+
+        urn = make_urn(
+            config=self.config,
+            platform_instance_resolver=self.platform_instance_resolver,
+            data_platform_pair=self.get_platform_pair(),
+            server=server,
+            qualified_table_name=qualified_table_name,
+        )
+
+        return Lineage(
+            upstreams=[
+                DataPlatformTable(
+                    data_platform_pair=self.get_platform_pair(),
+                    urn=urn,
+                )
+            ],
+            column_lineage=[],
+        )
+
+
+class OracleLineage(AbstractLineage):
+    def get_platform_pair(self) -> DataPlatformPair:
+        return SupportedDataPlatform.ORACLE.value
+
+    @staticmethod
+    def _get_server_and_db_name(value: str) -> Tuple[Optional[str], Optional[str]]:
+        error_message: str = (
+            f"The target argument ({value}) should be in the format of <host-name>:<port>/<db-name>["
+            ".<domain>]"
+        )
+        splitter_result: List[str] = value.split("/")
+        if len(splitter_result) != 2:
+            logger.debug(error_message)
+            return None, None
+
+        db_name = splitter_result[1].split(".")[0]
+
+        return tree_function.strip_char_from_list([splitter_result[0]])[0], db_name
+
+    def create_lineage(
+        self, data_access_func_detail: DataAccessFunctionDetail
+    ) -> Lineage:
+        logger.debug(
+            f"Processing Oracle data-access function detail {data_access_func_detail}"
+        )
+
+        arguments: List[str] = tree_function.remove_whitespaces_from_list(
+            tree_function.token_values(data_access_func_detail.arg_list)
+        )
+
+        server, db_name = self._get_server_and_db_name(arguments[0])
+
+        if db_name is None or server is None:
+            return Lineage.empty()
+
+        schema_name: str = cast(
+            IdentifierAccessor, data_access_func_detail.identifier_accessor
+        ).items["Schema"]
+
+        table_name: str = cast(
+            IdentifierAccessor,
+            cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next,
+        ).items["Name"]
+
+        qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
+
+        urn = make_urn(
+            config=self.config,
+            platform_instance_resolver=self.platform_instance_resolver,
+            data_platform_pair=self.get_platform_pair(),
+            server=server,
+            qualified_table_name=qualified_table_name,
+        )
+
+        return Lineage(
+            upstreams=[
+                DataPlatformTable(
+                    data_platform_pair=self.get_platform_pair(),
+                    urn=urn,
+                )
+            ],
+            column_lineage=[],
+        )
+
+
+class DatabricksLineage(AbstractLineage):
+    def form_qualified_table_name(
+        self,
+        table_reference: ReferencedTable,
+        data_platform_pair: DataPlatformPair,
+    ) -> str:
+        platform_detail: PlatformDetail = (
+            self.platform_instance_resolver.get_platform_instance(
+                PowerBIPlatformDetail(
+                    data_platform_pair=data_platform_pair,
+                    data_platform_server=table_reference.warehouse,
+                )
+            )
+        )
+
+        metastore: Optional[str] = None
+
+        qualified_table_name: str = f"{table_reference.database}.{table_reference.schema}.{table_reference.table}"
+
+        if isinstance(platform_detail, DataBricksPlatformDetail):
+            metastore = platform_detail.metastore
+
+        if metastore is not None:
+            return f"{metastore}.{qualified_table_name}"
+
+        return qualified_table_name
+
+    def create_lineage(
+        self, data_access_func_detail: DataAccessFunctionDetail
+    ) -> Lineage:
+        logger.debug(
+            f"Processing Databricks data-access function detail {data_access_func_detail}"
+        )
+        table_detail: Dict[str, str] = {}
+        temp_accessor: Optional[
+            Union[IdentifierAccessor, AbstractIdentifierAccessor]
+        ] = data_access_func_detail.identifier_accessor
+
+        while temp_accessor:
+            if isinstance(temp_accessor, IdentifierAccessor):
+                # Condition to handle the Databricks M-Query pattern where the table, schema, and database are all present in
+                # the same invoke statement
+                if all(
+                    element in temp_accessor.items
+                    for element in ["Item", "Schema", "Catalog"]
+                ):
+                    table_detail["Schema"] = temp_accessor.items["Schema"]
+                    table_detail["Table"] = temp_accessor.items["Item"]
+                else:
+                    table_detail[temp_accessor.items["Kind"]] = temp_accessor.items[
+                        "Name"
+                    ]
+
+                if temp_accessor.next is not None:
+                    temp_accessor = temp_accessor.next
+                else:
+                    break
+            else:
+                logger.debug(
+                    "Expecting the instance to be IdentifierAccessor; please check if parsing is done properly"
+                )
+                return Lineage.empty()
+
+        table_reference = self.create_reference_table(
+            arg_list=data_access_func_detail.arg_list,
+            table_detail=table_detail,
+        )
+
+        if table_reference:
+            qualified_table_name: str = self.form_qualified_table_name(
+                table_reference=table_reference,
+                data_platform_pair=self.get_platform_pair(),
+            )
+
+            urn = make_urn(
+                config=self.config,
+                platform_instance_resolver=self.platform_instance_resolver,
+                data_platform_pair=self.get_platform_pair(),
+                server=table_reference.warehouse,
+                qualified_table_name=qualified_table_name,
+            )
+
+            return Lineage(
+                upstreams=[
+                    DataPlatformTable(
+                        data_platform_pair=self.get_platform_pair(),
+                        urn=urn,
+                    )
+                ],
+                column_lineage=[],
+            )
+
+        return Lineage.empty()
+
+    def get_platform_pair(self) -> DataPlatformPair:
+        return SupportedDataPlatform.DATABRICKS_SQL.value
+
+
+class TwoStepDataAccessPattern(AbstractLineage, ABC):
+    """
+    These are the data sources for which PowerBI Desktop generates a default M-Query of the following pattern:
+        let
+            Source = Sql.Database("localhost", "library"),
+            dbo_book_issue = Source{[Schema="dbo",Item="book_issue"]}[Data]
+        in
+            dbo_book_issue
+    """
+
+    def two_level_access_pattern(
+        self, data_access_func_detail: DataAccessFunctionDetail
+    ) -> Lineage:
+        logger.debug(
+            f"Processing {self.get_platform_pair().powerbi_data_platform_name} data-access function detail {data_access_func_detail}"
+        )
+
+        server, db_name = self.get_db_detail_from_argument(
+            data_access_func_detail.arg_list
+        )
+        if server is None or db_name is None:
+            return Lineage.empty()  # Return empty lineage
+
+        schema_name: str = cast(
+            IdentifierAccessor, data_access_func_detail.identifier_accessor
+        ).items["Schema"]
+
+        table_name: str = cast(
+            IdentifierAccessor, data_access_func_detail.identifier_accessor
+        ).items["Item"]
+
+        qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
+
+        logger.debug(
+            f"Platform({self.get_platform_pair().datahub_data_platform_name}) qualified_table_name= {qualified_table_name}"
+        )
+
+        urn = make_urn(
+            config=self.config,
+            platform_instance_resolver=self.platform_instance_resolver,
+            data_platform_pair=self.get_platform_pair(),
+            server=server,
+            qualified_table_name=qualified_table_name,
+        )
+        return Lineage(
+            upstreams=[
+                DataPlatformTable(
+                    data_platform_pair=self.get_platform_pair(),
+                    urn=urn,
+                )
+            ],
+            column_lineage=[],
+        )
+
+
+class PostgresLineage(TwoStepDataAccessPattern):
+    def create_lineage(
+        self, data_access_func_detail: DataAccessFunctionDetail
+    ) -> Lineage:
+        return self.two_level_access_pattern(data_access_func_detail)
+
+    def get_platform_pair(self) -> DataPlatformPair:
+        return SupportedDataPlatform.POSTGRES_SQL.value
+
+
+class MSSqlLineage(TwoStepDataAccessPattern):
+    # https://learn.microsoft.com/en-us/sql/relational-databases/security/authentication-access/ownership-and-user-schema-separation?view=sql-server-ver16
+    DEFAULT_SCHEMA = "dbo"  # Default schema name in MS-SQL is dbo
+
+    def get_platform_pair(self) -> DataPlatformPair:
+        return SupportedDataPlatform.MS_SQL.value
+
+    def create_urn_using_old_parser(
+        self, query: str, db_name: str, server: str
+    ) -> List[DataPlatformTable]:
+        dataplatform_tables: List[DataPlatformTable] = []
+
+        tables: List[str] = native_sql_parser.get_tables(query)
+
+        for parsed_table in tables:
+            components = [v.strip("[]") for v in parsed_table.split(".")]
+            if len(components) == 3:
+                database, schema, table = components
+            elif len(components) == 2:
+                schema, table = components
+                database = db_name
+            elif len(components) == 1:
+                (table,) = components
+                database = db_name
+                schema = MSSqlLineage.DEFAULT_SCHEMA
+            else:
+                self.reporter.warning(
+                    title="Invalid table format",
+                    message="The advanced SQL lineage feature (enable_advance_lineage_sql_construct) is disabled. Please either enable this feature or ensure the table is referenced as <db-name>.<schema-name>.<table-name> in the SQL.",
+                    context=f"table-name={self.table.full_name}",
+                )
+                continue
+
+            qualified_table_name = f"{database}.{schema}.{table}"
+            urn = make_urn(
+                config=self.config,
+                platform_instance_resolver=self.platform_instance_resolver,
+                data_platform_pair=self.get_platform_pair(),
+                server=server,
+                qualified_table_name=qualified_table_name,
+            )
+            dataplatform_tables.append(
+                DataPlatformTable(
+                    data_platform_pair=self.get_platform_pair(),
+                    urn=urn,
+                )
+            )
+
+        logger.debug(f"Generated upstream tables = {dataplatform_tables}")
+
+        return dataplatform_tables
+
+    def create_lineage(
+        self, data_access_func_detail: DataAccessFunctionDetail
+    ) -> Lineage:
+        arguments: List[str] = tree_function.strip_char_from_list(
+            values=tree_function.remove_whitespaces_from_list(
+                tree_function.token_values(data_access_func_detail.arg_list)
+            ),
+        )
+
+        server, database = self.get_db_detail_from_argument(
+            data_access_func_detail.arg_list
+        )
+        if server is None or database is None:
+            return Lineage.empty()  # Return empty lineage
+
+        assert server
+        assert database  # to silence the linter
+
+        query: Optional[str] = get_next_item(arguments, "Query")
+        if query:
+            if self.config.enable_advance_lineage_sql_construct is False:
+                # Use the previous parser to generate the URN to keep backward compatibility
+                return Lineage(
+                    upstreams=self.create_urn_using_old_parser(
+                        query=query,
+                        db_name=database,
+                        server=server,
+                    ),
+                    column_lineage=[],
+                )
+
+            return self.parse_custom_sql(
+                query=query,
+                database=database,
+                server=server,
+                schema=MSSqlLineage.DEFAULT_SCHEMA,
+            )
+
+        # It is a regular case of MS-SQL
+        logger.debug("Handling with regular case")
+        return self.two_level_access_pattern(data_access_func_detail)
+
+
+class ThreeStepDataAccessPattern(AbstractLineage, ABC):
+    def get_datasource_server(
+        self, arguments: List[str], data_access_func_detail: DataAccessFunctionDetail
+    ) -> str:
+        return tree_function.strip_char_from_list([arguments[0]])[0]
+
+    def create_lineage(
+        self, data_access_func_detail: DataAccessFunctionDetail
+    ) -> Lineage:
+        logger.debug(
+            f"Processing {self.get_platform_pair().datahub_data_platform_name} function detail {data_access_func_detail}"
+        )
+
+        arguments: List[str] = tree_function.remove_whitespaces_from_list(
+            tree_function.token_values(data_access_func_detail.arg_list)
+        )
+        # First is the database name
+        db_name: str = data_access_func_detail.identifier_accessor.items["Name"]  # type: ignore
+        # Second is the schema name
+        schema_name: str = cast(
+            IdentifierAccessor, data_access_func_detail.identifier_accessor.next  # type: ignore
+        ).items["Name"]
+        # Third is the table name
+        table_name: str = cast(
+            IdentifierAccessor, data_access_func_detail.identifier_accessor.next.next  # type: ignore
+        ).items["Name"]
+
+        qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
+
+        logger.debug(
+            f"{self.get_platform_pair().datahub_data_platform_name} qualified_table_name {qualified_table_name}"
+        )
+
+        server: str = self.get_datasource_server(arguments, data_access_func_detail)
+
+        urn = make_urn(
+            config=self.config,
+            platform_instance_resolver=self.platform_instance_resolver,
+            data_platform_pair=self.get_platform_pair(),
+            server=server,
+            qualified_table_name=qualified_table_name,
+        )
+
+        return Lineage(
+            upstreams=[
+                DataPlatformTable(
+                    data_platform_pair=self.get_platform_pair(),
+                    urn=urn,
+                )
+            ],
+            column_lineage=[],
+        )
+
+
+class SnowflakeLineage(ThreeStepDataAccessPattern):
+    def get_platform_pair(self) -> DataPlatformPair:
+        return SupportedDataPlatform.SNOWFLAKE.value
+
+
+class GoogleBigQueryLineage(ThreeStepDataAccessPattern):
+    def get_platform_pair(self) -> DataPlatformPair:
+        return SupportedDataPlatform.GOOGLE_BIGQUERY.value
+
+    def get_datasource_server(
+        self, arguments: List[str], data_access_func_detail: DataAccessFunctionDetail
+    ) -> str:
+        # In Google BigQuery, the server is the project name
+        # condition to silence the linter; it is not going to be None
+        return (
+            data_access_func_detail.identifier_accessor.items["Name"]
+            if data_access_func_detail.identifier_accessor is not None
+            else ""
+        )
+
+
+class NativeQueryLineage(AbstractLineage):
+    SUPPORTED_NATIVE_QUERY_DATA_PLATFORM: dict = {
+        SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name: SupportedDataPlatform.SNOWFLAKE,
+        SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name: SupportedDataPlatform.AMAZON_REDSHIFT,
+        SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name: SupportedDataPlatform.DatabricksMultiCloud_SQL,
+    }
+    current_data_platform: SupportedDataPlatform = SupportedDataPlatform.SNOWFLAKE
+
+    def get_platform_pair(self) -> DataPlatformPair:
+        return self.current_data_platform.value
+
+    @staticmethod
+    def is_native_parsing_supported(data_access_function_name: str) -> bool:
+        return (
+            data_access_function_name
+            in NativeQueryLineage.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM
+        )
+
+    def create_urn_using_old_parser(self, query: str, server: str) -> Lineage:
+        dataplatform_tables: List[DataPlatformTable] = []
+
+        tables: List[str] = native_sql_parser.get_tables(query)
+
+        for qualified_table_name in tables:
+            if len(qualified_table_name.split(".")) != 3:
+                logger.debug(
+                    f"Skipping table {qualified_table_name} as it does not match the qualified_table_name format"
+                )
+                continue
+
+            urn = make_urn(
+                config=self.config,
+                platform_instance_resolver=self.platform_instance_resolver,
+                data_platform_pair=self.get_platform_pair(),
+                server=server,
+                qualified_table_name=qualified_table_name,
+            )
+
+            dataplatform_tables.append(
+                DataPlatformTable(
+                    data_platform_pair=self.get_platform_pair(),
+                    urn=urn,
+                )
+            )
+
+        logger.debug(f"Generated dataplatform_tables {dataplatform_tables}")
+
+        return Lineage(
+            upstreams=dataplatform_tables,
+            column_lineage=[],
+        )
+
+    def get_db_name(self, data_access_tokens: List[str]) -> Optional[str]:
+        if (
+            data_access_tokens[0]
+            != SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name
+        ):
+            return None
+
+        database: Optional[str] = get_next_item(data_access_tokens, "Database")
+
+        if (
+            database and database != Constant.M_QUERY_NULL
+        ):  # database name is explicitly set
+            return database
+
+        return get_next_item(  # database name is set in the Name argument
+            data_access_tokens, "Name"
+        ) or get_next_item(  # If both of the above arguments are unavailable, then try Catalog
+            data_access_tokens, "Catalog"
+        )
+
+    def create_lineage(
+        self, data_access_func_detail: DataAccessFunctionDetail
+    ) -> Lineage:
+        t1: Tree = cast(
+            Tree, tree_function.first_arg_list_func(data_access_func_detail.arg_list)
+        )
+        flat_argument_list: List[Tree] = tree_function.flat_argument_list(t1)
+
+        if len(flat_argument_list) != 2:
+            logger.debug(
+                f"Expecting 2 arguments, actual argument count is {len(flat_argument_list)}"
+            )
+            logger.debug(f"Flat argument list = {flat_argument_list}")
+            return Lineage.empty()
+
+        data_access_tokens: List[str] = tree_function.remove_whitespaces_from_list(
+            tree_function.token_values(flat_argument_list[0])
+        )
+
+        if not self.is_native_parsing_supported(data_access_tokens[0]):
+            logger.debug(
+                f"Unsupported native-query data-platform = {data_access_tokens[0]}"
+            )
+            logger.debug(
+                f"NativeQuery is supported only for {self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM}"
+            )
+
+            return Lineage.empty()
+
+        if len(data_access_tokens) < 3:
+            logger.debug(
+                f"Server is not available in the argument list for data-platform {data_access_tokens[0]}. "
+                "Returning empty lineage"
+            )
+            return Lineage.empty()
+
+        self.current_data_platform = self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM[
+            data_access_tokens[0]
+        ]
+        # The second flat argument is the query
+        sql_query: str = tree_function.strip_char_from_list(
+            values=tree_function.remove_whitespaces_from_list(
+                tree_function.token_values(flat_argument_list[1])
+            ),
+        )[
+            0
+        ]  # Remove any whitespace and double-quote characters
+
+        server = tree_function.strip_char_from_list([data_access_tokens[2]])[0]
+
+        if self.config.enable_advance_lineage_sql_construct is False:
+            # Use the previous parser to generate the URN to keep backward compatibility
+            return self.create_urn_using_old_parser(
+                query=sql_query,
+                server=server,
+            )
+
+        database_name: Optional[str] = self.get_db_name(data_access_tokens)
+
+        return self.parse_custom_sql(
+            query=sql_query,
+            server=server,
+            database=database_name,
+            schema=None,
+        )
+
+
+class SupportedPattern(Enum):
+    DATABRICKS_QUERY = (
+        DatabricksLineage,
+        FunctionName.DATABRICK_DATA_ACCESS,
+    )
+
+    DATABRICKS_MULTI_CLOUD = (
+        DatabricksLineage,
+        FunctionName.DATABRICK_MULTI_CLOUD_DATA_ACCESS,
+    )
+
+    POSTGRES_SQL = (
+        PostgresLineage,
+        FunctionName.POSTGRESQL_DATA_ACCESS,
+    )
+
+    ORACLE = (
+        OracleLineage,
+        FunctionName.ORACLE_DATA_ACCESS,
+    )
+
+    SNOWFLAKE = (
+        SnowflakeLineage,
+        FunctionName.SNOWFLAKE_DATA_ACCESS,
+    )
+
+    MS_SQL = (
+        MSSqlLineage,
+        FunctionName.MSSQL_DATA_ACCESS,
+    )
+
+    GOOGLE_BIG_QUERY = (
+        GoogleBigQueryLineage,
+        FunctionName.GOOGLE_BIGQUERY_DATA_ACCESS,
+    )
+
+    AMAZON_REDSHIFT = (
+        AmazonRedshiftLineage,
+        FunctionName.AMAZON_REDSHIFT_DATA_ACCESS,
+    )
+
+    NATIVE_QUERY = (
+        NativeQueryLineage,
+        FunctionName.NATIVE_QUERY,
+    )
+
+    def handler(self) -> Type[AbstractLineage]:
+        return self.value[0]
+
+    def function_name(self) -> str:
+        return self.value[1].value
+
+    @staticmethod
+    def get_function_names() -> List[str]:
+        functions: List[str] = []
+        for supported_resolver in SupportedPattern:
+            functions.append(supported_resolver.function_name())
+
+        return functions
+
+    @staticmethod
+    def get_pattern_handler(function_name: str) -> Optional["SupportedPattern"]:
+        logger.debug(f"Looking for pattern-handler for {function_name}")
+        for supported_resolver in SupportedPattern:
+            if function_name == supported_resolver.function_name():
+                return supported_resolver
+        logger.debug(f"pattern-handler not found for function_name {function_name}")
+        return None
diff --git 
a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py index a40e67d08da5b..81a0e1ef2d79b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py @@ -1,286 +1,33 @@ import logging from abc import ABC, abstractmethod -from dataclasses import dataclass -from enum import Enum -from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast +from typing import Any, Dict, List, Optional, Tuple, Union, cast from lark import Tree -import datahub.emitter.mce_builder as builder from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.powerbi.config import ( - Constant, - DataBricksPlatformDetail, - DataPlatformPair, - PlatformDetail, PowerBiDashboardSourceConfig, PowerBiDashboardSourceReport, - PowerBIPlatformDetail, - SupportedDataPlatform, ) from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import ( AbstractDataPlatformInstanceResolver, ) -from datahub.ingestion.source.powerbi.m_query import native_sql_parser, tree_function +from datahub.ingestion.source.powerbi.m_query import tree_function from datahub.ingestion.source.powerbi.m_query.data_classes import ( TRACE_POWERBI_MQUERY_PARSER, - AbstractIdentifierAccessor, DataAccessFunctionDetail, IdentifierAccessor, - ReferencedTable, + Lineage, +) +from datahub.ingestion.source.powerbi.m_query.pattern_handler import ( + AbstractLineage, + SupportedPattern, ) from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table -from datahub.sql_parsing.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult logger = logging.getLogger(__name__) -@dataclass -class DataPlatformTable: - data_platform_pair: DataPlatformPair - urn: str - - -@dataclass -class Lineage: - upstreams: List[DataPlatformTable] - column_lineage: List[ColumnLineageInfo] - - @staticmethod - def empty() -> "Lineage": - return Lineage(upstreams=[], column_lineage=[]) - - -def urn_to_lowercase(value: str, flag: bool) -> str: - if flag is True: - return value.lower() - - return value - - -def urn_creator( - config: PowerBiDashboardSourceConfig, - platform_instance_resolver: AbstractDataPlatformInstanceResolver, - data_platform_pair: DataPlatformPair, - server: str, - qualified_table_name: str, -) -> str: - platform_detail: PlatformDetail = platform_instance_resolver.get_platform_instance( - PowerBIPlatformDetail( - data_platform_pair=data_platform_pair, - data_platform_server=server, - ) - ) - - return builder.make_dataset_urn_with_platform_instance( - platform=data_platform_pair.datahub_data_platform_name, - platform_instance=platform_detail.platform_instance, - env=platform_detail.env, - name=urn_to_lowercase( - qualified_table_name, config.convert_lineage_urns_to_lowercase - ), - ) - - -def get_next_item(items: List[str], item: str) -> Optional[str]: - if item in items: - try: - index = items.index(item) - return items[index + 1] - except IndexError: - logger.debug(f'item:"{item}", not found in item-list: {items}') - return None - - -class AbstractDataPlatformTableCreator(ABC): - """ - Base class to share common functionalities among different dataplatform for M-Query parsing. 
- - To create qualified table name we need to parse M-Query data-access-functions(https://learn.microsoft.com/en-us/powerquery-m/accessing-data-functions) and - the data-access-functions has some define pattern to access database-name, schema-name and table-name, for example see below M-Query. - - let - Source = Sql.Database("localhost", "library"), - dbo_book_issue = Source{[Schema="dbo",Item="book_issue"]}[Data] - in - dbo_book_issue - - It is MSSQL M-Query and Sql.Database is the data-access-function to access MSSQL. If this function is available in M-Query then database name is available in second argument - of first statement and schema-name and table-name is available in second statement. second statement can be repeated to access different tables from MSSQL. - - DefaultTwoStepDataAccessSources extends the AbstractDataPlatformTableCreator and provides the common functionalities for data-platform which has above type of M-Query pattern - - data-access-function varies as per data-platform for example for MySQL.Database for MySQL, PostgreSQL.Database for Postgres and Oracle.Database for Oracle and number of statement to - find out database-name , schema-name and table-name also varies as per dataplatform. - - Value.NativeQuery is one of the function which is used to execute native query inside M-Query, for example see below M-Query - - let - Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true]) - in - Source - - In this M-Query database-name is available in first argument and rest of the detail i.e database & schema is available in native query. - - NativeQueryDataPlatformTableCreator extends AbstractDataPlatformTableCreator to support Redshift and Snowflake native query parsing. - - """ - - ctx: PipelineContext - table: Table - config: PowerBiDashboardSourceConfig - reporter: PowerBiDashboardSourceReport - platform_instance_resolver: AbstractDataPlatformInstanceResolver - - def __init__( - self, - ctx: PipelineContext, - table: Table, - config: PowerBiDashboardSourceConfig, - reporter: PowerBiDashboardSourceReport, - platform_instance_resolver: AbstractDataPlatformInstanceResolver, - ) -> None: - super().__init__() - self.ctx = ctx - self.table = table - self.config = config - self.reporter = reporter - self.platform_instance_resolver = platform_instance_resolver - - @abstractmethod - def create_lineage( - self, data_access_func_detail: DataAccessFunctionDetail - ) -> Lineage: - pass - - @abstractmethod - def get_platform_pair(self) -> DataPlatformPair: - pass - - @staticmethod - def get_db_detail_from_argument( - arg_list: Tree, - ) -> Tuple[Optional[str], Optional[str]]: - arguments: List[str] = tree_function.strip_char_from_list( - values=tree_function.remove_whitespaces_from_list( - tree_function.token_values(arg_list) - ), - ) - - if len(arguments) < 2: - logger.debug(f"Expected minimum 2 arguments, but got {len(arguments)}") - return None, None - - return arguments[0], arguments[1] - - @staticmethod - def create_reference_table( - arg_list: Tree, - table_detail: Dict[str, str], - ) -> Optional[ReferencedTable]: - arguments: List[str] = tree_function.strip_char_from_list( - values=tree_function.remove_whitespaces_from_list( - tree_function.token_values(arg_list) - ), - ) - - logger.debug(f"Processing arguments {arguments}") - - if ( - len(arguments) - >= 4 # [0] is warehouse FQDN. - # [1] is endpoint, we are not using it. 
- # [2] is "Catalog" key - # [3] is catalog's value - ): - return ReferencedTable( - warehouse=arguments[0], - catalog=arguments[3], - # As per my observation, database and catalog names are same in M-Query - database=table_detail["Database"] - if table_detail.get("Database") - else arguments[3], - schema=table_detail["Schema"], - table=table_detail.get("Table") or table_detail["View"], - ) - elif len(arguments) == 2: - return ReferencedTable( - warehouse=arguments[0], - database=table_detail["Database"], - schema=table_detail["Schema"], - table=table_detail.get("Table") or table_detail["View"], - catalog=None, - ) - - return None - - def parse_custom_sql( - self, query: str, server: str, database: Optional[str], schema: Optional[str] - ) -> Lineage: - dataplatform_tables: List[DataPlatformTable] = [] - - platform_detail: PlatformDetail = ( - self.platform_instance_resolver.get_platform_instance( - PowerBIPlatformDetail( - data_platform_pair=self.get_platform_pair(), - data_platform_server=server, - ) - ) - ) - - query = native_sql_parser.remove_drop_statement( - native_sql_parser.remove_special_characters(query) - ) - - parsed_result: Optional[ - "SqlParsingResult" - ] = native_sql_parser.parse_custom_sql( - ctx=self.ctx, - query=query, - platform=self.get_platform_pair().datahub_data_platform_name, - platform_instance=platform_detail.platform_instance, - env=platform_detail.env, - database=database, - schema=schema, - ) - - if parsed_result is None: - self.reporter.info( - title=Constant.SQL_PARSING_FAILURE, - message="Fail to parse native sql present in PowerBI M-Query", - context=f"table-name={self.table.full_name}, sql={query}", - ) - return Lineage.empty() - - if parsed_result.debug_info and parsed_result.debug_info.table_error: - self.reporter.warning( - title=Constant.SQL_PARSING_FAILURE, - message="Fail to parse native sql present in PowerBI M-Query", - context=f"table-name={self.table.full_name}, error={parsed_result.debug_info.table_error},sql={query}", - ) - return Lineage.empty() - - for urn in parsed_result.in_tables: - dataplatform_tables.append( - DataPlatformTable( - data_platform_pair=self.get_platform_pair(), - urn=urn, - ) - ) - - logger.debug(f"Native Query parsed result={parsed_result}") - logger.debug(f"Generated dataplatform_tables={dataplatform_tables}") - - return Lineage( - upstreams=dataplatform_tables, - column_lineage=( - parsed_result.column_lineage - if parsed_result.column_lineage is not None - else [] - ), - ) - - class AbstractDataAccessMQueryResolver(ABC): table: Table parse_tree: Tree @@ -299,10 +46,10 @@ def __init__( self.parse_tree = parse_tree self.reporter = reporter self.parameters = parameters - self.data_access_functions = SupportedResolver.get_function_names() + self.data_access_functions = SupportedPattern.get_function_names() @abstractmethod - def resolve_to_data_platform_table_list( + def resolve_to_lineage( self, ctx: PipelineContext, config: PowerBiDashboardSourceConfig, @@ -318,7 +65,7 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC): This class has generic code to process M-Query tokens and create instance of DataAccessFunctionDetail. Once DataAccessFunctionDetail instance is initialized thereafter MQueryResolver generates the DataPlatformTable with the help of AbstractDataPlatformTableCreator - (see method resolve_to_data_platform_table_list). + (see method resolve_to_lineage). 
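+    For illustration, a minimal sketch of how this resolver is driven (mirroring
+    the call site in parser.py; the table, parse_tree, reporter, config, and
+    platform_instance_resolver objects are assumed to exist in scope):
+
+        lineage: List[Lineage] = MQueryResolver(
+            table=table,
+            parse_tree=parse_tree,
+            reporter=reporter,
+            parameters=parameters,
+        ).resolve_to_lineage(
+            ctx=ctx,
+            config=config,
+            platform_instance_resolver=platform_instance_resolver,
+        )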
    Classes which extended from AbstractDataPlatformTableCreator know how to convert generated DataAccessFunctionDetail instance
    to the respective DataPlatformTable instance as per dataplatform.

@@ -602,7 +349,7 @@ def internal(
         return table_links
 
-    def resolve_to_data_platform_table_list(
+    def resolve_to_lineage(
         self,
         ctx: PipelineContext,
         config: PowerBiDashboardSourceConfig,
@@ -630,7 +377,7 @@ def resolve_to_data_platform_table_list(
         # Each item is data-access function
         for f_detail in table_links:
             # Get & Check if we support data-access-function available in M-Query
-            supported_resolver = SupportedResolver.get_resolver(
+            supported_resolver = SupportedPattern.get_pattern_handler(
                 f_detail.data_access_function_name
             )
             if supported_resolver is None:
@@ -643,11 +390,9 @@
                 )
                 continue
 
-            # From supported_resolver enum get respective resolver like AmazonRedshift or Snowflake or Oracle or NativeQuery and create instance of it
-            # & also pass additional information that will be need to generate urn
-            table_qualified_name_creator: (
-                AbstractDataPlatformTableCreator
-            ) = supported_resolver.get_table_full_name_creator()(
+            # From the supported_resolver enum, get the respective handler (e.g., AmazonRedshift, Snowflake, Oracle, or NativeQuery) and create an instance of it;
+            # also pass the additional information that will be needed to generate lineage
+            pattern_handler: (AbstractLineage) = supported_resolver.handler()(
                 ctx=ctx,
                 table=self.table,
                 config=config,
@@ -655,673 +400,6 @@
                 platform_instance_resolver=platform_instance_resolver,
             )
 
-            lineage.append(table_qualified_name_creator.create_lineage(f_detail))
+            lineage.append(pattern_handler.create_lineage(f_detail))
 
         return lineage
-
-
-class DefaultTwoStepDataAccessSources(AbstractDataPlatformTableCreator, ABC):
-    """
-    These are the DataSource for which PowerBI Desktop generates default M-Query of following pattern
-    let
-        Source = Sql.Database("localhost", "library"),
-        dbo_book_issue = Source{[Schema="dbo",Item="book_issue"]}[Data]
-    in
-        dbo_book_issue
-    """
-
-    def two_level_access_pattern(
-        self, data_access_func_detail: DataAccessFunctionDetail
-    ) -> Lineage:
-        logger.debug(
-            f"Processing {self.get_platform_pair().powerbi_data_platform_name} data-access function detail {data_access_func_detail}"
-        )
-
-        server, db_name = self.get_db_detail_from_argument(
-            data_access_func_detail.arg_list
-        )
-        if server is None or db_name is None:
-            return Lineage.empty()  # Return an empty list
-
-        schema_name: str = cast(
-            IdentifierAccessor, data_access_func_detail.identifier_accessor
-        ).items["Schema"]
-
-        table_name: str = cast(
-            IdentifierAccessor, data_access_func_detail.identifier_accessor
-        ).items["Item"]
-
-        qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
-
-        logger.debug(
-            f"Platform({self.get_platform_pair().datahub_data_platform_name}) qualified_table_name= {qualified_table_name}"
-        )
-
-        urn = urn_creator(
-            config=self.config,
-            platform_instance_resolver=self.platform_instance_resolver,
-            data_platform_pair=self.get_platform_pair(),
-            server=server,
-            qualified_table_name=qualified_table_name,
-        )
-        return Lineage(
-            upstreams=[
-                DataPlatformTable(
-                    data_platform_pair=self.get_platform_pair(),
-                    urn=urn,
-                )
-            ],
-            column_lineage=[],
-        )
-
-
-class PostgresDataPlatformTableCreator(DefaultTwoStepDataAccessSources):
-    def create_lineage(
-        self, data_access_func_detail: DataAccessFunctionDetail
-    ) -> Lineage:
-        return self.two_level_access_pattern(data_access_func_detail)
-
-    def get_platform_pair(self) -> DataPlatformPair:
-        return SupportedDataPlatform.POSTGRES_SQL.value
-
-
-class MSSqlDataPlatformTableCreator(DefaultTwoStepDataAccessSources):
-    # https://learn.microsoft.com/en-us/sql/relational-databases/security/authentication-access/ownership-and-user-schema-separation?view=sql-server-ver16
-    DEFAULT_SCHEMA = "dbo"  # Default schema name in MS-SQL is dbo
-
-    def get_platform_pair(self) -> DataPlatformPair:
-        return SupportedDataPlatform.MS_SQL.value
-
-    def create_urn_using_old_parser(
-        self, query: str, db_name: str, server: str
-    ) -> List[DataPlatformTable]:
-        dataplatform_tables: List[DataPlatformTable] = []
-
-        tables: List[str] = native_sql_parser.get_tables(query)
-
-        for parsed_table in tables:
-            # components: List[str] = [v.strip("[]") for v in parsed_table.split(".")]
-            components = [v.strip("[]") for v in parsed_table.split(".")]
-            if len(components) == 3:
-                database, schema, table = components
-            elif len(components) == 2:
-                schema, table = components
-                database = db_name
-            elif len(components) == 1:
-                (table,) = components
-                database = db_name
-                schema = MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA
-            else:
-                self.reporter.warning(
-                    title="Invalid table format",
-                    message="The advanced SQL lineage feature (enable_advance_lineage_sql_construct) is disabled. Please either enable this feature or ensure the table is referenced as <db-name>.<schema-name>.<table-name> in the SQL.",
-                    context=f"table-name={self.table.full_name}",
-                )
-                continue
-
-            qualified_table_name = f"{database}.{schema}.{table}"
-            urn = urn_creator(
-                config=self.config,
-                platform_instance_resolver=self.platform_instance_resolver,
-                data_platform_pair=self.get_platform_pair(),
-                server=server,
-                qualified_table_name=qualified_table_name,
-            )
-            dataplatform_tables.append(
-                DataPlatformTable(
-                    data_platform_pair=self.get_platform_pair(),
-                    urn=urn,
-                )
-            )
-
-        logger.debug(f"Generated upstream tables = {dataplatform_tables}")
-
-        return dataplatform_tables
-
-    def create_lineage(
-        self, data_access_func_detail: DataAccessFunctionDetail
-    ) -> Lineage:
-        arguments: List[str] = tree_function.strip_char_from_list(
-            values=tree_function.remove_whitespaces_from_list(
-                tree_function.token_values(data_access_func_detail.arg_list)
-            ),
-        )
-
-        server, database = self.get_db_detail_from_argument(
-            data_access_func_detail.arg_list
-        )
-        if server is None or database is None:
-            return Lineage.empty()  # Return an empty list
-
-        assert server
-        assert database  # to silent the lint
-
-        query: Optional[str] = get_next_item(arguments, "Query")
-        if query:
-            if self.config.enable_advance_lineage_sql_construct is False:
-                # Use previous parser to generate URN to keep backward compatibility
-                return Lineage(
-                    upstreams=self.create_urn_using_old_parser(
-                        query=query,
-                        db_name=database,
-                        server=server,
-                    ),
-                    column_lineage=[],
-                )
-
-            return self.parse_custom_sql(
-                query=query,
-                database=database,
-                server=server,
-                schema=MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA,
-            )
-
-        # It is a regular case of MS-SQL
-        logger.debug("Handling with regular case")
-        return self.two_level_access_pattern(data_access_func_detail)
-
-
-class OracleDataPlatformTableCreator(AbstractDataPlatformTableCreator):
-    def get_platform_pair(self) -> DataPlatformPair:
-        return SupportedDataPlatform.ORACLE.value
-
-    @staticmethod
-    def _get_server_and_db_name(value: str) -> Tuple[Optional[str], Optional[str]]:
-        error_message: str = (
-            f"The target argument ({value}) should in the format of <host-name>:<port>/<db-name>["
-            ".<domain>]"
-        )
-        splitter_result: List[str] = value.split("/")
-        if len(splitter_result) != 2:
-            logger.debug(error_message)
-            return None, None
-
-        db_name = splitter_result[1].split(".")[0]
-
-        return tree_function.strip_char_from_list([splitter_result[0]])[0], db_name
-
-    def create_lineage(
-        self, data_access_func_detail: DataAccessFunctionDetail
-    ) -> Lineage:
-        logger.debug(
-            f"Processing Oracle data-access function detail {data_access_func_detail}"
-        )
-
-        arguments: List[str] = tree_function.remove_whitespaces_from_list(
-            tree_function.token_values(data_access_func_detail.arg_list)
-        )
-
-        server, db_name = self._get_server_and_db_name(arguments[0])
-
-        if db_name is None or server is None:
-            return Lineage.empty()
-
-        schema_name: str = cast(
-            IdentifierAccessor, data_access_func_detail.identifier_accessor
-        ).items["Schema"]
-
-        table_name: str = cast(
-            IdentifierAccessor,
-            cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next,
-        ).items["Name"]
-
-        qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
-
-        urn = urn_creator(
-            config=self.config,
-            platform_instance_resolver=self.platform_instance_resolver,
-            data_platform_pair=self.get_platform_pair(),
-            server=server,
-            qualified_table_name=qualified_table_name,
-        )
-
-        return Lineage(
-            upstreams=[
-                DataPlatformTable(
-                    data_platform_pair=self.get_platform_pair(),
-                    urn=urn,
-                )
-            ],
-            column_lineage=[],
-        )
-
-
-class DatabrickDataPlatformTableCreator(AbstractDataPlatformTableCreator):
-    def form_qualified_table_name(
-        self,
-        table_reference: ReferencedTable,
-        data_platform_pair: DataPlatformPair,
-    ) -> str:
-        platform_detail: PlatformDetail = (
-            self.platform_instance_resolver.get_platform_instance(
-                PowerBIPlatformDetail(
-                    data_platform_pair=data_platform_pair,
-                    data_platform_server=table_reference.warehouse,
-                )
-            )
-        )
-
-        metastore: Optional[str] = None
-
-        qualified_table_name: str = f"{table_reference.database}.{table_reference.schema}.{table_reference.table}"
-
-        if isinstance(platform_detail, DataBricksPlatformDetail):
-            metastore = platform_detail.metastore
-
-        if metastore is not None:
-            return f"{metastore}.{qualified_table_name}"
-
-        return qualified_table_name
-
-    def create_lineage(
-        self, data_access_func_detail: DataAccessFunctionDetail
-    ) -> Lineage:
-        logger.debug(
-            f"Processing Databrick data-access function detail {data_access_func_detail}"
-        )
-        table_detail: Dict[str, str] = {}
-        temp_accessor: Optional[
-            Union[IdentifierAccessor, AbstractIdentifierAccessor]
-        ] = data_access_func_detail.identifier_accessor
-
-        while temp_accessor:
-            if isinstance(temp_accessor, IdentifierAccessor):
-                # Condition to handle databricks M-query pattern where table, schema and database all are present in
-                # the same invoke statement
-                if all(
-                    element in temp_accessor.items
-                    for element in ["Item", "Schema", "Catalog"]
-                ):
-                    table_detail["Schema"] = temp_accessor.items["Schema"]
-                    table_detail["Table"] = temp_accessor.items["Item"]
-                else:
-                    table_detail[temp_accessor.items["Kind"]] = temp_accessor.items[
-                        "Name"
-                    ]
-
-                if temp_accessor.next is not None:
-                    temp_accessor = temp_accessor.next
-                else:
-                    break
-            else:
-                logger.debug(
-                    "expecting instance to be IdentifierAccessor, please check if parsing is done properly"
-                )
-                return Lineage.empty()
-
-        table_reference = self.create_reference_table(
-            arg_list=data_access_func_detail.arg_list,
-            table_detail=table_detail,
-        )
-
-        if table_reference:
-            qualified_table_name: str = self.form_qualified_table_name(
table_reference=table_reference, - data_platform_pair=self.get_platform_pair(), - ) - - urn = urn_creator( - config=self.config, - platform_instance_resolver=self.platform_instance_resolver, - data_platform_pair=self.get_platform_pair(), - server=table_reference.warehouse, - qualified_table_name=qualified_table_name, - ) - - return Lineage( - upstreams=[ - DataPlatformTable( - data_platform_pair=self.get_platform_pair(), - urn=urn, - ) - ], - column_lineage=[], - ) - - return Lineage.empty() - - def get_platform_pair(self) -> DataPlatformPair: - return SupportedDataPlatform.DATABRICK_SQL.value - - -class DefaultThreeStepDataAccessSources(AbstractDataPlatformTableCreator, ABC): - def get_datasource_server( - self, arguments: List[str], data_access_func_detail: DataAccessFunctionDetail - ) -> str: - return tree_function.strip_char_from_list([arguments[0]])[0] - - def create_lineage( - self, data_access_func_detail: DataAccessFunctionDetail - ) -> Lineage: - logger.debug( - f"Processing {self.get_platform_pair().datahub_data_platform_name} function detail {data_access_func_detail}" - ) - - arguments: List[str] = tree_function.remove_whitespaces_from_list( - tree_function.token_values(data_access_func_detail.arg_list) - ) - # First is database name - db_name: str = data_access_func_detail.identifier_accessor.items["Name"] # type: ignore - # Second is schema name - schema_name: str = cast( - IdentifierAccessor, data_access_func_detail.identifier_accessor.next # type: ignore - ).items["Name"] - # Third is table name - table_name: str = cast( - IdentifierAccessor, data_access_func_detail.identifier_accessor.next.next # type: ignore - ).items["Name"] - - qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}" - - logger.debug( - f"{self.get_platform_pair().datahub_data_platform_name} qualified_table_name {qualified_table_name}" - ) - - server: str = self.get_datasource_server(arguments, data_access_func_detail) - - urn = urn_creator( - config=self.config, - platform_instance_resolver=self.platform_instance_resolver, - data_platform_pair=self.get_platform_pair(), - server=server, - qualified_table_name=qualified_table_name, - ) - - return Lineage( - upstreams=[ - DataPlatformTable( - data_platform_pair=self.get_platform_pair(), - urn=urn, - ) - ], - column_lineage=[], - ) - - -class SnowflakeDataPlatformTableCreator(DefaultThreeStepDataAccessSources): - def get_platform_pair(self) -> DataPlatformPair: - return SupportedDataPlatform.SNOWFLAKE.value - - -class GoogleBigQueryDataPlatformTableCreator(DefaultThreeStepDataAccessSources): - def get_platform_pair(self) -> DataPlatformPair: - return SupportedDataPlatform.GOOGLE_BIGQUERY.value - - def get_datasource_server( - self, arguments: List[str], data_access_func_detail: DataAccessFunctionDetail - ) -> str: - # In Google BigQuery server is project-name - # condition to silent lint, it is not going to be None - return ( - data_access_func_detail.identifier_accessor.items["Name"] - if data_access_func_detail.identifier_accessor is not None - else "" - ) - - -class AmazonRedshiftDataPlatformTableCreator(AbstractDataPlatformTableCreator): - def get_platform_pair(self) -> DataPlatformPair: - return SupportedDataPlatform.AMAZON_REDSHIFT.value - - def create_lineage( - self, data_access_func_detail: DataAccessFunctionDetail - ) -> Lineage: - logger.debug( - f"Processing AmazonRedshift data-access function detail {data_access_func_detail}" - ) - - server, db_name = self.get_db_detail_from_argument( - data_access_func_detail.arg_list - ) - if 
db_name is None or server is None: - return Lineage.empty() # Return empty list - - schema_name: str = cast( - IdentifierAccessor, data_access_func_detail.identifier_accessor - ).items["Name"] - - table_name: str = cast( - IdentifierAccessor, - cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next, - ).items["Name"] - - qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}" - - urn = urn_creator( - config=self.config, - platform_instance_resolver=self.platform_instance_resolver, - data_platform_pair=self.get_platform_pair(), - server=server, - qualified_table_name=qualified_table_name, - ) - - return Lineage( - upstreams=[ - DataPlatformTable( - data_platform_pair=self.get_platform_pair(), - urn=urn, - ) - ], - column_lineage=[], - ) - - -class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator): - SUPPORTED_NATIVE_QUERY_DATA_PLATFORM: dict = { - SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name: SupportedDataPlatform.SNOWFLAKE, - SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name: SupportedDataPlatform.AMAZON_REDSHIFT, - SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name: SupportedDataPlatform.DatabricksMultiCloud_SQL, - } - current_data_platform: SupportedDataPlatform = SupportedDataPlatform.SNOWFLAKE - - def get_platform_pair(self) -> DataPlatformPair: - return self.current_data_platform.value - - @staticmethod - def is_native_parsing_supported(data_access_function_name: str) -> bool: - return ( - data_access_function_name - in NativeQueryDataPlatformTableCreator.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM - ) - - def create_urn_using_old_parser(self, query: str, server: str) -> Lineage: - dataplatform_tables: List[DataPlatformTable] = [] - - tables: List[str] = native_sql_parser.get_tables(query) - - for qualified_table_name in tables: - if len(qualified_table_name.split(".")) != 3: - logger.debug( - f"Skipping table {qualified_table_name} as it is not as per qualified_table_name format" - ) - continue - - urn = urn_creator( - config=self.config, - platform_instance_resolver=self.platform_instance_resolver, - data_platform_pair=self.get_platform_pair(), - server=server, - qualified_table_name=qualified_table_name, - ) - - dataplatform_tables.append( - DataPlatformTable( - data_platform_pair=self.get_platform_pair(), - urn=urn, - ) - ) - - logger.debug(f"Generated dataplatform_tables {dataplatform_tables}") - - return Lineage( - upstreams=dataplatform_tables, - column_lineage=[], - ) - - def get_db_name(self, data_access_tokens: List[str]) -> Optional[str]: - if ( - data_access_tokens[0] - != SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name - ): - return None - - database: Optional[str] = get_next_item(data_access_tokens, "Database") - - if ( - database and database != Constant.M_QUERY_NULL - ): # database name is explicitly set - return database - - return get_next_item( # database name is set in Name argument - data_access_tokens, "Name" - ) or get_next_item( # If both above arguments are not available, then try Catalog - data_access_tokens, "Catalog" - ) - - def create_lineage( - self, data_access_func_detail: DataAccessFunctionDetail - ) -> Lineage: - t1: Tree = cast( - Tree, tree_function.first_arg_list_func(data_access_func_detail.arg_list) - ) - flat_argument_list: List[Tree] = tree_function.flat_argument_list(t1) - - if len(flat_argument_list) != 2: - logger.debug( - f"Expecting 2 argument, actual argument count is {len(flat_argument_list)}" 
- ) - logger.debug(f"Flat argument list = {flat_argument_list}") - return Lineage.empty() - - data_access_tokens: List[str] = tree_function.remove_whitespaces_from_list( - tree_function.token_values(flat_argument_list[0]) - ) - - if not self.is_native_parsing_supported(data_access_tokens[0]): - logger.debug( - f"Unsupported native-query data-platform = {data_access_tokens[0]}" - ) - logger.debug( - f"NativeQuery is supported only for {self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM}" - ) - - return Lineage.empty() - - if len(data_access_tokens[0]) < 3: - logger.debug( - f"Server is not available in argument list for data-platform {data_access_tokens[0]}. Returning empty " - "list" - ) - return Lineage.empty() - - self.current_data_platform = self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM[ - data_access_tokens[0] - ] - # The First argument is the query - sql_query: str = tree_function.strip_char_from_list( - values=tree_function.remove_whitespaces_from_list( - tree_function.token_values(flat_argument_list[1]) - ), - )[ - 0 - ] # Remove any whitespaces and double quotes character - - server = tree_function.strip_char_from_list([data_access_tokens[2]])[0] - - if self.config.enable_advance_lineage_sql_construct is False: - # Use previous parser to generate URN to keep backward compatibility - return self.create_urn_using_old_parser( - query=sql_query, - server=server, - ) - - database_name: Optional[str] = self.get_db_name(data_access_tokens) - - return self.parse_custom_sql( - query=sql_query, - server=server, - database=database_name, - schema=None, - ) - - -class FunctionName(Enum): - NATIVE_QUERY = "Value.NativeQuery" - POSTGRESQL_DATA_ACCESS = "PostgreSQL.Database" - ORACLE_DATA_ACCESS = "Oracle.Database" - SNOWFLAKE_DATA_ACCESS = "Snowflake.Databases" - MSSQL_DATA_ACCESS = "Sql.Database" - DATABRICK_DATA_ACCESS = "Databricks.Catalogs" - GOOGLE_BIGQUERY_DATA_ACCESS = "GoogleBigQuery.Database" - AMAZON_REDSHIFT_DATA_ACCESS = "AmazonRedshift.Database" - DATABRICK_MULTI_CLOUD_DATA_ACCESS = "DatabricksMultiCloud.Catalogs" - - -class SupportedResolver(Enum): - DATABRICKS_QUERY = ( - DatabrickDataPlatformTableCreator, - FunctionName.DATABRICK_DATA_ACCESS, - ) - - DATABRICKS_MULTI_CLOUD = ( - DatabrickDataPlatformTableCreator, - FunctionName.DATABRICK_MULTI_CLOUD_DATA_ACCESS, - ) - - POSTGRES_SQL = ( - PostgresDataPlatformTableCreator, - FunctionName.POSTGRESQL_DATA_ACCESS, - ) - - ORACLE = ( - OracleDataPlatformTableCreator, - FunctionName.ORACLE_DATA_ACCESS, - ) - - SNOWFLAKE = ( - SnowflakeDataPlatformTableCreator, - FunctionName.SNOWFLAKE_DATA_ACCESS, - ) - - MS_SQL = ( - MSSqlDataPlatformTableCreator, - FunctionName.MSSQL_DATA_ACCESS, - ) - - GOOGLE_BIG_QUERY = ( - GoogleBigQueryDataPlatformTableCreator, - FunctionName.GOOGLE_BIGQUERY_DATA_ACCESS, - ) - - AMAZON_REDSHIFT = ( - AmazonRedshiftDataPlatformTableCreator, - FunctionName.AMAZON_REDSHIFT_DATA_ACCESS, - ) - - NATIVE_QUERY = ( - NativeQueryDataPlatformTableCreator, - FunctionName.NATIVE_QUERY, - ) - - def get_table_full_name_creator(self) -> Type[AbstractDataPlatformTableCreator]: - return self.value[0] - - def get_function_name(self) -> str: - return self.value[1].value - - @staticmethod - def get_function_names() -> List[str]: - functions: List[str] = [] - for supported_resolver in SupportedResolver: - functions.append(supported_resolver.get_function_name()) - - return functions - - @staticmethod - def get_resolver(function_name: str) -> Optional["SupportedResolver"]: - logger.debug(f"Looking for resolver {function_name}") - for 
supported_resolver in SupportedResolver: - if function_name == supported_resolver.get_function_name(): - return supported_resolver - logger.debug(f"Resolver not found for function_name {function_name}") - return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/validator.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/validator.py index ca2abf97c9f30..b52977aaa41fb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/validator.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/validator.py @@ -1,7 +1,7 @@ import logging from typing import Optional, Tuple -from datahub.ingestion.source.powerbi.m_query import resolver +import datahub.ingestion.source.powerbi.m_query.data_classes logger = logging.getLogger(__name__) @@ -14,12 +14,18 @@ def validate_parse_tree( :param native_query_enabled: Whether user want to extract lineage from native query :return: True or False. """ - function_names = [fun.value for fun in resolver.FunctionName] + function_names = [ + fun.value + for fun in datahub.ingestion.source.powerbi.m_query.data_classes.FunctionName + ] if not any(fun in expression for fun in function_names): return False, "DataAccess function is not present in M-Query expression." if native_query_enabled is False: - if resolver.FunctionName.NATIVE_QUERY.value in function_names: + if ( + datahub.ingestion.source.powerbi.m_query.data_classes.FunctionName.NATIVE_QUERY.value + in function_names + ): return ( False, "Lineage extraction from native query is disabled. Enable native_query_parsing in recipe", diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index cef2d098aebc4..044946a5d308d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -10,6 +10,7 @@ import more_itertools import datahub.emitter.mce_builder as builder +import datahub.ingestion.source.powerbi.m_query.data_classes import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import ContainerKey, gen_containers @@ -42,12 +43,13 @@ Constant, PowerBiDashboardSourceConfig, PowerBiDashboardSourceReport, + SupportedDataPlatform, ) from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import ( AbstractDataPlatformInstanceResolver, create_dataplatform_instance_resolver, ) -from datahub.ingestion.source.powerbi.m_query import parser, resolver +from datahub.ingestion.source.powerbi.m_query import parser from datahub.ingestion.source.powerbi.rest_api_wrapper.powerbi_api import PowerBiAPI from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, @@ -182,7 +184,9 @@ def extract_dataset_schema( return [schema_mcp] def make_fine_grained_lineage_class( - self, lineage: resolver.Lineage, dataset_urn: str + self, + lineage: datahub.ingestion.source.powerbi.m_query.data_classes.Lineage, + dataset_urn: str, ) -> List[FineGrainedLineage]: fine_grained_lineages: List[FineGrainedLineage] = [] @@ -234,7 +238,9 @@ def extract_lineage( upstream: List[UpstreamClass] = [] cll_lineage: List[FineGrainedLineage] = [] - upstream_lineage: List[resolver.Lineage] = parser.get_upstream_tables( + upstream_lineage: List[ + datahub.ingestion.source.powerbi.m_query.data_classes.Lineage + ] = 
parser.get_upstream_tables( table=table, reporter=self.__reporter, platform_instance_resolver=self.__dataplatform_instance_resolver, @@ -1294,7 +1300,7 @@ def get_allowed_workspaces(self) -> List[powerbi_data_classes.Workspace]: def validate_dataset_type_mapping(self): powerbi_data_platforms: List[str] = [ data_platform.value.powerbi_data_platform_name - for data_platform in resolver.SupportedDataPlatform + for data_platform in SupportedDataPlatform ] for key in self.source_config.dataset_type_mapping.keys(): @@ -1481,7 +1487,7 @@ def _get_dashboard_patch_work_unit( def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: # As modified_workspaces is not idempotent, hence workunit processors are run later for each workspace_id - # This will result in creating checkpoint for each workspace_id + # This will result in creating a checkpoint for each workspace_id if self.source_config.modified_since: return [] # Handle these in get_workunits_internal else: @@ -1492,7 +1498,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: """ - Datahub Ingestion framework invoke this method + Datahub Ingestion framework invokes this method """ logger.info("PowerBi plugin execution is started") # Validate dataset type mapping diff --git a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py index f22998b47b900..63821f9038a88 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py +++ b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py @@ -7,6 +7,7 @@ import pytest from lark import Tree +import datahub.ingestion.source.powerbi.m_query.data_classes import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import StructuredLogLevel @@ -18,8 +19,11 @@ AbstractDataPlatformInstanceResolver, create_dataplatform_instance_resolver, ) -from datahub.ingestion.source.powerbi.m_query import parser, resolver, tree_function -from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable, Lineage +from datahub.ingestion.source.powerbi.m_query import parser, tree_function +from datahub.ingestion.source.powerbi.m_query.data_classes import ( + DataPlatformTable, + Lineage, +) pytestmark = pytest.mark.integration_batch_2 @@ -62,7 +66,9 @@ ] -def get_data_platform_tables_with_dummy_table(q: str) -> List[resolver.Lineage]: +def get_data_platform_tables_with_dummy_table( + q: str, +) -> List[datahub.ingestion.source.powerbi.m_query.data_classes.Lineage]: table: powerbi_data_classes.Table = powerbi_data_classes.Table( columns=[], measures=[], @@ -759,7 +765,9 @@ def test_sqlglot_parser(): } ) - lineage: List[resolver.Lineage] = parser.get_upstream_tables( + lineage: List[ + datahub.ingestion.source.powerbi.m_query.data_classes.Lineage + ] = parser.get_upstream_tables( table, reporter, ctx=ctx, @@ -806,7 +814,9 @@ def test_sqlglot_parser(): def test_databricks_multi_cloud(): q = M_QUERIES[25] - lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + lineage: List[ + datahub.ingestion.source.powerbi.m_query.data_classes.Lineage + ] = get_data_platform_tables_with_dummy_table(q=q) assert len(lineage) == 1 @@ -823,7 +833,9 @@ def test_databricks_multi_cloud(): def test_databricks_catalog_pattern_1(): q = M_QUERIES[26] - lineage: List[resolver.Lineage] 
= get_data_platform_tables_with_dummy_table(q=q) + lineage: List[ + datahub.ingestion.source.powerbi.m_query.data_classes.Lineage + ] = get_data_platform_tables_with_dummy_table(q=q) assert len(lineage) == 1 @@ -892,7 +904,9 @@ def test_sqlglot_parser_2(): } ) - lineage: List[resolver.Lineage] = parser.get_upstream_tables( + lineage: List[ + datahub.ingestion.source.powerbi.m_query.data_classes.Lineage + ] = parser.get_upstream_tables( table, reporter, ctx=ctx, @@ -951,7 +965,9 @@ def test_databricks_regular_case_with_view(): def test_snowflake_double_double_quotes(): q = M_QUERIES[30] - lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + lineage: List[ + datahub.ingestion.source.powerbi.m_query.data_classes.Lineage + ] = get_data_platform_tables_with_dummy_table(q=q) assert len(lineage) == 1 @@ -968,7 +984,9 @@ def test_snowflake_double_double_quotes(): def test_databricks_multicloud(): q = M_QUERIES[31] - lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + lineage: List[ + datahub.ingestion.source.powerbi.m_query.data_classes.Lineage + ] = get_data_platform_tables_with_dummy_table(q=q) assert len(lineage) == 1 @@ -985,7 +1003,9 @@ def test_databricks_multicloud(): def test_snowflake_multi_function_call(): q = M_QUERIES[32] - lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + lineage: List[ + datahub.ingestion.source.powerbi.m_query.data_classes.Lineage + ] = get_data_platform_tables_with_dummy_table(q=q) assert len(lineage) == 1 @@ -1002,7 +1022,9 @@ def test_snowflake_multi_function_call(): def test_mssql_drop_with_select(): q = M_QUERIES[33] - lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + lineage: List[ + datahub.ingestion.source.powerbi.m_query.data_classes.Lineage + ] = get_data_platform_tables_with_dummy_table(q=q) assert len(lineage) == 1 @@ -1062,7 +1084,9 @@ def test_empty_string_in_m_query(): # TRIM(TRIM(TRIM(AGENT_NAME, '\"\"'), '+'), '\\'') is in Query q = "let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TRIM(TRIM(TRIM(AGENT_NAME, '\"\"'), '+'), '\\'') AS TRIM_AGENT_NAME,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS inner join OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT #(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source" - lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + lineage: List[ + datahub.ingestion.source.powerbi.m_query.data_classes.Lineage + ] = get_data_platform_tables_with_dummy_table(q=q) assert len(lineage) == 1 @@ -1084,7 +1108,9 @@ def test_double_quotes_in_alias(): # SELECT CAST(sales_date AS DATE) AS \"\"Date\"\" in query q = 'let \n Source = Sql.Database("abc.com", "DB", [Query="SELECT CAST(sales_date AS DATE) AS ""Date"",#(lf) SUM(cshintrpret) / 60.0 AS ""Total Order All 
Items"",#(lf)#(tab)#(tab)#(tab) SUM(cshintrpret) / 60.0 - LAG(SUM(cshintrpret) / 60.0, 1) OVER (ORDER BY CAST(sales_date AS DATE)) AS ""Total minute difference"",#(lf)#(tab)#(tab)#(tab) SUM(sale_price) / 60.0 - LAG(SUM(sale_price) / 60.0, 1) OVER (ORDER BY CAST(sales_date AS DATE)) AS ""Normal minute difference""#(lf) FROM [DB].[dbo].[sales_t]#(lf) WHERE sales_date >= GETDATE() - 365#(lf) GROUP BY CAST(sales_date AS DATE),#(lf)#(tab)#(tab)CAST(sales_date AS TIME);"]) \n in \n Source' - lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q) + lineage: List[ + datahub.ingestion.source.powerbi.m_query.data_classes.Lineage + ] = get_data_platform_tables_with_dummy_table(q=q) assert len(lineage) == 1 diff --git a/metadata-ingestion/tests/unit/test_powerbi_parser.py b/metadata-ingestion/tests/unit/test_powerbi_parser.py index 31579f0c0abd3..a487a3a5b87f8 100644 --- a/metadata-ingestion/tests/unit/test_powerbi_parser.py +++ b/metadata-ingestion/tests/unit/test_powerbi_parser.py @@ -8,9 +8,7 @@ from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import ( ResolvePlatformInstanceFromDatasetTypeMapping, ) -from datahub.ingestion.source.powerbi.m_query.resolver import ( - MSSqlDataPlatformTableCreator, -) +from datahub.ingestion.source.powerbi.m_query.pattern_handler import MSSqlLineage from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table @@ -27,7 +25,7 @@ def creator(): full_name="db.schema.test_table", ) - return MSSqlDataPlatformTableCreator( + return MSSqlLineage( ctx=PipelineContext(run_id="test-run-id"), table=table, reporter=PowerBiDashboardSourceReport(),