diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py
index 69da11802..5b5881eed 100644
--- a/dbt/adapters/snowflake/impl.py
+++ b/dbt/adapters/snowflake/impl.py
@@ -258,28 +258,20 @@ def list_relations_without_caching(
                 return []
             raise
 
-        # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
-        columns = ["database_name", "schema_name", "name", "kind"]
-        if "is_dynamic" in schema_objects.column_names:
-            columns.append("is_dynamic")
-        if "is_iceberg" in schema_objects.column_names:
+        # this can be collapsed once Snowflake adds is_iceberg to show objects
+        columns = ["database_name", "schema_name", "name", "kind", "is_dynamic"]
+        if self.behavior.enable_iceberg_materializations.no_warn:
             columns.append("is_iceberg")
 
         return [self._parse_list_relations_result(obj) for obj in schema_objects.select(columns)]
 
     def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation:
-        # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
-        # this can be reduced to always including `is_iceberg` once Snowflake adds it to show objects
-        try:
-            if self.behavior.enable_iceberg_materializations.no_warn:
-                database, schema, identifier, relation_type, is_dynamic, is_iceberg = result
-            else:
-                database, schema, identifier, relation_type, is_dynamic = result
-        except ValueError:
-            database, schema, identifier, relation_type = result
-            is_dynamic = "N"
-            if self.behavior.enable_iceberg_materializations.no_warn:
-                is_iceberg = "N"
+        # this can be collapsed once Snowflake adds is_iceberg to show objects
+        if self.behavior.enable_iceberg_materializations.no_warn:
+            database, schema, identifier, relation_type, is_dynamic, is_iceberg = result
+        else:
+            database, schema, identifier, relation_type, is_dynamic = result
+            is_iceberg = "N"
 
         try:
             relation_type = self.Relation.get_relation_type(relation_type.lower())
@@ -289,13 +281,8 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation
         if relation_type == self.Relation.Table and is_dynamic == "Y":
            relation_type = self.Relation.DynamicTable
 
-        # This line is the main gate on supporting Iceberg materializations. Pass forward a default
-        # table format, and no downstream table macros can build iceberg relations.
- table_format: str = ( - TableFormat.ICEBERG - if self.behavior.enable_iceberg_materializations.no_warn and is_iceberg in ("Y", "YES") - else TableFormat.DEFAULT - ) + table_format = TableFormat.ICEBERG if is_iceberg in ("Y", "YES") else TableFormat.DEFAULT + quote_policy = {"database": True, "schema": True, "identifier": True} return self.Relation.create( diff --git a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_basic.py b/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_basic.py deleted file mode 100644 index a17f5d267..000000000 --- a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_basic.py +++ /dev/null @@ -1,186 +0,0 @@ -from typing import Optional, Tuple - -import pytest - -from dbt.tests.util import ( - get_model_file, - run_dbt, - run_dbt_and_capture, - set_model_file, -) - -from dbt.adapters.snowflake.relation import SnowflakeRelation, SnowflakeRelationType -from tests.functional.adapter.dynamic_table_tests.files import ( - MY_DYNAMIC_TABLE, - MY_SEED, - MY_TABLE, - MY_VIEW, -) -from tests.functional.adapter.dynamic_table_tests.utils import query_relation_type - - -class TestSnowflakeDynamicTableBasic: - @staticmethod - def insert_record(project, table: SnowflakeRelation, record: Tuple[int, int]): - my_id, value = record - project.run_sql(f"insert into {table} (id, value) values ({my_id}, {value})") - - @staticmethod - def refresh_dynamic_table(project, dynamic_table: SnowflakeRelation): - sql = f"alter dynamic table {dynamic_table} refresh" - project.run_sql(sql) - - @staticmethod - def query_row_count(project, relation: SnowflakeRelation) -> int: - sql = f"select count(*) from {relation}" - return project.run_sql(sql, fetch="one")[0] - - @staticmethod - def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]: - return query_relation_type(project, relation) - - @pytest.fixture(scope="class", autouse=True) - def seeds(self): - return {"my_seed.csv": MY_SEED} - - @pytest.fixture(scope="class", autouse=True) - def models(self): - yield { - "my_table.sql": MY_TABLE, - "my_view.sql": MY_VIEW, - "my_dynamic_table.sql": MY_DYNAMIC_TABLE, - } - - @pytest.fixture(scope="class") - def my_dynamic_table(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_dynamic_table", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.DynamicTable, - ) - - @pytest.fixture(scope="class") - def my_view(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_view", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.View, - ) - - @pytest.fixture(scope="class") - def my_table(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_table", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.Table, - ) - - @pytest.fixture(scope="class") - def my_seed(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_seed", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.Table, - ) - - @staticmethod - def load_model(project, current_model, new_model): - model_to_load = get_model_file(project, new_model) - set_model_file(project, current_model, model_to_load) - - @pytest.fixture(scope="function", autouse=True) - def setup(self, project, my_dynamic_table, my_view, my_table): - run_dbt(["seed"]) - run_dbt(["run", "--models", 
my_dynamic_table.identifier, "--full-refresh"]) - - # the tests touch these files, store their contents in memory - my_dynamic_table_config = get_model_file(project, my_dynamic_table) - my_view_config = get_model_file(project, my_view) - my_table_config = get_model_file(project, my_table) - - yield - - # and then reset them after the test runs - set_model_file(project, my_dynamic_table, my_dynamic_table_config) - set_model_file(project, my_view, my_view_config) - set_model_file(project, my_table, my_table_config) - project.run_sql(f"drop schema if exists {project.test_schema} cascade") - - def test_dynamic_table_create(self, project, my_dynamic_table): - # setup creates it; verify it's there - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - def test_dynamic_table_create_idempotent(self, project, my_dynamic_table): - # setup creates it once; verify it's there and run once - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - def test_dynamic_table_full_refresh(self, project, my_dynamic_table): - _, logs = run_dbt_and_capture( - ["--debug", "run", "--models", my_dynamic_table.identifier, "--full-refresh"] - ) - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - def test_dynamic_table_replaces_table(self, project, my_table, my_dynamic_table): - run_dbt(["run", "--models", my_table.identifier]) - assert self.query_relation_type(project, my_table) == "table" - - self.load_model(project, my_table, my_dynamic_table) - - run_dbt(["run", "--models", my_table.identifier]) - assert self.query_relation_type(project, my_table) == "dynamic_table" - - def test_dynamic_table_replaces_view(self, project, my_view, my_dynamic_table): - run_dbt(["run", "--models", my_view.identifier]) - assert self.query_relation_type(project, my_view) == "view" - - self.load_model(project, my_view, my_dynamic_table) - - run_dbt(["run", "--models", my_view.identifier]) - assert self.query_relation_type(project, my_view) == "dynamic_table" - - def test_table_replaces_dynamic_table(self, project, my_dynamic_table, my_table): - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - self.load_model(project, my_dynamic_table, my_table) - - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "table" - - def test_view_replaces_dynamic_table(self, project, my_dynamic_table, my_view): - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - self.load_model(project, my_dynamic_table, my_view) - - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "view" - - def test_dynamic_table_only_updates_after_refresh(self, project, my_dynamic_table, my_seed): - # poll database - table_start = self.query_row_count(project, my_seed) - view_start = self.query_row_count(project, my_dynamic_table) - - # insert new record in table - self.insert_record(project, my_seed, (4, 400)) - - # poll database - table_mid = self.query_row_count(project, my_seed) - view_mid = self.query_row_count(project, my_dynamic_table) - - # refresh the materialized view - self.refresh_dynamic_table(project, my_dynamic_table) - - # poll 
database - table_end = self.query_row_count(project, my_seed) - view_end = self.query_row_count(project, my_dynamic_table) - - # new records were inserted in the table but didn't show up in the view until it was refreshed - assert table_start < table_mid == table_end - assert view_start == view_mid < view_end diff --git a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py b/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py deleted file mode 100644 index a58b76f29..000000000 --- a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py +++ /dev/null @@ -1,307 +0,0 @@ -from typing import Optional - -import pytest - -from dbt_common.contracts.config.materialization import OnConfigurationChangeOption -from dbt.tests.util import ( - assert_message_in_logs, - get_model_file, - run_dbt, - run_dbt_and_capture, - set_model_file, -) - -from dbt.adapters.snowflake.relation import SnowflakeRelation, SnowflakeRelationType -from tests.functional.adapter.dynamic_table_tests.files import ( - MY_DYNAMIC_TABLE, - MY_SEED, -) -from tests.functional.adapter.dynamic_table_tests.utils import ( - query_refresh_mode, - query_relation_type, - query_target_lag, - query_warehouse, -) - - -class SnowflakeDynamicTableChanges: - @staticmethod - def check_start_state(project, dynamic_table): - assert query_target_lag(project, dynamic_table) == "2 minutes" - assert query_warehouse(project, dynamic_table) == "DBT_TESTING" - assert query_refresh_mode(project, dynamic_table) == "INCREMENTAL" - - @staticmethod - def change_config_via_alter(project, dynamic_table): - initial_model = get_model_file(project, dynamic_table) - new_model = initial_model.replace( - "target_lag='2 minutes'", "target_lag='5 minutes'" - ) - set_model_file(project, dynamic_table, new_model) - - @staticmethod - def change_config_via_alter_downstream(project, dynamic_table): - initial_model = get_model_file(project, dynamic_table) - new_model = initial_model.replace( - "target_lag='2 minutes'", "target_lag='DOWNSTREAM'" - ) - set_model_file(project, dynamic_table, new_model) - - @staticmethod - def check_state_alter_change_is_applied(project, dynamic_table): - assert query_target_lag(project, dynamic_table) == "5 minutes" - assert query_warehouse(project, dynamic_table) == "DBT_TESTING" - - @staticmethod - def check_state_alter_change_is_applied_downstream(project, dynamic_table): - assert query_target_lag(project, dynamic_table) == "DOWNSTREAM" - assert query_warehouse(project, dynamic_table) == "DBT_TESTING" - - @staticmethod - def change_config_via_replace(project, dynamic_table): - initial_model = get_model_file(project, dynamic_table) - new_model = initial_model.replace("refresh_mode='INCREMENTAL'", "refresh_mode='FULL'") - set_model_file(project, dynamic_table, new_model) - - @staticmethod - def check_state_replace_change_is_applied(project, dynamic_table): - assert query_refresh_mode(project, dynamic_table) == "FULL" - - @staticmethod - def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]: - return query_relation_type(project, relation) - - @pytest.fixture(scope="class", autouse=True) - def seeds(self): - yield {"my_seed.csv": MY_SEED} - - @pytest.fixture(scope="class", autouse=True) - def models(self): - yield {"my_dynamic_table.sql": MY_DYNAMIC_TABLE} - - @pytest.fixture(scope="class") - def my_dynamic_table(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_dynamic_table", - schema=project.test_schema, - 
database=project.database, - type=SnowflakeRelationType.DynamicTable, - ) - - @pytest.fixture(scope="function", autouse=True) - def setup(self, project, my_dynamic_table): - # make sure the model in the data reflects the files each time - run_dbt(["seed"]) - run_dbt(["run", "--models", my_dynamic_table.identifier, "--full-refresh"]) - - # the tests touch these files, store their contents in memory - initial_model = get_model_file(project, my_dynamic_table) - - # verify the initial settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - yield - - # and then reset them after the test runs - set_model_file(project, my_dynamic_table, initial_model) - - # ensure clean slate each method - project.run_sql(f"drop schema if exists {project.test_schema} cascade") - - def test_full_refresh_occurs_with_changes(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - self.change_config_via_replace(project, my_dynamic_table) - _, logs = run_dbt_and_capture( - ["--debug", "run", "--models", my_dynamic_table.identifier, "--full-refresh"] - ) - - # verify the updated settings are correct in Snowflake - self.check_state_alter_change_is_applied(project, my_dynamic_table) - self.check_state_replace_change_is_applied(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", logs.replace('"', "") - ) - - -class TestSnowflakeDynamicTableChangesApply(SnowflakeDynamicTableChanges): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"models": {"on_configuration_change": OnConfigurationChangeOption.Apply.value}} - - def test_change_is_applied_via_alter(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_state_alter_change_is_applied(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', "") - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - def test_change_is_applied_via_alter_downstream(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter_downstream(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_state_alter_change_is_applied_downstream(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', "") - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - @pytest.mark.skip( - "dbt-snowflake does not currently monitor any changes the trigger a full refresh" - ) - def test_change_is_applied_via_replace(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - 
self.change_config_via_replace(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_state_alter_change_is_applied(project, my_dynamic_table) - self.check_state_replace_change_is_applied(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", logs.replace('"', "") - ) - - -class TestSnowflakeDynamicTableChangesContinue(SnowflakeDynamicTableChanges): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"models": {"on_configuration_change": OnConfigurationChangeOption.Continue.value}} - - def test_change_is_not_applied_via_alter(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Configuration changes were identified and `on_configuration_change` was set" - f" to `continue` for `{my_dynamic_table}`", - logs, - ) - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - def test_change_is_not_applied_via_replace(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - self.change_config_via_replace(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Configuration changes were identified and `on_configuration_change` was set" - f" to `continue` for `{my_dynamic_table}`", - logs, - ) - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - -class TestSnowflakeDynamicTableChangesFailMixin(SnowflakeDynamicTableChanges): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"models": {"on_configuration_change": OnConfigurationChangeOption.Fail.value}} - - def test_change_is_not_applied_via_alter(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - _, logs = run_dbt_and_capture( - ["--debug", "run", "--models", my_dynamic_table.name], expect_pass=False - ) - - # verify the updated settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Configuration changes were identified and `on_configuration_change` was set" - f" to `fail` for `{my_dynamic_table}`", - logs, - ) - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: 
{my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - def test_change_is_not_applied_via_replace(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - self.change_config_via_replace(project, my_dynamic_table) - _, logs = run_dbt_and_capture( - ["--debug", "run", "--models", my_dynamic_table.name], expect_pass=False - ) - - # verify the updated settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Configuration changes were identified and `on_configuration_change` was set" - f" to `fail` for `{my_dynamic_table}`", - logs, - ) - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) diff --git a/tests/functional/adapter/dynamic_table_tests/utils.py b/tests/functional/adapter/dynamic_table_tests/utils.py deleted file mode 100644 index d72b231c9..000000000 --- a/tests/functional/adapter/dynamic_table_tests/utils.py +++ /dev/null @@ -1,53 +0,0 @@ -from typing import Optional - -import agate -from dbt.tests.util import get_connection - -from dbt.adapters.snowflake.relation import SnowflakeRelation - - -def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]: - sql = f""" - select - case - when table_type = 'BASE TABLE' and is_dynamic = 'YES' then 'dynamic_table' - when table_type = 'BASE TABLE' then 'table' - when table_type = 'VIEW' then 'view' - when table_type = 'EXTERNAL TABLE' then 'external_table' - end as relation_type - from information_schema.tables - where table_name like '{relation.identifier.upper()}' - and table_schema like '{relation.schema.upper()}' - and table_catalog like '{relation.database.upper()}' - """ - results = project.run_sql(sql, fetch="one") - if results is None or len(results) == 0: - return None - elif len(results) > 1: - raise ValueError(f"More than one instance of {relation.name} found!") - else: - return results[0].lower() - - -def query_target_lag(project, dynamic_table: SnowflakeRelation) -> Optional[str]: - config = describe_dynamic_table(project, dynamic_table) - return config.get("target_lag") - - -def query_warehouse(project, dynamic_table: SnowflakeRelation) -> Optional[str]: - config = describe_dynamic_table(project, dynamic_table) - return config.get("warehouse") - - -def query_refresh_mode(project, dynamic_table: SnowflakeRelation) -> Optional[str]: - config = describe_dynamic_table(project, dynamic_table) - return config.get("refresh_mode") - - -def describe_dynamic_table(project, dynamic_table: SnowflakeRelation) -> agate.Row: - with get_connection(project.adapter): - macro_results = project.adapter.execute_macro( - "snowflake__describe_dynamic_table", kwargs={"relation": dynamic_table} - ) - config = macro_results["dynamic_table"] - return config.rows[0] diff --git a/tests/functional/relation_tests/__init__.py b/tests/functional/relation_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/functional/relation_tests/dynamic_table_tests/__init__.py b/tests/functional/relation_tests/dynamic_table_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/functional/relation_tests/dynamic_table_tests/models.py b/tests/functional/relation_tests/dynamic_table_tests/models.py 
new file mode 100644
index 000000000..5e46bed53
--- /dev/null
+++ b/tests/functional/relation_tests/dynamic_table_tests/models.py
@@ -0,0 +1,50 @@
+SEED = """
+id,value
+1,100
+2,200
+3,300
+""".strip()
+
+
+DYNAMIC_TABLE = """
+{{ config(
+    materialized='dynamic_table',
+    snowflake_warehouse='DBT_TESTING',
+    target_lag='2 minutes',
+    refresh_mode='INCREMENTAL',
+) }}
+select * from {{ ref('my_seed') }}
+"""
+
+
+DYNAMIC_TABLE_DOWNSTREAM = """
+{{ config(
+    materialized='dynamic_table',
+    snowflake_warehouse='DBT_TESTING',
+    target_lag='DOWNSTREAM',
+    refresh_mode='INCREMENTAL',
+) }}
+select * from {{ ref('my_seed') }}
+"""
+
+
+DYNAMIC_TABLE_ALTER = """
+{{ config(
+    materialized='dynamic_table',
+    snowflake_warehouse='DBT_TESTING',
+    target_lag='5 minutes',
+    refresh_mode='INCREMENTAL',
+) }}
+select * from {{ ref('my_seed') }}
+"""
+
+
+DYNAMIC_TABLE_REPLACE = """
+{{ config(
+    materialized='dynamic_table',
+    snowflake_warehouse='DBT_TESTING',
+    target_lag='2 minutes',
+    refresh_mode='FULL',
+) }}
+select * from {{ ref('my_seed') }}
+"""
diff --git a/tests/functional/relation_tests/dynamic_table_tests/test_basic.py b/tests/functional/relation_tests/dynamic_table_tests/test_basic.py
new file mode 100644
index 000000000..2406e1c14
--- /dev/null
+++ b/tests/functional/relation_tests/dynamic_table_tests/test_basic.py
@@ -0,0 +1,30 @@
+import pytest
+
+from dbt.tests.util import run_dbt
+
+from tests.functional.relation_tests.dynamic_table_tests import models
+from tests.functional.utils import query_relation_type
+
+
+class TestBasic:
+
+    @pytest.fixture(scope="class", autouse=True)
+    def seeds(self):
+        return {"my_seed.csv": models.SEED}
+
+    @pytest.fixture(scope="class", autouse=True)
+    def models(self):
+        yield {
+            "my_dynamic_table.sql": models.DYNAMIC_TABLE,
+            "my_dynamic_table_downstream.sql": models.DYNAMIC_TABLE_DOWNSTREAM,
+        }
+
+    @pytest.fixture(scope="class", autouse=True)
+    def setup(self, project):
+        run_dbt(["seed"])
+        run_dbt(["run"])
+
+    def test_dynamic_table_full_refresh(self, project):
+        run_dbt(["run", "--full-refresh"])
+        assert query_relation_type(project, "my_dynamic_table") == "dynamic_table"
+        assert query_relation_type(project, "my_dynamic_table_downstream") == "dynamic_table"
diff --git a/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py b/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py
new file mode 100644
index 000000000..3c4f65a87
--- /dev/null
+++ b/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py
@@ -0,0 +1,103 @@
+import pytest
+
+from dbt.tests.util import run_dbt
+
+from tests.functional.relation_tests.dynamic_table_tests import models
+from tests.functional.utils import describe_dynamic_table, update_model
+
+
+class Changes:
+
+    @pytest.fixture(scope="class", autouse=True)
+    def seeds(self):
+        yield {"my_seed.csv": models.SEED}
+
+    @pytest.fixture(scope="class", autouse=True)
+    def models(self):
+        yield {
+            "dynamic_table_alter.sql": models.DYNAMIC_TABLE,
+            "dynamic_table_replace.sql": models.DYNAMIC_TABLE,
+        }
+
+    @pytest.fixture(scope="function", autouse=True)
+    def setup_class(self, project):
+        run_dbt(["seed"])
+        yield
+        project.run_sql(f"drop schema if exists {project.test_schema} cascade")
+
+    @pytest.fixture(scope="function", autouse=True)
+    def setup_method(self, project, setup_class):
+        # make sure the model in the data reflects the files each time
+        run_dbt(["run", "--full-refresh"])
+        self.assert_changes_are_not_applied(project)
+
+        update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE_ALTER)
+        update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE_REPLACE)
+
+        yield
+
+        update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE)
+        update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE)
+
+    @staticmethod
+    def assert_changes_are_applied(project):
+        altered = describe_dynamic_table(project, "dynamic_table_alter")
+        assert altered.snowflake_warehouse == "DBT_TESTING"
+        assert altered.target_lag == "5 minutes"  # this updated
+        assert altered.refresh_mode == "INCREMENTAL"
+
+        replaced = describe_dynamic_table(project, "dynamic_table_replace")
+        assert replaced.snowflake_warehouse == "DBT_TESTING"
+        assert replaced.target_lag == "2 minutes"
+        assert replaced.refresh_mode == "FULL"  # this updated
+
+    @staticmethod
+    def assert_changes_are_not_applied(project):
+        altered = describe_dynamic_table(project, "dynamic_table_alter")
+        assert altered.snowflake_warehouse == "DBT_TESTING"
+        assert altered.target_lag == "2 minutes"  # this would have updated, but didn't
+        assert altered.refresh_mode == "INCREMENTAL"
+
+        replaced = describe_dynamic_table(project, "dynamic_table_replace")
+        assert replaced.snowflake_warehouse == "DBT_TESTING"
+        assert replaced.target_lag == "2 minutes"
+        assert replaced.refresh_mode == "INCREMENTAL"  # this would have updated, but didn't
+
+    def test_full_refresh_is_always_successful(self, project):
+        # this always passes and always changes the configuration, regardless of on_configuration_change
+        # and regardless of whether the changes require a replace versus an alter
+        run_dbt(["run", "--full-refresh"])
+        self.assert_changes_are_applied(project)
+
+
+class TestChangesApply(Changes):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {"models": {"on_configuration_change": "apply"}}
+
+    def test_changes_are_applied(self, project):
+        # this passes and changes the configuration
+        run_dbt(["run"])
+        self.assert_changes_are_applied(project)
+
+
+class TestChangesContinue(Changes):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {"models": {"on_configuration_change": "continue"}}
+
+    def test_changes_are_not_applied(self, project):
+        # this passes but does not change the configuration
+        run_dbt(["run"])
+        self.assert_changes_are_not_applied(project)
+
+
+class TestChangesFail(Changes):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {"models": {"on_configuration_change": "fail"}}
+
+    def test_changes_are_not_applied(self, project):
+        # this fails and does not change the configuration
+        run_dbt(["run"], expect_pass=False)
+        self.assert_changes_are_not_applied(project)
diff --git a/tests/functional/adapter/dynamic_table_tests/files.py b/tests/functional/relation_tests/models.py
similarity index 78%
rename from tests/functional/adapter/dynamic_table_tests/files.py
rename to tests/functional/relation_tests/models.py
index ef8d2bf1f..6fe066313 100644
--- a/tests/functional/adapter/dynamic_table_tests/files.py
+++ b/tests/functional/relation_tests/models.py
@@ -1,4 +1,4 @@
-MY_SEED = """
+SEED = """
 id,value
 1,100
 2,200
@@ -6,7 +6,7 @@
 """.strip()
 
 
-MY_TABLE = """
+TABLE = """
 {{ config(
     materialized='table',
 ) }}
@@ -14,7 +14,7 @@
 """
 
 
-MY_VIEW = """
+VIEW = """
 {{ config(
     materialized='view',
 ) }}
@@ -22,11 +22,11 @@
 """
 
 
-MY_DYNAMIC_TABLE = """
+DYNAMIC_TABLE = """
 {{ config(
     materialized='dynamic_table',
     snowflake_warehouse='DBT_TESTING',
-    target_lag='2 minutes',
+    target_lag='1 minute',
     refresh_mode='INCREMENTAL',
 ) }}
 select * from {{ ref('my_seed') }}
diff --git a/tests/functional/relation_tests/test_relation_type_change.py b/tests/functional/relation_tests/test_relation_type_change.py
new file mode 100644
index 000000000..1246b0791
--- /dev/null
+++ b/tests/functional/relation_tests/test_relation_type_change.py
@@ -0,0 +1,64 @@
+from dataclasses import dataclass
+from itertools import product
+
+from dbt.tests.util import run_dbt
+import pytest
+
+from tests.functional.relation_tests import models
+from tests.functional.utils import query_relation_type, update_model
+
+
+@dataclass
+class Model:
+    model: str
+    relation_type: str
+
+    @property
+    def name(self) -> str:
+        return f"{self.relation_type}"
+
+
+@dataclass
+class Scenario:
+    initial: Model
+    final: Model
+
+    @property
+    def name(self) -> str:
+        return f"REPLACE_{self.initial.name}__WITH_{self.final.name}"
+
+    @property
+    def error_message(self) -> str:
+        return f"Failed when migrating from: {self.initial.name} to: {self.final.name}"
+
+
+relations = [
+    Model(models.VIEW, "view"),
+    Model(models.TABLE, "table"),
+    Model(models.DYNAMIC_TABLE, "dynamic_table"),
+]
+scenarios = [Scenario(*scenario) for scenario in product(relations, relations)]
+
+
+class TestRelationTypeChange:
+
+    @pytest.fixture(scope="class", autouse=True)
+    def seeds(self):
+        return {"my_seed.csv": models.SEED}
+
+    @pytest.fixture(scope="class", autouse=True)
+    def models(self):
+        yield {f"{scenario.name}.sql": scenario.initial.model for scenario in scenarios}
+
+    @pytest.fixture(scope="class", autouse=True)
+    def setup(self, project):
+        run_dbt(["seed"])
+        run_dbt(["run"])
+        for scenario in scenarios:
+            update_model(project, scenario.name, scenario.final.model)
+        run_dbt(["run"])
+
+    @pytest.mark.parametrize("scenario", scenarios, ids=[scenario.name for scenario in scenarios])
+    def test_replace(self, project, scenario):
+        relation_type = query_relation_type(project, scenario.name)
+        assert relation_type == scenario.final.relation_type, scenario.error_message
diff --git a/tests/functional/utils.py b/tests/functional/utils.py
new file mode 100644
index 000000000..d185e8d2b
--- /dev/null
+++ b/tests/functional/utils.py
@@ -0,0 +1,78 @@
+from typing import Any, Dict, Optional
+
+from dbt.tests.util import (
+    get_connection,
+    get_model_file,
+    relation_from_name,
+    set_model_file,
+)
+
+from dbt.adapters.snowflake.relation_configs import SnowflakeDynamicTableConfig
+
+
+def query_relation_type(project, name: str) -> Optional[str]:
+    relation = relation_from_name(project.adapter, name)
+    sql = f"""
+        select
+            case table_type
+                when 'BASE TABLE' then iff(is_dynamic = 'YES', 'dynamic_table', 'table')
+                when 'VIEW' then 'view'
+                when 'EXTERNAL TABLE' then 'external_table'
+            end as relation_type
+        from information_schema.tables
+        where table_name like '{relation.identifier.upper()}'
+        and table_schema like '{relation.schema.upper()}'
+        and table_catalog like '{relation.database.upper()}'
+    """
+    results = project.run_sql(sql, fetch="all")
+
+    assert len(results) > 0, f"Relation {relation} not found"
+    assert len(results) == 1, f"Multiple relations found"
+
+    return results[0][0].lower()
+
+
+def query_row_count(project, name: str) -> int:
+    relation = relation_from_name(project.adapter, name)
+    sql = f"select count(*) from {relation}"
+    return project.run_sql(sql, fetch="one")[0]
+
+
+def insert_record(project, name: str, record: Dict[str, Any]):
+    relation = relation_from_name(project.adapter, name)
+    column_names = ", ".join(record.keys())
+    values = ", ".join(
+        [f"'{value}'" if isinstance(value, str) else f"{value}" for value in record.values()]
+    )
+    sql = f"insert into {relation} ({column_names}) values ({values})"
+    project.run_sql(sql)
+
+
+def update_model(project, name: str, model: str) -> str:
+    relation = relation_from_name(project.adapter, name)
+    original_model = get_model_file(project, relation)
+    set_model_file(project, relation, model)
+    return original_model
+
+
+def describe_dynamic_table(project, name: str) -> Optional[SnowflakeDynamicTableConfig]:
+    macro = "snowflake__describe_dynamic_table"
+    dynamic_table = relation_from_name(project.adapter, name)
+    kwargs = {"relation": dynamic_table}
+    with get_connection(project.adapter):
+        results = project.adapter.execute_macro(macro, kwargs=kwargs)
+
+    assert len(results["dynamic_table"].rows) > 0, f"Dynamic table {dynamic_table} not found"
+    found = len(results["dynamic_table"].rows)
+    names = ", ".join([table.get("name") for table in results["dynamic_table"].rows])
+    assert found == 1, f"Multiple dynamic tables found: {names}"
+
+    return SnowflakeDynamicTableConfig.from_relation_results(results)
+
+
+def refresh_dynamic_table(project, name: str) -> None:
+    macro = "snowflake__refresh_dynamic_table"
+    dynamic_table = relation_from_name(project.adapter, name)
+    kwargs = {"relation": dynamic_table}
+    with get_connection(project.adapter):
+        project.adapter.execute_macro(macro, kwargs=kwargs)