From 67ca860e72ba2547cdfad324b46c579eee5d3200 Mon Sep 17 00:00:00 2001 From: Jacky <18255193+kthui@users.noreply.github.com> Date: Fri, 6 Oct 2023 09:51:04 -0700 Subject: [PATCH] Add Python backend request cancellation (#304) * Add cancelled response status * Add request cancellation * Check cancellation on response factory if available * Remove unnecessary wrapping * Throw error instead of log error * Add is cancelled check at response sender * Enable more reuse on request cancellation and improve model interface * Documentation wording updates * Copyright year update * Rollback response sender auto close on cancel * Rollback non-decoupled any response on cancel * Decoupled final flag docs update --- CMakeLists.txt | 2 + README.md | 36 +++++++++++++++++ src/infer_request.cc | 14 ++++++- src/infer_request.h | 3 ++ src/ipc_message.h | 3 +- src/pb_cancel.cc | 90 ++++++++++++++++++++++++++++++++++++++++++ src/pb_cancel.h | 64 ++++++++++++++++++++++++++++++ src/pb_stub.cc | 51 +++++++++++++++++++++++- src/pb_stub.h | 7 ++++ src/pb_utils.h | 6 +++ src/python_be.cc | 38 ++++++++++++++++++ src/python_be.h | 3 ++ src/response_sender.cc | 14 +++++-- src/response_sender.h | 8 +++- 14 files changed, 329 insertions(+), 10 deletions(-) create mode 100644 src/pb_cancel.cc create mode 100644 src/pb_cancel.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 93a7ae60..3f20bbc3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -208,6 +208,8 @@ set( src/pb_stub.cc src/pb_response_iterator.h src/pb_response_iterator.cc + src/pb_cancel.cc + src/pb_cancel.h ) list(APPEND diff --git a/README.md b/README.md index 517a9b64..4cb9a960 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ any C++ code. - [`execute`](#execute) - [Default Mode](#default-mode) - [Error Handling](#error-handling) + - [Request Cancellation Handling](#request-cancellation-handling) - [Decoupled mode](#decoupled-mode) - [Use Cases](#use-cases) - [Known Issues](#known-issues) @@ -502,6 +503,36 @@ Supported error codes: * `pb_utils.TritonError.UNAVAILABLE` * `pb_utils.TritonError.UNSUPPORTED` * `pb_utils.TritonError.ALREADY_EXISTS` +* `pb_utils.TritonError.CANCELLED` (since 23.10) + +#### Request Cancellation Handling + +One or more requests may be cancelled by the client during execution. Starting +from 23.10, `request.is_cancelled()` returns whether the request is cancelled or +not. For example: + +```python +import triton_python_backend_utils as pb_utils + +class TritonPythonModel: + ... + + def execute(self, requests): + responses = [] + + for request in requests: + if request.is_cancelled(): + responses.append(pb_utils.InferenceResponse( + error=pb_utils.TritonError("Message", pb_utils.TritonError.CANCELLED))) + else: + ... + + return responses +``` + +Although checking for request cancellation is optional, it is recommended to +check for cancellation at strategic request execution stages that can early +terminate the execution in the event of its response is no longer needed. #### Decoupled mode @@ -543,6 +574,11 @@ request. After setting errors for an pb_utils.InferenceResponse object, use InferenceResponseSender.send() to send response with the error back to the user. +Starting from 23.10, request cancellation can be checked directly on the +`InferenceResponseSender` object using `response_sender.is_cancelled()`. Sending +the TRITONSERVER_RESPONSE_COMPLETE_FINAL flag at the end of response is still +needed even the request is cancelled. + ##### Use Cases The decoupled mode is powerful and supports various other use cases: diff --git a/src/infer_request.cc b/src/infer_request.cc index 5fdae669..e9d243f1 100644 --- a/src/infer_request.cc +++ b/src/infer_request.cc @@ -71,9 +71,11 @@ InferRequest::InferRequest( inputs_ = inputs; requested_output_names_ = requested_output_names; #ifdef TRITON_PB_STUB + pb_cancel_ = + std::make_shared(response_factory_address_, request_address_); response_sender_ = std::make_shared( request_address_, response_factory_address_, - Stub::GetOrCreateInstance()->SharedMemory()); + Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_); #endif } @@ -379,9 +381,11 @@ InferRequest::InferRequest( trace_ = infer_request_shm_ptr_->trace; #ifdef TRITON_PB_STUB + pb_cancel_ = + std::make_shared(response_factory_address_, request_address_); response_sender_ = std::make_shared( request_address_, response_factory_address_, - Stub::GetOrCreateInstance()->SharedMemory()); + Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_); #endif } @@ -400,6 +404,12 @@ InferRequest::DeleteResponseFactory() #endif #ifdef TRITON_PB_STUB +bool +InferRequest::IsCancelled() +{ + return pb_cancel_->IsCancelled(); +} + std::shared_ptr InferRequest::GetResponseSender() { diff --git a/src/infer_request.h b/src/infer_request.h index 6652b2fb..bc6a2acf 100644 --- a/src/infer_request.h +++ b/src/infer_request.h @@ -34,6 +34,7 @@ #include "pb_tensor.h" #ifdef TRITON_PB_STUB +#include "pb_cancel.h" #include "response_sender.h" #endif @@ -107,6 +108,7 @@ class InferRequest { #ifdef TRITON_PB_STUB std::shared_ptr Exec(const bool is_decoupled); std::shared_ptr GetResponseSender(); + bool IsCancelled(); #endif /// Save an Inference Request to shared memory. @@ -173,6 +175,7 @@ class InferRequest { std::unique_ptr parameters_shm_; #ifdef TRITON_PB_STUB + std::shared_ptr pb_cancel_; std::shared_ptr response_sender_; #endif }; diff --git a/src/ipc_message.h b/src/ipc_message.h index 7040f2b4..14d3dc5f 100644 --- a/src/ipc_message.h +++ b/src/ipc_message.h @@ -62,7 +62,8 @@ typedef enum PYTHONSTUB_commandtype_enum { PYTHONSTUB_MetricRequestSet, PYTHONSTUB_LoadModelRequest, PYTHONSTUB_UnloadModelRequest, - PYTHONSTUB_ModelReadinessRequest + PYTHONSTUB_ModelReadinessRequest, + PYTHONSTUB_IsRequestCancelled } PYTHONSTUB_CommandType; /// diff --git a/src/pb_cancel.cc b/src/pb_cancel.cc new file mode 100644 index 00000000..4c9b926b --- /dev/null +++ b/src/pb_cancel.cc @@ -0,0 +1,90 @@ +// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of NVIDIA CORPORATION nor the names of its +// contributors may be used to endorse or promote products derived +// from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "pb_cancel.h" + +#include "pb_stub.h" + +namespace triton { namespace backend { namespace python { + +void +PbCancel::SaveToSharedMemory(std::unique_ptr& shm_pool) +{ + cancel_shm_ = shm_pool->Construct(); + new (&(cancel_shm_.data_->mu)) bi::interprocess_mutex; + new (&(cancel_shm_.data_->cv)) bi::interprocess_condition; + cancel_shm_.data_->waiting_on_stub = false; + cancel_shm_.data_->response_factory_address = response_factory_address_; + cancel_shm_.data_->request_address = request_address_; + cancel_shm_.data_->is_cancelled = is_cancelled_; +} + +bi::managed_external_buffer::handle_t +PbCancel::ShmHandle() +{ + return cancel_shm_.handle_; +} + +IsCancelledMessage* +PbCancel::ShmPayload() +{ + return cancel_shm_.data_.get(); +} + +bool +PbCancel::IsCancelled() +{ + std::unique_lock lk(mu_); + // The cancelled flag can only move from false to true, not the other way, so + // it is checked on each query until cancelled and then implicitly cached. + if (is_cancelled_) { + return is_cancelled_; + } + if (!updating_) { + std::unique_ptr& stub = Stub::GetOrCreateInstance(); + if (!stub->StubToParentServiceActive()) { + LOG_ERROR << "Cannot communicate with parent service"; + return false; + } + stub->EnqueueIsCancelled(this); + updating_ = true; + } + cv_.wait(lk, [this] { return !updating_; }); + return is_cancelled_; +} + +void +PbCancel::ReportIsCancelled(bool is_cancelled) +{ + { + std::lock_guard lk(mu_); + is_cancelled_ = is_cancelled; + updating_ = false; + } + cv_.notify_all(); +} + +}}} // namespace triton::backend::python diff --git a/src/pb_cancel.h b/src/pb_cancel.h new file mode 100644 index 00000000..3ebf07b5 --- /dev/null +++ b/src/pb_cancel.h @@ -0,0 +1,64 @@ +// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of NVIDIA CORPORATION nor the names of its +// contributors may be used to endorse or promote products derived +// from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include + +#include "pb_utils.h" + +namespace triton { namespace backend { namespace python { + +class PbCancel { + public: + PbCancel(intptr_t response_factory_address, intptr_t request_address) + : updating_(false), response_factory_address_(response_factory_address), + request_address_(request_address), is_cancelled_(false) + { + } + DISALLOW_COPY_AND_ASSIGN(PbCancel); + + void SaveToSharedMemory(std::unique_ptr& shm_pool); + bi::managed_external_buffer::handle_t ShmHandle(); + IsCancelledMessage* ShmPayload(); + + bool IsCancelled(); + void ReportIsCancelled(bool is_cancelled); + + private: + AllocatedSharedMemory cancel_shm_; + + std::mutex mu_; + std::condition_variable cv_; + bool updating_; + + intptr_t response_factory_address_; + intptr_t request_address_; + bool is_cancelled_; +}; + +}}}; // namespace triton::backend::python diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 37c9a5b5..87abe583 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -945,6 +945,9 @@ Stub::ServiceStubToParentRequests() SendLogMessage(utils_msg_payload); } else if (utils_msg_payload->command_type == PYTHONSTUB_CleanupRequest) { SendCleanupId(utils_msg_payload); + } else if ( + utils_msg_payload->command_type == PYTHONSTUB_IsRequestCancelled) { + SendIsCancelled(utils_msg_payload); } else { std::cerr << "Error when sending message via stub_to_parent message " "buffer - unknown command\n"; @@ -1028,6 +1031,44 @@ Stub::EnqueueCleanupId(void* id) } } +void +Stub::EnqueueIsCancelled(PbCancel* pb_cancel) +{ + std::unique_ptr utils_msg_payload = + std::make_unique( + PYTHONSTUB_IsRequestCancelled, reinterpret_cast(pb_cancel)); + EnqueueUtilsMessage(std::move(utils_msg_payload)); +} + +void +Stub::SendIsCancelled(std::unique_ptr& utils_msg_payload) +{ + PbCancel* pb_cancel = + reinterpret_cast(utils_msg_payload->utils_message_ptr); + pb_cancel->SaveToSharedMemory(shm_pool_); + + IsCancelledMessage* message_payload = pb_cancel->ShmPayload(); + std::unique_ptr ipc_message = + IPCMessage::Create(shm_pool_, false /* inline_response */); + ipc_message->Command() = utils_msg_payload->command_type; + ipc_message->Args() = pb_cancel->ShmHandle(); + + bool is_cancelled = false; + { + bi::scoped_lock lk(message_payload->mu); + + SendIPCUtilsMessage(ipc_message); + while (!message_payload->waiting_on_stub) { + message_payload->cv.wait(lk); + } + + is_cancelled = message_payload->is_cancelled; + message_payload->waiting_on_stub = false; + message_payload->cv.notify_all(); + } + pb_cancel->ReportIsCancelled(is_cancelled); +} + bool Stub::StubToParentServiceActive() { @@ -1364,6 +1405,7 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module) .value( "ALREADY_EXISTS", TRITONSERVER_Error_Code::TRITONSERVER_ERROR_ALREADY_EXISTS) + .value("CANCELLED", TRITONSERVER_Error_Code::TRITONSERVER_ERROR_CANCELLED) .export_values(); triton_error.def_property_readonly_static( "UNKNOWN", @@ -1386,6 +1428,9 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module) triton_error.def_property_readonly_static( "ALREADY_EXISTS", [](py::object /* self */) { return TRITONSERVER_ERROR_ALREADY_EXISTS; }); + triton_error.def_property_readonly_static( + "CANCELLED", + [](py::object /* self */) { return TRITONSERVER_ERROR_CANCELLED; }); triton_error.def( py::init(), py::arg("message").none(false), @@ -1501,7 +1546,8 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module) .def( "requested_output_names", &InferRequest::RequestedOutputNames, py::return_value_policy::reference_internal) - .def("get_response_sender", &InferRequest::GetResponseSender); + .def("get_response_sender", &InferRequest::GetResponseSender) + .def("is_cancelled", &InferRequest::IsCancelled); py::class_>(module, "Tensor") .def(py::init(&PbTensor::FromNumpy)) @@ -1539,7 +1585,8 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module) module, "InferenceResponseSender") .def( "send", &ResponseSender::Send, py::arg("response") = nullptr, - py::arg("flags") = 0); + py::arg("flags") = 0) + .def("is_cancelled", &ResponseSender::IsCancelled); py::class_>( module, "ResponseIterator") diff --git a/src/pb_stub.h b/src/pb_stub.h index 6d047d29..d52196e1 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -49,6 +49,7 @@ #include "message_queue.h" #include "metric.h" #include "metric_family.h" +#include "pb_cancel.h" #include "pb_log.h" #include "pb_response_iterator.h" #include "pb_utils.h" @@ -308,6 +309,12 @@ class Stub { /// Add cleanup id to queue void EnqueueCleanupId(void* id); + /// Add request cancellation query to queue + void EnqueueIsCancelled(PbCancel* pb_cancel); + + /// Send request cancellation query to python backend + void SendIsCancelled(std::unique_ptr& utils_msg_payload); + /// Is the stub initialized bool IsInitialized(); diff --git a/src/pb_utils.h b/src/pb_utils.h index 1d651f3f..612c46a4 100644 --- a/src/pb_utils.h +++ b/src/pb_utils.h @@ -182,6 +182,12 @@ struct CleanupMessage : SendMessageBase { void* id; }; +struct IsCancelledMessage : SendMessageBase { + intptr_t response_factory_address; + intptr_t request_address; + bool is_cancelled; +}; + struct CustomMetricsMessage : SendMessageBase { bi::managed_external_buffer::handle_t message; bool has_error; diff --git a/src/python_be.cc b/src/python_be.cc index b196cfab..7f46d473 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -817,6 +817,10 @@ ModelInstanceState::StubToParentMQMonitor() ProcessBLSCleanupRequest(message); break; } + case PYTHONSTUB_IsRequestCancelled: { + ProcessIsRequestCancelled(message); + break; + } case PYTHONSTUB_MetricFamilyRequestNew: case PYTHONSTUB_MetricFamilyRequestDelete: { ProcessMetricFamilyRequest(message); @@ -918,6 +922,40 @@ ModelInstanceState::ProcessBLSCleanupRequest( } } +void +ModelInstanceState::ProcessIsRequestCancelled( + const std::unique_ptr& message) +{ + AllocatedSharedMemory message_shm = + Stub()->ShmPool()->Load(message->Args()); + IsCancelledMessage* message_payload = + reinterpret_cast(message_shm.data_.get()); + + { + bi::scoped_lock lk{message_payload->mu}; + + if (message_payload->response_factory_address != 0) { + TRITONBACKEND_ResponseFactory* response_factory = + reinterpret_cast( + message_payload->response_factory_address); + TRITONBACKEND_ResponseFactoryIsCancelled( + response_factory, &message_payload->is_cancelled); + } else if (message_payload->request_address != 0) { + TRITONBACKEND_Request* request = reinterpret_cast( + message_payload->request_address); + TRITONBACKEND_RequestIsCancelled(request, &message_payload->is_cancelled); + } else { + throw PythonBackendException("Cannot determine request cancellation"); + } + + message_payload->waiting_on_stub = true; + message_payload->cv.notify_all(); + while (message_payload->waiting_on_stub) { + message_payload->cv.wait(lk); + } + } +} + template void ModelInstanceState::ProcessMessage( diff --git a/src/python_be.h b/src/python_be.h index 825c45de..4c8d702f 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -394,6 +394,9 @@ class ModelInstanceState : public BackendModelInstance { // Process the bls decoupled cleanup request void ProcessBLSCleanupRequest(const std::unique_ptr& message); + // Process request cancellation query + void ProcessIsRequestCancelled(const std::unique_ptr& message); + // Process a message. The function 'request_handler' is invoked // to handle the request. T should be either 'MetricFamily', 'Metric' or // 'ModelLoader', and MessageType should be either 'MetricFamilyMessage', diff --git a/src/response_sender.cc b/src/response_sender.cc index a74459f6..1e2e9b50 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -37,10 +37,11 @@ namespace triton { namespace backend { namespace python { ResponseSender::ResponseSender( intptr_t request_address, intptr_t response_factory_address, - std::unique_ptr& shm_pool) + std::unique_ptr& shm_pool, + const std::shared_ptr& pb_cancel) : request_address_(request_address), response_factory_address_(response_factory_address), shm_pool_(shm_pool), - closed_(false) + closed_(false), pb_cancel_(pb_cancel) { } @@ -184,4 +185,11 @@ ResponseSender::Send( } } } + +bool +ResponseSender::IsCancelled() +{ + return pb_cancel_->IsCancelled(); +} + }}} // namespace triton::backend::python diff --git a/src/response_sender.h b/src/response_sender.h index 114f22c0..fda0d5d3 100644 --- a/src/response_sender.h +++ b/src/response_sender.h @@ -1,4 +1,4 @@ -// Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -27,6 +27,7 @@ #pragma once #include "infer_response.h" +#include "pb_cancel.h" #include "shm_manager.h" namespace triton { namespace backend { namespace python { @@ -35,13 +36,16 @@ class ResponseSender { public: ResponseSender( intptr_t request_address, intptr_t response_factory_address, - std::unique_ptr& shm_pool); + std::unique_ptr& shm_pool, + const std::shared_ptr& pb_cancel); void Send(std::shared_ptr response, const uint32_t flags); + bool IsCancelled(); private: intptr_t request_address_; intptr_t response_factory_address_; std::unique_ptr& shm_pool_; bool closed_; + std::shared_ptr pb_cancel_; }; }}} // namespace triton::backend::python