diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
index f947a8fa..270bbd7a 100644
--- a/.devcontainer/devcontainer.json
+++ b/.devcontainer/devcontainer.json
@@ -12,7 +12,6 @@
 			"NODE_VERSION": "none"
 		}
 	},
-
 	// Configure tool-specific properties.
 	"customizations": {
 		// Configure properties specific to VS Code.
@@ -43,7 +42,6 @@
 				}
 			}
 		},
-
 			// Add the IDs of extensions you want installed when the container is created.
 			"extensions": [
 				"ms-python.python",
@@ -51,13 +49,10 @@
 			]
 		}
 	},
-
 	// Use 'forwardPorts' to make a list of ports inside the container available locally.
 	// "forwardPorts": [],
-
 	// Use 'postCreateCommand' to run commands after the container is created.
 	// "postCreateCommand": "pip3 install --user -r requirements.txt",
-
 	// Comment out to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
 	"remoteUser": "vscode",
 	"postCreateCommand": "pip install -e . && pip install -r dev-requirements.txt"
diff --git a/README.md b/README.md
index 91a23073..17c482c9 100644
--- a/README.md
+++ b/README.md
@@ -183,6 +183,7 @@ Please remember that using plugins may require you to add additional dependenci
 * `gsheet` depends on `gspread` and `pandas`
 * `iceberg` depends on `pyiceberg` and Python >= 3.8
 * `sqlalchemy` depends on `pandas`, `sqlalchemy`, and the driver(s) you need
+* `delta` depends on `deltalake`; see [an example project](https://github.com/milicevica23/dbt-duckdb-delta-plugin-demo)
 
 #### Using Local Python Modules
diff --git a/dbt/adapters/duckdb/environments/__init__.py b/dbt/adapters/duckdb/environments/__init__.py
index 8eaabf9c..08fee34f 100644
--- a/dbt/adapters/duckdb/environments/__init__.py
+++ b/dbt/adapters/duckdb/environments/__init__.py
@@ -106,11 +106,27 @@ def initialize_db(
         return conn
 
     @classmethod
-    def initialize_cursor(cls, creds: DuckDBCredentials, cursor):
+    def initialize_cursor(
+        cls,
+        creds: DuckDBCredentials,
+        cursor,
+        plugins: Optional[Dict[str, BasePlugin]] = None,
+        registered_df: Optional[dict] = None,
+    ):
         for key, value in creds.load_settings().items():
             # Okay to set these as strings because DuckDB will cast them
             # to the correct type
             cursor.execute(f"SET {key} = '{value}'")
+
+        # re-apply plugin cursor configuration and registered data frames,
+        # which are lost when the parent connection is copied
+        if plugins:
+            for plugin in plugins.values():
+                plugin.configure_cursor(cursor)
+
+        for df_name, df in (registered_df or {}).items():
+            cursor.register(df_name, df)
+
         return cursor
 
     @classmethod
diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py
index f998190a..def00cff 100644
--- a/dbt/adapters/duckdb/environments/local.py
+++ b/dbt/adapters/duckdb/environments/local.py
@@ -53,6 +53,7 @@ def __init__(self, credentials: credentials.DuckDBCredentials):
             or credentials.path.startswith("md:")
             or credentials.path.startswith("motherduck:")
         )
+        self._REGISTERED_DF: dict = {}
 
     def notify_closed(self):
         with self.lock:
@@ -66,7 +67,10 @@ def handle(self):
         if self.conn is None:
             self.conn = self.initialize_db(self.creds, self._plugins)
         self.handle_count += 1
-        cursor = self.initialize_cursor(self.creds, self.conn.cursor())
+
+        cursor = self.initialize_cursor(
+            self.creds, self.conn.cursor(), self._plugins, self._REGISTERED_DF
+        )
         return DuckDBConnectionWrapper(cursor, self)
 
     def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse:
@@ -87,6 +91,10 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
         plugin = self._plugins[plugin_name]
         handle = self.handle()
         cursor = handle.cursor()
+
+        if source_config.schema:
+            cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {source_config.schema}")
+
         save_mode = source_config.get("save_mode", "overwrite")
         if save_mode in ("ignore", "error_if_exists"):
             params = [source_config.schema, source_config.identifier]
@@ -106,10 +114,23 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
                 return
         df = plugin.load(source_config)
         assert df is not None
-        materialization = source_config.meta.get("materialization", "table")
+
+        materialization = source_config.meta.get(
+            "materialization", plugin.default_materialization()
+        )
+
+        source_table_name = source_config.table_name()
+        df_name = source_table_name.replace(".", "_") + "_df"
+
+        cursor.register(df_name, df)
+
+        if materialization == "view":
+            # remember the df so it can be re-registered on every new cursor
+            self._REGISTERED_DF[df_name] = df
+
         cursor.execute(
-            f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df"
+            f"CREATE OR REPLACE {materialization} {source_table_name} AS SELECT * FROM {df_name}"
         )
+
         cursor.close()
         handle.close()
diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py
index 4495acf8..2520173b 100644
--- a/dbt/adapters/duckdb/plugins/__init__.py
+++ b/dbt/adapters/duckdb/plugins/__init__.py
@@ -111,3 +111,16 @@ def load(self, source_config: SourceConfig):
 
     def store(self, target_config: TargetConfig):
         raise NotImplementedError(f"store method not implemented for {self.name}")
+
+    def configure_cursor(self, cursor):
+        """
+        Configure each new cursor copied from the parent DuckDB connection.
+        Subclasses should override this to restore any connection-level state
+        that is lost when the parent connection is copied.
+
+        :param cursor: A DuckDBPyConnection instance to be configured.
+        """
+        pass
+
+    def default_materialization(self):
+        return "table"
diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py
new file mode 100644
index 00000000..c6b0aa2a
--- /dev/null
+++ b/dbt/adapters/duckdb/plugins/delta.py
@@ -0,0 +1,48 @@
+from typing import Any
+from typing import Dict
+
+from deltalake import DeltaTable
+
+from . import BasePlugin
+from ..utils import SourceConfig
+
+
+class Plugin(BasePlugin):
+    def initialize(self, config: Dict[str, Any]):
+        pass
+
+    def configure_cursor(self, cursor):
+        pass
+
+    def load(self, source_config: SourceConfig):
+        if "delta_table_path" not in source_config:
+            raise Exception("'delta_table_path' is a required argument for the delta table!")
+
+        table_path = source_config["delta_table_path"]
+        storage_options = source_config.get("storage_options", None)
+
+        if storage_options:
+            dt = DeltaTable(table_path, storage_options=storage_options)
+        else:
+            dt = DeltaTable(table_path)
+
+        # optional delta time-travel attributes
+        as_of_version = source_config.get("as_of_version", None)
+        as_of_datetime = source_config.get("as_of_datetime", None)
+
+        if as_of_version:
+            dt.load_version(as_of_version)
+
+        if as_of_datetime:
+            dt.load_with_datetime(as_of_datetime)
+
+        # return a PyArrow dataset, which DuckDB can scan lazily
+        df = dt.to_pyarrow_dataset()
+
+        return df
+
+    def default_materialization(self):
+        return "view"
+
+
+# Future
+# TODO add databricks catalog
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 7bdcd0eb..837842a5 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -34,3 +34,4 @@ sqlalchemy
 tox>=3.13
 twine
 wheel
+deltalake
diff --git a/tests/functional/plugins/test_delta.py b/tests/functional/plugins/test_delta.py
new file mode 100644
index 00000000..de0a0b83
--- /dev/null
+++ b/tests/functional/plugins/test_delta.py
@@ -0,0 +1,133 @@
+import pytest
+from pathlib import Path
+import pandas as pd
+import tempfile
+
+from dbt.tests.util import (
+    check_relations_equal,
+    run_dbt,
+)
+from deltalake.writer import write_deltalake
+
+delta_schema_yml = """
+version: 2
+sources:
+  - name: delta_source
+    meta:
+      plugin: delta
+    tables:
+      - name: table_1
+        description: "A delta table"
+        meta:
+          delta_table_path: "{test_delta_path1}"
+
+  - name: delta_source_test
+    schema: test
+    meta:
+      plugin: delta
+    tables:
+      - name: table_2
+        description: "A delta table"
+        meta:
+          delta_table_path: "{test_delta_path2}"
+          as_of_version: 0
+"""
+
+
+delta1_sql = """
+    {{ config(materialized='table') }}
+    select * from {{ source('delta_source', 'table_1') }}
+"""
+delta2_sql = """
+    {{ config(materialized='table') }}
+    select * from {{ source('delta_source', 'table_1') }} limit 1
+"""
+delta3_sql = """
+    {{ config(materialized='table') }}
+    select * from {{ source('delta_source_test', 'table_2') }} WHERE y = 'd'
+"""
+
+delta3_sql_expected = """
+    select 1 as x, 'a' as y
+"""
+
+
+@pytest.mark.skip_profile("buenavista", "md")
+class TestPlugins:
+    @pytest.fixture(scope="class")
+    def delta_test_table1(self):
+        td = tempfile.TemporaryDirectory()
+        path = Path(td.name)
+        table_path = path / "test_delta_table1"
+
+        df = pd.DataFrame({"x": [1, 2, 3]})
+        write_deltalake(table_path, df, mode="overwrite")
+
+        yield table_path
+
+        td.cleanup()
+
+    @pytest.fixture(scope="class")
+    def delta_test_table2(self):
+        td = tempfile.TemporaryDirectory()
+        path = Path(td.name)
+        table_path = path / "test_delta_table2"
+
+        # version 0 of the table
+        df = pd.DataFrame({
+            "x": [1],
+            "y": ["a"]
+        })
+        write_deltalake(table_path, df, mode="overwrite")
+
+        # version 1 of the table
+        df = pd.DataFrame({
+            "x": [1, 2],
+            "y": ["a", "b"]
+        })
+        write_deltalake(table_path, df, mode="overwrite")
+
+        yield table_path
+
+        td.cleanup()
+
+    @pytest.fixture(scope="class")
+    def profiles_config_update(self, dbt_profile_target):
+        plugins = [{"module": "delta"}]
+        return {
+            "test": {
+                "outputs": {
+                    "dev": {
+                        "type": "duckdb",
+                        "path": dbt_profile_target.get("path", ":memory:"),
+                        "plugins": plugins,
+                    }
+                },
"target": "dev", + } + } + + @pytest.fixture(scope="class") + def models(self, delta_test_table1,delta_test_table2): + return { + "source_schema.yml": delta_schema_yml.format( + test_delta_path1=delta_test_table1, + test_delta_path2=delta_test_table2 + ), + "delta_table1.sql": delta1_sql, + "delta_table2.sql": delta2_sql, + "delta_table3.sql": delta3_sql, + "delta_table3_expected.sql": delta3_sql_expected, + } + + def test_plugins(self, project): + results = run_dbt() + assert len(results) == 4 + + # check_relations_equal( + # project.adapter, + # [ + # "delta_table3", + # "delta_table3_expected", + # ], + # ) + # res = project.run_sql("SELECT count(1) FROM 'delta_table3'", fetch="one") + # assert res[0] == 2