diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index d39d1d2519d0..0d8a879d50c4 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -852,6 +852,174 @@ func TestSnapshotAfterTruncation(t *testing.T) {
 	}
 }
 
+// TestSnapshotAfterTruncationWithUncommittedTail is similar in spirit to
+// TestSnapshotAfterTruncation/differentTerm. However, it differs in that we
+// take care to ensure that the partitioned Replica has a long uncommitted tail
+// of Raft entries that is not entirely overwritten by the snapshot it receives
+// after the partition heals. If the recipient of the snapshot did not purge its
+// Raft entry cache when receiving the snapshot, it could get stuck repeatedly
+// rejecting attempts to catch it up. This serves as a regression test for the
+// bug seen in #37056.
+func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+	mtc := &multiTestContext{
+		// This test was written before the multiTestContext started creating many
+		// system ranges at startup, and hasn't been updated to take that into
+		// account.
+		startWithSingleRange: true,
+	}
+	defer mtc.Stop()
+	mtc.Start(t, 3)
+
+	key := roachpb.Key("a")
+	incA := int64(5)
+	incB := int64(7)
+	incC := int64(9)
+	incAB := incA + incB
+	incABC := incAB + incC
+
+	// Set up a key to replicate across the cluster. We're going to modify this
+	// key and truncate the raft logs from that command after partitioning one
+	// of the nodes to check that it gets the new value after it reconnects.
+	// We're then going to continue modifying this key to make sure that the
+	// temporarily partitioned node can continue to receive updates.
+	incArgs := incrementArgs(key, incA)
+	if _, pErr := client.SendWrapped(ctx, mtc.stores[0].TestSender(), incArgs); pErr != nil {
+		t.Fatal(pErr)
+	}
+
+	mtc.replicateRange(1, 1, 2)
+	mtc.waitForValues(key, []int64{incA, incA, incA})
+
+	// We partition the original leader from the other two replicas. This allows
+	// us to build up a large uncommitted Raft log on the partitioned node.
+	const partStore = 0
+	partRepl, err := mtc.stores[partStore].GetReplica(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	partReplDesc, err := partRepl.GetReplicaDescriptor()
+	if err != nil {
+		t.Fatal(err)
+	}
+	partReplSender := mtc.stores[partStore].TestSender()
+
+	// Partition the original leader from its followers. We do this by installing
+	// unreliableRaftHandler listeners on all three Stores. The handler on the
+	// partitioned store filters out all messages while the handler on the other
+	// two stores only filters out messages from the partitioned store. The
+	// configuration looks like:
+	//
+	//           [0]
+	//          x  x
+	//         /    \
+	//        x      x
+	//      [1]<---->[2]
+	//
+	for _, s := range []int{0, 1, 2} {
+		h := &unreliableRaftHandler{rangeID: 1, RaftMessageHandler: mtc.stores[s]}
+		if s != partStore {
+			// Only filter messages from the partitioned store on the other
+			// two stores.
+			h.dropReq = func(req *storage.RaftMessageRequest) bool {
+				return req.FromReplica.StoreID == partRepl.StoreID()
+			}
+			h.dropHB = func(hb *storage.RaftHeartbeat) bool {
+				return hb.FromReplicaID == partReplDesc.ReplicaID
+			}
+		}
+		mtc.transport.Listen(mtc.stores[s].Ident.StoreID, h)
+	}
+
+	// Perform a series of writes on the partitioned replica. The writes will
+	// not succeed before their context is canceled, but they will be appended
+	// to the partitioned replica's Raft log because it is currently the Raft
+	// leader.
+	failedWrite := func() {
+		t.Helper()
+		cCtx, cancel := context.WithTimeout(ctx, 25*time.Millisecond)
+		defer cancel()
+		incArgs2 := incrementArgs(roachpb.Key("b"), 1)
+		if _, pErr := client.SendWrapped(cCtx, partReplSender, incArgs2); pErr == nil {
+			t.Fatal("unexpected success")
+		} else if !testutils.IsPError(pErr, "context deadline exceeded") {
+			t.Fatal(pErr)
+		}
+	}
+	for i := 0; i < 32; i++ {
+		failedWrite()
+	}
+
+	// Transfer the lease to one of the followers and perform a write. The
+	// partition ensures that this will require a Raft leadership change.
+	const newLeaderStore = partStore + 1
+	newLeaderRepl, err := mtc.stores[newLeaderStore].GetReplica(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	newLeaderReplSender := mtc.stores[newLeaderStore].TestSender()
+
+	incArgs = incrementArgs(key, incB)
+	testutils.SucceedsSoon(t, func() error {
+		mtc.advanceClock(ctx)
+		_, pErr := client.SendWrapped(ctx, newLeaderReplSender, incArgs)
+		if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok {
+			return pErr.GoError()
+		} else if pErr != nil {
+			t.Fatal(pErr)
+		}
+		return nil
+	})
+	mtc.waitForValues(key, []int64{incA, incAB, incAB})
+
+	index, err := newLeaderRepl.GetLastIndex()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Truncate the log at index+1 (log entries < N are removed, so this
+	// includes the increment).
+	truncArgs := truncateLogArgs(index+1, 1)
+	if _, pErr := client.SendWrapped(ctx, newLeaderReplSender, truncArgs); pErr != nil {
+		t.Fatal(pErr)
+	}
+
+	snapsMetric := mtc.stores[partStore].Metrics().RangeSnapshotsNormalApplied
+	snapsBefore := snapsMetric.Count()
+
+	// Remove the partition. Snapshot should follow.
+	for _, s := range []int{0, 1, 2} {
+		mtc.transport.Listen(mtc.stores[s].Ident.StoreID, &unreliableRaftHandler{
+			rangeID:            1,
+			RaftMessageHandler: mtc.stores[s],
+			dropReq: func(req *storage.RaftMessageRequest) bool {
+				// Make sure that even going forward no MsgApp for what we just truncated can
+				// make it through. The Raft transport is asynchronous so this is necessary
+				// to make the test pass reliably.
+				return req.Message.Type == raftpb.MsgApp && req.Message.Index <= index
+			},
+			dropHB:   func(*storage.RaftHeartbeat) bool { return false },
+			dropResp: func(*storage.RaftMessageResponse) bool { return false },
+		})
+	}
+
+	// The partitioned replica should catch up after a snapshot.
+	mtc.waitForValues(key, []int64{incAB, incAB, incAB})
+	snapsAfter := snapsMetric.Count()
+	if !(snapsAfter > snapsBefore) {
+		t.Fatalf("expected at least 1 snapshot to catch the partitioned replica up")
+	}
+
+	// Perform another write. The partitioned replica should be able to receive
+	// replicated updates.
+	incArgs = incrementArgs(key, incC)
+	if _, pErr := client.SendWrapped(ctx, newLeaderReplSender, incArgs); pErr != nil {
+		t.Fatal(pErr)
+	}
+	mtc.waitForValues(key, []int64{incABC, incABC, incABC})
+}
+
 type fakeSnapshotStream struct {
 	nextReq *storage.SnapshotRequest
 	nextErr error