Skip to content

Commit

Permalink
Merge pull request #42011 from irfansharif/191028.truncation-asserts
Browse files Browse the repository at this point in the history
storage: assert around snapshot sending/receiving
  • Loading branch information
tbg authored Nov 5, 2019
2 parents c114859 + 93db76c commit cc22647
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 8 deletions.
9 changes: 7 additions & 2 deletions pkg/storage/batcheval/cmd_truncate_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,15 @@ func TruncateLog(
ms.SysBytes = -ms.SysBytes // simulate the deletion

} else {
if _, _, _, err := engine.MVCCDeleteRange(ctx, batch, &ms, start.Key, end.Key, math.MaxInt64, /* max */
hlc.Timestamp{}, nil /* txn */, false /* returnKeys */); err != nil {
_, _, numDeleted, err := engine.MVCCDeleteRange(ctx, batch, &ms, start.Key, end.Key, math.MaxInt64, /* max */
hlc.Timestamp{}, nil /* txn */, false /* returnKeys */)
if err != nil {
return result.Result{}, err
}
if expNumDeleted := (args.Index - firstIndex); uint64(numDeleted) > expNumDeleted {
log.Fatalf(ctx, "expected to delete up to %d log entries [%d, %d), deleted %d entries",
expNumDeleted, firstIndex, args.Index, numDeleted)
}
}

tState := &roachpb.RaftTruncatedState{
Expand Down
22 changes: 19 additions & 3 deletions pkg/storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,29 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
decision.NewFirstIndex = decision.Input.FirstIndex
decision.ChosenVia = truncatableIndexChosenViaFirstIndex
}

// Invariants: NewFirstIndex >= FirstIndex
// NewFirstIndex <= LastIndex (if != 10)
// NewFirstIndex <= QuorumIndex (if != 0)
//
// For uninit'ed replicas we can have input.FirstIndex > input.LastIndex, more
// specifically input.FirstIndex = input.LastIndex + 1. FirstIndex is set to
TruncatedState.Index + 1, and for an uninit'ed replica, LastIndex is simply
// 10. This is what informs the `input.LastIndex == 10` conditional below.
valid := (decision.NewFirstIndex >= input.FirstIndex) &&
(decision.NewFirstIndex <= input.LastIndex || input.LastIndex == 10) &&
(decision.NewFirstIndex <= decision.QuorumIndex || decision.QuorumIndex == 0)
if !valid {
err := fmt.Sprintf("invalid truncation decision; output = %d, input: [%d, %d], quorum idx = %d",
decision.NewFirstIndex, input.FirstIndex, input.LastIndex, decision.QuorumIndex)
panic(err)
}

return decision
}

// getQuorumIndex returns the index which a quorum of the nodes have
// committed. The snapshotLogTruncationConstraints indicates the index of a pending
// snapshot which is considered part of the Raft group even though it hasn't
// been added yet. Note that getQuorumIndex may return 0 if the progress map
// committed. Note that getQuorumIndex may return 0 if the progress map
// doesn't contain information for a sufficient number of followers (e.g. the
// local replica has only recently become the leader). In general, the value
// returned by getQuorumIndex may be smaller than raftStatus.Commit which is
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ type Replica struct {
// from the Raft log entry. Use the invalidLastTerm constant for this
// case.
lastIndex, lastTerm uint64
// A map of raft log index of pending preemptive snapshots to deadlines.
// A map of raft log index of pending snapshots to deadlines.
// Used to prohibit raft log truncations that would leave a gap between
// the snapshot and the new first index. The map entry has a zero
// deadline while the snapshot is being sent and turns nonzero when the
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,13 @@ func (r *Replica) sendSnapshot(
r.store.Engine().NewBatch,
sent,
); err != nil {
if errors.Cause(err) == malformedSnapshotError {
checkpointDir := r.store.checkpoint(ctx,
fmt.Sprintf("r%d_%s", r.RangeID, snap.SnapUUID.Short()))

log.Fatalf(ctx, "malformed snapshot generated, checkpoint created at: %s", checkpointDir)
}

return &snapshotError{err}
}
return nil
Expand Down
42 changes: 42 additions & 0 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,10 @@ type IncomingSnapshot struct {
snapType string
}

// String implements fmt.Stringer, summarizing the snapshot's type, short
// UUID, and the raft applied index it was captured at.
func (s *IncomingSnapshot) String() string {
	return fmt.Sprintf(
		"%s snapshot %s at applied index %d",
		s.snapType,
		s.SnapUUID.Short(),
		s.State.RaftAppliedIndex,
	)
}

// snapshot creates an OutgoingSnapshot containing a rocksdb snapshot for the
// given range. Note that snapshot() is called without Replica.raftMu held.
func snapshot(
Expand Down Expand Up @@ -936,6 +940,21 @@ func (r *Replica) applySnapshot(
s.RaftAppliedIndex, snap.Metadata.Index)
}

if expLen := s.RaftAppliedIndex - s.TruncatedState.Index; expLen != uint64(len(logEntries)) {
entriesRange, err := extractRangeFromEntries(inSnap.LogEntries)
if err != nil {
return err
}

checkpointDir := r.store.checkpoint(ctx,
fmt.Sprintf("r%d_%s", r.RangeID, inSnap.SnapUUID.String()))

log.Fatalf(ctx, "missing log entries in snapshot (%s): got %d entries, expected %d "+
"(TruncatedState.Index=%d, HardState=%s, LogEntries=%s). checkpoint created at: %s",
inSnap.String(), len(logEntries), expLen, s.TruncatedState.Index,
hs.String(), entriesRange, checkpointDir)
}

// We've written Raft log entries, so we need to sync the WAL.
if err := batch.Commit(syncRaftLog.Get(&r.store.cfg.Settings.SV)); err != nil {
return err
Expand Down Expand Up @@ -1016,6 +1035,29 @@ func (r *Replica) applySnapshot(
return nil
}

// extractRangeFromEntries returns a string representation of the range of
// marshaled list of raft log entries in the form of [first-index, last-index].
// If the list is empty, "[n/a, n/a]" is returned instead.
// extractRangeFromEntries returns a string representation of the range of
// marshaled raft log entries in the form "[first-index, last-index]". If the
// list is empty, "[n/a, n/a]" is returned instead. Only the first and last
// entries are unmarshaled; the entries in between are assumed contiguous.
func extractRangeFromEntries(logEntries [][]byte) (string, error) {
	var firstIndex, lastIndex string
	if len(logEntries) == 0 {
		firstIndex = "n/a"
		lastIndex = "n/a"
	} else {
		var firstEntry, lastEntry raftpb.Entry
		if err := protoutil.Unmarshal(logEntries[0], &firstEntry); err != nil {
			return "", err
		}
		if err := protoutil.Unmarshal(logEntries[len(logEntries)-1], &lastEntry); err != nil {
			return "", err
		}
		// NB: string(entry.Index) would convert the uint64 to the Unicode
		// code point with that value (e.g. 65 -> "A"), not its decimal
		// representation; format the indexes explicitly instead.
		firstIndex = fmt.Sprint(firstEntry.Index)
		lastIndex = fmt.Sprint(lastEntry.Index)
	}
	return fmt.Sprintf("[%s, %s]", firstIndex, lastIndex), nil
}

type raftCommandEncodingVersion byte

// Raft commands are encoded with a 1-byte version (currently 0 or 1), an 8-byte
Expand Down
16 changes: 16 additions & 0 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
Expand Down Expand Up @@ -4458,6 +4460,20 @@ func (s *Store) updateCommandQueueGauges() error {
return nil
}

// checkpoint creates a storage engine checkpoint under the engine's auxiliary
// directory, in a "checkpoints" subdirectory named after tag, and returns the
// checkpoint's path. Checkpoints are created when we run into what looks like
// on-disk corruption (e.g. missing raft log entries in an incoming snapshot)
// so that the engine state can be inspected post-mortem. Checkpoint creation
// is best-effort: failures are logged rather than returned, and the returned
// path may not exist if creation failed.
func (s *Store) checkpoint(ctx context.Context, tag string) string {
	checkpointBase := filepath.Join(s.engine.GetAuxiliaryDir(), "checkpoints")
	// Don't silently drop the mkdir error; CreateCheckpoint below would fail
	// anyway, but logging the root cause makes the failure diagnosable.
	if err := os.MkdirAll(checkpointBase, 0700); err != nil {
		log.Warningf(ctx, "unable to create checkpoint directory %s: %+v", checkpointBase, err)
	}

	checkpointDir := filepath.Join(checkpointBase, tag)
	if err := s.engine.CreateCheckpoint(checkpointDir); err != nil {
		log.Warningf(ctx, "unable to create checkpoint %s: %+v", checkpointDir, err)
	}

	return checkpointDir
}

// ComputeMetrics immediately computes the current value of store metrics which
// cannot be computed incrementally. This method should be invoked periodically
// by a higher-level system which records store metrics.
Expand Down
35 changes: 34 additions & 1 deletion pkg/storage/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type kvBatchSnapshotStrategy struct {
newBatch func() engine.Batch
}

// Send implements the snapshotStrategy interface.
// Receive implements the snapshotStrategy interface.
func (kvSS *kvBatchSnapshotStrategy) Receive(
ctx context.Context, stream incomingSnapshotStream, header SnapshotRequest_Header,
) (IncomingSnapshot, error) {
Expand Down Expand Up @@ -147,6 +147,17 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
State: &header.State,
snapType: snapTypeRaft,
}

expLen := inSnap.State.RaftAppliedIndex - inSnap.State.TruncatedState.Index
if expLen != uint64(len(logEntries)) {
// We've received a botched snapshot. We could fatal right here but opt
// to warn loudly instead, and fatal when applying the snapshot
// (in Replica.applySnapshot) in order to capture replica hard state.
log.Warningf(ctx,
"missing log entries in snapshot (%s): got %d entries, expected %d",
inSnap.String(), len(logEntries), expLen)
}

if header.RaftMessageRequest.ToReplica.ReplicaID == 0 {
inSnap.snapType = snapTypePreemptive
}
Expand All @@ -156,6 +167,10 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
}
}

// malformedSnapshotError indicates that the snapshot in question is
// malformed, e.g. it is missing raft log entries.
var malformedSnapshotError = errors.New("malformed snapshot generated")

// Send implements the snapshotStrategy interface.
func (kvSS *kvBatchSnapshotStrategy) Send(
ctx context.Context,
Expand Down Expand Up @@ -266,6 +281,24 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
return err
}

// The difference between the snapshot index (applied index at the time of
// snapshot) and the truncated index should equal the number of log entries
// shipped over.
expLen := endIndex - firstIndex
if expLen != uint64(len(logEntries)) {
// We've generated a botched snapshot. We could fatal right here but opt
// to warn loudly instead, and fatal at the caller to capture a checkpoint
// of the underlying storage engine.
entriesRange, err := extractRangeFromEntries(logEntries)
if err != nil {
return err
}
log.Warningf(ctx, "missing log entries in snapshot (%s): "+
"got %d entries, expected %d (TruncatedState.Index=%d, LogEntries=%s)",
snap.String(), len(logEntries), expLen, snap.State.TruncatedState.Index, entriesRange)
return malformedSnapshotError
}

// Inline the payloads for all sideloaded proposals.
//
// TODO(tschottdorf): could also send slim proposals and attach sideloaded
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/store_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/rditer"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -60,6 +61,10 @@ func TestSnapshotRaftLogLimit(t *testing.T) {
if err != nil {
t.Fatal(err)
}
firstIndex, err := (*replicaRaftStorage)(repl).FirstIndex()
if err != nil {
t.Fatal(err)
}
eng := store.Engine()
snap := eng.NewSnapshot()
defer snap.Close()
Expand All @@ -74,7 +79,12 @@ func TestSnapshotRaftLogLimit(t *testing.T) {
outSnap := &OutgoingSnapshot{
Iter: iter,
EngineSnap: snap,
snapType: snapType,
State: storagebase.ReplicaState{
TruncatedState: &roachpb.RaftTruncatedState{
Index: firstIndex - 1,
},
},
snapType: snapType,
RaftSnap: raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{
Index: lastIndex,
Expand Down

0 comments on commit cc22647

Please sign in to comment.