[Refactor] Support batch inference with shape clustering (#1733)
* refactor `NetModule`

* name

* fix sorting

* fix indices

(cherry picked from commit f5a05b5)
lzhangzz authored and lvhan028 committed Mar 1, 2023
1 parent ef64224 commit 44aa3b6
Showing 1 changed file with 139 additions and 46 deletions.
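The refactor clusters incoming samples by input shape: samples whose inputs all share the same shapes are assembled into one batch (capped by the `batch_size` declared in the model meta), so each batch needs only a single `Reshape` + `Forward` on the backend, while samples that cannot be assembled fall back to per-sample inference. A minimal, self-contained sketch of the clustering idea follows — it is not the MMDeploy API; `ClusterByShape`, `Shape`, and `max_batch_size` are illustrative names, and the commit itself derives the groups by sorting sample indices over concatenated input shapes rather than using a std::map.

// Sketch only (hypothetical names): group sample indices so that every batch
// contains samples of identical shape and no batch exceeds max_batch_size.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <map>
#include <vector>

using Shape = std::vector<int64_t>;

std::vector<std::vector<int>> ClusterByShape(const std::vector<Shape>& shapes,
                                             size_t max_batch_size) {
  std::map<Shape, std::vector<int>> groups;  // shape -> indices of samples with that shape
  for (int i = 0; i < static_cast<int>(shapes.size()); ++i) {
    groups[shapes[i]].push_back(i);
  }
  std::vector<std::vector<int>> batches;
  for (const auto& kv : groups) {
    const auto& indices = kv.second;
    // split each shape group into chunks of at most max_batch_size samples
    for (size_t pos = 0; pos < indices.size(); pos += max_batch_size) {
      auto end = std::min(pos + max_batch_size, indices.size());
      batches.emplace_back(indices.begin() + pos, indices.begin() + end);
    }
  }
  return batches;
}

int main() {
  std::vector<Shape> shapes{{1, 3, 224, 224}, {1, 3, 320, 320}, {1, 3, 224, 224}};
  for (const auto& batch : ClusterByShape(shapes, /*max_batch_size=*/2)) {
    std::printf("batch of %zu sample(s)\n", batch.size());
  }
  return 0;
}

Because every sample in a batch shares all of its input shapes, stacking them along the batch dimension is safe, which is exactly what `InferBatchShape` verifies in the diff below.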
185 changes: 139 additions & 46 deletions csrc/mmdeploy/net/net_module.cpp
@@ -1,7 +1,9 @@
// Copyright (c) OpenMMLab. All rights reserved.

#include "net_module.h"
#include "mmdeploy/net/net_module.h"

#include <algorithm>
#include <numeric>
#include <thread>

#include "mmdeploy/archive/value_archive.h"
@@ -31,6 +33,11 @@ struct NetModule::Impl {
is_profiling_ = true;
}
auto model = context["model"].get<Model>();
for (const auto& meta : model.meta().models) {
if (meta.name == name) {
max_batch_size_ = meta.batch_size;
}
}
OUTCOME_TRY(auto config, model.GetModelConfig(name));
device_ = context.value("device", Device{"cpu"});
stream_ = context.value("stream", Stream::GetDefault(device_));
@@ -78,112 +85,197 @@ struct NetModule::Impl {
return success();
}

Result<TensorShape> InferInputShape(const vector<Tensor>& input) {
static Result<TensorShape> InferBatchShape(const vector<Tensor>& input) {
auto batch_size = input.size();
auto& exemplar = input.front();
auto shape = exemplar.shape();
if (batch_size == 1) {
return shape;
}
if (shape[0] != 1) {
MMDEPLOY_ERROR("unsupported shape for batch assemble: {}", shape);
MMDEPLOY_WARN("unsupported shape for batch assemble: {}", shape);
return Status(eNotSupported);
}
for (int i = 1; i < input.size(); ++i) {
auto& sample = input[i];
if (sample.shape() != shape) {
MMDEPLOY_ERROR("shapes are not consistent across the batch");
MMDEPLOY_WARN("shapes are not consistent across the batch");
return Status(eNotSupported);
}
}
shape[0] = static_cast<int64_t>(batch_size);
return shape;
}

Result<vector<TensorShape> > InferInputShape(const vector<vector<Tensor> >& inputs) {
static Result<vector<TensorShape>> InferBatchShape(const vector<vector<Tensor>>& inputs) {
vector<TensorShape> shapes;
shapes.reserve(inputs.size());
for (const auto& input : inputs) {
OUTCOME_TRY(auto shape, InferInputShape(input));
OUTCOME_TRY(auto shape, InferBatchShape(input));
shapes.push_back(std::move(shape));
}
return shapes;
}

Result<std::vector<Output> > Forward(const std::vector<Input>& input) {
// auto t0 = std::chrono::high_resolution_clock::now();
//
auto batch_size = static_cast<int>(input.size());

std::vector<std::vector<Tensor> > input_samples;
Result<vector<vector<Tensor>>> CollectInputTensors(const vector<Input>& inputs) {
vector<vector<Tensor>> input_samples;
input_samples.reserve(inputs_.size());
for (const auto& t : inputs_) {
auto name = input_mapping_.at(t.name());
std::vector<Tensor> tmp;
tmp.reserve(input.size());
for (int i = 0; i < input.size(); ++i) {
auto& sample = input[i];
auto& tmp = input_samples.emplace_back();
for (const auto& sample : inputs) {
if (auto it = sample.find(name); it != sample.end()) {
tmp.push_back(it->second);
} else {
MMDEPLOY_ERROR("sample {} missing key {}", i, name);
MMDEPLOY_ERROR("sample {} missing key {}", &sample - inputs.data(), name);
return Status(eInvalidArgument);
}
}
input_samples.push_back(std::move(tmp));
}
return input_samples;
}

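// Appends `samples` (grouped per input, one column per sample) as a single batch when a
// common batch shape can be inferred; otherwise falls back to one single-sample batch per
// sample so that odd-shaped inputs are still processed.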
void SaveBatch(vector<vector<Tensor>> samples, vector<int> indices,
vector<vector<vector<Tensor>>>& batch_tensors,
vector<vector<TensorShape>>& batch_shapes,
vector<vector<int>>& batch_sample_idxs) const {
if (auto maybe_batch_shape = InferBatchShape(samples)) {
batch_shapes.push_back(maybe_batch_shape.value());
batch_tensors.push_back(std::move(samples));
batch_sample_idxs.push_back(std::move(indices));
} else {
// cannot assemble batch, do it one by one
for (int k = 0; k < indices.size(); ++k) {
auto& shapes = batch_shapes.emplace_back();
auto& batch = batch_tensors.emplace_back(inputs_.size());
batch_sample_idxs.push_back({indices[k]});
for (int j = 0; j < inputs_.size(); ++j) {
shapes.push_back(samples[j][k].shape());
batch[j].push_back(std::move(samples[j][k]));
}
}
}
}

void SamplesToBatches(const vector<vector<Tensor>>& input_samples, size_t n_samples,
vector<vector<vector<Tensor>>>& batch_tensors,
vector<vector<TensorShape>>& batch_shapes,
vector<vector<int>>& batch_sample_idxs) const {
// concat all shapes in samples to make comparison easier
vector<vector<int64_t>> concat_shapes;
concat_shapes.reserve(n_samples);
for (size_t i = 0; i < n_samples; ++i) {
auto& shape = concat_shapes.emplace_back();
for (const auto& input : input_samples) {
shape.insert(shape.end(), input[i].shape().begin(), input[i].shape().end());
}
}

// cluster samples by concatenated shapes
vector<int> shape_idxs(concat_shapes.size());
std::iota(shape_idxs.begin(), shape_idxs.end(), 0);
std::sort(shape_idxs.begin(), shape_idxs.end(),
[&concat_shapes](int i, int j) { return concat_shapes[i] < concat_shapes[j]; });
shape_idxs.erase(std::unique(shape_idxs.begin(), shape_idxs.end(),
[&concat_shapes](int i, int j) {
return concat_shapes[i] == concat_shapes[j];
}),
shape_idxs.end());
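// `shape_idxs` now holds one representative sample index per distinct concatenated shape.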

// generate batches of samples with equal shapes, limit the batch size by max_batch_size_
for (const auto ref_shape_idx : shape_idxs) {
const auto& ref_shape = concat_shapes[ref_shape_idx];
vector<vector<Tensor>> samples(inputs_.size());
vector<int> indices;
for (size_t i = 0; i < concat_shapes.size(); ++i) {
if (concat_shapes[i] == ref_shape) {
for (size_t j = 0; j < inputs_.size(); ++j) {
samples[j].push_back(input_samples[j][i]);
}
indices.push_back(static_cast<int>(i));
if (indices.size() == max_batch_size_) {
SaveBatch(std::move(samples), std::move(indices), batch_tensors, batch_shapes,
batch_sample_idxs);
samples = vector<vector<Tensor>>(inputs_.size());
indices = {};
}
}
}
if (!indices.empty()) {
SaveBatch(std::move(samples), std::move(indices), batch_tensors, batch_shapes,
batch_sample_idxs);
}
}
}

Result<vector<Output>> Forward(const vector<Input>& inputs) {
OUTCOME_TRY(auto input_samples, CollectInputTensors(inputs));

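// Cluster the samples by shape into batches of at most max_batch_size_, then run the
// network once per batch and scatter the results back to per-sample outputs.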
vector<vector<vector<Tensor>>> batch_tensors;
vector<vector<TensorShape>> batch_shapes;
vector<vector<int>> batch_sample_indices;

SamplesToBatches(input_samples, inputs.size(), batch_tensors, batch_shapes,
batch_sample_indices);

vector<Output> outputs(inputs.size());
for (size_t i = 0; i < batch_tensors.size(); ++i) {
OUTCOME_TRY(net_->Reshape(batch_shapes[i]));
OUTCOME_TRY(CopyInputTensors(batch_tensors[i], batch_shapes[i]));
OUTCOME_TRY(net_->Forward());
OUTCOME_TRY(CopyOutputTensors(batch_sample_indices[i], outputs));
if (i + 1 < batch_tensors.size()) { // sync if not the last batch
OUTCOME_TRY(stream_.Wait());
}
}

// 1. calculate input shape
OUTCOME_TRY(auto input_shapes, InferInputShape(input_samples));
if (is_profiling_) {
OUTCOME_TRY(stream_.Wait());
}

// 2. call backend's reshape
OUTCOME_TRY(net_->Reshape(input_shapes));
return outputs;
}

// 3. fill input tensor
Result<void> CopyInputTensors(const vector<vector<Tensor>>& batch,
const vector<TensorShape>& shapes) const {
for (int i = 0; i < inputs_.size(); ++i) {
auto& src = input_samples[i];
auto& src = batch[i];
auto& dst = inputs_[i];
if (dst.shape() != input_shapes[i]) {
MMDEPLOY_ERROR("inconsistent input shape, expect {}, got {}", input_shapes[i], dst.shape());
if (dst.shape() != shapes[i]) {
MMDEPLOY_ERROR("inconsistent input shape, expect {}, got {}", shapes[i], dst.shape());
return Status(eFail);
}
if (src.size() > 1) {
for (int j = 0; j < src.size(); ++j) {
auto slice = dst.Slice(j);
OUTCOME_TRY(src[j].CopyTo(slice, stream_));
OUTCOME_TRY(dst.Slice(j).CopyFrom(src[j], stream_));
}
} else {
OUTCOME_TRY(src[0].CopyTo(dst, stream_));
OUTCOME_TRY(src.front().CopyTo(dst, stream_));
}
}
return success();
}

// 5. forward
OUTCOME_TRY(net_->Forward());

vector<Output> output(batch_size);
for (const auto& t : outputs_) {
auto name = output_mapping_.at(t.name());
auto desc = t.desc();
Result<void> CopyOutputTensors(const vector<int>& indices, vector<Output>& outputs) {
for (const auto& output : outputs_) {
auto name = output_mapping_.at(output.name());
auto desc = output.desc();
desc.device = device_;
Tensor tmp(desc);
if (tmp.size()) {
OUTCOME_TRY(t.CopyTo(tmp, stream_));
OUTCOME_TRY(output.CopyTo(tmp, stream_));
} else {
MMDEPLOY_WARN("copy skipped due to zero sized tensor");
}
if (output.size() > 1) {
for (int i = 0; i < output.size(); ++i) {
output[i].emplace(name, tmp.Slice(i));
if (indices.size() > 1) {
for (int i = 0; i < indices.size(); ++i) {
outputs[indices[i]].emplace(name, tmp.Slice(i));
}
} else {
output[0].emplace(name, std::move(tmp));
outputs[indices.front()].emplace(name, std::move(tmp));
}
}
if (is_profiling_) {
OUTCOME_TRY(stream_.Wait());
}

return output;
return success();
}

Device device_;
@@ -195,6 +287,7 @@ struct NetModule::Impl {
std::map<std::string, std::string> input_mapping_;
// outer scope to model output names
std::map<std::string, std::string> output_mapping_;
int max_batch_size_{1};
bool is_profiling_{false};
};
