Skip to content

Commit

Permalink
addapt test; add create schema; fix storage
Browse files Browse the repository at this point in the history
  • Loading branch information
milicevica23 committed Oct 13, 2023
1 parent 806e04f commit cbca70d
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 16 deletions.
4 changes: 4 additions & 0 deletions dbt/adapters/duckdb/environments/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
plugin = self._plugins[plugin_name]
handle = self.handle()
cursor = handle.cursor()

if source_config.schema:
cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {source_config.schema}")

save_mode = source_config.get("save_mode", "overwrite")
if save_mode in ("ignore", "error_if_exists"):
params = [source_config.schema, source_config.identifier]
Expand Down
9 changes: 4 additions & 5 deletions dbt/adapters/duckdb/plugins/delta.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import Any
from typing import Dict
import duckdb

from deltalake import DeltaTable

Expand All @@ -26,12 +25,12 @@ def load(self, source_config: SourceConfig):
raise Exception(
"'delta_table_path' is a required argument for the delta table!"
)
#logger.debug(source_config)

table_path = source_config["delta_table_path"]
storage_options = source_config.get("storage", None)
storage_options = source_config.get("storage_options", None)

if storage_options:
dt = DeltaTable(table_path, storage_options)
dt = DeltaTable(table_path, storage_options=storage_options)
else:
dt = DeltaTable(table_path)

Expand All @@ -46,7 +45,7 @@ def load(self, source_config: SourceConfig):
dt.load_with_datetime(as_of_datetime)

df = dt.to_pyarrow_table()

##save to register it later
self._REGISTERED_DF[source_config.table_name()] = df

Expand Down
65 changes: 54 additions & 11 deletions tests/functional/plugins/test_delta.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import os
import pytest
import sqlite3
from pathlib import Path
import shutil
import pandas as pd
Expand All @@ -15,29 +13,47 @@
version: 2
sources:
- name: delta_source
schema: main
meta:
plugin: delta
tables:
- name: table_1
description: "An delta table"
meta:
materialization: "view"
delta_table_path: "{test_delta_path}"
delta_table_path: "{test_delta_path1}"
- name: delta_source_test
schema: test
meta:
plugin: delta
tables:
- name: table_2
description: "An delta table"
meta:
delta_table_path: "{test_delta_path2}"
as_of_version: 0
"""


delta1_sql = """
{{ config(materialized='table') }}
select * from {{ source('delta_source', 'table_1') }}
"""
delta2_sql = """
{{ config(materialized='table') }}
select * from {{ source('delta_source', 'table_1') }} limit 1
"""
delta3_sql = """
{{ config(materialized='table') }}
select * as a from {{ source('delta_source_test', 'table_2') }} WHERE y = 'd'
"""


@pytest.mark.skip_profile("buenavista", "md")
class TestPlugins:
@pytest.fixture(scope="class")
def delta_test_table(self):
def delta_test_table1(self):
path = Path("/tmp/test_delta")
table_path = path / "test_delta_table"
table_path = path / "test_delta_table1"

df = pd.DataFrame({"x": [1, 2, 3]})
write_deltalake(table_path, df, mode="overwrite")
Expand All @@ -46,6 +62,27 @@ def delta_test_table(self):

shutil.rmtree(table_path)

@pytest.fixture(scope="class")
def delta_test_table2(self):
path = Path("/workspaces/dbt-duckdb/.vscode/test_delta")
table_path = path / "test_delta_table2"

df = pd.DataFrame({
"x": [1, 2, 3, 2, 3, 4, 5, 6],
"y": ["a", "b", "b", "c", "d", "c", "d", "a"]
})
write_deltalake(table_path, df, mode="overwrite")

df = pd.DataFrame({
"x": [1, 2],
"y": ["a","b"]
})
write_deltalake(table_path, df, mode="overwrite")

yield table_path

shutil.rmtree(table_path)

@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target):
plugins = [{"module": "delta"}]
Expand All @@ -63,14 +100,20 @@ def profiles_config_update(self, dbt_profile_target):
}

@pytest.fixture(scope="class")
def models(self, delta_test_table):
def models(self, delta_test_table1,delta_test_table2):
return {
"source_schema.yml": delta_schema_yml.format(
test_delta_path=delta_test_table
test_delta_path1=delta_test_table1,
test_delta_path2=delta_test_table2
),
"delta_table.sql": delta1_sql,
"delta_table1.sql": delta1_sql,
"delta_table2.sql": delta2_sql,
"delta_table3.sql": delta3_sql,
}

def test_plugins(self, project):
results = run_dbt()
assert len(results) == 1
assert len(results) == 3

# res = project.run_sql("SELECT count(1) FROM 'delta_table3'", fetch="one")
# assert res[0] == 2

0 comments on commit cbca70d

Please sign in to comment.