Skip to content

Commit

Permalink
Switch from environment variable to behavior flag for gating microbat…
Browse files Browse the repository at this point in the history
…ch functionality (#323)
  • Loading branch information
QMalcolm authored Nov 8, 2024
1 parent da2a380 commit 9c08dea
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 6 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20241001-165406.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: Use a behavior flag to gate microbatch functionality (instead of an environment
variable)
time: 2024-10-01T16:54:06.121016-05:00
custom:
Author: QMalcolm
Issue: "327"
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import os
from pprint import pformat
from unittest import mock

import pytest

Expand Down Expand Up @@ -63,7 +61,6 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)

assert len(result) == expected_row_count, f"{relation_name}:{pformat(result)}"

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project, insert_two_rows_sql):
# initial run -- backfills all data
with patch_microbatch_end_time("2020-01-03 13:57:00"):
Expand Down
22 changes: 19 additions & 3 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
Union,
TYPE_CHECKING,
)
import os
import pytz
from dbt_common.behavior_flags import Behavior, BehaviorFlag
from dbt_common.clients.jinja import CallableMacroGenerator
Expand Down Expand Up @@ -316,7 +315,13 @@ def _behavior_flags(self) -> List[BehaviorFlag]:
"""
This method should be overwritten by adapter maintainers to provide platform-specific flags
"""
return []
return [
{
"name": "require_batched_execution_for_custom_microbatch_strategy",
"default": False,
"docs_url": "https://docs.getdbt.com/docs/build/incremental-microbatch",
}
]

###
# Methods that pass through to the connection manager
Expand Down Expand Up @@ -1574,13 +1579,24 @@ def valid_incremental_strategies(self):

def builtin_incremental_strategies(self):
builtin_strategies = ["append", "delete+insert", "merge", "insert_overwrite"]
if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"):
if self.behavior.require_batched_execution_for_custom_microbatch_strategy.no_warn:
builtin_strategies.append("microbatch")

return builtin_strategies

@available.parse_none
def get_incremental_strategy_macro(self, model_context, strategy: str):
"""Gets the macro for the given incremental strategy.
Additionally some validations are done:
1. Assert that if the given strategy is a "builtin" strategy, then it must
also be defined as a "valid" strategy for the associated adapter
2. Assert that the incremental strategy exists in the model context
Notably, something be defined by the adapter as "valid" without it being
a "builtin", and nothing will break (and that is desirable).
"""

# Construct macro_name from strategy name
if strategy is None:
strategy = "default"
Expand Down

0 comments on commit 9c08dea

Please sign in to comment.