Skip to content

Commit

Permalink
Restructure cleaning up of the futures in decoupled mode (#309)
Browse files Browse the repository at this point in the history
* Restructure cleaning up of the futures in decoupled

* Minor improvement
  • Loading branch information
Tabrizian authored Oct 12, 2023
1 parent 4c4a552 commit a2e8f9b
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions src/python_be.cc
Original file line number Diff line number Diff line change
Expand Up @@ -777,9 +777,7 @@ ModelInstanceState::DecoupledMessageQueueMonitor()
std::packaged_task<void()> task([this, response_send_message] {
ResponseSendDecoupled(response_send_message);
});
std::future<void> future =
boost::asio::post(*thread_pool_, std::move(task));
futures_.emplace_back(std::move(future));
boost::asio::post(*thread_pool_, std::move(task));
} else if (
message->Command() == PYTHONSTUB_InferExecRequest ||
message->Command() == PYTHONSTUB_InferStreamExecRequest) {
Expand All @@ -789,9 +787,7 @@ ModelInstanceState::DecoupledMessageQueueMonitor()
bls_execute,
(bls_execute->Command() == PYTHONSTUB_InferStreamExecRequest));
});
std::future<void> future =
boost::asio::post(*thread_pool_, std::move(task));
futures_.emplace_back(std::move(future));
boost::asio::post(*thread_pool_, std::move(task));
}
}
}
Expand Down Expand Up @@ -1708,12 +1704,14 @@ ModelInstanceState::~ModelInstanceState()
Stub()->UpdateHealth();
if (Stub()->IsHealthy()) {
if (model_state->IsDecoupled()) {
futures_.clear();
// Wait for all the pending tasks to finish.
thread_pool_->wait();
// Push a dummy message to signal the thread to terminate.
Stub()->ParentMessageQueue()->Push(DUMMY_MESSAGE);
decoupled_monitor_.join();
} else {
thread_pool_->wait();
}
thread_pool_->wait();
}
// Terminate stub first to allow any last messages to be received by the back
// end before deallocating the queue memory
Expand Down

0 comments on commit a2e8f9b

Please sign in to comment.