Skip to content

Commit

Permalink
[Tech Debt] Fix dynamic table tests (#1085)
Browse files Browse the repository at this point in the history
* use default syntax for target_lag to match expected settings in snowflake
* update test utils to use project.adapter instead of adapter, turn on automated checks
* fix expected values on checks
  • Loading branch information
mikealfare authored Jun 17, 2024
1 parent 6f876fc commit efe6df3
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 63 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240614-170858.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Automate all manual integration tests for Dynamic Tables
time: 2024-06-14T17:08:58.231472-04:00
custom:
Author: mikealfare
Issue: "1084"
2 changes: 1 addition & 1 deletion tests/functional/adapter/dynamic_table_tests/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='120 seconds',
target_lag='2 minutes',
) }}
select * from {{ ref('my_seed') }}
"""
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,35 @@

class SnowflakeDynamicTableChanges:
@staticmethod
def check_start_state(adapter, dynamic_table):
"""
This needs to be done manually for now until we fix the test suite's runner. The test suite's
runner cannot run queries with multiple statements. Snowflake's metadata is all behind `show`
and `describe` calls that require a second call to fetch the results; hence, the results
cannot be fetched.
"""
assert query_target_lag(adapter, dynamic_table) is None == "120 seconds"
assert query_warehouse(adapter, dynamic_table) is None == "DBT_TESTING"
def check_start_state(project, dynamic_table):
assert query_target_lag(project, dynamic_table) == "2 minutes"
assert query_warehouse(project, dynamic_table) == "DBT_TESTING"

@staticmethod
def change_config_via_alter(project, dynamic_table):
initial_model = get_model_file(project, dynamic_table)
new_model = initial_model.replace(
"target_lag='120 seconds'", "target_lag='5 minutes'"
"target_lag='2 minutes'", "target_lag='5 minutes'"
)
set_model_file(project, dynamic_table, new_model)

@staticmethod
def change_config_via_alter_downstream(project, dynamic_table):
initial_model = get_model_file(project, dynamic_table)
new_model = initial_model.replace(
"target_lag='120 seconds'", "target_lag='downstream'"
"target_lag='2 minutes'", "target_lag='DOWNSTREAM'"
)
set_model_file(project, dynamic_table, new_model)

@staticmethod
def check_state_alter_change_is_applied(adapter, dynamic_table):
# see above
assert query_target_lag(adapter, dynamic_table) == "5 minutes"
assert query_warehouse(adapter, dynamic_table) == "DBT_TESTING"
def check_state_alter_change_is_applied(project, dynamic_table):
assert query_target_lag(project, dynamic_table) == "5 minutes"
assert query_warehouse(project, dynamic_table) == "DBT_TESTING"

@staticmethod
def check_state_alter_change_is_applied_downstream(adapter, dynamic_table):
# see above
assert query_target_lag(adapter, dynamic_table) == "downstream"
assert query_warehouse(adapter, dynamic_table) == "DBT_TESTING"
def check_state_alter_change_is_applied_downstream(project, dynamic_table):
assert query_target_lag(project, dynamic_table) == "DOWNSTREAM"
assert query_warehouse(project, dynamic_table) == "DBT_TESTING"

@staticmethod
def change_config_via_replace(project, dynamic_table):
Expand Down Expand Up @@ -103,6 +95,9 @@ def setup(self, project, my_dynamic_table):
# the tests touch these files, store their contents in memory
initial_model = get_model_file(project, my_dynamic_table)

# verify the initial settings are correct in Snowflake
self.check_start_state(project, my_dynamic_table)

yield

# and then reset them after the test runs
Expand All @@ -112,12 +107,19 @@ def setup(self, project, my_dynamic_table):
project.run_sql(f"drop schema if exists {project.test_schema} cascade")

def test_full_refresh_occurs_with_changes(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
self.change_config_via_replace(project, my_dynamic_table)
_, logs = run_dbt_and_capture(
["--debug", "run", "--models", my_dynamic_table.identifier, "--full-refresh"]
)
assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table"

# verify the updated settings are correct in Snowflake
self.check_state_alter_change_is_applied(project, my_dynamic_table)
self.check_state_replace_change_is_applied(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False
)
Expand All @@ -131,17 +133,16 @@ class TestSnowflakeDynamicTableChangesApply(SnowflakeDynamicTableChanges):
def project_config_update(self):
return {"models": {"on_configuration_change": OnConfigurationChangeOption.Apply.value}}

def test_change_is_applied_via_alter(self, project, adapter, my_dynamic_table):
"""
See above about the two commented assertions. In the meantime, these have been validated manually.
"""
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_applied_via_alter(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
_, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name])

# self.check_state_alter_change_is_applied(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_state_alter_change_is_applied(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', "")
)
Expand All @@ -151,17 +152,16 @@ def test_change_is_applied_via_alter(self, project, adapter, my_dynamic_table):
False,
)

def test_change_is_applied_via_alter_downstream(self, project, adapter, my_dynamic_table):
"""
See above about the two commented assertions. In the meantime, these have been validated manually.
"""
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_applied_via_alter_downstream(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter_downstream(project, my_dynamic_table)
_, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name])

# self.check_state_alter_change_is_applied_downstream(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_state_alter_change_is_applied_downstream(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', "")
)
Expand All @@ -174,16 +174,18 @@ def test_change_is_applied_via_alter_downstream(self, project, adapter, my_dynam
@pytest.mark.skip(
"dbt-snowflake does not currently monitor any changes the trigger a full refresh"
)
def test_change_is_applied_via_replace(self, project, adapter, my_dynamic_table):
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_applied_via_replace(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
self.change_config_via_replace(project, my_dynamic_table)
_, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name])

# self.check_state_alter_change_is_applied(adapter, my_dynamic_table)
# self.check_state_replace_change_is_applied(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_state_alter_change_is_applied(project, my_dynamic_table)
self.check_state_replace_change_is_applied(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Applying REPLACE to: {my_dynamic_table.render().upper()}", logs.replace('"', "")
)
Expand All @@ -194,17 +196,16 @@ class TestSnowflakeDynamicTableChangesContinue(SnowflakeDynamicTableChanges):
def project_config_update(self):
return {"models": {"on_configuration_change": OnConfigurationChangeOption.Continue.value}}

def test_change_is_not_applied_via_alter(self, project, adapter, my_dynamic_table):
"""
See above about the two commented assertions. In the meantime, these have been validated manually.
"""
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_not_applied_via_alter(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
_, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name])

# self.check_start_state(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_start_state(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Configuration changes were identified and `on_configuration_change` was set"
f" to `continue` for `{my_dynamic_table}`",
Expand All @@ -219,15 +220,17 @@ def test_change_is_not_applied_via_alter(self, project, adapter, my_dynamic_tabl
False,
)

def test_change_is_not_applied_via_replace(self, project, adapter, my_dynamic_table):
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_not_applied_via_replace(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
self.change_config_via_replace(project, my_dynamic_table)
_, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name])

# self.check_start_state(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_start_state(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Configuration changes were identified and `on_configuration_change` was set"
f" to `continue` for `{my_dynamic_table}`",
Expand All @@ -248,19 +251,18 @@ class TestSnowflakeDynamicTableChangesFailMixin(SnowflakeDynamicTableChanges):
def project_config_update(self):
return {"models": {"on_configuration_change": OnConfigurationChangeOption.Fail.value}}

def test_change_is_not_applied_via_alter(self, project, adapter, my_dynamic_table):
"""
See above about the two commented assertions. In the meantime, these have been validated manually.
"""
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_not_applied_via_alter(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
_, logs = run_dbt_and_capture(
["--debug", "run", "--models", my_dynamic_table.name], expect_pass=False
)

# self.check_start_state(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_start_state(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Configuration changes were identified and `on_configuration_change` was set"
f" to `fail` for `{my_dynamic_table}`",
Expand All @@ -275,17 +277,19 @@ def test_change_is_not_applied_via_alter(self, project, adapter, my_dynamic_tabl
False,
)

def test_change_is_not_applied_via_replace(self, project, adapter, my_dynamic_table):
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_not_applied_via_replace(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
self.change_config_via_replace(project, my_dynamic_table)
_, logs = run_dbt_and_capture(
["--debug", "run", "--models", my_dynamic_table.name], expect_pass=False
)

# self.check_start_state(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_start_state(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Configuration changes were identified and `on_configuration_change` was set"
f" to `fail` for `{my_dynamic_table}`",
Expand Down
15 changes: 7 additions & 8 deletions tests/functional/adapter/dynamic_table_tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Optional

import agate
from dbt.adapters.base import BaseAdapter
from dbt.tests.util import get_connection

from dbt.adapters.snowflake.relation import SnowflakeRelation
Expand Down Expand Up @@ -30,19 +29,19 @@ def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]:
return results[0].lower()


def query_target_lag(adapter, dynamic_table: SnowflakeRelation) -> Optional[str]:
config = describe_dynamic_table(adapter, dynamic_table)
def query_target_lag(project, dynamic_table: SnowflakeRelation) -> Optional[str]:
config = describe_dynamic_table(project, dynamic_table)
return config.get("target_lag")


def query_warehouse(adapter, dynamic_table: SnowflakeRelation) -> Optional[str]:
config = describe_dynamic_table(adapter, dynamic_table)
def query_warehouse(project, dynamic_table: SnowflakeRelation) -> Optional[str]:
config = describe_dynamic_table(project, dynamic_table)
return config.get("warehouse")


def describe_dynamic_table(adapter: BaseAdapter, dynamic_table: SnowflakeRelation) -> agate.Row:
with get_connection(adapter):
macro_results = adapter.execute_macro(
def describe_dynamic_table(project, dynamic_table: SnowflakeRelation) -> agate.Row:
with get_connection(project.adapter):
macro_results = project.adapter.execute_macro(
"snowflake__describe_dynamic_table", kwargs={"relation": dynamic_table}
)
config = macro_results["dynamic_table"]
Expand Down

0 comments on commit efe6df3

Please sign in to comment.