Skip to content

Commit

Permalink
kvserver: enable loosely coupled raft log truncation
Browse files Browse the repository at this point in the history
The const override that disabled loosely coupled truncation
is removed. There is additional testing of both the end-to-end
path and the truncator.

Informs cockroachdb#36262

Release note: None
  • Loading branch information
sumeerbhola committed Feb 23, 2022
1 parent e2cd026 commit 1345618
Show file tree
Hide file tree
Showing 13 changed files with 732 additions and 387 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,9 @@ func mergeCheckingTimestampCaches(
// the result to apply on the majority quorum.
testutils.SucceedsSoon(t, func() error {
for _, r := range lhsRepls[1:] {
// Loosely-coupled truncation requires an engine flush to advance
// guaranteed durability.
require.NoError(t, r.Engine().Flush())
firstIndex, err := r.GetFirstIndex()
require.NoError(t, err)
if firstIndex < truncIndex {
Expand Down Expand Up @@ -3980,6 +3983,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
if _, err := kv.SendWrapped(ctx, distSender, truncArgs); err != nil {
t.Fatal(err)
}
waitForTruncationForTesting(t, repl, index)
return index
}()

Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/client_raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func TestRaftLogQueue(t *testing.T) {
for i := range tc.Servers {
tc.GetFirstStoreFromServer(t, i).MustForceRaftLogScanAndProcess()
}
// Flush the engine to advance durability, which triggers truncation.
require.NoError(t, raftLeaderRepl.Engine().Flush())
// Ensure that firstIndex has increased indicating that the log
// truncation has occurred.
afterTruncationIndex, err = raftLeaderRepl.GetFirstIndex()
Expand Down
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,20 @@ func TestSnapshotAfterTruncation(t *testing.T) {
}
}

// waitForTruncationForTesting waits for the first raft log index of r to
// become newFirstIndex. The check is handed to testutils.SucceedsSoon, which
// is expected to re-run it until it returns nil (or fail the test).
//
// Each attempt flushes r's engine first: flushing advances durability, which
// is what triggers the pending truncation. A flush error or a GetFirstIndex
// error fails the test immediately via require.NoError.
func waitForTruncationForTesting(t *testing.T, r *kvserver.Replica, newFirstIndex uint64) {
	checkFirstIndex := func() error {
		// Flush the engine to advance durability, which triggers truncation.
		flushErr := r.Engine().Flush()
		require.NoError(t, flushErr)

		// Compare the replica's current first index against the expected one.
		got, err := r.GetFirstIndex()
		require.NoError(t, err)
		if got == newFirstIndex {
			return nil
		}
		return errors.Errorf("expected firstIndex == %d, got %d", newFirstIndex, got)
	}
	testutils.SucceedsSoon(t, checkFirstIndex)
}

// TestSnapshotAfterTruncationWithUncommittedTail is similar in spirit to
// TestSnapshotAfterTruncation/differentTerm. However, it differs in that we
// take care to ensure that the partitioned Replica has a long uncommitted tail
Expand Down Expand Up @@ -1009,6 +1023,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
}
return nil
})
waitForTruncationForTesting(t, newLeaderRepl, index+1)

snapsMetric := tc.GetFirstStoreFromServer(t, partStore).Metrics().RangeSnapshotsAppliedByVoters
snapsBefore := snapsMetric.Count()
Expand Down
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,10 +744,7 @@ func (*raftLogQueue) purgatoryChan() <-chan time.Time {
func isLooselyCoupledRaftLogTruncationEnabled(
ctx context.Context, settings *cluster.Settings,
) bool {
// TODO(sumeer): remove the false when hooking up the
// raftLogTruncator.durabilityAdvanced and fixing that method to do a
// durable read of RaftAppliedIndex.
return settings.Version.IsActive(
ctx, clusterversion.LooselyCoupledRaftLogTruncation) &&
looselyCoupledTruncationEnabled.Get(&settings.SV) && false
looselyCoupledTruncationEnabled.Get(&settings.SV)
}
203 changes: 112 additions & 91 deletions pkg/kv/kvserver/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
raft "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/tracker"
)
Expand Down Expand Up @@ -580,7 +581,7 @@ func TestProactiveRaftLogTruncate(t *testing.T) {
{1, RaftLogQueueStaleSize},
}
for _, c := range testCases {
t.Run("", func(t *testing.T) {
testutils.RunTrueAndFalse(t, "loosely-coupled", func(t *testing.T, looselyCoupled bool) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
store, _ := createTestStore(ctx, t,
Expand All @@ -590,7 +591,8 @@ func TestProactiveRaftLogTruncate(t *testing.T) {
createSystemRanges: false,
},
stopper)

st := store.ClusterSettings()
looselyCoupledTruncationEnabled.Override(ctx, &st.SV, looselyCoupled)
// Note that turning off the replica scanner does not prevent the queues
// from processing entries (in this case specifically the raftLogQueue),
// just that the scanner will not try to push all replicas onto the queues.
Expand Down Expand Up @@ -618,6 +620,10 @@ func TestProactiveRaftLogTruncate(t *testing.T) {
// fairly quickly, there is a slight race between this check and the
// truncation, especially when under stress.
testutils.SucceedsSoon(t, func() error {
if looselyCoupled {
// Flush the engine to advance durability, which triggers truncation.
require.NoError(t, store.engine.Flush())
}
newFirstIndex, err := r.GetFirstIndex()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -702,108 +708,105 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
func TestTruncateLog(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
tc := testContext{}
ctx := context.Background()
cfg := TestStoreConfig(nil)
cfg.TestingKnobs.DisableRaftLogQueue = true
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.StartWithStoreConfig(ctx, t, stopper, cfg)
testutils.RunTrueAndFalse(t, "loosely-coupled", func(t *testing.T, looselyCoupled bool) {
tc := testContext{}
ctx := context.Background()
cfg := TestStoreConfig(nil)
cfg.TestingKnobs.DisableRaftLogQueue = true
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.StartWithStoreConfig(ctx, t, stopper, cfg)
st := tc.store.ClusterSettings()
looselyCoupledTruncationEnabled.Override(ctx, &st.SV, looselyCoupled)

// Populate the log with 10 entries. Save the LastIndex after each write.
var indexes []uint64
for i := 0; i < 10; i++ {
args := incrementArgs([]byte("a"), int64(i))

if _, pErr := tc.SendWrapped(args); pErr != nil {
t.Fatal(pErr)
}
idx, err := tc.repl.GetLastIndex()
if err != nil {
t.Fatal(err)
}
indexes = append(indexes, idx)
}

// Populate the log with 10 entries. Save the LastIndex after each write.
var indexes []uint64
for i := 0; i < 10; i++ {
args := incrementArgs([]byte("a"), int64(i))
rangeID := tc.repl.RangeID

if _, pErr := tc.SendWrapped(args); pErr != nil {
// Discard the first half of the log.
truncateArgs := truncateLogArgs(indexes[5], rangeID)
if _, pErr := tc.SendWrappedWith(roachpb.Header{RangeID: 1}, &truncateArgs); pErr != nil {
t.Fatal(pErr)
}
idx, err := tc.repl.GetLastIndex()

waitForTruncationForTesting(t, tc.repl, indexes[5], looselyCoupled)

// We can still get what remains of the log.
tc.repl.mu.Lock()
entries, err := tc.repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64)
tc.repl.mu.Unlock()
if err != nil {
t.Fatal(err)
}
indexes = append(indexes, idx)
}

rangeID := tc.repl.RangeID

// Discard the first half of the log.
truncateArgs := truncateLogArgs(indexes[5], rangeID)
if _, pErr := tc.SendWrappedWith(roachpb.Header{RangeID: 1}, &truncateArgs); pErr != nil {
t.Fatal(pErr)
}

// FirstIndex has changed.
firstIndex, err := tc.repl.GetFirstIndex()
if err != nil {
t.Fatal(err)
}
if firstIndex != indexes[5] {
t.Errorf("expected firstIndex == %d, got %d", indexes[5], firstIndex)
}

// We can still get what remains of the log.
tc.repl.mu.Lock()
entries, err := tc.repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64)
tc.repl.mu.Unlock()
if err != nil {
t.Fatal(err)
}
if len(entries) != int(indexes[9]-indexes[5]) {
t.Errorf("expected %d entries, got %d", indexes[9]-indexes[5], len(entries))
}
if len(entries) != int(indexes[9]-indexes[5]) {
t.Errorf("expected %d entries, got %d", indexes[9]-indexes[5], len(entries))
}

// But any range that includes the truncated entries returns an error.
tc.repl.mu.Lock()
_, err = tc.repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64)
tc.repl.mu.Unlock()
if !errors.Is(err, raft.ErrCompacted) {
t.Errorf("expected ErrCompacted, got %s", err)
}
// But any range that includes the truncated entries returns an error.
tc.repl.mu.Lock()
_, err = tc.repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64)
tc.repl.mu.Unlock()
if !errors.Is(err, raft.ErrCompacted) {
t.Errorf("expected ErrCompacted, got %s", err)
}

// The term of the last truncated entry is still available.
tc.repl.mu.Lock()
term, err := tc.repl.raftTermRLocked(indexes[4])
tc.repl.mu.Unlock()
if err != nil {
t.Fatal(err)
}
if term == 0 {
t.Errorf("invalid term 0 for truncated entry")
}
// The term of the last truncated entry is still available.
tc.repl.mu.Lock()
term, err := tc.repl.raftTermRLocked(indexes[4])
tc.repl.mu.Unlock()
if err != nil {
t.Fatal(err)
}
if term == 0 {
t.Errorf("invalid term 0 for truncated entry")
}

// The terms of older entries are gone.
tc.repl.mu.Lock()
_, err = tc.repl.raftTermRLocked(indexes[3])
tc.repl.mu.Unlock()
if !errors.Is(err, raft.ErrCompacted) {
t.Errorf("expected ErrCompacted, got %s", err)
}
// The terms of older entries are gone.
tc.repl.mu.Lock()
_, err = tc.repl.raftTermRLocked(indexes[3])
tc.repl.mu.Unlock()
if !errors.Is(err, raft.ErrCompacted) {
t.Errorf("expected ErrCompacted, got %s", err)
}

// Truncating logs that have already been truncated should not return an
// error.
truncateArgs = truncateLogArgs(indexes[3], rangeID)
if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil {
t.Fatal(pErr)
}
// Truncating logs that have already been truncated should not return an
// error.
truncateArgs = truncateLogArgs(indexes[3], rangeID)
if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil {
t.Fatal(pErr)
}

// Truncating logs that have the wrong rangeID included should not return
// an error but should not truncate any logs.
truncateArgs = truncateLogArgs(indexes[9], rangeID+1)
if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil {
t.Fatal(pErr)
}
// Truncating logs that have the wrong rangeID included should not return
// an error but should not truncate any logs.
truncateArgs = truncateLogArgs(indexes[9], rangeID+1)
if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil {
t.Fatal(pErr)
}

tc.repl.mu.Lock()
// The term of the last truncated entry is still available.
term, err = tc.repl.raftTermRLocked(indexes[4])
tc.repl.mu.Unlock()
if err != nil {
t.Fatal(err)
}
if term == 0 {
t.Errorf("invalid term 0 for truncated entry")
}
tc.repl.mu.Lock()
// The term of the last truncated entry is still available.
term, err = tc.repl.raftTermRLocked(indexes[4])
tc.repl.mu.Unlock()
if err != nil {
t.Fatal(err)
}
if term == 0 {
t.Errorf("invalid term 0 for truncated entry")
}
})
}

func TestRaftLogQueueShouldQueueRecompute(t *testing.T) {
Expand Down Expand Up @@ -913,3 +916,21 @@ func TestTruncateLogRecompute(t *testing.T) {
put() // make sure we remain trusted and in sync
}
}

// waitForTruncationForTesting waits for the first raft log index of r to
// become newFirstIndex. The check is handed to testutils.SucceedsSoon, which
// is expected to re-run it until it returns nil (or fail the test).
//
// When looselyCoupled is true, each attempt flushes r's engine first, since
// flushing advances durability and that is what triggers the truncation. When
// it is false no flush is performed. A flush error or a GetFirstIndex error
// fails the test immediately via require.NoError.
func waitForTruncationForTesting(
	t *testing.T, r *Replica, newFirstIndex uint64, looselyCoupled bool,
) {
	checkFirstIndex := func() error {
		if looselyCoupled {
			// Flush the engine to advance durability, which triggers truncation.
			flushErr := r.Engine().Flush()
			require.NoError(t, flushErr)
		}

		// Compare the replica's current first index against the expected one.
		got, err := r.GetFirstIndex()
		require.NoError(t, err)
		if got == newFirstIndex {
			return nil
		}
		return errors.Errorf("expected firstIndex == %d, got %d", newFirstIndex, got)
	}
	testutils.SucceedsSoon(t, checkFirstIndex)
}
Loading

0 comments on commit 1345618

Please sign in to comment.