Skip to content

Commit

Permalink
Merge pull request #2813 from fishtown-analytics/feature/2510-save-args-run_results
Browse files Browse the repository at this point in the history

Save args in run_results.json
  • Loading branch information
gshank authored Oct 9, 2020
2 parents fd5e10c + 9472288 commit 3888e00
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Improved error messages for YAML selectors ([#2700](https://github.com/fishtown-analytics/dbt/issues/2700), [#2781](https://github.com/fishtown-analytics/dbt/pull/2781))
- Save manifest at the same time we save the run_results at the end of a run ([#2765](https://github.com/fishtown-analytics/dbt/issues/2765), [#2799](https://github.com/fishtown-analytics/dbt/pull/2799))
- Added dbt_invocation_id for each BigQuery job to enable performance analysis ([#2808](https://github.com/fishtown-analytics/dbt/issues/2808), [#2809](https://github.com/fishtown-analytics/dbt/pull/2809))
- Save cli and rpc arguments in run_results.json ([#2510](https://github.com/fishtown-analytics/dbt/issues/2510), [#2813](https://github.com/fishtown-analytics/dbt/pull/2813))

### Under the hood
- Added strategy-specific validation to improve the relevancy of compilation errors for the `timestamp` and `check` snapshot strategies. ([#2787](https://github.com/fishtown-analytics/dbt/issues/2787), [#2791](https://github.com/fishtown-analytics/dbt/pull/2791))
Expand Down
3 changes: 3 additions & 0 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,15 @@ class RunResultsArtifact(
ArtifactMixin,
):
results: Sequence[RunResult]
args: Dict[str, Any] = field(default_factory=dict)

@classmethod
def from_node_results(
cls,
results: Sequence[RunResult],
elapsed_time: float,
generated_at: datetime,
args: Dict,
):
meta = RunResultsMetadata(
dbt_schema_version=str(cls.dbt_schema_version),
Expand All @@ -145,6 +147,7 @@ def from_node_results(
metadata=meta,
results=results,
elapsed_time=elapsed_time,
args=args
)


Expand Down
3 changes: 3 additions & 0 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ def error(self):
@dataclass
@schema_version('remote-execution-result', 1)
class RemoteExecutionResult(ExecutionResult, RemoteResult):
args: Dict[str, Any] = field(default_factory=dict)
results: Sequence[RunResult]
generated_at: datetime = field(default_factory=datetime.utcnow)

Expand All @@ -233,6 +234,7 @@ def write(self, path: str):
generated_at=self.generated_at,
results=self.results,
elapsed_time=self.elapsed_time,
args=self.args,
)
writable.write(path)

Expand All @@ -246,6 +248,7 @@ def from_local_result(
generated_at=base.metadata.generated_at,
results=base.results,
elapsed_time=base.elapsed_time,
args=base.args,
logs=logs,
)

Expand Down
30 changes: 29 additions & 1 deletion core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import datetime
from multiprocessing.dummy import Pool as ThreadPool
from typing import Optional, Dict, List, Set, Tuple, Iterable, AbstractSet
from pathlib import PosixPath, WindowsPath

from .printer import (
print_run_result_error,
Expand Down Expand Up @@ -530,11 +531,38 @@ def create_schema(relation: BaseRelation) -> None:
create_future.result()

def get_result(self, results, elapsed_time, generated_at):
    """Build the run-results artifact for this invocation.

    Passes the node results, the elapsed time and the generation
    timestamp to ``RunResultsArtifact.from_node_results``, together with
    the invocation arguments as returned by ``self.args_to_dict()``.
    """
    return RunResultsArtifact.from_node_results(
        results=results,
        elapsed_time=elapsed_time,
        generated_at=generated_at,
        args=self.args_to_dict(),
    )

def args_to_dict(self):
    """Return a filtered copy of ``self.args`` as a plain dict.

    The following entries are left out to keep the result small:

    * the ``cls`` entry,
    * any entry whose value is ``None``,
    * the flags listed in ``default_false_keys`` when they are ``False``,
    * ``vars`` when it is the empty-dict string ``'{}'``.

    ``PosixPath`` / ``WindowsPath`` values are converted to ``str`` in the
    returned dict only; ``self.args`` itself is never modified.
    """
    # Flags that are only worth recording when they have been switched on.
    default_false_keys = (
        'debug', 'full_refresh', 'fail_fast', 'warn_error',
        'single_threaded', 'test_new_parser', 'log_cache_events',
        'strict'
    )
    dict_args = {}
    # vars() hands back the namespace's live __dict__, so work on local
    # values rather than assigning back into that mapping.
    for key, value in vars(self.args).items():
        # remove args keys that clutter up the dictionary
        if key == 'cls' or value is None:
            continue
        if key in default_false_keys and value is False:
            continue
        if key == 'vars' and value == '{}':
            continue
        # this was required for a test case
        if isinstance(value, (PosixPath, WindowsPath)):
            value = str(value)
        dict_args[key] = value
    return dict_args

def task_end_messages(self, results):
print_run_end_messages(results)
9 changes: 9 additions & 0 deletions test/integration/028_cli_vars/test_cli_vars.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from test.integration.base import DBTIntegrationTest, use_profile
import yaml
import json


class TestCLIVars(DBTIntegrationTest):
Expand Down Expand Up @@ -57,3 +58,11 @@ def test__postgres_cli_vars_longer(self):
self.assertEqual(len(results), 1)
results = self.run_dbt(["test", "--vars", "{simple: abc, unused: def}"])
self.assertEqual(len(results), 1)
run_results = _read_json('./target/run_results.json')
self.assertEqual(run_results['args']['vars'], "{simple: abc, unused: def}")


def _read_json(path):
    """Load and return the parsed JSON document stored at ``path``.

    Used to read json generated by dbt. The file is opened explicitly as
    UTF-8 so the result does not depend on the platform's default locale
    encoding.
    """
    with open(path, encoding='utf-8') as fp:
        return json.load(fp)
24 changes: 13 additions & 11 deletions test/integration/029_docs_generate_tests/test_docs_generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3333,21 +3333,23 @@ def expected_postgres_references_run_results(self):
]

def verify_run_results(self, expected_run_results):
    """Check ./target/run_results.json against ``expected_run_results``.

    Verifies that the artifact carries metadata for the run-results v1
    schema, a positive float ``elapsed_time`` and an ``args`` entry, and
    that its ``results`` (sorted by node unique_id) equal
    ``expected_run_results``.
    """
    run_results = _read_json('./target/run_results.json')

    assert 'metadata' in run_results
    self.verify_metadata(run_results['metadata'], 'https://schemas.getdbt.com/dbt/run-results/v1.json')
    self.assertIn('elapsed_time', run_results)
    self.assertGreater(run_results['elapsed_time'], 0)
    self.assertTrue(
        isinstance(run_results['elapsed_time'], float),
        "run_results['elapsed_time'] is of type {}, expected float".format(
            str(type(run_results['elapsed_time'])))
    )

    assert 'args' in run_results
    # sort the results so we can make reasonable assertions
    run_results['results'].sort(key=lambda r: r['node']['unique_id'])
    assert run_results['results'] == expected_run_results
    # Actually assert the top-level key set (a bare comparison is a no-op);
    # 'args' is part of the artifact alongside the original three keys.
    assert set(run_results) == {'elapsed_time', 'results', 'metadata', 'args'}

@use_profile('postgres')
def test__postgres__run_and_generate_no_compile(self):
Expand Down
1 change: 1 addition & 0 deletions test/integration/100_rpc_test/test_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ def test_compile_project_cli_postgres(self):
@use_profile('postgres')
def test_run_project_postgres(self):
result = self.async_query('run').json()
assert 'args' in result['result']
self.assertHasResults(result, {'descendant_model', 'multi_source_model', 'nonsource_descendant'})
self.assertTablesEqual('multi_source_model', 'expected_multi_source')

Expand Down

0 comments on commit 3888e00

Please sign in to comment.