From b5b998b8d1577e4679534c40ef3311b881a3194e Mon Sep 17 00:00:00 2001 From: Vincent Lee Date: Thu, 11 Jan 2018 12:43:55 +0800 Subject: [PATCH] raft: let raft step return error when proposal is dropped to allow fail-fast. --- CODE_OF_CONDUCT.md | 63 +++++++++++++++++++++++++++++++ etcdctl/README.md | 10 ++++- etcdserver/etcdserverpb/rpc.pb.go | 40 ++++++++++++++++++++ mvcc/backend/backend.go | 1 + raft/node_test.go | 9 +++-- raft/raft.go | 50 +++++++++++++----------- raft/raft_paper_test.go | 3 +- raft/raft_test.go | 7 +++- raft/rawnode_test.go | 3 +- 9 files changed, 157 insertions(+), 29 deletions(-) create mode 100644 CODE_OF_CONDUCT.md diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 000000000000..c0c20dd81565 --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,63 @@ +## CoreOS Community Code of Conduct + +### Contributor Code of Conduct + +As contributors and maintainers of this project, and in the interest of +fostering an open and welcoming community, we pledge to respect all people who +contribute through reporting issues, posting feature requests, updating +documentation, submitting pull requests or patches, and other activities. + +We are committed to making participation in this project a harassment-free +experience for everyone, regardless of level of experience, gender, gender +identity and expression, sexual orientation, disability, personal appearance, +body size, race, ethnicity, age, religion, or nationality. + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery +* Personal attacks +* Trolling or insulting/derogatory comments +* Public or private harassment +* Publishing others' private information, such as physical or electronic addresses, without explicit permission +* Other unethical or unprofessional conduct. + +Project maintainers have the right and responsibility to remove, edit, or +reject comments, commits, code, wiki edits, issues, and other contributions +that are not aligned to this Code of Conduct. By adopting this Code of Conduct, +project maintainers commit themselves to fairly and consistently applying these +principles to every aspect of managing this project. Project maintainers who do +not follow or enforce the Code of Conduct may be permanently removed from the +project team. + +This code of conduct applies both within project spaces and in public spaces +when an individual is representing the project or its community. + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported by contacting a project maintainer, Brandon Philips +, and/or Meghan Schofield +. + +This Code of Conduct is adapted from the Contributor Covenant +(http://contributor-covenant.org), version 1.2.0, available at +http://contributor-covenant.org/version/1/2/0/ + +### CoreOS Events Code of Conduct + +CoreOS events are working conferences intended for professional networking and +collaboration in the CoreOS community. Attendees are expected to behave +according to professional standards and in accordance with their employer’s +policies on appropriate workplace behavior. + +While at CoreOS events or related social networking opportunities, attendees +should not engage in discriminatory or offensive speech or actions including +but not limited to gender, sexuality, race, age, disability, or religion. +Speakers should be especially aware of these concerns. + +CoreOS does not condone any statements by speakers contrary to these standards. +CoreOS reserves the right to deny entrance and/or eject from an event (without +refund) any individual found to be engaging in discriminatory or offensive +speech or actions. + +Please bring any concerns to the immediate attention of designated on-site +staff, Brandon Philips , and/or Meghan Schofield +. diff --git a/etcdctl/README.md b/etcdctl/README.md index 17a14ce5cfb1..15cdc986d9c9 100644 --- a/etcdctl/README.md +++ b/etcdctl/README.md @@ -87,6 +87,8 @@ GET gets the key or a range of keys [key, range_end) if range_end is given. RPC: Range +RPC: Range + #### Options - hex -- print out key and value as hex encode string @@ -182,6 +184,8 @@ Removes the specified key or range of keys [key, range_end) if range_end is give RPC: DeleteRange +RPC: DeleteRange + #### Options - prefix -- delete keys by matching prefix @@ -343,6 +347,8 @@ Watch watches events stream on keys or prefixes, [key or prefix, range_end) if r RPC: Watch +RPC: Watch + #### Options - hex -- print out key and value as hex encode string @@ -1367,7 +1373,7 @@ RPC: UserRevokeRole The approximate total number of keys transferred to the destination cluster, updated every 30 seconds. -#### Examples +- dest-cert -- TLS certificate file for destination cluster ``` ./etcdctl make-mirror mirror.example.com:2379 @@ -1434,7 +1440,7 @@ Prints the version of etcdctl. Prints etcd version and API version. -#### Examples +No output on success. ```bash ./etcdctl version diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index 22829bd0c8e7..604e8ce00832 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -10928,6 +10928,46 @@ func (m *Compare) Unmarshal(dAtA []byte) error { } } m.TargetUnion = &Compare_ModRevision{v} + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.TargetUnion = &Compare_CreateRevision{v} + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ModRevision", wireType) + } + var v int64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.TargetUnion = &Compare_ModRevision{v} case 7: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index c0305cf3215e..1d4173dfcb2d 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -373,6 +373,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { } tmpb, berr := tmptx.CreateBucketIfNotExists(next) + tmpb.FillPercent = 0.9 // for seq write in for each if berr != nil { return berr } diff --git a/raft/node_test.go b/raft/node_test.go index ef0c92ad1456..82c3ade37a29 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -109,8 +109,9 @@ func TestNodeStepUnblock(t *testing.T) { // TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft. func TestNodePropose(t *testing.T) { msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) { + appendStep := func(r *raft, m raftpb.Message) error { msgs = append(msgs, m) + return nil } n := newNode() @@ -147,8 +148,9 @@ func TestNodePropose(t *testing.T) { // It also ensures that ReadState can be read out through ready chan. func TestNodeReadIndex(t *testing.T) { msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) { + appendStep := func(r *raft, m raftpb.Message) error { msgs = append(msgs, m) + return nil } wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}} @@ -284,8 +286,9 @@ func TestNodeReadIndexToOldLeader(t *testing.T) { // to the underlying raft. func TestNodeProposeConfig(t *testing.T) { msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) { + appendStep := func(r *raft, m raftpb.Message) error { msgs = append(msgs, m) + return nil } n := newNode() diff --git a/raft/raft.go b/raft/raft.go index b9939fae092f..219dd1ca8de4 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -67,6 +67,8 @@ const ( campaignTransfer CampaignType = "CampaignTransfer" ) +var ErrProposalDropped = errors.New("raft proposal dropped") + // lockedRand is a small wrapper around rand.Rand to provide // synchronization. Only the methods needed by the code are exposed // (e.g. Intn). @@ -872,25 +874,28 @@ func (r *raft) Step(m pb.Message) error { } default: - r.step(r, m) + err := r.step(r, m) + if err != nil { + return err + } } return nil } -type stepFunc func(r *raft, m pb.Message) +type stepFunc func(r *raft, m pb.Message) error -func stepLeader(r *raft, m pb.Message) { +func stepLeader(r *raft, m pb.Message) error { // These message types do not require any progress for m.From. switch m.Type { case pb.MsgBeat: r.bcastHeartbeat() - return + return nil case pb.MsgCheckQuorum: if !r.checkQuorumActive() { r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id) r.becomeFollower(r.Term, None) } - return + return nil case pb.MsgProp: if len(m.Entries) == 0 { r.logger.Panicf("%x stepped empty MsgProp", r.id) @@ -899,11 +904,11 @@ func stepLeader(r *raft, m pb.Message) { // If we are not currently a member of the range (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. - return + return ErrProposalDropped } if r.leadTransferee != None { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) - return + return ErrProposalDropped } for i, e := range m.Entries { @@ -919,7 +924,7 @@ func stepLeader(r *raft, m pb.Message) { } r.appendEntry(m.Entries...) r.bcastAppend() - return + return nil case pb.MsgReadIndex: if r.quorum() > 1 { if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { @@ -946,14 +951,14 @@ func stepLeader(r *raft, m pb.Message) { r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data}) } - return + return nil } // All other message types require a progress for m.From (pr). pr := r.getProgress(m.From) if pr == nil { r.logger.Debugf("%x no progress available for %x", r.id, m.From) - return + return nil } switch m.Type { case pb.MsgAppResp: @@ -1009,12 +1014,12 @@ func stepLeader(r *raft, m pb.Message) { } if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { - return + return nil } ackCount := r.readOnly.recvAck(m) if ackCount < r.quorum() { - return + return nil } rss := r.readOnly.advance(m) @@ -1028,7 +1033,7 @@ func stepLeader(r *raft, m pb.Message) { } case pb.MsgSnapStatus: if pr.State != ProgressStateSnapshot { - return + return nil } if !m.Reject { pr.becomeProbe() @@ -1060,14 +1065,14 @@ func stepLeader(r *raft, m pb.Message) { if lastLeadTransferee == leadTransferee { r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x", r.id, r.Term, leadTransferee, leadTransferee) - return + return nil } r.abortLeaderTransfer() r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee) } if leadTransferee == r.id { r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id) - return + return nil } // Transfer leadership to third party. r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee) @@ -1081,11 +1086,12 @@ func stepLeader(r *raft, m pb.Message) { r.sendAppend(leadTransferee) } } + return nil } // stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is // whether they respond to MsgVoteResp or MsgPreVoteResp. -func stepCandidate(r *raft, m pb.Message) { +func stepCandidate(r *raft, m pb.Message) error { // Only handle vote responses corresponding to our candidacy (while in // StateCandidate, we may get stale MsgPreVoteResp messages in this term from // our pre-candidate state). @@ -1098,7 +1104,7 @@ func stepCandidate(r *raft, m pb.Message) { switch m.Type { case pb.MsgProp: r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) - return + return ErrProposalDropped case pb.MsgApp: r.becomeFollower(r.Term, m.From) r.handleAppendEntries(m) @@ -1125,9 +1131,10 @@ func stepCandidate(r *raft, m pb.Message) { case pb.MsgTimeoutNow: r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From) } + return nil } -func stepFollower(r *raft, m pb.Message) { +func stepFollower(r *raft, m pb.Message) error { switch m.Type { case pb.MsgProp: if r.lead == None { @@ -1154,7 +1161,7 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgTransferLeader: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term) - return + return nil } m.To = r.lead r.send(m) @@ -1171,17 +1178,18 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgReadIndex: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term) - return + return nil } m.To = r.lead r.send(m) case pb.MsgReadIndexResp: if len(m.Entries) != 1 { r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries)) - return + return nil } r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data}) } + return nil } func (r *raft) handleAppendEntries(m pb.Message) { diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 2911e8aa333a..71a7d14ace16 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -79,8 +79,9 @@ func testUpdateTermFromMessage(t *testing.T, state StateType) { // Reference: section 5.1 func TestRejectStaleTermMessage(t *testing.T) { called := false - fakeStep := func(r *raft, m pb.Message) { + fakeStep := func(r *raft, m pb.Message) error { called = true + return nil } r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) r.step = fakeStep diff --git a/raft/raft_test.go b/raft/raft_test.go index b8ef596ea57b..e8be5e645c44 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1231,8 +1231,9 @@ func TestPastElectionTimeout(t *testing.T) { // actual stepX function. func TestStepIgnoreOldTermMsg(t *testing.T) { called := false - fakeStep := func(r *raft, m pb.Message) { + fakeStep := func(r *raft, m pb.Message) error { called = true + return nil } sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) sm.step = fakeStep @@ -3219,6 +3220,10 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) { } nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + err := lead.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + if err != ErrProposalDropped { + t.Fatalf("should return drop proposal error while transferring") + } if lead.prs[1].Match != 1 { t.Fatalf("node 1 has match %x, want %x", lead.prs[1].Match, 1) diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 4ccf72de45a7..f034521be575 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -190,8 +190,9 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { // to the underlying raft. It also ensures that ReadState can be read out. func TestRawNodeReadIndex(t *testing.T) { msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) { + appendStep := func(r *raft, m raftpb.Message) error { msgs = append(msgs, m) + return nil } wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}