Skip to content

Commit

Permalink
[wip] raft/rafttest: introduce datadriven testing
Browse files Browse the repository at this point in the history
It has often been tedious to test the interactions between multi-member
Raft groups, especially when many steps were required to reach a certain
scenario. Often, this boilerplate was as boring as it is hard to write
and hard to maintain, making it attractive to resort to shortcuts
whenever possible, which in turn tended to undercut how meaningful and
maintainable the tests ended up being - that is, if the tests were even
written, which sometimes they weren't.

This change introduces a datadriven framework specifically for testing
deterministically the interaction between multiple members of a raft group
with the goal of reducing the friction for writing these tests to near
zero.

In the near term, this will be used to add thorough testing for joint
consensus (which is already available today, but wildly undertested),
but just converting an existing test into this framework has shown that
the concise representation and built-in inspection of log messages
highlights unexpected behavior much more readily than the previous unit
tests did (the test in question is `snapshot_succeed_via_app_resp`; the
reader is invited to compare the old and new version of it).

The main building block is `InteractionEnv`, which holds on to the state
of the whole system and exposes various relevant methods for
manipulating it, including but not limited to adding nodes, delivering
and dropping messages, and proposing configuration changes. All of this
is extensible so that in the future I hope to use it to explore the
phenomena discussed in

etcd-io#7625 (comment)

which requires injecting appropriate "crash points" in the Ready
handling loop. Discussions of the "what if X happened in state Y"
can quickly be made concrete by "scripting up an interaction test".

Additionally, this framework is intentionally not kept internal to the
raft package.. Though this is in its infancy, a goal is that it should
be possible for a suite of interaction tests to allow applications to
validate that their Storage implementation behaves accordingly, simply
by running a raft-provided interaction suite against their Storage.
  • Loading branch information
tbg committed Aug 7, 2019
1 parent 888670c commit 5f1e2f7
Show file tree
Hide file tree
Showing 23 changed files with 1,376 additions and 126 deletions.
17 changes: 17 additions & 0 deletions raft/interaction_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package raft_test

import (
"testing"

"github.com/cockroachdb/datadriven"
"go.etcd.io/etcd/raft/rafttest"
)

func TestInteraction(t *testing.T) {
datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
env := rafttest.NewInteractionEnv(nil)
datadriven.RunTest(t, path, func(d *datadriven.TestData) string {
return env.Handle(t, *d)
})
})
}
13 changes: 11 additions & 2 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math"
"math/rand"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -528,7 +529,6 @@ func (r *raft) bcastAppend() {
if id == r.id {
return
}

r.sendAppend(id)
})
}
Expand Down Expand Up @@ -794,7 +794,16 @@ func (r *raft) campaign(t CampaignType) {
}
return
}
for id := range r.prs.Voters.IDs() {
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {
continue
}
Expand Down
120 changes: 0 additions & 120 deletions raft/raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"

pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
)

var (
Expand Down Expand Up @@ -112,125 +111,6 @@ func TestSnapshotSucceed(t *testing.T) {
}
}

// TestSnapshotSucceedViaAppResp regression tests the situation in which a snap-
// shot is sent to a follower at the most recent index (i.e. the snapshot index
// is the leader's last index is the committed index). In that situation, a bug
// in the past left the follower in probing status until the next log entry was
// committed.
func TestSnapshotSucceedViaAppResp(t *testing.T) {
s1 := NewMemoryStorage()
// Create a single-node leader.
n1 := newTestRaft(1, []uint64{1}, 10, 1, s1)
n1.becomeCandidate()
n1.becomeLeader()
// We need to add a second empty entry so that we can truncate the first
// one away.
n1.Step(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{}}})

rd := newReady(n1, &SoftState{}, pb.HardState{})
s1.Append(rd.Entries)
s1.SetHardState(rd.HardState)

if exp, ci := s1.lastIndex(), n1.raftLog.committed; ci != exp {
t.Fatalf("unexpected committed index %d, wanted %d: %+v", ci, exp, s1)
}

// Force a log truncation.
if err := s1.Compact(1); err != nil {
t.Fatal(err)
}

// Add a follower to the group. Do this in a clandestine way for simplicity.
// Also set up a snapshot that will be sent to the follower.
n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
s1.snapshot = pb.Snapshot{
Metadata: pb.SnapshotMetadata{
ConfState: pb.ConfState{Voters: []uint64{1, 2}},
Index: s1.lastIndex(),
Term: s1.ents[len(s1.ents)-1].Term,
},
}

noMessage := pb.MessageType(-1)
mustSend := func(from, to *raft, typ pb.MessageType) pb.Message {
t.Helper()
for i, msg := range from.msgs {
if msg.From != from.id || msg.To != to.id || msg.Type != typ {
continue
}
t.Log(DescribeMessage(msg, func([]byte) string { return "" }))
if len(msg.Entries) > 0 {
t.Log(DescribeEntries(msg.Entries, func(b []byte) string { return string(b) }))
}
if err := to.Step(msg); err != nil {
t.Fatalf("%v: %s", msg, err)
}
from.msgs = append(from.msgs[:i], from.msgs[i+1:]...)
return msg
}
if typ == noMessage {
if len(from.msgs) == 0 {
return pb.Message{}
}
t.Fatalf("expected no more messages, but got %d->%d %v", from.id, to.id, from.msgs)
}
t.Fatalf("message %d->%d %s not found in %v", from.id, to.id, typ, from.msgs)
return pb.Message{} // unreachable
}

// Create the follower that will receive the snapshot.
s2 := NewMemoryStorage()
n2 := newTestRaft(2, []uint64{1, 2}, 10, 1, s2)

// Let the leader probe the follower.
if !n1.maybeSendAppend(2, true /* sendIfEmpty */) {
t.Fatalf("expected message to be sent")
}
if msg := mustSend(n1, n2, pb.MsgApp); len(msg.Entries) > 0 {
// For this test to work, the leader must not have anything to append
// to the follower right now.
t.Fatalf("unexpectedly appending entries %v", msg.Entries)
}

// Follower rejects the append (because it doesn't have any log entries)
if msg := mustSend(n2, n1, pb.MsgAppResp); !msg.Reject {
t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint)
}

const expIdx = 2
// Leader sends snapshot due to RejectHint of zero (we set up the raft log
// to start at index 2).
if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx {
t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index)
}

// n2 reacts to snapshot with MsgAppResp.
if msg := mustSend(n2, n1, pb.MsgAppResp); msg.Index != expIdx {
t.Fatalf("expected AppResp at index %d, got %d", expIdx, msg.Index)
}

// Leader sends MsgApp to communicate commit index.
if msg := mustSend(n1, n2, pb.MsgApp); msg.Commit != expIdx {
t.Fatalf("expected commit index %d, got %d", expIdx, msg.Commit)
}

// Follower responds.
mustSend(n2, n1, pb.MsgAppResp)

// Leader has correct state for follower.
pr := n1.prs.Progress[2]
if pr.State != tracker.StateReplicate {
t.Fatalf("unexpected state %v", pr)
}
if pr.Match != expIdx || pr.Next != expIdx+1 {
t.Fatalf("expected match = %d, next = %d; got match = %d and next = %d", expIdx, expIdx+1, pr.Match, pr.Next)
}

// Leader and follower are done.
mustSend(n1, n2, noMessage)
mustSend(n2, n1, noMessage)
}

func TestSnapshotAbort(t *testing.T) {
storage := NewMemoryStorage()
sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
Expand Down
97 changes: 97 additions & 0 deletions raft/rafttest/interaction_env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafttest

import (
"fmt"
"math"
"strings"

"go.etcd.io/etcd/raft"
pb "go.etcd.io/etcd/raft/raftpb"
)

// InteractionOpts blob
type InteractionOpts struct {
OnConfig func(*raft.Config)
}

type State struct {
AppliedIndex uint64
pb.ConfState
Content string
}

type Storage interface {
raft.Storage
SetHardState(state pb.HardState) error
ApplySnapshot(pb.Snapshot) error
Compact(newFirstIndex uint64) error
Append([]pb.Entry) error
}

type InteractionEnv struct {
Options *InteractionOpts
Nodes []*raft.RawNode
Storages []Storage
Configs []*raft.Config
Messages []pb.Message

Histories [][]pb.Snapshot

Logger *RedirectLogger
}

func NewInteractionEnv(opts *InteractionOpts) *InteractionEnv {
if opts == nil {
opts = &InteractionOpts{}
}
return &InteractionEnv{
Options: opts,
Logger: &RedirectLogger{
Buf: &strings.Builder{},
},
}
}

func defaultRaftConfig(id uint64, applied uint64, s raft.Storage) *raft.Config {
return &raft.Config{
ID: id,
Applied: applied,
ElectionTick: 3,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: math.MaxUint64,
MaxInflightMsgs: math.MaxInt32,
}
}

func defaultEntryFormatter(b []byte) string {
return fmt.Sprintf("%q", b)
}

type Store struct {
Storage
SnapshotOverride func() (pb.Snapshot, error)
}

func (s Store) Snapshot() (pb.Snapshot, error) {
if s.SnapshotOverride != nil {
return s.SnapshotOverride()
}
return s.Storage.Snapshot()
}

var _ raft.Storage = Store{}
91 changes: 91 additions & 0 deletions raft/rafttest/interaction_env_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafttest

import (
"fmt"
"strconv"
"testing"

"github.com/cockroachdb/datadriven"
)

func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
env.Logger.Buf.Reset()
buf := env.Logger.Buf
var err error
switch d.Cmd {
case "add-nodes":
err = env.handleAddNodes(t, d)
case "campaign":
err = env.handleCampaign(t, d)
case "compact":
err = env.handleCompact(t, d)
case "deliver-msgs":
err = env.handleDeliverMsgs(t, d)
case "process-ready":
err = env.handleProcessReady(t, d)
case "log-level":
err = env.handleLogLevel(t, d)
case "raft-log":
err = env.handleRaftLog(t, d)
case "stabilize":
err = env.handleStabilize(t, d)
case "status":
err = env.handleStatus(t, d)
case "tick-heartbeat":
err = env.handleTickHeartbeat(t, d)
case "propose-conf-change":
err = env.handleProposeConfChange(t, d)
default:
err = fmt.Errorf("unknown command")
}
if err != nil {
buf.WriteString(err.Error())
}
// NB: the highest log level suppresses all output, including that of the
// handlers. This comes in useful during setup which can be chatty.
if buf.Len() == 0 || env.Logger.Lvl == len(lvlNames)-1 {
return "ok\n"
}
s := buf.String()
if len(s) > 0 && s[len(s)-1] != '\n' {
s += "\n"
}
return s
}

func mustFirstAsInt(t *testing.T, d datadriven.TestData) int {
n, err := strconv.Atoi(d.CmdArgs[0].Key)
if err != nil {
t.Fatal(err)
}
return n
}

func ints(t *testing.T, d datadriven.TestData) []int {
var ints []int
for i := 0; i < len(d.CmdArgs); i++ {
if len(d.CmdArgs[i].Vals) != 0 {
continue
}
n, err := strconv.Atoi(d.CmdArgs[i].Key)
if err != nil {
t.Fatal(err)
}
ints = append(ints, n)
}
return ints
}
Loading

0 comments on commit 5f1e2f7

Please sign in to comment.