Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/source freshness hooks #9366

Merged
merged 7 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231231-171205.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Added hook support for `dbt source freshness`
time: 2023-12-31T17:12:05.587185+02:00
custom:
Author: ofek1weiss
Issue: "5609"
9 changes: 8 additions & 1 deletion core/dbt/cli/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ def _assign_params(
# Add entire invocation command to flags
object.__setattr__(self, "INVOCATION_COMMAND", "dbt " + " ".join(sys.argv[1:]))

# Overwrite default assignments with user config if available.
if project_flags:
# Overwrite default assignments with project flags if available.
param_assigned_from_default_copy = params_assigned_from_default.copy()
for param_assigned_from_default in params_assigned_from_default:
project_flags_param_value = getattr(
Expand All @@ -252,6 +252,13 @@ def _assign_params(
param_assigned_from_default_copy.remove(param_assigned_from_default)
params_assigned_from_default = param_assigned_from_default_copy

# Add project-level flags that are not available as CLI options / env vars
for (
project_level_flag_name,
project_level_flag_value,
) in project_flags.project_only_flags.items():
object.__setattr__(self, project_level_flag_name.upper(), project_level_flag_value)

# Set hard coded flags.
object.__setattr__(self, "WHICH", invoked_subcommand_name or ctx.info_name)

Expand Down
5 changes: 5 additions & 0 deletions core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ class ProjectFlags(ExtensibleDbtClassMixin, Replaceable):
populate_cache: Optional[bool] = None
printer_width: Optional[int] = None
send_anonymous_usage_stats: bool = DEFAULT_SEND_ANONYMOUS_USAGE_STATS
source_freshness_run_project_hooks: bool = False
static_parser: Optional[bool] = None
use_colors: Optional[bool] = None
use_colors_file: Optional[bool] = None
Expand All @@ -316,6 +317,10 @@ class ProjectFlags(ExtensibleDbtClassMixin, Replaceable):
warn_error_options: Optional[Dict[str, Union[str, List[str]]]] = None
write_json: Optional[bool] = None

@property
def project_only_flags(self) -> Dict[str, Any]:
return {"source_freshness_run_project_hooks": self.source_freshness_run_project_hooks}


@dataclass
class ProfileConfig(dbtClassMixin, Replaceable):
Expand Down
23 changes: 17 additions & 6 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import os
import threading
import time
from typing import Optional
from typing import Optional, List

from .base import BaseRunner
from .printer import (
print_run_result_error,
)
from .runnable import GraphRunnableTask
from .run import RunTask

from dbt.artifacts.freshness import (
FreshnessResult,
Expand All @@ -23,11 +23,12 @@
LogStartLine,
LogFreshnessResult,
)
from dbt.node_types import NodeType
from dbt.contracts.results import RunStatus
from dbt.node_types import NodeType, RunHookType

from dbt.adapters.capability import Capability
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.contracts.graph.nodes import SourceDefinition
from dbt.contracts.graph.nodes import SourceDefinition, HookNode
from dbt_common.events.base_types import EventLevel
from dbt.graph import ResourceTypeSelector

Expand Down Expand Up @@ -170,7 +171,7 @@
return node.has_freshness


class FreshnessTask(GraphRunnableTask):
class FreshnessTask(RunTask):
def result_path(self):
if self.args.output:
return os.path.realpath(self.args.output)
Expand Down Expand Up @@ -200,7 +201,17 @@

def task_end_messages(self, results):
for result in results:
if result.status in (FreshnessStatus.Error, FreshnessStatus.RuntimeErr):
if result.status in (

Check warning on line 204 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L204

Added line #L204 was not covered by tests
FreshnessStatus.Error,
FreshnessStatus.RuntimeErr,
RunStatus.Error,
):
print_run_result_error(result)

fire_event(FreshnessCheckComplete())

def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]:
if self.args.source_freshness_run_project_hooks:
return super().get_hooks_by_type(hook_type)

Check warning on line 215 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L214-L215

Added lines #L214 - L215 were not covered by tests
else:
return []

Check warning on line 217 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L217

Added line #L217 was not covered by tests
11 changes: 9 additions & 2 deletions tests/functional/sources/common_source_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pytest
import yaml

from dbt.tests.util import run_dbt
from dbt.tests.util import run_dbt, run_dbt_and_capture
from tests.functional.sources.fixtures import (
models_schema_yml,
models_view_model_sql,
Expand Down Expand Up @@ -57,10 +57,17 @@ def project_config_update(self):
},
}

def run_dbt_with_vars(self, project, cmd, *args, **kwargs):
def _extend_cmd_with_vars(self, project, cmd):
vars_dict = {
"test_run_schema": project.test_schema,
"test_loaded_at": project.adapter.quote("updated_at"),
}
cmd.extend(["--vars", yaml.safe_dump(vars_dict)])

def run_dbt_with_vars(self, project, cmd, *args, **kwargs):
self._extend_cmd_with_vars(project, cmd)
return run_dbt(cmd, *args, **kwargs)

def run_dbt_and_capture_with_vars(self, project, cmd, *args, **kwargs):
self._extend_cmd_with_vars(project, cmd)
return run_dbt_and_capture(cmd, *args, **kwargs)
82 changes: 82 additions & 0 deletions tests/functional/sources/test_source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,85 @@ def warning_probe(e):
runner.invoke(["parse"])

assert got_warning


class TestHooksInSourceFreshness(SuccessfulSourceFreshnessTest):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"config-version": 2,
"on-run-start": ["{{ log('on-run-start hooks called') }}"],
"on-run-end": ["{{ log('on-run-end hooks called') }}"],
"flags": {
"source_freshness_run_project_hooks": True,
},
}

def test_hooks_do_run_for_source_freshness(
self,
project,
):
_, log_output = self.run_dbt_and_capture_with_vars(
project,
[
"source",
"freshness",
],
expect_pass=False,
)
assert "on-run-start" in log_output
assert "on-run-end" in log_output


class TestHooksInSourceFreshnessDisabled(SuccessfulSourceFreshnessTest):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"config-version": 2,
"on-run-start": ["{{ log('on-run-start hooks called') }}"],
"on-run-end": ["{{ log('on-run-end hooks called') }}"],
"flags": {
"source_freshness_run_project_hooks": False,
},
}

def test_hooks_do_run_for_source_freshness(
self,
project,
):
_, log_output = self.run_dbt_and_capture_with_vars(
project,
[
"source",
"freshness",
],
expect_pass=False,
)
assert "on-run-start" not in log_output
assert "on-run-end" not in log_output


class TestHooksInSourceFreshnessDefault(SuccessfulSourceFreshnessTest):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"config-version": 2,
"on-run-start": ["{{ log('on-run-start hooks called') }}"],
"on-run-end": ["{{ log('on-run-end hooks called') }}"],
}

def test_hooks_do_run_for_source_freshness(
self,
project,
):
_, log_output = self.run_dbt_and_capture_with_vars(
project,
[
"source",
"freshness",
],
expect_pass=False,
)
# default behaviour - no hooks run in source freshness
assert "on-run-start" not in log_output
assert "on-run-end" not in log_output
3 changes: 2 additions & 1 deletion tests/unit/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from dbt import tracking
from dbt.contracts.files import SourceFile, FileHash, FilePath
from dbt.contracts.graph.manifest import MacroManifest, ManifestStateCheck
from dbt.contracts.project import ProjectFlags
from dbt.graph import NodeSelector, parse_difference
from dbt.events.logging import setup_event_logger
from dbt.mp_context import get_mp_context
Expand Down Expand Up @@ -130,7 +131,7 @@ def get_config(self, extra_cfg=None):
cfg.update(extra_cfg)

config = config_from_parts_or_dicts(project=cfg, profile=self.profile)
dbt.flags.set_from_args(Namespace(), config)
dbt.flags.set_from_args(Namespace(), ProjectFlags())
setup_event_logger(dbt.flags.get_flags())
object.__setattr__(dbt.flags.get_flags(), "PARTIAL_PARSE", False)
return config
Expand Down