Microbatch Strategy (#1108)
MichelleArk authored Sep 25, 2024
1 parent 8c6fcb4 commit 101aad2
Showing 6 changed files with 43 additions and 4 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240925-125242.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Add Microbatch Strategy to dbt-spark
time: 2024-09-25T12:52:42.872017+01:00
custom:
Author: michelleark
Issue: "1109"
1 change: 1 addition & 0 deletions CONTRIBUTING.md
@@ -78,6 +78,7 @@ python dagger/run_dbt_spark_tests.py --profile databricks_sql_endpoint --test-pa
_options_:
- "apache_spark"
- "spark_session"
- "spark_http_odbc"
- "databricks_sql_endpoint"
- "databricks_cluster"
- "databricks_http_cluster"
@@ -24,7 +24,7 @@
{%- endif -%}

{#-- Set Overwrite Mode --#}
{%- if strategy == 'insert_overwrite' and partition_by -%}
{%- if strategy in ['insert_overwrite', 'microbatch'] and partition_by -%}
{%- call statement() -%}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{%- endcall -%}
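For context, dynamic partition overwrite means an INSERT OVERWRITE rewrites only the partitions present in the incoming rows, which is what lets a microbatch run replace exactly one batch's partition. A minimal Spark SQL sketch of that behavior (the table and view names are illustrative, not from this change):

set spark.sql.sources.partitionOverwriteMode = DYNAMIC;

-- Only the partitions that appear in the incoming view are replaced;
-- all other partitions of the target table are left untouched.
insert overwrite table my_schema.my_incremental_model
select * from my_batch_tmp_view;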
@@ -75,6 +75,17 @@
{%- elif strategy == 'insert_overwrite' -%}
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target, existing) }}
{%- elif strategy == 'microbatch' -%}
{#-- microbatch wraps insert_overwrite, and requires a partition_by config #}
{% set missing_partition_key_microbatch_msg -%}
dbt-spark 'microbatch' incremental strategy requires a `partition_by` config.
Ensure you are using a `partition_by` column that is of grain {{ config.get('batch_size') }}.
{%- endset %}

{%- if not config.get('partition_by') -%}
{{ exceptions.raise_compiler_error(missing_partition_key_microbatch_msg) }}
{%- endif -%}
{{ get_insert_overwrite_sql(source, target, existing) }}
{%- elif strategy == 'merge' -%}
{#-- merge all columns for datasources which implement MERGE INTO (e.g. databricks, iceberg) - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, incremental_predicates=incremental_predicates) }}
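To illustrate the requirement above, here is a minimal model sketch using the new strategy (model and column names are assumptions, mirroring the functional test added below); the partition_by column must be at the configured batch_size grain, here a daily date_day for batch_size='day':

{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_time',
    batch_size='day',
    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0),
    partition_by=['date_day'],
    file_format='parquet'
) }}

select *, cast(event_time as date) as date_day
from {{ ref('input_model') }}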
@@ -21,7 +21,7 @@

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Expected one of: 'append', 'merge', 'insert_overwrite'
Expected one of: 'append', 'merge', 'insert_overwrite', 'microbatch'
{%- endset %}

{% set invalid_merge_msg -%}
@@ -35,13 +35,13 @@
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'microbatch'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and target.endpoint %}
{% if raw_strategy in ['insert_overwrite', 'microbatch'] and target.endpoint %}
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
{% endif %}
{% endif %}
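As a rough sketch of what this validation catches: a model configured with a strategy outside the allowed list (the strategy name below is hypothetical) fails compilation with the message above, and 'insert_overwrite'/'microbatch' are likewise rejected when the target connects through a SQL endpoint:

{{ config(materialized='incremental', incremental_strategy='delete+insert', file_format='parquet') }}
-- raises: Invalid incremental strategy provided: delete+insert
--         Expected one of: 'append', 'merge', 'insert_overwrite', 'microbatch'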
21 changes: 21 additions & 0 deletions tests/functional/adapter/incremental_strategies/test_microbatch.py
@@ -0,0 +1,21 @@
import pytest

from dbt.tests.adapter.incremental.test_incremental_microbatch import (
BaseMicrobatch,
)

# No requirement for a unique_id for spark microbatch!
_microbatch_model_no_unique_id_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), partition_by=['date_day'], file_format='parquet') }}
select *, cast(event_time as date) as date_day
from {{ ref('input_model') }}
"""


@pytest.mark.skip_profile(
"databricks_http_cluster", "databricks_sql_endpoint", "spark_session", "spark_http_odbc"
)
class TestMicrobatch(BaseMicrobatch):
@pytest.fixture(scope="class")
def microbatch_model_sql(self) -> str:
return _microbatch_model_no_unique_id_sql
