Skip to content

Commit

Permalink
Merge #55904
Browse files Browse the repository at this point in the history
55904: kvserver: replace rate.RateLimiter with quotapool.RateLimiter in consistency_queue r=lunevalex a=lunevalex

Fixes #55652

When running a large restore job, we discovered high CPU usage, which was attributed to the new rate limiter in the consistency_queue introduced in #49763. During the checksum computation, the rate limiter ends up doing a lot of nanosecond waits, due to the typically small size of individual key/value pairs. This is inefficient by itself, but since the rate.RateLimiter creates a new Timer for every wait, it ends up being extremely inefficient. The quotapool.RateLimiter pools Timers, reducing allocation churn, and supports a minimum wait parameter, which converts a lot of small waits into a few larger waits during checksum calculations.

Release note: None

Co-authored-by: Alex Lunev <[email protected]>
  • Loading branch information
craig[bot] and lunevalex committed Oct 28, 2020
2 parents f804353 + 6735856 commit 157b6dc
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 125 deletions.
1 change: 0 additions & 1 deletion pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/consistency_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ var consistencyCheckRate = settings.RegisterPublicValidatedByteSizeSetting(
validatePositive,
)

// consistencyCheckRateBurstFactor is the factor used to set the burst
// parameter on the quotapool.RateLimiter. It seems overkill to provide a user
// setting for the burst, so we scale it from the rate defined above instead.
const consistencyCheckRateBurstFactor = 8

// consistencyCheckRateMinWait is the minimum time to wait once the rate limit
// is reached. We check the limit on every key/value pair, which can lead to
// a lot of nanosecond waits because each pair could be very small. Instead we
// force a larger pause every time the rate limit is breached to reduce the
// churn on timers.
const consistencyCheckRateMinWait = 100 * time.Millisecond

var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false)

type consistencyQueue struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -570,7 +570,7 @@ func (r *Replica) sha512(
snap storage.Reader,
snapshot *roachpb.RaftSnapshotData,
mode roachpb.ChecksumMode,
limiter *limit.LimiterBurstDisabled,
limiter *quotapool.RateLimiter,
) (*replicaHash, error) {
statsOnly := mode == roachpb.ChecksumMode_CHECK_STATS

Expand All @@ -586,7 +586,7 @@ func (r *Replica) sha512(

visitor := func(unsafeKey storage.MVCCKey, unsafeValue []byte) error {
// Rate Limit the scan through the range
if err := limiter.WaitN(ctx, len(unsafeKey.Key)+len(unsafeValue)); err != nil {
if err := limiter.WaitN(ctx, int64(len(unsafeKey.Key)+len(unsafeValue))); err != nil {
return err
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
Expand Down Expand Up @@ -228,8 +227,6 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co
}
}

limiter := limit.NewLimiter(rate.Limit(consistencyCheckRate.Get(&r.store.ClusterSettings().SV)))

// Compute SHA asynchronously and store it in a map by UUID.
if err := stopper.RunAsyncTask(ctx, "storage.Replica: computing checksum", func(ctx context.Context) {
func() {
Expand All @@ -239,7 +236,7 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co
snapshot = &roachpb.RaftSnapshotData{}
}

result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, limiter)
result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter)
if err != nil {
log.Errorf(ctx, "%v", err)
result = nil
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand All @@ -75,7 +75,6 @@ import (
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
"golang.org/x/net/trace"
"golang.org/x/time/rate"
)

// allSpans is a SpanSet that covers *everything* for use in tests that don't
Expand Down Expand Up @@ -10007,7 +10006,9 @@ func TestReplicaServersideRefreshes(t *testing.T) {
// Regression test for #31870.
snap := tc.engine.NewSnapshot()
defer snap.Close()
res, err := tc.repl.sha512(context.Background(), *tc.repl.Desc(), tc.engine, nil /* diff */, roachpb.ChecksumMode_CHECK_FULL, limit.NewLimiter(rate.Inf))
res, err := tc.repl.sha512(context.Background(), *tc.repl.Desc(), tc.engine,
nil /* diff */, roachpb.ChecksumMode_CHECK_FULL,
quotapool.NewRateLimiter("ConsistencyQueue", quotapool.Limit(math.MaxFloat64), math.MaxInt64))
if err != nil {
return hlc.Timestamp{}, err
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ type Store struct {
tsMaintenanceQueue *timeSeriesMaintenanceQueue // Time series maintenance queue
scanner *replicaScanner // Replica scanner
consistencyQueue *consistencyQueue // Replica consistency check queue
consistencyLimiter *quotapool.RateLimiter // Rate limits consistency checks
metrics *StoreMetrics
intentResolver *intentresolver.IntentResolver
recoveryMgr txnrecovery.Manager
Expand Down Expand Up @@ -1576,6 +1577,17 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
s.storeRebalancer.Start(ctx, s.stopper)
}

s.consistencyLimiter = quotapool.NewRateLimiter(
"ConsistencyQueue",
quotapool.Limit(consistencyCheckRate.Get(&s.ClusterSettings().SV)),
consistencyCheckRate.Get(&s.ClusterSettings().SV)*consistencyCheckRateBurstFactor,
quotapool.WithMinimumWait(consistencyCheckRateMinWait))

consistencyCheckRate.SetOnChange(&s.ClusterSettings().SV, func() {
rate := consistencyCheckRate.Get(&s.ClusterSettings().SV)
s.consistencyLimiter.UpdateLimit(quotapool.Limit(rate), rate*consistencyCheckRateBurstFactor)
})

// Storing suggested compactions in the store itself was deprecated with
// the removal of the Compactor in 21.1. See discussion in
// https://github.com/cockroachdb/cockroach/pull/55893
Expand Down
13 changes: 2 additions & 11 deletions pkg/util/limit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,22 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "limit",
srcs = [
"limiter.go",
"rate_limiter.go",
],
srcs = ["limiter.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/util/limit",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/tracing",
"//vendor/github.com/marusama/semaphore",
"//vendor/golang.org/x/time/rate",
],
)

go_test(
name = "limit_test",
srcs = [
"limiter_test.go",
"rate_limiter_test.go",
],
srcs = ["limiter_test.go"],
embed = [":limit"],
deps = [
"//pkg/util/leaktest",
"//vendor/github.com/cockroachdb/errors",
"//vendor/github.com/stretchr/testify/require",
"//vendor/golang.org/x/sync/errgroup",
"//vendor/golang.org/x/time/rate",
],
)
72 changes: 0 additions & 72 deletions pkg/util/limit/rate_limiter.go

This file was deleted.

31 changes: 0 additions & 31 deletions pkg/util/limit/rate_limiter_test.go

This file was deleted.

0 comments on commit 157b6dc

Please sign in to comment.