Add workaround for temporary tables in remote database when running incremental model #326

Merged

Commits (28)
5d7f7fa
add workaround for temporary tables in remote databases
guenp Jan 30, 2024
6e91597
clarify inline comment
guenp Jan 30, 2024
61b4a3e
add is_motherduck property to credentials
guenp Jan 30, 2024
bd85518
Add UT to test incremental model on MotherDuck
guenp Jan 31, 2024
4dc717c
consolidate MotherDuck plugin tests
guenp Jan 31, 2024
fae6efd
clarify docstsring
guenp Jan 31, 2024
1adda92
clarify docstring
guenp Jan 31, 2024
83727cc
use py311 for md tox env
guenp Jan 31, 2024
6393537
clean up temp tables for incrementals, add db creation and cleanup fo…
guenp Jan 31, 2024
4758526
add some helpful inline comments
guenp Jan 31, 2024
d7dc736
add more cleanup to UT, add schema temp to target database for temp t…
guenp Jan 31, 2024
c3f4376
create temp schema if needed
guenp Jan 31, 2024
26b0923
Create remote temporary tables in a separate schema dbt_temp
guenp Jan 31, 2024
66c496e
Don't use MD_CONNECT, instead use SET motherduck_token
guenp Feb 1, 2024
0044725
Don't drop temp schema after test ends
guenp Feb 1, 2024
21ef4dc
use adapter.is_motherduck
guenp Feb 1, 2024
f4ac34c
Set md profile path back to md:test, make database_name fixture
guenp Feb 1, 2024
4dcb585
use credentials.is_motherduck in LocalEnvironment
guenp Feb 1, 2024
87a1e6a
Reverse change to tox.ini for CI
guenp Feb 1, 2024
22b430d
make temp schema name configurable, fix bugs for local in-memory tests
guenp Feb 1, 2024
be71a2d
formatting
guenp Feb 1, 2024
63eb849
Add test for temp schema name config
guenp Feb 1, 2024
e3dc75c
address mypy issues
guenp Feb 1, 2024
3ec5527
add _temp_schema_name attribute to adapter
guenp Feb 1, 2024
f857608
update docstring
guenp Feb 1, 2024
fd6e033
Add plugin test to MotherDuck tox environment
guenp Feb 1, 2024
0ed05bf
Update dbt/adapters/duckdb/impl.py
guenp Feb 1, 2024
15e61b4
remove superfluous need_drop_temp variable and add temp_relation to t…
guenp Feb 1, 2024
11 changes: 10 additions & 1 deletion dbt/adapters/duckdb/credentials.py
@@ -146,6 +146,15 @@ class DuckDBCredentials(Credentials):
# by networking issues)
retries: Optional[Retries] = None

@property
def is_motherduck(self):
parsed = urlparse(self.path)
return self._is_motherduck(parsed.scheme)

@staticmethod
def _is_motherduck(scheme: str) -> bool:
return scheme in {"md", "motherduck"}

@classmethod
def __pre_deserialize__(cls, data: Dict[Any, Any]) -> Dict[Any, Any]:
data = super().__pre_deserialize__(data)
@@ -159,7 +168,7 @@ def __pre_deserialize__(cls, data: Dict[Any, Any]) -> Dict[Any, Any]:
path_db = os.path.splitext(base_file)[0]
# For MotherDuck, turn on disable_transactions unless
# it's explicitly set already by the user
if parsed.scheme in {"md", "motherduck"}:
if cls._is_motherduck(parsed.scheme):
if "disable_transactions" not in data:
data["disable_transactions"] = True
if path_db == "":
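For reference, a minimal standalone sketch of the check the new is_motherduck property performs; the helper name and example paths are illustrative only and not part of the diff:

from urllib.parse import urlparse

def is_motherduck_path(path: str) -> bool:
    # Mirrors DuckDBCredentials._is_motherduck: MotherDuck paths use the
    # "md" or "motherduck" URI scheme.
    return urlparse(path).scheme in {"md", "motherduck"}

assert is_motherduck_path("md:test")
assert is_motherduck_path("motherduck:my_db")
assert not is_motherduck_path("/tmp/local.duckdb")
assert not is_motherduck_path(":memory:")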
5 changes: 1 addition & 4 deletions dbt/adapters/duckdb/environments/local.py
@@ -48,10 +48,7 @@ def __init__(self, credentials: credentials.DuckDBCredentials):
self.handle_count = 0
self.lock = threading.RLock()
self._keep_open = (
credentials.keep_open
or credentials.path == ":memory:"
or credentials.path.startswith("md:")
or credentials.path.startswith("motherduck:")
credentials.keep_open or credentials.path == ":memory:" or credentials.is_motherduck
)
self._REGISTERED_DF: dict = {}

42 changes: 42 additions & 0 deletions dbt/adapters/duckdb/impl.py
@@ -1,4 +1,5 @@
import os
from typing import Any
from typing import List
from typing import Optional
from typing import Sequence
@@ -19,9 +20,14 @@
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.graph.nodes import ColumnLevelConstraint
from dbt.contracts.graph.nodes import ConstraintType
from dbt.contracts.relation import Path
from dbt.contracts.relation import RelationType
from dbt.exceptions import DbtInternalError
from dbt.exceptions import DbtRuntimeError

TEMP_SCHEMA_NAME = "temp_schema_name"
DEFAULT_TEMP_SCHEMA_NAME = "dbt_temp"


class DuckDBAdapter(SQLAdapter):
ConnectionManager = DuckDBConnectionManager
@@ -36,6 +42,9 @@ class DuckDBAdapter(SQLAdapter):
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
}

# can be overridden via the model config metadata
_temp_schema_name = DEFAULT_TEMP_SCHEMA_NAME

@classmethod
def date_function(cls) -> str:
return "now()"
@@ -47,6 +56,10 @@ def is_cancelable(cls) -> bool:
def debug_query(self):
self.execute("select 1 as id")

@available
def is_motherduck(self):
return self.config.credentials.is_motherduck

@available
def convert_datetimes_to_strs(self, table: agate.Table) -> agate.Table:
for column in table.columns:
@@ -217,6 +230,35 @@ def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> Optional
else:
return super().render_column_constraint(constraint)

def pre_model_hook(self, config: Any) -> None:
"""A hook for getting the temp schema name from the model config"""
self._temp_schema_name = config.model.config.meta.get(
TEMP_SCHEMA_NAME, self._temp_schema_name
)
super().pre_model_hook(config)

@available
def get_temp_relation_path(self, model: Any):
"""This is a workaround to enable incremental models on MotherDuck because it
currently doesn't support remote temporary tables. Instead we use a regular
table that is dropped at the end of the incremental macro or post-model hook.
"""
return Path(
schema=self._temp_schema_name, database=model.database, identifier=model.identifier
)

def post_model_hook(self, config: Any, context: Any) -> None:
"""A hook for cleaning up the remote temporary table on MotherDuck if the
incremental model materialization fails to do so.
"""
if self.is_motherduck():
if "incremental" == config.model.get_materialization():
temp_relation = self.Relation(
path=self.get_temp_relation_path(config.model), type=RelationType.Table
)
self.drop_relation(temp_relation)
super().post_model_hook(config, context)


# Change `table_a/b` to `table_aaaaa/bbbbb` to avoid duckdb binding issues when relation_a/b
# is called "table_a" or "table_b" in some of the dbt tests
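To show how the override flows through the new hooks, here is a hedged sketch of the temp-relation naming with hypothetical model coordinates; the adapter does this via pre_model_hook and get_temp_relation_path rather than plain strings:

DEFAULT_TEMP_SCHEMA_NAME = "dbt_temp"

# Assumed model config, mirroring the meta key used in the functional test below:
#   {{ config(materialized='incremental', meta=dict(temp_schema_name='dbt_temp_test')) }}
meta = {"temp_schema_name": "dbt_temp_test"}
temp_schema = meta.get("temp_schema_name", DEFAULT_TEMP_SCHEMA_NAME)

database, identifier = "test", "summary_of_logs"  # hypothetical model database/identifier
temp_table = f"{database}.{temp_schema}.{identifier}"

# On MotherDuck the incremental build writes to this regular table instead of a
# TEMP table; post_model_hook drops it if the materialization macro did not.
print(temp_table)  # -> test.dbt_temp_test.summary_of_logs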
5 changes: 2 additions & 3 deletions dbt/adapters/duckdb/plugins/motherduck.py
@@ -12,7 +12,6 @@ def initialize(self, config: Dict[str, Any]):

def configure_connection(self, conn: DuckDBPyConnection):
conn.load_extension("motherduck")
connect_stmt = "PRAGMA md_connect"
if self._token:
connect_stmt = f"PRAGMA md_connect('token={self._token}')"
conn.execute(connect_stmt)
connect_stmt = f"SET motherduck_token={self._token}')"
conn.execute(connect_stmt)
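For orientation, a minimal standalone sketch of what the new connection setup amounts to, assuming the MotherDuck extension is installed locally; the token value is a placeholder:

import duckdb

conn = duckdb.connect()
conn.load_extension("motherduck")

token = "my_motherduck_token"  # placeholder; the plugin reads this from its config
if token:
    # Authenticate through a session setting instead of the removed PRAGMA md_connect.
    conn.execute(f"SET motherduck_token='{token}'")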
15 changes: 14 additions & 1 deletion dbt/include/duckdb/macros/materializations/incremental.sql
@@ -1,6 +1,8 @@
{% materialization incremental, adapter="duckdb", supported_languages=['sql', 'python'] -%}

{%- set language = model['language'] -%}
-- only create temp tables when using local duckdb, as they are not currently supported for remote databases
{%- set temporary = not adapter.is_motherduck() -%}

-- relations
{%- set existing_relation = load_cached_relation(this) -%}
@@ -39,13 +41,20 @@
{% set build_sql = create_table_as(False, intermediate_relation, compiled_code, language) %}
{% set need_swap = true %}
{% else %}
{% if not temporary %}
-- if not using a temporary table we will update the temp relation to use a different temp schema ("dbt_temp" by default)
{% set temp_relation = temp_relation.incorporate(path=adapter.get_temp_relation_path(this)) %}
{% do run_query(create_schema(temp_relation)) %}
-- then drop the temp relation after we insert the incremental data into the target relation
{% set need_drop_temp = True %}
{% endif %}
{% if language == 'python' %}
{% set build_python = create_table_as(False, temp_relation, compiled_code, language) %}
{% call statement("pre", language=language) %}
{{- build_python }}
{% endcall %}
{% else %} {# SQL #}
{% do run_query(create_table_as(True, temp_relation, compiled_code, language)) %}
{% do run_query(create_table_as(temporary, temp_relation, compiled_code, language)) %}
{% endif %}
{% do adapter.expand_target_column_types(
from_relation=temp_relation,
Expand Down Expand Up @@ -76,6 +85,10 @@
{% do to_drop.append(backup_relation) %}
{% endif %}

{% if need_drop_temp %}
{% do to_drop.append(temp_relation) %}
{% endif %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

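In effect, for a MotherDuck target the materialization now issues roughly this sequence (a hedged sketch with hypothetical relation names; the real macro builds the statements through create_schema, create_table_as, and the to_drop cleanup list):

# Each string stands in for a statement the macro generates; "<compiled model SQL>"
# is the model's compiled select.
statements = [
    "CREATE SCHEMA IF NOT EXISTS my_db.dbt_temp",                      # create_schema(temp_relation)
    "CREATE TABLE my_db.dbt_temp.my_model AS (<compiled model SQL>)",  # create_table_as(temporary=False, ...)
    "<insert or delete+insert from my_db.dbt_temp.my_model into my_db.main.my_model>",
    "DROP TABLE IF EXISTS my_db.dbt_temp.my_model",                    # appended to to_drop; post_model_hook is the fallback
]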
102 changes: 102 additions & 0 deletions tests/functional/plugins/test_motherduck.py
@@ -0,0 +1,102 @@
import pytest
from dbt.tests.util import (
run_dbt,
)

random_logs_sql = """
{{ config(materialized='table', meta=dict(temp_schema_name='dbt_temp_test')) }}

select
uuid()::varchar as log_id,
'2023-10-01'::timestamp + interval 1 minute * (random() * 20000)::int as dt ,
(random() * 4)::int64 as user_id
from generate_series(1, 10000) g(x)
"""

summary_of_logs_sql = """
{{
config(
materialized='incremental',
meta=dict(temp_schema_name='dbt_temp_test'),
)
}}

select dt::date as dt, user_id, count(1) as c
from {{ ref('random_logs_test') }}


{% if is_incremental() %}

-- this filter will only be applied on an incremental run
-- (uses > to include records whose timestamp occurred since the last run of this model)
where dt > '2023-10-08'::timestamp

{% endif %}
group by all
"""

@pytest.mark.skip_profile("buenavista", "file", "memory")
class TestMDPlugin:
@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target):
md_config = {}
plugins = [{"module": "motherduck", "config": md_config}]
return {
"test": {
"outputs": {
"dev": {
"type": "duckdb",
"path": dbt_profile_target.get("path", ":memory:"),
"plugins": plugins,
}
},
"target": "dev",
}
}

@pytest.fixture(scope="class")
def database_name(self, dbt_profile_target):
return dbt_profile_target["path"].replace("md:", "")

@pytest.fixture(scope="class")
def md_sql(self, database_name):
# Reads from a MD database in my test account in the cloud
return f"""
select * FROM {database_name}.main.plugin_table
"""

@pytest.fixture(scope="class")
def models(self, md_sql):
return {
"md_table.sql": md_sql,
"random_logs_test.sql": random_logs_sql,
"summary_of_logs_test.sql": summary_of_logs_sql,
}

@pytest.fixture(autouse=True)
def run_dbt_scope(self, project, database_name):
project.run_sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
project.run_sql("CREATE OR REPLACE TABLE plugin_table (i integer, j string)")
project.run_sql("INSERT INTO plugin_table (i, j) VALUES (1, 'foo')")
yield
project.run_sql("DROP VIEW md_table")
project.run_sql("DROP TABLE random_logs_test")
project.run_sql("DROP TABLE summary_of_logs_test")
project.run_sql("DROP TABLE plugin_table")

def test_motherduck(self, project):
run_dbt()
res = project.run_sql("SELECT * FROM md_table", fetch="one")
assert res == (1, "foo")

def test_incremental(self, project):
run_dbt()
res = project.run_sql("SELECT count(*) FROM summary_of_logs_test", fetch="one")
assert res == (70,)

run_dbt()
res = project.run_sql("SELECT count(*) FROM summary_of_logs_test", fetch="one")
assert res == (105,)

res = project.run_sql("SELECT schema_name FROM information_schema.schemata WHERE catalog_name = 'test'", fetch="all")
assert "dbt_temp_test" in [_r for (_r,) in res]
36 changes: 0 additions & 36 deletions tests/functional/plugins/test_plugins.py
@@ -40,11 +40,6 @@
select foo() as foo
"""

# Reads from a MD database in my test account in the cloud
md_sql = """
select * FROM plugin_test.main.plugin_table
"""


@pytest.mark.skip_profile("buenavista", "md")
class TestPlugins:
@@ -133,34 +128,3 @@ def test_plugins(self, project):

res = project.run_sql("SELECT foo FROM foo", fetch="one")
assert res[0] == 1729


@pytest.mark.skip_profile("buenavista", "file", "memory")
class TestMDPlugin:
@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target):
md_config = {}
plugins = [{"module": "motherduck", "config": md_config}]
return {
"test": {
"outputs": {
"dev": {
"type": "duckdb",
"path": dbt_profile_target.get("path", ":memory:"),
"plugins": plugins,
}
},
"target": "dev",
}
}

@pytest.fixture(scope="class")
def models(self):
return {
"md_table.sql": md_sql,
}

def test_plugins(self, project):
run_dbt()
res = project.run_sql("SELECT * FROM md_table", fetch="one")
assert res == (1, "foo")
2 changes: 1 addition & 1 deletion tox.ini
@@ -43,7 +43,7 @@ deps =
description = adapter function testing using MotherDuck
skip_install = True
passenv = *
commands = {envpython} -m pytest --profile=md --maxfail=2 {posargs} tests/functional/adapter
commands = {envpython} -m pytest --profile=md --maxfail=2 {posargs} tests/functional/adapter tests/functional/plugins/test_motherduck.py
deps =
-rdev-requirements.txt
-e.