Skip to content

Commit

Permalink
Revert "Move graph cycle detection to rust. (#11202)" (#11272)
Browse files Browse the repository at this point in the history
#11202 introduced a lot of contention on the GIL that was only obvious when larger numbers of targets were being built. #11270 is a holistic fix for that issue, but it is currently blocked on #11269.

In the meantime, we will land #11271 to dodge the original issue in #11201 to get us back to a medium-slow-but-working state, and then follow up on #11269 and #11270 to get us to the best place.

This reverts commit fabcb45.

[ci skip-build-wheels]
  • Loading branch information
stuhood authored Dec 8, 2020
1 parent 4f8fef3 commit 9c11a5e
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 105 deletions.
43 changes: 34 additions & 9 deletions src/python/pants/engine/internals/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
Snapshot,
SpecsSnapshot,
)
from pants.engine.internals.native_engine import cyclic_paths
from pants.engine.internals.target_adaptor import TargetAdaptor
from pants.engine.rules import Get, MultiGet, collect_rules, rule
from pants.engine.target import (
Expand Down Expand Up @@ -191,18 +190,21 @@ def __init__(self, subject: Address, path: Tuple[Address, ...]) -> None:
self.path = path


def _detect_cycles(dependency_mapping: Dict[Address, Tuple[Address, ...]]) -> None:
for path in cyclic_paths(dependency_mapping):
address = path[-1]
def _detect_cycles(
roots: Tuple[Address, ...], dependency_mapping: Dict[Address, Tuple[Address, ...]]
) -> None:
path_stack: OrderedSet[Address] = OrderedSet()
visited: Set[Address] = set()

def maybe_report_cycle(address: Address) -> None:
# NB: File-level dependencies are cycle tolerant.
if address.is_file_target:
if address.is_file_target or address not in path_stack:
return

# The path of the cycle is shorter than the entire path to the cycle: if the suffix of
# the path representing the cycle contains a file dep, it is ignored.
in_cycle = False
for path_address in path:
for path_address in path_stack:
if in_cycle and path_address.is_file_target:
# There is a file address inside the cycle: do not report it.
return
Expand All @@ -214,7 +216,27 @@ def _detect_cycles(dependency_mapping: Dict[Address, Tuple[Address, ...]]) -> No
# the address in question.
in_cycle = path_address == address
# If we did not break out early, it's because there were no file addresses in the cycle.
raise CycleException(address, path)
raise CycleException(address, (*path_stack, address))

def visit(address: Address):
if address in visited:
maybe_report_cycle(address)
return
path_stack.add(address)
visited.add(address)

for dep_address in dependency_mapping[address]:
visit(dep_address)

path_stack.remove(address)

for root in roots:
visit(root)
if path_stack:
raise AssertionError(
f"The stack of visited nodes should have been empty at the end of recursion, "
f"but it still contained: {path_stack}"
)


@rule(desc="Resolve transitive targets")
Expand Down Expand Up @@ -253,7 +275,10 @@ async def transitive_targets(request: TransitiveTargetsRequest) -> TransitiveTar
)
visited.update(queued)

_detect_cycles(dependency_mapping)
# NB: We use `roots_as_targets` to get the root addresses, rather than `request.roots`. This
# is because expanding from the `Addresses` -> `Targets` may have resulted in generated
# subtargets being used, so we need to use `roots_as_targets` to have this expansion.
_detect_cycles(tuple(t.address for t in roots_as_targets), dependency_mapping)

# Apply any transitive excludes (`!!` ignores).
transitive_excludes: FrozenOrderedSet[Target] = FrozenOrderedSet()
Expand Down Expand Up @@ -310,7 +335,7 @@ async def transitive_targets_lite(request: TransitiveTargetsRequestLite) -> Tran
# NB: We use `roots_as_targets` to get the root addresses, rather than `request.roots`. This
# is because expanding from the `Addresses` -> `Targets` may have resulted in generated
# subtargets being used, so we need to use `roots_as_targets` to have this expansion.
_detect_cycles(dependency_mapping)
_detect_cycles(tuple(t.address for t in roots_as_targets), dependency_mapping)

# Apply any transitive excludes (`!!` ignores).
wrapped_transitive_excludes = await MultiGet(
Expand Down
5 changes: 1 addition & 4 deletions src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from typing import Any, Callable, Dict, List, Tuple, TypeVar
from typing import Any, Callable, Dict, List

# TODO: black and flake8 disagree about the content of this file:
# see https://github.com/psf/black/issues/1548
# flake8: noqa: E302

T = TypeVar("T")

def cyclic_paths(adjacencies: Dict[T, Tuple[T, ...]]) -> Tuple[Tuple[T]]: ...
def session_cancel_all() -> None: ...

class PyDigest:
Expand Down
54 changes: 0 additions & 54 deletions src/rust/engine/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use fnv::FnvHasher;

use std::collections::HashSet;
use std::convert::AsRef;
use std::ops::Deref;
use std::sync::Arc;
Expand All @@ -14,7 +13,6 @@ use crate::externs;
use cpython::{
FromPyObject, PyClone, PyDict, PyErr, PyObject, PyResult, PyType, Python, ToPyObject,
};
use indexmap::{IndexMap, IndexSet};
use smallvec::SmallVec;

pub type FNV = hash::BuildHasherDefault<FnvHasher>;
Expand Down Expand Up @@ -446,55 +444,3 @@ pub fn throw(msg: &str) -> Failure {
engine_traceback: Vec::new(),
}
}

///
/// Given a graph represented as an adjacency list, return a collection of cyclic paths.
///
pub fn cyclic_paths<N: hash::Hash + Eq + Copy>(adjacencies: IndexMap<N, Vec<N>>) -> Vec<Vec<N>> {
let mut cyclic_paths = Vec::new();
let mut path_stack = IndexSet::new();
let mut visited = HashSet::new();

for node in adjacencies.keys() {
cyclic_paths_visit(
*node,
&adjacencies,
&mut cyclic_paths,
&mut path_stack,
&mut visited,
);
}

cyclic_paths
}

fn cyclic_paths_visit<N: hash::Hash + Eq + Copy>(
node: N,
adjacencies: &IndexMap<N, Vec<N>>,
cyclic_paths: &mut Vec<Vec<N>>,
path_stack: &mut IndexSet<N>,
visited: &mut HashSet<N>,
) {
if visited.contains(&node) {
if path_stack.contains(&node) {
cyclic_paths.push(
path_stack
.iter()
.cloned()
.chain(std::iter::once(node))
.collect::<Vec<_>>(),
);
}
return;
}
path_stack.insert(node);
visited.insert(node);

if let Some(adjacent) = adjacencies.get(&node) {
for dep_node in adjacent {
cyclic_paths_visit(*dep_node, adjacencies, cyclic_paths, path_stack, visited);
}
}

path_stack.remove(&node);
}
40 changes: 2 additions & 38 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ use futures::future::FutureExt;
use futures::future::{self as future03, TryFutureExt};
use futures01::Future;
use hashing::Digest;
use indexmap::IndexMap;
use log::{self, debug, error, warn, Log};
use logging::logger::PANTS_LOGGER;
use logging::{Destination, Logger, PythonLogLevel};
Expand All @@ -74,8 +73,8 @@ use workunit_store::{UserMetadataItem, Workunit, WorkunitState};

use crate::{
externs, nodes, sessions_cancel, Core, ExecutionRequest, ExecutionStrategyOptions,
ExecutionTermination, Failure, Function, Intrinsics, Key, Params, RemotingOptions, Rule,
Scheduler, Session, Tasks, Types, Value,
ExecutionTermination, Failure, Function, Intrinsics, Params, RemotingOptions, Rule, Scheduler,
Session, Tasks, Types, Value,
};

py_exception!(native_engine, PollTimeout);
Expand All @@ -101,8 +100,6 @@ py_module_initializer!(native_engine, |py, m| {

m.add(py, "default_config_path", py_fn!(py, default_config_path()))?;

m.add(py, "cyclic_paths", py_fn!(py, cyclic_paths(a: PyDict)))?;

m.add(
py,
"init_logging",
Expand Down Expand Up @@ -1791,39 +1788,6 @@ fn default_config_path(py: Python) -> CPyResult<String> {
})
}

fn cyclic_paths(py: Python, adjacencies: PyDict) -> CPyResult<Vec<PyTuple>> {
let adjacencies = adjacencies
.items(py)
.into_iter()
.map(|(k, v)| {
let node = externs::key_for(k.into())?;
let adjacent = v
.extract::<Vec<PyObject>>(py)?
.into_iter()
.map(|v| externs::key_for(v.into()))
.collect::<Result<Vec<Key>, _>>()?;
let res: Result<_, PyErr> = Ok((node, adjacent));
res
})
.collect::<Result<IndexMap<Key, Vec<Key>>, _>>()?;
let paths = py.allow_threads(move || crate::core::cyclic_paths(adjacencies));

Ok(
paths
.into_iter()
.map(|path| {
let gil = Python::acquire_gil();
let path_vec = path
.iter()
.map(externs::val_for)
.map(|node| node.consume_into_py_object(gil.python()))
.collect::<Vec<_>>();
PyTuple::new(gil.python(), &path_vec)
})
.collect(),
)
}

fn init_logging(
py: Python,
level: u64,
Expand Down

0 comments on commit 9c11a5e

Please sign in to comment.