Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 1.5.latest] Back compat for previous return type of collect_freshness #7548

Merged
merged 1 commit into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230506-173315.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Back-compat for previous return type of 'collect_freshness' macro
time: 2023-05-06T17:33:15.104953+02:00
custom:
Author: jtcohen6
Issue: "7489"
17 changes: 14 additions & 3 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.base import Credentials
from dbt.adapters.cache import RelationsCache, _make_ref_key_dict

from dbt import deprecations

GET_CATALOG_MACRO_NAME = "get_catalog"
FRESHNESS_MACRO_NAME = "collect_freshness"
Expand Down Expand Up @@ -1104,7 +1104,7 @@ def calculate_freshness(
loaded_at_field: str,
filter: Optional[str],
manifest: Optional[Manifest] = None,
) -> Tuple[AdapterResponse, Dict[str, Any]]:
) -> Tuple[Optional[AdapterResponse], Dict[str, Any]]:
"""Calculate the freshness of sources in dbt, and return it"""
kwargs: Dict[str, Any] = {
"source": source,
Expand All @@ -1113,8 +1113,19 @@ def calculate_freshness(
}

# run the macro
# in older versions of dbt-core, the 'collect_freshness' macro returned the table of results directly
# starting in v1.5, by default, we return both the table and the adapter response (metadata about the query)
result: Union[
AttrDict, # current: contains AdapterResponse + agate.Table
agate.Table, # previous: just table
]
result = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, manifest=manifest)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]
if isinstance(result, agate.Table):
deprecations.warn("collect-freshness-return-signature")
adapter_response = None
table = result
else:
adapter_response, table = result.response, result.table # type: ignore[attr-defined]
# now we have a 1-row table of the maximum `loaded_at_field` value and
# the current time according to the db.
if len(table) != 1 or len(table[0]) != 2:
Expand Down
6 changes: 6 additions & 0 deletions core/dbt/deprecations.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ class ConfigTargetPathDeprecation(DBTDeprecation):
_event = "ConfigTargetPathDeprecation"


class CollectFreshnessReturnSignature(DBTDeprecation):
_name = "collect-freshness-return-signature"
_event = "CollectFreshnessReturnSignature"


def renamed_env_var(old_name: str, new_name: str):
class EnvironmentVariableRenamed(DBTDeprecation):
_name = f"environment-variable-renamed:{old_name}"
Expand Down Expand Up @@ -128,6 +133,7 @@ def warn(name, *args, **kwargs):
ExposureNameDeprecation(),
ConfigLogPathDeprecation(),
ConfigTargetPathDeprecation(),
CollectFreshnessReturnSignature(),
]

deprecations: Dict[str, DBTDeprecation] = {d.name: d for d in deprecations_list}
Expand Down
9 changes: 9 additions & 0 deletions core/dbt/events/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,15 @@ message ConfigTargetPathDeprecationMsg {
ConfigTargetPathDeprecation data = 2;
}

// D012
message CollectFreshnessReturnSignature {
}

message CollectFreshnessReturnSignatureMsg {
EventInfo info = 1;
CollectFreshnessReturnSignature data = 2;
}

// E - DB Adapter

// E001
Expand Down
13 changes: 13 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,19 @@ def message(self):
return line_wrap_message(warning_tag(f"Deprecated functionality\n\n{description}"))


class CollectFreshnessReturnSignature(WarnLevel):
def code(self):
return "D012"

def message(self):
description = (
"The 'collect_freshness' macro signature has changed to return the full "
"query result, rather than just a table of values. See the v1.5 migration guide "
"for details on how to update your custom macro: https://docs.getdbt.com/guides/migration/versions/upgrading-to-v1.5"
)
return line_wrap_message(warning_tag(f"Deprecated functionality\n\n{description}"))


# =======================================================
# E - DB Adapter
# =======================================================
Expand Down
1,422 changes: 713 additions & 709 deletions core/dbt/events/types_pb2.py

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,19 @@ def execute(self, compiled_node, manifest):

status = compiled_node.freshness.status(freshness["age"])

# adapter_response was not returned in previous versions, so this will be None
# we cannot call to_dict() on NoneType
if adapter_response:
adapter_response = adapter_response.to_dict(omit_none=True)

return SourceFreshnessResult(
node=compiled_node,
status=status,
thread_id=threading.current_thread().name,
timing=[],
execution_time=0,
message=None,
adapter_response=adapter_response.to_dict(omit_none=True),
adapter_response=adapter_response or {},
failures=None,
**freshness,
)
Expand Down
16 changes: 16 additions & 0 deletions tests/functional/sources/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,3 +439,19 @@
enabled: True and False
- name: other_test_table
"""


collect_freshness_macro_override_previous_return_signature = """
{% macro collect_freshness(source, loaded_at_field, filter) %}
{% call statement('collect_freshness', fetch_result=True, auto_begin=False) -%}
select
max({{ loaded_at_field }}) as max_loaded_at,
{{ current_timestamp() }} as snapshotted_at
from {{ source }}
{% if filter %}
where {{ filter }}
{% endif %}
{% endcall %}
{{ return(load_result('collect_freshness').table) }}
{% endmacro %}
"""
22 changes: 22 additions & 0 deletions tests/functional/sources/test_source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
error_models_model_sql,
filtered_models_schema_yml,
override_freshness_models_schema_yml,
collect_freshness_macro_override_previous_return_signature,
)
from dbt.tests.util import AnyStringWith, AnyFloat
from dbt import deprecations


class SuccessfulSourceFreshnessTest(BaseSourcesTest):
Expand Down Expand Up @@ -353,3 +355,23 @@ def test_override_source_freshness(self, project):
"filter": None,
}
assert result_source_d["criteria"] == expected


class TestSourceFreshnessMacroOverride(SuccessfulSourceFreshnessTest):
@pytest.fixture(scope="class")
def macros(self):
return {
"collect_freshness.sql": collect_freshness_macro_override_previous_return_signature
}

def test_source_freshness(self, project):
# ensure that the deprecation warning is raised
deprecations.reset_deprecations()
assert deprecations.active_deprecations == set()
self.run_dbt_with_vars(
project,
["source", "freshness"],
expect_pass=False,
)
expected = {"collect-freshness-return-signature"}
assert expected == deprecations.active_deprecations
1 change: 1 addition & 0 deletions tests/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def test_event_codes(self):
types.EnvironmentVariableRenamed(old_name="", new_name=""),
types.ConfigLogPathDeprecation(deprecated_path=""),
types.ConfigTargetPathDeprecation(deprecated_path=""),
types.CollectFreshnessReturnSignature(),
# E - DB Adapter ======================
types.AdapterEventDebug(),
types.AdapterEventInfo(),
Expand Down