Decoupled Async Execute #350

Merged
merged 14 commits on Apr 11, 2024
21 changes: 18 additions & 3 deletions README.md
@@ -49,7 +49,7 @@ any C++ code.
- [Request Cancellation Handling](#request-cancellation-handling)
- [Decoupled mode](#decoupled-mode)
- [Use Cases](#use-cases)
- [Known Issues](#known-issues)
- [Async Execute](#async-execute)
- [Request Rescheduling](#request-rescheduling)
- [`finalize`](#finalize)
- [Model Config File](#model-config-file)
@@ -620,9 +620,24 @@ full power of what can be achieved from decoupled API. Read
[Decoupled Backends and Models](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/decoupled_models.md)
for more details on how to host a decoupled model.

##### Known Issues
##### Async Execute

* Currently, decoupled Python models can not make async infer requests.
Starting from 24.04, `async def execute(self, requests):` is supported for
decoupled Python models. The returned coroutine is executed by an AsyncIO event
loop that is shared across requests executing in the same model instance. The
next request for the model instance can start executing while the current
request is waiting.

This is useful for minimizing the number of model instances for models that
spend most of their time waiting, since AsyncIO can execute such requests
concurrently. To take full advantage of this concurrency, it is vital that the
async execute function does not block the event loop while it waits, e.g. while
downloading over the network.
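
For illustration only (not taken from the repository's examples), a decoupled
model's async `execute` might look like the sketch below; the tensor names and
the one-second `asyncio.sleep` stand in for real asynchronous work:

```python
import asyncio

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    async def execute(self, requests):
        # Awaiting yields the event loop, so other requests on this model
        # instance can make progress while this one waits (stand-in for real
        # async I/O such as a network download).
        await asyncio.sleep(1)

        for request in requests:
            input_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            response = pb_utils.InferenceResponse(
                output_tensors=[
                    pb_utils.Tensor("OUTPUT0", input_tensor.as_numpy())
                ]
            )
            # Decoupled models send responses through the response sender.
            sender = request.get_response_sender()
            sender.send(
                response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
            )
        # The decoupled execute must still return None (here, via the coroutine).
        return None
```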

Notes:
* The model should not modify the running event loop, as this may cause
  unexpected issues.
* Neither the server nor the backend controls how many requests a model
  instance adds to the event loop.

#### Request Rescheduling

90 changes: 80 additions & 10 deletions src/pb_stub.cc
@@ -104,6 +104,32 @@ PyDefaultArgumentToMutableType(const py::object& argument)
std::string(py::str(argument.get_type())));
}

void
AsyncEventFutureDoneCallback(const py::object& py_future)
{
  // TODO: Why does using `py_future.result()` hang on exit when the future
  // holds an error?
  try {
    py::object exception = py_future.attr("exception")();
    if (!py::isinstance<py::none>(exception)) {
      std::string err_msg = "";
      py::object traceback = py::module_::import("traceback")
                                 .attr("TracebackException")
                                 .attr("from_exception")(exception)
                                 .attr("format")();
      for (py::handle line : traceback) {
        err_msg += py::str(line);
      }
      LOG_ERROR << err_msg;
    }
  }
  catch (const PythonBackendException& pb_exception) {
    LOG_ERROR << pb_exception.what();
  }
  catch (const py::error_already_set& error) {
    LOG_ERROR << error.what();
  }
}

void
Stub::Instantiate(
int64_t shm_growth_size, int64_t shm_default_size,
@@ -533,6 +559,8 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle)
c_python_backend_utils.attr("InferenceResponse"));
c_python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());

async_event_loop_ = py::none();

py::object TritonPythonModel = sys.attr("TritonPythonModel");
deserialize_bytes_ = python_backend_utils.attr("deserialize_bytes_tensor");
serialize_bytes_ = python_backend_utils.attr("serialize_byte_tensor");
@@ -690,11 +718,18 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)

py::object execute_return =
model_instance_.attr("execute")(py_request_list);
if (!py::isinstance<py::none>(execute_return)) {
throw PythonBackendException(
"Python model '" + name_ +
"' is using the decoupled mode and the execute function must "
"return None.");
bool is_coroutine = py::module::import("asyncio")
.attr("iscoroutine")(execute_return)
.cast<bool>();
if (is_coroutine) {
RunCoroutine(execute_return);
} else {
if (!py::isinstance<py::none>(execute_return)) {
throw PythonBackendException(
"Python model '" + name_ +
"' is using the decoupled mode and the execute function must "
"return None.");
}
}
}
}
@@ -870,6 +905,35 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
}
}

py::object
Stub::GetAsyncEventLoop()
{
  if (py::isinstance<py::none>(async_event_loop_)) {
    // Create the event loop if not already, and run it forever on a separate
    // daemon thread so coroutines progress independently of request handling.
    py::module asyncio = py::module_::import("asyncio");
    async_event_loop_ = asyncio.attr("new_event_loop")();
    asyncio.attr("set_event_loop")(async_event_loop_);
    py::object py_thread =
        py::module_::import("threading")
            .attr("Thread")(
                "target"_a = async_event_loop_.attr("run_forever"),
                "daemon"_a = true);
    py_thread.attr("start")();
  }
  return async_event_loop_;
}

void
Stub::RunCoroutine(py::object coroutine)
{
  py::object loop = GetAsyncEventLoop();
  py::object py_future = py::module_::import("asyncio").attr(
      "run_coroutine_threadsafe")(coroutine, loop);
  py_future.attr("add_done_callback")(
      py::module_::import("c_python_backend_utils")
          .attr("async_event_future_done_callback"));
}
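
At the Python level, the event-loop handling above roughly corresponds to the
following pattern (an illustrative sketch, not code shipped with the backend;
the coroutine and callback names are made up):

```python
import asyncio
import threading

# A dedicated event loop running forever on a daemon thread, as set up by
# Stub::GetAsyncEventLoop.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


def log_if_failed(future):
    # Mirrors async_event_future_done_callback: log the coroutine's exception,
    # if any, once it finishes.
    exc = future.exception()
    if exc is not None:
        print("async execute failed:", exc)


async def fake_execute():
    await asyncio.sleep(0.1)  # stand-in for the model's async execute


# Mirrors Stub::RunCoroutine: schedule the coroutine on the shared loop from
# another thread and attach the done callback.
future = asyncio.run_coroutine_threadsafe(fake_execute(), loop)
future.add_done_callback(log_if_failed)
future.result()  # only for this standalone sketch: wait for completion
```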

void
Stub::UpdateHealth()
{
@@ -881,6 +945,10 @@ void
Stub::Finalize()
{
finalizing_ = true;
// Stop async event loop if created.
if (!py::isinstance<py::none>(async_event_loop_)) {
async_event_loop_.attr("stop")();
}
// Call finalize if exists.
if (initialized_ && py::hasattr(model_instance_, "finalize")) {
try {
Expand Down Expand Up @@ -943,6 +1011,7 @@ Stub::~Stub()

{
py::gil_scoped_acquire acquire;
async_event_loop_ = py::none();
model_instance_ = py::none();
}
stub_instance_.reset();
@@ -1729,11 +1798,6 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
[](std::shared_ptr<InferRequest>& infer_request,
const bool decoupled) {
std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
if (stub->IsDecoupled()) {
throw PythonBackendException(
"Async BLS request execution is not support in the decoupled "
"API.");
}
py::object loop =
py::module_::import("asyncio").attr("get_running_loop")();
py::cpp_function callback = [&stub, infer_request, decoupled]() {
@@ -1860,6 +1924,12 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
"is_model_ready", &IsModelReady, py::arg("model_name").none(false),
py::arg("model_version").none(false) = "");

// This function is not part of the public API for the Python backend. It is
// only used for internal callbacks.
module.def(
"async_event_future_done_callback", &AsyncEventFutureDoneCallback,
py::arg("py_future").none(false));

// This class is not part of the public API for Python backend. This is only
// used for internal testing purposes.
py::class_<SharedMemoryManager>(module, "SharedMemory")
7 changes: 6 additions & 1 deletion src/pb_stub.h
@@ -1,4 +1,4 @@
// Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
@@ -255,6 +255,10 @@ class Stub {

void ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr);

py::object GetAsyncEventLoop();

void RunCoroutine(py::object coroutine);

/// Get the memory manager message queue
std::unique_ptr<MessageQueue<uint64_t>>& MemoryManagerQueue();

@@ -363,6 +367,7 @@ class Stub {
py::object model_instance_;
py::object deserialize_bytes_;
py::object serialize_bytes_;
py::object async_event_loop_;
std::unique_ptr<MessageQueue<bi::managed_external_buffer::handle_t>>
stub_message_queue_;
std::unique_ptr<MessageQueue<bi::managed_external_buffer::handle_t>>
2 changes: 2 additions & 0 deletions src/python_be.cc
@@ -768,6 +768,7 @@ ModelInstanceState::ExecuteBLSRequest(
if (is_decoupled && (infer_response->Id() != nullptr)) {
// Need to manage the lifetime of InferPayload object for bls
// decoupled responses.
std::lock_guard<std::mutex> lock(infer_payload_mu_);
infer_payload_[reinterpret_cast<intptr_t>(infer_payload.get())] =
infer_payload;
}
@@ -961,6 +962,7 @@ ModelInstanceState::ProcessCleanupRequest(
intptr_t id = reinterpret_cast<intptr_t>(cleanup_message_ptr->id);
if (message->Command() == PYTHONSTUB_BLSDecoupledInferPayloadCleanup) {
// Remove the InferPayload object from the map.
std::lock_guard<std::mutex> lock(infer_payload_mu_);
infer_payload_.erase(id);
} else if (message->Command() == PYTHONSTUB_DecoupledResponseFactoryCleanup) {
// Delete response factory
1 change: 1 addition & 0 deletions src/python_be.h
@@ -296,6 +296,7 @@ class ModelInstanceState : public BackendModelInstance {
std::vector<std::future<void>> futures_;
std::unique_ptr<boost::asio::thread_pool> thread_pool_;
std::unordered_map<intptr_t, std::shared_ptr<InferPayload>> infer_payload_;
std::mutex infer_payload_mu_;
std::unique_ptr<RequestExecutor> request_executor_;

public: