kvserver: consistency checker cleanups, pt 1 #76841

Merged 4 commits on Feb 22, 2022
9 changes: 3 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_compute_checksum.go
@@ -46,12 +46,9 @@ func declareKeysComputeChecksum(
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(rs.GetStartKey())})
}

// Version numbers for Replica checksum computation. Requests silently no-op
// unless the versions are compatible.
const (
ReplicaChecksumVersion = 4
ReplicaChecksumGCInterval = time.Hour
)
// ReplicaChecksumVersion versions the checksum computation. Requests silently no-op
// unless the versions between the requesting and requested replica are compatible.
const ReplicaChecksumVersion = 4

// ComputeChecksum starts the process of computing a checksum on the replica at
// a particular snapshot. The checksum is later verified through a
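For illustration, here is a minimal, self-contained sketch of the version-gating contract described in the comment above: a request carrying a mismatched version must silently no-op rather than fail. The names below (`checksumRequest`, `handleComputeChecksum`) are hypothetical stand-ins, not CRDB APIs.

```go
package main

import "fmt"

// Stand-in for batcheval.ReplicaChecksumVersion: both sides of a
// consistency check must agree on this number.
const replicaChecksumVersion = 4

// checksumRequest is a hypothetical, simplified request carrying the
// requester's checksum version.
type checksumRequest struct {
	Version uint32
	ID      string
}

// handleComputeChecksum silently no-ops on a version mismatch, mirroring
// the contract documented for ReplicaChecksumVersion.
func handleComputeChecksum(req checksumRequest) {
	if req.Version != replicaChecksumVersion {
		fmt.Printf("incompatible versions (requested: %d, have: %d); no-op\n",
			req.Version, replicaChecksumVersion)
		return
	}
	fmt.Printf("starting checksum computation %s\n", req.ID)
}

func main() {
	handleComputeChecksum(checksumRequest{Version: 3, ID: "a"}) // no-op
	handleComputeChecksum(checksumRequest{Version: 4, ID: "b"}) // runs
}
```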
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
@@ -557,7 +557,7 @@ type Replica struct {
lastUpdateTimes lastUpdateTimesMap

// Computed checksum at a snapshot UUID.
checksums map[uuid.UUID]ReplicaChecksum
checksums map[uuid.UUID]replicaChecksum

// proposalQuota is the quota pool maintained by the lease holder where
// incoming writes acquire quota from a fixed quota pool before going
155 changes: 147 additions & 8 deletions pkg/kv/kvserver/replica_consistency.go
@@ -19,15 +19,18 @@ import (
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -40,6 +43,14 @@ import (
"github.com/cockroachdb/redact"
)

// How long to keep consistency checker checksums in-memory for collection.
// Typically a long-poll waits for the result of the computation, so it's almost
// immediately collected. However, the consistency checker synchronously
// collects the first replica's checksum before all others, so if the first one
// is slow the checksum may not be collected right away, and that first
// consistency check can take a long time due to rate limiting and range size.
const replicaChecksumGCInterval = time.Hour

// fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. A
// stats mismatch is the event in which
// - the consistency checker finds that all replicas are consistent
@@ -54,8 +65,8 @@ import (
// know old CRDB versions (<19.1 at time of writing) were not involved.
var fatalOnStatsMismatch = envutil.EnvOrDefaultBool("COCKROACH_ENFORCE_CONSISTENT_STATS", false)

// ReplicaChecksum contains progress on a replica checksum computation.
type ReplicaChecksum struct {
// replicaChecksum contains progress on a replica checksum computation.
type replicaChecksum struct {
CollectChecksumResponse
// started is true if the checksum computation has started.
started bool
@@ -419,10 +430,19 @@ func (r *Replica) RunConsistencyCheck(
return results, nil
}

func (r *Replica) gcOldChecksumEntriesLocked(now time.Time) {
for id, val := range r.mu.checksums {
// The timestamp is valid only if set.
if !val.gcTimestamp.IsZero() && now.After(val.gcTimestamp) {
delete(r.mu.checksums, id)
}
}
}
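As background, here is a self-contained sketch of the GC-by-deadline pattern this helper implements: an entry with a zero gcTimestamp is still in flight and is always kept, while entries whose deadline has passed are dropped. The types below (`checksumEntry`, `gcOldEntries`) are simplified stand-ins for the real Replica fields.

```go
package main

import (
	"fmt"
	"time"
)

// checksumEntry is a simplified stand-in for replicaChecksum: the
// gcTimestamp stays zero until the computation completes.
type checksumEntry struct {
	gcTimestamp time.Time
}

// gcOldEntries deletes entries whose GC deadline has passed. A zero
// gcTimestamp means the computation is still in flight, so such
// entries are kept regardless of age.
func gcOldEntries(m map[string]checksumEntry, now time.Time) {
	for id, e := range m {
		if !e.gcTimestamp.IsZero() && now.After(e.gcTimestamp) {
			delete(m, id)
		}
	}
}

func main() {
	now := time.Now()
	m := map[string]checksumEntry{
		"pending": {},                                   // kept: still computing
		"stale":   {gcTimestamp: now.Add(-time.Minute)}, // dropped
		"fresh":   {gcTimestamp: now.Add(time.Hour)},    // kept
	}
	gcOldEntries(m, now)
	fmt.Println(len(m)) // 2
}
```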

// getChecksum waits for the result of ComputeChecksum and returns it.
// It returns false if there is no checksum being computed for the id,
// or it has already been GCed.
func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (ReplicaChecksum, error) {
func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (replicaChecksum, error) {
now := timeutil.Now()
r.mu.Lock()
r.gcOldChecksumEntriesLocked(now)
@@ -442,14 +462,14 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (ReplicaChecksu
// Wait for the checksum to compute or at least to start.
computed, err := r.checksumInitialWait(ctx, id, c.notify)
if err != nil {
return ReplicaChecksum{}, err
return replicaChecksum{}, err
}
// If the checksum started but has not completed, commit
// to waiting the full deadline.
if !computed {
_, err = r.checksumWait(ctx, id, c.notify, nil)
if err != nil {
return ReplicaChecksum{}, err
return replicaChecksum{}, err
}
}

@@ -463,7 +483,7 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (ReplicaChecksu
// The latter case can occur when there's a version mismatch or, more generally,
// when the (async) checksum computation fails.
if !ok || c.Checksum == nil {
return ReplicaChecksum{}, errors.Errorf("no checksum found (ID = %s)", id)
return replicaChecksum{}, errors.Errorf("no checksum found (ID = %s)", id)
}
return c, nil
}
@@ -539,7 +559,7 @@ func (r *Replica) computeChecksumDone(
c.Delta = enginepb.MVCCStatsDelta(delta)
c.Persisted = result.PersistedMS
}
c.gcTimestamp = timeutil.Now().Add(batcheval.ReplicaChecksumGCInterval)
c.gcTimestamp = timeutil.Now().Add(replicaChecksumGCInterval)
c.Snapshot = snapshot
r.mu.checksums[id] = c
// Notify
@@ -559,7 +579,7 @@ type replicaHash struct {

// sha512 computes the SHA512 hash of all the replica data at the snapshot.
// It will dump all the kv data into snapshot if it is provided.
func (r *Replica) sha512(
func (*Replica) sha512(
ctx context.Context,
desc roachpb.RangeDescriptor,
snap storage.Reader,
@@ -684,3 +704,122 @@ func (r *Replica) sha512(

return &result, nil
}
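For background, a minimal sketch of hashing a stream of key/value pairs into a single SHA-512 digest, in the spirit of the method above. The real sha512 also folds in MVCC timestamps and additional key spaces and computes stats along the way, so this is an illustration only; `kv` and `hashKVs` are hypothetical names.

```go
package main

import (
	"crypto/sha512"
	"encoding/binary"
	"fmt"
)

// kv is a hypothetical key/value pair; the real code hashes MVCC keys
// together with their timestamps.
type kv struct{ key, value []byte }

// hashKVs feeds length-prefixed keys and values into one SHA-512 hash
// so the digest is unambiguous about where each field starts and ends.
func hashKVs(pairs []kv) [sha512.Size]byte {
	h := sha512.New()
	var lenBuf [8]byte
	for _, p := range pairs {
		binary.LittleEndian.PutUint64(lenBuf[:], uint64(len(p.key)))
		h.Write(lenBuf[:])
		h.Write(p.key)
		binary.LittleEndian.PutUint64(lenBuf[:], uint64(len(p.value)))
		h.Write(lenBuf[:])
		h.Write(p.value)
	}
	var out [sha512.Size]byte
	copy(out[:], h.Sum(nil))
	return out
}

func main() {
	sum := hashKVs([]kv{{[]byte("a"), []byte("1")}, {[]byte("b"), []byte("2")}})
	fmt.Printf("%x\n", sum[:8])
}
```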

func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.ComputeChecksum) {
stopper := r.store.Stopper()
now := timeutil.Now()
r.mu.Lock()
var notify chan struct{}
if c, ok := r.mu.checksums[cc.ChecksumID]; !ok {
// There is no record of this ID. Make a new notification.
notify = make(chan struct{})
} else if !c.started {
// A CollectChecksumRequest is waiting on the existing notification.
notify = c.notify
} else {
log.Fatalf(ctx, "attempted to apply ComputeChecksum command with duplicated checksum ID %s",
cc.ChecksumID)
}

r.gcOldChecksumEntriesLocked(now)

// Create an entry with checksum == nil and gcTimestamp unset.
r.mu.checksums[cc.ChecksumID] = replicaChecksum{started: true, notify: notify}
desc := *r.mu.state.Desc
r.mu.Unlock()

if cc.Version != batcheval.ReplicaChecksumVersion {
r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil)
log.Infof(ctx, "incompatible ComputeChecksum versions (requested: %d, have: %d)",
cc.Version, batcheval.ReplicaChecksumVersion)
return
}

// Caller is holding raftMu, so an engine snapshot is automatically
// Raft-consistent (i.e. not in the middle of an AddSSTable).
snap := r.store.engine.NewSnapshot()
if cc.Checkpoint {
sl := stateloader.Make(r.RangeID)
as, err := sl.LoadRangeAppliedState(ctx, snap)
if err != nil {
log.Warningf(ctx, "unable to load applied index, continuing anyway")
}
// NB: the names here will match on all nodes, which is nice for debugging.
tag := fmt.Sprintf("r%d_at_%d", r.RangeID, as.RaftAppliedIndex)
if dir, err := r.store.checkpoint(ctx, tag); err != nil {
log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err)
} else {
log.Warningf(ctx, "created checkpoint %s", dir)
}
}

// Compute SHA asynchronously and store it in a map by UUID.
// Don't use the proposal's context for this, as it is likely to be canceled very
// soon.
if err := stopper.RunAsyncTask(r.AnnotateCtx(context.Background()), "storage.Replica: computing checksum", func(ctx context.Context) {
func() {
defer snap.Close()
var snapshot *roachpb.RaftSnapshotData
if cc.SaveSnapshot {
snapshot = &roachpb.RaftSnapshotData{}
}

result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter)
if err != nil {
log.Errorf(ctx, "%v", err)
result = nil
}
r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot)
}()

var shouldFatal bool
for _, rDesc := range cc.Terminate {
if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID {
shouldFatal = true
}
}

if shouldFatal {
// This node should fatal as a result of a previous consistency
// check (i.e. this round is carried out only to obtain a diff).
// If we fatal too early, the diff won't make it back to the lease-
// holder and thus won't be printed to the logs. Since we're already
// in a goroutine that's about to end, simply sleep for a few seconds
// and then terminate.
auxDir := r.store.engine.GetAuxiliaryDir()
_ = r.store.engine.MkdirAll(auxDir)
path := base.PreventedStartupFile(auxDir)

const attentionFmt = `ATTENTION:

this node is terminating because a replica inconsistency was detected between %s
and its other replicas. Please check your cluster-wide log files for more
information and contact the CockroachDB support team. It is not necessarily safe
to replace this node; cluster data may still be at risk of corruption.

A checkpoints directory to aid (expert) debugging should be present in:
%s

A file preventing this node from restarting was placed at:
%s
`
preventStartupMsg := fmt.Sprintf(attentionFmt, r, auxDir, path)
if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil {
log.Warningf(ctx, "%v", err)
}

if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil {
p(*r.store.Ident)
} else {
time.Sleep(10 * time.Second)
log.Fatalf(r.AnnotateCtx(context.Background()), attentionFmt, r, auxDir, path)
}
}

}); err != nil {
defer snap.Close()
log.Errorf(ctx, "could not run async checksum computation (ID = %s): %v", cc.ChecksumID, err)
// Set checksum to nil.
r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil)
}
}
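To make the notify-channel handshake above easier to follow, here is a stripped-down sketch of the same pattern: the applier registers a pending entry and kicks off an async computation, and completing the computation closes the channel to wake any getChecksum-style waiters. The `tracker` type and its methods are hypothetical simplifications, not the real Replica state.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tracker is a hypothetical simplification of the per-replica checksum
// map: results keyed by ID, plus a notify channel per pending entry.
type tracker struct {
	mu      sync.Mutex
	results map[string][]byte
	notify  map[string]chan struct{}
}

// start registers a pending computation and returns the channel a
// collector can wait on.
func (t *tracker) start(id string) chan struct{} {
	t.mu.Lock()
	defer t.mu.Unlock()
	ch := make(chan struct{})
	t.notify[id] = ch
	return ch
}

// done records the result and closes the channel, waking all waiters.
func (t *tracker) done(id string, result []byte) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.results[id] = result
	close(t.notify[id])
}

func main() {
	t := &tracker{results: map[string][]byte{}, notify: map[string]chan struct{}{}}
	ch := t.start("cc-1")
	go func() {
		time.Sleep(10 * time.Millisecond) // stand-in for the async SHA computation
		t.done("cc-1", []byte{0xde, 0xad})
	}()
	<-ch // a getChecksum-style wait
	fmt.Printf("%x\n", t.results["cc-1"])
}
```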
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_consistency_test.go
@@ -77,7 +77,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {

// Simple condition: the checksum is notified, but not computed.
tc.repl.mu.Lock()
tc.repl.mu.checksums[id] = ReplicaChecksum{notify: notify}
tc.repl.mu.checksums[id] = replicaChecksum{notify: notify}
tc.repl.mu.Unlock()
rc, err := tc.repl.getChecksum(ctx, id)
if !testutils.IsError(err, "no checksum found") {
@@ -88,7 +88,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
// this will take 10ms.
id = uuid.FastMakeV4()
tc.repl.mu.Lock()
tc.repl.mu.checksums[id] = ReplicaChecksum{notify: make(chan struct{})}
tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{})}
tc.repl.mu.Unlock()
rc, err = tc.repl.getChecksum(ctx, id)
if !testutils.IsError(err, "checksum computation did not start") {
@@ -99,7 +99,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
// so next step is for context deadline.
id = uuid.FastMakeV4()
tc.repl.mu.Lock()
tc.repl.mu.checksums[id] = ReplicaChecksum{notify: make(chan struct{}), started: true}
tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{}), started: true}
tc.repl.mu.Unlock()
rc, err = tc.repl.getChecksum(ctx, id)
if !testutils.IsError(err, "context deadline exceeded") {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_init.go
@@ -100,7 +100,7 @@ func newUnloadedReplica(
return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV)
})
r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
r.mu.checksums = map[uuid.UUID]ReplicaChecksum{}
r.mu.checksums = map[uuid.UUID]replicaChecksum{}
r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings())
r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader
r.mu.proposalBuf.testing.dontCloseTimestamps = store.cfg.TestingKnobs.DontCloseTimestamps