Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: use RawNode for node's event loop; clean up bootstrap #10892

Merged
merged 4 commits into from
Jul 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,11 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
}
}

n = raft.StartNode(c, peers)
if len(peers) == 0 {
n = raft.RestartNode(c)
} else {
n = raft.StartNode(c, peers)
}
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
Expand Down
80 changes: 80 additions & 0 deletions raft/bootstrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package raft

import (
"errors"

pb "go.etcd.io/etcd/raft/raftpb"
)

// Bootstrap initializes the RawNode for first use by appending configuration
// changes for the supplied peers. This method returns an error if the Storage
// is nonempty.
//
// It is recommended that instead of calling this method, applications bootstrap
// their state manually by setting up a Storage that has a first index > 1 and
// which stores the desired ConfState as its InitialState.
func (rn *RawNode) Bootstrap(peers []Peer) error {
if len(peers) == 0 {
return errors.New("must provide at least one peer to Bootstrap")
}
lastIndex, err := rn.raft.raftLog.storage.LastIndex()
if err != nil {
return err
}

if lastIndex != 0 {
return errors.New("can't bootstrap a nonempty Storage")
}

// We've faked out initial entries above, but nothing has been
// persisted. Start with an empty HardState (thus the first Ready will
// emit a HardState update for the app to persist).
rn.prevHardSt = emptyState

// TODO(tbg): remove StartNode and give the application the right tools to
// bootstrap the initial membership in a cleaner way.
rn.raft.becomeFollower(1, None)
ents := make([]pb.Entry, len(peers))
for i, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
data, err := cc.Marshal()
if err != nil {
return err
}

ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
}
rn.raft.raftLog.append(ents...)

// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
// be added to raft twice: here and when the application's Ready
// loop calls ApplyConfChange. The calls to addNode must come after
// all calls to raftLog.append so progress.next is set after these
// bootstrapping entries (it is an error if we try to append these
// entries since they have already been committed).
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
//
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
rn.raft.raftLog.committed = uint64(len(ents))
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
}
return nil
}
121 changes: 33 additions & 88 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,52 +197,22 @@ type Peer struct {

// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
//
// Peers must not be zero length; call RestartNode in that case.
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c)
// become the follower at term 1 and apply initial configuration
// entries of term 1
r.becomeFollower(1, None)
for _, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
d, err := cc.Marshal()
if err != nil {
panic("unexpected marshal error")
}
// TODO(tbg): this should append the ConfChange for the own node first
// and also call applyConfChange below for that node first. Otherwise
// we have a Raft group (for a little while) that doesn't have itself
// in its config, which is bad.
// This whole way of setting things up is rickety. The app should just
// populate the initial ConfState appropriately and then all of this
// goes away.
e := pb.Entry{
Type: pb.EntryConfChange,
Term: 1,
Index: r.raftLog.lastIndex() + 1,
Data: d,
}
r.raftLog.append(e)
if len(peers) == 0 {
panic("no peers given; use RestartNode instead")
}
// Mark these initial entries as committed.
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
r.raftLog.committed = r.raftLog.lastIndex()
// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
// be added to raft twice: here and when the application's Ready
// loop calls ApplyConfChange. The calls to addNode must come after
// all calls to raftLog.append so progress.next is set after these
// bootstrapping entries (it is an error if we try to append these
// entries since they have already been committed).
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
for _, peer := range peers {
r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
rn.Bootstrap(peers)

n := newNode()
n.logger = c.Logger
go n.run(r)

go n.run(rn)
return &n
}

Expand All @@ -251,11 +221,13 @@ func StartNode(c *Config, peers []Peer) Node {
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
r := newRaft(c)

rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
n := newNode()
n.logger = c.Logger
go n.run(r)
go n.run(rn)
return &n
}

Expand Down Expand Up @@ -310,30 +282,30 @@ func (n *node) Stop() {
<-n.done
}

func (n *node) run(r *raft) {
func (n *node) run(rn *RawNode) {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var applyingToI uint64
var rd Ready

r := rn.raft

lead := None
prevSoftSt := r.softState()
prevHardSt := emptyState

for {
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
} else if rn.HasReady() {
// Populate a Ready. Note that this Ready is not guaranteed to
// actually be handled. We will arm readyc, but there's no guarantee
// that we will actually send on it. It's possible that we will
// service another channel instead, loop around, and then populate
// the Ready again. We could instead force the previous Ready to be
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = rn.Ready()
readyc = n.readyc
}

if lead != r.lead {
Expand Down Expand Up @@ -382,40 +354,13 @@ func (n *node) run(r *raft) {
case <-n.done:
}
case <-n.tickc:
r.tick()
rn.Tick()
case readyc <- rd:
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
if index := rd.appliedCursor(); index != 0 {
applyingToI = index
}

r.msgs = nil
r.readStates = nil
r.reduceUncommittedSize(rd.CommittedEntries)
rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
if applyingToI != 0 {
r.raftLog.appliedTo(applyingToI)
applyingToI = 0
}
if havePrevLastUnstablei {
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
havePrevLastUnstablei = false
}
r.raftLog.stableSnapTo(prevSnapi)
rn.commitReady(rd)
rd = Ready{}
advancec = nil
case c := <-n.status:
c <- getStatus(r)
Expand Down
4 changes: 2 additions & 2 deletions raft/node_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func BenchmarkOneNode(b *testing.B) {

n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
go n.run(rn)

defer n.Stop()

Expand Down
Loading