Refine algorithm for WriteAmpBasedRateLimiter (#213)
* Bugfix
Only compaction triggers the auto-tuner to collect the data needed for tuning the rate limit. When compaction frequency is low, data from a long period of time gets fused into one sample, causing inaccurate estimation. Fix this by looping through the missing timeslices, as in the sketch below.
The recent window size (10s) is too small; enlarge it to 30s.
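
A minimal sketch of the fix, assuming a simplified stand-in for the limiter's sampler (`FeedSamples` and `Sampler` are hypothetical names; the real code feeds `WindowSmoother` instances inside `Tune()`, as shown in the diff below):

```cpp
#include <cstdint>

// Stand-in for the WindowSmoother used by the real limiter.
struct Sampler {
  int64_t last = 0;
  void AddSample(int64_t v) { last = v; }
};

void FeedSamples(Sampler& bytes_sampler, int64_t duration_ms,
                 int64_t duration_bytes_through) {
  const int64_t kMillisPerTune = 1000;
  // If compactions kept the tuner idle for, say, 5s, this adds five
  // 1s-average samples instead of fusing 5s of flow into a single sample.
  for (int64_t i = 0; i < duration_ms / kMillisPerTune; i++) {
    bytes_sampler.AddSample(duration_bytes_through * 1000 / duration_ms);
  }
}
```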

* Better support for low-pressure scenarios
Before this PR, flush flow was padded to 20MB/s, which kept the rate limit above 28MB/s. After removing that restriction, we noticed that pending bytes accumulate more easily under low pressure. Adjust the padding calculation to partially resolve this problem; see the worked example below.
Note that with the new formula, the minimal rate limit is still around 28MB/s.
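
A quick numerical check of the new padding curve (a hypothetical standalone harness; `CalculatePadding` is copied verbatim from the diff below):

```cpp
#include <cstdint>
#include <cstdio>

// ~10% of base plus a hyperbolic boost that dominates at low pressure.
int64_t CalculatePadding(int64_t base) {
  return base / 10 + 577464606419583ll / (base + 26225305);
}

int main() {
  // Prints roughly: 5MB/s -> 19.0MB/s, 10MB/s -> 16.9MB/s,
  // 100MB/s -> 14.6MB/s, 500MB/s -> 51.1MB/s. The hyperbolic term alone
  // passes near the (5M, 18M) and (10M, 16M) coordinates quoted in the
  // code comment; at high pressure the padding decays toward base / 10.
  for (int64_t mb : {5, 10, 100, 500}) {
    int64_t base = mb * 1000 * 1000;
    std::printf("base=%3lldMB/s padding=%4.1fMB/s\n",
                static_cast<long long>(mb), CalculatePadding(base) / 1e6);
  }
  return 0;
}
```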

* Control reshuffle
Remove the long-term sampler and instead enlarge the window of the short-term sampler. Reduce the use of `ratio_delta` (now `percent_delta`), which often caused unnecessary jitter. With the algorithm simplified (see the sketch below), the actual limit can be deduced with the Prometheus expression `sum(rate(tikv_engine_compaction_flow_bytes{instance=~"$instance", db="kv", type="bytes_written"}[5m]))`
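
The simplified steady-state formula condenses to a few lines (`EstimateLimit` is a hypothetical helper, not a function in the codebase; its inputs stand in for `WindowSmoother` outputs, and `CalculatePadding` is the helper from the example above):

```cpp
#include <algorithm>
#include <cstdint>

int64_t CalculatePadding(int64_t base);  // defined in the example above

int64_t EstimateLimit(int64_t full_bytes, int64_t full_highpri,
                      int64_t recent_highpri, uint32_t percent_delta) {
  const int64_t kHighBytesLower = 8 << 20;  // high-pri flow padded to 8MB
  const int64_t kRatioLower = 10;  // write-amp lower bound, in tenths
  // Estimated write amplification: total write flow over high-priority
  // (foreground) flow, expressed in tenths.
  int64_t ratio = std::max(
      kRatioLower, full_bytes * 10 / std::max(full_highpri, kHighBytesLower));
  int64_t base = ratio * std::max(recent_highpri, kHighBytesLower) / 10;
  // Final limit = baseline + hyperbolic padding + pace-up percentage.
  return base + CalculatePadding(base) + base * percent_delta / 100;
}
```

Since the baseline tracks the full compaction write flow, the Prometheus expression above effectively recovers it from the engine's compaction flow metrics.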

* Normal pace up
Add a normal pace-up mode in addition to the critical pace-up to reduce the pending bytes issue; see the sketch below.
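
A sketch of how `Tune()` consumes the two pace-up levels (`ApplyPaceUp` is a hypothetical standalone helper mirroring the diff below; the atomic flags are simplified to plain bools):

```cpp
#include <algorithm>
#include <cstdint>

uint32_t ApplyPaceUp(bool critical, bool normal, int64_t padding,
                     int64_t new_bytes_per_sec, uint32_t percent_delta) {
  if (critical) {
    // A write stall is imminent: jump straight to a 150% boost.
    percent_delta = 150;
  } else if (normal) {
    // Pressure is building: only guarantee the boost covers the padding's
    // share of the baseline, a much gentler bump.
    percent_delta =
        std::max(percent_delta,
                 static_cast<uint32_t>(padding * 150 / new_bytes_per_sec));
  }
  return percent_delta;
}
```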

Signed-off-by: tabokie <[email protected]>
tabokie authored Dec 18, 2020
1 parent e62a752 commit 7d209a8
Showing 4 changed files with 77 additions and 58 deletions.
7 changes: 5 additions & 2 deletions db/column_family.cc

@@ -849,6 +849,9 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
       // compaction.
       write_controller_token_ =
           write_controller->GetCompactionPressureToken();
+      if (rate_limiter) {
+        rate_limiter->PaceUp(false /*critical*/);
+      }
       if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
         ROCKS_LOG_INFO(
             ioptions_.info_log,
@@ -881,8 +884,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
                vstorage->l0_delay_trigger_count() >=
                    0.8 * mutable_cf_options.level0_slowdown_writes_trigger ||
                vstorage->estimated_compaction_needed_bytes() >=
-                   0.6 * mutable_cf_options.soft_pending_compaction_bytes_limit) {
-      rate_limiter->PaceUp();
+                   0.5 * mutable_cf_options.soft_pending_compaction_bytes_limit) {
+      rate_limiter->PaceUp(true /*critical*/);
     }
   }
   prev_compaction_needed_bytes_ = compaction_needed_bytes;
2 changes: 1 addition & 1 deletion include/rocksdb/rate_limiter.h

@@ -106,7 +106,7 @@ class RateLimiter {
     return true;
   }

-  virtual void PaceUp() {}
+  virtual void PaceUp(bool /*critical*/) {}

 protected:
   Mode GetMode() { return mode_; }
111 changes: 64 additions & 47 deletions utilities/rate_limiters/write_amp_based_rate_limiter.cc

@@ -26,6 +26,23 @@ struct WriteAmpBasedRateLimiter::Req {
   bool granted;
 };

+namespace {
+constexpr int kSecondsPerTune = 1;
+constexpr int kMillisPerTune = 1000 * kSecondsPerTune;
+constexpr int kMicrosPerTune = 1000 * 1000 * kSecondsPerTune;
+
+// Two reasons for adding padding to baseline limit:
+// 1. compaction cannot fully utilize the IO quota we set.
+// 2. make it faster to digest unexpected burst of pending compaction bytes,
+//    generally this will help flatten IO waves.
+// Padding is calculated through hyperbola based on empirical percentage of
+// 10% and special care for low-pressure domain. E.g. coordinates (5M, 18M)
+// and (10M, 16M) are on this curve.
+int64_t CalculatePadding(int64_t base) {
+  return base / 10 + 577464606419583ll / (base + 26225305);
+}
+}  // unnamed namespace
+
 WriteAmpBasedRateLimiter::WriteAmpBasedRateLimiter(int64_t rate_bytes_per_sec,
                                                    int64_t refill_period_us,
                                                    int32_t fairness,

@@ -51,8 +68,9 @@ WriteAmpBasedRateLimiter::WriteAmpBasedRateLimiter(int64_t rate_bytes_per_sec,
       tuned_time_(NowMicrosMonotonic(env_)),
       duration_highpri_bytes_through_(0),
       duration_bytes_through_(0),
-      should_pace_up_(false),
-      ratio_delta_(0) {
+      critical_pace_up_(false),
+      normal_pace_up_(false),
+      percent_delta_(0) {
   total_requests_[0] = 0;
   total_requests_[1] = 0;
   total_bytes_through_[0] = 0;

@@ -114,15 +132,15 @@ void WriteAmpBasedRateLimiter::SetActualBytesPerSecond(

 void WriteAmpBasedRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
                                        Statistics* stats) {
-  static constexpr int kSecondsPerTune = 1;
-  static constexpr int kMicrosPerTune = 1000 * 1000 * kSecondsPerTune;
   TEST_SYNC_POINT("WriteAmpBasedRateLimiter::Request");
   TEST_SYNC_POINT_CALLBACK("WriteAmpBasedRateLimiter::Request:1",
                            &rate_bytes_per_sec_);
   if (auto_tuned_.load(std::memory_order_acquire) && pri == Env::IO_HIGH &&
       duration_highpri_bytes_through_ + duration_bytes_through_ + bytes <=
           max_bytes_per_sec_.load(std::memory_order_relaxed) *
               kSecondsPerTune) {
+    // In the case where low-priority request is absent, actual time elapsed
+    // will be larger than kSecondsPerTune, making the limit even tighter.
     total_bytes_through_[Env::IO_HIGH] += bytes;
     ++total_requests_[Env::IO_HIGH];
     duration_highpri_bytes_through_ += bytes;

@@ -297,17 +315,13 @@ int64_t WriteAmpBasedRateLimiter::CalculateRefillBytesPerPeriod(
 }

 Status WriteAmpBasedRateLimiter::Tune() {
-  // computed rate limit will be larger than `kMinBytesPerSec`
-  const int64_t kMinBytesPerSec = 10 * 1024 * 1024;
-  // high-priority bytes are padded to 20MB
-  const int64_t kHighBytesLower = 20 * 1024 * 1024;
+  // computed rate limit will be larger than 10MB/s
+  const int64_t kMinBytesPerSec = 10 << 20;
+  // high-priority bytes are padded to 8MB
+  const int64_t kHighBytesLower = 8 << 20;
   // lower bound for write amplification estimation
-  const int kRatioLower = 12;
-  // Two reasons for using a ratio larger than estimation:
-  // 1. compaction cannot fully utilize the IO quota we set.
-  // 2. make it faster to digest unexpected burst of pending compaction bytes,
-  //    generally this will help flatten IO waves.
-  const int kRatioPaddingPercent = 18;
+  const int kRatioLower = 10;
+  const int kPercentDeltaMax = 6;

   std::chrono::microseconds prev_tuned_time = tuned_time_;
   tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_));

@@ -317,46 +331,45 @@ Status WriteAmpBasedRateLimiter::Tune() {

   int64_t prev_bytes_per_sec = GetBytesPerSecond();

-  bytes_sampler_.AddSample(duration_bytes_through_ * 1000 / duration_ms);
-  highpri_bytes_sampler_.AddSample(duration_highpri_bytes_through_ * 1000 /
-                                   duration_ms);
-  if (bytes_sampler_.AtTimePoint()) {
-    long_term_bytes_sampler_.AddSample(bytes_sampler_.GetFullValue());
-    long_term_highpri_bytes_sampler_.AddSample(
-        highpri_bytes_sampler_.GetFullValue());
+  // Loop through the actual time slice to make sure bytes flow from long
+  // period of time is properly estimated when the compaction rate is low.
+  for (uint32_t i = 0; i < duration_ms / kMillisPerTune; i++) {
+    bytes_sampler_.AddSample(duration_bytes_through_ * 1000 / duration_ms);
+    highpri_bytes_sampler_.AddSample(duration_highpri_bytes_through_ * 1000 /
+                                     duration_ms);
+    limit_bytes_sampler_.AddSample(prev_bytes_per_sec);
   }
-  limit_bytes_sampler_.AddSample(prev_bytes_per_sec);
-  // As LSM grows higher, it tends to generate compaction tasks in waves
-  // (cascaded). We use extra long-term window to help reduce this fluctuation.
   int32_t ratio = std::max(
-      kRatioLower, static_cast<int32_t>(
-                       long_term_bytes_sampler_.GetFullValue() * 10 /
-                       std::max(long_term_highpri_bytes_sampler_.GetFullValue(),
-                                kHighBytesLower)));
-  ratio = std::max(ratio, static_cast<int32_t>(
-                              bytes_sampler_.GetFullValue() * 10 /
-                              std::max(highpri_bytes_sampler_.GetFullValue(),
-                                       kHighBytesLower)));
-  int32_t ratio_padding = ratio * kRatioPaddingPercent / 100;
+      kRatioLower,
+      static_cast<int32_t>(
+          bytes_sampler_.GetFullValue() * 10 /
+          std::max(highpri_bytes_sampler_.GetFullValue(), kHighBytesLower)));

   // in case there are compaction bursts even when online writes are stable
-  auto util = bytes_sampler_.GetRecentValue() * 100 /
-              limit_bytes_sampler_.GetRecentValue();
-  if (util > 98) {
-    ratio_delta_ += 1;
-  } else if (util < 95 && ratio_delta_ > 0) {
-    ratio_delta_ -= 1;
-  }
-  if (should_pace_up_.load(std::memory_order_relaxed)) {
-    if (ratio_delta_ < 60) {
-      ratio_delta_ += 60;  // effect lasts for at least 60 * kSecondsPerTune = 1m
+  int64_t util = bytes_sampler_.GetRecentValue() * 1000 /
+                 limit_bytes_sampler_.GetRecentValue();
+  if (util >= 995) {
+    if (percent_delta_ < kPercentDeltaMax) {
+      percent_delta_ += 1;
     }
-    should_pace_up_.store(false, std::memory_order_relaxed);
+  } else if (percent_delta_ > 0) {
+    percent_delta_ -= 1;
   }

   int64_t new_bytes_per_sec =
-      (ratio + ratio_padding + ratio_delta_) *
+      ratio *
       std::max(highpri_bytes_sampler_.GetRecentValue(), kHighBytesLower) / 10;
+  int64_t padding = CalculatePadding(new_bytes_per_sec);
+  if (critical_pace_up_.load(std::memory_order_relaxed)) {
+    percent_delta_ = 150;
+    critical_pace_up_.store(false, std::memory_order_relaxed);
+  } else if (normal_pace_up_.load(std::memory_order_relaxed)) {
+    percent_delta_ =
+        std::max(percent_delta_,
+                 static_cast<uint32_t>(padding * 150 / new_bytes_per_sec));
+    normal_pace_up_.store(false, std::memory_order_relaxed);
+  }
+  new_bytes_per_sec += padding + new_bytes_per_sec * percent_delta_ / 100;
   new_bytes_per_sec =
       std::max(kMinBytesPerSec,
                std::min(new_bytes_per_sec,

@@ -371,9 +384,13 @@ Status WriteAmpBasedRateLimiter::Tune() {
   return Status::OK();
 }

-void WriteAmpBasedRateLimiter::PaceUp() {
+void WriteAmpBasedRateLimiter::PaceUp(bool critical) {
   if (auto_tuned_.load(std::memory_order_acquire)) {
-    should_pace_up_.store(true, std::memory_order_relaxed);
+    if (critical) {
+      critical_pace_up_.store(true, std::memory_order_relaxed);
+    } else {
+      normal_pace_up_.store(true, std::memory_order_relaxed);
+    }
   }
 }
15 changes: 7 additions & 8 deletions utilities/rate_limiters/write_amp_based_rate_limiter.h

@@ -74,7 +74,7 @@ class WriteAmpBasedRateLimiter : public RateLimiter {
     return auto_tuned_.load(std::memory_order_acquire);
   }

-  virtual void PaceUp() override;
+  virtual void PaceUp(bool critical) override;

 private:
  void Refill();

@@ -151,18 +151,17 @@ class WriteAmpBasedRateLimiter : public RateLimiter {
     int64_t recent_sum_{0};
   };

-  static constexpr size_t kSmoothWindowSize = 120;       // 120 * 1s = 2m
-  static constexpr size_t kRecentSmoothWindowSize = 10;  // 10 * 1s = 10s
-  static constexpr size_t kLongTermWindowSize = 15;      // 15 * 2m = 30m
+  static constexpr size_t kSmoothWindowSize = 300;       // 300 * 1s = 5m
+  static constexpr size_t kRecentSmoothWindowSize = 30;  // 30 * 1s = 30s

   WindowSmoother<kSmoothWindowSize, kRecentSmoothWindowSize> bytes_sampler_;
   WindowSmoother<kSmoothWindowSize, kRecentSmoothWindowSize>
       highpri_bytes_sampler_;
-  WindowSmoother<kLongTermWindowSize> long_term_bytes_sampler_;
-  WindowSmoother<kLongTermWindowSize> long_term_highpri_bytes_sampler_;
   WindowSmoother<kRecentSmoothWindowSize, kRecentSmoothWindowSize>
       limit_bytes_sampler_;
-  std::atomic<bool> should_pace_up_;
-  int32_t ratio_delta_;
+  std::atomic<bool> critical_pace_up_;
+  std::atomic<bool> normal_pace_up_;
+  uint32_t percent_delta_;
 };

 }  // namespace rocksdb
