Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/python model v1 #182

Merged
merged 34 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
872bbad
push snowflake implementation refactored from drews code
ChenyuLInx May 4, 2022
1797501
add py_materialize_as_table
ChenyuLInx May 4, 2022
68c66f7
add load df def
ChenyuLInx May 6, 2022
8a6b066
Bump version to 1.3.0a1
jtcohen6 May 6, 2022
49bb604
make session global so we can load table with dbt ref
ChenyuLInx Jun 2, 2022
1490de2
update requirement for the moment and fix format
ChenyuLInx Jun 2, 2022
d61a212
remove logging and add functional test
ChenyuLInx Jun 2, 2022
86e6f11
Misc experimentation
jtcohen6 Jun 6, 2022
1cc2158
make snowflake work
ChenyuLInx Jun 8, 2022
bc0b241
fix indentation
ChenyuLInx Jun 8, 2022
2f4c113
update materialization macro
ChenyuLInx Jun 8, 2022
2bfddc6
resolve pandas df save and update entry function name
ChenyuLInx Jun 16, 2022
7d2c51b
update python model tests
ChenyuLInx Jun 16, 2022
61a9ff3
adjust for compiled_code syntax
ChenyuLInx Jun 28, 2022
4219827
Merge pull request #3 from dbt-labs/python-model
ChenyuLInx Jun 28, 2022
037ff06
refactor submit python and attampt for incremental
ChenyuLInx Jun 29, 2022
4b88f3f
adjust name
ChenyuLInx Jun 29, 2022
85e34f5
refactor submit and misc expirements
ChenyuLInx Jun 29, 2022
c76cc0b
Merge branch 'main' into feature/python-model-v1
ChenyuLInx Jul 7, 2022
b0bf163
fix incremental model
ChenyuLInx Jul 8, 2022
6e74ed6
proper tmp table for snowflake incremental model
ChenyuLInx Jul 8, 2022
d4fdbcd
add test for incremental model (#197)
ChenyuLInx Jul 18, 2022
ec792d5
update python procedure result
ChenyuLInx Jul 19, 2022
3e9518c
Update dbt/adapters/snowflake/impl.py
ChenyuLInx Jul 19, 2022
8f99b3e
fix testing
ChenyuLInx Jul 22, 2022
ccccf79
fix format
ChenyuLInx Jul 22, 2022
f7d6531
update write_table macro name
ChenyuLInx Jul 23, 2022
e450e28
Merge branch 'main' into feature/python-model-v1
ChenyuLInx Jul 26, 2022
86e55ef
adjust to new syntax
ChenyuLInx Jul 26, 2022
68df73f
fix tox
ChenyuLInx Jul 26, 2022
190eafd
Merge branch 'main' into feature/python-model-v1
ChenyuLInx Jul 27, 2022
03c61cc
add comment to run code
ChenyuLInx Jul 28, 2022
ed9a6ec
move tests to base and adjust core version
ChenyuLInx Jul 28, 2022
1df614d
add changelog
ChenyuLInx Jul 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbt/adapters/snowflake/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.2.0a1"
version = "1.3.0a1"
31 changes: 31 additions & 0 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import agate

from dbt.adapters.base.impl import AdapterConfig
from dbt.adapters.base.meta import available
from dbt.adapters.sql import SQLAdapter # type: ignore
from dbt.adapters.sql.impl import (
LIST_SCHEMAS_MACRO_NAME,
Expand Down Expand Up @@ -160,3 +161,33 @@ def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str:

def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
return f"DATEADD({interval}, {number}, {add_to})"

@available.parse_none
def submit_python_job(self, parsed_model:dict, compiled_code: str):
schema = getattr(parsed_model, "schema", self.config.credentials.schema)
database = getattr(parsed_model, "database", self.config.credentials.database)
identifier = parsed_model['alias']
proc_name = f"{database}.{schema}.{identifier}__dbt_sp"
packages = ['snowflake-snowpark-python'] + parsed_model['config'].get('packages', [])
packages = "', ".join(packages)
ChenyuLInx marked this conversation as resolved.
Show resolved Hide resolved
python_stored_procedure = f"""
CREATE OR REPLACE PROCEDURE {proc_name} ()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8' -- TODO should this be configurable?
PACKAGES = ('{packages}')
HANDLER = 'main'
AS
$$
{compiled_code}

$$;
"""
self.execute(python_stored_procedure, auto_begin=False, fetch=False)
self.execute(f"CALL {proc_name}()", auto_begin=False, fetch=False)
self.execute(f"drop procedure if exists {proc_name}(string)", auto_begin=False, fetch=False)

# TODO add proper return
return "OK"


80 changes: 44 additions & 36 deletions dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
@@ -1,41 +1,49 @@
{% macro snowflake__create_table_as(temporary, relation, sql) -%}
{%- set transient = config.get('transient', default=true) -%}
{%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
{%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
{%- set copy_grants = config.get('copy_grants', default=false) -%}

{%- if cluster_by_keys is not none and cluster_by_keys is string -%}
{%- set cluster_by_keys = [cluster_by_keys] -%}
{%- endif -%}
{%- if cluster_by_keys is not none -%}
{%- set cluster_by_string = cluster_by_keys|join(", ")-%}
{% else %}
{%- set cluster_by_string = none -%}
{%- endif -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}

create or replace {% if temporary -%}
temporary
{%- elif transient -%}
transient
{%- endif %} table {{ relation }} {% if copy_grants and not temporary -%} copy grants {%- endif %} as
(
{%- if cluster_by_string is not none -%}
select * from(
{{ sql }}
) order by ({{ cluster_by_string }})
{%- else -%}
{{ sql }}
{%- endif %}
);
{% if cluster_by_string is not none and not temporary -%}
alter table {{relation}} cluster by ({{cluster_by_string}});
{% macro snowflake__create_table_as(temporary, relation, compiled_code, language='sql') -%}
{%- if language == 'sql' -%}
{%- set transient = config.get('transient', default=true) -%}
{%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
{%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
{%- set copy_grants = config.get('copy_grants', default=false) -%}

{%- if cluster_by_keys is not none and cluster_by_keys is string -%}
{%- set cluster_by_keys = [cluster_by_keys] -%}
{%- endif -%}
{% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%}
alter table {{relation}} resume recluster;
{%- if cluster_by_keys is not none -%}
{%- set cluster_by_string = cluster_by_keys|join(", ")-%}
{% else %}
{%- set cluster_by_string = none -%}
{%- endif -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}

create or replace {% if temporary -%}
temporary
{%- elif transient -%}
transient
{%- endif %} table {{ relation }} {% if copy_grants and not temporary -%} copy grants {%- endif %} as
(
{%- if cluster_by_string is not none -%}
select * from(
{{ compiled_code }}
) order by ({{ cluster_by_string }})
{%- else -%}
{{ compiled_code }}
{%- endif %}
);
{% if cluster_by_string is not none and not temporary -%}
alter table {{relation}} cluster by ({{cluster_by_string}});
{%- endif -%}
{% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%}
alter table {{relation}} resume recluster;
{%- endif -%}

{%- elif language == 'python' -%}
{{ py_complete_script(compiled_code=compiled_code, target_relation=relation) }}
{%- else -%}
{% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language") %}
{%- endif -%}

{% endmacro %}

{% macro get_column_comment_sql(column_name, column_dict) %}
Expand Down
25 changes: 18 additions & 7 deletions dbt/include/snowflake/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@

{% set original_query_tag = set_query_tag() %}

{#-- Set vars --#}
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{%- set language = config.get('language') -%}
{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}
Expand All @@ -42,19 +43,27 @@
{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% elif existing_relation.is_view %}
{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}

{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}
{% elif full_refresh_mode %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{%- call statement('create_tmp_relation', language=language) -%}
{{ create_table_as(True, tmp_relation, compiled_code, language) }}
{%- endcall -%}

{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
Expand All @@ -63,7 +72,9 @@
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{%- call statement('main') -%}
{{ dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) }}
{%- endcall -%}

{% endif %}

Expand Down
27 changes: 24 additions & 3 deletions dbt/include/snowflake/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{% set original_query_tag = set_query_tag() %}

{%- set identifier = model['alias'] -%}
{%- set language = config.get('language') -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
Expand All @@ -18,9 +19,8 @@
{{ drop_relation_if_exists(old_relation) }}
{% endif %}

--build model
{% call statement('main') -%}
{{ create_table_as(false, target_relation, sql) }}
{% call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall %}

{{ run_hooks(post_hooks) }}
Expand All @@ -32,3 +32,24 @@
{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

{% macro py_complete_script(compiled_code, target_relation) %}
{{ compiled_code }}
def materialize(session, df, target_relation):
# we have to make sure pandas is imported
import pandas
if isinstance(df, pandas.core.frame.DataFrame):
# session.write_pandas does not have overwrite function
df = session.createDataFrame(df)
df.write.mode("overwrite").save_as_table("{{ target_relation }}")

def main(session):
"""
TODOs:
- what should this return? can we make a real RunResult?
"""
dbt = dbtObj(session.table)
df = model(dbt, session)
materialize(session, df, dbt.this)
return "OK"
{% endmacro %}
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def _get_dbt_core_version():


package_name = "dbt-snowflake"
package_version = "1.2.0a1"
package_version = "1.3.0a1"
dbt_core_version = _get_dbt_core_version()
description = """The Snowflake adapter plugin for dbt"""

Expand Down
3 changes: 2 additions & 1 deletion tests/functional/adapter/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp
from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod


class TestSimpleMaterializationsSnowflake(BaseSimpleMaterializations):
pass

Expand Down Expand Up @@ -51,4 +52,4 @@ class TestSnapshotTimestampSnowflake(BaseSnapshotTimestamp):
class TestBaseAdapterMethodSnowflake(BaseAdapterMethod):
@pytest.fixture(scope="class")
def equal_tables(self):
return ["MODEL", "EXPECTED"]
return ["MODEL", "EXPECTED"]
69 changes: 69 additions & 0 deletions tests/functional/adapter/test_python_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import pytest
from dbt.tests.util import run_dbt, write_file
from dbt.tests.adapter.python_model.test_python_model import BasePythonModelTests

basic_sql = """
select 1 as id union all
select 1 as id union all
select 1 as id union all
select 1 as id union all
select 1 as id
"""
basic_python = """
def model(dbt, session):
dbt.config(
materialized='table',
)
df = dbt.ref("my_sql_model")
df = df.limit(2)
return df
"""

class TestBasePythonModelSnowflake:
@pytest.fixture(scope="class")
def models(self):
return {
"my_sql_model.sql": basic_sql,
"my_python_model.py": basic_python,
}

def test_singular_tests(self, project):
# test command
results = run_dbt(["run"])
assert len(results) == 2


models__simple_python_model = """
import pandas

def model(dbt, session):
dbt.config(
materialized='table',
)
data = [[1,2]] * 10
return pandas.DataFrame(data, columns=['test', 'test2'])
"""
models__simple_python_model_v2 = """
import pandas

def model(dbt, session):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the model code looks like, we are enforcing session here because it is local to the function and certain operations depends on it

dbt.config(
materialized='table',
)
data = [[1,2]] * 10
return pandas.DataFrame(data, columns=['test1', 'test3'])
ChenyuLInx marked this conversation as resolved.
Show resolved Hide resolved
"""

class TestChangingSchemaSnowflake:
@pytest.fixture(scope="class")
def models(self):
return {
"simple_python_model.py": models__simple_python_model
}
def test_changing_schema(self,project):
run_dbt(["run"])
write_file(models__simple_python_model_v2, project.project_root + '/models', "simple_python_model.py")
run_dbt(["run"])