Skip to content

Commit

Permalink
[#4504] Use mashumaro for logging serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank committed Dec 17, 2021
1 parent 7c46b78 commit 6da0dec
Show file tree
Hide file tree
Showing 24 changed files with 595 additions and 724 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Reimplement log message to use adapter name instead of the object method. ([#4501](https://github.com/dbt-labs/dbt-core/pull/4501))
- Issue better error message for incompatible schemas ([#4470](https://github.com/dbt-labs/dbt-core/pull/4442), [#4497](https://github.com/dbt-labs/dbt-core/pull/4497))
- Remove secrets from error related to packages. ([#4507](https://github.com/dbt-labs/dbt-core/pull/4507))
- Some structured logging serialization cleanup ([#4504](https://github.com/dbt-labs/dbt-core/issues/4504), [#4505](https://github.com/dbt-labs/dbt-core/pull/4505))

### Docs
- Fix missing data on exposures in docs ([#4467](https://github.com/dbt-labs/dbt-core/issues/4467))
Expand Down
21 changes: 10 additions & 11 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,20 +291,19 @@ def add_link(self, referenced, dependent):
:raises InternalError: If either entry does not exist.
"""
ref_key = _make_key(referenced)
dep_key = _make_key(dependent)
if (ref_key.database, ref_key.schema) not in self:
# if we have not cached the referenced schema at all, we must be
# referring to a table outside our control. There's no need to make
# a link - we will never drop the referenced relation during a run.
fire_event(UncachedRelation(dep_key=dependent, ref_key=ref_key))
fire_event(UncachedRelation(dep_key=dep_key, ref_key=ref_key))
return
if ref_key not in self.relations:
# Insert a dummy "external" relation.
referenced = referenced.replace(
type=referenced.External
)
self.add(referenced)

dep_key = _make_key(dependent)
if dep_key not in self.relations:
# Insert a dummy "external" relation.
dependent = dependent.replace(
Expand Down Expand Up @@ -342,17 +341,17 @@ def _remove_refs(self, keys):
for cached in self.relations.values():
cached.release_references(keys)

def _drop_cascade_relation(self, dropped):
def _drop_cascade_relation(self, dropped_key):
"""Drop the given relation and cascade it appropriately to all
dependent relations.
:param dropped_key: The key (as returned by `_make_key`) of the cached relation to drop.
"""
if dropped not in self.relations:
fire_event(DropMissingRelation(relation=dropped))
if dropped_key not in self.relations:
fire_event(DropMissingRelation(relation=dropped_key))
return
consequences = self.relations[dropped].collect_consequences()
fire_event(DropCascade(dropped=dropped, consequences=consequences))
consequences = self.relations[dropped_key].collect_consequences()
fire_event(DropCascade(dropped=dropped_key, consequences=consequences))
self._remove_refs(consequences)

def drop(self, relation):
Expand All @@ -366,10 +365,10 @@ def drop(self, relation):
:param str schema: The schema of the relation to drop.
:param str identifier: The identifier of the relation to drop.
"""
dropped = _make_key(relation)
fire_event(DropRelation(dropped=dropped))
dropped_key = _make_key(relation)
fire_event(DropRelation(dropped=dropped_key))
with self.lock:
self._drop_cascade_relation(dropped)
self._drop_cascade_relation(dropped_key)

def _rename_relation(self, old_key, new_relation):
"""Rename a relation named old_key to new_key, updating references.
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/adapters/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def load_plugin(self, name: str) -> Type[Credentials]:
# if we failed to import the target module in particular, inform
# the user about it via a runtime error
if exc.name == 'dbt.adapters.' + name:
fire_event(AdapterImportError(exc=exc))
fire_event(AdapterImportError(exc=str(exc)))
raise RuntimeException(f'Could not find adapter type {name}!')
# otherwise, the error had to have come from some underlying
# library. Log the stack trace.
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/adapters/sql/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def expand_column_types(self, goal, current):
ColTypeChange(
orig_type=target_column.data_type,
new_type=new_type,
table=current,
table=_make_key(current),
)
)

Expand Down
4 changes: 2 additions & 2 deletions core/dbt/context/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,9 @@ def log(msg: str, info: bool = False) -> str:
{% endmacro %}"
"""
if info:
fire_event(MacroEventInfo(msg))
fire_event(MacroEventInfo(msg=msg))
else:
fire_event(MacroEventDebug(msg))
fire_event(MacroEventDebug(msg=msg))
return ''

@contextproperty
Expand Down
27 changes: 27 additions & 0 deletions core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,20 @@ def write_node(self, target_path: str, subdirectory: str, payload: str):
write_file(full_path, payload)
return full_path

@property
def node_info(self):
    """Return a dictionary of metadata describing this node.

    The dictionary carries the node's path, name, unique id, resource type
    value and configured materialization, together with the status and
    start/finish entries read from ``self._event_status``.
    """
    return dict(
        node_path=self.path,
        node_name=self.name,
        unique_id=self.unique_id,
        resource_type=self.resource_type.value,
        materialized=self.config.get('materialized'),
        # Always stringified, so a missing status is reported as "None".
        node_status=str(self._event_status.get('node_status')),
        node_started_at=self._event_status.get("started_at"),
        node_finished_at=self._event_status.get("finished_at"),
    )


T = TypeVar('T', bound='ParsedNode')

Expand Down Expand Up @@ -738,6 +752,19 @@ def has_freshness(self):
def search_name(self):
return f'{self.source_name}.{self.name}'

@property
def node_info(self):
    """Return a dictionary of metadata describing this node.

    The dictionary carries the node's path, name, unique id and resource
    type value, together with the status and start/finish entries read
    from ``self._event_status``. No ``materialized`` entry is included.
    """
    return dict(
        node_path=self.path,
        node_name=self.name,
        unique_id=self.unique_id,
        resource_type=self.resource_type.value,
        # Always stringified, so a missing status is reported as "None".
        node_status=str(self._event_status.get('node_status')),
        node_started_at=self._event_status.get("started_at"),
        node_finished_at=self._event_status.get("finished_at"),
    )


@dataclass
class ParsedExposure(UnparsedBaseNode, HasUniqueID, HasFqn):
Expand Down
22 changes: 2 additions & 20 deletions core/dbt/events/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,10 @@ class PartialParsingDeletedExposure(DebugLevel, Cli, File):

## Optional (based on your event)

- Events associated with node status changes must have `report_node_data` passed in and be extended with `NodeInfo`
- define `asdict` if your data is not serializable to json
- Events associated with node status changes must be extended with `NodeInfo`, which contains a `node_info` attribute

Example
```
@dataclass
class SuperImportantNodeEvent(InfoLevel, File, NodeInfo):
node_name: str
run_result: RunResult
report_node_data: ParsedModelNode # may vary
code: str = "Q036"
def message(self) -> str:
return f"{self.node_name} had overly verbose result of {run_result}"
@classmethod
def asdict(cls, data: list) -> dict:
return dict((k, str(v)) for k, v in data)
```

All values other than `code` and `report_node_data` will be included in the `data` node of the json log output.
All values other than `code` and `node_info` will be included in the `data` node of the json log output.

Once your event has been added, add a dummy call to your new event at the bottom of `types.py` and also add your new Event to the list `sample_values` in `test/unit/test_events.py`.

Expand Down
67 changes: 12 additions & 55 deletions core/dbt/events/base_types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from abc import ABCMeta, abstractmethod, abstractproperty
from dataclasses import dataclass
from abc import ABCMeta
from dataclasses import dataclass, field
from dbt.events.serialization import dbtClassEventMixin
from datetime import datetime
import os
import threading
from typing import Any, Optional
from typing import Any, Optional, Dict

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# These base types define the _required structure_ for the concrete event #
Expand All @@ -16,19 +17,6 @@ class Cache():
pass


@dataclass
class Node():
node_path: str
node_name: str
unique_id: str
resource_type: str
materialized: str
node_status: str
node_started_at: datetime
node_finished_at: Optional[datetime]
type: str = 'node_status'


@dataclass
class ShowException():
# N.B.:
Expand All @@ -42,31 +30,28 @@ def __post_init__(self):


# TODO add exhaustiveness checking for subclasses
# can't use ABCs with @dataclass because of https://github.com/python/mypy/issues/5374
# top-level superclass for all events
class Event(metaclass=ABCMeta):
@dataclass
class Event(dbtClassEventMixin, metaclass=ABCMeta):
# fields that should be on all events with their default implementations
log_version: int = 1
ts: Optional[datetime] = None # use getter for non-optional
ts_rfc3339: Optional[str] = None # use getter for non-optional
pid: Optional[int] = None # use getter for non-optional
node_info: Optional[Node]

# four digit string code that uniquely identifies this type of event
# uniqueness and valid characters are enforced by tests
@abstractproperty
@property
@staticmethod
def code() -> str:
raise Exception("code() not implemented for event")

# do not define this yourself. inherit it from one of the above level types.
@abstractmethod
def level_tag(self) -> str:
raise Exception("level_tag not implemented for Event")

# Solely the human readable message. Timestamps and formatting will be added by the logger.
# Must override yourself
@abstractmethod
def message(self) -> str:
raise Exception("msg not implemented for Event")

Expand Down Expand Up @@ -99,21 +84,6 @@ def get_invocation_id(cls) -> str:
from dbt.events.functions import get_invocation_id
return get_invocation_id()

# default dict factory for all events. can override on concrete classes.
@classmethod
def asdict(cls, data: list) -> dict:
d = dict()
for k, v in data:
# stringify all exceptions
if isinstance(v, Exception) or isinstance(v, BaseException):
d[k] = str(v)
# skip all binary data
elif isinstance(v, bytes):
continue
else:
d[k] = v
return d


# in preparation for #3977
class TestLevel(Event):
Expand Down Expand Up @@ -141,24 +111,6 @@ def level_tag(self) -> str:
return "error"


@dataclass # type: ignore
class NodeInfo(Event, metaclass=ABCMeta):
report_node_data: Any # Union[ParsedModelNode, ...] TODO: resolve circular imports

def get_node_info(self):
node_info = Node(
node_path=self.report_node_data.path,
node_name=self.report_node_data.name,
unique_id=self.report_node_data.unique_id,
resource_type=self.report_node_data.resource_type.value,
materialized=self.report_node_data.config.get('materialized'),
node_status=str(self.report_node_data._event_status.get('node_status')),
node_started_at=self.report_node_data._event_status.get("started_at"),
node_finished_at=self.report_node_data._event_status.get("finished_at")
)
return node_info


# prevents an event from going to the file
class NoFile():
pass
Expand All @@ -167,3 +119,8 @@ class NoFile():
# prevents an event from going to stdout
class NoStdOut():
pass


@dataclass
class NodeInfo:
    """Base class that gives an event a ``node_info`` dictionary.

    The field defaults to a new, empty dict for every instance.
    """

    node_info: Dict[str, Any] = field(default_factory=dict)
19 changes: 7 additions & 12 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import uuid
import threading
from typing import Any, Callable, Dict, List, Optional, Union
import dataclasses
from collections import deque


Expand Down Expand Up @@ -138,25 +137,21 @@ def event_to_serializable_dict(
ts_fn: Callable[[datetime], str]
) -> Dict[str, Any]:
data = dict()
node_info = dict()
node_info: Dict[str, Any] = dict()
log_line = dict()
try:
log_line = dataclasses.asdict(e, dict_factory=type(e).asdict)
except AttributeError:
log_line = e.to_dict()
except AttributeError as exc:
event_type = type(e).__name__
raise Exception( # TODO this may hang async threads
f"type {event_type} is not serializable to json."
f" First make sure that the call sites for {event_type} match the type hints"
f" and if they do, you can override the dataclass method `asdict` in {event_type} in"
" types.py to define your own serialization function to a dictionary of valid json"
" types"
f"type {event_type} is not serializable. {str(exc)}"
)

if isinstance(e, NodeInfo):
node_info = dataclasses.asdict(e.get_node_info())
node_info = e.node_info

for field, value in log_line.items(): # type: ignore[attr-defined]
if field not in ["code", "report_node_data"]:
if field not in ["code", "node_info", "log_version", "ts", "ts_rfc3339", "pid"]:
data[field] = value

event_dict = {
Expand Down Expand Up @@ -315,7 +310,7 @@ def fire_event(e: Event) -> None:
global EVENT_HISTORY
if len(EVENT_HISTORY) == (flags.EVENT_BUFFER_SIZE - 1):
EVENT_HISTORY.append(e)
fire_event(EventBufferFull())
EVENT_HISTORY.append(EventBufferFull())
else:
EVENT_HISTORY.append(e)

Expand Down
40 changes: 40 additions & 0 deletions core/dbt/events/serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from mashumaro import DataClassDictMixin
from mashumaro.config import (
BaseConfig as MashBaseConfig
)
from mashumaro.types import SerializationStrategy
from datetime import datetime
from dateutil.parser import parse
from typing import cast


class ExceptionSerialization(SerializationStrategy):
    """Serialization strategy that stores exceptions as their string form."""

    def serialize(self, value):
        """Return ``str(value)`` for the given exception."""
        return str(value)

    def deserialize(self, value):
        """Wrap the serialized text in a plain ``Exception``.

        Only the message text is serialized, so the original exception
        class is not restored.
        """
        return Exception(value)


class DateTimeSerialization(SerializationStrategy):
    """Serialization strategy converting datetimes to and from ISO text."""

    def serialize(self, value):
        """Return ``value.isoformat()``, suffixed with "Z" when naive."""
        iso_text = value.isoformat()
        if value.tzinfo is not None:
            return iso_text
        # Assume UTC if timezone is missing
        return iso_text + "Z"

    def deserialize(self, value):
        """Return ``value`` unchanged if already a datetime, else parse it."""
        if isinstance(value, datetime):
            return value
        return parse(cast(str, value))


class dbtClassEventMixin(DataClassDictMixin):
    """Dict-serialization mixin configured with custom strategies.

    Exceptions and datetimes are routed through the strategy objects
    registered in ``Config.serialization_strategy``.
    """

    class Config(MashBaseConfig):
        # Maps a field type to the strategy instance used for that type.
        serialization_strategy = {
            Exception: ExceptionSerialization(),
            datetime: DateTimeSerialization(),
        }
Loading

0 comments on commit 6da0dec

Please sign in to comment.