From 13456187352fca48914767f737a617a3b723682b Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Tue, 22 Feb 2022 14:09:39 -0500 Subject: [PATCH] kvserver: enable loosely coupled raft log truncation The const override that disabled loosely coupled truncation is removed. There is additional testing of both the end-to-end path and the truncator. Informs #36262 Release note: None --- pkg/kv/kvserver/client_merge_test.go | 4 + pkg/kv/kvserver/client_raft_log_queue_test.go | 2 + pkg/kv/kvserver/client_raft_test.go | 15 + pkg/kv/kvserver/raft_log_queue.go | 5 +- pkg/kv/kvserver/raft_log_queue_test.go | 203 +++++---- pkg/kv/kvserver/raft_log_truncator.go | 80 +++- pkg/kv/kvserver/raft_log_truncator_test.go | 23 +- .../replica_application_state_machine_test.go | 131 +++++- pkg/kv/kvserver/replica_rangefeed_test.go | 11 + pkg/kv/kvserver/replica_sideload_test.go | 113 ++--- pkg/kv/kvserver/replica_test.go | 430 +++++++++--------- pkg/kv/kvserver/store.go | 12 +- pkg/kv/kvserver/testdata/raft_log_truncator | 90 ++++ 13 files changed, 732 insertions(+), 387 deletions(-) diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index af80bed240e4..c42254439d8a 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -741,6 +741,9 @@ func mergeCheckingTimestampCaches( // the result to apply on the majority quorum. testutils.SucceedsSoon(t, func() error { for _, r := range lhsRepls[1:] { + // Loosely-coupled truncation requires an engine flush to advance + // guaranteed durability. + require.NoError(t, r.Engine().Flush()) firstIndex, err := r.GetFirstIndex() require.NoError(t, err) if firstIndex < truncIndex { @@ -3980,6 +3983,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { if _, err := kv.SendWrapped(ctx, distSender, truncArgs); err != nil { t.Fatal(err) } + waitForTruncationForTesting(t, repl, index) return index }() diff --git a/pkg/kv/kvserver/client_raft_log_queue_test.go b/pkg/kv/kvserver/client_raft_log_queue_test.go index 195233a766dc..09a09b39fe50 100644 --- a/pkg/kv/kvserver/client_raft_log_queue_test.go +++ b/pkg/kv/kvserver/client_raft_log_queue_test.go @@ -96,6 +96,8 @@ func TestRaftLogQueue(t *testing.T) { for i := range tc.Servers { tc.GetFirstStoreFromServer(t, i).MustForceRaftLogScanAndProcess() } + // Flush the engine to advance durability, which triggers truncation. + require.NoError(t, raftLeaderRepl.Engine().Flush()) // Ensure that firstIndex has increased indicating that the log // truncation has occurred. afterTruncationIndex, err = raftLeaderRepl.GetFirstIndex() diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 21ad3b6b37af..757e576f98b6 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -831,6 +831,20 @@ func TestSnapshotAfterTruncation(t *testing.T) { } } +func waitForTruncationForTesting(t *testing.T, r *kvserver.Replica, newFirstIndex uint64) { + testutils.SucceedsSoon(t, func() error { + // Flush the engine to advance durability, which triggers truncation. + require.NoError(t, r.Engine().Flush()) + // FirstIndex has changed. + firstIndex, err := r.GetFirstIndex() + require.NoError(t, err) + if firstIndex != newFirstIndex { + return errors.Errorf("expected firstIndex == %d, got %d", newFirstIndex, firstIndex) + } + return nil + }) +} + // TestSnapshotAfterTruncationWithUncommittedTail is similar in spirit to // TestSnapshotAfterTruncation/differentTerm. 
However, it differs in that we // take care to ensure that the partitioned Replica has a long uncommitted tail @@ -1009,6 +1023,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { } return nil }) + waitForTruncationForTesting(t, newLeaderRepl, index+1) snapsMetric := tc.GetFirstStoreFromServer(t, partStore).Metrics().RangeSnapshotsAppliedByVoters snapsBefore := snapsMetric.Count() diff --git a/pkg/kv/kvserver/raft_log_queue.go b/pkg/kv/kvserver/raft_log_queue.go index 0ecbb3312565..9cf07ad296c4 100644 --- a/pkg/kv/kvserver/raft_log_queue.go +++ b/pkg/kv/kvserver/raft_log_queue.go @@ -744,10 +744,7 @@ func (*raftLogQueue) purgatoryChan() <-chan time.Time { func isLooselyCoupledRaftLogTruncationEnabled( ctx context.Context, settings *cluster.Settings, ) bool { - // TODO(sumeer): remove the false when hooking up the - // raftLogTruncator.durabilityAdvanced and fixing that method to do a - // durable read of RaftAppliedIndex. return settings.Version.IsActive( ctx, clusterversion.LooselyCoupledRaftLogTruncation) && - looselyCoupledTruncationEnabled.Get(&settings.SV) && false + looselyCoupledTruncationEnabled.Get(&settings.SV) } diff --git a/pkg/kv/kvserver/raft_log_queue_test.go b/pkg/kv/kvserver/raft_log_queue_test.go index 6c97113099f2..20adf8fad749 100644 --- a/pkg/kv/kvserver/raft_log_queue_test.go +++ b/pkg/kv/kvserver/raft_log_queue_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" raft "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/tracker" ) @@ -580,7 +581,7 @@ func TestProactiveRaftLogTruncate(t *testing.T) { {1, RaftLogQueueStaleSize}, } for _, c := range testCases { - t.Run("", func(t *testing.T) { + testutils.RunTrueAndFalse(t, "loosely-coupled", func(t *testing.T, looselyCoupled bool) { stopper := stop.NewStopper() defer stopper.Stop(ctx) store, _ := createTestStore(ctx, t, @@ -590,7 +591,8 @@ func TestProactiveRaftLogTruncate(t *testing.T) { createSystemRanges: false, }, stopper) - + st := store.ClusterSettings() + looselyCoupledTruncationEnabled.Override(ctx, &st.SV, looselyCoupled) // Note that turning off the replica scanner does not prevent the queues // from processing entries (in this case specifically the raftLogQueue), // just that the scanner will not try to push all replicas onto the queues. @@ -618,6 +620,10 @@ func TestProactiveRaftLogTruncate(t *testing.T) { // fairly quickly, there is a slight race between this check and the // truncation, especially when under stress. testutils.SucceedsSoon(t, func() error { + if looselyCoupled { + // Flush the engine to advance durability, which triggers truncation. 
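+ // The truncator only enacts a pending truncation once the applied state
+ // it depends on is durable, and the flush-completed callback is what
+ // re-triggers it, so an explicit flush is needed here.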
+ require.NoError(t, store.engine.Flush()) + } newFirstIndex, err := r.GetFirstIndex() if err != nil { t.Fatal(err) @@ -702,108 +708,105 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) { func TestTruncateLog(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - tc := testContext{} - ctx := context.Background() - cfg := TestStoreConfig(nil) - cfg.TestingKnobs.DisableRaftLogQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - tc.StartWithStoreConfig(ctx, t, stopper, cfg) + testutils.RunTrueAndFalse(t, "loosely-coupled", func(t *testing.T, looselyCoupled bool) { + tc := testContext{} + ctx := context.Background() + cfg := TestStoreConfig(nil) + cfg.TestingKnobs.DisableRaftLogQueue = true + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.StartWithStoreConfig(ctx, t, stopper, cfg) + st := tc.store.ClusterSettings() + looselyCoupledTruncationEnabled.Override(ctx, &st.SV, looselyCoupled) + + // Populate the log with 10 entries. Save the LastIndex after each write. + var indexes []uint64 + for i := 0; i < 10; i++ { + args := incrementArgs([]byte("a"), int64(i)) + + if _, pErr := tc.SendWrapped(args); pErr != nil { + t.Fatal(pErr) + } + idx, err := tc.repl.GetLastIndex() + if err != nil { + t.Fatal(err) + } + indexes = append(indexes, idx) + } - // Populate the log with 10 entries. Save the LastIndex after each write. - var indexes []uint64 - for i := 0; i < 10; i++ { - args := incrementArgs([]byte("a"), int64(i)) + rangeID := tc.repl.RangeID - if _, pErr := tc.SendWrapped(args); pErr != nil { + // Discard the first half of the log. + truncateArgs := truncateLogArgs(indexes[5], rangeID) + if _, pErr := tc.SendWrappedWith(roachpb.Header{RangeID: 1}, &truncateArgs); pErr != nil { t.Fatal(pErr) } - idx, err := tc.repl.GetLastIndex() + + waitForTruncationForTesting(t, tc.repl, indexes[5], looselyCoupled) + + // We can still get what remains of the log. + tc.repl.mu.Lock() + entries, err := tc.repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64) + tc.repl.mu.Unlock() if err != nil { t.Fatal(err) } - indexes = append(indexes, idx) - } - - rangeID := tc.repl.RangeID - - // Discard the first half of the log. - truncateArgs := truncateLogArgs(indexes[5], rangeID) - if _, pErr := tc.SendWrappedWith(roachpb.Header{RangeID: 1}, &truncateArgs); pErr != nil { - t.Fatal(pErr) - } - - // FirstIndex has changed. - firstIndex, err := tc.repl.GetFirstIndex() - if err != nil { - t.Fatal(err) - } - if firstIndex != indexes[5] { - t.Errorf("expected firstIndex == %d, got %d", indexes[5], firstIndex) - } - - // We can still get what remains of the log. - tc.repl.mu.Lock() - entries, err := tc.repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64) - tc.repl.mu.Unlock() - if err != nil { - t.Fatal(err) - } - if len(entries) != int(indexes[9]-indexes[5]) { - t.Errorf("expected %d entries, got %d", indexes[9]-indexes[5], len(entries)) - } + if len(entries) != int(indexes[9]-indexes[5]) { + t.Errorf("expected %d entries, got %d", indexes[9]-indexes[5], len(entries)) + } - // But any range that includes the truncated entries returns an error. - tc.repl.mu.Lock() - _, err = tc.repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64) - tc.repl.mu.Unlock() - if !errors.Is(err, raft.ErrCompacted) { - t.Errorf("expected ErrCompacted, got %s", err) - } + // But any range that includes the truncated entries returns an error. 
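+ // Specifically raft.ErrCompacted, as asserted below.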
+ tc.repl.mu.Lock() + _, err = tc.repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64) + tc.repl.mu.Unlock() + if !errors.Is(err, raft.ErrCompacted) { + t.Errorf("expected ErrCompacted, got %s", err) + } - // The term of the last truncated entry is still available. - tc.repl.mu.Lock() - term, err := tc.repl.raftTermRLocked(indexes[4]) - tc.repl.mu.Unlock() - if err != nil { - t.Fatal(err) - } - if term == 0 { - t.Errorf("invalid term 0 for truncated entry") - } + // The term of the last truncated entry is still available. + tc.repl.mu.Lock() + term, err := tc.repl.raftTermRLocked(indexes[4]) + tc.repl.mu.Unlock() + if err != nil { + t.Fatal(err) + } + if term == 0 { + t.Errorf("invalid term 0 for truncated entry") + } - // The terms of older entries are gone. - tc.repl.mu.Lock() - _, err = tc.repl.raftTermRLocked(indexes[3]) - tc.repl.mu.Unlock() - if !errors.Is(err, raft.ErrCompacted) { - t.Errorf("expected ErrCompacted, got %s", err) - } + // The terms of older entries are gone. + tc.repl.mu.Lock() + _, err = tc.repl.raftTermRLocked(indexes[3]) + tc.repl.mu.Unlock() + if !errors.Is(err, raft.ErrCompacted) { + t.Errorf("expected ErrCompacted, got %s", err) + } - // Truncating logs that have already been truncated should not return an - // error. - truncateArgs = truncateLogArgs(indexes[3], rangeID) - if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil { - t.Fatal(pErr) - } + // Truncating logs that have already been truncated should not return an + // error. + truncateArgs = truncateLogArgs(indexes[3], rangeID) + if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil { + t.Fatal(pErr) + } - // Truncating logs that have the wrong rangeID included should not return - // an error but should not truncate any logs. - truncateArgs = truncateLogArgs(indexes[9], rangeID+1) - if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil { - t.Fatal(pErr) - } + // Truncating logs that have the wrong rangeID included should not return + // an error but should not truncate any logs. + truncateArgs = truncateLogArgs(indexes[9], rangeID+1) + if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil { + t.Fatal(pErr) + } - tc.repl.mu.Lock() - // The term of the last truncated entry is still available. - term, err = tc.repl.raftTermRLocked(indexes[4]) - tc.repl.mu.Unlock() - if err != nil { - t.Fatal(err) - } - if term == 0 { - t.Errorf("invalid term 0 for truncated entry") - } + tc.repl.mu.Lock() + // The term of the last truncated entry is still available. + term, err = tc.repl.raftTermRLocked(indexes[4]) + tc.repl.mu.Unlock() + if err != nil { + t.Fatal(err) + } + if term == 0 { + t.Errorf("invalid term 0 for truncated entry") + } + }) } func TestRaftLogQueueShouldQueueRecompute(t *testing.T) { @@ -913,3 +916,21 @@ func TestTruncateLogRecompute(t *testing.T) { put() // make sure we remain trusted and in sync } } + +func waitForTruncationForTesting( + t *testing.T, r *Replica, newFirstIndex uint64, looselyCoupled bool, +) { + testutils.SucceedsSoon(t, func() error { + if looselyCoupled { + // Flush the engine to advance durability, which triggers truncation. + require.NoError(t, r.Engine().Flush()) + } + // FirstIndex has changed. 
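+ // With loosely-coupled truncation the truncation is enacted
+ // asynchronously, hence the surrounding retry loop.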
+ firstIndex, err := r.GetFirstIndex() + require.NoError(t, err) + if firstIndex != newFirstIndex { + return errors.Errorf("expected firstIndex == %d, got %d", newFirstIndex, firstIndex) + } + return nil + }) +} diff --git a/pkg/kv/kvserver/raft_log_truncator.go b/pkg/kv/kvserver/raft_log_truncator.go index f7e3621ed700..e4d0388d015d 100644 --- a/pkg/kv/kvserver/raft_log_truncator.go +++ b/pkg/kv/kvserver/raft_log_truncator.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" ) @@ -184,19 +185,29 @@ func (pt *pendingTruncation) firstIndexAfterTrunc() uint64 { // raftLogTruncator.mu, but the reverse is not permitted. type raftLogTruncator struct { - store storeForTruncator - mu struct { + ambientCtx context.Context + store storeForTruncator + stopper *stop.Stopper + mu struct { syncutil.Mutex // Ranges are queued into addRanges and batch dequeued by swapping with // drainRanges. This avoids holding mu for any work proportional to the // number of queued ranges. addRanges, drainRanges map[roachpb.RangeID]struct{} + // State for scheduling the goroutine for background enacting of + // truncations. + runningTruncation bool + queuedDurabilityCB bool } } -func makeRaftLogTruncator(store storeForTruncator) *raftLogTruncator { +func makeRaftLogTruncator( + ambientCtx log.AmbientContext, store storeForTruncator, stopper *stop.Stopper, +) *raftLogTruncator { t := &raftLogTruncator{ - store: store, + ambientCtx: ambientCtx.AnnotateCtx(context.Background()), + store: store, + stopper: stopper, } t.mu.addRanges = make(map[roachpb.RangeID]struct{}) t.mu.drainRanges = make(map[roachpb.RangeID]struct{}) @@ -363,9 +374,33 @@ func (r rangesByRangeID) Swap(i, j int) { // truncated index become durable in RangeAppliedState.RaftAppliedIndex. This // coarseness assumption is important for not wasting much work being done in // this method. -// TODO(sumeer): hook this up to the callback that will be invoked on the -// Store by the Engine (Pebble). Put this work on a separate goroutine of -// which there will be at most one running at a time. +// +// This method schedules the actual work for asynchronous execution as we need +// to return quickly, and not call into the engine since that could risk +// deadlock (see storage.Engine.RegisterFlushCompletedCallback). +func (t *raftLogTruncator) durabilityAdvancedCallback() { + t.mu.Lock() + runTruncation := false + if !t.mu.runningTruncation && len(t.mu.addRanges) > 0 { + runTruncation = true + t.mu.runningTruncation = true + } + if !runTruncation && len(t.mu.addRanges) > 0 { + t.mu.queuedDurabilityCB = true + } + t.mu.Unlock() + if runTruncation { + if err := t.stopper.RunAsyncTask( + t.ambientCtx, "raft-log-truncation", t.durabilityAdvanced); err != nil { + // Task did not run because stopper is stopped. + t.mu.Lock() + defer t.mu.Unlock() + t.mu.runningTruncation = false + } + } +} + +// Synchronously does the work to truncate the queued replicas. func (t *raftLogTruncator) durabilityAdvanced(ctx context.Context) { t.mu.Lock() t.mu.addRanges, t.mu.drainRanges = t.mu.drainRanges, t.mu.addRanges @@ -390,15 +425,28 @@ func (t *raftLogTruncator) durabilityAdvanced(ctx context.Context) { // Sort it for deterministic testing output. 
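// rangesByRangeID implements sort.Interface, ordering by RangeID.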
sort.Sort(rangesByRangeID(ranges)) // Create an engine Reader to provide a safe lower bound on what is durable. - // - // TODO(sumeer): This is incorrect -- change this reader to only read - // durable state after merging - // https://github.com/cockroachdb/pebble/pull/1490 and incorporating into - // CockroachDB. - reader := t.store.getEngine().NewReadOnly(storage.StandardDurability) + reader := t.store.getEngine().NewReadOnly(storage.GuaranteedDurability) defer reader.Close() + shouldQuiesce := t.stopper.ShouldQuiesce() + quiesced := false for _, rangeID := range ranges { t.tryEnactTruncations(ctx, rangeID, reader) + select { + case <-shouldQuiesce: + quiesced = true + default: + } + if quiesced { + break + } + } + t.mu.Lock() + t.mu.runningTruncation = false + runCB := t.mu.queuedDurabilityCB + t.mu.queuedDurabilityCB = false + t.mu.Unlock() + if runCB { + t.durabilityAdvancedCallback() } } @@ -528,7 +576,11 @@ func (t *raftLogTruncator) tryEnactTruncations( r.setTruncationDeltaAndTrusted(trunc.logDeltaBytes, isDeltaTrusted) }) // Now remove the enacted truncations. It is the same iteration as the - // previous one, but we do it while holding pendingTruncs.mu. + // previous one, but we do it while holding pendingTruncs.mu. Note that + // since we have updated the raft log size but not yet removed the pending + // truncations, a concurrent thread could race and compute a lower post + // truncation size. We ignore this race since it seems harmless, and closing + // it requires a more complicated replicaForTruncator interface. pendingTruncs.mu.Lock() for i := 0; i <= enactIndex; i++ { pendingTruncs.popLocked() diff --git a/pkg/kv/kvserver/raft_log_truncator_test.go b/pkg/kv/kvserver/raft_log_truncator_test.go index 126da01307bf..274ab02bff19 100644 --- a/pkg/kv/kvserver/raft_log_truncator_test.go +++ b/pkg/kv/kvserver/raft_log_truncator_test.go @@ -26,6 +26,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -179,17 +181,15 @@ func (r *replicaTruncatorTest) writeRaftStateToEngine( } func (r *replicaTruncatorTest) writeRaftAppliedIndex( - t *testing.T, eng storage.Engine, raftAppliedIndex uint64, + t *testing.T, eng storage.Engine, raftAppliedIndex uint64, flush bool, ) { require.NoError(t, r.stateLoader.SetRangeAppliedState(context.Background(), eng, raftAppliedIndex, 0, 0, &enginepb.MVCCStats{}, nil)) // Flush to make it satisfy the contract of OnlyReadGuaranteedDurable in // Pebble. - // TODO(sumeer): by controlling the size of the memtable we can probably - // construct a deterministic test where a flush does not happen, and we can - // test that raftLogTruncator is actually reading only the durable - // RaftAppliedIndex. 
- require.NoError(t, eng.Flush()) + if flush { + require.NoError(t, eng.Flush()) + } } func (r *replicaTruncatorTest) printEngine(t *testing.T, eng storage.Engine) { @@ -283,7 +283,10 @@ func TestRaftLogTruncator(t *testing.T) { eng := storage.NewDefaultInMemForTesting() defer eng.Close() store := makeStoreTT(eng, &buf) - truncator := makeRaftLogTruncator(store) + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + truncator := makeRaftLogTruncator( + log.MakeTestingAmbientContext(tracing.NewTracer()), store, stopper) datadriven.RunTest(t, testutils.TestDataPath(t, "raft_log_truncator"), func(t *testing.T, d *datadriven.TestData) string { @@ -335,7 +338,11 @@ func TestRaftLogTruncator(t *testing.T) { rangeID := scanRangeID(t, d) var raftAppliedIndex uint64 d.ScanArgs(t, "raft-applied-index", &raftAppliedIndex) - store.replicas[rangeID].writeRaftAppliedIndex(t, eng, raftAppliedIndex) + noFlush := false + if d.HasArg("no-flush") { + d.ScanArgs(t, "no-flush", &noFlush) + } + store.replicas[rangeID].writeRaftAppliedIndex(t, eng, raftAppliedIndex, !noFlush) return flushAndReset() case "add-replica-to-truncator": diff --git a/pkg/kv/kvserver/replica_application_state_machine_test.go b/pkg/kv/kvserver/replica_application_state_machine_test.go index c949daa90353..96d932947da4 100644 --- a/pkg/kv/kvserver/replica_application_state_machine_test.go +++ b/pkg/kv/kvserver/replica_application_state_machine_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3/raftpb" ) @@ -147,11 +148,7 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) { }) } -// TODO(sumeer): when isLooselyCoupledRaftLogTruncationEnabled can return -// true, add a test that queues up a pending truncation and then cause it to -// get enacted by calling raftLogTruncator.durabilityAdvanced. - -func TestReplicaStateMachineRaftLogTruncation(t *testing.T) { +func TestReplicaStateMachineRaftLogTruncationStronglyCoupled(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) testutils.RunTrueAndFalse(t, "accurate first index", func(t *testing.T, accurate bool) { @@ -160,6 +157,8 @@ func TestReplicaStateMachineRaftLogTruncation(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) tc.Start(ctx, t, stopper) + st := tc.store.ClusterSettings() + looselyCoupledTruncationEnabled.Override(ctx, &st.SV, false) // Lock the replica for the entire test. r := tc.repl @@ -240,3 +239,125 @@ func TestReplicaStateMachineRaftLogTruncation(t *testing.T) { }() }) } + +func TestReplicaStateMachineRaftLogTruncationLooselyCoupled(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + testutils.RunTrueAndFalse(t, "accurate first index", func(t *testing.T, accurate bool) { + tc := testContext{} + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.Start(ctx, t, stopper) + st := tc.store.ClusterSettings() + looselyCoupledTruncationEnabled.Override(ctx, &st.SV, true) + // Remove the flush completed callback since we don't want a + // non-deterministic flush to cause the test to fail. 
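+ // Registering a no-op callback replaces the one installed by
+ // Store.Start, so truncations are only enacted when this test invokes
+ // durabilityAdvancedCallback explicitly below.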
+ tc.store.engine.RegisterFlushCompletedCallback(func() {}) + r := tc.repl + r.mu.Lock() + raftAppliedIndex := r.mu.state.RaftAppliedIndex + truncatedIndex := r.mu.state.TruncatedState.Index + raftLogSize := r.mu.raftLogSize + // Overwrite to be trusted, since we want to check if transitions to false + // or not. + r.mu.raftLogSizeTrusted = true + r.mu.Unlock() + expectedFirstIndex := truncatedIndex + 1 + if !accurate { + expectedFirstIndex = truncatedIndex + } + + // Enqueue the truncation. + func() { + // Lock the replica. + r.raftMu.Lock() + defer r.raftMu.Unlock() + sm := r.getStateMachine() + + // Create a new application batch. + b := sm.NewBatch(false /* ephemeral */).(*replicaAppBatch) + defer b.Close() + // Stage a command that truncates one raft log entry which we pretend has a + // byte size of 1. + cmd := &replicatedCmd{ + ctx: ctx, + ent: &raftpb.Entry{ + Index: raftAppliedIndex + 1, + Type: raftpb.EntryNormal, + }, + decodedRaftEntry: decodedRaftEntry{ + idKey: makeIDKey(), + raftCmd: kvserverpb.RaftCommand{ + ProposerLeaseSequence: r.mu.state.Lease.Sequence, + MaxLeaseIndex: r.mu.state.LeaseAppliedIndex + 1, + ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{ + State: &kvserverpb.ReplicaState{ + TruncatedState: &roachpb.RaftTruncatedState{ + Index: truncatedIndex + 1, + }, + }, + RaftLogDelta: -1, + RaftExpectedFirstIndex: expectedFirstIndex, + WriteTimestamp: r.mu.state.GCThreshold.Add(1, 0), + }, + }, + }, + } + + checkedCmd, err := b.Stage(cmd.ctx, cmd) + require.NoError(t, err) + + // Apply the batch to the StateMachine. + err = b.ApplyToStateMachine(ctx) + require.NoError(t, err) + + // Apply the side effects of the command to the StateMachine. + _, err = sm.ApplySideEffects(checkedCmd.Ctx(), checkedCmd) + require.NoError(t, err) + func() { + r.mu.Lock() + defer r.mu.Unlock() + require.Equal(t, raftAppliedIndex+1, r.mu.state.RaftAppliedIndex) + // No truncation. + require.Equal(t, truncatedIndex, r.mu.state.TruncatedState.Index) + require.True(t, r.mu.raftLogSizeTrusted) + }() + require.False(t, r.pendingLogTruncations.isEmptyLocked()) + trunc := r.pendingLogTruncations.frontLocked() + require.Equal(t, truncatedIndex+1, trunc.Index) + require.Equal(t, expectedFirstIndex, trunc.expectedFirstIndex) + require.EqualValues(t, -1, trunc.logDeltaBytes) + require.True(t, trunc.isDeltaTrusted) + }() + require.NoError(t, tc.store.Engine().Flush()) + // Asynchronous call to advance durability. + tc.store.raftTruncator.durabilityAdvancedCallback() + expectedSize := raftLogSize - 1 + // We typically have a raftLogSize > 0 (based on inspecting some test + // runs), but we can't be sure. + if expectedSize < 0 { + expectedSize = 0 + } + // Wait until async truncation is done. 
+ testutils.SucceedsSoon(t, func() error { + r.mu.Lock() + defer r.mu.Unlock() + if r.mu.state.TruncatedState.Index != truncatedIndex+1 { + return errors.Errorf("not truncated") + } + if r.mu.raftLogSize != expectedSize { + return errors.Errorf("not truncated") + } + if accurate != r.mu.raftLogSizeTrusted { + return errors.Errorf("not truncated") + } + r.pendingLogTruncations.mu.Lock() + defer r.pendingLogTruncations.mu.Unlock() + if !r.pendingLogTruncations.isEmptyLocked() { + return errors.Errorf("not truncated") + } + return nil + }) + }) +} diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 94b35e399fe0..3f61d63387d8 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -778,6 +778,10 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { if err != nil { t.Fatal(err) } + secondStore, err := tc.Servers[1].Stores().GetStore(tc.Servers[1].GetFirstStoreID()) + if err != nil { + t.Fatal(err) + } for _, server := range tc.Servers { store, err := server.Stores().GetStore(server.GetFirstStoreID()) @@ -854,6 +858,13 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { if _, err := kv.SendWrapped(ctx, firstStore.TestSender(), truncArgs); err != nil { t.Fatal(err) } + for _, store := range []*kvserver.Store{firstStore, secondStore} { + _, err := store.GetReplica(rangeID) + if err != nil { + t.Fatal(err) + } + waitForTruncationForTesting(t, repl, index+1) + } // Remove the partition. Snapshot should follow. partitionStore.Transport().Listen(partitionStore.Ident.StoreID, &unreliableRaftHandler{ diff --git a/pkg/kv/kvserver/replica_sideload_test.go b/pkg/kv/kvserver/replica_sideload_test.go index 9651d312ef5c..6ad62cc2cfa8 100644 --- a/pkg/kv/kvserver/replica_sideload_test.go +++ b/pkg/kv/kvserver/replica_sideload_test.go @@ -804,65 +804,70 @@ func TestRaftSSTableSideloadingTruncation(t *testing.T) { defer log.Scope(t).Close(t) defer SetMockAddSSTable()() - tc := testContext{} - stopper := stop.NewStopper() - ctx := context.Background() - defer stopper.Stop(ctx) - tc.Start(ctx, t, stopper) + testutils.RunTrueAndFalse(t, "loosely-coupled", func(t *testing.T, looselyCoupled bool) { + tc := testContext{} + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + tc.Start(ctx, t, stopper) + st := tc.store.ClusterSettings() + looselyCoupledTruncationEnabled.Override(ctx, &st.SV, looselyCoupled) - const count = 10 + const count = 10 - var indexes []uint64 - addLastIndex := func() { - lastIndex, err := tc.repl.GetLastIndex() - if err != nil { - t.Fatal(err) + var indexes []uint64 + addLastIndex := func() { + lastIndex, err := tc.repl.GetLastIndex() + if err != nil { + t.Fatal(err) + } + indexes = append(indexes, lastIndex) + } + for i := 0; i < count; i++ { + addLastIndex() + key := fmt.Sprintf("key-%d", i) + val := fmt.Sprintf("val-%d", i) + if err := ProposeAddSSTable(ctx, key, val, tc.Clock().Now(), tc.store); err != nil { + t.Fatalf("%d: %+v", i, err) + } } - indexes = append(indexes, lastIndex) - } - for i := 0; i < count; i++ { + // Append an extra entry which, if we truncate it, should definitely also + // remove any leftover files (ok, unless the last one is reproposed but + // that's *very* unlikely to happen for the last one) addLastIndex() - key := fmt.Sprintf("key-%d", i) - val := fmt.Sprintf("val-%d", i) - if err := ProposeAddSSTable(ctx, key, val, tc.Clock().Now(), tc.store); err != nil { - t.Fatalf("%d: %+v", i, err) - } - } - // Append an extra entry 
which, if we truncate it, should definitely also - // remove any leftover files (ok, unless the last one is reproposed but - // that's *very* unlikely to happen for the last one) - addLastIndex() - fmtSideloaded := func() []string { - tc.repl.raftMu.Lock() - defer tc.repl.raftMu.Unlock() - fs, _ := tc.repl.Engine().List(tc.repl.raftMu.sideloaded.Dir()) - sort.Strings(fs) - return fs - } - - // Check that when we truncate, the number of on-disk files changes in ways - // we expect. Intentionally not too strict due to the possibility of - // reproposals, etc; it could be made stricter, but this should give enough - // confidence already that we're calling `PurgeTo` correctly, and for the - // remainder unit testing on each impl's PurgeTo is more useful. - for i := range indexes { - const rangeID = 1 - newFirstIndex := indexes[i] + 1 - truncateArgs := truncateLogArgs(newFirstIndex, rangeID) - log.Eventf(ctx, "truncating to index < %d", newFirstIndex) - if _, pErr := kv.SendWrappedWith(ctx, tc.Sender(), roachpb.Header{RangeID: rangeID}, &truncateArgs); pErr != nil { - t.Fatal(pErr) - } - sideloadStrings := fmtSideloaded() - if minFiles := count - i; len(sideloadStrings) < minFiles { - t.Fatalf("after truncation at %d (i=%d), expected at least %d files left, but have:\n%v", - indexes[i], i, minFiles, sideloadStrings) + fmtSideloaded := func() []string { + tc.repl.raftMu.Lock() + defer tc.repl.raftMu.Unlock() + fs, _ := tc.repl.Engine().List(tc.repl.raftMu.sideloaded.Dir()) + sort.Strings(fs) + return fs + } + + // Check that when we truncate, the number of on-disk files changes in ways + // we expect. Intentionally not too strict due to the possibility of + // reproposals, etc; it could be made stricter, but this should give enough + // confidence already that we're calling `PurgeTo` correctly, and for the + // remainder unit testing on each impl's PurgeTo is more useful. + for i := range indexes { + const rangeID = 1 + newFirstIndex := indexes[i] + 1 + truncateArgs := truncateLogArgs(newFirstIndex, rangeID) + log.Eventf(ctx, "truncating to index < %d", newFirstIndex) + if _, pErr := kv.SendWrappedWith(ctx, tc.Sender(), roachpb.Header{RangeID: rangeID}, &truncateArgs); pErr != nil { + t.Fatal(pErr) + } + waitForTruncationForTesting(t, tc.repl, newFirstIndex, looselyCoupled) + // Truncation done, so check sideloaded files. + sideloadStrings := fmtSideloaded() + if minFiles := count - i; len(sideloadStrings) < minFiles { + t.Fatalf("after truncation at %d (i=%d), expected at least %d files left, but have:\n%v", + indexes[i], i, minFiles, sideloadStrings) + } } - } - - if sideloadStrings := fmtSideloaded(); len(sideloadStrings) != 0 { - t.Fatalf("expected all files to be cleaned up, but found %v", sideloadStrings) - } + if sideloadStrings := fmtSideloaded(); len(sideloadStrings) != 0 { + t.Fatalf("expected all files to be cleaned up, but found %v", sideloadStrings) + } + }) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index f25851e85496..c2cf64de2960 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -7171,244 +7171,256 @@ func TestQuotaPoolAccessOnDestroyedReplica(t *testing.T) { func TestEntries(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - tc := testContext{} - cfg := TestStoreConfig(nil) - // Disable ticks to avoid quiescence, which can result in empty - // entries being proposed and causing the test to flake. 
- cfg.RaftTickInterval = math.MaxInt32 - cfg.TestingKnobs.DisableRaftLogQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - tc.StartWithStoreConfig(ctx, t, stopper, cfg) - repl := tc.repl - rangeID := repl.RangeID - var indexes []uint64 - - populateLogs := func(from, to int) []uint64 { - var newIndexes []uint64 - for i := from; i < to; i++ { - args := incrementArgs([]byte("a"), int64(i)) - if _, pErr := tc.SendWrapped(args); pErr != nil { - t.Fatal(pErr) - } - idx, err := repl.GetLastIndex() - if err != nil { - t.Fatal(err) + testutils.RunTrueAndFalse(t, "loosely-coupled", func(t *testing.T, looselyCoupled bool) { + ctx := context.Background() + tc := testContext{} + cfg := TestStoreConfig(nil) + // Disable ticks to avoid quiescence, which can result in empty + // entries being proposed and causing the test to flake. + cfg.RaftTickInterval = math.MaxInt32 + cfg.TestingKnobs.DisableRaftLogQueue = true + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.StartWithStoreConfig(ctx, t, stopper, cfg) + st := tc.store.ClusterSettings() + looselyCoupledTruncationEnabled.Override(ctx, &st.SV, looselyCoupled) + + repl := tc.repl + rangeID := repl.RangeID + var indexes []uint64 + + populateLogs := func(from, to int) []uint64 { + var newIndexes []uint64 + for i := from; i < to; i++ { + args := incrementArgs([]byte("a"), int64(i)) + if _, pErr := tc.SendWrapped(args); pErr != nil { + t.Fatal(pErr) + } + idx, err := repl.GetLastIndex() + if err != nil { + t.Fatal(err) + } + newIndexes = append(newIndexes, idx) } - newIndexes = append(newIndexes, idx) + return newIndexes } - return newIndexes - } - truncateLogs := func(index int) { - truncateArgs := truncateLogArgs(indexes[index], rangeID) - if _, err := kv.SendWrappedWith( - ctx, - tc.Sender(), - roachpb.Header{RangeID: 1}, - &truncateArgs, - ); err != nil { - t.Fatal(err) + truncateLogs := func(index int) { + truncateArgs := truncateLogArgs(indexes[index], rangeID) + if _, err := kv.SendWrappedWith( + ctx, + tc.Sender(), + roachpb.Header{RangeID: 1}, + &truncateArgs, + ); err != nil { + t.Fatal(err) + } + waitForTruncationForTesting(t, repl, indexes[index], looselyCoupled) } - } - // Populate the log with 10 entries. Save the LastIndex after each write. - indexes = append(indexes, populateLogs(0, 10)...) + // Populate the log with 10 entries. Save the LastIndex after each write. + indexes = append(indexes, populateLogs(0, 10)...) - for i, tc := range []struct { - lo uint64 - hi uint64 - maxBytes uint64 - expResultCount int - expCacheCount int - expError error - // Setup, if not nil, is called before running the test case. - setup func() - }{ - // Case 0: All of the entries from cache. - {lo: indexes[0], hi: indexes[9] + 1, expResultCount: 10, expCacheCount: 10, setup: nil}, - // Case 1: Get the first entry from cache. - {lo: indexes[0], hi: indexes[1], expResultCount: 1, expCacheCount: 1, setup: nil}, - // Case 2: Get the last entry from cache. - {lo: indexes[9], hi: indexes[9] + 1, expResultCount: 1, expCacheCount: 1, setup: nil}, - // Case 3: lo is available, but hi is not, cache miss. - {lo: indexes[9], hi: indexes[9] + 2, expCacheCount: 1, expError: raft.ErrUnavailable, setup: nil}, - - // Case 4: Just most of the entries from cache. - {lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 4, setup: func() { - // Discard the first half of the log. - truncateLogs(5) - }}, - // Case 5: Get a single entry from cache. 
- {lo: indexes[5], hi: indexes[6], expResultCount: 1, expCacheCount: 1, setup: nil}, - // Case 6: Get range without size limitation. (Like case 4, without truncating). - {lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 4, setup: nil}, - // Case 7: maxBytes is set low so only a single value should be - // returned. - {lo: indexes[5], hi: indexes[9], maxBytes: 1, expResultCount: 1, expCacheCount: 1, setup: nil}, - // Case 8: hi value is just past the last index, should return all - // available entries. - {lo: indexes[5], hi: indexes[9] + 1, expResultCount: 5, expCacheCount: 5, setup: nil}, - // Case 9: all values have been truncated from cache and storage. - {lo: indexes[1], hi: indexes[2], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 10: hi has just been truncated from cache and storage. - {lo: indexes[1], hi: indexes[4], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 11: another case where hi has just been truncated from - // cache and storage. - {lo: indexes[3], hi: indexes[4], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 12: lo has been truncated and hi is the truncation point. - {lo: indexes[4], hi: indexes[5], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 13: lo has been truncated but hi is available. - {lo: indexes[4], hi: indexes[9], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 14: lo has been truncated and hi is not available. - {lo: indexes[4], hi: indexes[9] + 100, expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 15: lo has been truncated but hi is available, and maxBytes is - // set low. - {lo: indexes[4], hi: indexes[9], maxBytes: 1, expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, - // Case 16: lo is available but hi is not. - {lo: indexes[5], hi: indexes[9] + 100, expCacheCount: 6, expError: raft.ErrUnavailable, setup: nil}, - // Case 17: both lo and hi are not available, cache miss. - {lo: indexes[9] + 100, hi: indexes[9] + 1000, expCacheCount: 0, expError: raft.ErrUnavailable, setup: nil}, - // Case 18: lo is available, hi is not, but it was cut off by maxBytes. - {lo: indexes[5], hi: indexes[9] + 1000, maxBytes: 1, expResultCount: 1, expCacheCount: 1, setup: nil}, - // Case 19: lo and hi are available, but entry cache evicted. - {lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 0, setup: func() { - // Manually evict cache for the first 10 log entries. - repl.store.raftEntryCache.Clear(rangeID, indexes[9]+1) - indexes = append(indexes, populateLogs(10, 40)...) - }}, - // Case 20: lo and hi are available, entry cache evicted and hi available in cache. - {lo: indexes[5], hi: indexes[9] + 5, expResultCount: 9, expCacheCount: 0, setup: nil}, - // Case 21: lo and hi are available and in entry cache. - {lo: indexes[9] + 2, hi: indexes[9] + 32, expResultCount: 30, expCacheCount: 30, setup: nil}, - // Case 22: lo is available and hi is not. 
- {lo: indexes[9] + 2, hi: indexes[9] + 33, expCacheCount: 30, expError: raft.ErrUnavailable, setup: nil}, - } { - if tc.setup != nil { - tc.setup() - } - if tc.maxBytes == 0 { - tc.maxBytes = math.MaxUint64 - } - cacheEntries, _, _, hitLimit := repl.store.raftEntryCache.Scan(nil, rangeID, tc.lo, tc.hi, tc.maxBytes) - if len(cacheEntries) != tc.expCacheCount { - t.Errorf("%d: expected cache count %d, got %d", i, tc.expCacheCount, len(cacheEntries)) - } - repl.mu.Lock() - ents, err := repl.raftEntriesLocked(tc.lo, tc.hi, tc.maxBytes) - repl.mu.Unlock() - if tc.expError == nil && err != nil { - t.Errorf("%d: expected no error, got %s", i, err) - continue - } else if !errors.Is(err, tc.expError) { - t.Errorf("%d: expected error %s, got %s", i, tc.expError, err) - continue - } - if len(ents) != tc.expResultCount { - t.Errorf("%d: expected %d entries, got %d", i, tc.expResultCount, len(ents)) - } else if tc.expResultCount > 0 { - expHitLimit := ents[len(ents)-1].Index < tc.hi-1 - if hitLimit != expHitLimit { - t.Errorf("%d: unexpected hit limit: %t", i, hitLimit) + for i, tc := range []struct { + lo uint64 + hi uint64 + maxBytes uint64 + expResultCount int + expCacheCount int + expError error + // Setup, if not nil, is called before running the test case. + setup func() + }{ + // Case 0: All of the entries from cache. + {lo: indexes[0], hi: indexes[9] + 1, expResultCount: 10, expCacheCount: 10, setup: nil}, + // Case 1: Get the first entry from cache. + {lo: indexes[0], hi: indexes[1], expResultCount: 1, expCacheCount: 1, setup: nil}, + // Case 2: Get the last entry from cache. + {lo: indexes[9], hi: indexes[9] + 1, expResultCount: 1, expCacheCount: 1, setup: nil}, + // Case 3: lo is available, but hi is not, cache miss. + {lo: indexes[9], hi: indexes[9] + 2, expCacheCount: 1, expError: raft.ErrUnavailable, setup: nil}, + + // Case 4: Just most of the entries from cache. + {lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 4, setup: func() { + // Discard the first half of the log. + truncateLogs(5) + }}, + // Case 5: Get a single entry from cache. + {lo: indexes[5], hi: indexes[6], expResultCount: 1, expCacheCount: 1, setup: nil}, + // Case 6: Get range without size limitation. (Like case 4, without truncating). + {lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 4, setup: nil}, + // Case 7: maxBytes is set low so only a single value should be + // returned. + {lo: indexes[5], hi: indexes[9], maxBytes: 1, expResultCount: 1, expCacheCount: 1, setup: nil}, + // Case 8: hi value is just past the last index, should return all + // available entries. + {lo: indexes[5], hi: indexes[9] + 1, expResultCount: 5, expCacheCount: 5, setup: nil}, + // Case 9: all values have been truncated from cache and storage. + {lo: indexes[1], hi: indexes[2], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, + // Case 10: hi has just been truncated from cache and storage. + {lo: indexes[1], hi: indexes[4], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, + // Case 11: another case where hi has just been truncated from + // cache and storage. + {lo: indexes[3], hi: indexes[4], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, + // Case 12: lo has been truncated and hi is the truncation point. + {lo: indexes[4], hi: indexes[5], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, + // Case 13: lo has been truncated but hi is available. 
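+ // The read fails at the compacted lo even though hi itself is readable.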
+ {lo: indexes[4], hi: indexes[9], expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, + // Case 14: lo has been truncated and hi is not available. + {lo: indexes[4], hi: indexes[9] + 100, expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, + // Case 15: lo has been truncated but hi is available, and maxBytes is + // set low. + {lo: indexes[4], hi: indexes[9], maxBytes: 1, expCacheCount: 0, expError: raft.ErrCompacted, setup: nil}, + // Case 16: lo is available but hi is not. + {lo: indexes[5], hi: indexes[9] + 100, expCacheCount: 6, expError: raft.ErrUnavailable, setup: nil}, + // Case 17: both lo and hi are not available, cache miss. + {lo: indexes[9] + 100, hi: indexes[9] + 1000, expCacheCount: 0, expError: raft.ErrUnavailable, setup: nil}, + // Case 18: lo is available, hi is not, but it was cut off by maxBytes. + {lo: indexes[5], hi: indexes[9] + 1000, maxBytes: 1, expResultCount: 1, expCacheCount: 1, setup: nil}, + // Case 19: lo and hi are available, but entry cache evicted. + {lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 0, setup: func() { + // Manually evict cache for the first 10 log entries. + repl.store.raftEntryCache.Clear(rangeID, indexes[9]+1) + indexes = append(indexes, populateLogs(10, 40)...) + }}, + // Case 20: lo and hi are available, entry cache evicted and hi available in cache. + {lo: indexes[5], hi: indexes[9] + 5, expResultCount: 9, expCacheCount: 0, setup: nil}, + // Case 21: lo and hi are available and in entry cache. + {lo: indexes[9] + 2, hi: indexes[9] + 32, expResultCount: 30, expCacheCount: 30, setup: nil}, + // Case 22: lo is available and hi is not. + {lo: indexes[9] + 2, hi: indexes[9] + 33, expCacheCount: 30, expError: raft.ErrUnavailable, setup: nil}, + } { + if tc.setup != nil { + tc.setup() + } + if tc.maxBytes == 0 { + tc.maxBytes = math.MaxUint64 + } + cacheEntries, _, _, hitLimit := repl.store.raftEntryCache.Scan(nil, rangeID, tc.lo, tc.hi, tc.maxBytes) + if len(cacheEntries) != tc.expCacheCount { + t.Errorf("%d: expected cache count %d, got %d", i, tc.expCacheCount, len(cacheEntries)) + } + repl.mu.Lock() + ents, err := repl.raftEntriesLocked(tc.lo, tc.hi, tc.maxBytes) + repl.mu.Unlock() + if tc.expError == nil && err != nil { + t.Errorf("%d: expected no error, got %s", i, err) + continue + } else if !errors.Is(err, tc.expError) { + t.Errorf("%d: expected error %s, got %s", i, tc.expError, err) + continue + } + if len(ents) != tc.expResultCount { + t.Errorf("%d: expected %d entries, got %d", i, tc.expResultCount, len(ents)) + } else if tc.expResultCount > 0 { + expHitLimit := ents[len(ents)-1].Index < tc.hi-1 + if hitLimit != expHitLimit { + t.Errorf("%d: unexpected hit limit: %t", i, hitLimit) + } } } - } - // Case 23: Lo must be less than or equal to hi. - repl.mu.Lock() - if _, err := repl.raftEntriesLocked(indexes[9], indexes[5], math.MaxUint64); err == nil { - t.Errorf("23: error expected, got none") - } - repl.mu.Unlock() + // Case 23: Lo must be less than or equal to hi. 
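+ // An inverted range must return an error rather than an empty result.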
+ repl.mu.Lock() + if _, err := repl.raftEntriesLocked(indexes[9], indexes[5], math.MaxUint64); err == nil { + t.Errorf("23: error expected, got none") + } + repl.mu.Unlock() + }) } func TestTerm(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - tc := testContext{} - tsc := TestStoreConfig(nil) - tsc.TestingKnobs.DisableRaftLogQueue = true - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - tc.StartWithStoreConfig(ctx, t, stopper, tsc) - repl := tc.repl - rangeID := repl.RangeID + testutils.RunTrueAndFalse(t, "loosely-coupled", func(t *testing.T, looselyCoupled bool) { + ctx := context.Background() + tc := testContext{} + tsc := TestStoreConfig(nil) + tsc.TestingKnobs.DisableRaftLogQueue = true + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.StartWithStoreConfig(ctx, t, stopper, tsc) + st := tc.store.ClusterSettings() + looselyCoupledTruncationEnabled.Override(ctx, &st.SV, looselyCoupled) - // Populate the log with 10 entries. Save the LastIndex after each write. - var indexes []uint64 - for i := 0; i < 10; i++ { - args := incrementArgs([]byte("a"), int64(i)) + repl := tc.repl + rangeID := repl.RangeID + + // Populate the log with 10 entries. Save the LastIndex after each write. + var indexes []uint64 + for i := 0; i < 10; i++ { + args := incrementArgs([]byte("a"), int64(i)) + + if _, pErr := tc.SendWrapped(args); pErr != nil { + t.Fatal(pErr) + } + idx, err := tc.repl.GetLastIndex() + if err != nil { + t.Fatal(err) + } + indexes = append(indexes, idx) + } - if _, pErr := tc.SendWrapped(args); pErr != nil { + // Discard the first half of the log. + truncateArgs := truncateLogArgs(indexes[5], rangeID) + if _, pErr := tc.SendWrappedWith(roachpb.Header{RangeID: 1}, &truncateArgs); pErr != nil { t.Fatal(pErr) } - idx, err := tc.repl.GetLastIndex() + waitForTruncationForTesting(t, repl, indexes[5], looselyCoupled) + + repl.mu.Lock() + defer repl.mu.Unlock() + + firstIndex, err := repl.raftFirstIndexLocked() if err != nil { t.Fatal(err) } - indexes = append(indexes, idx) - } - - // Discard the first half of the log. - truncateArgs := truncateLogArgs(indexes[5], rangeID) - if _, pErr := tc.SendWrappedWith(roachpb.Header{RangeID: 1}, &truncateArgs); pErr != nil { - t.Fatal(pErr) - } - - repl.mu.Lock() - defer repl.mu.Unlock() - - firstIndex, err := repl.raftFirstIndexLocked() - if err != nil { - t.Fatal(err) - } - if firstIndex != indexes[5] { - t.Fatalf("expected firstIndex %d to be %d", firstIndex, indexes[4]) - } + if firstIndex != indexes[5] { + t.Fatalf("expected firstIndex %d to be %d", firstIndex, indexes[4]) + } - // Truncated logs should return an ErrCompacted error. - if _, err := tc.repl.raftTermRLocked(indexes[1]); !errors.Is(err, raft.ErrCompacted) { - t.Errorf("expected ErrCompacted, got %s", err) - } - if _, err := tc.repl.raftTermRLocked(indexes[3]); !errors.Is(err, raft.ErrCompacted) { - t.Errorf("expected ErrCompacted, got %s", err) - } + // Truncated logs should return an ErrCompacted error. + if _, err := tc.repl.raftTermRLocked(indexes[1]); !errors.Is(err, raft.ErrCompacted) { + t.Errorf("expected ErrCompacted, got %s", err) + } + if _, err := tc.repl.raftTermRLocked(indexes[3]); !errors.Is(err, raft.ErrCompacted) { + t.Errorf("expected ErrCompacted, got %s", err) + } - // FirstIndex-1 should return the term of firstIndex. - firstIndexTerm, err := tc.repl.raftTermRLocked(firstIndex) - if err != nil { - t.Errorf("expect no error, got %s", err) - } + // FirstIndex-1 should return the term of firstIndex. 
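+ // indexes[4] is firstIndex-1, i.e. the last truncated entry, whose term
+ // is retained in the RaftTruncatedState.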
+ firstIndexTerm, err := tc.repl.raftTermRLocked(firstIndex) + if err != nil { + t.Errorf("expect no error, got %s", err) + } - term, err := tc.repl.raftTermRLocked(indexes[4]) - if err != nil { - t.Errorf("expect no error, got %s", err) - } - if term != firstIndexTerm { - t.Errorf("expected firstIndex-1's term:%d to equal that of firstIndex:%d", term, firstIndexTerm) - } + term, err := tc.repl.raftTermRLocked(indexes[4]) + if err != nil { + t.Errorf("expect no error, got %s", err) + } + if term != firstIndexTerm { + t.Errorf("expected firstIndex-1's term:%d to equal that of firstIndex:%d", term, firstIndexTerm) + } - lastIndex, err := repl.raftLastIndexLocked() - if err != nil { - t.Fatal(err) - } + lastIndex, err := repl.raftLastIndexLocked() + if err != nil { + t.Fatal(err) + } - // Last index should return correctly. - if _, err := tc.repl.raftTermRLocked(lastIndex); err != nil { - t.Errorf("expected no error, got %s", err) - } + // Last index should return correctly. + if _, err := tc.repl.raftTermRLocked(lastIndex); err != nil { + t.Errorf("expected no error, got %s", err) + } - // Terms for after the last index should return ErrUnavailable. - if _, err := tc.repl.raftTermRLocked(lastIndex + 1); !errors.Is(err, raft.ErrUnavailable) { - t.Errorf("expected ErrUnavailable, got %s", err) - } - if _, err := tc.repl.raftTermRLocked(indexes[9] + 1000); !errors.Is(err, raft.ErrUnavailable) { - t.Errorf("expected ErrUnavailable, got %s", err) - } + // Terms for after the last index should return ErrUnavailable. + if _, err := tc.repl.raftTermRLocked(lastIndex + 1); !errors.Is(err, raft.ErrUnavailable) { + t.Errorf("expected ErrUnavailable, got %s", err) + } + if _, err := tc.repl.raftTermRLocked(indexes[9] + 1000); !errors.Is(err, raft.ErrUnavailable) { + t.Errorf("expected ErrUnavailable, got %s", err) + } + }) } func TestGCIncorrectRange(t *testing.T) { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index b5cfabef3603..f56f7245de7e 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -717,7 +717,7 @@ type Store struct { replicaGCQueue *replicaGCQueue // Replica GC queue raftLogQueue *raftLogQueue // Raft log truncation queue // Carries out truncations proposed by the raft log queue, and "replicated" - // via raft, when they are safe. + // via raft, when they are safe. Created in Store.Start. raftTruncator *raftLogTruncator raftSnapshotQueue *raftSnapshotQueue // Raft repair queue tsMaintenanceQueue *timeSeriesMaintenanceQueue // Time series maintenance queue @@ -1165,7 +1165,6 @@ func NewStore( ) } s.replRankings = newReplicaRankings() - s.raftTruncator = makeRaftLogTruncator((*storeForTruncatorImpl)(s)) s.draining.Store(false) s.scheduler = newRaftScheduler(cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency) @@ -1832,6 +1831,15 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { }) s.metrics.registry.AddMetricStruct(s.intentResolver.Metrics) + // Create the raft log truncator and register the callback. + s.raftTruncator = makeRaftLogTruncator(s.cfg.AmbientCtx, (*storeForTruncatorImpl)(s), stopper) + { + truncator := s.raftTruncator + s.engine.RegisterFlushCompletedCallback(func() { + truncator.durabilityAdvancedCallback() + }) + } + // Create the recovery manager. 
s.recoveryMgr = txnrecovery.NewManager( s.cfg.AmbientCtx, s.cfg.Clock, s.db, stopper, diff --git a/pkg/kv/kvserver/testdata/raft_log_truncator b/pkg/kv/kvserver/testdata/raft_log_truncator index e61b1666061d..2d1d8531f23c 100644 --- a/pkg/kv/kvserver/testdata/raft_log_truncator +++ b/pkg/kv/kvserver/testdata/raft_log_truncator @@ -399,3 +399,93 @@ print-engine-state id=2 truncated index: 34 log entries: durable applied index: 34 + +# Test case to ensure that truncator is reading only flushed engine state when +# deciding to enact truncations. +create-replica id=3 trunc-index=20 last-log-entry=30 +---- + +print-engine-state id=3 +---- +truncated index: 20 +log entries: 21, 22, 23, 24, 25, 26, 27, 28, 29 +durable applied index: 0 + +# Add two pending truncations. +add-pending-truncation id=3 first-index=21 trunc-index=22 delta-bytes=-20 sideloaded-bytes=10 +---- +r3.getPendingTruncs +r3.getTruncatedState +r3.sideloadedBytesIfTruncatedFromTo(21, 23) +truncator ranges: 3 + +add-pending-truncation id=3 first-index=23 trunc-index=24 delta-bytes=-20 sideloaded-bytes=10 +---- +r3.getPendingTruncs +r3.getTruncatedState +r3.sideloadedBytesIfTruncatedFromTo(23, 25) +truncator ranges: 3 + +print-replica-state id=3 +---- +truncIndex: 20 +pending: + {RaftTruncatedState:{Index:22 Term:0} expectedFirstIndex:21 logDeltaBytes:-30 isDeltaTrusted:true} + {RaftTruncatedState:{Index:24 Term:0} expectedFirstIndex:23 logDeltaBytes:-30 isDeltaTrusted:true} + +# Move RaftAppliedState enough to allow first truncation, but don't flush that +# change. +write-raft-applied-index id=3 raft-applied-index=22 no-flush=true +---- + +durability-advanced +---- +acquireReplica(3) +r3.getTruncatedState +r3.getPendingTruncs +r3.getStateLoader +releaseReplica(3) +truncator ranges: 3 + +# Both truncations are still pending. +print-replica-state id=3 +---- +truncIndex: 20 +pending: + {RaftTruncatedState:{Index:22 Term:0} expectedFirstIndex:21 logDeltaBytes:-30 isDeltaTrusted:true} + {RaftTruncatedState:{Index:24 Term:0} expectedFirstIndex:23 logDeltaBytes:-30 isDeltaTrusted:true} + +print-engine-state id=3 +---- +truncated index: 20 +log entries: 21, 22, 23, 24, 25, 26, 27, 28, 29 +durable applied index: 22 + +# Do the same change to RaftAppliedState, but flush this time (we didn't need +# to rewrite and flushing would have been sufficient). +write-raft-applied-index id=3 raft-applied-index=22 +---- + +durability-advanced +---- +acquireReplica(3) +r3.getTruncatedState +r3.getPendingTruncs +r3.getStateLoader +r3.setTruncatedStateAndSideEffects(..., expectedFirstIndex:21) => trusted:true +r3.setTruncationDeltaAndTrusted(delta:-30, trusted:true) +releaseReplica(3) +truncator ranges: 3 + +# First truncation is enacted. +print-replica-state id=3 +---- +truncIndex: 22 +pending: + {RaftTruncatedState:{Index:24 Term:0} expectedFirstIndex:23 logDeltaBytes:-30 isDeltaTrusted:true} + +print-engine-state id=3 +---- +truncated index: 22 +log entries: 23, 24, 25, 26, 27, 28, 29 +durable applied index: 22
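
The scheduling logic in durabilityAdvancedCallback coalesces callbacks: at most one background truncation task runs at a time, and a callback that arrives while one is running is remembered exactly once and replayed when the task finishes. A minimal standalone sketch of that pattern follows; the names (coalescer, onDurabilityAdvanced, doWork) are illustrative stand-ins rather than the kvserver types, and a bare goroutine stands in for stopper.RunAsyncTask.

package main

import (
	"fmt"
	"sync"
)

// coalescer captures the pattern: at most one worker runs at a time, and a
// callback that arrives while one is running is remembered exactly once
// rather than queued unboundedly.
type coalescer struct {
	mu      sync.Mutex
	running bool // a worker is currently draining pending work
	queued  bool // a callback arrived while the worker was running
	pending int  // stand-in for the addRanges map
	doWork  func(n int)
}

// onDurabilityAdvanced mirrors durabilityAdvancedCallback: it returns
// quickly and never touches the engine inline, since it may be invoked
// from the engine's flush-completed callback.
func (c *coalescer) onDurabilityAdvanced() {
	c.mu.Lock()
	run := false
	if !c.running && c.pending > 0 {
		run, c.running = true, true
	} else if c.pending > 0 {
		c.queued = true // coalesce: remember a single re-run
	}
	c.mu.Unlock()
	if run {
		go c.worker() // the real code uses stopper.RunAsyncTask
	}
}

func (c *coalescer) worker() {
	c.mu.Lock()
	n := c.pending // drain the queued work
	c.pending = 0
	c.mu.Unlock()
	c.doWork(n) // enact truncations outside the mutex
	c.mu.Lock()
	c.running = false
	rerun := c.queued
	c.queued = false
	c.mu.Unlock()
	if rerun {
		c.onDurabilityAdvanced() // replay the coalesced callback
	}
}

func main() {
	done := make(chan struct{})
	c := &coalescer{}
	c.doWork = func(n int) {
		fmt.Println("drained", n, "pending items")
		close(done)
	}
	c.pending = 3
	c.onDurabilityAdvanced()
	<-done
}

Setting queued only when pending work exists matches the real callback, which skips scheduling entirely when addRanges is empty.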