diff --git a/pkg/kv/kvserver/api.proto b/pkg/kv/kvserver/api.proto
index 9c50cd5ce62c..18453f87763c 100644
--- a/pkg/kv/kvserver/api.proto
+++ b/pkg/kv/kvserver/api.proto
@@ -36,15 +36,18 @@ message CollectChecksumRequest {
   bytes checksum_id = 3 [(gogoproto.nullable) = false,
       (gogoproto.customname) = "ChecksumID",
       (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
-  bytes checksum = 4;
+  reserved 4;
+  // If true, then the response must include the snapshot of the data from which
+  // the checksum is computed.
+  bool with_snapshot = 5;
 }
 
 message CollectChecksumResponse {
   // The checksum is the sha512 hash of the requested computation. It is empty
   // if the computation failed.
   bytes checksum = 1;
-  // snapshot is set if the roachpb.ComputeChecksumRequest had snapshot = true
-  // and the response checksum is different from the request checksum.
+  // snapshot is set if with_snapshot in CollectChecksumRequest is true. For
+  // example, the caller sets with_snapshot when it has detected an inconsistency.
   //
   // TODO(tschottdorf): with larger ranges, this is no longer tenable.
   // See https://github.com/cockroachdb/cockroach/issues/21128.
diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go
index 771966df1a11..0a40db7aa21c 100644
--- a/pkg/kv/kvserver/consistency_queue.go
+++ b/pkg/kv/kvserver/consistency_queue.go
@@ -66,8 +66,9 @@ const consistencyCheckRateMinWait = 100 * time.Millisecond
 // replication factor of 7 could run 7 concurrent checks on every node.
 //
 // Note that checksum calculations below Raft are not tied to the caller's
-// context (especially on followers), and will continue to run even after the
-// caller has given up on them, which may cause them to build up.
+// context, and may continue to run even after the caller has given up on them,
+// which may cause them to build up, although we do make a best effort to cancel
+// the running task on the receiving end when the incoming request is aborted.
 //
 // CHECK_STATS checks do not count towards this limit, as they are cheap and the
 // DistSender will parallelize them across all ranges (notably when calling
@@ -76,8 +77,8 @@ const consistencyCheckAsyncConcurrency = 7
 
 // consistencyCheckAsyncTimeout is a below-Raft timeout for asynchronous
 // consistency check calculations. These are not tied to the caller's context,
-// and thus will continue to run even after the caller has given up on them, so
-// we give them an upper timeout to prevent them from running forever.
+// and thus may continue to run even if the caller has given up on them, so we
+// give them an upper timeout to prevent them from running forever.
 const consistencyCheckAsyncTimeout = time.Hour
 
 var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false)
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index b840fd16d5ba..06b65bf23497 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -552,7 +552,7 @@ type Replica struct {
 		lastUpdateTimes lastUpdateTimesMap
 
 		// Computed checksum at a snapshot UUID.
-		checksums map[uuid.UUID]replicaChecksum
+		checksums map[uuid.UUID]*replicaChecksum
 
 		// proposalQuota is the quota pool maintained by the lease holder where
 		// incoming writes acquire quota from a fixed quota pool before going
diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go
index 5707d7b15173..9c3baccfa656 100644
--- a/pkg/kv/kvserver/replica_application_result.go
+++ b/pkg/kv/kvserver/replica_application_result.go
@@ -20,6 +20,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/errors"
 )
 
 // replica_application_*.go files provide concrete implementations of
@@ -334,7 +336,11 @@ func (r *Replica) handleVersionResult(ctx context.Context, version *roachpb.Vers
 }
 
 func (r *Replica) handleComputeChecksumResult(ctx context.Context, cc *kvserverpb.ComputeChecksum) {
-	r.computeChecksumPostApply(ctx, *cc)
+	err := r.computeChecksumPostApply(ctx, *cc)
+	// Don't log errors caused by the store quiescing; they are expected.
+	if err != nil && !errors.Is(err, stop.ErrUnavailable) {
+		log.Errorf(ctx, "failed to start ComputeChecksum task %s: %v", cc.ChecksumID, err)
+	}
 }
 
 func (r *Replica) handleChangeReplicasResult(
diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go
index 5b9d0f3c4004..f442b1e680bb 100644
--- a/pkg/kv/kvserver/replica_consistency.go
+++ b/pkg/kv/kvserver/replica_consistency.go
@@ -15,7 +15,6 @@ import (
 	"crypto/sha512"
 	"encoding/binary"
 	"fmt"
-	"sort"
 	"sync"
 	"time"
 
@@ -47,10 +46,15 @@ import (
 // How long to keep consistency checker checksums in-memory for collection.
 // Typically a long-poll waits for the result of the computation, so it's almost
-// immediately collected. However, the consistency checker synchronously
-// collects the first replica's checksum before all others, so if the first one
-// is slow the checksum may not be collected right away, and that first
-// consistency check can take a long time due to rate limiting and range size.
+// immediately collected.
+//
+// Up to 22.1, the consistency check initiator used to synchronously collect the
+// first replica's checksum before all others, so checksum collection requests
+// could arrive late if the first one was slow. Since 22.2, all requests are
+// parallel and likely arrive quickly.
+//
+// TODO(pavelkalinnikov): Consider removing GC behaviour in 23.1+, when all the
+// incoming requests are from 22.2+ nodes (hence arrive in a timely manner).
 const replicaChecksumGCInterval = time.Hour
 
 // fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. A
@@ -69,25 +73,28 @@ var fatalOnStatsMismatch = envutil.EnvOrDefaultBool("COCKROACH_ENFORCE_CONSISTEN
 
 // replicaChecksum contains progress on a replica checksum computation.
 type replicaChecksum struct {
-	CollectChecksumResponse
-	// started is true if the checksum computation has started.
-	started bool
-	// If gcTimestamp is nonzero, GC this checksum after gcTimestamp. gcTimestamp
-	// is zero if and only if the checksum computation is in progress.
+	// started is closed once the checksum computation has started (or failed to
+	// start). If the start was successful, the channel first delivers a function
+	// that the receiver can use to cancel the in-flight computation; otherwise
+	// the channel is closed without delivering anything.
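+	// The channel is buffered (capacity 1, see getReplicaChecksum) so that the
+	// computation task never blocks when signalling the start, even if the
+	// collection request has already given up; result is buffered for the same
+	// reason.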
+	started chan context.CancelFunc
+	// result passes a single checksum computation result from the task.
+	// INVARIANT: result is written to or closed only if started is closed.
+	result chan CollectChecksumResponse
+	// A non-zero gcTimestamp means this tracker is "inactive", i.e. either the
+	// computation task completed/failed, or the checksum collection request
+	// returned. A tracker is deleted from the state when both participants are
+	// done with it, or when gcTimestamp passes, whichever happens first.
 	gcTimestamp time.Time
-	// This channel is closed after the checksum is computed, and is used
-	// as a notification.
-	notify chan struct{}
 }
 
 // CheckConsistency runs a consistency check on the range. It first applies a
 // ComputeChecksum through Raft and then issues CollectChecksum commands to the
 // other replicas. These are inspected and a CheckConsistencyResponse is assembled.
 //
-// When args.Mode is CHECK_VIA_QUEUE and an inconsistency is detected and no
-// diff was requested, the consistency check will be re-run to collect a diff,
-// which is then printed before calling `log.Fatal`. This behavior should be
-// lifted to the consistency checker queue in the future.
+// When req.Mode is CHECK_VIA_QUEUE and an inconsistency is detected, the
+// consistency check will be re-run to collect a diff, which is then printed
+// before calling `log.Fatal`. This behavior should be lifted to the consistency
+// checker queue in the future.
 func (r *Replica) CheckConsistency(
 	ctx context.Context, req roachpb.CheckConsistencyRequest,
 ) (roachpb.CheckConsistencyResponse, *roachpb.Error) {
@@ -325,7 +332,7 @@ type ConsistencyCheckResult struct {
 }
 
 func (r *Replica) collectChecksumFromReplica(
-	ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, checksum []byte,
+	ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, withSnap bool,
 ) (CollectChecksumResponse, error) {
 	conn, err := r.store.cfg.NodeDialer.Dial(ctx, replica.NodeID, rpc.DefaultClass)
 	if err != nil {
@@ -337,7 +344,7 @@ func (r *Replica) collectChecksumFromReplica(
 		StoreRequestHeader: StoreRequestHeader{NodeID: replica.NodeID, StoreID: replica.StoreID},
 		RangeID:            r.RangeID,
 		ChecksumID:         id,
-		Checksum:           checksum,
+		WithSnapshot:       withSnap,
 	}
 	resp, err := client.CollectChecksum(ctx, req)
 	if err != nil {
@@ -361,70 +368,61 @@ func (r *Replica) runConsistencyCheck(
 	}
 	ccRes := res.(*roachpb.ComputeChecksumResponse)
 
-	var orderedReplicas []roachpb.ReplicaDescriptor
-	{
-		desc := r.Desc()
-		localReplica, err := r.GetReplicaDescriptor()
-		if err != nil {
-			return nil, errors.Wrap(err, "could not get replica descriptor")
-		}
-
-		// Move the local replica to the front (which makes it the "master"
-		// we're comparing against).
-		orderedReplicas = append(orderedReplicas, desc.Replicas().Descriptors()...)
-
-		sort.Slice(orderedReplicas, func(i, j int) bool {
-			return orderedReplicas[i] == localReplica
-		})
+	replSet := r.Desc().Replicas()
+	localReplica, found := replSet.GetReplicaDescriptorByID(r.replicaID)
+	if !found {
+		return nil, errors.New("could not get local replica descriptor")
 	}
+	replicas := replSet.Descriptors()
+
+	resultCh := make(chan ConsistencyCheckResult, len(replicas))
+	results := make([]ConsistencyCheckResult, 0, len(replicas))
 
-	resultCh := make(chan ConsistencyCheckResult, len(orderedReplicas))
-	var results []ConsistencyCheckResult
 	var wg sync.WaitGroup
+	ctx, cancel := context.WithCancel(ctx)
 
-	for _, replica := range orderedReplicas {
+	defer close(resultCh) // close the channel when
+	defer wg.Wait()       // writers have terminated
+	defer cancel()        // but cancel them first
+	// P.S. Have you noticed the Haiku?
+
+	for _, replica := range replicas {
 		wg.Add(1)
 		replica := replica // per-iteration copy for the goroutine
 		if err := r.store.Stopper().RunAsyncTask(ctx, "storage.Replica: checking consistency",
 			func(ctx context.Context) {
 				defer wg.Done()
-
-				var masterChecksum []byte
-				if len(results) > 0 {
-					masterChecksum = results[0].Response.Checksum
-				}
-				resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum)
+				resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, req.Snapshot)
 				resultCh <- ConsistencyCheckResult{
 					Replica:  replica,
 					Response: resp,
 					Err:      err,
 				}
-			}); err != nil {
+			},
+		); err != nil {
+			// If we can't start tasks, the node is likely draining. Return the error
+			// verbatim, after all the started tasks are stopped.
 			wg.Done()
-			// If we can't start tasks, the node is likely draining. Just return the error verbatim.
 			return nil, err
 		}
+	}
 
-		// Collect the master result eagerly so that we can send a SHA in the
-		// remaining requests (this is used for logging inconsistencies on the
-		// remote nodes only).
-		if len(results) == 0 {
-			wg.Wait()
-			result := <-resultCh
+	// Collect the results from all replicas, while the tasks are running.
+	for result := range resultCh {
+		results = append(results, result)
+		if result.Replica.IsSame(localReplica) {
+			// If we can't compute the local checksum, give up. This will cancel all
+			// the outstanding requests, and wait for the tasks above to terminate.
 			if err := result.Err; err != nil {
-				// If we can't compute the local checksum, give up.
 				return nil, errors.Wrap(err, "computing own checksum")
 			}
-			results = append(results, result)
+			// Put the local replica first in the list.
+			results[0], results[len(results)-1] = results[len(results)-1], results[0]
+		}
+		// If it was the last request, don't wait on the channel anymore.
+		if len(results) == len(replicas) {
+			break
 		}
-	}
-
-	wg.Wait()
-	close(resultCh)
-
-	// Collect the remaining results.
-	for result := range resultCh {
-		results = append(results, result)
 	}
 
 	return results, nil
@@ -439,137 +437,122 @@ func (r *Replica) gcOldChecksumEntriesLocked(now time.Time) {
 	}
 }
 
-// getChecksum waits for the result of ComputeChecksum and returns it.
-// It returns false if there is no checksum being computed for the id,
-// or it has already been GCed.
-func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (replicaChecksum, error) {
-	now := timeutil.Now()
+// getReplicaChecksum returns replicaChecksum tracker for the given ID, and
+// whether it is still active (i.e. has a zero GC timestamp).
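+//
+// The tracker is created lazily by whichever party (the checksum collection
+// request or the Raft-applied computation task) reaches it first, so both
+// parties share the same channels regardless of arrival order.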
+func (r *Replica) getReplicaChecksum(id uuid.UUID, now time.Time) (*replicaChecksum, bool) {
 	r.mu.Lock()
+	defer r.mu.Unlock()
 	r.gcOldChecksumEntriesLocked(now)
-	c, ok := r.mu.checksums[id]
-	if !ok {
-		// TODO(tbg): we need to unconditionally set a gcTimestamp or this
-		// request can simply get stuck forever or cancel anyway and leak an
-		// entry in r.mu.checksums.
-		if d, dOk := ctx.Deadline(); dOk {
-			c.gcTimestamp = d
-		}
-		c.notify = make(chan struct{})
-		r.mu.checksums[id] = c
-	}
-	r.mu.Unlock()
-
-	// Wait for the checksum to compute or at least to start.
-	computed, err := r.checksumInitialWait(ctx, id, c.notify)
-	if err != nil {
-		return replicaChecksum{}, err
-	}
-	// If the checksum started, but has not completed commit
-	// to waiting the full deadline.
-	if !computed {
-		_, err = r.checksumWait(ctx, id, c.notify, nil)
-		if err != nil {
-			return replicaChecksum{}, err
+	c := r.mu.checksums[id]
+	if c == nil {
+		c = &replicaChecksum{
+			started: make(chan context.CancelFunc, 1),      // allow an async send
+			result:  make(chan CollectChecksumResponse, 1), // allow an async send
 		}
+		r.mu.checksums[id] = c
 	}
+	return c, c.gcTimestamp.IsZero()
+}
 
-	if log.V(1) {
-		log.Infof(ctx, "waited for compute checksum for %s", timeutil.Since(now))
-	}
-	r.mu.RLock()
-	c, ok = r.mu.checksums[id]
-	r.mu.RUnlock()
-	// If the checksum wasn't found or the checksum could not be computed, error out.
-	// The latter case can occur when there's a version mismatch or, more generally,
-	// when the (async) checksum computation fails.
-	if !ok || c.Checksum == nil {
-		return replicaChecksum{}, errors.Errorf("no checksum found (ID = %s)", id)
+// gcReplicaChecksum schedules GC to remove the given replicaChecksum from the
+// state after replicaChecksumGCInterval passes from now, or removes it
+// immediately if it is no longer active.
+//
+// Each user of replicaChecksum (at most two during its lifetime: sender and
+// receiver, in any order) must call this method exactly once when they finish
+// working on this tracker.
+//
+// The guarantee: both parties see the same tracker iff neither of them arrives
+// at it (by calling getReplicaChecksum) later than the GC timeout past the
+// moment when the other left it (by calling gcReplicaChecksum).
+func (r *Replica) gcReplicaChecksum(id uuid.UUID, rc *replicaChecksum) {
+	// TODO(pavelkalinnikov): Avoid locking, use atomics.
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	// If the tracker is inactive (GC is already scheduled) then the counterparty
+	// has abandoned this tracker, and will not come back to it. Remove it then.
+	if !rc.gcTimestamp.IsZero() {
+		delete(r.mu.checksums, id)
+	} else { // otherwise give the counterparty some time to see this tracker
+		rc.gcTimestamp = timeutil.Now().Add(replicaChecksumGCInterval)
 	}
-	return c, nil
 }
 
-// Waits for the checksum to be available or for the checksum to start computing.
-// If we waited for 10% of the deadline and it has not started, then it's
-// unlikely to start because this replica is most likely being restored from
-// snapshots.
-func (r *Replica) checksumInitialWait(
-	ctx context.Context, id uuid.UUID, notify chan struct{},
-) (bool, error) {
-	d, dOk := ctx.Deadline()
-	// The max wait time should be 5 seconds, so we dont end up waiting for
-	// minutes for a huge range.
-	maxInitialWait := 5 * time.Second
-	var initialWait <-chan time.Time
-	if dOk {
-		duration := time.Duration(timeutil.Until(d).Nanoseconds() / 10)
-		if duration > maxInitialWait {
-			duration = maxInitialWait
-		}
-		initialWait = time.After(duration)
-	} else {
-		initialWait = time.After(maxInitialWait)
+// getChecksum waits for the result of ComputeChecksum and returns it. Returns
+// an error if there is no checksum being computed for the ID, it has already
+// been GC-ed, or an error happened during the computation.
+func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (CollectChecksumResponse, error) {
+	now := timeutil.Now()
+	c, _ := r.getReplicaChecksum(id, now)
+	defer r.gcReplicaChecksum(id, c)
+
+	// Wait for the checksum computation to start.
+	var taskCancel context.CancelFunc
+	select {
+	case <-ctx.Done():
+		return CollectChecksumResponse{},
+			errors.Wrapf(ctx.Err(), "while waiting for compute checksum (ID = %s)", id)
+	case <-time.After(r.checksumInitialWait(ctx)):
+		return CollectChecksumResponse{},
+			errors.Errorf("checksum computation did not start in time for (ID = %s)", id)
+	case taskCancel = <-c.started:
+		// Happy case, the computation has started.
+	}
+	if taskCancel == nil { // but it may have started with an error
+		return CollectChecksumResponse{}, errors.Errorf("checksum task failed to start (ID = %s)", id)
 	}
-	return r.checksumWait(ctx, id, notify, initialWait)
-}
 
-// checksumWait waits for the checksum to be available or for the computation
-// to start within the initialWait time. The bool return flag is used to
-// indicate if a checksum is available (true) or if the initial wait has expired
-// and the caller should wait more, since the checksum computation started.
-func (r *Replica) checksumWait(
-	ctx context.Context, id uuid.UUID, notify chan struct{}, initialWait <-chan time.Time,
-) (bool, error) {
-	// Wait
+	// Wait for the computation result.
 	select {
-	case <-r.store.Stopper().ShouldQuiesce():
-		return false,
-			errors.Errorf("store quiescing while waiting for compute checksum (ID = %s)", id)
 	case <-ctx.Done():
-		return false,
+		taskCancel()
+		return CollectChecksumResponse{},
 			errors.Wrapf(ctx.Err(), "while waiting for compute checksum (ID = %s)", id)
-	case <-initialWait:
-		{
-			r.mu.Lock()
-			started := r.mu.checksums[id].started
-			r.mu.Unlock()
-			if !started {
-				return false,
-					errors.Errorf("checksum computation did not start in time for (ID = %s)", id)
-			}
-			return false, nil
+	case c, ok := <-c.result:
+		if log.V(1) {
+			log.Infof(ctx, "waited for compute checksum for %s", timeutil.Since(now))
+		}
+		if !ok || c.Checksum == nil {
+			return CollectChecksumResponse{}, errors.Errorf("no checksum found (ID = %s)", id)
 		}
-	case <-notify:
-		return true, nil
+		return c, nil
 	}
 }
 
-// computeChecksumDone adds the computed checksum, sets a deadline for GCing the
-// checksum, and sends out a notification.
-func (r *Replica) computeChecksumDone(
-	ctx context.Context, id uuid.UUID, result *replicaHash, snapshot *roachpb.RaftSnapshotData,
-) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	if c, ok := r.mu.checksums[id]; ok {
-		if result != nil {
-			c.Checksum = result.SHA512[:]
-
-			delta := result.PersistedMS
-			delta.Subtract(result.RecomputedMS)
-			c.Delta = enginepb.MVCCStatsDelta(delta)
-			c.Persisted = result.PersistedMS
+// checksumInitialWait returns the amount of time to wait until the checksum
+// computation has started. It is set to min of 5s and 10% of the remaining time
+// in the passed-in context (if it has a deadline).
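+// The cap prevents a request with a distant deadline from waiting for minutes
+// for a computation that is unlikely to start at all.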
+// +// If it takes longer, chances are that the replica is being restored from +// snapshots, or otherwise too busy to handle this request soon. +func (*Replica) checksumInitialWait(ctx context.Context) time.Duration { + wait := 5 * time.Second + if d, ok := ctx.Deadline(); ok { + if dur := time.Duration(timeutil.Until(d).Nanoseconds() / 10); dur < wait { + wait = dur } - c.gcTimestamp = timeutil.Now().Add(replicaChecksumGCInterval) - c.Snapshot = snapshot - r.mu.checksums[id] = c - // Notify - close(c.notify) - } else { - // ComputeChecksum adds an entry into the map, and the entry can - // only be GCed once the gcTimestamp is set above. Something - // really bad happened. - log.Errorf(ctx, "no map entry for checksum (ID = %s)", id) } + return wait +} + +// computeChecksumDone sends the checksum computation result to the receiver. +func (*Replica) computeChecksumDone( + rc *replicaChecksum, result *replicaHash, snapshot *roachpb.RaftSnapshotData, +) { + c := CollectChecksumResponse{Snapshot: snapshot} + if result != nil { + c.Checksum = result.SHA512[:] + delta := result.PersistedMS + delta.Subtract(result.RecomputedMS) + c.Delta = enginepb.MVCCStatsDelta(delta) + c.Persisted = result.PersistedMS + } + + // Sending succeeds because the channel is buffered, and there is at most one + // computeChecksumDone per replicaChecksum. In case of a bug, another writer + // closes the channel, so this send panics instead of deadlocking. By design. + rc.result <- c + close(rc.result) } type replicaHash struct { @@ -747,36 +730,28 @@ func (*Replica) sha512( return &result, nil } -func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.ComputeChecksum) { - stopper := r.store.Stopper() - now := timeutil.Now() - r.mu.Lock() - var notify chan struct{} - if c, ok := r.mu.checksums[cc.ChecksumID]; !ok { - // There is no record of this ID. Make a new notification. - notify = make(chan struct{}) - } else if !c.started { - // A CollectChecksumRequest is waiting on the existing notification. - notify = c.notify - } else { - log.Fatalf(ctx, "attempted to apply ComputeChecksum command with duplicated checksum ID %s", - cc.ChecksumID) +func (r *Replica) computeChecksumPostApply( + ctx context.Context, cc kvserverpb.ComputeChecksum, +) (err error) { + // Note: all exit paths must call gcReplicaChecksum. + c, active := r.getReplicaChecksum(cc.ChecksumID, timeutil.Now()) + defer func() { + if err != nil { + close(c.started) // send nothing to signal that the task failed to start + r.gcReplicaChecksum(cc.ChecksumID, c) + } + }() + if !active { + return errors.New("checksum collection request gave up") } - - r.gcOldChecksumEntriesLocked(now) - - // Create an entry with checksum == nil and gcTimestamp unset. - r.mu.checksums[cc.ChecksumID] = replicaChecksum{started: true, notify: notify} - desc := *r.mu.state.Desc - r.mu.Unlock() - - if cc.Version != batcheval.ReplicaChecksumVersion { - r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil) - log.Infof(ctx, "incompatible ComputeChecksum versions (requested: %d, have: %d)", - cc.Version, batcheval.ReplicaChecksumVersion) - return + if req, have := cc.Version, uint32(batcheval.ReplicaChecksumVersion); req != have { + return errors.Errorf("incompatible versions (requested: %d, have: %d)", req, have) } + // Capture the current range descriptor, as it may change by the time the + // async task below runs. + desc := *r.Desc() + // Caller is holding raftMu, so an engine snapshot is automatically // Raft-consistent (i.e. not in the middle of an AddSSTable). 
 	snap := r.store.engine.NewSnapshot()
@@ -801,7 +776,7 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co
 	//
 	// Don't use the proposal's context for this, as it likely to be canceled very
 	// soon.
-	const taskName = "storage.Replica: computing checksum"
+	const taskName = "kvserver.Replica: computing checksum"
 	sem := r.store.consistencySem
 	if cc.Mode == roachpb.ChecksumMode_CHECK_STATS {
 		// Stats-only checks are cheap, and the DistSender parallelizes these across
@@ -809,11 +784,20 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co
 		// they don't count towards the semaphore limit.
 		sem = nil
 	}
-	if err := stopper.RunAsyncTaskEx(r.AnnotateCtx(context.Background()), stop.TaskOpts{
+	stopper := r.store.Stopper()
+	taskCtx, taskCancel := stopper.WithCancelOnQuiesce(r.AnnotateCtx(context.Background()))
+	if err := stopper.RunAsyncTaskEx(taskCtx, stop.TaskOpts{
 		TaskName:   taskName,
 		Sem:        sem,
 		WaitForSem: false,
 	}, func(ctx context.Context) {
+		defer taskCancel()
+		// There is only one writer to c.started (this task), so this doesn't block.
+		// But if by mistake there is another writer, one of us closes the channel
+		// eventually, and other send/close ops will crash. This is by design.
+		c.started <- taskCancel
+		close(c.started)
+
 		if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckAsyncTimeout,
 			func(ctx context.Context) error {
 				defer snap.Close()
@@ -826,7 +810,8 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co
 				if err != nil {
 					result = nil
 				}
-				r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot)
+				r.computeChecksumDone(c, result, snapshot)
+				r.gcReplicaChecksum(cc.ChecksumID, c)
 				return err
 			},
 		); err != nil {
@@ -878,9 +863,9 @@ A file preventing this node from restarting was placed at:
 
 		}
 	}); err != nil {
-		defer snap.Close()
-		log.Errorf(ctx, "could not run async checksum computation (ID = %s): %v", cc.ChecksumID, err)
-		// Set checksum to nil.
-		r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil)
+		taskCancel()
+		snap.Close()
+		return err
 	}
+	return nil
 }
diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go
index 1d91a6321040..638e5b2bd1e9 100644
--- a/pkg/kv/kvserver/replica_consistency_test.go
+++ b/pkg/kv/kvserver/replica_consistency_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/stretchr/testify/require"
 )
@@ -53,14 +54,14 @@ func TestReplicaChecksumVersion(t *testing.T) {
 		} else {
 			cc.Version = 1
 		}
-		tc.repl.computeChecksumPostApply(ctx, cc)
+		taskErr := tc.repl.computeChecksumPostApply(ctx, cc)
 		rc, err := tc.repl.getChecksum(ctx, cc.ChecksumID)
 		if !matchingVersion {
-			if !testutils.IsError(err, "no checksum found") {
-				t.Fatal(err)
-			}
+			require.ErrorContains(t, taskErr, "incompatible versions")
+			require.ErrorContains(t, err, "checksum task failed to start")
 			require.Nil(t, rc.Checksum)
 		} else {
+			require.NoError(t, taskErr)
 			require.NoError(t, err)
 			require.NotNil(t, rc.Checksum)
 		}
@@ -78,52 +79,55 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
 	defer stopper.Stop(ctx)
 	tc.Start(ctx, t, stopper)
 
-	id := uuid.FastMakeV4()
-	notify := make(chan struct{})
-	close(notify)
+	requireChecksumTaskNotStarted := func(id uuid.UUID) {
+		require.ErrorContains(t,
+			tc.repl.computeChecksumPostApply(context.Background(), kvserverpb.ComputeChecksum{
+				ChecksumID: id,
+				Mode:       roachpb.ChecksumMode_CHECK_FULL,
+				Version:    batcheval.ReplicaChecksumVersion,
+			}), "checksum collection request gave up")
+	}
 
-	// Simple condition, the checksum is notified, but not computed.
-	tc.repl.mu.Lock()
-	tc.repl.mu.checksums[id] = replicaChecksum{notify: notify}
-	tc.repl.mu.Unlock()
+	// Checksum computation failed to start.
+	id := uuid.FastMakeV4()
+	c, _ := tc.repl.getReplicaChecksum(id, timeutil.Now())
+	close(c.started)
 	rc, err := tc.repl.getChecksum(ctx, id)
-	if !testutils.IsError(err, "no checksum found") {
-		t.Fatal(err)
-	}
+	require.ErrorContains(t, err, "checksum task failed to start")
 	require.Nil(t, rc.Checksum)
-	// Next condition, the initial wait expires and checksum is not started,
-	// this will take 10ms.
+
+	// Checksum computation started, but failed.
 	id = uuid.FastMakeV4()
-	tc.repl.mu.Lock()
-	tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{})}
-	tc.repl.mu.Unlock()
+	c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now())
+	c.started <- func() {}
+	close(c.started)
+	close(c.result)
 	rc, err = tc.repl.getChecksum(ctx, id)
-	if !testutils.IsError(err, "checksum computation did not start") {
-		t.Fatal(err)
-	}
+	require.ErrorContains(t, err, "no checksum found")
 	require.Nil(t, rc.Checksum)
-	// Next condition, initial wait expired and we found the started flag,
-	// so next step is for context deadline.
+
+	// The initial wait for the task start expires. This will take 10ms.
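+	// (No task is ever registered for this ID, so getChecksum gives up once
+	// checksumInitialWait, 10% of the request's 100ms timeout, elapses.)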
 	id = uuid.FastMakeV4()
-	tc.repl.mu.Lock()
-	tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{}), started: true}
-	tc.repl.mu.Unlock()
 	rc, err = tc.repl.getChecksum(ctx, id)
-	if !testutils.IsError(err, "context deadline exceeded") {
-		t.Fatal(err)
-	}
+	require.ErrorContains(t, err, "checksum computation did not start")
 	require.Nil(t, rc.Checksum)
+	requireChecksumTaskNotStarted(id)
 
-	// Need to reset the context, since we deadlined it above.
-	ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
-	defer cancel()
-	// Next condition, node should quiesce.
-	tc.repl.store.Stopper().Quiesce(ctx)
-	rc, err = tc.repl.getChecksum(ctx, uuid.FastMakeV4())
-	if !testutils.IsError(err, "store quiescing") {
-		t.Fatal(err)
-	}
+	// The computation has started, but the request context timed out.
+	id = uuid.FastMakeV4()
+	c, _ = tc.repl.getReplicaChecksum(id, timeutil.Now())
+	c.started <- func() {}
+	close(c.started)
+	rc, err = tc.repl.getChecksum(ctx, id)
+	require.ErrorIs(t, err, context.DeadlineExceeded)
+	require.Nil(t, rc.Checksum)
+
+	// Context is canceled during the initial wait.
+	id = uuid.FastMakeV4()
+	rc, err = tc.repl.getChecksum(ctx, id)
+	require.ErrorIs(t, err, context.DeadlineExceeded)
 	require.Nil(t, rc.Checksum)
+	requireChecksumTaskNotStarted(id)
 }
 
 // TestReplicaChecksumSHA512 checks that a given dataset produces the expected
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go
index e6b8c4e48591..c25f4fde2817 100644
--- a/pkg/kv/kvserver/replica_init.go
+++ b/pkg/kv/kvserver/replica_init.go
@@ -100,7 +100,7 @@ func newUnloadedReplica(
 		return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV)
 	})
 	r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
-	r.mu.checksums = map[uuid.UUID]replicaChecksum{}
+	r.mu.checksums = map[uuid.UUID]*replicaChecksum{}
 	r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings())
 	r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader
 	r.mu.proposalBuf.testing.allowLeaseTransfersWhenTargetMayNeedSnapshot = store.cfg.TestingKnobs.AllowLeaseTransfersWhenTargetMayNeedSnapshot
diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go
index 73fd82dfe8c8..f3154aca6842 100644
--- a/pkg/kv/kvserver/stores_server.go
+++ b/pkg/kv/kvserver/stores_server.go
@@ -11,7 +11,6 @@
 package kvserver
 
 import (
-	"bytes"
 	"context"
 	"time"
 
@@ -19,7 +18,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
-	"github.com/cockroachdb/redact"
 )
 
 // Server implements PerReplicaServer.
@@ -53,29 +51,20 @@ func (is Server) execStoreCommand(
 func (is Server) CollectChecksum(
 	ctx context.Context, req *CollectChecksumRequest,
 ) (*CollectChecksumResponse, error) {
-	resp := &CollectChecksumResponse{}
+	var resp *CollectChecksumResponse
 	err := is.execStoreCommand(ctx, req.StoreRequestHeader,
 		func(ctx context.Context, s *Store) error {
+			ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx)
+			defer cancel()
 			r, err := s.GetReplica(req.RangeID)
 			if err != nil {
 				return err
 			}
-			c, err := r.getChecksum(ctx, req.ChecksumID)
+			ccr, err := r.getChecksum(ctx, req.ChecksumID)
 			if err != nil {
 				return err
 			}
-			ccr := c.CollectChecksumResponse
-			if !bytes.Equal(req.Checksum, ccr.Checksum) {
-				// If this check is false, then this request is the replica carrying out
-				// the consistency check. The message is spurious, but we want to leave the
-				// snapshot (if present) intact.
-				if len(req.Checksum) > 0 {
-					log.Errorf(ctx, "consistency check failed on range r%d: expected checksum %x, got %x",
-						req.RangeID, redact.Safe(req.Checksum), redact.Safe(ccr.Checksum))
-					// Leave resp.Snapshot alone so that the caller will receive what's
-					// in it (if anything).
-				}
-			} else {
+			if !req.WithSnapshot {
 				ccr.Snapshot = nil
 			}
 			resp = &ccr