Skip to content

Commit

Permalink
Add an API to coarsen/partition Targets by their cycles (#12251)
Browse files Browse the repository at this point in the history
In order to recursively (such as for compilation) consume a graph of `Targets` containing cycles, we must "coarsen" the `Targets` in each cycle into a batch (called a [strongly connected component](https://en.wikipedia.org/wiki/Strongly_connected_component)).

This change adds a rule to compute the `CoarsenedTargets` for `Addresses`. To compile a graph of `Targets` with fine-grained compiler invocations, you can "walk" the coarsened graph by requesting `CoarsenedTargets` for the roots, and then recursively requesting a compiler output per `CoarsenedTarget` dependency.
  • Loading branch information
stuhood authored Jul 13, 2021
1 parent be5b479 commit b0b7b0b
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 20 deletions.
119 changes: 101 additions & 18 deletions src/python/pants/engine/internals/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@
Snapshot,
SpecsSnapshot,
)
from pants.engine.internals import native_engine
from pants.engine.internals.target_adaptor import TargetAdaptor
from pants.engine.rules import Get, MultiGet, collect_rules, rule
from pants.engine.target import (
CoarsenedTarget,
CoarsenedTargets,
Dependencies,
DependenciesRequest,
ExplicitlyProvidedDependencies,
Expand Down Expand Up @@ -77,6 +80,7 @@
from pants.option.global_options import GlobalOptions, OwnersNotFoundBehavior
from pants.source.filespec import matches_filespec
from pants.util.docutil import doc_url
from pants.util.frozendict import FrozenDict
from pants.util.logging import LogLevel
from pants.util.ordered_set import FrozenOrderedSet, OrderedSet

Expand Down Expand Up @@ -238,29 +242,58 @@ def visit(address: Address):
)


@rule(desc="Resolve transitive targets")
async def transitive_targets(request: TransitiveTargetsRequest) -> TransitiveTargets:
"""Find all the targets transitively depended upon by the target roots.
@dataclass(frozen=True)
class _DependencyMappingRequest:
    """Input to `transitive_dependency_mapping`: which roots to walk, and how to resolve them."""

    # Supplies the root addresses and the `include_special_cased_deps` flag used for the walk.
    tt_request: TransitiveTargetsRequest
    # If True, roots and dependencies are requested as `Targets`; otherwise they are requested
    # as `UnexpandedTargets`.
    expanded_targets: bool


@dataclass(frozen=True)
class _DependencyMapping:
    """Output of `transitive_dependency_mapping`: the dependency edges and the targets seen."""

    # The dependency addresses recorded for each traversed address.
    mapping: FrozenDict[Address, Tuple[Address, ...]]
    # A frozen copy of the `visited` set accumulated by the traversal.
    visited: FrozenOrderedSet[Target]
    # The targets that the request's root addresses resolved to.
    roots_as_targets: Collection[Target]


@rule
async def transitive_dependency_mapping(request: _DependencyMappingRequest) -> _DependencyMapping:
"""This uses iteration, rather than recursion, so that we can tolerate dependency cycles.
This uses iteration, rather than recursion, so that we can tolerate dependency cycles. Unlike a
traditional BFS algorithm, we batch each round of traversals via `MultiGet` for improved
performance / concurrency.
Unlike a traditional BFS algorithm, we batch each round of traversals via `MultiGet` for
improved performance / concurrency.
"""
roots_as_targets = await Get(Targets, Addresses(request.roots))
roots_as_targets: Collection[Target]
if request.expanded_targets:
roots_as_targets = await Get(Targets, Addresses(request.tt_request.roots))
else:
roots_as_targets = await Get(UnexpandedTargets, Addresses(request.tt_request.roots))
visited: OrderedSet[Target] = OrderedSet()
queued = FrozenOrderedSet(roots_as_targets)
dependency_mapping: Dict[Address, Tuple[Address, ...]] = {}
while queued:
direct_dependencies = await MultiGet(
Get(
Targets,
DependenciesRequest(
tgt.get(Dependencies),
include_special_cased_deps=request.include_special_cased_deps,
),
direct_dependencies: Tuple[Collection[Target], ...]
if request.expanded_targets:
direct_dependencies = await MultiGet(
Get(
Targets,
DependenciesRequest(
tgt.get(Dependencies),
include_special_cased_deps=request.tt_request.include_special_cased_deps,
),
)
for tgt in queued
)
else:
direct_dependencies = await MultiGet(
Get(
UnexpandedTargets,
DependenciesRequest(
tgt.get(Dependencies),
include_special_cased_deps=request.tt_request.include_special_cased_deps,
),
)
for tgt in queued
)
for tgt in queued
)

dependency_mapping.update(
zip(
Expand All @@ -278,11 +311,21 @@ async def transitive_targets(request: TransitiveTargetsRequest) -> TransitiveTar
# is because expanding from the `Addresses` -> `Targets` may have resulted in generated
# subtargets being used, so we need to use `roots_as_targets` to have this expansion.
_detect_cycles(tuple(t.address for t in roots_as_targets), dependency_mapping)
return _DependencyMapping(
FrozenDict(dependency_mapping), FrozenOrderedSet(visited), roots_as_targets
)


@rule(desc="Resolve transitive targets")
async def transitive_targets(request: TransitiveTargetsRequest) -> TransitiveTargets:
"""Find all the targets transitively depended upon by the target roots."""

dependency_mapping = await Get(_DependencyMapping, _DependencyMappingRequest(request, True))

# Apply any transitive excludes (`!!` ignores).
transitive_excludes: FrozenOrderedSet[Target] = FrozenOrderedSet()
unevaluated_transitive_excludes = []
for t in (*roots_as_targets, *visited):
for t in (*dependency_mapping.roots_as_targets, *dependency_mapping.visited):
unparsed = t.get(Dependencies).unevaluated_transitive_excludes
if unparsed.values:
unevaluated_transitive_excludes.append(unparsed)
Expand All @@ -296,8 +339,48 @@ async def transitive_targets(request: TransitiveTargetsRequest) -> TransitiveTar
)

return TransitiveTargets(
tuple(roots_as_targets), FrozenOrderedSet(visited.difference(transitive_excludes))
tuple(dependency_mapping.roots_as_targets),
FrozenOrderedSet(dependency_mapping.visited.difference(transitive_excludes)),
)


# -----------------------------------------------------------------------------------------------
# CoarsenedTargets
# -----------------------------------------------------------------------------------------------


@rule
async def coarsened_targets(addresses: Addresses) -> CoarsenedTargets:
    """Group the given addresses into `CoarsenedTarget`s, one per strongly connected component.

    The full dependency graph reachable from `addresses` is walked (without target expansion),
    partitioned into strongly connected components, and then filtered down to the components
    which contain at least one of the requested addresses.
    """
    # NB: We set include_special_cased_deps=True because although computing CoarsenedTargets
    # requires a transitive graph walk (to ensure that all cycles are actually detected),
    # the resulting CoarsenedTargets instance is not itself transitive: everything not directly
    # involved in a cycle with one of the input Addresses is discarded in the output.
    tt_request = TransitiveTargetsRequest(addresses, include_special_cased_deps=True)
    dependency_mapping = await Get(
        _DependencyMapping,
        _DependencyMappingRequest(tt_request, expanded_targets=False),
    )
    adjacency = dependency_mapping.mapping
    components = native_engine.strongly_connected_components(list(adjacency.items()))

    requested = set(addresses)

    # Index every known target by address. Roots are inserted last, so they win on collision.
    targets_by_address = {}
    for tgt in dependency_mapping.visited:
        targets_by_address[tgt.address] = tgt
    for tgt in dependency_mapping.roots_as_targets:
        targets_by_address[tgt.address] = tgt

    targets = []
    for component in components:
        # Discard components that none of the requested addresses participate in.
        if requested.isdisjoint(component):
            continue
        component_set = set(component)
        members = tuple(targets_by_address[address] for address in component)
        # Only edges which leave the component count as its dependencies.
        external_dependencies = []
        for address in component:
            for dependency in adjacency[address]:
                if dependency not in component_set:
                    external_dependencies.append(dependency)
        targets.append(CoarsenedTarget(members, FrozenOrderedSet(external_dependencies)))
    return CoarsenedTargets(targets)


# -----------------------------------------------------------------------------------------------
Expand Down
77 changes: 77 additions & 0 deletions src/python/pants/engine/internals/graph_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from pants.engine.rules import Get, MultiGet, rule
from pants.engine.target import (
AsyncFieldMixin,
CoarsenedTargets,
Dependencies,
DependenciesRequest,
ExplicitlyProvidedDependencies,
Expand Down Expand Up @@ -97,6 +98,7 @@ class MockTarget(Target):
def transitive_targets_rule_runner() -> RuleRunner:
return RuleRunner(
rules=[
QueryRule(CoarsenedTargets, (Addresses,)),
QueryRule(Targets, (DependenciesRequest,)),
QueryRule(TransitiveTargets, (TransitiveTargetsRequest,)),
],
Expand Down Expand Up @@ -306,6 +308,81 @@ def test_transitive_targets_tolerates_subtarget_cycles(
]


def test_coarsened_targets(transitive_targets_rule_runner: RuleRunner) -> None:
    """CoarsenedTargets should "coarsen" a cycle into a single CoarsenedTarget instance.

    Cycles are only allowed for file targets, so we use explicit file dependencies in this test.
    """
    transitive_targets_rule_runner.create_files("", ["dep.txt", "t1.txt", "t2.txt"])
    transitive_targets_rule_runner.add_to_build_file(
        "",
        dedent(
            """\
            target(name='dep', sources=['dep.txt'])
            target(name='t1', sources=['t1.txt'], dependencies=['dep.txt:dep', 't2.txt:t2'])
            target(name='t2', sources=['t2.txt'], dependencies=['t1.txt:t1'])
            """
        ),
    )

    def assert_coarsened(
        a: Address, expected_members: List[Address], expected_dependencies: List[Address]
    ) -> None:
        """Request CoarsenedTargets for the single address `a` and check its one component."""
        coarsened_targets = transitive_targets_rule_runner.request(
            CoarsenedTargets,
            [Addresses([a])],
        )
        # A single requested address belongs to exactly one component, so anything other than
        # one result means that unrelated components leaked into (or went missing from) the
        # output: fail loudly rather than silently inspecting only the first entry.
        assert len(coarsened_targets) == 1
        coarsened = coarsened_targets[0]
        assert sorted(t.address for t in coarsened.members) == expected_members
        assert sorted(coarsened.dependencies) == expected_dependencies

    # BUILD targets are never involved in cycles, and so always coarsen to themselves.
    assert_coarsened(
        Address("", target_name="dep"),
        [Address("", target_name="dep")],
        [Address("", relative_file_path="dep.txt", target_name="dep")],
    )
    assert_coarsened(
        Address("", target_name="t1"),
        [Address("", target_name="t1")],
        [
            Address("", relative_file_path="dep.txt", target_name="dep"),
            Address("", relative_file_path="t1.txt", target_name="t1"),
            Address("", relative_file_path="t2.txt", target_name="t2"),
        ],
    )
    assert_coarsened(
        Address("", target_name="t2"),
        [Address("", target_name="t2")],
        [
            Address("", relative_file_path="t1.txt", target_name="t1"),
            Address("", relative_file_path="t2.txt", target_name="t2"),
        ],
    )

    # As will file targets not involved in cycles.
    assert_coarsened(
        Address("", relative_file_path="dep.txt", target_name="dep"),
        [Address("", relative_file_path="dep.txt", target_name="dep")],
        [],
    )

    # But file targets involved in cycles will coarsen to the cycle, and have only dependencies
    # outside of the cycle.
    cycle_files = [
        Address("", relative_file_path="t1.txt", target_name="t1"),
        Address("", relative_file_path="t2.txt", target_name="t2"),
    ]
    assert_coarsened(
        Address("", relative_file_path="t1.txt", target_name="t1"),
        cycle_files,
        [Address("", relative_file_path="dep.txt", target_name="dep")],
    )
    assert_coarsened(
        Address("", relative_file_path="t2.txt", target_name="t2"),
        cycle_files,
        [Address("", relative_file_path="dep.txt", target_name="dep")],
    )


def assert_failed_cycle(
rule_runner: RuleRunner,
*,
Expand Down
5 changes: 4 additions & 1 deletion src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from __future__ import annotations

from io import RawIOBase
from typing import Any, Sequence, TextIO
from typing import Any, Sequence, TextIO, Tuple

from typing_extensions import Protocol

Expand Down Expand Up @@ -139,6 +139,9 @@ def rule_subgraph_visualize(
) -> None: ...
def garbage_collect_store(scheduler: PyScheduler, target_size_bytes: int) -> None: ...
def lease_files_in_graph(scheduler: PyScheduler, session: PySession) -> None: ...
# Partitions a directed graph into its strongly connected components.
#
# Each entry of `adjacency_lists` pairs a node with the nodes that it has edges to; each inner
# sequence of the result holds the nodes of one component.
def strongly_connected_components(
    adjacency_lists: Sequence[Tuple[Any, Sequence[Any]]]
) -> Sequence[Sequence[Any]]: ...

class PyDigest:
def __init__(self, fingerprint: str, serialized_bytes_length: int) -> None: ...
Expand Down
17 changes: 17 additions & 0 deletions src/python/pants/engine/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,23 @@ def expect_single(self) -> Target:
return self[0]


@dataclass(frozen=True)
class CoarsenedTarget:
    """A set of Targets which cyclically reach one another, and are thus indivisible."""

    # The members of the cycle.
    members: Tuple[Target, ...]
    # The deduped direct (not transitive) dependencies of all Targets in the cycle. Dependencies
    # between members of the cycle are excluded.
    #
    # To expand these dependencies, request `CoarsenedTargets` for them.
    dependencies: FrozenOrderedSet[Address]


class CoarsenedTargets(Collection[CoarsenedTarget]):
    """A set of direct (not transitive) disjoint CoarsenedTarget instances.

    Only the cycles which contain at least one of the requested addresses are included: to walk
    further, request `CoarsenedTargets` for the `dependencies` of each member.
    """


@dataclass(frozen=True)
class TransitiveTargets:
"""A set of Target roots, and their transitive, flattened, de-duped dependencies.
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ logging = { path = "logging" }
nailgun = { path = "nailgun" }
num_enum = "0.4"
parking_lot = "0.11"
petgraph = "0.5"
process_execution = { path = "process_execution" }
rand = "0.8"
regex = "1"
Expand Down
46 changes: 45 additions & 1 deletion src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use hashing::Digest;
use log::{self, debug, error, warn, Log};
use logging::logger::PANTS_LOGGER;
use logging::{Logger, PythonLogLevel};
use petgraph::graph::{DiGraph, Graph};
use process_execution::RemoteCacheWarningsBehavior;
use regex::Regex;
use rule_graph::{self, RuleGraph};
Expand All @@ -77,7 +78,7 @@ use workunit_store::{

use crate::{
externs, nodes, Core, ExecutionRequest, ExecutionStrategyOptions, ExecutionTermination, Failure,
Function, Intrinsics, LocalStoreOptions, Params, RemotingOptions, Rule, Scheduler, Session,
Function, Intrinsics, Key, LocalStoreOptions, Params, RemotingOptions, Rule, Scheduler, Session,
Tasks, Types, Value,
};

Expand Down Expand Up @@ -396,6 +397,15 @@ py_module_initializer!(native_engine, |py, m| {
py_fn!(py, ensure_remote_has_recursive(a: PyScheduler, b: PyList)),
)?;

m.add(
py,
"strongly_connected_components",
py_fn!(
py,
strongly_connected_components(a: Vec<(PyObject, Vec<PyObject>)>)
),
)?;

m.add_class::<PyExecutionRequest>(py)?;
m.add_class::<PyExecutionStrategyOptions>(py)?;
m.add_class::<PyExecutor>(py)?;
Expand Down Expand Up @@ -830,6 +840,40 @@ fn nailgun_server_await_shutdown(py: Python, nailgun_server_ptr: PyNailgunServer
})
}

///
/// Computes the strongly connected components of the directed graph described by the given
/// adjacency lists, where each entry pairs a node with the nodes that it has edges to.
///
/// Nodes are identified by the `Key` that `externs::key_for` computes for them, so a node which
/// occurs in multiple entries (or only ever as a dependency) maps to a single graph node. Each
/// inner `Vec` of the result holds the members of one component, as reported by
/// `petgraph::algo::tarjan_scc`.
///
/// Fails if a `Key` cannot be computed for any of the given objects.
///
fn strongly_connected_components(
  py: Python,
  adjacency_lists: Vec<(PyObject, Vec<PyObject>)>,
) -> CPyResult<Vec<Vec<PyObject>>> {
  let mut graph: DiGraph<Key, (), u32> = Graph::new();
  let mut node_ids: HashMap<Key, _> = HashMap::new();

  for (node, adjacency_list) in adjacency_lists {
    // The loop owns each `PyObject` and never uses it again, so it is moved into the `Key`
    // computation rather than cloned (which would only churn the reference count).
    let node_key = externs::key_for(node.into())?;
    let node_id = *node_ids
      .entry(node_key)
      .or_insert_with(|| graph.add_node(node_key));
    for dependency in adjacency_list {
      let dependency_key = externs::key_for(dependency.into())?;
      let dependency_id = *node_ids
        .entry(dependency_key)
        .or_insert_with(|| graph.add_node(dependency_key));
      graph.add_edge(node_id, dependency_id, ());
    }
  }

  // Translate each component's node ids back into the Python objects behind their `Key`s.
  let components = petgraph::algo::tarjan_scc(&graph)
    .into_iter()
    .map(|component| {
      component
        .into_iter()
        .map(|node_id| externs::val_for(&graph[node_id]).consume_into_py_object(py))
        .collect::<Vec<_>>()
    })
    .collect();
  Ok(components)
}

///
/// Given a set of Tasks and type information, creates a Scheduler.
///
Expand Down

0 comments on commit b0b7b0b

Please sign in to comment.