Add ability to update existing schedulers #207

Merged: 36 commits, Jul 24, 2023

Commits (36)
ae751e0
Add scheduler update function
kthui May 15, 2023
7150225
[WIP] Add sequence scheduler update
kthui May 17, 2023
a6ee8c4
Enable sequence model update
kthui May 18, 2023
d416a6f
[WIP cont] Add sequence scheduler update
kthui May 19, 2023
7efb91f
[WIP] Oldest sequence batch flush queued request
kthui May 20, 2023
5b5f7e8
Refactor code
kthui May 22, 2023
31195b3
Use a different way of updating instance group setting
kthui May 23, 2023
4880732
Remove lock on dynamic scheduler enqueue
kthui May 24, 2023
b7bf5ff
Remove lock returned from scheduler update
kthui Jun 1, 2023
210a713
Scheduler interface update
kthui Jun 13, 2023
833d8a7
[WIP] Replace batcher id with model instance pointer
kthui Jun 14, 2023
b3307a3
Fix issues
kthui Jun 15, 2023
a52fec7
[WIP] Enable batcher reuse
kthui Jun 23, 2023
046862d
Minor fixes
kthui Jun 23, 2023
e28ef39
[WIP] Do not block unchanged batchers from infering
kthui Jun 26, 2023
7ff5a14
Clean up and format
kthui Jun 26, 2023
db20a62
Use reinterpret_cast for pointer type conversion
kthui Jun 28, 2023
b45f4ae
Destruct scheduler before instances on model destructor
kthui Jun 28, 2023
1164713
Use dynamic cast for model downcast on model lifecycle
kthui Jul 6, 2023
f60e175
Refactor model reuse code
kthui Jul 11, 2023
1684117
Use slot count when batcher is created during removal
kthui Jul 11, 2023
27630a4
Improve direct batcher destruction logic
kthui Jul 11, 2023
169870b
Add more descriptions and improve function naming
kthui Jul 13, 2023
93f4610
Remove batcher pending sequence completion after update return
kthui Jul 14, 2023
5524945
[WIP] Async destruction of batchers
kthui Jul 15, 2023
00d0ef1
Async destruction of resources
kthui Jul 17, 2023
98cf6d8
Address pre-commit action
kthui Jul 18, 2023
7d47625
Use struct for instance setting
kthui Jul 19, 2023
a21e40e
Adjust host policy init logic
kthui Jul 19, 2023
a05d4f9
Move verify model load gpu fraction to helper function
kthui Jul 19, 2023
20ca4bb
Minor comment improvement
kthui Jul 19, 2023
6cdd4eb
Simplify steps before starting background threads
kthui Jul 20, 2023
f3338a9
Improve iterator variable naming
kthui Jul 20, 2023
db77e37
Group set thread priority into a function
kthui Jul 20, 2023
d875d79
Use atomic bool for thread exit vars
kthui Jul 20, 2023
b6a8a63
Simplify pending removal to removed steps
kthui Jul 20, 2023
218 changes: 181 additions & 37 deletions src/backend_model.cc
@@ -241,21 +241,19 @@ TritonModel::Create(
}

// Create or update the model instances for this model.
RETURN_IF_ERROR(TritonModelInstance::SetInstances(
raw_local_model, backend_cmdline_config_map, host_policy_map,
model_config));
RETURN_IF_ERROR(local_model->CommitInstances());

RETURN_IF_ERROR(local_model->SetConfiguredScheduler());
std::vector<std::shared_ptr<TritonModelInstance>> added_instances,
removed_instances;
RETURN_IF_ERROR(local_model->PrepareInstances(
model_config, &added_instances, &removed_instances));
RETURN_IF_ERROR(local_model->SetConfiguredScheduler(added_instances));
local_model->CommitInstances();

*model = std::move(local_model);
return Status::Success;
}

Status
TritonModel::UpdateInstanceGroup(
const inference::ModelConfig& new_model_config,
std::unique_lock<std::mutex>* caller_lock)
TritonModel::UpdateInstanceGroup(const inference::ModelConfig& new_model_config)
{
// Generate normalized model config with new instance group.
inference::ModelConfig model_config = config_;
@@ -268,25 +266,26 @@ TritonModel::UpdateInstanceGroup(
&model_config));
RETURN_IF_ERROR(ValidateInstanceGroup(model_config, min_compute_capability_));

// Update the instances to the new config.
caller_lock->unlock(); // allow inference while creating instances
Status status = TritonModelInstance::SetInstances(
this, backend_cmdline_config_map_, host_policy_map_, model_config);
caller_lock->lock();
// Prepare the new instances on the new config.
std::vector<std::shared_ptr<TritonModelInstance>> added_instances,
removed_instances;
Status status =
PrepareInstances(model_config, &added_instances, &removed_instances);
if (!status.IsOk()) {
ClearBackgroundInstances();
return status;
}

// Update the scheduler.
status = UpdateConfiguredScheduler(added_instances, removed_instances);
if (!status.IsOk()) {
// Remove any pending instances if created.
bg_instances_.clear();
bg_passive_instances_.clear();
ClearBackgroundInstances();
return status;
}

// At this point, the new model config is ready but not yet written into this
// object. The 'caller_lock' is held, so 'model_lifecycle' will pause any new
// inference request. It is safe to move forward and commit the change.
config_.mutable_instance_group()->Swap(model_config.mutable_instance_group());
RETURN_IF_ERROR(CommitInstances());
// Only model owned dynamic batch scheduler can be updated currently, so there
// is no need to update the scheduler.
// Commit the instance update.
CommitInstances();
*config_.mutable_instance_group() = model_config.instance_group();

return Status::Success;
}
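
Note: as a minimal caller-side sketch (not part of this diff), the new single-argument UpdateInstanceGroup() could be driven as follows; the model pointer, the Config() accessor, and the specific field mutation are illustrative assumptions.

// Illustrative only: build a config whose instance_group differs from the
// current one and hand it to the model. UpdateInstanceGroup() prepares the
// new instances in the background, updates the scheduler, then commits.
inference::ModelConfig new_config = model->Config();  // assumed accessor
new_config.mutable_instance_group(0)->set_count(4);   // e.g. scale from 2 to 4

Status status = model->UpdateInstanceGroup(new_config);
if (!status.IsOk()) {
  // On failure the background instances are cleared and the model keeps
  // serving with its previous instances and scheduler.
  LOG_ERROR << "instance update failed: " << status.Message();
}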
@@ -394,27 +393,146 @@ TritonModel::IndexInstances() const
}

Status
TritonModel::RegisterInstance(
std::shared_ptr<TritonModelInstance>&& instance, const bool passive)
TritonModel::PrepareInstances(
const inference::ModelConfig& model_config,
std::vector<std::shared_ptr<TritonModelInstance>>* added_instances,
std::vector<std::shared_ptr<TritonModelInstance>>* removed_instances)
{
if (passive) {
bg_passive_instances_.emplace_back(std::move(instance));
} else {
bg_instances_.emplace_back(std::move(instance));
added_instances->clear();
removed_instances->clear();

std::unordered_map<
TritonModelInstance::Signature,
std::vector<std::shared_ptr<TritonModelInstance>>>
existing_instances = IndexInstances();

// Iterates over all the requested instances on the model config, and decides
// if each requested instance can reuse an existing instance or a new instance
// is needed.
for (const auto& group : model_config.instance_group()) {
std::vector<std::string> profile_names;
for (const auto& profile_name : group.profile()) {
profile_names.push_back(profile_name);
}
std::vector<TritonModelInstance::SecondaryDevice> secondary_devices;
for (const auto& secondary_device : group.secondary_devices()) {
secondary_devices.emplace_back(
inference::
ModelInstanceGroup_SecondaryDevice_SecondaryDeviceKind_Name(
secondary_device.kind()),
secondary_device.device_id());
}
for (int32_t c = 0; c < group.count(); ++c) {
std::string instance_name{group.name() + "_" + std::to_string(c)};
const bool passive = group.passive();
struct InstanceSetting {
InstanceSetting(
const std::string& policy_name, TRITONSERVER_InstanceGroupKind kind,
int32_t device_id,
const inference::ModelRateLimiter* rate_limiter_config)
: policy_name_(policy_name), kind_(kind), device_id_(device_id),
rate_limiter_config_(rate_limiter_config)
{
}
const std::string policy_name_;
const TRITONSERVER_InstanceGroupKind kind_;
const int32_t device_id_;
const inference::ModelRateLimiter* rate_limiter_config_;
};
std::vector<InstanceSetting> instance_settings;
if (group.kind() == inference::ModelInstanceGroup::KIND_CPU) {
instance_settings.emplace_back(
group.host_policy().empty() ? "cpu" : group.host_policy(),
TRITONSERVER_INSTANCEGROUPKIND_CPU, 0 /* device_id */,
&group.rate_limiter());
} else if (group.kind() == inference::ModelInstanceGroup::KIND_GPU) {
for (const int32_t device_id : group.gpus()) {
instance_settings.emplace_back(
group.host_policy().empty() ? ("gpu_" + std::to_string(device_id))
: group.host_policy(),
TRITONSERVER_INSTANCEGROUPKIND_GPU, device_id,
&group.rate_limiter());
}
} else if (group.kind() == inference::ModelInstanceGroup::KIND_MODEL) {
instance_settings.emplace_back(
group.host_policy().empty() ? "model" : group.host_policy(),
TRITONSERVER_INSTANCEGROUPKIND_MODEL, 0 /* device_id */,
&group.rate_limiter());
} else {
return Status(
Status::Code::INVALID_ARG,
std::string("instance_group kind ") +
ModelInstanceGroup_Kind_Name(group.kind()) + " not supported");
}
for (const auto& is : instance_settings) {
// All the information for the requested instance is ready. Create a
// signature that identifies the requested instance.
const TritonModelInstance::Signature signature(group, is.device_id_);

// Check if the requested instance can reuse an existing instance.
if (!TritonModelInstance::ShareBackendThread(
DeviceBlocking(), is.kind_)) {
auto itr = existing_instances.find(signature);
if (itr != existing_instances.end() && !itr->second.empty()) {
auto existing_instance = itr->second.back();
itr->second.pop_back();
LOG_VERBOSE(2) << "Re-using model instance named '"
<< existing_instance->Name() << "' with device id '"
<< existing_instance->DeviceId() << "'";
RegisterBackgroundInstance(std::move(existing_instance), passive);

continue;
}
}

// The requested instance did not match an existing instance. Create a
// new instance.
std::shared_ptr<TritonModelInstance> new_instance;
LOG_VERBOSE(2) << "Creating model instance named '" << instance_name
<< "' with device id '" << is.device_id_ << "'";
RETURN_IF_ERROR(TritonModelInstance::CreateInstance(
this, instance_name, signature, is.kind_, is.device_id_,
profile_names, passive, is.policy_name_, *is.rate_limiter_config_,
secondary_devices, &new_instance));
added_instances->push_back(new_instance);
RegisterBackgroundInstance(std::move(new_instance), passive);
}
}
}

// Any existing instances not reused will be removed.
for (auto pair : existing_instances) {
removed_instances->insert(
removed_instances->end(), pair.second.begin(), pair.second.end());
}

return Status::Success;
}
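
For readers skimming the diff, the reuse bookkeeping above can be reduced to the following self-contained sketch; the Signature and Instance types here are placeholders standing in for TritonModelInstance and its Signature, not the real API.

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

using Signature = std::string;   // placeholder for TritonModelInstance::Signature
struct Instance { Signature sig; };

void PrepareSketch(
    const std::vector<Signature>& requested,
    std::unordered_map<Signature, std::vector<std::shared_ptr<Instance>>>
        existing,  // indexed foreground instances, consumed as matches are found
    std::vector<std::shared_ptr<Instance>>* added,
    std::vector<std::shared_ptr<Instance>>* removed,
    std::vector<std::shared_ptr<Instance>>* background)
{
  for (const auto& sig : requested) {
    auto itr = existing.find(sig);
    if (itr != existing.end() && !itr->second.empty()) {
      // A matching existing instance is reused rather than recreated.
      background->push_back(itr->second.back());
      itr->second.pop_back();
    } else {
      // No match: a brand new instance is both "added" and "background".
      auto fresh = std::make_shared<Instance>(Instance{sig});
      added->push_back(fresh);
      background->push_back(fresh);
    }
  }
  // Whatever was not reused is reported for removal.
  for (auto& pair : existing) {
    removed->insert(removed->end(), pair.second.begin(), pair.second.end());
  }
}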

Status
void
TritonModel::CommitInstances()
{
instances_.swap(bg_instances_);
passive_instances_.swap(bg_passive_instances_);
ClearBackgroundInstances();
}

void
TritonModel::RegisterBackgroundInstance(
std::shared_ptr<TritonModelInstance>&& instance, const bool passive)
{
if (passive) {
bg_passive_instances_.emplace_back(std::move(instance));
} else {
bg_instances_.emplace_back(std::move(instance));
}
}

void
TritonModel::ClearBackgroundInstances()
{
bg_instances_.clear();
bg_passive_instances_.clear();

return Status::Success;
}
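
Taken together, CommitInstances() and ClearBackgroundInstances() above amount to a swap-and-clear; a hypothetical illustration of the state transition, with instance names invented for the example:

// Before CommitInstances():
//   instances_     = { old_0, old_1 }   // still serving requests
//   bg_instances_  = { old_0, new_2 }   // old_0 reused, new_2 freshly created
//
// After the swap and ClearBackgroundInstances():
//   instances_     = { old_0, new_2 }   // the new live set
//   bg_instances_  = {}                 // the reference to old_1 is dropped here,
//                                       // so the removed instance is destroyed
//                                       // once no other holder remains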

std::vector<std::shared_ptr<TritonModelInstance>>
@@ -483,7 +601,8 @@ TritonModel::UpdateModelConfig(
}

Status
TritonModel::SetConfiguredScheduler()
TritonModel::SetConfiguredScheduler(
const std::vector<std::shared_ptr<TritonModelInstance>>& new_instances)
{
std::unique_ptr<Scheduler> scheduler;

@@ -513,7 +632,7 @@ TritonModel::SetConfiguredScheduler()
"sequence batcher, using default batching strategy";
}
RETURN_IF_ERROR(SequenceBatchScheduler::Create(
this, enforce_equal_shape_tensors, &scheduler));
this, new_instances, enforce_equal_shape_tensors, &scheduler));
} else if (config_.has_dynamic_batching()) {
// Dynamic batcher
RETURN_IF_ERROR(DynamicBatchScheduler::Create(
@@ -539,6 +658,29 @@
return SetScheduler(std::move(scheduler));
}

Status
TritonModel::UpdateConfiguredScheduler(
const std::vector<std::shared_ptr<TritonModelInstance>>& added_instances,
const std::vector<std::shared_ptr<TritonModelInstance>>& removed_instances)
{
if (config_.has_sequence_batching()) {
SequenceBatchScheduler* sched =
dynamic_cast<SequenceBatchScheduler*>(scheduler_.get());
if (sched == nullptr) {
return Status(
Status::Code::INTERNAL,
"Unable to downcast from 'Scheduler' to 'SequenceBatchScheduler' "
"during scheduler update");
}
return sched->Update(added_instances, removed_instances);
}

// Non-sequence scheduler does not need to be updated, because other
// schedulers do not require the information on model instances to function,
// and only interact with the rate limiter.
return Status::Success;
}
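
The sequence path above hinges on a checked downcast from the stored Scheduler; a standalone sketch of the same guard pattern, with a hypothetical scheduler hierarchy standing in for the real classes:

#include <memory>

struct Scheduler {
  virtual ~Scheduler() = default;
};
struct SequenceScheduler : Scheduler {
  void Update() { /* adjust per-instance batchers for the new instance set */ }
};

// Returns false when the stored scheduler is not actually a SequenceScheduler,
// mirroring the INTERNAL error returned above instead of risking undefined
// behavior through an unchecked static_cast.
bool TryUpdate(const std::unique_ptr<Scheduler>& scheduler) {
  auto* seq = dynamic_cast<SequenceScheduler*>(scheduler.get());
  if (seq == nullptr) {
    return false;
  }
  seq->Update();
  return true;
}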

Status
TritonModel::SetBatchingStrategy(const std::string& batch_libpath)
{
@@ -629,12 +771,14 @@ TritonModel::~TritonModel()
// Clear library handles.
ClearHandles();

// Explicitly delete/finalize the scheduler before the model instances.
scheduler_.reset(nullptr);

// Explicitly delete/finalize all model instances before finalizing
// the model itself.
instances_.clear();
passive_instances_.clear();
bg_instances_.clear();
bg_passive_instances_.clear();
ClearBackgroundInstances();

// Unregister itself from the rate limiter. Note this should happen
// after all instances are destructed. Destrucing instances ensures
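An aside on the destructor change above: resetting the scheduler before clearing the instances avoids shutdown-time dispatch into freed instances. A self-contained sketch of the ordering, with placeholder types; the real members and their declaration order may differ.

#include <memory>
#include <vector>

struct Instance {};        // placeholder for TritonModelInstance
struct Scheduler {         // placeholder; may reference instances while running
  std::vector<Instance*> targets;
};

class ModelSketch {
 public:
  ~ModelSketch() {
    // Finalize the scheduler first so nothing can dispatch to an instance
    // that is about to go away, then release the instances themselves.
    scheduler_.reset();
    instances_.clear();
  }

 private:
  std::unique_ptr<Scheduler> scheduler_;
  std::vector<std::shared_ptr<Instance>> instances_;
};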
61 changes: 39 additions & 22 deletions src/backend_model.h
@@ -80,10 +80,15 @@ class TritonModel : public Model {
TRITONSERVER_Message* updated_config_message);
// Return the underlying backend.
const std::shared_ptr<TritonBackend>& Backend() const { return backend_; }
// Return the foreground instances, excluding passive instances.
const std::vector<std::shared_ptr<TritonModelInstance>>& Instances() const
// Return the backend command line config map.
const triton::common::BackendCmdlineConfigMap& BackendConfigMap() const
{
return instances_;
return backend_cmdline_config_map_;
}
// Return the host policy command line config map.
const triton::common::HostPolicyCmdlineConfigMap& HostPolicyMap() const
{
return host_policy_map_;
}

// True if different instances should be grouped by device; false otherwise.
@@ -96,21 +101,8 @@ class TritonModel : public Model {
void* State() { return state_; }
void SetState(void* state) { state_ = state; }

// Return all foreground instances indexed by its respective signature.
std::unordered_map<
TritonModelInstance::Signature,
std::vector<std::shared_ptr<TritonModelInstance>>>
IndexInstances() const;
// Register new instances into the background.
Status RegisterInstance(
std::shared_ptr<TritonModelInstance>&& instance, const bool passive);

// Update instance group. 'caller_lock' will be released when creating new
// instances and re-held when returning, to allow atomic switch over to the
// new instances.
Status UpdateInstanceGroup(
const inference::ModelConfig& new_model_config,
std::unique_lock<std::mutex>* caller_lock);
// Update instance group.
Status UpdateInstanceGroup(const inference::ModelConfig& new_model_config);

// Custom batching function getters.
TritonModelBatchInclFn_t ModelBatchInclFn() const { return batch_incl_fn_; }
@@ -130,8 +122,27 @@
const triton::common::BackendCmdlineConfigMap& backend_cmdline_config_map,
const triton::common::HostPolicyCmdlineConfigMap& host_policy_map);

// Prepare the next set of instances on the background. Returns the instances
// that will be added and removed if the next set of instances is to be
// committed.
Status PrepareInstances(
const inference::ModelConfig& model_config,
std::vector<std::shared_ptr<TritonModelInstance>>* added_instances,
std::vector<std::shared_ptr<TritonModelInstance>>* removed_instances);
// Replace the foreground instances with background instances.
Status CommitInstances();
void CommitInstances();

// Return all foreground instances indexed by its respective signature.
std::unordered_map<
TritonModelInstance::Signature,
std::vector<std::shared_ptr<TritonModelInstance>>>
IndexInstances() const;

// Add a new instance into the background.
void RegisterBackgroundInstance(
std::shared_ptr<TritonModelInstance>&& instance, const bool passive);
// Clear all background instances.
void ClearBackgroundInstances();

// Gets the execution policy setting from the backend.
Status GetExecutionPolicy(const inference::ModelConfig& model_config);
@@ -157,9 +168,15 @@
return res;
}

// Set the scheduler based on the model configuration and foreground
// 'instances'.
Status SetConfiguredScheduler();
// Set the scheduler based on the model configuration and the provided
// instances.
Status SetConfiguredScheduler(
const std::vector<std::shared_ptr<TritonModelInstance>>& new_instances);
// Update the set scheduler to the new set of instances.
Status UpdateConfiguredScheduler(
const std::vector<std::shared_ptr<TritonModelInstance>>& added_instances,
const std::vector<std::shared_ptr<TritonModelInstance>>&
removed_instances);

// Set the batching strategy, if custom functions provided by user.
// This function should only be called with the dynamic batcher.