From 0dcb8700b5564da9dfc69d68f713bbf05bd9fae6 Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Thu, 12 Oct 2023 09:42:10 -0400 Subject: [PATCH] Restructure cleaning up of the futures in decoupled mode (#309) * Restructure cleaning up of the futures in decoupled * Minor improvement --- src/python_be.cc | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/python_be.cc b/src/python_be.cc index db979562..de639df3 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -777,9 +777,7 @@ ModelInstanceState::DecoupledMessageQueueMonitor() std::packaged_task task([this, response_send_message] { ResponseSendDecoupled(response_send_message); }); - std::future future = - boost::asio::post(*thread_pool_, std::move(task)); - futures_.emplace_back(std::move(future)); + boost::asio::post(*thread_pool_, std::move(task)); } else if ( message->Command() == PYTHONSTUB_InferExecRequest || message->Command() == PYTHONSTUB_InferStreamExecRequest) { @@ -789,9 +787,7 @@ ModelInstanceState::DecoupledMessageQueueMonitor() bls_execute, (bls_execute->Command() == PYTHONSTUB_InferStreamExecRequest)); }); - std::future future = - boost::asio::post(*thread_pool_, std::move(task)); - futures_.emplace_back(std::move(future)); + boost::asio::post(*thread_pool_, std::move(task)); } } } @@ -1708,12 +1704,14 @@ ModelInstanceState::~ModelInstanceState() Stub()->UpdateHealth(); if (Stub()->IsHealthy()) { if (model_state->IsDecoupled()) { - futures_.clear(); + // Wait for all the pending tasks to finish. + thread_pool_->wait(); // Push a dummy message to signal the thread to terminate. Stub()->ParentMessageQueue()->Push(DUMMY_MESSAGE); decoupled_monitor_.join(); + } else { + thread_pool_->wait(); } - thread_pool_->wait(); } // Terminate stub first to allow any last messages to be received by the back // end before deallocating the queue memory