Skip to content

Commit

Permalink
Move graph cycle detection to rust. (pantsbuild#11202)
Browse files Browse the repository at this point in the history
### Problem

Our existing cycle detection (and reporting) algorithm is implemented recursively in Python, and can hit recursion depth limits for larger graphs.

### Solution

Rather than porting to an iterative algorithm (which is complex when you need to actually report the cycle path), or adjusting recursion limits (which would work fine, but which exist as a reminder that stack frames are not cheap in Python), port our existing algorithm to Rust. 

### Result

Fixes pantsbuild#11201.

[ci skip-build-wheels]
  • Loading branch information
stuhood authored Nov 18, 2020
1 parent ea2c5d6 commit fabcb45
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 36 deletions.
43 changes: 9 additions & 34 deletions src/python/pants/engine/internals/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
Snapshot,
SpecsSnapshot,
)
from pants.engine.internals.native_engine import cyclic_paths
from pants.engine.internals.target_adaptor import TargetAdaptor
from pants.engine.rules import Get, MultiGet, collect_rules, rule
from pants.engine.target import (
Expand Down Expand Up @@ -186,21 +187,18 @@ def __init__(self, subject: Address, path: Tuple[Address, ...]) -> None:
self.path = path


def _detect_cycles(
roots: Tuple[Address, ...], dependency_mapping: Dict[Address, Tuple[Address, ...]]
) -> None:
path_stack: OrderedSet[Address] = OrderedSet()
visited: Set[Address] = set()
def _detect_cycles(dependency_mapping: Dict[Address, Tuple[Address, ...]]) -> None:
for path in cyclic_paths(dependency_mapping):
address = path[-1]

def maybe_report_cycle(address: Address) -> None:
# NB: File-level dependencies are cycle tolerant.
if address.is_file_target or address not in path_stack:
if address.is_file_target:
return

# The path of the cycle is shorter than the entire path to the cycle: if the suffix of
# the path representing the cycle contains a file dep, it is ignored.
in_cycle = False
for path_address in path_stack:
for path_address in path:
if in_cycle and path_address.is_file_target:
# There is a file address inside the cycle: do not report it.
return
Expand All @@ -212,27 +210,7 @@ def maybe_report_cycle(address: Address) -> None:
# the address in question.
in_cycle = path_address == address
# If we did not break out early, it's because there were no file addresses in the cycle.
raise CycleException(address, (*path_stack, address))

def visit(address: Address):
if address in visited:
maybe_report_cycle(address)
return
path_stack.add(address)
visited.add(address)

for dep_address in dependency_mapping[address]:
visit(dep_address)

path_stack.remove(address)

for root in roots:
visit(root)
if path_stack:
raise AssertionError(
f"The stack of visited nodes should have been empty at the end of recursion, "
f"but it still contained: {path_stack}"
)
raise CycleException(address, path)


@rule(desc="Resolve transitive targets")
Expand Down Expand Up @@ -271,10 +249,7 @@ async def transitive_targets(request: TransitiveTargetsRequest) -> TransitiveTar
)
visited.update(queued)

# NB: We use `roots_as_targets` to get the root addresses, rather than `request.roots`. This
# is because expanding from the `Addresses` -> `Targets` may have resulted in generated
# subtargets being used, so we need to use `roots_as_targets` to have this expansion.
_detect_cycles(tuple(t.address for t in roots_as_targets), dependency_mapping)
_detect_cycles(dependency_mapping)

# Apply any transitive excludes (`!!` ignores).
transitive_excludes: FrozenOrderedSet[Target] = FrozenOrderedSet()
Expand Down Expand Up @@ -331,7 +306,7 @@ async def transitive_targets_lite(request: TransitiveTargetsRequestLite) -> Tran
# NB: We use `roots_as_targets` to get the root addresses, rather than `request.roots`. This
# is because expanding from the `Addresses` -> `Targets` may have resulted in generated
# subtargets being used, so we need to use `roots_as_targets` to have this expansion.
_detect_cycles(tuple(t.address for t in roots_as_targets), dependency_mapping)
_detect_cycles(dependency_mapping)

# Apply any transitive excludes (`!!` ignores).
wrapped_transitive_excludes = await MultiGet(
Expand Down
6 changes: 5 additions & 1 deletion src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from typing import Any, Callable, Dict, List
from typing import Any, Callable, Dict, List, Tuple, TypeVar

# TODO: black and flake8 disagree about the content of this file:
# see https://github.com/psf/black/issues/1548
# flake8: noqa: E302

T = TypeVar("T")

def cyclic_paths(adjacencies: Dict[T, Tuple[T, ...]]) -> Tuple[Tuple[T]]: ...

class PyDigest:
def __init__(self, fingerprint: str, serialized_bytes_length: int) -> None: ...
@property
Expand Down
54 changes: 54 additions & 0 deletions src/rust/engine/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use fnv::FnvHasher;

use std::collections::HashSet;
use std::convert::AsRef;
use std::ops::Deref;
use std::sync::Arc;
Expand All @@ -11,6 +12,7 @@ use std::{fmt, hash};
use crate::externs;

use cpython::{FromPyObject, PyClone, PyDict, PyErr, PyObject, PyResult, Python, ToPyObject};
use indexmap::{IndexMap, IndexSet};
use smallvec::SmallVec;

pub type FNV = hash::BuildHasherDefault<FnvHasher>;
Expand Down Expand Up @@ -424,3 +426,55 @@ pub fn throw(msg: &str) -> Failure {
engine_traceback: Vec::new(),
}
}

///
/// Given a graph represented as an adjacency list, return a collection of cyclic paths.
///
pub fn cyclic_paths<N: hash::Hash + Eq + Copy>(adjacencies: IndexMap<N, Vec<N>>) -> Vec<Vec<N>> {
let mut cyclic_paths = Vec::new();
let mut path_stack = IndexSet::new();
let mut visited = HashSet::new();

for node in adjacencies.keys() {
cyclic_paths_visit(
*node,
&adjacencies,
&mut cyclic_paths,
&mut path_stack,
&mut visited,
);
}

cyclic_paths
}

fn cyclic_paths_visit<N: hash::Hash + Eq + Copy>(
node: N,
adjacencies: &IndexMap<N, Vec<N>>,
cyclic_paths: &mut Vec<Vec<N>>,
path_stack: &mut IndexSet<N>,
visited: &mut HashSet<N>,
) {
if visited.contains(&node) {
if path_stack.contains(&node) {
cyclic_paths.push(
path_stack
.iter()
.cloned()
.chain(std::iter::once(node))
.collect::<Vec<_>>(),
);
}
return;
}
path_stack.insert(node);
visited.insert(node);

if let Some(adjacent) = adjacencies.get(&node) {
for dep_node in adjacent {
cyclic_paths_visit(*dep_node, adjacencies, cyclic_paths, path_stack, visited);
}
}

path_stack.remove(&node);
}
39 changes: 38 additions & 1 deletion src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use futures::future::FutureExt;
use futures::future::{self as future03, TryFutureExt};
use futures01::Future;
use hashing::{Digest, EMPTY_DIGEST};
use indexmap::IndexMap;
use log::{self, debug, error, warn, Log};
use logging::logger::PANTS_LOGGER;
use logging::{Destination, Logger, PythonLogLevel};
Expand All @@ -75,7 +76,8 @@ use workunit_store::{UserMetadataItem, Workunit, WorkunitState};
use crate::scheduler::maybe_break_execution_loop;
use crate::{
externs, nodes, Core, ExecutionRequest, ExecutionStrategyOptions, ExecutionTermination, Failure,
Function, Intrinsics, Params, RemotingOptions, Rule, Scheduler, Session, Tasks, Types, Value,
Function, Intrinsics, Key, Params, RemotingOptions, Rule, Scheduler, Session, Tasks, Types,
Value,
};

py_exception!(native_engine, PollTimeout);
Expand All @@ -101,6 +103,8 @@ py_module_initializer!(native_engine, |py, m| {

m.add(py, "default_config_path", py_fn!(py, default_config_path()))?;

m.add(py, "cyclic_paths", py_fn!(py, cyclic_paths(a: PyDict)))?;

m.add(
py,
"init_logging",
Expand Down Expand Up @@ -1834,6 +1838,39 @@ fn default_config_path(py: Python) -> CPyResult<String> {
})
}

fn cyclic_paths(py: Python, adjacencies: PyDict) -> CPyResult<Vec<PyTuple>> {
let adjacencies = adjacencies
.items(py)
.into_iter()
.map(|(k, v)| {
let node = externs::key_for(k.into())?;
let adjacent = v
.extract::<Vec<PyObject>>(py)?
.into_iter()
.map(|v| externs::key_for(v.into()))
.collect::<Result<Vec<Key>, _>>()?;
let res: Result<_, PyErr> = Ok((node, adjacent));
res
})
.collect::<Result<IndexMap<Key, Vec<Key>>, _>>()?;
let paths = py.allow_threads(move || crate::core::cyclic_paths(adjacencies));

Ok(
paths
.into_iter()
.map(|path| {
let gil = Python::acquire_gil();
let path_vec = path
.iter()
.map(externs::val_for)
.map(|node| node.consume_into_py_object(gil.python()))
.collect::<Vec<_>>();
PyTuple::new(gil.python(), &path_vec)
})
.collect(),
)
}

fn init_logging(
py: Python,
level: u64,
Expand Down

0 comments on commit fabcb45

Please sign in to comment.