Skip to content

Commit

Permalink
Allow configuration of snapshot column names (#10608)
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank authored Sep 20, 2024
1 parent 7016cd3 commit db69473
Show file tree
Hide file tree
Showing 11 changed files with 470 additions and 55 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240903-154133.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Allow configuring snapshot column names
time: 2024-09-03T15:41:33.167097-04:00
custom:
Author: gshank
Issue: "10185"
24 changes: 22 additions & 2 deletions core/dbt/artifacts/resources/v1/snapshot.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Dict, List, Literal, Optional, Union

from dbt.artifacts.resources.types import NodeType
from dbt.artifacts.resources.v1.components import CompiledResource, DeferRelation
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt_common.dataclass_schema import ValidationError
from dbt_common.dataclass_schema import ValidationError, dbtClassMixin


@dataclass
class SnapshotMetaColumnNames(dbtClassMixin):
dbt_valid_to: Optional[str] = None
dbt_valid_from: Optional[str] = None
dbt_scd_id: Optional[str] = None
dbt_updated_at: Optional[str] = None


@dataclass
Expand All @@ -17,6 +25,18 @@ class SnapshotConfig(NodeConfig):
updated_at: Optional[str] = None
# Not using Optional because of serialization issues with a Union of str and List[str]
check_cols: Union[str, List[str], None] = None
snapshot_meta_column_names: SnapshotMetaColumnNames = field(
default_factory=SnapshotMetaColumnNames
)

@property
def snapshot_table_column_names(self):
return {
"dbt_valid_from": self.snapshot_meta_column_names.dbt_valid_from or "dbt_valid_from",
"dbt_valid_to": self.snapshot_meta_column_names.dbt_valid_to or "dbt_valid_to",
"dbt_scd_id": self.snapshot_meta_column_names.dbt_scd_id or "dbt_scd_id",
"dbt_updated_at": self.snapshot_meta_column_names.dbt_updated_at or "dbt_updated_at",
}

def final_validate(self):
if not self.strategy or not self.unique_key:
Expand Down
52 changes: 2 additions & 50 deletions core/dbt/context/context_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dbt.contracts.graph.model_config import get_config_for
from dbt.node_types import NodeType
from dbt.utils import fqn_search
from dbt_common.contracts.config.base import BaseConfig, _listify
from dbt_common.contracts.config.base import BaseConfig, merge_config_dicts
from dbt_common.exceptions import DbtInternalError


Expand Down Expand Up @@ -293,55 +293,7 @@ def __init__(

def add_config_call(self, opts: Dict[str, Any]) -> None:
dct = self._config_call_dict
self._add_config_call(dct, opts)

@classmethod
def _add_config_call(cls, config_call_dict, opts: Dict[str, Any]) -> None:
# config_call_dict is already encountered configs, opts is new
# This mirrors code in _merge_field_value in model_config.py which is similar but
# operates on config objects.
for k, v in opts.items():
# MergeBehavior for post-hook and pre-hook is to collect all
# values, instead of overwriting
if k in BaseConfig.mergebehavior["append"]:
if not isinstance(v, list):
v = [v]
if k in config_call_dict: # should always be a list here
config_call_dict[k].extend(v)
else:
config_call_dict[k] = v

elif k in BaseConfig.mergebehavior["update"]:
if not isinstance(v, dict):
raise DbtInternalError(f"expected dict, got {v}")
if k in config_call_dict and isinstance(config_call_dict[k], dict):
config_call_dict[k].update(v)
else:
config_call_dict[k] = v
elif k in BaseConfig.mergebehavior["dict_key_append"]:
if not isinstance(v, dict):
raise DbtInternalError(f"expected dict, got {v}")
if k in config_call_dict: # should always be a dict
for key, value in v.items():
extend = False
# This might start with a +, to indicate we should extend the list
# instead of just clobbering it
if key.startswith("+"):
extend = True
if key in config_call_dict[k] and extend:
# extend the list
config_call_dict[k][key].extend(_listify(value))
else:
# clobber the list
config_call_dict[k][key] = _listify(value)
else:
# This is always a dictionary
config_call_dict[k] = v
# listify everything
for key, value in config_call_dict[k].items():
config_call_dict[k][key] = _listify(value)
else:
config_call_dict[k] = v
merge_config_dicts(dct, opts)

def build_config_dict(
self,
Expand Down
3 changes: 2 additions & 1 deletion core/dbt/parser/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dbt.node_types import ModelLanguage, NodeType
from dbt.parser.base import SimpleSQLParser
from dbt.parser.search import FileBlock
from dbt_common.contracts.config.base import merge_config_dicts
from dbt_common.dataclass_schema import ValidationError
from dbt_common.exceptions.macros import UndefinedMacroError
from dbt_extractor import ExtractionError, py_extract_from_source # type: ignore
Expand Down Expand Up @@ -467,7 +468,7 @@ def _get_config_call_dict(static_parser_result: Dict[str, Any]) -> Dict[str, Any
config_call_dict: Dict[str, Any] = {}

for c in static_parser_result["configs"]:
ContextConfig._add_config_call(config_call_dict, {c[0]: c[1]})
merge_config_dicts(config_call_dict, {c[0]: c[1]})

return config_call_dict

Expand Down
4 changes: 2 additions & 2 deletions core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@
"dbt-extractor>=0.5.0,<=0.6",
"dbt-semantic-interfaces>=0.7.1,<0.8",
# Minor versions for these are expected to be backwards-compatible
"dbt-common>=1.6.0,<2.0",
"dbt-adapters>=1.6.0,<2.0",
"dbt-common>=1.9.0,<2.0",
"dbt-adapters>=1.7.0,<2.0",
# ----
# Expect compatibility with all new versions of these packages, so lower bounds only.
"packaging>20.9",
Expand Down
102 changes: 102 additions & 0 deletions schemas/dbt/manifest/v12.json
Original file line number Diff line number Diff line change
Expand Up @@ -6610,6 +6610,57 @@
}
],
"default": null
},
"snapshot_meta_column_names": {
"type": "object",
"title": "SnapshotMetaColumnNames",
"properties": {
"dbt_valid_to": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"dbt_valid_from": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"dbt_scd_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"dbt_updated_at": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
}
},
"additionalProperties": false
}
},
"additionalProperties": true
Expand Down Expand Up @@ -16336,6 +16387,57 @@
}
],
"default": null
},
"snapshot_meta_column_names": {
"type": "object",
"title": "SnapshotMetaColumnNames",
"properties": {
"dbt_valid_to": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"dbt_valid_from": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"dbt_scd_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"dbt_updated_at": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
}
},
"additionalProperties": false
}
},
"additionalProperties": true
Expand Down
6 changes: 6 additions & 0 deletions tests/functional/artifacts/expected_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ def get_rendered_snapshot_config(**updates):
"post-hook": [],
"column_types": {},
"quoting": {},
"snapshot_meta_column_names": {
"dbt_valid_to": None,
"dbt_valid_from": None,
"dbt_updated_at": None,
"dbt_scd_id": None,
},
"tags": [],
"persist_docs": {},
"full_refresh": None,
Expand Down
6 changes: 6 additions & 0 deletions tests/functional/list/test_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ def expect_snapshot_output(self, happy_path_project): # noqa: F811
"persist_docs": {},
"target_database": happy_path_project.database,
"target_schema": happy_path_project.test_schema,
"snapshot_meta_column_names": {
"dbt_scd_id": None,
"dbt_updated_at": None,
"dbt_valid_from": None,
"dbt_valid_to": None,
},
"unique_key": "id",
"strategy": "timestamp",
"updated_at": "updated_at",
Expand Down
82 changes: 82 additions & 0 deletions tests/functional/snapshots/data/seed_cn.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
create table {database}.{schema}.seed (
id INTEGER,
first_name VARCHAR(50),
last_name VARCHAR(50),
email VARCHAR(50),
gender VARCHAR(50),
ip_address VARCHAR(20),
updated_at TIMESTAMP WITHOUT TIME ZONE
);

create table {database}.{schema}.snapshot_expected (
id INTEGER,
first_name VARCHAR(50),
last_name VARCHAR(50),
email VARCHAR(50),
gender VARCHAR(50),
ip_address VARCHAR(20),

-- snapshotting fields
updated_at TIMESTAMP WITHOUT TIME ZONE,
test_valid_from TIMESTAMP WITHOUT TIME ZONE,
test_valid_to TIMESTAMP WITHOUT TIME ZONE,
test_scd_id TEXT,
test_updated_at TIMESTAMP WITHOUT TIME ZONE
);


-- seed inserts
-- use the same email for two users to verify that duplicated check_cols values
-- are handled appropriately
insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values
(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'),
(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'),
(3, 'Rachel', 'Moreno', '[email protected]', 'Female', '31.222.249.23', '2016-04-05 02:05:30'),
(4, 'Ralph', 'Turner', '[email protected]', 'Male', '157.83.76.114', '2016-08-08 00:06:51'),
(5, 'Laura', 'Gonzales', '[email protected]', 'Female', '30.54.105.168', '2016-09-01 08:25:38'),
(6, 'Katherine', 'Lopez', '[email protected]', 'Female', '169.138.46.89', '2016-08-30 18:52:11'),
(7, 'Jeremy', 'Hamilton', '[email protected]', 'Male', '231.189.13.133', '2016-07-17 02:09:46'),
(8, 'Heather', 'Rose', '[email protected]', 'Female', '87.165.201.65', '2015-12-29 22:03:56'),
(9, 'Gregory', 'Kelly', '[email protected]', 'Male', '154.209.99.7', '2016-03-24 21:18:16'),
(10, 'Rachel', 'Lopez', '[email protected]', 'Female', '237.165.82.71', '2016-08-20 15:44:49'),
(11, 'Donna', 'Welch', '[email protected]', 'Female', '103.33.110.138', '2016-02-27 01:41:48'),
(12, 'Russell', 'Lawrence', '[email protected]', 'Male', '189.115.73.4', '2016-06-11 03:07:09'),
(13, 'Michelle', 'Montgomery', '[email protected]', 'Female', '243.220.95.82', '2016-06-18 16:27:19'),
(14, 'Walter', 'Castillo', '[email protected]', 'Male', '71.159.238.196', '2016-10-06 01:55:44'),
(15, 'Robin', 'Mills', '[email protected]', 'Female', '172.190.5.50', '2016-10-31 11:41:21'),
(16, 'Raymond', 'Holmes', '[email protected]', 'Male', '148.153.166.95', '2016-10-03 08:16:38'),
(17, 'Gary', 'Bishop', '[email protected]', 'Male', '161.108.182.13', '2016-08-29 19:35:20'),
(18, 'Anna', 'Riley', '[email protected]', 'Female', '253.31.108.22', '2015-12-11 04:34:27'),
(19, 'Sarah', 'Knight', '[email protected]', 'Female', '222.220.3.177', '2016-09-26 00:49:06'),
(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19');


-- populate snapshot table
insert into {database}.{schema}.snapshot_expected (
id,
first_name,
last_name,
email,
gender,
ip_address,
updated_at,
test_valid_from,
test_valid_to,
test_updated_at,
test_scd_id
)

select
id,
first_name,
last_name,
email,
gender,
ip_address,
updated_at,
-- fields added by snapshotting
updated_at as test_valid_from,
null::timestamp as test_valid_to,
updated_at as test_updated_at,
md5(id || '-' || first_name || '|' || updated_at::text) as test_scd_id
from {database}.{schema}.seed;
Loading

0 comments on commit db69473

Please sign in to comment.