diff --git a/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go b/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go index 0e1f9dbd4adb..563d74cedfba 100644 --- a/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go +++ b/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go @@ -46,12 +46,9 @@ func declareKeysComputeChecksum( latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(rs.GetStartKey())}) } -// Version numbers for Replica checksum computation. Requests silently no-op -// unless the versions are compatible. -const ( - ReplicaChecksumVersion = 4 - ReplicaChecksumGCInterval = time.Hour -) +// ReplicaChecksumVersion versions the checksum computation. Requests silently no-op +// unless the versions between the requesting and requested replica are compatible. +const ReplicaChecksumVersion = 4 // ComputeChecksum starts the process of computing a checksum on the replica at // a particular snapshot. The checksum is later verified through a diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 9577d00d2ffc..d0ddb55950af 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -557,7 +557,7 @@ type Replica struct { lastUpdateTimes lastUpdateTimesMap // Computed checksum at a snapshot UUID. - checksums map[uuid.UUID]ReplicaChecksum + checksums map[uuid.UUID]replicaChecksum // proposalQuota is the quota pool maintained by the lease holder where // incoming writes acquire quota from a fixed quota pool before going diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index 004989354102..3fc943c63155 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -19,15 +19,18 @@ import ( "sync" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -40,6 +43,14 @@ import ( "github.com/cockroachdb/redact" ) +// How long to keep consistency checker checksums in-memory for collection. +// Typically a long-poll waits for the result of the computation, so it's almost +// immediately collected. However, the consistency checker synchronously +// collects the first replica's checksum before all others, so if the first one +// is slow the checksum may not be collected right away, and that first +// consistency check can take a long time due to rate limiting and range size. +const replicaChecksumGCInterval = time.Hour + // fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. A // stats mismatch is the event in which // - the consistency checker finds that all replicas are consistent @@ -54,8 +65,8 @@ import ( // know old CRDB versions (<19.1 at time of writing) were not involved. var fatalOnStatsMismatch = envutil.EnvOrDefaultBool("COCKROACH_ENFORCE_CONSISTENT_STATS", false) -// ReplicaChecksum contains progress on a replica checksum computation. -type ReplicaChecksum struct { +// replicaChecksum contains progress on a replica checksum computation. +type replicaChecksum struct { CollectChecksumResponse // started is true if the checksum computation has started. started bool @@ -419,10 +430,19 @@ func (r *Replica) RunConsistencyCheck( return results, nil } +func (r *Replica) gcOldChecksumEntriesLocked(now time.Time) { + for id, val := range r.mu.checksums { + // The timestamp is valid only if set. + if !val.gcTimestamp.IsZero() && now.After(val.gcTimestamp) { + delete(r.mu.checksums, id) + } + } +} + // getChecksum waits for the result of ComputeChecksum and returns it. // It returns false if there is no checksum being computed for the id, // or it has already been GCed. -func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (ReplicaChecksum, error) { +func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (replicaChecksum, error) { now := timeutil.Now() r.mu.Lock() r.gcOldChecksumEntriesLocked(now) @@ -442,14 +462,14 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (ReplicaChecksu // Wait for the checksum to compute or at least to start. computed, err := r.checksumInitialWait(ctx, id, c.notify) if err != nil { - return ReplicaChecksum{}, err + return replicaChecksum{}, err } // If the checksum started, but has not completed commit // to waiting the full deadline. if !computed { _, err = r.checksumWait(ctx, id, c.notify, nil) if err != nil { - return ReplicaChecksum{}, err + return replicaChecksum{}, err } } @@ -463,7 +483,7 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (ReplicaChecksu // The latter case can occur when there's a version mismatch or, more generally, // when the (async) checksum computation fails. if !ok || c.Checksum == nil { - return ReplicaChecksum{}, errors.Errorf("no checksum found (ID = %s)", id) + return replicaChecksum{}, errors.Errorf("no checksum found (ID = %s)", id) } return c, nil } @@ -539,7 +559,7 @@ func (r *Replica) computeChecksumDone( c.Delta = enginepb.MVCCStatsDelta(delta) c.Persisted = result.PersistedMS } - c.gcTimestamp = timeutil.Now().Add(batcheval.ReplicaChecksumGCInterval) + c.gcTimestamp = timeutil.Now().Add(replicaChecksumGCInterval) c.Snapshot = snapshot r.mu.checksums[id] = c // Notify @@ -559,7 +579,7 @@ type replicaHash struct { // sha512 computes the SHA512 hash of all the replica data at the snapshot. // It will dump all the kv data into snapshot if it is provided. -func (r *Replica) sha512( +func (*Replica) sha512( ctx context.Context, desc roachpb.RangeDescriptor, snap storage.Reader, @@ -684,3 +704,122 @@ func (r *Replica) sha512( return &result, nil } + +func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.ComputeChecksum) { + stopper := r.store.Stopper() + now := timeutil.Now() + r.mu.Lock() + var notify chan struct{} + if c, ok := r.mu.checksums[cc.ChecksumID]; !ok { + // There is no record of this ID. Make a new notification. + notify = make(chan struct{}) + } else if !c.started { + // A CollectChecksumRequest is waiting on the existing notification. + notify = c.notify + } else { + log.Fatalf(ctx, "attempted to apply ComputeChecksum command with duplicated checksum ID %s", + cc.ChecksumID) + } + + r.gcOldChecksumEntriesLocked(now) + + // Create an entry with checksum == nil and gcTimestamp unset. + r.mu.checksums[cc.ChecksumID] = replicaChecksum{started: true, notify: notify} + desc := *r.mu.state.Desc + r.mu.Unlock() + + if cc.Version != batcheval.ReplicaChecksumVersion { + r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil) + log.Infof(ctx, "incompatible ComputeChecksum versions (requested: %d, have: %d)", + cc.Version, batcheval.ReplicaChecksumVersion) + return + } + + // Caller is holding raftMu, so an engine snapshot is automatically + // Raft-consistent (i.e. not in the middle of an AddSSTable). + snap := r.store.engine.NewSnapshot() + if cc.Checkpoint { + sl := stateloader.Make(r.RangeID) + as, err := sl.LoadRangeAppliedState(ctx, snap) + if err != nil { + log.Warningf(ctx, "unable to load applied index, continuing anyway") + } + // NB: the names here will match on all nodes, which is nice for debugging. + tag := fmt.Sprintf("r%d_at_%d", r.RangeID, as.RaftAppliedIndex) + if dir, err := r.store.checkpoint(ctx, tag); err != nil { + log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err) + } else { + log.Warningf(ctx, "created checkpoint %s", dir) + } + } + + // Compute SHA asynchronously and store it in a map by UUID. + // Don't use the proposal's context for this, as it likely to be canceled very + // soon. + if err := stopper.RunAsyncTask(r.AnnotateCtx(context.Background()), "storage.Replica: computing checksum", func(ctx context.Context) { + func() { + defer snap.Close() + var snapshot *roachpb.RaftSnapshotData + if cc.SaveSnapshot { + snapshot = &roachpb.RaftSnapshotData{} + } + + result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter) + if err != nil { + log.Errorf(ctx, "%v", err) + result = nil + } + r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot) + }() + + var shouldFatal bool + for _, rDesc := range cc.Terminate { + if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID { + shouldFatal = true + } + } + + if shouldFatal { + // This node should fatal as a result of a previous consistency + // check (i.e. this round is carried out only to obtain a diff). + // If we fatal too early, the diff won't make it back to the lease- + // holder and thus won't be printed to the logs. Since we're already + // in a goroutine that's about to end, simply sleep for a few seconds + // and then terminate. + auxDir := r.store.engine.GetAuxiliaryDir() + _ = r.store.engine.MkdirAll(auxDir) + path := base.PreventedStartupFile(auxDir) + + const attentionFmt = `ATTENTION: + +this node is terminating because a replica inconsistency was detected between %s +and its other replicas. Please check your cluster-wide log files for more +information and contact the CockroachDB support team. It is not necessarily safe +to replace this node; cluster data may still be at risk of corruption. + +A checkpoints directory to aid (expert) debugging should be present in: +%s + +A file preventing this node from restarting was placed at: +%s +` + preventStartupMsg := fmt.Sprintf(attentionFmt, r, auxDir, path) + if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil { + log.Warningf(ctx, "%v", err) + } + + if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil { + p(*r.store.Ident) + } else { + time.Sleep(10 * time.Second) + log.Fatalf(r.AnnotateCtx(context.Background()), attentionFmt, r, auxDir, path) + } + } + + }); err != nil { + defer snap.Close() + log.Errorf(ctx, "could not run async checksum computation (ID = %s): %v", cc.ChecksumID, err) + // Set checksum to nil. + r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil) + } +} diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go index 07fc06d792b4..66afdee534ff 100644 --- a/pkg/kv/kvserver/replica_consistency_test.go +++ b/pkg/kv/kvserver/replica_consistency_test.go @@ -77,7 +77,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { // Simple condition, the checksum is notified, but not computed. tc.repl.mu.Lock() - tc.repl.mu.checksums[id] = ReplicaChecksum{notify: notify} + tc.repl.mu.checksums[id] = replicaChecksum{notify: notify} tc.repl.mu.Unlock() rc, err := tc.repl.getChecksum(ctx, id) if !testutils.IsError(err, "no checksum found") { @@ -88,7 +88,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { // this will take 10ms. id = uuid.FastMakeV4() tc.repl.mu.Lock() - tc.repl.mu.checksums[id] = ReplicaChecksum{notify: make(chan struct{})} + tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{})} tc.repl.mu.Unlock() rc, err = tc.repl.getChecksum(ctx, id) if !testutils.IsError(err, "checksum computation did not start") { @@ -99,7 +99,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) { // so next step is for context deadline. id = uuid.FastMakeV4() tc.repl.mu.Lock() - tc.repl.mu.checksums[id] = ReplicaChecksum{notify: make(chan struct{}), started: true} + tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{}), started: true} tc.repl.mu.Unlock() rc, err = tc.repl.getChecksum(ctx, id) if !testutils.IsError(err, "context deadline exceeded") { diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 06d779def727..2545b21644d7 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -100,7 +100,7 @@ func newUnloadedReplica( return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV) }) r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{} - r.mu.checksums = map[uuid.UUID]ReplicaChecksum{} + r.mu.checksums = map[uuid.UUID]replicaChecksum{} r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings()) r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader r.mu.proposalBuf.testing.dontCloseTimestamps = store.cfg.TestingKnobs.DontCloseTimestamps diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index ac31494229af..bbef1c9cfa95 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -17,22 +17,18 @@ import ( "time" "unsafe" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -174,139 +170,6 @@ func (proposal *ProposalData) releaseQuota() { } } -// TODO(tschottdorf): we should find new homes for the checksum, lease -// code, and various others below to leave here only the core logic. -// Not moving anything right now to avoid awkward diffs. These should -// all be moved to replica_application_result.go. - -func (r *Replica) gcOldChecksumEntriesLocked(now time.Time) { - for id, val := range r.mu.checksums { - // The timestamp is valid only if set. - if !val.gcTimestamp.IsZero() && now.After(val.gcTimestamp) { - delete(r.mu.checksums, id) - } - } -} - -func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.ComputeChecksum) { - stopper := r.store.Stopper() - now := timeutil.Now() - r.mu.Lock() - var notify chan struct{} - if c, ok := r.mu.checksums[cc.ChecksumID]; !ok { - // There is no record of this ID. Make a new notification. - notify = make(chan struct{}) - } else if !c.started { - // A CollectChecksumRequest is waiting on the existing notification. - notify = c.notify - } else { - log.Fatalf(ctx, "attempted to apply ComputeChecksum command with duplicated checksum ID %s", - cc.ChecksumID) - } - - r.gcOldChecksumEntriesLocked(now) - - // Create an entry with checksum == nil and gcTimestamp unset. - r.mu.checksums[cc.ChecksumID] = ReplicaChecksum{started: true, notify: notify} - desc := *r.mu.state.Desc - r.mu.Unlock() - - if cc.Version != batcheval.ReplicaChecksumVersion { - r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil) - log.Infof(ctx, "incompatible ComputeChecksum versions (requested: %d, have: %d)", - cc.Version, batcheval.ReplicaChecksumVersion) - return - } - - // Caller is holding raftMu, so an engine snapshot is automatically - // Raft-consistent (i.e. not in the middle of an AddSSTable). - snap := r.store.engine.NewSnapshot() - if cc.Checkpoint { - sl := stateloader.Make(r.RangeID) - as, err := sl.LoadRangeAppliedState(ctx, snap) - if err != nil { - log.Warningf(ctx, "unable to load applied index, continuing anyway") - } - // NB: the names here will match on all nodes, which is nice for debugging. - tag := fmt.Sprintf("r%d_at_%d", r.RangeID, as.RaftAppliedIndex) - if dir, err := r.store.checkpoint(ctx, tag); err != nil { - log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err) - } else { - log.Warningf(ctx, "created checkpoint %s", dir) - } - } - - // Compute SHA asynchronously and store it in a map by UUID. - // Don't use the proposal's context for this, as it likely to be canceled very - // soon. - if err := stopper.RunAsyncTask(r.AnnotateCtx(context.Background()), "storage.Replica: computing checksum", func(ctx context.Context) { - func() { - defer snap.Close() - var snapshot *roachpb.RaftSnapshotData - if cc.SaveSnapshot { - snapshot = &roachpb.RaftSnapshotData{} - } - - result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter) - if err != nil { - log.Errorf(ctx, "%v", err) - result = nil - } - r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot) - }() - - var shouldFatal bool - for _, rDesc := range cc.Terminate { - if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID { - shouldFatal = true - } - } - - if shouldFatal { - // This node should fatal as a result of a previous consistency - // check (i.e. this round is carried out only to obtain a diff). - // If we fatal too early, the diff won't make it back to the lease- - // holder and thus won't be printed to the logs. Since we're already - // in a goroutine that's about to end, simply sleep for a few seconds - // and then terminate. - auxDir := r.store.engine.GetAuxiliaryDir() - _ = r.store.engine.MkdirAll(auxDir) - path := base.PreventedStartupFile(auxDir) - - const attentionFmt = `ATTENTION: - -this node is terminating because a replica inconsistency was detected between %s -and its other replicas. Please check your cluster-wide log files for more -information and contact the CockroachDB support team. It is not necessarily safe -to replace this node; cluster data may still be at risk of corruption. - -A checkpoints directory to aid (expert) debugging should be present in: -%s - -A file preventing this node from restarting was placed at: -%s -` - preventStartupMsg := fmt.Sprintf(attentionFmt, r, auxDir, path) - if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil { - log.Warningf(ctx, "%v", err) - } - - if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil { - p(*r.store.Ident) - } else { - time.Sleep(10 * time.Second) - log.Fatalf(r.AnnotateCtx(context.Background()), attentionFmt, r, auxDir, path) - } - } - - }); err != nil { - defer snap.Close() - log.Errorf(ctx, "could not run async checksum computation (ID = %s): %v", cc.ChecksumID, err) - // Set checksum to nil. - r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil) - } -} - // leaseJumpOption controls what assertions leasePostApplyLocked can make. type leaseJumpOption bool