From 219f3e7ea518046f10ace53979fe1d2e022283f2 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Sun, 16 Apr 2023 21:22:57 -0700 Subject: [PATCH] Enable additional iceberg config options + and add a Spark-style save mode argument for plugin-based sources --- dbt/adapters/duckdb/environments.py | 19 +++++++++++++++++-- dbt/adapters/duckdb/plugins/iceberg.py | 7 +++++-- tests/functional/plugins/test_plugins.py | 1 + 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/dbt/adapters/duckdb/environments.py b/dbt/adapters/duckdb/environments.py index a667e2dd..7d49f2a9 100644 --- a/dbt/adapters/duckdb/environments.py +++ b/dbt/adapters/duckdb/environments.py @@ -185,10 +185,25 @@ def load_source(self, plugin_name: str, source_config: SourceConfig): f"Plugin {plugin_name} not found; known plugins are: " + ",".join(self._plugins.keys()) ) - df = self._plugins[plugin_name].load(source_config) - assert df is not None + plugin = self._plugins[plugin_name] handle = self.handle() cursor = handle.cursor() + save_mode = source_config.meta.get("save_mode", "overwrite") + if save_mode in ("ignore", "error_if_exists"): + schema, identifier = source_config.schema, source_config.identifier + q = f"""SELECT COUNT(1) + FROM information_schema.tables + WHERE table_schema = '{schema}' + AND table_name = '{identifier}' + """ + if cursor.execute(q).fetchone()[0]: + if save_mode == "error_if_exists": + raise Exception(f"Source {source_config.table_name()} already exists!") + else: + # Nothing to do (we ignore the existing table) + return + df = plugin.load(source_config) + assert df is not None materialization = source_config.meta.get("materialization", "table") cursor.execute( f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df" diff --git a/dbt/adapters/duckdb/plugins/iceberg.py b/dbt/adapters/duckdb/plugins/iceberg.py index ffc5cb3c..50b26a35 100644 --- a/dbt/adapters/duckdb/plugins/iceberg.py +++ b/dbt/adapters/duckdb/plugins/iceberg.py @@ -1,6 +1,6 @@ from typing import Dict -from pyiceberg import catalog +import pyiceberg.catalog from . import Plugin from ..utils import SourceConfig @@ -8,7 +8,10 @@ class IcebergPlugin(Plugin): def __init__(self, config: Dict): - self._catalog = catalog.load_catalog(config.get("catalog")) + if "catalog" not in config: + raise Exception("'catalog' is a required argument for the iceberg plugin!") + catalog = config.pop("catalog") + self._catalog = pyiceberg.catalog.load_catalog(catalog, **config) def load(self, source_config: SourceConfig): table_format = source_config.meta.get("iceberg_table", "{schema}.{identifier}") diff --git a/tests/functional/plugins/test_plugins.py b/tests/functional/plugins/test_plugins.py index dbaca109..5efb2c1d 100644 --- a/tests/functional/plugins/test_plugins.py +++ b/tests/functional/plugins/test_plugins.py @@ -28,6 +28,7 @@ schema: main meta: plugin: sql + save_mode: ignore tables: - name: tt1 description: "My first SQLAlchemy table"