From 3f267afc9743d341c35dc1032de0fc0f53655203 Mon Sep 17 00:00:00 2001
From: sid-acryl <155424659+sid-acryl@users.noreply.github.com>
Date: Thu, 21 Nov 2024 01:01:54 +0530
Subject: [PATCH] fix(ingest/powerbi): m-query fixes (#11906)

Co-authored-by: Harshal Sheth
Co-authored-by: Aseem Bansal
---
 .../powerbi/m_query/native_sql_parser.py      | 20 +++-
 .../source/powerbi/m_query/resolver.py        | 94 ++++++++++---------
 .../powerbi/test_native_sql_parser.py         | 10 ++
 3 files changed, 78 insertions(+), 46 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py
index 61b1164825257..63a6073c90a1a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py
@@ -72,10 +72,24 @@ def get_tables(native_query: str) -> List[str]:
 def remove_drop_statement(query: str) -> str:
     # Certain PowerBI M-Queries contain a combination of DROP and SELECT statements within SQL, causing SQLParser to fail on these queries.
     # Therefore, these occurrences are being removed.
-    # Regular expression to match patterns like "DROP TABLE IF EXISTS #;"
-    pattern = r"DROP TABLE IF EXISTS #\w+;?"
 
-    return re.sub(pattern, "", query)
+    patterns = [
+        # Regular expression to match patterns like:
+        # "DROP TABLE IF EXISTS #;"
+        # "DROP TABLE IF EXISTS #, , ...;"
+        # "DROP TABLE IF EXISTS #, , ...\n"
+        r"DROP\s+TABLE\s+IF\s+EXISTS\s+(?:#?\w+(?:,\s*#?\w+)*)[;\n]",
+    ]
+
+    new_query = query
+
+    for pattern in patterns:
+        new_query = re.sub(pattern, "", new_query, flags=re.IGNORECASE)
+
+    # Remove extra spaces caused by consecutive replacements
+    new_query = re.sub(r"\s+", " ", new_query).strip()
+
+    return new_query
 
 
 def parse_custom_sql(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py
index 9eafde2f75ecd..32de95d6bd015 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py
@@ -83,6 +83,16 @@ def urn_creator(
     )
 
 
+def get_next_item(items: List[str], item: str) -> Optional[str]:
+    if item in items:
+        try:
+            index = items.index(item)
+            return items[index + 1]
+        except IndexError:
+            logger.debug(f'item:"{item}", not found in item-list: {items}')
+    return None
+
+
 class AbstractDataPlatformTableCreator(ABC):
     """
     Base class to share common functionalities among different dataplatform for M-Query parsing.
@@ -675,7 +685,7 @@ def two_level_access_pattern(
             data_access_func_detail.arg_list
         )
         if server is None or db_name is None:
-            return Lineage.empty()  # Return empty list
+            return Lineage.empty()  # Return an empty list
 
         schema_name: str = cast(
             IdentifierAccessor, data_access_func_detail.identifier_accessor
@@ -782,32 +792,38 @@ def create_lineage(
             ),
         )
 
-        if len(arguments) == 2:
-            # It is a regular case of MS-SQL
-            logger.debug("Handling with regular case")
-            return self.two_level_access_pattern(data_access_func_detail)
-
-        if len(arguments) >= 4 and arguments[2] != "Query":
-            logger.debug("Unsupported case is found. Second index is not the Query")
-            return Lineage.empty()
+        server, database = self.get_db_detail_from_argument(
+            data_access_func_detail.arg_list
+        )
+        if server is None or database is None:
+            return Lineage.empty()  # Return an empty list
+
+        assert server
+        assert database  # to silent the lint
+
+        query: Optional[str] = get_next_item(arguments, "Query")
+        if query:
+            if self.config.enable_advance_lineage_sql_construct is False:
+                # Use previous parser to generate URN to keep backward compatibility
+                return Lineage(
+                    upstreams=self.create_urn_using_old_parser(
+                        query=query,
+                        db_name=database,
+                        server=server,
+                    ),
+                    column_lineage=[],
+                )
 
-        if self.config.enable_advance_lineage_sql_construct is False:
-            # Use previous parser to generate URN to keep backward compatibility
-            return Lineage(
-                upstreams=self.create_urn_using_old_parser(
-                    query=arguments[3],
-                    db_name=arguments[1],
-                    server=arguments[0],
-                ),
-                column_lineage=[],
+            return self.parse_custom_sql(
+                query=query,
+                database=database,
+                server=server,
+                schema=MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA,
             )
 
-        return self.parse_custom_sql(
-            query=arguments[3],
-            database=arguments[1],
-            server=arguments[0],
-            schema=MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA,
-        )
+        # It is a regular case of MS-SQL
+        logger.debug("Handling with regular case")
+        return self.two_level_access_pattern(data_access_func_detail)
 
 
 class OracleDataPlatformTableCreator(AbstractDataPlatformTableCreator):
@@ -1154,27 +1170,19 @@ def get_db_name(self, data_access_tokens: List[str]) -> Optional[str]:
             != SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name
         ):
             return None
-        try:
-            if "Database" in data_access_tokens:
-                index = data_access_tokens.index("Database")
-                if data_access_tokens[index + 1] != Constant.M_QUERY_NULL:
-                    # Database name is explicitly set in argument
-                    return data_access_tokens[index + 1]
 
-            if "Name" in data_access_tokens:
-                index = data_access_tokens.index("Name")
-                # Next element is value of the Name. It is a database name
-                return data_access_tokens[index + 1]
+        database: Optional[str] = get_next_item(data_access_tokens, "Database")
 
-            if "Catalog" in data_access_tokens:
-                index = data_access_tokens.index("Catalog")
-                # Next element is value of the Catalog. In Databricks Catalog can also be used in place of a database.
-                return data_access_tokens[index + 1]
-
-        except IndexError as e:
-            logger.debug("Database name is not available", exc_info=e)
-
-        return None
+        if (
+            database and database != Constant.M_QUERY_NULL
+        ):  # database name is explicitly set
+            return database
+
+        return get_next_item(  # database name is set in Name argument
+            data_access_tokens, "Name"
+        ) or get_next_item(  # If both above arguments are not available, then try Catalog
+            data_access_tokens, "Catalog"
+        )
 
     def create_lineage(
         self, data_access_func_detail: DataAccessFunctionDetail
diff --git a/metadata-ingestion/tests/integration/powerbi/test_native_sql_parser.py b/metadata-ingestion/tests/integration/powerbi/test_native_sql_parser.py
index 53e184515c1d8..887f7fe4d6f44 100644
--- a/metadata-ingestion/tests/integration/powerbi/test_native_sql_parser.py
+++ b/metadata-ingestion/tests/integration/powerbi/test_native_sql_parser.py
@@ -19,3 +19,13 @@ def test_simple_from():
     assert len(tables) == 1
 
     assert tables[0] == "OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4"
+
+
+def test_drop_statement():
+    expected: str = "SELECT#(lf)concat((UPPER(REPLACE(SELLER,'-',''))), MONTHID) as AGENT_KEY,#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,'-',''))), MONTHID) as CD_AGENT_KEY,#(lf) *#(lf)FROM#(lf)OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4"
+
+    query: str = "DROP TABLE IF EXISTS #table1; DROP TABLE IF EXISTS #table1,#table2; DROP TABLE IF EXISTS table1; DROP TABLE IF EXISTS table1, #table2;SELECT#(lf)concat((UPPER(REPLACE(SELLER,'-',''))), MONTHID) as AGENT_KEY,#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,'-',''))), MONTHID) as CD_AGENT_KEY,#(lf) *#(lf)FROM#(lf)OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4"
+
+    actual: str = native_sql_parser.remove_drop_statement(query)
+
+    assert actual == expected
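
Editorial note, not part of the patch above: the snippet below is a minimal, standalone sketch of the DROP-statement stripping that the reworked remove_drop_statement performs. The regular expression is the one added in the patch, but the function name strip_drop_statements and the sample query are hypothetical, chosen only so the example runs without the DataHub modules; it is an illustration, not the shipped implementation.

import re


def strip_drop_statements(query: str) -> str:
    # Match "DROP TABLE IF EXISTS name[, name ...]" terminated by ";" or a newline,
    # with or without a leading "#" temp-table prefix, case-insensitively.
    pattern = r"DROP\s+TABLE\s+IF\s+EXISTS\s+(?:#?\w+(?:,\s*#?\w+)*)[;\n]"
    cleaned = re.sub(pattern, "", query, flags=re.IGNORECASE)
    # Collapse the extra whitespace left behind by the removals, as the patch does.
    return re.sub(r"\s+", " ", cleaned).strip()


if __name__ == "__main__":
    sample = "DROP TABLE IF EXISTS #t1, #t2; SELECT col FROM db.schema.tbl"
    print(strip_drop_statements(sample))  # prints: SELECT col FROM db.schema.tbl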