Skip to content

Commit

Permalink
Enable additional iceberg config options + and add a Spark-style save…
Browse files Browse the repository at this point in the history
… mode argument for plugin-based sources
  • Loading branch information
jwills committed Apr 17, 2023
1 parent f665487 commit 219f3e7
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
19 changes: 17 additions & 2 deletions dbt/adapters/duckdb/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,25 @@ def load_source(self, plugin_name: str, source_config: SourceConfig):
f"Plugin {plugin_name} not found; known plugins are: "
+ ",".join(self._plugins.keys())
)
df = self._plugins[plugin_name].load(source_config)
assert df is not None
plugin = self._plugins[plugin_name]
handle = self.handle()
cursor = handle.cursor()
save_mode = source_config.meta.get("save_mode", "overwrite")
if save_mode in ("ignore", "error_if_exists"):
schema, identifier = source_config.schema, source_config.identifier
q = f"""SELECT COUNT(1)
FROM information_schema.tables
WHERE table_schema = '{schema}'
AND table_name = '{identifier}'
"""
if cursor.execute(q).fetchone()[0]:
if save_mode == "error_if_exists":
raise Exception(f"Source {source_config.table_name()} already exists!")
else:
# Nothing to do (we ignore the existing table)
return
df = plugin.load(source_config)
assert df is not None
materialization = source_config.meta.get("materialization", "table")
cursor.execute(
f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df"
Expand Down
7 changes: 5 additions & 2 deletions dbt/adapters/duckdb/plugins/iceberg.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from typing import Dict

from pyiceberg import catalog
import pyiceberg.catalog

from . import Plugin
from ..utils import SourceConfig


class IcebergPlugin(Plugin):
def __init__(self, config: Dict):
self._catalog = catalog.load_catalog(config.get("catalog"))
if "catalog" not in config:
raise Exception("'catalog' is a required argument for the iceberg plugin!")
catalog = config.pop("catalog")
self._catalog = pyiceberg.catalog.load_catalog(catalog, **config)

def load(self, source_config: SourceConfig):
table_format = source_config.meta.get("iceberg_table", "{schema}.{identifier}")
Expand Down
1 change: 1 addition & 0 deletions tests/functional/plugins/test_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
schema: main
meta:
plugin: sql
save_mode: ignore
tables:
- name: tt1
description: "My first SQLAlchemy table"
Expand Down

0 comments on commit 219f3e7

Please sign in to comment.