Modified Backport of #4619 #4656

Closed · wants to merge 7 commits
8 changes: 4 additions & 4 deletions core/dbt/adapters/cache.py
@@ -323,11 +323,11 @@ def add(self, relation):
         """
         cached = _CachedRelation(relation)
         fire_event(AddRelation(relation=_make_key(cached)))
-        fire_event(DumpBeforeAddGraph(dump=self.dump_graph()))
+        fire_event(DumpBeforeAddGraph(dump=self.dump_graph))

         with self.lock:
             self._setdefault(cached)
-        fire_event(DumpAfterAddGraph(dump=self.dump_graph()))
+        fire_event(DumpAfterAddGraph(dump=self.dump_graph))

     def _remove_refs(self, keys):
         """Removes all references to all entries in keys. This does not
@@ -441,15 +441,15 @@ def rename(self, old, new):
         new_key = _make_key(new)
         fire_event(RenameSchema(old_key=old_key, new_key=new_key))

-        fire_event(DumpBeforeRenameSchema(dump=self.dump_graph()))
+        fire_event(DumpBeforeRenameSchema(dump=self.dump_graph))

         with self.lock:
             if self._check_rename_constraints(old_key, new_key):
                 self._rename_relation(old_key, _CachedRelation(new))
             else:
                 self._setdefault(_CachedRelation(new))

-        fire_event(DumpAfterRenameSchema(dump=self.dump_graph()))
+        fire_event(DumpAfterRenameSchema(dump=self.dump_graph))

     def get_relations(
         self, database: Optional[str], schema: Optional[str]
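The substance of this change: `fire_event` now receives the bound `dump_graph` method rather than its result, so the potentially large graph dump is only built if something actually renders the event. A minimal, self-contained sketch of the idea (the function names below are illustrative stand-ins, not the real cache module):

# Sketch (illustrative names) of why passing the bound method instead of
# its result defers the expensive work until a consumer asks for it.
from typing import Callable, Dict, List


def dump_graph() -> Dict[str, List[str]]:
    # stands in for RelationsCache.dump_graph, which walks the whole cache
    print("building the full graph dump...")
    return {"database.schema": ["relation_a", "relation_b"]}


def fire_event(dump: Callable[[], Dict[str, List[str]]]) -> None:
    # a consumer that skips cache events simply never calls dump()
    pass


fire_event(dump=dump_graph)  # new style: nothing printed unless dump() is called
# old style, fire_event(dump=dump_graph()), built the dump unconditionally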
1 change: 1 addition & 0 deletions core/dbt/events/base_types.py
@@ -5,6 +5,7 @@
 import threading
 from typing import Any, Optional

+
 # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
 # These base types define the _required structure_ for the concrete event  #
 # types defined in types.py                                                #
8 changes: 8 additions & 0 deletions core/dbt/events/functions.py
@@ -142,6 +142,14 @@ def event_to_serializable_dict(
     data = dict()
     node_info = dict()
     log_line = dict()
+    # hack to force evaluation of the lazy dump call before it hits
+    # asdict. asdict is not applying the dict_factory early enough for
+    # it to be caught there.
+    if "Dump" in e.__class__.__name__ and hasattr(e, 'dump'):
+        try:
+            e.dump = e.dump()  # type: ignore[attr-defined]
+        except TypeError:
+            pass
     try:
         log_line = dataclasses.asdict(e, dict_factory=type(e).asdict)
     except AttributeError:
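Why the forced evaluation is needed: `dataclasses.asdict` copies a callable field through unchanged, so the serialized dict ends up holding a function object that `json.dumps` rejects. A minimal repro under that assumption (the event class below is hypothetical, not a real dbt type):

# Hypothetical event class demonstrating the asdict problem: the callable
# survives serialization and breaks json.dumps until it is replaced by
# its return value.
import dataclasses
import json
from typing import Callable, Dict, List


@dataclasses.dataclass
class LazyEvent:
    dump: Callable[[], Dict[str, List[str]]]


e = LazyEvent(dump=lambda: {"schema": ["table_a"]})
raw = dataclasses.asdict(e)  # {'dump': <function <lambda> ...>}
# json.dumps(raw) raises TypeError: Object of type function is not JSON serializable

e.dump = e.dump()  # force evaluation first, mirroring the hack above
print(json.dumps(dataclasses.asdict(e)))  # {"dump": {"schema": ["table_a"]}}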
54 changes: 37 additions & 17 deletions core/dbt/events/types.py
@@ -14,7 +14,7 @@
 )
 from dbt.events.format import format_fancy_output_line, pluralize
 from dbt.node_types import NodeType
-from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar


 # The classes in this file represent the data necessary to describe a
@@ -699,42 +699,62 @@ def message(self) -> str:

 @dataclass
 class DumpBeforeAddGraph(DebugLevel, Cli, File, Cache):
-    # large value. delay not necessary since every debug level message is logged anyway.
-    dump: Dict[str, List[str]]
+    # callable for lazy evaluation
+    # this will be transformed into a dict _before_ the message is created.
+    dump: Callable[[], Dict[str, List[str]]]
     code: str = "E031"

     def message(self) -> str:
-        return f"before adding : {self.dump}"
+        try:
+            self.dump = self.dump()  # type: ignore[assignment, misc]
+        except TypeError:
+            pass
+        return f"before adding : {self.dump}"  # type: ignore[misc]


 @dataclass
 class DumpAfterAddGraph(DebugLevel, Cli, File, Cache):
-    # large value. delay not necessary since every debug level message is logged anyway.
-    dump: Dict[str, List[str]]
+    # callable for lazy evaluation
+    # this will be transformed into a dict _before_ the message is created.
+    dump: Callable[[], Dict[str, List[str]]]
     code: str = "E032"

     def message(self) -> str:
-        return f"after adding: {self.dump}"
+        try:
+            self.dump = self.dump()  # type: ignore[assignment, misc]
+        except TypeError:
+            pass
+        return f"after adding: {self.dump}"  # type: ignore[misc]


 @dataclass
 class DumpBeforeRenameSchema(DebugLevel, Cli, File, Cache):
-    # large value. delay not necessary since every debug level message is logged anyway.
-    dump: Dict[str, List[str]]
+    # callable for lazy evaluation
+    # this will be transformed into a dict _before_ the message is created.
+    dump: Callable[[], Dict[str, List[str]]]
     code: str = "E033"

     def message(self) -> str:
-        return f"before rename: {self.dump}"
+        try:
+            self.dump = self.dump()  # type: ignore[assignment, misc]
+        except TypeError:
+            pass
+        return f"before rename: {self.dump}"  # type: ignore[misc]


 @dataclass
 class DumpAfterRenameSchema(DebugLevel, Cli, File, Cache):
-    # large value. delay not necessary since every debug level message is logged anyway.
-    dump: Dict[str, List[str]]
+    # callable for lazy evaluation
+    # this will be transformed into a dict _before_ the message is created.
+    dump: Callable[[], Dict[str, List[str]]]
     code: str = "E034"

     def message(self) -> str:
-        return f"after rename: {self.dump}"
+        try:
+            self.dump = self.dump()  # type: ignore[assignment, misc]
+        except TypeError:
+            pass
+        return f"after rename: {self.dump}"  # type: ignore[misc]


 @dataclass
@@ -2499,10 +2519,10 @@ def message(self) -> str:
         old_key=_ReferenceKey(database="", schema="", identifier=""),
         new_key=_ReferenceKey(database="", schema="", identifier="")
     )
-    DumpBeforeAddGraph(dict())
-    DumpAfterAddGraph(dict())
-    DumpBeforeRenameSchema(dict())
-    DumpAfterRenameSchema(dict())
+    DumpBeforeAddGraph(lambda: dict())
+    DumpAfterAddGraph(lambda: dict())
+    DumpBeforeRenameSchema(lambda: dict())
+    DumpAfterRenameSchema(lambda: dict())
     AdapterImportError(ModuleNotFoundError())
     PluginLoadError()
     SystemReportReturnCode(returncode=0)
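The four `message` bodies above share one evaluate-once trick: the first call replaces the callable `dump` with its result, and every later call raises `TypeError` (a dict is not callable), which is swallowed. A minimal sketch of the pattern in isolation (class and field names are illustrative):

# Sketch of the evaluate-once memoization used by the Dump* events: the
# expensive callable runs at most once per event instance.
from dataclasses import dataclass
from typing import Callable


@dataclass
class LazyDump:
    dump: Callable[[], dict]

    def message(self) -> str:
        try:
            self.dump = self.dump()  # type: ignore[assignment]
        except TypeError:
            pass  # already evaluated on a previous call
        return f"dump: {self.dump}"


calls = []
e = LazyDump(dump=lambda: calls.append(1) or {"a": ["b"]})
e.message()
e.message()
assert calls == [1]  # the underlying callable ran exactly once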
175 changes: 169 additions & 6 deletions test/unit/test_events.py
@@ -17,6 +17,7 @@
     ParsedModelNode, NodeConfig, DependsOn
 )
 from dbt.contracts.files import FileHash
+from typing import Generic, TypeVar

 # takes in a class and finds any subclasses for it
 def get_all_subclasses(cls):
@@ -222,10 +223,10 @@ def MockNode():
         old_key=_ReferenceKey(database="", schema="", identifier=""),
         new_key=_ReferenceKey(database="", schema="", identifier="")
     ),
-    DumpBeforeAddGraph(dict()),
-    DumpAfterAddGraph(dict()),
-    DumpBeforeRenameSchema(dict()),
-    DumpAfterRenameSchema(dict()),
+    DumpBeforeAddGraph(lambda: dict()),
+    DumpAfterAddGraph(lambda: dict()),
+    DumpBeforeRenameSchema(lambda: dict()),
+    DumpAfterRenameSchema(lambda: dict()),
     AdapterImportError(ModuleNotFoundError()),
     PluginLoadError(),
     SystemReportReturnCode(returncode=0),
@@ -411,7 +412,9 @@ class TestEventJSONSerialization(TestCase):
     # event types that take `Any` are not possible to test in this way since some will serialize
     # just fine and others won't.
     def test_all_serializable(self):
-        all_non_abstract_events = set(filter(lambda x: not inspect.isabstract(x), get_all_subclasses(Event)))
+        no_test = [DummyDumpEvent]
+
+        all_non_abstract_events = set(filter(lambda x: not inspect.isabstract(x) and x not in no_test, get_all_subclasses(Event)))
         all_event_values_list = list(map(lambda x: x.__class__, sample_values))
         diff = all_non_abstract_events.difference(set(all_event_values_list))
         self.assertFalse(diff, f"test is missing concrete values in `sample_values`. Please add the values for the aforementioned event classes")
@@ -427,4 +430,164 @@ def test_all_serializable(self):
                 json.dumps(d)
             except TypeError as e:
                 raise Exception(f"{event} is not serializable to json. Originating exception: {e}")
+
+
+T = TypeVar('T')
+
+
+@dataclass
+class Counter(Generic[T]):
+    dummy_val: T
+    count: int = 0
+
+    def next(self) -> T:
+        self.count = self.count + 1
+        return self.dummy_val
+
+    # mashumaro serializer
+    def _serialize(self) -> Dict[str, int]:
+        return {'count': self.count}
+
+
+@dataclass
+class DummyDumpEvent(InfoLevel, Cache):
+    code = 'X999'
+    dump: Callable[[], dict]
+
+    def message(self) -> str:
+        try:
+            self.dump = self.dump()
+        except TypeError:
+            pass
+        return f"state: {self.dump}"
+
+    # mashumaro serializer
+    def _serialize(self) -> str:
+        return "DummyDumpEvent"
+
+
+# tests that if a cache event uses lazy evaluation for its message
+# creation, the evaluation will not be forced for cache events when
+# running without `--log-cache-events`.
+def skip_cache_event_message_rendering(x: TestCase):
+    counter = Counter(dict())
+
+    # a dummy event that extends `Cache`
+    e = DummyDumpEvent(counter.next)
+
+    # counter of zero means this potentially expensive function
+    # (emulating dump_graph) has never been called
+    x.assertEqual(counter.count, 0)
+
+    # call fire_event
+    event_funcs.fire_event(e)
+
+    # assert that the expensive function has STILL not been called
+    x.assertEqual(counter.count, 0)
+
+
+# this test checks that every subclass of `Cache` uses the same lazy evaluation
+# strategy. This ensures that potentially expensive cache event values are not
+# built unless they are needed for logging purposes. It also checks that these
+# potentially expensive values are cached, and not evaluated more than once.
+def all_cache_events_are_lazy(x: TestCase):
+    cache_events = get_all_subclasses(Cache)
+    matching_classes = []
+    for clazz in cache_events:
+        # this body only tests subclasses of `Cache` that take a param called "dump"
+
+        # initialize the counter to return a dictionary (emulating dump_graph)
+        counter = Counter(dict())
+
+        # assert that the counter starts at 0
+        x.assertEqual(counter.count, 0)
+
+        # try to create the cache event with no arguments; this raises
+        # TypeError for events with required params such as "dump"
+        try:
+            clazz()
+        except TypeError as err:
+            print(clazz)
+            # hack that roughly detects attribute names without an instance of the class;
+            # skips the dummy test event
+            if 'dump' in str(err) and 'DummyDumpEvent' not in str(clazz):
+                matching_classes.append(clazz)
+
+                # make the event. if this throws, maybe your class didn't use
+                # lazy evaluation when it should have
+                e = clazz(dump=counter.next)
+
+                # assert that initializing the event with the counter
+                # did not evaluate the lazy value
+                x.assertEqual(counter.count, 0)
+
+                # log the event, which should trigger evaluation and
+                # increment the counter
+                event_funcs.fire_event(e)
+
+                # assert that the counter increased
+                x.assertEqual(counter.count, 1)
+
+                # fire the event again, which should reuse the previous value,
+                # not evaluate the function again
+                event_funcs.fire_event(e)
+
+                # assert that the counter did not increase
+                x.assertEqual(counter.count, 1)
+
+            # if the init function doesn't require something named "dump"
+            # we can just continue
+            else:
+                pass
+
+        # other exceptions are real issues and should be raised
+        except Exception as e:
+            raise e
+
+    # we should have exactly 4 matching classes (raise this threshold if we add more)
+    x.assertEqual(len(matching_classes), 4, f"matching classes:\n{len(matching_classes)}: {matching_classes}")
+
+
+class SkipsRenderingCacheEventsTEXT(TestCase):
+
+    def setUp(self):
+        flags.LOG_FORMAT = 'text'
+
+    def test_skip_cache_event_message_rendering_TEXT(self):
+        skip_cache_event_message_rendering(self)
+
+
+class SkipsRenderingCacheEventsJSON(TestCase):
+
+    def setUp(self):
+        flags.LOG_FORMAT = 'json'
+
+    def tearDown(self):
+        flags.LOG_FORMAT = 'text'
+
+    def test_skip_cache_event_message_rendering_JSON(self):
+        skip_cache_event_message_rendering(self)
+
+
+class TestLazyMemoizationInCacheEventsTEXT(TestCase):
+
+    def setUp(self):
+        flags.LOG_FORMAT = 'text'
+        flags.LOG_CACHE_EVENTS = True
+
+    def tearDown(self):
+        flags.LOG_CACHE_EVENTS = False
+
+    def test_all_cache_events_are_lazy_TEXT(self):
+        all_cache_events_are_lazy(self)
+
+
+class TestLazyMemoizationInCacheEventsJSON(TestCase):
+
+    def setUp(self):
+        flags.LOG_FORMAT = 'json'
+        flags.LOG_CACHE_EVENTS = True
+
+    def tearDown(self):
+        flags.LOG_FORMAT = 'text'
+        flags.LOG_CACHE_EVENTS = False
+
+    def test_all_cache_events_are_lazy_JSON(self):
+        all_cache_events_are_lazy(self)