Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Support batch inference with shape clustering #1668

Merged
merged 4 commits into from
Feb 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 139 additions & 46 deletions csrc/mmdeploy/net/net_module.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) OpenMMLab. All rights reserved.

#include "net_module.h"
#include "mmdeploy/net/net_module.h"

#include <algorithm>
#include <numeric>
#include <thread>

#include "mmdeploy/archive/value_archive.h"
Expand Down Expand Up @@ -31,6 +33,11 @@ struct NetModule::Impl {
is_profiling_ = true;
}
auto model = context["model"].get<Model>();
for (const auto& meta : model.meta().models) {
if (meta.name == name) {
max_batch_size_ = meta.batch_size;
}
}
OUTCOME_TRY(auto config, model.GetModelConfig(name));
device_ = context.value("device", Device{"cpu"});
stream_ = context.value("stream", Stream::GetDefault(device_));
Expand Down Expand Up @@ -78,112 +85,197 @@ struct NetModule::Impl {
return success();
}

Result<TensorShape> InferInputShape(const vector<Tensor>& input) {
static Result<TensorShape> InferBatchShape(const vector<Tensor>& input) {
auto batch_size = input.size();
auto& exemplar = input.front();
auto shape = exemplar.shape();
if (batch_size == 1) {
return shape;
}
if (shape[0] != 1) {
MMDEPLOY_ERROR("unsupported shape for batch assemble: {}", shape);
MMDEPLOY_WARN("unsupported shape for batch assemble: {}", shape);
return Status(eNotSupported);
}
for (int i = 1; i < input.size(); ++i) {
auto& sample = input[i];
if (sample.shape() != shape) {
MMDEPLOY_ERROR("shapes are not consistent across the batch");
MMDEPLOY_WARN("shapes are not consistent across the batch");
return Status(eNotSupported);
}
}
shape[0] = static_cast<int64_t>(batch_size);
return shape;
}

Result<vector<TensorShape> > InferInputShape(const vector<vector<Tensor> >& inputs) {
static Result<vector<TensorShape>> InferBatchShape(const vector<vector<Tensor>>& inputs) {
vector<TensorShape> shapes;
shapes.reserve(inputs.size());
for (const auto& input : inputs) {
OUTCOME_TRY(auto shape, InferInputShape(input));
OUTCOME_TRY(auto shape, InferBatchShape(input));
shapes.push_back(std::move(shape));
}
return shapes;
}

Result<std::vector<Output> > Forward(const std::vector<Input>& input) {
// auto t0 = std::chrono::high_resolution_clock::now();
//
auto batch_size = static_cast<int>(input.size());

std::vector<std::vector<Tensor> > input_samples;
Result<vector<vector<Tensor>>> CollectInputTensors(const vector<Input>& inputs) {
vector<vector<Tensor>> input_samples;
input_samples.reserve(inputs_.size());
for (const auto& t : inputs_) {
auto name = input_mapping_.at(t.name());
std::vector<Tensor> tmp;
tmp.reserve(input.size());
for (int i = 0; i < input.size(); ++i) {
auto& sample = input[i];
auto& tmp = input_samples.emplace_back();
for (const auto& sample : inputs) {
if (auto it = sample.find(name); it != sample.end()) {
tmp.push_back(it->second);
} else {
MMDEPLOY_ERROR("sample {} missing key {}", i, name);
MMDEPLOY_ERROR("sample {} missing key {}", &sample - inputs.data(), name);
return Status(eInvalidArgument);
}
}
input_samples.push_back(std::move(tmp));
}
return input_samples;
}

void SaveBatch(vector<vector<Tensor>> samples, vector<int> indices,
vector<vector<vector<Tensor>>>& batch_tensors,
vector<vector<TensorShape>>& batch_shapes,
vector<vector<int>>& batch_sample_idxs) const {
if (auto maybe_batch_shape = InferBatchShape(samples)) {
batch_shapes.push_back(maybe_batch_shape.value());
batch_tensors.push_back(std::move(samples));
batch_sample_idxs.push_back(std::move(indices));
} else {
// cannot assemble batch, do it one by one
for (int k = 0; k < indices.size(); ++k) {
auto& shapes = batch_shapes.emplace_back();
auto& batch = batch_tensors.emplace_back(inputs_.size());
batch_sample_idxs.push_back({indices[k]});
for (int j = 0; j < inputs_.size(); ++j) {
shapes.push_back(samples[j][k].shape());
batch[j].push_back(std::move(samples[j][k]));
}
}
}
}

void SamplesToBatches(const vector<vector<Tensor>>& input_samples, size_t n_samples,
vector<vector<vector<Tensor>>>& batch_tensors,
vector<vector<TensorShape>>& batch_shapes,
vector<vector<int>>& batch_sample_idxs) const {
// concat all shapes in samples to make comparison easier
vector<vector<int64_t>> concat_shapes;
concat_shapes.reserve(n_samples);
for (size_t i = 0; i < n_samples; ++i) {
auto& shape = concat_shapes.emplace_back();
for (const auto& input : input_samples) {
shape.insert(shape.end(), input[i].shape().begin(), input[i].shape().end());
}
}

// cluster samples by concatenated shapes
vector<int> shape_idxs(concat_shapes.size());
std::iota(shape_idxs.begin(), shape_idxs.end(), 0);
std::sort(shape_idxs.begin(), shape_idxs.end(),
[&concat_shapes](int i, int j) { return concat_shapes[i] < concat_shapes[j]; });
shape_idxs.erase(std::unique(shape_idxs.begin(), shape_idxs.end(),
[&concat_shapes](int i, int j) {
return concat_shapes[i] == concat_shapes[j];
}),
shape_idxs.end());

// generate batches of samples with equal shapes, limit the batch size by max_batch_size_
for (const auto ref_shape_idx : shape_idxs) {
const auto& ref_shape = concat_shapes[ref_shape_idx];
vector<vector<Tensor>> samples(inputs_.size());
vector<int> indices;
for (size_t i = 0; i < concat_shapes.size(); ++i) {
if (concat_shapes[i] == ref_shape) {
for (size_t j = 0; j < inputs_.size(); ++j) {
samples[j].push_back(input_samples[j][i]);
}
indices.push_back(static_cast<int>(i));
if (indices.size() == max_batch_size_) {
SaveBatch(std::move(samples), std::move(indices), batch_tensors, batch_shapes,
batch_sample_idxs);
samples = vector<vector<Tensor>>(inputs_.size());
indices = {};
}
}
}
if (!indices.empty()) {
SaveBatch(std::move(samples), std::move(indices), batch_tensors, batch_shapes,
batch_sample_idxs);
}
}
}

Result<vector<Output>> Forward(const vector<Input>& inputs) {
OUTCOME_TRY(auto input_samples, CollectInputTensors(inputs));

vector<vector<vector<Tensor>>> batch_tensors;
vector<vector<TensorShape>> batch_shapes;
vector<vector<int>> batch_sample_indices;

SamplesToBatches(input_samples, inputs.size(), batch_tensors, batch_shapes,
batch_sample_indices);

vector<Output> outputs(inputs.size());
for (size_t i = 0; i < batch_tensors.size(); ++i) {
OUTCOME_TRY(net_->Reshape(batch_shapes[i]));
OUTCOME_TRY(CopyInputTensors(batch_tensors[i], batch_shapes[i]));
OUTCOME_TRY(net_->Forward());
OUTCOME_TRY(CopyOutputTensors(batch_sample_indices[i], outputs));
if (i + 1 < batch_tensors.size()) { // sync if not the last batch
OUTCOME_TRY(stream_.Wait());
}
}

// 1. calculate input shape
OUTCOME_TRY(auto input_shapes, InferInputShape(input_samples));
if (is_profiling_) {
OUTCOME_TRY(stream_.Wait());
}

// 2. call backend's reshape
OUTCOME_TRY(net_->Reshape(input_shapes));
return outputs;
}

// 3. fill input tensor
Result<void> CopyInputTensors(const vector<vector<Tensor>>& batch,
const vector<TensorShape>& shapes) const {
for (int i = 0; i < inputs_.size(); ++i) {
auto& src = input_samples[i];
auto& src = batch[i];
auto& dst = inputs_[i];
if (dst.shape() != input_shapes[i]) {
MMDEPLOY_ERROR("inconsistent input shape, expect {}, got {}", input_shapes[i], dst.shape());
if (dst.shape() != shapes[i]) {
MMDEPLOY_ERROR("inconsistent input shape, expect {}, got {}", shapes[i], dst.shape());
return Status(eFail);
}
if (src.size() > 1) {
for (int j = 0; j < src.size(); ++j) {
auto slice = dst.Slice(j);
OUTCOME_TRY(src[j].CopyTo(slice, stream_));
OUTCOME_TRY(dst.Slice(j).CopyFrom(src[j], stream_));
}
} else {
OUTCOME_TRY(src[0].CopyTo(dst, stream_));
OUTCOME_TRY(src.front().CopyTo(dst, stream_));
}
}
return success();
}

// 5. forward
OUTCOME_TRY(net_->Forward());

vector<Output> output(batch_size);
for (const auto& t : outputs_) {
auto name = output_mapping_.at(t.name());
auto desc = t.desc();
Result<void> CopyOutputTensors(const vector<int>& indices, vector<Output>& outputs) {
for (const auto& output : outputs_) {
auto name = output_mapping_.at(output.name());
auto desc = output.desc();
desc.device = device_;
Tensor tmp(desc);
if (tmp.size()) {
OUTCOME_TRY(t.CopyTo(tmp, stream_));
OUTCOME_TRY(output.CopyTo(tmp, stream_));
} else {
MMDEPLOY_WARN("copy skipped due to zero sized tensor");
}
if (output.size() > 1) {
for (int i = 0; i < output.size(); ++i) {
output[i].emplace(name, tmp.Slice(i));
if (indices.size() > 1) {
for (int i = 0; i < indices.size(); ++i) {
outputs[indices[i]].emplace(name, tmp.Slice(i));
}
} else {
output[0].emplace(name, std::move(tmp));
outputs[indices.front()].emplace(name, std::move(tmp));
}
}
if (is_profiling_) {
OUTCOME_TRY(stream_.Wait());
}

return output;
return success();
}

Device device_;
Expand All @@ -195,6 +287,7 @@ struct NetModule::Impl {
std::map<std::string, std::string> input_mapping_;
// outer scope to model output names
std::map<std::string, std::string> output_mapping_;
int max_batch_size_{1};
bool is_profiling_{false};
};

Expand Down