quotapool,kvserver: extend quotapool.RateLimit, rate limit queue addition in SystemConfigUpdate #53605

Merged
47 changes: 40 additions & 7 deletions pkg/kv/kvserver/store.go
@@ -133,6 +133,21 @@ var concurrentRangefeedItersLimit = settings.RegisterPositiveIntSetting(
64,
)

// queueAdditionOnSystemConfigUpdateRate limits the rate (per second) at which
// the store enqueues all of its replicas into the split and merge queues in
// response to system config gossip.
var queueAdditionOnSystemConfigUpdateRate = settings.RegisterNonNegativeFloatSetting(
"kv.store.system_config_update.queue_add_rate",
"the rate (per second) at which the store will add all replicas to the split and merge queue due to system config gossip",
.5)

// queueAdditionOnSystemConfigUpdateBurst is the burst size for the rate limit
// above. The default is relatively high to deal with startup scenarios.
var queueAdditionOnSystemConfigUpdateBurst = settings.RegisterNonNegativeIntSetting(
"kv.store.system_config_update.queue_add_burst",
"the burst rate at which the store will add all replicas to the split and merge queue due to system config gossip",
32)

// raftLeadershipTransferTimeout limits the amount of time a drain command
// waits for lease transfers.
var raftLeadershipTransferWait = func() *settings.DurationSetting {
@@ -610,7 +625,8 @@ type Store struct {
// tenantRateLimiters manages tenantrate.Limiters
tenantRateLimiters *tenantrate.LimiterFactory

computeInitialMetrics sync.Once
computeInitialMetrics sync.Once
systemConfigUpdateQueueRateLimiter *quotapool.RateLimiter
}

var _ kv.Sender = &Store{}
@@ -905,6 +921,20 @@ func NewStore(
s.tenantRateLimiters = tenantrate.NewLimiterFactory(cfg.Settings, &cfg.TestingKnobs.TenantRateKnobs)
s.metrics.registry.AddMetricStruct(s.tenantRateLimiters.Metrics())

s.systemConfigUpdateQueueRateLimiter = quotapool.NewRateLimiter(
"SystemConfigUpdateQueue",
quotapool.Limit(queueAdditionOnSystemConfigUpdateRate.Get(&cfg.Settings.SV)),
queueAdditionOnSystemConfigUpdateBurst.Get(&cfg.Settings.SV))
updateSystemConfigUpdateQueueLimits := func() {
s.systemConfigUpdateQueueRateLimiter.UpdateLimit(
quotapool.Limit(queueAdditionOnSystemConfigUpdateRate.Get(&cfg.Settings.SV)),
queueAdditionOnSystemConfigUpdateBurst.Get(&cfg.Settings.SV))
}
queueAdditionOnSystemConfigUpdateRate.SetOnChange(&cfg.Settings.SV,
updateSystemConfigUpdateQueueLimits)
queueAdditionOnSystemConfigUpdateBurst.SetOnChange(&cfg.Settings.SV,
updateSystemConfigUpdateQueueLimits)

if s.cfg.Gossip != nil {
// Add range scanner and configure with queues.
s.scanner = newReplicaScanner(
@@ -1826,6 +1856,7 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
// For every range, update its zone config and check if it needs to
// be split or merged.
now := s.cfg.Clock.Now()
shouldQueue := s.systemConfigUpdateQueueRateLimiter.AdmitN(1)
newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
key := repl.Desc().StartKey
zone, err := sysCfg.GetZoneConfigForKey(key)
@@ -1836,12 +1867,14 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
zone = s.cfg.DefaultZoneConfig
}
repl.SetZoneConfig(zone)
s.splitQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
s.mergeQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
if shouldQueue {
s.splitQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
s.mergeQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
}
return true // more
})
}
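
To illustrate how the two settings introduced above shape the limiter's behavior, here is a minimal standalone sketch (not part of this change) that mirrors the default values of 0.5/sec and a burst of 32 and relies only on the NewRateLimiter and AdmitN APIs shown in this PR:

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
)

func main() {
	// Mirror the defaults: 0.5 queue additions per second, burst of 32.
	rl := quotapool.NewRateLimiter("SystemConfigUpdateQueue", quotapool.Limit(0.5), 32)

	// The burst lets a freshly started store run 32 gossip-triggered
	// enqueue passes back to back.
	admitted := 0
	for rl.AdmitN(1) {
		admitted++
	}
	fmt.Println("admitted in burst:", admitted) // 32

	// After the burst is exhausted, quota accrues at the configured rate,
	// i.e. roughly one admission every two seconds.
	time.Sleep(2100 * time.Millisecond)
	fmt.Println("admitted after ~2s:", rl.AdmitN(1)) // true
}

With these defaults, a store that has been idle for a while reacts to the next system config gossip immediately, while a rapid series of updates collapses into at most one queue-addition pass every two seconds once the burst is spent.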
pkg/util/quotapool/int_rate.go
@@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

// Limit defines a rate in terms of quota per second.
@@ -26,19 +27,19 @@ type Limit float64
// in the case that they end up not getting used.
type RateLimiter struct {
qp *QuotaPool

// TODO(ajwerner): synchronization around changing limits.
burst int64
rateLimit Limit
}

// NewRateLimiter defines a new RateLimiter. The limiter is implemented as a
// token bucket which has a maximum capacity of burst. If a request attempts to
// acquire more than burst, it will block until the bucket is full and then
// put the token bucket in debt.
func NewRateLimiter(name string, rate Limit, burst int64, options ...Option) *RateLimiter {
rl := &RateLimiter{rateLimit: rate, burst: burst}
rl := &RateLimiter{}
bucket := rateBucket{
limitConfig: limitConfig{
rate: rate,
burst: burst,
},
p: rl,
cur: float64(burst),
lastUpdated: timeutil.Now(),
@@ -72,24 +73,61 @@ func (rl *RateLimiter) WaitN(ctx context.Context, n int64) error {
return nil
}

// AdmitN acquires n quota from the RateLimiter if it is immediately
// available. It returns false, without blocking, if there is currently
// insufficient quota or the pool is closed.
func (rl *RateLimiter) AdmitN(n int64) bool {
r := rl.newRateRequest(n)
defer rl.putRateRequest(r)
return rl.qp.Acquire(context.Background(), (*rateRequestNoWait)(r)) == nil
}
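
A brief caller-side sketch contrasting the new non-blocking AdmitN with the blocking WaitN; the wrapper function and its doWork callback are hypothetical and assume the caller imports context and quotapool:

func maybeDoWork(
	ctx context.Context, rl *quotapool.RateLimiter, doWork func(context.Context) error,
) error {
	// Fast path: run the work only if quota is available right now.
	if rl.AdmitN(1) {
		return doWork(ctx)
	}
	// Slow path: block until enough quota accrues or ctx is canceled.
	if err := rl.WaitN(ctx, 1); err != nil {
		return err
	}
	return doWork(ctx)
}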

// UpdateLimit updates the rate and burst limits. The change in burst will
// be applied to the current quantity of quota. For example, if the RateLimiter
// currently had a quota of 5 available with a burst of 10 and the burst is
// update to 20, the quota will increase to 15. Similarly, if the burst is
// decreased by 10, the current quota will decrease accordingly, potentially
// putting the limiter into debt.
func (rl *RateLimiter) UpdateLimit(rate Limit, burst int64) {
cfg := limitConfig{rate: rate, burst: burst}
rl.qp.Add(&cfg)
}
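
A small sketch of the arithmetic described above, observed through AdmitN (names and values are arbitrary, and the trickle of quota that accrues between calls at 1/sec is ignored):

rl := quotapool.NewRateLimiter("example", quotapool.Limit(1), 10)
rl.AdmitN(5)               // quota: 10 - 5 = 5
rl.UpdateLimit(1, 20)      // burst 10 -> 20: the delta is credited, quota ~15
fmt.Println(rl.AdmitN(15)) // true
rl.UpdateLimit(1, 5)       // burst 20 -> 5: quota drops by 15, into debt
fmt.Println(rl.AdmitN(1))  // false until the debt is paid off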

// rateBucket is the implementation of Resource which remains in the quotapool
// for a RateLimiter.
type rateBucket struct {
limitConfig
p *RateLimiter
cur float64
lastUpdated time.Time
}

type limitConfig struct {
rate Limit
burst int64
}

var _ Resource = (*rateBucket)(nil)

func (i *rateBucket) Merge(val interface{}) (shouldNotify bool) {
v := val.(*rateAlloc)
i.cur += float64(v.alloc)
v.rl.putRateAlloc(v)
if i.cur > float64(i.p.burst) {
i.cur = float64(i.p.burst)
func (r *rateBucket) Merge(v interface{}) (shouldNotify bool) {
switch v := v.(type) {
case *rateAlloc:
r.cur += float64(v.alloc)
v.rl.putRateAlloc(v)
if r.cur > float64(r.burst) {
r.cur = float64(r.burst)
}
return true
case *limitConfig:
shouldNotify = r.burst < v.burst || r.rate < v.rate
burstDelta := v.burst - r.burst
r.limitConfig = *v
r.cur += float64(burstDelta)
r.update(r.p.qp.TimeSource().Now())
return shouldNotify
default:
panic(errors.Errorf("unexpected merge value type %T", v))
}
return true
}

// RateAlloc is an allocated quantity of quota which can be released back into
@@ -141,27 +179,19 @@ func (i *rateRequest) Acquire(
r := res.(*rateBucket)
now := r.p.qp.timeSource.Now()

// TODO(ajwerner): Consider instituting a minimum update frequency to avoid
// spinning too fast on timers for tons of tiny allocations at a fast rate.
if since := now.Sub(r.lastUpdated); since > 0 {
r.cur += float64(r.p.rateLimit) * since.Seconds()
if r.cur > float64(r.p.burst) {
r.cur = float64(r.p.burst)
}
r.lastUpdated = now
}
r.update(now)

// Deal with the case where the allocation is larger than the burst size.
// In this case we'll allow the acquisition to complete if the current value
// is equal to the burst. If the acquisition succeeds, it will put the limiter
// into debt.
want := float64(i.want)
if i.want > r.p.burst {
want = float64(r.p.burst)
if i.want > r.burst {
want = float64(r.burst)
}
if delta := want - r.cur; delta > 0 {
// Compute the time it will take for r.cur to get to the needed capacity.
timeDelta := time.Duration((delta * float64(time.Second)) / float64(r.p.rateLimit))
timeDelta := time.Duration((delta * float64(time.Second)) / float64(r.rate))

// Deal with the exceedingly edge case that timeDelta, as a floating point
// number, is less than 1ns by returning 1ns and looping back around.
@@ -175,6 +205,18 @@
return true, 0
}

func (r *rateBucket) update(now time.Time) {
// TODO(ajwerner): Consider instituting a minimum update frequency to avoid
// spinning too fast on timers for tons of tiny allocations at a fast rate.
if since := now.Sub(r.lastUpdated); since > 0 {
r.cur += float64(r.rate) * since.Seconds()
if r.cur > float64(r.burst) {
r.cur = float64(r.burst)
}
r.lastUpdated = now
}
}

func (i *rateRequest) ShouldWait() bool {
return true
}
@@ -193,3 +235,19 @@ func (rl *RateLimiter) putRateAlloc(a *rateAlloc) {
*a = rateAlloc{}
rateAllocSyncPool.Put(a)
}

// rateRequestNoWait is like a rate request but will not block waiting for
// quota.
type rateRequestNoWait rateRequest

func (r *rateRequestNoWait) Acquire(
ctx context.Context, resource Resource,
) (fulfilled bool, tryAgainAfter time.Duration) {
return (*rateRequest)(r).Acquire(ctx, resource)
}

func (r *rateRequestNoWait) ShouldWait() bool {
return false
}

var _ Request = (*rateRequestNoWait)(nil)
52 changes: 52 additions & 0 deletions pkg/util/quotapool/int_rate_test.go
@@ -140,6 +140,58 @@ func TestRateLimiterBasic(t *testing.T) {
mt.Advance(2 * time.Millisecond)
<-done
}
{
// Fill the bucket all the way up.
mt.Advance(2 * time.Second)

// Consume some. There should be 10 in the bucket.
go doWait(10)
<-done

// This should need to wait one second for the bucket to fill up.
go doWait(30)
ensureNotDone()

// Adjust the rate and the burst down. This should move the current
// capacity down to 0 and lower the burst. It will now take 10 seconds
// before the bucket is full.
rl.UpdateLimit(1, 10)
ensureNotDone()
mt.Advance(9 * time.Second)
ensureNotDone()
mt.Advance(time.Second)
<-done

// At this point, the limiter should be 20 in debt, so at the current rate it
// would take 22s (20s to pay off the debt plus 2s for the requested quota)
// before this goroutine is unblocked.
go doWait(2)
ensureNotDone()

// Adjust the rate and burst up. The burst delta is 10, so the debt should
// reduce to 10 and the rate is doubled. In 6 seconds the goroutine should
// unblock.
rl.UpdateLimit(2, 20)
mt.Advance(5 * time.Second)
ensureNotDone()
mt.Advance(time.Second)
<-done

// Set the limit and burst back to the default values.
rl.UpdateLimit(10, 20)
}
{
// Fill the bucket all the way up.
mt.Advance(2 * time.Second)

// Consume some. There should be 10 in the bucket.
go doWait(10)
<-done

require.False(t, rl.AdmitN(11))
require.True(t, rl.AdmitN(9)) // 1 left
require.False(t, rl.AdmitN(2))
require.True(t, rl.AdmitN(1)) // 0 left
}
}

// TestRateLimitWithVerySmallDelta ensures that in cases where the delta is