From ee6e44a97bec0455bd2f4d277da050aa6bee9d0c Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Mon, 7 Mar 2022 14:11:03 +0000
Subject: [PATCH] kvserver: limit concurrent consistency checks

Consistency checks run asynchronously below Raft on all replicas using a
background context, but sharing a common per-store rate limit. It was
possible for many concurrent checks to build up over time, which would
reduce the rate available to each check, exacerbating the problem and
preventing any of them from completing in a reasonable time.

This patch adds two new limits for asynchronous consistency checks:

* `consistencyCheckAsyncConcurrency`: limits the number of concurrent
  asynchronous consistency checks to 7 per store. Additional consistency
  checks will error.

* `consistencyCheckAsyncTimeout`: sets an upper timeout of 1 hour for each
  consistency check, to prevent them from running forever.

`CHECK_STATS` checks are exempt from the limit, as these are cheap and the
DistSender will run these in parallel across ranges, e.g. via
`crdb_internal.check_consistency()`.

There are likely better solutions to this problem, but this is a simple
backportable stopgap.

Release justification: fixes for high-priority or high-severity bugs in
existing functionality.

Release note (bug fix): Added a limit of 7 concurrent asynchronous
consistency checks per store, with an upper timeout of 1 hour. This
prevents abandoned consistency checks from building up in some
circumstances, which could lead to increasing disk usage as they held
onto Pebble snapshots.
--- pkg/kv/kvserver/consistency_queue.go | 25 +++++++++ pkg/kv/kvserver/replica_consistency.go | 51 +++++++++++++------ pkg/kv/kvserver/store.go | 4 ++ .../logic_test/builtin_function_notenant | 6 +++ 4 files changed, 71 insertions(+), 15 deletions(-) diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go index ef463b61af24..5d1a0120e6a0 100644 --- a/pkg/kv/kvserver/consistency_queue.go +++ b/pkg/kv/kvserver/consistency_queue.go @@ -55,6 +55,31 @@ const consistencyCheckRateBurstFactor = 8 // churn on timers. const consistencyCheckRateMinWait = 100 * time.Millisecond +// consistencyCheckAsyncConcurrency is the maximum number of asynchronous +// consistency checks to run concurrently per store below Raft. The +// server.consistency_check.max_rate limit is shared among these, so running too +// many at the same time will cause them to time out. The rate is multiplied by +// 10 (permittedRangeScanSlowdown) to obtain the per-check timeout. 7 gives +// reasonable headroom, and also handles clusters with high replication factor +// and/or many nodes -- recall that each node runs a separate consistency queue +// which can schedule checks on other nodes, e.g. a 7-node cluster with a +// replication factor of 7 could run 7 concurrent checks on every node. +// +// Note that checksum calculations below Raft are not tied to the caller's +// context (especially on followers), and will continue to run even after the +// caller has given up on them, which may cause them to build up. +// +// CHECK_STATS checks do not count towards this limit, as they are cheap and the +// DistSender will parallelize them across all ranges (notably when calling +// crdb_internal.check_consistency()). +const consistencyCheckAsyncConcurrency = 7 + +// consistencyCheckAsyncTimeout is a below-Raft timeout for asynchronous +// consistency check calculations. 
These are not tied to the caller's context, +// and thus will continue to run even after the caller has given up on them, so +// we give them an upper timeout to prevent them from running forever. +const consistencyCheckAsyncTimeout = time.Hour + var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false) type consistencyQueue struct { diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index fd270fe5ef6a..58fb8794839e 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -32,11 +32,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/quotapool" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -753,24 +755,43 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co } } - // Compute SHA asynchronously and store it in a map by UUID. + // Compute SHA asynchronously and store it in a map by UUID. Concurrent checks + // share the rate limit in r.store.consistencyLimiter, so we also limit the + // number of concurrent checks via r.store.consistencySem. + // // Don't use the proposal's context for this, as it likely to be canceled very // soon. 
- if err := stopper.RunAsyncTask(r.AnnotateCtx(context.Background()), "storage.Replica: computing checksum", func(ctx context.Context) { - func() { - defer snap.Close() - var snapshot *roachpb.RaftSnapshotData - if cc.SaveSnapshot { - snapshot = &roachpb.RaftSnapshotData{} - } + const taskName = "storage.Replica: computing checksum" + sem := r.store.consistencySem + if cc.Mode == roachpb.ChecksumMode_CHECK_STATS { + // Stats-only checks are cheap, and the DistSender parallelizes these across + // ranges (in particular when calling crdb_internal.check_consistency()), so + // they don't count towards the semaphore limit. + sem = nil + } + if err := stopper.RunAsyncTaskEx(r.AnnotateCtx(context.Background()), stop.TaskOpts{ + TaskName: taskName, + Sem: sem, + WaitForSem: false, + }, func(ctx context.Context) { + if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckAsyncTimeout, + func(ctx context.Context) error { + defer snap.Close() + var snapshot *roachpb.RaftSnapshotData + if cc.SaveSnapshot { + snapshot = &roachpb.RaftSnapshotData{} + } - result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter) - if err != nil { - log.Errorf(ctx, "%v", err) - result = nil - } - r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot) - }() + result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter) + if err != nil { + result = nil + } + r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot) + return err + }, + ); err != nil { + log.Errorf(ctx, "checksum computation failed: %v", err) + } var shouldFatal bool for _, rDesc := range cc.Terminate { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index a2b70a25daf7..55c9f179cdcd 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -724,6 +724,7 @@ type Store struct { scanner *replicaScanner // Replica scanner consistencyQueue *consistencyQueue // Replica consistency check queue consistencyLimiter *quotapool.RateLimiter 
// Rate limits consistency checks + consistencySem *quotapool.IntPool // Limit concurrent consistency checks metrics *StoreMetrics intentResolver *intentresolver.IntentResolver recoveryMgr txnrecovery.Manager @@ -2025,6 +2026,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { rate := consistencyCheckRate.Get(&s.ClusterSettings().SV) s.consistencyLimiter.UpdateLimit(quotapool.Limit(rate), rate*consistencyCheckRateBurstFactor) }) + s.consistencySem = quotapool.NewIntPool("concurrent async consistency checks", + consistencyCheckAsyncConcurrency) + s.stopper.AddCloser(s.consistencySem.Closer("stopper")) // Set the started flag (for unittests). atomic.StoreInt32(&s.started, 1) diff --git a/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant b/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant index f11fb0c8de66..cc31761c66a9 100644 --- a/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant +++ b/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant @@ -47,6 +47,12 @@ SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '\xff', '') ---- true +# Run a full consistency check across all ranges. +query B +SELECT count(*) > 10 FROM crdb_internal.check_consistency(false, '', '') +---- +true + # Test crdb_internal commands which execute as root, but # only checks for permissions afterwards. subtest crdb_internal_privileged_only