Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build task arg parity #3884

Merged
merged 6 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class RPCDocsGenerateParameters(RPCParameters):

@dataclass
class RPCBuildParameters(RPCParameters):
resource_types: Optional[List[str]] = None
threads: Optional[int] = None
models: Union[None, str, List[str]] = None
select: Union[None, str, List[str]] = None
Expand Down
63 changes: 51 additions & 12 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@

import dbt.version
import dbt.flags as flags
import dbt.task.run as run_task
import dbt.task.build as build_task
import dbt.task.clean as clean_task
import dbt.task.compile as compile_task
import dbt.task.debug as debug_task
import dbt.task.clean as clean_task
import dbt.task.deps as deps_task
import dbt.task.freshness as freshness_task
import dbt.task.generate as generate_task
import dbt.task.init as init_task
import dbt.task.list as list_task
import dbt.task.parse as parse_task
import dbt.task.run as run_task
import dbt.task.run_operation as run_operation_task
import dbt.task.seed as seed_task
import dbt.task.test as test_task
import dbt.task.snapshot as snapshot_task
import dbt.task.generate as generate_task
import dbt.task.serve as serve_task
import dbt.task.freshness as freshness_task
import dbt.task.run_operation as run_operation_task
import dbt.task.parse as parse_task
import dbt.task.snapshot as snapshot_task
import dbt.task.test as test_task
from dbt.profiler import profiler
from dbt.task.list import ListTask
from dbt.task.rpc.server import RPCServerTask
from dbt.adapters.factory import reset_adapters, cleanup_connections

Expand Down Expand Up @@ -399,6 +399,45 @@ def _build_build_subparser(subparsers, base_subparser):
Stop execution upon a first failure.
'''
)
sub.add_argument(
'--store-failures',
action='store_true',
help='''
Store test results (failing rows) in the database
'''
)
resource_values: List[str] = [
str(s) for s in build_task.BuildTask.ALL_RESOURCE_VALUES
] + ['all']
sub.add_argument('--resource-type',
choices=resource_values,
action='append',
default=[],
dest='resource_types')
sub.add_argument(
'-m',
'--models',
dest='models',
nargs='+',
help='''
Specify the models to select and set the resource-type to 'model'.
Mutually exclusive with '--select' (or '-s') and '--resource-type'
''',
metavar='SELECTOR',
required=False,
)
sub.add_argument(
'-s',
'--select',
dest='select',
nargs='+',
help='''
Specify the nodes to include.
''',
metavar='SELECTOR',
required=False,
)
_add_common_selector_arguments(sub)
return sub


Expand Down Expand Up @@ -815,9 +854,9 @@ def _build_list_subparser(subparsers, base_subparser):
''',
aliases=['ls'],
)
sub.set_defaults(cls=ListTask, which='list', rpc_method=None)
sub.set_defaults(cls=list_task.ListTask, which='list', rpc_method=None)
resource_values: List[str] = [
str(s) for s in ListTask.ALL_RESOURCE_VALUES
str(s) for s in list_task.ListTask.ALL_RESOURCE_VALUES
] + ['default', 'all']
sub.add_argument('--resource-type',
choices=resource_values,
Expand Down Expand Up @@ -1062,7 +1101,7 @@ def parse_args(args, cls=DBTArgumentParser):
# --select, --exclude
# list_sub sets up its own arguments.
_add_selection_arguments(
build_sub, run_sub, compile_sub, generate_sub, test_sub, snapshot_sub, seed_sub)
run_sub, compile_sub, generate_sub, test_sub, snapshot_sub, seed_sub)
# --defer
_add_defer_argument(run_sub, test_sub, build_sub)
# --full-refresh
Expand Down
74 changes: 65 additions & 9 deletions core/dbt/task/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,96 @@
from .test import TestRunner as test_runner

from dbt.contracts.results import NodeStatus
from dbt.graph import ResourceTypeSelector
from dbt.exceptions import InternalException
from dbt.exceptions import RuntimeException, InternalException
from dbt.graph import (
parse_difference,
ResourceTypeSelector,
SelectionSpec,
)
from dbt.node_types import NodeType
from dbt.task.test import TestSelector


class BuildTask(RunTask):
"""The Build task processes all assets of a given process and
attempts to 'build' them in an opinionated fashion. Every resource
type outlined in RUNNER_MAP will be processed by the mapped runner class.
"""The Build task processes all assets of a given process and attempts to
'build' them in an opinionated fashion. Every resource type outlined in
RUNNER_MAP will be processed by the mapped runner class.

I.E. a resource of type Model is handled by the ModelRunner which is
imported as run_model_runner.
"""
imported as run_model_runner. """

MARK_DEPENDENT_ERRORS_STATUSES = [NodeStatus.Error, NodeStatus.Fail]

RUNNER_MAP = {
NodeType.Model: run_model_runner,
NodeType.Snapshot: snapshot_model_runner,
NodeType.Seed: seed_runner,
NodeType.Test: test_runner,
}
ALL_RESOURCE_VALUES = frozenset({x for x in RUNNER_MAP.keys()})

MARK_DEPENDENT_ERRORS_STATUSES = [NodeStatus.Error, NodeStatus.Fail]
def __init__(self, args, config):
    """Set up the task and reject conflicting selection arguments.

    ``args.models`` may not be combined with ``args.select`` or with
    ``args.resource_types``.

    Raises:
        RuntimeException: if ``models`` is given together with ``select``
            or together with ``resource_types``.
    """
    super().__init__(args, config)

    # The exclusivity checks only matter when "models" was supplied.
    if not self.args.models:
        return

    if self.args.select:
        raise RuntimeException(
            '"models" and "select" are mutually exclusive arguments'
        )

    if self.args.resource_types:
        raise RuntimeException(
            '"models" and "resource_type" are mutually exclusive '
            'arguments'
        )

@property
def resource_types(self):
    """Return the list of resource types this invocation should build.

    * ``[NodeType.Model]`` when ``args.models`` is set.
    * Every member of ``ALL_RESOURCE_VALUES`` when ``args.resource_types``
      is empty.
    * Otherwise the requested values, de-duplicated, with the special
      value ``'all'`` expanded to ``ALL_RESOURCE_VALUES``.
    """
    if self.args.models:
        return [NodeType.Model]

    if not self.args.resource_types:
        return list(self.ALL_RESOURCE_VALUES)

    # Go through a set so repeated --resource-type flags collapse; the
    # order of the returned list is therefore unspecified.
    values = set(self.args.resource_types)

    if 'all' in values:
        # 'all' is a placeholder, not a real resource type: replace it
        # with the concrete types.
        values.remove('all')
        values.update(self.ALL_RESOURCE_VALUES)

    return list(values)

@property
def selector(self):
    """Return ``args.models`` when it is truthy, else ``args.select``."""
    chosen_models = self.args.models
    return chosen_models if chosen_models else self.args.select

def get_selection_spec(self) -> SelectionSpec:
    """Build the selection spec for this run.

    When ``args.selector_name`` is set, the spec is looked up via
    ``config.get_selector``; otherwise it is produced by
    ``parse_difference`` from ``self.selector`` and ``args.exclude``.
    """
    selector_name = self.args.selector_name
    if not selector_name:
        return parse_difference(self.selector, self.args.exclude)
    return self.config.get_selector(selector_name)

def get_node_selector(self) -> ResourceTypeSelector:
    """Create the node selector matching the requested resource types.

    Returns a ``TestSelector`` when tests are the only requested resource
    type, otherwise a ``ResourceTypeSelector`` restricted to
    ``self.resource_types``.

    Raises:
        InternalException: if the manifest or the graph has not been set.
    """
    if self.manifest is None or self.graph is None:
        raise InternalException(
            'manifest and graph must be set to get node selection'
        )

    # Read the property once so the comparison and the selector below see
    # the same value.
    resource_types = self.resource_types

    if resource_types == [NodeType.Test]:
        return TestSelector(
            graph=self.graph,
            manifest=self.manifest,
            previous_state=self.previous_state,
        )
    # Pass the computed list exactly once; a second, hard-coded
    # ``resource_types=`` keyword would be a repeated-keyword SyntaxError
    # and would ignore the user's selection.
    return ResourceTypeSelector(
        graph=self.graph,
        manifest=self.manifest,
        previous_state=self.previous_state,
        resource_types=resource_types,
    )

def get_runner_type(self, node):
Expand Down
2 changes: 0 additions & 2 deletions core/dbt/task/list.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import json
from typing import Type

from dbt.contracts.graph.parsed import (
ParsedExposure,
Expand Down Expand Up @@ -186,7 +185,6 @@ def get_node_selector(self):
raise InternalException(
'manifest and graph must be set to get perform node selection'
)
cls: Type[ResourceTypeSelector]
if self.resource_types == [NodeType.Test]:
return TestSelector(
graph=self.graph,
Expand Down
8 changes: 4 additions & 4 deletions core/dbt/task/rpc/project_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,13 +320,13 @@ def output_results(results):


class RemoteBuildProjectTask(RPCCommandTask[RPCBuildParameters], BuildTask):

METHOD_NAME = 'build'

def set_args(self, params: RPCBuildParameters) -> None:
if params.models:
self.args.select = self._listify(params.models)
else:
self.args.select = self._listify(params.select)
self.args.resource_types = self._listify(params.resource_types)
self.args.models = self._listify(params.models)
self.args.select = self._listify(params.select)
self.args.exclude = self._listify(params.exclude)
self.args.selector_name = params.selector

Expand Down
79 changes: 76 additions & 3 deletions test/rpc/test_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ def test_rpc_build_state(

get_write_manifest(querier, os.path.join(state_dir, 'manifest.json'))

project.models['my_model.sql'] = 'select * from {{ ref("data" )}} where id = 2'
project.models['my_model.sql'] =\
'select * from {{ ref("data" )}} where id = 2'
project.write_models(project_root, remove=True)
querier.sighup()
assert querier.wait_for_status('ready') is True
Expand All @@ -129,9 +130,81 @@ def test_rpc_build_state(
querier.build(state='./state', models=['state:modified']),
)
assert len(results['results']) == 0

# a better test of defer would require multiple targets
results = querier.async_wait_for_result(
querier.build(state='./state', models=['state:modified'], defer=True)
querier.build(
state='./state',
models=['state:modified'],
defer=True
)
)
assert len(results['results']) == 0


@pytest.mark.supported('postgres')
def test_rpc_build_selectors(
    project_root, profiles_root, dbt_profile, unique_schema
):
    """Exercise the ``resource_types``, ``models`` and ``select`` parameters
    of the RPC ``build`` method.

    The project has one seed (``data``), one model (``my_model``, tagged
    ``example_tag``) carrying two column tests (``not_null`` and ``unique``
    on ``id``), and the snapshots from ``snapshot_data``.
    """
    schema_yaml = {
        'version': 2,
        'models': [{
            'name': 'my_model',
            'columns': [
                {
                    'name': 'id',
                    'tests': ['not_null', 'unique'],
                },
            ],
        }],
    }
    project = ProjectDefinition(
        name='test',
        project_data={
            'seeds': {'+quote_columns': False},
            'models': {'test': {'my_model': {'+tags': 'example_tag'}}}
        },
        models={
            'my_model.sql': 'select * from {{ ref("data") }}',
            'schema.yml': yaml.safe_dump(schema_yaml)
        },
        seeds={'data.csv': 'id,message\n1,hello\n2,goodbye'},
        snapshots={'my_snapshots.sql': snapshot_data},
    )
    querier_ctx = get_querier(
        project_def=project,
        project_dir=project_root,
        profiles_dir=profiles_root,
        schema=unique_schema,
        test_kwargs={},
    )
    with querier_ctx as querier:
        # test simple resource_types param
        results = querier.async_wait_for_result(
            querier.build(resource_types=['seed'])
        )
        assert len(results['results']) == 1
        assert results['results'][0]['node']['resource_type'] == 'seed'

        # test simple models param
        results = querier.async_wait_for_result(
            querier.build(models=['my_model'])
        )
        assert len(results['results']) == 1
        assert results['results'][0]['node']['resource_type'] == 'model'

        # test simple select param (should select tagged model and its tests)
        results = querier.async_wait_for_result(
            querier.build(select=['tag:example_tag'])
        )
        assert len(results['results']) == 3
        assert sorted(
            [result['node']['resource_type'] for result in results['results']]
        ) == ['model', 'test', 'test']
Loading