From 4df4d4cf9208b83361e15e04a0d349c1965a1bc3 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Mon, 20 Apr 2020 15:44:38 +0800 Subject: [PATCH 01/49] rate limiter --- include/dsn/utility/TokenBucket.h | 507 +++++++++++++++++++++ src/core/tests/TokenBucket.h | 507 +++++++++++++++++++++ src/dist/block_service/fds/fds_service.cpp | 78 +++- src/dist/block_service/fds/fds_service.h | 9 + 4 files changed, 1096 insertions(+), 5 deletions(-) create mode 100644 include/dsn/utility/TokenBucket.h create mode 100644 src/core/tests/TokenBucket.h diff --git a/include/dsn/utility/TokenBucket.h b/include/dsn/utility/TokenBucket.h new file mode 100644 index 0000000000..5581289592 --- /dev/null +++ b/include/dsn/utility/TokenBucket.h @@ -0,0 +1,507 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace folly { + +/** + * Thread-safe (atomic) token bucket implementation. + * + * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream + * of events with an average rate and some amount of burstiness. The canonical + * example is a packet switched network: the network can accept some number of + * bytes per second and the bytes come in finite packets (bursts). A token + * bucket stores up to a fixed number of tokens (the burst size). Some number + * of tokens are removed when an event occurs. The tokens are replenished at a + * fixed rate. Failure to allocate tokens implies resource is unavailable and + * caller needs to implement its own retry mechanism. For simple cases where + * caller is okay with a FIFO starvation-free scheduling behavior, there are + * also APIs to 'borrow' from the future effectively assigning a start time to + * the caller when it should proceed with using the resource. It is also + * possible to 'return' previously allocated tokens to make them available to + * other users. Returns in excess of burstSize are considered expired and + * will not be available to later callers. + * + * This implementation records the last time it was updated. This allows the + * token bucket to add tokens "just in time" when tokens are requested. + * + * The "dynamic" base variant allows the token generation rate and maximum + * burst size to change with every token consumption. + * + * @tparam Clock Clock type, must be steady i.e. monotonic. + */ +template +class BasicDynamicTokenBucket +{ + static_assert(Clock::is_steady, "clock must be steady"); + +public: + /** + * Constructor. + * + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * buckets are "full" after construction. + */ + explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept : zeroTime_(zeroTime) {} + + /** + * Copy constructor. + * + * Thread-safe. (Copy constructors of derived classes may not be thread-safe + * however.) + */ + BasicDynamicTokenBucket(const BasicDynamicTokenBucket &other) noexcept + : zeroTime_(other.zeroTime_.load()) + { + } + + /** + * Copy-assignment operator. + * + * Warning: not thread safe for the object being assigned to (including + * self-assignment). Thread-safe for the other object. + */ + BasicDynamicTokenBucket &operator=(const BasicDynamicTokenBucket &other) noexcept + { + zeroTime_ = other.zeroTime_.load(); + return *this; + } + + /** + * Re-initialize token bucket. + * + * Thread-safe. + * + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * bucket is reset to "full". + */ + void reset(double zeroTime = 0) noexcept { zeroTime_ = zeroTime; } + + /** + * Returns the current time in seconds since Epoch. + */ + static double defaultClockNow() noexcept + { + auto const now = Clock::now().time_since_epoch(); + return std::chrono::duration(now).count(); + } + + /** + * Attempts to consume some number of tokens. Tokens are first added to the + * bucket based on the time elapsed since the last attempt to consume tokens. + * Note: Attempts to consume more tokens than the burst size will always + * fail. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param rate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return True if the rate limit check passed, false otherwise. + */ + bool consume(double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) + { + assert(rate > 0); + assert(burstSize > 0); + + if (nowInSeconds <= zeroTime_.load()) { + return 0; + } + + return consumeImpl(rate, burstSize, nowInSeconds, [toConsume](double &tokens) { + if (tokens < toConsume) { + return false; + } + tokens -= toConsume; + return true; + }); + } + + /** + * Similar to consume, but always consumes some number of tokens. If the + * bucket contains enough tokens - consumes toConsume tokens. Otherwise the + * bucket is drained. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param rate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return number of tokens that were consumed. + */ + double consumeOrDrain(double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) + { + assert(rate > 0); + assert(burstSize > 0); + + if (nowInSeconds <= zeroTime_.load()) { + return 0; + } + + double consumed; + consumeImpl(rate, burstSize, nowInSeconds, [&consumed, toConsume](double &tokens) { + if (tokens < toConsume) { + consumed = tokens; + tokens = 0.0; + } else { + consumed = toConsume; + tokens -= toConsume; + } + return true; + }); + return consumed; + } + + /** + * Return extra tokens back to the bucket. This will move the zeroTime_ + * value back based on the rate. + * + * Thread-safe. + */ + void returnTokens(double tokensToReturn, double rate) + { + assert(rate > 0); + assert(tokensToReturn > 0); + + returnTokensImpl(tokensToReturn, rate); + } + + /** + * Like consumeOrDrain but the call will always satisfy the asked for count. + * It does so by borrowing tokens from the future (zeroTime_ will move + * forward) if the currently available count isn't sufficient. + * + * Returns a boost::optional. The optional wont be set if the request + * cannot be satisfied: only case is when it is larger than burstSize. The + * value of the optional is a double indicating the time in seconds that the + * caller needs to wait at which the reservation becomes valid. The caller + * could simply sleep for the returned duration to smooth out the allocation + * to match the rate limiter or do some other computation in the meantime. In + * any case, any regular consume or consumeOrDrain calls will fail to allocate + * any tokens until the future time is reached. + * + * Note: It is assumed the caller will not ask for a very large count nor use + * it immediately (if not waiting inline) as that would break the burst + * prevention the limiter is meant to be used for. + * + * Thread-safe. + */ + boost::optional consumeWithBorrowNonBlocking(double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) + { + assert(rate > 0); + assert(burstSize > 0); + + if (burstSize < toConsume) { + return boost::none; + } + + while (toConsume > 0) { + double consumed = consumeOrDrain(toConsume, rate, burstSize, nowInSeconds); + if (consumed > 0) { + toConsume -= consumed; + } else { + double zeroTimeNew = returnTokensImpl(-toConsume, rate); + double napTime = std::max(0.0, zeroTimeNew - nowInSeconds); + return napTime; + } + } + return 0; + } + + /** + * Convenience wrapper around non-blocking borrow to sleep inline until + * reservation is valid. + */ + bool consumeWithBorrowAndWait(double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) + { + auto res = consumeWithBorrowNonBlocking(toConsume, rate, burstSize, nowInSeconds); + if (res.value_or(0) > 0) { + int64_t napUSec = res.value() * 1000000; + std::this_thread::sleep_for(std::chrono::microseconds(napUSec)); + } + return res.is_initialized(); + } + + /** + * Returns the number of tokens currently available. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double available(double rate, double burstSize, double nowInSeconds = defaultClockNow()) const + noexcept + { + assert(rate > 0); + assert(burstSize > 0); + + double zt = this->zeroTime_.load(); + if (nowInSeconds <= zt) { + return 0; + } + return std::min((nowInSeconds - zt) * rate, burstSize); + } + +private: + template + bool consumeImpl(double rate, double burstSize, double nowInSeconds, const TCallback &callback) + { + auto zeroTimeOld = zeroTime_.load(); + double zeroTimeNew; + do { + auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize); + if (!callback(tokens)) { + return false; + } + zeroTimeNew = nowInSeconds - tokens / rate; + } while (dsn_unlikely(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); + + return true; + } + + /** + * Adjust zeroTime based on rate and tokenCount and return the new value of + * zeroTime_. Note: Token count can be negative to move the zeroTime_ value + * into the future. + */ + double returnTokensImpl(double tokenCount, double rate) + { + auto zeroTimeOld = zeroTime_.load(); + double zeroTimeNew; + do { + zeroTimeNew = zeroTimeOld - tokenCount / rate; + } while (dsn_unlikely(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); + return zeroTimeNew; + } + + std::atomic zeroTime_; +}; + +/** + * Specialization of BasicDynamicTokenBucket with a fixed token + * generation rate and a fixed maximum burst size. + */ +template +class BasicTokenBucket +{ + static_assert(Clock::is_steady, "clock must be steady"); + +private: + using Impl = BasicDynamicTokenBucket; + +public: + /** + * Construct a token bucket with a specific maximum rate and burst size. + * + * @param genRate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * bucket is "full" after construction. + */ + BasicTokenBucket(double genRate, double burstSize, double zeroTime = 0) noexcept + : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) + { + assert(rate_ > 0); + assert(burstSize_ > 0); + } + + /** + * Copy constructor. + * + * Warning: not thread safe! + */ + BasicTokenBucket(const BasicTokenBucket &other) noexcept = default; + + /** + * Copy-assignment operator. + * + * Warning: not thread safe! + */ + BasicTokenBucket &operator=(const BasicTokenBucket &other) noexcept = default; + + /** + * Returns the current time in seconds since Epoch. + */ + static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) + { + return Impl::defaultClockNow(); + } + + /** + * Change rate and burst size. + * + * Warning: not thread safe! + * + * @param genRate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + */ + void reset(double genRate, double burstSize, double nowInSeconds = defaultClockNow()) noexcept + { + assert(genRate > 0); + assert(burstSize > 0); + const double availTokens = available(nowInSeconds); + rate_ = genRate; + burstSize_ = burstSize; + setCapacity(availTokens, nowInSeconds); + } + + /** + * Change number of tokens in bucket. + * + * Warning: not thread safe! + * + * @param tokens Desired number of tokens in bucket after the call. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + */ + void setCapacity(double tokens, double nowInSeconds) noexcept + { + tokenBucket_.reset(nowInSeconds - tokens / rate_); + } + + /** + * Attempts to consume some number of tokens. Tokens are first added to the + * bucket based on the time elapsed since the last attempt to consume tokens. + * Note: Attempts to consume more tokens than the burst size will always + * fail. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return True if the rate limit check passed, false otherwise. + */ + bool consume(double toConsume, double nowInSeconds = defaultClockNow()) + { + return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds); + } + + /** + * Similar to consume, but always consumes some number of tokens. If the + * bucket contains enough tokens - consumes toConsume tokens. Otherwise the + * bucket is drained. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return number of tokens that were consumed. + */ + double consumeOrDrain(double toConsume, double nowInSeconds = defaultClockNow()) + { + return tokenBucket_.consumeOrDrain(toConsume, rate_, burstSize_, nowInSeconds); + } + + /** + * Returns extra token back to the bucket. + */ + void returnTokens(double tokensToReturn) + { + return tokenBucket_.returnTokens(tokensToReturn, rate_); + } + + /** + * Reserve tokens and return time to wait for in order for the reservation to + * be compatible with the bucket configuration. + */ + boost::optional consumeWithBorrowNonBlocking(double toConsume, + double nowInSeconds = defaultClockNow()) + { + return tokenBucket_.consumeWithBorrowNonBlocking( + toConsume, rate_, burstSize_, nowInSeconds); + } + + /** + * Reserve tokens. Blocks if need be until reservation is satisfied. + */ + bool consumeWithBorrowAndWait(double toConsume, double nowInSeconds = defaultClockNow()) + { + return tokenBucket_.consumeWithBorrowAndWait(toConsume, rate_, burstSize_, nowInSeconds); + } + + /** + * Returns the number of tokens currently available. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double available(double nowInSeconds = defaultClockNow()) const + { + return tokenBucket_.available(rate_, burstSize_, nowInSeconds); + } + + /** + * Returns the number of tokens generated per second. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double rate() const noexcept { return rate_; } + + /** + * Returns the maximum burst size. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double burst() const noexcept { return burstSize_; } + +private: + Impl tokenBucket_; + double rate_; + double burstSize_; +}; + +using TokenBucket = BasicTokenBucket<>; +using DynamicTokenBucket = BasicDynamicTokenBucket<>; + +} // namespace folly diff --git a/src/core/tests/TokenBucket.h b/src/core/tests/TokenBucket.h new file mode 100644 index 0000000000..5581289592 --- /dev/null +++ b/src/core/tests/TokenBucket.h @@ -0,0 +1,507 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace folly { + +/** + * Thread-safe (atomic) token bucket implementation. + * + * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream + * of events with an average rate and some amount of burstiness. The canonical + * example is a packet switched network: the network can accept some number of + * bytes per second and the bytes come in finite packets (bursts). A token + * bucket stores up to a fixed number of tokens (the burst size). Some number + * of tokens are removed when an event occurs. The tokens are replenished at a + * fixed rate. Failure to allocate tokens implies resource is unavailable and + * caller needs to implement its own retry mechanism. For simple cases where + * caller is okay with a FIFO starvation-free scheduling behavior, there are + * also APIs to 'borrow' from the future effectively assigning a start time to + * the caller when it should proceed with using the resource. It is also + * possible to 'return' previously allocated tokens to make them available to + * other users. Returns in excess of burstSize are considered expired and + * will not be available to later callers. + * + * This implementation records the last time it was updated. This allows the + * token bucket to add tokens "just in time" when tokens are requested. + * + * The "dynamic" base variant allows the token generation rate and maximum + * burst size to change with every token consumption. + * + * @tparam Clock Clock type, must be steady i.e. monotonic. + */ +template +class BasicDynamicTokenBucket +{ + static_assert(Clock::is_steady, "clock must be steady"); + +public: + /** + * Constructor. + * + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * buckets are "full" after construction. + */ + explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept : zeroTime_(zeroTime) {} + + /** + * Copy constructor. + * + * Thread-safe. (Copy constructors of derived classes may not be thread-safe + * however.) + */ + BasicDynamicTokenBucket(const BasicDynamicTokenBucket &other) noexcept + : zeroTime_(other.zeroTime_.load()) + { + } + + /** + * Copy-assignment operator. + * + * Warning: not thread safe for the object being assigned to (including + * self-assignment). Thread-safe for the other object. + */ + BasicDynamicTokenBucket &operator=(const BasicDynamicTokenBucket &other) noexcept + { + zeroTime_ = other.zeroTime_.load(); + return *this; + } + + /** + * Re-initialize token bucket. + * + * Thread-safe. + * + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * bucket is reset to "full". + */ + void reset(double zeroTime = 0) noexcept { zeroTime_ = zeroTime; } + + /** + * Returns the current time in seconds since Epoch. + */ + static double defaultClockNow() noexcept + { + auto const now = Clock::now().time_since_epoch(); + return std::chrono::duration(now).count(); + } + + /** + * Attempts to consume some number of tokens. Tokens are first added to the + * bucket based on the time elapsed since the last attempt to consume tokens. + * Note: Attempts to consume more tokens than the burst size will always + * fail. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param rate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return True if the rate limit check passed, false otherwise. + */ + bool consume(double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) + { + assert(rate > 0); + assert(burstSize > 0); + + if (nowInSeconds <= zeroTime_.load()) { + return 0; + } + + return consumeImpl(rate, burstSize, nowInSeconds, [toConsume](double &tokens) { + if (tokens < toConsume) { + return false; + } + tokens -= toConsume; + return true; + }); + } + + /** + * Similar to consume, but always consumes some number of tokens. If the + * bucket contains enough tokens - consumes toConsume tokens. Otherwise the + * bucket is drained. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param rate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return number of tokens that were consumed. + */ + double consumeOrDrain(double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) + { + assert(rate > 0); + assert(burstSize > 0); + + if (nowInSeconds <= zeroTime_.load()) { + return 0; + } + + double consumed; + consumeImpl(rate, burstSize, nowInSeconds, [&consumed, toConsume](double &tokens) { + if (tokens < toConsume) { + consumed = tokens; + tokens = 0.0; + } else { + consumed = toConsume; + tokens -= toConsume; + } + return true; + }); + return consumed; + } + + /** + * Return extra tokens back to the bucket. This will move the zeroTime_ + * value back based on the rate. + * + * Thread-safe. + */ + void returnTokens(double tokensToReturn, double rate) + { + assert(rate > 0); + assert(tokensToReturn > 0); + + returnTokensImpl(tokensToReturn, rate); + } + + /** + * Like consumeOrDrain but the call will always satisfy the asked for count. + * It does so by borrowing tokens from the future (zeroTime_ will move + * forward) if the currently available count isn't sufficient. + * + * Returns a boost::optional. The optional wont be set if the request + * cannot be satisfied: only case is when it is larger than burstSize. The + * value of the optional is a double indicating the time in seconds that the + * caller needs to wait at which the reservation becomes valid. The caller + * could simply sleep for the returned duration to smooth out the allocation + * to match the rate limiter or do some other computation in the meantime. In + * any case, any regular consume or consumeOrDrain calls will fail to allocate + * any tokens until the future time is reached. + * + * Note: It is assumed the caller will not ask for a very large count nor use + * it immediately (if not waiting inline) as that would break the burst + * prevention the limiter is meant to be used for. + * + * Thread-safe. + */ + boost::optional consumeWithBorrowNonBlocking(double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) + { + assert(rate > 0); + assert(burstSize > 0); + + if (burstSize < toConsume) { + return boost::none; + } + + while (toConsume > 0) { + double consumed = consumeOrDrain(toConsume, rate, burstSize, nowInSeconds); + if (consumed > 0) { + toConsume -= consumed; + } else { + double zeroTimeNew = returnTokensImpl(-toConsume, rate); + double napTime = std::max(0.0, zeroTimeNew - nowInSeconds); + return napTime; + } + } + return 0; + } + + /** + * Convenience wrapper around non-blocking borrow to sleep inline until + * reservation is valid. + */ + bool consumeWithBorrowAndWait(double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) + { + auto res = consumeWithBorrowNonBlocking(toConsume, rate, burstSize, nowInSeconds); + if (res.value_or(0) > 0) { + int64_t napUSec = res.value() * 1000000; + std::this_thread::sleep_for(std::chrono::microseconds(napUSec)); + } + return res.is_initialized(); + } + + /** + * Returns the number of tokens currently available. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double available(double rate, double burstSize, double nowInSeconds = defaultClockNow()) const + noexcept + { + assert(rate > 0); + assert(burstSize > 0); + + double zt = this->zeroTime_.load(); + if (nowInSeconds <= zt) { + return 0; + } + return std::min((nowInSeconds - zt) * rate, burstSize); + } + +private: + template + bool consumeImpl(double rate, double burstSize, double nowInSeconds, const TCallback &callback) + { + auto zeroTimeOld = zeroTime_.load(); + double zeroTimeNew; + do { + auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize); + if (!callback(tokens)) { + return false; + } + zeroTimeNew = nowInSeconds - tokens / rate; + } while (dsn_unlikely(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); + + return true; + } + + /** + * Adjust zeroTime based on rate and tokenCount and return the new value of + * zeroTime_. Note: Token count can be negative to move the zeroTime_ value + * into the future. + */ + double returnTokensImpl(double tokenCount, double rate) + { + auto zeroTimeOld = zeroTime_.load(); + double zeroTimeNew; + do { + zeroTimeNew = zeroTimeOld - tokenCount / rate; + } while (dsn_unlikely(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); + return zeroTimeNew; + } + + std::atomic zeroTime_; +}; + +/** + * Specialization of BasicDynamicTokenBucket with a fixed token + * generation rate and a fixed maximum burst size. + */ +template +class BasicTokenBucket +{ + static_assert(Clock::is_steady, "clock must be steady"); + +private: + using Impl = BasicDynamicTokenBucket; + +public: + /** + * Construct a token bucket with a specific maximum rate and burst size. + * + * @param genRate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * bucket is "full" after construction. + */ + BasicTokenBucket(double genRate, double burstSize, double zeroTime = 0) noexcept + : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) + { + assert(rate_ > 0); + assert(burstSize_ > 0); + } + + /** + * Copy constructor. + * + * Warning: not thread safe! + */ + BasicTokenBucket(const BasicTokenBucket &other) noexcept = default; + + /** + * Copy-assignment operator. + * + * Warning: not thread safe! + */ + BasicTokenBucket &operator=(const BasicTokenBucket &other) noexcept = default; + + /** + * Returns the current time in seconds since Epoch. + */ + static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) + { + return Impl::defaultClockNow(); + } + + /** + * Change rate and burst size. + * + * Warning: not thread safe! + * + * @param genRate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + */ + void reset(double genRate, double burstSize, double nowInSeconds = defaultClockNow()) noexcept + { + assert(genRate > 0); + assert(burstSize > 0); + const double availTokens = available(nowInSeconds); + rate_ = genRate; + burstSize_ = burstSize; + setCapacity(availTokens, nowInSeconds); + } + + /** + * Change number of tokens in bucket. + * + * Warning: not thread safe! + * + * @param tokens Desired number of tokens in bucket after the call. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + */ + void setCapacity(double tokens, double nowInSeconds) noexcept + { + tokenBucket_.reset(nowInSeconds - tokens / rate_); + } + + /** + * Attempts to consume some number of tokens. Tokens are first added to the + * bucket based on the time elapsed since the last attempt to consume tokens. + * Note: Attempts to consume more tokens than the burst size will always + * fail. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return True if the rate limit check passed, false otherwise. + */ + bool consume(double toConsume, double nowInSeconds = defaultClockNow()) + { + return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds); + } + + /** + * Similar to consume, but always consumes some number of tokens. If the + * bucket contains enough tokens - consumes toConsume tokens. Otherwise the + * bucket is drained. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return number of tokens that were consumed. + */ + double consumeOrDrain(double toConsume, double nowInSeconds = defaultClockNow()) + { + return tokenBucket_.consumeOrDrain(toConsume, rate_, burstSize_, nowInSeconds); + } + + /** + * Returns extra token back to the bucket. + */ + void returnTokens(double tokensToReturn) + { + return tokenBucket_.returnTokens(tokensToReturn, rate_); + } + + /** + * Reserve tokens and return time to wait for in order for the reservation to + * be compatible with the bucket configuration. + */ + boost::optional consumeWithBorrowNonBlocking(double toConsume, + double nowInSeconds = defaultClockNow()) + { + return tokenBucket_.consumeWithBorrowNonBlocking( + toConsume, rate_, burstSize_, nowInSeconds); + } + + /** + * Reserve tokens. Blocks if need be until reservation is satisfied. + */ + bool consumeWithBorrowAndWait(double toConsume, double nowInSeconds = defaultClockNow()) + { + return tokenBucket_.consumeWithBorrowAndWait(toConsume, rate_, burstSize_, nowInSeconds); + } + + /** + * Returns the number of tokens currently available. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double available(double nowInSeconds = defaultClockNow()) const + { + return tokenBucket_.available(rate_, burstSize_, nowInSeconds); + } + + /** + * Returns the number of tokens generated per second. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double rate() const noexcept { return rate_; } + + /** + * Returns the maximum burst size. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double burst() const noexcept { return burstSize_; } + +private: + Impl tokenBucket_; + double rate_; + double burstSize_; +}; + +using TokenBucket = BasicTokenBucket<>; +using DynamicTokenBucket = BasicDynamicTokenBucket<>; + +} // namespace folly diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 7aa9f1ef24..2cf513c11b 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -17,6 +17,7 @@ #include #include #include +#include namespace dsn { namespace dist { @@ -98,7 +99,13 @@ const std::string fds_service::FILE_LENGTH_CUSTOM_KEY = "x-xiaomi-meta-content-l const std::string fds_service::FILE_LENGTH_KEY = "content-length"; const std::string fds_service::FILE_MD5_KEY = "content-md5"; -fds_service::fds_service() {} +fds_service::fds_service() +{ + uint32_t limit_rate = (uint32_t)dsn_config_get_value_uint64( + "replication", "fds_limit_rate", 0, "rate limit of fds(MB)"); + _token_bucket.reset(new folly::TokenBucket(limit_rate * 1e6, limit_rate * 5)); +} + fds_service::~fds_service() {} /** @@ -484,6 +491,33 @@ fds_file_object::fds_file_object(fds_service *s, fds_file_object::~fds_file_object() {} +error_code fds_file_object::get_content_with_throttling(uint64_t start, + int64_t length, + std::ostream &os, + uint64_t &transfered_bytes) +{ + const uint64_t BATCH_MAX = 1e6; // 1MB(8Mb) + uint64_t once_transfered_bytes = 0; + uint64_t pos = start; + while (pos < start + length) { + int64_t batch = std::min(BATCH_MAX, start + length - pos); + + // get tokens from token bucket + if (_service->_token_bucket != nullptr) { + _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); + } + + error_code err = get_content(pos, batch, os, once_transfered_bytes); + transfered_bytes += once_transfered_bytes; + if (err != ERR_OK || once_transfered_bytes < length) { + return err; + } + pos += batch; + } + + return ERR_OK; +} + dsn::error_code fds_file_object::get_content(uint64_t pos, int64_t length, /*out*/ std::ostream &os, @@ -554,6 +588,38 @@ dsn::error_code fds_file_object::get_content(uint64_t pos, return err; } +error_code fds_file_object::put_content_with_throttling(std::istream &is, + uint64_t &transfered_bytes) +{ + const uint64_t BATCH_MAX = 1e6; // 1MB(8Mb) + uint64_t once_transfered_bytes = 0; + uint64_t pos = 0; + uint64_t length = is.gcount(); + char *buffer = new char[BATCH_MAX]; + auto cleanup = defer([buffer]() { delete[] buffer; }); + + while (pos < length) { + int64_t batch = std::min(BATCH_MAX, length - pos); + + // get tokens from token bucket + if (_service->_token_bucket != nullptr) { + _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); + } + + is.readsome(buffer, batch); + std::istringstream part_is(std::string(buffer, batch)); + + error_code err = put_content(part_is, once_transfered_bytes); + transfered_bytes += once_transfered_bytes; + if (err != ERR_OK || once_transfered_bytes < length) { + return err; + } + pos += batch; + } + + return ERR_OK; +} + dsn::error_code fds_file_object::put_content(/*in-out*/ std::istream &is, uint64_t &transfered_bytes) { @@ -621,7 +687,7 @@ dsn::task_ptr fds_file_object::write(const write_request &req, write_response resp; std::istringstream is; is.str(std::string(req.buffer.data(), req.buffer.length())); - resp.err = put_content(is, resp.written_size); + resp.err = put_content_with_throttling(is, resp.written_size); t->enqueue_with(resp); release_ref(); @@ -658,7 +724,7 @@ dsn::task_ptr fds_file_object::upload(const upload_request &req, ptr); resp.err = dsn::ERR_FILE_OPERATION_FAILED; } else { - resp.err = put_content(is, resp.uploaded_size); + resp.err = put_content_with_throttling(is, resp.uploaded_size); is.close(); } @@ -691,7 +757,8 @@ dsn::task_ptr fds_file_object::read(const read_request &req, read_response resp; std::ostringstream os; uint64_t transferd_size; - resp.err = get_content(req.remote_pos, req.remote_length, os, transferd_size); + resp.err = + get_content_with_throttling(req.remote_pos, req.remote_length, os, transferd_size); if (os.tellp() > 0) { std::string *output = new std::string(); *output = os.str(); @@ -743,7 +810,8 @@ dsn::task_ptr fds_file_object::download(const download_request &req, auto download_background = [this, req, handle, t]() { download_response resp; uint64_t transfered_size; - resp.err = get_content(req.remote_pos, req.remote_length, *handle, transfered_size); + resp.err = get_content_with_throttling( + req.remote_pos, req.remote_length, *handle, transfered_size); resp.downloaded_size = 0; if (handle->tellp() != -1) resp.downloaded_size = handle->tellp(); diff --git a/src/dist/block_service/fds/fds_service.h b/src/dist/block_service/fds/fds_service.h index ea4773f9ff..bf49d2ed8f 100644 --- a/src/dist/block_service/fds/fds_service.h +++ b/src/dist/block_service/fds/fds_service.h @@ -2,6 +2,7 @@ #define FDS_SERVICE_H #include +#include namespace galaxy { namespace fds { @@ -64,6 +65,9 @@ class fds_service : public block_filesystem private: std::shared_ptr _client; std::string _bucket_name; + std::unique_ptr _token_bucket; + + friend class fds_file_object; }; class fds_file_object : public block_file @@ -101,10 +105,15 @@ class fds_file_object : public block_file dsn::task_tracker *tracker) override; private: + error_code get_content_with_throttling(uint64_t start, + int64_t length, + std::ostream &os, + uint64_t &transfered_bytes); dsn::error_code get_content(uint64_t pos, int64_t length, /*out*/ std::ostream &os, /*out*/ uint64_t &transfered_bytes); + error_code put_content_with_throttling(std::istream &is, uint64_t &transfered_bytes); dsn::error_code put_content(/*in-out*/ std::istream &is, /*out*/ uint64_t &transfered_bytes); fds_service *_service; std::string _fds_path; From cfa5a50081600459da241fa7db29b9f01875281c Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Mon, 20 Apr 2020 17:31:12 +0800 Subject: [PATCH 02/49] fix --- src/dist/block_service/fds/fds_service.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 2cf513c11b..0bd10ed823 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -102,7 +102,7 @@ const std::string fds_service::FILE_MD5_KEY = "content-md5"; fds_service::fds_service() { uint32_t limit_rate = (uint32_t)dsn_config_get_value_uint64( - "replication", "fds_limit_rate", 0, "rate limit of fds(MB)"); + "replication", "fds_limit_rate", 20, "rate limit of fds(Mb)"); _token_bucket.reset(new folly::TokenBucket(limit_rate * 1e6, limit_rate * 5)); } @@ -608,7 +608,6 @@ error_code fds_file_object::put_content_with_throttling(std::istream &is, is.readsome(buffer, batch); std::istringstream part_is(std::string(buffer, batch)); - error_code err = put_content(part_is, once_transfered_bytes); transfered_bytes += once_transfered_bytes; if (err != ERR_OK || once_transfered_bytes < length) { From 18be2e3f41dfab5305a9dd753cbbf17d19879505 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Tue, 21 Apr 2020 10:01:56 +0800 Subject: [PATCH 03/49] fix --- src/dist/block_service/fds/fds_service.cpp | 32 ++++++++-------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 0bd10ed823..560f0abab1 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -498,20 +498,18 @@ error_code fds_file_object::get_content_with_throttling(uint64_t start, { const uint64_t BATCH_MAX = 1e6; // 1MB(8Mb) uint64_t once_transfered_bytes = 0; + transfered_bytes = 0; uint64_t pos = start; while (pos < start + length) { int64_t batch = std::min(BATCH_MAX, start + length - pos); - // get tokens from token bucket - if (_service->_token_bucket != nullptr) { - _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); - } + _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); error_code err = get_content(pos, batch, os, once_transfered_bytes); - transfered_bytes += once_transfered_bytes; - if (err != ERR_OK || once_transfered_bytes < length) { + if (err != ERR_OK) { return err; } + transfered_bytes += once_transfered_bytes; pos += batch; } @@ -593,27 +591,21 @@ error_code fds_file_object::put_content_with_throttling(std::istream &is, { const uint64_t BATCH_MAX = 1e6; // 1MB(8Mb) uint64_t once_transfered_bytes = 0; - uint64_t pos = 0; - uint64_t length = is.gcount(); + transfered_bytes = 0; char *buffer = new char[BATCH_MAX]; auto cleanup = defer([buffer]() { delete[] buffer; }); - while (pos < length) { - int64_t batch = std::min(BATCH_MAX, length - pos); - + while (!is.eof()) { + int batch = is.readsome(buffer, BATCH_MAX); // get tokens from token bucket - if (_service->_token_bucket != nullptr) { - _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); - } + _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); - is.readsome(buffer, batch); - std::istringstream part_is(std::string(buffer, batch)); - error_code err = put_content(part_is, once_transfered_bytes); - transfered_bytes += once_transfered_bytes; - if (err != ERR_OK || once_transfered_bytes < length) { + std::istringstream batch_is(std::string(buffer, batch)); + error_code err = put_content(batch_is, once_transfered_bytes); + if (err != ERR_OK) { return err; } - pos += batch; + transfered_bytes += once_transfered_bytes; } return ERR_OK; From 96cfabc7398f4916664312d114ad76f0ce1d0914 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Tue, 21 Apr 2020 11:12:40 +0800 Subject: [PATCH 04/49] test --- src/dist/block_service/fds/fds_service.cpp | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 560f0abab1..20bc958c99 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -506,10 +506,10 @@ error_code fds_file_object::get_content_with_throttling(uint64_t start, _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); error_code err = get_content(pos, batch, os, once_transfered_bytes); - if (err != ERR_OK) { + transfered_bytes += once_transfered_bytes; + if (err != ERR_OK || once_transfered_bytes < batch) { return err; } - transfered_bytes += once_transfered_bytes; pos += batch; } @@ -597,15 +597,18 @@ error_code fds_file_object::put_content_with_throttling(std::istream &is, while (!is.eof()) { int batch = is.readsome(buffer, BATCH_MAX); + if (0 == batch) { + break; + } // get tokens from token bucket _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); std::istringstream batch_is(std::string(buffer, batch)); error_code err = put_content(batch_is, once_transfered_bytes); - if (err != ERR_OK) { + transfered_bytes += once_transfered_bytes; + if (err != ERR_OK || once_transfered_bytes < batch) { return err; } - transfered_bytes += once_transfered_bytes; } return ERR_OK; From ec56f9cb9f28d5d585d21df4ba8af809a4a0f248 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Tue, 21 Apr 2020 11:23:02 +0800 Subject: [PATCH 05/49] test --- src/dist/block_service/fds/fds_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 20bc958c99..b97aea4365 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -595,7 +595,7 @@ error_code fds_file_object::put_content_with_throttling(std::istream &is, char *buffer = new char[BATCH_MAX]; auto cleanup = defer([buffer]() { delete[] buffer; }); - while (!is.eof()) { + while (is.good() && !is.eof()) { int batch = is.readsome(buffer, BATCH_MAX); if (0 == batch) { break; From 45507cf6c2dffc4ab926581bd30bbd927b876ba4 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Tue, 21 Apr 2020 11:28:41 +0800 Subject: [PATCH 06/49] test --- src/dist/block_service/fds/fds_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index b97aea4365..33cb78d8d3 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -103,7 +103,7 @@ fds_service::fds_service() { uint32_t limit_rate = (uint32_t)dsn_config_get_value_uint64( "replication", "fds_limit_rate", 20, "rate limit of fds(Mb)"); - _token_bucket.reset(new folly::TokenBucket(limit_rate * 1e6, limit_rate * 5)); + _token_bucket.reset(new folly::TokenBucket(limit_rate * 1e6, 5 * limit_rate * 1e6)); } fds_service::~fds_service() {} From f91890f6d795140df6709122401ab44aaf08e47f Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Tue, 21 Apr 2020 12:07:19 +0800 Subject: [PATCH 07/49] fix --- src/core/tests/TokenBucket.h | 507 --------------------- src/core/tests/TokenBucketTest.cpp | 131 ++++++ src/core/tests/TokenBucketTest.h | 32 ++ src/dist/block_service/fds/fds_service.cpp | 2 +- 4 files changed, 164 insertions(+), 508 deletions(-) delete mode 100644 src/core/tests/TokenBucket.h create mode 100644 src/core/tests/TokenBucketTest.cpp create mode 100644 src/core/tests/TokenBucketTest.h diff --git a/src/core/tests/TokenBucket.h b/src/core/tests/TokenBucket.h deleted file mode 100644 index 5581289592..0000000000 --- a/src/core/tests/TokenBucket.h +++ /dev/null @@ -1,507 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. -// This source code is licensed under the Apache License Version 2.0, which -// can be found in the LICENSE file in the root directory of this source tree. - -#pragma once - -#include -#include -#include -#include -#include -#include - -namespace folly { - -/** - * Thread-safe (atomic) token bucket implementation. - * - * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream - * of events with an average rate and some amount of burstiness. The canonical - * example is a packet switched network: the network can accept some number of - * bytes per second and the bytes come in finite packets (bursts). A token - * bucket stores up to a fixed number of tokens (the burst size). Some number - * of tokens are removed when an event occurs. The tokens are replenished at a - * fixed rate. Failure to allocate tokens implies resource is unavailable and - * caller needs to implement its own retry mechanism. For simple cases where - * caller is okay with a FIFO starvation-free scheduling behavior, there are - * also APIs to 'borrow' from the future effectively assigning a start time to - * the caller when it should proceed with using the resource. It is also - * possible to 'return' previously allocated tokens to make them available to - * other users. Returns in excess of burstSize are considered expired and - * will not be available to later callers. - * - * This implementation records the last time it was updated. This allows the - * token bucket to add tokens "just in time" when tokens are requested. - * - * The "dynamic" base variant allows the token generation rate and maximum - * burst size to change with every token consumption. - * - * @tparam Clock Clock type, must be steady i.e. monotonic. - */ -template -class BasicDynamicTokenBucket -{ - static_assert(Clock::is_steady, "clock must be steady"); - -public: - /** - * Constructor. - * - * @param zeroTime Initial time at which to consider the token bucket - * starting to fill. Defaults to 0, so by default token - * buckets are "full" after construction. - */ - explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept : zeroTime_(zeroTime) {} - - /** - * Copy constructor. - * - * Thread-safe. (Copy constructors of derived classes may not be thread-safe - * however.) - */ - BasicDynamicTokenBucket(const BasicDynamicTokenBucket &other) noexcept - : zeroTime_(other.zeroTime_.load()) - { - } - - /** - * Copy-assignment operator. - * - * Warning: not thread safe for the object being assigned to (including - * self-assignment). Thread-safe for the other object. - */ - BasicDynamicTokenBucket &operator=(const BasicDynamicTokenBucket &other) noexcept - { - zeroTime_ = other.zeroTime_.load(); - return *this; - } - - /** - * Re-initialize token bucket. - * - * Thread-safe. - * - * @param zeroTime Initial time at which to consider the token bucket - * starting to fill. Defaults to 0, so by default token - * bucket is reset to "full". - */ - void reset(double zeroTime = 0) noexcept { zeroTime_ = zeroTime; } - - /** - * Returns the current time in seconds since Epoch. - */ - static double defaultClockNow() noexcept - { - auto const now = Clock::now().time_since_epoch(); - return std::chrono::duration(now).count(); - } - - /** - * Attempts to consume some number of tokens. Tokens are first added to the - * bucket based on the time elapsed since the last attempt to consume tokens. - * Note: Attempts to consume more tokens than the burst size will always - * fail. - * - * Thread-safe. - * - * @param toConsume The number of tokens to consume. - * @param rate Number of tokens to generate per second. - * @param burstSize Maximum burst size. Must be greater than 0. - * @param nowInSeconds Current time in seconds. Should be monotonically - * increasing from the nowInSeconds specified in - * this token bucket's constructor. - * @return True if the rate limit check passed, false otherwise. - */ - bool consume(double toConsume, - double rate, - double burstSize, - double nowInSeconds = defaultClockNow()) - { - assert(rate > 0); - assert(burstSize > 0); - - if (nowInSeconds <= zeroTime_.load()) { - return 0; - } - - return consumeImpl(rate, burstSize, nowInSeconds, [toConsume](double &tokens) { - if (tokens < toConsume) { - return false; - } - tokens -= toConsume; - return true; - }); - } - - /** - * Similar to consume, but always consumes some number of tokens. If the - * bucket contains enough tokens - consumes toConsume tokens. Otherwise the - * bucket is drained. - * - * Thread-safe. - * - * @param toConsume The number of tokens to consume. - * @param rate Number of tokens to generate per second. - * @param burstSize Maximum burst size. Must be greater than 0. - * @param nowInSeconds Current time in seconds. Should be monotonically - * increasing from the nowInSeconds specified in - * this token bucket's constructor. - * @return number of tokens that were consumed. - */ - double consumeOrDrain(double toConsume, - double rate, - double burstSize, - double nowInSeconds = defaultClockNow()) - { - assert(rate > 0); - assert(burstSize > 0); - - if (nowInSeconds <= zeroTime_.load()) { - return 0; - } - - double consumed; - consumeImpl(rate, burstSize, nowInSeconds, [&consumed, toConsume](double &tokens) { - if (tokens < toConsume) { - consumed = tokens; - tokens = 0.0; - } else { - consumed = toConsume; - tokens -= toConsume; - } - return true; - }); - return consumed; - } - - /** - * Return extra tokens back to the bucket. This will move the zeroTime_ - * value back based on the rate. - * - * Thread-safe. - */ - void returnTokens(double tokensToReturn, double rate) - { - assert(rate > 0); - assert(tokensToReturn > 0); - - returnTokensImpl(tokensToReturn, rate); - } - - /** - * Like consumeOrDrain but the call will always satisfy the asked for count. - * It does so by borrowing tokens from the future (zeroTime_ will move - * forward) if the currently available count isn't sufficient. - * - * Returns a boost::optional. The optional wont be set if the request - * cannot be satisfied: only case is when it is larger than burstSize. The - * value of the optional is a double indicating the time in seconds that the - * caller needs to wait at which the reservation becomes valid. The caller - * could simply sleep for the returned duration to smooth out the allocation - * to match the rate limiter or do some other computation in the meantime. In - * any case, any regular consume or consumeOrDrain calls will fail to allocate - * any tokens until the future time is reached. - * - * Note: It is assumed the caller will not ask for a very large count nor use - * it immediately (if not waiting inline) as that would break the burst - * prevention the limiter is meant to be used for. - * - * Thread-safe. - */ - boost::optional consumeWithBorrowNonBlocking(double toConsume, - double rate, - double burstSize, - double nowInSeconds = defaultClockNow()) - { - assert(rate > 0); - assert(burstSize > 0); - - if (burstSize < toConsume) { - return boost::none; - } - - while (toConsume > 0) { - double consumed = consumeOrDrain(toConsume, rate, burstSize, nowInSeconds); - if (consumed > 0) { - toConsume -= consumed; - } else { - double zeroTimeNew = returnTokensImpl(-toConsume, rate); - double napTime = std::max(0.0, zeroTimeNew - nowInSeconds); - return napTime; - } - } - return 0; - } - - /** - * Convenience wrapper around non-blocking borrow to sleep inline until - * reservation is valid. - */ - bool consumeWithBorrowAndWait(double toConsume, - double rate, - double burstSize, - double nowInSeconds = defaultClockNow()) - { - auto res = consumeWithBorrowNonBlocking(toConsume, rate, burstSize, nowInSeconds); - if (res.value_or(0) > 0) { - int64_t napUSec = res.value() * 1000000; - std::this_thread::sleep_for(std::chrono::microseconds(napUSec)); - } - return res.is_initialized(); - } - - /** - * Returns the number of tokens currently available. - * - * Thread-safe (but returned value may immediately be outdated). - */ - double available(double rate, double burstSize, double nowInSeconds = defaultClockNow()) const - noexcept - { - assert(rate > 0); - assert(burstSize > 0); - - double zt = this->zeroTime_.load(); - if (nowInSeconds <= zt) { - return 0; - } - return std::min((nowInSeconds - zt) * rate, burstSize); - } - -private: - template - bool consumeImpl(double rate, double burstSize, double nowInSeconds, const TCallback &callback) - { - auto zeroTimeOld = zeroTime_.load(); - double zeroTimeNew; - do { - auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize); - if (!callback(tokens)) { - return false; - } - zeroTimeNew = nowInSeconds - tokens / rate; - } while (dsn_unlikely(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); - - return true; - } - - /** - * Adjust zeroTime based on rate and tokenCount and return the new value of - * zeroTime_. Note: Token count can be negative to move the zeroTime_ value - * into the future. - */ - double returnTokensImpl(double tokenCount, double rate) - { - auto zeroTimeOld = zeroTime_.load(); - double zeroTimeNew; - do { - zeroTimeNew = zeroTimeOld - tokenCount / rate; - } while (dsn_unlikely(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); - return zeroTimeNew; - } - - std::atomic zeroTime_; -}; - -/** - * Specialization of BasicDynamicTokenBucket with a fixed token - * generation rate and a fixed maximum burst size. - */ -template -class BasicTokenBucket -{ - static_assert(Clock::is_steady, "clock must be steady"); - -private: - using Impl = BasicDynamicTokenBucket; - -public: - /** - * Construct a token bucket with a specific maximum rate and burst size. - * - * @param genRate Number of tokens to generate per second. - * @param burstSize Maximum burst size. Must be greater than 0. - * @param zeroTime Initial time at which to consider the token bucket - * starting to fill. Defaults to 0, so by default token - * bucket is "full" after construction. - */ - BasicTokenBucket(double genRate, double burstSize, double zeroTime = 0) noexcept - : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) - { - assert(rate_ > 0); - assert(burstSize_ > 0); - } - - /** - * Copy constructor. - * - * Warning: not thread safe! - */ - BasicTokenBucket(const BasicTokenBucket &other) noexcept = default; - - /** - * Copy-assignment operator. - * - * Warning: not thread safe! - */ - BasicTokenBucket &operator=(const BasicTokenBucket &other) noexcept = default; - - /** - * Returns the current time in seconds since Epoch. - */ - static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) - { - return Impl::defaultClockNow(); - } - - /** - * Change rate and burst size. - * - * Warning: not thread safe! - * - * @param genRate Number of tokens to generate per second. - * @param burstSize Maximum burst size. Must be greater than 0. - * @param nowInSeconds Current time in seconds. Should be monotonically - * increasing from the nowInSeconds specified in - * this token bucket's constructor. - */ - void reset(double genRate, double burstSize, double nowInSeconds = defaultClockNow()) noexcept - { - assert(genRate > 0); - assert(burstSize > 0); - const double availTokens = available(nowInSeconds); - rate_ = genRate; - burstSize_ = burstSize; - setCapacity(availTokens, nowInSeconds); - } - - /** - * Change number of tokens in bucket. - * - * Warning: not thread safe! - * - * @param tokens Desired number of tokens in bucket after the call. - * @param nowInSeconds Current time in seconds. Should be monotonically - * increasing from the nowInSeconds specified in - * this token bucket's constructor. - */ - void setCapacity(double tokens, double nowInSeconds) noexcept - { - tokenBucket_.reset(nowInSeconds - tokens / rate_); - } - - /** - * Attempts to consume some number of tokens. Tokens are first added to the - * bucket based on the time elapsed since the last attempt to consume tokens. - * Note: Attempts to consume more tokens than the burst size will always - * fail. - * - * Thread-safe. - * - * @param toConsume The number of tokens to consume. - * @param nowInSeconds Current time in seconds. Should be monotonically - * increasing from the nowInSeconds specified in - * this token bucket's constructor. - * @return True if the rate limit check passed, false otherwise. - */ - bool consume(double toConsume, double nowInSeconds = defaultClockNow()) - { - return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds); - } - - /** - * Similar to consume, but always consumes some number of tokens. If the - * bucket contains enough tokens - consumes toConsume tokens. Otherwise the - * bucket is drained. - * - * Thread-safe. - * - * @param toConsume The number of tokens to consume. - * @param nowInSeconds Current time in seconds. Should be monotonically - * increasing from the nowInSeconds specified in - * this token bucket's constructor. - * @return number of tokens that were consumed. - */ - double consumeOrDrain(double toConsume, double nowInSeconds = defaultClockNow()) - { - return tokenBucket_.consumeOrDrain(toConsume, rate_, burstSize_, nowInSeconds); - } - - /** - * Returns extra token back to the bucket. - */ - void returnTokens(double tokensToReturn) - { - return tokenBucket_.returnTokens(tokensToReturn, rate_); - } - - /** - * Reserve tokens and return time to wait for in order for the reservation to - * be compatible with the bucket configuration. - */ - boost::optional consumeWithBorrowNonBlocking(double toConsume, - double nowInSeconds = defaultClockNow()) - { - return tokenBucket_.consumeWithBorrowNonBlocking( - toConsume, rate_, burstSize_, nowInSeconds); - } - - /** - * Reserve tokens. Blocks if need be until reservation is satisfied. - */ - bool consumeWithBorrowAndWait(double toConsume, double nowInSeconds = defaultClockNow()) - { - return tokenBucket_.consumeWithBorrowAndWait(toConsume, rate_, burstSize_, nowInSeconds); - } - - /** - * Returns the number of tokens currently available. - * - * Thread-safe (but returned value may immediately be outdated). - */ - double available(double nowInSeconds = defaultClockNow()) const - { - return tokenBucket_.available(rate_, burstSize_, nowInSeconds); - } - - /** - * Returns the number of tokens generated per second. - * - * Thread-safe (but returned value may immediately be outdated). - */ - double rate() const noexcept { return rate_; } - - /** - * Returns the maximum burst size. - * - * Thread-safe (but returned value may immediately be outdated). - */ - double burst() const noexcept { return burstSize_; } - -private: - Impl tokenBucket_; - double rate_; - double burstSize_; -}; - -using TokenBucket = BasicTokenBucket<>; -using DynamicTokenBucket = BasicDynamicTokenBucket<>; - -} // namespace folly diff --git a/src/core/tests/TokenBucketTest.cpp b/src/core/tests/TokenBucketTest.cpp new file mode 100644 index 0000000000..81434b9f3d --- /dev/null +++ b/src/core/tests/TokenBucketTest.cpp @@ -0,0 +1,131 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "TokenBucketTest.h" +#include +#include + +using namespace folly; + +TEST(TokenBucket, ReverseTime) +{ + const double rate = 1000; + TokenBucket tokenBucket(rate, rate * 0.01 + 1e-6, 0); + size_t count = 0; + while (tokenBucket.consume(1, 0.1)) { + count += 1; + } + EXPECT_EQ(10, count); + // Going backwards in time has no affect on the toke count (this protects + // against different threads providing out of order timestamps). + double tokensBefore = tokenBucket.available(); + EXPECT_FALSE(tokenBucket.consume(1, 0.09999999)); + EXPECT_EQ(tokensBefore, tokenBucket.available()); +} + +TEST_P(TokenBucketTest, sanity) +{ + std::pair params = GetParam(); + double rate = params.first; + double consumeSize = params.second; + + const double tenMillisecondBurst = rate * 0.010; + // Select a burst size of 10 milliseconds at the max rate or the consume size + // if 10 ms at rate is too small. + const double burstSize = std::max(consumeSize, tenMillisecondBurst); + TokenBucket tokenBucket(rate, burstSize, 0); + double tokenCounter = 0; + double currentTime = 0; + // Simulate time advancing 10 seconds + for (; currentTime <= 10.0; currentTime += 0.001) { + EXPECT_FALSE(tokenBucket.consume(burstSize + 1, currentTime)); + while (tokenBucket.consume(consumeSize, currentTime)) { + tokenCounter += consumeSize; + } + // Tokens consumed should exceed some lower bound based on rate. + // Note: The token bucket implementation is not precise, so the lower bound + // is somewhat fudged. The upper bound is accurate however. + EXPECT_LE(rate * currentTime * 0.9 - 1, tokenCounter); + // Tokens consumed should not exceed some upper bound based on rate. + EXPECT_GE(rate * currentTime + 1e-6, tokenCounter); + } +} + +static std::vector> rateToConsumeSize = { + {100, 1}, {1000, 1}, {10000, 1}, {10000, 5}, +}; + +INSTANTIATE_TEST_CASE_P(TokenBucket, TokenBucketTest, ::testing::ValuesIn(rateToConsumeSize)); + +TEST(TokenBucket, drainOnFail) +{ + DynamicTokenBucket tokenBucket; + + // Almost empty the bucket + EXPECT_TRUE(tokenBucket.consume(9, 10, 10, 1)); + + // Request more tokens than available + EXPECT_FALSE(tokenBucket.consume(5, 10, 10, 1)); + EXPECT_DOUBLE_EQ(1.0, tokenBucket.available(10, 10, 1)); + + // Again request more tokens than available, but ask to drain + EXPECT_DOUBLE_EQ(1.0, tokenBucket.consumeOrDrain(5, 10, 10, 1)); + EXPECT_DOUBLE_EQ(0.0, tokenBucket.consumeOrDrain(1, 10, 10, 1)); +} + +TEST(TokenBucket, returnTokensTest) +{ + DynamicTokenBucket tokenBucket; + + // Empty the bucket. + EXPECT_TRUE(tokenBucket.consume(10, 10, 10, 5)); + // consume should fail now. + EXPECT_FALSE(tokenBucket.consume(1, 10, 10, 5)); + EXPECT_DOUBLE_EQ(0.0, tokenBucket.consumeOrDrain(1, 10, 10, 5)); + + // Return tokens. Return 40 'excess' tokens but they wont be available to + // later callers. + tokenBucket.returnTokens(50, 10); + // Should be able to allocate 10 tokens again but the extra 40 returned in + // previous call are gone. + EXPECT_TRUE(tokenBucket.consume(10, 10, 10, 5)); + EXPECT_FALSE(tokenBucket.consume(1, 10, 10, 5)); +} + +TEST(TokenBucket, consumeOrBorrowTest) +{ + DynamicTokenBucket tokenBucket; + + // Empty the bucket. + EXPECT_TRUE(tokenBucket.consume(10, 10, 10, 1)); + // consume should fail now. + EXPECT_FALSE(tokenBucket.consume(1, 10, 10, 1)); + // Now borrow from future allocations. Each call is asking for 1s worth of + // allocations so it should return (i+1)*1s in the ith iteration as the time + // caller needs to wait. + for (int i = 0; i < 10; ++i) { + auto waitTime = tokenBucket.consumeWithBorrowNonBlocking(10, 10, 10, 1); + EXPECT_TRUE(waitTime.is_initialized()); + EXPECT_DOUBLE_EQ((i + 1) * 1.0, *waitTime); + } + + // No allocation will succeed until nowInSeconds goes higher than 11s. + EXPECT_FALSE(tokenBucket.consume(1, 10, 10, 11)); +} diff --git a/src/core/tests/TokenBucketTest.h b/src/core/tests/TokenBucketTest.h new file mode 100644 index 0000000000..2a500a3664 --- /dev/null +++ b/src/core/tests/TokenBucketTest.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include + +namespace folly { + +struct TokenBucketTest : public ::testing::TestWithParam> +{ +}; + +} // namespace folly diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 33cb78d8d3..f015f52a2e 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -103,7 +103,7 @@ fds_service::fds_service() { uint32_t limit_rate = (uint32_t)dsn_config_get_value_uint64( "replication", "fds_limit_rate", 20, "rate limit of fds(Mb)"); - _token_bucket.reset(new folly::TokenBucket(limit_rate * 1e6, 5 * limit_rate * 1e6)); + _token_bucket.reset(new folly::TokenBucket(limit_rate, 5 * limit_rate)); } fds_service::~fds_service() {} From b438617d099df4c6121434f18bd5943c9570d8e8 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Tue, 21 Apr 2020 13:14:20 +0800 Subject: [PATCH 08/49] test --- src/dist/block_service/fds/fds_service.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index f015f52a2e..a2714a8c65 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -103,7 +103,7 @@ fds_service::fds_service() { uint32_t limit_rate = (uint32_t)dsn_config_get_value_uint64( "replication", "fds_limit_rate", 20, "rate limit of fds(Mb)"); - _token_bucket.reset(new folly::TokenBucket(limit_rate, 5 * limit_rate)); + _token_bucket.reset(new folly::TokenBucket(limit_rate * 1e6, 5 * limit_rate *1e6)); } fds_service::~fds_service() {} @@ -496,7 +496,7 @@ error_code fds_file_object::get_content_with_throttling(uint64_t start, std::ostream &os, uint64_t &transfered_bytes) { - const uint64_t BATCH_MAX = 1e6; // 1MB(8Mb) + const uint64_t BATCH_MAX = std::min(1e6, _service->_token_bucket->burst() / 8); uint64_t once_transfered_bytes = 0; transfered_bytes = 0; uint64_t pos = start; @@ -589,7 +589,7 @@ dsn::error_code fds_file_object::get_content(uint64_t pos, error_code fds_file_object::put_content_with_throttling(std::istream &is, uint64_t &transfered_bytes) { - const uint64_t BATCH_MAX = 1e6; // 1MB(8Mb) + const uint64_t BATCH_MAX = std::min(1e6, _service->_token_bucket->burst() / 8); uint64_t once_transfered_bytes = 0; transfered_bytes = 0; char *buffer = new char[BATCH_MAX]; From b8b8083cb3ca779712f68386af503a29e7522c47 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Tue, 21 Apr 2020 18:58:46 +0800 Subject: [PATCH 09/49] test --- src/dist/block_service/fds/fds_service.cpp | 69 ++++++++-------------- src/dist/block_service/fds/fds_service.h | 13 ++-- 2 files changed, 32 insertions(+), 50 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index a2714a8c65..37308c7a06 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace dsn { namespace dist { @@ -101,9 +102,11 @@ const std::string fds_service::FILE_MD5_KEY = "content-md5"; fds_service::fds_service() { - uint32_t limit_rate = (uint32_t)dsn_config_get_value_uint64( - "replication", "fds_limit_rate", 20, "rate limit of fds(Mb)"); - _token_bucket.reset(new folly::TokenBucket(limit_rate * 1e6, 5 * limit_rate *1e6)); + uint64_t rate_limit = (uint32_t)dsn_config_get_value_uint64( + "replication", "fds_limit_rate", 100, "rate limit of fds(Mb)"); + // burst size must be greater than 64MB * 8 + uint64_t burst_size = std::max(3 * rate_limit * 1e6, 64e6 * 8); + _token_bucket.reset(new folly::TokenBucket(rate_limit * 1e6, burst_size)); } fds_service::~fds_service() {} @@ -491,12 +494,12 @@ fds_file_object::fds_file_object(fds_service *s, fds_file_object::~fds_file_object() {} -error_code fds_file_object::get_content_with_throttling(uint64_t start, - int64_t length, - std::ostream &os, - uint64_t &transfered_bytes) +error_code fds_file_object::get_content_in_batches(uint64_t start, + int64_t length, + std::ostream &os, + uint64_t &transfered_bytes) { - const uint64_t BATCH_MAX = std::min(1e6, _service->_token_bucket->burst() / 8); + const uint64_t BATCH_MAX = 1e6; uint64_t once_transfered_bytes = 0; transfered_bytes = 0; uint64_t pos = start; @@ -582,44 +585,19 @@ dsn::error_code fds_file_object::get_content(uint64_t pos, return err; } } - - return err; -} - -error_code fds_file_object::put_content_with_throttling(std::istream &is, - uint64_t &transfered_bytes) -{ - const uint64_t BATCH_MAX = std::min(1e6, _service->_token_bucket->burst() / 8); - uint64_t once_transfered_bytes = 0; - transfered_bytes = 0; - char *buffer = new char[BATCH_MAX]; - auto cleanup = defer([buffer]() { delete[] buffer; }); - - while (is.good() && !is.eof()) { - int batch = is.readsome(buffer, BATCH_MAX); - if (0 == batch) { - break; - } - // get tokens from token bucket - _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); - - std::istringstream batch_is(std::string(buffer, batch)); - error_code err = put_content(batch_is, once_transfered_bytes); - transfered_bytes += once_transfered_bytes; - if (err != ERR_OK || once_transfered_bytes < batch) { - return err; - } - } - - return ERR_OK; } dsn::error_code fds_file_object::put_content(/*in-out*/ std::istream &is, + int64_t to_transfer_bytes, uint64_t &transfered_bytes) { dsn::error_code err = dsn::ERR_OK; transfered_bytes = 0; galaxy::fds::GalaxyFDSClient *c = _service->get_client(); + + // get tokens from token bucket + _service->_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes); + try { c->putObject(_service->get_bucket_name(), _fds_path, is, galaxy::fds::FDSObjectMetadata()); } catch (const galaxy::fds::GalaxyFDSClientException &ex) { @@ -681,7 +659,7 @@ dsn::task_ptr fds_file_object::write(const write_request &req, write_response resp; std::istringstream is; is.str(std::string(req.buffer.data(), req.buffer.length())); - resp.err = put_content_with_throttling(is, resp.written_size); + resp.err = put_content(is, req.buffer.length(), resp.written_size); t->enqueue_with(resp); release_ref(); @@ -703,6 +681,10 @@ dsn::task_ptr fds_file_object::upload(const upload_request &req, add_ref(); auto upload_background = [this, req, t]() { const std::string &local_file = req.input_local_name; + // get file size + int64_t file_sz = 0; + dsn::utils::filesystem::file_size(local_file, file_sz); + upload_response resp; // TODO: we can cache the whole file in buffer, then upload the buffer rather than the // ifstream, because if ifstream read file beyond 60s, fds-server will reset the session, @@ -718,7 +700,7 @@ dsn::task_ptr fds_file_object::upload(const upload_request &req, ptr); resp.err = dsn::ERR_FILE_OPERATION_FAILED; } else { - resp.err = put_content_with_throttling(is, resp.uploaded_size); + resp.err = put_content(is, file_sz, resp.uploaded_size); is.close(); } @@ -751,8 +733,7 @@ dsn::task_ptr fds_file_object::read(const read_request &req, read_response resp; std::ostringstream os; uint64_t transferd_size; - resp.err = - get_content_with_throttling(req.remote_pos, req.remote_length, os, transferd_size); + resp.err = get_content_in_batches(req.remote_pos, req.remote_length, os, transferd_size); if (os.tellp() > 0) { std::string *output = new std::string(); *output = os.str(); @@ -804,8 +785,8 @@ dsn::task_ptr fds_file_object::download(const download_request &req, auto download_background = [this, req, handle, t]() { download_response resp; uint64_t transfered_size; - resp.err = get_content_with_throttling( - req.remote_pos, req.remote_length, *handle, transfered_size); + resp.err = + get_content_in_batches(req.remote_pos, req.remote_length, *handle, transfered_size); resp.downloaded_size = 0; if (handle->tellp() != -1) resp.downloaded_size = handle->tellp(); diff --git a/src/dist/block_service/fds/fds_service.h b/src/dist/block_service/fds/fds_service.h index bf49d2ed8f..07b7a4e061 100644 --- a/src/dist/block_service/fds/fds_service.h +++ b/src/dist/block_service/fds/fds_service.h @@ -105,16 +105,17 @@ class fds_file_object : public block_file dsn::task_tracker *tracker) override; private: - error_code get_content_with_throttling(uint64_t start, - int64_t length, - std::ostream &os, - uint64_t &transfered_bytes); + error_code get_content_in_batches(uint64_t start, + int64_t length, + std::ostream &os, + uint64_t &transfered_bytes); dsn::error_code get_content(uint64_t pos, int64_t length, /*out*/ std::ostream &os, /*out*/ uint64_t &transfered_bytes); - error_code put_content_with_throttling(std::istream &is, uint64_t &transfered_bytes); - dsn::error_code put_content(/*in-out*/ std::istream &is, /*out*/ uint64_t &transfered_bytes); + dsn::error_code put_content(/*in-out*/ std::istream &is, + /*int*/ int64_t to_transfer_bytes, + /*out*/ uint64_t &transfered_bytes); fds_service *_service; std::string _fds_path; std::string _md5sum; From d30fcbe85b9e30eb989f49f2080c735cfea5f012 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Tue, 21 Apr 2020 20:03:51 +0800 Subject: [PATCH 10/49] test --- src/dist/block_service/fds/fds_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 37308c7a06..4c0a2546f1 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -596,7 +596,7 @@ dsn::error_code fds_file_object::put_content(/*in-out*/ std::istream &is, galaxy::fds::GalaxyFDSClient *c = _service->get_client(); // get tokens from token bucket - _service->_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes); + _service->_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes * 8); try { c->putObject(_service->get_bucket_name(), _fds_path, is, galaxy::fds::FDSObjectMetadata()); From 0daa28cec01991308a2a7a406e66cd6bfae4908e Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 12:38:46 +0800 Subject: [PATCH 11/49] fix --- src/dist/block_service/fds/fds_service.cpp | 118 ++++++++++----------- src/dist/block_service/fds/fds_service.h | 20 ++-- 2 files changed, 68 insertions(+), 70 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 4c0a2546f1..0f88fa4691 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -494,46 +494,62 @@ fds_file_object::fds_file_object(fds_service *s, fds_file_object::~fds_file_object() {} -error_code fds_file_object::get_content_in_batches(uint64_t start, - int64_t length, - std::ostream &os, - uint64_t &transfered_bytes) +error_code fds_file_object::get_file_meta() { - const uint64_t BATCH_MAX = 1e6; - uint64_t once_transfered_bytes = 0; - transfered_bytes = 0; - uint64_t pos = start; - while (pos < start + length) { - int64_t batch = std::min(BATCH_MAX, start + length - pos); - // get tokens from token bucket - _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); - - error_code err = get_content(pos, batch, os, once_transfered_bytes); - transfered_bytes += once_transfered_bytes; - if (err != ERR_OK || once_transfered_bytes < batch) { - return err; + galaxy::fds::GalaxyFDSClient *c = _service->get_client(); + try { + auto meta = c->getObjectMetadata(_service->get_bucket_name(), _fds_path)->metadata(); + // get file size + auto iter = meta.find(fds_service::FILE_LENGTH_CUSTOM_KEY); + if (iter != meta.end()) { + _size = atoll(iter->second.c_str()); } - pos += batch; - } - return ERR_OK; + // get md5 + iter = meta.find(fds_service::FILE_MD5_KEY); + if (iter == meta.end()) { + _md5sum = iter->second; + } + + _has_meta_synced = true; + return ERR_OK; + } catch (const galaxy::fds::GalaxyFDSClientException &ex) { + if (ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) { + return ERR_HANDLER_NOT_FOUND; + } else { + derror("fds getObjectMetadata failed: parameter(%s), code(%d), msg(%s)", + _name.c_str(), + ex.code(), + ex.what()); + return ERR_FS_INTERNAL; + } + } } -dsn::error_code fds_file_object::get_content(uint64_t pos, - int64_t length, - /*out*/ std::ostream &os, - /*out*/ uint64_t &transfered_bytes) +error_code fds_file_object::get_content(uint64_t pos, + int64_t length, + /*out*/ std::ostream &os, + /*out*/ uint64_t &transfered_bytes) { - dsn::error_code err; + error_code err; transfered_bytes = 0; + // get file meta if it is not synced + if (!_has_meta_synced) { + err = get_file_meta(); + if (ERR_OK != err) { + return err; + } + } + + // get tokens from token bucket + _service->_token_bucket->consumeWithBorrowAndWait(_size * 8); + while (true) { - if (_has_meta_synced) { - // if we have download enough or we have reach the end - if ((length != -1 && (int64_t)transfered_bytes >= length) || - transfered_bytes + pos >= _size) { - return dsn::ERR_OK; - } + // if we have download enough or we have reach the end + if ((length != -1 && (int64_t)transfered_bytes >= length) || + transfered_bytes + pos >= _size) { + return ERR_OK; } try { @@ -547,51 +563,36 @@ dsn::error_code fds_file_object::get_content(uint64_t pos, pos + transfered_bytes, length - transfered_bytes); dinfo("get object from fds succeed, remote_file(%s)", _fds_path.c_str()); - if (!_has_meta_synced) { - const std::map &meta = obj->objectMetadata().metadata(); - auto iter = meta.find(fds_service::FILE_MD5_KEY); - if (iter != meta.end()) { - _md5sum = iter->second; - iter = meta.find(fds_service::FILE_LENGTH_KEY); - dassert(iter != meta.end(), - "%s: can't get %s in getObject %s", - _name.c_str(), - fds_service::FILE_LENGTH_KEY.c_str(), - _fds_path.c_str()); - _size = atoll(iter->second.c_str()); - _has_meta_synced = true; - } - } std::istream &is = obj->objectContent(); transfered_bytes += utils::copy_stream(is, os, PIECE_SIZE); - err = dsn::ERR_OK; + err = ERR_OK; } catch (const galaxy::fds::GalaxyFDSClientException &ex) { derror("fds getObject error: remote_file(%s), code(%d), msg(%s)", file_name().c_str(), ex.code(), ex.what()); - if (!_has_meta_synced && ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) { + if (ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) { _has_meta_synced = true; _md5sum = ""; _size = 0; - err = dsn::ERR_OBJECT_NOT_FOUND; + err = ERR_OBJECT_NOT_FOUND; } else { - err = dsn::ERR_FS_INTERNAL; + err = ERR_FS_INTERNAL; } } FDS_EXCEPTION_HANDLE(err, "getObject", file_name().c_str()) - if (err != dsn::ERR_OK) { + if (err != ERR_OK) { return err; } } } -dsn::error_code fds_file_object::put_content(/*in-out*/ std::istream &is, - int64_t to_transfer_bytes, - uint64_t &transfered_bytes) +error_code fds_file_object::put_content(/*in-out*/ std::istream &is, + int64_t to_transfer_bytes, + uint64_t &transfered_bytes) { - dsn::error_code err = dsn::ERR_OK; + error_code err = ERR_OK; transfered_bytes = 0; galaxy::fds::GalaxyFDSClient *c = _service->get_client(); @@ -609,7 +610,7 @@ dsn::error_code fds_file_object::put_content(/*in-out*/ std::istream &is, } FDS_EXCEPTION_HANDLE(err, "putObject", file_name().c_str()) - if (err != dsn::ERR_OK) { + if (err != ERR_OK) { return err; } @@ -733,7 +734,7 @@ dsn::task_ptr fds_file_object::read(const read_request &req, read_response resp; std::ostringstream os; uint64_t transferd_size; - resp.err = get_content_in_batches(req.remote_pos, req.remote_length, os, transferd_size); + resp.err = get_content(req.remote_pos, req.remote_length, os, transferd_size); if (os.tellp() > 0) { std::string *output = new std::string(); *output = os.str(); @@ -785,8 +786,7 @@ dsn::task_ptr fds_file_object::download(const download_request &req, auto download_background = [this, req, handle, t]() { download_response resp; uint64_t transfered_size; - resp.err = - get_content_in_batches(req.remote_pos, req.remote_length, *handle, transfered_size); + resp.err = get_content(req.remote_pos, req.remote_length, *handle, transfered_size); resp.downloaded_size = 0; if (handle->tellp() != -1) resp.downloaded_size = handle->tellp(); diff --git a/src/dist/block_service/fds/fds_service.h b/src/dist/block_service/fds/fds_service.h index 07b7a4e061..34280e176f 100644 --- a/src/dist/block_service/fds/fds_service.h +++ b/src/dist/block_service/fds/fds_service.h @@ -105,17 +105,15 @@ class fds_file_object : public block_file dsn::task_tracker *tracker) override; private: - error_code get_content_in_batches(uint64_t start, - int64_t length, - std::ostream &os, - uint64_t &transfered_bytes); - dsn::error_code get_content(uint64_t pos, - int64_t length, - /*out*/ std::ostream &os, - /*out*/ uint64_t &transfered_bytes); - dsn::error_code put_content(/*in-out*/ std::istream &is, - /*int*/ int64_t to_transfer_bytes, - /*out*/ uint64_t &transfered_bytes); + error_code get_content(uint64_t pos, + int64_t length, + /*out*/ std::ostream &os, + /*out*/ uint64_t &transfered_bytes); + error_code put_content(/*in-out*/ std::istream &is, + /*int*/ int64_t to_transfer_bytes, + /*out*/ uint64_t &transfered_bytes); + error_code get_file_meta(); + fds_service *_service; std::string _fds_path; std::string _md5sum; From 4bc9556dee350051975d621ddeb5c28654d1a0af Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 13:20:01 +0800 Subject: [PATCH 12/49] test --- src/dist/block_service/fds/fds_service.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 0f88fa4691..16a8743f4c 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -543,7 +543,9 @@ error_code fds_file_object::get_content(uint64_t pos, } // get tokens from token bucket - _service->_token_bucket->consumeWithBorrowAndWait(_size * 8); + if (!_service->_token_bucket->consumeWithBorrowAndWait(_size * 8)) { + return ERR_BUSY; + } while (true) { // if we have download enough or we have reach the end From 429a0d04d183c094669f69f28b6c4345d0458f03 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 13:25:33 +0800 Subject: [PATCH 13/49] fiux --- src/dist/block_service/fds/fds_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 16a8743f4c..f1b2c7f124 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -515,7 +515,7 @@ error_code fds_file_object::get_file_meta() return ERR_OK; } catch (const galaxy::fds::GalaxyFDSClientException &ex) { if (ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) { - return ERR_HANDLER_NOT_FOUND; + return ERR_OBJECT_NOT_FOUND; } else { derror("fds getObjectMetadata failed: parameter(%s), code(%d), msg(%s)", _name.c_str(), From 5f39178fa4a9ed8dc80e31cc6065d5c4977f90c4 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 14:07:34 +0800 Subject: [PATCH 14/49] fix --- src/dist/block_service/fds/fds_service.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index f1b2c7f124..0ece7ed5ea 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -103,10 +103,8 @@ const std::string fds_service::FILE_MD5_KEY = "content-md5"; fds_service::fds_service() { uint64_t rate_limit = (uint32_t)dsn_config_get_value_uint64( - "replication", "fds_limit_rate", 100, "rate limit of fds(Mb)"); - // burst size must be greater than 64MB * 8 - uint64_t burst_size = std::max(3 * rate_limit * 1e6, 64e6 * 8); - _token_bucket.reset(new folly::TokenBucket(rate_limit * 1e6, burst_size)); + "replication", "fds_limit_rate", 200, "rate limit of fds(Mb/s)"); + _token_bucket.reset(new folly::TokenBucket(rate_limit * 1e6, 3 * rate_limit * 1e6)); } fds_service::~fds_service() {} From 6cb6cb3addf6feae4ee7d240aa81b17e5d6dd153 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 14:31:54 +0800 Subject: [PATCH 15/49] fix --- src/dist/block_service/fds/fds_service.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 0ece7ed5ea..a285886b45 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -103,8 +103,11 @@ const std::string fds_service::FILE_MD5_KEY = "content-md5"; fds_service::fds_service() { uint64_t rate_limit = (uint32_t)dsn_config_get_value_uint64( - "replication", "fds_limit_rate", 200, "rate limit of fds(Mb/s)"); - _token_bucket.reset(new folly::TokenBucket(rate_limit * 1e6, 3 * rate_limit * 1e6)); + "replication", "fds_limit_rate", 20, "rate limit of fds(Mb/s)"); + // If file size > burst size, the file will be rejected by the token bucket. + // sst file size is about 64MB, so burst size must be greater than 64MB. + uint64_t burst_size = std::max(3 * rate_limit * 1e6, 70e6 * 8); + _token_bucket.reset(new folly::TokenBucket(rate_limit * 1e6, burst_size)); } fds_service::~fds_service() {} From e6eb81d839134f3524a7508f9cd6e009ef70879f Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 15:16:58 +0800 Subject: [PATCH 16/49] test --- src/dist/block_service/fds/fds_service.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index a285886b45..2617924f46 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -102,11 +102,11 @@ const std::string fds_service::FILE_MD5_KEY = "content-md5"; fds_service::fds_service() { - uint64_t rate_limit = (uint32_t)dsn_config_get_value_uint64( + uint32_t rate_limit = (uint32_t)dsn_config_get_value_uint64( "replication", "fds_limit_rate", 20, "rate limit of fds(Mb/s)"); // If file size > burst size, the file will be rejected by the token bucket. // sst file size is about 64MB, so burst size must be greater than 64MB. - uint64_t burst_size = std::max(3 * rate_limit * 1e6, 70e6 * 8); + uint32_t burst_size = std::max(3 * rate_limit * 1e6, 70e6 * 8); _token_bucket.reset(new folly::TokenBucket(rate_limit * 1e6, burst_size)); } From 02b9808a10cd5e72f2db3d0daffa51600699254e Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 17:35:08 +0800 Subject: [PATCH 17/49] test --- src/dist/block_service/fds/fds_service.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 2617924f46..5e42b2762f 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -97,7 +97,6 @@ DEFINE_THREAD_POOL_CODE(THREAD_POOL_FDS_SERVICE) DEFINE_TASK_CODE(LPC_FDS_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_FDS_SERVICE) const std::string fds_service::FILE_LENGTH_CUSTOM_KEY = "x-xiaomi-meta-content-length"; -const std::string fds_service::FILE_LENGTH_KEY = "content-length"; const std::string fds_service::FILE_MD5_KEY = "content-md5"; fds_service::fds_service() From 4233135d8399bb219e384f683279c41bc71b7734 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 17:58:47 +0800 Subject: [PATCH 18/49] fix --- src/dist/block_service/fds/fds_service.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 5e42b2762f..d34ee6ea04 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -543,7 +543,13 @@ error_code fds_file_object::get_content(uint64_t pos, } // get tokens from token bucket - if (!_service->_token_bucket->consumeWithBorrowAndWait(_size * 8)) { + uint32_t consume_token; + if (-1 == length) { + consume_token = _size * 8; + } else { + consume_token = length * 8; + } + if (!_service->_token_bucket->consumeWithBorrowAndWait(consume_token)) { return ERR_BUSY; } From f4f87209171bf92c24306685f98fc4af5838552e Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 18:38:56 +0800 Subject: [PATCH 19/49] get content in batches --- src/dist/block_service/fds/fds_service.cpp | 63 ++++++++++++++-------- src/dist/block_service/fds/fds_service.h | 6 ++- 2 files changed, 46 insertions(+), 23 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index d34ee6ea04..c9b1075370 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -526,11 +526,12 @@ error_code fds_file_object::get_file_meta() } } -error_code fds_file_object::get_content(uint64_t pos, - int64_t length, - /*out*/ std::ostream &os, - /*out*/ uint64_t &transfered_bytes) +error_code fds_file_object::get_content_in_batches(uint64_t start, + int64_t length, + /*out*/ std::ostream &os, + /*out*/ uint64_t &transfered_bytes) { + const uint64_t BATCH_MAX = 1e6; error_code err; transfered_bytes = 0; @@ -542,34 +543,51 @@ error_code fds_file_object::get_content(uint64_t pos, } } - // get tokens from token bucket - uint32_t consume_token; + // if length = -1, it means we should transfer the whole file + uint64_t to_transfer_bytes = length; if (-1 == length) { - consume_token = _size * 8; - } else { - consume_token = length * 8; + to_transfer_bytes = _size; } - if (!_service->_token_bucket->consumeWithBorrowAndWait(consume_token)) { - return ERR_BUSY; + + uint64_t pos = start; + uint64_t once_transfered_bytes = 0; + while (pos < start + to_transfer_bytes) { + uint64_t batch = std::min(BATCH_MAX, start + to_transfer_bytes - pos); + // get tokens from token bucket + _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); + + err = get_content(pos, batch, os, once_transfered_bytes); + transfered_bytes += once_transfered_bytes; + if (err != ERR_OK || once_transfered_bytes < batch) { + ddebug("once_transfered_bytes = %ld, batch = %ld", once_transfered_bytes, batch); + return err; + } + pos += batch; } + return ERR_OK; +} + +error_code fds_file_object::get_content(uint64_t pos, + uint64_t length, + /*out*/ std::ostream &os, + /*out*/ uint64_t &transfered_bytes) +{ + error_code err; + transfered_bytes = 0; while (true) { // if we have download enough or we have reach the end - if ((length != -1 && (int64_t)transfered_bytes >= length) || - transfered_bytes + pos >= _size) { + if ((int64_t)transfered_bytes >= length || transfered_bytes + pos >= _size) { return ERR_OK; } try { galaxy::fds::GalaxyFDSClient *c = _service->get_client(); std::shared_ptr obj; - if (length == -1) - obj = c->getObject(_service->get_bucket_name(), _fds_path, pos + transfered_bytes); - else - obj = c->getObject(_service->get_bucket_name(), - _fds_path, - pos + transfered_bytes, - length - transfered_bytes); + obj = c->getObject(_service->get_bucket_name(), + _fds_path, + pos + transfered_bytes, + length - transfered_bytes); dinfo("get object from fds succeed, remote_file(%s)", _fds_path.c_str()); std::istream &is = obj->objectContent(); transfered_bytes += utils::copy_stream(is, os, PIECE_SIZE); @@ -742,7 +760,7 @@ dsn::task_ptr fds_file_object::read(const read_request &req, read_response resp; std::ostringstream os; uint64_t transferd_size; - resp.err = get_content(req.remote_pos, req.remote_length, os, transferd_size); + resp.err = get_content_in_batches(req.remote_pos, req.remote_length, os, transferd_size); if (os.tellp() > 0) { std::string *output = new std::string(); *output = os.str(); @@ -794,7 +812,8 @@ dsn::task_ptr fds_file_object::download(const download_request &req, auto download_background = [this, req, handle, t]() { download_response resp; uint64_t transfered_size; - resp.err = get_content(req.remote_pos, req.remote_length, *handle, transfered_size); + resp.err = + get_content_in_batches(req.remote_pos, req.remote_length, *handle, transfered_size); resp.downloaded_size = 0; if (handle->tellp() != -1) resp.downloaded_size = handle->tellp(); diff --git a/src/dist/block_service/fds/fds_service.h b/src/dist/block_service/fds/fds_service.h index 34280e176f..92590100c4 100644 --- a/src/dist/block_service/fds/fds_service.h +++ b/src/dist/block_service/fds/fds_service.h @@ -105,8 +105,12 @@ class fds_file_object : public block_file dsn::task_tracker *tracker) override; private: + error_code get_content_in_batches(uint64_t start, + int64_t length, + /*out*/ std::ostream &os, + /*out*/ uint64_t &transfered_bytes); error_code get_content(uint64_t pos, - int64_t length, + uint64_t length, /*out*/ std::ostream &os, /*out*/ uint64_t &transfered_bytes); error_code put_content(/*in-out*/ std::istream &is, From 52b3a2712cc9b9bc24676314040973db1b357feb Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 18:44:13 +0800 Subject: [PATCH 20/49] add note --- src/dist/block_service/fds/fds_service.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index c9b1075370..ebc0983ce6 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -103,8 +103,11 @@ fds_service::fds_service() { uint32_t rate_limit = (uint32_t)dsn_config_get_value_uint64( "replication", "fds_limit_rate", 20, "rate limit of fds(Mb/s)"); - // If file size > burst size, the file will be rejected by the token bucket. - // sst file size is about 64MB, so burst size must be greater than 64MB. + /// For send operation, we can't send a file in batches. Because putContent interface of fds + /// will + /// overwrite what was sent before for the same file. So we must send a file as a whole. + /// If file size > burst size, the file will be rejected by the token bucket. + /// And sst maximum file size is about 64MB, so burst size must be greater than 64MB. uint32_t burst_size = std::max(3 * rate_limit * 1e6, 70e6 * 8); _token_bucket.reset(new folly::TokenBucket(rate_limit * 1e6, burst_size)); } From c7f5a8aa99cf6d60508b19c47b3998efd2319f15 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 18:45:20 +0800 Subject: [PATCH 21/49] fix --- src/dist/block_service/fds/fds_service.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index ebc0983ce6..b83e34386f 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -104,8 +104,7 @@ fds_service::fds_service() uint32_t rate_limit = (uint32_t)dsn_config_get_value_uint64( "replication", "fds_limit_rate", 20, "rate limit of fds(Mb/s)"); /// For send operation, we can't send a file in batches. Because putContent interface of fds - /// will - /// overwrite what was sent before for the same file. So we must send a file as a whole. + /// will overwrite what was sent before for the same file. So we must send a file as a whole. /// If file size > burst size, the file will be rejected by the token bucket. /// And sst maximum file size is about 64MB, so burst size must be greater than 64MB. uint32_t burst_size = std::max(3 * rate_limit * 1e6, 70e6 * 8); From 390809b3532dfbf07c861a841d13968e86f3e9fa Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 18:54:35 +0800 Subject: [PATCH 22/49] fix --- src/dist/block_service/fds/fds_service.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index b83e34386f..fefd911e07 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -509,7 +509,7 @@ error_code fds_file_object::get_file_meta() // get md5 iter = meta.find(fds_service::FILE_MD5_KEY); - if (iter == meta.end()) { + if (iter != meta.end()) { _md5sum = iter->second; } @@ -579,7 +579,7 @@ error_code fds_file_object::get_content(uint64_t pos, transfered_bytes = 0; while (true) { // if we have download enough or we have reach the end - if ((int64_t)transfered_bytes >= length || transfered_bytes + pos >= _size) { + if (transfered_bytes >= length || transfered_bytes + pos >= _size) { return ERR_OK; } From 38a938c8ba58dc5b72b895e76c9a2ef884e226f7 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 18:55:28 +0800 Subject: [PATCH 23/49] fix --- src/dist/block_service/fds/fds_service.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index fefd911e07..7b46289d09 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -501,13 +501,11 @@ error_code fds_file_object::get_file_meta() galaxy::fds::GalaxyFDSClient *c = _service->get_client(); try { auto meta = c->getObjectMetadata(_service->get_bucket_name(), _fds_path)->metadata(); - // get file size auto iter = meta.find(fds_service::FILE_LENGTH_CUSTOM_KEY); if (iter != meta.end()) { _size = atoll(iter->second.c_str()); } - // get md5 iter = meta.find(fds_service::FILE_MD5_KEY); if (iter != meta.end()) { _md5sum = iter->second; From 597125b04b6dc0a18137c4e301da25a0e1ca83aa Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 19:08:03 +0800 Subject: [PATCH 24/49] fix --- src/dist/block_service/fds/fds_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 7b46289d09..cb6de2382f 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -103,7 +103,7 @@ fds_service::fds_service() { uint32_t rate_limit = (uint32_t)dsn_config_get_value_uint64( "replication", "fds_limit_rate", 20, "rate limit of fds(Mb/s)"); - /// For send operation, we can't send a file in batches. Because putContent interface of fds + /// For put operation, we can't send a file in batches. Because putContent interface of fds /// will overwrite what was sent before for the same file. So we must send a file as a whole. /// If file size > burst size, the file will be rejected by the token bucket. /// And sst maximum file size is about 64MB, so burst size must be greater than 64MB. From 1630c534db834b31567e07df1c7d90298d0b921f Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 19:10:15 +0800 Subject: [PATCH 25/49] fix --- src/dist/block_service/fds/fds_service.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index cb6de2382f..5083ea409c 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -559,7 +559,6 @@ error_code fds_file_object::get_content_in_batches(uint64_t start, err = get_content(pos, batch, os, once_transfered_bytes); transfered_bytes += once_transfered_bytes; if (err != ERR_OK || once_transfered_bytes < batch) { - ddebug("once_transfered_bytes = %ld, batch = %ld", once_transfered_bytes, batch); return err; } pos += batch; From 84d49b18c4ee67f268b8156c11a8ea2605764a14 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 19:14:09 +0800 Subject: [PATCH 26/49] fix --- src/dist/block_service/fds/fds_service.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 5083ea409c..2de8e2db9e 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -532,7 +532,7 @@ error_code fds_file_object::get_content_in_batches(uint64_t start, /*out*/ uint64_t &transfered_bytes) { const uint64_t BATCH_MAX = 1e6; - error_code err; + error_code err = ERR_OK; transfered_bytes = 0; // get file meta if it is not synced @@ -572,7 +572,7 @@ error_code fds_file_object::get_content(uint64_t pos, /*out*/ std::ostream &os, /*out*/ uint64_t &transfered_bytes) { - error_code err; + error_code err = ERR_OK; transfered_bytes = 0; while (true) { // if we have download enough or we have reach the end From 0425f2817a41c05f68ec2d11bfba29b367b4ae34 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 22 Apr 2020 19:19:52 +0800 Subject: [PATCH 27/49] format --- include/dsn/utility/TokenBucket.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/dsn/utility/TokenBucket.h b/include/dsn/utility/TokenBucket.h index 5581289592..fcf93c17a1 100644 --- a/include/dsn/utility/TokenBucket.h +++ b/include/dsn/utility/TokenBucket.h @@ -226,9 +226,9 @@ class BasicDynamicTokenBucket * Thread-safe. */ boost::optional consumeWithBorrowNonBlocking(double toConsume, - double rate, - double burstSize, - double nowInSeconds = defaultClockNow()) + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) { assert(rate > 0); assert(burstSize > 0); @@ -457,7 +457,7 @@ class BasicTokenBucket * be compatible with the bucket configuration. */ boost::optional consumeWithBorrowNonBlocking(double toConsume, - double nowInSeconds = defaultClockNow()) + double nowInSeconds = defaultClockNow()) { return tokenBucket_.consumeWithBorrowNonBlocking( toConsume, rate_, burstSize_, nowInSeconds); From bddabfb0f65d20930f12e18a5036096419cdeabf Mon Sep 17 00:00:00 2001 From: zhao liwei Date: Sun, 26 Apr 2020 10:53:58 +0800 Subject: [PATCH 28/49] Update src/dist/block_service/fds/fds_service.h Co-Authored-By: Wu Tao --- src/dist/block_service/fds/fds_service.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.h b/src/dist/block_service/fds/fds_service.h index 92590100c4..2037ffd3a2 100644 --- a/src/dist/block_service/fds/fds_service.h +++ b/src/dist/block_service/fds/fds_service.h @@ -2,7 +2,9 @@ #define FDS_SERVICE_H #include -#include +namespace folly { +class TokenBucket; +} // namespace folly namespace galaxy { namespace fds { From 4ef68ac4967421fb759d228a99b58772d51b5298 Mon Sep 17 00:00:00 2001 From: zhao liwei Date: Sun, 26 Apr 2020 10:54:16 +0800 Subject: [PATCH 29/49] Update src/dist/block_service/fds/fds_service.cpp Co-Authored-By: Wu Tao --- src/dist/block_service/fds/fds_service.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 2de8e2db9e..78280cfe31 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -503,7 +503,11 @@ error_code fds_file_object::get_file_meta() auto meta = c->getObjectMetadata(_service->get_bucket_name(), _fds_path)->metadata(); auto iter = meta.find(fds_service::FILE_LENGTH_CUSTOM_KEY); if (iter != meta.end()) { - _size = atoll(iter->second.c_str()); + bool valid = buf2int64(iter->second, _size) + if(!valid) { + derror(...); + return ...; + } } iter = meta.find(fds_service::FILE_MD5_KEY); From 26552aeb3a5c6d7fb72f9dd227c0b1b22f8a882b Mon Sep 17 00:00:00 2001 From: zhao liwei Date: Sun, 26 Apr 2020 10:54:30 +0800 Subject: [PATCH 30/49] Update src/dist/block_service/fds/fds_service.cpp Co-Authored-By: Wu Tao --- src/dist/block_service/fds/fds_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 78280cfe31..e713dfba1d 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -556,7 +556,7 @@ error_code fds_file_object::get_content_in_batches(uint64_t start, uint64_t pos = start; uint64_t once_transfered_bytes = 0; while (pos < start + to_transfer_bytes) { - uint64_t batch = std::min(BATCH_MAX, start + to_transfer_bytes - pos); + uint64_t batch_len = std::min(BATCH_MAX, start + to_transfer_bytes - pos); // get tokens from token bucket _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); From dd242f0d48ecdf1914e9fb522a8b4a092b202c5b Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Sun, 26 Apr 2020 10:59:35 +0800 Subject: [PATCH 31/49] format --- src/dist/block_service/fds/fds_service.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index e713dfba1d..3ef598f504 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -503,10 +503,10 @@ error_code fds_file_object::get_file_meta() auto meta = c->getObjectMetadata(_service->get_bucket_name(), _fds_path)->metadata(); auto iter = meta.find(fds_service::FILE_LENGTH_CUSTOM_KEY); if (iter != meta.end()) { - bool valid = buf2int64(iter->second, _size) - if(!valid) { - derror(...); - return ...; + bool valid = buf2int64(iter->second, _size) if (!valid) + { + derror(...); + return ...; } } From 957b3a908a44a85ce7d32a571233d3a5d612243f Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Sun, 26 Apr 2020 11:10:17 +0800 Subject: [PATCH 32/49] fix --- src/dist/block_service/fds/fds_service.cpp | 23 +++++++++++----------- src/dist/block_service/fds/fds_service.h | 4 +--- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 3ef598f504..13f9788f6d 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -107,8 +107,9 @@ fds_service::fds_service() /// will overwrite what was sent before for the same file. So we must send a file as a whole. /// If file size > burst size, the file will be rejected by the token bucket. /// And sst maximum file size is about 64MB, so burst size must be greater than 64MB. - uint32_t burst_size = std::max(3 * rate_limit * 1e6, 70e6 * 8); - _token_bucket.reset(new folly::TokenBucket(rate_limit * 1e6, burst_size)); + const int BYTE_TO_BIT = 8; + uint32_t burst_size = std::max(3 * rate_limit * 1e6 / BYTE_TO_BIT, 70e6); + _token_bucket.reset(new folly::TokenBucket(rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); } fds_service::~fds_service() {} @@ -503,10 +504,10 @@ error_code fds_file_object::get_file_meta() auto meta = c->getObjectMetadata(_service->get_bucket_name(), _fds_path)->metadata(); auto iter = meta.find(fds_service::FILE_LENGTH_CUSTOM_KEY); if (iter != meta.end()) { - bool valid = buf2int64(iter->second, _size) if (!valid) - { - derror(...); - return ...; + bool valid = dsn::buf2uint64(iter->second, _size); + if (!valid || _size < 0) { + derror("Error to get file size"); + return ERR_FS_INTERNAL; } } @@ -558,14 +559,14 @@ error_code fds_file_object::get_content_in_batches(uint64_t start, while (pos < start + to_transfer_bytes) { uint64_t batch_len = std::min(BATCH_MAX, start + to_transfer_bytes - pos); // get tokens from token bucket - _service->_token_bucket->consumeWithBorrowAndWait(batch * 8); + _service->_token_bucket->consumeWithBorrowAndWait(batch_len); - err = get_content(pos, batch, os, once_transfered_bytes); + err = get_content(pos, batch_len, os, once_transfered_bytes); transfered_bytes += once_transfered_bytes; - if (err != ERR_OK || once_transfered_bytes < batch) { + if (err != ERR_OK || once_transfered_bytes < batch_len) { return err; } - pos += batch; + pos += batch_len; } return ERR_OK; @@ -626,7 +627,7 @@ error_code fds_file_object::put_content(/*in-out*/ std::istream &is, galaxy::fds::GalaxyFDSClient *c = _service->get_client(); // get tokens from token bucket - _service->_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes * 8); + _service->_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes); try { c->putObject(_service->get_bucket_name(), _fds_path, is, galaxy::fds::FDSObjectMetadata()); diff --git a/src/dist/block_service/fds/fds_service.h b/src/dist/block_service/fds/fds_service.h index 2037ffd3a2..92590100c4 100644 --- a/src/dist/block_service/fds/fds_service.h +++ b/src/dist/block_service/fds/fds_service.h @@ -2,9 +2,7 @@ #define FDS_SERVICE_H #include -namespace folly { -class TokenBucket; -} // namespace folly +#include namespace galaxy { namespace fds { From 079d0788fc95787072437753acfd26c61d0f0090 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Sun, 26 Apr 2020 11:19:04 +0800 Subject: [PATCH 33/49] fix --- src/dist/block_service/fds/fds_service.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 13f9788f6d..c3b9144bc6 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -536,6 +536,7 @@ error_code fds_file_object::get_content_in_batches(uint64_t start, /*out*/ std::ostream &os, /*out*/ uint64_t &transfered_bytes) { + // the max batch size is 1MB const uint64_t BATCH_MAX = 1e6; error_code err = ERR_OK; transfered_bytes = 0; From 73b1b6f17ff338cec35feb4ec4e5606e27fff7e7 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Sun, 26 Apr 2020 11:45:36 +0800 Subject: [PATCH 34/49] fix --- src/dist/block_service/fds/fds_service.cpp | 1 + src/dist/block_service/fds/fds_service.h | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index c3b9144bc6..12edd68306 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -19,6 +19,7 @@ #include #include #include +#include namespace dsn { namespace dist { diff --git a/src/dist/block_service/fds/fds_service.h b/src/dist/block_service/fds/fds_service.h index 92590100c4..3258ae1c5b 100644 --- a/src/dist/block_service/fds/fds_service.h +++ b/src/dist/block_service/fds/fds_service.h @@ -2,7 +2,13 @@ #define FDS_SERVICE_H #include -#include + +namespace folly { +template +class BasicTokenBucket; + +using TokenBucket = BasicTokenBucket; +} namespace galaxy { namespace fds { From e36ec1171b483282e322fea10b7a152d45ed3aa3 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Sun, 26 Apr 2020 18:30:37 +0800 Subject: [PATCH 35/49] fix --- src/dist/block_service/fds/fds_service.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 12edd68306..2c1a9145ce 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -551,10 +551,7 @@ error_code fds_file_object::get_content_in_batches(uint64_t start, } // if length = -1, it means we should transfer the whole file - uint64_t to_transfer_bytes = length; - if (-1 == length) { - to_transfer_bytes = _size; - } + uint64_t to_transfer_bytes = (length == -1 ? _size : length); uint64_t pos = start; uint64_t once_transfered_bytes = 0; From 56ec29b0bf1c9c53a006fc2b4c7831d76d24c9bd Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Sun, 26 Apr 2020 18:39:43 +0800 Subject: [PATCH 36/49] fix --- src/dist/block_service/fds/fds_service.cpp | 24 ++++++++++++++-------- src/dist/block_service/fds/fds_service.h | 3 ++- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 2c1a9145ce..9fcbc071f9 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -102,15 +102,23 @@ const std::string fds_service::FILE_MD5_KEY = "content-md5"; fds_service::fds_service() { - uint32_t rate_limit = (uint32_t)dsn_config_get_value_uint64( - "replication", "fds_limit_rate", 20, "rate limit of fds(Mb/s)"); - /// For put operation, we can't send a file in batches. Because putContent interface of fds + const int BYTE_TO_BIT = 8; + + uint32_t write_rate_limit = (uint32_t)dsn_config_get_value_uint64( + "replication", "fds_write_limit_rate", 20, "rate limit of fds(Mb/s)"); + /// For write operation, we can't send a file in batches. Because putContent interface of fds /// will overwrite what was sent before for the same file. So we must send a file as a whole. /// If file size > burst size, the file will be rejected by the token bucket. /// And sst maximum file size is about 64MB, so burst size must be greater than 64MB. - const int BYTE_TO_BIT = 8; - uint32_t burst_size = std::max(3 * rate_limit * 1e6 / BYTE_TO_BIT, 70e6); - _token_bucket.reset(new folly::TokenBucket(rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); + uint32_t burst_size = std::max(3 * write_rate_limit * 1e6 / BYTE_TO_BIT, 70e6); + _write_token_bucket.reset( + new folly::TokenBucket(write_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); + + uint32_t read_rate_limit = (uint32_t)dsn_config_get_value_uint64( + "replication", "fds_read_limit_rate", 20, "rate limit of fds(Mb/s)"); + burst_size = 3 * write_rate_limit * 1e6 / BYTE_TO_BIT; + _read_token_bucket.reset( + new folly::TokenBucket(read_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); } fds_service::~fds_service() {} @@ -558,7 +566,7 @@ error_code fds_file_object::get_content_in_batches(uint64_t start, while (pos < start + to_transfer_bytes) { uint64_t batch_len = std::min(BATCH_MAX, start + to_transfer_bytes - pos); // get tokens from token bucket - _service->_token_bucket->consumeWithBorrowAndWait(batch_len); + _service->_read_token_bucket->consumeWithBorrowAndWait(batch_len); err = get_content(pos, batch_len, os, once_transfered_bytes); transfered_bytes += once_transfered_bytes; @@ -626,7 +634,7 @@ error_code fds_file_object::put_content(/*in-out*/ std::istream &is, galaxy::fds::GalaxyFDSClient *c = _service->get_client(); // get tokens from token bucket - _service->_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes); + _service->_write_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes); try { c->putObject(_service->get_bucket_name(), _fds_path, is, galaxy::fds::FDSObjectMetadata()); diff --git a/src/dist/block_service/fds/fds_service.h b/src/dist/block_service/fds/fds_service.h index 3258ae1c5b..0e763d0155 100644 --- a/src/dist/block_service/fds/fds_service.h +++ b/src/dist/block_service/fds/fds_service.h @@ -71,7 +71,8 @@ class fds_service : public block_filesystem private: std::shared_ptr _client; std::string _bucket_name; - std::unique_ptr _token_bucket; + std::unique_ptr _read_token_bucket; + std::unique_ptr _write_token_bucket; friend class fds_file_object; }; From b03211b983af85a077340615bd7e8b9d984fe152 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Sun, 26 Apr 2020 18:42:49 +0800 Subject: [PATCH 37/49] fix --- src/dist/block_service/fds/fds_service.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 9fcbc071f9..643d0b8b8b 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -253,6 +253,7 @@ dsn::task_ptr fds_service::list_dir(const ls_request &req, return t; } +// TODO(zhaoliwei) refactor these code, because there have same code in get_file_meta() dsn::task_ptr fds_service::create_file(const create_file_request &req, dsn::task_code code, const create_file_callback &cb, From 4743ec8cd6b8d097683ddae72a5399224f73c2c3 Mon Sep 17 00:00:00 2001 From: levy5307 Date: Tue, 28 Apr 2020 10:10:58 +0800 Subject: [PATCH 38/49] fix --- src/dist/block_service/fds/fds_service.cpp | 27 ++++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 643d0b8b8b..41e78408ed 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -512,16 +512,23 @@ error_code fds_file_object::get_file_meta() galaxy::fds::GalaxyFDSClient *c = _service->get_client(); try { auto meta = c->getObjectMetadata(_service->get_bucket_name(), _fds_path)->metadata(); + + // get file length auto iter = meta.find(fds_service::FILE_LENGTH_CUSTOM_KEY); - if (iter != meta.end()) { - bool valid = dsn::buf2uint64(iter->second, _size); - if (!valid || _size < 0) { - derror("Error to get file size"); - return ERR_FS_INTERNAL; - } - } + dassert(iter != meta.end(), + "can't find %s in object(%s)'s metadata", + fds_service::FILE_LENGTH_CUSTOM_KEY.c_str(), + _fds_path.c_str()); + bool valid = dsn::buf2uint64(iter->second, _size); + dassert(valid && _size >= 0, "error to get file size"); + // get md5 key iter = meta.find(fds_service::FILE_MD5_KEY); + dassert(iter != meta.end(), + "can't find %s in object(%s)'s metadata", + fds_service::FILE_MD5_KEY.c_str(), + _fds_path.c_str()); + if (iter != meta.end()) { _md5sum = iter->second; } @@ -837,6 +844,6 @@ dsn::task_ptr fds_file_object::download(const download_request &req, dsn::tasking::enqueue(LPC_FDS_CALL, nullptr, download_background); return t; } -} -} -} +} // namespace block_service +} // namespace dist +} // namespace dsn From 2f53f3d64f4572dbf359a192abadb6ecb3b8cf6c Mon Sep 17 00:00:00 2001 From: levy5307 Date: Tue, 28 Apr 2020 10:54:36 +0800 Subject: [PATCH 39/49] fix --- src/dist/block_service/fds/fds_service.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 41e78408ed..acf3017c3a 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -104,13 +104,17 @@ fds_service::fds_service() { const int BYTE_TO_BIT = 8; + uint64_t sst_file_size = dsn_config_get_value_uint64("pegasus.server", + "rocksdb_target_file_size_base", + 64 * 1024 * 1024, + "rocksdb options.target_file_size_base"); uint32_t write_rate_limit = (uint32_t)dsn_config_get_value_uint64( "replication", "fds_write_limit_rate", 20, "rate limit of fds(Mb/s)"); /// For write operation, we can't send a file in batches. Because putContent interface of fds /// will overwrite what was sent before for the same file. So we must send a file as a whole. /// If file size > burst size, the file will be rejected by the token bucket. - /// And sst maximum file size is about 64MB, so burst size must be greater than 64MB. - uint32_t burst_size = std::max(3 * write_rate_limit * 1e6 / BYTE_TO_BIT, 70e6); + /// Here we set burst_size = sst_file_size + 3MB, a litter greater than sst_file_size + uint32_t burst_size = std::max(3 * write_rate_limit * 1e6 / BYTE_TO_BIT, sst_file_size + 3e6); _write_token_bucket.reset( new folly::TokenBucket(write_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); From ca107b5c4a81d9f9a2d778a691168c428c2339f7 Mon Sep 17 00:00:00 2001 From: levy5307 Date: Tue, 28 Apr 2020 10:59:54 +0800 Subject: [PATCH 40/49] fix --- src/dist/block_service/fds/fds_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index acf3017c3a..25d452054c 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -113,7 +113,7 @@ fds_service::fds_service() /// For write operation, we can't send a file in batches. Because putContent interface of fds /// will overwrite what was sent before for the same file. So we must send a file as a whole. /// If file size > burst size, the file will be rejected by the token bucket. - /// Here we set burst_size = sst_file_size + 3MB, a litter greater than sst_file_size + /// Here we set burst_size = sst_file_size + 3MB, a litte greater than sst_file_size uint32_t burst_size = std::max(3 * write_rate_limit * 1e6 / BYTE_TO_BIT, sst_file_size + 3e6); _write_token_bucket.reset( new folly::TokenBucket(write_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); From 44b73eab10aea6b134b6c26f6de731b8b91bf4a2 Mon Sep 17 00:00:00 2001 From: levy5307 Date: Tue, 28 Apr 2020 17:57:34 +0800 Subject: [PATCH 41/49] fix --- src/dist/block_service/fds/fds_service.cpp | 31 ++++++++++------------ 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 25d452054c..e6660724b4 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -519,23 +519,20 @@ error_code fds_file_object::get_file_meta() // get file length auto iter = meta.find(fds_service::FILE_LENGTH_CUSTOM_KEY); - dassert(iter != meta.end(), - "can't find %s in object(%s)'s metadata", - fds_service::FILE_LENGTH_CUSTOM_KEY.c_str(), - _fds_path.c_str()); + dassert_f(iter != meta.end(), + "can't find {} in object({})'s metadata", + fds_service::FILE_LENGTH_CUSTOM_KEY.c_str(), + _fds_path.c_str()); bool valid = dsn::buf2uint64(iter->second, _size); - dassert(valid && _size >= 0, "error to get file size"); + dassert_f(valid && _size >= 0, "error to get file size"); // get md5 key iter = meta.find(fds_service::FILE_MD5_KEY); - dassert(iter != meta.end(), - "can't find %s in object(%s)'s metadata", - fds_service::FILE_MD5_KEY.c_str(), - _fds_path.c_str()); - - if (iter != meta.end()) { - _md5sum = iter->second; - } + dassert_f(iter != meta.end(), + "can't find {} in object({})'s metadata", + fds_service::FILE_MD5_KEY.c_str(), + _fds_path.c_str()); + _md5sum = iter->second; _has_meta_synced = true; return ERR_OK; @@ -543,10 +540,10 @@ error_code fds_file_object::get_file_meta() if (ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) { return ERR_OBJECT_NOT_FOUND; } else { - derror("fds getObjectMetadata failed: parameter(%s), code(%d), msg(%s)", - _name.c_str(), - ex.code(), - ex.what()); + derror_f("fds getObjectMetadata failed: parameter({}), code({}), msg({})", + _name.c_str(), + ex.code(), + ex.what()); return ERR_FS_INTERNAL; } } From 043bad8749d72c7e2ec7cb3776d8f7bd5b366cc9 Mon Sep 17 00:00:00 2001 From: levy5307 Date: Tue, 28 Apr 2020 18:27:48 +0800 Subject: [PATCH 42/49] fix --- src/dist/block_service/fds/fds_service.cpp | 28 ++++++++++++++++------ 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index e6660724b4..f29957ccef 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -104,17 +104,26 @@ fds_service::fds_service() { const int BYTE_TO_BIT = 8; - uint64_t sst_file_size = dsn_config_get_value_uint64("pegasus.server", - "rocksdb_target_file_size_base", - 64 * 1024 * 1024, - "rocksdb options.target_file_size_base"); + // get max sst file size in normal scenario + uint64_t target_file_size = + dsn_config_get_value_uint64("pegasus.server", + "rocksdb_target_file_size_base", + 64 * 1024 * 1024, + "rocksdb options.target_file_size_base"); + uint64_t write_buffer_size = dsn_config_get_value_uint64("pegasus.server", + "rocksdb_write_buffer_size", + 64 * 1024 * 1024, + "rocksdb options.write_buffer_size"); + uint64_t max_sst_file_size = std::max(target_file_size, (uint64_t)1.25 * write_buffer_size); + uint32_t write_rate_limit = (uint32_t)dsn_config_get_value_uint64( "replication", "fds_write_limit_rate", 20, "rate limit of fds(Mb/s)"); /// For write operation, we can't send a file in batches. Because putContent interface of fds /// will overwrite what was sent before for the same file. So we must send a file as a whole. /// If file size > burst size, the file will be rejected by the token bucket. - /// Here we set burst_size = sst_file_size + 3MB, a litte greater than sst_file_size - uint32_t burst_size = std::max(3 * write_rate_limit * 1e6 / BYTE_TO_BIT, sst_file_size + 3e6); + /// Here we set burst_size = max_sst_file_size + 3MB, a litte greater than max_sst_file_size + uint32_t burst_size = + std::max(3 * write_rate_limit * 1e6 / BYTE_TO_BIT, max_sst_file_size + 3e6); _write_token_bucket.reset( new folly::TokenBucket(write_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); @@ -643,7 +652,12 @@ error_code fds_file_object::put_content(/*in-out*/ std::istream &is, galaxy::fds::GalaxyFDSClient *c = _service->get_client(); // get tokens from token bucket - _service->_write_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes); + if (!_service->_write_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes)) { + ddebug_f("the transfer count({}) is greater than burst size({}), so it is rejected by " + "token bucket", + to_transfer_bytes, + _service->_write_token_bucket->burst()); + } try { c->putObject(_service->get_bucket_name(), _fds_path, is, galaxy::fds::FDSObjectMetadata()); From bb44ae46c0acfe69df28faa51157a984d29d4a2d Mon Sep 17 00:00:00 2001 From: levy5307 Date: Tue, 28 Apr 2020 18:34:32 +0800 Subject: [PATCH 43/49] fix --- src/dist/block_service/fds/fds_service.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index f29957ccef..9bbb8c65b7 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -657,6 +657,7 @@ error_code fds_file_object::put_content(/*in-out*/ std::istream &is, "token bucket", to_transfer_bytes, _service->_write_token_bucket->burst()); + return ERR_INJECTED; } try { From ea2d13cb09c6facbf8f3850aec598e1a1d8dcb6d Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 29 Apr 2020 10:16:32 +0800 Subject: [PATCH 44/49] fix --- src/dist/block_service/fds/fds_service.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 9bbb8c65b7..143d378a1e 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -20,6 +20,7 @@ #include #include #include +#include namespace dsn { namespace dist { From 287d79ca6e1e9699ef4dc9ef80469e2bbe3a9505 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 29 Apr 2020 11:05:40 +0800 Subject: [PATCH 45/49] fix --- src/dist/block_service/fds/fds_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 143d378a1e..0f34a1d4fe 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -534,7 +534,7 @@ error_code fds_file_object::get_file_meta() fds_service::FILE_LENGTH_CUSTOM_KEY.c_str(), _fds_path.c_str()); bool valid = dsn::buf2uint64(iter->second, _size); - dassert_f(valid && _size >= 0, "error to get file size"); + dassert_f(valid, "error to get file size"); // get md5 key iter = meta.find(fds_service::FILE_MD5_KEY); From c0be4ac4d10fd2743e9c968897a7f508456f721f Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Thu, 30 Apr 2020 09:48:45 +0800 Subject: [PATCH 46/49] fiux --- src/dist/block_service/fds/fds_service.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 0f34a1d4fe..a4f2f096bc 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -105,7 +105,11 @@ fds_service::fds_service() { const int BYTE_TO_BIT = 8; - // get max sst file size in normal scenario + /// In normarl scenario, the sst file size of level 0 is write_buffer_size * [0.75, 1.25] + /// And in BULK_LOAD scenario, it is 4 * write_buffer_size * [0.75, 1.25]. + /// In rdsn, we can't get the scenario, so if we take BULK_LOAD scenario into consideration, + /// we must set max_sst_file_size to 4 * write_buffer_size * [0.75, 1.25], which is too big. + /// So in this implementation, we don't take BULK_LOAD scenario into consideration. uint64_t target_file_size = dsn_config_get_value_uint64("pegasus.server", "rocksdb_target_file_size_base", From d8d5e9067879c5ed9f5845d6f6b16a11a5a998e1 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Thu, 30 Apr 2020 16:16:47 +0800 Subject: [PATCH 47/49] fix --- src/dist/block_service/fds/fds_service.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index a4f2f096bc..f4f93a7598 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -105,7 +105,7 @@ fds_service::fds_service() { const int BYTE_TO_BIT = 8; - /// In normarl scenario, the sst file size of level 0 is write_buffer_size * [0.75, 1.25] + /// In normal scenario, the sst file size of level 0 is write_buffer_size * [0.75, 1.25] /// And in BULK_LOAD scenario, it is 4 * write_buffer_size * [0.75, 1.25]. /// In rdsn, we can't get the scenario, so if we take BULK_LOAD scenario into consideration, /// we must set max_sst_file_size to 4 * write_buffer_size * [0.75, 1.25], which is too big. @@ -662,7 +662,7 @@ error_code fds_file_object::put_content(/*in-out*/ std::istream &is, "token bucket", to_transfer_bytes, _service->_write_token_bucket->burst()); - return ERR_INJECTED; + return ERR_BUSY; } try { From ef9339be74edf1d246970b647477077f197afbbf Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Thu, 7 May 2020 17:43:45 +0800 Subject: [PATCH 48/49] fix --- src/dist/block_service/fds/fds_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index f4f93a7598..0a32df5fb0 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -134,7 +134,7 @@ fds_service::fds_service() uint32_t read_rate_limit = (uint32_t)dsn_config_get_value_uint64( "replication", "fds_read_limit_rate", 20, "rate limit of fds(Mb/s)"); - burst_size = 3 * write_rate_limit * 1e6 / BYTE_TO_BIT; + burst_size = 3 * read_rate_limit * 1e6 / BYTE_TO_BIT; _read_token_bucket.reset( new folly::TokenBucket(read_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); } From 74ad3cbbcf1f8905409cfe358951e16712fc21c9 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Thu, 7 May 2020 18:58:00 +0800 Subject: [PATCH 49/49] fix --- src/dist/block_service/fds/fds_service.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 0a32df5fb0..6c4dfd28ea 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -128,13 +128,13 @@ fds_service::fds_service() /// If file size > burst size, the file will be rejected by the token bucket. /// Here we set burst_size = max_sst_file_size + 3MB, a litte greater than max_sst_file_size uint32_t burst_size = - std::max(3 * write_rate_limit * 1e6 / BYTE_TO_BIT, max_sst_file_size + 3e6); + std::max(2 * write_rate_limit * 1e6 / BYTE_TO_BIT, max_sst_file_size + 3e6); _write_token_bucket.reset( new folly::TokenBucket(write_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); uint32_t read_rate_limit = (uint32_t)dsn_config_get_value_uint64( "replication", "fds_read_limit_rate", 20, "rate limit of fds(Mb/s)"); - burst_size = 3 * read_rate_limit * 1e6 / BYTE_TO_BIT; + burst_size = 2 * read_rate_limit * 1e6 / BYTE_TO_BIT; _read_token_bucket.reset( new folly::TokenBucket(read_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); }