From 446e6ae81359ebac841ebf75e8172d0b1f415da8 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Tue, 4 Apr 2023 12:18:27 -0700 Subject: [PATCH 01/18] Fix this bit up --- dbt/adapters/duckdb/connections.py | 25 ++++++++----- dbt/adapters/duckdb/credentials.py | 21 ++++++++++- dbt/adapters/duckdb/environments.py | 34 +++++++++++++---- dbt/adapters/duckdb/impl.py | 9 ++--- dbt/adapters/duckdb/plugins/__init__.py | 31 +++++++++++++++ dbt/adapters/duckdb/plugins/excel.py | 15 ++++++++ dbt/adapters/duckdb/relation.py | 50 ++++++++++++------------- dbt/adapters/duckdb/utils.py | 35 +++++++++++++++++ 8 files changed, 170 insertions(+), 50 deletions(-) create mode 100644 dbt/adapters/duckdb/plugins/__init__.py create mode 100644 dbt/adapters/duckdb/plugins/excel.py create mode 100644 dbt/adapters/duckdb/utils.py diff --git a/dbt/adapters/duckdb/connections.py b/dbt/adapters/duckdb/connections.py index 8ab60014..4f306eaa 100644 --- a/dbt/adapters/duckdb/connections.py +++ b/dbt/adapters/duckdb/connections.py @@ -14,12 +14,19 @@ class DuckDBConnectionManager(SQLConnectionManager): TYPE = "duckdb" - LOCK = threading.RLock() - ENV = None + _LOCK = threading.RLock() + _ENV = None def __init__(self, profile: AdapterRequiredConfig): super().__init__(profile) + @classmethod + def env(cls) -> environments.Environment: + with cls._LOCK: + if not cls._ENV: + raise Exception("DuckDBConnectionManager environment requested before creation!") + return cls._ENV + @classmethod def open(cls, connection: Connection) -> Connection: if connection.state == ConnectionState.OPEN: @@ -27,11 +34,11 @@ def open(cls, connection: Connection) -> Connection: return connection credentials = cls.get_credentials(connection.credentials) - with cls.LOCK: + with cls._LOCK: try: - if not cls.ENV: - cls.ENV = environments.create(credentials) - connection.handle = cls.ENV.handle() + if not cls._ENV: + cls._ENV = environments.create(credentials) + connection.handle = cls._ENV.handle() connection.state = ConnectionState.OPEN except RuntimeError as e: @@ -79,9 +86,9 @@ def get_response(cls, cursor) -> AdapterResponse: @classmethod def close_all_connections(cls): - with cls.LOCK: - if cls.ENV is not None: - cls.ENV = None + with cls._LOCK: + if cls._ENV is not None: + cls._ENV = None atexit.register(DuckDBConnectionManager.close_all_connections) diff --git a/dbt/adapters/duckdb/credentials.py b/dbt/adapters/duckdb/credentials.py index f395b9bf..4325dc5a 100644 --- a/dbt/adapters/duckdb/credentials.py +++ b/dbt/adapters/duckdb/credentials.py @@ -43,6 +43,20 @@ def to_sql(self) -> str: return base +@dataclass +class PluginConfig(dbtClassMixin): + # The name that this plugin will be referred to by in sources/models; must + # be unique within the project + name: str + + # The fully-specified class name of the plugin code to use, which must be a + # subclass of dbt.adapters.duckdb.plugins.Plugin. + impl: str + + # A plugin-specific set of configuration options + config: Optional[Dict[str, Any]] = None + + @dataclass class Remote(dbtClassMixin): host: str @@ -61,7 +75,7 @@ class DuckDBCredentials(Credentials): # to DuckDB (e.g., if we need to enable using unsigned extensions) config_options: Optional[Dict[str, Any]] = None - # any extensions we want to install and load (httpfs, parquet, etc.) + # any DuckDB extensions we want to install and load (httpfs, parquet, etc.) extensions: Optional[Tuple[str, ...]] = None # any additional pragmas we want to configure on our DuckDB connections; @@ -95,6 +109,11 @@ class DuckDBCredentials(Credentials): # Used to configure remote environments/connections remote: Optional[Remote] = None + # A list of dbt-duckdb plugins that can be used to customize the + # behavior of loading source data and/or storing the relations that are + # created by SQL or Python models; see the plugins module for more details. + plugins: Optional[List[PluginConfig]] = None + @classmethod def __pre_deserialize__(cls, data: Dict[Any, Any]) -> Dict[Any, Any]: data = super().__pre_deserialize__(data) diff --git a/dbt/adapters/duckdb/environments.py b/dbt/adapters/duckdb/environments.py index 8abfc13b..c7268dc3 100644 --- a/dbt/adapters/duckdb/environments.py +++ b/dbt/adapters/duckdb/environments.py @@ -1,10 +1,13 @@ import importlib.util import os import tempfile +from typing import Dict import duckdb from .credentials import DuckDBCredentials +from .plugins import Plugin +from .utils import SourceConfig from dbt.contracts.connection import AdapterResponse from dbt.exceptions import DbtRuntimeError @@ -60,11 +63,16 @@ def handle(self): def cursor(self): raise NotImplementedError - def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: + def submit_python_job( + self, handle, parsed_model: dict, compiled_code: str + ) -> AdapterResponse: raise NotImplementedError - def close(self, cursor): - raise NotImplementedError + def get_binding_char(self) -> str: + return "?" + + def load_source(self, plugin_name: str, source_config: SourceConfig) -> str: + return "load_source_todo" @classmethod def initialize_db(cls, creds: DuckDBCredentials): @@ -93,7 +101,7 @@ def initialize_db(cls, creds: DuckDBCredentials): return conn @classmethod - def initialize_cursor(cls, creds, cursor): + def initialize_cursor(cls, creds: DuckDBCredentials, cursor): # Extensions/settings need to be configured per cursor for ext in creds.extensions or []: cursor.execute(f"LOAD '{ext}'") @@ -103,6 +111,16 @@ def initialize_cursor(cls, creds, cursor): cursor.execute(f"SET {key} = '{value}'") return cursor + @classmethod + def initialize_plugins(cls, creds: DuckDBCredentials) -> Dict[str, Plugin]: + ret = {} + for plugin in creds.plugins or []: + if plugin.name in ret: + raise Exception("Duplicate plugin name: " + plugin.name) + else: + ret[plugin.name] = Plugin.create(plugin.impl, plugin.config or {}) + return ret + @classmethod def run_python_job(cls, con, load_df_function, identifier: str, compiled_code: str): mod_file = tempfile.NamedTemporaryFile(suffix=".py", delete=False) @@ -136,13 +154,11 @@ def run_python_job(cls, con, load_df_function, identifier: str, compiled_code: s finally: os.unlink(mod_file.name) - def get_binding_char(self) -> str: - return "?" - class LocalEnvironment(Environment): def __init__(self, credentials: DuckDBCredentials): self.conn = self.initialize_db(credentials) + self.plugins = self.initialize_plugins(credentials) self.creds = credentials def handle(self): @@ -150,7 +166,9 @@ def handle(self): cursor = self.initialize_cursor(self.creds, self.conn.cursor()) return DuckDBConnectionWrapper(cursor) - def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: + def submit_python_job( + self, handle, parsed_model: dict, compiled_code: str + ) -> AdapterResponse: con = handle.cursor() def ldf(table_name): diff --git a/dbt/adapters/duckdb/impl.py b/dbt/adapters/duckdb/impl.py index 6bd3b125..b00e9749 100644 --- a/dbt/adapters/duckdb/impl.py +++ b/dbt/adapters/duckdb/impl.py @@ -85,7 +85,7 @@ def use_database(self) -> bool: @available def get_binding_char(self): - return DuckDBConnectionManager.ENV.get_binding_char() + return DuckDBConnectionManager.env().get_binding_char() @available def external_write_options(self, write_location: str, rendered_options: dict) -> str: @@ -144,11 +144,8 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str) -> AdapterRe connection = self.connections.get_if_exists() if not connection: connection = self.connections.get_thread_connection() - if DuckDBConnectionManager.ENV: - env = DuckDBConnectionManager.ENV - return env.submit_python_job(connection.handle, parsed_model, compiled_code) - else: - raise Exception("No ENV defined to execute dbt-duckdb python models!") + env = DuckDBConnectionManager.env() + return env.submit_python_job(connection.handle, parsed_model, compiled_code) def get_rows_different_sql( self, diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py new file mode 100644 index 00000000..2b750f9d --- /dev/null +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -0,0 +1,31 @@ +import abc +import importlib +from typing import Any +from typing import Dict + +from dbt.contracts.graph.nodes import SourceDefinition + + +class Plugin(abc.ABC): + @classmethod + def create(cls, impl: str, config: Dict[str, Any]) -> "Plugin": + module_name, class_name = impl.rsplit(".", 1) + module = importlib.import_module(module_name) + Class = getattr(module, class_name) + if not issubclass(Class, Plugin): + raise TypeError(f"{impl} is not a subclass of Plugin") + return Class(config) + + @abc.abstractmethod + def __init__(self, plugin_config: Dict): + pass + + @abc.abstractmethod + def load_source(self, source_definition: SourceDefinition) -> str: + """Load data from a source and return it as a string.""" + pass + + @abc.abstractmethod + def store_target(self, data, config: Dict) -> None: + """Store the given data using the provided config dictionary.""" + pass diff --git a/dbt/adapters/duckdb/plugins/excel.py b/dbt/adapters/duckdb/plugins/excel.py new file mode 100644 index 00000000..c7d7957b --- /dev/null +++ b/dbt/adapters/duckdb/plugins/excel.py @@ -0,0 +1,15 @@ +from typing import Dict + +import pandas as pd + +from . import Plugin + + +class ExcelPlugin(Plugin): + def __init__(self, config: Dict): + self._config = config + + def load_source(self, source_definition) -> str: + var_name = f"__excel_source_{source_definition.identifier}" + globals()[var_name] = pd.read_excel() + return var_name diff --git a/dbt/adapters/duckdb/relation.py b/dbt/adapters/duckdb/relation.py index 0aea4604..7235a665 100644 --- a/dbt/adapters/duckdb/relation.py +++ b/dbt/adapters/duckdb/relation.py @@ -3,6 +3,8 @@ from typing import Optional from typing import Type +from .connections import DuckDBConnectionManager +from .utils import SourceConfig from dbt.adapters.base.relation import BaseRelation from dbt.adapters.base.relation import Self from dbt.contracts.graph.nodes import SourceDefinition @@ -10,44 +12,40 @@ @dataclass(frozen=True, eq=False, repr=False) class DuckDBRelation(BaseRelation): - external_location: Optional[str] = None + external: Optional[str] = None @classmethod - def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any) -> Self: - - # Some special handling here to allow sources that are external files to be specified - # via a `external_location` meta field. If the source's meta field is used, we include - # some logic to allow basic templating of the external location based on the individual - # name or identifier for the table itself to cut down on boilerplate. - ext_location = None - if "external_location" in source.meta: - ext_location = source.meta["external_location"] - elif "external_location" in source.source_meta: - # Use str.format here to allow for some basic templating outside of Jinja - ext_location = source.source_meta["external_location"] - - if ext_location: + def create_from_source( + cls: Type[Self], source: SourceDefinition, **kwargs: Any + ) -> Self: + source_config = SourceConfig.create(source) + # First check to see if a 'plugin' is defined in the meta argument for + # the source or its parent configuration, and if it is, use the environment + # associated with this run to get the name of the source that we should + # reference in the compiled model + if "plugin" in source_config.meta: + plugin_name = source_config.meta["plugin"] + source_name = DuckDBConnectionManager.env().load_source( + plugin_name, source_config + ) + kwargs["external"] = source_name + elif "external_location" in source_config.meta: # Call str.format with the schema, name and identifier for the source so that they # can be injected into the string; this helps reduce boilerplate when all # of the tables in the source have a similar location based on their name # and/or identifier. - format_args = { - "schema": source.schema, - "name": source.name, - "identifier": source.identifier, - } - if source.meta: - format_args.update(source.meta) - ext_location = ext_location.format(**format_args) + ext_location = source_config.meta["external_location"].format( + **source_config.as_dict() + ) # If it's a function call or already has single quotes, don't add them if "(" not in ext_location and not ext_location.startswith("'"): ext_location = f"'{ext_location}'" - kwargs["external_location"] = ext_location + kwargs["external"] = ext_location return super().create_from_source(source, **kwargs) # type: ignore def render(self) -> str: - if self.external_location: - return self.external_location + if self.external: + return self.external else: return super().render() diff --git a/dbt/adapters/duckdb/utils.py b/dbt/adapters/duckdb/utils.py new file mode 100644 index 00000000..592fa444 --- /dev/null +++ b/dbt/adapters/duckdb/utils.py @@ -0,0 +1,35 @@ +from dataclasses import dataclass +from typing import Any, Dict + +from dbt.contracts.graph.nodes import SourceDefinition + + +@dataclass +class SourceConfig: + name: str + identifier: str + schema: str + database: str + meta: Dict[str, Any] + + def as_dict(self) -> Dict[str, Any]: + base = { + "name": self.name, + "identifier": self.identifier, + "schema": self.schema, + "database": self.database, + } + base.update(self.meta) + return base + + @classmethod + def create(cls, source: SourceDefinition) -> "SourceConfig": + meta = source.source_meta.copy() + meta.update(source.meta) + return SourceConfig( + name=source.name, + identifier=source.identifier, + schema=source.schema, + database=source.database, + meta=meta, + ) From 417773ad1674bc9efa7264449c4843f9eb370266 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Wed, 5 Apr 2023 15:04:44 -0700 Subject: [PATCH 02/18] plugins WIP --- dbt/adapters/duckdb/environments.py | 8 ++------ dbt/adapters/duckdb/plugins/__init__.py | 8 ++++---- dbt/adapters/duckdb/plugins/excel.py | 6 ++---- dbt/adapters/duckdb/relation.py | 8 ++------ dbt/adapters/duckdb/utils.py | 3 ++- 5 files changed, 12 insertions(+), 21 deletions(-) diff --git a/dbt/adapters/duckdb/environments.py b/dbt/adapters/duckdb/environments.py index c7268dc3..779c78db 100644 --- a/dbt/adapters/duckdb/environments.py +++ b/dbt/adapters/duckdb/environments.py @@ -63,9 +63,7 @@ def handle(self): def cursor(self): raise NotImplementedError - def submit_python_job( - self, handle, parsed_model: dict, compiled_code: str - ) -> AdapterResponse: + def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: raise NotImplementedError def get_binding_char(self) -> str: @@ -166,9 +164,7 @@ def handle(self): cursor = self.initialize_cursor(self.creds, self.conn.cursor()) return DuckDBConnectionWrapper(cursor) - def submit_python_job( - self, handle, parsed_model: dict, compiled_code: str - ) -> AdapterResponse: + def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: con = handle.cursor() def ldf(table_name): diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index 2b750f9d..bf9d202d 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -3,7 +3,7 @@ from typing import Any from typing import Dict -from dbt.contracts.graph.nodes import SourceDefinition +from ..utils import SourceConfig class Plugin(abc.ABC): @@ -21,11 +21,11 @@ def __init__(self, plugin_config: Dict): pass @abc.abstractmethod - def load_source(self, source_definition: SourceDefinition) -> str: - """Load data from a source and return it as a string.""" + def load(self, source_config: SourceConfig): + """Load data from a source config and return it as a string.""" pass @abc.abstractmethod - def store_target(self, data, config: Dict) -> None: + def store(self, data, config: Dict) -> None: """Store the given data using the provided config dictionary.""" pass diff --git a/dbt/adapters/duckdb/plugins/excel.py b/dbt/adapters/duckdb/plugins/excel.py index c7d7957b..92f82867 100644 --- a/dbt/adapters/duckdb/plugins/excel.py +++ b/dbt/adapters/duckdb/plugins/excel.py @@ -9,7 +9,5 @@ class ExcelPlugin(Plugin): def __init__(self, config: Dict): self._config = config - def load_source(self, source_definition) -> str: - var_name = f"__excel_source_{source_definition.identifier}" - globals()[var_name] = pd.read_excel() - return var_name + def load_data(self, source_config) -> pd.DataFrame: + return pd.read_csv() diff --git a/dbt/adapters/duckdb/relation.py b/dbt/adapters/duckdb/relation.py index 7235a665..eb9c71a1 100644 --- a/dbt/adapters/duckdb/relation.py +++ b/dbt/adapters/duckdb/relation.py @@ -15,9 +15,7 @@ class DuckDBRelation(BaseRelation): external: Optional[str] = None @classmethod - def create_from_source( - cls: Type[Self], source: SourceDefinition, **kwargs: Any - ) -> Self: + def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any) -> Self: source_config = SourceConfig.create(source) # First check to see if a 'plugin' is defined in the meta argument for # the source or its parent configuration, and if it is, use the environment @@ -25,9 +23,7 @@ def create_from_source( # reference in the compiled model if "plugin" in source_config.meta: plugin_name = source_config.meta["plugin"] - source_name = DuckDBConnectionManager.env().load_source( - plugin_name, source_config - ) + source_name = DuckDBConnectionManager.env().load_source(plugin_name, source_config) kwargs["external"] = source_name elif "external_location" in source_config.meta: # Call str.format with the schema, name and identifier for the source so that they diff --git a/dbt/adapters/duckdb/utils.py b/dbt/adapters/duckdb/utils.py index 592fa444..8a2062d1 100644 --- a/dbt/adapters/duckdb/utils.py +++ b/dbt/adapters/duckdb/utils.py @@ -1,5 +1,6 @@ from dataclasses import dataclass -from typing import Any, Dict +from typing import Any +from typing import Dict from dbt.contracts.graph.nodes import SourceDefinition From 390a264bc99d8644c321724148f746461b27a3f1 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Fri, 7 Apr 2023 15:28:49 -0700 Subject: [PATCH 03/18] WIP: source plugins --- dbt/adapters/duckdb/buenavista.py | 21 +++++++++- dbt/adapters/duckdb/environments.py | 31 +++++++++----- dbt/adapters/duckdb/plugins/__init__.py | 17 ++++---- dbt/adapters/duckdb/plugins/excel.py | 9 ++++- dbt/adapters/duckdb/plugins/gsheet.py | 54 +++++++++++++++++++++++++ dbt/adapters/duckdb/plugins/iceberg.py | 18 +++++++++ dbt/adapters/duckdb/relation.py | 3 +- dbt/adapters/duckdb/utils.py | 6 ++- dev-requirements.txt | 2 + 9 files changed, 137 insertions(+), 24 deletions(-) create mode 100644 dbt/adapters/duckdb/plugins/gsheet.py create mode 100644 dbt/adapters/duckdb/plugins/iceberg.py diff --git a/dbt/adapters/duckdb/buenavista.py b/dbt/adapters/duckdb/buenavista.py index 0936b225..929aca0a 100644 --- a/dbt/adapters/duckdb/buenavista.py +++ b/dbt/adapters/duckdb/buenavista.py @@ -3,6 +3,7 @@ import psycopg2 from . import credentials +from . import utils from .environments import Environment from dbt.contracts.connection import AdapterResponse @@ -29,6 +30,9 @@ def handle(self): cursor.close() return conn + def get_binding_char(self) -> str: + return "%s" + def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: identifier = parsed_model["alias"] payload = { @@ -42,5 +46,18 @@ def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> A handle.cursor().execute(json.dumps(payload)) return AdapterResponse(_message="OK") - def get_binding_char(self) -> str: - return "%s" + def load_source(self, plugin_name: str, source_config: utils.SourceConfig) -> str: + handle = self.handle() + payload = { + "method": "dbt_load_source", + "params": { + "plugin_name": plugin_name, + "source_config": source_config.as_dict(), + }, + } + cursor = handle.cursor() + cursor.execute(json.dumps(payload)) + res = cursor.fetchone() + cursor.close() + handle.close() + return res[0] diff --git a/dbt/adapters/duckdb/environments.py b/dbt/adapters/duckdb/environments.py index 779c78db..d66991a7 100644 --- a/dbt/adapters/duckdb/environments.py +++ b/dbt/adapters/duckdb/environments.py @@ -1,3 +1,4 @@ +import abc import importlib.util import os import tempfile @@ -56,21 +57,21 @@ def cursor(self): return self._cursor -class Environment: +class Environment(abc.ABC): + @abc.abstractmethod def handle(self): - raise NotImplementedError - - def cursor(self): - raise NotImplementedError + pass + @abc.abstractmethod def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse: - raise NotImplementedError + pass def get_binding_char(self) -> str: return "?" + @abc.abstractmethod def load_source(self, plugin_name: str, source_config: SourceConfig) -> str: - return "load_source_todo" + pass @classmethod def initialize_db(cls, creds: DuckDBCredentials): @@ -116,7 +117,10 @@ def initialize_plugins(cls, creds: DuckDBCredentials) -> Dict[str, Plugin]: if plugin.name in ret: raise Exception("Duplicate plugin name: " + plugin.name) else: - ret[plugin.name] = Plugin.create(plugin.impl, plugin.config or {}) + try: + ret[plugin.name] = Plugin.create(plugin.impl, plugin.config or {}) + except Exception as e: + raise Exception(f"Error attempting to create plugin {plugin.name}", e) return ret @classmethod @@ -156,7 +160,7 @@ def run_python_job(cls, con, load_df_function, identifier: str, compiled_code: s class LocalEnvironment(Environment): def __init__(self, credentials: DuckDBCredentials): self.conn = self.initialize_db(credentials) - self.plugins = self.initialize_plugins(credentials) + self._plugins = self.initialize_plugins(credentials) self.creds = credentials def handle(self): @@ -173,6 +177,15 @@ def ldf(table_name): self.run_python_job(con, ldf, parsed_model["alias"], compiled_code) return AdapterResponse(_message="OK") + def load_source(self, plugin_name: str, source_config: SourceConfig): + df = self._plugins[plugin_name].load(source_config) + assert df + handle = self.handle() + cursor = handle.cursor() + cursor.execute(f"CREATE OR REPLACE TABLE {source_config.table_name()} AS SELECT * FROM df") + cursor.close() + handle.close() + def close(self): if self.conn: self.conn.close() diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index bf9d202d..669ba12f 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -4,6 +4,13 @@ from typing import Dict from ..utils import SourceConfig +from dbt.dataclass_schema import dbtClassMixin + + +class PluginConfig(dbtClassMixin): + """A helper class for defining the configuration settings a particular plugin uses.""" + + pass class Plugin(abc.ABC): @@ -20,12 +27,6 @@ def create(cls, impl: str, config: Dict[str, Any]) -> "Plugin": def __init__(self, plugin_config: Dict): pass - @abc.abstractmethod def load(self, source_config: SourceConfig): - """Load data from a source config and return it as a string.""" - pass - - @abc.abstractmethod - def store(self, data, config: Dict) -> None: - """Store the given data using the provided config dictionary.""" - pass + """Load data from a source config and return it as a DataFrame-like object that DuckDB can read.""" + raise NotImplementedError diff --git a/dbt/adapters/duckdb/plugins/excel.py b/dbt/adapters/duckdb/plugins/excel.py index 92f82867..e9a7199a 100644 --- a/dbt/adapters/duckdb/plugins/excel.py +++ b/dbt/adapters/duckdb/plugins/excel.py @@ -1,13 +1,18 @@ +import pathlib from typing import Dict import pandas as pd from . import Plugin +from ..utils import SourceConfig class ExcelPlugin(Plugin): def __init__(self, config: Dict): self._config = config - def load_data(self, source_config) -> pd.DataFrame: - return pd.read_csv() + def load(self, source_config: SourceConfig): + ext_location = source_config.meta["external_location"] + ext_location = ext_location.format(**source_config.as_dict()) + source_location = pathlib.Path(ext_location.strip("'")) + return pd.read_excel(source_location) diff --git a/dbt/adapters/duckdb/plugins/gsheet.py b/dbt/adapters/duckdb/plugins/gsheet.py new file mode 100644 index 00000000..eee812a4 --- /dev/null +++ b/dbt/adapters/duckdb/plugins/gsheet.py @@ -0,0 +1,54 @@ +import tempfile +from dataclasses import dataclass +from typing import Dict +from typing import Literal + +import gspread +import pandas as pd + +from . import Plugin, PluginConfig +from ..utils import SourceConfig + + +@dataclass +class GSheetConfig(PluginConfig): + method: Literal["service", "oauth"] + + def client(self): + if self.method == "service": + return gspread.service_account() + else: + return gspread.oauth() + + +class GSheetPlugin(Plugin): + def __init__(self, config: Dict): + self._config = GSheetConfig.from_dict(config) + self._gc = self._config.client() + + def load(self, source_config: SourceConfig): + doc = None + if "title" in source_config.meta: + doc = self._gc.open(source_config.meta["title"]) + elif "key" in source_config.meta: + doc = self._gc.open_by_key(source_config.meta["key"]) + elif "url" in source_config.meta: + doc = self._gc.open_by_url(source_config.meta["url"]) + else: + raise Exception("Source config did not indicate a method to open a GSheet to read") + + sheet = None + if "worksheeet" in source_config.meta: + work_id = source_config.meta["worksheet"] + if isinstance(work_id, int): + sheet = doc.get_worksheet(work_id) + elif isinstance(work_id, str): + sheet = doc.worksheet(work_id) + else: + raise Exception( + f"Could not identify a worksheet in the doc from identifier: {work_id}" + ) + else: + sheet = doc.sheet1 + + return pd.DataFrame(sheet.get_all_records()) diff --git a/dbt/adapters/duckdb/plugins/iceberg.py b/dbt/adapters/duckdb/plugins/iceberg.py new file mode 100644 index 00000000..7c37f850 --- /dev/null +++ b/dbt/adapters/duckdb/plugins/iceberg.py @@ -0,0 +1,18 @@ +import tempfile +from typing import Dict + +from pyiceberg import catalog + +from . import Plugin +from ..utils import SourceConfig + + +class IcebergPlugin(Plugin): + def __init__(self, config: Dict): + self._catalog = catalog.load_catalog(config.get("name"), config.get("properties")) + + def load(self, source_config: SourceConfig): + table_format = source_config.meta.get("iceberg_table", "{schema}.{identifier}") + table_name = table_format.format(**source_config.as_dict()) + table = self._catalog.load_table(table_name) + return table.scan().to_arrow() # TODO: configurable scans diff --git a/dbt/adapters/duckdb/relation.py b/dbt/adapters/duckdb/relation.py index eb9c71a1..52265810 100644 --- a/dbt/adapters/duckdb/relation.py +++ b/dbt/adapters/duckdb/relation.py @@ -23,8 +23,7 @@ def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any) # reference in the compiled model if "plugin" in source_config.meta: plugin_name = source_config.meta["plugin"] - source_name = DuckDBConnectionManager.env().load_source(plugin_name, source_config) - kwargs["external"] = source_name + DuckDBConnectionManager.env().load_source(plugin_name, source_config) elif "external_location" in source_config.meta: # Call str.format with the schema, name and identifier for the source so that they # can be injected into the string; this helps reduce boilerplate when all diff --git a/dbt/adapters/duckdb/utils.py b/dbt/adapters/duckdb/utils.py index 8a2062d1..2d1612e3 100644 --- a/dbt/adapters/duckdb/utils.py +++ b/dbt/adapters/duckdb/utils.py @@ -1,6 +1,7 @@ from dataclasses import dataclass from typing import Any from typing import Dict +from typing import Optional from dbt.contracts.graph.nodes import SourceDefinition @@ -10,9 +11,12 @@ class SourceConfig: name: str identifier: str schema: str - database: str + database: Optional[str] meta: Dict[str, Any] + def table_name(self) -> str: + return f"{self.schema}.{self.identifier}" + def as_dict(self) -> Dict[str, Any]: base = { "name": self.name, diff --git a/dev-requirements.txt b/dev-requirements.txt index 561f418c..37779f80 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -15,11 +15,13 @@ flake8 flaky freezegun==1.2.2 fsspec +gspread ipdb mypy==1.1.1 pip-tools pre-commit psycopg2-binary +pyiceberg pytest pytest-dotenv pytest-logbook From 50e4c76fef62a704e9ec8e6cf5ff2370d7a3bd87 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Fri, 7 Apr 2023 19:05:54 -0700 Subject: [PATCH 04/18] commit those bits too --- dbt/adapters/duckdb/buenavista.py | 4 +--- dbt/adapters/duckdb/plugins/gsheet.py | 4 ++-- dbt/adapters/duckdb/plugins/iceberg.py | 1 - 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/dbt/adapters/duckdb/buenavista.py b/dbt/adapters/duckdb/buenavista.py index 929aca0a..e6ed4af6 100644 --- a/dbt/adapters/duckdb/buenavista.py +++ b/dbt/adapters/duckdb/buenavista.py @@ -46,7 +46,7 @@ def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> A handle.cursor().execute(json.dumps(payload)) return AdapterResponse(_message="OK") - def load_source(self, plugin_name: str, source_config: utils.SourceConfig) -> str: + def load_source(self, plugin_name: str, source_config: utils.SourceConfig): handle = self.handle() payload = { "method": "dbt_load_source", @@ -57,7 +57,5 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig) -> st } cursor = handle.cursor() cursor.execute(json.dumps(payload)) - res = cursor.fetchone() cursor.close() handle.close() - return res[0] diff --git a/dbt/adapters/duckdb/plugins/gsheet.py b/dbt/adapters/duckdb/plugins/gsheet.py index eee812a4..f1273c80 100644 --- a/dbt/adapters/duckdb/plugins/gsheet.py +++ b/dbt/adapters/duckdb/plugins/gsheet.py @@ -1,4 +1,3 @@ -import tempfile from dataclasses import dataclass from typing import Dict from typing import Literal @@ -6,7 +5,8 @@ import gspread import pandas as pd -from . import Plugin, PluginConfig +from . import Plugin +from . import PluginConfig from ..utils import SourceConfig diff --git a/dbt/adapters/duckdb/plugins/iceberg.py b/dbt/adapters/duckdb/plugins/iceberg.py index 7c37f850..9a4df79a 100644 --- a/dbt/adapters/duckdb/plugins/iceberg.py +++ b/dbt/adapters/duckdb/plugins/iceberg.py @@ -1,4 +1,3 @@ -import tempfile from typing import Dict from pyiceberg import catalog From 328c9af5613c62b903c2824c3cbfd3660be6437e Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Fri, 7 Apr 2023 19:22:52 -0700 Subject: [PATCH 05/18] Simplify the sources test a bit --- tests/functional/adapter/test_sources.py | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/tests/functional/adapter/test_sources.py b/tests/functional/adapter/test_sources.py index 500c3e10..0c1f9448 100644 --- a/tests/functional/adapter/test_sources.py +++ b/tests/functional/adapter/test_sources.py @@ -1,7 +1,6 @@ import os import pytest -import yaml from dbt.tests.util import run_dbt sources_schema_yml = """version: 2 @@ -35,14 +34,6 @@ class TestExternalSources: - @pytest.fixture(scope="class", autouse=True) - def setEnvVars(self): - os.environ["DBT_TEST_SCHEMA_NAME_VARIABLE"] = "test_run_schema" - - yield - - del os.environ["DBT_TEST_SCHEMA_NAME_VARIABLE"] - @pytest.fixture(scope="class") def models(self): return { @@ -51,13 +42,6 @@ def models(self): "multi_source_model.sql": models_multi_source_model_sql, } - def run_dbt_with_vars(self, project, cmd, *args, **kwargs): - vars_dict = { - "test_run_schema": project.test_schema, - } - cmd.extend(["--vars", yaml.safe_dump(vars_dict)]) - return run_dbt(cmd, *args, **kwargs) - @pytest.fixture(scope="class") def seeds_source_file(self): with open("/tmp/seeds_source_something.csv", "w") as f: @@ -73,7 +57,7 @@ def ost_file(self): os.unlink("/tmp/seeds_other_source_table.csv") def test_external_sources(self, seeds_source_file, ost_file, project): - results = self.run_dbt_with_vars(project, ["run"]) + results = run_dbt(["run"]) assert len(results) == 2 - test_results = self.run_dbt_with_vars(project, ["test"]) + test_results = run_dbt(["test"]) assert len(test_results) == 2 From 5c2319fcf315f1b2e10345502813811843b7dc0e Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Fri, 7 Apr 2023 20:59:32 -0700 Subject: [PATCH 06/18] Okay I think that's all of the basics --- .github/workflows/main.yml | 45 +++++++++++++++ dbt/adapters/duckdb/environments.py | 4 +- dbt/adapters/duckdb/plugins/__init__.py | 7 +++ dbt/adapters/duckdb/utils.py | 5 +- dev-requirements.txt | 1 + tests/conftest.py | 7 +++ tests/data/excel_file.xlsx | Bin 0 -> 5425 bytes tests/functional/plugins/test_excel.py | 70 ++++++++++++++++++++++++ tox.ini | 15 ++++- 9 files changed, 149 insertions(+), 5 deletions(-) create mode 100644 tests/data/excel_file.xlsx create mode 100644 tests/functional/plugins/test_excel.py diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 05bdde32..a9099967 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -290,6 +290,51 @@ jobs: name: fsspec_results_${{ matrix.python-version }}-${{ steps.date.outputs.date }}.csv path: fsspec_results.csv + plugins: + name: plugins test / python ${{ matrix.python-version }} + + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + python-version: ['3.9'] + + env: + TOXENV: "plugins" + PYTEST_ADDOPTS: "-v --color=yes --csv plugins_results.csv" + + steps: + - name: Check out the repository + uses: actions/checkout@v3 + with: + persist-credentials: false + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Install python dependencies + run: | + python -m pip install tox + python -m pip --version + tox --version + + - name: Run tox + run: tox + + - name: Get current date + if: always() + id: date + run: echo "date=$(date +'%Y-%m-%dT%H_%M_%S')" >> $GITHUB_OUTPUT #no colons allowed for artifacts + + - uses: actions/upload-artifact@v3 + if: always() + with: + name: plugins_results_${{ matrix.python-version }}-${{ steps.date.outputs.date }}.csv + path: plugins_results.csv + build: name: build packages diff --git a/dbt/adapters/duckdb/environments.py b/dbt/adapters/duckdb/environments.py index d66991a7..da6e1aed 100644 --- a/dbt/adapters/duckdb/environments.py +++ b/dbt/adapters/duckdb/environments.py @@ -117,6 +117,8 @@ def initialize_plugins(cls, creds: DuckDBCredentials) -> Dict[str, Plugin]: if plugin.name in ret: raise Exception("Duplicate plugin name: " + plugin.name) else: + if plugin.impl in Plugin.WELL_KNOWN_PLUGINS: + plugin.impl = Plugin.WELL_KNOWN_PLUGINS[plugin.impl] try: ret[plugin.name] = Plugin.create(plugin.impl, plugin.config or {}) except Exception as e: @@ -179,7 +181,7 @@ def ldf(table_name): def load_source(self, plugin_name: str, source_config: SourceConfig): df = self._plugins[plugin_name].load(source_config) - assert df + assert df is not None handle = self.handle() cursor = handle.cursor() cursor.execute(f"CREATE OR REPLACE TABLE {source_config.table_name()} AS SELECT * FROM df") diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index 669ba12f..b6dd864b 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -14,6 +14,13 @@ class PluginConfig(dbtClassMixin): class Plugin(abc.ABC): + + WELL_KNOWN_PLUGINS = { + "excel": "dbt.adapters.duckdb.plugins.excel.ExcelPlugin", + "gsheet": "dbt.adapters.duckdb.plugins.gsheet.GSheetPlugin", + "iceberg": "dbt.adapters.duckdb.plugins.iceberg.IcebergPlugin", + } + @classmethod def create(cls, impl: str, config: Dict[str, Any]) -> "Plugin": module_name, class_name = impl.rsplit(".", 1) diff --git a/dbt/adapters/duckdb/utils.py b/dbt/adapters/duckdb/utils.py index 2d1612e3..c8e6d14d 100644 --- a/dbt/adapters/duckdb/utils.py +++ b/dbt/adapters/duckdb/utils.py @@ -15,7 +15,10 @@ class SourceConfig: meta: Dict[str, Any] def table_name(self) -> str: - return f"{self.schema}.{self.identifier}" + if self.database: + return ".".join([self.database, self.schema, self.identifier]) + else: + return ".".join([self.schema, self.identifier]) def as_dict(self) -> Dict[str, Any]: base = { diff --git a/dev-requirements.txt b/dev-requirements.txt index 37779f80..ce186774 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -18,6 +18,7 @@ fsspec gspread ipdb mypy==1.1.1 +openpyxl pip-tools pre-commit psycopg2-binary diff --git a/tests/conftest.py b/tests/conftest.py index 60a57a9f..a870f471 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import os import subprocess import time @@ -65,3 +66,9 @@ def skip_by_profile_type(profile_type, request): for skip_profile_type in request.node.get_closest_marker("skip_profile").args: if skip_profile_type == profile_type: pytest.skip(f"skipped on '{profile_type}' profile") + + +@pytest.fixture(scope="session") +def test_data_path(): + test_dir = os.path.dirname(os.path.abspath(__file__)) + return os.path.join(test_dir, "data") \ No newline at end of file diff --git a/tests/data/excel_file.xlsx b/tests/data/excel_file.xlsx new file mode 100644 index 0000000000000000000000000000000000000000..ffe1270865c0ab4e4a3b38b83ea2f56007c43384 GIT binary patch literal 5425 zcmaJ_1z6PG(g$hj6j(yKq?K5Zl1raDA-4blXcQY3&2Ul*cn|D>p zs3Hn4LFB%FOp>qXJJKiMy7s8CEP4N+jG-$F!IwM~?*6`^n1*G=7>No2$HRisnF~bL z-(#}nVq_uZh8%GT7v#Q$xp?qxK>Jg@VTKW1MMiuOtbDTD;pBdb6LKwpXVzGwOWzc@ zVqQ$u90^_k`-VB1n4w&QH-AKx{aCbJJdf9ngomZSDchEf^jtH&BP%?cH)J>Z^*$I&JiJIm$VwNxLpCqtjiqInpIZoOmOw`pQ+f2~#%l zsx3w3+o0{#`*9XqaU^vm3{30tkj7<%+ah<65aa&^3<2U7U+Ij6mwd8uPl^yj3X_toJuhon;cAf2nANCUVX)c;xk38S6Bb0Ir zdojBf-j9T95IL&&IcvLx!~h(x3lCF|rYI1a;c&-(C}EDlt8}MV(QOE-0J8-i0Kb;? zPS%sOeIR7!(qju93Q@>8fI9gP6f{s{ zJR8(i!UeXiMCFp*m8U9qNwhan$v8Kq2OfRN2^Rcl$Dtdu^N7|7B^PblD)MmuB^G~PdH-vQD=&WoT(kJ_yM-%3w%$%*&rpUW zslAa--?oAmV_$E4CmI(Wwsr<4ezeT70%xWa{CsejdxpfwsWrTq7#|WNSdrH(79TOO z@5FJnA%vB18mA1Q=0>9Qa9F?&ZM! zcg%T0T%KA&AWv^V|2zJYRh~j`kiR6Ah4(06fZ~MO<0NY?eyCvCxGcMpL`;eX6jjS+23kUQHMj?l6h3An50h$D8KKlWaj3Vtc3r;#&EVph z75e7#^y6X;yU--_k837cE^~16VVpRgu0LKEr>CA}fYpTNY6^H_O_^EK?E7Ftu;#1O z19mPLkB=H(%2d@XF8JNKK}PN1)MokTmO&j(eZ zJOnauKbf`1?i1z3QVQi{h7x0Uqi1|I&}Op53GEx-eeFXy(f&!RHJ*CkC`ubkDs~Bb)K!tBhZ|w z16O<&7P=0O2p{S@uXo-uMscT1-u1+qK6?5&Pi~3_^{MSr4k+D?@S3oXncEMd%+3`p zG`m>$mB}mq{o11~ve|cu>;-Sk91e)tscENb9;}9z|9w@kurUssB2I1s%Ac+Z^1t`z z=FGaHLv!xl7x+<*KPzeG~r8t#XqE+DNV1)9^P5?B`d10T*DK4Z0 zL7678KrDqh;y{qBCrX?sCmI~kw%nRT^>}ernX%9Ayknkp^hwJ3#znGqD-xS&o=z_l zT5>||?oY!9>l2*d5B!8FX?#88HZLCpZ4WX1D2&Q|QhgjdaS@QfryQ5S&F3Tsd#y`{ZARa`N=a57+Pz6R<8XE2yBmCF`uZ%tKIORx zr`6^Uc$K|YLq)fMtvwxkV7!$HTElX|hSUD1l?ar#XNvwXAh@&=T%5JVt&q9WzA1dh zrbphr!jZh8NK;^Az}{?|a#T!y%)OFF?Cujlgswov4s*wQ;T0l8iATO5REd`-wowuD zv5mPw$FYxMqV&VO)uYh*MM6QZc>W$aqj>$IA@2qyb|DI=+~ABlN}~G0pmk#a85K*gLEOQRWf-O4oA(-}=~xoI z=$NzP#MF1zc~FtF0_^W&dt43?pwZwwj-iE{CNTi(sFFLr1R-AUQTr`|f{yulCEg>S znFk>al%T&08dOds3X=AylEYdtovD=bvwmzD514&ksBd;(=o*BAnw=Mo7u;7U8`k$( zk)?t<@uz#^L}8foq~Kt3wn4&^Rdg~T#S_C&;)enfL8UjVkT*?-N|0s0efdhxmE*A@ z0n_6;ss;YR@u{ZZRq?H@26SvH!2SEMqKbQfmKVr(zK?y9TChRq8mq$5SP|me9v7*j zc^nToxYSTHyYE-eC$U`j0_GoRV51(*>IAN{oK>3c#cO z9Nk5m1Z5S|+&V%#(~e?=(fOo{_po8+=pEO%4AFI97ZtPE8M6lCto&GqZ7r@viT0jms*LL@e(M{*@)*|s^6!%qP@%Jc(ZOC%(-#b zL0Hdxu=$SB29Rn40^~RPDNPGUrANLJK_DOploQ z2G`u7w<%gQS!u3O%tbQe(m>c_D5vo)GylR_M;wR@aG zu->NSj0Q@t=?rJS8J4ydn;l|HwI_@t+Jib91@qgzDzwvADIYZ(#(E?@;c5<}axfV} zWqzfpjRyQTd8v)n?IxXbVZ3W64Olwzsmrkl5o-MVaUaX+7CL(LYJsJR3RSMC8Ca`Lm z^p~;cK2Pb9lGlGm8mML`;+IuIJAJMKq#AozC9)R*Q8(=XRE!FW>!x=4-WRUU9`#!F z%NJ%3xDLtHrLg>7)O8wswr{_@3+<13^zxcaW4c>oX z6d*eQ9SNzQ?N12?_umD8o4142UzvYb-vpAygUBj1Z!S7Aix?eQ^0xF!6rSodl-1kM zg<_*uQUWMmtDJ4yfbJ(%&vh|#Y8;38`}yyEF$O&F?K_SOW}(MsZ^Fm$#X}8?q%ROX z{IcM7bVv{)WkF-XEOhuKgycZz=Q64Nhki%Js5_a#_r#Q6z{Kga!`KF%y=8If`Y!(f z9PwfghDzGZD8xQg6qldB8mAG#78%@U^NCRl4YMbXgBA~fIop>m{}j+lpx?qzXvdbz zUCrgB+97o^xU3=g^N1)XxA5GTIKJF1_#Sr05HN%cvuy=U)S|>7JB7-o`-d81&yWP~ z2&<9)Gkwju=*J}@0D;R7c&x$<)|HK<>rj1VifxiDVM#f}Xv)$$ioyE@dPBP^1XqQ?Am}pjGXX%|BOBVpR?l8vSHR zkE38fj-8!GN4*&~9%ch(IxxH|BcR&O3#Mu_l2qQ12^dRYGRoJBdq1;43%!xK=9Z`H7m$SV4} zDIP1iyW0AAsBz@bX$f>88c@|%wZUBsnSa><%4&V(cwN0bzcE(|xAYyY{=QVO4|}BT zzhkIJJ{$?bF=FLOmlSK`OLwTW`V5WZlkuLywh0eN){1R#shY@+egJ(mXg-<$$?Q40 zpJ%{B?Nz)fFWmB%sbP_^5#RElYt^HZwLLUd16b9Zi~+(Wr>)~ z4N;TLmZoVNG8of?C{VsDd^uanibLj2lgL+ijD*w#6>3$x&y8y+w-3IV%S$W;h9_8= zg?_Bn?;O0Mk-_z8-G($94xk%L3Cu-&F}e)NoFx=u2}kbQDiF<4yeh!#e(^ctIcQ~I zCFyDzltHmCRo@xH@(dq{zL8qgB;oHOigZ>_RDaEOdgjoM6`Imk(1;JGGs~^Y zPrK(8P%D(<;%O_$$<|!T^;MH#QoL7^O_+PM$kstQH498;&f>ck0P2p2!B03EDbe~V zI(AV@S$yOTX>z?{=^k5THWJaLN>2;+f_#1HTwu|fxdFjqm@$jYvBAdq1;s zHMuF9+fzR%L3v1e_s=;Lt0zj1@6+#k5QuKQ_fE@btIBsIT8ynb-}f)e%EO$WJ(y5G z>ez=brYG5potV5sX52R6ERkX3do~Byo^J3m5L;`s6wA^cbVPzRO>1ywl0$P+ivXLX zq+&scBKEtcQaZ&T!(7QruZMv$`q-7xsw&TT4pL#TgAECMTpo&QI#f46Wu zSKm;+U&4XN;0O!q*3<@R-Vlh=Pq9M0eC_)nJqUG;WE-Q0)2BoQ(E_X7U89e-E8 jeKKx>?w8mgcICf>pt=$o!Yw2uEW{&@*s0G%H=}<6lUpq0 literal 0 HcmV?d00001 diff --git a/tests/functional/plugins/test_excel.py b/tests/functional/plugins/test_excel.py new file mode 100644 index 00000000..46bfe690 --- /dev/null +++ b/tests/functional/plugins/test_excel.py @@ -0,0 +1,70 @@ +import pytest + +from dbt.tests.util import ( + check_relations_equal, + run_dbt, +) + +sources_schema_yml = """ +version: 2 +sources: + - name: excel_source + schema: main + meta: + plugin: excel + tables: + - name: excel_file + description: "An excel file" + meta: + external_location: "{test_data_path}/excel_file.xlsx" +""" + +models_source_model_sql = """ + select * from {{ source('excel_source', 'excel_file') }} +""" + + +@pytest.mark.skip_profile("buenavista") +class TestExcelPlugin: + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target): + if "path" not in dbt_profile_target: + return {} + return { + "test": { + "outputs": { + "dev": { + "type": "duckdb", + "path": dbt_profile_target["path"], + "plugins": [{"name": "excel", "impl": "excel"}], + } + }, + "target": "dev", + } + } + + @pytest.fixture(scope="class") + def models(self, test_data_path): + return { + "schema.yml": sources_schema_yml.format(test_data_path=test_data_path), + "source_model.sql": models_source_model_sql, + "source_model2.sql": models_source_model_sql, + "source_model3.sql": models_source_model_sql, + "source_model4.sql": models_source_model_sql, + } + + def test_excel_plugin(self, project): + results = run_dbt() + assert len(results) == 4 + + # relations_equal + check_relations_equal( + project.adapter, + [ + "excel_file", + "source_model", + "source_model2", + "source_model3", + "source_model4", + ], + ) diff --git a/tox.ini b/tox.ini index 58c0a094..eaea029f 100644 --- a/tox.ini +++ b/tox.ini @@ -13,7 +13,7 @@ deps = [testenv:{functional,py37,py38,py39,py310,py311,py}] -description = adapter plugin functional testing +description = adapter functional testing skip_install = True passenv = * commands = {envpython} -m pytest {posargs} tests/functional/adapter @@ -22,7 +22,7 @@ deps = -e. [testenv:{filebased,py37,py38,py39,py310,py311,py}] -description = adapter plugin functional testing using file-based DBs +description = adapter functional testing using file-based DBs skip_install = True passenv = * commands = {envpython} -m pytest --profile=file {posargs} tests/functional/adapter @@ -31,10 +31,19 @@ deps = -e. [testenv:{fsspec,py37,py38,py39,py310,py311,py}] -description = adapter plugin fsspec testing +description = adapter fsspec testing skip_install = True passenv = * commands = {envpython} -m pytest {posargs} tests/functional/fsspec deps = -rdev-requirements.txt -e. + +[testenv:{plugins,py37,py38,py39,py310,py311,py}] +description = adapter plugin testing +skip_install = True +passenv = * +commands = {envpython} -m pytest {posargs} tests/functional/plugins +deps = + -rdev-requirements.txt + -e. From 58eafafa0166c53bf5ff5f99cbc10425b795e2bc Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Fri, 7 Apr 2023 21:44:56 -0700 Subject: [PATCH 07/18] Only install pyiceberg in the plugins test until we stop support for 3.7 --- dev-requirements.txt | 1 - tox.ini | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index ce186774..d8b4e05c 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -22,7 +22,6 @@ openpyxl pip-tools pre-commit psycopg2-binary -pyiceberg pytest pytest-dotenv pytest-logbook diff --git a/tox.ini b/tox.ini index eaea029f..bcd4314a 100644 --- a/tox.ini +++ b/tox.ini @@ -46,4 +46,5 @@ passenv = * commands = {envpython} -m pytest {posargs} tests/functional/plugins deps = -rdev-requirements.txt + pyiceberg -e. From e017af8dd678e62b388ad6d864f716a37753b7ab Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Mon, 10 Apr 2023 10:11:43 -0700 Subject: [PATCH 08/18] Fixing bugs and adding a test case (that only runs on my machine) for the gsheet plugin --- dbt/adapters/duckdb/plugins/gsheet.py | 2 +- tests/functional/plugins/test_gsheet.py | 79 +++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 tests/functional/plugins/test_gsheet.py diff --git a/dbt/adapters/duckdb/plugins/gsheet.py b/dbt/adapters/duckdb/plugins/gsheet.py index f1273c80..1b2621cc 100644 --- a/dbt/adapters/duckdb/plugins/gsheet.py +++ b/dbt/adapters/duckdb/plugins/gsheet.py @@ -38,7 +38,7 @@ def load(self, source_config: SourceConfig): raise Exception("Source config did not indicate a method to open a GSheet to read") sheet = None - if "worksheeet" in source_config.meta: + if "worksheet" in source_config.meta: work_id = source_config.meta["worksheet"] if isinstance(work_id, int): sheet = doc.get_worksheet(work_id) diff --git a/tests/functional/plugins/test_gsheet.py b/tests/functional/plugins/test_gsheet.py new file mode 100644 index 00000000..1c6ab273 --- /dev/null +++ b/tests/functional/plugins/test_gsheet.py @@ -0,0 +1,79 @@ +import pytest + +from dbt.tests.util import ( + check_relations_equal, + run_dbt, +) + +sources_schema_yml = """ +version: 2 +sources: + - name: gsheet_source + schema: main + meta: + plugin: gsheet + title: "Josh's Test Spreadsheet" + tables: + - name: gsheet1 + description: "My first sheet" + - name: gsheet2 + description: "The second sheet in the doc" + meta: + worksheet: "TwoSheet" +""" + +models_source_model1_sql = """ + select * from {{ source('gsheet_source', 'gsheet1') }} +""" +models_source_model2_sql = """ + select * from {{ source('gsheet_source', 'gsheet2') }} +""" + +@pytest.mark.skip +class TestGSheetPlugin: + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target): + config = {"method": "oauth"} + if "path" not in dbt_profile_target: + return {} + return { + "test": { + "outputs": { + "dev": { + "type": "duckdb", + "path": dbt_profile_target["path"], + "plugins": [{"name": "gsheet", "impl": "gsheet", "config": config}], + } + }, + "target": "dev", + } + } + + @pytest.fixture(scope="class") + def models(self, test_data_path): + return { + "schema.yml": sources_schema_yml.format(test_data_path=test_data_path), + "source_model1.sql": models_source_model1_sql, + "source_model2.sql": models_source_model2_sql, + } + + def test_gshseet_plugin(self, project): + results = run_dbt() + assert len(results) == 2 + + check_relations_equal( + project.adapter, + [ + "gsheet1", + "source_model1", + ], + ) + + check_relations_equal( + project.adapter, + [ + "gsheet2", + "source_model2", + ], + ) + From 4f69a8c15835a91f1251f544bb366ef8471f4b7e Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Tue, 11 Apr 2023 20:21:02 -0700 Subject: [PATCH 09/18] Add in a SQLAlchemy plugin --- dbt/adapters/duckdb/plugins/__init__.py | 2 +- dbt/adapters/duckdb/plugins/sqlalchemy.py | 17 +++++ tests/functional/plugins/test_sqlalchemy.py | 83 +++++++++++++++++++++ 3 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 dbt/adapters/duckdb/plugins/sqlalchemy.py create mode 100644 tests/functional/plugins/test_sqlalchemy.py diff --git a/dbt/adapters/duckdb/plugins/__init__.py b/dbt/adapters/duckdb/plugins/__init__.py index b6dd864b..ed37c726 100644 --- a/dbt/adapters/duckdb/plugins/__init__.py +++ b/dbt/adapters/duckdb/plugins/__init__.py @@ -14,11 +14,11 @@ class PluginConfig(dbtClassMixin): class Plugin(abc.ABC): - WELL_KNOWN_PLUGINS = { "excel": "dbt.adapters.duckdb.plugins.excel.ExcelPlugin", "gsheet": "dbt.adapters.duckdb.plugins.gsheet.GSheetPlugin", "iceberg": "dbt.adapters.duckdb.plugins.iceberg.IcebergPlugin", + "sqlalchemy": "dbt.adapters.duckdb.plugins.sqlalchemy.SQLAlchemyPlugin", } @classmethod diff --git a/dbt/adapters/duckdb/plugins/sqlalchemy.py b/dbt/adapters/duckdb/plugins/sqlalchemy.py new file mode 100644 index 00000000..d55f918a --- /dev/null +++ b/dbt/adapters/duckdb/plugins/sqlalchemy.py @@ -0,0 +1,17 @@ +from typing import Dict, Any +import pandas as pd +from sqlalchemy import create_engine + +from . import Plugin +from ..utils import SourceConfig + + +class SQLAlchemyPlugin(Plugin): + def __init__(self, plugin_config: Dict[str, Any]): + self.engine = create_engine(plugin_config["connection_url"]) + + def load(self, source_config: SourceConfig) -> pd.DataFrame: + query = source_config.meta["query"] + query = query.format(**source_config.as_dict()) + params = source_config.meta.get("params", {}) + return pd.read_sql_query(query, con=self.engine, params=params) diff --git a/tests/functional/plugins/test_sqlalchemy.py b/tests/functional/plugins/test_sqlalchemy.py new file mode 100644 index 00000000..d588051d --- /dev/null +++ b/tests/functional/plugins/test_sqlalchemy.py @@ -0,0 +1,83 @@ +import os +import pytest +import sqlite3 + +from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table +from dbt.tests.util import ( + check_relations_equal, + run_dbt, +) + +sources_schema_yml = """ +version: 2 +sources: + - name: sql_source + schema: main + meta: + plugin: sql + query: "SELECT * FROM {identifier} WHERE id=:id" + tables: + - name: test_table + description: "My first SQLAlchemy table" + meta: + params: + id: 1 +""" + +models_source_model1_sql = """ + select * from {{ source('sql_source', 'test_table') }} +""" + + +class TestSQLAlchemyPlugin: + @pytest.fixture(scope="class") + def sqlite_test_db(self): + path = "/tmp/satest.db" + db = sqlite3.connect(path) + cursor = db.cursor() + cursor.execute("CREATE TABLE test_table (id int, name text)") + cursor.execute("INSERT INTO test_table VALUES (1, 'John Doe')") + cursor.execute("INSERT INTO test_table VALUES (2, 'Jane Smith')") + cursor.close() + db.close() + yield path + os.unlink(path) + + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target, sqlite_test_db): + config = {"connection_url": f"sqlite:///{sqlite_test_db}"} + if "path" not in dbt_profile_target: + return {} + return { + "test": { + "outputs": { + "dev": { + "type": "duckdb", + "path": dbt_profile_target["path"], + "plugins": [ + {"name": "sql", "impl": "sqlalchemy", "config": config} + ], + } + }, + "target": "dev", + } + } + + @pytest.fixture(scope="class") + def models(self): + return { + "schema.yml": sources_schema_yml, + "source_model1.sql": models_source_model1_sql, + } + + def test_sqlalchemy_plugin(self, project): + results = run_dbt() + assert len(results) == 1 + + check_relations_equal( + project.adapter, + [ + "test_table", + "source_model1", + ], + ) From aad97bf0296661ce5fb5abea5af3e5eaefc88335 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Tue, 11 Apr 2023 20:36:42 -0700 Subject: [PATCH 10/18] this is why gpt3.5 isn't allowed to write code anymore --- dbt/adapters/duckdb/plugins/sqlalchemy.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/duckdb/plugins/sqlalchemy.py b/dbt/adapters/duckdb/plugins/sqlalchemy.py index d55f918a..e7daa6ba 100644 --- a/dbt/adapters/duckdb/plugins/sqlalchemy.py +++ b/dbt/adapters/duckdb/plugins/sqlalchemy.py @@ -1,4 +1,6 @@ -from typing import Dict, Any +from typing import Any +from typing import Dict + import pandas as pd from sqlalchemy import create_engine From fe8b9d191ef380f56f199057d8ec086372798dd4 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Tue, 11 Apr 2023 20:39:45 -0700 Subject: [PATCH 11/18] Add in sqlalchemy dev dep --- dev-requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/dev-requirements.txt b/dev-requirements.txt index a759d514..992ac7ea 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -29,6 +29,7 @@ pytest-csv pytest-xdist pytest-mock pytz +sqlalchemy tox>=3.13 twine wheel From 0ada3969cea61802b73cc0130fa0e42f3498c486 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Wed, 12 Apr 2023 10:31:33 -0700 Subject: [PATCH 12/18] really shouldn't ever let gpt3.5 write code, lesson learned --- dbt/adapters/duckdb/plugins/sqlalchemy.py | 18 ++++++++--- tests/functional/plugins/test_sqlalchemy.py | 35 ++++++++++++++++----- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/dbt/adapters/duckdb/plugins/sqlalchemy.py b/dbt/adapters/duckdb/plugins/sqlalchemy.py index e7daa6ba..e9b7b2e4 100644 --- a/dbt/adapters/duckdb/plugins/sqlalchemy.py +++ b/dbt/adapters/duckdb/plugins/sqlalchemy.py @@ -3,6 +3,7 @@ import pandas as pd from sqlalchemy import create_engine +from sqlalchemy import text from . import Plugin from ..utils import SourceConfig @@ -13,7 +14,16 @@ def __init__(self, plugin_config: Dict[str, Any]): self.engine = create_engine(plugin_config["connection_url"]) def load(self, source_config: SourceConfig) -> pd.DataFrame: - query = source_config.meta["query"] - query = query.format(**source_config.as_dict()) - params = source_config.meta.get("params", {}) - return pd.read_sql_query(query, con=self.engine, params=params) + if "query" in source_config.meta: + query = source_config.meta["query"] + query = query.format(**source_config.as_dict()) + params = source_config.meta.get("params", {}) + with self.engine.connect() as conn: + return pd.read_sql_query(text(query), con=conn, params=params) + else: + if "table" in source_config.meta: + table = source_config.meta["table"] + else: + table = source_config.table_name() + with self.engine.connect() as conn: + return pd.read_sql_table(table, con=conn) diff --git a/tests/functional/plugins/test_sqlalchemy.py b/tests/functional/plugins/test_sqlalchemy.py index d588051d..c3f73fc0 100644 --- a/tests/functional/plugins/test_sqlalchemy.py +++ b/tests/functional/plugins/test_sqlalchemy.py @@ -15,30 +15,40 @@ schema: main meta: plugin: sql - query: "SELECT * FROM {identifier} WHERE id=:id" tables: - - name: test_table + - name: tt1 description: "My first SQLAlchemy table" meta: + query: "SELECT * FROM {identifier} WHERE id=:id" params: id: 1 + - name: tt2 + meta: + table: "test_table2" """ models_source_model1_sql = """ - select * from {{ source('sql_source', 'test_table') }} + select * from {{ source('sql_source', 'tt1') }} +""" +models_source_model2_sql = """ + select * from {{ source('sql_source', 'tt2') }} """ +@pytest.mark.skip_profile("buenavista") class TestSQLAlchemyPlugin: @pytest.fixture(scope="class") def sqlite_test_db(self): path = "/tmp/satest.db" db = sqlite3.connect(path) cursor = db.cursor() - cursor.execute("CREATE TABLE test_table (id int, name text)") - cursor.execute("INSERT INTO test_table VALUES (1, 'John Doe')") - cursor.execute("INSERT INTO test_table VALUES (2, 'Jane Smith')") + cursor.execute("CREATE TABLE tt1 (id int, name text)") + cursor.execute("INSERT INTO tt1 VALUES (1, 'John Doe')") + cursor.execute("INSERT INTO tt1 VALUES (2, 'Jane Smith')") + cursor.execute("CREATE TABLE test_table2 (a int, b int, c int)") + cursor.execute("INSERT INTO test_table2 VALUES (1, 2, 3), (4, 5, 6)") cursor.close() + db.commit() db.close() yield path os.unlink(path) @@ -68,16 +78,25 @@ def models(self): return { "schema.yml": sources_schema_yml, "source_model1.sql": models_source_model1_sql, + "source_model2.sql": models_source_model2_sql, } def test_sqlalchemy_plugin(self, project): results = run_dbt() - assert len(results) == 1 + assert len(results) == 2 check_relations_equal( project.adapter, [ - "test_table", + "tt1", "source_model1", ], ) + + check_relations_equal( + project.adapter, + [ + "tt2", + "source_model2", + ], + ) From a48d223d26ac3d34113d8ed000b5ba3999138176 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Wed, 12 Apr 2023 17:30:01 -0700 Subject: [PATCH 13/18] try to get some more detail on what is going wrong here since this works locally --- .gitignore | 1 + dbt/adapters/duckdb/environments.py | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/.gitignore b/.gitignore index 9535a289..7f94f1ff 100644 --- a/.gitignore +++ b/.gitignore @@ -77,3 +77,4 @@ target/ .DS_Store .idea/ +.vscode/ diff --git a/dbt/adapters/duckdb/environments.py b/dbt/adapters/duckdb/environments.py index da6e1aed..464c0cf6 100644 --- a/dbt/adapters/duckdb/environments.py +++ b/dbt/adapters/duckdb/environments.py @@ -180,6 +180,11 @@ def ldf(table_name): return AdapterResponse(_message="OK") def load_source(self, plugin_name: str, source_config: SourceConfig): + if plugin_name not in self._plugins: + raise Exception( + f"Plugin {plugin_name} not found; known plugins are: " + + ",".join(self._plugins.keys()) + ) df = self._plugins[plugin_name].load(source_config) assert df is not None handle = self.handle() From 0dc41b8b2717b70085df08e38c940e5f0c9b4ca4 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Wed, 12 Apr 2023 17:58:32 -0700 Subject: [PATCH 14/18] rm unused import --- tests/functional/plugins/test_sqlalchemy.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/functional/plugins/test_sqlalchemy.py b/tests/functional/plugins/test_sqlalchemy.py index c3f73fc0..421aa0ac 100644 --- a/tests/functional/plugins/test_sqlalchemy.py +++ b/tests/functional/plugins/test_sqlalchemy.py @@ -2,7 +2,6 @@ import pytest import sqlite3 -from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table from dbt.tests.util import ( check_relations_equal, run_dbt, From 3f0a1536a170fff389f00264f9bf2287667a553f Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Thu, 13 Apr 2023 04:16:34 +0000 Subject: [PATCH 15/18] Apparently you can't have multiple tests in a suite that both override profiles_config_update, or something --- tests/functional/plugins/test_excel.py | 70 ------------------- .../{test_sqlalchemy.py => test_plugins.py} | 67 +++++++++++++----- 2 files changed, 51 insertions(+), 86 deletions(-) delete mode 100644 tests/functional/plugins/test_excel.py rename tests/functional/plugins/{test_sqlalchemy.py => test_plugins.py} (61%) diff --git a/tests/functional/plugins/test_excel.py b/tests/functional/plugins/test_excel.py deleted file mode 100644 index 46bfe690..00000000 --- a/tests/functional/plugins/test_excel.py +++ /dev/null @@ -1,70 +0,0 @@ -import pytest - -from dbt.tests.util import ( - check_relations_equal, - run_dbt, -) - -sources_schema_yml = """ -version: 2 -sources: - - name: excel_source - schema: main - meta: - plugin: excel - tables: - - name: excel_file - description: "An excel file" - meta: - external_location: "{test_data_path}/excel_file.xlsx" -""" - -models_source_model_sql = """ - select * from {{ source('excel_source', 'excel_file') }} -""" - - -@pytest.mark.skip_profile("buenavista") -class TestExcelPlugin: - @pytest.fixture(scope="class") - def profiles_config_update(self, dbt_profile_target): - if "path" not in dbt_profile_target: - return {} - return { - "test": { - "outputs": { - "dev": { - "type": "duckdb", - "path": dbt_profile_target["path"], - "plugins": [{"name": "excel", "impl": "excel"}], - } - }, - "target": "dev", - } - } - - @pytest.fixture(scope="class") - def models(self, test_data_path): - return { - "schema.yml": sources_schema_yml.format(test_data_path=test_data_path), - "source_model.sql": models_source_model_sql, - "source_model2.sql": models_source_model_sql, - "source_model3.sql": models_source_model_sql, - "source_model4.sql": models_source_model_sql, - } - - def test_excel_plugin(self, project): - results = run_dbt() - assert len(results) == 4 - - # relations_equal - check_relations_equal( - project.adapter, - [ - "excel_file", - "source_model", - "source_model2", - "source_model3", - "source_model4", - ], - ) diff --git a/tests/functional/plugins/test_sqlalchemy.py b/tests/functional/plugins/test_plugins.py similarity index 61% rename from tests/functional/plugins/test_sqlalchemy.py rename to tests/functional/plugins/test_plugins.py index 421aa0ac..591e031d 100644 --- a/tests/functional/plugins/test_sqlalchemy.py +++ b/tests/functional/plugins/test_plugins.py @@ -7,7 +7,21 @@ run_dbt, ) -sources_schema_yml = """ +excel_schema_yml = """ +version: 2 +sources: + - name: excel_source + schema: main + meta: + plugin: excel + tables: + - name: excel_file + description: "An excel file" + meta: + external_location: "{test_data_path}/excel_file.xlsx" +""" + +sqlalchemy_schema_yml = """ version: 2 sources: - name: sql_source @@ -26,16 +40,20 @@ table: "test_table2" """ -models_source_model1_sql = """ + +excel1_sql = """ + select * from {{ source('excel_source', 'excel_file') }} +""" +sqlalchemy1_sql = """ select * from {{ source('sql_source', 'tt1') }} """ -models_source_model2_sql = """ +sqlalchemy2_sql = """ select * from {{ source('sql_source', 'tt2') }} """ @pytest.mark.skip_profile("buenavista") -class TestSQLAlchemyPlugin: +class TestPlugins: @pytest.fixture(scope="class") def sqlite_test_db(self): path = "/tmp/satest.db" @@ -54,18 +72,22 @@ def sqlite_test_db(self): @pytest.fixture(scope="class") def profiles_config_update(self, dbt_profile_target, sqlite_test_db): - config = {"connection_url": f"sqlite:///{sqlite_test_db}"} if "path" not in dbt_profile_target: return {} + + config = {"connection_url": f"sqlite:///{sqlite_test_db}"} + plugins = [ + {"name": "excel", "impl": "excel"}, + {"name": "sql", "impl": "sqlalchemy", "config": config}, + ] + return { "test": { "outputs": { "dev": { "type": "duckdb", "path": dbt_profile_target["path"], - "plugins": [ - {"name": "sql", "impl": "sqlalchemy", "config": config} - ], + "plugins": plugins, } }, "target": "dev", @@ -73,29 +95,42 @@ def profiles_config_update(self, dbt_profile_target, sqlite_test_db): } @pytest.fixture(scope="class") - def models(self): + def models(self, test_data_path): return { - "schema.yml": sources_schema_yml, - "source_model1.sql": models_source_model1_sql, - "source_model2.sql": models_source_model2_sql, + "schema_excel.yml": excel_schema_yml.format(test_data_path=test_data_path), + "schema_sqlalchemy.yml": sqlalchemy_schema_yml, + "excel.sql": excel1_sql, + "sqlalchemy1.sql": sqlalchemy1_sql, + "sqlalchemy2.sql": sqlalchemy2_sql, } - def test_sqlalchemy_plugin(self, project): + def test_plugins(self, project): results = run_dbt() - assert len(results) == 2 + assert len(results) == 3 + + # relations_equal + check_relations_equal( + project.adapter, + [ + "excel_file", + "excel", + ], + ) + # relations_equal check_relations_equal( project.adapter, [ "tt1", - "source_model1", + "sqlalchemy1", ], ) + # relations_equal check_relations_equal( project.adapter, [ "tt2", - "source_model2", + "sqlalchemy2", ], ) From 7acb6802386f05c066373e1afd4978d1ca481469 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Wed, 12 Apr 2023 21:51:44 -0700 Subject: [PATCH 16/18] Add in the sheet_name parameter and some more test checks --- dbt/adapters/duckdb/plugins/excel.py | 3 ++- tests/functional/plugins/test_plugins.py | 10 +++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/dbt/adapters/duckdb/plugins/excel.py b/dbt/adapters/duckdb/plugins/excel.py index e9a7199a..582527c9 100644 --- a/dbt/adapters/duckdb/plugins/excel.py +++ b/dbt/adapters/duckdb/plugins/excel.py @@ -15,4 +15,5 @@ def load(self, source_config: SourceConfig): ext_location = source_config.meta["external_location"] ext_location = ext_location.format(**source_config.as_dict()) source_location = pathlib.Path(ext_location.strip("'")) - return pd.read_excel(source_location) + sheet_name = source_config.meta.get("sheet_name", 0) + return pd.read_excel(source_location, sheet_name=sheet_name) diff --git a/tests/functional/plugins/test_plugins.py b/tests/functional/plugins/test_plugins.py index 591e031d..dbaca109 100644 --- a/tests/functional/plugins/test_plugins.py +++ b/tests/functional/plugins/test_plugins.py @@ -108,7 +108,9 @@ def test_plugins(self, project): results = run_dbt() assert len(results) == 3 - # relations_equal + res = project.run_sql("SELECT COUNT(1) FROM excel_file", fetch="one") + assert res[0] == 9 + check_relations_equal( project.adapter, [ @@ -117,7 +119,8 @@ def test_plugins(self, project): ], ) - # relations_equal + res = project.run_sql("SELECT COUNT(1) FROM tt1", fetch="one") + assert res[0] == 1 check_relations_equal( project.adapter, [ @@ -126,7 +129,8 @@ def test_plugins(self, project): ], ) - # relations_equal + res = project.run_sql("SELECT COUNT(1) FROM tt2", fetch="one") + assert res[0] == 2 check_relations_equal( project.adapter, [ From 1ab241f598acd203b6dc3299a70f80ab54e6f5ff Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Thu, 13 Apr 2023 19:52:12 -0700 Subject: [PATCH 17/18] Add in iceberg tests and some additional functionality for scan filtering --- dbt/adapters/duckdb/plugins/iceberg.py | 13 ++++- tests/functional/plugins/test_gsheet.py | 10 +++- tests/functional/plugins/test_iceberg.py | 71 ++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 4 deletions(-) create mode 100644 tests/functional/plugins/test_iceberg.py diff --git a/dbt/adapters/duckdb/plugins/iceberg.py b/dbt/adapters/duckdb/plugins/iceberg.py index 9a4df79a..ffc5cb3c 100644 --- a/dbt/adapters/duckdb/plugins/iceberg.py +++ b/dbt/adapters/duckdb/plugins/iceberg.py @@ -8,10 +8,19 @@ class IcebergPlugin(Plugin): def __init__(self, config: Dict): - self._catalog = catalog.load_catalog(config.get("name"), config.get("properties")) + self._catalog = catalog.load_catalog(config.get("catalog")) def load(self, source_config: SourceConfig): table_format = source_config.meta.get("iceberg_table", "{schema}.{identifier}") table_name = table_format.format(**source_config.as_dict()) table = self._catalog.load_table(table_name) - return table.scan().to_arrow() # TODO: configurable scans + scan_keys = { + "row_filter", + "selected_fields", + "case_sensitive", + "snapshot_id", + "options", + "limit", + } + scan_config = {k: source_config.meta[k] for k in scan_keys if k in source_config.meta} + return table.scan(**scan_config).to_arrow() diff --git a/tests/functional/plugins/test_gsheet.py b/tests/functional/plugins/test_gsheet.py index 1c6ab273..a7a5ab11 100644 --- a/tests/functional/plugins/test_gsheet.py +++ b/tests/functional/plugins/test_gsheet.py @@ -29,6 +29,11 @@ select * from {{ source('gsheet_source', 'gsheet2') }} """ + +# Skipping this b/c it requires using my (@jwills) personal creds +# when testing it locally and also b/c I think there is something +# wrong with profiles_config_update since it can't be used in multiple +# tests in the same pytest session @pytest.mark.skip class TestGSheetPlugin: @pytest.fixture(scope="class") @@ -42,7 +47,9 @@ def profiles_config_update(self, dbt_profile_target): "dev": { "type": "duckdb", "path": dbt_profile_target["path"], - "plugins": [{"name": "gsheet", "impl": "gsheet", "config": config}], + "plugins": [ + {"name": "gsheet", "impl": "gsheet", "config": config} + ], } }, "target": "dev", @@ -76,4 +83,3 @@ def test_gshseet_plugin(self, project): "source_model2", ], ) - diff --git a/tests/functional/plugins/test_iceberg.py b/tests/functional/plugins/test_iceberg.py new file mode 100644 index 00000000..79a1c21c --- /dev/null +++ b/tests/functional/plugins/test_iceberg.py @@ -0,0 +1,71 @@ +import pytest + +from dbt.tests.util import ( + check_relations_equal, + run_dbt, +) + +sources_schema_yml = """ +version: 2 +sources: + - name: iceberg_source + schema: main + meta: + plugin: icee + iceberg_table: "examples.{identifier}" + tables: + - name: nyc_taxi_locations +""" + +models_source_model1_sql = """ + select * from {{ source('iceberg_source', 'nyc_taxi_locations') }} +""" + + +# Skipping this b/c it requires using my (@jwills) personal creds +# when testing it locally and also b/c I think there is something +# wrong with profiles_config_update since it can't be used in multiple +# tests in the same pytest session +@pytest.mark.skip +class TestIcebergPlugin: + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target): + config = {"catalog": "default"} + if "path" not in dbt_profile_target: + return {} + return { + "test": { + "outputs": { + "dev": { + "type": "duckdb", + "path": dbt_profile_target["path"], + "plugins": [ + {"name": "icee", "impl": "iceberg", "config": config} + ], + } + }, + "target": "dev", + } + } + + @pytest.fixture(scope="class") + def models(self): + return { + "schema.yml": sources_schema_yml, + "source_model1.sql": models_source_model1_sql, + } + + def test_iceberg_plugin(self, project): + results = run_dbt() + assert len(results) == 1 + + res = project.run_sql("SELECT COUNT(1) FROM nyc_taxi_locations", fetch="one") + assert res[0] == 265 + + check_relations_equal( + project.adapter, + [ + "nyc_taxi_locations", + "source_model1", + ], + ) \ No newline at end of file From 3e094de6ef1d4de5b82a455bb8d91b8e51c27e70 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Thu, 13 Apr 2023 21:01:26 -0700 Subject: [PATCH 18/18] Add the option to materialize a plugin source as a view instead of a table --- dbt/adapters/duckdb/environments.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/duckdb/environments.py b/dbt/adapters/duckdb/environments.py index 464c0cf6..a667e2dd 100644 --- a/dbt/adapters/duckdb/environments.py +++ b/dbt/adapters/duckdb/environments.py @@ -189,7 +189,10 @@ def load_source(self, plugin_name: str, source_config: SourceConfig): assert df is not None handle = self.handle() cursor = handle.cursor() - cursor.execute(f"CREATE OR REPLACE TABLE {source_config.table_name()} AS SELECT * FROM df") + materialization = source_config.meta.get("materialization", "table") + cursor.execute( + f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df" + ) cursor.close() handle.close()