Skip to content

Commit

Permalink
Merge pull request #255 from aaronlehmann/clockwork
Browse files Browse the repository at this point in the history
Use an artificial timebase for raft tests
  • Loading branch information
aaronlehmann committed Apr 7, 2016
2 parents baa2df2 + 547188b commit 87bac4f
Show file tree
Hide file tree
Showing 11 changed files with 801 additions and 226 deletions.
8 changes: 8 additions & 0 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 58 additions & 16 deletions manager/state/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
"github.com/docker/swarm-v2/api"
"github.com/docker/swarm-v2/manager/state/pb"
"github.com/gogo/protobuf/proto"
"github.com/pivotal-golang/clock"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

const (
maxRequestBytes = 1.5 * 1024 * 1024
defaultProposeTimeout = 10 * time.Second
maxRequestBytes = 1.5 * 1024 * 1024
)

var (
Expand Down Expand Up @@ -115,14 +115,18 @@ type Node struct {
appliedIndex uint64
snapshotIndex uint64

ticker *time.Ticker
stopCh chan struct{}
doneCh chan struct{}
ticker clock.Ticker
sendTimeout time.Duration
stopCh chan struct{}
doneCh chan struct{}

leadershipCh chan LeadershipState
startNodePeers []raft.Peer

sends sync.WaitGroup

// used to coordinate shutdown
stopMu sync.RWMutex
}

// NewNodeOptions provides arguments for NewNode
Expand All @@ -144,6 +148,13 @@ type NewNodeOptions struct {
// LogEntriesForSlowFollowers is the number of recent log entries to
// keep when compacting the log.
LogEntriesForSlowFollowers *uint64 // optional; pointer because 0 is valid
// ClockSource is a Clock interface to use as a time base.
// Leave this nil except for tests that are designed not to run in real
// time.
ClockSource clock.Clock
// SendTimeout is the timeout for sending messages to other raft
// nodes. Leave this as 0 to get the default value.
SendTimeout time.Duration
}

func init() {
Expand Down Expand Up @@ -178,12 +189,12 @@ func NewNode(ctx context.Context, opts NewNodeOptions, leadershipCh chan Leaders
},
snapshotInterval: 1000,
logEntriesForSlowFollowers: 500,
ticker: time.NewTicker(opts.TickInterval),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
stateDir: opts.StateDir,
joinAddr: opts.JoinAddr,
leadershipCh: leadershipCh,
sendTimeout: 2 * time.Second,
}
n.memoryStore = NewMemoryStore(n)

Expand All @@ -193,6 +204,14 @@ func NewNode(ctx context.Context, opts NewNodeOptions, leadershipCh chan Leaders
if opts.LogEntriesForSlowFollowers != nil {
n.logEntriesForSlowFollowers = *opts.LogEntriesForSlowFollowers
}
if opts.ClockSource == nil {
n.ticker = clock.NewClock().NewTicker(opts.TickInterval)
} else {
n.ticker = opts.ClockSource.NewTicker(opts.TickInterval)
}
if opts.SendTimeout != 0 {
n.sendTimeout = opts.SendTimeout
}

if err := n.loadAndStart(); err != nil {
n.ticker.Stop()
Expand Down Expand Up @@ -397,7 +416,7 @@ func (n *Node) Run() {
n.wait = newWait()
for {
select {
case <-n.ticker.C:
case <-n.ticker.C():
n.Tick()

case rd := <-n.Ready():
Expand Down Expand Up @@ -464,10 +483,19 @@ func (n *Node) Run() {

case <-n.stopCh:
n.sends.Wait()

n.stopMu.Lock()
members := n.cluster.listMembers()
for _, member := range members {
if member.Client != nil && member.Client.Conn != nil {
member.Client.Conn.Close()
}
}
n.Stop()
n.wal.Close()
n.Node = nil
close(n.doneCh)
n.stopMu.Unlock()
return
}
}
Expand Down Expand Up @@ -499,6 +527,14 @@ func (n *Node) Leader() uint64 {
// beginning the log replication process. This method
// is called from an aspiring member to an existing member
func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinResponse, error) {
// can't stop the raft node while an async RPC is in progress
n.stopMu.RLock()
defer n.stopMu.RUnlock()

if n.Node == nil {
return nil, ErrStopped
}

meta, err := req.Node.Marshal()
if err != nil {
return nil, err
Expand Down Expand Up @@ -539,6 +575,14 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
// from a member who is willing to leave its raft
// membership to an active member of the raft
func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) {
// can't stop the raft node while an async RPC is in progress
n.stopMu.RLock()
defer n.stopMu.RUnlock()

if n.Node == nil {
return nil, ErrStopped
}

cc := raftpb.ConfChange{
ID: req.Node.ID,
Type: raftpb.ConfChangeRemoveNode,
Expand All @@ -559,12 +603,13 @@ func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResp
// raft state machine with the provided message on the
// receiving node
func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
if msg.Msg == nil {
panic("received nil message")
}
// can't stop the raft node while an async RPC is in progress
n.stopMu.RLock()
defer n.stopMu.RUnlock()
if n.Node == nil {
panic("received RPC after raft node stopped")
return nil, ErrStopped
}

err := n.Step(n.Ctx, *msg.Msg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -618,10 +663,7 @@ func (n *Node) deregisterNode(id uint64) error {
return err
}

if err := peer.Client.Conn.Close(); err != nil {
return err
}

peer.Client.Conn.Close()
return nil
}

Expand Down Expand Up @@ -756,7 +798,7 @@ func (n *Node) restoreFromSnapshot(data []byte) error {
func (n *Node) send(messages []raftpb.Message) error {
members := n.cluster.listMembers()

ctx, _ := context.WithTimeout(n.Ctx, 2*time.Second)
ctx, _ := context.WithTimeout(n.Ctx, n.sendTimeout)

for _, m := range messages {
// Process locally
Expand Down
Loading

0 comments on commit 87bac4f

Please sign in to comment.