[WIP] Enable batcher reuse
kthui committed Jun 23, 2023
1 parent c50cf15 commit 13af988
Showing 2 changed files with 81 additions and 52 deletions.
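The point of this change is to stop rebuilding every sequence batcher on a model instance update. The scheduler now diffs the currently serving instances (model_->Instances()) against the incoming set (model_->BackgroundInstances()), tears down batchers only for instances that disappear, and creates batchers only for instances that appear; batchers for re-used instances stay alive. A minimal, self-contained sketch of that diff step follows; the Instance type and the Diff() helper are hypothetical stand-ins for TritonModelInstance and SequenceBatchScheduler::InstancesDiff(), not the actual API.

#include <memory>
#include <unordered_map>
#include <vector>

struct Instance {};  // hypothetical stand-in for TritonModelInstance

// Compute which instances are added and which are removed when moving from
// 'current' to 'next'. Re-used instances appear in neither output.
void Diff(
    const std::vector<std::shared_ptr<Instance>>& current,
    const std::vector<std::shared_ptr<Instance>>& next,
    std::vector<std::shared_ptr<Instance>>* added,
    std::unordered_map<Instance*, std::shared_ptr<Instance>>* removed)
{
  // Index the current set; whatever is still here at the end was removed.
  for (const auto& i : current) {
    removed->emplace(i.get(), i);
  }
  for (const auto& i : next) {
    if (removed->erase(i.get()) == 0) {
      added->push_back(i);  // in the next set but not the current one
    }
    // else: re-used, so it is erased from 'removed' and reported in neither
  }
}

The commit applies the same idea in place: InstancesDiff() builds its index of current instances directly inside the removed-instances map, so whatever survives the overlap erasure is exactly the set of removed instances.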
121 changes: 74 additions & 47 deletions src/sequence_batch_scheduler.cc
@@ -52,17 +52,6 @@ Now()
.count();
}

std::unordered_map<TritonModelInstance*, std::shared_ptr<TritonModelInstance>>
IndexTritonModelInstances(
const std::vector<std::shared_ptr<TritonModelInstance>>& instances)
{
std::unordered_map<TritonModelInstance*, std::shared_ptr<TritonModelInstance>> mapped_instances;
for (auto& instance : instances) {
mapped_instances.emplace(instance.get(), instance);
}
return mapped_instances;
}

} // namespace

Status
@@ -129,7 +118,7 @@ SequenceBatchScheduler::Create(
}

// Create batchers.
sched->CreateBatchers();
RETURN_IF_ERROR(sched->CreateBatchers(model->BackgroundInstances()));

// Create a reaper thread that watches for idle sequences. Run the
// reaper at a lower priority.
@@ -148,14 +137,10 @@ void
SequenceBatchScheduler::Update()
{
// Find added and removed instances for this update.
InstanceMap added_instances, removed_instances;
std::vector<std::shared_ptr<TritonModelInstance>> added_instances;
std::unordered_map<TritonModelInstance*, std::shared_ptr<TritonModelInstance>> removed_instances;
InstancesDiff(&added_instances, &removed_instances);

// For new instances, we only need to add them safely.
// For removed instances, make sure any ongoing sequences related to them have completed, and then remove those threads.

// Stop making changes from this point; make sure the transition from index to pointer is correct first.

std::unique_lock<std::mutex> lk(mu_);

// Instruct 'Enqueue()' to begin pausing new sequences.
@@ -173,54 +158,74 @@ SequenceBatchScheduler::Update()

// Flush all pending requests to the rate limiter.
lk.unlock();
batchers_.clear();
//batchers_.clear();
for (auto& pair : removed_instances) {
batchers_.erase(pair.second.get());
}
lk.lock();

// Re-construct the scheduler with new instances.
//auto instance_count = model_->BackgroundInstances().size();
queue_request_cnts_.clear();
//queue_request_cnts_.resize(instance_count, 0);
std::priority_queue<
//queue_request_cnts_.clear();
for (auto& pair : removed_instances) {
queue_request_cnts_.erase(pair.second.get());
}

/*std::priority_queue<
BatcherSequenceSlot, std::vector<BatcherSequenceSlot>,
BatcherSequenceSlotCompare> empty_ready_batcher_seq_slots;
ready_batcher_seq_slots_.swap(empty_ready_batcher_seq_slots);
CreateBatchers();
ready_batcher_seq_slots_.swap(empty_ready_batcher_seq_slots);*/
std::priority_queue<
BatcherSequenceSlot, std::vector<BatcherSequenceSlot>,
BatcherSequenceSlotCompare> new_ready_batcher_seq_slots;
while (!ready_batcher_seq_slots_.empty()) {
auto& ready_batcher_seq_slot = ready_batcher_seq_slots_.top();
if (removed_instances.find(ready_batcher_seq_slot.model_instance_) == removed_instances.end()) {
new_ready_batcher_seq_slots.push(ready_batcher_seq_slot);
}
ready_batcher_seq_slots_.pop();
}
ready_batcher_seq_slots_.swap(new_ready_batcher_seq_slots);

// Add the new batchers.
LOG_STATUS_ERROR(CreateBatchers(added_instances), "failed creating new batchers during update");

// The update is completed.
updating_ = false;
update_complete_cv_.notify_all();
}
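A detail worth noting in Update(): std::priority_queue has no erase(), so ready slots belonging to removed instances cannot be deleted in place. Instead the code drains ready_batcher_seq_slots_ into a fresh queue, re-pushing only the slots whose instance is not in removed_instances, and then swaps. A generic sketch of that drain-and-swap idiom, with a hypothetical helper name:

#include <functional>
#include <queue>
#include <vector>

// Drop every element for which 'drop' returns true. std::priority_queue has
// no erase(), so drain into a fresh queue and swap.
template <typename T, typename Cmp>
void FilterPriorityQueue(
    std::priority_queue<T, std::vector<T>, Cmp>& pq,
    const std::function<bool(const T&)>& drop)
{
  std::priority_queue<T, std::vector<T>, Cmp> kept;
  while (!pq.empty()) {
    if (!drop(pq.top())) {
      kept.push(pq.top());
    }
    pq.pop();
  }
  pq.swap(kept);
}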

void
SequenceBatchScheduler::InstancesDiff(InstanceMap* added_instances, InstanceMap* removed_instances)
SequenceBatchScheduler::InstancesDiff(
std::vector<std::shared_ptr<TritonModelInstance>>* added_instances,
std::unordered_map<TritonModelInstance*, std::shared_ptr<TritonModelInstance>>* removed_instances)
{
added_instances->clear();
removed_instances->clear();

auto curr_instances = IndexTritonModelInstances(model_->Instances()); // can come from previous new_instances
auto new_instances = IndexTritonModelInstances(model_->BackgroundInstances());
// Index currently serving instances.
auto* curr_instances = removed_instances; // alias the output map; leftovers become the removed set
for (auto& instance : model_->Instances()) {
curr_instances->emplace(instance.get(), instance);
}

for (auto& pair : new_instances) {
auto& new_instance = pair.second;
if (curr_instances.find(new_instance.get()) == curr_instances.end()) {
added_instances->emplace(new_instance.get(), new_instance);
// Compare the current set of serving instances against the next set.
for (auto& instance : model_->BackgroundInstances()) {
if (curr_instances->find(instance.get()) == curr_instances->end()) {
// The instance is in the next set but not the current, so it is added.
added_instances->push_back(instance);
}
else {
// The instance is re-used; remove it from current to reduce the search size.
curr_instances.erase(pair.first);
}
}

for (auto& pair : curr_instances) {
auto& curr_instance = pair.second;
if (new_instances.find(curr_instance.get()) == new_instances.end()) {
removed_instances->emplace(curr_instance.get(), curr_instance);
// Remove overlapping instances from current, so any remaining ones are the removed instances.
curr_instances->erase(instance.get());
}
}
}

Status
SequenceBatchScheduler::CreateBatchers()
SequenceBatchScheduler::CreateBatchers(
const std::vector<std::shared_ptr<TritonModelInstance>>& instances)
{
auto& config = model_->Config();

@@ -246,7 +251,7 @@ SequenceBatchScheduler::CreateBatchers()
// Create one SequenceBatch object for each requested runner. The
// SequenceBatch object has a thread that manages the batch of
// requests.
const auto& instances = model_->BackgroundInstances();
//const auto& instances = model_->BackgroundInstances();
//uint32_t index = 0;
for (const auto& instance : instances) {
bool init_state;
@@ -1337,10 +1342,32 @@ DirectSequenceBatch::DirectSequenceBatch(

DirectSequenceBatch::~DirectSequenceBatch()
{
// Signal the scheduler thread to exit...
{
std::unique_lock<std::mutex> lock(mu_);
scheduler_thread_exit_ = true;
// Wait until all pending requests are completed.
bool waiting_for_requests = true;
while (waiting_for_requests) {
std::unique_lock<std::mutex> lk1(mu_);
std::unique_lock<std::mutex> lk2(payload_mu_);

// Wait until the last enqueued payload completes execution.
while (!exec_complete_ || curr_payload_->RequestCount() > 0) {
LOG_VERBOSE(1) << "Waiting for current payload to complete execution before exiting";
payload_cv_.wait(lk2);
}

// Make sure there are no more queued requests.
waiting_for_requests = false;
for (uint32_t seq_slot = 0; seq_slot < queues_.size(); seq_slot++) {
if (!queues_[seq_slot].empty()) {
LOG_VERBOSE(1) << "Waiting for slot " << seq_slot << " to begin execution before exiting";
waiting_for_requests = true;
break;
}
}

// Signal the scheduler thread to exit.
if (!waiting_for_requests) {
scheduler_thread_exit_ = true;
}
}

cv_.notify_one();
@@ -1695,7 +1722,7 @@ DirectSequenceBatch::BatcherThread(const int nice)
std::unique_lock<std::mutex> lk(payload_mu_);
exec_complete_ = true;
}
payload_cv_.notify_one();
payload_cv_.notify_all();
};
curr_payload_->AddInternalReleaseCallback(callback);
curr_payload_->MarkSaturated();
@@ -1775,7 +1802,7 @@ OldestSequenceBatch::~OldestSequenceBatch()
{
std::unique_lock<std::mutex> lock(mu_);

// Flush all queued requests onto the dynamic batcher.
// Wait until all pending requests are completed.
for (uint32_t seq_slot = 0; seq_slot < queues_.size(); seq_slot++) {
while (in_flight_[seq_slot] || !queues_[seq_slot].empty()) {
LOG_VERBOSE(1) << "Waiting for slot " << seq_slot << " with "
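The second half of the change reworks shutdown ordering in DirectSequenceBatch::~DirectSequenceBatch(). Previously the destructor set scheduler_thread_exit_ immediately; now it waits until the in-flight payload has finished executing (exec_complete_) and every per-slot queue is empty, and only then raises the exit flag. That is also why the payload completion callback in BatcherThread() switches from notify_one() to notify_all(): both the batcher thread and the destructor may be blocked on payload_cv_ at the same time. Below is a condensed sketch of the drain-before-exit pattern; it collapses the two locks used by the real code (mu_ and payload_mu_) into one, and all names are illustrative.

#include <condition_variable>
#include <deque>
#include <mutex>
#include <vector>

// Illustrative skeleton of drain-before-exit; assumes a worker thread
// notifies 'cv' whenever it completes a payload or empties a queue.
struct SequenceBatchLike {
  std::mutex mu;                        // the real code splits this into
  std::condition_variable cv;           // mu_/cv_ and payload_mu_/payload_cv_
  bool exec_complete = true;            // no payload currently executing
  std::vector<std::deque<int>> queues;  // stand-in for per-slot request queues
  bool thread_exit = false;

  // Called from the destructor: only raise the exit flag once nothing is
  // executing and nothing is queued, so no request is dropped on shutdown.
  void DrainThenSignalExit() {
    std::unique_lock<std::mutex> lk(mu);
    cv.wait(lk, [&] {
      if (!exec_complete) return false;  // payload still in flight
      for (const auto& q : queues) {
        if (!q.empty()) return false;    // requests still queued on a slot
      }
      return true;                       // fully drained
    });
    thread_exit = true;
    cv.notify_all();  // wake the worker so it can observe the flag and exit
  }
};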
12 changes: 7 additions & 5 deletions src/sequence_batch_scheduler.h
@@ -120,8 +120,6 @@ class SequenceBatchScheduler : public Scheduler {
}

private:
using InstanceMap = std::unordered_map<TritonModelInstance*, std::shared_ptr<TritonModelInstance>>;

SequenceBatchScheduler(
TritonModel* model,
const std::unordered_map<std::string, bool>& enforce_equal_shape_tensors)
@@ -133,8 +131,6 @@ class SequenceBatchScheduler : public Scheduler {

void ReaperThread(const int nice);

Status CreateBatchers();

Status CreateBooleanControlTensors(
const inference::ModelConfig& config,
std::shared_ptr<ControlInputs>* start_input_overrides,
@@ -155,8 +151,14 @@ class SequenceBatchScheduler : public Scheduler {
}
};

// Create a batcher for each of the provided instances.
Status CreateBatchers(
const std::vector<std::shared_ptr<TritonModelInstance>>& instances);

// Return the added and removed instances for scheduler update purposes.
void InstancesDiff(InstanceMap* added_instances, InstanceMap* removed_instances);
void InstancesDiff(
std::vector<std::shared_ptr<TritonModelInstance>>* added_instances,
std::unordered_map<TritonModelInstance*, std::shared_ptr<TritonModelInstance>>* removed_instances);
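The asymmetry in this signature is deliberate: added_instances is a plain vector because the update path only iterates it, handing it to CreateBatchers(), while removed_instances is keyed by raw TritonModelInstance* because Update() performs membership tests against it when erasing batchers, pruning queue_request_cnts_, and filtering the ready-slot queue, and because InstancesDiff() builds its current-instance index directly inside it.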

// The 'TritonModel' and 'enforce_equal_shape_tensors' when this scheduler is
// created.