From 9aba0a5ecb5073a885eeeb07a1034a52802734c9 Mon Sep 17 00:00:00 2001 From: Lohith K S Date: Sat, 14 Oct 2023 00:50:51 -0400 Subject: [PATCH 1/6] Adding Snowflake Support for EvaDB --- .../databases/snowflake/__init__.py | 15 ++ .../databases/snowflake/requirements.txt | 3 + .../databases/snowflake/snowflake_handler.py | 182 ++++++++++++++++++ 3 files changed, 200 insertions(+) create mode 100644 evadb/third_party/databases/snowflake/__init__.py create mode 100644 evadb/third_party/databases/snowflake/requirements.txt create mode 100644 evadb/third_party/databases/snowflake/snowflake_handler.py diff --git a/evadb/third_party/databases/snowflake/__init__.py b/evadb/third_party/databases/snowflake/__init__.py new file mode 100644 index 0000000000..c881047fe1 --- /dev/null +++ b/evadb/third_party/databases/snowflake/__init__.py @@ -0,0 +1,15 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""snowflake integrations""" diff --git a/evadb/third_party/databases/snowflake/requirements.txt b/evadb/third_party/databases/snowflake/requirements.txt new file mode 100644 index 0000000000..c366baf101 --- /dev/null +++ b/evadb/third_party/databases/snowflake/requirements.txt @@ -0,0 +1,3 @@ +snowflake-connector-python +pyarrow +pandas \ No newline at end of file diff --git a/evadb/third_party/databases/snowflake/snowflake_handler.py b/evadb/third_party/databases/snowflake/snowflake_handler.py new file mode 100644 index 0000000000..8e17928ef7 --- /dev/null +++ b/evadb/third_party/databases/snowflake/snowflake_handler.py @@ -0,0 +1,182 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pandas as pd +import snowflake.connector +import datetime + +from evadb.third_party.databases.types import ( + DBHandler, + DBHandlerResponse, + DBHandlerStatus, +) + +class SnowFlakeDbHandler(DBHandler): + + """ + Class for implementing the SnowFlake DB handler as a backend store for + EvaDB. + """ + + def __init__(self, name: str, **kwargs): + """ + Initialize the handler. + Args: + name (str): name of the DB handler instance + **kwargs: arbitrary keyword arguments for establishing the connection. + """ + super().__init__(name) + self.user = kwargs.get("user") + self.password = kwargs.get("password") + self.database = kwargs.get("database") + self.warehouse = kwargs.get("warehouse") + self.account = kwargs.get("account") + self.schema = kwargs.get("schema") + + def connect(self): + """ + Establish connection to the database. + Returns: + DBHandlerStatus + """ + try: + self.connection = snowflake.connector.connect( + user=self.user, + password=self.password, + database=self.database, + warehouse=self.warehouse, + schema=self.schema, + account=self.account + ) + # Auto commit is off by default. + self.connection.autocommit = True + return DBHandlerStatus(status=True) + except snowflake.connector.errors.Error as e: + return DBHandlerStatus(status=False, error=str(e)) + + def disconnect(self): + """ + Disconnect from the database. + """ + if self.connection: + self.connection.close() + + def check_connection(self) -> DBHandlerStatus: + """ + Method for checking the status of database connection. + Returns: + DBHandlerStatus + """ + if self.connection: + return DBHandlerStatus(status=True) + else: + return DBHandlerStatus(status=False, error="Not connected to the database.") + + def get_tables(self) -> DBHandlerResponse: + """ + Method to get the list of tables from database. + Returns: + DBHandlerStatus + """ + if not self.connection: + return DBHandlerResponse(data=None, error="Not connected to the database.") + + try: + query = f"SELECT table_name as 'table_name' FROM information_schema.tables WHERE table_schema='{self.schema}'" + cursor = self.connection.cursor() + cursor.execute(query) + tables_df = self._fetch_results_as_df(cursor) + return DBHandlerResponse(data=tables_df) + except snowflake.connector.errors.Error as e: + return DBHandlerResponse(data=None, error=str(e)) + + def get_columns(self, table_name: str) -> DBHandlerResponse: + """ + Method to retrieve the columns of the specified table from the database. + Args: + table_name (str): name of the table whose columns are to be retrieved. + Returns: + DBHandlerStatus + """ + if not self.connection: + return DBHandlerResponse(data=None, error="Not connected to the database.") + + try: + query = f"SELECT column_name as 'name', data_type as 'dtype' FROM information_schema.columns WHERE table_name='{table_name}'" + cursor = self.connection.cursor() + cursor.execute(query) + columns_df = self._fetch_results_as_df(cursor) + columns_df["dtype"] = columns_df["dtype"].apply( + self._snowflake_to_python_types + ) + return DBHandlerResponse(data=columns_df) + except snowflake.connector.errors.Error as e: + return DBHandlerResponse(data=None, error=str(e)) + + def _fetch_results_as_df(self, cursor): + """ + Fetch results from the cursor for the executed query and return the + query results as dataframe. + """ + try: + res = cursor.fetchall() + res_df = pd.DataFrame( + res, columns=[desc[0].lower() for desc in cursor.description] + ) + return res_df + except snowflake.connector.errors.ProgrammingError as e: + if str(e) == "no results to fetch": + return pd.DataFrame({"status": ["success"]}) + raise e + + def execute_native_query(self, query_string: str) -> DBHandlerResponse: + """ + Executes the native query on the database. + Args: + query_string (str): query in native format + Returns: + DBHandlerResponse + """ + if not self.connection: + return DBHandlerResponse(data=None, error="Not connected to the database.") + + try: + cursor = self.connection.cursor() + cursor.execute(query_string) + return DBHandlerResponse(data=self._fetch_results_as_df(cursor)) + except snowflake.connector.errors.Error as e: + return DBHandlerResponse(data=None, error=str(e)) + + def _snowflake_to_python_types(self, snowflake_type: str): + mapping = { + "TEXT": str, + "NUMBER": int, + "INT": int, + "DECIMAL": float, + "STRING": str, + "CHAR": str, + "BOOLEAN": bool, + "BINARY": bytes, + "DATE": datetime.date, + "TIME": datetime.time, + "TIMESTAMP": datetime.datetime + # Add more mappings as needed + } + + if snowflake_type in mapping: + return mapping[snowflake_type] + else: + raise Exception( + f"Unsupported column {snowflake_type} encountered in the snowflake. Please raise a feature request!" + ) \ No newline at end of file From ca5b78e8e2ac2d352f616a2b7d9947ac83f06115 Mon Sep 17 00:00:00 2001 From: Lohith K S Date: Sun, 15 Oct 2023 02:40:46 -0400 Subject: [PATCH 2/6] Added Unit Test Case for Snowflake Integration --- evadb/third_party/databases/interface.py | 2 + .../third_party_tests/test_native_executor.py | 19 +++ .../test_snowflake_native_storage_engine.py | 140 ++++++++++++++++++ 3 files changed, 161 insertions(+) create mode 100644 test/unit_tests/storage/test_snowflake_native_storage_engine.py diff --git a/evadb/third_party/databases/interface.py b/evadb/third_party/databases/interface.py index 5e30dc8220..e4cd86151c 100644 --- a/evadb/third_party/databases/interface.py +++ b/evadb/third_party/databases/interface.py @@ -44,6 +44,8 @@ def _get_database_handler(engine: str, **kwargs): return mod.MariaDbHandler(engine, **kwargs) elif engine == "clickhouse": return mod.ClickHouseHandler(engine, **kwargs) + elif engine == "snowflake": + return mod.SnowFlakeDbHandler(engine, **kwargs) elif engine == "github": return mod.GithubHandler(engine, **kwargs) else: diff --git a/test/third_party_tests/test_native_executor.py b/test/third_party_tests/test_native_executor.py index 40647328b1..30c7392c6e 100644 --- a/test/third_party_tests/test_native_executor.py +++ b/test/third_party_tests/test_native_executor.py @@ -228,6 +228,25 @@ def test_should_run_query_in_clickhouse(self): self._execute_native_query() self._execute_evadb_query() + def test_should_run_query_in_snowflake(self): + # Create database. + params = { + "user": "eva", + "password": "password", + "account": "account_number", + "database": "EVADB", + "schema": "SAMPLE_DATA", + "warehouse": "warehouse", + } + query = f"""CREATE DATABASE test_data_source + WITH ENGINE = "snowflake", + PARAMETERS = {params};""" + execute_query_fetch_all(self.evadb, query) + + # Test executions. + self._execute_native_query() + self._execute_evadb_query() + def test_should_run_query_in_sqlite(self): # Create database. diff --git a/test/unit_tests/storage/test_snowflake_native_storage_engine.py b/test/unit_tests/storage/test_snowflake_native_storage_engine.py new file mode 100644 index 0000000000..82a9c3b842 --- /dev/null +++ b/test/unit_tests/storage/test_snowflake_native_storage_engine.py @@ -0,0 +1,140 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from test.util import get_evadb_for_testing +from unittest.mock import patch + +import pytest + +from evadb.catalog.models.utils import DatabaseCatalogEntry +from evadb.server.command_handler import execute_query_fetch_all + + +class NativeQueryResponse: + def __init__(self): + self.error = None + self.data = None + + +@pytest.mark.notparallel +class SnowFlakeNativeStorageEngineTest(unittest.TestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def get_snowflake_params(self): + return { + "database": "evadb.db", + } + + def setUp(self): + connection_params = self.get_snowflake_params() + self.evadb = get_evadb_for_testing() + + # Create all class level patches + self.get_database_catalog_entry_patcher = patch( + "evadb.catalog.catalog_manager.CatalogManager.get_database_catalog_entry" + ) + self.get_database_catalog_entry_mock = ( + self.get_database_catalog_entry_patcher.start() + ) + + self.execute_native_query_patcher = patch( + "evadb.third_party.databases.snowflake.snowflake_handler.SnowFlakeDbHandler.execute_native_query" + ) + self.execute_native_query_mock = self.execute_native_query_patcher.start() + + self.connect_patcher = patch( + "evadb.third_party.databases.snowflake.snowflake_handler.SnowFlakeDbHandler.connect" + ) + self.connect_mock = self.connect_patcher.start() + + self.disconnect_patcher = patch( + "evadb.third_party.databases.snowflake.snowflake_handler.SnowFlakeDbHandler.disconnect" + ) + self.disconnect_mock = self.disconnect_patcher.start() + + # set return values + self.execute_native_query_mock.return_value = NativeQueryResponse() + self.get_database_catalog_entry_mock.return_value = DatabaseCatalogEntry( + name="test_data_source", + engine="snowflake", + params=connection_params, + row_id=1, + ) + + def tearDown(self): + self.get_database_catalog_entry_patcher.stop() + self.execute_native_query_patcher.stop() + self.connect_patcher.stop() + self.disconnect_patcher.stop() + + def test_execute_snowflake_select_query(self): + execute_query_fetch_all( + self.evadb, + """USE test_data_source { + SELECT * FROM test_table + }""", + ) + + self.connect_mock.assert_called_once() + self.execute_native_query_mock.assert_called_once() + self.get_database_catalog_entry_mock.assert_called_once() + self.disconnect_mock.assert_called_once() + + def test_execute_snowflake_insert_query(self): + execute_query_fetch_all( + self.evadb, + """USE test_data_source { + INSERT INTO test_table ( + name, age, comment + ) VALUES ( + 'val', 5, 'testing' + ) + }""", + ) + self.connect_mock.assert_called_once() + self.execute_native_query_mock.assert_called_once() + self.get_database_catalog_entry_mock.assert_called_once() + self.disconnect_mock.assert_called_once() + + def test_execute_snowflake_update_query(self): + execute_query_fetch_all( + self.evadb, + """USE test_data_source { + UPDATE test_table + SET comment = 'update' + WHERE age > 5 + }""", + ) + + self.connect_mock.assert_called_once() + self.execute_native_query_mock.assert_called_once() + self.get_database_catalog_entry_mock.assert_called_once() + self.disconnect_mock.assert_called_once() + + def test_execute_snowflake_delete_query(self): + execute_query_fetch_all( + self.evadb, + """USE test_data_source { + DELETE FROM test_table + WHERE age < 5 + }""", + ) + + self.connect_mock.assert_called_once() + self.execute_native_query_mock.assert_called_once() + self.get_database_catalog_entry_mock.assert_called_once() + self.disconnect_mock.assert_called_once() From 2c3ab74805d01a2578556185287a52a63dc60e80 Mon Sep 17 00:00:00 2001 From: Lohith K S Date: Mon, 16 Oct 2023 20:31:39 -0400 Subject: [PATCH 3/6] Fixed Unit Test Cases for Snowflake Integration --- .../storage/test_snowflake_native_storage_engine.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/unit_tests/storage/test_snowflake_native_storage_engine.py b/test/unit_tests/storage/test_snowflake_native_storage_engine.py index 82a9c3b842..328d152c9a 100644 --- a/test/unit_tests/storage/test_snowflake_native_storage_engine.py +++ b/test/unit_tests/storage/test_snowflake_native_storage_engine.py @@ -13,15 +13,21 @@ # See the License for the specific language governing permissions and # limitations under the License. +import sys import unittest from test.util import get_evadb_for_testing from unittest.mock import patch +from unittest.mock import MagicMock + import pytest from evadb.catalog.models.utils import DatabaseCatalogEntry from evadb.server.command_handler import execute_query_fetch_all +sys.modules["snowflake"] = MagicMock() +sys.modules["snowflake.connector"] = MagicMock() + class NativeQueryResponse: def __init__(self): From aef8d2e7a59ead6b05efd8d428ef7f490db46dd5 Mon Sep 17 00:00:00 2001 From: Lohith K S Date: Tue, 17 Oct 2023 02:41:22 -0400 Subject: [PATCH 4/6] Fix linter issue --- .../third_party/databases/snowflake/snowflake_handler.py | 8 +++++--- .../storage/test_snowflake_native_storage_engine.py | 4 +--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/evadb/third_party/databases/snowflake/snowflake_handler.py b/evadb/third_party/databases/snowflake/snowflake_handler.py index da671db575..0b5ba4553d 100644 --- a/evadb/third_party/databases/snowflake/snowflake_handler.py +++ b/evadb/third_party/databases/snowflake/snowflake_handler.py @@ -12,9 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import datetime + import pandas as pd import snowflake.connector -import datetime from evadb.third_party.databases.types import ( DBHandler, @@ -22,6 +23,7 @@ DBHandlerStatus, ) + class SnowFlakeDbHandler(DBHandler): """ @@ -57,7 +59,7 @@ def connect(self): database=self.database, warehouse=self.warehouse, schema=self.schema, - account=self.account + account=self.account, ) # Auto commit is off by default. self.connection.autocommit = True @@ -179,4 +181,4 @@ def _snowflake_to_python_types(self, snowflake_type: str): else: raise Exception( f"Unsupported column {snowflake_type} encountered in the snowflake. Please raise a feature request!" - ) \ No newline at end of file + ) diff --git a/test/unit_tests/storage/test_snowflake_native_storage_engine.py b/test/unit_tests/storage/test_snowflake_native_storage_engine.py index 328d152c9a..50fcea23b3 100644 --- a/test/unit_tests/storage/test_snowflake_native_storage_engine.py +++ b/test/unit_tests/storage/test_snowflake_native_storage_engine.py @@ -16,9 +16,7 @@ import sys import unittest from test.util import get_evadb_for_testing -from unittest.mock import patch -from unittest.mock import MagicMock - +from unittest.mock import MagicMock, patch import pytest From 83ba8f68a6247701f4cd8dca586f1fc5f13157f5 Mon Sep 17 00:00:00 2001 From: Lohith K S Date: Thu, 19 Oct 2023 20:52:20 -0400 Subject: [PATCH 5/6] Added Documentation and Skipped Integration test for Snowflake --- docs/_toc.yml | 1 + docs/source/reference/databases/snowflake.rst | 47 +++++++++++++++++++ .../third_party_tests/test_native_executor.py | 4 +- 3 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 docs/source/reference/databases/snowflake.rst diff --git a/docs/_toc.yml b/docs/_toc.yml index a8639dec31..58d5e6d250 100644 --- a/docs/_toc.yml +++ b/docs/_toc.yml @@ -71,6 +71,7 @@ parts: - file: source/reference/databases/mariadb - file: source/reference/databases/clickhouse - file: source/reference/databases/github + - file: source/reference/databases/snowflake - file: source/reference/vector_databases/index title: Vector Databases diff --git a/docs/source/reference/databases/snowflake.rst b/docs/source/reference/databases/snowflake.rst new file mode 100644 index 0000000000..c4f0e9e55a --- /dev/null +++ b/docs/source/reference/databases/snowflake.rst @@ -0,0 +1,47 @@ +Snowflake +========== + +The connection to Snowflake is based on the `snowflake-connector-python `_ library. + +Dependency +---------- + +* snowflake-connector-python + +Parameters +---------- + +Required: + +* `user` is the database user. +* `password` is the snowflake account password. +* `database` is the database name. +* `warehouse` is the snowflake warehouse name. +* `account` is the snowflake account number ( can be found in the url ). +* `schema` is the schema name. + + +.. warning:: + + Provide the parameters of an already running ``Snowflake`` Data Warehouse. EvaDB only connects to an existing ``Snowflake`` Data Warehouse. + +Create Connection +----------------- + +.. code-block:: text + + CREATE DATABASE snowflake_data WITH ENGINE = 'snowflake', PARAMETERS = { + "user": "", + "password": "" + "account": "", + "database": "EVADB", + "warehouse": "COMPUTE_WH", + "schema": "SAMPLE_DATA" + }; + +.. warning:: + + In Snowflake Terminology, ``Database`` and ``Schema`` refer to the following. + A database is a logical grouping of schemas. Each database belongs to a single Snowflake account. + A schema is a logical grouping of database objects (tables, views, etc.). Each schema belongs to a single database. + diff --git a/test/third_party_tests/test_native_executor.py b/test/third_party_tests/test_native_executor.py index 30c7392c6e..879435f866 100644 --- a/test/third_party_tests/test_native_executor.py +++ b/test/third_party_tests/test_native_executor.py @@ -228,6 +228,9 @@ def test_should_run_query_in_clickhouse(self): self._execute_native_query() self._execute_evadb_query() + @pytest.mark.skip( + reason="Snowflake does not come with a free version of account, so integration test is not feasible" + ) def test_should_run_query_in_snowflake(self): # Create database. params = { @@ -247,7 +250,6 @@ def test_should_run_query_in_snowflake(self): self._execute_native_query() self._execute_evadb_query() - def test_should_run_query_in_sqlite(self): # Create database. import os From 2073d7c82b7bdac6997e15c0075f5f06d6739038 Mon Sep 17 00:00:00 2001 From: Lohith K S Date: Thu, 19 Oct 2023 21:08:47 -0400 Subject: [PATCH 6/6] Minor Modifications for the documentation --- docs/source/reference/databases/snowflake.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/reference/databases/snowflake.rst b/docs/source/reference/databases/snowflake.rst index c4f0e9e55a..239389e186 100644 --- a/docs/source/reference/databases/snowflake.rst +++ b/docs/source/reference/databases/snowflake.rst @@ -41,7 +41,7 @@ Create Connection .. warning:: - In Snowflake Terminology, ``Database`` and ``Schema`` refer to the following. - A database is a logical grouping of schemas. Each database belongs to a single Snowflake account. - A schema is a logical grouping of database objects (tables, views, etc.). Each schema belongs to a single database. + | In Snowflake Terminology, ``Database`` and ``Schema`` refer to the following. + | A database is a logical grouping of schemas. Each database belongs to a single Snowflake account. + | A schema is a logical grouping of database objects (tables, views, etc.). Each schema belongs to a single database.