Skip to content

Commit

Permalink
Improve direct batcher destruction logic
Browse files Browse the repository at this point in the history
  • Loading branch information
kthui committed Jul 11, 2023
1 parent bde56a3 commit ee04d8d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 26 deletions.
59 changes: 33 additions & 26 deletions src/sequence_batch_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1323,36 +1323,39 @@ DirectSequenceBatch::DirectSequenceBatch(

DirectSequenceBatch::~DirectSequenceBatch()
{
// Wait until all pending requests are completed.
bool waiting_for_requests = true;
while (waiting_for_requests) {
std::unique_lock<std::mutex> lk1(mu_);
std::unique_lock<std::mutex> lk2(payload_mu_);

// Wait until the last enqueued payload completes execution.
while (!exec_complete_ || curr_payload_->RequestCount() > 0) {
LOG_VERBOSE(1)
<< "Waiting for current payload to complete execution before exiting";
payload_cv_.wait(lk2);
}

// Make sure there are no more queued requests.
waiting_for_requests = false;
for (uint32_t seq_slot = 0; seq_slot < queues_.size(); seq_slot++) {
if (!queues_[seq_slot].empty()) {
LOG_VERBOSE(1) << "Waiting for slot " << seq_slot
<< " to begin execution before exiting";
waiting_for_requests = true;
break;
// Wait until all queued requests begin execution.
{
std::unique_lock<std::mutex> lk(mu_);
queues_cv_.wait(lk, [this] {
for (uint32_t seq_slot = 0; seq_slot < queues_.size(); seq_slot++) {
if (!queues_[seq_slot].empty()) {
LOG_VERBOSE(1) << "Waiting for slot " << seq_slot
<< " to begin execution before exiting";
return false;
}
}
}
return true;
});
}

// Signal the scheduler thread to exit.
if (!waiting_for_requests) {
scheduler_thread_exit_ = true;
}
// Wait until the last enqueued payload completes execution.
{
std::unique_lock<std::mutex> lk(payload_mu_);
payload_cv_.wait(lk, [this] {
if (!exec_complete_ || curr_payload_->RequestCount() > 0) {
LOG_VERBOSE(1) << "Waiting for current payload to complete execution "
"before exiting";
return false;
}
return true;
});
}

// Signal the scheduler thread to exit.
{
std::lock_guard<std::mutex> lk(mu_);
scheduler_thread_exit_ = true;
}
cv_.notify_one();

// It is possible for the scheduler thread to be the last holder of
Expand Down Expand Up @@ -1698,6 +1701,10 @@ DirectSequenceBatch::BatcherThread(const int nice)
}
}

// Requests could be removed from the queue, so notify a thread waiting for
// requests removal from the queue.
queues_cv_.notify_one();

if (curr_payload_->GetState() == Payload::State::READY) {
// Add callback to signal the execution completion
exec_complete_ = false;
Expand Down
2 changes: 2 additions & 0 deletions src/sequence_batch_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ class DirectSequenceBatch : public SequenceBatch {
// queues, one for each sequence slot where requests assigned to
// that slot are enqueued to wait for inferencing.
std::vector<std::deque<std::unique_ptr<InferenceRequest>>> queues_;
// Notify when requests are removed from the queue.
std::condition_variable queues_cv_;

// Is each sequence slot active or not? A zero or empty value indicates
// inactive, a non-zero/non-empty value indicates active and is the
Expand Down

0 comments on commit ee04d8d

Please sign in to comment.