Merge #37058
37058: storage: create new TestSnapshotAfterTruncationWithUncommittedTail test r=nvanbenschoten a=nvanbenschoten

This PR adds a regression test for #37056. In doing so, it also confirms
the theory that #37055 is the proper fix for that bug.

Before #37055, the test would get stuck with the following logs repeatedly
firing:
```
I190424 00:15:52.338808 12 storage/client_test.go:1242  SucceedsSoon: expected [21 21 21], got [12 21 21]
I190424 00:15:52.378060 78 vendor/go.etcd.io/etcd/raft/raft.go:1317  [s1,r1/1:/M{in-ax}] 1 [logterm: 6, index: 31] rejected msgApp [logterm: 8, index: 31] from 2
I190424 00:15:52.378248 184 vendor/go.etcd.io/etcd/raft/raft.go:1065  [s2,r1/2:/M{in-ax}] 2 received msgApp rejection(lastindex: 31) from 1 for index 31
I190424 00:15:52.378275 184 vendor/go.etcd.io/etcd/raft/raft.go:1068  [s2,r1/2:/M{in-ax}] 2 decreased progress of 1 to [next = 31, match = 31, state = ProgressStateProbe, waiting = false, pendingSnapshot = 0]
```

After #37055, the test passes.

Co-authored-by: Nathan VanBenschoten <[email protected]>
craig[bot] and nvanbenschoten committed Apr 24, 2019
2 parents 4179d90 + 32e7d43 commit 9335ce0
Showing 3 changed files with 206 additions and 6 deletions.
35 changes: 31 additions & 4 deletions pkg/storage/client_merge_test.go
@@ -2979,27 +2979,54 @@ type unreliableRaftHandler struct {
rangeID roachpb.RangeID
storage.RaftMessageHandler
// If non-nil, the corresponding callback can return false to avoid dropping a message to rangeID
drop func(request *storage.RaftMessageRequest, response *storage.RaftMessageResponse) bool
dropReq func(*storage.RaftMessageRequest) bool
dropHB func(*storage.RaftHeartbeat) bool
dropResp func(*storage.RaftMessageResponse) bool
}

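// HandleRaftRequest applies the handler's drop callbacks to inbound messages
// before delegating to the wrapped RaftMessageHandler. Requests carrying
// heartbeats are filtered per-heartbeat via filterHeartbeats; other requests
// for rangeID are dropped unless dropReq is set and returns false.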
func (h *unreliableRaftHandler) HandleRaftRequest(
ctx context.Context,
req *storage.RaftMessageRequest,
respStream storage.RaftMessageResponseStream,
) *roachpb.Error {
if req.RangeID == h.rangeID {
if h.drop == nil || h.drop(req, nil) {
if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 {
reqCpy := *req
req = &reqCpy
req.Heartbeats = h.filterHeartbeats(req.Heartbeats)
req.HeartbeatResps = h.filterHeartbeats(req.HeartbeatResps)
if len(req.Heartbeats)+len(req.HeartbeatResps) == 0 {
// Entirely filtered.
return nil
}
} else if req.RangeID == h.rangeID {
if h.dropReq == nil || h.dropReq(req) {
return nil
}
}
return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
}

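// filterHeartbeats returns hbs with heartbeats destined for h.rangeID removed,
// unless dropHB is set and returns false for them; heartbeats for other ranges
// are always retained.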
func (h *unreliableRaftHandler) filterHeartbeats(
hbs []storage.RaftHeartbeat,
) []storage.RaftHeartbeat {
if len(hbs) == 0 {
return hbs
}
var cpy []storage.RaftHeartbeat
for i := range hbs {
hb := &hbs[i]
if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) {
cpy = append(cpy, *hb)
}
}
return cpy
}

func (h *unreliableRaftHandler) HandleRaftResponse(
ctx context.Context, resp *storage.RaftMessageResponse,
) error {
if resp.RangeID == h.rangeID {
if h.drop == nil || h.drop(nil, resp) {
if h.dropResp == nil || h.dropResp(resp) {
return nil
}
}
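
For readers skimming the diff: the handler's single `drop` callback is split into `dropReq`, `dropHB`, and `dropResp`, and heartbeats are now filtered individually, since heartbeats can arrive batched in a request whose top-level RangeID need not match the range being targeted. A rough configuration sketch — not code from this commit; `mtc`, `store`, and `truncatedIndex` stand in for the test's multiTestContext, one of its stores, and a previously captured log index:

```
// Full partition: with all three callbacks left nil, every request,
// heartbeat, and response touching range 1 is dropped.
mtc.transport.Listen(store.Ident.StoreID, &unreliableRaftHandler{
	rangeID:            1,
	RaftMessageHandler: store,
})

// Selective partition: drop only MsgApps at or below a truncated index
// (truncatedIndex is a stand-in), but let heartbeats and responses through.
mtc.transport.Listen(store.Ident.StoreID, &unreliableRaftHandler{
	rangeID:            1,
	RaftMessageHandler: store,
	dropReq: func(req *storage.RaftMessageRequest) bool {
		return req.Message.Type == raftpb.MsgApp && req.Message.Index <= truncatedIndex
	},
	dropHB:   func(*storage.RaftHeartbeat) bool { return false },
	dropResp: func(*storage.RaftMessageResponse) bool { return false },
})
```
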
171 changes: 171 additions & 0 deletions pkg/storage/client_raft_test.go
@@ -852,6 +852,177 @@ func TestSnapshotAfterTruncation(t *testing.T) {
}
}

// TestSnapshotAfterTruncationWithUncommittedTail is similar in spirit to
// TestSnapshotAfterTruncation/differentTerm. However, it differs in that we
// take care to ensure that the partitioned Replica has a long uncommitted tail
// of Raft entries that is not entirely overwritten by the snapshot it receives
// after the partition heals. If the recipient of the snapshot did not purge its
// Raft entry cache when receiving the snapshot, it could get stuck repeatedly
// rejecting attempts to catch it up. This serves as a regression test for the
// bug seen in #37056.
func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
mtc := &multiTestContext{
// This test was written before the multiTestContext started creating many
// system ranges at startup, and hasn't been updated to take that into
// account.
startWithSingleRange: true,
}
defer mtc.Stop()
mtc.Start(t, 3)

key := roachpb.Key("a")
incA := int64(5)
incB := int64(7)
incC := int64(9)
incAB := incA + incB
incABC := incAB + incC

// Set up a key to replicate across the cluster. We're going to modify this
// key and truncate the raft logs from that command after partitioning one
// of the nodes to check that it gets the new value after it reconnects.
// We're then going to continue modifying this key to make sure that the
// temporarily partitioned node can continue to receive updates.
incArgs := incrementArgs(key, incA)
if _, pErr := client.SendWrapped(ctx, mtc.stores[0].TestSender(), incArgs); pErr != nil {
t.Fatal(pErr)
}

mtc.replicateRange(1, 1, 2)
mtc.waitForValues(key, []int64{incA, incA, incA})

// We partition the original leader from the other two replicas. This allows
// us to build up a large uncommitted Raft log on the partitioned node.
const partStore = 0
partRepl, err := mtc.stores[partStore].GetReplica(1)
if err != nil {
t.Fatal(err)
}
partReplDesc, err := partRepl.GetReplicaDescriptor()
if err != nil {
t.Fatal(err)
}
partReplSender := mtc.stores[partStore].TestSender()

// Partition the original leader from its followers. We do this by installing
// unreliableRaftHandler listeners on all three Stores. The handler on the
// partitioned store filters out all messages while the handler on the other
// two stores only filters out messages from the partitioned store. The
// configuration looks like:
//
// [0]
// x x
// / \
// x x
// [1]<---->[2]
//
for _, s := range []int{0, 1, 2} {
h := &unreliableRaftHandler{rangeID: 1, RaftMessageHandler: mtc.stores[s]}
if s != partStore {
// Only filter messages from the partitioned store on the other
// two stores.
h.dropReq = func(req *storage.RaftMessageRequest) bool {
return req.FromReplica.StoreID == partRepl.StoreID()
}
h.dropHB = func(hb *storage.RaftHeartbeat) bool {
return hb.FromReplicaID == partReplDesc.ReplicaID
}
}
mtc.transport.Listen(mtc.stores[s].Ident.StoreID, h)
}

// Perform a series of writes on the partitioned replica. The writes will
// not succeed before their context is canceled, but they will be appended
// to the partitioned replica's Raft log because it is currently the Raft
// leader.
failedWrite := func() {
t.Helper()
cCtx, cancel := context.WithTimeout(ctx, 25*time.Millisecond)
defer cancel()
incArgs2 := incrementArgs(roachpb.Key("b"), 1)
if _, pErr := client.SendWrapped(cCtx, partReplSender, incArgs2); pErr == nil {
t.Fatal("unexpected success")
} else if !testutils.IsPError(pErr, "context deadline exceeded") {
t.Fatal(pErr)
}
}
for i := 0; i < 32; i++ {
failedWrite()
}

// Transfer the lease to one of the followers and perform a write. The
// partition ensures that this will require a Raft leadership change.
const newLeaderStore = partStore + 1
newLeaderRepl, err := mtc.stores[newLeaderStore].GetReplica(1)
if err != nil {
t.Fatal(err)
}
newLeaderReplSender := mtc.stores[newLeaderStore].TestSender()

incArgs = incrementArgs(key, incB)
testutils.SucceedsSoon(t, func() error {
mtc.advanceClock(ctx)
_, pErr := client.SendWrapped(ctx, newLeaderReplSender, incArgs)
if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok {
return pErr.GoError()
} else if pErr != nil {
t.Fatal(pErr)
}
return nil
})
mtc.waitForValues(key, []int64{incA, incAB, incAB})

index, err := newLeaderRepl.GetLastIndex()
if err != nil {
t.Fatal(err)
}

// Truncate the log at index+1 (entries with an index < index+1 are removed,
// so this includes the increment we just performed).
truncArgs := truncateLogArgs(index+1, 1)
if _, pErr := client.SendWrapped(ctx, newLeaderReplSender, truncArgs); pErr != nil {
t.Fatal(pErr)
}

snapsMetric := mtc.stores[partStore].Metrics().RangeSnapshotsNormalApplied
snapsBefore := snapsMetric.Count()

// Remove the partition. Snapshot should follow.
for _, s := range []int{0, 1, 2} {
mtc.transport.Listen(mtc.stores[s].Ident.StoreID, &unreliableRaftHandler{
rangeID: 1,
RaftMessageHandler: mtc.stores[s],
dropReq: func(req *storage.RaftMessageRequest) bool {
// Make sure that even going forward no MsgApp for what we just truncated can
// make it through. The Raft transport is asynchronous so this is necessary
// to make the test pass reliably.
return req.Message.Type == raftpb.MsgApp && req.Message.Index <= index
},
dropHB: func(*storage.RaftHeartbeat) bool { return false },
dropResp: func(*storage.RaftMessageResponse) bool { return false },
})
}

// The partitioned replica should catch up after a snapshot.
testutils.SucceedsSoon(t, func() error {
snapsAfter := snapsMetric.Count()
if !(snapsAfter > snapsBefore) {
return errors.New("expected at least 1 snapshot to catch the partitioned replica up")
}
return nil
})
mtc.waitForValues(key, []int64{incAB, incAB, incAB})

// Perform another write. The partitioned replica should be able to receive
// replicated updates.
incArgs = incrementArgs(key, incC)
if _, pErr := client.SendWrapped(ctx, newLeaderReplSender, incArgs); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(key, []int64{incABC, incABC, incABC})
}

type fakeSnapshotStream struct {
nextReq *storage.SnapshotRequest
nextErr error
6 changes: 4 additions & 2 deletions pkg/storage/replica_rangefeed_test.go
@@ -603,12 +603,14 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) {
mtc.transport.Listen(partitionStore.Ident.StoreID, &unreliableRaftHandler{
rangeID: rangeID,
RaftMessageHandler: partitionStore,
drop: func(req *storage.RaftMessageRequest, _ *storage.RaftMessageResponse) bool {
dropReq: func(req *storage.RaftMessageRequest) bool {
// Make sure that even going forward no MsgApp for what we just truncated can
// make it through. The Raft transport is asynchronous so this is necessary
// to make the test pass reliably.
return req != nil && req.Message.Type == raftpb.MsgApp && req.Message.Index <= index
return req.Message.Type == raftpb.MsgApp && req.Message.Index <= index
},
dropHB: func(*storage.RaftHeartbeat) bool { return false },
dropResp: func(*storage.RaftMessageResponse) bool { return false },
})

// Check the error.
