Skip to content

Commit

Permalink
storage: create new TestSnapshotAfterTruncationWithUncommittedTail test
Browse files Browse the repository at this point in the history
This PR adds a regression test for cockroachdb#37056. In doing so, it also confirms
the theory that cockroachdb#37055 is the proper fix for that bug.

Before cockroachdb#37055, the test would get stuck with the following logs repeatedly
firing:
```
I190424 00:15:52.338808 12 storage/client_test.go:1242  SucceedsSoon: expected [21 21 21], got [12 21 21]
I190424 00:15:52.378060 78 vendor/go.etcd.io/etcd/raft/raft.go:1317  [s1,r1/1:/M{in-ax}] 1 [logterm: 6, index: 31] rejected msgApp [logterm: 8, index: 31] from 2
I190424 00:15:52.378248 184 vendor/go.etcd.io/etcd/raft/raft.go:1065  [s2,r1/2:/M{in-ax}] 2 received msgApp rejection(lastindex: 31) from 1 for index 31
I190424 00:15:52.378275 184 vendor/go.etcd.io/etcd/raft/raft.go:1068  [s2,r1/2:/M{in-ax}] 2 decreased progress of 1 to [next = 31, match = 31, state = ProgressStateProbe, waiting = false, pendingSnapshot = 0]
```

After cockroachdb#37055, the test passes.

Release note: None
  • Loading branch information
nvanbenschoten committed Apr 24, 2019
1 parent af48c34 commit f14c931
Showing 1 changed file with 168 additions and 0 deletions.
168 changes: 168 additions & 0 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,174 @@ func TestSnapshotAfterTruncation(t *testing.T) {
}
}

// TestSnapshotAfterTruncationWithUncommittedTail is similar in spirit to
// TestSnapshotAfterTruncation/differentTerm. However, it differs in that we
// take care to ensure that the partitioned Replica has a long uncommitted tail
// of Raft entries that is not entirely overwritten by the snapshot it receives
// after the partition heals. If the recipient of the snapshot did not purge its
// Raft entry cache when receiving the snapshot, it could get stuck repeatedly
// rejecting attempts to catch it up. This serves as a regression test for the
// bug seen in #37056.
func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
mtc := &multiTestContext{
// This test was written before the multiTestContext started creating many
// system ranges at startup, and hasn't been update to take that into
// account.
startWithSingleRange: true,
}
defer mtc.Stop()
mtc.Start(t, 3)

key := roachpb.Key("a")
incA := int64(5)
incB := int64(7)
incC := int64(9)
incAB := incA + incB
incABC := incAB + incC

// Set up a key to replicate across the cluster. We're going to modify this
// key and truncate the raft logs from that command after partitioning one
// of the nodes to check that it gets the new value after it reconnects.
// We're then going to continue modifying this key to make sure that the
// temporarily partitioned node can continue to receive updates.
incArgs := incrementArgs(key, incA)
if _, pErr := client.SendWrapped(ctx, mtc.stores[0].TestSender(), incArgs); pErr != nil {
t.Fatal(pErr)
}

mtc.replicateRange(1, 1, 2)
mtc.waitForValues(key, []int64{incA, incA, incA})

// We partition the original leader from the other two replicas. This allows
// us to build up a large uncommitted Raft log on the partitioned node.
const partStore = 0
partRepl, err := mtc.stores[partStore].GetReplica(1)
if err != nil {
t.Fatal(err)
}
partReplDesc, err := partRepl.GetReplicaDescriptor()
if err != nil {
t.Fatal(err)
}
partReplSender := mtc.stores[partStore].TestSender()

// Partition the original leader from its followers. We do this by installing
// unreliableRaftHandler listeners on all three Stores. The handler on the
// partitioned store filters out all messages while the handler on the other
// two stores only filters out messages from the partitioned store. The
// configuration looks like:
//
// [0]
// x x
// / \
// x x
// [1]<---->[2]
//
for _, s := range []int{0, 1, 2} {
h := &unreliableRaftHandler{rangeID: 1, RaftMessageHandler: mtc.stores[s]}
if s != partStore {
// Only filter messages from the partitioned store on the other
// two stores.
h.dropReq = func(req *storage.RaftMessageRequest) bool {
return req.FromReplica.StoreID == partRepl.StoreID()
}
h.dropHB = func(hb *storage.RaftHeartbeat) bool {
return hb.FromReplicaID == partReplDesc.ReplicaID
}
}
mtc.transport.Listen(mtc.stores[s].Ident.StoreID, h)
}

// Perform a series of writes on the partitioned replica. The writes will
// not succeed before their context is canceled, but they will be appended
// to the partitioned replica's Raft log because it is currently the Raft
// leader.
failedWrite := func() {
t.Helper()
cCtx, cancel := context.WithTimeout(ctx, 25*time.Millisecond)
defer cancel()
incArgs2 := incrementArgs(roachpb.Key("b"), 1)
if _, pErr := client.SendWrapped(cCtx, partReplSender, incArgs2); pErr == nil {
t.Fatal("unexpected success")
} else if !testutils.IsPError(pErr, "context deadline exceeded") {
t.Fatal(pErr)
}
}
for i := 0; i < 32; i++ {
failedWrite()
}

// Transfer the lease to one of the followers and perform a write. The
// partition ensures that this will require a Raft leadership change.
const newLeaderStore = partStore + 1
newLeaderRepl, err := mtc.stores[newLeaderStore].GetReplica(1)
if err != nil {
t.Fatal(err)
}
newLeaderReplSender := mtc.stores[newLeaderStore].TestSender()

incArgs = incrementArgs(key, incB)
testutils.SucceedsSoon(t, func() error {
mtc.advanceClock(ctx)
_, pErr := client.SendWrapped(ctx, newLeaderReplSender, incArgs)
if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok {
return pErr.GoError()
} else if pErr != nil {
t.Fatal(pErr)
}
return nil
})
mtc.waitForValues(key, []int64{incA, incAB, incAB})

index, err := newLeaderRepl.GetLastIndex()
if err != nil {
t.Fatal(err)
}

// Truncate the log at index+1 (log entries < N are removed, so this
// includes the increment).
truncArgs := truncateLogArgs(index+1, 1)
if _, pErr := client.SendWrapped(ctx, newLeaderReplSender, truncArgs); pErr != nil {
t.Fatal(pErr)
}

snapsMetric := mtc.stores[partStore].Metrics().RangeSnapshotsNormalApplied
snapsBefore := snapsMetric.Count()

// Remove the partition. Snapshot should follow.
for _, s := range []int{0, 1, 2} {
mtc.transport.Listen(mtc.stores[s].Ident.StoreID, &unreliableRaftHandler{
rangeID: 1,
RaftMessageHandler: mtc.stores[s],
dropReq: func(req *storage.RaftMessageRequest) bool {
// Make sure that even going forward no MsgApp for what we just truncated can
// make it through. The Raft transport is asynchronous so this is necessary
// to make the test pass reliably.
return req.Message.Type == raftpb.MsgApp && req.Message.Index <= index
},
dropHB: func(*storage.RaftHeartbeat) bool { return false },
dropResp: func(*storage.RaftMessageResponse) bool { return false },
})
}

// The partitioned replica should catch up after a snapshot.
mtc.waitForValues(key, []int64{incAB, incAB, incAB})
snapsAfter := snapsMetric.Count()
if !(snapsAfter > snapsBefore) {
t.Fatalf("expected at least 1 snapshot to catch the partitioned replica up")
}

// Perform another write. The partitioned replica should be able to receive
// replicated updates.
incArgs = incrementArgs(key, incC)
if _, pErr := client.SendWrapped(ctx, newLeaderReplSender, incArgs); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(key, []int64{incABC, incABC, incABC})
}

type fakeSnapshotStream struct {
nextReq *storage.SnapshotRequest
nextErr error
Expand Down

0 comments on commit f14c931

Please sign in to comment.