From fe66aba2a522b3ffdf1639cd3a50cb11384d4859 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Sun, 31 Dec 2023 17:37:19 +0200 Subject: [PATCH 1/5] added hook support for dbt source freshness --- .changes/unreleased/Features-20231231-171205.yaml | 6 ++++++ core/dbt/task/freshness.py | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) create mode 100644 .changes/unreleased/Features-20231231-171205.yaml diff --git a/.changes/unreleased/Features-20231231-171205.yaml b/.changes/unreleased/Features-20231231-171205.yaml new file mode 100644 index 00000000000..08f5ebe5aad --- /dev/null +++ b/.changes/unreleased/Features-20231231-171205.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Added hook support for `dbt source freshness` +time: 2023-12-31T17:12:05.587185+02:00 +custom: + Author: ofek1weiss + Issue: "5609" diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 8ab704ff996..f0cbcbf7122 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -7,7 +7,7 @@ from .printer import ( print_run_result_error, ) -from .runnable import GraphRunnableTask +from .run import RunTask from dbt.contracts.results import ( FreshnessResult, @@ -170,7 +170,7 @@ def node_is_match(self, node): return node.has_freshness -class FreshnessTask(GraphRunnableTask): +class FreshnessTask(RunTask): def defer_to_manifest(self, adapter, selected_uids): # freshness don't defer return From 73ef35f306a8ee4138cf6365fec747c11f18a699 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Wed, 3 Jan 2024 13:05:38 +0200 Subject: [PATCH 2/5] Handle errors in on-run-end hooks --- core/dbt/contracts/results.py | 12 +++++++----- core/dbt/task/freshness.py | 7 ++++++- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index 173cd396c90..0d8832a80b9 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -213,7 +213,6 @@ class RunResultOutput(BaseResult): def 
process_run_result(result: RunResult) -> RunResultOutput: - compiled = isinstance(result.node, CompiledNode) return RunResultOutput( @@ -282,7 +281,8 @@ def compatible_previous_versions(cls) -> Iterable[Tuple[str, int]]: @classmethod def upgrade_schema_version(cls, data): """This overrides the "upgrade_schema_version" call in VersionedSchema (via - ArtifactMixin) to modify the dictionary passed in from earlier versions of the run_results.""" + ArtifactMixin) to modify the dictionary passed in from earlier versions of the run_results. + """ run_results_schema_version = get_artifact_schema_version(data) # If less than the current version (v5), preprocess contents to match latest schema version if run_results_schema_version <= 5: @@ -398,12 +398,12 @@ class FreshnessMetadata(BaseArtifactMetadata): @dataclass class FreshnessResult(ExecutionResult): metadata: FreshnessMetadata - results: Sequence[FreshnessNodeResult] + results: Sequence[Union[FreshnessNodeResult, BaseResult]] @classmethod def from_node_results( cls, - results: List[FreshnessNodeResult], + results: List[Union[FreshnessNodeResult, BaseResult]], elapsed_time: float, generated_at: datetime, ): @@ -426,7 +426,9 @@ class FreshnessExecutionResultArtifact( @classmethod def from_result(cls, base: FreshnessResult): - processed = [process_freshness_result(r) for r in base.results] + processed = [ + process_freshness_result(r) for r in base.results if isinstance(r, FreshnessNodeResult) + ] return cls( metadata=base.metadata, results=processed, diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index f0cbcbf7122..642db4d6fc8 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -23,6 +23,7 @@ LogStartLine, LogFreshnessResult, ) +from dbt.contracts.results import RunStatus from dbt.node_types import NodeType from dbt.adapters.capability import Capability @@ -204,7 +205,11 @@ def get_result(self, results, elapsed_time, generated_at): def task_end_messages(self, results): for 
result in results: - if result.status in (FreshnessStatus.Error, FreshnessStatus.RuntimeErr): + if result.status in ( + FreshnessStatus.Error, + FreshnessStatus.RuntimeErr, + RunStatus.Error, + ): print_run_result_error(result) fire_event(FreshnessCheckComplete()) From 0fd1381d7fc31e83da06969231c409ed08e34473 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 12 Jan 2024 10:58:13 -0500 Subject: [PATCH 3/5] add flags.source_freshness_run_project_hooks and tests --- core/dbt/cli/flags.py | 9 +- core/dbt/contracts/project.py | 5 ++ core/dbt/task/freshness.py | 12 ++- .../functional/sources/common_source_setup.py | 11 ++- .../sources/test_source_freshness.py | 82 +++++++++++++++++++ 5 files changed, 113 insertions(+), 6 deletions(-) diff --git a/core/dbt/cli/flags.py b/core/dbt/cli/flags.py index ffc73323df8..8b949ddbb76 100644 --- a/core/dbt/cli/flags.py +++ b/core/dbt/cli/flags.py @@ -217,8 +217,8 @@ def _assign_params( # Add entire invocation command to flags object.__setattr__(self, "INVOCATION_COMMAND", "dbt " + " ".join(sys.argv[1:])) - # Overwrite default assignments with user config if available. if project_flags: + # Overwrite default assignments with project flags if available. param_assigned_from_default_copy = params_assigned_from_default.copy() for param_assigned_from_default in params_assigned_from_default: project_flags_param_value = getattr( @@ -233,6 +233,13 @@ def _assign_params( param_assigned_from_default_copy.remove(param_assigned_from_default) params_assigned_from_default = param_assigned_from_default_copy + # Add project-level flags that are not available as CLI options / env vars + for ( + project_level_flag_name, + project_level_flag_value, + ) in project_flags.project_level_flags.items(): + object.__setattr__(self, project_level_flag_name.upper(), project_level_flag_value) + # Set hard coded flags. 
object.__setattr__(self, "WHICH", invoked_subcommand_name or ctx.info_name) diff --git a/core/dbt/contracts/project.py b/core/dbt/contracts/project.py index 658a73da406..810edf44dd2 100644 --- a/core/dbt/contracts/project.py +++ b/core/dbt/contracts/project.py @@ -296,6 +296,7 @@ class ProjectFlags(ExtensibleDbtClassMixin, Replaceable): populate_cache: Optional[bool] = None printer_width: Optional[int] = None send_anonymous_usage_stats: bool = DEFAULT_SEND_ANONYMOUS_USAGE_STATS + source_freshness_run_project_hooks: bool = False static_parser: Optional[bool] = None use_colors: Optional[bool] = None use_colors_file: Optional[bool] = None @@ -305,6 +306,10 @@ class ProjectFlags(ExtensibleDbtClassMixin, Replaceable): warn_error_options: Optional[Dict[str, Union[str, List[str]]]] = None write_json: Optional[bool] = None + @property + def project_level_flags(self) -> Dict[str, Any]: + return {"source_freshness_run_project_hooks": self.source_freshness_run_project_hooks} + @dataclass class ProfileConfig(dbtClassMixin, Replaceable): diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 642db4d6fc8..035fd5b069e 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -1,7 +1,7 @@ import os import threading import time -from typing import Optional +from typing import Optional, List from .base import BaseRunner from .printer import ( @@ -24,11 +24,11 @@ LogFreshnessResult, ) from dbt.contracts.results import RunStatus -from dbt.node_types import NodeType +from dbt.node_types import NodeType, RunHookType from dbt.adapters.capability import Capability from dbt.adapters.contracts.connection import AdapterResponse -from dbt.contracts.graph.nodes import SourceDefinition +from dbt.contracts.graph.nodes import SourceDefinition, HookNode from dbt.common.events.base_types import EventLevel from dbt.graph import ResourceTypeSelector @@ -213,3 +213,9 @@ def task_end_messages(self, results): print_run_result_error(result) 
fire_event(FreshnessCheckComplete()) + + def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: + if self.args.source_freshness_run_project_hooks: + return super().get_hooks_by_type(hook_type) + else: + return [] diff --git a/tests/functional/sources/common_source_setup.py b/tests/functional/sources/common_source_setup.py index ed68dedf5d4..c1e7f0a2568 100644 --- a/tests/functional/sources/common_source_setup.py +++ b/tests/functional/sources/common_source_setup.py @@ -2,7 +2,7 @@ import pytest import yaml -from dbt.tests.util import run_dbt +from dbt.tests.util import run_dbt, run_dbt_and_capture from tests.functional.sources.fixtures import ( models_schema_yml, models_view_model_sql, @@ -57,10 +57,17 @@ def project_config_update(self): }, } - def run_dbt_with_vars(self, project, cmd, *args, **kwargs): + def _extend_cmd_with_vars(self, project, cmd): vars_dict = { "test_run_schema": project.test_schema, "test_loaded_at": project.adapter.quote("updated_at"), } cmd.extend(["--vars", yaml.safe_dump(vars_dict)]) + + def run_dbt_with_vars(self, project, cmd, *args, **kwargs): + self._extend_cmd_with_vars(project, cmd) return run_dbt(cmd, *args, **kwargs) + + def run_dbt_and_capture_with_vars(self, project, cmd, *args, **kwargs): + self._extend_cmd_with_vars(project, cmd) + return run_dbt_and_capture(cmd, *args, **kwargs) diff --git a/tests/functional/sources/test_source_freshness.py b/tests/functional/sources/test_source_freshness.py index 222dfa58d0f..e204bf76142 100644 --- a/tests/functional/sources/test_source_freshness.py +++ b/tests/functional/sources/test_source_freshness.py @@ -400,3 +400,85 @@ def warning_probe(e): runner.invoke(["parse"]) assert got_warning + + +class TestHooksInSourceFreshness(SuccessfulSourceFreshnessTest): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "config-version": 2, + "on-run-start": ["{{ log('on-run-start hooks called') }}"], + "on-run-end": ["{{ log('on-run-end hooks called') }}"], 
+ "flags": { + "source_freshness_run_project_hooks": True, + }, + } + + def test_hooks_do_run_for_source_freshness( + self, + project, + ): + _, log_output = self.run_dbt_and_capture_with_vars( + project, + [ + "source", + "freshness", + ], + expect_pass=False, + ) + assert "on-run-start" in log_output + assert "on-run-end" in log_output + + +class TestHooksInSourceFreshnessDisabled(SuccessfulSourceFreshnessTest): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "config-version": 2, + "on-run-start": ["{{ log('on-run-start hooks called') }}"], + "on-run-end": ["{{ log('on-run-end hooks called') }}"], + "flags": { + "source_freshness_run_project_hooks": False, + }, + } + + def test_hooks_do_run_for_source_freshness( + self, + project, + ): + _, log_output = self.run_dbt_and_capture_with_vars( + project, + [ + "source", + "freshness", + ], + expect_pass=False, + ) + assert "on-run-start" not in log_output + assert "on-run-end" not in log_output + + +class TestHooksInSourceFreshnessDefault(SuccessfulSourceFreshnessTest): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "config-version": 2, + "on-run-start": ["{{ log('on-run-start hooks called') }}"], + "on-run-end": ["{{ log('on-run-end hooks called') }}"], + } + + def test_hooks_do_run_for_source_freshness( + self, + project, + ): + _, log_output = self.run_dbt_and_capture_with_vars( + project, + [ + "source", + "freshness", + ], + expect_pass=False, + ) + # default behaviour - no hooks run in source freshness + assert "on-run-start" not in log_output + assert "on-run-end" not in log_output From 870b1d3c691de1e7f9e425c902f549eaf0243d7c Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 12 Jan 2024 13:35:26 -0500 Subject: [PATCH 4/5] rename to project_only_flags --- core/dbt/cli/flags.py | 2 +- core/dbt/contracts/project.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/dbt/cli/flags.py b/core/dbt/cli/flags.py index 
8b949ddbb76..5b4ccb5a8e4 100644 --- a/core/dbt/cli/flags.py +++ b/core/dbt/cli/flags.py @@ -237,7 +237,7 @@ def _assign_params( for ( project_level_flag_name, project_level_flag_value, - ) in project_flags.project_level_flags.items(): + ) in project_flags.project_only_flags.items(): object.__setattr__(self, project_level_flag_name.upper(), project_level_flag_value) # Set hard coded flags. diff --git a/core/dbt/contracts/project.py b/core/dbt/contracts/project.py index 810edf44dd2..5f7928e84c0 100644 --- a/core/dbt/contracts/project.py +++ b/core/dbt/contracts/project.py @@ -307,7 +307,7 @@ class ProjectFlags(ExtensibleDbtClassMixin, Replaceable): write_json: Optional[bool] = None @property - def project_level_flags(self) -> Dict[str, Any]: + def project_only_flags(self) -> Dict[str, Any]: return {"source_freshness_run_project_hooks": self.source_freshness_run_project_hooks} From 002c3c40889d3e8439ebacd9949cd783f9bc2775 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 23 Jan 2024 14:58:10 -0500 Subject: [PATCH 5/5] add unit test for Flags initialized from ProjectFlags with project_only_flags --- tests/unit/test_cli_flags.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/unit/test_cli_flags.py b/tests/unit/test_cli_flags.py index 81e1e4d7819..dcc8f800659 100644 --- a/tests/unit/test_cli_flags.py +++ b/tests/unit/test_cli_flags.py @@ -366,6 +366,14 @@ def test_global_flag_at_child_context(self): assert flags_a.USE_COLORS == flags_b.USE_COLORS + def test_set_project_only_flags(self, project_flags, run_context): + flags = Flags(run_context, project_flags) + + for project_only_flag, project_only_flag_value in project_flags.project_only_flags.items(): + assert getattr(flags, project_only_flag) == project_only_flag_value + # sanity check: ensure project_only_flag is not part of the click context + assert project_only_flag not in run_context.params + def _create_flags_from_dict(self, cmd, d): write_file("", "profiles.yml") result = 
Flags.from_dict(cmd, d)