Skip to content

Commit

Permalink
[internal] Port PyNailgunClient to PyO3 (#12181)
Browse files Browse the repository at this point in the history
We no longer use the `with_executor` mechanism from #11722, as we can directly store an `Executor` on the `PyNailgunClient` cheaply. This allows us to use `Python.allow_threads()` for better parallelism.

It's awkward that our Python code now has `PyExecutor` from Rust-Cpython and PyO3, and that those aren't compatible. But this is less offensive than the risk of one big change rather than an incremental migration.

We can't yet migrate `PyNailgunSession` because `PyCancellationLatch` is used by `PySession`.

--

This also improves our file organization. We use a new pattern of having a `register()` function in each `interface/` file to register that file's functions and classes on the single native extension module, which makes `interface.rs` less bloated.
  • Loading branch information
Eric-Arellano authored Jun 9, 2021
1 parent 3adc0e0 commit 0429258
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 92 deletions.
9 changes: 3 additions & 6 deletions src/python/pants/bin/remote_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
from typing import List, Mapping

from pants.base.exiter import ExitCode
from pants.engine.internals import native_engine
from pants.engine.internals.native_engine import PantsdConnectionException
from pants.engine.internals.native_engine_pyo3 import PantsdConnectionException, PyNailgunClient
from pants.nailgun.nailgun_protocol import NailgunProtocol
from pants.option.global_options import GlobalOptions
from pants.option.options_bootstrapper import OptionsBootstrapper
Expand Down Expand Up @@ -101,7 +100,7 @@ def run(self) -> ExitCode:

def _connect_and_execute(self, pantsd_handle: PantsDaemonClient.Handle) -> ExitCode:
global_options = self._bootstrap_options.for_global_scope()
executor = GlobalOptions.create_py_executor(global_options)
executor = GlobalOptions.create_py_executor_pyo3(global_options)

# Merge the nailgun TTY capability environment variables with the passed environment dict.
ng_env = NailgunProtocol.ttynames_to_env(sys.stdin, sys.stdout, sys.stderr)
Expand Down Expand Up @@ -129,9 +128,7 @@ def _connect_and_execute(self, pantsd_handle: PantsDaemonClient.Handle) -> ExitC
# We ignore keyboard interrupts because the nailgun client will handle them.
with STTYSettings.preserved(), interrupts_ignored():
try:
return native_engine.nailgun_client_create(executor, port).execute(
command, args, modified_env
)
return PyNailgunClient(port, executor).execute(command, args, modified_env)
# Retry if we failed to connect to Pantsd.
except PantsdConnectionException as e:
if attempt > retries:
Expand Down
10 changes: 0 additions & 10 deletions src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ def execution_add_root_select(
param_vals: Sequence,
product: type,
) -> None: ...
def nailgun_client_create(executor: PyExecutor, port: int) -> PyNailgunClient: ...
def nailgun_server_await_shutdown(server: PyNailgunServer) -> None: ...
def nailgun_server_create(
executor: PyExecutor, port: int, runner: RawFdRunner
Expand Down Expand Up @@ -179,9 +178,6 @@ class PyGeneratorResponseGetMulti:
class PyNailgunServer:
pass

class PyNailgunClient:
def execute(self, command: str, args: list[str], env: dict[str, str]) -> int: ...

class PyRemotingOptions:
def __init__(self, **kwargs: Any) -> None: ...

Expand Down Expand Up @@ -215,11 +211,5 @@ class PyTypes:
class PyStdioDestination:
pass

class PantsdConnectionException(Exception):
pass

class PantsdClientException(Exception):
pass

class PollTimeout(Exception):
pass
12 changes: 12 additions & 0 deletions src/python/pants/engine/internals/native_engine_pyo3.pyi
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import annotations

# TODO: black and flake8 disagree about the content of this file:
# see https://github.com/psf/black/issues/1548
# flake8: noqa: E302

class PyNailgunClient:
def __init__(self, port: int, executor: PyExecutor) -> None: ...
def execute(self, command: str, args: list[str], env: dict[str, str]) -> int: ...

class PantsdConnectionException(Exception):
pass

class PantsdClientException(Exception):
pass

class PyExecutor:
def __init__(self, core_threads: int, max_threads: int) -> None: ...

Expand Down
12 changes: 12 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)
from pants.engine.environment import CompleteEnvironment
from pants.engine.internals.native_engine import PyExecutor
from pants.engine.internals.native_engine_pyo3 import PyExecutor as PyExecutorPyO3
from pants.option.custom_types import dir_option, memory_size
from pants.option.errors import OptionsError
from pants.option.option_value_container import OptionValueContainer
Expand Down Expand Up @@ -1422,6 +1423,17 @@ def create_py_executor(bootstrap_options: OptionValueContainer) -> PyExecutor:
core_threads=bootstrap_options.rule_threads_core, max_threads=rule_threads_max
)

@staticmethod
def create_py_executor_pyo3(bootstrap_options: OptionValueContainer) -> PyExecutorPyO3:
rule_threads_max = (
bootstrap_options.rule_threads_max
if bootstrap_options.rule_threads_max
else 4 * bootstrap_options.rule_threads_core
)
return PyExecutorPyO3(
core_threads=bootstrap_options.rule_threads_core, max_threads=rule_threads_max
)

@staticmethod
def compute_pants_ignore(buildroot, global_options):
"""Computes the merged value of the `--pants-ignore` flag.
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/engine_pyo3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ default = []

[dependencies]
hashing = { path = "../hashing" }
nailgun = { path = "../nailgun" }
parking_lot = "0.11"
# We must disable the `auto-initialize` feature because we do not enable `extension-module` normally
# (see above comment in `features`), so `auto-initialize` would try to link to a static Python interpreter during
Expand Down
9 changes: 5 additions & 4 deletions src/rust/engine/engine_pyo3/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
use pyo3::exceptions::PyException;
use pyo3::prelude::*;

mod nailgun;
mod testutil;

#[pymodule]
fn native_engine_pyo3(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PyExecutor>()?;
fn native_engine_pyo3(py: Python, m: &PyModule) -> PyResult<()> {
self::nailgun::register(py, m)?;
self::testutil::register(m)?;

m.add_class::<self::testutil::PyStubCAS>()?;
m.add_class::<self::testutil::PyStubCASBuilder>()?;
m.add_class::<PyExecutor>()?;

Ok(())
}
Expand Down
64 changes: 64 additions & 0 deletions src/rust/engine/engine_pyo3/src/externs/interface/nailgun.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use super::PyExecutor;
use pyo3::create_exception;
use pyo3::exceptions::{PyBrokenPipeError, PyException, PyKeyboardInterrupt};
use pyo3::prelude::*;
use pyo3::types::PyDict;
use task_executor::Executor;

pub(crate) fn register(py: Python, m: &PyModule) -> PyResult<()> {
m.add(
"PantsdConnectionException",
py.get_type::<PantsdConnectionException>(),
)?;
m.add(
"PantsdClientException",
py.get_type::<PantsdClientException>(),
)?;
m.add_class::<PyNailgunClient>()?;
Ok(())
}

create_exception!(native_engine_pyo3, PantsdConnectionException, PyException);
create_exception!(native_engine_pyo3, PantsdClientException, PyException);

#[pyclass]
struct PyNailgunClient {
port: u16,
executor: Executor,
}

#[pymethods]
impl PyNailgunClient {
#[new]
fn __new__(port: u16, py_executor: PyExecutor) -> Self {
Self {
port,
executor: py_executor.executor,
}
}

fn execute(&self, command: String, args: Vec<String>, env: &PyDict, py: Python) -> PyResult<i32> {
use nailgun::NailgunClientError;

let env_list: Vec<(String, String)> = env
.items()
.into_iter()
.map(|kv_pair| kv_pair.extract::<(String, String)>())
.collect::<Result<Vec<_>, _>>()?;

py.allow_threads(|| {
self
.executor
.block_on(nailgun::client_execute(self.port, command, args, env_list))
.map_err(|e| match e {
NailgunClientError::PreConnect(err_str) => PantsdConnectionException::new_err(err_str),
NailgunClientError::PostConnect(err_str) => PantsdClientException::new_err(err_str),
NailgunClientError::BrokenPipe => PyBrokenPipeError::new_err(""),
NailgunClientError::KeyboardInterrupt => PyKeyboardInterrupt::new_err(""),
})
})
}
}
10 changes: 8 additions & 2 deletions src/rust/engine/engine_pyo3/src/externs/interface/testutil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@ use pyo3::prelude::*;
use pyo3::types::PyType;
use testutil_mock::{StubCAS, StubCASBuilder};

pub(crate) fn register(m: &PyModule) -> PyResult<()> {
m.add_class::<PyStubCAS>()?;
m.add_class::<PyStubCASBuilder>()?;
Ok(())
}

#[pyclass]
pub struct PyStubCASBuilder {
struct PyStubCASBuilder {
builder: Arc<Mutex<Option<StubCASBuilder>>>,
}

Expand Down Expand Up @@ -43,7 +49,7 @@ impl PyStubCASBuilder {
}

#[pyclass]
pub struct PyStubCAS {
struct PyStubCAS {
stub_cas: StubCAS,
}

Expand Down
72 changes: 2 additions & 70 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ use std::time::Duration;
use async_latch::AsyncLatch;
use cpython::{
exc, py_class, py_exception, py_fn, py_module_initializer, NoArgs, PyBytes, PyClone, PyDict,
PyErr, PyInt, PyList, PyObject, PyResult as CPyResult, PyString, PyTuple, PyType, Python,
PythonObject, ToPyObject,
PyErr, PyList, PyObject, PyResult as CPyResult, PyString, PyTuple, PyType, Python, PythonObject,
ToPyObject,
};
use futures::future::FutureExt;
use futures::future::{self, TryFutureExt};
Expand All @@ -82,24 +82,11 @@ use crate::{
};

py_exception!(native_engine, PollTimeout);
py_exception!(native_engine, PantsdConnectionException);
py_exception!(native_engine, PantsdClientException);

py_module_initializer!(native_engine, |py, m| {
m.add(py, "PollTimeout", py.get_type::<PollTimeout>())
.unwrap();

m.add(
py,
"PantsdClientException",
py.get_type::<PantsdClientException>(),
)?;
m.add(
py,
"PantsdConnectionException",
py.get_type::<PantsdConnectionException>(),
)?;

m.add(py, "default_cache_path", py_fn!(py, default_cache_path()))?;

m.add(
Expand Down Expand Up @@ -214,12 +201,6 @@ py_module_initializer!(native_engine, |py, m| {
py_fn!(py, graph_visualize(a: PyScheduler, b: PySession, d: String)),
)?;

m.add(
py,
"nailgun_client_create",
py_fn!(py, nailgun_client_create(a: PyExecutor, b: u16)),
)?;

m.add(
py,
"nailgun_server_create",
Expand Down Expand Up @@ -418,7 +399,6 @@ py_module_initializer!(native_engine, |py, m| {
m.add_class::<PyExecutionStrategyOptions>(py)?;
m.add_class::<PyExecutor>(py)?;
m.add_class::<PyNailgunServer>(py)?;
m.add_class::<PyNailgunClient>(py)?;
m.add_class::<PyRemotingOptions>(py)?;
m.add_class::<PyLocalStoreOptions>(py)?;
m.add_class::<PyResult>(py)?;
Expand Down Expand Up @@ -668,46 +648,6 @@ py_class!(class PyNailgunServer |py| {
}
});

py_class!(class PyNailgunClient |py| {
data executor: PyExecutor;
data port: u16;

def execute(&self, command: String, args: Vec<String>, env: PyDict) -> CPyResult<PyInt> {
use nailgun::NailgunClientError;

let env_list: Vec<(String, String)> = env
.items(py)
.into_iter()
.map(|(k, v): (PyObject, PyObject)| -> Result<(String, String), PyErr> {
let k: String = k.extract::<String>(py)?;
let v: String = v.extract::<String>(py)?;
Ok((k, v))
})
.collect::<Result<Vec<_>, _>>()?;

let port = *self.port(py);
let executor_ptr = self.executor(py);

with_executor(py, executor_ptr, |executor| {
executor.block_on(nailgun::client_execute(
port,
command,
args,
env_list,
)).map(|code| code.to_py_object(py)).map_err(|e| match e{
NailgunClientError::PreConnect(err_str) => PyErr::new::<PantsdConnectionException, _>(py, (err_str,)),
NailgunClientError::PostConnect(err_str) => PyErr::new::<PantsdClientException, _>(py, (err_str,)),
NailgunClientError::BrokenPipe => {
PyErr::new::<exc::BrokenPipeError, _>(py, NoArgs)
}
NailgunClientError::KeyboardInterrupt => {
PyErr::new::<exc::KeyboardInterrupt, _>(py, NoArgs)
}
})
})
}
});

py_class!(class PyExecutionRequest |py| {
data execution_request: RefCell<ExecutionRequest>;
def __new__(
Expand Down Expand Up @@ -795,14 +735,6 @@ fn py_result_from_root(py: Python, result: Result<Value, Failure>) -> CPyResult<
// TODO: It's not clear how to return "nothing" (None) in a CPyResult, so this is a placeholder.
type PyUnitResult = CPyResult<Option<bool>>;

fn nailgun_client_create(
py: Python,
executor_ptr: PyExecutor,
port: u16,
) -> CPyResult<PyNailgunClient> {
PyNailgunClient::create_instance(py, executor_ptr, port)
}

fn nailgun_server_create(
py: Python,
executor_ptr: PyExecutor,
Expand Down

0 comments on commit 0429258

Please sign in to comment.