From c56a48e9691261af46868db00c1cdede9ca888dc Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Thu, 7 Nov 2024 18:13:21 +0530 Subject: [PATCH 1/5] * Add support for views --- README.md | 4 ++- setup.py | 4 +-- sqlalchemy_kusto/dbapi.py | 11 +++++--- sqlalchemy_kusto/dialect_base.py | 45 +++++++++++++++++++++++++------- 4 files changed, 48 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index c9842c9..a6ab7f5 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ connection = connect( azure_ad_client_id=kusto_client_id, azure_ad_client_secret=kusto_client_secret, azure_ad_tenant_id=kusto_tenant_id, + dev_mode=False ) result = connection.execute(f"select 1").fetchall() @@ -51,7 +52,8 @@ engine = create_engine( f"kustosql+{kusto_url}/{database_name}?" f"msi=False&azure_ad_client_id={kusto_client_id}&" f"azure_ad_client_secret={kusto_client_secret}&" - f"azure_ad_tenant_id={kusto_tenant_id}" + f"azure_ad_tenant_id={kusto_tenant_id}&" + f"dev_mode=False" ) engine.connect() cursor = engine.execute(f"select top 1") diff --git a/setup.py b/setup.py index cb871b9..53abb4f 100644 --- a/setup.py +++ b/setup.py @@ -2,10 +2,10 @@ NAME = "sqlalchemy-kusto" DESCRIPTION = "Azure Data Explorer (Kusto) dialect for SQLAlchemy" -VERSION = "2.0.1" +VERSION = "2.0.2" REQUIREMENTS = [ - "azure-kusto-data==3.*", + "azure-kusto-data==4.*", "sqlalchemy==1.4.*", "typing-extensions>=3.10", ] diff --git a/sqlalchemy_kusto/dbapi.py b/sqlalchemy_kusto/dbapi.py index f9b2f90..f5a5734 100644 --- a/sqlalchemy_kusto/dbapi.py +++ b/sqlalchemy_kusto/dbapi.py @@ -38,6 +38,7 @@ def connect( azure_ad_client_id: str = None, azure_ad_client_secret: str = None, azure_ad_tenant_id: str = None, + dev_mode: bool = False, ): """Return a connection to the database.""" return Connection( @@ -48,6 +49,7 @@ def connect( azure_ad_client_id, azure_ad_client_secret, azure_ad_tenant_id, + dev_mode, ) @@ -63,12 +65,15 @@ def __init__( azure_ad_client_id: str = None, azure_ad_client_secret: str = None, azure_ad_tenant_id: str = None, + dev_mode: bool = False, ): self.closed = False self.cursors: List[Cursor] = [] kcsb = None - if msi: + if dev_mode: + kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(cluster) + elif msi: # Managed Service Identity (MSI) kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication( cluster, client_id=user_msi @@ -89,7 +94,7 @@ def __init__( @check_closed def close(self): """Close the connection now. Kusto does not require to close the connection.""" - self.closed = True + #self.closed = True for cursor in self.cursors: cursor.close() @@ -157,7 +162,7 @@ def rowcount(self) -> int: @check_closed def close(self): """Closes the cursor.""" - self.closed = True + #self.closed = True @check_closed def execute(self, operation, parameters=None) -> "Cursor": diff --git a/sqlalchemy_kusto/dialect_base.py b/sqlalchemy_kusto/dialect_base.py index 335d602..87426b3 100644 --- a/sqlalchemy_kusto/dialect_base.py +++ b/sqlalchemy_kusto/dialect_base.py @@ -62,6 +62,7 @@ class KustoBaseDialect(default.DefaultDialect, ABC): "azure_ad_client_secret": str, "azure_ad_tenant_id": str, "user_msi": str, + "dev_mode": parse_bool_argument, } @classmethod @@ -102,27 +103,51 @@ def get_columns( .show tables | where TableName == "{table_name}" """ + function_search_query = f""" + .show functions + | where Name == "{table_name}" + """ table_search_result = connection.execute(table_search_query) - entity_type = "table" if table_search_result.rowcount == 1 else "materialized-view" + # Add Functions as View as well. Retrieve the schema of the table + if table_search_result.rowcount == 0: + function_search_result = connection.execute(function_search_query) + if function_search_result.rowcount == 1: + function_schema = f".show function {table_name} schema as json" + query_result = connection.execute(function_schema) + rows = list(query_result) + entity_schema = json.loads(rows[0].Schema) + return [ + self.schema_definition(column) + for column in entity_schema["OutputColumns"] + ] + entity_type = "table" if table_search_result.rowcount == 1 else "materialized-view" query = f".show {entity_type} {table_name} schema as json" query_result = connection.execute(query) rows = list(query_result) entity_schema = json.loads(rows[0].Schema) - return [ - { - "name": column["Name"], - "type": kql_to_sql_types[column["CslType"].lower()], - "nullable": True, - "default": "", - } + self.schema_definition(column) for column in entity_schema["OrderedColumns"] ] + @staticmethod + def schema_definition(column): + return { + "name": column["Name"], + "type": kql_to_sql_types[column["CslType"].lower()], + "nullable": True, + "default": "", + } + def get_view_names(self, connection: Connection, schema: Optional[str] = None, **kwargs) -> List[str]: - result = connection.execute(".show materialized-views | project Name") - return [row.Name for row in result] + mvs = connection.execute(".show materialized-views | project Name") + # Functions are also Views. + # Filtering no input functions specifically here as there is no way to pass parameters today + functions = connection.execute(".show functions | where Parameters =='()' | project Name") + mv = [row.Name for row in mvs] + view = [row.Name for row in functions] + return mv + view def get_pk_constraint(self, connection: Connection, table_name: str, schema: Optional[str] = None, **kw): return {"constrained_columns": [], "name": None} From 7cff6070ac6235882ac6b9b8f32971d52c7367cc Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Thu, 21 Nov 2024 20:02:50 +0530 Subject: [PATCH 2/5] * Add tests for AZCLI and for views --- sqlalchemy_kusto/dbapi.py | 34 ++++++++------------- sqlalchemy_kusto/dialect_base.py | 12 ++------ tests/integration/conftest.py | 14 ++++----- tests/integration/test_dialect_sql.py | 38 ++++++++++++++++-------- tests/integration/test_error_handling.py | 8 +++-- 5 files changed, 52 insertions(+), 54 deletions(-) diff --git a/sqlalchemy_kusto/dbapi.py b/sqlalchemy_kusto/dbapi.py index f5a5734..0a22a3d 100644 --- a/sqlalchemy_kusto/dbapi.py +++ b/sqlalchemy_kusto/dbapi.py @@ -38,19 +38,9 @@ def connect( azure_ad_client_id: str = None, azure_ad_client_secret: str = None, azure_ad_tenant_id: str = None, - dev_mode: bool = False, ): """Return a connection to the database.""" - return Connection( - cluster, - database, - msi, - user_msi, - azure_ad_client_id, - azure_ad_client_secret, - azure_ad_tenant_id, - dev_mode, - ) + return Connection(cluster, database, msi, user_msi, azure_ad_client_id, azure_ad_client_secret, azure_ad_tenant_id) class Connection: @@ -65,20 +55,12 @@ def __init__( azure_ad_client_id: str = None, azure_ad_client_secret: str = None, azure_ad_tenant_id: str = None, - dev_mode: bool = False, ): self.closed = False self.cursors: List[Cursor] = [] kcsb = None - if dev_mode: - kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(cluster) - elif msi: - # Managed Service Identity (MSI) - kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication( - cluster, client_id=user_msi - ) - else: + if azure_ad_client_id and azure_ad_client_secret and azure_ad_tenant_id: # Service Principal auth kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication( connection_string=cluster, @@ -86,6 +68,14 @@ def __init__( app_key=azure_ad_client_secret, authority_id=azure_ad_tenant_id, ) + elif msi: + # Managed Service Identity (MSI) + kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication( + cluster, client_id=user_msi + ) + else: + # neither SP or MSI + kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(cluster) self.kusto_client = KustoClient(kcsb) self.database = database @@ -94,7 +84,7 @@ def __init__( @check_closed def close(self): """Close the connection now. Kusto does not require to close the connection.""" - #self.closed = True + # self.closed = True for cursor in self.cursors: cursor.close() @@ -162,7 +152,7 @@ def rowcount(self) -> int: @check_closed def close(self): """Closes the cursor.""" - #self.closed = True + # self.closed = True @check_closed def execute(self, operation, parameters=None) -> "Cursor": diff --git a/sqlalchemy_kusto/dialect_base.py b/sqlalchemy_kusto/dialect_base.py index 87426b3..fa7e1fa 100644 --- a/sqlalchemy_kusto/dialect_base.py +++ b/sqlalchemy_kusto/dialect_base.py @@ -117,19 +117,13 @@ def get_columns( query_result = connection.execute(function_schema) rows = list(query_result) entity_schema = json.loads(rows[0].Schema) - return [ - self.schema_definition(column) - for column in entity_schema["OutputColumns"] - ] + return [self.schema_definition(column) for column in entity_schema["OutputColumns"]] entity_type = "table" if table_search_result.rowcount == 1 else "materialized-view" query = f".show {entity_type} {table_name} schema as json" query_result = connection.execute(query) rows = list(query_result) entity_schema = json.loads(rows[0].Schema) - return [ - self.schema_definition(column) - for column in entity_schema["OrderedColumns"] - ] + return [self.schema_definition(column) for column in entity_schema["OrderedColumns"]] @staticmethod def schema_definition(column): @@ -145,7 +139,7 @@ def get_view_names(self, connection: Connection, schema: Optional[str] = None, * # Functions are also Views. # Filtering no input functions specifically here as there is no way to pass parameters today functions = connection.execute(".show functions | where Parameters =='()' | project Name") - mv = [row.Name for row in mvs] + mv = [row.Name for row in mvs] view = [row.Name for row in functions] return mv + view diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 8167638..dd0d47d 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -7,10 +7,10 @@ registry.register("kustokql.https", "sqlalchemy_kusto.dialect_kql", "KustoKqlHttpsDialect") load_dotenv() -AZURE_AD_CLIENT_ID = os.environ["AZURE_AD_CLIENT_ID"] -AZURE_AD_CLIENT_SECRET = os.environ["AZURE_AD_CLIENT_SECRET"] -AZURE_AD_TENANT_ID = os.environ["AZURE_AD_TENANT_ID"] -KUSTO_URL = os.environ["KUSTO_URL"] -KUSTO_SQL_ALCHEMY_URL = "kustosql+" + os.environ["KUSTO_URL"] -KUSTO_KQL_ALCHEMY_URL = "kustokql+" + os.environ["KUSTO_URL"] -DATABASE = os.environ["DATABASE"] +AZURE_AD_CLIENT_ID = os.environ.get("AZURE_AD_CLIENT_ID", "") +AZURE_AD_CLIENT_SECRET = os.environ.get("AZURE_AD_CLIENT_SECRET", "") +AZURE_AD_TENANT_ID = os.environ.get("AZURE_AD_TENANT_ID", "") +KUSTO_URL = os.environ.get("KUSTO_URL", "https://sdktestcluster.southeastasia.dev.kusto.windows.net") +KUSTO_SQL_ALCHEMY_URL = "kustosql+" + KUSTO_URL +KUSTO_KQL_ALCHEMY_URL = "kustokql+" + KUSTO_URL +DATABASE = os.environ.get("DATABASE", "e2e") diff --git a/tests/integration/test_dialect_sql.py b/tests/integration/test_dialect_sql.py index 4566c6a..878027d 100644 --- a/tests/integration/test_dialect_sql.py +++ b/tests/integration/test_dialect_sql.py @@ -32,6 +32,10 @@ def test_get_table_names(temp_table_name): result = engine.dialect.get_table_names(conn) assert temp_table_name in result +def test_get_view_names(temp_table_name): + conn = engine.connect() + result = engine.dialect.get_view_names(conn) + assert f"{temp_table_name}_fn" in result def test_get_columns(temp_table_name): conn = engine.connect() @@ -77,19 +81,28 @@ def test_limit(temp_table_name): assert result_length == 5 -def _create_temp_table(table_name: str): - kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication( - KUSTO_URL, AZURE_AD_CLIENT_ID, AZURE_AD_CLIENT_SECRET, AZURE_AD_TENANT_ID +def get_kcsb(): + return ( + KustoConnectionStringBuilder.with_az_cli_authentication(KUSTO_URL) + if not AZURE_AD_CLIENT_ID and not AZURE_AD_CLIENT_SECRET and not AZURE_AD_TENANT_ID + else KustoConnectionStringBuilder.with_aad_application_key_authentication( + KUSTO_URL, AZURE_AD_CLIENT_ID, AZURE_AD_CLIENT_SECRET, AZURE_AD_TENANT_ID + ) ) - client = KustoClient(kcsb) + + +def _create_temp_table(table_name: str): + client = KustoClient(get_kcsb()) response = client.execute(DATABASE, f".create table {table_name}(Id: int, Text: string)", ClientRequestProperties()) +def _create_temp_fn(fn_name: str): + client = KustoClient(get_kcsb()) + response = client.execute(DATABASE, f".create function {fn_name}() {{ print now()}}", ClientRequestProperties()) + + def _ingest_data_to_table(table_name: str): - kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication( - KUSTO_URL, AZURE_AD_CLIENT_ID, AZURE_AD_CLIENT_SECRET, AZURE_AD_TENANT_ID - ) - client = KustoClient(kcsb) + client = KustoClient(get_kcsb()) data_to_ingest = {i: "value_" + str(i) for i in range(1, 10)} str_data = "\n".join("{},{}".format(*p) for p in data_to_ingest.items()) ingest_query = f""".ingest inline into table {table_name} <| @@ -98,12 +111,10 @@ def _ingest_data_to_table(table_name: str): def _drop_table(table_name: str): - kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication( - KUSTO_URL, AZURE_AD_CLIENT_ID, AZURE_AD_CLIENT_SECRET, AZURE_AD_TENANT_ID - ) - client = KustoClient(kcsb) + client = KustoClient(get_kcsb()) - response = client.execute(DATABASE, f".drop table {table_name}", ClientRequestProperties()) + _ = client.execute(DATABASE, f".drop table {table_name}", ClientRequestProperties()) + _ = client.execute(DATABASE, f".drop function {table_name}_fn", ClientRequestProperties()) @pytest.fixture() @@ -114,6 +125,7 @@ def temp_table_name(): @pytest.fixture(autouse=True) def run_around_tests(temp_table_name): _create_temp_table(temp_table_name) + _create_temp_fn(f"{temp_table_name}_fn") _ingest_data_to_table(temp_table_name) # A test function will be run at this point yield temp_table_name diff --git a/tests/integration/test_error_handling.py b/tests/integration/test_error_handling.py index 4fd7073..7673aaa 100644 --- a/tests/integration/test_error_handling.py +++ b/tests/integration/test_error_handling.py @@ -2,15 +2,17 @@ import sqlalchemy from sqlalchemy import create_engine -from tests.integration.conftest import AZURE_AD_CLIENT_ID, AZURE_AD_CLIENT_SECRET, DATABASE, KUSTO_SQL_ALCHEMY_URL +from tests.integration.conftest import DATABASE, KUSTO_SQL_ALCHEMY_URL def test_operational_error(): wrong_tenant_id = "wrong_tenant_id" + azure_ad_client_id="x" + azure_ad_client_secret="x" engine = create_engine( f"{KUSTO_SQL_ALCHEMY_URL}/{DATABASE}?" - f"msi=False&azure_ad_client_id={AZURE_AD_CLIENT_ID}&" - f"azure_ad_client_secret={AZURE_AD_CLIENT_SECRET}&" + f"msi=False&azure_ad_client_id={azure_ad_client_id}&" + f"azure_ad_client_secret={azure_ad_client_secret}&" f"azure_ad_tenant_id={wrong_tenant_id}" ) engine.connect() From 77e7f24c9f70d89cb6373a8e8df161ea3c967e51 Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Thu, 21 Nov 2024 20:05:58 +0530 Subject: [PATCH 3/5] * Add connector name in KCSB --- sqlalchemy_kusto/dbapi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlalchemy_kusto/dbapi.py b/sqlalchemy_kusto/dbapi.py index 0a22a3d..71ba521 100644 --- a/sqlalchemy_kusto/dbapi.py +++ b/sqlalchemy_kusto/dbapi.py @@ -76,7 +76,7 @@ def __init__( else: # neither SP or MSI kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(cluster) - + kcsb._set_connector_details("sqlalchemy-kusto", "0.1.0") # pylint: disable=protected-access self.kusto_client = KustoClient(kcsb) self.database = database self.properties = ClientRequestProperties() From 6ebc7ba86cb587b6debf4ff3005d27e8b4b72253 Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Thu, 21 Nov 2024 21:10:14 +0530 Subject: [PATCH 4/5] * Make URL and Database options --- tests/integration/conftest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index dd0d47d..50e0f3c 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -10,7 +10,7 @@ AZURE_AD_CLIENT_ID = os.environ.get("AZURE_AD_CLIENT_ID", "") AZURE_AD_CLIENT_SECRET = os.environ.get("AZURE_AD_CLIENT_SECRET", "") AZURE_AD_TENANT_ID = os.environ.get("AZURE_AD_TENANT_ID", "") -KUSTO_URL = os.environ.get("KUSTO_URL", "https://sdktestcluster.southeastasia.dev.kusto.windows.net") +KUSTO_URL = os.environ["KUSTO_URL"] KUSTO_SQL_ALCHEMY_URL = "kustosql+" + KUSTO_URL KUSTO_KQL_ALCHEMY_URL = "kustokql+" + KUSTO_URL -DATABASE = os.environ.get("DATABASE", "e2e") +DATABASE = os.environ["DATABASE"] From 07a779ffed709493f517065cae2fc4f3a292d1b7 Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Fri, 22 Nov 2024 14:32:10 +0530 Subject: [PATCH 5/5] * Run lint and fix --- sqlalchemy_kusto/dialect_base.py | 6 +++--- tests/integration/test_dialect_sql.py | 2 ++ tests/integration/test_error_handling.py | 4 ++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sqlalchemy_kusto/dialect_base.py b/sqlalchemy_kusto/dialect_base.py index fa7e1fa..ce874ba 100644 --- a/sqlalchemy_kusto/dialect_base.py +++ b/sqlalchemy_kusto/dialect_base.py @@ -135,13 +135,13 @@ def schema_definition(column): } def get_view_names(self, connection: Connection, schema: Optional[str] = None, **kwargs) -> List[str]: - mvs = connection.execute(".show materialized-views | project Name") + materialized_views = connection.execute(".show materialized-views | project Name") # Functions are also Views. # Filtering no input functions specifically here as there is no way to pass parameters today functions = connection.execute(".show functions | where Parameters =='()' | project Name") - mv = [row.Name for row in mvs] + materialized_view = [row.Name for row in materialized_views] view = [row.Name for row in functions] - return mv + view + return materialized_view + view def get_pk_constraint(self, connection: Connection, table_name: str, schema: Optional[str] = None, **kw): return {"constrained_columns": [], "name": None} diff --git a/tests/integration/test_dialect_sql.py b/tests/integration/test_dialect_sql.py index 878027d..07168d2 100644 --- a/tests/integration/test_dialect_sql.py +++ b/tests/integration/test_dialect_sql.py @@ -32,11 +32,13 @@ def test_get_table_names(temp_table_name): result = engine.dialect.get_table_names(conn) assert temp_table_name in result + def test_get_view_names(temp_table_name): conn = engine.connect() result = engine.dialect.get_view_names(conn) assert f"{temp_table_name}_fn" in result + def test_get_columns(temp_table_name): conn = engine.connect() columns_result = engine.dialect.get_columns(conn, temp_table_name) diff --git a/tests/integration/test_error_handling.py b/tests/integration/test_error_handling.py index 7673aaa..38079e5 100644 --- a/tests/integration/test_error_handling.py +++ b/tests/integration/test_error_handling.py @@ -7,8 +7,8 @@ def test_operational_error(): wrong_tenant_id = "wrong_tenant_id" - azure_ad_client_id="x" - azure_ad_client_secret="x" + azure_ad_client_id = "x" + azure_ad_client_secret = "x" engine = create_engine( f"{KUSTO_SQL_ALCHEMY_URL}/{DATABASE}?" f"msi=False&azure_ad_client_id={azure_ad_client_id}&"