Skip to content

Commit

Permalink
kvserver: use ClearRawRange to truncate very large Raft logs
Browse files Browse the repository at this point in the history
Raft log truncation was done using point deletes in a single Pebble
batch. If the number of entries to truncate was very large, this could
result in overflowing the Pebble batch, causing the node to panic. This
has been seen to happen e.g. when the snapshot rate was set very low,
effectively stalling snapshot transfers which in turn held up log
truncations for extended periods of time.

This patch uses a Pebble range tombstone if the number of entries to
truncate is very large (100k or more). In most common cases, point deletes
are still used, to avoid writing too many range tombstones to Pebble.

Release note (bug fix): Fixed a bug which could cause nodes to crash
when truncating abnormally large Raft logs.
  • Loading branch information
erikgrinaker committed Feb 1, 2022
1 parent 65db9cf commit 66fd9f4
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 39 deletions.
60 changes: 41 additions & 19 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,26 @@ import (
"go.etcd.io/etcd/raft/v3/tracker"
)

// raftLogTruncationClearRangeThreshold is the entry count at which a Raft log
// truncation switches from per-entry point deletes to a single Pebble range
// tombstone.
//
// The value is a trade-off: too low and Pebble accumulates many range
// tombstones; too high and a single truncation issues so many point deletes
// that it risks overflowing the Pebble write batch.
//
// Steady-state truncations are small, since they are triggered once
// RaftLogQueueStaleSize (64 KB) or RaftLogQueueStaleThreshold (100 entries)
// is exceeded. With lagging followers the log is allowed to grow up to
// RaftLogTruncationThreshold (16 MB) before it is truncated.
//
// 100k entries is therefore rarely reached in common cases, which keeps the
// number of range tombstones low, yet it is reached once a Raft log has grown
// abnormally large. Hitting RaftLogTruncationThreshold alone typically does
// not reach it unless log entries average <= 160 bytes. With keys of ~16
// bytes, a point-deletion batch stays bounded at ~1.6MB.
const raftLogTruncationClearRangeThreshold = 100_000

func makeIDKey() kvserverbase.CmdIDKey {
idKeyBuf := make([]byte, 0, raftCommandIDLen)
idKeyBuf = encoding.EncodeUint64Ascending(idKeyBuf, uint64(rand.Int63()))
Expand Down Expand Up @@ -1909,35 +1929,37 @@ func handleTruncatedStateBelowRaftPreApply(
loader stateloader.StateLoader,
readWriter storage.ReadWriter,
) (_apply bool, _ error) {
if suggestedTruncatedState.Index <= currentTruncatedState.Index {
// The suggested truncated state moves us backwards; instruct the
// caller to not update the in-memory state.
return false, nil
}

// Truncate the Raft log from the entry after the previous
// truncation index to the new truncation index. This is performed
// atomically with the raft command application so that the
// TruncatedState index is always consistent with the state of the
// Raft log itself. We can use the distinct writer because we know
// all writes will be to distinct keys.
//
// Intentionally don't use range deletion tombstones (ClearRange())
// due to performance concerns connected to having many range
// deletion tombstones. There is a chance that ClearRange will
// perform well here because the tombstones could be "collapsed",
// but it is hardly worth the risk at this point.
// Raft log itself.
prefixBuf := &loader.RangeIDPrefixBuf
for idx := currentTruncatedState.Index + 1; idx <= suggestedTruncatedState.Index; idx++ {
numTruncatedEntries := suggestedTruncatedState.Index - currentTruncatedState.Index
if numTruncatedEntries >= raftLogTruncationClearRangeThreshold {
start := prefixBuf.RaftLogKey(currentTruncatedState.Index + 1).Clone()
end := prefixBuf.RaftLogKey(suggestedTruncatedState.Index + 1).Clone() // end is exclusive
if err := readWriter.ClearRawRange(start, end); err != nil {
return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v at index %d-%d",
suggestedTruncatedState, currentTruncatedState.Index+1, suggestedTruncatedState.Index+1)
}
} else {
// NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to
// avoid allocating when constructing Raft log keys (16 bytes).
unsafeKey := prefixBuf.RaftLogKey(idx)
if err := readWriter.ClearUnversioned(unsafeKey); err != nil {
return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v at index %d",
suggestedTruncatedState, idx)
for idx := currentTruncatedState.Index + 1; idx <= suggestedTruncatedState.Index; idx++ {
if err := readWriter.ClearUnversioned(prefixBuf.RaftLogKey(idx)); err != nil {
return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v at index %d",
suggestedTruncatedState, idx)
}
}
}

if suggestedTruncatedState.Index <= currentTruncatedState.Index {
// The suggested truncated state moves us backwards; instruct the
// caller to not update the in-memory state.
return false, nil
}

// The suggested truncated state moves us forward; apply it and tell
// the caller as much.
if err := storage.MVCCPutProto(
Expand Down
65 changes: 48 additions & 17 deletions pkg/kv/kvserver/replica_raft_truncation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,21 @@ package kvserver
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -40,6 +42,7 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
datadriven.Walk(t, testutils.TestDataPath(t, "truncated_state"), func(t *testing.T, path string) {
const rangeID = 12
loader := stateloader.Make(rangeID)
prefixBuf := &loader.RangeIDPrefixBuf
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

Expand All @@ -50,9 +53,9 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
d.ScanArgs(t, "index", &prevTruncatedState.Index)
d.ScanArgs(t, "term", &prevTruncatedState.Term)
return ""

case "put":
var index uint64
var term uint64
var index, term uint64
d.ScanArgs(t, "index", &index)
d.ScanArgs(t, "term", &term)

Expand All @@ -61,42 +64,70 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
Term: term,
}

assert.NoError(t, loader.SetRaftTruncatedState(ctx, eng, truncState))
require.NoError(t, loader.SetRaftTruncatedState(ctx, eng, truncState))
return ""

case "handle":
var buf bytes.Buffer

var index uint64
var term uint64
var index, term uint64
d.ScanArgs(t, "index", &index)
d.ScanArgs(t, "term", &term)

suggestedTruncatedState := &roachpb.RaftTruncatedState{
Index: index,
Term: term,
}

currentTruncatedState, err := loader.LoadRaftTruncatedState(ctx, eng)
assert.NoError(t, err)
apply, err := handleTruncatedStateBelowRaftPreApply(ctx, &currentTruncatedState, suggestedTruncatedState, loader, eng)
if err != nil {
return err.Error()
require.NoError(t, err)

// Write log entries at start, middle, end, and above the truncated interval.
if suggestedTruncatedState.Index > currentTruncatedState.Index {
indexes := []uint64{
currentTruncatedState.Index + 1, // start
(suggestedTruncatedState.Index + currentTruncatedState.Index + 1) / 2, // middle
suggestedTruncatedState.Index, // end
suggestedTruncatedState.Index + 1, // new head
}
for _, idx := range indexes {
meta := enginepb.MVCCMetadata{RawBytes: make([]byte, 8)}
binary.BigEndian.PutUint64(meta.RawBytes, idx)
value, err := protoutil.Marshal(&meta)
require.NoError(t, err)
require.NoError(t, eng.PutUnversioned(prefixBuf.RaftLogKey(idx), value))
}
}

// Apply truncation.
apply, err := handleTruncatedStateBelowRaftPreApply(ctx, &currentTruncatedState, suggestedTruncatedState, loader, eng)
require.NoError(t, err)
fmt.Fprintf(&buf, "apply: %t\n", apply)

// Check the truncated state.
key := keys.RaftTruncatedStateKey(rangeID)
var truncatedState roachpb.RaftTruncatedState
ok, err := storage.MVCCGetProto(ctx, eng, key, hlc.Timestamp{}, &truncatedState, storage.MVCCGetOptions{})
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
require.True(t, ok)
fmt.Fprintf(&buf, "%s -> index=%d term=%d\n", key, truncatedState.Index, truncatedState.Term)
fmt.Fprintf(&buf, "state: %s -> index=%d term=%d\n", key, truncatedState.Index, truncatedState.Term)

// Find the first untruncated log entry (the log head).
res, err := storage.MVCCScan(ctx, eng,
prefixBuf.RaftLogPrefix().Clone(),
prefixBuf.RaftLogPrefix().PrefixEnd(),
hlc.Timestamp{},
storage.MVCCScanOptions{MaxKeys: 1})
require.NoError(t, err)
var tail roachpb.Key
if len(res.KVs) > 0 {
tail = res.KVs[0].Key
}
fmt.Fprintf(&buf, "head: %s\n", tail)

return buf.String()

default:
return fmt.Sprintf("unsupported: %s", d.Cmd)
}
return fmt.Sprintf("unsupported: %s", d.Cmd)
})
})
}
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/testdata/truncated_state/truncated_state
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,28 @@ prev index=100 term=9
handle index=150 term=9
----
apply: true
/Local/RangeID/12/u/RaftTruncatedState -> index=150 term=9
state: /Local/RangeID/12/u/RaftTruncatedState -> index=150 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

# Simulate another truncation that moves forward.

handle index=170 term=9
----
apply: true
/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
state: /Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:171

# ... and one that moves backwards and should not take effect.

handle index=150 term=9
----
apply: false
/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
state: /Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:171

# A huge truncation (beyond raftLogTruncationClearRangeThreshold) also works.
handle index=12345678901234567890 term=9
----
apply: true
state: /Local/RangeID/12/u/RaftTruncatedState -> index=12345678901234567890 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:12345678901234567891
7 changes: 7 additions & 0 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ func bytesPrefixEnd(b []byte) []byte {
return b
}

// Clone returns a copy of the key that shares no backing storage with the
// receiver, so the copy stays valid if the receiver's buffer is later reused
// or mutated. A nil key clones to nil, so the distinction between a nil key
// and an empty (zero-length, non-nil) key is preserved.
func (k Key) Clone() Key {
	if k == nil {
		// Keep nil as nil rather than turning it into an empty key.
		return nil
	}
	c := make(Key, len(k))
	copy(c, k)
	return c
}

// Next returns the next key in lexicographic sort order. The method may only
// take a shallow copy of the Key, so both the receiver and the return
// value should be treated as immutable after.
Expand Down

0 comments on commit 66fd9f4

Please sign in to comment.