Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: nullable error_after in source #3955

Merged
merged 17 commits into from
Oct 26, 2021
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## dbt-core 1.0.0 (Release TBD)

### Features
- Allow nullable `error_after` in source freshness ([#3874](https://github.com/dbt-labs/dbt-core/issues/3874), [#3955](https://github.com/dbt-labs/dbt-core/pull/3955))

Contributors:
- [@kadero](https://github.com/kadero) ([3955](https://github.com/dbt-labs/dbt-core/pull/3955))

## dbt-core 1.0.0b2 (October 25, 2021)

### Breaking changes
Expand Down
19 changes: 12 additions & 7 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,25 @@ def plural(self) -> str:


@dataclass
class Time(dbtClassMixin, Replaceable):
count: int
period: TimePeriod
class Time(dbtClassMixin, Mergeable):
count: Optional[int] = None
period: Optional[TimePeriod] = None

def exceeded(self, actual_age: float) -> bool:
kwargs = {self.period.plural(): self.count}
if self.period is None or self.count is None:
return False
kwargs: Dict[str, int] = {self.period.plural(): self.count}
difference = timedelta(**kwargs).total_seconds()
return actual_age > difference

def __bool__(self):
return self.count is not None and self.period is not None


@dataclass
class FreshnessThreshold(dbtClassMixin, Mergeable):
warn_after: Optional[Time] = None
error_after: Optional[Time] = None
warn_after: Optional[Time] = field(default_factory=Time)
error_after: Optional[Time] = field(default_factory=Time)
filter: Optional[str] = None

def status(self, age: float) -> "dbt.contracts.results.FreshnessStatus":
Expand All @@ -193,7 +198,7 @@ def status(self, age: float) -> "dbt.contracts.results.FreshnessStatus":
return FreshnessStatus.Pass

def __bool__(self):
return self.warn_after is not None or self.error_after is not None
return bool(self.warn_after) or bool(self.error_after)


@dataclass
Expand Down
21 changes: 20 additions & 1 deletion core/dbt/parser/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
UnparsedSourceTableDefinition,
FreshnessThreshold,
UnparsedColumn,
Time
)
from dbt.exceptions import warn_or_error, InternalException
from dbt.node_types import NodeType
Expand Down Expand Up @@ -339,11 +340,29 @@ def get_unused_msg(
return '\n'.join(msg)


def merge_freshness_time_thresholds(
    base: Optional[Time], update: Optional[Time]
) -> Optional[Time]:
    """Combine a base threshold with an overriding one.

    Args:
        base: Threshold from the enclosing definition, or ``None``.
        update: Threshold from the overriding definition, or ``None``.

    Returns:
        ``None`` when ``update`` is ``None`` (explicit override that disables
        the threshold); ``base.merged(update)`` when both are truthy;
        otherwise whichever of ``update`` / ``base`` is truthy, falling back
        to ``base``.
    """
    # An explicit null in the update always wins over the base value.
    if update is None:
        return None
    # Both sides carry a usable threshold: merge them.
    if base and update:
        return base.merged(update)
    # At most one side is truthy here; prefer the update, else keep the base.
    return update or base
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • ✅ If base and update, merge them. This will apply if update is {}, too
  • ✅ If base is None, take update
  • ❌ If update is None (override!), we need to return None

So something like:

def merge_freshness_time_thresholds(
    base: Optional[Time], update: Optional[Time]
) -> Optional[Time]:
    if base and update:
        return base.merged(update)
    elif update is None:
        return None
    else:
        return update or base

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed 🙈,

Thanks @jtcohen6, update done and the test is fixed ✔️



def merge_freshness(
base: Optional[FreshnessThreshold], update: Optional[FreshnessThreshold]
) -> Optional[FreshnessThreshold]:
if base is not None and update is not None:
return base.merged(update)
merged_freshness = base.merged(update)
# merge one level deeper the error_after and warn_after thresholds
merged_error_after = merge_freshness_time_thresholds(base.error_after, update.error_after)
merged_warn_after = merge_freshness_time_thresholds(base.warn_after, update.warn_after)

merged_freshness.error_after = merged_error_after
merged_freshness.warn_after = merged_warn_after
return merged_freshness
elif base is None and update is not None:
return update
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ def expected_seeded_manifest(self, model_database=None, quote_model=False):
'database': self.default_database,
'description': 'My table',
'external': None,
'freshness': {'error_after': None, 'warn_after': None, 'filter': None},
'freshness': {'error_after': {'count': None, 'period': None}, 'warn_after': {'count': None, 'period': None}, 'filter': None},
'identifier': 'seed',
'loaded_at_field': None,
'loader': 'a_loader',
Expand Down Expand Up @@ -1542,7 +1542,7 @@ def expected_postgres_references_manifest(self, model_database=None):
'database': self.default_database,
'description': 'My table',
'external': None,
'freshness': {'error_after': None, 'warn_after': None, 'filter': None},
'freshness': {'error_after': {'count': None, 'period': None}, 'warn_after': {'count': None, 'period': None}, 'filter': None},
'identifier': 'seed',
'loaded_at_field': None,
'loader': 'a_loader',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
version: 2
sources:
- name: test_source
loader: custom
freshness: # default freshness
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}"
loaded_at_field: loaded_at
quoting:
identifier: True
tags:
- my_test_source_tag
tables:
- name: source_a
identifier: source
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
freshness:
warn_after: {count: 6, period: hour}
# use the default error_after defined above
- name: source_b
identifier: source
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
freshness:
warn_after: {count: 6, period: hour}
error_after: {} # use the default error_after defined above
- name: source_c
identifier: source
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
freshness:
warn_after: {count: 6, period: hour}
error_after: null # override: disable error_after for this table
- name: source_d
identifier: source
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
freshness:
warn_after: {count: 6, period: hour}
error_after: {count: 72, period: hour} # override: use this new behavior instead of error_after defined above
- name: source_e
identifier: source
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
freshness: null # override: disable freshness for this table
76 changes: 76 additions & 0 deletions test/integration/042_sources_test/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def test_postgres_basic_source_def(self):
['expected_multi_source', 'multi_source_model'])
results = self.run_dbt_with_vars(['test'])
self.assertEqual(len(results), 6)
print(results)

@use_profile('postgres')
def test_postgres_source_selector(self):
Expand Down Expand Up @@ -400,6 +401,81 @@ def test_postgres_source_freshness_selection_graph_operation(self):
self.assertEqual(results[0].status, 'pass')
self._assert_freshness_results('target/ancestor_source.json', 'pass')

class TestOverrideSourceFreshness(SuccessfulSourcesTest):

@property
def models(self):
    """Return the ``models`` setting for this test class.

    NOTE(review): presumably the name of the fixture directory holding the
    schema files for this test; confirm against the base test class.
    """
    models_location = "override_freshness_models"
    return models_location

@staticmethod
def get_result_from_unique_id(data, unique_id):
    """Return the first entry of ``data['results']`` whose id matches.

    Args:
        data: Mapping with a ``'results'`` list; each item is a mapping
            that has a ``'unique_id'`` key.
        unique_id: The ``unique_id`` value to look for.

    Returns:
        The first matching item of ``data['results']``.

    Raises:
        LookupError: If no item has the requested ``unique_id``.
    """
    matches = [
        result for result in data['results']
        if result['unique_id'] == unique_id
    ]
    if not matches:
        # Raising a bare string is a TypeError in Python 3 ("exceptions must
        # derive from BaseException"), which hid this message entirely.
        raise LookupError(
            f"No result for the given unique_id. unique_id={unique_id}"
        )
    return matches[0]

def _run_override_source_freshness(self):
self._set_updated_at_to(timedelta(hours=-30))
self.freshness_start_time = datetime.utcnow()

path = 'target/pass_source.json'
results = self.run_dbt_with_vars(
['source', 'freshness', '-o', path],
expect_pass=False
)
self.assertEqual(len(results), 4) # freshness disabled for source_e

self.assertTrue(os.path.exists(path))
with open(path) as fp:
data = json.load(fp)

result_source_a = self.get_result_from_unique_id(data, 'source.test.test_source.source_a')
self.assertEqual(result_source_a['status'], 'error')
self.assertEqual(
result_source_a['criteria'],
{
'warn_after': {'count': 6, 'period': 'hour'},
'error_after': {'count': 24, 'period': 'hour'},
'filter': None
}
)

result_source_b = self.get_result_from_unique_id(data, 'source.test.test_source.source_b')
self.assertEqual(result_source_b['status'], 'error')
self.assertEqual(
result_source_b['criteria'],
{
'warn_after': {'count': 6, 'period': 'hour'},
'error_after': {'count': 24, 'period': 'hour'},
'filter': None
}
)

result_source_c = self.get_result_from_unique_id(data, 'source.test.test_source.source_c')
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kadero When you get a chance:

Thanks again for the contribution!

Hello @jtcohen6 @nathaniel-may 👋,

As we agreed, I created an integration test under test/integration/042_sources_test/, where a child source (called source_c) disables the error_after set by its parent source. 🙂

Unfortunately, the test didn't pass: everything behaves as if the child source inherits the error_after set by its parent 😕.

Do you have any clues, are we missing something?

Many thanks for your support 🙏

self.assertEqual(result_source_c['status'], 'warn')
Copy link
Contributor Author

@kadero kadero Oct 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.assertEqual(
result_source_c['criteria'],
{
'warn_after': {'count': 6, 'period': 'hour'},
'error_after': None,
Copy link
Contributor Author

@kadero kadero Oct 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This returns 'error_after': {'count': 24, 'period': 'hour'} instead of 'error_after': None 😕

'filter': None
}
)

result_source_d = self.get_result_from_unique_id(data, 'source.test.test_source.source_d')
self.assertEqual(result_source_d['status'], 'warn')
self.assertEqual(
result_source_d['criteria'],
{
'warn_after': {'count': 6, 'period': 'hour'},
'error_after': {'count': 72, 'period': 'hour'},
'filter': None
}
)

@use_profile('postgres')
def test_postgres_override_source_freshness(self):
    """Run the override-freshness scenario under the ``postgres`` profile."""
    self._run_override_source_freshness()

class TestSourceFreshnessErrors(SuccessfulSourcesTest):
@property
Expand Down
1 change: 1 addition & 0 deletions test/unit/test_contracts_graph_parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1906,6 +1906,7 @@ def complex_parsed_source_definition_dict():
},
'freshness': {
'warn_after': {'period': 'hour', 'count': 1},
'error_after': {}
},
'loaded_at_field': 'loaded_at',
'unrendered_config': {},
Expand Down
19 changes: 8 additions & 11 deletions test/unit/test_contracts_graph_unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ class TestFreshnessThreshold(ContractTestCase):
ContractType = FreshnessThreshold

def test_empty(self):
empty = self.ContractType(None, None)
self.assert_symmetric(empty, {})
empty = self.ContractType()
self.assert_symmetric(empty, {'error_after': {}, 'warn_after': {}})
self.assertEqual(empty.status(float('Inf')), FreshnessStatus.Pass)
self.assertEqual(empty.status(0), FreshnessStatus.Pass)

Expand Down Expand Up @@ -209,15 +209,12 @@ def test_merged(self):
)
threshold = self.ContractType(
warn_after=Time(count=18, period=TimePeriod.hour),
error_after=Time(count=2, period=TimePeriod.day),
error_after=Time(count=None, period=None),
)
self.assertEqual(threshold, t1.merged(t2))

error_seconds = timedelta(days=3).total_seconds()
warn_seconds = timedelta(days=1).total_seconds()
pass_seconds = timedelta(hours=3).total_seconds()
self.assertEqual(threshold.status(
error_seconds), FreshnessStatus.Error)
self.assertEqual(threshold.status(warn_seconds), FreshnessStatus.Warn)
self.assertEqual(threshold.status(pass_seconds), FreshnessStatus.Pass)

Expand Down Expand Up @@ -252,7 +249,7 @@ def test_defaults(self):
to_dict = {
'name': 'foo',
'description': '',
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'quoting': {},
'tables': [],
'loader': '',
Expand All @@ -278,7 +275,7 @@ def test_contents(self):
'description': 'a description',
'quoting': {'database': False},
'loader': 'some_loader',
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'tables': [],
'meta': {},
'tags': [],
Expand Down Expand Up @@ -312,7 +309,7 @@ def test_table_defaults(self):
'name': 'foo',
'description': '',
'loader': '',
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'quoting': {},
'meta': {},
'tables': [
Expand All @@ -323,7 +320,7 @@ def test_table_defaults(self):
'tests': [],
'columns': [],
'quoting': {},
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'meta': {},
'tags': [],
},
Expand All @@ -334,7 +331,7 @@ def test_table_defaults(self):
'tests': [],
'columns': [],
'quoting': {'database': True},
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'meta': {},
'tags': [],
},
Expand Down