Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: nullable error_after in source #3955

Merged
merged 17 commits into from
Oct 26, 2021
19 changes: 12 additions & 7 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,20 +162,25 @@ def plural(self) -> str:


@dataclass
class Time(dbtClassMixin, Replaceable):
count: int
period: TimePeriod
class Time(dbtClassMixin, Mergeable):
count: Optional[int] = None
period: Optional[TimePeriod] = None

def exceeded(self, actual_age: float) -> bool:
kwargs = {self.period.plural(): self.count}
if self.period is None or self.count is None:
return False
kwargs: Dict[str, int] = {self.period.plural(): self.count}
difference = timedelta(**kwargs).total_seconds()
return actual_age > difference

def __bool__(self):
return self.count is not None and self.period is not None


@dataclass
class FreshnessThreshold(dbtClassMixin, Mergeable):
warn_after: Optional[Time] = None
error_after: Optional[Time] = None
warn_after: Optional[Time] = field(default_factory=Time)
error_after: Optional[Time] = field(default_factory=Time)
filter: Optional[str] = None

def status(self, age: float) -> "dbt.contracts.results.FreshnessStatus":
Expand All @@ -188,7 +193,7 @@ def status(self, age: float) -> "dbt.contracts.results.FreshnessStatus":
return FreshnessStatus.Pass

def __bool__(self):
return self.warn_after is not None or self.error_after is not None
return bool(self.warn_after) or bool(self.error_after)


@dataclass
Expand Down
19 changes: 18 additions & 1 deletion core/dbt/parser/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
UnparsedSourceTableDefinition,
FreshnessThreshold,
UnparsedColumn,
Time
)
from dbt.exceptions import warn_or_error, InternalException
from dbt.node_types import NodeType
Expand Down Expand Up @@ -344,11 +345,27 @@ def get_unused_msg(
return '\n'.join(msg)


def merge_freshness_time_thresholds(
base: Optional[Time], update: Optional[Time]
) -> Optional[Time]:
if base and update:
return base.merged(update)
else:
return update or base
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • ✅ If base and update, merge them. This will apply if update is {}, too
  • ✅ If base is None, take update
  • ❌ If update is None (override!), we need to return None

So something like:

def merge_freshness_time_thresholds(
    base: Optional[Time], update: Optional[Time]
) -> Optional[Time]:
    if base and update:
        return base.merged(update)
    elif update is None:
        return None
    else:
        return update or base

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed 🙈,

Thanks @jtcohen6, update done and the test is fixed ✔️



def merge_freshness(
base: Optional[FreshnessThreshold], update: Optional[FreshnessThreshold]
) -> Optional[FreshnessThreshold]:
if base is not None and update is not None:
return base.merged(update)
merged_freshness = base.merged(update)
# merge one level deeper the error_after and warn_after thresholds
merged_error_after = merge_freshness_time_thresholds(base.error_after, update.error_after)
merged_warn_after = merge_freshness_time_thresholds(base.warn_after, update.warn_after)

merged_freshness.error_after = merged_error_after
merged_freshness.warn_after = merged_warn_after
return merged_freshness
elif base is None and update is not None:
return update
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1543,7 +1543,7 @@ def expected_seeded_manifest(self, model_database=None, quote_model=False):
'database': self.default_database,
'description': 'My table',
'external': None,
'freshness': {'error_after': None, 'warn_after': None, 'filter': None},
'freshness': {'error_after': {'count': None, 'period': None}, 'warn_after': {'count': None, 'period': None}, 'filter': None},
'identifier': 'seed',
'loaded_at_field': None,
'loader': 'a_loader',
Expand Down Expand Up @@ -1991,7 +1991,7 @@ def expected_postgres_references_manifest(self, model_database=None):
'database': self.default_database,
'description': 'My table',
'external': None,
'freshness': {'error_after': None, 'warn_after': None, 'filter': None},
'freshness': {'error_after': {'count': None, 'period': None}, 'warn_after': {'count': None, 'period': None}, 'filter': None},
'identifier': 'seed',
'loaded_at_field': None,
'loader': 'a_loader',
Expand Down
1 change: 1 addition & 0 deletions test/unit/test_contracts_graph_parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1906,6 +1906,7 @@ def complex_parsed_source_definition_dict():
},
'freshness': {
'warn_after': {'period': 'hour', 'count': 1},
'error_after': {}
},
'loaded_at_field': 'loaded_at',
'unrendered_config': {},
Expand Down
19 changes: 8 additions & 11 deletions test/unit/test_contracts_graph_unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ class TestFreshnessThreshold(ContractTestCase):
ContractType = FreshnessThreshold

def test_empty(self):
empty = self.ContractType(None, None)
self.assert_symmetric(empty, {})
empty = self.ContractType()
self.assert_symmetric(empty, {'error_after': {}, 'warn_after': {}})
self.assertEqual(empty.status(float('Inf')), FreshnessStatus.Pass)
self.assertEqual(empty.status(0), FreshnessStatus.Pass)

Expand Down Expand Up @@ -209,15 +209,12 @@ def test_merged(self):
)
threshold = self.ContractType(
warn_after=Time(count=18, period=TimePeriod.hour),
error_after=Time(count=2, period=TimePeriod.day),
error_after=Time(count=None, period=None),
)
self.assertEqual(threshold, t1.merged(t2))

error_seconds = timedelta(days=3).total_seconds()
warn_seconds = timedelta(days=1).total_seconds()
pass_seconds = timedelta(hours=3).total_seconds()
self.assertEqual(threshold.status(
error_seconds), FreshnessStatus.Error)
self.assertEqual(threshold.status(warn_seconds), FreshnessStatus.Warn)
self.assertEqual(threshold.status(pass_seconds), FreshnessStatus.Pass)

Expand Down Expand Up @@ -252,7 +249,7 @@ def test_defaults(self):
to_dict = {
'name': 'foo',
'description': '',
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'quoting': {},
'tables': [],
'loader': '',
Expand All @@ -278,7 +275,7 @@ def test_contents(self):
'description': 'a description',
'quoting': {'database': False},
'loader': 'some_loader',
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'tables': [],
'meta': {},
'tags': [],
Expand Down Expand Up @@ -312,7 +309,7 @@ def test_table_defaults(self):
'name': 'foo',
'description': '',
'loader': '',
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'quoting': {},
'meta': {},
'tables': [
Expand All @@ -323,7 +320,7 @@ def test_table_defaults(self):
'tests': [],
'columns': [],
'quoting': {},
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'meta': {},
'tags': [],
},
Expand All @@ -334,7 +331,7 @@ def test_table_defaults(self):
'tests': [],
'columns': [],
'quoting': {'database': True},
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'meta': {},
'tags': [],
},
Expand Down