Skip to content

Commit

Permalink
[#4504] Use mashumaro for logging serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank committed Jan 7, 2022
1 parent e943b9f commit dec3c1c
Show file tree
Hide file tree
Showing 22 changed files with 365 additions and 474 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@

## dbt-core 1.0.1 (January 03, 2022)

## dbt-core 1.0.1rc2 (TBD)

### Fixes
- Switch to using mashumaro for logging serialization ([#4504](https://github.com/dbt-labs/dbt-core/issues/4504), [#4505](https://github.com/dbt-labs/dbt-core/pull/4505))

## dbt-core 1.0.1rc1 (December 20, 2021)

### Fixes
Expand Down
21 changes: 10 additions & 11 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,20 +291,19 @@ def add_link(self, referenced, dependent):
:raises InternalError: If either entry does not exist.
"""
ref_key = _make_key(referenced)
dep_key = _make_key(dependent)
if (ref_key.database, ref_key.schema) not in self:
# if we have not cached the referenced schema at all, we must be
# referring to a table outside our control. There's no need to make
# a link - we will never drop the referenced relation during a run.
fire_event(UncachedRelation(dep_key=dependent, ref_key=ref_key))
fire_event(UncachedRelation(dep_key=dep_key, ref_key=ref_key))
return
if ref_key not in self.relations:
# Insert a dummy "external" relation.
referenced = referenced.replace(
type=referenced.External
)
self.add(referenced)

dep_key = _make_key(dependent)
if dep_key not in self.relations:
# Insert a dummy "external" relation.
dependent = dependent.replace(
Expand Down Expand Up @@ -342,17 +341,17 @@ def _remove_refs(self, keys):
for cached in self.relations.values():
cached.release_references(keys)

def _drop_cascade_relation(self, dropped):
def _drop_cascade_relation(self, dropped_key):
"""Drop the given relation and cascade it appropriately to all
dependent relations.
:param dropped_key: The key (as returned by _make_key) of the relation to drop.
"""
if dropped not in self.relations:
fire_event(DropMissingRelation(relation=dropped))
if dropped_key not in self.relations:
fire_event(DropMissingRelation(relation=dropped_key))
return
consequences = self.relations[dropped].collect_consequences()
fire_event(DropCascade(dropped=dropped, consequences=consequences))
consequences = self.relations[dropped_key].collect_consequences()
fire_event(DropCascade(dropped=dropped_key, consequences=consequences))
self._remove_refs(consequences)

def drop(self, relation):
Expand All @@ -366,10 +365,10 @@ def drop(self, relation):
:param str schema: The schema of the relation to drop.
:param str identifier: The identifier of the relation to drop.
"""
dropped = _make_key(relation)
fire_event(DropRelation(dropped=dropped))
dropped_key = _make_key(relation)
fire_event(DropRelation(dropped=dropped_key))
with self.lock:
self._drop_cascade_relation(dropped)
self._drop_cascade_relation(dropped_key)

def _rename_relation(self, old_key, new_relation):
"""Rename a relation named old_key to new_key, updating references.
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/adapters/sql/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def expand_column_types(self, goal, current):
ColTypeChange(
orig_type=target_column.data_type,
new_type=new_type,
table=current,
table=_make_key(current),
)
)

Expand Down
4 changes: 2 additions & 2 deletions core/dbt/context/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,9 @@ def log(msg: str, info: bool = False) -> str:
{% endmacro %}"
"""
if info:
fire_event(MacroEventInfo(msg))
fire_event(MacroEventInfo(msg=msg))
else:
fire_event(MacroEventDebug(msg))
fire_event(MacroEventDebug(msg=msg))
return ''

@contextproperty
Expand Down
33 changes: 28 additions & 5 deletions core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,26 @@ def identifier(self):


@dataclass
class ParsedNodeDefaults(ParsedNodeMandatory):
# Mixin that stores per-node event status in `_event_status` and exposes it,
# together with identifying attributes of the node, as a plain dict through
# the `node_info` property.
class NodeInfoMixin():
# Status bag read by `node_info`; the keys it looks up are 'node_status',
# 'started_at' and 'finished_at'.
_event_status: Dict[str, Any] = field(default_factory=dict)

@property
def node_info(self):
# path / name / unique_id / resource_type are read with getattr, so a host
# class lacking one of them yields None (or '' for resource_type, which is
# then passed through str()).
node_info = {
"node_path": getattr(self, 'path', None),
"node_name": getattr(self, 'name', None),
"unique_id": getattr(self, 'unique_id', None),
"resource_type": str(getattr(self, 'resource_type', '')),
# `config` is accessed directly (no getattr fallback), so the host class
# must provide a `config` attribute with a `.get` method.
"materialized": self.config.get('materialized'),
# str() is applied even when the key is absent, which produces the
# string 'None' rather than a None value.
"node_status": str(self._event_status.get('node_status')),
# The two timestamps are passed through unchanged; None when not set.
"node_started_at": self._event_status.get("started_at"),
"node_finished_at": self._event_status.get("finished_at")
}
return node_info


@dataclass
class ParsedNodeDefaults(NodeInfoMixin, ParsedNodeMandatory):
tags: List[str] = field(default_factory=list)
refs: List[List[str]] = field(default_factory=list)
sources: List[List[str]] = field(default_factory=list)
Expand All @@ -194,7 +213,6 @@ class ParsedNodeDefaults(ParsedNodeMandatory):
unrendered_config: Dict[str, Any] = field(default_factory=dict)
created_at: float = field(default_factory=lambda: time.time())
config_call_dict: Dict[str, Any] = field(default_factory=dict)
_event_status: Dict[str, Any] = field(default_factory=dict)

def write_node(self, target_path: str, subdirectory: str, payload: str):
if (os.path.basename(self.path) ==
Expand Down Expand Up @@ -610,19 +628,25 @@ def tests(self) -> List[TestDef]:


@dataclass
class ParsedSourceDefinition(
class ParsedSourceMandatory(
UnparsedBaseNode,
HasUniqueID,
HasRelationMetadata,
HasFqn,

):
name: str
source_name: str
source_description: str
loader: str
identifier: str
resource_type: NodeType = field(metadata={'restrict': [NodeType.Source]})


@dataclass
class ParsedSourceDefinition(
NodeInfoMixin,
ParsedSourceMandatory
):
quoting: Quoting = field(default_factory=Quoting)
loaded_at_field: Optional[str] = None
freshness: Optional[FreshnessThreshold] = None
Expand All @@ -637,7 +661,6 @@ class ParsedSourceDefinition(
unrendered_config: Dict[str, Any] = field(default_factory=dict)
relation_name: Optional[str] = None
created_at: float = field(default_factory=lambda: time.time())
_event_status: Dict[str, Any] = field(default_factory=dict)

def __post_serialize__(self, dct):
if '_event_status' in dct:
Expand Down
22 changes: 2 additions & 20 deletions core/dbt/events/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,10 @@ class PartialParsingDeletedExposure(DebugLevel, Cli, File):

## Optional (based on your event)

- Events associated with node status changes must have `report_node_data` passed in and be extended with `NodeInfo`
- define `asdict` if your data is not serializable to json
- Events associated with node status changes must extend `NodeInfo`, which contains a `node_info` attribute

Example
```
@dataclass
class SuperImportantNodeEvent(InfoLevel, File, NodeInfo):
node_name: str
run_result: RunResult
report_node_data: ParsedModelNode # may vary
code: str = "Q036"
def message(self) -> str:
return f"{self.node_name} had overly verbose result of {run_result}"
@classmethod
def asdict(cls, data: list) -> dict:
return dict((k, str(v)) for k, v in data)
```

All values other than `code` and `report_node_data` will be included in the `data` node of the json log output.
All values other than `code` and `node_info` will be included in the `data` node of the json log output.

Once your event has been added, add a dummy call to your new event at the bottom of `types.py` and also add your new Event to the list `sample_values` in `test/unit/test_events.py`.

Expand Down
106 changes: 28 additions & 78 deletions core/dbt/events/base_types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from abc import ABCMeta, abstractmethod, abstractproperty
from abc import ABCMeta, abstractproperty, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from dbt.events.serialization import dbtClassEventMixin
import os
import threading
from typing import Any, Optional
from typing import Any, Dict

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# These base types define the _required structure_ for the concrete event #
Expand All @@ -16,19 +16,6 @@ class Cache():
pass


@dataclass
class Node():
node_path: str
node_name: str
unique_id: str
resource_type: str
materialized: str
node_status: str
node_started_at: datetime
node_finished_at: Optional[datetime]
type: str = 'node_status'


@dataclass
class ShowException():
# N.B.:
Expand All @@ -42,15 +29,9 @@ def __post_init__(self):


# TODO add exhaustiveness checking for subclasses
# can't use ABCs with @dataclass because of https://github.com/python/mypy/issues/5374
# top-level superclass for all events
class Event(metaclass=ABCMeta):
# fields that should be on all events with their default implementations
log_version: int = 1
ts: Optional[datetime] = None # use getter for non-optional
ts_rfc3339: Optional[str] = None # use getter for non-optional
pid: Optional[int] = None # use getter for non-optional
node_info: Optional[Node]
# Do not define fields with defaults here

# four digit string code that uniquely identifies this type of event
# uniqueness and valid characters are enforced by tests
Expand All @@ -59,6 +40,12 @@ class Event(metaclass=ABCMeta):
def code() -> str:
raise Exception("code() not implemented for event")

# The 'to_dict' method is added by mashumaro via the dbtClassEventMixin.
# It should be in all subclasses that are to record actual events.
@abstractmethod
def to_dict(self):
raise Exception('to_dict not implemented for Event')

# do not define this yourself. inherit it from one of the above level types.
@abstractmethod
def level_tag(self) -> str:
Expand All @@ -70,25 +57,9 @@ def level_tag(self) -> str:
def message(self) -> str:
raise Exception("msg not implemented for Event")

# exactly one time stamp per concrete event
def get_ts(self) -> datetime:
if not self.ts:
self.ts = datetime.utcnow()
self.ts_rfc3339 = self.ts.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
return self.ts

# preformatted time stamp
def get_ts_rfc3339(self) -> str:
if not self.ts_rfc3339:
# get_ts() creates the formatted string too so all time logic is centralized
self.get_ts()
return self.ts_rfc3339 # type: ignore

# exactly one pid per concrete event
def get_pid(self) -> int:
if not self.pid:
self.pid = os.getpid()
return self.pid
return os.getpid()

# in theory threads can change so we don't cache them.
def get_thread_name(self) -> str:
Expand All @@ -99,66 +70,38 @@ def get_invocation_id(cls) -> str:
from dbt.events.functions import get_invocation_id
return get_invocation_id()

# default dict factory for all events. can override on concrete classes.
@classmethod
def asdict(cls, data: list) -> dict:
d = dict()
for k, v in data:
# stringify all exceptions
if isinstance(v, Exception) or isinstance(v, BaseException):
d[k] = str(v)
# skip all binary data
elif isinstance(v, bytes):
continue
else:
d[k] = v
return d


# in preparation for #3977
class TestLevel(Event):
@dataclass # type: ignore
class TestLevel(dbtClassEventMixin, Event):
def level_tag(self) -> str:
return "test"


class DebugLevel(Event):
@dataclass # type: ignore
class DebugLevel(dbtClassEventMixin, Event):
def level_tag(self) -> str:
return "debug"


class InfoLevel(Event):
@dataclass # type: ignore
class InfoLevel(dbtClassEventMixin, Event):
def level_tag(self) -> str:
return "info"


class WarnLevel(Event):
@dataclass # type: ignore
class WarnLevel(dbtClassEventMixin, Event):
def level_tag(self) -> str:
return "warn"


class ErrorLevel(Event):
@dataclass # type: ignore
class ErrorLevel(dbtClassEventMixin, Event):
def level_tag(self) -> str:
return "error"


@dataclass # type: ignore
class NodeInfo(Event, metaclass=ABCMeta):
report_node_data: Any # Union[ParsedModelNode, ...] TODO: resolve circular imports

def get_node_info(self):
node_info = Node(
node_path=self.report_node_data.path,
node_name=self.report_node_data.name,
unique_id=self.report_node_data.unique_id,
resource_type=self.report_node_data.resource_type.value,
materialized=self.report_node_data.config.get('materialized'),
node_status=str(self.report_node_data._event_status.get('node_status')),
node_started_at=self.report_node_data._event_status.get("started_at"),
node_finished_at=self.report_node_data._event_status.get("finished_at")
)
return node_info


# prevents an event from going to the file
class NoFile():
pass
Expand All @@ -167,3 +110,10 @@ class NoFile():
# prevents an event from going to stdout
class NoStdOut():
pass


# This class represents the node_info which is generated
# by the NodeInfoMixin class in dbt.contracts.graph.parsed
# It declares a single dataclass field, so an event class that extends it
# carries that dict as its `node_info` attribute.
@dataclass
class NodeInfo():
# Plain dict of node identity/status values (str keys, arbitrary values).
node_info: Dict[str, Any]
Loading

0 comments on commit dec3c1c

Please sign in to comment.