From 42a58bba9b4565c0f360dbc9e9a6abaa880a6623 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 8 Jan 2025 12:32:28 -0600 Subject: [PATCH] Always emit warning when microbatch models lack any filtered input node (#11196) (#11199) * Update `TestMicrobatchWithInputWithoutEventTime` to check running again raises warning The first time the project is run, the appropriate warning about inputs is raised. However, the warning is only being raised when a full parse happens. When partial parsing happens the warning isn't getting raised. In the next commit we'll fix this issue. This commit updates the test to show that the second run (with partial parsing) doesn't raise the update, and thus the test fails. * Update manifest loading to _always_ check microbatch model inputs Of note we are at the point where multiple validations are iterating all of the nodes in a manifest. We should refactor these _soon_ such that we are not iterating over the same list multiple times. * Add changie doc (cherry picked from commit 2eb1a5c3eaf85233bdafdb68495a4c520c9332a7) Co-authored-by: Quigley Malcolm --- .../unreleased/Fixes-20250107-173719.yaml | 6 +++ core/dbt/parser/manifest.py | 38 +++++++++++-------- .../functional/microbatch/test_microbatch.py | 4 +- 3 files changed, 32 insertions(+), 16 deletions(-) create mode 100644 .changes/unreleased/Fixes-20250107-173719.yaml diff --git a/.changes/unreleased/Fixes-20250107-173719.yaml b/.changes/unreleased/Fixes-20250107-173719.yaml new file mode 100644 index 00000000000..2d2310f1bac --- /dev/null +++ b/.changes/unreleased/Fixes-20250107-173719.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Ensure warning about microbatch lacking filter inputs is always fired +time: 2025-01-07T17:37:19.373261-06:00 +custom: + Author: QMalcolm + Issue: "11159" diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index ba42c6637d3..023c5db9300 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -513,6 +513,7 @@ def load(self) -> Manifest: self.check_for_spaces_in_resource_names() self.check_for_microbatch_deprecations() self.check_forcing_batch_concurrency() + self.check_microbatch_model_has_a_filtered_input() return self.manifest @@ -1472,21 +1473,6 @@ def check_valid_microbatch_config(self): f"Microbatch model '{node.name}' optional 'concurrent_batches' config must be of type `bool` if specified, but got: {type(concurrent_batches)})." ) - # Validate upstream node event_time (if configured) - has_input_with_event_time_config = False - for input_unique_id in node.depends_on.nodes: - input_node = self.manifest.expect(unique_id=input_unique_id) - input_event_time = input_node.config.event_time - if input_event_time: - if not isinstance(input_event_time, str): - raise dbt.exceptions.ParsingError( - f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}." - ) - has_input_with_event_time_config = True - - if not has_input_with_event_time_config: - fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name)) - def check_forcing_batch_concurrency(self) -> None: if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name): adapter = get_adapter(self.root_project) @@ -1508,6 +1494,28 @@ def check_forcing_batch_concurrency(self) -> None: ) ) + def check_microbatch_model_has_a_filtered_input(self): + if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name): + for node in self.manifest.nodes.values(): + if ( + node.config.materialized == "incremental" + and node.config.incremental_strategy == "microbatch" + ): + # Validate upstream node event_time (if configured) + has_input_with_event_time_config = False + for input_unique_id in node.depends_on.nodes: + input_node = self.manifest.expect(unique_id=input_unique_id) + input_event_time = input_node.config.event_time + if input_event_time: + if not isinstance(input_event_time, str): + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}." + ) + has_input_with_event_time_config = True + + if not has_input_with_event_time_config: + fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name)) + def write_perf_info(self, target_path: str): path = os.path.join(target_path, PERF_INFO_FILE_NAME) write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4)) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 953b372b226..ef747138fbd 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -464,9 +464,11 @@ def test_run_with_event_time(self, project): assert len(catcher.caught_events) == 1 # our partition grain is "day" so running the same day without new data should produce the same results + catcher.caught_events = [] with patch_microbatch_end_time("2020-01-03 14:57:00"): - run_dbt(["run"]) + run_dbt(["run"], callbacks=[catcher.catch]) self.assert_row_count(project, "microbatch_model", 3) + assert len(catcher.caught_events) == 1 # add next two days of data test_schema_relation = project.adapter.Relation.create(