Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: [precaution fix] Capture Python futures while running in the background #365

Merged
merged 3 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 40 additions & 21 deletions src/pb_stub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,8 @@ PyDefaultArgumentToMutableType(const py::object& argument)
void
AsyncEventFutureDoneCallback(const py::object& py_future)
{
// TODO: Why using `py_future.result()` with error hangs on exit?
try {
py::object exception = py_future.attr("exception")();
if (!py::isinstance<py::none>(exception)) {
std::string err_msg = "";
py::object traceback = py::module_::import("traceback")
.attr("TracebackException")
.attr("from_exception")(exception)
.attr("format")();
for (py::handle line : traceback) {
err_msg += py::str(line);
}
LOG_ERROR << err_msg;
}
}
catch (const PythonBackendException& pb_exception) {
LOG_ERROR << pb_exception.what();
}
catch (const py::error_already_set& error) {
LOG_ERROR << error.what();
}
std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
stub->BackgroundFutureDone(py_future);
}

void
Expand Down Expand Up @@ -556,6 +537,7 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle)
c_python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());

async_event_loop_ = py::none();
background_futures_ = py::set();

py::object TritonPythonModel = sys.attr("TritonPythonModel");
deserialize_bytes_ = python_backend_utils.attr("deserialize_bytes_tensor");
Expand Down Expand Up @@ -838,11 +820,47 @@ Stub::RunCoroutine(py::object coroutine, bool in_background)
py_future.attr("add_done_callback")(
py::module_::import("c_python_backend_utils")
.attr("async_event_future_done_callback"));
background_futures_.attr("add")(py_future);
return py::none();
}
return py_future.attr("result")();
}

void
Stub::BackgroundFutureDone(const py::object& py_future)
{
ScopedDefer _([this, &py_future] {
// Remove future from background
try {
background_futures_.attr("remove")(py_future);
}
catch (const py::error_already_set& error) {
LOG_ERROR << "Cannot remove future from background; " << error.what();
}
});
// TODO: Why using `py_future.result()` with error hangs on exit?
try {
py::object exception = py_future.attr("exception")();
if (!py::isinstance<py::none>(exception)) {
std::string err_msg = "";
py::object traceback = py::module_::import("traceback")
.attr("TracebackException")
.attr("from_exception")(exception)
.attr("format")();
for (py::handle line : traceback) {
err_msg += py::str(line);
}
LOG_ERROR << err_msg;
}
}
catch (const PythonBackendException& pb_exception) {
LOG_ERROR << pb_exception.what();
}
catch (const py::error_already_set& error) {
LOG_ERROR << error.what();
}
}

void
Stub::UpdateHealth()
{
Expand Down Expand Up @@ -923,6 +941,7 @@ Stub::~Stub()
{
py::gil_scoped_acquire acquire;
async_event_loop_ = py::none();
background_futures_ = py::none();
model_instance_ = py::none();
}
stub_instance_.reset();
Expand Down
3 changes: 3 additions & 0 deletions src/pb_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ class Stub {

py::object RunCoroutine(py::object coroutine, bool in_background);

void BackgroundFutureDone(const py::object& py_future);

/// Get the memory manager message queue
std::unique_ptr<MessageQueue<uint64_t>>& MemoryManagerQueue();

Expand Down Expand Up @@ -367,6 +369,7 @@ class Stub {
py::object deserialize_bytes_;
py::object serialize_bytes_;
py::object async_event_loop_;
py::object background_futures_;
std::unique_ptr<MessageQueue<bi::managed_external_buffer::handle_t>>
stub_message_queue_;
std::unique_ptr<MessageQueue<bi::managed_external_buffer::handle_t>>
Expand Down
Loading