Skip to content

Commit

Permalink
[#4504] Use mashumaro for logging serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank committed Dec 17, 2021
1 parent 9f6ed3c commit 8a44af0
Show file tree
Hide file tree
Showing 27 changed files with 639 additions and 717 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
- Fix redefined status param of SQLQueryStatus to typecheck the string which passes on `._message` value of `AdapterResponse` or the `str` value sent by adapter plugin. ([#4463](https://github.com/dbt-labs/dbt-core/pull/4463#issuecomment-990174166))
- Fix `DepsStartPackageInstall` event to use package name instead of version number. ([#4482](https://github.com/dbt-labs/dbt-core/pull/4482))
- Reimplement log message to use adapter name instead of the object method. ([#4501](https://github.com/dbt-labs/dbt-core/pull/4501))
- Issue better error message for incompatible schemas ([#4470](https://github.com/dbt-labs/dbt-core/pull/4442), [#4497](https://github.com/dbt-labs/dbt-core/pull/4497))
- Some structured logging serialization cleanup ([#4504](https://github.com/dbt-labs/dbt-core/issues/4504), [#4505](https://github.com/dbt-labs/dbt-core/pull/4505))

### Docs
- Fix missing data on exposures in docs ([#4467](https://github.com/dbt-labs/dbt-core/issues/4467))
Expand Down
5 changes: 2 additions & 3 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,20 +291,19 @@ def add_link(self, referenced, dependent):
:raises InternalError: If either entry does not exist.
"""
ref_key = _make_key(referenced)
dep_key = _make_key(dependent)
if (ref_key.database, ref_key.schema) not in self:
# if we have not cached the referenced schema at all, we must be
# referring to a table outside our control. There's no need to make
# a link - we will never drop the referenced relation during a run.
fire_event(UncachedRelation(dep_key=dependent, ref_key=ref_key))
fire_event(UncachedRelation(dep_key=dep_key, ref_key=ref_key))
return
if ref_key not in self.relations:
# Insert a dummy "external" relation.
referenced = referenced.replace(
type=referenced.External
)
self.add(referenced)

dep_key = _make_key(dependent)
if dep_key not in self.relations:
# Insert a dummy "external" relation.
dependent = dependent.replace(
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/adapters/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def load_plugin(self, name: str) -> Type[Credentials]:
# if we failed to import the target module in particular, inform
# the user about it via a runtime error
if exc.name == 'dbt.adapters.' + name:
fire_event(AdapterImportError(exc=exc))
fire_event(AdapterImportError(exc=str(exc)))
raise RuntimeException(f'Could not find adapter type {name}!')
# otherwise, the error had to have come from some underlying
# library. Log the stack trace.
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/context/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,9 @@ def log(msg: str, info: bool = False) -> str:
{% endmacro %}"
"""
if info:
fire_event(MacroEventInfo(msg))
fire_event(MacroEventInfo(msg=msg))
else:
fire_event(MacroEventDebug(msg))
fire_event(MacroEventDebug(msg=msg))
return ''

@contextproperty
Expand Down
27 changes: 27 additions & 0 deletions core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,20 @@ def write_node(self, target_path: str, subdirectory: str, payload: str):
write_file(full_path, payload)
return full_path

@property
def node_info(self):
node_info = {
"node_path": self.path,
"node_name": self.name,
"unique_id": self.unique_id,
"resource_type": self.resource_type.value,
"materialized": self.config.get('materialized'),
"node_status": str(self._event_status.get('node_status')),
"node_started_at": self._event_status.get("started_at"),
"node_finished_at": self._event_status.get("finished_at")
}
return node_info


T = TypeVar('T', bound='ParsedNode')

Expand Down Expand Up @@ -738,6 +752,19 @@ def has_freshness(self):
def search_name(self):
return f'{self.source_name}.{self.name}'

@property
def node_info(self):
node_info = {
"node_path": self.path,
"node_name": self.name,
"unique_id": self.unique_id,
"resource_type": self.resource_type.value,
"node_status": str(self._event_status.get('node_status')),
"node_started_at": self._event_status.get("started_at"),
"node_finished_at": self._event_status.get("finished_at")
}
return node_info


@dataclass
class ParsedExposure(UnparsedBaseNode, HasUniqueID, HasFqn):
Expand Down
6 changes: 4 additions & 2 deletions core/dbt/contracts/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ def __init__(self, path: Path):
manifest_path = self.path / 'manifest.json'
if manifest_path.exists() and manifest_path.is_file():
try:
self.manifest = WritableManifest.read(str(manifest_path))
# we want to bail with an error if schema versions don't match
self.manifest = WritableManifest.read_and_check_versions(str(manifest_path))
except IncompatibleSchemaException as exc:
exc.add_filename(str(manifest_path))
raise

results_path = self.path / 'run_results.json'
if results_path.exists() and results_path.is_file():
try:
self.results = RunResultsArtifact.read(str(results_path))
# we want to bail with an error if schema versions don't match
self.results = RunResultsArtifact.read_and_check_versions(str(results_path))
except IncompatibleSchemaException as exc:
exc.add_filename(str(results_path))
raise
41 changes: 41 additions & 0 deletions core/dbt/contracts/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dbt.exceptions import (
InternalException,
RuntimeException,
IncompatibleSchemaException
)
from dbt.version import __version__
from dbt.events.functions import get_invocation_id
Expand Down Expand Up @@ -158,6 +159,8 @@ def get_metadata_env() -> Dict[str, str]:
}


# This is used in the ManifestMetadata, RunResultsMetadata, RunOperationResultMetadata,
# FreshnessMetadata, and CatalogMetadata classes
@dataclasses.dataclass
class BaseArtifactMetadata(dbtClassMixin):
dbt_schema_version: str
Expand All @@ -177,6 +180,17 @@ def __post_serialize__(self, dct):
return dct


# This is used as a class decorator to set the schema_version in the
# 'dbt_schema_version' class attribute. (It's copied into the metadata objects.)
# Name attributes of SchemaVersion in classes with the 'schema_version' decorator:
# manifest
# run-results
# run-operation-result
# sources
# catalog
# remote-compile-result
# remote-execution-result
# remote-run-result
def schema_version(name: str, version: int):
def inner(cls: Type[VersionedSchema]):
cls.dbt_schema_version = SchemaVersion(
Expand All @@ -187,6 +201,7 @@ def inner(cls: Type[VersionedSchema]):
return inner


# This is used in the ArtifactMixin and RemoteResult classes
@dataclasses.dataclass
class VersionedSchema(dbtClassMixin):
dbt_schema_version: ClassVar[SchemaVersion]
Expand All @@ -198,13 +213,39 @@ def json_schema(cls, embeddable: bool = False) -> Dict[str, Any]:
result['$id'] = str(cls.dbt_schema_version)
return result

@classmethod
def read_and_check_versions(cls, path: str):
try:
data = read_json(path)
except (EnvironmentError, ValueError) as exc:
raise RuntimeException(
f'Could not read {cls.__name__} at "{path}" as JSON: {exc}'
) from exc

# Check metadata version. There is a class variable 'dbt_schema_version', but
# that doesn't show up in artifacts, where it only exists in the 'metadata'
# dictionary.
if hasattr(cls, 'dbt_schema_version'):
if 'metadata' in data and 'dbt_schema_version' in data['metadata']:
previous_schema_version = data['metadata']['dbt_schema_version']
# cls.dbt_schema_version is a SchemaVersion object
if str(cls.dbt_schema_version) != previous_schema_version:
raise IncompatibleSchemaException(
expected=str(cls.dbt_schema_version),
found=previous_schema_version
)

return cls.from_dict(data) # type: ignore


T = TypeVar('T', bound='ArtifactMixin')


# metadata should really be a Generic[T_M] where T_M is a TypeVar bound to
# BaseArtifactMetadata. Unfortunately this isn't possible due to a mypy issue:
# https://github.com/python/mypy/issues/7520
# This is used in the WritableManifest, RunResultsArtifact, RunOperationResultsArtifact,
# and CatalogArtifact
@dataclasses.dataclass(init=False)
class ArtifactMixin(VersionedSchema, Writable, Readable):
metadata: BaseArtifactMetadata
Expand Down
22 changes: 2 additions & 20 deletions core/dbt/events/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,10 @@ class PartialParsingDeletedExposure(DebugLevel, Cli, File):

## Optional (based on your event)

- Events associated with node status changes must have `report_node_data` passed in and be extended with `NodeInfo`
- define `asdict` if your data is not serializable to json
- Events associated with node status changes must be extended with `NodeInfo` which contains a node_info attribute

Example
```
@dataclass
class SuperImportantNodeEvent(InfoLevel, File, NodeInfo):
node_name: str
run_result: RunResult
report_node_data: ParsedModelNode # may vary
code: str = "Q036"
def message(self) -> str:
return f"{self.node_name} had overly verbose result of {run_result}"
@classmethod
def asdict(cls, data: list) -> dict:
return dict((k, str(v)) for k, v in data)
```

All values other than `code` and `report_node_data` will be included in the `data` node of the json log output.
All values other than `code` and `node_info` will be included in the `data` node of the json log output.

Once your event has been added, add a dummy call to your new event at the bottom of `types.py` and also add your new Event to the list `sample_values` in `test/unit/test_events.py'.

Expand Down
67 changes: 12 additions & 55 deletions core/dbt/events/base_types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from abc import ABCMeta, abstractmethod, abstractproperty
from dataclasses import dataclass
from abc import ABCMeta
from dataclasses import dataclass, field
from dbt.events.serialization import dbtClassEventMixin
from datetime import datetime
import os
import threading
from typing import Any, Optional
from typing import Any, Optional, Dict

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# These base types define the _required structure_ for the concrete event #
Expand All @@ -16,19 +17,6 @@ class Cache():
pass


@dataclass
class Node():
node_path: str
node_name: str
unique_id: str
resource_type: str
materialized: str
node_status: str
node_started_at: datetime
node_finished_at: Optional[datetime]
type: str = 'node_status'


@dataclass
class ShowException():
# N.B.:
Expand All @@ -42,31 +30,28 @@ def __post_init__(self):


# TODO add exhaustiveness checking for subclasses
# can't use ABCs with @dataclass because of https://github.com/python/mypy/issues/5374
# top-level superclass for all events
class Event(metaclass=ABCMeta):
@dataclass
class Event(dbtClassEventMixin, metaclass=ABCMeta):
# fields that should be on all events with their default implementations
log_version: int = 1
ts: Optional[datetime] = None # use getter for non-optional
ts_rfc3339: Optional[str] = None # use getter for non-optional
pid: Optional[int] = None # use getter for non-optional
node_info: Optional[Node]

# four digit string code that uniquely identifies this type of event
# uniqueness and valid characters are enforced by tests
@abstractproperty
@property
@staticmethod
def code() -> str:
raise Exception("code() not implemented for event")

# do not define this yourself. inherit it from one of the above level types.
@abstractmethod
def level_tag(self) -> str:
raise Exception("level_tag not implemented for Event")

# Solely the human readable message. Timestamps and formatting will be added by the logger.
# Must override yourself
@abstractmethod
def message(self) -> str:
raise Exception("msg not implemented for Event")

Expand Down Expand Up @@ -99,21 +84,6 @@ def get_invocation_id(cls) -> str:
from dbt.events.functions import get_invocation_id
return get_invocation_id()

# default dict factory for all events. can override on concrete classes.
@classmethod
def asdict(cls, data: list) -> dict:
d = dict()
for k, v in data:
# stringify all exceptions
if isinstance(v, Exception) or isinstance(v, BaseException):
d[k] = str(v)
# skip all binary data
elif isinstance(v, bytes):
continue
else:
d[k] = v
return d


# in preparation for #3977
class TestLevel(Event):
Expand Down Expand Up @@ -141,24 +111,6 @@ def level_tag(self) -> str:
return "error"


@dataclass # type: ignore
class NodeInfo(Event, metaclass=ABCMeta):
report_node_data: Any # Union[ParsedModelNode, ...] TODO: resolve circular imports

def get_node_info(self):
node_info = Node(
node_path=self.report_node_data.path,
node_name=self.report_node_data.name,
unique_id=self.report_node_data.unique_id,
resource_type=self.report_node_data.resource_type.value,
materialized=self.report_node_data.config.get('materialized'),
node_status=str(self.report_node_data._event_status.get('node_status')),
node_started_at=self.report_node_data._event_status.get("started_at"),
node_finished_at=self.report_node_data._event_status.get("finished_at")
)
return node_info


# prevents an event from going to the file
class NoFile():
pass
Expand All @@ -167,3 +119,8 @@ class NoFile():
# prevents an event from going to stdout
class NoStdOut():
pass


@dataclass
class NodeInfo():
node_info: Dict[str, Any] = field(default_factory=dict)
Loading

0 comments on commit 8a44af0

Please sign in to comment.