Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

* Add support for views #20

Merged
merged 6 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ connection = connect(
azure_ad_client_id=kusto_client_id,
azure_ad_client_secret=kusto_client_secret,
azure_ad_tenant_id=kusto_tenant_id,
dev_mode=False
)

result = connection.execute(f"select 1").fetchall()
Expand All @@ -51,7 +52,8 @@ engine = create_engine(
f"kustosql+{kusto_url}/{database_name}?"
f"msi=False&azure_ad_client_id={kusto_client_id}&"
f"azure_ad_client_secret={kusto_client_secret}&"
f"azure_ad_tenant_id={kusto_tenant_id}"
f"azure_ad_tenant_id={kusto_tenant_id}&"
f"dev_mode=False"
)
engine.connect()
cursor = engine.execute(f"select top 1")
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
DESCRIPTION = "Azure Data Explorer (Kusto) dialect for SQLAlchemy"
VERSION = "3.0.0"


REQUIREMENTS = [
"azure-kusto-data==3.*",
"azure-kusto-data==4.*",
"sqlalchemy==1.4.*",
"typing-extensions>=3.10",
]
Expand Down
31 changes: 13 additions & 18 deletions sqlalchemy_kusto/dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,7 @@ def connect(
azure_ad_tenant_id: str = None,
):
"""Return a connection to the database."""
return Connection(
cluster,
database,
msi,
user_msi,
azure_ad_client_id,
azure_ad_client_secret,
azure_ad_tenant_id,
)
return Connection(cluster, database, msi, user_msi, azure_ad_client_id, azure_ad_client_secret, azure_ad_tenant_id)


class Connection:
Expand All @@ -68,28 +60,31 @@ def __init__(
self.cursors: List[Cursor] = []
kcsb = None

if msi:
# Managed Service Identity (MSI)
kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(
cluster, client_id=user_msi
)
else:
if azure_ad_client_id and azure_ad_client_secret and azure_ad_tenant_id:
# Service Principal auth
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
connection_string=cluster,
aad_app_id=azure_ad_client_id,
app_key=azure_ad_client_secret,
authority_id=azure_ad_tenant_id,
)

elif msi:
# Managed Service Identity (MSI)
kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(
cluster, client_id=user_msi
)
else:
# neither SP or MSI
kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(cluster)
kcsb._set_connector_details("sqlalchemy-kusto", "0.1.0") # pylint: disable=protected-access
self.kusto_client = KustoClient(kcsb)
self.database = database
self.properties = ClientRequestProperties()

@check_closed
def close(self):
"""Close the connection now. Kusto does not require to close the connection."""
self.closed = True
# self.closed = True
for cursor in self.cursors:
cursor.close()

Expand Down Expand Up @@ -157,7 +152,7 @@ def rowcount(self) -> int:
@check_closed
def close(self):
"""Closes the cursor."""
self.closed = True
# self.closed = True

@check_closed
def execute(self, operation, parameters=None) -> "Cursor":
Expand Down
45 changes: 32 additions & 13 deletions sqlalchemy_kusto/dialect_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class KustoBaseDialect(default.DefaultDialect, ABC):
"azure_ad_client_secret": str,
"azure_ad_tenant_id": str,
"user_msi": str,
"dev_mode": parse_bool_argument,
}

@classmethod
Expand Down Expand Up @@ -102,27 +103,45 @@ def get_columns(
.show tables
| where TableName == "{table_name}"
"""
function_search_query = f"""
.show functions
| where Name == "{table_name}"
"""
table_search_result = connection.execute(table_search_query)
entity_type = "table" if table_search_result.rowcount == 1 else "materialized-view"

# Add Functions as View as well. Retrieve the schema of the table
if table_search_result.rowcount == 0:
function_search_result = connection.execute(function_search_query)
if function_search_result.rowcount == 1:
function_schema = f".show function {table_name} schema as json"
query_result = connection.execute(function_schema)
rows = list(query_result)
entity_schema = json.loads(rows[0].Schema)
return [self.schema_definition(column) for column in entity_schema["OutputColumns"]]
entity_type = "table" if table_search_result.rowcount == 1 else "materialized-view"
query = f".show {entity_type} {table_name} schema as json"
query_result = connection.execute(query)
rows = list(query_result)
entity_schema = json.loads(rows[0].Schema)

return [
{
"name": column["Name"],
"type": kql_to_sql_types[column["CslType"].lower()],
"nullable": True,
"default": "",
}
for column in entity_schema["OrderedColumns"]
]
return [self.schema_definition(column) for column in entity_schema["OrderedColumns"]]

@staticmethod
def schema_definition(column):
return {
"name": column["Name"],
"type": kql_to_sql_types[column["CslType"].lower()],
"nullable": True,
"default": "",
}

def get_view_names(self, connection: Connection, schema: Optional[str] = None, **kwargs) -> List[str]:
result = connection.execute(".show materialized-views | project Name")
return [row.Name for row in result]
materialized_views = connection.execute(".show materialized-views | project Name")
# Functions are also Views.
# Filtering no input functions specifically here as there is no way to pass parameters today
functions = connection.execute(".show functions | where Parameters =='()' | project Name")
materialized_view = [row.Name for row in materialized_views]
view = [row.Name for row in functions]
return materialized_view + view

def get_pk_constraint(self, connection: Connection, table_name: str, schema: Optional[str] = None, **kw):
return {"constrained_columns": [], "name": None}
Expand Down
10 changes: 5 additions & 5 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
registry.register("kustokql.https", "sqlalchemy_kusto.dialect_kql", "KustoKqlHttpsDialect")

load_dotenv()
AZURE_AD_CLIENT_ID = os.environ["AZURE_AD_CLIENT_ID"]
AZURE_AD_CLIENT_SECRET = os.environ["AZURE_AD_CLIENT_SECRET"]
AZURE_AD_TENANT_ID = os.environ["AZURE_AD_TENANT_ID"]
AZURE_AD_CLIENT_ID = os.environ.get("AZURE_AD_CLIENT_ID", "")
AZURE_AD_CLIENT_SECRET = os.environ.get("AZURE_AD_CLIENT_SECRET", "")
AZURE_AD_TENANT_ID = os.environ.get("AZURE_AD_TENANT_ID", "")
KUSTO_URL = os.environ["KUSTO_URL"]
KUSTO_SQL_ALCHEMY_URL = "kustosql+" + os.environ["KUSTO_URL"]
KUSTO_KQL_ALCHEMY_URL = "kustokql+" + os.environ["KUSTO_URL"]
KUSTO_SQL_ALCHEMY_URL = "kustosql+" + KUSTO_URL
KUSTO_KQL_ALCHEMY_URL = "kustokql+" + KUSTO_URL
DATABASE = os.environ["DATABASE"]
40 changes: 27 additions & 13 deletions tests/integration/test_dialect_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ def test_get_table_names(temp_table_name):
assert temp_table_name in result


def test_get_view_names(temp_table_name):
conn = engine.connect()
result = engine.dialect.get_view_names(conn)
assert f"{temp_table_name}_fn" in result


def test_get_columns(temp_table_name):
conn = engine.connect()
columns_result = engine.dialect.get_columns(conn, temp_table_name)
Expand Down Expand Up @@ -77,19 +83,28 @@ def test_limit(temp_table_name):
assert result_length == 5


def _create_temp_table(table_name: str):
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
KUSTO_URL, AZURE_AD_CLIENT_ID, AZURE_AD_CLIENT_SECRET, AZURE_AD_TENANT_ID
def get_kcsb():
return (
KustoConnectionStringBuilder.with_az_cli_authentication(KUSTO_URL)
if not AZURE_AD_CLIENT_ID and not AZURE_AD_CLIENT_SECRET and not AZURE_AD_TENANT_ID
else KustoConnectionStringBuilder.with_aad_application_key_authentication(
KUSTO_URL, AZURE_AD_CLIENT_ID, AZURE_AD_CLIENT_SECRET, AZURE_AD_TENANT_ID
)
)
client = KustoClient(kcsb)


def _create_temp_table(table_name: str):
client = KustoClient(get_kcsb())
response = client.execute(DATABASE, f".create table {table_name}(Id: int, Text: string)", ClientRequestProperties())


def _create_temp_fn(fn_name: str):
client = KustoClient(get_kcsb())
response = client.execute(DATABASE, f".create function {fn_name}() {{ print now()}}", ClientRequestProperties())


def _ingest_data_to_table(table_name: str):
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
KUSTO_URL, AZURE_AD_CLIENT_ID, AZURE_AD_CLIENT_SECRET, AZURE_AD_TENANT_ID
)
client = KustoClient(kcsb)
client = KustoClient(get_kcsb())
data_to_ingest = {i: "value_" + str(i) for i in range(1, 10)}
str_data = "\n".join("{},{}".format(*p) for p in data_to_ingest.items())
ingest_query = f""".ingest inline into table {table_name} <|
Expand All @@ -98,12 +113,10 @@ def _ingest_data_to_table(table_name: str):


def _drop_table(table_name: str):
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
KUSTO_URL, AZURE_AD_CLIENT_ID, AZURE_AD_CLIENT_SECRET, AZURE_AD_TENANT_ID
)
client = KustoClient(kcsb)
client = KustoClient(get_kcsb())

response = client.execute(DATABASE, f".drop table {table_name}", ClientRequestProperties())
_ = client.execute(DATABASE, f".drop table {table_name}", ClientRequestProperties())
_ = client.execute(DATABASE, f".drop function {table_name}_fn", ClientRequestProperties())


@pytest.fixture()
Expand All @@ -114,6 +127,7 @@ def temp_table_name():
@pytest.fixture(autouse=True)
def run_around_tests(temp_table_name):
_create_temp_table(temp_table_name)
_create_temp_fn(f"{temp_table_name}_fn")
_ingest_data_to_table(temp_table_name)
# A test function will be run at this point
yield temp_table_name
Expand Down
8 changes: 5 additions & 3 deletions tests/integration/test_error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
import sqlalchemy
from sqlalchemy import create_engine

from tests.integration.conftest import AZURE_AD_CLIENT_ID, AZURE_AD_CLIENT_SECRET, DATABASE, KUSTO_SQL_ALCHEMY_URL
from tests.integration.conftest import DATABASE, KUSTO_SQL_ALCHEMY_URL


def test_operational_error():
wrong_tenant_id = "wrong_tenant_id"
azure_ad_client_id = "x"
azure_ad_client_secret = "x"
engine = create_engine(
f"{KUSTO_SQL_ALCHEMY_URL}/{DATABASE}?"
f"msi=False&azure_ad_client_id={AZURE_AD_CLIENT_ID}&"
f"azure_ad_client_secret={AZURE_AD_CLIENT_SECRET}&"
f"msi=False&azure_ad_client_id={azure_ad_client_id}&"
f"azure_ad_client_secret={azure_ad_client_secret}&"
f"azure_ad_tenant_id={wrong_tenant_id}"
)
engine.connect()
Expand Down
Loading