Add response sender to non-decoupled models and unify data pipelines #360

Merged
merged 2 commits on May 31, 2024
Changes from all commits
7 changes: 0 additions & 7 deletions src/infer_request.cc
@@ -402,13 +402,6 @@ InferRequest::IsCancelled()
std::shared_ptr<ResponseSender>
InferRequest::GetResponseSender()
{
std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
if (!stub->IsDecoupled()) {
throw PythonBackendException(
"'get_response_sender' function must be called only when the model is "
"using the decoupled transaction policy.");
}

return response_sender_;
}

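With the decoupled-only guard on `GetResponseSender()` removed (together with the stub changes below), a non-decoupled Python model can obtain and use a response sender directly. A minimal sketch of what this enables; the tensor names and model logic are illustrative, not part of this PR:

```python
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        for request in requests:
            # Previously this call raised a PythonBackendException unless the
            # model used the decoupled transaction policy; it is now allowed
            # in both modes.
            sender = request.get_response_sender()
            input0 = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            output0 = pb_utils.Tensor("OUTPUT0", input0.as_numpy())
            response = pb_utils.InferenceResponse(output_tensors=[output0])
            # A non-decoupled model sends exactly one response per request and
            # marks it final.
            sender.send(
                response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
        # When responses are sent through the sender, execute() returns None.
        return None
```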
240 changes: 70 additions & 170 deletions src/pb_stub.cc
@@ -402,11 +402,7 @@ Stub::RunCommand()
shm_pool_->Load<char>(ipc_message->Args());
RequestBatch* request_batch_shm_ptr =
reinterpret_cast<RequestBatch*>(request_batch.data_.get());
if (!ipc_control_->decoupled) {
ProcessRequests(request_batch_shm_ptr);
} else {
ProcessRequestsDecoupled(request_batch_shm_ptr);
}
ProcessRequests(request_batch_shm_ptr);

} break;
case PYTHONSTUB_CommandType::PYTHONSTUB_FinalizeRequest:
@@ -597,18 +593,6 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle)
initialized_ = true;
}

void
Stub::ProcessResponse(InferResponse* response)
{
response->SaveToSharedMemory(shm_pool_, false /* copy_gpu */);

for (auto& output_tensor : response->OutputTensors()) {
if (!output_tensor->IsCPU()) {
gpu_tensors_.push_back(output_tensor);
}
}
}

void
Stub::LoadGPUBuffers(std::unique_ptr<IPCMessage>& ipc_message)
{
@@ -682,7 +666,7 @@ Stub::LoadRequestsFromSharedMemory(RequestBatch* request_batch_shm_ptr)
}

void
Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)
Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
{
py::list py_request_list =
LoadRequestsFromSharedMemory(request_batch_shm_ptr);
Expand Down Expand Up @@ -718,18 +702,21 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)

py::object execute_return =
model_instance_.attr("execute")(py_request_list);

bool is_coroutine = py::module::import("asyncio")
.attr("iscoroutine")(execute_return)
.cast<bool>();
if (is_coroutine) {
RunCoroutine(execute_return);
} else {
if (!py::isinstance<py::none>(execute_return)) {
throw PythonBackendException(
"Python model '" + name_ +
"' is using the decoupled mode and the execute function must "
"return None.");
if (IsDecoupled()) {
// Do not wait for async decoupled execute to return.
RunCoroutine(execute_return, true /* in_background */);
} else {
py::object coroutine_return =
RunCoroutine(execute_return, false /* in_background */);
ProcessReturnedResponses(py_request_list, coroutine_return);
}
} else {
ProcessReturnedResponses(py_request_list, execute_return);
}
}
}
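With the dispatch unified, `ProcessRequests` now serves both transaction policies: a decoupled coroutine is run in the background, while a non-decoupled result — returned directly or awaited from a coroutine — is handed to `ProcessReturnedResponses`. The conventional list return for non-decoupled models keeps working unchanged; a minimal sketch with illustrative tensor names:

```python
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        responses = []
        for request in requests:
            input0 = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            output0 = pb_utils.Tensor("OUTPUT0", input0.as_numpy())
            # One InferenceResponse per request, in the same order as `requests`.
            responses.append(pb_utils.InferenceResponse(output_tensors=[output0]))
        # The returned list is routed through ProcessReturnedResponses.
        return responses
```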
@@ -757,151 +744,60 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)
}

void
Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
Stub::ProcessReturnedResponses(
py::list py_requests, py::object py_responses_obj)
{
std::unique_ptr<IPCMessage> execute_response =
IPCMessage::Create(shm_pool_, false /* Inline response */);
execute_response->Command() = PYTHONSTUB_ExecuteResponse;

AllocatedSharedMemory<char> response_batch = shm_pool_->Construct<char>(
request_batch_shm_ptr->batch_size *
sizeof(bi::managed_external_buffer::handle_t) +
sizeof(ResponseBatch));
ResponseBatch* response_batch_shm_ptr =
reinterpret_cast<ResponseBatch*>(response_batch.data_.get());

std::unique_ptr<PbString> error_string_shm;
py::list inference_responses;

bi::managed_external_buffer::handle_t* responses_shm_handle =
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
response_batch.data_.get() + sizeof(ResponseBatch));

py::list responses;

// Notifying the stub should be after responses.
ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); });
ScopedDefer _(
[this, &execute_response] { SendIPCMessage(execute_response); });

execute_response->Args() = response_batch.handle_;

bool has_exception = false;
std::string error_string;
try {
response_batch_shm_ptr->has_error = false;
response_batch_shm_ptr->is_error_set = false;

uint32_t batch_size = request_batch_shm_ptr->batch_size;

if (batch_size == 0) {
return;
}

py::list py_request_list =
LoadRequestsFromSharedMemory(request_batch_shm_ptr);

if (!py::hasattr(model_instance_, "execute")) {
std::string message = "Python model " + model_context_.PythonModelPath() +
" does not implement `execute` method.";
throw PythonBackendException(message);
}

py::object request_list = py_request_list;
py::module asyncio = py::module::import("asyncio");

// Execute Response
py::object execute_return;
py::object responses_obj;
bool is_coroutine;

{
NVTX_RANGE(nvtx_, "PyExecute " + name_);
execute_return = model_instance_.attr("execute")(request_list);
is_coroutine = asyncio.attr("iscoroutine")(execute_return).cast<bool>();
}

if (is_coroutine) {
responses_obj = asyncio.attr("run")(execute_return);
} else {
responses_obj = execute_return;
}

// Check the return type of execute function.
if (!py::isinstance<py::list>(responses_obj)) {
std::string str = py::str(execute_return.get_type());
throw PythonBackendException(
std::string("Expected a list in the execute return, found type '") +
str + "'.");
}

responses = responses_obj;
size_t response_size = py::len(responses);

// If the number of request objects do not match the number of
// response objects throw an error.
if (response_size != batch_size) {
std::string err =
"Number of InferenceResponse objects do not match the number "
"of "
"InferenceRequest objects. InferenceRequest(s) size is:" +
std::to_string(batch_size) + ", and InferenceResponse(s) size is:" +
std::to_string(response_size) + "\n";
throw PythonBackendException(err);
}

for (size_t i = 0; i < response_size; i++) {
// Check the return type of execute function.
InferRequest* infer_request = py_request_list[i].cast<InferRequest*>();
if (infer_request->ReleaseFlags() ==
TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
if (!py::isinstance<py::none>(responses[i])) {
// When the request is rescheduled in non-decoupled model, the
// response must be None.
std::string str = py::str(responses[i].get_type());
throw PythonBackendException(
"Expected a None object in the execute function return list for "
"reschduled request, "
"found type '" +
str + "'.");
}
} else {
if (!py::isinstance<InferResponse>(responses[i])) {
std::string str = py::str(responses[i].get_type());
throw PythonBackendException(
std::string(
"Expected an 'InferenceResponse' object in the execute "
"function return list, found type '") +
str + "'.");
}
InferResponse* infer_response = responses[i].cast<InferResponse*>();
infer_response->PruneOutputTensors(
infer_request->RequestedOutputNames());
ProcessResponse(infer_response);
responses_shm_handle[i] = infer_response->ShmHandle();
}
}
response_batch_shm_ptr->batch_size = response_size;
// Return if there is nothing to process.
if (py::isinstance<py::none>(py_responses_obj)) {
return;
}
catch (const PythonBackendException& pb_exception) {
has_exception = true;
error_string = pb_exception.what();
// Only non-decoupled may return responses.
if (IsDecoupled()) {
throw PythonBackendException(
"Python model '" + name_ +
"' is using the decoupled mode and the execute function must return "
"None.");
}
catch (const py::error_already_set& error) {
has_exception = true;
error_string = error.what();
// Check responses is a list.
if (!py::isinstance<py::list>(py_responses_obj)) {
throw PythonBackendException(
"Expected a list in the execute return, found type '" +
std::string(py::str(py_responses_obj.get_type())) + "'.");
}
py::list py_responses = py_responses_obj;
// Responses and requests length must match.
size_t requests_size = py::len(py_requests);
size_t responses_size = py::len(py_responses);
if (requests_size != responses_size) {
throw PythonBackendException(
"Number of InferenceResponse objects do not match the number of "
"InferenceRequest objects. InferenceRequest(s) size is:" +
std::to_string(requests_size) + ", and InferenceResponse(s) size is:" +
std::to_string(responses_size) + "\n");
}

if (has_exception) {
std::string err_message =
std::string(
"Failed to process the request(s) for model '" + name_ +
"', message: ") +
error_string;
error_string_shm = PbString::Create(shm_pool_, error_string);
response_batch_shm_ptr->has_error = true;
response_batch_shm_ptr->is_error_set = true;
response_batch_shm_ptr->error = error_string_shm->ShmHandle();
for (size_t i = 0; i < responses_size; i++) {
if (!py::isinstance<py::none>(py_responses[i])) {
InferRequest* request = py_requests[i].cast<InferRequest*>();
// Response must be None if rescheduled.
if (request->ReleaseFlags() == TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
throw PythonBackendException(
"Expected a None object in the execute function return list for "
"reschduled request, found type '" +
std::string(py::str(py_responses[i].get_type())) + "'.");
}
// Send the response.
if (!py::isinstance<InferResponse>(py_responses[i])) {
throw PythonBackendException(
"Expected an 'InferenceResponse' object in the execute function "
"return list, found type '" +
std::string(py::str(py_responses[i].get_type())) + "'.");
}
std::shared_ptr<InferResponse> response =
py_responses[i].cast<std::shared_ptr<InferResponse>>();
request->GetResponseSender()->Send(
response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
}
}
}
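The per-slot checks above allow a non-decoupled model to mix the two styles within one batch: a rescheduled request must have `None` in its slot, while other slots carry `InferenceResponse` objects. A sketch assuming the request-rescheduling API (`set_release_flags`) documented for the Python backend; the `KEEP_GOING` input is purely illustrative:

```python
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        responses = []
        for request in requests:
            keep_going = pb_utils.get_input_tensor_by_name(request, "KEEP_GOING")
            if keep_going is not None and bool(keep_going.as_numpy()[0]):
                # Ask Triton to re-enqueue this request; its slot in the
                # returned list must then be None.
                request.set_release_flags(
                    pb_utils.TRITONSERVER_REQUEST_RELEASE_RESCHEDULE)
                responses.append(None)
            else:
                input0 = pb_utils.get_input_tensor_by_name(request, "INPUT0")
                output0 = pb_utils.Tensor("OUTPUT0", input0.as_numpy())
                responses.append(
                    pb_utils.InferenceResponse(output_tensors=[output0]))
        return responses
```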

@@ -923,15 +819,19 @@ Stub::GetAsyncEventLoop()
return async_event_loop_;
}

void
Stub::RunCoroutine(py::object coroutine)
py::object
Stub::RunCoroutine(py::object coroutine, bool in_background)
{
py::object loop = GetAsyncEventLoop();
py::object py_future = py::module_::import("asyncio").attr(
"run_coroutine_threadsafe")(coroutine, loop);
py_future.attr("add_done_callback")(
py::module_::import("c_python_backend_utils")
.attr("async_event_future_done_callback"));
if (in_background) {
py_future.attr("add_done_callback")(
py::module_::import("c_python_backend_utils")
.attr("async_event_future_done_callback"));
return py::none();
}
return py_future.attr("result")();
}
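`RunCoroutine` now serves non-decoupled models as well: the coroutine is scheduled on the stub's shared event loop and, when not run in the background, its result is returned and processed like a synchronous return (previously a non-decoupled coroutine was executed with a separate `asyncio.run`). A minimal async non-decoupled model this path handles, with illustrative tensor names:

```python
import asyncio

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    async def execute(self, requests):
        # Stand-in for real asynchronous work (e.g. awaiting BLS requests);
        # the stub waits on this coroutine and forwards the returned list.
        await asyncio.sleep(0)
        responses = []
        for request in requests:
            input0 = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            output0 = pb_utils.Tensor("OUTPUT0", input0.as_numpy())
            responses.append(pb_utils.InferenceResponse(output_tensors=[output0]))
        return responses
```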

void
7 changes: 3 additions & 4 deletions src/pb_stub.h
@@ -253,20 +253,19 @@ class Stub {
/// Execute a batch of requests.
void ProcessRequests(RequestBatch* request_batch_shm_ptr);

void ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr);
void ProcessReturnedResponses(
py::list py_requests, py::object py_responses_obj);

py::object GetAsyncEventLoop();

void RunCoroutine(py::object coroutine);
py::object RunCoroutine(py::object coroutine, bool in_background);

/// Get the memory manager message queue
std::unique_ptr<MessageQueue<uint64_t>>& MemoryManagerQueue();

/// Get the shared memory pool
std::unique_ptr<SharedMemoryManager>& ShmPool() { return shm_pool_; }

void ProcessResponse(InferResponse* response);

void ProcessBLSResponseDecoupled(std::unique_ptr<IPCMessage>& ipc_message);

void LoadGPUBuffers(std::unique_ptr<IPCMessage>& ipc_message);