Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace environment variable with a project flag to gate microbatch functionality #10799

Merged
merged 28 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a9df50f
first pass: replace os env with project flag
MichelleArk Sep 30, 2024
96c4ccb
Merge branch 'main' into microbatch-project-flags
QMalcolm Oct 1, 2024
704494f
Fix `TestMicrobatchMultipleRetries` to not use `os.env`
QMalcolm Oct 1, 2024
bfecde1
Turn off microbatch project flag for `TestMicrobatchCustomUserStrateg…
QMalcolm Oct 1, 2024
d690dec
Update `BaseMicrobatchTest` to turn on microbatch via project flags
QMalcolm Oct 1, 2024
5b9d7f7
[REVERT BEFORE MERGE] Update dbt-adapters dev req to point to branch …
QMalcolm Oct 1, 2024
1105e42
Add changie doc
QMalcolm Oct 1, 2024
a3f2708
Merge branch 'main' into microbatch-project-flags
QMalcolm Oct 15, 2024
18d1938
Merge branch 'main' into microbatch-project-flags
QMalcolm Nov 1, 2024
4d908ad
Fix functional tests after merging in main
QMalcolm Nov 4, 2024
0349968
Extract most of the logic from `find_materialization_macro_by_name` t…
QMalcolm Nov 5, 2024
a620919
Add function to that determines whether the new microbatch functional…
QMalcolm Nov 5, 2024
e77a13c
Gate microbatch functionality by `use_microbatch_batches` manifest fu…
QMalcolm Nov 5, 2024
543e024
Rename microbatch behavior flag to `require_batched_execution_for_cus…
QMalcolm Nov 5, 2024
db3fba8
Merge branch 'main' into microbatch-project-flags
QMalcolm Nov 6, 2024
c5e314a
Extract logic of `find_macro_by_name` to `find_macro_candiate_by_name`
QMalcolm Nov 7, 2024
35b23a3
Use `find_macro_candidate_by_name` to find the microbatch macro
QMalcolm Nov 7, 2024
7808992
Revert "Extract most of the logic from `find_materialization_macro_by…
QMalcolm Nov 7, 2024
7f88f96
Fix microbatch macro locality check to search for `core` locality ins…
QMalcolm Nov 7, 2024
874d7c6
Merge branch 'main' into microbatch-project-flags
QMalcolm Nov 8, 2024
a24b6f4
Move the evaluation of `use_microbatch_batches` to the last position …
QMalcolm Nov 8, 2024
dff25de
Drop behavior flag setting for BaseMicrobatchTest tests
QMalcolm Nov 8, 2024
3931b69
Rename 'env_var' to 'project_flag' in test_microbatch.py
QMalcolm Nov 8, 2024
945bcff
Update microbatch tests to assert when we are/aren't running with bat…
QMalcolm Nov 8, 2024
63445b5
Update `test_resolve_event_time_filter` to use `use_microbatch_batches`
QMalcolm Nov 8, 2024
26ba807
Fire deprecation warning for custom microbatch macros
QMalcolm Nov 8, 2024
94363e8
Add microbatch deprecation events to test_events.py
QMalcolm Nov 8, 2024
0f6b7fb
Revert "[REVERT BEFORE MERGE] Update dbt-adapters dev req to point to…
QMalcolm Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241001-161422.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Change gating of microbatch feature to be behind project flag / behavior flag
time: 2024-10-01T16:14:22.267253-05:00
custom:
Author: MichelleArk QMalcolm
Issue: "10798"
4 changes: 2 additions & 2 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,12 @@ def resolve_limit(self) -> Optional[int]:
def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]:
event_time_filter = None
if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
and (isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig))
(isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig))
and target.config.event_time
and isinstance(self.model, ModelNode)
and self.model.config.materialized == "incremental"
and self.model.config.incremental_strategy == "microbatch"
and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
):
start = self.model.config.get("__dbt_internal_microbatch_event_time_start")
end = self.model.config.get("__dbt_internal_microbatch_event_time_end")
Expand Down
34 changes: 30 additions & 4 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,10 +714,10 @@ def __init__(self):
self._macros_by_name = {}
self._macros_by_package = {}

def find_macro_by_name(
def find_macro_candidate_by_name(
self, name: str, root_project_name: str, package: Optional[str]
) -> Optional[Macro]:
"""Find a macro in the graph by its name and package name, or None for
) -> Optional[MacroCandidate]:
"""Find a MacroCandidate in the graph by its name and package name, or None for
any package. The root project name is used to determine priority:
- locally defined macros come first
- then imported macros
Expand All @@ -735,7 +735,15 @@ def filter(candidate: MacroCandidate) -> bool:
filter=filter,
)

return candidates.last()
return candidates.last_candidate()

def find_macro_by_name(
self, name: str, root_project_name: str, package: Optional[str]
) -> Optional[Macro]:
macro_candidate = self.find_macro_candidate_by_name(
name=name, root_project_name=root_project_name, package=package
)
return macro_candidate.macro if macro_candidate else None

def find_generate_macro_by_name(
self, component: str, root_project_name: str, imported_package: Optional[str] = None
Expand Down Expand Up @@ -1747,6 +1755,24 @@ def __reduce_ex__(self, protocol):
)
return self.__class__, args

def _microbatch_macro_is_core(self, project_name: str) -> bool:
microbatch_is_core = False
candidate = self.find_macro_candidate_by_name(
name="get_incremental_microbatch_sql", root_project_name=project_name, package=None
)

# We want to check for "Core", because "Core" basically means "builtin"
if candidate is not None and candidate.locality == Locality.Core:
microbatch_is_core = True

return microbatch_is_core

def use_microbatch_batches(self, project_name: str) -> bool:
return (
get_flags().require_batched_execution_for_custom_microbatch_strategy
or self._microbatch_macro_is_core(project_name=project_name)
)


class MacroManifest(MacroMethods):
def __init__(self, macros) -> None:
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ class ProjectFlags(ExtensibleDbtClassMixin):
write_json: Optional[bool] = None

# legacy behaviors - https://github.com/dbt-labs/dbt-core/blob/main/docs/guides/behavior-change-flags.md
require_batched_execution_for_custom_microbatch_strategy: bool = False
require_explicit_package_overrides_for_builtin_materializations: bool = True
require_resource_names_without_spaces: bool = False
source_freshness_run_project_hooks: bool = False
Expand All @@ -350,6 +351,7 @@ class ProjectFlags(ExtensibleDbtClassMixin):
@property
def project_only_flags(self) -> Dict[str, Any]:
return {
"require_batched_execution_for_custom_microbatch_strategy": self.require_batched_execution_for_custom_microbatch_strategy,
"require_explicit_package_overrides_for_builtin_materializations": self.require_explicit_package_overrides_for_builtin_materializations,
"require_resource_names_without_spaces": self.require_resource_names_without_spaces,
"source_freshness_run_project_hooks": self.source_freshness_run_project_hooks,
Expand Down
6 changes: 6 additions & 0 deletions core/dbt/deprecations.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ class MFCumulativeTypeParamsDeprecation(DBTDeprecation):
_event = "MFCumulativeTypeParamsDeprecation"


class MicrobatchMacroOutsideOfBatchesDeprecation(DBTDeprecation):
_name = "microbatch-macro-outside-of-batches-deprecation"
_event = "MicrobatchMacroOutsideOfBatchesDeprecation"


def renamed_env_var(old_name: str, new_name: str):
class EnvironmentVariableRenamed(DBTDeprecation):
_name = f"environment-variable-renamed:{old_name}"
Expand Down Expand Up @@ -178,6 +183,7 @@ def show_callback():
SourceFreshnessProjectHooksNotRun(),
MFTimespineWithoutYamlConfigurationDeprecation(),
MFCumulativeTypeParamsDeprecation(),
MicrobatchMacroOutsideOfBatchesDeprecation(),
]

deprecations: Dict[str, DBTDeprecation] = {d.name: d for d in deprecations_list}
Expand Down
8 changes: 8 additions & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,14 @@ message MFCumulativeTypeParamsDeprecationMsg {
MFCumulativeTypeParamsDeprecation data = 2;
}

// D020
message MicrobatchMacroOutsideOfBatchesDeprecation {}

message MicrobatchMacroOutsideOfBatchesDeprecationMsg {
CoreEventInfo info = 1;
MicrobatchMacroOutsideOfBatchesDeprecation data = 2;
}

// I065
message DeprecatedModel {
string model_name = 1;
Expand Down
1,182 changes: 593 additions & 589 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,16 @@ def message(self) -> str:
return line_wrap_message(warning_tag(description))


class MicrobatchMacroOutsideOfBatchesDeprecation(WarnLevel):
def code(self) -> str:
return "D020"

def message(self) -> str:
description = "The use of a custom microbatch macro outside of batched execution is deprecated. To use it with batched execution, set `flags.require_batched_execution_for_custom_microbatch_strategy` to `True` in `dbt_project.yml`. In the future this will be the default behavior."
Copy link
Contributor Author

@MichelleArk MichelleArk Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit, just because this is user-facing:

"In the future this will be the default behavior." -> "In the future batched execution will be the default behaviour."

Copy link
Contributor Author

@MichelleArk MichelleArk Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also mention the name of the custom macro that exists (e.g. get_incremental_microbatch_sql) as part of the message in case the user that's seeing this doesn't know about the custom strategy macro naming offhand.


return line_wrap_message(warning_tag(description))


# =======================================================
# I - Project parsing
# =======================================================
Expand Down
20 changes: 19 additions & 1 deletion core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@

self.check_for_model_deprecations()
self.check_for_spaces_in_resource_names()
self.check_for_microbatch_deprecations()

return self.manifest

Expand Down Expand Up @@ -649,6 +650,23 @@
else: # ERROR level
raise DbtValidationError("Resource names cannot contain spaces")

def check_for_microbatch_deprecations(self) -> None:
if not get_flags().require_batched_execution_for_custom_microbatch_strategy:
has_microbatch_model = False
for _, node in self.manifest.nodes.items():
if (
isinstance(node, ModelNode)
and node.config.materialized == "incremental"
and node.config.incremental_strategy == "microbatch"
):
has_microbatch_model = True
break

Check warning on line 663 in core/dbt/parser/manifest.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/parser/manifest.py#L662-L663

Added lines #L662 - L663 were not covered by tests

if has_microbatch_model and self.manifest._microbatch_macro_is_core(
self.root_project.project_name
):
dbt.deprecations.warn("microbatch-macro-outside-of-batches-deprecation")

Check warning on line 668 in core/dbt/parser/manifest.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/parser/manifest.py#L668

Added line #L668 was not covered by tests

def load_and_parse_macros(self, project_parser_files):
for project in self.all_projects.values():
if project.project_name not in project_parser_files:
Expand Down Expand Up @@ -1390,7 +1408,7 @@
node.config.final_validate()

def check_valid_microbatch_config(self):
if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"):
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
for node in self.manifest.nodes.values():
if (
node.config.materialized == "incremental"
Expand Down
6 changes: 2 additions & 4 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import functools
import os
import threading
import time
from copy import deepcopy
Expand Down Expand Up @@ -482,11 +481,10 @@ def execute(self, model, manifest):
)

hook_ctx = self.adapter.pre_model_hook(context_config)

if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
and model.config.materialized == "incremental"
model.config.materialized == "incremental"
and model.config.incremental_strategy == "microbatch"
and manifest.use_microbatch_batches(project_name=self.config.project_name)
):
return self._execute_microbatch_model(
hook_ctx, context_config, model, manifest, context, materialization_macro
Expand Down
Loading