diff --git a/go.mod b/go.mod index cb28a0d8b8d..31eabdb8a4a 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module go.etcd.io/etcd require ( github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect github.com/bgentry/speakeasy v0.1.0 + github.com/cockroachdb/datadriven v0.0.0-20190531201743-edce55837238 github.com/coreos/go-semver v0.2.0 github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf diff --git a/go.sum b/go.sum index 660f98f0d45..213796aa75c 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLM github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/cockroachdb/datadriven v0.0.0-20190531201743-edce55837238 h1:uNljlOxtOHrPnRoPPx+JanqjAGZpNiqAGVBfGskd/pg= +github.com/cockroachdb/datadriven v0.0.0-20190531201743-edce55837238/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM= diff --git a/raft/progress.go b/raft/progress.go index fa4d63edfba..37663562493 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -17,6 +17,8 @@ package raft import ( "fmt" "sort" + + "go.etcd.io/etcd/raft/quorum" ) const ( @@ -291,21 +293,25 @@ func (in *inflights) reset() { // known about the nodes and learners in it. In particular, it tracks the match // index for each peer which in turn allows reasoning about the committed index. 
type progressTracker struct { - nodes map[uint64]*Progress - learners map[uint64]*Progress + voters quorum.JointConfig + learners map[uint64]struct{} + prs map[uint64]*Progress votes map[uint64]bool maxInflight int - matchBuf uint64Slice } -func makePRS(maxInflight int) progressTracker { +func makeProgressTracker(maxInflight int) progressTracker { p := progressTracker{ maxInflight: maxInflight, - nodes: map[uint64]*Progress{}, - learners: map[uint64]*Progress{}, - votes: map[uint64]bool{}, + voters: quorum.JointConfig{ + quorum.MajorityConfig{}, + quorum.MajorityConfig{}, + }, + learners: map[uint64]struct{}{}, + votes: map[uint64]bool{}, + prs: map[uint64]*Progress{}, } return p } @@ -313,80 +319,70 @@ func makePRS(maxInflight int) progressTracker { // isSingleton returns true if (and only if) there is only one voting member // (i.e. the leader) in the current configuration. func (p *progressTracker) isSingleton() bool { - return len(p.nodes) == 1 + return len(p.voters[0]) == 1 && len(p.voters[1]) == 0 } -func (p *progressTracker) quorum() int { - return len(p.nodes)/2 + 1 -} +type progressAckIndexer map[uint64]*Progress + +var _ quorum.AckedIndexer = progressAckIndexer(nil) -func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool { - return len(m) >= p.quorum() +func (l progressAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) { + pr, ok := l[id] + if !ok { + return 0, false + } + return quorum.Index(pr.Match), true } // committed returns the largest log index known to be committed based on what // the voting members of the group have acknowledged. func (p *progressTracker) committed() uint64 { - // Preserving matchBuf across calls is an optimization - // used to avoid allocating a new slice on each call. 
- if cap(p.matchBuf) < len(p.nodes) { - p.matchBuf = make(uint64Slice, len(p.nodes)) - } - p.matchBuf = p.matchBuf[:len(p.nodes)] - idx := 0 - for _, pr := range p.nodes { - p.matchBuf[idx] = pr.Match - idx++ - } - sort.Sort(&p.matchBuf) - return p.matchBuf[len(p.matchBuf)-p.quorum()] + return uint64(p.voters.CommittedIndex(progressAckIndexer(p.prs))) } func (p *progressTracker) removeAny(id uint64) { - pN := p.nodes[id] - pL := p.learners[id] + _, okPR := p.prs[id] + _, okV1 := p.voters[0][id] + _, okV2 := p.voters[1][id] + _, okL := p.learners[id] + + okV := okV1 || okV2 - if pN == nil && pL == nil { + if !okPR { + panic("attempting to remove unknown peer %x") + } else if !okV && !okL { panic("attempting to remove unknown peer %x") - } else if pN != nil && pL != nil { + } else if okV && okL { panic(fmt.Sprintf("peer %x is both voter and learner", id)) } - delete(p.nodes, id) + delete(p.voters[0], id) + delete(p.voters[1], id) delete(p.learners, id) + delete(p.prs, id) } // initProgress initializes a new progress for the given node or learner. The // node may not exist yet in either form or a panic will ensue. 
func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) { - if pr := p.nodes[id]; pr != nil { + if pr := p.prs[id]; pr != nil { panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr)) } - if pr := p.learners[id]; pr != nil { - panic(fmt.Sprintf("peer %x already tracked as learner %v", id, pr)) - } if !isLearner { - p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)} - return + p.voters[0][id] = struct{}{} + } else { + p.learners[id] = struct{}{} } - p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true} + p.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: isLearner} } func (p *progressTracker) getProgress(id uint64) *Progress { - if pr, ok := p.nodes[id]; ok { - return pr - } - - return p.learners[id] + return p.prs[id] } // visit invokes the supplied closure for all tracked progresses. func (p *progressTracker) visit(f func(id uint64, pr *Progress)) { - for id, pr := range p.nodes { - f(id, pr) - } - - for id, pr := range p.learners { + for id, pr := range p.prs { f(id, pr) } } @@ -395,19 +391,21 @@ func (p *progressTracker) visit(f func(id uint64, pr *Progress)) { // the view of the local raft state machine. Otherwise, it returns // false. 
func (p *progressTracker) quorumActive() bool { - var act int + votes := map[uint64]bool{} p.visit(func(id uint64, pr *Progress) { - if pr.RecentActive && !pr.IsLearner { - act++ + if pr.IsLearner { + return } + votes[id] = pr.RecentActive }) - return act >= p.quorum() + return p.voters.VoteResult(votes) == quorum.VoteWon } func (p *progressTracker) voterNodes() []uint64 { - nodes := make([]uint64, 0, len(p.nodes)) - for id := range p.nodes { + m := p.voters.IDs() + nodes := make([]uint64, 0, len(m)) + for id := range m { nodes = append(nodes, id) } sort.Sort(uint64Slice(nodes)) @@ -439,22 +437,21 @@ func (p *progressTracker) recordVote(id uint64, v bool) { // tallyVotes returns the number of granted and rejected votes, and whether the // election outcome is known. -func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) { - for _, v := range p.votes { - if v { +func (p *progressTracker) tallyVotes() (granted int, rejected int, _ quorum.VoteResult) { + // Make sure to populate granted/rejected correctly even if the votes slice + // contains members no longer part of the configuration. This doesn't really + // matter in the way the numbers are used (they're informational), but might + // as well get it right. + for id, pr := range p.prs { + if pr.IsLearner { + continue + } + if p.votes[id] { granted++ } else { rejected++ } } - - q := p.quorum() - - result = electionIndeterminate - if granted >= q { - result = electionWon - } else if rejected >= q { - result = electionLost - } + result := p.voters.VoteResult(p.votes) return granted, rejected, result } diff --git a/raft/quorum/bench_test.go b/raft/quorum/bench_test.go new file mode 100644 index 00000000000..5c7961ed6cf --- /dev/null +++ b/raft/quorum/bench_test.go @@ -0,0 +1,40 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package quorum + +import ( + "fmt" + "math" + "math/rand" + "testing" +) + +func BenchmarkMajorityConfig_CommittedIndex(b *testing.B) { + // go test -run - -bench . -benchmem ./raft/quorum + for _, n := range []int{1, 3, 5, 7, 9, 11} { + b.Run(fmt.Sprintf("voters=%d", n), func(b *testing.B) { + c := MajorityConfig{} + l := mapAckIndexer{} + for i := uint64(0); i < uint64(n); i++ { + c[i+1] = struct{}{} + l[i+1] = Index(rand.Int63n(math.MaxInt64)) + } + + for i := 0; i < b.N; i++ { + _ = c.CommittedIndex(l) + } + }) + } +} diff --git a/raft/quorum/datadriven_test.go b/raft/quorum/datadriven_test.go new file mode 100644 index 00000000000..58bfd7234ca --- /dev/null +++ b/raft/quorum/datadriven_test.go @@ -0,0 +1,250 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package quorum + +import ( + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/datadriven" +) + +// TestDataDriven parses and executes the test cases in ./testdata/*. 
An entry +// in such a file specifies the command, which is either of "committed" to check +// CommittedIndex or "vote" to verify a VoteResult. The underlying configuration +// and inputs are specified via the arguments 'cfg' and 'cfgj' (for the majority +// config and, optionally, majority config joint to the first one) and 'idx' +// (for CommittedIndex) and 'votes' (for VoteResult). +// +// Internally, the harness runs some additional checks on each test case for +// which it is known that the result shouldn't change. For example, +// interchanging the majority configurations of a joint quorum must not +// influence the result; if it does, this is noted in the test's output. +func TestDataDriven(t *testing.T) { + datadriven.Walk(t, "testdata", func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(d *datadriven.TestData) string { + // Two majority configs. The first one is always used (though it may + // be empty) and the second one is used iff joint is true. + var joint bool + var ids, idsj []uint64 + // The committed indexes for the nodes in the config in the order in + // which they appear in (ids,idsj), without repetition. An underscore + // denotes an omission (i.e. no information for this voter); this is + // different from 0. For example, + // + // cfg=(1,2) cfgj=(2,3,4) idxs=(_,5,_,7) initializes the idx for voter 2 + // to 5 and that for voter 4 to 7 (and no others). + // + // cfgj=zero is specified to instruct the test harness to treat cfgj + // as zero instead of not specified (i.e. it will trigger a joint + // quorum test instead of a majority quorum test for cfg only). + var idxs []Index + // Votes. These are initialized similar to idxs except the only values + // used are 1 (voted against) and 2 (voted for). This looks awkward, + // but is convenient because it allows sharing code between the two. + var votes []Index + + // Parse the args. 
+ for _, arg := range d.CmdArgs { + for i := range arg.Vals { + switch arg.Key { + case "cfg": + var n uint64 + arg.Scan(t, i, &n) + ids = append(ids, n) + case "cfgj": + joint = true + if arg.Vals[i] == "zero" { + if len(arg.Vals) != 1 { + t.Fatalf("cannot mix 'zero' into configuration") + } + } else { + var n uint64 + arg.Scan(t, i, &n) + idsj = append(idsj, n) + } + case "idx": + var n uint64 + // Register placeholders as zeroes. + if arg.Vals[i] != "_" { + arg.Scan(t, i, &n) + if n == 0 { + // This is a restriction caused by the above + // special-casing for _. + t.Fatalf("cannot use 0 as idx") + } + } + idxs = append(idxs, Index(n)) + case "votes": + var s string + arg.Scan(t, i, &s) + switch s { + case "y": + votes = append(votes, 2) + case "n": + votes = append(votes, 1) + case "_": + votes = append(votes, 0) + default: + t.Fatalf("unknown vote: %s", s) + } + default: + t.Fatalf("unknown arg %s", arg.Key) + } + } + } + + // Build the two majority configs. + c := MajorityConfig{} + for _, id := range ids { + c[id] = struct{}{} + } + cj := MajorityConfig{} + for _, id := range idsj { + cj[id] = struct{}{} + } + + // Helper that returns an AckedIndexer which has the specified indexes + // mapped to the right IDs. + makeLookuper := func(idxs []Index, ids, idsj []uint64) mapAckIndexer { + l := mapAckIndexer{} + var p int // next to consume from idxs + for _, id := range append(append([]uint64(nil), ids...), idsj...) { + if _, ok := l[id]; ok { + continue + } + if p < len(idxs) { + // NB: this creates zero entries for placeholders that we remove later. + // The upshot of doing it that way is to avoid having to specify place- + // holders multiple times when omitting voters present in both halves of + // a joint config. + l[id] = idxs[p] + p++ + } + } + + for id := range l { + // Zero entries are created by _ placeholders; we don't want + // them in the lookuper because "no entry" is different from + // "zero entry". 
Note that we prevent tests from specifying + // zero commit indexes, so that there's no confusion between + // the two concepts. + if l[id] == 0 { + delete(l, id) + } + } + return l + } + + { + input := idxs + if d.Cmd == "vote" { + input = votes + } + if voters := JointConfig([2]MajorityConfig{c, cj}).IDs(); len(voters) != len(input) { + return fmt.Sprintf("error: mismatched input (explicit or _) for voters %v: %v", + voters, input) + } + } + + var buf strings.Builder + switch d.Cmd { + case "committed": + l := makeLookuper(idxs, ids, idsj) + + // Branch based on whether this is a majority or joint quorum + // test case. + if !joint { + idx := c.CommittedIndex(l) + fmt.Fprintf(&buf, c.Describe(l)) + // These alternative computations should return the same + // result. If not, print to the output. + if aIdx := alternativeMajorityCommittedIndex(c, l); aIdx != idx { + fmt.Fprintf(&buf, "%s <-- via alternative computation\n", aIdx) + } + // Joining a majority with the empty majority should give same result. + if aIdx := JointConfig([2]MajorityConfig{c, {}}).CommittedIndex(l); aIdx != idx { + fmt.Fprintf(&buf, "%s <-- via zero-joint quorum\n", aIdx) + } + // Joining a majority with itself should give same result. + if aIdx := JointConfig([2]MajorityConfig{c, c}).CommittedIndex(l); aIdx != idx { + fmt.Fprintf(&buf, "%s <-- via self-joint quorum\n", aIdx) + } + overlay := func(c MajorityConfig, l AckedIndexer, id uint64, idx Index) AckedIndexer { + ll := mapAckIndexer{} + for iid := range c { + if iid == id { + ll[iid] = idx + } else if idx, ok := l.AckedIndex(iid); ok { + ll[iid] = idx + } + } + return ll + } + for id := range c { + idx, _ := l.AckedIndex(id) + if idx > idx && idx > 0 { + // If the committed index was definitely above the currently + // inspected idx, the result shouldn't change if we lower it + // further. 
+ lo := overlay(c, l, id, idx-1) + if aIdx := c.CommittedIndex(lo); aIdx != idx { + fmt.Fprintf(&buf, "%s <-- overlaying %d->%d", aIdx, id, idx) + } + lo = overlay(c, l, id, 0) + if aIdx := c.CommittedIndex(lo); aIdx != idx { + fmt.Fprintf(&buf, "%s <-- overlaying %d->0", aIdx, id) + } + } + } + fmt.Fprintf(&buf, "%s\n", idx) + } else { + cc := JointConfig([2]MajorityConfig{c, cj}) + fmt.Fprintf(&buf, cc.Describe(l)) + idx := cc.CommittedIndex(l) + // Interchanging the majorities shouldn't make a difference. If it does, print. + if aIdx := JointConfig([2]MajorityConfig{c, cj}).CommittedIndex(l); aIdx != idx { + fmt.Fprintf(&buf, "%s <-- via symmetry\n", aIdx) + } + fmt.Fprintf(&buf, "%s\n", idx) + } + case "vote": + ll := makeLookuper(votes, ids, idsj) + l := map[uint64]bool{} + for id, v := range ll { + l[id] = v != 1 // NB: 1 == false, 2 == true + } + + if !joint { + // Test a majority quorum. + r := c.VoteResult(l) + fmt.Fprintf(&buf, "%v\n", r) + } else { + // Run a joint quorum test case. + r := JointConfig([2]MajorityConfig{c, cj}).VoteResult(l) + // Interchanging the majorities shouldn't make a difference. If it does, print. + if ar := JointConfig([2]MajorityConfig{cj, c}).VoteResult(l); ar != r { + fmt.Fprintf(&buf, "%v <-- via symmetry\n", ar) + } + fmt.Fprintf(&buf, "%v\n", r) + } + default: + t.Fatalf("unknown command: %s", d.Cmd) + } + return buf.String() + }) + }) +} diff --git a/raft/quorum/joint.go b/raft/quorum/joint.go new file mode 100644 index 00000000000..9f8f484dc57 --- /dev/null +++ b/raft/quorum/joint.go @@ -0,0 +1,68 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// JointConfig is a configuration of two groups of (possibly overlapping)
// majority configurations. Decisions require the support of both majorities.
type JointConfig [2]MajorityConfig

// IDs returns a newly initialized map representing the set of voters present
// in the joint configuration.
func (c JointConfig) IDs() map[uint64]struct{} {
	m := map[uint64]struct{}{}
	for _, cc := range c {
		for id := range cc {
			m[id] = struct{}{}
		}
	}
	return m
}

// Describe returns a (multi-line) representation of the commit indexes for the
// given lookuper. It renders the union of both halves as a single
// MajorityConfig.
func (c JointConfig) Describe(l AckedIndexer) string {
	return MajorityConfig(c.IDs()).Describe(l)
}

// CommittedIndex returns the largest committed index for the given joint
// quorum. An index is jointly committed if it is committed in both constituent
// majorities.
func (c JointConfig) CommittedIndex(l AckedIndexer) Index {
	idx0 := c[0].CommittedIndex(l)
	idx1 := c[1].CommittedIndex(l)
	if idx0 < idx1 {
		return idx0
	}
	return idx1
}

// VoteResult takes a mapping of voters to yes/no (true/false) votes and returns
// a result indicating whether the vote is pending, lost, or won. A joint quorum
// requires both majority quorums to vote in favor.
func (c JointConfig) VoteResult(votes map[uint64]bool) VoteResult {
	r1 := c[0].VoteResult(votes)
	r2 := c[1].VoteResult(votes)

	if r1 == r2 {
		// If they agree, return the agreed state.
		return r1
	}
	if r1 == VoteLost || r2 == VoteLost {
		// If either config has lost, loss is the only possible outcome.
		return VoteLost
	}
	// One side won, the other one is pending, so the whole outcome is.
	return VotePending
}

// ---- raft/quorum/majority.go (new file, package quorum) ----

// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// MajorityConfig is a set of IDs that uses majority quorums to make decisions.
type MajorityConfig map[uint64]struct{}

// Describe returns a (multi-line) representation of the commit indexes for the
// given lookuper. An empty config yields the empty string. Rows are printed in
// ascending ID order; voters without an acked index are marked with "?".
//
// NOTE(review): this text was recovered from a whitespace-collapsed source, so
// runs of spaces inside the string literals below may originally have been
// wider — confirm against the golden files in testdata before relying on the
// exact column layout.
func (c MajorityConfig) Describe(l AckedIndexer) string {
	if len(c) == 0 {
		return ""
	}
	type tup struct {
		id  uint64
		idx Index
		ok  bool // idx found?
		bar int  // length of bar displayed for this tup
	}

	// Below, populate .bar so that the i-th largest commit index has bar i (we
	// plot this as sort of a progress bar). An entry only receives a bar when
	// its index is strictly larger than that of its predecessor in index
	// order; entries that tie with their predecessor keep a bar of zero.

	n := len(c)
	info := make([]tup, 0, n)
	for id := range c {
		idx, ok := l.AckedIndex(id)
		info = append(info, tup{id: id, idx: idx, ok: ok})
	}

	// Sort by index
	sort.Slice(info, func(i, j int) bool {
		if info[i].idx == info[j].idx {
			return info[i].id < info[j].id
		}
		return info[i].idx < info[j].idx
	})

	// Populate .bar.
	for i := range info {
		if i > 0 && info[i-1].idx < info[i].idx {
			info[i].bar = i
		}
	}

	// Sort by ID.
	sort.Slice(info, func(i, j int) bool {
		return info[i].id < info[j].id
	})

	var buf strings.Builder

	// Print.
	fmt.Fprint(&buf, strings.Repeat(" ", n)+" idx\n")
	for i := range info {
		bar := info[i].bar
		if !info[i].ok {
			fmt.Fprint(&buf, "?"+strings.Repeat(" ", n))
		} else {
			fmt.Fprint(&buf, strings.Repeat("x", bar)+">"+strings.Repeat(" ", n-bar))
		}
		fmt.Fprintf(&buf, " %5d (id=%d)\n", info[i].idx, info[i].id)
	}
	return buf.String()
}

// uint64Slice is the slice type sorted in place by insertionSort.
type uint64Slice []uint64

// insertionSort sorts sl in ascending order, in place.
func insertionSort(sl uint64Slice) {
	a, b := 0, len(sl)
	for i := a + 1; i < b; i++ {
		for j := i; j > a && sl[j] < sl[j-1]; j-- {
			sl[j], sl[j-1] = sl[j-1], sl[j]
		}
	}
}

// CommittedIndex computes the committed index from those supplied via the
// provided AckedIndexer (for the active config). Voters for which the indexer
// has no entry count as having acked index zero. For the empty config the
// result is math.MaxUint64.
func (c MajorityConfig) CommittedIndex(l AckedIndexer) Index {
	n := len(c)
	if n == 0 {
		// This plays well with joint quorums which, when one half is the zero
		// MajorityConfig, should behave like the other half.
		return math.MaxUint64
	}

	// Use an on-stack slice to collect the committed indexes when n <= 7
	// (otherwise we alloc). The alternative is to stash a slice on
	// MajorityConfig, but this impairs usability (as is, MajorityConfig is just
	// a map, and that's nice). The assumption is that running with a
	// replication factor of >7 is rare, and in cases in which it happens
	// performance is a lesser concern (additionally the performance
	// implications of an allocation here are far from drastic).
	var stk [7]uint64
	srt := uint64Slice(stk[:])

	if cap(srt) < n {
		srt = make([]uint64, n)
	}
	srt = srt[:n]

	{
		// Fill the slice with the indexes observed. Any unused slots will be
		// left as zero; these correspond to voters that may report in, but
		// haven't yet. We fill from the right (since the zeroes will end up on
		// the left after sorting below anyway).
		i := n - 1
		for id := range c {
			if idx, ok := l.AckedIndex(id); ok {
				srt[i] = uint64(idx)
				i--
			}
		}
	}

	// Sort by index. Use a bespoke algorithm (copied from the stdlib's sort
	// package) to keep srt on the stack.
	insertionSort(srt)

	// The smallest index into the array for which the value is acked by a
	// quorum. In other words, from the end of the slice, move n/2+1 to the
	// left (accounting for zero-indexing).
	pos := n - (n/2 + 1)
	return Index(srt[pos])
}

// VoteResult takes a mapping of voters to yes/no (true/false) votes and returns
// a result indicating whether the vote is pending (i.e. neither a quorum of
// yes/no has been reached), won (a quorum of yes has been reached), or lost (a
// quorum of no has been reached). Entries in votes for IDs that are not part
// of the config are ignored.
func (c MajorityConfig) VoteResult(votes map[uint64]bool) VoteResult {
	if len(c) == 0 {
		// By convention, the elections on an empty config win. This comes in
		// handy with joint quorums because it'll make a half-populated joint
		// quorum behave like a majority quorum.
		return VoteWon
	}

	// Only the "yes" votes and the outstanding votes decide the outcome: the
	// vote is lost exactly when the two together can no longer reach quorum,
	// so "no" votes need no counter of their own.
	var yes, missing int
	for id := range c {
		v, ok := votes[id]
		switch {
		case !ok:
			missing++
		case v:
			yes++
		}
	}

	q := len(c)/2 + 1
	if yes >= q {
		return VoteWon
	}
	if yes+missing >= q {
		return VotePending
	}
	return VoteLost
}

// ---- raft/quorum/quick_test.go (new file, package quorum) ----

// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// TestQuick uses quickcheck to heuristically assert that the main
// implementation of (MajorityConfig).CommittedIndex agrees with a "dumb"
// alternative version.
func TestQuick(t *testing.T) {
	cfg := &quick.Config{
		MaxCount: 50000,
	}

	t.Run("majority_commit", func(t *testing.T) {
		fn1 := func(c memberMap, l idxMap) uint64 {
			return uint64(MajorityConfig(c).CommittedIndex(mapAckIndexer(l)))
		}
		fn2 := func(c memberMap, l idxMap) uint64 {
			return uint64(alternativeMajorityCommittedIndex(MajorityConfig(c), mapAckIndexer(l)))
		}
		if err := quick.CheckEqual(fn1, fn2, cfg); err != nil {
			t.Fatal(err)
		}
	})
}

// smallRandIdxMap returns a reasonably sized map of ids to commit indexes.
// The size argument is ignored.
func smallRandIdxMap(r *rand.Rand, size int) map[uint64]Index {
	// Hard-code a reasonably small size here (quick will hard-code 50, which
	// is not useful here).
	const maxSize = 10

	n := r.Intn(maxSize)
	ids := r.Perm(2 * n)[:n]
	idxs := make([]int, len(ids))
	for i := range idxs {
		idxs[i] = r.Intn(n)
	}

	m := map[uint64]Index{}
	for i := range ids {
		m[uint64(ids[i])] = Index(idxs[i])
	}
	return m
}

// idxMap is a map of ids to commit indexes that testing/quick can generate.
type idxMap map[uint64]Index

// Generate implements quick.Generator.
func (idxMap) Generate(r *rand.Rand, size int) reflect.Value {
	m := smallRandIdxMap(r, size)
	return reflect.ValueOf(m)
}

// memberMap is a set of voter ids that testing/quick can generate.
type memberMap map[uint64]struct{}

// Generate implements quick.Generator. It reuses the keys of a
// smallRandIdxMap as the member set.
func (memberMap) Generate(r *rand.Rand, size int) reflect.Value {
	m := smallRandIdxMap(r, size)
	mm := map[uint64]struct{}{}
	for id := range m {
		mm[id] = struct{}{}
	}
	return reflect.ValueOf(mm)
}

// This is an alternative implementation of (MajorityConfig).CommittedIndex(l).
func alternativeMajorityCommittedIndex(c MajorityConfig, l AckedIndexer) Index {
	if len(c) == 0 {
		return math.MaxUint64
	}

	idToIdx := map[uint64]Index{}
	for id := range c {
		if idx, ok := l.AckedIndex(id); ok {
			idToIdx[id] = idx
		}
	}

	// Build a map from index to voters who have acked that or any higher index.
	idxToVotes := map[Index]int{}
	for _, idx := range idToIdx {
		idxToVotes[idx] = 0
	}

	for _, idx := range idToIdx {
		for idy := range idxToVotes {
			if idy > idx {
				continue
			}
			idxToVotes[idy]++
		}
	}

	// Find the maximum index that has achieved quorum.
	q := len(c)/2 + 1
	var maxQuorumIdx Index
	for idx, n := range idxToVotes {
		if n >= q && idx > maxQuorumIdx {
			maxQuorumIdx = idx
		}
	}

	return maxQuorumIdx
}

// ---- raft/quorum/quorum.go (new file, package quorum) ----

// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Index is a log index as reported by an AckedIndexer and returned by
// CommittedIndex.
type Index uint64

// String renders the index in decimal; math.MaxUint64 is rendered as "∞".
func (i Index) String() string {
	if i == math.MaxUint64 {
		return "∞"
	}
	return strconv.FormatUint(uint64(i), 10)
}

// AckedIndexer allows looking up a commit index for a given ID of a voter
// from a corresponding MajorityConfig.
type AckedIndexer interface {
	AckedIndex(voterID uint64) (idx Index, found bool)
}

// mapAckIndexer is an AckedIndexer backed by a plain map.
type mapAckIndexer map[uint64]Index

// AckedIndex implements AckedIndexer.
func (m mapAckIndexer) AckedIndex(id uint64) (Index, bool) {
	idx, ok := m[id]
	return idx, ok
}

// VoteResult indicates the outcome of a vote.
//
//go:generate stringer -type=VoteResult
type VoteResult uint8

const (
	// VotePending indicates that the decision of the vote depends on future
	// votes, i.e. neither "yes" nor "no" has reached quorum yet.
	VotePending VoteResult = 1 + iota
	// VoteLost indicates that the quorum has voted "no".
	VoteLost
	// VoteWon indicates that the quorum has voted "yes".
	VoteWon
)
+ VoteWon +) diff --git a/raft/quorum/testdata/joint_commit.txt b/raft/quorum/testdata/joint_commit.txt new file mode 100644 index 00000000000..12f19fb331c --- /dev/null +++ b/raft/quorum/testdata/joint_commit.txt @@ -0,0 +1,481 @@ +# No difference between a simple majority quorum and a simple majority quorum +# joint with an empty majority quorum. (This is asserted for all datadriven tests +# by the framework, so we don't dwell on it more). +# +# Note that by specifying cfgj explicitly we tell the test harness to treat the +# input as a joint quorum and not a majority quorum. If we didn't specify +# cfgj=zero the test would pass just the same, but it wouldn't be exercising the +# joint quorum path. +committed cfg=(1,2,3) cfgj=zero idx=(100,101,99) +---- + idx +x> 100 (id=1) +xx> 101 (id=2) +> 99 (id=3) +100 + +# Joint nonoverlapping singleton quorums. + +committed cfg=(1) cfgj=(2) idx=(_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +0 + +# Voter 1 has 100 committed, 2 nothing. This means we definitely won't commit +# past 100. +committed cfg=(1) cfgj=(2) idx=(100,_) +---- + idx +x> 100 (id=1) +? 0 (id=2) +0 + +# Committed index collapses once both majorities do, to the lower index. +committed cfg=(1) cfgj=(2) idx=(13, 100) +---- + idx +> 13 (id=1) +x> 100 (id=2) +13 + +# Joint overlapping (i.e. identical) singleton quorum. + +committed cfg=(1) cfgj=(1) idx=(_) +---- + idx +? 0 (id=1) +0 + +committed cfg=(1) cfgj=(1) idx=(100) +---- + idx +> 100 (id=1) +100 + + + +# Two-node config joint with non-overlapping single node config +committed cfg=(1,3) cfgj=(2) idx=(_,_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +? 0 (id=3) +0 + +committed cfg=(1,3) cfgj=(2) idx=(100,_,_) +---- + idx +xx> 100 (id=1) +? 0 (id=2) +? 0 (id=3) +0 + +# 1 has 100 committed, 2 has 50 (collapsing half of the joint quorum to 50). +committed cfg=(1,3) cfgj=(2) idx=(100,_,50) +---- + idx +xx> 100 (id=1) +x> 50 (id=2) +? 0 (id=3) +0 + +# 2 reports 45, collapsing the other half (to 45). 
+committed cfg=(1,3) cfgj=(2) idx=(100,45,50) +---- + idx +xx> 100 (id=1) +x> 50 (id=2) +> 45 (id=3) +45 + +# Two-node config with overlapping single-node config. + +committed cfg=(1,2) cfgj=(2) idx=(_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +0 + +# 1 reports 100. +committed cfg=(1,2) cfgj=(2) idx=(100,_) +---- + idx +x> 100 (id=1) +? 0 (id=2) +0 + +# 2 reports 100. +committed cfg=(1,2) cfgj=(2) idx=(_,100) +---- + idx +? 0 (id=1) +x> 100 (id=2) +0 + +committed cfg=(1,2) cfgj=(2) idx=(50,100) +---- + idx +> 50 (id=1) +x> 100 (id=2) +50 + +committed cfg=(1,2) cfgj=(2) idx=(100,50) +---- + idx +x> 100 (id=1) +> 50 (id=2) +50 + + + +# Joint non-overlapping two-node configs. + +committed cfg=(1,2) cfgj=(3,4) idx=(50,_,_,_) +---- + idx +xxx> 50 (id=1) +? 0 (id=2) +? 0 (id=3) +? 0 (id=4) +0 + +committed cfg=(1,2) cfgj=(3,4) idx=(50,_,49,_) +---- + idx +xxx> 50 (id=1) +? 0 (id=2) +xx> 49 (id=3) +? 0 (id=4) +0 + +committed cfg=(1,2) cfgj=(3,4) idx=(50,48,49,_) +---- + idx +xxx> 50 (id=1) +x> 48 (id=2) +xx> 49 (id=3) +? 0 (id=4) +0 + +committed cfg=(1,2) cfgj=(3,4) idx=(50,48,49,47) +---- + idx +xxx> 50 (id=1) +x> 48 (id=2) +xx> 49 (id=3) +> 47 (id=4) +47 + +# Joint overlapping two-node configs. +committed cfg=(1,2) cfgj=(2,3) idx=(_,_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +? 0 (id=3) +0 + +committed cfg=(1,2) cfgj=(2,3) idx=(100,_,_) +---- + idx +xx> 100 (id=1) +? 0 (id=2) +? 0 (id=3) +0 + +committed cfg=(1,2) cfgj=(2,3) idx=(_,100,_) +---- + idx +? 0 (id=1) +xx> 100 (id=2) +? 0 (id=3) +0 + +committed cfg=(1,2) cfgj=(2,3) idx=(_,100,99) +---- + idx +? 0 (id=1) +xx> 100 (id=2) +x> 99 (id=3) +0 + +committed cfg=(1,2) cfgj=(2,3) idx=(101,100,99) +---- + idx +xx> 101 (id=1) +x> 100 (id=2) +> 99 (id=3) +99 + +# Joint identical two-node configs. +committed cfg=(1,2) cfgj=(1,2) idx=(_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +0 + +committed cfg=(1,2) cfgj=(1,2) idx=(_,40) +---- + idx +? 
0 (id=1) +x> 40 (id=2) +0 + +committed cfg=(1,2) cfgj=(1,2) idx=(41,40) +---- + idx +x> 41 (id=1) +> 40 (id=2) +40 + + + +# Joint disjoint three-node configs. + +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(_,_,_,_,_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +? 0 (id=3) +? 0 (id=4) +? 0 (id=5) +? 0 (id=6) +0 + +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(100,_,_,_,_,_) +---- + idx +xxxxx> 100 (id=1) +? 0 (id=2) +? 0 (id=3) +? 0 (id=4) +? 0 (id=5) +? 0 (id=6) +0 + +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(100,_,_,90,_,_) +---- + idx +xxxxx> 100 (id=1) +? 0 (id=2) +? 0 (id=3) +xxxx> 90 (id=4) +? 0 (id=5) +? 0 (id=6) +0 + +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(100,99,_,_,_,_) +---- + idx +xxxxx> 100 (id=1) +xxxx> 99 (id=2) +? 0 (id=3) +? 0 (id=4) +? 0 (id=5) +? 0 (id=6) +0 + +# First quorum <= 99, second one <= 97. Both quorums guarantee that 90 is +# committed. +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(_,99,90,97,95,_) +---- + idx +? 0 (id=1) +xxxxx> 99 (id=2) +xx> 90 (id=3) +xxxx> 97 (id=4) +xxx> 95 (id=5) +? 0 (id=6) +90 + +# First quorum collapsed to 92. Second one already had at least 95 committed, +# so the result also collapses. +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(92,99,90,97,95,_) +---- + idx +xx> 92 (id=1) +xxxxx> 99 (id=2) +x> 90 (id=3) +xxxx> 97 (id=4) +xxx> 95 (id=5) +? 0 (id=6) +92 + +# Second quorum collapses, but nothing changes in the output. +committed cfg=(1,2,3) cfgj=(4,5,6) idx=(92,99,90,97,95,77) +---- + idx +xx> 92 (id=1) +xxxxx> 99 (id=2) +x> 90 (id=3) +xxxx> 97 (id=4) +xxx> 95 (id=5) +> 77 (id=6) +92 + + +# Joint overlapping three-node configs. + +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(_,_,_,_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +? 0 (id=3) +? 0 (id=4) +? 0 (id=5) +0 + +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(100,_,_,_,_) +---- + idx +xxxx> 100 (id=1) +? 0 (id=2) +? 0 (id=3) +? 0 (id=4) +? 0 (id=5) +0 + +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(100,101,_,_,_) +---- + idx +xxx> 100 (id=1) +xxxx> 101 (id=2) +? 0 (id=3) +? 0 (id=4) +? 
0 (id=5) +0 + +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(100,101,100,_,_) +---- + idx +xx> 100 (id=1) +xxxx> 101 (id=2) +> 100 (id=3) +? 0 (id=4) +? 0 (id=5) +0 + +# Second quorum could commit either 98 or 99, but first quorum is open. +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(_,100,_,99,98) +---- + idx +? 0 (id=1) +xxxx> 100 (id=2) +? 0 (id=3) +xxx> 99 (id=4) +xx> 98 (id=5) +0 + +# Additionally, first quorum can commit either 100 or 99 +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(_,100,99,99,98) +---- + idx +? 0 (id=1) +xxxx> 100 (id=2) +xx> 99 (id=3) +> 99 (id=4) +x> 98 (id=5) +98 + +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(1,100,99,99,98) +---- + idx +> 1 (id=1) +xxxx> 100 (id=2) +xx> 99 (id=3) +> 99 (id=4) +x> 98 (id=5) +98 + +committed cfg=(1,2,3) cfgj=(1,4,5) idx=(100,100,99,99,98) +---- + idx +xxx> 100 (id=1) +> 100 (id=2) +x> 99 (id=3) +> 99 (id=4) +> 98 (id=5) +99 + + +# More overlap. + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(_,_,_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +? 0 (id=3) +? 0 (id=4) +0 + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(_,100,99,_) +---- + idx +? 0 (id=1) +xxx> 100 (id=2) +xx> 99 (id=3) +? 0 (id=4) +99 + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(98,100,99,_) +---- + idx +x> 98 (id=1) +xxx> 100 (id=2) +xx> 99 (id=3) +? 0 (id=4) +99 + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(100,100,99,_) +---- + idx +xx> 100 (id=1) +> 100 (id=2) +x> 99 (id=3) +? 0 (id=4) +99 + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(100,100,99,98) +---- + idx +xx> 100 (id=1) +> 100 (id=2) +x> 99 (id=3) +> 98 (id=4) +99 + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(100,_,_,101) +---- + idx +xx> 100 (id=1) +? 0 (id=2) +? 0 (id=3) +xxx> 101 (id=4) +0 + +committed cfg=(1,2,3) cfgj=(2,3,4) idx=(100,99,_,101) +---- + idx +xx> 100 (id=1) +x> 99 (id=2) +? 0 (id=3) +xxx> 101 (id=4) +99 + +# Identical. This is also exercised in the test harness, so it's listed here +# only briefly. +committed cfg=(1,2,3) cfgj=(1,2,3) idx=(50,45,_) +---- + idx +xx> 50 (id=1) +x> 45 (id=2) +? 
0 (id=3) +45 diff --git a/raft/quorum/testdata/joint_vote.txt b/raft/quorum/testdata/joint_vote.txt new file mode 100644 index 00000000000..36cd0cabcff --- /dev/null +++ b/raft/quorum/testdata/joint_vote.txt @@ -0,0 +1,165 @@ +# Empty joint config wins all votes. This isn't used in production. Note that +# by specifying cfgj explicitly we tell the test harness to treat the input as +# a joint quorum and not a majority quorum. +vote cfgj=zero +---- +VoteWon + +# More examples with close to trivial configs. + +vote cfg=(1) cfgj=zero votes=(_) +---- +VotePending + +vote cfg=(1) cfgj=zero votes=(y) +---- +VoteWon + +vote cfg=(1) cfgj=zero votes=(n) +---- +VoteLost + +vote cfg=(1) cfgj=(1) votes=(_) +---- +VotePending + +vote cfg=(1) cfgj=(1) votes=(y) +---- +VoteWon + +vote cfg=(1) cfgj=(1) votes=(n) +---- +VoteLost + +vote cfg=(1) cfgj=(2) votes=(_,_) +---- +VotePending + +vote cfg=(1) cfgj=(2) votes=(y,_) +---- +VotePending + +vote cfg=(1) cfgj=(2) votes=(y,y) +---- +VoteWon + +vote cfg=(1) cfgj=(2) votes=(y,n) +---- +VoteLost + +vote cfg=(1) cfgj=(2) votes=(n,_) +---- +VoteLost + +vote cfg=(1) cfgj=(2) votes=(n,n) +---- +VoteLost + +vote cfg=(1) cfgj=(2) votes=(n,y) +---- +VoteLost + +# Two node configs. 
+ +vote cfg=(1,2) cfgj=(3,4) votes=(_,_,_,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(3,4) votes=(y,_,_,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(3,4) votes=(y,y,_,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(3,4) votes=(y,y,n,_) +---- +VoteLost + +vote cfg=(1,2) cfgj=(3,4) votes=(y,y,n,n) +---- +VoteLost + +vote cfg=(1,2) cfgj=(3,4) votes=(y,y,y,n) +---- +VoteLost + +vote cfg=(1,2) cfgj=(3,4) votes=(y,y,y,y) +---- +VoteWon + +vote cfg=(1,2) cfgj=(2,3) votes=(_,_,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(2,3) votes=(_,n,_) +---- +VoteLost + +vote cfg=(1,2) cfgj=(2,3) votes=(y,y,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(2,3) votes=(y,y,n) +---- +VoteLost + +vote cfg=(1,2) cfgj=(2,3) votes=(y,y,y) +---- +VoteWon + +vote cfg=(1,2) cfgj=(1,2) votes=(_,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(1,2) votes=(y,_) +---- +VotePending + +vote cfg=(1,2) cfgj=(1,2) votes=(y,n) +---- +VoteLost + +vote cfg=(1,2) cfgj=(1,2) votes=(n,_) +---- +VoteLost + +vote cfg=(1,2) cfgj=(1,2) votes=(n,n) +---- +VoteLost + + +# Simple example for overlapping three node configs. + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(_,_,_,_) +---- +VotePending + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(_,n,_,_) +---- +VotePending + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(_,n,n,_) +---- +VoteLost + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(_,y,y,_) +---- +VoteWon + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(y,y,_,_) +---- +VotePending + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(y,y,n,_) +---- +VotePending + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(y,y,n,n) +---- +VoteLost + +vote cfg=(1,2,3) cfgj=(2,3,4) votes=(y,y,n,y) +---- +VoteWon diff --git a/raft/quorum/testdata/majority_commit.txt b/raft/quorum/testdata/majority_commit.txt new file mode 100644 index 00000000000..6ff5d0b89e0 --- /dev/null +++ b/raft/quorum/testdata/majority_commit.txt @@ -0,0 +1,153 @@ +# The empty quorum commits "everything". This is useful for its use in joint +# quorums. 
+committed +---- +∞ + + + +# A single voter quorum is not final when no index is known. +committed cfg=(1) idx=(_) +---- + idx +? 0 (id=1) +0 + +# When an index is known, that's the committed index, and that's final. +committed cfg=(1) idx=(12) +---- + idx +> 12 (id=1) +12 + + + + +# With two nodes, start out similarly. +committed cfg=(1, 2) idx=(_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +0 + +# The first committed index becomes known (for n1). Nothing changes in the +# output because idx=12 is not known to be on a quorum (which is both nodes). +committed cfg=(1, 2) idx=(12,_) +---- + idx +x> 12 (id=1) +? 0 (id=2) +0 + +# The second index comes in and finalizes the decision. The result will be the +# smaller of the two indexes. +committed cfg=(1,2) idx=(12,5) +---- + idx +x> 12 (id=1) +> 5 (id=2) +5 + + + + +# No surprises for three nodes. +committed cfg=(1,2,3) idx=(_,_,_) +---- + idx +? 0 (id=1) +? 0 (id=2) +? 0 (id=3) +0 + +committed cfg=(1,2,3) idx=(12,_,_) +---- + idx +xx> 12 (id=1) +? 0 (id=2) +? 0 (id=3) +0 + +# We see a committed index, but a higher committed index for the last pending +# votes could change (increment) the outcome, so not final yet. +committed cfg=(1,2,3) idx=(12,5,_) +---- + idx +xx> 12 (id=1) +x> 5 (id=2) +? 0 (id=3) +5 + +# a) the case in which it does: +committed cfg=(1,2,3) idx=(12,5,6) +---- + idx +xx> 12 (id=1) +> 5 (id=2) +x> 6 (id=3) +6 + +# b) the case in which it does not: +committed cfg=(1,2,3) idx=(12,5,4) +---- + idx +xx> 12 (id=1) +x> 5 (id=2) +> 4 (id=3) +5 + +# c) a different case in which the last index is pending but it has no chance of +# swaying the outcome (because nobody in the current quorum agrees on anything +# higher than the candidate): +committed cfg=(1,2,3) idx=(5,5,_) +---- + idx +x> 5 (id=1) +> 5 (id=2) +? 0 (id=3) +5 + +# c) continued: Doesn't matter what shows up last. The result is final. 
+committed cfg=(1,2,3) idx=(5,5,12) +---- + idx +> 5 (id=1) +> 5 (id=2) +xx> 12 (id=3) +5 + +# With all committed idx known, the result is final. +committed cfg=(1, 2, 3) idx=(100, 101, 103) +---- + idx +> 100 (id=1) +x> 101 (id=2) +xx> 103 (id=3) +101 + + + +# Some more complicated examples. Similar to case c) above. The result is +# already final because no index higher than 103 is one short of quorum. +committed cfg=(1, 2, 3, 4, 5) idx=(101, 104, 103, 103,_) +---- + idx +x> 101 (id=1) +xxxx> 104 (id=2) +xx> 103 (id=3) +> 103 (id=4) +? 0 (id=5) +103 + +# A similar case which is not final because another vote for >= 103 would change +# the outcome. +committed cfg=(1, 2, 3, 4, 5) idx=(101, 102, 103, 103,_) +---- + idx +x> 101 (id=1) +xx> 102 (id=2) +xxx> 103 (id=3) +> 103 (id=4) +? 0 (id=5) +102 diff --git a/raft/quorum/testdata/majority_vote.txt b/raft/quorum/testdata/majority_vote.txt new file mode 100644 index 00000000000..5f9564b4f51 --- /dev/null +++ b/raft/quorum/testdata/majority_vote.txt @@ -0,0 +1,97 @@ +# The empty config always announces a won vote. +vote +---- +VoteWon + +vote cfg=(1) votes=(_) +---- +VotePending + +vote cfg=(1) votes=(n) +---- +VoteLost + +vote cfg=(123) votes=(y) +---- +VoteWon + + + + +vote cfg=(4,8) votes=(_,_) +---- +VotePending + +# With two voters, a single rejection loses the vote. +vote cfg=(4,8) votes=(n,_) +---- +VoteLost + +vote cfg=(4,8) votes=(y,_) +---- +VotePending + +vote cfg=(4,8) votes=(n,y) +---- +VoteLost + +vote cfg=(4,8) votes=(y,y) +---- +VoteWon + + + +vote cfg=(2,4,7) votes=(_,_,_) +---- +VotePending + +vote cfg=(2,4,7) votes=(n,_,_) +---- +VotePending + +vote cfg=(2,4,7) votes=(y,_,_) +---- +VotePending + +vote cfg=(2,4,7) votes=(n,n,_) +---- +VoteLost + +vote cfg=(2,4,7) votes=(y,n,_) +---- +VotePending + +vote cfg=(2,4,7) votes=(y,y,_) +---- +VoteWon + +vote cfg=(2,4,7) votes=(y,y,n) +---- +VoteWon + +vote cfg=(2,4,7) votes=(n,y,n) +---- +VoteLost + + + +# Test some random example with seven nodes (why not). 
+vote cfg=(1,2,3,4,5,6,7) votes=(y,y,n,y,_,_,_) +---- +VotePending + +vote cfg=(1,2,3,4,5,6,7) votes=(_,y,y,_,n,y,n) +---- +VotePending + +vote cfg=(1,2,3,4,5,6,7) votes=(y,y,n,y,_,n,y) +---- +VoteWon + +vote cfg=(1,2,3,4,5,6,7) votes=(y,y,_,n,y,n,n) +---- +VotePending + +vote cfg=(1,2,3,4,5,6,7) votes=(y,y,n,y,n,n,n) +---- +VoteLost diff --git a/raft/quorum/voteresult_string.go b/raft/quorum/voteresult_string.go new file mode 100644 index 00000000000..9eca8fd0c96 --- /dev/null +++ b/raft/quorum/voteresult_string.go @@ -0,0 +1,26 @@ +// Code generated by "stringer -type=VoteResult"; DO NOT EDIT. + +package quorum + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[VotePending-1] + _ = x[VoteLost-2] + _ = x[VoteWon-3] +} + +const _VoteResult_name = "VotePendingVoteLostVoteWon" + +var _VoteResult_index = [...]uint8{0, 11, 19, 26} + +func (i VoteResult) String() string { + i -= 1 + if i >= VoteResult(len(_VoteResult_index)-1) { + return "VoteResult(" + strconv.FormatInt(int64(i+1), 10) + ")" + } + return _VoteResult_name[_VoteResult_index[i]:_VoteResult_index[i+1]] +} diff --git a/raft/raft.go b/raft/raft.go index 61df549b96d..06ba6bf12b2 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "go.etcd.io/etcd/raft/quorum" pb "go.etcd.io/etcd/raft/raftpb" ) @@ -343,7 +344,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxUncommittedSize: c.MaxUncommittedEntriesSize, - prs: makePRS(c.MaxInflightMsgs), + prs: makeProgressTracker(c.MaxInflightMsgs), electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -744,7 +745,7 @@ func (r *raft) campaign(t CampaignType) { voteMsg = pb.MsgVote term = r.Term } - if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == electionWon { + if _, _, res := r.poll(r.id, 
voteRespMsgType(voteMsg), true); res == quorum.VoteWon { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). Advance to the next state. if t == campaignPreElection { @@ -754,7 +755,7 @@ func (r *raft) campaign(t CampaignType) { } return } - for id := range r.prs.nodes { + for id := range r.prs.voters.IDs() { if id == r.id { continue } @@ -769,15 +770,7 @@ func (r *raft) campaign(t CampaignType) { } } -type electionResult byte - -const ( - electionIndeterminate electionResult = iota - electionLost - electionWon -) - -func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result electionResult) { +func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) { if v { r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term) } else { @@ -999,7 +992,9 @@ func stepLeader(r *raft, m pb.Message) error { r.bcastAppend() return nil case pb.MsgReadIndex: - if !r.prs.isSingleton() { // more than one voting member in cluster + // If more than the local vote is needed, go through a full broadcast, + // otherwise optimize. + if !r.prs.isSingleton() { if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // Reject read only request when this leader has not committed any log entry at its term. 
return nil @@ -1110,7 +1105,7 @@ func stepLeader(r *raft, m pb.Message) error { return nil } - if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) { + if r.prs.voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon { return nil } @@ -1210,14 +1205,14 @@ func stepCandidate(r *raft, m pb.Message) error { gr, rj, res := r.poll(m.From, m.Type, !m.Reject) r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj) switch res { - case electionWon: + case quorum.VoteWon: if r.state == StatePreCandidate { r.campaign(campaignElection) } else { r.becomeLeader() r.bcastAppend() } - case electionLost: + case quorum.VoteLost: // pb.MsgPreVoteResp contains future term of pre-candidate // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) @@ -1346,7 +1341,7 @@ func (r *raft) restore(s pb.Snapshot) bool { r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.restore(s) - r.prs = makePRS(r.prs.maxInflight) + r.prs = makeProgressTracker(r.prs.maxInflight) r.restoreNode(s.Metadata.ConfState.Nodes, false) r.restoreNode(s.Metadata.ConfState.Learners, true) return true @@ -1417,7 +1412,7 @@ func (r *raft) removeNode(id uint64) { r.prs.removeAny(id) // Do not try to commit or abort transferring if the cluster is now empty. 
- if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 { + if len(r.prs.voters[0]) == 0 && len(r.prs.learners) == 0 { return } diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index 699bb5b0780..033e336921c 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -29,7 +29,7 @@ func TestMsgAppFlowControlFull(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs.nodes[2] + pr2 := r.prs.prs[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window @@ -65,7 +65,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs.nodes[2] + pr2 := r.prs.prs[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window @@ -110,7 +110,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs.nodes[2] + pr2 := r.prs.prs[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index 145473824c6..246ed07e207 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -40,11 +40,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { // force set the next of node 2, so that // node 2 needs a snapshot - sm.prs.nodes[2].Next = sm.raftLog.firstIndex() + sm.prs.prs[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true}) - if sm.prs.nodes[2].PendingSnapshot != 11 { - t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.nodes[2].PendingSnapshot) + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true}) + if sm.prs.prs[2].PendingSnapshot != 11 { + t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.prs[2].PendingSnapshot) } } @@ -56,7 +56,7 @@ func TestPendingSnapshotPauseReplication(t 
*testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].becomeSnapshot(11) + sm.prs.prs[2].becomeSnapshot(11) sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) msgs := sm.readMessages() @@ -73,18 +73,18 @@ func TestSnapshotFailure(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].Next = 1 - sm.prs.nodes[2].becomeSnapshot(11) + sm.prs.prs[2].Next = 1 + sm.prs.prs[2].becomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true}) - if sm.prs.nodes[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) + if sm.prs.prs[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot) } - if sm.prs.nodes[2].Next != 1 { - t.Fatalf("Next = %d, want 1", sm.prs.nodes[2].Next) + if sm.prs.prs[2].Next != 1 { + t.Fatalf("Next = %d, want 1", sm.prs.prs[2].Next) } - if !sm.prs.nodes[2].Paused { - t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused) + if !sm.prs.prs[2].Paused { + t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused) } } @@ -96,18 +96,18 @@ func TestSnapshotSucceed(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].Next = 1 - sm.prs.nodes[2].becomeSnapshot(11) + sm.prs.prs[2].Next = 1 + sm.prs.prs[2].becomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false}) - if sm.prs.nodes[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) + if sm.prs.prs[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot) } - if sm.prs.nodes[2].Next != 12 { - t.Fatalf("Next = %d, want 12", sm.prs.nodes[2].Next) + if sm.prs.prs[2].Next != 12 { + t.Fatalf("Next = %d, want 12", sm.prs.prs[2].Next) } - if !sm.prs.nodes[2].Paused { - t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused) + if !sm.prs.prs[2].Paused { + t.Errorf("Paused 
= %v, want true", sm.prs.prs[2].Paused) } } @@ -206,7 +206,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { mustSend(n2, n1, pb.MsgAppResp) // Leader has correct state for follower. - pr := n1.prs.nodes[2] + pr := n1.prs.prs[2] if pr.State != ProgressStateReplicate { t.Fatalf("unexpected state %v", pr) } @@ -227,23 +227,23 @@ func TestSnapshotAbort(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].Next = 1 - sm.prs.nodes[2].becomeSnapshot(11) + sm.prs.prs[2].Next = 1 + sm.prs.prs[2].becomeSnapshot(11) // A successful msgAppResp that has a higher/equal index than the // pending snapshot should abort the pending snapshot. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11}) - if sm.prs.nodes[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) + if sm.prs.prs[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot) } // The follower entered ProgressStateReplicate and the leader send an append // and optimistically updated the progress (so we see 13 instead of 12). // There is something to append because the leader appended an empty entry // to the log at index 12 when it assumed leadership. - if sm.prs.nodes[2].Next != 13 { - t.Fatalf("Next = %d, want 13", sm.prs.nodes[2].Next) + if sm.prs.prs[2].Next != 13 { + t.Fatalf("Next = %d, want 13", sm.prs.prs[2].Next) } - if n := sm.prs.nodes[2].ins.count; n != 1 { + if n := sm.prs.prs[2].ins.count; n != 1 { t.Fatalf("expected an inflight message, got %d", n) } } diff --git a/raft/raft_test.go b/raft/raft_test.go index d4f0fa26828..40be17cfc8f 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -271,12 +271,12 @@ func TestProgressLeader(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() - r.prs.nodes[2].becomeReplicate() + r.prs.prs[2].becomeReplicate() // Send proposals to r1. 
The first 5 entries should be appended to the log. propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}} for i := 0; i < 5; i++ { - if pr := r.prs.nodes[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { + if pr := r.prs.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { t.Errorf("unexpected progress %v", pr) } if err := r.Step(propMsg); err != nil { @@ -291,17 +291,17 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.prs.nodes[2].Paused = true + r.prs.prs[2].Paused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if !r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) + if !r.prs.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) } - r.prs.nodes[2].becomeReplicate() + r.prs.prs[2].becomeReplicate() r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want false", r.prs.nodes[2].Paused) + if r.prs.prs[2].Paused { + t.Errorf("paused = %v, want false", r.prs.prs[2].Paused) } } @@ -331,7 +331,7 @@ func TestProgressFlowControl(t *testing.T) { r.readMessages() // While node 2 is in probe state, propose a bunch of entries. - r.prs.nodes[2].becomeProbe() + r.prs.prs[2].becomeProbe() blob := []byte(strings.Repeat("a", 1000)) for i := 0; i < 10; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}}) @@ -409,8 +409,8 @@ func TestUncommittedEntryLimit(t *testing.T) { // Set the two followers to the replicate state. Commit to tail of log. const numFollowers = 2 - r.prs.nodes[2].becomeReplicate() - r.prs.nodes[3].becomeReplicate() + r.prs.prs[2].becomeReplicate() + r.prs.prs[3].becomeReplicate() r.uncommittedSize = 0 // Send proposals to r1. The first 5 entries should be appended to the log. 
@@ -2632,7 +2632,7 @@ func TestLeaderAppResp(t *testing.T) { sm.readMessages() sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index}) - p := sm.prs.nodes[2] + p := sm.prs.prs[2] if p.Match != tt.wmatch { t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch) } @@ -2679,9 +2679,9 @@ func TestBcastBeat(t *testing.T) { mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1}) } // slow follower - sm.prs.nodes[2].Match, sm.prs.nodes[2].Next = 5, 6 + sm.prs.prs[2].Match, sm.prs.prs[2].Next = 5, 6 // normal follower - sm.prs.nodes[3].Match, sm.prs.nodes[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 + sm.prs.prs[3].Match, sm.prs.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 sm.Step(pb.Message{Type: pb.MsgBeat}) msgs := sm.readMessages() @@ -2689,8 +2689,8 @@ func TestBcastBeat(t *testing.T) { t.Fatalf("len(msgs) = %v, want 2", len(msgs)) } wantCommitMap := map[uint64]uint64{ - 2: min(sm.raftLog.committed, sm.prs.nodes[2].Match), - 3: min(sm.raftLog.committed, sm.prs.nodes[3].Match), + 2: min(sm.raftLog.committed, sm.prs.prs[2].Match), + 3: min(sm.raftLog.committed, sm.prs.prs[3].Match), } for i, m := range msgs { if m.Type != pb.MsgHeartbeat { @@ -2776,11 +2776,11 @@ func TestLeaderIncreaseNext(t *testing.T) { sm.raftLog.append(previousEnts...) 
sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].State = tt.state - sm.prs.nodes[2].Next = tt.next + sm.prs.prs[2].State = tt.state + sm.prs.prs[2].Next = tt.next sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - p := sm.prs.nodes[2] + p := sm.prs.prs[2] if p.Next != tt.wnext { t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext) } @@ -2792,7 +2792,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs.nodes[2].becomeProbe() + r.prs.prs[2].becomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2811,8 +2811,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { } } - if !r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) + if !r.prs.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2826,8 +2826,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - if !r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) + if !r.prs.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) } // consume the heartbeat @@ -2849,8 +2849,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { if msg[0].Index != 0 { t.Errorf("index = %d, want %d", msg[0].Index, 0) } - if !r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) + if !r.prs.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) } } @@ -2859,7 +2859,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs.nodes[2].becomeReplicate() + r.prs.prs[2].becomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2876,7 +2876,7 @@ func 
TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs.nodes[2].becomeSnapshot(10) + r.prs.prs[2].becomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2897,17 +2897,17 @@ func TestRecvMsgUnreachable(t *testing.T) { r.becomeLeader() r.readMessages() // set node 2 to state replicate - r.prs.nodes[2].Match = 3 - r.prs.nodes[2].becomeReplicate() - r.prs.nodes[2].optimisticUpdate(5) + r.prs.prs[2].Match = 3 + r.prs.prs[2].becomeReplicate() + r.prs.prs[2].optimisticUpdate(5) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) - if r.prs.nodes[2].State != ProgressStateProbe { - t.Errorf("state = %s, want %s", r.prs.nodes[2].State, ProgressStateProbe) + if r.prs.prs[2].State != ProgressStateProbe { + t.Errorf("state = %s, want %s", r.prs.prs[2].State, ProgressStateProbe) } - if wnext := r.prs.nodes[2].Match + 1; r.prs.nodes[2].Next != wnext { - t.Errorf("next = %d, want %d", r.prs.nodes[2].Next, wnext) + if wnext := r.prs.prs[2].Match + 1; r.prs.prs[2].Next != wnext { + t.Errorf("next = %d, want %d", r.prs.prs[2].Next, wnext) } } @@ -2973,13 +2973,13 @@ func TestRestoreWithLearner(t *testing.T) { t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners) } for _, n := range s.Metadata.ConfState.Nodes { - if sm.prs.nodes[n].IsLearner { - t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], false) + if sm.prs.prs[n].IsLearner { + t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], false) } } for _, n := range s.Metadata.ConfState.Learners { - if !sm.prs.learners[n].IsLearner { - t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], true) + if !sm.prs.prs[n].IsLearner { + t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], true) } } @@ -3121,8 +3121,8 @@ func TestProvideSnap(t *testing.T) { sm.becomeLeader() // force set the next of node 2, so that node 2 needs a snapshot - 
sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true}) + sm.prs.prs[2].Next = sm.raftLog.firstIndex() + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true}) msgs := sm.readMessages() if len(msgs) != 1 { @@ -3152,8 +3152,8 @@ func TestIgnoreProvidingSnap(t *testing.T) { // force set the next of node 2, so that node 2 needs a snapshot // change node 2 to be inactive, expect node 1 ignore sending snapshot to 2 - sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - 1 - sm.prs.nodes[2].RecentActive = false + sm.prs.prs[2].Next = sm.raftLog.firstIndex() - 1 + sm.prs.prs[2].RecentActive = false sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) @@ -3201,7 +3201,7 @@ func TestSlowNodeRestore(t *testing.T) { // node 3 will only be considered as active when node 1 receives a reply from it. for { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if lead.prs.nodes[3].RecentActive { + if lead.prs.prs[3].RecentActive { break } } @@ -3304,8 +3304,8 @@ func TestAddLearner(t *testing.T) { if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) } - if !r.prs.learners[2].IsLearner { - t.Errorf("node 2 is learner %t, want %t", r.prs.nodes[2].IsLearner, true) + if !r.prs.prs[2].IsLearner { + t.Errorf("node 2 is learner %t, want %t", r.prs.prs[2].IsLearner, true) } } @@ -3619,8 +3619,8 @@ func TestLeaderTransferToSlowFollower(t *testing.T) { nt.recover() lead := nt.peers[1].(*raft) - if lead.prs.nodes[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1) + if lead.prs.prs[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1) } // Transfer leadership to 3 when node 3 is lack of log. 
@@ -3642,8 +3642,8 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) { nt.storage[1].Compact(lead.raftLog.applied) nt.recover() - if lead.prs.nodes[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1) + if lead.prs.prs[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1) } // Transfer leadership to 3 when node 3 is lack of snapshot. @@ -3722,8 +3722,8 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) { t.Fatalf("should return drop proposal error while transferring") } - if lead.prs.nodes[1].Match != 1 { - t.Fatalf("node 1 has match %x, want %x", lead.prs.nodes[1].Match, 1) + if lead.prs.prs[1].Match != 1 { + t.Fatalf("node 1 has match %x, want %x", lead.prs.prs[1].Match, 1) } } @@ -4334,14 +4334,19 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw learners[i] = true } v.id = id - v.prs.nodes = make(map[uint64]*Progress) - v.prs.learners = make(map[uint64]*Progress) + v.prs.voters[0] = make(map[uint64]struct{}) + v.prs.voters[1] = make(map[uint64]struct{}) + v.prs.learners = make(map[uint64]struct{}) + v.prs.prs = make(map[uint64]*Progress) for i := 0; i < size; i++ { + pr := &Progress{} if _, ok := learners[peerAddrs[i]]; ok { - v.prs.learners[peerAddrs[i]] = &Progress{IsLearner: true} + pr.IsLearner = true + v.prs.learners[peerAddrs[i]] = struct{}{} } else { - v.prs.nodes[peerAddrs[i]] = &Progress{} + v.prs.voters[0][peerAddrs[i]] = struct{}{} } + v.prs.prs[peerAddrs[i]] = pr } v.reset(v.Term) npeers[id] = v diff --git a/raft/read_only.go b/raft/read_only.go index 39eb2b06515..6987f1bd7d7 100644 --- a/raft/read_only.go +++ b/raft/read_only.go @@ -29,7 +29,11 @@ type ReadState struct { type readIndexStatus struct { req pb.Message index uint64 - acks map[uint64]struct{} + // NB: this never records 'false', but it's more convenient to use this + // instead of a map[uint64]struct{} due to the API of quorum.VoteResult. 
If + // this becomes performance sensitive enough (doubtful), quorum.VoteResult + // can change to an API that is closer to that of CommittedIndex. + acks map[uint64]bool } type readOnly struct { @@ -54,20 +58,20 @@ func (ro *readOnly) addRequest(index uint64, m pb.Message) { if _, ok := ro.pendingReadIndex[s]; ok { return } - ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} + ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)} ro.readIndexQueue = append(ro.readIndexQueue, s) } // recvAck notifies the readonly struct that the raft state machine received // an acknowledgment of the heartbeat that attached with the read only request // context. -func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} { +func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool { rs, ok := ro.pendingReadIndex[string(context)] if !ok { return nil } - rs.acks[id] = struct{}{} + rs.acks[id] = true return rs.acks } diff --git a/vendor/github.com/cockroachdb/datadriven/LICENSE b/vendor/github.com/cockroachdb/datadriven/LICENSE new file mode 100644 index 00000000000..261eeb9e9f8 --- /dev/null +++ b/vendor/github.com/cockroachdb/datadriven/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/cockroachdb/datadriven/datadriven.go b/vendor/github.com/cockroachdb/datadriven/datadriven.go new file mode 100644 index 00000000000..49e73ce380f --- /dev/null +++ b/vendor/github.com/cockroachdb/datadriven/datadriven.go @@ -0,0 +1,318 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package datadriven + +import ( + "bufio" + "flag" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + "testing" +) + +var ( + rewriteTestFiles = flag.Bool( + "rewrite", false, + "ignore the expected results and rewrite the test files with the actual results from this "+ + "run. Used to update tests when a change affects many cases; please verify the testfile "+ + "diffs carefully!", + ) +) + +// RunTest invokes a data-driven test. 
The test cases are contained in a +// separate test file and are dynamically loaded, parsed, and executed by this +// testing framework. By convention, test files are typically located in a +// sub-directory called "testdata". Each test file has the following format: +// +// [,...] [arg | arg=val | arg=(val1, val2, ...)]... +// +// ---- +// +// +// The command input can contain blank lines. However, by default, the expected +// results cannot contain blank lines. This alternate syntax allows the use of +// blank lines: +// +// [,...] [arg | arg=val | arg=(val1, val2, ...)]... +// +// ---- +// ---- +// +// +// +// ---- +// ---- +// +// To execute data-driven tests, pass the path of the test file as well as a +// function which can interpret and execute whatever commands are present in +// the test file. The framework invokes the function, passing it information +// about the test case in a TestData struct. The function then returns the +// actual results of the case, which this function compares with the expected +// results, and either succeeds or fails the test. +func RunTest(t *testing.T, path string, f func(d *TestData) string) { + t.Helper() + file, err := os.OpenFile(path, os.O_RDWR, 0644 /* irrelevant */) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = file.Close() + }() + + runTestInternal(t, path, file, f, *rewriteTestFiles) +} + +// RunTestFromString is a version of RunTest which takes the contents of a test +// directly. 
+func RunTestFromString(t *testing.T, input string, f func(d *TestData) string) { + t.Helper() + runTestInternal(t, "" /* optionalPath */, strings.NewReader(input), f, *rewriteTestFiles) +} + +func runTestInternal( + t *testing.T, sourceName string, reader io.Reader, f func(d *TestData) string, rewrite bool, +) { + t.Helper() + + r := newTestDataReader(t, sourceName, reader, rewrite) + for r.Next(t) { + d := &r.data + actual := func() string { + defer func() { + if r := recover(); r != nil { + fmt.Printf("\npanic during %s:\n%s\n", d.Pos, d.Input) + panic(r) + } + }() + return f(d) + }() + + if r.rewrite != nil { + r.emit("----") + if hasBlankLine(actual) { + r.emit("----") + r.rewrite.WriteString(actual) + r.emit("----") + r.emit("----") + } else { + r.emit(actual) + } + } else if d.Expected != actual { + t.Fatalf("\n%s: %s\nexpected:\n%s\nfound:\n%s", d.Pos, d.Input, d.Expected, actual) + } else if testing.Verbose() { + input := d.Input + if input == "" { + input = "" + } + // TODO(tbg): it's awkward to reproduce the args, but it would be helpful. + fmt.Printf("\n%s:\n%s [%d args]\n%s\n----\n%s", d.Pos, d.Cmd, len(d.CmdArgs), input, actual) + } + } + + if r.rewrite != nil { + data := r.rewrite.Bytes() + if l := len(data); l > 2 && data[l-1] == '\n' && data[l-2] == '\n' { + data = data[:l-1] + } + if dest, ok := reader.(*os.File); ok { + if _, err := dest.WriteAt(data, 0); err != nil { + t.Fatal(err) + } + if err := dest.Truncate(int64(len(data))); err != nil { + t.Fatal(err) + } + if err := dest.Sync(); err != nil { + t.Fatal(err) + } + } else { + t.Logf("input is not a file; rewritten output is:\n%s", data) + } + } +} + +// Walk goes through all the files in a subdirectory, creating subtests to match +// the file hierarchy; for each "leaf" file, the given function is called. +// +// This can be used in conjunction with RunTest. 
For example: +// +// datadriven.Walk(t, path, func (t *testing.T, path string) { +// // initialize per-test state +// datadriven.RunTest(t, path, func (d *datadriven.TestData) { +// // ... +// } +// } +// +// Files: +// testdata/typing +// testdata/logprops/scan +// testdata/logprops/select +// +// If path is "testdata/typing", the function is called once and no subtests +// care created. +// +// If path is "testdata/logprops", the function is called two times, in +// separate subtests /scan, /select. +// +// If path is "testdata", the function is called three times, in subtest +// hierarchy /typing, /logprops/scan, /logprops/select. +// +func Walk(t *testing.T, path string, f func(t *testing.T, path string)) { + finfo, err := os.Stat(path) + if err != nil { + t.Fatal(err) + } + if !finfo.IsDir() { + f(t, path) + return + } + files, err := ioutil.ReadDir(path) + if err != nil { + t.Fatal(err) + } + for _, file := range files { + t.Run(file.Name(), func(t *testing.T) { + Walk(t, filepath.Join(path, file.Name()), f) + }) + } +} + +// TestData contains information about one data-driven test case that was +// parsed from the test file. +type TestData struct { + Pos string // reader and line number + + // Cmd is the first string on the directive line (up to the first whitespace). + Cmd string + + CmdArgs []CmdArg + + Input string + Expected string +} + +// ScanArgs looks up the first CmdArg matching the given key and scans it into +// the given destinations in order. If the arg does not exist, the number of +// destinations does not match that of the arguments, or a destination can not +// be populated from its matching value, a fatal error results. 
+// +// For example, for a TestData originating from +// +// cmd arg1=50 arg2=yoruba arg3=(50, 50, 50) +// +// the following would be valid: +// +// var i1, i2, i3, i4 int +// var s string +// td.ScanArgs(t, "arg1", &i1) +// td.ScanArgs(t, "arg2", &s) +// td.ScanArgs(t, "arg3", &i2, &i3, &i4) +func (td *TestData) ScanArgs(t *testing.T, key string, dests ...interface{}) { + t.Helper() + var arg CmdArg + for i := range td.CmdArgs { + if td.CmdArgs[i].Key == key { + arg = td.CmdArgs[i] + break + } + } + if arg.Key == "" { + t.Fatalf("missing argument: %s", key) + } + if len(dests) != len(arg.Vals) { + t.Fatalf("%s: got %d destinations, but %d values", arg.Key, len(dests), len(arg.Vals)) + } + + for i := range dests { + arg.Scan(t, i, dests[i]) + + } +} + +// CmdArg contains information about an argument on the directive line. An +// argument is specified in one of the following forms: +// - argument +// - argument=value +// - argument=(values, ...) +type CmdArg struct { + Key string + Vals []string +} + +func (arg CmdArg) String() string { + switch len(arg.Vals) { + case 0: + return arg.Key + + case 1: + return fmt.Sprintf("%s=%s", arg.Key, arg.Vals[0]) + + default: + return fmt.Sprintf("%s=(%s)", arg.Key, strings.Join(arg.Vals, ", ")) + } +} + +// Scan attempts to parse the value at index i into the dest. 
+func (arg CmdArg) Scan(t *testing.T, i int, dest interface{}) { + if i < 0 || i >= len(arg.Vals) { + t.Fatalf("cannot scan index %d of key %s", i, arg.Key) + } + val := arg.Vals[i] + switch dest := dest.(type) { + case *string: + *dest = val + case *int: + n, err := strconv.ParseInt(val, 10, 64) + if err != nil { + t.Fatal(err) + } + *dest = int(n) // assume 64bit ints + case *uint64: + n, err := strconv.ParseUint(val, 10, 64) + if err != nil { + t.Fatal(err) + } + *dest = n + case *bool: + b, err := strconv.ParseBool(val) + if err != nil { + t.Fatal(err) + } + *dest = b + default: + t.Fatalf("unsupported type %T for destination #%d (might be easy to add it)", dest, i+1) + } +} + +// Fatalf wraps a fatal testing error with test file position information, so +// that it's easy to locate the source of the error. +func (td TestData) Fatalf(tb testing.TB, format string, args ...interface{}) { + tb.Helper() + tb.Fatalf("%s: %s", td.Pos, fmt.Sprintf(format, args...)) +} + +func hasBlankLine(s string) bool { + scanner := bufio.NewScanner(strings.NewReader(s)) + for scanner.Scan() { + if strings.TrimSpace(scanner.Text()) == "" { + return true + } + } + return false +} diff --git a/vendor/github.com/cockroachdb/datadriven/line_scanner.go b/vendor/github.com/cockroachdb/datadriven/line_scanner.go new file mode 100644 index 00000000000..67681dcfda6 --- /dev/null +++ b/vendor/github.com/cockroachdb/datadriven/line_scanner.go @@ -0,0 +1,40 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. 
See the License for the specific language governing +// permissions and limitations under the License. + +package datadriven + +import ( + "bufio" + "io" +) + +type lineScanner struct { + *bufio.Scanner + line int +} + +func newLineScanner(r io.Reader) *lineScanner { + return &lineScanner{ + Scanner: bufio.NewScanner(r), + line: 0, + } +} + +func (l *lineScanner) Scan() bool { + ok := l.Scanner.Scan() + if ok { + l.line++ + } + return ok +} diff --git a/vendor/github.com/cockroachdb/datadriven/test_data_reader.go b/vendor/github.com/cockroachdb/datadriven/test_data_reader.go new file mode 100644 index 00000000000..315fbf2dd51 --- /dev/null +++ b/vendor/github.com/cockroachdb/datadriven/test_data_reader.go @@ -0,0 +1,202 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package datadriven + +import ( + "bytes" + "fmt" + "io" + "regexp" + "strings" + "testing" +) + +type testDataReader struct { + sourceName string + reader io.Reader + scanner *lineScanner + data TestData + rewrite *bytes.Buffer +} + +func newTestDataReader( + t *testing.T, sourceName string, file io.Reader, record bool, +) *testDataReader { + t.Helper() + + var rewrite *bytes.Buffer + if record { + rewrite = &bytes.Buffer{} + } + return &testDataReader{ + sourceName: sourceName, + reader: file, + scanner: newLineScanner(file), + rewrite: rewrite, + } +} + +func (r *testDataReader) Next(t *testing.T) bool { + t.Helper() + + r.data = TestData{} + for r.scanner.Scan() { + line := r.scanner.Text() + r.emit(line) + + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "#") { + // Skip comment lines. + continue + } + // Support wrapping directive lines using \, for example: + // build-scalar \ + // vars(int) + for strings.HasSuffix(line, `\`) && r.scanner.Scan() { + nextLine := r.scanner.Text() + r.emit(nextLine) + line = strings.TrimSuffix(line, `\`) + " " + strings.TrimSpace(nextLine) + } + + fields := splitDirectives(t, line) + if len(fields) == 0 { + continue + } + cmd := fields[0] + r.data.Pos = fmt.Sprintf("%s:%d", r.sourceName, r.scanner.line) + r.data.Cmd = cmd + + for _, arg := range fields[1:] { + key := arg + var vals []string + if pos := strings.IndexByte(key, '='); pos >= 0 { + key = arg[:pos] + val := arg[pos+1:] + + if len(val) > 2 && val[0] == '(' && val[len(val)-1] == ')' { + vals = strings.Split(val[1:len(val)-1], ",") + for i := range vals { + vals[i] = strings.TrimSpace(vals[i]) + } + } else { + vals = []string{val} + } + } + r.data.CmdArgs = append(r.data.CmdArgs, CmdArg{Key: key, Vals: vals}) + } + + var buf bytes.Buffer + var separator bool + for r.scanner.Scan() { + line := r.scanner.Text() + if line == "----" { + separator = true + break + } + + r.emit(line) + fmt.Fprintln(&buf, line) + } + + r.data.Input = 
strings.TrimSpace(buf.String()) + + if separator { + r.readExpected() + } + return true + } + return false +} + +func (r *testDataReader) readExpected() { + var buf bytes.Buffer + var line string + var allowBlankLines bool + + if r.scanner.Scan() { + line = r.scanner.Text() + if line == "----" { + allowBlankLines = true + } + } + + if allowBlankLines { + // Look for two successive lines of "----" before terminating. + for r.scanner.Scan() { + line = r.scanner.Text() + + if line == "----" { + if r.scanner.Scan() { + line2 := r.scanner.Text() + if line2 == "----" { + break + } + + fmt.Fprintln(&buf, line) + fmt.Fprintln(&buf, line2) + continue + } + } + + fmt.Fprintln(&buf, line) + } + } else { + // Terminate on first blank line. + for { + if strings.TrimSpace(line) == "" { + break + } + + fmt.Fprintln(&buf, line) + + if !r.scanner.Scan() { + break + } + + line = r.scanner.Text() + } + } + + r.data.Expected = buf.String() +} + +func (r *testDataReader) emit(s string) { + if r.rewrite != nil { + r.rewrite.WriteString(s) + r.rewrite.WriteString("\n") + } +} + +var splitDirectivesRE = regexp.MustCompile(`^ *[a-zA-Z0-9_,-\.]+(|=[-a-zA-Z0-9_@]+|=\([^)]*\))( |$)`) + +// splits a directive line into tokens, where each token is +// either: +// - a,list,of,things +// - argument +// - argument=value +// - argument=(values, ...) +func splitDirectives(t *testing.T, line string) []string { + var res []string + + for line != "" { + str := splitDirectivesRE.FindString(line) + if len(str) == 0 { + t.Fatalf("cannot parse directive %s\n", line) + } + res = append(res, strings.TrimSpace(line[0:len(str)])) + line = line[len(str):] + } + return res +}