Skip to content

Commit

Permalink
Create a no-op exposure runner
Browse files Browse the repository at this point in the history
  • Loading branch information
aranke committed Dec 2, 2024
1 parent 1b7d9b5 commit e6325c6
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 50 deletions.
2 changes: 2 additions & 0 deletions core/dbt/artifacts/schemas/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ class NodeStatus(StrEnum):
PartialSuccess = "partial success"
Pass = "pass"
RuntimeErr = "runtime error"
NoOp = "no-op"


class RunStatus(StrEnum):
Success = NodeStatus.Success
Error = NodeStatus.Error
Skipped = NodeStatus.Skipped
PartialSuccess = NodeStatus.PartialSuccess
NoOp = NodeStatus.NoOp


class TestStatus(StrEnum):
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,7 +1378,7 @@ def group(self):


@dataclass
class Exposure(GraphNode, ExposureResource):
class Exposure(NodeInfoMixin, GraphNode, ExposureResource):
@property
def depends_on_nodes(self):
return self.depends_on.nodes
Expand Down
4 changes: 3 additions & 1 deletion core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1892,7 +1892,9 @@ def code(self) -> str:
return "Z023"

def message(self) -> str:
stats_line = "Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} TOTAL={total}"
stats_line = (
"Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} NO-OP={noop} TOTAL={total}"
)
return stats_line.format(**self.stats)


Expand Down
10 changes: 6 additions & 4 deletions core/dbt/graph/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,12 @@ def _is_graph_member(self, unique_id: UniqueId) -> bool:
elif unique_id in self.manifest.saved_queries:
saved_query = self.manifest.saved_queries[unique_id]
return saved_query.config.enabled

node = self.manifest.nodes[unique_id]

return node.config.enabled
elif unique_id in self.manifest.exposures:
exposure = self.manifest.exposures[unique_id]
return exposure.config.enabled

Check warning on line 183 in core/dbt/graph/selector.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/graph/selector.py#L182-L183

Added lines #L182 - L183 were not covered by tests
else:
node = self.manifest.nodes[unique_id]
return node.config.enabled

def _is_empty_node(self, unique_id: UniqueId) -> bool:
if unique_id in self.manifest.nodes:
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/runner/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .exposure_runner import ExposureRunner
from .saved_query_runner import SavedQueryRunner
7 changes: 7 additions & 0 deletions core/dbt/runner/exposure_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dbt.runner.no_op_runner import NoOpRunner


class ExposureRunner(NoOpRunner):
@property
def description(self) -> str:
return f"exposure {self.node.name}"

Check warning on line 7 in core/dbt/runner/exposure_runner.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/runner/exposure_runner.py#L7

Added line #L7 was not covered by tests
45 changes: 45 additions & 0 deletions core/dbt/runner/no_op_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import threading

from dbt.artifacts.schemas.results import RunStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import LogNodeNoOpResult
from dbt.task.base import BaseRunner
from dbt_common.events.functions import fire_event


class NoOpRunner(BaseRunner):
@property
def description(self) -> str:
raise NotImplementedError("description not implemented")

Check warning on line 14 in core/dbt/runner/no_op_runner.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/runner/no_op_runner.py#L14

Added line #L14 was not covered by tests

def before_execute(self) -> None:
pass

def compile(self, manifest: Manifest):
return self.node

def after_execute(self, result) -> None:
fire_event(
LogNodeNoOpResult(
description=self.description,
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info,
)
)

def execute(self, compiled_node, manifest):
# no-op
return RunResult(
node=compiled_node,
status=RunStatus.NoOp,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
message="NO-OP",
adapter_response={},
failures=0,
batch_results=None,
agate_table=None,
)
7 changes: 7 additions & 0 deletions core/dbt/runner/saved_query_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dbt.runner.no_op_runner import NoOpRunner


class SavedQueryRunner(NoOpRunner):
@property
def description(self) -> str:
return f"saved query {self.node.name}"
48 changes: 5 additions & 43 deletions core/dbt/task/build.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import threading
from typing import Dict, List, Optional, Set, Type

from dbt.artifacts.schemas.results import NodeStatus, RunStatus
from dbt.artifacts.schemas.results import NodeStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.cli.flags import Flags
from dbt.config.runtime import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import LogNodeNoOpResult
from dbt.exceptions import DbtInternalError
from dbt.graph import Graph, GraphQueue, ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.runner import ExposureRunner as exposure_runner
from dbt.runner import SavedQueryRunner as saved_query_runner
from dbt.task.base import BaseRunner, resource_types_from_args
from dbt.task.run import MicrobatchModelRunner
from dbt_common.events.functions import fire_event

from .run import ModelRunner as run_model_runner
from .run import RunTask
Expand All @@ -21,44 +20,6 @@
from .test import TestRunner as test_runner


class SavedQueryRunner(BaseRunner):
# Stub. No-op Runner for Saved Queries, which require MetricFlow for execution.
@property
def description(self) -> str:
return f"saved query {self.node.name}"

def before_execute(self) -> None:
pass

def compile(self, manifest: Manifest):
return self.node

def after_execute(self, result) -> None:
fire_event(
LogNodeNoOpResult(
description=self.description,
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info,
)
)

def execute(self, compiled_node, manifest):
# no-op
return RunResult(
node=compiled_node,
status=RunStatus.Success,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
message="NO-OP",
adapter_response={},
failures=0,
batch_results=None,
agate_table=None,
)


class BuildTask(RunTask):
"""The Build task processes all assets of a given process and attempts to
'build' them in an opinionated fashion. Every resource type outlined in
Expand All @@ -80,7 +41,8 @@ class BuildTask(RunTask):
NodeType.Seed: seed_runner,
NodeType.Test: test_runner,
NodeType.Unit: test_runner,
NodeType.SavedQuery: SavedQueryRunner,
NodeType.SavedQuery: saved_query_runner,
NodeType.Exposure: exposure_runner,
}
ALL_RESOURCE_VALUES = frozenset({x for x in RUNNER_MAP.keys()})

Expand Down
3 changes: 3 additions & 0 deletions core/dbt/task/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def interpret_run_result(result) -> str:
return "warn"
elif result.status in (NodeStatus.Pass, NodeStatus.Success):
return "pass"
elif result.status == NodeStatus.NoOp:
return "noop"
else:
raise RuntimeError(f"unhandled result {result}")

Expand All @@ -58,6 +60,7 @@ def print_run_status_line(results) -> None:
"skip": 0,
"pass": 0,
"warn": 0,
"noop": 0,
"total": 0,
}

Expand Down
2 changes: 2 additions & 0 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ def _runtime_initialize(self):
self._flattened_nodes.append(self.manifest.saved_queries[uid])
elif uid in self.manifest.unit_tests:
self._flattened_nodes.append(self.manifest.unit_tests[uid])
elif uid in self.manifest.exposures:
self._flattened_nodes.append(self.manifest.exposures[uid])

Check warning on line 188 in core/dbt/task/runnable.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/runnable.py#L187-L188

Added lines #L187 - L188 were not covered by tests
else:
raise DbtInternalError(
f"Node selection returned {uid}, expected a node, a source, or a unit test"
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/task/test_build.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dbt.contracts.graph.nodes import SavedQuery
from dbt.task.build import SavedQueryRunner
from dbt.runner import SavedQueryRunner


def test_saved_query_runner_on_skip(saved_query: SavedQuery):
Expand Down

0 comments on commit e6325c6

Please sign in to comment.