feat: support reading delta tables with delta plugin #263

Merged · 22 commits · Oct 25, 2023 · changes shown from 15 commits
9 changes: 2 additions & 7 deletions .devcontainer/devcontainer.json
@@ -7,12 +7,11 @@
// Update 'VARIANT' to pick a Python version: 3, 3.10, 3.9, 3.8, 3.7, 3.6
// Append -bullseye or -buster to pin to an OS version.
// Use -bullseye variants on local on arm64/Apple Silicon.
"VARIANT": "3.8",
"VARIANT": "3.11",
// Options
"NODE_VERSION": "none"
}
},

// Configure tool-specific properties.
"customizations": {
// Configure properties specific to VS Code.
@@ -43,22 +42,18 @@
}
}
},

// Add the IDs of extensions you want installed when the container is created.
"extensions": [
"ms-python.python",
"ms-python.vscode-pylance"
]
}
},

// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],

// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "pip3 install --user -r requirements.txt",

// Comment out to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
"remoteUser": "vscode",
"postCreateCommand": "pip install -e . && pip install -r dev-requirements.txt"
}
}
1 change: 1 addition & 0 deletions README.md
@@ -183,6 +183,7 @@ Please remember that using plugins may require you to add additional dependencies
* `gsheet` depends on `gspread` and `pandas`
* `iceberg` depends on `pyiceberg` and Python >= 3.8
* `sqlalchemy` depends on `pandas`, `sqlalchemy`, and the driver(s) you need
* `delta` depends on `deltalake`; see [an example project](https://github.com/milicevica23/dbt-duckdb-delta-plugin-demo). A minimal configuration sketch follows this list.

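For context, here is a minimal configuration sketch based on the plugin and test fixtures added in this PR; the profile name, database path, and Delta table path are placeholders:

```yaml
# profiles.yml: enable the delta plugin on a duckdb target
my_profile:
  outputs:
    dev:
      type: duckdb
      path: /tmp/dbt.duckdb
      plugins:
        - module: delta
  target: dev

# models/sources.yml: point a source at an existing Delta table
sources:
  - name: delta_source
    meta:
      plugin: delta
    tables:
      - name: my_table
        meta:
          delta_table_path: /path/to/delta/table
```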
#### Using Local Python Modules

Expand Down
9 changes: 8 additions & 1 deletion dbt/adapters/duckdb/environments/__init__.py
@@ -106,11 +106,18 @@ def initialize_db(
return conn

@classmethod
-    def initialize_cursor(cls, creds: DuckDBCredentials, cursor):
+    def initialize_cursor(cls, creds: DuckDBCredentials, cursor, plugins: Optional[Dict[str, BasePlugin]] = None):
for key, value in creds.load_settings().items():
# Okay to set these as strings because DuckDB will cast them
# to the correct type
cursor.execute(f"SET {key} = '{value}'")

        # Update the cursor if something was lost in the copy
        # of the parent connection
if plugins:
for plugin in plugins.values():
plugin.configure_cursor(cursor)

return cursor

@classmethod
33 changes: 23 additions & 10 deletions dbt/adapters/duckdb/environments/local.py
@@ -15,7 +15,7 @@ def __init__(self, cursor):
def __getattr__(self, name):
return getattr(self._cursor, name)

def execute(self, sql, bindings=None):
try:
if bindings is None:
return self._cursor.execute(sql)
@@ -24,7 +24,6 @@ def execute(self, sql, bindings=None):
except RuntimeError as e:
raise DbtRuntimeError(str(e))


class DuckDBConnectionWrapper:
def __init__(self, cursor, env):
self._cursor = DuckDBCursorWrapper(cursor)
@@ -66,10 +65,13 @@ def handle(self):
if self.conn is None:
self.conn = self.initialize_db(self.creds, self._plugins)
self.handle_count += 1
-        cursor = self.initialize_cursor(self.creds, self.conn.cursor())
+        cursor = self.initialize_cursor(self.creds, self.conn.cursor(), self._plugins)
return DuckDBConnectionWrapper(cursor, self)

-    def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse:
+    def submit_python_job(
+        self, handle, parsed_model: dict, compiled_code: str
+    ) -> AdapterResponse:
con = handle.cursor()

def ldf(table_name):
@@ -87,6 +89,10 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
plugin = self._plugins[plugin_name]
handle = self.handle()
cursor = handle.cursor()

if source_config.schema:
cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {source_config.schema}")

save_mode = source_config.get("save_mode", "overwrite")
if save_mode in ("ignore", "error_if_exists"):
params = [source_config.schema, source_config.identifier]
@@ -100,20 +106,27 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
params.append(source_config.database)
if cursor.execute(q, params).fetchone()[0]:
if save_mode == "error_if_exists":
raise Exception(f"Source {source_config.table_name()} already exists!")
raise Exception(
f"Source {source_config.table_name()} already exists!"
)
else:
# Nothing to do (we ignore the existing table)
return
df = plugin.load(source_config)
assert df is not None
-        materialization = source_config.meta.get("materialization", "table")
-        cursor.execute(
-            f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df"
-        )

+        if plugin_name not in ["delta"]:  # plugins which configure the cursor themselves
> **jwills (Collaborator):** I'm still not thrilled about special-casing the delta plugin like this; would it be better if we simply maintained the dictionary of source-created data frames (or dataframe-like equivalents) in the Environment instance, instead of inside of the Plugin class, for all of the sources? Would that work?
>
> **milicevica23 (Author):** In my opinion, we have to distinguish between materializations; correct me if I am wrong. If you create a table from df, you hold the df instance and copy its data into the DuckDB process; if you create a view over df, you hold only a representation and a pointer to df, and the data is loaded the first time the view is resolved. This is why the other plugins work with table materialization and without configuring each cursor.
>
> Therefore I am not sure it is a good idea to hold every df instance in a dictionary in the Environment. Maybe I could instead have the load function return a dataframe and, if the materialization is a view, register it in the Environment. That could work and would also solve the materialization-as-view problem for the other plugins. Let me try it! Just thought about it while answering 😁
>
> **milicevica23 (Author):** Hi @jwills, I did the refactor now; can you take another look? I moved the logic into the Environment and abstracted it over the plugins for the case where a plugin sets the materialization to view. What I would still like to do, though it is less important, is to make view the default materialization for the delta plugin, since it is the better default there.
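As a standalone illustration of the table-vs-view distinction discussed above, here is a minimal sketch that assumes only `duckdb` and `pandas`; it is not code from this PR:

```python
import duckdb
import pandas as pd

con = duckdb.connect()
df = pd.DataFrame({"x": [1, 2, 3]})
con.register("df", df)

# TABLE: the CTAS copies the rows into the DuckDB process, so the
# registration of df is no longer needed afterwards.
con.execute("CREATE OR REPLACE TABLE t AS SELECT * FROM df")

# VIEW: only the query text is stored; df is resolved again on every
# scan, so it must be registered on whichever cursor runs the query.
# Per this PR, cursors copied from the parent connection do not keep
# the registration, which is what configure_cursor re-establishes.
con.execute("CREATE OR REPLACE VIEW v AS SELECT * FROM df")
print(con.execute("SELECT count(*) FROM v").fetchone())  # (3,)
```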

+            materialization = source_config.meta.get("materialization", "table")
+            cursor.execute(
+                f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df"
+            )

cursor.close()
handle.close()

-    def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> None:
+    def store_relation(
+        self, plugin_name: str, target_config: utils.TargetConfig
+    ) -> None:
if plugin_name not in self._plugins:
if plugin_name.startswith("glue|"):
from ..plugins import glue
10 changes: 10 additions & 0 deletions dbt/adapters/duckdb/plugins/__init__.py
@@ -111,3 +111,13 @@ def load(self, source_config: SourceConfig):

def store(self, target_config: TargetConfig):
raise NotImplementedError(f"store method not implemented for {self.name}")

def configure_cursor(self, cursor):
"""
Configure each copy of the DuckDB cursor.
This method should be overridden by subclasses to provide additional
attributes to the connection which are lost in the copy of the parent connection.

:param cursor: A DuckDBPyConnection instance to be configured.
"""
pass
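For illustration, a subclass might override this hook roughly as follows; this is a minimal sketch modeled on the delta plugin added in this PR, and the `_registered` attribute name is illustrative:

```python
from typing import Any, Dict

from . import BasePlugin


class MyPlugin(BasePlugin):
    def initialize(self, config: Dict[str, Any]):
        # dataframe-like objects produced by load(), keyed by table name
        self._registered: dict = {}

    def configure_cursor(self, cursor):
        # Registrations do not survive copies of the parent connection,
        # so re-register every dataframe on the fresh cursor.
        for table_name, df in self._registered.items():
            cursor.register(table_name.replace(".", "_") + "_df", df)
```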
55 changes: 55 additions & 0 deletions dbt/adapters/duckdb/plugins/delta.py
@@ -0,0 +1,55 @@
from typing import Any
from typing import Dict

from deltalake import DeltaTable

from . import BasePlugin
from ..utils import SourceConfig
from dbt.logger import GLOBAL_LOGGER as logger


class Plugin(BasePlugin):
def initialize(self, config: Dict[str, Any]):
self._REGISTERED_DF: dict = {}

def configure_cursor(self, cursor):
for source_table_name, df in self._REGISTERED_DF.items():
df_name = source_table_name.replace(".", "_") + "_df"
cursor.register(df_name, df)
cursor.execute(
f"CREATE OR REPLACE VIEW {source_table_name} AS SELECT * FROM {df_name}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious, why the indirection instead of just referencing df directly in the create view statement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two reasons

  1. Historically code was differently structured and I was fighting to define dataframe which will persist between sessions executed by the dbt and therefore had first to register it in another scope and then to reference it by unique name
  2. I was not sure how duckdb registers that dataframe when it is defined in the create statement and especially when you have more then two tables and start to execute things in parallel so i thought unique name per source dataframe makes sense

)

def load(self, source_config: SourceConfig):
if "delta_table_path" not in source_config:
raise Exception(
"'delta_table_path' is a required argument for the delta table!"
)

table_path = source_config["delta_table_path"]
storage_options = source_config.get("storage_options", None)

if storage_options:
dt = DeltaTable(table_path, storage_options=storage_options)
else:
dt = DeltaTable(table_path)

# delta attributes
as_of_version = source_config.get("as_of_version", None)
as_of_datetime = source_config.get("as_of_datetime", None)

if as_of_version is not None:  # explicit None check so that version 0 (used in the tests below) is honored
    dt.load_version(as_of_version)

if as_of_datetime is not None:
    dt.load_with_datetime(as_of_datetime)

df = dt.to_pyarrow_table()
> **Alexander (Contributor):** Should this use to_pyarrow_dataset() to enable filter pushdown and avoid materializing the entire delta table in memory?
>
> **milicevica23 (Author):** Hi Alexander, thank you very much for your comments. I tried hard to make it push predicates down, and the DuckDB execution graph looks exactly the same to me for the table and the dataset (see a few comments above). But now that you mention it, I remember reading in an article that there is a difference, and the example linked here uses a dataset too. So I will change it to the dataset. Thank you very much for the good catch.
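For reference, here is a sketch of the dataset-based variant being discussed; the table path is a placeholder, and this is not the code merged at this commit:

```python
import duckdb
from deltalake import DeltaTable

dt = DeltaTable("/path/to/delta/table")  # placeholder path
ds = dt.to_pyarrow_dataset()  # lazy; enables filter/projection pushdown

con = duckdb.connect()
con.register("delta_ds", ds)
# Only the fragments matching the predicate are read, rather than
# materializing the entire table as to_pyarrow_table() would.
print(con.execute("SELECT count(*) FROM delta_ds WHERE x > 1").fetchone())
```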


# Save the dataframe so that configure_cursor can re-register it on new cursors
self._REGISTERED_DF[source_config.table_name()] = df

return df

# Future
# TODO add databricks catalog
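To show how the optional attributes read by load() are wired up from a dbt project, here is a hedged sources.yml sketch; the bucket, region, and version values are placeholders, and the storage_options keys follow deltalake's conventions:

```yaml
sources:
  - name: delta_source
    meta:
      plugin: delta
    tables:
      - name: events
        meta:
          delta_table_path: "s3://my-bucket/events"  # placeholder
          storage_options:
            AWS_REGION: "us-east-1"
          as_of_version: 3  # or: as_of_datetime: "2023-10-01T00:00:00+00:00"
```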
1 change: 1 addition & 0 deletions dev-requirements.txt
@@ -34,3 +34,4 @@ sqlalchemy
tox>=3.13
twine
wheel
deltalake
131 changes: 131 additions & 0 deletions tests/functional/plugins/test_delta.py
@@ -0,0 +1,131 @@
import pytest
from pathlib import Path
import shutil
import pandas as pd

from dbt.tests.util import (
check_relations_equal,
run_dbt,
)
from deltalake.writer import write_deltalake

delta_schema_yml = """
version: 2
sources:
- name: delta_source
meta:
plugin: delta
tables:
- name: table_1
description: "An delta table"
meta:
delta_table_path: "{test_delta_path1}"

- name: delta_source_test
schema: test
meta:
plugin: delta
tables:
- name: table_2
description: "An delta table"
meta:
delta_table_path: "{test_delta_path2}"
as_of_version: 0
"""


delta1_sql = """
{{ config(materialized='table') }}
select * from {{ source('delta_source', 'table_1') }}
"""
delta2_sql = """
{{ config(materialized='table') }}
select * from {{ source('delta_source', 'table_1') }} limit 1
"""
delta3_sql = """
{{ config(materialized='table') }}
select * from {{ source('delta_source_test', 'table_2') }} where y = 'a'
"""

delta3_sql_expected = """
select 1 as x, 'a' as y
"""


@pytest.mark.skip_profile("buenavista", "md")
class TestPlugins:
@pytest.fixture(scope="class")
def delta_test_table1(self):
path = Path("/tmp/test_delta")
> **milicevica23 (Author):** I did it similarly to the other test cases, but that makes sense to me; I will look into how to do it. Thank you for the tip.

table_path = path / "test_delta_table1"

df = pd.DataFrame({"x": [1, 2, 3]})
write_deltalake(table_path, df, mode="overwrite")

yield table_path

shutil.rmtree(table_path)

@pytest.fixture(scope="class")
def delta_test_table2(self):
path = Path("/workspaces/dbt-duckdb/.vscode/test_delta")
table_path = path / "test_delta_table2"

df = pd.DataFrame({
"x": [1],
"y": ["a"]
})
write_deltalake(table_path, df, mode="overwrite")

df = pd.DataFrame({
"x": [1, 2],
"y": ["a","b"]
})
write_deltalake(table_path, df, mode="overwrite")

yield table_path

shutil.rmtree(table_path)

@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target):
plugins = [{"module": "delta"}]
return {
"test": {
"outputs": {
"dev": {
"type": "duckdb",
"path": dbt_profile_target.get("path", ":memory:"),
"plugins": plugins,
}
},
"target": "dev",
}
}

@pytest.fixture(scope="class")
def models(self, delta_test_table1, delta_test_table2):
return {
"source_schema.yml": delta_schema_yml.format(
test_delta_path1=delta_test_table1,
test_delta_path2=delta_test_table2
),
"delta_table1.sql": delta1_sql,
"delta_table2.sql": delta2_sql,
"delta_table3.sql": delta3_sql,
"delta_table3_expected.sql": delta3_sql_expected,
}

def test_plugins(self, project):
results = run_dbt()
assert len(results) == 4

# check_relations_equal(
# project.adapter,
# [
# "delta_table3",
# "delta_table3_expected",
# ],
# )
# res = project.run_sql("SELECT count(1) FROM 'delta_table3'", fetch="one")
# assert res[0] == 2