From 8df5c96f3de73a88ae83218f18fa9165045856ec Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 29 Oct 2024 15:31:19 -0500 Subject: [PATCH] Make `--event-time-start` and `--event-time-end` mutually required (#10878) * Stop validating that `--event-time-start` is before "current" time In the next commit we'll be adding a validation that requires that `--event-time-start` and `--event-time-end` are mutually required. That is, whenever one is specified, the other is required. In that world, `--event-time-start` will never need to be compared against the "current" time, because it'll never be run in conjunction with the "current" time. * Validate that `--event-time-start` and `--event-time-end` are mutually present * Add changie doc for validation changes * Alter functional microbatch tests to work with updated `event_time_start/end` reqs We made it such that when `event_time_start` is specified, `event_time_end` must also be specified (and vice versa). This broke numerous tests, in a few different ways: 1. There were tests that used `--event-time-start` without `--event-time-end` but were using event_time_start essentially as the `begin` time for models being initially built or full refreshed. These tests could simply drop the `--event-time-start` and instead rely on the `begin` value. 2. There was a test that was trying to load a subset of the data _excluding_ some data which would be captured by using `begin`. In this test we added an appropriate `--event-time-end` as the `--event-time-start` was necessary to satisfy what the test was testing. 3. There was a test which was trying to ensure that two microbatch models would be given the same "current" time. Because we wanted to ensure the "current" time code path was used, we couldn't add `--event-time-end` to resolve the problem, thus we needed to remove the `--event-time-start` that was being used. However, this led to the test being incredibly slow. 
This was resolved by switching the relevant microbatch models from having `batch_size`s of `day` to instead have `year`. This solution should be good enough for roughly ~40 years? We'll figure out a better solution then, so see ya in 2064. Assuming I haven't died before my 70th birthday, feel free to ping me to get this taken care of. --------- Co-authored-by: Michelle Ark --- .../unreleased/Fixes-20241017-145357.yaml | 6 +++ core/dbt/cli/flags.py | 26 ++++++---- .../test_option_interaction_validations.py | 27 +++++----- .../functional/microbatch/test_microbatch.py | 50 +++++++++++-------- 4 files changed, 63 insertions(+), 46 deletions(-) create mode 100644 .changes/unreleased/Fixes-20241017-145357.yaml diff --git a/.changes/unreleased/Fixes-20241017-145357.yaml b/.changes/unreleased/Fixes-20241017-145357.yaml new file mode 100644 index 00000000000..8736a1247f7 --- /dev/null +++ b/.changes/unreleased/Fixes-20241017-145357.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Make `--event-time-start` and `--event-time-end` mutually required +time: 2024-10-17T14:53:57.149238-07:00 +custom: + Author: QMalcolm + Issue: "10874" diff --git a/core/dbt/cli/flags.py b/core/dbt/cli/flags.py index 649c24fb3e0..21fd660460f 100644 --- a/core/dbt/cli/flags.py +++ b/core/dbt/cli/flags.py @@ -7,7 +7,6 @@ from pprint import pformat as pf from typing import Any, Callable, Dict, List, Optional, Set, Union -import pytz from click import Context, Parameter, get_current_context from click.core import Command as ClickCommand from click.core import Group, ParameterSource @@ -362,17 +361,26 @@ def _validate_event_time_configs(self) -> None: getattr(self, "EVENT_TIME_END") if hasattr(self, "EVENT_TIME_END") else None ) - if event_time_start is not None and event_time_end is not None: - if event_time_start >= event_time_end: + # only do validations if at least one of `event_time_start` or `event_time_end` are specified + if event_time_start is not None or event_time_end is not None: + + # These `ifs`, 
combined with the parent `if` make it so that `event_time_start` and + # `event_time_end` are mutually required + if event_time_start is None: raise DbtUsageException( - "Value for `--event-time-start` must be less than `--event-time-end`" + "The flag `--event-time-end` was specified, but `--event-time-start` was not. " + "When specifying `--event-time-end`, `--event-time-start` must also be present." + ) + if event_time_end is None: + raise DbtUsageException( + "The flag `--event-time-start` was specified, but `--event-time-end` was not. " + "When specifying `--event-time-start`, `--event-time-end` must also be present." ) - elif event_time_start is not None: - utc_start = event_time_start.replace(tzinfo=pytz.UTC) - current_time = datetime.now(pytz.UTC) - if utc_start >= current_time: + + # This `if` just is a sanity check that `event_time_start` is before `event_time_end` + if event_time_start >= event_time_end: raise DbtUsageException( - f"Value for `--event-time-start` ({utc_start}) must be less than the current time ({current_time}) if `--event-time-end` is not specififed" + "Value for `--event-time-start` must be less than `--event-time-end`" ) def fire_deprecations(self): diff --git a/tests/functional/cli/test_option_interaction_validations.py b/tests/functional/cli/test_option_interaction_validations.py index 6a57820b026..3cee244aedd 100644 --- a/tests/functional/cli/test_option_interaction_validations.py +++ b/tests/functional/cli/test_option_interaction_validations.py @@ -1,7 +1,4 @@ -from datetime import datetime - import pytest -from freezegun import freeze_time from dbt.tests.util import run_dbt @@ -34,19 +31,19 @@ def test_option_combo(self, project, event_time_start, event_time_end, expect_pa assert not expect_pass -class TestEventTimeStartCurrent_time: +class TestEventTimeEndEventTimeStartMutuallyRequired: @pytest.mark.parametrize( - "event_time_start,current_time,expect_pass", + "specified,missing", [ - ("2024-10-01", "2024-10-02", True), - 
("2024-10-02", "2024-10-01", False), + ("--event-time-start", "--event-time-end"), + ("--event-time-end", "--event-time-start"), ], ) - def test_option_combo(self, project, event_time_start, current_time, expect_pass): - with freeze_time(datetime.fromisoformat(current_time)): - try: - run_dbt(["build", "--event-time-start", event_time_start]) - assert expect_pass - except Exception as e: - assert "must be less than the current time" in e.__str__() - assert not expect_pass + def test_option_combo(self, project, specified, missing): + try: + run_dbt(["build", specified, "2024-10-01"]) + assert False, f"An error should have been raised for missing `{missing}` flag" + except Exception as e: + assert ( + f"When specifying `{specified}`, `{missing}` must also be present." in e.__str__() + ) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 3bdacf2ee78..bd13f9cff24 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -1,9 +1,7 @@ import os -from datetime import datetime from unittest import mock import pytest -import pytz from dbt.events.types import LogModelResult, MicrobatchModelNoEventTimeInputs from dbt.tests.util import ( @@ -42,6 +40,16 @@ select * from {{ ref('input_model') }} """ +microbatch_yearly_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +select * from {{ ref('input_model') }} +""" + +microbatch_yearly_model_downstream_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +select * from {{ ref('microbatch_model') }} +""" + invalid_batch_context_macro_sql = """ {% macro check_invalid_batch_context() %} @@ -232,15 +240,6 @@ def 
test_run_with_event_time(self, project): # creation events assert batch_creation_events == 3 - # build model >= 2020-01-02 - with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run", "--event-time-start", "2020-01-02", "--full-refresh"]) - self.assert_row_count(project, "microbatch_model", 2) - - # build model < 2020-01-03 - run_dbt(["run", "--event-time-end", "2020-01-03", "--full-refresh"]) - self.assert_row_count(project, "microbatch_model", 2) - # build model between 2020-01-02 >= event_time < 2020-01-03 run_dbt( [ @@ -466,7 +465,7 @@ def models(self): @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time_logs(self, project): with patch_microbatch_end_time("2020-01-03 13:57:00"): - _, logs = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"]) + _, logs = run_dbt_and_capture(["run"]) assert "start: 2020-01-01 00:00:00+00:00" in logs assert "end: 2020-01-02 00:00:00+00:00" in logs @@ -500,7 +499,7 @@ def models(self): def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run", "--event-time-start", "2020-01-01"], expect_pass=False) + run_dbt(["run"], expect_pass=False) self.assert_row_count(project, "microbatch_model", 2) run_results = get_artifact(project.project_root, "target", "run_results.json") @@ -525,7 +524,7 @@ def models(self): def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): - _, console_output = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"]) + _, console_output = run_dbt_and_capture(["run"]) assert "PARTIAL SUCCESS (2/3)" in console_output assert "Completed with 1 partial success" in console_output @@ -565,7 +564,7 @@ def models(self): def test_run_with_event_time(self, project): # run all partitions from start - 2 
expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): - _, console_output = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"]) + _, console_output = run_dbt_and_capture(["run"]) assert "PARTIAL SUCCESS (2/3)" in console_output assert "Completed with 1 partial success" in console_output @@ -612,7 +611,7 @@ def models(self): def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run", "--event-time-start", "2020-01-01"]) + run_dbt(["run"]) self.assert_row_count(project, "microbatch_model", 2) @@ -621,7 +620,7 @@ class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest): def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run", "--event-time-start", "2020-01-01"]) + run_dbt(["run"]) # Compiled paths - compiled model without filter only assert read_file( @@ -704,7 +703,15 @@ def models(self): def test_run_with_event_time(self, project): # run all partitions from 2020-01-02 to spoofed "now" - 2 expected rows in output with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run", "--event-time-start", "2020-01-02"]) + run_dbt( + [ + "run", + "--event-time-start", + "2020-01-02", + "--event-time-end", + "2020-01-03 13:57:00", + ] + ) self.assert_row_count(project, "microbatch_model", 2) # re-running shouldn't change what it's in the data set because there is nothing new @@ -733,14 +740,13 @@ class TestMicrbobatchModelsRunWithSameCurrentTime(BaseMicrobatchTest): def models(self): return { "input_model.sql": input_model_sql, - "microbatch_model.sql": microbatch_model_sql, - "second_microbatch_model.sql": microbatch_model_downstream_sql, + "microbatch_model.sql": microbatch_yearly_model_sql, + "second_microbatch_model.sql": 
microbatch_yearly_model_downstream_sql, } @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_microbatch(self, project) -> None: - current_time = datetime.now(pytz.UTC) - run_dbt(["run", "--event-time-start", current_time.strftime("%Y-%m-%d")]) + run_dbt(["run"]) run_results = get_artifact(project.project_root, "target", "run_results.json") microbatch_model_last_batch = run_results["results"][1]["batch_results"]["successful"][-1]