From 8379edce9987371d59cde5fa0f31d965fa902fe3 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Thu, 17 Sep 2020 11:07:40 -0600 Subject: [PATCH 1/3] Add a common metadata field to JSON artifacts Adjusted how schema versions are set RPC calls no longer have the schema version in their replies --- CHANGELOG.md | 3 + core/dbt/contracts/graph/manifest.py | 29 +- core/dbt/contracts/results.py | 302 ++++++++++++------ core/dbt/contracts/rpc.py | 150 +++++++-- core/dbt/contracts/util.py | 88 ++--- core/dbt/parser/manifest.py | 2 - core/dbt/rpc/node_runners.py | 12 +- core/dbt/task/freshness.py | 11 +- core/dbt/task/generate.py | 16 +- core/dbt/task/rpc/base.py | 6 +- core/dbt/task/rpc/project_commands.py | 8 +- core/dbt/task/run_operation.py | 9 +- core/dbt/task/runnable.py | 9 +- .../test_docs_generate.py | 75 ++--- .../042_sources_test/test_sources.py | 23 +- test/unit/test_compiler.py | 6 - test/unit/test_docs_generate.py | 1 - test/unit/test_graph_selector_methods.py | 1 - test/unit/test_manifest.py | 72 +++-- test/unit/test_parser.py | 2 +- 20 files changed, 510 insertions(+), 315 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 59a2ef92d4a..559244ad62b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## dbt 0.19.0 (Release TBD) +### Breaking changes +- The format for sources.json, run-results.json, manifest.json, and catalog.json has changed to include a common metadata field ([#2761](https://github.com/fishtown-analytics/dbt/issues/2761), [#2778](https://github.com/fishtown-analytics/dbt/pull/2778)) + ### Features - dbt will compare configurations using the un-rendered form of the config block in dbt_project.yml ([#2713](https://github.com/fishtown-analytics/dbt/issues/2713), [#2735](https://github.com/fishtown-analytics/dbt/pull/2735)) - Added state and defer arguments to the RPC client, matching the CLI ([#2678](https://github.com/fishtown-analytics/dbt/issues/2678), [#2736](https://github.com/fishtown-analytics/dbt/pull/2736)) diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index b043cfc95fb..544a29312ce 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -1,7 +1,6 @@ import abc import enum from dataclasses import dataclass, field -from datetime import datetime from itertools import chain, islice from multiprocessing.synchronize import Lock from typing import ( @@ -11,8 +10,6 @@ from typing_extensions import Protocol from uuid import UUID -from hologram import JsonSchemaMixin - from dbt.contracts.graph.compiled import ( CompileResultNode, ManifestNode, NonSourceCompiledNode, GraphMemberNode ) @@ -22,7 +19,7 @@ ) from dbt.contracts.files import SourceFile from dbt.contracts.util import ( - VersionedSchema, Replaceable, MacroKey, SourceKey, SchemaVersion + BaseArtifactMetadata, MacroKey, SourceKey, ArtifactMixin, schema_version ) from dbt.exceptions import ( raise_duplicate_resource_name, raise_compiler_error, warn_or_error, @@ -172,8 +169,11 @@ def _search_packages( @dataclass -class ManifestMetadata(JsonSchemaMixin, Replaceable): +class ManifestMetadata(BaseArtifactMetadata): """Metadata for the manifest.""" + dbt_schema_version: str = field( + default_factory=lambda: str(WritableManifest.dbt_schema_version) + ) project_id: Optional[str] = field( default=None, metadata={ @@ -209,6 +209,12 @@ def __post_init__(self): not tracking.active_user.do_not_track ) + @classmethod + def default(cls): + return cls( + dbt_schema_version=str(WritableManifest.dbt_schema_version), + ) + def _sort_values(dct): """Given a dictionary, sort each value. This makes output deterministic, @@ -430,7 +436,6 @@ class Manifest: macros: MutableMapping[str, ParsedMacro] docs: MutableMapping[str, ParsedDocumentation] reports: MutableMapping[str, ParsedReport] - generated_at: datetime disabled: List[CompileResultNode] files: MutableMapping[str, SourceFile] metadata: ManifestMetadata = field(default_factory=ManifestMetadata) @@ -456,7 +461,6 @@ def from_macros( macros=macros, docs={}, reports={}, - generated_at=datetime.utcnow(), disabled=[], files=files, ) @@ -726,7 +730,6 @@ def deepcopy(self): macros={k: _deepcopy(v) for k, v in self.macros.items()}, docs={k: _deepcopy(v) for k, v in self.docs.items()}, reports={k: _deepcopy(v) for k, v in self.reports.items()}, - generated_at=self.generated_at, disabled=[_deepcopy(n) for n in self.disabled], metadata=self.metadata, files={k: _deepcopy(v) for k, v in self.files.items()}, @@ -746,7 +749,6 @@ def writable_manifest(self): macros=self.macros, docs=self.docs, reports=self.reports, - generated_at=self.generated_at, metadata=self.metadata, disabled=self.disabled, child_map=forward_edges, @@ -911,7 +913,6 @@ def __reduce_ex__(self, protocol): self.macros, self.docs, self.reports, - self.generated_at, self.disabled, self.files, self.metadata, @@ -924,9 +925,8 @@ def __reduce_ex__(self, protocol): @dataclass -class WritableManifest(VersionedSchema): - dbt_schema_version = SchemaVersion('manifest', 1) - +@schema_version('manifest', 1) +class WritableManifest(ArtifactMixin): nodes: Mapping[UniqueID, ManifestNode] = field( metadata=dict(description=( 'The nodes defined in the dbt project and its dependencies' @@ -955,9 +955,6 @@ class WritableManifest(VersionedSchema): disabled: Optional[List[CompileResultNode]] = field(metadata=dict( description='A list of the disabled nodes in the target' )) - generated_at: datetime = field(metadata=dict( - description='The time at which the manifest was generated', - )) parent_map: Optional[NodeEdgeMap] = field(metadata=dict( description='A mapping fromĀ child nodes to their dependencies', )) diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index da95ea6577c..fd67532653d 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -1,10 +1,15 @@ from dbt.contracts.graph.manifest import CompileResultNode from dbt.contracts.graph.unparsed import ( - Time, FreshnessStatus, FreshnessThreshold + FreshnessStatus, FreshnessThreshold ) from dbt.contracts.graph.parsed import ParsedSourceDefinition from dbt.contracts.util import ( - Writable, VersionedSchema, Replaceable, SchemaVersion + BaseArtifactMetadata, + ArtifactMixin, + Writable, + VersionedSchema, + Replaceable, + schema_version, ) from dbt.exceptions import InternalException from dbt.logger import ( @@ -20,7 +25,7 @@ from dataclasses import dataclass, field from datetime import datetime -from typing import Union, Dict, List, Optional, Any, NamedTuple +from typing import Union, Dict, List, Optional, Any, NamedTuple, Sequence @dataclass @@ -51,7 +56,7 @@ def __exit__(self, exc_type, exc_value, traceback): @dataclass -class PartialResult(JsonSchemaMixin, Writable): +class BaseResult(JsonSchemaMixin): node: CompileResultNode error: Optional[str] = None status: Union[None, str, int, bool] = None @@ -61,6 +66,11 @@ class PartialResult(JsonSchemaMixin, Writable): fail: Optional[bool] = None warn: Optional[bool] = None + +@dataclass +class PartialResult(BaseResult, Writable): + pass + # if the result got to the point where it could be skipped/failed, we would # be returning a real result, not a partial. @property @@ -69,7 +79,7 @@ def skipped(self): @dataclass -class WritableRunModelResult(PartialResult): +class WritableRunModelResult(BaseResult, Writable): skip: bool = False @property @@ -88,10 +98,8 @@ def to_dict(self, *args, **kwargs): @dataclass -class ExecutionResult(VersionedSchema): - dbt_schema_version = SchemaVersion('run-results', 1) - results: List[Union[WritableRunModelResult, PartialResult]] - generated_at: datetime +class ExecutionResult(JsonSchemaMixin): + results: Sequence[BaseResult] elapsed_time: float def __len__(self): @@ -104,25 +112,90 @@ def __getitem__(self, idx): return self.results[idx] +RunResult = Union[PartialResult, WritableRunModelResult] + + +@dataclass +class RunResultsMetadata(BaseArtifactMetadata): + dbt_schema_version: str = field( + default_factory=lambda: str(RunResultsArtifact.dbt_schema_version) + ) + + +@dataclass +@schema_version('run-results', 1) +class RunResultsArtifact( + ExecutionResult, + ArtifactMixin, +): + results: Sequence[RunResult] + + @classmethod + def from_node_results( + cls, + results: Sequence[RunResult], + elapsed_time: float, + generated_at: datetime, + ): + meta = RunResultsMetadata( + dbt_schema_version=str(cls.dbt_schema_version), + generated_at=generated_at, + ) + return cls( + metadata=meta, + results=results, + elapsed_time=elapsed_time, + ) + + @dataclass class RunOperationResult(ExecutionResult): success: bool -# due to issues with typing.Union collapsing subclasses, this can't subclass -# PartialResult @dataclass -class SourceFreshnessResult(JsonSchemaMixin, Writable): - node: ParsedSourceDefinition +class RunOperationResultMetadata(BaseArtifactMetadata): + dbt_schema_version: str = field(default_factory=lambda: str( + RunOperationResultsArtifact.dbt_schema_version + )) + + +@dataclass +@schema_version('run-operation-result', 1) +class RunOperationResultsArtifact(RunOperationResult, ArtifactMixin): + + @classmethod + def from_success( + cls, + success: bool, + elapsed_time: float, + generated_at: datetime, + ): + meta = RunResultsMetadata( + dbt_schema_version=str(cls.dbt_schema_version), + generated_at=generated_at, + ) + return cls( + metadata=meta, + results=[], + elapsed_time=elapsed_time, + success=success, + ) + + +@dataclass +class SourceFreshnessResultMixin(JsonSchemaMixin): max_loaded_at: datetime snapshotted_at: datetime age: float - status: FreshnessStatus - error: Optional[str] = None - execution_time: Union[str, int] = 0 - thread_id: Optional[str] = None - timing: List[TimingInfo] = field(default_factory=list) - fail: Optional[bool] = None + + +# due to issues with typing.Union collapsing subclasses, this can't subclass +# PartialResult +@dataclass +class SourceFreshnessResult(BaseResult, Writable, SourceFreshnessResultMixin): + node: ParsedSourceDefinition + status: FreshnessStatus = FreshnessStatus.Pass def __post_init__(self): self.fail = self.status == 'error' @@ -136,92 +209,24 @@ def skipped(self): return False -@dataclass -class FreshnessMetadata(JsonSchemaMixin): - generated_at: datetime - elapsed_time: float - - -@dataclass -class FreshnessExecutionResult(VersionedSchema, FreshnessMetadata): - dbt_schema_version = SchemaVersion('sources', 1) - results: List[Union[PartialResult, SourceFreshnessResult]] - - def write(self, path, omit_none=True): - """Create a new object with the desired output schema and write it.""" - meta = FreshnessMetadata( - generated_at=self.generated_at, - elapsed_time=self.elapsed_time, - ) - sources = {} - for result in self.results: - result_value: Union[ - SourceFreshnessRuntimeError, SourceFreshnessOutput - ] - unique_id = result.node.unique_id - if result.error is not None: - result_value = SourceFreshnessRuntimeError( - error=result.error, - state=FreshnessErrorEnum.runtime_error, - ) - else: - # we know that this must be a SourceFreshnessResult - if not isinstance(result, SourceFreshnessResult): - raise InternalException( - 'Got {} instead of a SourceFreshnessResult for a ' - 'non-error result in freshness execution!' - .format(type(result)) - ) - # if we're here, we must have a non-None freshness threshold - criteria = result.node.freshness - if criteria is None: - raise InternalException( - 'Somehow evaluated a freshness result for a source ' - 'that has no freshness criteria!' - ) - result_value = SourceFreshnessOutput( - max_loaded_at=result.max_loaded_at, - snapshotted_at=result.snapshotted_at, - max_loaded_at_time_ago_in_s=result.age, - state=result.status, - criteria=criteria, - ) - sources[unique_id] = result_value - output = FreshnessRunOutput(meta=meta, sources=sources) - output.write(path, omit_none=omit_none) - - def __len__(self): - return len(self.results) - - def __iter__(self): - return iter(self.results) - - def __getitem__(self, idx): - return self.results[idx] - - def _copykeys(src, keys, **updates): return {k: getattr(src, k) for k in keys} -@dataclass -class FreshnessCriteria(JsonSchemaMixin): - warn_after: Time - error_after: Time - - class FreshnessErrorEnum(StrEnum): runtime_error = 'runtime error' @dataclass class SourceFreshnessRuntimeError(JsonSchemaMixin): + unique_id: str error: str state: FreshnessErrorEnum @dataclass class SourceFreshnessOutput(JsonSchemaMixin): + unique_id: str max_loaded_at: datetime snapshotted_at: datetime max_loaded_at_time_ago_in_s: float @@ -229,14 +234,88 @@ class SourceFreshnessOutput(JsonSchemaMixin): criteria: FreshnessThreshold -SourceFreshnessRunResult = Union[SourceFreshnessOutput, - SourceFreshnessRuntimeError] +FreshnessNodeResult = Union[PartialResult, SourceFreshnessResult] +FreshnessNodeOutput = Union[SourceFreshnessRuntimeError, SourceFreshnessOutput] + + +def process_freshness_result( + result: FreshnessNodeResult +) -> FreshnessNodeOutput: + unique_id = result.node.unique_id + if result.error is not None: + return SourceFreshnessRuntimeError( + unique_id=unique_id, + error=result.error, + state=FreshnessErrorEnum.runtime_error, + ) + + # we know that this must be a SourceFreshnessResult + if not isinstance(result, SourceFreshnessResult): + raise InternalException( + 'Got {} instead of a SourceFreshnessResult for a ' + 'non-error result in freshness execution!' + .format(type(result)) + ) + # if we're here, we must have a non-None freshness threshold + criteria = result.node.freshness + if criteria is None: + raise InternalException( + 'Somehow evaluated a freshness result for a source ' + 'that has no freshness criteria!' + ) + return SourceFreshnessOutput( + unique_id=unique_id, + max_loaded_at=result.max_loaded_at, + snapshotted_at=result.snapshotted_at, + max_loaded_at_time_ago_in_s=result.age, + state=result.status, + criteria=criteria, + ) + + +@dataclass +class FreshnessMetadata(BaseArtifactMetadata): + dbt_schema_version: str = field( + default_factory=lambda: str( + FreshnessExecutionResultArtifact.dbt_schema_version + ) + ) @dataclass -class FreshnessRunOutput(JsonSchemaMixin, Writable): - meta: FreshnessMetadata - sources: Dict[str, SourceFreshnessRunResult] +class FreshnessResult(ExecutionResult): + metadata: FreshnessMetadata + results: Sequence[FreshnessNodeResult] + + @classmethod + def from_node_results( + cls, + results: List[FreshnessNodeResult], + elapsed_time: float, + generated_at: datetime, + ): + meta = FreshnessMetadata(generated_at=generated_at) + return cls(metadata=meta, results=results, elapsed_time=elapsed_time) + + +@dataclass +@schema_version('sources', 1) +class FreshnessExecutionResultArtifact( + ArtifactMixin, + VersionedSchema, +): + metadata: FreshnessMetadata + results: Sequence[FreshnessNodeOutput] + elapsed_time: float + + @classmethod + def from_result(cls, base: FreshnessResult): + processed = [process_freshness_result(r) for r in base.results] + return cls( + metadata=base.metadata, + results=processed, + elapsed_time=base.elapsed_time, + ) Primitive = Union[bool, str, float, None] @@ -297,10 +376,39 @@ def key(self) -> CatalogKey: @dataclass -class CatalogResults(VersionedSchema): - dbt_schema_version = SchemaVersion('catalog', 1) +class CatalogMetadata(BaseArtifactMetadata): + dbt_schema_version: str = field( + default_factory=lambda: str(CatalogArtifact.dbt_schema_version) + ) + + +@dataclass +class CatalogResults(JsonSchemaMixin): nodes: Dict[str, CatalogTable] sources: Dict[str, CatalogTable] - generated_at: datetime errors: Optional[List[str]] _compile_results: Optional[Any] = None + + +@dataclass +@schema_version('catalog', 1) +class CatalogArtifact(CatalogResults, ArtifactMixin): + metadata: CatalogMetadata + + @classmethod + def from_results( + cls, + generated_at: datetime, + nodes: Dict[str, CatalogTable], + sources: Dict[str, CatalogTable], + compile_results: Optional[Any], + errors: Optional[List[str]] + ) -> 'CatalogArtifact': + meta = CatalogMetadata(generated_at=generated_at) + return cls( + metadata=meta, + nodes=nodes, + sources=sources, + errors=errors, + _compile_results=compile_results, + ) diff --git a/core/dbt/contracts/rpc.py b/core/dbt/contracts/rpc.py index 7ef90d6299d..8241695de2f 100644 --- a/core/dbt/contracts/rpc.py +++ b/core/dbt/contracts/rpc.py @@ -3,7 +3,7 @@ import uuid from dataclasses import dataclass, field from datetime import datetime, timedelta -from typing import Optional, Union, List, Any, Dict, Type +from typing import Optional, Union, List, Any, Dict, Type, Sequence from hologram import JsonSchemaMixin from hologram.helpers import StrEnum @@ -12,10 +12,15 @@ from dbt.contracts.graph.manifest import WritableManifest from dbt.contracts.results import ( TimingInfo, + CatalogArtifact, CatalogResults, ExecutionResult, + RunOperationResult, + RunOperationResultsArtifact, + RunResult, + RunResultsArtifact, ) -from dbt.contracts.util import SchemaVersion, VersionedSchema +from dbt.contracts.util import VersionedSchema, schema_version from dbt.exceptions import InternalException from dbt.logger import LogMessage from dbt.utils import restrict_to @@ -176,31 +181,71 @@ class RemoteResult(VersionedSchema): @dataclass +@schema_version('remote-deps-result', 1) class RemoteDepsResult(RemoteResult): - dbt_schema_version = SchemaVersion('remote-deps-result', 1) + generated_at: datetime = field(default_factory=datetime.utcnow) @dataclass +@schema_version('remote-catalog-result', 1) class RemoteCatalogResults(CatalogResults, RemoteResult): - dbt_schema_version = SchemaVersion('remote-catalog-result', 1) + generated_at: datetime = field(default_factory=datetime.utcnow) + + def write(self, path: str): + artifact = CatalogArtifact.from_results( + generated_at=self.generated_at, + nodes=self.nodes, + sources=self.sources, + compile_results=self._compile_results, + errors=self.errors, + ) + artifact.write(path) @dataclass -class RemoteCompileResult(RemoteResult): - dbt_schema_version = SchemaVersion('remote-compile-result', 1) +class RemoteCompileResultMixin(RemoteResult): raw_sql: str compiled_sql: str node: CompileResultNode timing: List[TimingInfo] + +@dataclass +@schema_version('remote-compile-result', 1) +class RemoteCompileResult(RemoteCompileResultMixin): + generated_at: datetime = field(default_factory=datetime.utcnow) + @property def error(self): return None @dataclass +@schema_version('remote-execution-result', 1) class RemoteExecutionResult(ExecutionResult, RemoteResult): - dbt_schema_version = SchemaVersion('remote-execution-result', 1) + results: Sequence[RunResult] + generated_at: datetime = field(default_factory=datetime.utcnow) + + def write(self, path: str): + writable = RunResultsArtifact.from_node_results( + generated_at=self.generated_at, + results=self.results, + elapsed_time=self.elapsed_time, + ) + writable.write(path) + + @classmethod + def from_local_result( + cls, + base: RunResultsArtifact, + logs: List[LogMessage], + ) -> 'RemoteExecutionResult': + return cls( + generated_at=base.metadata.generated_at, + results=base.results, + elapsed_time=base.elapsed_time, + logs=logs, + ) @dataclass @@ -210,15 +255,38 @@ class ResultTable(JsonSchemaMixin): @dataclass -class RemoteRunOperationResult(ExecutionResult, RemoteResult): - dbt_schema_version = SchemaVersion('remote-run-operation-result', 1) - success: bool +@schema_version('remote-run-operation-result', 1) +class RemoteRunOperationResult(RunOperationResult, RemoteResult): + generated_at: datetime = field(default_factory=datetime.utcnow) + + @classmethod + def from_local_result( + cls, + base: RunOperationResultsArtifact, + logs: List[LogMessage], + ) -> 'RemoteRunOperationResult': + return cls( + generated_at=base.metadata.generated_at, + results=base.results, + elapsed_time=base.elapsed_time, + success=base.success, + logs=logs, + ) + + def write(self, path: str): + writable = RunOperationResultsArtifact.from_success( + success=self.success, + generated_at=self.generated_at, + elapsed_time=self.elapsed_time, + ) + writable.write(path) @dataclass -class RemoteRunResult(RemoteCompileResult): - dbt_schema_version = SchemaVersion('remote-run-result', 1) +@schema_version('remote-run-result', 1) +class RemoteRunResult(RemoteCompileResultMixin): table: ResultTable + generated_at: datetime = field(default_factory=datetime.utcnow) RPCResult = Union[ @@ -240,8 +308,8 @@ class GCResultState(StrEnum): @dataclass +@schema_version('remote-gc-result', 1) class GCResult(RemoteResult): - dbt_schema_version = SchemaVersion('remote-gc-result', 1) logs: List[LogMessage] = field(default_factory=list) deleted: List[TaskID] = field(default_factory=list) missing: List[TaskID] = field(default_factory=list) @@ -334,8 +402,8 @@ class TaskRow(TaskTiming): @dataclass +@schema_version('remote-ps-result', 1) class PSResult(RemoteResult): - dbt_schema_version = SchemaVersion('remote-ps-result', 1) rows: List[TaskRow] @@ -347,15 +415,15 @@ class KillResultStatus(StrEnum): @dataclass +@schema_version('remote-kill-result', 1) class KillResult(RemoteResult): - dbt_schema_version = SchemaVersion('remote-kill-result', 1) state: KillResultStatus = KillResultStatus.Missing logs: List[LogMessage] = field(default_factory=list) @dataclass +@schema_version('remote-manifest-result', 1) class GetManifestResult(RemoteResult): - dbt_schema_version = SchemaVersion('remote-manifest-result', 1) manifest: Optional[WritableManifest] @@ -382,12 +450,13 @@ class PollResult(RemoteResult, TaskTiming): @dataclass -class PollRemoteEmptyCompleteResult(PollResult, RemoteDepsResult): - dbt_schema_version = SchemaVersion('poll-remote-deps-result', 1) +@schema_version('poll-remote-deps-result', 1) +class PollRemoteEmptyCompleteResult(PollResult, RemoteResult): state: TaskHandlerState = field( metadata=restrict_to(TaskHandlerState.Success, TaskHandlerState.Failed), ) + generated_at: datetime = field(default_factory=datetime.utcnow) @classmethod def from_result( @@ -404,21 +473,24 @@ def from_result( start=timing.start, end=timing.end, elapsed=timing.elapsed, + generated_at=base.generated_at ) @dataclass +@schema_version('poll-remote-killed-result', 1) class PollKilledResult(PollResult): - dbt_schema_version = SchemaVersion('poll-remote-killed-result', 1) state: TaskHandlerState = field( metadata=restrict_to(TaskHandlerState.Killed), ) @dataclass -class PollExecuteCompleteResult(RemoteExecutionResult, PollResult): - dbt_schema_version = SchemaVersion('poll-remote-execution-result', 1) - +@schema_version('poll-remote-execution-result', 1) +class PollExecuteCompleteResult( + RemoteExecutionResult, + PollResult, +): state: TaskHandlerState = field( metadata=restrict_to(TaskHandlerState.Success, TaskHandlerState.Failed), @@ -434,7 +506,6 @@ def from_result( ) -> 'PollExecuteCompleteResult': return cls( results=base.results, - generated_at=base.generated_at, elapsed_time=base.elapsed_time, logs=logs, tags=tags, @@ -442,12 +513,16 @@ def from_result( start=timing.start, end=timing.end, elapsed=timing.elapsed, + generated_at=base.generated_at, ) @dataclass -class PollCompileCompleteResult(RemoteCompileResult, PollResult): - dbt_schema_version = SchemaVersion('poll-remote-compile-result', 1) +@schema_version('poll-remote-compile-result', 1) +class PollCompileCompleteResult( + RemoteCompileResult, + PollResult, +): state: TaskHandlerState = field( metadata=restrict_to(TaskHandlerState.Success, TaskHandlerState.Failed), @@ -472,12 +547,16 @@ def from_result( start=timing.start, end=timing.end, elapsed=timing.elapsed, + generated_at=base.generated_at ) @dataclass -class PollRunCompleteResult(RemoteRunResult, PollResult): - dbt_schema_version = SchemaVersion('poll-remote-run-result', 1) +@schema_version('poll-remote-run-result', 1) +class PollRunCompleteResult( + RemoteRunResult, + PollResult, +): state: TaskHandlerState = field( metadata=restrict_to(TaskHandlerState.Success, TaskHandlerState.Failed), @@ -503,12 +582,16 @@ def from_result( start=timing.start, end=timing.end, elapsed=timing.elapsed, + generated_at=base.generated_at ) @dataclass -class PollRunOperationCompleteResult(RemoteRunOperationResult, PollResult): - dbt_schema_version = SchemaVersion('poll-remote-run-operation-result', 1) +@schema_version('poll-remote-run-operation-result', 1) +class PollRunOperationCompleteResult( + RemoteRunOperationResult, + PollResult, +): state: TaskHandlerState = field( metadata=restrict_to(TaskHandlerState.Success, TaskHandlerState.Failed), @@ -537,8 +620,8 @@ def from_result( @dataclass +@schema_version('poll-remote-catalog-result', 1) class PollCatalogCompleteResult(RemoteCatalogResults, PollResult): - dbt_schema_version = SchemaVersion('poll-remote-catalog-result', 1) state: TaskHandlerState = field( metadata=restrict_to(TaskHandlerState.Success, TaskHandlerState.Failed), @@ -568,13 +651,14 @@ def from_result( @dataclass +@schema_version('poll-remote-in-progress-result', 1) class PollInProgressResult(PollResult): - dbt_schema_version = SchemaVersion('poll-in-progress-result', 1) + pass @dataclass +@schema_version('poll-remote-get-manifest-result', 1) class PollGetManifestResult(GetManifestResult, PollResult): - dbt_schema_version = SchemaVersion('poll-remote-get-manifest-result', 1) state: TaskHandlerState = field( metadata=restrict_to(TaskHandlerState.Success, TaskHandlerState.Failed), @@ -609,8 +693,8 @@ class ManifestStatus(StrEnum): @dataclass +@schema_version('remote-status-result', 1) class LastParse(RemoteResult): - dbt_schema_version = SchemaVersion('status-result', 1) state: ManifestStatus = ManifestStatus.Init logs: List[LogMessage] = field(default_factory=list) error: Optional[Dict[str, Any]] = None diff --git a/core/dbt/contracts/util.py b/core/dbt/contracts/util.py index 801cec8cc59..827b971c344 100644 --- a/core/dbt/contracts/util.py +++ b/core/dbt/contracts/util.py @@ -1,8 +1,15 @@ import dataclasses -from typing import List, Tuple, ClassVar, Type, TypeVar, Dict, Any +from datetime import datetime +from typing import ( + List, Tuple, ClassVar, Type, TypeVar, Dict, Any +) from dbt.clients.system import write_json, read_json -from dbt.exceptions import RuntimeException, IncompatibleSchemaException +from dbt.exceptions import ( + IncompatibleSchemaException, + InternalException, + RuntimeException, +) from dbt.version import __version__ from hologram import JsonSchemaMixin @@ -97,9 +104,6 @@ def read(cls, path: str): return cls.from_dict(data) # type: ignore -T = TypeVar('T', bound='VersionedSchema') - - BASE_SCHEMAS_URL = 'https://schemas.getdbt.com/dbt/{name}/v{version}.json' @@ -115,51 +119,63 @@ def __str__(self) -> str: ) -DBT_VERSION_KEY = 'dbt_version' SCHEMA_VERSION_KEY = 'dbt_schema_version' @dataclasses.dataclass -class VersionedSchema(JsonSchemaMixin, Readable, Writable): +class BaseArtifactMetadata(JsonSchemaMixin): + dbt_schema_version: str + dbt_version: str = __version__ + generated_at: datetime = dataclasses.field( + default_factory=datetime.utcnow + ) + + +def schema_version(name: str, version: int): + def inner(cls: Type[VersionedSchema]): + cls.dbt_schema_version = SchemaVersion( + name=name, + version=version, + ) + return cls + return inner + + +@dataclasses.dataclass +class VersionedSchema(JsonSchemaMixin): dbt_schema_version: ClassVar[SchemaVersion] - def to_dict( - self, omit_none: bool = True, validate: bool = False - ) -> Dict[str, Any]: - dct = super().to_dict(omit_none=omit_none, validate=validate) - dct[SCHEMA_VERSION_KEY] = str(self.dbt_schema_version) - dct[DBT_VERSION_KEY] = __version__ - return dct + @classmethod + def json_schema(cls, embeddable: bool = False) -> Dict[str, Any]: + result = super().json_schema(embeddable=embeddable) + if not embeddable: + result['$id'] = str(cls.dbt_schema_version) + return result + + +T = TypeVar('T', bound='ArtifactMixin') + + +# metadata should really be a Generic[T_M] where T_M is a TypeVar bound to +# BaseArtifactMetadata. Unfortunately this isn't possible due to a mypy issue: +# https://github.com/python/mypy/issues/7520 +@dataclasses.dataclass(init=False) +class ArtifactMixin(VersionedSchema, Writable, Readable): + metadata: BaseArtifactMetadata @classmethod def from_dict( cls: Type[T], data: Dict[str, Any], validate: bool = True ) -> T: + if cls.dbt_schema_version is None: + raise InternalException( + 'Cannot call from_dict with no schema version!' + ) + if validate: expected = str(cls.dbt_schema_version) - found = data.get(SCHEMA_VERSION_KEY) + found = data.get('metadata', {}).get(SCHEMA_VERSION_KEY) if found != expected: raise IncompatibleSchemaException(expected, found) return super().from_dict(data=data, validate=validate) - - @classmethod - def _collect_json_schema( - cls, definitions: Dict[str, Any] - ) -> Dict[str, Any]: - result = super()._collect_json_schema(definitions) - result['properties'][SCHEMA_VERSION_KEY] = { - 'const': str(cls.dbt_schema_version) - } - result['properties'][DBT_VERSION_KEY] = {'type': 'string'} - result['required'].extend([SCHEMA_VERSION_KEY, DBT_VERSION_KEY]) - return result - - @classmethod - def json_schema(cls, embeddable: bool = False) -> Dict[str, Any]: - result = super().json_schema(embeddable=embeddable) - # it would be nice to do this in hologram! - # in the schema itself, include the version url as $id - if not embeddable: - result['$id'] = str(cls.dbt_schema_version) - return result diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index ecac96cd8b5..c74de47be64 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -1,6 +1,5 @@ import os import pickle -from datetime import datetime from typing import ( Dict, Optional, Mapping, Callable, Any, List, Type, Union, MutableMapping ) @@ -316,7 +315,6 @@ def create_manifest(self) -> Manifest: macros=self.results.macros, docs=self.results.docs, reports=self.results.reports, - generated_at=datetime.utcnow(), metadata=self.root_project.get_metadata(), disabled=disabled, files=self.results.files, diff --git a/core/dbt/rpc/node_runners.py b/core/dbt/rpc/node_runners.py index 805687767cc..2bbf866d4a5 100644 --- a/core/dbt/rpc/node_runners.py +++ b/core/dbt/rpc/node_runners.py @@ -1,16 +1,20 @@ from abc import abstractmethod +from datetime import datetime from typing import Generic, TypeVar import dbt.exceptions from dbt.contracts.rpc import ( - RemoteCompileResult, RemoteRunResult, ResultTable, + RemoteCompileResult, + RemoteCompileResultMixin, + RemoteRunResult, + ResultTable, ) from dbt.logger import GLOBAL_LOGGER as logger from dbt.task.compile import CompileRunner from dbt.rpc.error import dbt_error, RPCException, server_error -RPCSQLResult = TypeVar('RPCSQLResult', bound=RemoteCompileResult) +RPCSQLResult = TypeVar('RPCSQLResult', bound=RemoteCompileResultMixin) class GenericRPCRunner(CompileRunner, Generic[RPCSQLResult]): @@ -65,6 +69,7 @@ def execute(self, compiled_node, manifest) -> RemoteCompileResult: node=compiled_node, timing=[], # this will get added later logs=[], + generated_at=datetime.utcnow(), ) def from_run_result( @@ -76,6 +81,7 @@ def from_run_result( node=result.node, timing=timing_info, logs=[], + generated_at=datetime.utcnow(), ) @@ -97,6 +103,7 @@ def execute(self, compiled_node, manifest) -> RemoteRunResult: table=table, timing=[], logs=[], + generated_at=datetime.utcnow(), ) def from_run_result( @@ -109,4 +116,5 @@ def from_run_result( table=result.table, timing=timing_info, logs=[], + generated_at=datetime.utcnow(), ) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 9637f215208..7e54cc1fbc9 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -12,9 +12,10 @@ from .runnable import GraphRunnableTask from dbt.contracts.results import ( - FreshnessExecutionResult, - SourceFreshnessResult, + FreshnessExecutionResultArtifact, + FreshnessResult, PartialResult, + SourceFreshnessResult, ) from dbt.exceptions import RuntimeException, InternalException from dbt.logger import print_timestamped_line @@ -146,8 +147,12 @@ def get_node_selector(self): def get_runner_type(self): return FreshnessRunner + def write_result(self, result): + artifact = FreshnessExecutionResultArtifact.from_result(result) + artifact.write(self.result_path()) + def get_result(self, results, elapsed_time, generated_at): - return FreshnessExecutionResult( + return FreshnessResult.from_node_results( elapsed_time=elapsed_time, generated_at=generated_at, results=results diff --git a/core/dbt/task/generate.py b/core/dbt/task/generate.py index 246a3336cb2..1f666466f59 100644 --- a/core/dbt/task/generate.py +++ b/core/dbt/task/generate.py @@ -12,7 +12,7 @@ from dbt.contracts.graph.manifest import Manifest from dbt.contracts.results import ( TableMetadata, CatalogTable, CatalogResults, Primitive, CatalogKey, - StatsItem, StatsDict, ColumnMetadata + StatsItem, StatsDict, ColumnMetadata, CatalogArtifact ) from dbt.exceptions import InternalException from dbt.include.global_project import DOCS_INDEX_FILE_PATH @@ -207,7 +207,7 @@ def _get_manifest(self) -> Manifest: ) return self.manifest - def run(self) -> CatalogResults: + def run(self) -> CatalogArtifact: compile_results = None if self.args.compile: compile_results = CompileTask.run(self) @@ -215,12 +215,12 @@ def run(self) -> CatalogResults: print_timestamped_line( 'compile failed, cannot generate docs' ) - return CatalogResults( + return CatalogArtifact.from_results( nodes={}, sources={}, generated_at=datetime.utcnow(), errors=None, - _compile_results=compile_results + compile_results=compile_results ) else: self.manifest = get_full_manifest(self.config) @@ -294,12 +294,12 @@ def get_catalog_results( generated_at: datetime, compile_results: Optional[Any], errors: Optional[List[str]] - ) -> CatalogResults: - return CatalogResults( + ) -> CatalogArtifact: + return CatalogArtifact.from_results( + generated_at=generated_at, nodes=nodes, sources=sources, - generated_at=generated_at, - _compile_results=compile_results, + compile_results=compile_results, errors=errors, ) diff --git a/core/dbt/task/rpc/base.py b/core/dbt/task/rpc/base.py index 6989756b884..306e6fa6763 100644 --- a/core/dbt/task/rpc/base.py +++ b/core/dbt/task/rpc/base.py @@ -1,3 +1,4 @@ +from dbt.contracts.results import RunResultsArtifact from dbt.contracts.rpc import RemoteExecutionResult from dbt.task.runnable import GraphRunnableTask from dbt.rpc.method import RemoteManifestMethod, Parameters @@ -20,9 +21,10 @@ def load_manifest(self): def get_result( self, results, elapsed_time, generated_at ) -> RemoteExecutionResult: - return RemoteExecutionResult( + base = RunResultsArtifact.from_node_results( results=results, elapsed_time=elapsed_time, generated_at=generated_at, - logs=[], ) + rpc_result = RemoteExecutionResult.from_local_result(base, logs=[]) + return rpc_result diff --git a/core/dbt/task/rpc/project_commands.py b/core/dbt/task/rpc/project_commands.py index 6a7d843b864..f9735ec30b9 100644 --- a/core/dbt/task/rpc/project_commands.py +++ b/core/dbt/task/rpc/project_commands.py @@ -193,13 +193,7 @@ def _runtime_initialize(self): def handle_request(self) -> RemoteRunOperationResult: base = RunOperationTask.run(self) - result = RemoteRunOperationResult( - results=base.results, - generated_at=base.generated_at, - logs=[], - success=base.success, - elapsed_time=base.elapsed_time - ) + result = RemoteRunOperationResult.from_local_result(base=base, logs=[]) return result def interpret_results(self, results): diff --git a/core/dbt/task/run_operation.py b/core/dbt/task/run_operation.py index fc786724a10..23bff66c5b0 100644 --- a/core/dbt/task/run_operation.py +++ b/core/dbt/task/run_operation.py @@ -8,7 +8,7 @@ import dbt.exceptions from dbt.adapters.factory import get_adapter from dbt.config.utils import parse_cli_vars -from dbt.contracts.results import RunOperationResult +from dbt.contracts.results import RunOperationResultsArtifact from dbt.exceptions import InternalException from dbt.logger import GLOBAL_LOGGER as logger @@ -47,7 +47,7 @@ def _run_unsafe(self) -> agate.Table: return res - def run(self) -> RunOperationResult: + def run(self) -> RunOperationResultsArtifact: start = datetime.utcnow() self._runtime_initialize() try: @@ -69,11 +69,10 @@ def run(self) -> RunOperationResult: else: success = True end = datetime.utcnow() - return RunOperationResult( - results=[], + return RunOperationResultsArtifact.from_success( generated_at=end, elapsed_time=(end - start).total_seconds(), - success=success + success=success, ) def interpret_results(self, results): diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 6f1ef7c8e25..994f106accb 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -30,7 +30,7 @@ from dbt.contracts.graph.compiled import CompileResultNode from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.parsed import ParsedSourceDefinition -from dbt.contracts.results import ExecutionResult +from dbt.contracts.results import RunResultsArtifact from dbt.contracts.state import PreviousState from dbt.exceptions import ( InternalException, @@ -395,6 +395,9 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]): ) return result + def write_result(self, result): + result.write(self.result_path()) + def run(self): """ Run dbt for the query, based on the graph. @@ -422,7 +425,7 @@ def run(self): result = self.execute_with_hooks(selected_uids) if flags.WRITE_JSON: - result.write(self.result_path()) + self.write_result(result) self.task_end_messages(result.results) return result @@ -526,7 +529,7 @@ def create_schema(relation: BaseRelation) -> None: create_future.result() def get_result(self, results, elapsed_time, generated_at): - return ExecutionResult( + return RunResultsArtifact.from_node_results( results=results, elapsed_time=elapsed_time, generated_at=generated_at diff --git a/test/integration/029_docs_generate_tests/test_docs_generate.py b/test/integration/029_docs_generate_tests/test_docs_generate.py index 67fb6b095d3..5e5f93865ff 100644 --- a/test/integration/029_docs_generate_tests/test_docs_generate.py +++ b/test/integration/029_docs_generate_tests/test_docs_generate.py @@ -377,8 +377,6 @@ def _expected_catalog(self, id_type, text_type, time_type, view_type, }, } return { - 'dbt_schema_version': 'https://schemas.getdbt.com/dbt/catalog/v1.json', - 'dbt_version': dbt.version.__version__, 'nodes': { 'model.test.model': { 'unique_id': 'model.test.model', @@ -879,13 +877,13 @@ def verify_catalog(self, expected): catalog = _read_json('./target/catalog.json') - self.assertIn('generated_at', catalog) - self.assertBetween( - catalog.pop('generated_at'), - start=self.generate_start_time, - ) + assert set(catalog) == {'errors', 'metadata', 'nodes', 'sources'} + + self.verify_metadata(catalog['metadata'], 'https://schemas.getdbt.com/dbt/catalog/v1.json') + assert not catalog['errors'] + for key in 'nodes', 'sources': - self.assertEqual(catalog[key], expected[key]) + assert catalog[key] == expected[key] def verify_manifest_macros(self, manifest, expected=None): self.assertIn('macros', manifest) @@ -1536,12 +1534,6 @@ def expected_seeded_manifest(self, model_database=None): 'test.macro_info': ANY, 'test.macro_arg_info': ANY, }, - 'metadata': { - 'project_id': '098f6bcd4621d373cade4e832627b4f6', - 'send_anonymous_usage_stats': False, - 'user_id': None, - 'adapter_type': self.adapter_type, - }, 'disabled': [], } @@ -1938,12 +1930,6 @@ def expected_postgres_references_manifest(self, model_database=None): 'seed.test.seed': [], 'source.test.my_source.my_table': [], }, - 'metadata': { - 'project_id': '098f6bcd4621d373cade4e832627b4f6', - 'send_anonymous_usage_stats': False, - 'user_id': None, - 'adapter_type': self.adapter_type, - }, 'disabled': [], 'macros': { 'macro.test.test_nothing': { @@ -2362,12 +2348,6 @@ def expected_bigquery_complex_manifest(self): 'test.macro_info': ANY, 'test.macro_arg_info': ANY, }, - 'metadata': { - 'project_id': '098f6bcd4621d373cade4e832627b4f6', - 'send_anonymous_usage_stats': False, - 'user_id': None, - 'adapter_type': self.adapter_type, - }, 'disabled': [], } @@ -2575,23 +2555,26 @@ def expected_redshift_incremental_view_manifest(self): 'test.macro_info': ANY, 'test.macro_arg_info': ANY, }, - 'metadata': { - 'project_id': '098f6bcd4621d373cade4e832627b4f6', - 'send_anonymous_usage_stats': False, - 'user_id': None, - 'adapter_type': self.adapter_type, - }, 'disabled': [], } + def verify_metadata(self, metadata, dbt_schema_version): + assert 'generated_at' in metadata + self.assertBetween(metadata['generated_at'], + start=self.generate_start_time) + assert 'dbt_version' in metadata + assert metadata['dbt_version'] == dbt.version.__version__ + assert 'dbt_schema_version' in metadata + assert metadata['dbt_schema_version'] == dbt_schema_version + def verify_manifest(self, expected_manifest): self.assertTrue(os.path.exists('./target/manifest.json')) manifest = _read_json('./target/manifest.json') manifest_keys = frozenset({ - 'nodes', 'sources', 'macros', 'parent_map', 'child_map', 'generated_at', - 'docs', 'metadata', 'docs', 'disabled', 'reports', 'dbt_schema_version', 'dbt_version', + 'nodes', 'sources', 'macros', 'parent_map', 'child_map', + 'docs', 'metadata', 'docs', 'disabled', 'reports' }) self.assertEqual(frozenset(manifest), manifest_keys) @@ -2599,9 +2582,13 @@ def verify_manifest(self, expected_manifest): for key in manifest_keys: if key == 'macros': self.verify_manifest_macros(manifest, expected_manifest.get('macros')) - elif key == 'generated_at': - self.assertBetween(manifest['generated_at'], - start=self.generate_start_time) + elif key == 'metadata': + metadata = manifest['metadata'] + self.verify_metadata(metadata, 'https://schemas.getdbt.com/dbt/manifest/v1.json') + assert 'project_id' in metadata and metadata['project_id'] == '098f6bcd4621d373cade4e832627b4f6' + assert 'send_anonymous_usage_stats' in metadata and metadata['send_anonymous_usage_stats'] is False + assert 'user_id' in metadata and metadata['user_id'] is None + assert 'adapter_type' in metadata and metadata['adapter_type'] == self.adapter_type else: self.assertIn(key, expected_manifest) # sanity check self.assertEqual(manifest[key], expected_manifest[key]) @@ -3340,12 +3327,9 @@ def expected_postgres_references_run_results(self): def verify_run_results(self, expected_run_results): run_result = _read_json('./target/run_results.json') - self.assertIn('generated_at', run_result) + assert 'metadata' in run_result + self.verify_metadata(run_result['metadata'], 'https://schemas.getdbt.com/dbt/run-results/v1.json') self.assertIn('elapsed_time', run_result) - self.assertBetween( - run_result['generated_at'], - start=self.generate_start_time - ) self.assertGreater(run_result['elapsed_time'], 0) self.assertTrue( isinstance(run_result['elapsed_time'], float), @@ -3355,12 +3339,7 @@ def verify_run_results(self, expected_run_results): # sort the results so we can make reasonable assertions run_result['results'].sort(key=lambda r: r['node']['unique_id']) assert run_result['results'] == expected_run_results - assert run_result['dbt_schema_version'] == 'https://schemas.getdbt.com/dbt/run-results/v1.json' - assert run_result['dbt_version'] == dbt.version.__version__ - set(run_result) == { - 'generated_at', 'elapsed_time', 'results', 'dbt_schema_version', - 'dbt_version' - } + set(run_result) == {'elapsed_time', 'results', 'metadata'} @use_profile('postgres') def test__postgres__run_and_generate_no_compile(self): diff --git a/test/integration/042_sources_test/test_sources.py b/test/integration/042_sources_test/test_sources.py index 67be460863c..b7e6ebf7495 100644 --- a/test/integration/042_sources_test/test_sources.py +++ b/test/integration/042_sources_test/test_sources.py @@ -5,6 +5,7 @@ import yaml from dbt.exceptions import CompilationException +import dbt.version from test.integration.base import DBTIntegrationTest, use_profile, AnyFloat, \ AnyStringWith @@ -234,29 +235,33 @@ def _assert_freshness_results(self, path, state): with open(path) as fp: data = json.load(fp) - self.assertEqual(set(data), {'meta', 'sources'}) - self.assertIn('generated_at', data['meta']) - self.assertIn('elapsed_time', data['meta']) - self.assertTrue(isinstance(data['meta']['elapsed_time'], float)) - self.assertBetween(data['meta']['generated_at'], + assert set(data) == {'metadata', 'results', 'elapsed_time'} + assert 'generated_at' in data['metadata'] + assert isinstance(data['elapsed_time'], float) + self.assertBetween(data['metadata']['generated_at'], self.freshness_start_time) + assert data['metadata']['dbt_schema_version'] == 'https://schemas.getdbt.com/dbt/sources/v1.json' + assert data['metadata']['dbt_version'] == dbt.version.__version__ + last_inserted_time = self.last_inserted_time - self.assertEqual(len(data['sources']), 1) + self.assertEqual(len(data['results']), 1) - self.assertEqual(data['sources'], { - 'source.test.test_source.test_table': { + self.assertEqual(data['results'], [ + { + 'unique_id': 'source.test.test_source.test_table', 'max_loaded_at': last_inserted_time, 'snapshotted_at': AnyStringWith(), 'max_loaded_at_time_ago_in_s': AnyFloat(), 'state': state, 'criteria': { + 'filter': None, 'warn_after': {'count': 10, 'period': 'hour'}, 'error_after': {'count': 18, 'period': 'hour'}, }, } - }) + ]) def _run_source_freshness(self): # test_source.test_table should have a loaded_at field of `updated_at` diff --git a/test/unit/test_compiler.py b/test/unit/test_compiler.py index 783336104c1..a176a5d3e1f 100644 --- a/test/unit/test_compiler.py +++ b/test/unit/test_compiler.py @@ -153,8 +153,6 @@ def test__prepend_ctes__already_has_cte(self): }, sources={}, docs={}, - # '2018-02-14T09:15:13Z' - generated_at=datetime(2018, 2, 14, 9, 15, 13), disabled=[], files={}, reports={}, @@ -239,7 +237,6 @@ def test__prepend_ctes__no_ctes(self): }, sources={}, docs={}, - generated_at='2018-02-14T09:15:13Z', disabled=[], files={}, reports={}, @@ -333,7 +330,6 @@ def test__prepend_ctes(self): }, sources={}, docs={}, - generated_at='2018-02-14T09:15:13Z', disabled=[], files={}, reports={}, @@ -438,7 +434,6 @@ def test__prepend_ctes__cte_not_compiled(self): }, sources={}, docs={}, - generated_at='2018-02-14T09:15:13Z', disabled=[], files={}, reports={}, @@ -544,7 +539,6 @@ def test__prepend_ctes__multiple_levels(self): }, sources={}, docs={}, - generated_at='2018-02-14T09:15:13Z', disabled=[], files={}, reports={}, diff --git a/test/unit/test_docs_generate.py b/test/unit/test_docs_generate.py index 258371b9707..49576e9e100 100644 --- a/test/unit/test_docs_generate.py +++ b/test/unit/test_docs_generate.py @@ -30,7 +30,6 @@ def generate_catalog_dict(self, columns): result = generate.CatalogResults( nodes=nodes, sources=sources, - generated_at=datetime.utcnow(), errors=None, ) return result.to_dict(omit_none=False)['nodes'] diff --git a/test/unit/test_graph_selector_methods.py b/test/unit/test_graph_selector_methods.py index a0a16441a06..2ffeb46e656 100644 --- a/test/unit/test_graph_selector_methods.py +++ b/test/unit/test_graph_selector_methods.py @@ -470,7 +470,6 @@ def manifest(seed, source, ephemeral_model, view_model, table_model, ext_source, docs={}, files={}, reports={}, - generated_at=datetime.utcnow(), disabled=[], ) return manifest diff --git a/test/unit/test_manifest.py b/test/unit/test_manifest.py index e5e4aa527f9..9ab1c35f9e4 100644 --- a/test/unit/test_manifest.py +++ b/test/unit/test_manifest.py @@ -214,23 +214,25 @@ def setUp(self): @freezegun.freeze_time('2018-02-14T09:15:13Z') def test__no_nodes(self): - manifest = Manifest(nodes={}, sources={}, macros={}, docs={}, - generated_at=datetime.utcnow(), disabled=[], - files={}, reports={}) + manifest = Manifest( + nodes={}, sources={}, macros={}, docs={}, disabled=[], files={}, reports={}, + metadata=ManifestMetadata(generated_at=datetime.utcnow()), + ) self.assertEqual( manifest.writable_manifest().to_dict(), { - 'dbt_schema_version': 'https://schemas.getdbt.com/dbt/manifest/v1.json', - 'dbt_version': dbt.version.__version__, 'nodes': {}, 'sources': {}, 'macros': {}, 'reports': {}, 'parent_map': {}, 'child_map': {}, - 'generated_at': '2018-02-14T09:15:13Z', + 'metadata': { + 'generated_at': '2018-02-14T09:15:13Z', + 'dbt_schema_version': 'https://schemas.getdbt.com/dbt/manifest/v1.json', + 'dbt_version': dbt.version.__version__, + }, 'docs': {}, - 'metadata': {}, 'disabled': [], } ) @@ -238,11 +240,12 @@ def test__no_nodes(self): @freezegun.freeze_time('2018-02-14T09:15:13Z') def test__nested_nodes(self): nodes = copy.copy(self.nested_nodes) - manifest = Manifest(nodes=nodes, sources={}, macros={}, docs={}, - generated_at=datetime.utcnow(), disabled=[], - files={}, reports={}) + manifest = Manifest( + nodes=nodes, sources={}, macros={}, docs={}, disabled=[], files={}, reports={}, + metadata=ManifestMetadata(generated_at=datetime.utcnow()), + ) serialized = manifest.writable_manifest().to_dict() - self.assertEqual(serialized['generated_at'], '2018-02-14T09:15:13Z') + self.assertEqual(serialized['metadata']['generated_at'], '2018-02-14T09:15:13Z') self.assertEqual(serialized['docs'], {}) self.assertEqual(serialized['disabled'], []) parent_map = serialized['parent_map'] @@ -305,8 +308,7 @@ def test__build_flat_graph(self): nodes = copy.copy(self.nested_nodes) sources = copy.copy(self.sources) manifest = Manifest(nodes=nodes, sources=sources, macros={}, docs={}, - generated_at=datetime.utcnow(), disabled=[], - files={}, reports={}) + disabled=[], files={}, reports={}) manifest.build_flat_graph() flat_graph = manifest.flat_graph flat_nodes = flat_graph['nodes'] @@ -321,16 +323,19 @@ def test__build_flat_graph(self): def test_metadata(self, mock_user): mock_user.id = 'cfc9500f-dc7f-4c83-9ea7-2c581c1b38cf' mock_user.do_not_track = True + now = datetime.utcnow() self.assertEqual( ManifestMetadata( project_id='098f6bcd4621d373cade4e832627b4f6', adapter_type='postgres', + generated_at=now, ), ManifestMetadata( project_id='098f6bcd4621d373cade4e832627b4f6', user_id='cfc9500f-dc7f-4c83-9ea7-2c581c1b38cf', send_anonymous_usage_stats=False, adapter_type='postgres', + generated_at=now, ) ) @@ -342,25 +347,26 @@ def test_no_nodes_with_metadata(self, mock_user): metadata = ManifestMetadata( project_id='098f6bcd4621d373cade4e832627b4f6', adapter_type='postgres', + generated_at=datetime.utcnow(), ) manifest = Manifest(nodes={}, sources={}, macros={}, docs={}, - generated_at=datetime.utcnow(), disabled=[], - metadata=metadata, files={}, reports={}) + disabled=[], metadata=metadata, files={}, + reports={}) self.assertEqual( manifest.writable_manifest().to_dict(), { - 'dbt_schema_version': 'https://schemas.getdbt.com/dbt/manifest/v1.json', - 'dbt_version': dbt.version.__version__, 'nodes': {}, 'sources': {}, 'macros': {}, 'reports': {}, 'parent_map': {}, 'child_map': {}, - 'generated_at': '2018-02-14T09:15:13Z', 'docs': {}, 'metadata': { + 'generated_at': '2018-02-14T09:15:13Z', + 'dbt_schema_version': 'https://schemas.getdbt.com/dbt/manifest/v1.json', + 'dbt_version': dbt.version.__version__, 'project_id': '098f6bcd4621d373cade4e832627b4f6', 'user_id': 'cfc9500f-dc7f-4c83-9ea7-2c581c1b38cf', 'send_anonymous_usage_stats': False, @@ -372,8 +378,7 @@ def test_no_nodes_with_metadata(self, mock_user): def test_get_resource_fqns_empty(self): manifest = Manifest(nodes={}, sources={}, macros={}, docs={}, - generated_at=datetime.utcnow(), disabled=[], - files={}, reports={}) + disabled=[], files={}, reports={}) self.assertEqual(manifest.get_resource_fqns(), {}) def test_get_resource_fqns(self): @@ -399,8 +404,7 @@ def test_get_resource_fqns(self): checksum=FileHash.empty(), ) manifest = Manifest(nodes=nodes, sources=self.sources, macros={}, docs={}, - generated_at=datetime.utcnow(), disabled=[], - files={}, reports={}) + disabled=[], files={}, reports={}) expect = { 'models': frozenset([ ('snowplow', 'events'), @@ -581,22 +585,23 @@ def setUp(self): @freezegun.freeze_time('2018-02-14T09:15:13Z') def test__no_nodes(self): manifest = Manifest(nodes={}, sources={}, macros={}, docs={}, - generated_at=datetime.utcnow(), disabled=[], - files={}, reports={}) + disabled=[], files={}, reports={}, + metadata=ManifestMetadata(generated_at=datetime.utcnow())) self.assertEqual( manifest.writable_manifest().to_dict(), { - 'dbt_schema_version': 'https://schemas.getdbt.com/dbt/manifest/v1.json', - 'dbt_version': dbt.version.__version__, 'nodes': {}, 'macros': {}, 'sources': {}, 'reports': {}, 'parent_map': {}, 'child_map': {}, - 'generated_at': '2018-02-14T09:15:13Z', + 'metadata': { + 'generated_at': '2018-02-14T09:15:13Z', + 'dbt_schema_version': 'https://schemas.getdbt.com/dbt/manifest/v1.json', + 'dbt_version': dbt.version.__version__, + }, 'docs': {}, - 'metadata': {}, 'disabled': [], } ) @@ -605,10 +610,10 @@ def test__no_nodes(self): def test__nested_nodes(self): nodes = copy.copy(self.nested_nodes) manifest = Manifest(nodes=nodes, sources={}, macros={}, docs={}, - generated_at=datetime.utcnow(), disabled=[], - files={}, reports={}) + disabled=[], files={}, reports={}, + metadata=ManifestMetadata(generated_at=datetime.utcnow())) serialized = manifest.writable_manifest().to_dict() - self.assertEqual(serialized['generated_at'], '2018-02-14T09:15:13Z') + self.assertEqual(serialized['metadata']['generated_at'], '2018-02-14T09:15:13Z') self.assertEqual(serialized['disabled'], []) parent_map = serialized['parent_map'] child_map = serialized['child_map'] @@ -669,8 +674,7 @@ def test__nested_nodes(self): def test__build_flat_graph(self): nodes = copy.copy(self.nested_nodes) manifest = Manifest(nodes=nodes, sources={}, macros={}, docs={}, - generated_at=datetime.utcnow(), disabled=[], - files={}, reports={}) + disabled=[], files={}, reports={}) manifest.build_flat_graph() flat_graph = manifest.flat_graph flat_nodes = flat_graph['nodes'] @@ -715,7 +719,6 @@ def setUp(self): docs={ d.unique_id: d for d in self.docs }, - generated_at=datetime.utcnow(), disabled=[], files={}, reports={}, @@ -736,7 +739,6 @@ def make_manifest(nodes=[], sources=[], macros=[], docs=[]): docs={ d.unique_id: d for d in docs }, - generated_at=datetime.utcnow(), disabled=[], files={}, reports={}, diff --git a/test/unit/test_parser.py b/test/unit/test_parser.py index 760ad2d579f..4f6e991bca0 100644 --- a/test/unit/test_parser.py +++ b/test/unit/test_parser.py @@ -860,7 +860,7 @@ def setUp(self): self.doc.unique_id: self.doc, } self.manifest = Manifest( - nodes=nodes, sources=sources, macros={}, docs=docs, disabled=[], files={}, reports={}, generated_at=mock.MagicMock() + nodes=nodes, sources=sources, macros={}, docs=docs, disabled=[], files={}, reports={} ) def test_process_docs(self): From 204b02de3e3dc51860d6b7552f1f276dd7ac9951 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Tue, 22 Sep 2020 12:34:30 -0600 Subject: [PATCH 2/3] fix freshness RPC response behavior --- core/dbt/contracts/rpc.py | 55 ++++++++++++++++++++++++++++++++++++++- core/dbt/rpc/builtins.py | 7 ++++- core/dbt/task/rpc/base.py | 28 ++++++++++++++------ 3 files changed, 80 insertions(+), 10 deletions(-) diff --git a/core/dbt/contracts/rpc.py b/core/dbt/contracts/rpc.py index 8241695de2f..1270acadfb4 100644 --- a/core/dbt/contracts/rpc.py +++ b/core/dbt/contracts/rpc.py @@ -15,6 +15,8 @@ CatalogArtifact, CatalogResults, ExecutionResult, + FreshnessExecutionResultArtifact, + FreshnessResult, RunOperationResult, RunOperationResultsArtifact, RunResult, @@ -282,6 +284,28 @@ def write(self, path: str): writable.write(path) +@dataclass +@schema_version('remote-freshness-result', 1) +class RemoteFreshnessResult(FreshnessResult, RemoteResult): + + @classmethod + def from_local_result( + cls, + base: FreshnessResult, + logs: List[LogMessage], + ) -> 'RemoteFreshnessResult': + return cls( + metadata=base.metadata, + results=base.results, + elapsed_time=base.elapsed_time, + logs=logs, + ) + + def write(self, path: str): + writable = FreshnessExecutionResultArtifact.from_result(base=self) + writable.write(path) + + @dataclass @schema_version('remote-run-result', 1) class RemoteRunResult(RemoteCompileResultMixin): @@ -292,6 +316,7 @@ class RemoteRunResult(RemoteCompileResultMixin): RPCResult = Union[ RemoteCompileResult, RemoteExecutionResult, + RemoteFreshnessResult, RemoteCatalogResults, RemoteDepsResult, RemoteRunOperationResult, @@ -300,7 +325,6 @@ class RemoteRunResult(RemoteCompileResultMixin): # GC types - class GCResultState(StrEnum): Deleted = 'deleted' # successful GC Missing = 'missing' # nothing to GC @@ -682,6 +706,35 @@ def from_result( elapsed=timing.elapsed, ) + +@dataclass +@schema_version('poll-remote-freshness-result', 1) +class PollFreshnessResult(RemoteFreshnessResult, PollResult): + state: TaskHandlerState = field( + metadata=restrict_to(TaskHandlerState.Success, + TaskHandlerState.Failed), + ) + + @classmethod + def from_result( + cls: Type['PollFreshnessResult'], + base: RemoteFreshnessResult, + tags: TaskTags, + timing: TaskTiming, + logs: List[LogMessage], + ) -> 'PollFreshnessResult': + return cls( + logs=logs, + tags=tags, + state=timing.state, + start=timing.start, + end=timing.end, + elapsed=timing.elapsed, + metadata=base.metadata, + results=base.results, + elapsed_time=base.elapsed_time, + ) + # Manifest parsing types diff --git a/core/dbt/rpc/builtins.py b/core/dbt/rpc/builtins.py index 66cb171ff41..7e900ca5357 100644 --- a/core/dbt/rpc/builtins.py +++ b/core/dbt/rpc/builtins.py @@ -18,6 +18,7 @@ TaskRow, PSResult, RemoteExecutionResult, + RemoteFreshnessResult, RemoteRunResult, RemoteCompileResult, RemoteCatalogResults, @@ -32,6 +33,7 @@ PollRunCompleteResult, PollCompileCompleteResult, PollCatalogCompleteResult, + PollFreshnessResult, PollRemoteEmptyCompleteResult, PollRunOperationCompleteResult, TaskHandlerState, @@ -146,7 +148,8 @@ def poll_complete( PollCatalogCompleteResult, PollRemoteEmptyCompleteResult, PollRunOperationCompleteResult, - PollGetManifestResult + PollGetManifestResult, + PollFreshnessResult, ]] if isinstance(result, RemoteExecutionResult): @@ -164,6 +167,8 @@ def poll_complete( cls = PollRunOperationCompleteResult elif isinstance(result, GetManifestResult): cls = PollGetManifestResult + elif isinstance(result, RemoteFreshnessResult): + cls = PollFreshnessResult else: raise dbt.exceptions.InternalException( 'got invalid result in poll_complete: {}'.format(result) diff --git a/core/dbt/task/rpc/base.py b/core/dbt/task/rpc/base.py index 306e6fa6763..d31b4310905 100644 --- a/core/dbt/task/rpc/base.py +++ b/core/dbt/task/rpc/base.py @@ -1,9 +1,24 @@ -from dbt.contracts.results import RunResultsArtifact -from dbt.contracts.rpc import RemoteExecutionResult +from dbt.contracts.results import ( + RunResult, + RunOperationResult, + FreshnessResult, +) +from dbt.contracts.rpc import ( + RemoteExecutionResult, + RemoteFreshnessResult, + RemoteRunOperationResult, +) from dbt.task.runnable import GraphRunnableTask from dbt.rpc.method import RemoteManifestMethod, Parameters +RESULT_TYPE_MAP = { + RunResult: RemoteExecutionResult, + RunOperationResult: RemoteRunOperationResult, + FreshnessResult: RemoteFreshnessResult, +} + + class RPCTask( GraphRunnableTask, RemoteManifestMethod[Parameters, RemoteExecutionResult] @@ -21,10 +36,7 @@ def load_manifest(self): def get_result( self, results, elapsed_time, generated_at ) -> RemoteExecutionResult: - base = RunResultsArtifact.from_node_results( - results=results, - elapsed_time=elapsed_time, - generated_at=generated_at, - ) - rpc_result = RemoteExecutionResult.from_local_result(base, logs=[]) + base = super().get_result(results, elapsed_time, generated_at) + cls = RESULT_TYPE_MAP.get(type(base), RemoteExecutionResult) + rpc_result = cls.from_local_result(base, logs=[]) return rpc_result From a32295e74a95f086795d358135aa19aeea29d049 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Tue, 22 Sep 2020 13:48:31 -0600 Subject: [PATCH 3/3] fix schema collection script --- scripts/collect-artifact-schema.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/collect-artifact-schema.py b/scripts/collect-artifact-schema.py index 0d0484cc704..ecdf98a82eb 100644 --- a/scripts/collect-artifact-schema.py +++ b/scripts/collect-artifact-schema.py @@ -6,7 +6,7 @@ from hologram import JsonSchemaMixin from dbt.contracts.graph.manifest import WritableManifest from dbt.contracts.results import ( - CatalogResults, ExecutionResult, FreshnessExecutionResult + CatalogArtifact, RunResultsArtifact, FreshnessExecutionResultArtifact ) @@ -21,9 +21,9 @@ class Schemas(JsonSchemaMixin): def main(): schemas = Schemas( manifest=WritableManifest.json_schema(), - catalog=CatalogResults.json_schema(), - run_results=ExecutionResult.json_schema(), - freshness_results=FreshnessExecutionResult.json_schema(), + catalog=CatalogArtifact.json_schema(), + run_results=RunResultsArtifact.json_schema(), + freshness_results=FreshnessExecutionResultArtifact.json_schema(), ) print(json.dumps(schemas.to_dict()))