diff --git a/HISTORY.md b/HISTORY.md index 80528da26b1..06bf38fe98c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -20,12 +20,14 @@ * When using block cache strict capacity limit (`LRUCache` with `strict_capacity_limit=true`), DB operations now fail with Status code `kAborted` subcode `kMemoryLimit` (`IsMemoryLimit()`) instead of `kIncomplete` (`IsIncomplete()`) when the capacity limit is reached, because Incomplete can mean other specific things for some operations. In more detail, `Cache::Insert()` now returns the updated Status code and this usually propagates through RocksDB to the user on failure. * NewClockCache calls temporarily return an LRUCache (with similar characteristics as the desired ClockCache). This is because ClockCache is being replaced by a new version (the old one had unknown bugs) but this is still under development. * Add two functions `int ReserveThreads(int threads_to_be_reserved)` and `int ReleaseThreads(threads_to_be_released)` into `Env` class. In the default implementation, both return 0. Newly added `xxxEnv` class that inherits `Env` should implement these two functions for thread reservation/releasing features. +* Removed Customizable support for RateLimiter and removed its CreateFromString() and Type() functions. ### Bug Fixes * Fix a bug in which backup/checkpoint can include a WAL deleted by RocksDB. * Fix a bug where concurrent compactions might cause unnecessary further write stalling. In some cases, this might cause write rate to drop to minimum. * Fix a bug in Logger where if dbname and db_log_dir are on different filesystems, dbname creation would fail wrt to db_log_dir path returning an error and fails to open the DB. -* Fix a CPU and memory efficiency issue introduce by https://github.com/facebook/rocksdb/pull/8336 which made InternalKeyComparator configurable as an unintended side effect. +* Fix a CPU and memory efficiency issue introduce by https://github.com/facebook/rocksdb/pull/8336 which made InternalKeyComparator configurable as an unintended side effect +* Fix a bug where `GenericRateLimiter` could revert the bandwidth set dynamically using `SetBytesPerSecond()` when a user configures a structure enclosing it, e.g., using `GetOptionsFromString()` to configure an `Options` that references an existing `RateLimiter` object. ## Behavior Change * In leveled compaction with dynamic levelling, level multiplier is not anymore adjusted due to oversized L0. Instead, compaction score is adjusted by increasing size level target by adding incoming bytes from upper levels. This would deprioritize compactions from upper levels if more data from L0 is coming. This is to fix some unnecessary full stalling due to drastic change of level targets, while not wasting write bandwidth for compaction while writes are overloaded. diff --git a/db/db_test.cc b/db/db_test.cc index 1e430d08c8e..9defb81d060 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4101,9 +4101,6 @@ class MockedRateLimiterWithNoOptionalAPIImpl : public RateLimiter { ~MockedRateLimiterWithNoOptionalAPIImpl() override {} - const char* Name() const override { - return "MockedRateLimiterWithNoOptionalAPI"; - } void SetBytesPerSecond(int64_t bytes_per_second) override { (void)bytes_per_second; } diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index 203d73dcfa2..9cad6edf4aa 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -9,7 +9,6 @@ #pragma once -#include "rocksdb/customizable.h" #include "rocksdb/env.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" @@ -19,7 +18,7 @@ namespace ROCKSDB_NAMESPACE { // Exceptions MUST NOT propagate out of overridden functions into RocksDB, // because RocksDB is not exception-safe. This could cause undefined behavior // including data loss, unreported corruption, deadlocks, and more. -class RateLimiter : public Customizable { +class RateLimiter { public: enum class OpType { kRead, @@ -32,20 +31,11 @@ class RateLimiter : public Customizable { kAllIo, }; - static const char* Type() { return "RateLimiter"; } - static Status CreateFromString(const ConfigOptions& options, - const std::string& value, - std::shared_ptr* result); - // For API compatibility, default to rate-limiting writes only. - explicit RateLimiter(Mode mode = Mode::kWritesOnly); + explicit RateLimiter(Mode mode = Mode::kWritesOnly) : mode_(mode) {} virtual ~RateLimiter() {} - // Deprecated. Will be removed in a major release. Derived classes - // should implement this method. - virtual const char* Name() const override { return ""; } - // This API allows user to dynamically change rate limiter's bytes per second. // REQUIRED: bytes_per_second > 0 virtual void SetBytesPerSecond(int64_t bytes_per_second) = 0; @@ -135,7 +125,7 @@ class RateLimiter : public Customizable { Mode GetMode() { return mode_; } private: - Mode mode_; + const Mode mode_; }; // Create a RateLimiter object, which can be shared among RocksDB instances to diff --git a/options/customizable_test.cc b/options/customizable_test.cc index 61c0eb36876..e7ab2cd08f9 100644 --- a/options/customizable_test.cc +++ b/options/customizable_test.cc @@ -27,7 +27,6 @@ #include "rocksdb/filter_policy.h" #include "rocksdb/flush_block_policy.h" #include "rocksdb/memory_allocator.h" -#include "rocksdb/rate_limiter.h" #include "rocksdb/secondary_cache.h" #include "rocksdb/slice_transform.h" #include "rocksdb/sst_partitioner.h" @@ -42,7 +41,6 @@ #include "test_util/testharness.h" #include "test_util/testutil.h" #include "util/file_checksum_helper.h" -#include "util/rate_limiter.h" #include "util/string_util.h" #include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h" #include "utilities/memory_allocators.h" @@ -1472,21 +1470,6 @@ class MockFileChecksumGenFactory : public FileChecksumGenFactory { } }; -class MockRateLimiter : public RateLimiter { - public: - static const char* kClassName() { return "MockRateLimiter"; } - const char* Name() const override { return kClassName(); } - void SetBytesPerSecond(int64_t /*bytes_per_second*/) override {} - int64_t GetBytesPerSecond() const override { return 0; } - int64_t GetSingleBurstBytes() const override { return 0; } - int64_t GetTotalBytesThrough(const Env::IOPriority /*pri*/) const override { - return 0; - } - int64_t GetTotalRequests(const Env::IOPriority /*pri*/) const override { - return 0; - } -}; - class MockFilterPolicy : public FilterPolicy { public: static const char* kClassName() { return "MockFilterPolicy"; } @@ -1618,14 +1601,6 @@ static int RegisterLocalObjects(ObjectLibrary& library, return guard->get(); }); - library.AddFactory( - MockRateLimiter::kClassName(), - [](const std::string& /*uri*/, std::unique_ptr* guard, - std::string* /* errmsg */) { - guard->reset(new MockRateLimiter()); - return guard->get(); - }); - library.AddFactory( MockFilterPolicy::kClassName(), [](const std::string& /*uri*/, std::unique_ptr* guard, @@ -2149,37 +2124,6 @@ TEST_F(LoadCustomizableTest, LoadMemoryAllocatorTest) { } } -TEST_F(LoadCustomizableTest, LoadRateLimiterTest) { -#ifndef ROCKSDB_LITE - ASSERT_OK(TestSharedBuiltins(MockRateLimiter::kClassName(), - GenericRateLimiter::kClassName())); -#else - ASSERT_OK(TestSharedBuiltins(MockRateLimiter::kClassName(), "")); -#endif // ROCKSDB_LITE - - std::shared_ptr result; - ASSERT_OK(RateLimiter::CreateFromString( - config_options_, std::string(GenericRateLimiter::kClassName()) + ":1234", - &result)); - ASSERT_NE(result, nullptr); - ASSERT_TRUE(result->IsInstanceOf(GenericRateLimiter::kClassName())); -#ifndef ROCKSDB_LITE - ASSERT_OK(GetDBOptionsFromString( - config_options_, db_opts_, - std::string("rate_limiter=") + GenericRateLimiter::kClassName(), - &db_opts_)); - ASSERT_NE(db_opts_.rate_limiter, nullptr); - if (RegisterTests("Test")) { - ExpectCreateShared(MockRateLimiter::kClassName()); - ASSERT_OK(GetDBOptionsFromString( - config_options_, db_opts_, - std::string("rate_limiter=") + MockRateLimiter::kClassName(), - &db_opts_)); - ASSERT_NE(db_opts_.rate_limiter, nullptr); - } -#endif // ROCKSDB_LITE -} - TEST_F(LoadCustomizableTest, LoadFilterPolicyTest) { const std::string kAutoBloom = BloomFilterPolicy::kClassName(); const std::string kAutoRibbon = RibbonFilterPolicy::kClassName(); diff --git a/options/db_options.cc b/options/db_options.cc index 92c56398daf..e0bc892fc03 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -421,12 +421,11 @@ static std::unordered_map {"db_host_id", {offsetof(struct ImmutableDBOptions, db_host_id), OptionType::kString, OptionVerificationType::kNormal, OptionTypeFlags::kCompareNever}}, + // Temporarily deprecated due to race conditions (examples in PR 10375). {"rate_limiter", - OptionTypeInfo::AsCustomSharedPtr( - offsetof(struct ImmutableDBOptions, rate_limiter), - OptionVerificationType::kNormal, - OptionTypeFlags::kCompareNever | OptionTypeFlags::kAllowNull)}, - + {offsetof(struct ImmutableDBOptions, rate_limiter), + OptionType::kUnknown, OptionVerificationType::kDeprecated, + OptionTypeFlags::kDontSerialize | OptionTypeFlags::kCompareNever}}, // The following properties were handled as special cases in ParseOption // This means that the properties could be read from the options file // but never written to the file or compared to each other. diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index f369e3220bd..3e3fe1787bf 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -13,14 +13,9 @@ #include "monitoring/statistics.h" #include "port/port.h" -#include "rocksdb/convenience.h" #include "rocksdb/system_clock.h" -#include "rocksdb/utilities/customizable_util.h" -#include "rocksdb/utilities/object_registry.h" -#include "rocksdb/utilities/options_type.h" #include "test_util/sync_point.h" #include "util/aligned_buffer.h" -#include "util/string_util.h" namespace ROCKSDB_NAMESPACE { size_t RateLimiter::RequestToken(size_t bytes, size_t alignment, @@ -50,68 +45,33 @@ struct GenericRateLimiter::Req { bool granted; }; -static std::unordered_map - generic_rate_limiter_type_info = { -#ifndef ROCKSDB_LITE - {"rate_bytes_per_sec", - {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, - max_bytes_per_sec), - OptionType::kInt64T}}, - {"refill_period_us", - {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, - refill_period_us), - OptionType::kInt64T}}, - {"fairness", - {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, - fairness), - OptionType::kInt32T}}, - {"auto_tuned", - {offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, - auto_tuned), - OptionType::kBoolean}}, - {"clock", - OptionTypeInfo::AsCustomSharedPtr( - offsetof(struct GenericRateLimiter::GenericRateLimiterOptions, - clock), - OptionVerificationType::kByNameAllowFromNull, - OptionTypeFlags::kAllowNull)}, -#endif // ROCKSDB_LITE -}; - GenericRateLimiter::GenericRateLimiter( int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness, RateLimiter::Mode mode, const std::shared_ptr& clock, bool auto_tuned) : RateLimiter(mode), - options_(rate_bytes_per_sec, refill_period_us, fairness, clock, - auto_tuned), + refill_period_us_(refill_period_us), + rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2 + : rate_bytes_per_sec), + refill_bytes_per_period_( + CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)), + clock_(clock), stop_(false), exit_cv_(&request_mutex_), requests_to_wait_(0), available_bytes_(0), + next_refill_us_(NowMicrosMonotonic()), + fairness_(fairness > 100 ? 100 : fairness), rnd_((uint32_t)time(nullptr)), wait_until_refill_pending_(false), - num_drains_(0) { - RegisterOptions(&options_, &generic_rate_limiter_type_info); + auto_tuned_(auto_tuned), + num_drains_(0), + max_bytes_per_sec_(rate_bytes_per_sec), + tuned_time_(NowMicrosMonotonic()) { for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { total_requests_[i] = 0; total_bytes_through_[i] = 0; } - Initialize(); -} -void GenericRateLimiter::Initialize() { - if (options_.clock == nullptr) { - options_.clock = SystemClock::Default(); - } - options_.fairness = std::min(options_.fairness, 100); - next_refill_us_ = NowMicrosMonotonic(); - tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic()); - if (options_.auto_tuned) { - rate_bytes_per_sec_ = options_.max_bytes_per_sec / 2; - } else { - rate_bytes_per_sec_ = options_.max_bytes_per_sec; - } - refill_bytes_per_period_ = CalculateRefillBytesPerPeriod(rate_bytes_per_sec_); } GenericRateLimiter::~GenericRateLimiter() { @@ -135,18 +95,6 @@ GenericRateLimiter::~GenericRateLimiter() { } } -Status GenericRateLimiter::PrepareOptions(const ConfigOptions& options) { - if (options_.fairness <= 0) { - return Status::InvalidArgument("Fairness must be > 0"); - } else if (options_.max_bytes_per_sec <= 0) { - return Status::InvalidArgument("max_bytes_per_sec must be > 0"); - } else if (options_.refill_period_us <= 0) { - return Status::InvalidArgument("Refill_period_us must be > 0"); - } - Initialize(); - return RateLimiter::PrepareOptions(options); -} - // This API allows user to dynamically change rate limiter's bytes per second. void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { assert(bytes_per_second > 0); @@ -165,11 +113,11 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, &rate_bytes_per_sec_); MutexLock g(&request_mutex_); - if (options_.auto_tuned) { + if (auto_tuned_) { static const int kRefillsPerTune = 100; std::chrono::microseconds now(NowMicrosMonotonic()); - if (now - tuned_time_ >= kRefillsPerTune * std::chrono::microseconds( - options_.refill_period_us)) { + if (now - tuned_time_ >= + kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) { Status s = Tune(); s.PermitUncheckedError(); //**TODO: What to do on error? } @@ -213,7 +161,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, } else { // Whichever thread reaches here first performs duty (1) as described // above. - int64_t wait_until = options_.clock->NowMicros() + time_until_refill_us; + int64_t wait_until = clock_->NowMicros() + time_until_refill_us; RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); ++num_drains_; wait_until_refill_pending_ = true; @@ -273,12 +221,12 @@ GenericRateLimiter::GeneratePriorityIterationOrder() { // first pri_iteration_order[0] = Env::IO_USER; - bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(options_.fairness); + bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_); TEST_SYNC_POINT_CALLBACK( "GenericRateLimiter::GeneratePriorityIterationOrder::" "PostRandomOneInFairnessForHighPri", &high_pri_iterated_after_mid_low_pri); - bool mid_pri_itereated_after_low_pri = rnd_.OneIn(options_.fairness); + bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_); TEST_SYNC_POINT_CALLBACK( "GenericRateLimiter::GeneratePriorityIterationOrder::" "PostRandomOneInFairnessForMidPri", @@ -307,7 +255,7 @@ GenericRateLimiter::GeneratePriorityIterationOrder() { void GenericRateLimiter::RefillBytesAndGrantRequests() { TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests"); - next_refill_us_ = NowMicrosMonotonic() + options_.refill_period_us; + next_refill_us_ = NowMicrosMonotonic() + refill_period_us_; // Carry over the left over quota from the last period auto refill_bytes_per_period = refill_bytes_per_period_.load(std::memory_order_relaxed); @@ -348,12 +296,12 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() { int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( int64_t rate_bytes_per_sec) { if (std::numeric_limits::max() / rate_bytes_per_sec < - options_.refill_period_us) { + refill_period_us_) { // Avoid unexpected result in the overflow case. The result now is still // inaccurate but is a number that is large enough. return std::numeric_limits::max() / 1000000; } else { - return rate_bytes_per_sec * options_.refill_period_us / 1000000; + return rate_bytes_per_sec * refill_period_us_ / 1000000; } } @@ -368,11 +316,10 @@ Status GenericRateLimiter::Tune() { std::chrono::microseconds prev_tuned_time = tuned_time_; tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic()); - int64_t elapsed_intervals = - (tuned_time_ - prev_tuned_time + - std::chrono::microseconds(options_.refill_period_us) - - std::chrono::microseconds(1)) / - std::chrono::microseconds(options_.refill_period_us); + int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time + + std::chrono::microseconds(refill_period_us_) - + std::chrono::microseconds(1)) / + std::chrono::microseconds(refill_period_us_); // We tune every kRefillsPerTune intervals, so the overflow and division-by- // zero conditions should never happen. assert(num_drains_ <= std::numeric_limits::max() / 100); @@ -382,13 +329,13 @@ Status GenericRateLimiter::Tune() { int64_t prev_bytes_per_sec = GetBytesPerSecond(); int64_t new_bytes_per_sec; if (drained_pct == 0) { - new_bytes_per_sec = options_.max_bytes_per_sec / kAllowedRangeFactor; + new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor; } else if (drained_pct < kLowWatermarkPct) { // sanitize to prevent overflow int64_t sanitized_prev_bytes_per_sec = std::min(prev_bytes_per_sec, std::numeric_limits::max() / 100); new_bytes_per_sec = - std::max(options_.max_bytes_per_sec / kAllowedRangeFactor, + std::max(max_bytes_per_sec_ / kAllowedRangeFactor, sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct)); } else if (drained_pct > kHighWatermarkPct) { // sanitize to prevent overflow @@ -396,7 +343,7 @@ Status GenericRateLimiter::Tune() { std::min(prev_bytes_per_sec, std::numeric_limits::max() / (100 + kAdjustFactorPct)); new_bytes_per_sec = - std::min(options_.max_bytes_per_sec, + std::min(max_bytes_per_sec_, sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100); } else { new_bytes_per_sec = prev_bytes_per_sec; @@ -419,79 +366,7 @@ RateLimiter* NewGenericRateLimiter( std::unique_ptr limiter( new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness, mode, SystemClock::Default(), auto_tuned)); - Status s = limiter->PrepareOptions(ConfigOptions()); - if (s.ok()) { - return limiter.release(); - } else { - assert(false); - return nullptr; - } -} -namespace { -#ifndef ROCKSDB_LITE -static int RegisterBuiltinRateLimiters(ObjectLibrary& library, - const std::string& /*arg*/) { - library.AddFactory( - GenericRateLimiter::kClassName(), - [](const std::string& /*uri*/, std::unique_ptr* guard, - std::string* /*errmsg*/) { - guard->reset( - new GenericRateLimiter(std::numeric_limits::max())); - return guard->get(); - }); - size_t num_types; - return static_cast(library.GetFactoryCount(&num_types)); -} - -static std::unordered_map - rate_limiter_mode_map = { - {"kReadsOnly", RateLimiter::Mode::kReadsOnly}, - {"kWritesOnly", RateLimiter::Mode::kWritesOnly}, - {"kAllIo", RateLimiter::Mode::kAllIo}, -}; -#endif // ROCKSDB_LITE -static bool LoadRateLimiter(const std::string& name, - std::shared_ptr* limiter) { - auto plen = strlen(GenericRateLimiter::kClassName()); - if (name.size() > plen + 2 && name[plen] == ':' && - StartsWith(name, GenericRateLimiter::kClassName())) { - auto rate = ParseInt64(name.substr(plen + 1)); - limiter->reset(new GenericRateLimiter(rate)); - return true; - } else { - return false; - } -} - -static std::unordered_map rate_limiter_type_info = - { -#ifndef ROCKSDB_LITE - {"mode", - OptionTypeInfo::Enum(0, &rate_limiter_mode_map)}, -#endif // ROCKSDB_LITE -}; -} // namespace - -RateLimiter::RateLimiter(Mode mode) : mode_(mode) { - RegisterOptions("", &mode_, &rate_limiter_type_info); -} - -Status RateLimiter::CreateFromString(const ConfigOptions& config_options, - const std::string& value, - std::shared_ptr* result) { - if (value.empty()) { - result->reset(); - return Status::OK(); - } else { -#ifndef ROCKSDB_LITE - static std::once_flag once; - std::call_once(once, [&]() { - RegisterBuiltinRateLimiters(*(ObjectLibrary::Default().get()), ""); - }); -#endif // ROCKSDB_LITE - return LoadSharedObject(config_options, value, LoadRateLimiter, - result); - } + return limiter.release(); } } // namespace ROCKSDB_NAMESPACE diff --git a/util/rate_limiter.h b/util/rate_limiter.h index 75751d3c5d8..7f01864c5b8 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -26,38 +26,13 @@ namespace ROCKSDB_NAMESPACE { class GenericRateLimiter : public RateLimiter { public: - struct GenericRateLimiterOptions { - static const char* kName() { return "GenericRateLimiterOptions"; } - GenericRateLimiterOptions(int64_t _rate_bytes_per_sec, - int64_t _refill_period_us, int32_t _fairness, - const std::shared_ptr& _clock, - bool _auto_tuned) - : max_bytes_per_sec(_rate_bytes_per_sec), - refill_period_us(_refill_period_us), - clock(_clock), - fairness(_fairness > 100 ? 100 : _fairness), - auto_tuned(_auto_tuned) {} - int64_t max_bytes_per_sec; - int64_t refill_period_us; - std::shared_ptr clock; - int32_t fairness; - bool auto_tuned; - }; - - public: - explicit GenericRateLimiter( - int64_t refill_bytes, int64_t refill_period_us = 100 * 1000, - int32_t fairness = 10, - RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly, - const std::shared_ptr& clock = nullptr, - bool auto_tuned = false); + GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us, + int32_t fairness, RateLimiter::Mode mode, + const std::shared_ptr& clock, + bool auto_tuned); virtual ~GenericRateLimiter(); - static const char* kClassName() { return "GenericRateLimiter"; } - const char* Name() const override { return kClassName(); } - Status PrepareOptions(const ConfigOptions& options) override; - // This API allows user to dynamically change rate limiter's bytes per second. virtual void SetBytesPerSecond(int64_t bytes_per_second) override; @@ -120,25 +95,29 @@ class GenericRateLimiter : public RateLimiter { return rate_bytes_per_sec_; } + virtual void TEST_SetClock(std::shared_ptr clock) { + MutexLock g(&request_mutex_); + clock_ = std::move(clock); + next_refill_us_ = NowMicrosMonotonic(); + } + private: - void Initialize(); void RefillBytesAndGrantRequests(); std::vector GeneratePriorityIterationOrder(); int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); Status Tune(); - uint64_t NowMicrosMonotonic() { - return options_.clock->NowNanos() / std::milli::den; - } + uint64_t NowMicrosMonotonic() { return clock_->NowNanos() / std::milli::den; } // This mutex guard all internal states mutable port::Mutex request_mutex_; - GenericRateLimiterOptions options_; + const int64_t refill_period_us_; int64_t rate_bytes_per_sec_; // This variable can be changed dynamically. std::atomic refill_bytes_per_period_; + std::shared_ptr clock_; bool stop_; port::CondVar exit_cv_; @@ -149,13 +128,16 @@ class GenericRateLimiter : public RateLimiter { int64_t available_bytes_; int64_t next_refill_us_; + int32_t fairness_; Random rnd_; struct Req; std::deque queue_[Env::IO_TOTAL]; bool wait_until_refill_pending_; + bool auto_tuned_; int64_t num_drains_; + const int64_t max_bytes_per_sec_; std::chrono::microseconds tuned_time_; }; diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index cd809d183f5..5691ab26c31 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -15,11 +15,8 @@ #include #include "db/db_test_util.h" -#include "options/options_parser.h" #include "port/port.h" -#include "rocksdb/convenience.h" #include "rocksdb/system_clock.h" -#include "rocksdb/utilities/options_type.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" #include "util/random.h" @@ -466,95 +463,6 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec); } -TEST_F(RateLimiterTest, CreateGenericRateLimiterFromString) { - std::shared_ptr limiter; - ConfigOptions config_options; - std::string limiter_id = GenericRateLimiter::kClassName(); - ASSERT_OK(RateLimiter::CreateFromString(config_options, limiter_id + ":1024", - &limiter)); - ASSERT_NE(limiter, nullptr); - ASSERT_EQ(limiter->GetBytesPerSecond(), 1024U); -#ifndef ROCKSDB_LITE - ASSERT_OK(RateLimiter::CreateFromString( - config_options, "rate_bytes_per_sec=2048;id=" + limiter_id, &limiter)); - ASSERT_NE(limiter, nullptr); - ASSERT_EQ(limiter->GetBytesPerSecond(), 2048U); - ASSERT_NOK(RateLimiter::CreateFromString( - config_options, "rate_bytes_per_sec=0;id=" + limiter_id, &limiter)); - ASSERT_NOK(RateLimiter::CreateFromString( - config_options, "rate_bytes_per_sec=2048;fairness=0;id=" + limiter_id, - &limiter)); - - ASSERT_OK( - RateLimiter::CreateFromString(config_options, - "rate_bytes_per_sec=2048;refill_period_us=" - "1024;fairness=42;auto_tuned=true;" - "mode=kReadsOnly;id=" + - limiter_id, - &limiter)); - ASSERT_NE(limiter, nullptr); - auto opts = - limiter->GetOptions(); - ASSERT_NE(opts, nullptr); - ASSERT_EQ(opts->max_bytes_per_sec, 2048); - ASSERT_EQ(opts->refill_period_us, 1024); - ASSERT_EQ(opts->fairness, 42); - ASSERT_EQ(opts->auto_tuned, true); - ASSERT_TRUE(limiter->IsRateLimited(RateLimiter::OpType::kRead)); - ASSERT_FALSE(limiter->IsRateLimited(RateLimiter::OpType::kWrite)); -#endif // ROCKSDB_LITE -} - -#ifndef ROCKSDB_LITE -// This test is for a rate limiter that has no name (Name() returns ""). -// When the default Name() method is deprecated, this test should be removed. -TEST_F(RateLimiterTest, NoNameRateLimiter) { - static std::unordered_map dummy_limiter_options = - { - {"dummy", - {0, OptionType::kInt, OptionVerificationType::kNormal, - OptionTypeFlags::kNone}}, - }; - class NoNameRateLimiter : public RateLimiter { - public: - explicit NoNameRateLimiter(bool do_register) { - if (do_register) { - RegisterOptions("", &dummy, &dummy_limiter_options); - } - } - void SetBytesPerSecond(int64_t /*bytes_per_second*/) override {} - int64_t GetSingleBurstBytes() const override { return 0; } - int64_t GetTotalBytesThrough(const Env::IOPriority /*pri*/) const override { - return 0; - } - int64_t GetTotalRequests(const Env::IOPriority /*pri*/) const override { - return 0; - } - int64_t GetBytesPerSecond() const override { return 0; } - - private: - int dummy; - }; - - ConfigOptions config_options; - DBOptions db_opts, copy; - db_opts.rate_limiter.reset(new NoNameRateLimiter(false)); - ASSERT_EQ(db_opts.rate_limiter->GetId(), ""); - ASSERT_EQ(db_opts.rate_limiter->ToString(config_options), ""); - db_opts.rate_limiter.reset(new NoNameRateLimiter(true)); - ASSERT_EQ(db_opts.rate_limiter->GetId(), ""); - ASSERT_EQ(db_opts.rate_limiter->ToString(config_options), ""); - std::string opt_str; - ASSERT_OK(GetStringFromDBOptions(config_options, db_opts, &opt_str)); - ASSERT_OK( - GetDBOptionsFromString(config_options, DBOptions(), opt_str, ©)); - ASSERT_OK( - RocksDBOptionsParser::VerifyDBOptions(config_options, db_opts, copy)); - ASSERT_EQ(copy.rate_limiter, nullptr); - ASSERT_NE(copy.rate_limiter, db_opts.rate_limiter); -} -#endif // ROCKSDB_LITE - } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/utilities/backup/backup_engine.cc b/utilities/backup/backup_engine.cc index f6f79599386..754293e74d2 100644 --- a/utilities/backup/backup_engine.cc +++ b/utilities/backup/backup_engine.cc @@ -186,54 +186,13 @@ class BackupEngineImpl { const std::shared_ptr& backup_rate_limiter_clock, const std::shared_ptr& restore_rate_limiter_clock) { if (backup_rate_limiter_clock) { - assert(options_.backup_rate_limiter->IsInstanceOf( - GenericRateLimiter::kClassName())); - auto* backup_rate_limiter_options = - options_.backup_rate_limiter - ->GetOptions(); - - assert(backup_rate_limiter_options); - RateLimiter::Mode backup_rate_limiter_mode; - if (!options_.backup_rate_limiter->IsRateLimited( - RateLimiter::OpType::kRead)) { - backup_rate_limiter_mode = RateLimiter::Mode::kWritesOnly; - } else if (!options_.backup_rate_limiter->IsRateLimited( - RateLimiter::OpType::kWrite)) { - backup_rate_limiter_mode = RateLimiter::Mode::kReadsOnly; - } else { - backup_rate_limiter_mode = RateLimiter::Mode::kAllIo; - } - options_.backup_rate_limiter.reset(new GenericRateLimiter( - backup_rate_limiter_options->max_bytes_per_sec, - backup_rate_limiter_options->refill_period_us, - backup_rate_limiter_options->fairness, backup_rate_limiter_mode, - backup_rate_limiter_clock, backup_rate_limiter_options->auto_tuned)); + static_cast(options_.backup_rate_limiter.get()) + ->TEST_SetClock(backup_rate_limiter_clock); } if (restore_rate_limiter_clock) { - assert(options_.restore_rate_limiter->IsInstanceOf( - GenericRateLimiter::kClassName())); - auto* restore_rate_limiter_options = - options_.restore_rate_limiter - ->GetOptions(); - assert(restore_rate_limiter_options); - - RateLimiter::Mode restore_rate_limiter_mode; - if (!options_.restore_rate_limiter->IsRateLimited( - RateLimiter::OpType::kRead)) { - restore_rate_limiter_mode = RateLimiter::Mode::kWritesOnly; - } else if (!options_.restore_rate_limiter->IsRateLimited( - RateLimiter::OpType::kWrite)) { - restore_rate_limiter_mode = RateLimiter::Mode::kReadsOnly; - } else { - restore_rate_limiter_mode = RateLimiter::Mode::kAllIo; - } - options_.restore_rate_limiter.reset(new GenericRateLimiter( - restore_rate_limiter_options->max_bytes_per_sec, - restore_rate_limiter_options->refill_period_us, - restore_rate_limiter_options->fairness, restore_rate_limiter_mode, - restore_rate_limiter_clock, - restore_rate_limiter_options->auto_tuned)); + static_cast(options_.restore_rate_limiter.get()) + ->TEST_SetClock(restore_rate_limiter_clock); } }