From 872bbad2f11909b6f5cccd79eb6d781b0379fa4a Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Tue, 3 May 2022 18:36:13 -0600 Subject: [PATCH 01/28] push snowflake implementation refactored from drews code --- .../macros/materializations/table.sql | 63 +++++++++++++++++-- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 49f97069b..153f10081 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -17,11 +17,37 @@ {{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }} {{ drop_relation_if_exists(old_relation) }} {% endif %} + + {% if config.get('language', 'sql') == 'python' -%}} + {%- set proc_name = api.Relation.create(identifier=identifier ~ "__dbt_sp", + schema=schema, + database=database) -%} + {% set materialization_logic = py_materialize_as_table() %} + {% set setup_stored_proc = py_create_stored_procedure(proc_name, materialization_logic, model, sql) %} + + {% do log("Creating stored procedure: " ~ proc_name, info=true) %} + {% do run_query(setup_stored_proc) %} + {% do log("Finished creating stored procedure: " ~ proc_name, info=true) %} + + --build model + {% call statement('main') -%} + + {% do log("Calling stored procedure: " ~ proc_name, info=true) %} + CALL {{ proc_name }}('{{ target_relation }}'); - --build model - {% call statement('main') -%} - {{ create_table_as(false, target_relation, sql) }} - {%- endcall %} + {%- endcall %} + + -- cleanup stuff + {% do log("Dropping stored procedure: " ~ proc_name, info=true) %} + {% do run_query("drop procedure if exists " ~ proc_name ~ "(string)") %} + + {%- else -%} + --build model + {% call statement('main') -%} + {{ create_table_as(false, target_relation, sql) }} + {%- endcall %} + + {%- endif %} {{ run_hooks(post_hooks) }} @@ -32,3 +58,32 @@ {{ return({'relations': [target_relation]}) }} {% endmaterialization %} + + + +{% macro py_create_stored_procedure(proc_name, materialization_logic, model, user_supplied_logic) %} + +{% set packages = ['snowflake-snowpark-python'] + config.get('packages', []) %} + +CREATE OR REPLACE PROCEDURE {{ proc_name }} (target_relation STRING) +RETURNS STRING +LANGUAGE PYTHON +RUNTIME_VERSION = '3.8' +PACKAGES = ('{{ packages | join("', '") }}') +HANDLER = 'run' +AS +$$ + +{#-- can we wrap in 'def model:' here? or will formatting screw us? --#} +{{ user_supplied_logic }} + +{{ materialization_logic }} + +def run(session, target_relation): + df = model(session, dbt) + materialize(session, df, target_relation) + return "OK" + +$$; + +{% endmacro %} \ No newline at end of file From 179750116154fda2d39ce33e3de3f7e5db991154 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Wed, 4 May 2022 15:28:53 -0600 Subject: [PATCH 02/28] add py_materialize_as_table --- dbt/include/snowflake/macros/materializations/table.sql | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 153f10081..2f8396860 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -59,7 +59,12 @@ {% endmaterialization %} +{% macro py_materialize_as_table(config) %} +def materialize(session, df, target_relation): + df.write.mode("overwrite").save_as_table(target_relation) + +{% endmacro %} {% macro py_create_stored_procedure(proc_name, materialization_logic, model, user_supplied_logic) %} From 68c66f715521ed105692f0b7817e5e50a4107bbc Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Fri, 6 May 2022 17:59:17 -0600 Subject: [PATCH 03/28] add load df def --- dbt/include/snowflake/macros/adapters.sql | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index c5f07ff5f..8b47895b5 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -273,3 +273,7 @@ {{ snowflake_dml_explicit_transaction(truncate_dml) }} {%- endcall %} {% endmacro %} + +{% macro load_df_def() %} + load_df_function = session.table +{% endmacro %} From 8a6b0666d704ac7c75bd34e7d199d1b33bb4b3c6 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Fri, 6 May 2022 15:40:46 +0200 Subject: [PATCH 04/28] Bump version to 1.3.0a1 --- dbt/adapters/snowflake/__version__.py | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/snowflake/__version__.py b/dbt/adapters/snowflake/__version__.py index a6b977228..3fc48dd51 100644 --- a/dbt/adapters/snowflake/__version__.py +++ b/dbt/adapters/snowflake/__version__.py @@ -1 +1 @@ -version = "1.2.0a1" +version = '1.3.0a1' diff --git a/setup.py b/setup.py index 332b5d68e..0319a9908 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ def _get_dbt_core_version(): package_name = "dbt-snowflake" -package_version = "1.2.0a1" +package_version = "1.3.0a1" dbt_core_version = _get_dbt_core_version() description = """The Snowflake adapter plugin for dbt""" From 49bb604225e4df08770badc8fb6ae0e354053726 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Wed, 1 Jun 2022 18:06:58 -0700 Subject: [PATCH 05/28] make session global so we can load table with dbt ref --- dbt/include/snowflake/macros/adapters.sql | 3 ++- dbt/include/snowflake/macros/materializations/table.sql | 8 ++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index 8b47895b5..b672f475f 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -275,5 +275,6 @@ {% endmacro %} {% macro load_df_def() %} - load_df_function = session.table + global snowpark_session + load_df_function = snowpark_session.table {% endmacro %} diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 2f8396860..6958fdbac 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -79,14 +79,18 @@ HANDLER = 'run' AS $$ +snowpark_session = None + {#-- can we wrap in 'def model:' here? or will formatting screw us? --#} {{ user_supplied_logic }} {{ materialization_logic }} def run(session, target_relation): - df = model(session, dbt) - materialize(session, df, target_relation) + global snowpark_session + snowpark_session = session + df = model(dbt) + materialize(session, df, target_relation) return "OK" $$; From 1490de24d0c3a7c802bc36a807ab2b78ec4ccade Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Wed, 1 Jun 2022 21:50:18 -0700 Subject: [PATCH 06/28] update requirement for the moment and fix format --- dbt/adapters/snowflake/__version__.py | 2 +- dbt/include/snowflake/macros/materializations/table.sql | 8 ++++---- dev-requirements.txt | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dbt/adapters/snowflake/__version__.py b/dbt/adapters/snowflake/__version__.py index 3fc48dd51..a9fe3c3ee 100644 --- a/dbt/adapters/snowflake/__version__.py +++ b/dbt/adapters/snowflake/__version__.py @@ -1 +1 @@ -version = '1.3.0a1' +version = "1.3.0a1" diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 6958fdbac..45ca4ca5f 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -17,7 +17,7 @@ {{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }} {{ drop_relation_if_exists(old_relation) }} {% endif %} - + {% if config.get('language', 'sql') == 'python' -%}} {%- set proc_name = api.Relation.create(identifier=identifier ~ "__dbt_sp", schema=schema, @@ -40,13 +40,13 @@ -- cleanup stuff {% do log("Dropping stored procedure: " ~ proc_name, info=true) %} {% do run_query("drop procedure if exists " ~ proc_name ~ "(string)") %} - + {%- else -%} --build model {% call statement('main') -%} {{ create_table_as(false, target_relation, sql) }} {%- endcall %} - + {%- endif %} {{ run_hooks(post_hooks) }} @@ -95,4 +95,4 @@ def run(session, target_relation): $$; -{% endmacro %} \ No newline at end of file +{% endmacro %} diff --git a/dev-requirements.txt b/dev-requirements.txt index b5943e34b..46e9d3e04 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,7 +1,7 @@ # install latest changes in dbt-core # TODO: how to automate switching from develop to version branches? -git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core -git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter +git+https://github.com/dbt-labs/python-models-dbt-core.git@feature/python-model-v1#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/python-models-dbt-core.git@feature/python-model-v1#egg=dbt-tests-adapter&subdirectory=tests/adapter black==21.12b0 click~=8.0.4 From d61a212a18f22566833b527ef625a5ddb0a06b32 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Thu, 2 Jun 2022 16:18:07 -0700 Subject: [PATCH 07/28] remove logging and add functional test --- dbt/include/snowflake/macros/materializations/table.sql | 3 --- dev-requirements.txt | 4 ++-- tests/functional/adapter/test_basic.py | 9 ++++++++- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 45ca4ca5f..0d7f194ac 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -31,14 +31,11 @@ --build model {% call statement('main') -%} - - {% do log("Calling stored procedure: " ~ proc_name, info=true) %} CALL {{ proc_name }}('{{ target_relation }}'); {%- endcall %} -- cleanup stuff - {% do log("Dropping stored procedure: " ~ proc_name, info=true) %} {% do run_query("drop procedure if exists " ~ proc_name ~ "(string)") %} {%- else -%} diff --git a/dev-requirements.txt b/dev-requirements.txt index 46e9d3e04..b5943e34b 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,7 +1,7 @@ # install latest changes in dbt-core # TODO: how to automate switching from develop to version branches? -git+https://github.com/dbt-labs/python-models-dbt-core.git@feature/python-model-v1#egg=dbt-core&subdirectory=core -git+https://github.com/dbt-labs/python-models-dbt-core.git@feature/python-model-v1#egg=dbt-tests-adapter&subdirectory=tests/adapter +git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter black==21.12b0 click~=8.0.4 diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py index fec10aa33..4a117b11f 100644 --- a/tests/functional/adapter/test_basic.py +++ b/tests/functional/adapter/test_basic.py @@ -12,6 +12,9 @@ from dbt.tests.adapter.basic.test_snapshot_check_cols import BaseSnapshotCheckCols from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod +from dbt.tests.adapter.python_model.test_python_model import BasePythonModelTests + + class TestSimpleMaterializationsSnowflake(BaseSimpleMaterializations): pass @@ -51,4 +54,8 @@ class TestSnapshotTimestampSnowflake(BaseSnapshotTimestamp): class TestBaseAdapterMethodSnowflake(BaseAdapterMethod): @pytest.fixture(scope="class") def equal_tables(self): - return ["MODEL", "EXPECTED"] \ No newline at end of file + return ["MODEL", "EXPECTED"] + + +class TestBasePythonModelSnowflake(BasePythonModelTests): + pass From 86e6f110c8d2688c5c9b7c3e44b124156a7cd94d Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Mon, 6 Jun 2022 12:22:58 +0200 Subject: [PATCH 08/28] Misc experimentation --- dbt/include/snowflake/macros/adapters.sql | 4 +-- .../macros/materializations/table.sql | 27 ++++++++++++++----- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index b672f475f..c06f715e8 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -275,6 +275,6 @@ {% endmacro %} {% macro load_df_def() %} - global snowpark_session - load_df_function = snowpark_session.table +global snowpark_session +load_df_function = snowpark_session.table {% endmacro %} diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 0d7f194ac..eea41c239 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -59,7 +59,16 @@ {% macro py_materialize_as_table(config) %} def materialize(session, df, target_relation): - df.write.mode("overwrite").save_as_table(target_relation) + if isinstance(df, snowflake.snowpark.DataFrame): + df.write.mode("overwrite").save_as_table(target_relation) + elif pandas and isinstance(df, pandas.core.frame.DataFrame): + session.write_pandas( + df=df, + table_name=target_relation.identifier, + database=target_relation.database, + schema=target_relation.schema, + auto_create_table=True + ) {% endmacro %} @@ -67,27 +76,31 @@ def materialize(session, df, target_relation): {% set packages = ['snowflake-snowpark-python'] + config.get('packages', []) %} -CREATE OR REPLACE PROCEDURE {{ proc_name }} (target_relation STRING) +CREATE OR REPLACE PROCEDURE {{ proc_name }} () RETURNS STRING LANGUAGE PYTHON -RUNTIME_VERSION = '3.8' +RUNTIME_VERSION = '3.8' -- TODO should this be configurable? PACKAGES = ('{{ packages | join("', '") }}') -HANDLER = 'run' +HANDLER = 'run' -- TODO should this be called 'main', to match Snowsight default? AS $$ snowpark_session = None -{#-- can we wrap in 'def model:' here? or will formatting screw us? --#} {{ user_supplied_logic }} {{ materialization_logic }} -def run(session, target_relation): +def run(session): + """ + TODOs: + - how can we avoid the 'session' global? + - what should this return? can we make a real RunResult? + """ global snowpark_session snowpark_session = session df = model(dbt) - materialize(session, df, target_relation) + materialize(session, df, dbt.this) return "OK" $$; From 1cc215870a9ae5eea38438ad35242b2f4078358b Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Tue, 7 Jun 2022 18:26:12 -0700 Subject: [PATCH 09/28] make snowflake work --- .../macros/materializations/table.sql | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index eea41c239..927e4d67c 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -31,7 +31,7 @@ --build model {% call statement('main') -%} - CALL {{ proc_name }}('{{ target_relation }}'); + CALL {{ proc_name }}(); {%- endcall %} @@ -57,18 +57,18 @@ {% endmaterialization %} {% macro py_materialize_as_table(config) %} - def materialize(session, df, target_relation): - if isinstance(df, snowflake.snowpark.DataFrame): + if "pandas" in dir() and isinstance(df, pandas.core.frame.DataFrame): + session.write_pandas( + df=df, + table_name=target_relation.identifier, + database=target_relation.database, + schema=target_relation.schema, + auto_create_table=True + ) + else: + # we are assuming it is going to be snowflake DataFrame df.write.mode("overwrite").save_as_table(target_relation) - elif pandas and isinstance(df, pandas.core.frame.DataFrame): - session.write_pandas( - df=df, - table_name=target_relation.identifier, - database=target_relation.database, - schema=target_relation.schema, - auto_create_table=True - ) {% endmacro %} @@ -85,7 +85,6 @@ HANDLER = 'run' -- TODO should this be called 'main', to match Snowsight default AS $$ -snowpark_session = None {{ user_supplied_logic }} @@ -97,10 +96,9 @@ def run(session): - how can we avoid the 'session' global? - what should this return? can we make a real RunResult? """ - global snowpark_session - snowpark_session = session + dbt = dbtObj(session.table) df = model(dbt) - materialize(session, df, dbt.this) + materialize(session, df, str(dbt.this)) return "OK" $$; From bc0b241c56a383fba5397dcbe6768183871e1e47 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Wed, 8 Jun 2022 09:00:17 -0700 Subject: [PATCH 10/28] fix indentation --- .../macros/materializations/table.sql | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 927e4d67c..914066081 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -59,13 +59,13 @@ {% macro py_materialize_as_table(config) %} def materialize(session, df, target_relation): if "pandas" in dir() and isinstance(df, pandas.core.frame.DataFrame): - session.write_pandas( - df=df, - table_name=target_relation.identifier, - database=target_relation.database, - schema=target_relation.schema, - auto_create_table=True - ) + session.write_pandas( + df=df, + table_name=target_relation.identifier, + database=target_relation.database, + schema=target_relation.schema, + auto_create_table=True + ) else: # we are assuming it is going to be snowflake DataFrame df.write.mode("overwrite").save_as_table(target_relation) @@ -91,15 +91,14 @@ $$ {{ materialization_logic }} def run(session): - """ - TODOs: - - how can we avoid the 'session' global? - - what should this return? can we make a real RunResult? - """ - dbt = dbtObj(session.table) - df = model(dbt) - materialize(session, df, str(dbt.this)) - return "OK" + """ + TODOs: + - what should this return? can we make a real RunResult? + """ + dbt = dbtObj(session.table) + df = model(dbt) + materialize(session, df, str(dbt.this)) + return "OK" $$; From 2bfddc6062844b3adf8afeba1177482250ac6cc7 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Thu, 16 Jun 2022 14:10:08 -0700 Subject: [PATCH 11/28] resolve pandas df save and update entry function name --- .../macros/materializations/table.sql | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 914066081..681522fb3 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -58,18 +58,12 @@ {% macro py_materialize_as_table(config) %} def materialize(session, df, target_relation): - if "pandas" in dir() and isinstance(df, pandas.core.frame.DataFrame): - session.write_pandas( - df=df, - table_name=target_relation.identifier, - database=target_relation.database, - schema=target_relation.schema, - auto_create_table=True - ) - else: - # we are assuming it is going to be snowflake DataFrame - df.write.mode("overwrite").save_as_table(target_relation) - + # we have to make sure pandas is imported + import pandas + if isinstance(df, pandas.core.frame.DataFrame): + # session.write_pandas does not have overwrite function + df = session.createDataFrame(df) + df.write.mode("overwrite").save_as_table(str(target_relation)) {% endmacro %} {% macro py_create_stored_procedure(proc_name, materialization_logic, model, user_supplied_logic) %} @@ -81,23 +75,22 @@ RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = '3.8' -- TODO should this be configurable? PACKAGES = ('{{ packages | join("', '") }}') -HANDLER = 'run' -- TODO should this be called 'main', to match Snowsight default? +HANDLER = 'main' AS $$ - {{ user_supplied_logic }} {{ materialization_logic }} -def run(session): +def main(session): """ TODOs: - what should this return? can we make a real RunResult? """ dbt = dbtObj(session.table) - df = model(dbt) - materialize(session, df, str(dbt.this)) + df = model(dbt, session) + materialize(session, df, dbt.this) return "OK" $$; From 7d2c51b1812a24c3e128e6761fcbeee777c7686a Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Thu, 16 Jun 2022 16:01:54 -0700 Subject: [PATCH 12/28] update python model tests --- tests/functional/adapter/test_basic.py | 6 -- tests/functional/adapter/test_python_model.py | 69 +++++++++++++++++++ 2 files changed, 69 insertions(+), 6 deletions(-) create mode 100644 tests/functional/adapter/test_python_model.py diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py index 4a117b11f..ee4e4283b 100644 --- a/tests/functional/adapter/test_basic.py +++ b/tests/functional/adapter/test_basic.py @@ -12,8 +12,6 @@ from dbt.tests.adapter.basic.test_snapshot_check_cols import BaseSnapshotCheckCols from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod -from dbt.tests.adapter.python_model.test_python_model import BasePythonModelTests - class TestSimpleMaterializationsSnowflake(BaseSimpleMaterializations): @@ -55,7 +53,3 @@ class TestBaseAdapterMethodSnowflake(BaseAdapterMethod): @pytest.fixture(scope="class") def equal_tables(self): return ["MODEL", "EXPECTED"] - - -class TestBasePythonModelSnowflake(BasePythonModelTests): - pass diff --git a/tests/functional/adapter/test_python_model.py b/tests/functional/adapter/test_python_model.py new file mode 100644 index 000000000..6ef9dc2c9 --- /dev/null +++ b/tests/functional/adapter/test_python_model.py @@ -0,0 +1,69 @@ +import pytest +from dbt.tests.util import run_dbt, write_file +from dbt.tests.adapter.python_model.test_python_model import BasePythonModelTests + +basic_sql = """ +select 1 as id union all +select 1 as id union all +select 1 as id union all +select 1 as id union all +select 1 as id +""" +basic_python = """ +def model(dbt, session): + dbt.config( + materialized='table', + ) + df = dbt.ref("my_sql_model") + df = df.limit(2) + return df +""" + +class TestBasePythonModelSnowflake: + @pytest.fixture(scope="class") + def models(self): + return { + "my_sql_model.sql": basic_sql, + "my_python_model.py": basic_python, + } + + def test_singular_tests(self, project): + # test command + results = run_dbt(["run"]) + assert len(results) == 2 + + +models__simple_python_model = """ +import pandas + +def model(dbt, session): + dbt.config( + materialized='table', + ) + data = [[1,2]] * 10 + return pandas.DataFrame(data, columns=['test', 'test2']) +""" +models__simple_python_model_v2 = """ +import pandas + +def model(dbt, session): + dbt.config( + materialized='table', + ) + data = [[1,2]] * 10 + return pandas.DataFrame(data, columns=['test1', 'test3']) +""" + +class TestChangingSchemaSnowflake: + @pytest.fixture(scope="class") + def models(self): + return { + "simple_python_model.py": models__simple_python_model + } + def test_changing_schema(self,project): + run_dbt(["run"]) + write_file(models__simple_python_model_v2, project.project_root + '/models', "simple_python_model.py") + run_dbt(["run"]) + + + From 61a9ff36d81abe0fe44e9fa1abdf67a9638acefb Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Mon, 27 Jun 2022 22:28:59 -0700 Subject: [PATCH 13/28] adjust for compiled_code syntax --- dbt/include/snowflake/macros/materializations/table.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 681522fb3..2e0f07525 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -23,7 +23,7 @@ schema=schema, database=database) -%} {% set materialization_logic = py_materialize_as_table() %} - {% set setup_stored_proc = py_create_stored_procedure(proc_name, materialization_logic, model, sql) %} + {% set setup_stored_proc = py_create_stored_procedure(proc_name, materialization_logic, model, compiled_code) %} {% do log("Creating stored procedure: " ~ proc_name, info=true) %} {% do run_query(setup_stored_proc) %} @@ -41,7 +41,7 @@ {%- else -%} --build model {% call statement('main') -%} - {{ create_table_as(false, target_relation, sql) }} + {{ create_table_as(false, target_relation, compiled_code) }} {%- endcall %} {%- endif %} From 037ff06cc3afc695ef2588db6045bcbd7acb6532 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Tue, 28 Jun 2022 20:17:50 -0700 Subject: [PATCH 14/28] refactor submit python and attampt for incremental --- dbt/adapters/snowflake/impl.py | 31 +++++++ dbt/include/snowflake/macros/adapters.sql | 85 ++++++++++--------- .../macros/materializations/incremental.sql | 25 ++++-- .../macros/materializations/table.sql | 57 ++----------- 4 files changed, 100 insertions(+), 98 deletions(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index f2af32795..ea3ff310c 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -4,6 +4,7 @@ import agate from dbt.adapters.base.impl import AdapterConfig +from dbt.adapters.base.meta import available from dbt.adapters.sql import SQLAdapter # type: ignore from dbt.adapters.sql.impl import ( LIST_SCHEMAS_MACRO_NAME, @@ -160,3 +161,33 @@ def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str: def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str: return f"DATEADD({interval}, {number}, {add_to})" + + @available.parse_none + def submit_python_job(self, parsed_model:dict, model_code: str): + schema = getattr(parsed_model, "schema", self.config.credentials.schema) + database = getattr(parsed_model, "database", self.config.credentials.database) + identifier = parsed_model['alias'] + proc_name = f"{database}.{schema}.{identifier}__dbt_sp" + packages = ['snowflake-snowpark-python'] + parsed_model['config'].get('packages', []) + packages = "', ".join(packages) + python_stored_procedure = f""" +CREATE OR REPLACE PROCEDURE {proc_name} () +RETURNS STRING +LANGUAGE PYTHON +RUNTIME_VERSION = '3.8' -- TODO should this be configurable? +PACKAGES = ('{packages}') +HANDLER = 'main' +AS +$$ +{model_code} + +$$; + """ + self.execute(python_stored_procedure, auto_begin=False, fetch=False) + self.execute(f"CALL {proc_name}()", auto_begin=False, fetch=False) + self.execute(f"drop procedure if exists {proc_name}(string)", auto_begin=False, fetch=False) + + # TODO add proper return + return "OK" + + diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index c06f715e8..ee17801b7 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -1,41 +1,49 @@ -{% macro snowflake__create_table_as(temporary, relation, sql) -%} - {%- set transient = config.get('transient', default=true) -%} - {%- set cluster_by_keys = config.get('cluster_by', default=none) -%} - {%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%} - {%- set copy_grants = config.get('copy_grants', default=false) -%} - - {%- if cluster_by_keys is not none and cluster_by_keys is string -%} - {%- set cluster_by_keys = [cluster_by_keys] -%} - {%- endif -%} - {%- if cluster_by_keys is not none -%} - {%- set cluster_by_string = cluster_by_keys|join(", ")-%} - {% else %} - {%- set cluster_by_string = none -%} - {%- endif -%} - {%- set sql_header = config.get('sql_header', none) -%} - - {{ sql_header if sql_header is not none }} - - create or replace {% if temporary -%} - temporary - {%- elif transient -%} - transient - {%- endif %} table {{ relation }} {% if copy_grants and not temporary -%} copy grants {%- endif %} as - ( - {%- if cluster_by_string is not none -%} - select * from( - {{ sql }} - ) order by ({{ cluster_by_string }}) - {%- else -%} - {{ sql }} - {%- endif %} - ); - {% if cluster_by_string is not none and not temporary -%} - alter table {{relation}} cluster by ({{cluster_by_string}}); +{% macro snowflake__create_table_as(temporary, relation, compiled_code, language='sql') -%} + {%- if language == 'sql' -%} + {%- set transient = config.get('transient', default=true) -%} + {%- set cluster_by_keys = config.get('cluster_by', default=none) -%} + {%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%} + {%- set copy_grants = config.get('copy_grants', default=false) -%} + + {%- if cluster_by_keys is not none and cluster_by_keys is string -%} + {%- set cluster_by_keys = [cluster_by_keys] -%} {%- endif -%} - {% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%} - alter table {{relation}} resume recluster; + {%- if cluster_by_keys is not none -%} + {%- set cluster_by_string = cluster_by_keys|join(", ")-%} + {% else %} + {%- set cluster_by_string = none -%} {%- endif -%} + {%- set sql_header = config.get('sql_header', none) -%} + + {{ sql_header if sql_header is not none }} + + create or replace {% if temporary -%} + temporary + {%- elif transient -%} + transient + {%- endif %} table {{ relation }} {% if copy_grants and not temporary -%} copy grants {%- endif %} as + ( + {%- if cluster_by_string is not none -%} + select * from( + {{ compiled_code }} + ) order by ({{ cluster_by_string }}) + {%- else -%} + {{ compiled_code }} + {%- endif %} + ); + {% if cluster_by_string is not none and not temporary -%} + alter table {{relation}} cluster by ({{cluster_by_string}}); + {%- endif -%} + {% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%} + alter table {{relation}} resume recluster; + {%- endif -%} + + {%- elif language == 'python' -%} + {{ py_complete_script(compiled_code=compiled_code, target_relation=relation) }} + {%- else -%} + {% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language") %} + {%- endif -%} + {% endmacro %} {% macro get_column_comment_sql(column_name, column_dict) %} @@ -273,8 +281,3 @@ {{ snowflake_dml_explicit_transaction(truncate_dml) }} {%- endcall %} {% endmacro %} - -{% macro load_df_def() %} -global snowpark_session -load_df_function = snowpark_session.table -{% endmacro %} diff --git a/dbt/include/snowflake/macros/materializations/incremental.sql b/dbt/include/snowflake/macros/materializations/incremental.sql index 5710284f3..2f754240a 100644 --- a/dbt/include/snowflake/macros/materializations/incremental.sql +++ b/dbt/include/snowflake/macros/materializations/incremental.sql @@ -28,9 +28,10 @@ {% set original_query_tag = set_query_tag() %} + {#-- Set vars --#} {%- set unique_key = config.get('unique_key') -%} {%- set full_refresh_mode = (should_full_refresh()) -%} - + {%- set language = config.get('language') -%} {% set target_relation = this %} {% set existing_relation = load_relation(this) %} {% set tmp_relation = make_temp_relation(this) %} @@ -42,19 +43,27 @@ {{ run_hooks(pre_hooks) }} {% if existing_relation is none %} - {% set build_sql = create_table_as(False, target_relation, sql) %} + {%- call statement('main', language=language) -%} + {{ create_table_as(False, target_relation, compiled_code, language) }} + {%- endcall -%} {% elif existing_relation.is_view %} {#-- Can't overwrite a view with a table - we must drop --#} {{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }} {% do adapter.drop_relation(existing_relation) %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - + {%- call statement('main', language=language) -%} + {{ create_table_as(False, target_relation, compiled_code, language) }} + {%- endcall -%} {% elif full_refresh_mode %} - {% set build_sql = create_table_as(False, target_relation, sql) %} + {%- call statement('main', language=language) -%} + {{ create_table_as(False, target_relation, compiled_code, language) }} + {%- endcall -%} {% else %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {%- call statement('create_tmp_relation', language=language) -%} + {{ create_table_as(True, tmp_relation, compiled_code, language) }} + {%- endcall -%} + {% do adapter.expand_target_column_types( from_relation=tmp_relation, to_relation=target_relation) %} @@ -63,7 +72,9 @@ {% if not dest_columns %} {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} {% endif %} - {% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %} + {%- call statement('main') -%} + {{ dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) }} + {%- endcall -%} {% endif %} diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 2e0f07525..3e62b03db 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -3,6 +3,7 @@ {% set original_query_tag = set_query_tag() %} {%- set identifier = model['alias'] -%} + {%- set language = config.get('language') -%} {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} {%- set target_relation = api.Relation.create(identifier=identifier, @@ -18,33 +19,9 @@ {{ drop_relation_if_exists(old_relation) }} {% endif %} - {% if config.get('language', 'sql') == 'python' -%}} - {%- set proc_name = api.Relation.create(identifier=identifier ~ "__dbt_sp", - schema=schema, - database=database) -%} - {% set materialization_logic = py_materialize_as_table() %} - {% set setup_stored_proc = py_create_stored_procedure(proc_name, materialization_logic, model, compiled_code) %} - - {% do log("Creating stored procedure: " ~ proc_name, info=true) %} - {% do run_query(setup_stored_proc) %} - {% do log("Finished creating stored procedure: " ~ proc_name, info=true) %} - - --build model - {% call statement('main') -%} - CALL {{ proc_name }}(); - - {%- endcall %} - - -- cleanup stuff - {% do run_query("drop procedure if exists " ~ proc_name ~ "(string)") %} - - {%- else -%} - --build model - {% call statement('main') -%} - {{ create_table_as(false, target_relation, compiled_code) }} - {%- endcall %} - - {%- endif %} + {% call statement('main', language=language) -%} + {{ create_table_as(False, target_relation, compiled_code, language) }} + {%- endcall %} {{ run_hooks(post_hooks) }} @@ -56,32 +33,15 @@ {% endmaterialization %} -{% macro py_materialize_as_table(config) %} +{% macro py_complete_script(compiled_code, target_relation) %} +{{ compiled_code }} def materialize(session, df, target_relation): # we have to make sure pandas is imported import pandas if isinstance(df, pandas.core.frame.DataFrame): # session.write_pandas does not have overwrite function df = session.createDataFrame(df) - df.write.mode("overwrite").save_as_table(str(target_relation)) -{% endmacro %} - -{% macro py_create_stored_procedure(proc_name, materialization_logic, model, user_supplied_logic) %} - -{% set packages = ['snowflake-snowpark-python'] + config.get('packages', []) %} - -CREATE OR REPLACE PROCEDURE {{ proc_name }} () -RETURNS STRING -LANGUAGE PYTHON -RUNTIME_VERSION = '3.8' -- TODO should this be configurable? -PACKAGES = ('{{ packages | join("', '") }}') -HANDLER = 'main' -AS -$$ - -{{ user_supplied_logic }} - -{{ materialization_logic }} + df.write.mode("overwrite").save_as_table("{{ target_relation }}") def main(session): """ @@ -92,7 +52,4 @@ def main(session): df = model(dbt, session) materialize(session, df, dbt.this) return "OK" - -$$; - {% endmacro %} From 4b88f3fa9ffd125a15a611dad1395c033472217e Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Tue, 28 Jun 2022 21:00:35 -0700 Subject: [PATCH 15/28] adjust name --- dbt/adapters/snowflake/impl.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index ea3ff310c..5cd5d1ed8 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -163,7 +163,7 @@ def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour" return f"DATEADD({interval}, {number}, {add_to})" @available.parse_none - def submit_python_job(self, parsed_model:dict, model_code: str): + def submit_python_job(self, parsed_model:dict, compiled_code: str): schema = getattr(parsed_model, "schema", self.config.credentials.schema) database = getattr(parsed_model, "database", self.config.credentials.database) identifier = parsed_model['alias'] @@ -179,7 +179,7 @@ def submit_python_job(self, parsed_model:dict, model_code: str): HANDLER = 'main' AS $$ -{model_code} +{compiled_code} $$; """ From b0bf163f399bccfd79eeb9e986d3f8b7c3216448 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Thu, 7 Jul 2022 18:03:32 -0700 Subject: [PATCH 16/28] fix incremental model --- .../snowflake/macros/materializations/incremental.sql | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/incremental.sql b/dbt/include/snowflake/macros/materializations/incremental.sql index 2f754240a..da7212bd6 100644 --- a/dbt/include/snowflake/macros/materializations/incremental.sql +++ b/dbt/include/snowflake/macros/materializations/incremental.sql @@ -72,16 +72,13 @@ {% if not dest_columns %} {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} {% endif %} - {%- call statement('main') -%} + + {%- call statement('main') -%} {{ dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) }} {%- endcall -%} {% endif %} - {%- call statement('main') -%} - {{ build_sql }} - {%- endcall -%} - {{ run_hooks(post_hooks) }} {% set target_relation = target_relation.incorporate(type='table') %} From 6e74ed6364e16c907922fc4ed25a4b3beefb0d37 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Fri, 8 Jul 2022 14:55:47 -0700 Subject: [PATCH 17/28] proper tmp table for snowflake incremental model --- dbt/adapters/snowflake/impl.py | 1 + dbt/include/snowflake/macros/adapters.sql | 2 +- dbt/include/snowflake/macros/materializations/table.sql | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 5cd5d1ed8..f5763af0d 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -177,6 +177,7 @@ def submit_python_job(self, parsed_model:dict, compiled_code: str): RUNTIME_VERSION = '3.8' -- TODO should this be configurable? PACKAGES = ('{packages}') HANDLER = 'main' +EXECUTE AS CALLER AS $$ {compiled_code} diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index 2c61beb03..0bd1920d7 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -39,7 +39,7 @@ {%- endif -%} {%- elif language == 'python' -%} - {{ py_complete_script(compiled_code=compiled_code, target_relation=relation) }} + {{ py_complete_script(compiled_code=compiled_code, target_relation=relation, temporary=temporary) }} {%- else -%} {% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language") %} {%- endif -%} diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 3e62b03db..8f840a501 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -33,7 +33,7 @@ {% endmaterialization %} -{% macro py_complete_script(compiled_code, target_relation) %} +{% macro py_complete_script(compiled_code, target_relation, temporary=False) %} {{ compiled_code }} def materialize(session, df, target_relation): # we have to make sure pandas is imported @@ -41,7 +41,7 @@ def materialize(session, df, target_relation): if isinstance(df, pandas.core.frame.DataFrame): # session.write_pandas does not have overwrite function df = session.createDataFrame(df) - df.write.mode("overwrite").save_as_table("{{ target_relation }}") + df.write.mode("overwrite").save_as_table("{{ target_relation }}", create_temp_table={{temporary}}) def main(session): """ From d4fdbcd9a29a97622b86b8b1f59ce761a95af209 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Mon, 18 Jul 2022 10:38:30 -0700 Subject: [PATCH 18/28] add test for incremental model (#197) --- tests/functional/adapter/test_python_model.py | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tests/functional/adapter/test_python_model.py b/tests/functional/adapter/test_python_model.py index 6ef9dc2c9..321b2204d 100644 --- a/tests/functional/adapter/test_python_model.py +++ b/tests/functional/adapter/test_python_model.py @@ -54,6 +54,8 @@ def model(dbt, session): return pandas.DataFrame(data, columns=['test1', 'test3']) """ + + class TestChangingSchemaSnowflake: @pytest.fixture(scope="class") def models(self): @@ -65,5 +67,47 @@ def test_changing_schema(self,project): write_file(models__simple_python_model_v2, project.project_root + '/models', "simple_python_model.py") run_dbt(["run"]) + +m_1 = """ +{{config(materialized='table')}} +select 1 as id union all +select 2 as id union all +select 3 as id union all +select 4 as id union all +select 5 as id +""" + +incremental_python = """ +def model(dbt, session): + dbt.config(materialized="incremental", unique_key='id') + df = dbt.ref("m_1") + if dbt.is_incremental: + # incremental runs should only apply to + df = df.filter(df.id >= session.sql(f"select max(id) from {dbt.this}").collect()[0][0]) + return df +""" + +class TestIncrementalModelSnowflake: + @pytest.fixture(scope="class") + def models(self): + return { + "m_1.sql": m_1, + "incremental.py": incremental_python + } + + def test_incremental(self,project): + # create m_1 and run incremental model the first time + run_dbt(["run"]) + assert project.run_sql(f"select count(*) from {project.database}.{project.test_schema}.incremental", fetch="one")[0] == 5 + # running incremental model again will not cause any changes in the result model + run_dbt(["run", "-s", "incremental"]) + assert project.run_sql(f"select count(*) from {project.database}.{project.test_schema}.incremental", fetch="one")[0] == 5 + # add 3 records with one supposed to be filtered out + project.run_sql(f"insert into {project.database}.{project.test_schema}.m_1(id) values (0), (6), (7)") + # validate that incremental model would correctly add 2 valid records to result model + run_dbt(["run", "-s", "incremental"]) + assert project.run_sql(f"select count(*) from {project.database}.{project.test_schema}.incremental", fetch="one")[0] == 7 + + From ec792d5c07e0f19d30f66adeacf70a8ae361896f Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Mon, 18 Jul 2022 17:45:39 -0700 Subject: [PATCH 19/28] update python procedure result --- dbt/adapters/snowflake/impl.py | 6 ++---- dbt/include/snowflake/macros/adapters.sql | 2 +- dbt/include/snowflake/macros/materializations/table.sql | 5 +---- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index f5763af0d..3b7f72a8b 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -185,10 +185,8 @@ def submit_python_job(self, parsed_model:dict, compiled_code: str): $$; """ self.execute(python_stored_procedure, auto_begin=False, fetch=False) - self.execute(f"CALL {proc_name}()", auto_begin=False, fetch=False) + response, _ = self.execute(f"CALL {proc_name}()", auto_begin=False, fetch=False) self.execute(f"drop procedure if exists {proc_name}(string)", auto_begin=False, fetch=False) - - # TODO add proper return - return "OK" + return response diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index 0bd1920d7..b25e1b2ec 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -41,7 +41,7 @@ {%- elif language == 'python' -%} {{ py_complete_script(compiled_code=compiled_code, target_relation=relation, temporary=temporary) }} {%- else -%} - {% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language") %} + {% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language, it got %s" % language) %} {%- endif -%} {% endmacro %} diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 8f840a501..f252c33e2 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -44,10 +44,7 @@ def materialize(session, df, target_relation): df.write.mode("overwrite").save_as_table("{{ target_relation }}", create_temp_table={{temporary}}) def main(session): - """ - TODOs: - - what should this return? can we make a real RunResult? - """ + dbt = dbtObj(session.table) df = model(dbt, session) materialize(session, df, dbt.this) From 3e9518c5317c374798816bba8fd23f90af69cb3b Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Tue, 19 Jul 2022 13:16:26 -0700 Subject: [PATCH 20/28] Update dbt/adapters/snowflake/impl.py Co-authored-by: Jeremy Cohen --- dbt/adapters/snowflake/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 3b7f72a8b..ca5251de5 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -169,7 +169,7 @@ def submit_python_job(self, parsed_model:dict, compiled_code: str): identifier = parsed_model['alias'] proc_name = f"{database}.{schema}.{identifier}__dbt_sp" packages = ['snowflake-snowpark-python'] + parsed_model['config'].get('packages', []) - packages = "', ".join(packages) + packages = "', '".join(packages) python_stored_procedure = f""" CREATE OR REPLACE PROCEDURE {proc_name} () RETURNS STRING From 8f99b3e6b19ac418d3f4e995a473f0bbc1744610 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Fri, 22 Jul 2022 14:11:45 -0700 Subject: [PATCH 21/28] fix testing --- dev-requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 6711f16df..a0172413c 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,7 +1,7 @@ # install latest changes in dbt-core # TODO: how to automate switching from develop to version branches? -git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core -git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter +git+https://github.com/dbt-labs/dbt-core.git@feature/python-model-v1#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git@feature/python-model-v1#egg=dbt-tests-adapter&subdirectory=tests/adapter black==22.3.0 click~=8.0.4 From ccccf79e0f36301e2637d5b7b3d9b453272bf567 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Fri, 22 Jul 2022 14:18:05 -0700 Subject: [PATCH 22/28] fix format --- dbt/adapters/snowflake/impl.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index ca5251de5..09ea537f2 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -107,9 +107,7 @@ def get_columns_in_relation(self, relation): else: raise - def list_relations_without_caching( - self, schema_relation: SnowflakeRelation - ) -> List[SnowflakeRelation]: + def list_relations_without_caching(self, schema_relation: SnowflakeRelation) -> List[SnowflakeRelation]: # type: ignore kwargs = {"schema_relation": schema_relation} try: results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs) @@ -163,12 +161,12 @@ def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour" return f"DATEADD({interval}, {number}, {add_to})" @available.parse_none - def submit_python_job(self, parsed_model:dict, compiled_code: str): + def submit_python_job(self, parsed_model: dict, compiled_code: str): schema = getattr(parsed_model, "schema", self.config.credentials.schema) database = getattr(parsed_model, "database", self.config.credentials.database) - identifier = parsed_model['alias'] + identifier = parsed_model["alias"] proc_name = f"{database}.{schema}.{identifier}__dbt_sp" - packages = ['snowflake-snowpark-python'] + parsed_model['config'].get('packages', []) + packages = ["snowflake-snowpark-python"] + parsed_model["config"].get("packages", []) packages = "', '".join(packages) python_stored_procedure = f""" CREATE OR REPLACE PROCEDURE {proc_name} () @@ -186,7 +184,9 @@ def submit_python_job(self, parsed_model:dict, compiled_code: str): """ self.execute(python_stored_procedure, auto_begin=False, fetch=False) response, _ = self.execute(f"CALL {proc_name}()", auto_begin=False, fetch=False) - self.execute(f"drop procedure if exists {proc_name}(string)", auto_begin=False, fetch=False) + self.execute( + f"drop procedure if exists {proc_name}(string)", + auto_begin=False, + fetch=False, + ) return response - - From f7d65315da8d539db0aea10304a32d60eb7388e4 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Sat, 23 Jul 2022 13:35:26 -0700 Subject: [PATCH 23/28] update write_table macro name --- dbt/include/snowflake/macros/adapters.sql | 2 +- .../macros/materializations/table.sql | 3 +- tests/functional/adapter/test_python_model.py | 32 ++----------------- 3 files changed, 4 insertions(+), 33 deletions(-) diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index b25e1b2ec..e956982c5 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -39,7 +39,7 @@ {%- endif -%} {%- elif language == 'python' -%} - {{ py_complete_script(compiled_code=compiled_code, target_relation=relation, temporary=temporary) }} + {{ py_write_table(compiled_code=compiled_code, target_relation=relation, temporary=temporary) }} {%- else -%} {% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language, it got %s" % language) %} {%- endif -%} diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index f252c33e2..05d58c8a3 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -33,7 +33,7 @@ {% endmaterialization %} -{% macro py_complete_script(compiled_code, target_relation, temporary=False) %} +{% macro py_write_table(compiled_code, target_relation, temporary=False) %} {{ compiled_code }} def materialize(session, df, target_relation): # we have to make sure pandas is imported @@ -44,7 +44,6 @@ def materialize(session, df, target_relation): df.write.mode("overwrite").save_as_table("{{ target_relation }}", create_temp_table={{temporary}}) def main(session): - dbt = dbtObj(session.table) df = model(dbt, session) materialize(session, df, dbt.this) diff --git a/tests/functional/adapter/test_python_model.py b/tests/functional/adapter/test_python_model.py index 321b2204d..d8ef93066 100644 --- a/tests/functional/adapter/test_python_model.py +++ b/tests/functional/adapter/test_python_model.py @@ -2,36 +2,8 @@ from dbt.tests.util import run_dbt, write_file from dbt.tests.adapter.python_model.test_python_model import BasePythonModelTests -basic_sql = """ -select 1 as id union all -select 1 as id union all -select 1 as id union all -select 1 as id union all -select 1 as id -""" -basic_python = """ -def model(dbt, session): - dbt.config( - materialized='table', - ) - df = dbt.ref("my_sql_model") - df = df.limit(2) - return df -""" - -class TestBasePythonModelSnowflake: - @pytest.fixture(scope="class") - def models(self): - return { - "my_sql_model.sql": basic_sql, - "my_python_model.py": basic_python, - } - - def test_singular_tests(self, project): - # test command - results = run_dbt(["run"]) - assert len(results) == 2 - +class TestPythonModelSpark(BasePythonModelTests): + pass models__simple_python_model = """ import pandas From 86e55ef626854dc32194a4bc6e4b97809784d2db Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Mon, 25 Jul 2022 17:47:57 -0700 Subject: [PATCH 24/28] adjust to new syntax --- .../macros/materializations/incremental.sql | 2 +- .../snowflake/macros/materializations/table.sql | 2 +- tests/functional/adapter/test_python_model.py | 7 +++++++ .../integration/defer_state_test/test_defer_state.py | 12 ++++++------ 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/incremental.sql b/dbt/include/snowflake/macros/materializations/incremental.sql index 2a8588aac..c6dee3e8b 100644 --- a/dbt/include/snowflake/macros/materializations/incremental.sql +++ b/dbt/include/snowflake/macros/materializations/incremental.sql @@ -5,7 +5,7 @@ {#-- Set vars --#} {%- set unique_key = config.get('unique_key') -%} {%- set full_refresh_mode = (should_full_refresh()) -%} - {%- set language = config.get('language') -%} + {%- set language = model['language'] -%} {% set target_relation = this %} {% set existing_relation = load_relation(this) %} {% set tmp_relation = make_temp_relation(this) %} diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index b6b1cb870..65096bc9a 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -3,7 +3,7 @@ {% set original_query_tag = set_query_tag() %} {%- set identifier = model['alias'] -%} - {%- set language = config.get('language') -%} + {%- set language = model['language'] -%} {% set grant_config = config.get('grants') %} diff --git a/tests/functional/adapter/test_python_model.py b/tests/functional/adapter/test_python_model.py index d8ef93066..6be22af89 100644 --- a/tests/functional/adapter/test_python_model.py +++ b/tests/functional/adapter/test_python_model.py @@ -60,6 +60,13 @@ def model(dbt, session): """ class TestIncrementalModelSnowflake: + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": { "+incremental_strategy": "delete+insert" } + } + @pytest.fixture(scope="class") def models(self): return { diff --git a/tests/integration/defer_state_test/test_defer_state.py b/tests/integration/defer_state_test/test_defer_state.py index 7296210cf..2d14c6103 100644 --- a/tests/integration/defer_state_test/test_defer_state.py +++ b/tests/integration/defer_state_test/test_defer_state.py @@ -70,8 +70,8 @@ def run_and_defer(self): # with state it should work though results = self.run_dbt(['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema']) - assert self.other_schema not in results[0].node.compiled_sql - assert self.unique_schema() in results[0].node.compiled_sql + assert self.other_schema not in results[0].node.compiled_code + assert self.unique_schema() in results[0].node.compiled_code with open('target/manifest.json') as fp: data = json.load(fp) @@ -122,8 +122,8 @@ def run_defer_iff_not_exists(self): assert len(results) == 2 # because the seed now exists in our schema, we shouldn't defer it - assert self.other_schema not in results[0].node.compiled_sql - assert self.unique_schema() in results[0].node.compiled_sql + assert self.other_schema not in results[0].node.compiled_code + assert self.unique_schema() in results[0].node.compiled_code def run_defer_deleted_upstream(self): results = self.run_dbt(['seed']) @@ -144,8 +144,8 @@ def run_defer_deleted_upstream(self): # despite deferral, test should use models just created in our schema results = self.run_dbt(['test', '--state', 'state', '--defer']) - assert self.other_schema not in results[0].node.compiled_sql - assert self.unique_schema() in results[0].node.compiled_sql + assert self.other_schema not in results[0].node.compiled_code + assert self.unique_schema() in results[0].node.compiled_code @use_profile('snowflake') def test_snowflake_state_changetarget(self): From 68df73fcd5b6d346300aa57c8f9ad76a38600563 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Mon, 25 Jul 2022 18:03:12 -0700 Subject: [PATCH 25/28] fix tox --- dbt/adapters/snowflake/impl.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index d038101ee..0ee3c995f 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -10,7 +10,6 @@ LIST_SCHEMAS_MACRO_NAME, LIST_RELATIONS_MACRO_NAME, ) -from dbt.adapters.base.meta import available from dbt.adapters.snowflake import SnowflakeConnectionManager from dbt.adapters.snowflake import SnowflakeRelation from dbt.adapters.snowflake import SnowflakeColumn @@ -208,4 +207,3 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str): def valid_incremental_strategies(self): return ["append", "merge", "delete+insert"] - From 03c61cc4ff3f14b883317ecc2848f00820bb8a05 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Wed, 27 Jul 2022 17:38:48 -0700 Subject: [PATCH 26/28] add comment to run code --- .../snowflake/macros/materializations/table.sql | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index 65096bc9a..d2bc9718e 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -54,3 +54,17 @@ def main(session): materialize(session, df, dbt.this) return "OK" {% endmacro %} + +{%macro py_script_comment()%} +# To run this in snowsight, you need to select entry point to be main +# And you may have to modify the return type to text to get the result back +# def main(session): +# dbt = dbtObj(session.table) +# df = model(dbt, session) +# return df.collect() + +# to run this in local notebook, you need to create a session following examples https://github.com/Snowflake-Labs/sfguide-getting-started-snowpark-python +# then you can do the following to run model +# dbt = dbtObj(session.table) +# df = model(dbt, session) +{%endmacro%} From ed9a6ec7c54c3e1c727f79cc267286453d49d78c Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Thu, 28 Jul 2022 11:44:20 -0700 Subject: [PATCH 27/28] move tests to base and adjust core version --- dev-requirements.txt | 4 +- tests/functional/adapter/test_python_model.py | 59 ++----------------- 2 files changed, 7 insertions(+), 56 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index a0172413c..6711f16df 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,7 +1,7 @@ # install latest changes in dbt-core # TODO: how to automate switching from develop to version branches? -git+https://github.com/dbt-labs/dbt-core.git@feature/python-model-v1#egg=dbt-core&subdirectory=core -git+https://github.com/dbt-labs/dbt-core.git@feature/python-model-v1#egg=dbt-tests-adapter&subdirectory=tests/adapter +git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter black==22.3.0 click~=8.0.4 diff --git a/tests/functional/adapter/test_python_model.py b/tests/functional/adapter/test_python_model.py index 6be22af89..6923c8694 100644 --- a/tests/functional/adapter/test_python_model.py +++ b/tests/functional/adapter/test_python_model.py @@ -1,8 +1,11 @@ import pytest from dbt.tests.util import run_dbt, write_file -from dbt.tests.adapter.python_model.test_python_model import BasePythonModelTests +from dbt.tests.adapter.python_model.test_python_model import BasePythonModelTests, BasePythonIncrementalTests -class TestPythonModelSpark(BasePythonModelTests): +class TestPythonModelSnowflake(BasePythonModelTests): + pass + +class TestIncrementalSnowflake(BasePythonIncrementalTests): pass models__simple_python_model = """ @@ -38,55 +41,3 @@ def test_changing_schema(self,project): run_dbt(["run"]) write_file(models__simple_python_model_v2, project.project_root + '/models', "simple_python_model.py") run_dbt(["run"]) - - -m_1 = """ -{{config(materialized='table')}} -select 1 as id union all -select 2 as id union all -select 3 as id union all -select 4 as id union all -select 5 as id -""" - -incremental_python = """ -def model(dbt, session): - dbt.config(materialized="incremental", unique_key='id') - df = dbt.ref("m_1") - if dbt.is_incremental: - # incremental runs should only apply to - df = df.filter(df.id >= session.sql(f"select max(id) from {dbt.this}").collect()[0][0]) - return df -""" - -class TestIncrementalModelSnowflake: - - @pytest.fixture(scope="class") - def project_config_update(self): - return { - "models": { "+incremental_strategy": "delete+insert" } - } - - @pytest.fixture(scope="class") - def models(self): - return { - "m_1.sql": m_1, - "incremental.py": incremental_python - } - - def test_incremental(self,project): - # create m_1 and run incremental model the first time - run_dbt(["run"]) - assert project.run_sql(f"select count(*) from {project.database}.{project.test_schema}.incremental", fetch="one")[0] == 5 - # running incremental model again will not cause any changes in the result model - run_dbt(["run", "-s", "incremental"]) - assert project.run_sql(f"select count(*) from {project.database}.{project.test_schema}.incremental", fetch="one")[0] == 5 - # add 3 records with one supposed to be filtered out - project.run_sql(f"insert into {project.database}.{project.test_schema}.m_1(id) values (0), (6), (7)") - # validate that incremental model would correctly add 2 valid records to result model - run_dbt(["run", "-s", "incremental"]) - assert project.run_sql(f"select count(*) from {project.database}.{project.test_schema}.incremental", fetch="one")[0] == 7 - - - - From 1df614d861874576e431ce7430214647de5408bb Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Thu, 28 Jul 2022 12:16:05 -0700 Subject: [PATCH 28/28] add changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index accae9b35..0c6450fd8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## dbt-snowflake 1.3.0b1 (Release TBD) +### Features +- support python model through create stored procedure with python in it, currently supported materializations are table and incremental. ([#182](https://github.com/dbt-labs/dbt-snowflake/pull/182)) + ### Under the Hood - Reformat overridden macro location of grants work to a apply_grants.sql file in snowflake ([#193](https://github.com/dbt-labs/dbt-snowflake/issues/193), [#192](https://github.com/dbt-labs/dbt-snowflake/pull/192)) - Support dbt Core incremental materialization refactor ([#195](https://github.com/dbt-labs/dbt-snowflake/issues/195), [#196](https://github.com/dbt-labs/dbt-snowflake/pull/196))