diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go index ef463b61af24..5d1a0120e6a0 100644 --- a/pkg/kv/kvserver/consistency_queue.go +++ b/pkg/kv/kvserver/consistency_queue.go @@ -55,6 +55,31 @@ const consistencyCheckRateBurstFactor = 8 // churn on timers. const consistencyCheckRateMinWait = 100 * time.Millisecond +// consistencyCheckAsyncConcurrency is the maximum number of asynchronous +// consistency checks to run concurrently per store below Raft. The +// server.consistency_check.max_rate limit is shared among these, so running too +// many at the same time will cause them to time out. The per-check timeout is +// 10x (permittedRangeScanSlowdown) the scan time at that rate. 7 gives +// reasonable headroom, and also handles clusters with high replication factor +// and/or many nodes -- recall that each node runs a separate consistency queue +// which can schedule checks on other nodes, e.g. a 7-node cluster with a +// replication factor of 7 could run 7 concurrent checks on every node. +// +// Note that checksum calculations below Raft are not tied to the caller's +// context (especially on followers), and will continue to run even after the +// caller has given up on them, which may cause them to build up. +// +// CHECK_STATS checks do not count towards this limit, as they are cheap and the +// DistSender will parallelize them across all ranges (notably when calling +// crdb_internal.check_consistency()). +const consistencyCheckAsyncConcurrency = 7 + +// consistencyCheckAsyncTimeout is a below-Raft timeout for asynchronous +// consistency check calculations. These are not tied to the caller's context, +// and thus will continue to run even after the caller has given up on them, so +// we give them an upper timeout to prevent them from running forever. 
+const consistencyCheckAsyncTimeout = time.Hour + var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false) type consistencyQueue struct { diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index fd270fe5ef6a..58fb8794839e 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -32,11 +32,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/quotapool" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -753,24 +755,43 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co } } - // Compute SHA asynchronously and store it in a map by UUID. + // Compute SHA asynchronously and store it in a map by UUID. Concurrent checks + // share the rate limit in r.store.consistencyLimiter, so we also limit the + // number of concurrent checks via r.store.consistencySem. + // // Don't use the proposal's context for this, as it likely to be canceled very // soon. 
- if err := stopper.RunAsyncTask(r.AnnotateCtx(context.Background()), "storage.Replica: computing checksum", func(ctx context.Context) { - func() { - defer snap.Close() - var snapshot *roachpb.RaftSnapshotData - if cc.SaveSnapshot { - snapshot = &roachpb.RaftSnapshotData{} - } + const taskName = "storage.Replica: computing checksum" + sem := r.store.consistencySem + if cc.Mode == roachpb.ChecksumMode_CHECK_STATS { + // Stats-only checks are cheap, and the DistSender parallelizes these across + // ranges (in particular when calling crdb_internal.check_consistency()), so + // they don't count towards the semaphore limit. + sem = nil + } + if err := stopper.RunAsyncTaskEx(r.AnnotateCtx(context.Background()), stop.TaskOpts{ + TaskName: taskName, + Sem: sem, + WaitForSem: false, + }, func(ctx context.Context) { + if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckAsyncTimeout, + func(ctx context.Context) error { + defer snap.Close() + var snapshot *roachpb.RaftSnapshotData + if cc.SaveSnapshot { + snapshot = &roachpb.RaftSnapshotData{} + } - result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter) - if err != nil { - log.Errorf(ctx, "%v", err) - result = nil - } - r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot) - }() + result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter) + if err != nil { + result = nil + } + r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot) + return err + }, + ); err != nil { + log.Errorf(ctx, "checksum computation failed: %v", err) + } var shouldFatal bool for _, rDesc := range cc.Terminate { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index a2b70a25daf7..55c9f179cdcd 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -724,6 +724,7 @@ type Store struct { scanner *replicaScanner // Replica scanner consistencyQueue *consistencyQueue // Replica consistency check queue consistencyLimiter *quotapool.RateLimiter 
// Rate limits consistency checks + consistencySem *quotapool.IntPool // Limit concurrent consistency checks metrics *StoreMetrics intentResolver *intentresolver.IntentResolver recoveryMgr txnrecovery.Manager @@ -2025,6 +2026,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { rate := consistencyCheckRate.Get(&s.ClusterSettings().SV) s.consistencyLimiter.UpdateLimit(quotapool.Limit(rate), rate*consistencyCheckRateBurstFactor) }) + s.consistencySem = quotapool.NewIntPool("concurrent async consistency checks", + consistencyCheckAsyncConcurrency) + s.stopper.AddCloser(s.consistencySem.Closer("stopper")) // Set the started flag (for unittests). atomic.StoreInt32(&s.started, 1) diff --git a/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant b/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant index f11fb0c8de66..cc31761c66a9 100644 --- a/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant +++ b/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant @@ -47,6 +47,12 @@ SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '\xff', '') ---- true +# Run a full consistency check across all ranges. +query B +SELECT count(*) > 10 FROM crdb_internal.check_consistency(false, '', '') +---- +true + # Test crdb_internal commands which execute as root, but # only checks for permissions afterwards. subtest crdb_internal_privileged_only