
feat: support reading delta tables with delta plugin #263

Merged: 22 commits merged into duckdb:master from feature/delta-plugin, Oct 25, 2023

Conversation

milicevica23 (Contributor) commented Oct 1, 2023

Add delta table plugin (#241).
An example project can be found here.

milicevica23 (Contributor, Author)

Hi @jwills, I have a question regarding the execution lifecycle.

As I understand it, we load/create the source materialization at compile time, triggered by the first node that uses the source? Therefore, if I register the df at that point, it is not visible in the context of node creation time.

####### Here starts compilation
21:03:24  Began running node model.dbt_duckdb_delta.customer_raw
21:03:24  1 of 1 START sql table model main.customer_raw ................................. [RUN]
21:03:24  Acquiring new duckdb connection 'model.dbt_duckdb_delta.customer_raw'
21:03:24  Began compiling node model.dbt_duckdb_delta.customer_raw
##### We register df here 
SourceConfig(name='customer', identifier='customer', schema='main', database='dbt', meta={'delta_table_path': '/home/aleks/git/my-projects/banchmark-utils/utils/sf1/delta/customer', 'plugin': 'delta'}, tags=[])
21:03:24  Writing injected SQL for node "model.dbt_duckdb_delta.customer_raw"
21:03:24  Timing info for model.dbt_duckdb_delta.customer_raw (compile): 23:03:24.439860 => 23:03:24.506147
21:03:24  Began executing node model.dbt_duckdb_delta.customer_raw
21:03:24  Writing runtime sql for node "model.dbt_duckdb_delta.customer_raw"
21:03:24  Using duckdb connection "model.dbt_duckdb_delta.customer_raw"
###### Here node creation starts and compilation ends
21:03:24  On model.dbt_duckdb_delta.customer_raw: BEGIN
21:03:24  Opening a new connection, currently in state init
21:03:24  SQL status: OK in 0.0 seconds
21:03:24  Using duckdb connection "model.dbt_duckdb_delta.customer_raw"
21:03:24  On model.dbt_duckdb_delta.customer_raw: /* {"app": "dbt", "dbt_version": "1.6.3", "profile_name": "dbt_duckdb_delta", "target_name": "dev", "node_id": "model.dbt_duckdb_delta.customer_raw"} */
###### We need the df here
    create table
      "dbt"."main"."customer_raw__dbt_tmp"
    as (
      SELECT * FROM "dbt"."main"."customer"
    );

The reason I want this is that if we can somehow register that df during execution, at node creation time, then we don't have to materialize the loaded data earlier, and we can automatically leverage filter pruning from the first query, which drastically reduces complexity and table preparation.

Do you have some resource where I can understand the execution lifecycle of the dbt <-> adapter interaction, and can you point me to some entry points of that interaction?
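
For reference, a minimal sketch of the source configuration this thread refers to, reconstructed from the SourceConfig shown in the log above (the path and source name are illustrative; the exact option schema of the plugin may differ):

sources:
  - name: delta_source
    schema: main
    tables:
      - name: customer
        meta:
          plugin: delta
          delta_table_path: /path/to/delta/customer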

jwills (Collaborator) commented Oct 3, 2023

Hey @milicevica23 sorry for the lag here, I'm at a conference this week.

So at the time the source node in the dbt graph is created, we call the source plugin if one is defined (which is done here: https://github.com/duckdb/dbt-duckdb/blob/master/dbt/adapters/duckdb/relation.py#L42 ) which kicks off this logic block here: https://github.com/duckdb/dbt-duckdb/blob/master/dbt/adapters/duckdb/environments/local.py#L82

By default, that logic will materialize the data frame (or pyarrow table/dataset, or whatever) into the DuckDB database associated with the run, unless a table with the expected name already exists in the database (there is some other logic in there for overwriting existing tables if they exist, etc.). But you can also choose to materialize the data frame as a view instead, which I think is what you're alluding to here: we can delay actually reading the data from the df/pyarrow dataset so that you can do predicate pushdown etc. later on in the dbt run.
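
To illustrate the two options in plain duckdb/pandas terms (a standalone sketch, not the adapter's actual code):

import duckdb
import pandas as pd

conn = duckdb.connect(":memory:")
df = pd.DataFrame({"x": [1, 2, 3]})
conn.register("source_df", df)

# "table" materialization: the data is copied into the DuckDB database right away.
conn.execute("CREATE OR REPLACE TABLE customer AS SELECT * FROM source_df")

# "view" materialization: only a definition is stored; reading the df is deferred
# until the view is queried, so later filters/projections can be pushed down.
conn.execute("CREATE OR REPLACE VIEW customer_v AS SELECT * FROM source_df")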

milicevica23 (Contributor, Author) commented Oct 3, 2023

Hi @jwills, thank you very much for the answer.
The problem is that I tried to register the df at that moment, but at the very next step, when we materialize the node, the df instance is not there.
I have to register the dataset in the moment/session when the node that depends on the source is created.
I am sorry if I can't explain it right; I'm missing a bit of knowledge about the general adapter <-> dbt logic and when what gets built. Do you have something to recommend for learning the general concepts?

jwills (Collaborator) commented Oct 3, 2023

Is it something you can demo for me so I can reproduce it myself? I think I understand the problem but I'm not entirely sure; if it is that the reference to the underlying data frame does not persist from dbt run to dbt run that makes sense, but the only way to fix that is via an actual DuckDB extension like the one that exists for Iceberg.

milicevica23 (Contributor, Author) commented Oct 3, 2023

I will try to prepare a test, but I have to do it tomorrow.

Two random thoughts, which I'm writing down so I don't forget:

  1. If we assume that the whole process runs in :memory:, as stated in the main README, then the two calls have to be in the same session? Because if we create a view in one call and use it in another (as we currently do), that view is still visible. (The registered df may not be visible across scopes, but if I register it in the same session it should be?) This is what they recommended here.
  2. If we assume that two different processes are executed for the creation of the source node and the creation of the node which depends on the source, then we have to move the creation/registration of that df to the second process/session.

This is purely a performance concern; otherwise it will work, but, as you said, by first loading the data in and then moving it around.

milicevica23 (Contributor, Author) commented Oct 4, 2023

I added a new test which showcases how the extension is used, and it throws the same error.
While doing that I also tried df registration, but this doesn't work either. I need a better understanding of how the connection instances are built and copied around.
My feeling is that the following happens:

import duckdb
from deltalake import DeltaTable  # not used in this minimal repro
import pandas as pd

conn = duckdb.connect(":memory:")
conn1 = conn.cursor()  # a second connection instance derived from the first
df = pd.DataFrame({"x": [1, 2, 3]})
conn.register("test_materialized", df)  # registered only on `conn`
conn.sql("SELECT * FROM test_materialized").show()   # works
conn1.sql("SELECT * FROM test_materialized").show()  # fails, see below

conn.close()
conn1.close()

conn1.sql() throws:

CatalogException: Catalog Error: Table with name test_materialized does not exist!
Did you mean "sqlite_master"?

We create a connection at one point and then duplicate it, but we don't register our df on the same connection instance.
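
For contrast, registering on the cursor itself makes the frame visible to that cursor; a minimal sketch (not code from this PR):

import duckdb
import pandas as pd

conn = duckdb.connect(":memory:")
cursor = conn.cursor()  # a separate connection instance
df = pd.DataFrame({"x": [1, 2, 3]})
cursor.register("test_materialized", df)
cursor.sql("SELECT * FROM test_materialized").show()  # works on this cursor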

You did a very nice job with intercepting the creation of the node and adding plugins; I like that idea a lot.

milicevica23 (Contributor, Author)

Hi @jwills, I made it work as I wanted.
Now the following SQL query

SELECT c_custkey, c_nationkey 
FROM {{source("delta_source", "customer")}}
where c_nationkey = 15

loads just the needed parts into memory, and the filter and column selection are automatically pushed down from the query. EXPLAIN produces the following:

┌─────────────
│ ARROW_SCAN
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
│ c_custkey
│ c_nationkey
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
│Filters: c_nationkey=15 AND
│ c_nationkey IS NOT NULL
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
│ EC: 1 │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
│ 5921 │
│ (0.00s) │
└─────────────

The problem was, as I described above, that between runs the registered df was not copied to the new cursor. There are multiple similar issues.

I now register the df at load time in the local environment and re-register it in each connection before execute. I will try to understand whether I can move this to some other point, for example the initialization of the cursor.
I would be happy if you could look over the code and provide some feedback.

I still have to test whether remote storage such as Azure and S3 works out of the box, but it is slowly getting ready.

jwills (Collaborator) left a comment

Alright this is looking really good-- there are some formatting fixes that will need to get applied, but the logic/setup looks good/right to me

milicevica23 (Contributor, Author)

Sure, just point out where and what I have to change and I will do it. Also, excuse my English, it's not my first language, so if you see potential to improve comments or documentation, just point it out and I will add it.

So what I see that still needs to be done:

  • add some README documentation on how to use it -> where exactly should I do that?
  • add an extras entry in the setup so that installing it also pulls in deltalake (a sketch of this idea follows at the end of this comment)
  • there is a small bug: when you don't define the schema to be main under the source configuration, you get an error that there is no schema in which the source view can be created -> I have to look into it; a friend of mine found it, so I may have some questions here
  • add a few tests and make them run

I will try to go over these items in the next days; please add points if I forgot any.
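
A sketch of the packaging idea from the second bullet above (names and layout are illustrative; the repository's actual setup.py may differ):

from setuptools import setup

setup(
    name="dbt-duckdb",
    extras_require={
        # optional dependency pulled in via `pip install dbt-duckdb[delta]`
        "delta": ["deltalake"],
    },
)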

milicevica23 (Contributor, Author) commented Oct 11, 2023

One more note: I also tried this with storage options for Azure and it works very well out of the box, so I don't think there should be problems with other providers such as S3 or GCP storage. If there are, they would be in the deltalake library to fix, but it should work.
I added some more explanation in my demo project, where I show how it works.
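
A hedged sketch of what a remote delta source could look like (the storage_options keys are illustrative deltalake/object-store settings; whether the plugin forwards them exactly like this is an assumption, not something stated in this PR):

sources:
  - name: delta_source
    tables:
      - name: customer
        meta:
          plugin: delta
          delta_table_path: abfss://container@account.dfs.core.windows.net/delta/customer
          storage_options:
            account_name: account      # illustrative key names
            sas_token: "<token>"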

I also tried dbt Power User and it works out of the box too, which I think is very nice. It means you can profile your delta tables very simply from VS Code.

milicevica23 marked this pull request as ready for review October 13, 2023 22:29
milicevica23 (Contributor, Author) commented Oct 13, 2023

Hi @jwills, thank you for your help and all the feedback.
From the logical perspective I think this should work now; I would be happy if you have time to look over it once more and tell me what formatting I should fix.
Do you think we should post this in the dbt Slack to see if somebody has feedback, or should we merge it, mark it as experimental, and then adapt it if feedback comes in?
The thing which could be better is testing. I struggled a bit to understand the utils for comparing two relations and how to use them correctly.

AlexanderVR (Contributor) left a comment

A few random comments since I might want to use this plugin :-D

if as_of_datetime:
    dt.load_with_datetime(as_of_datetime)

df = dt.to_pyarrow_table()
AlexanderVR (Contributor):

should this use to_pyarrow_dataset() to enable filter pushdown and avoid materializing the entire delta table in memory?

milicevica23 (Contributor, Author):

Hi Alexander, thank you very much for your comments.
I tried hard to make it push down predicates and was therefore looking at the DuckDB execution graph, which, as I see it now, is exactly the same for the table and the dataset (see a few comments above). But now that you mention it, I remember reading in some article that there is a difference, and as I see here in the example they use a dataset too.
So I will change it to the dataset. Thank you very much for a good catch.
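
A minimal standalone sketch of the dataset-based approach (paths and names are illustrative; this is plain deltalake/duckdb, not the plugin code):

import duckdb
from deltalake import DeltaTable

dt = DeltaTable("/path/to/delta/customer")  # illustrative path
ds = dt.to_pyarrow_dataset()  # lazy dataset instead of a fully materialized table

conn = duckdb.connect(":memory:")
conn.register("customer_ds", ds)
# DuckDB's ARROW_SCAN can push the filter and the column projection into the
# dataset scan, as in the EXPLAIN output shown earlier in this thread.
conn.sql("SELECT c_custkey FROM customer_ds WHERE c_nationkey = 15").show()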

class TestPlugins:
    @pytest.fixture(scope="class")
    def delta_test_table1(self):
        path = Path("/tmp/test_delta")
milicevica23 (Contributor, Author):

I did it similarly to the other test cases, but it makes sense to me. I have to look into how to do it.
Thank you for the tip.

Comment on lines 18 to 20
cursor.register(df_name, df)
cursor.execute(
    f"CREATE OR REPLACE VIEW {source_table_name} AS SELECT * FROM {df_name}"
AlexanderVR (Contributor):

curious, why the indirection instead of just referencing df directly in the create view statement?

milicevica23 (Contributor, Author):

There are two reasons:

  1. Historically the code was structured differently, and I was fighting to define a dataframe that would persist between the sessions executed by dbt, so I first had to register it in another scope and then reference it by a unique name.
  2. I was not sure how DuckDB registers the dataframe when it is defined directly in the create statement, especially when you have more than two tables and start executing things in parallel, so I thought a unique name per source dataframe makes sense.


geoHeil commented Oct 16, 2023

One note from my side: the internal parquet reader takes well under 1 second to register the tables. The plugin-based delta reader took 5 seconds for me.

I do not know if there is potential to improve (given the external nature of the reader) but if possible, it would be nice to improve the performance.

milicevica23 (Contributor, Author)

Interesting, I will take a look into it. I have to check whether this comes from the environment setup or from reading the metadata (with big delta tables, that can take time). Most important is that we don't load the data into memory before the predicates from the first materialization are pushed down.
One note: you have to load and persist the data with this delta adapter, not build a view over it, because the instance of the delta pointer df is no longer in scope. So if you build a view, the DuckDB file holds just its definition but will throw an error, because it doesn't know anything about the delta pointer.
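
In dbt terms, that means a model reading the delta source should persist the data, for example (standard dbt config, using the source from earlier in this thread):

{{ config(materialized='table') }}

select c_custkey, c_nationkey
from {{ source('delta_source', 'customer') }}
where c_nationkey = 15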

jwills (Collaborator) left a comment

Sorry it took me a bit to get back to this, had a busy couple of days

f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df"
)

if plugin_name not in ["delta"]: # plugins which configure cursor itselfs
jwills (Collaborator):

I'm still not thrilled about special-casing the delta plugin like this; would it be better if we simply maintained the dictionary of source-created data frames (or dataframe-like equivalents) in the Environment instance, instead of inside of the Plugin class, for all of the sources? Would that work?

milicevica23 (Contributor, Author):

In my opinion, we have to distinguish between materializations. Correct me if I am wrong:
If you create a table as df, then you have the df instance and you copy the data into the DuckDB process; if you create a view as df, then you hold just the representation and a pointer to the df, and the data is loaded the first time the view is resolved.
This is why the other plugins work with table materialization and without per-cursor configuration.

Therefore I am not sure it is a good idea to hold every df instance in a dictionary in the Environment. Maybe I could make it so that we return a dataframe from the load function and, if the materialization is a view, register it in the Environment. This could work and would also solve the materialization-as-view problem for the other plugins.

Let me try it! I just thought of it while answering 😁
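
A rough sketch of that idea (class and method names here are hypothetical, not the merged code): the plugin's load function returns the dataframe-like object, and the environment keeps a handle to it for view-materialized sources and re-registers it on each new cursor.

class Environment:
    def __init__(self):
        self._view_dataframes = {}  # registered name -> df / pyarrow dataset

    def handle_source(self, cursor, plugin, source_config, materialization):
        df = plugin.load(source_config)
        name = source_config.table_name()
        df_name = f"{name}__df"  # indirection, as in the review snippet above
        cursor.register(df_name, df)
        if materialization == "view":
            # keep the handle so later cursors can resolve the view lazily
            self._view_dataframes[df_name] = df
            cursor.execute(f"CREATE OR REPLACE VIEW {name} AS SELECT * FROM {df_name}")
        else:
            cursor.execute(f"CREATE OR REPLACE TABLE {name} AS SELECT * FROM {df_name}")

    def initialize_cursor(self, cursor):
        # called whenever a new cursor/connection is created during the run
        for df_name, df in self._view_dataframes.items():
            cursor.register(df_name, df)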

milicevica23 (Contributor, Author):

Hi @jwills, I did the refactor now; can you take another look?
I moved the logic into the environment and abstracted it across the plugins for the case where a plugin sets the materialization to view.
What I would still like to do, but am not sure where exactly, is to set the default materialization for the delta plugin to view, because it is the better default, but that is not so important.

jwills (Collaborator) commented Oct 20, 2023

Just pushed some formatting fixes; going to take a shot at a) seeing if I can get the tests running for myself and then b) seeing if I can simplify things a bit more while keeping the tests passing.

jwills (Collaborator) commented Oct 23, 2023

@milicevica23 so I think I'm good to go with this as-is; anything I'm not sure about in the implementation is stuff that we can iterate on in the future.

The one last thing I think you wanted was a way to change the default materialization type of a source plugin from a table to a view, right? Do you just want to add that as a method on the BasePlugin class that returns "table" by default and then the delta plugin overrides it to return "view"?
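
If that's the direction, a minimal sketch could look like this (the method name is hypothetical, not necessarily what was merged):

class BasePlugin:
    def default_materialization(self) -> str:
        # most source plugins copy the data into DuckDB as a table
        return "table"


class DeltaPlugin(BasePlugin):
    def default_materialization(self) -> str:
        # delta sources stay lazy: a view over the registered pyarrow dataset
        return "view"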

jwills merged commit 31888bb into duckdb:master Oct 25, 2023
29 of 30 checks passed
jwills (Collaborator) commented Oct 25, 2023

Hooray! Thank you @milicevica23! 🎉

milicevica23 deleted the feature/delta-plugin branch October 29, 2023 01:05