Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always explicitly shutdown executors (Cherry-pick of #18216) #18218

Merged
merged 1 commit into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/python/pants/bin/local_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def create(
bootstrap_options = options.bootstrap_option_values()
assert bootstrap_options is not None
scheduler = EngineInitializer.setup_graph(
bootstrap_options, build_config, dynamic_remote_options
bootstrap_options, build_config, dynamic_remote_options, executor
)
with options_initializer.handle_unknown_flags(options_bootstrapper, env, raise_=True):
global_options = options.for_global_scope()
Expand Down
20 changes: 14 additions & 6 deletions src/python/pants/bin/remote_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from typing import List, Mapping

from pants.base.exiter import ExitCode
from pants.engine.internals.native_engine import PantsdConnectionException, PyNailgunClient
from pants.engine.internals.native_engine import (
PantsdConnectionException,
PyExecutor,
PyNailgunClient,
)
from pants.option.global_options import GlobalOptions
from pants.option.options_bootstrapper import OptionsBootstrapper
from pants.pantsd.pants_daemon_client import PantsDaemonClient
Expand Down Expand Up @@ -114,15 +118,19 @@ def run(self, start_time: float) -> ExitCode:
pantsd_handle = self._client.maybe_launch()
logger.debug(f"Connecting to pantsd on port {pantsd_handle.port}")

return self._connect_and_execute(pantsd_handle, start_time)
executor = GlobalOptions.create_py_executor(self._bootstrap_options.for_global_scope())
try:
return self._connect_and_execute(pantsd_handle, executor, start_time)
finally:
executor.shutdown(3)

def _connect_and_execute(
self, pantsd_handle: PantsDaemonClient.Handle, start_time: float
self,
pantsd_handle: PantsDaemonClient.Handle,
executor: PyExecutor,
start_time: float,
) -> ExitCode:
global_options = self._bootstrap_options.for_global_scope()
# We do not explicitly shut this PyExecutor down, because the client should not run any long lived
# tasks which we would want to wait for (in particular: it runs no Python code). See #16105.
executor = GlobalOptions.create_py_executor(global_options)

# Merge the nailgun TTY capability environment variables with the passed environment dict.
ng_env = ttynames_to_env(sys.stdin, sys.stdout, sys.stderr)
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def setup_graph(
bootstrap_options: OptionValueContainer,
build_configuration: BuildConfiguration,
dynamic_remote_options: DynamicRemoteOptions,
executor: PyExecutor | None = None,
executor: PyExecutor,
is_bootstrap: bool = False,
) -> GraphScheduler:
build_root = get_buildroot()
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/src/externs/nailgun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ struct PyNailgunClient {
#[pymethods]
impl PyNailgunClient {
#[new]
fn __new__(port: u16, py_executor: PyExecutor) -> Self {
fn __new__(port: u16, py_executor: &PyExecutor) -> Self {
Self {
port,
executor: py_executor.0,
executor: py_executor.0.clone(),
}
}

Expand Down
16 changes: 13 additions & 3 deletions src/rust/engine/src/externs/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ extern "C" {
}

#[pyclass]
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct PyExecutor(pub task_executor::Executor);

#[pymethods]
Expand Down Expand Up @@ -52,7 +52,17 @@ impl PyExecutor {

/// Shut down this executor, waiting for all tasks to exit. Any tasks which have not exited at
/// the end of the timeout will be leaked.
fn shutdown(&self, duration_secs: f64) {
self.0.shutdown(Duration::from_secs_f64(duration_secs))
fn shutdown(&self, py: Python, duration_secs: f64) {
py.allow_threads(|| self.0.shutdown(Duration::from_secs_f64(duration_secs)))
}
}

impl Drop for PyExecutor {
fn drop(&mut self) {
if !self.0.is_shutdown() {
// This can lead to hangs, since `Drop` will run on an arbitrary thread under arbitrary
// locks. See #18211.
log::warn!("Executor was not shut down explicitly.");
}
}
}
2 changes: 1 addition & 1 deletion src/rust/engine/src/externs/testutil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl PyStubCASBuilder {
Ok(PyStubCASBuilder(self.0.clone()))
}

fn build(&mut self, py_executor: PyExecutor) -> PyResult<PyStubCAS> {
fn build(&mut self, py_executor: &PyExecutor) -> PyResult<PyStubCAS> {
let mut builder_opt = self.0.lock();
let builder = builder_opt
.take()
Expand Down
6 changes: 6 additions & 0 deletions src/rust/engine/task_executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ impl Executor {
log::warn!("Executor shutdown took unexpectedly long: tasks were likely leaked!");
}
}

/// Returns true if `shutdown` has been called for this Executor. Always returns true for
/// borrowed Executors.
pub fn is_shutdown(&self) -> bool {
self.runtime.lock().is_none()
}
}

/// Store "tail" tasks which are async tasks that can execute concurrently with regular
Expand Down