From 2914aedb13fa82d08b81200c069aa269a0c78a49 Mon Sep 17 00:00:00 2001
From: Aleksandar Milicevic
Date: Sun, 1 Oct 2023 12:49:20 +0000
Subject: [PATCH 01/22] add delta read plugin

---
 dbt/adapters/duckdb/plugins/delta.py | 37 ++++++++++++++++++++++++++++
 dev-requirements.txt                 |  1 +
 2 files changed, 38 insertions(+)
 create mode 100644 dbt/adapters/duckdb/plugins/delta.py

diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py
new file mode 100644
index 00000000..c7b5f851
--- /dev/null
+++ b/dbt/adapters/duckdb/plugins/delta.py
@@ -0,0 +1,37 @@
+from typing import Any
+from typing import Dict
+import duckdb
+
+from deltalake import DeltaTable
+
+from . import BasePlugin
+from ..utils import SourceConfig
+
+
+class Plugin(BasePlugin):
+    def initialize(self, config: Dict[str, Any]):
+        pass
+
+    def load(self, source_config: SourceConfig):
+        if "delta_table_path" not in source_config:
+            raise Exception("'delta_table_path' is a required argument for the delta table!")
+
+        table_path = source_config["delta_table_path"]
+
+        pruning_filter = source_config.get("pruning_filter", "1=1")
+        pruning_projection = source_config.get("pruning_projection", "*")
+
+        ##TODO check if table is there and path is ok
+
+        dt = DeltaTable(table_path)
+        df_db = duckdb.arrow(dt.to_pyarrow_table())
+        df_db_pruned = df_db.filter(pruning_filter).project(pruning_projection)
+
+        print(df_db_pruned.explain())
+        return df_db_pruned
+
+#TODO each node calls plugin.load independently which is maybe overhead?
+
+#Future
+#TODO add time travel; add deltalake storage options
+#TODO add databricks catalog
\ No newline at end of file
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 7bdcd0eb..837842a5 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -34,3 +34,4 @@ sqlalchemy
 tox>=3.13
 twine
 wheel
+deltalake
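The `load` hook above materializes the whole Delta table through Arrow, hands it to DuckDB as a relation, and applies `pruning_filter` / `pruning_projection` as relational operations. A minimal standalone sketch of the same flow, assuming a Delta table with an integer column `x` has already been written to the hypothetical path `/tmp/delta_demo`:

```python
# Sketch only: mirrors what Plugin.load does, outside of dbt.
import duckdb
from deltalake import DeltaTable

dt = DeltaTable("/tmp/delta_demo")           # hypothetical local table path
rel = duckdb.arrow(dt.to_pyarrow_table())    # hand the Arrow table to DuckDB
pruned = rel.filter("x > 1").project("x")    # what pruning_filter/pruning_projection do
print(pruned.fetchall())
```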
From 14558bc3c8f64d1c5c7e3ce39094ff42c23b2ba7 Mon Sep 17 00:00:00 2001
From: Aleksandar Milicevic
Date: Sun, 1 Oct 2023 21:24:17 +0000
Subject: [PATCH 02/22] add time travel; remote storage (should test)

---
 dbt/adapters/duckdb/plugins/delta.py | 25 ++++++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py
index c7b5f851..4fdeda2c 100644
--- a/dbt/adapters/duckdb/plugins/delta.py
+++ b/dbt/adapters/duckdb/plugins/delta.py
@@ -10,20 +10,35 @@ class Plugin(BasePlugin):
     def initialize(self, config: Dict[str, Any]):
+        #place for init catalog in the future
         pass
 
     def load(self, source_config: SourceConfig):
         if "delta_table_path" not in source_config:
             raise Exception("'delta_table_path' is a required argument for the delta table!")
-
+        print(source_config)
         table_path = source_config["delta_table_path"]
+        storage_options = source_config.get("storage",None)
+
+        if storage_options:
+            dt = DeltaTable(table_path, storage_options)
+        else:
+            dt = DeltaTable(table_path)
+
+        #delta attributes
+        as_of_version = source_config.get("as_of_version", None)
+        as_of_datetime = source_config.get("as_of_datetime", None)
+
+        if as_of_version:
+            dt.load_version(1)
+
+        if as_of_datetime:
+            dt.load_with_datetime(as_of_datetime)
+
+        #pruning attributes
         pruning_filter = source_config.get("pruning_filter", "1=1")
         pruning_projection = source_config.get("pruning_projection", "*")
 
-        ##TODO check if table is there and path is ok
-
-        dt = DeltaTable(table_path)
         df_db = duckdb.arrow(dt.to_pyarrow_table())
         df_db_pruned = df_db.filter(pruning_filter).project(pruning_projection)
 
         print(df_db_pruned.explain())
@@ -33,5 +48,5 @@ def load(self, source_config: SourceConfig):
 #TODO each node calls plugin.load independently which is maybe overhead?
 
 #Future
-#TODO add time travel; add deltalake storage options
+#TODO add deltalake storage options
 #TODO add databricks catalog
\ No newline at end of file
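Patch 02 wires Delta time travel through two deltalake calls. Note that `dt.load_version(1)` above pins version 1 regardless of the requested `as_of_version`; a later patch in this series corrects it to `dt.load_version(as_of_version)`. A hedged sketch of the underlying API, assuming a table at the hypothetical path `/tmp/delta_demo` with at least one prior version:

```python
# Sketch only: the deltalake time-travel calls the plugin relies on.
from deltalake import DeltaTable

dt = DeltaTable("/tmp/delta_demo")             # opens the latest version
dt.load_version(0)                             # rewind to a specific version...
dt.load_with_datetime("2023-10-01T00:00:00Z")  # ...or to the state at a timestamp
```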
From f2d0aa74b70cbb839559fac5107456ceb279ddbf Mon Sep 17 00:00:00 2001
From: Aleksandar Milicevic
Date: Wed, 4 Oct 2023 11:28:23 +0000
Subject: [PATCH 03/22] add simple test showcase

---
 .vscode/launch.json                    | 18 ++++
 tests/functional/plugins/test_delta.py | 87 ++++++++++++++++++++++++++
 2 files changed, 105 insertions(+)
 create mode 100644 .vscode/launch.json
 create mode 100644 tests/functional/plugins/test_delta.py

diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 00000000..88331b37
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,18 @@
+{
+    // Use IntelliSense to learn about possible attributes.
+    // Hover to view descriptions of existing attributes.
+    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+    "version": "0.2.0",
+    "configurations": [
+        {
+            "name": "Python: Module",
+            "type": "python",
+            "request": "launch",
+            "module": "pytest",
+            "args": [
+                "/workspaces/dbt-duckdb/tests/functional/plugins/test_delta.py"
+            ],
+            "justMyCode": true
+        }
+    ]
+}
\ No newline at end of file
diff --git a/tests/functional/plugins/test_delta.py b/tests/functional/plugins/test_delta.py
new file mode 100644
index 00000000..a11332f2
--- /dev/null
+++ b/tests/functional/plugins/test_delta.py
@@ -0,0 +1,87 @@
+import os
+import pytest
+import sqlite3
+from pathlib import Path
+import shutil
+import pandas as pd
+
+from dbt.tests.util import (
+    check_relations_equal,
+    run_dbt,
+)
+from deltalake.writer import write_deltalake
+
+delta_schema_yml = """
+version: 2
+sources:
+  - name: delta_source
+    schema: main
+    meta:
+      plugin: delta
+    tables:
+      - name: table_1
+        description: "A delta table"
+        meta:
+          materialization: "view"
+          delta_table_path: "{test_delta_path}"
+"""
+
+
+delta1_sql = """
+    {{ config(materialized='table') }}
+    select * from {{ source('delta_source', 'table_1') }}
+"""
+
+# plugin_sql = """
+# {{ config(materialized='external', plugin='cfp', key='value') }}
+# select foo() as foo
+# """
+
+# # Reads from a MD database in my test account in the cloud
+# md_sql = """
+# select * FROM plugin_test.main.plugin_table
+# """
+
+
+@pytest.mark.skip_profile("buenavista", "md")
+class TestPlugins:
+    @pytest.fixture(scope="class")
+    def delta_test_table(self):
+        path = Path("/tmp/test_delta")
+        table_path = path / "test_delta_table"
+
+        df = pd.DataFrame({"x": [1, 2, 3]})
+        write_deltalake(table_path, df, mode="overwrite")
+
+        yield table_path
+
+        shutil.rmtree(table_path)
+
+    @pytest.fixture(scope="class")
+    def profiles_config_update(self, dbt_profile_target):
+        plugins = [{"module": "delta"}]
+        return {
+            "test": {
+                "outputs": {
+                    "dev": {
+                        "type": "duckdb",
+                        "path": dbt_profile_target.get("path", ":memory:"),
+                        "plugins": plugins,
+                    }
+                },
+                "target": "dev",
+            }
+        }
+
+    @pytest.fixture(scope="class")
+    def models(self, delta_test_table):
+        return {
+            "source_schema.yml": delta_schema_yml.format(
+                test_delta_path=delta_test_table
+            ),
+            "delta_table.sql": delta1_sql,
+        }
+
+    def test_plugins(self, project):
+        results = run_dbt()
+        assert len(results) == 1

From b2e9e76c527784dab1409eaf625bc408903476f2 Mon Sep 17 00:00:00 2001
From: Aleksandar Milicevic
Date: Wed, 4 Oct 2023 11:35:37 +0000
Subject: [PATCH 04/22] add notebook with delta+conn testing

---
 dbt/adapters/duckdb/plugins/delta.py | 34 ++++++-----
 test_delta.ipynb                     | 87 ++++++++++++++++++++++++++++
 2 files changed, 106 insertions(+), 15 deletions(-)
 create mode 100644 test_delta.ipynb

diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py
index 4fdeda2c..ff89aca3 100644
--- a/dbt/adapters/duckdb/plugins/delta.py
+++ b/dbt/adapters/duckdb/plugins/delta.py
@@ -2,51 +2,55 @@
 from typing import Dict
 import duckdb
 
-from deltalake import DeltaTable 
+from deltalake import DeltaTable
 
 from . import BasePlugin
 from ..utils import SourceConfig
+from dbt.logger import GLOBAL_LOGGER as logger
 
 
 class Plugin(BasePlugin):
     def initialize(self, config: Dict[str, Any]):
-        #place for init catalog in the future
+        # place for init catalog in the future
         pass
 
     def load(self, source_config: SourceConfig):
         if "delta_table_path" not in source_config:
-            raise Exception("'delta_table_path' is a required argument for the delta table!")
-        print(source_config)
+            raise Exception(
+                "'delta_table_path' is a required argument for the delta table!"
+            )
+        logger.debug(source_config)
         table_path = source_config["delta_table_path"]
-        storage_options = source_config.get("storage",None)
-
-        if storage_options: 
+        storage_options = source_config.get("storage", None)
+
+        if storage_options:
             dt = DeltaTable(table_path, storage_options)
         else:
             dt = DeltaTable(table_path)
 
-        #delta attributes
+        # delta attributes
         as_of_version = source_config.get("as_of_version", None)
         as_of_datetime = source_config.get("as_of_datetime", None)
 
         if as_of_version:
-            dt.load_version(1)
-        
+            dt.load_version(1)
+
         if as_of_datetime:
             dt.load_with_datetime(as_of_datetime)
 
-        #pruning attributes
+        # pruning attributes
         pruning_filter = source_config.get("pruning_filter", "1=1")
         pruning_projection = source_config.get("pruning_projection", "*")
 
         df_db = duckdb.arrow(dt.to_pyarrow_table())
         df_db_pruned = df_db.filter(pruning_filter).project(pruning_projection)
 
-        print(df_db_pruned.explain())
+        logger.debug(df_db_pruned.explain())
         return df_db_pruned
 
-#TODO each node calls plugin.load independently which is maybe overhead?
-
-#Future
-#TODO add deltalake storage options
-#TODO add databricks catalog
\ No newline at end of file
+# TODO each node calls plugin.load independently which is maybe overhead?
+ +# Future +# TODO add deltalake storage options +# TODO add databricks catalog diff --git a/test_delta.ipynb b/test_delta.ipynb new file mode 100644 index 00000000..d6b95b3d --- /dev/null +++ b/test_delta.ipynb @@ -0,0 +1,87 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "┌───────┐\n", + "│ x │\n", + "│ int64 │\n", + "├───────┤\n", + "│ 1 │\n", + "│ 2 │\n", + "│ 3 │\n", + "└───────┘\n", + "\n" + ] + }, + { + "ename": "CatalogException", + "evalue": "Catalog Error: Table with name test_materialized does not exist!\nDid you mean \"sqlite_master\"?", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mCatalogException\u001b[0m Traceback (most recent call last)", + "\u001b[1;32m/workspaces/dbt-duckdb/test_delta.ipynb Cell 1\u001b[0m line \u001b[0;36m1\n\u001b[1;32m 14\u001b[0m \u001b[39m# df not accessable\u001b[39;00m\n\u001b[1;32m 15\u001b[0m conn\u001b[39m.\u001b[39msql(\u001b[39m\"\u001b[39m\u001b[39mSELECT * FROM test_materialized\u001b[39m\u001b[39m\"\u001b[39m)\u001b[39m.\u001b[39mshow()\n\u001b[0;32m---> 16\u001b[0m conn1\u001b[39m.\u001b[39;49msql(\u001b[39m\"\u001b[39;49m\u001b[39mSELECT * FROM test_materialized\u001b[39;49m\u001b[39m\"\u001b[39;49m)\u001b[39m.\u001b[39mshow()\n\u001b[1;32m 17\u001b[0m conn\u001b[39m.\u001b[39mclose()\n", + "\u001b[0;31mCatalogException\u001b[0m: Catalog Error: Table with name test_materialized does not exist!\nDid you mean \"sqlite_master\"?" + ] + } + ], + "source": [ + "import duckdb\n", + "from deltalake import DeltaTable\n", + "conn = duckdb.connect(\":memory:\")\n", + "conn1 = conn.cursor()\n", + "\n", + "def create_view(conn):\n", + " delta_path = \"/tmp/test_delta/test_delta_table\"\n", + " dt = DeltaTable(delta_path)\n", + " dataset = dt.to_pyarrow_dataset()\n", + " conn.register(\"test_materialized\",dataset)\n", + " return conn\n", + "\n", + "conn = create_view(conn)\n", + "\n", + "conn.sql(\"SELECT * FROM test_materialized\").show()\n", + "conn1.sql(\"SELECT * FROM test_materialized\").show()\n", + "\n", + "conn.close()\n", + "conn1.close()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.17" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} From d3366ebff34107b98c1995123ef6c1d2262103ec Mon Sep 17 00:00:00 2001 From: Aleksandar Milicevic Date: Wed, 4 Oct 2023 11:41:02 +0000 Subject: [PATCH 05/22] make duplicate connection example simple --- test_delta.ipynb | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test_delta.ipynb b/test_delta.ipynb index d6b95b3d..3d66a70f 100644 --- a/test_delta.ipynb +++ b/test_delta.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 4, + "execution_count": 6, "metadata": {}, "outputs": [ { @@ -27,7 +27,7 @@ "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mCatalogException\u001b[0m Traceback (most recent 
call last)", - "\u001b[1;32m/workspaces/dbt-duckdb/test_delta.ipynb Cell 1\u001b[0m line \u001b[0;36m1\n\u001b[1;32m 14\u001b[0m \u001b[39m# df not accessable\u001b[39;00m\n\u001b[1;32m 15\u001b[0m conn\u001b[39m.\u001b[39msql(\u001b[39m\"\u001b[39m\u001b[39mSELECT * FROM test_materialized\u001b[39m\u001b[39m\"\u001b[39m)\u001b[39m.\u001b[39mshow()\n\u001b[0;32m---> 16\u001b[0m conn1\u001b[39m.\u001b[39;49msql(\u001b[39m\"\u001b[39;49m\u001b[39mSELECT * FROM test_materialized\u001b[39;49m\u001b[39m\"\u001b[39;49m)\u001b[39m.\u001b[39mshow()\n\u001b[1;32m 17\u001b[0m conn\u001b[39m.\u001b[39mclose()\n", + "\u001b[1;32m/workspaces/dbt-duckdb/test_delta.ipynb Cell 1\u001b[0m line \u001b[0;36m1\n\u001b[1;32m 12\u001b[0m conn \u001b[39m=\u001b[39m create_view(conn)\n\u001b[1;32m 14\u001b[0m conn\u001b[39m.\u001b[39msql(\u001b[39m\"\u001b[39m\u001b[39mSELECT * FROM test_materialized\u001b[39m\u001b[39m\"\u001b[39m)\u001b[39m.\u001b[39mshow()\n\u001b[0;32m---> 15\u001b[0m conn1\u001b[39m.\u001b[39;49msql(\u001b[39m\"\u001b[39;49m\u001b[39mSELECT * FROM test_materialized\u001b[39;49m\u001b[39m\"\u001b[39;49m)\u001b[39m.\u001b[39mshow()\n\u001b[1;32m 17\u001b[0m conn\u001b[39m.\u001b[39mclose()\n\u001b[1;32m 18\u001b[0m conn1\u001b[39m.\u001b[39mclose()\n", "\u001b[0;31mCatalogException\u001b[0m: Catalog Error: Table with name test_materialized does not exist!\nDid you mean \"sqlite_master\"?" ] } @@ -35,14 +35,14 @@ "source": [ "import duckdb\n", "from deltalake import DeltaTable\n", + "import pandas as pd\n", + "\n", "conn = duckdb.connect(\":memory:\")\n", "conn1 = conn.cursor()\n", "\n", "def create_view(conn):\n", - " delta_path = \"/tmp/test_delta/test_delta_table\"\n", - " dt = DeltaTable(delta_path)\n", - " dataset = dt.to_pyarrow_dataset()\n", - " conn.register(\"test_materialized\",dataset)\n", + " df = pd.DataFrame({\"x\": [1, 2, 3]})\n", + " conn.register(\"test_materialized\",df)\n", " return conn\n", "\n", "conn = create_view(conn)\n", From 919b233af9dc56b910cd6ae0268d8aad956c37aa Mon Sep 17 00:00:00 2001 From: Aleksandar Milicevic Date: Wed, 4 Oct 2023 11:55:47 +0000 Subject: [PATCH 06/22] simplify test_delta --- test_delta.ipynb | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/test_delta.ipynb b/test_delta.ipynb index 3d66a70f..ac728a6d 100644 --- a/test_delta.ipynb +++ b/test_delta.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 6, + "execution_count": 9, "metadata": {}, "outputs": [ { @@ -27,7 +27,7 @@ "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mCatalogException\u001b[0m Traceback (most recent call last)", - "\u001b[1;32m/workspaces/dbt-duckdb/test_delta.ipynb Cell 1\u001b[0m line \u001b[0;36m1\n\u001b[1;32m 12\u001b[0m conn \u001b[39m=\u001b[39m create_view(conn)\n\u001b[1;32m 14\u001b[0m conn\u001b[39m.\u001b[39msql(\u001b[39m\"\u001b[39m\u001b[39mSELECT * FROM test_materialized\u001b[39m\u001b[39m\"\u001b[39m)\u001b[39m.\u001b[39mshow()\n\u001b[0;32m---> 15\u001b[0m conn1\u001b[39m.\u001b[39;49msql(\u001b[39m\"\u001b[39;49m\u001b[39mSELECT * FROM test_materialized\u001b[39;49m\u001b[39m\"\u001b[39;49m)\u001b[39m.\u001b[39mshow()\n\u001b[1;32m 17\u001b[0m conn\u001b[39m.\u001b[39mclose()\n\u001b[1;32m 18\u001b[0m conn1\u001b[39m.\u001b[39mclose()\n", + "\u001b[1;32m/workspaces/dbt-duckdb/test_delta.ipynb Cell 1\u001b[0m line \u001b[0;36m1\n\u001b[1;32m 8\u001b[0m 
conn\u001b[39m.\u001b[39mregister(\u001b[39m\"\u001b[39m\u001b[39mtest_materialized\u001b[39m\u001b[39m\"\u001b[39m,df)\n\u001b[1;32m 9\u001b[0m conn\u001b[39m.\u001b[39msql(\u001b[39m\"\u001b[39m\u001b[39mSELECT * FROM test_materialized\u001b[39m\u001b[39m\"\u001b[39m)\u001b[39m.\u001b[39mshow()\n\u001b[0;32m---> 10\u001b[0m conn1\u001b[39m.\u001b[39;49msql(\u001b[39m\"\u001b[39;49m\u001b[39mSELECT * FROM test_materialized\u001b[39;49m\u001b[39m\"\u001b[39;49m)\u001b[39m.\u001b[39mshow()\n\u001b[1;32m 12\u001b[0m conn\u001b[39m.\u001b[39mclose()\n\u001b[1;32m 13\u001b[0m conn1\u001b[39m.\u001b[39mclose()\n", "\u001b[0;31mCatalogException\u001b[0m: Catalog Error: Table with name test_materialized does not exist!\nDid you mean \"sqlite_master\"?" ] } @@ -39,14 +39,8 @@ "\n", "conn = duckdb.connect(\":memory:\")\n", "conn1 = conn.cursor()\n", - "\n", - "def create_view(conn):\n", - " df = pd.DataFrame({\"x\": [1, 2, 3]})\n", - " conn.register(\"test_materialized\",df)\n", - " return conn\n", - "\n", - "conn = create_view(conn)\n", - "\n", + "df = pd.DataFrame({\"x\": [1, 2, 3]})\n", + "conn.register(\"test_materialized\",df)\n", "conn.sql(\"SELECT * FROM test_materialized\").show()\n", "conn1.sql(\"SELECT * FROM test_materialized\").show()\n", "\n", From b6a67c19ca5c21d8a72e1678a4a581f4c7324893 Mon Sep 17 00:00:00 2001 From: Aleksandar Milicevic Date: Fri, 6 Oct 2023 20:11:25 +0000 Subject: [PATCH 07/22] try to register df to localsession --- dbt/adapters/duckdb/environments/local.py | 37 +++++++++++++--- test_delta.ipynb | 54 +++++++++-------------- 2 files changed, 54 insertions(+), 37 deletions(-) diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index f998190a..bdc2e0fe 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -6,6 +6,8 @@ from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError +_REGISTERED_DF = {} + class DuckDBCursorWrapper: def __init__(self, cursor): @@ -16,6 +18,9 @@ def __getattr__(self, name): return getattr(self._cursor, name) def execute(self, sql, bindings=None): + # register_df(self._cursor, _REGISTERED_DF) + for df_name, df in _REGISTERED_DF.items(): + self._cursor.register(df_name, df) try: if bindings is None: return self._cursor.execute(sql) @@ -24,6 +29,9 @@ def execute(self, sql, bindings=None): except RuntimeError as e: raise DbtRuntimeError(str(e)) + def register_df(self, df_name, df): + self._cursor.register(df_name, df) + class DuckDBConnectionWrapper: def __init__(self, cursor, env): @@ -67,9 +75,12 @@ def handle(self): self.conn = self.initialize_db(self.creds, self._plugins) self.handle_count += 1 cursor = self.initialize_cursor(self.creds, self.conn.cursor()) + # register_df(cursor, self._registered_df) return DuckDBConnectionWrapper(cursor, self) - def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: + def submit_python_job( + self, handle, parsed_model: dict, compiled_code: str + ) -> AdapterResponse: con = handle.cursor() def ldf(table_name): @@ -100,20 +111,31 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): params.append(source_config.database) if cursor.execute(q, params).fetchone()[0]: if save_mode == "error_if_exists": - raise Exception(f"Source {source_config.table_name()} already exists!") + raise Exception( + f"Source {source_config.table_name()} already exists!" 
+ ) else: # Nothing to do (we ignore the existing table) return df = plugin.load(source_config) + df_name = source_config.identifier + "_df" assert df is not None - materialization = source_config.meta.get("materialization", "table") + if plugin_name == "delta": + _REGISTERED_DF[df_name] = df + cursor.register_df(df_name, df) + materialization = source_config.meta.get("materialization", "view") + else: + materialization = source_config.meta.get("materialization", "table") + cursor.execute( - f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df" + f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM {df_name}" ) cursor.close() handle.close() - def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> None: + def store_relation( + self, plugin_name: str, target_config: utils.TargetConfig + ) -> None: if plugin_name not in self._plugins: if plugin_name.startswith("glue|"): from ..plugins import glue @@ -137,3 +159,8 @@ def close(self): def __del__(self): self.close() + + +def register_df(cursor, df_dict): + for df_name, df in df_dict.items(): + cursor.register(df_name, df) diff --git a/test_delta.ipynb b/test_delta.ipynb index ac728a6d..85927e31 100644 --- a/test_delta.ipynb +++ b/test_delta.ipynb @@ -2,34 +2,18 @@ "cells": [ { "cell_type": "code", - "execution_count": 9, + "execution_count": 22, "metadata": {}, "outputs": [ { - "name": "stdout", - "output_type": "stream", - "text": [ - "┌───────┐\n", - "│ x │\n", - "│ int64 │\n", - "├───────┤\n", - "│ 1 │\n", - "│ 2 │\n", - "│ 3 │\n", - "└───────┘\n", - "\n" - ] - }, - { - "ename": "CatalogException", - "evalue": "Catalog Error: Table with name test_materialized does not exist!\nDid you mean \"sqlite_master\"?", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mCatalogException\u001b[0m Traceback (most recent call last)", - "\u001b[1;32m/workspaces/dbt-duckdb/test_delta.ipynb Cell 1\u001b[0m line \u001b[0;36m1\n\u001b[1;32m 8\u001b[0m conn\u001b[39m.\u001b[39mregister(\u001b[39m\"\u001b[39m\u001b[39mtest_materialized\u001b[39m\u001b[39m\"\u001b[39m,df)\n\u001b[1;32m 9\u001b[0m conn\u001b[39m.\u001b[39msql(\u001b[39m\"\u001b[39m\u001b[39mSELECT * FROM test_materialized\u001b[39m\u001b[39m\"\u001b[39m)\u001b[39m.\u001b[39mshow()\n\u001b[0;32m---> 10\u001b[0m conn1\u001b[39m.\u001b[39;49msql(\u001b[39m\"\u001b[39;49m\u001b[39mSELECT * FROM test_materialized\u001b[39;49m\u001b[39m\"\u001b[39;49m)\u001b[39m.\u001b[39mshow()\n\u001b[1;32m 12\u001b[0m conn\u001b[39m.\u001b[39mclose()\n\u001b[1;32m 13\u001b[0m conn1\u001b[39m.\u001b[39mclose()\n", - "\u001b[0;31mCatalogException\u001b[0m: Catalog Error: Table with name test_materialized does not exist!\nDid you mean \"sqlite_master\"?" 
- ] + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 22, + "metadata": {}, + "output_type": "execute_result" } ], "source": [ @@ -38,14 +22,20 @@ "import pandas as pd\n", "\n", "conn = duckdb.connect(\":memory:\")\n", - "conn1 = conn.cursor()\n", + "\n", "df = pd.DataFrame({\"x\": [1, 2, 3]})\n", - "conn.register(\"test_materialized\",df)\n", - "conn.sql(\"SELECT * FROM test_materialized\").show()\n", - "conn1.sql(\"SELECT * FROM test_materialized\").show()\n", + "conn.execute('BEGIN')\n", + "conn.execute('create schema if not exists \"memory\".\"test16965229211977761792_test_delta\"')\n", + "conn.execute('COMMIT')\n", + "conn1 = conn.cursor() \n", + "conn1.register(\"table_1_df\",df)\n", + "conn1.execute('CREATE OR REPLACE view memory.main.table_1 AS SELECT * FROM table_1_df')\n", "\n", - "conn.close()\n", - "conn1.close()" + "conn.execute('BEGIN')\n", + "conn.execute('drop schema if exists \"memory\".\"test16965229211977761792_test_delta\" cascade')\n", + "conn.execute('COMMIT')\n", + "#conn.execute(\"SELECT * FROM test_materialized\")\n", + "#conn1.execute(\"SELECT * FROM test_materialized\").show()" ] }, { @@ -72,7 +62,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.17" + "version": "3.11.4" }, "orig_nbformat": 4 }, From ddda9190ae21a5ddc88c08aaf923e6683c647d42 Mon Sep 17 00:00:00 2001 From: Aleksandar Milicevic Date: Fri, 6 Oct 2023 22:59:10 +0000 Subject: [PATCH 08/22] add df registration --- .devcontainer/devcontainer.json | 9 ++----- dbt/adapters/duckdb/environments/local.py | 23 ++++++++--------- dbt/adapters/duckdb/plugins/delta.py | 17 +++---------- test_delta.ipynb | 30 ++++++++++++++++++++++- tests/functional/plugins/test_delta.py | 11 --------- 5 files changed, 44 insertions(+), 46 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index f947a8fa..6aa82b4a 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -7,12 +7,11 @@ // Update 'VARIANT' to pick a Python version: 3, 3.10, 3.9, 3.8, 3.7, 3.6 // Append -bullseye or -buster to pin to an OS version. // Use -bullseye variants on local on arm64/Apple Silicon. - "VARIANT": "3.8", + "VARIANT": "3.11", // Options "NODE_VERSION": "none" } }, - // Configure tool-specific properties. "customizations": { // Configure properties specific to VS Code. @@ -43,7 +42,6 @@ } } }, - // Add the IDs of extensions you want installed when the container is created. "extensions": [ "ms-python.python", @@ -51,14 +49,11 @@ ] } }, - // Use 'forwardPorts' to make a list of ports inside the container available locally. // "forwardPorts": [], - // Use 'postCreateCommand' to run commands after the container is created. // "postCreateCommand": "pip3 install --user -r requirements.txt", - // Comment out to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. "remoteUser": "vscode", "postCreateCommand": "pip install -e . 
&& pip install -r dev-requirements.txt" -} +} \ No newline at end of file diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index bdc2e0fe..f94f9cfc 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -6,8 +6,7 @@ from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError -_REGISTERED_DF = {} - +_REGISTERED_DF: dict = {} class DuckDBCursorWrapper: def __init__(self, cursor): @@ -17,10 +16,12 @@ def __init__(self, cursor): def __getattr__(self, name): return getattr(self._cursor, name) - def execute(self, sql, bindings=None): - # register_df(self._cursor, _REGISTERED_DF) + def execute(self, sql, bindings=None): + #register all dfs + #TODO is this okey to be here? for df_name, df in _REGISTERED_DF.items(): self._cursor.register(df_name, df) + try: if bindings is None: return self._cursor.execute(sql) @@ -75,7 +76,6 @@ def handle(self): self.conn = self.initialize_db(self.creds, self._plugins) self.handle_count += 1 cursor = self.initialize_cursor(self.creds, self.conn.cursor()) - # register_df(cursor, self._registered_df) return DuckDBConnectionWrapper(cursor, self) def submit_python_job( @@ -118,11 +118,12 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): # Nothing to do (we ignore the existing table) return df = plugin.load(source_config) - df_name = source_config.identifier + "_df" assert df is not None + df_name = source_config.identifier + "_df" + #this can be problem with other plugins because they will be loaded into memory e.g excel + _REGISTERED_DF[df_name] = df + if plugin_name == "delta": - _REGISTERED_DF[df_name] = df - cursor.register_df(df_name, df) materialization = source_config.meta.get("materialization", "view") else: materialization = source_config.meta.get("materialization", "table") @@ -130,6 +131,7 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): cursor.execute( f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM {df_name}" ) + cursor.close() handle.close() @@ -159,8 +161,3 @@ def close(self): def __del__(self): self.close() - - -def register_df(cursor, df_dict): - for df_name, df in df_dict.items(): - cursor.register(df_name, df) diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py index ff89aca3..5dfdae6e 100644 --- a/dbt/adapters/duckdb/plugins/delta.py +++ b/dbt/adapters/duckdb/plugins/delta.py @@ -19,7 +19,7 @@ def load(self, source_config: SourceConfig): raise Exception( "'delta_table_path' is a required argument for the delta table!" ) - logger.debug(source_config) + #logger.debug(source_config) table_path = source_config["delta_table_path"] storage_options = source_config.get("storage", None) @@ -33,23 +33,12 @@ def load(self, source_config: SourceConfig): as_of_datetime = source_config.get("as_of_datetime", None) if as_of_version: - dt.load_version(1) + dt.load_version(as_of_version) if as_of_datetime: dt.load_with_datetime(as_of_datetime) - # prunning attributes - pruning_filter = source_config.get("pruning_filter", "1=1") - pruning_projection = source_config.get("pruning_projection", "*") - - df_db = duckdb.arrow(dt.to_pyarrow_table()) - df_db_pruned = df_db.filter(pruning_filter).project(pruning_projection) - - logger.debug(df_db_pruned.explain()) - return df_db_pruned - - -# TODO each node calls plugin.load indipendent which is maybe overhead? 
+ return dt.to_pyarrow_table() # Future # TODO add deltalake storage options diff --git a/test_delta.ipynb b/test_delta.ipynb index 85927e31..c6a1da5e 100644 --- a/test_delta.ipynb +++ b/test_delta.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 22, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -38,6 +38,34 @@ "#conn1.execute(\"SELECT * FROM test_materialized\").show()" ] }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "┌───────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────┐\n", + "│ explain_key │ explain_value │\n", + "│ varchar │ varchar │\n", + "├───────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────┤\n", + "│ physical_plan │ ┌───────────────────────────┐\\n│ PROJECTION │\\n│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │\\n│ … │\n", + "└───────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import duckdb\n", + "conn = duckdb.connect(\":memory:\")\n", + "\n", + "conn.sql(\"EXPLAIN SELECT 1\")" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/tests/functional/plugins/test_delta.py b/tests/functional/plugins/test_delta.py index a11332f2..51bb032b 100644 --- a/tests/functional/plugins/test_delta.py +++ b/tests/functional/plugins/test_delta.py @@ -32,17 +32,6 @@ select * from {{ source('delta_source', 'table_1') }} """ -# plugin_sql = """ -# {{ config(materialized='external', plugin='cfp', key='value') }} -# select foo() as foo -# """ - -# # Reads from a MD database in my test account in the cloud -# md_sql = """ -# select * FROM plugin_test.main.plugin_table -# """ - - @pytest.mark.skip_profile("buenavista", "md") class TestPlugins: @pytest.fixture(scope="class") From c072cedaaf18918e72ba34216424305eab427d2f Mon Sep 17 00:00:00 2001 From: Aleksandar Milicevic Date: Sun, 8 Oct 2023 17:16:55 +0000 Subject: [PATCH 09/22] refactor registered df; delete launch --- .vscode/launch.json | 18 ------------- dbt/adapters/duckdb/environments/__init__.py | 9 ++++++- dbt/adapters/duckdb/environments/local.py | 27 +++++--------------- dbt/adapters/duckdb/plugins/__init__.py | 10 ++++++++ dbt/adapters/duckdb/plugins/delta.py | 19 +++++++++++--- 5 files changed, 40 insertions(+), 43 deletions(-) delete mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json deleted file mode 100644 index 88331b37..00000000 --- a/.vscode/launch.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - // Use IntelliSense to learn about possible attributes. - // Hover to view descriptions of existing attributes. 
- // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - { - "name": "Python: Module", - "type": "python", - "request": "launch", - "module": "pytest", - "args": [ - "/workspaces/dbt-duckdb/tests/functional/plugins/test_delta.py" - ], - "justMyCode": true - } - ] -} \ No newline at end of file diff --git a/dbt/adapters/duckdb/environments/__init__.py b/dbt/adapters/duckdb/environments/__init__.py index 8eaabf9c..553d4403 100644 --- a/dbt/adapters/duckdb/environments/__init__.py +++ b/dbt/adapters/duckdb/environments/__init__.py @@ -106,11 +106,18 @@ def initialize_db( return conn @classmethod - def initialize_cursor(cls, creds: DuckDBCredentials, cursor): + def initialize_cursor(cls, creds: DuckDBCredentials, cursor, plugins: Optional[Dict[str, BasePlugin]] = None): for key, value in creds.load_settings().items(): # Okay to set these as strings because DuckDB will cast them # to the correct type cursor.execute(f"SET {key} = '{value}'") + + #update cursor if something is lost in the copy + #of the parent connection + if plugins: + for plugin in plugins.values(): + plugin.configure_cursor(cursor) + return cursor @classmethod diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index f94f9cfc..c7dfb920 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -6,7 +6,6 @@ from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError -_REGISTERED_DF: dict = {} class DuckDBCursorWrapper: def __init__(self, cursor): @@ -17,11 +16,6 @@ def __getattr__(self, name): return getattr(self._cursor, name) def execute(self, sql, bindings=None): - #register all dfs - #TODO is this okey to be here? 
- for df_name, df in _REGISTERED_DF.items(): - self._cursor.register(df_name, df) - try: if bindings is None: return self._cursor.execute(sql) @@ -30,10 +24,6 @@ def execute(self, sql, bindings=None): except RuntimeError as e: raise DbtRuntimeError(str(e)) - def register_df(self, df_name, df): - self._cursor.register(df_name, df) - - class DuckDBConnectionWrapper: def __init__(self, cursor, env): self._cursor = DuckDBCursorWrapper(cursor) @@ -75,7 +65,8 @@ def handle(self): if self.conn is None: self.conn = self.initialize_db(self.creds, self._plugins) self.handle_count += 1 - cursor = self.initialize_cursor(self.creds, self.conn.cursor()) + + cursor = self.initialize_cursor(self.creds, self.conn.cursor(), self._plugins) return DuckDBConnectionWrapper(cursor, self) def submit_python_job( @@ -119,18 +110,14 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): return df = plugin.load(source_config) assert df is not None - df_name = source_config.identifier + "_df" - #this can be problem with other plugins because they will be loaded into memory e.g excel - _REGISTERED_DF[df_name] = df - if plugin_name == "delta": - materialization = source_config.meta.get("materialization", "view") + if plugin_name == "delta": # potentially should all plugins use configure_cursor + plugin.configure_cursor(cursor) else: materialization = source_config.meta.get("materialization", "table") - - cursor.execute( - f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM {df_name}" - ) + cursor.execute( + f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df" + ) cursor.close() handle.close() diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index 4495acf8..68bfceb3 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -111,3 +111,13 @@ def load(self, source_config: SourceConfig): def store(self, target_config: TargetConfig): raise NotImplementedError(f"store method not implemented for {self.name}") + + def configure_cursor(self, cursor): + """ + Configure each copy of the DuckDB cursor. + This method should be overridden by subclasses to provide additional + attributes to the connection which are lost in the copy of the parent connection. + + :param cursor: A DuckDBPyConnection instance to be configured. 
+ """ + pass \ No newline at end of file diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py index 5dfdae6e..3e879940 100644 --- a/dbt/adapters/duckdb/plugins/delta.py +++ b/dbt/adapters/duckdb/plugins/delta.py @@ -11,8 +11,15 @@ class Plugin(BasePlugin): def initialize(self, config: Dict[str, Any]): - # place for init catalog in the future - pass + self._REGISTERED_DF: dict = {} + + def configure_cursor(self, cursor): + for source_table_name, df in self._REGISTERED_DF.items(): + df_name = source_table_name.replace(".", "_") + "_df" + cursor.register(df_name, df) + cursor.execute( + f"CREATE OR REPLACE VIEW {source_table_name} AS SELECT * FROM {df_name}" + ) def load(self, source_config: SourceConfig): if "delta_table_path" not in source_config: @@ -38,8 +45,12 @@ def load(self, source_config: SourceConfig): if as_of_datetime: dt.load_with_datetime(as_of_datetime) - return dt.to_pyarrow_table() + df = dt.to_pyarrow_table() + + ##save to register it later + self._REGISTERED_DF[source_config.table_name()] = df + + return df # Future -# TODO add deltalake storage options # TODO add databricks catalog From 959a1c171e6959800dd992e215dc76c3eaf9edd5 Mon Sep 17 00:00:00 2001 From: Aleksandar Milicevic Date: Sun, 8 Oct 2023 17:22:42 +0000 Subject: [PATCH 10/22] delete test delta --- test_delta.ipynb | 99 ------------------------------------------------ 1 file changed, 99 deletions(-) delete mode 100644 test_delta.ipynb diff --git a/test_delta.ipynb b/test_delta.ipynb deleted file mode 100644 index c6a1da5e..00000000 --- a/test_delta.ipynb +++ /dev/null @@ -1,99 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "" - ] - }, - "execution_count": 22, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "import duckdb\n", - "from deltalake import DeltaTable\n", - "import pandas as pd\n", - "\n", - "conn = duckdb.connect(\":memory:\")\n", - "\n", - "df = pd.DataFrame({\"x\": [1, 2, 3]})\n", - "conn.execute('BEGIN')\n", - "conn.execute('create schema if not exists \"memory\".\"test16965229211977761792_test_delta\"')\n", - "conn.execute('COMMIT')\n", - "conn1 = conn.cursor() \n", - "conn1.register(\"table_1_df\",df)\n", - "conn1.execute('CREATE OR REPLACE view memory.main.table_1 AS SELECT * FROM table_1_df')\n", - "\n", - "conn.execute('BEGIN')\n", - "conn.execute('drop schema if exists \"memory\".\"test16965229211977761792_test_delta\" cascade')\n", - "conn.execute('COMMIT')\n", - "#conn.execute(\"SELECT * FROM test_materialized\")\n", - "#conn1.execute(\"SELECT * FROM test_materialized\").show()" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "┌───────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────┐\n", - "│ explain_key │ explain_value │\n", - "│ varchar │ varchar │\n", - "├───────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────┤\n", - "│ physical_plan │ ┌───────────────────────────┐\\n│ PROJECTION │\\n│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │\\n│ … │\n", - "└───────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘" - ] - }, - "execution_count": 3, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "import duckdb\n", - "conn = duckdb.connect(\":memory:\")\n", - "\n", - 
"conn.sql(\"EXPLAIN SELECT 1\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.4" - }, - "orig_nbformat": 4 - }, - "nbformat": 4, - "nbformat_minor": 2 -} From 806e04f76c3963b320dec491735d9703fb8751f7 Mon Sep 17 00:00:00 2001 From: Aleksandar Milicevic Date: Wed, 11 Oct 2023 22:22:34 +0000 Subject: [PATCH 11/22] delete redudant configure call --- dbt/adapters/duckdb/environments/local.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index c7dfb920..f8fd2da4 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -111,9 +111,7 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): df = plugin.load(source_config) assert df is not None - if plugin_name == "delta": # potentially should all plugins use configure_cursor - plugin.configure_cursor(cursor) - else: + if plugin_name not in ["delta"]: # plugins which configure cursor itselfs materialization = source_config.meta.get("materialization", "table") cursor.execute( f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df" From cbca70d979ff87fd3bdbfc76ce1fe45ce910e392 Mon Sep 17 00:00:00 2001 From: Aleksandar Milicevic Date: Fri, 13 Oct 2023 00:08:21 +0000 Subject: [PATCH 12/22] addapt test; add create schema; fix storage --- dbt/adapters/duckdb/environments/local.py | 4 ++ dbt/adapters/duckdb/plugins/delta.py | 9 ++-- tests/functional/plugins/test_delta.py | 65 +++++++++++++++++++---- 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index f8fd2da4..82fe8866 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -89,6 +89,10 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): plugin = self._plugins[plugin_name] handle = self.handle() cursor = handle.cursor() + + if source_config.schema: + cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {source_config.schema}") + save_mode = source_config.get("save_mode", "overwrite") if save_mode in ("ignore", "error_if_exists"): params = [source_config.schema, source_config.identifier] diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py index 3e879940..0d9e3b8d 100644 --- a/dbt/adapters/duckdb/plugins/delta.py +++ b/dbt/adapters/duckdb/plugins/delta.py @@ -1,6 +1,5 @@ from typing import Any from typing import Dict -import duckdb from deltalake import DeltaTable @@ -26,12 +25,12 @@ def load(self, source_config: SourceConfig): raise Exception( "'delta_table_path' is a required argument for the delta table!" 
             )
-
+
         table_path = source_config["delta_table_path"]
-        storage_options = source_config.get("storage", None)
+        storage_options = source_config.get("storage_options", None)
 
         if storage_options:
-            dt = DeltaTable(table_path, storage_options)
+            dt = DeltaTable(table_path, storage_options=storage_options)
         else:
             dt = DeltaTable(table_path)
@@ -46,7 +45,7 @@ def load(self, source_config: SourceConfig):
             dt.load_with_datetime(as_of_datetime)
 
         df = dt.to_pyarrow_table()
-
+
         ##save to register it later
         self._REGISTERED_DF[source_config.table_name()] = df
diff --git a/tests/functional/plugins/test_delta.py b/tests/functional/plugins/test_delta.py
index a11332f2..f8b2e25c 100644
--- a/tests/functional/plugins/test_delta.py
+++ b/tests/functional/plugins/test_delta.py
@@ -1,6 +1,4 @@
-import os
 import pytest
-import sqlite3
 from pathlib import Path
 import shutil
 import pandas as pd
@@ -15,15 +13,24 @@
 version: 2
 sources:
   - name: delta_source
-    schema: main
     meta:
       plugin: delta
     tables:
       - name: table_1
         description: "A delta table"
         meta:
-          materialization: "view"
-          delta_table_path: "{test_delta_path}"
+          delta_table_path: "{test_delta_path1}"
+
+  - name: delta_source_test
+    schema: test
+    meta:
+      plugin: delta
+    tables:
+      - name: table_2
+        description: "A delta table"
+        meta:
+          delta_table_path: "{test_delta_path2}"
+          as_of_version: 0
 """
@@ -31,13 +38,22 @@
     {{ config(materialized='table') }}
     select * from {{ source('delta_source', 'table_1') }}
 """
+delta2_sql = """
+    {{ config(materialized='table') }}
+    select * from {{ source('delta_source', 'table_1') }} limit 1
+"""
+delta3_sql = """
+    {{ config(materialized='table') }}
+    select * as a from {{ source('delta_source_test', 'table_2') }} WHERE y = 'd'
+"""
+
 
 @pytest.mark.skip_profile("buenavista", "md")
 class TestPlugins:
     @pytest.fixture(scope="class")
-    def delta_test_table(self):
+    def delta_test_table1(self):
         path = Path("/tmp/test_delta")
-        table_path = path / "test_delta_table"
+        table_path = path / "test_delta_table1"
 
         df = pd.DataFrame({"x": [1, 2, 3]})
         write_deltalake(table_path, df, mode="overwrite")
@@ -46,6 +62,27 @@ def delta_test_table(self):
 
         shutil.rmtree(table_path)
 
+    @pytest.fixture(scope="class")
+    def delta_test_table2(self):
+        path = Path("/workspaces/dbt-duckdb/.vscode/test_delta")
+        table_path = path / "test_delta_table2"
+
+        df = pd.DataFrame({
+            "x": [1, 2, 3, 2, 3, 4, 5, 6],
+            "y": ["a", "b", "b", "c", "d", "c", "d", "a"]
+        })
+        write_deltalake(table_path, df, mode="overwrite")
+
+        df = pd.DataFrame({
+            "x": [1, 2],
+            "y": ["a","b"]
+        })
+        write_deltalake(table_path, df, mode="overwrite")
+
+        yield table_path
+
+        shutil.rmtree(table_path)
+
     @pytest.fixture(scope="class")
     def profiles_config_update(self, dbt_profile_target):
         plugins = [{"module": "delta"}]
@@ -63,14 +100,20 @@ def profiles_config_update(self, dbt_profile_target):
         }
 
     @pytest.fixture(scope="class")
-    def models(self, delta_test_table):
+    def models(self, delta_test_table1,delta_test_table2):
         return {
             "source_schema.yml": delta_schema_yml.format(
-                test_delta_path=delta_test_table
+                test_delta_path1=delta_test_table1,
+                test_delta_path2=delta_test_table2
             ),
-            "delta_table.sql": delta1_sql,
+            "delta_table1.sql": delta1_sql,
+            "delta_table2.sql": delta2_sql,
+            "delta_table3.sql": delta3_sql,
         }
 
     def test_plugins(self, project):
         results = run_dbt()
-        assert len(results) == 1
+        assert len(results) == 3
+
+        # res = project.run_sql("SELECT count(1) FROM 'delta_table3'", fetch="one")
+        # assert res[0] == 2
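The `storage_options` fix in patch 12 matters for remote tables: deltalake expects the options dict as a keyword argument (the second positional parameter of `DeltaTable` is `version`, so passing the dict positionally, as the earlier patches did, would be interpreted as a table version). A hedged sketch of opening a remote table, with the bucket name and credential values as hypothetical placeholders:

```python
# Sketch only: opening a Delta table on S3 with explicit storage options.
from deltalake import DeltaTable

dt = DeltaTable(
    "s3://example-bucket/events_delta",  # hypothetical table URI
    storage_options={
        "AWS_REGION": "us-east-1",
        "AWS_ACCESS_KEY_ID": "<key id>",
        "AWS_SECRET_ACCESS_KEY": "<secret>",
    },
)
```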
From 0faf9bb0095f01472785f9549597f85e2d6a9e70 Mon Sep 17 00:00:00 2001
From: Aleksandar Milicevic
Date: Fri, 13 Oct 2023 22:16:25 +0000
Subject: [PATCH 13/22] add delta_table3_expected in test

---
 tests/functional/plugins/test_delta.py | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/tests/functional/plugins/test_delta.py b/tests/functional/plugins/test_delta.py
index f8b2e25c..856abedd 100644
--- a/tests/functional/plugins/test_delta.py
+++ b/tests/functional/plugins/test_delta.py
@@ -47,6 +47,10 @@
     select * as a from {{ source('delta_source_test', 'table_2') }} WHERE y = 'd'
 """
 
+delta3_sql_expected = """
+    select 1 as x, 'a' as y
+"""
+
 
 @pytest.mark.skip_profile("buenavista", "md")
 class TestPlugins:
     @pytest.fixture(scope="class")
@@ -68,8 +72,8 @@ def delta_test_table2(self):
         table_path = path / "test_delta_table2"
 
         df = pd.DataFrame({
-            "x": [1, 2, 3, 2, 3, 4, 5, 6],
-            "y": ["a", "b", "b", "c", "d", "c", "d", "a"]
+            "x": [1],
+            "y": ["a"]
         })
         write_deltalake(table_path, df, mode="overwrite")
 
@@ -109,11 +113,19 @@ def models(self, delta_test_table1,delta_test_table2):
             "delta_table1.sql": delta1_sql,
             "delta_table2.sql": delta2_sql,
             "delta_table3.sql": delta3_sql,
+            "delta_table3_expected.sql": delta3_sql_expected,
         }
 
     def test_plugins(self, project):
         results = run_dbt()
-        assert len(results) == 3
-
+        assert len(results) == 4
+
+        # check_relations_equal(
+        #     project.adapter,
+        #     [
+        #         "delta_table3",
+        #         "delta_table3_expected",
+        #     ],
+        # )
         # res = project.run_sql("SELECT count(1) FROM 'delta_table3'", fetch="one")
         # assert res[0] == 2

From d891d913103ef58368ea9a891ae56b0e36835d5e Mon Sep 17 00:00:00 2001
From: Aleksandar Milicevic
Date: Fri, 13 Oct 2023 22:23:36 +0000
Subject: [PATCH 14/22] update readme

---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index 91a23073..88871068 100644
--- a/README.md
+++ b/README.md
@@ -183,6 +183,7 @@ Please remember that using plugins may require you to add additional dependencie
 * `gsheet` depends on `gspread` and `pandas`
 * `iceberg` depends on `pyiceberg` and Python >= 3.8
 * `sqlalchemy` depends on `pandas`, `sqlalchemy`, and the driver(s) you need
+* `delta` depends on `deltalake`, [a example project](https://github.com/milicevica23/dbt-duckdb-delta-plugin-demo)
 
 #### Using Local Python Modules

From eba3f7f1032c1673b839665416a3abf63876ab1f Mon Sep 17 00:00:00 2001
From: Aleksandar Milicevic
Date: Fri, 13 Oct 2023 22:23:46 +0000
Subject: [PATCH 15/22] fix readme

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 88871068..17c482c9 100644
--- a/README.md
+++ b/README.md
@@ -183,7 +183,7 @@ Please remember that using plugins may require you to add additional dependencie
 * `gsheet` depends on `gspread` and `pandas`
 * `iceberg` depends on `pyiceberg` and Python >= 3.8
 * `sqlalchemy` depends on `pandas`, `sqlalchemy`, and the driver(s) you need
-* `delta` depends on `deltalake`, [a example project](https://github.com/milicevica23/dbt-duckdb-delta-plugin-demo)
+* `delta` depends on `deltalake`, [an example project](https://github.com/milicevica23/dbt-duckdb-delta-plugin-demo)
 
 #### Using Local Python Modules
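Patch 16 below swaps `to_pyarrow_table()` for `to_pyarrow_dataset()`. The difference is laziness: a pyarrow dataset is scanned on demand, so DuckDB can push column selection and simple filters down into the Delta scan rather than materializing the entire table into memory first. A hedged sketch of the behavior, again assuming a table with column `x` at the hypothetical path `/tmp/delta_demo`:

```python
# Sketch only: registering a lazy pyarrow dataset with DuckDB.
import duckdb
from deltalake import DeltaTable

dataset = DeltaTable("/tmp/delta_demo").to_pyarrow_dataset()  # no data read yet
con = duckdb.connect()
con.register("delta_ds", dataset)
# Only the columns/rows the query needs are actually scanned.
print(con.sql("SELECT count(*) FROM delta_ds WHERE x > 1").fetchone())
```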
From 3d64d08aba644221d93e41c1c7f4a968c512ea2c Mon Sep 17 00:00:00 2001
From: Aleksandar Milicevic
Date: Sun, 15 Oct 2023 18:34:18 +0000
Subject: [PATCH 16/22] change pyarrow_table to pyarrow_dataset; use tempfile

---
 dbt/adapters/duckdb/plugins/delta.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py
index 0d9e3b8d..550a5a5d 100644
--- a/dbt/adapters/duckdb/plugins/delta.py
+++ b/dbt/adapters/duckdb/plugins/delta.py
@@ -44,7 +44,7 @@ def load(self, source_config: SourceConfig):
         if as_of_datetime:
             dt.load_with_datetime(as_of_datetime)
 
-        df = dt.to_pyarrow_table()
+        df = dt.to_pyarrow_dataset()
 
         ##save to register it later
         self._REGISTERED_DF[source_config.table_name()] = df

From 09dfe1af2aec091673b02fc22af736f92b9a2f40 Mon Sep 17 00:00:00 2001
From: Aleksandar Milicevic
Date: Sun, 15 Oct 2023 18:34:42 +0000
Subject: [PATCH 17/22] use tempfile

---
 tests/functional/plugins/test_delta.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/tests/functional/plugins/test_delta.py b/tests/functional/plugins/test_delta.py
index 856abedd..de0a0b83 100644
--- a/tests/functional/plugins/test_delta.py
+++ b/tests/functional/plugins/test_delta.py
@@ -1,7 +1,7 @@
 import pytest
 from pathlib import Path
-import shutil
 import pandas as pd
+import tempfile
 
 from dbt.tests.util import (
     check_relations_equal,
     run_dbt,
 )
@@ -56,7 +56,8 @@ class TestPlugins:
     @pytest.fixture(scope="class")
     def delta_test_table1(self):
-        path = Path("/tmp/test_delta")
+        td = tempfile.TemporaryDirectory()
+        path = Path(td.name)
         table_path = path / "test_delta_table1"
 
         df = pd.DataFrame({"x": [1, 2, 3]})
@@ -64,11 +65,12 @@
 
         yield table_path
 
-        shutil.rmtree(table_path)
+        td.cleanup()
 
     @pytest.fixture(scope="class")
     def delta_test_table2(self):
-        path = Path("/workspaces/dbt-duckdb/.vscode/test_delta")
+        td = tempfile.TemporaryDirectory()
+        path = Path(td.name)
         table_path = path / "test_delta_table2"
 
         df = pd.DataFrame({
@@ -85,7 +87,7 @@ def delta_test_table2(self):
 
         yield table_path
 
-        shutil.rmtree(table_path)
+        td.cleanup()
 
     @pytest.fixture(scope="class")
     def profiles_config_update(self, dbt_profile_target):

From 0cd1f3553a98b17ca082b17628055f3be8b2ac5e Mon Sep 17 00:00:00 2001
From: Aleksandar Milicevic
Date: Thu, 19 Oct 2023 13:29:35 +0000
Subject: [PATCH 18/22] refactor df registration for more general approach

---
 dbt/adapters/duckdb/environments/__init__.py | 15 ++++++---
 dbt/adapters/duckdb/environments/local.py    | 27 ++++++++++++++------
 dbt/adapters/duckdb/plugins/delta.py         | 16 +++---------
 3 files changed, 35 insertions(+), 23 deletions(-)

diff --git a/dbt/adapters/duckdb/environments/__init__.py b/dbt/adapters/duckdb/environments/__init__.py
index 553d4403..08fee34f 100644
--- a/dbt/adapters/duckdb/environments/__init__.py
+++ b/dbt/adapters/duckdb/environments/__init__.py
@@ -106,18 +106,27 @@ def initialize_db(
         return conn
 
     @classmethod
-    def initialize_cursor(cls, creds: DuckDBCredentials, cursor, plugins: Optional[Dict[str, BasePlugin]] = None):
+    def initialize_cursor(
+        cls,
+        creds: DuckDBCredentials,
+        cursor,
+        plugins: Optional[Dict[str, BasePlugin]] = None,
+        registered_df: dict = {},
+    ):
         for key, value in creds.load_settings().items():
             # Okay to set these as strings because DuckDB will cast them
             # to the correct type
             cursor.execute(f"SET {key} = '{value}'")
 
-        #update cursor if something is lost in the copy
-        #of the parent connection
+        # update cursor if something is lost in the copy
+        # of the parent connection
         if plugins:
             for plugin in plugins.values():
                 plugin.configure_cursor(cursor)
 
+        for df_name, df in registered_df.items():
+            cursor.register(df_name, df)
+
         return cursor
 
     @classmethod
diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py
index 82fe8866..97536e30 100644
---
a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -52,6 +52,7 @@ def __init__(self, credentials: credentials.DuckDBCredentials): or credentials.path.startswith("md:") or credentials.path.startswith("motherduck:") ) + self._REGISTERED_DF: dict = {} def notify_closed(self): with self.lock: @@ -65,8 +66,10 @@ def handle(self): if self.conn is None: self.conn = self.initialize_db(self.creds, self._plugins) self.handle_count += 1 - - cursor = self.initialize_cursor(self.creds, self.conn.cursor(), self._plugins) + + cursor = self.initialize_cursor( + self.creds, self.conn.cursor(), self._plugins, self._REGISTERED_DF + ) return DuckDBConnectionWrapper(cursor, self) def submit_python_job( @@ -115,12 +118,20 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): df = plugin.load(source_config) assert df is not None - if plugin_name not in ["delta"]: # plugins which configure cursor itselfs - materialization = source_config.meta.get("materialization", "table") - cursor.execute( - f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df" - ) - + materialization = source_config.meta.get("materialization", "table") + source_table_name = source_config.table_name() + df_name = source_table_name.replace(".", "_") + "_df" + + cursor.register(df_name, df) + + if materialization == "view": + ##save to df instance to register on each cursor creation + self._REGISTERED_DF[df_name] = df + + cursor.execute( + f"CREATE OR REPLACE {materialization} {source_table_name} AS SELECT * FROM {df_name}" + ) + cursor.close() handle.close() diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py index 550a5a5d..74801451 100644 --- a/dbt/adapters/duckdb/plugins/delta.py +++ b/dbt/adapters/duckdb/plugins/delta.py @@ -10,22 +10,17 @@ class Plugin(BasePlugin): def initialize(self, config: Dict[str, Any]): - self._REGISTERED_DF: dict = {} - + pass + def configure_cursor(self, cursor): - for source_table_name, df in self._REGISTERED_DF.items(): - df_name = source_table_name.replace(".", "_") + "_df" - cursor.register(df_name, df) - cursor.execute( - f"CREATE OR REPLACE VIEW {source_table_name} AS SELECT * FROM {df_name}" - ) + pass def load(self, source_config: SourceConfig): if "delta_table_path" not in source_config: raise Exception( "'delta_table_path' is a required argument for the delta table!" ) - + table_path = source_config["delta_table_path"] storage_options = source_config.get("storage_options", None) @@ -45,9 +40,6 @@ def load(self, source_config: SourceConfig): dt.load_with_datetime(as_of_datetime) df = dt.to_pyarrow_dataset() - - ##save to register it later - self._REGISTERED_DF[source_config.table_name()] = df return df From 040bd5432d97d6b7e29df812984626fcf3451cc0 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Fri, 20 Oct 2023 14:24:28 -0700 Subject: [PATCH 19/22] With linting enabled and fixes for it --- .devcontainer/devcontainer.json | 2 +- .pre-commit-config.yaml | 2 +- dbt/adapters/duckdb/environments/local.py | 17 ++++++----------- dbt/adapters/duckdb/plugins/__init__.py | 4 ++-- dbt/adapters/duckdb/plugins/delta.py | 6 ++---- 5 files changed, 12 insertions(+), 19 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 6aa82b4a..066f8c7a 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -56,4 +56,4 @@ // Comment out to connect as root instead. 
More info: https://aka.ms/vscode-remote/containers/non-root. "remoteUser": "vscode", "postCreateCommand": "pip install -e . && pip install -r dev-requirements.txt" -} \ No newline at end of file +} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1df0c78f..d75d365b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,7 +5,7 @@ exclude: "^tests/.*" default_language_version: - python: python3.8 + python: python3.11 repos: - repo: https://github.com/pre-commit/pre-commit-hooks diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index 97536e30..34560074 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -15,7 +15,7 @@ def __init__(self, cursor): def __getattr__(self, name): return getattr(self._cursor, name) - def execute(self, sql, bindings=None): + def execute(self, sql, bindings=None): try: if bindings is None: return self._cursor.execute(sql) @@ -24,6 +24,7 @@ def execute(self, sql, bindings=None): except RuntimeError as e: raise DbtRuntimeError(str(e)) + class DuckDBConnectionWrapper: def __init__(self, cursor, env): self._cursor = DuckDBCursorWrapper(cursor) @@ -72,9 +73,7 @@ def handle(self): ) return DuckDBConnectionWrapper(cursor, self) - def submit_python_job( - self, handle, parsed_model: dict, compiled_code: str - ) -> AdapterResponse: + def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: con = handle.cursor() def ldf(table_name): @@ -109,9 +108,7 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): params.append(source_config.database) if cursor.execute(q, params).fetchone()[0]: if save_mode == "error_if_exists": - raise Exception( - f"Source {source_config.table_name()} already exists!" - ) + raise Exception(f"Source {source_config.table_name()} already exists!") else: # Nothing to do (we ignore the existing table) return @@ -125,7 +122,7 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): cursor.register(df_name, df) if materialization == "view": - ##save to df instance to register on each cursor creation + # save to df instance to register on each cursor creation self._REGISTERED_DF[df_name] = df cursor.execute( @@ -135,9 +132,7 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): cursor.close() handle.close() - def store_relation( - self, plugin_name: str, target_config: utils.TargetConfig - ) -> None: + def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> None: if plugin_name not in self._plugins: if plugin_name.startswith("glue|"): from ..plugins import glue diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index 68bfceb3..183c8693 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -114,10 +114,10 @@ def store(self, target_config: TargetConfig): def configure_cursor(self, cursor): """ - Configure each copy of the DuckDB cursor. + Configure each copy of the DuckDB cursor. This method should be overridden by subclasses to provide additional attributes to the connection which are lost in the copy of the parent connection. :param cursor: A DuckDBPyConnection instance to be configured. 
""" - pass \ No newline at end of file + pass diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py index 74801451..db4df705 100644 --- a/dbt/adapters/duckdb/plugins/delta.py +++ b/dbt/adapters/duckdb/plugins/delta.py @@ -5,7 +5,6 @@ from . import BasePlugin from ..utils import SourceConfig -from dbt.logger import GLOBAL_LOGGER as logger class Plugin(BasePlugin): @@ -17,9 +16,7 @@ def configure_cursor(self, cursor): def load(self, source_config: SourceConfig): if "delta_table_path" not in source_config: - raise Exception( - "'delta_table_path' is a required argument for the delta table!" - ) + raise Exception("'delta_table_path' is a required argument for the delta table!") table_path = source_config["delta_table_path"] storage_options = source_config.get("storage_options", None) @@ -43,5 +40,6 @@ def load(self, source_config: SourceConfig): return df + # Future # TODO add databricks catalog From 6ec7466b195a41734652bc21411e30a2d3bd9abf Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Fri, 20 Oct 2023 14:29:04 -0700 Subject: [PATCH 20/22] See if that makes the precommit checks happier --- .devcontainer/devcontainer.json | 2 +- .pre-commit-config.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 066f8c7a..270bbd7a 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -7,7 +7,7 @@ // Update 'VARIANT' to pick a Python version: 3, 3.10, 3.9, 3.8, 3.7, 3.6 // Append -bullseye or -buster to pin to an OS version. // Use -bullseye variants on local on arm64/Apple Silicon. - "VARIANT": "3.11", + "VARIANT": "3.8", // Options "NODE_VERSION": "none" } diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d75d365b..1df0c78f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,7 +5,7 @@ exclude: "^tests/.*" default_language_version: - python: python3.11 + python: python3.8 repos: - repo: https://github.com/pre-commit/pre-commit-hooks From b48be37684bfc520d753f554c3c11bc2d8c45612 Mon Sep 17 00:00:00 2001 From: Aleksandar Milicevic Date: Wed, 25 Oct 2023 19:04:29 +0000 Subject: [PATCH 21/22] adapt readme; add default materialization config --- dbt/adapters/duckdb/environments/local.py | 2 +- dbt/adapters/duckdb/plugins/__init__.py | 3 +++ dbt/adapters/duckdb/plugins/delta.py | 2 ++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index 34560074..abfead8f 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -115,7 +115,7 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): df = plugin.load(source_config) assert df is not None - materialization = source_config.meta.get("materialization", "table") + materialization = source_config.meta.get("materialization", plugin.default_materialization()) source_table_name = source_config.table_name() df_name = source_table_name.replace(".", "_") + "_df" diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index 183c8693..2520173b 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -121,3 +121,6 @@ def configure_cursor(self, cursor): :param cursor: A DuckDBPyConnection instance to be configured. 
""" pass + + def default_materialization(self): + return "table" diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py index db4df705..ffcf1312 100644 --- a/dbt/adapters/duckdb/plugins/delta.py +++ b/dbt/adapters/duckdb/plugins/delta.py @@ -40,6 +40,8 @@ def load(self, source_config: SourceConfig): return df + def default_materialization(self): + return "view" # Future # TODO add databricks catalog From 689d35d9e04b059b7e80e5acf2d5318716f0c860 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Wed, 25 Oct 2023 13:26:16 -0700 Subject: [PATCH 22/22] format fixes --- dbt/adapters/duckdb/environments/local.py | 4 +++- dbt/adapters/duckdb/plugins/delta.py | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/duckdb/environments/local.py b/dbt/adapters/duckdb/environments/local.py index abfead8f..def00cff 100644 --- a/dbt/adapters/duckdb/environments/local.py +++ b/dbt/adapters/duckdb/environments/local.py @@ -115,7 +115,9 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig): df = plugin.load(source_config) assert df is not None - materialization = source_config.meta.get("materialization", plugin.default_materialization()) + materialization = source_config.meta.get( + "materialization", plugin.default_materialization() + ) source_table_name = source_config.table_name() df_name = source_table_name.replace(".", "_") + "_df" diff --git a/dbt/adapters/duckdb/plugins/delta.py b/dbt/adapters/duckdb/plugins/delta.py index ffcf1312..c6b0aa2a 100644 --- a/dbt/adapters/duckdb/plugins/delta.py +++ b/dbt/adapters/duckdb/plugins/delta.py @@ -43,5 +43,6 @@ def load(self, source_config: SourceConfig): def default_materialization(self): return "view" + # Future # TODO add databricks catalog