From 66c8ddcf3b01443e7128cea1e564f553febe6016 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 27 Jun 2016 23:02:18 -0700 Subject: [PATCH 1/9] Takes ServerAddress into existing public interface and into transports. --- inmem_transport.go | 48 ++++++++++++++++++++++------------------------ integ_test.go | 4 ++-- net_transport.go | 32 +++++++++++++++---------------- raft.go | 15 +++++++-------- raft_test.go | 30 ++++++++++++++--------------- replication.go | 14 +++++++------- tcp_transport.go | 4 ++-- transport.go | 24 +++++++++++------------ transport_test.go | 2 +- 9 files changed, 85 insertions(+), 88 deletions(-) diff --git a/inmem_transport.go b/inmem_transport.go index 2d5f31906..3693cd5ad 100644 --- a/inmem_transport.go +++ b/inmem_transport.go @@ -9,15 +9,15 @@ import ( // NewInmemAddr returns a new in-memory addr with // a randomly generate UUID as the ID. -func NewInmemAddr() string { - return generateUUID() +func NewInmemAddr() ServerAddress { + return ServerAddress(generateUUID()) } // inmemPipeline is used to pipeline requests for the in-mem transport. 
type inmemPipeline struct { trans *InmemTransport peer *InmemTransport - peerAddr string + peerAddr ServerAddress doneCh chan AppendFuture inprogressCh chan *inmemPipelineInflight @@ -37,22 +37,22 @@ type inmemPipelineInflight struct { type InmemTransport struct { sync.RWMutex consumerCh chan RPC - localAddr string - peers map[string]*InmemTransport + localAddr ServerAddress + peers map[ServerAddress]*InmemTransport pipelines []*inmemPipeline timeout time.Duration } // NewInmemTransport is used to initialize a new transport // and generates a random local address if none is specified -func NewInmemTransport(addr string) (string, *InmemTransport) { - if addr == "" { +func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) { + if string(addr) == "" { addr = NewInmemAddr() } trans := &InmemTransport{ consumerCh: make(chan RPC, 16), localAddr: addr, - peers: make(map[string]*InmemTransport), + peers: make(map[ServerAddress]*InmemTransport), timeout: 50 * time.Millisecond, } return addr, trans @@ -69,13 +69,13 @@ func (i *InmemTransport) Consumer() <-chan RPC { } // LocalAddr implements the Transport interface. -func (i *InmemTransport) LocalAddr() string { +func (i *InmemTransport) LocalAddr() ServerAddress { return i.localAddr } // AppendEntriesPipeline returns an interface that can be used to pipeline // AppendEntries requests. -func (i *InmemTransport) AppendEntriesPipeline(target string) (AppendPipeline, error) { +func (i *InmemTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) { i.RLock() peer, ok := i.peers[target] i.RUnlock() @@ -90,7 +90,7 @@ func (i *InmemTransport) AppendEntriesPipeline(target string) (AppendPipeline, e } // AppendEntries implements the Transport interface. 
-func (i *InmemTransport) AppendEntries(target string, args *AppendEntriesRequest, resp *AppendEntriesResponse) error { +func (i *InmemTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error { rpcResp, err := i.makeRPC(target, args, nil, i.timeout) if err != nil { return err @@ -103,7 +103,7 @@ func (i *InmemTransport) AppendEntries(target string, args *AppendEntriesRequest } // RequestVote implements the Transport interface. -func (i *InmemTransport) RequestVote(target string, args *RequestVoteRequest, resp *RequestVoteResponse) error { +func (i *InmemTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error { rpcResp, err := i.makeRPC(target, args, nil, i.timeout) if err != nil { return err @@ -116,7 +116,7 @@ func (i *InmemTransport) RequestVote(target string, args *RequestVoteRequest, re } // InstallSnapshot implements the Transport interface. -func (i *InmemTransport) InstallSnapshot(target string, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { +func (i *InmemTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { rpcResp, err := i.makeRPC(target, args, data, 10*i.timeout) if err != nil { return err @@ -128,7 +128,7 @@ func (i *InmemTransport) InstallSnapshot(target string, args *InstallSnapshotReq return nil } -func (i *InmemTransport) makeRPC(target string, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) { +func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) { i.RLock() peer, ok := i.peers[target] i.RUnlock() @@ -158,21 +158,19 @@ func (i *InmemTransport) makeRPC(target string, args interface{}, r io.Reader, t return } -// EncodePeer implements the Transport interface. It uses the UUID as the -// address directly. 
-func (i *InmemTransport) EncodePeer(p string) []byte { +// EncodePeer implements the Transport interface. +func (i *InmemTransport) EncodePeer(p ServerAddress) []byte { return []byte(p) } -// DecodePeer implements the Transport interface. It wraps the UUID in an -// InmemAddr. -func (i *InmemTransport) DecodePeer(buf []byte) string { - return string(buf) +// DecodePeer implements the Transport interface. +func (i *InmemTransport) DecodePeer(buf []byte) ServerAddress { + return ServerAddress(buf) } // Connect is used to connect this transport to another transport for // a given peer name. This allows for local routing. -func (i *InmemTransport) Connect(peer string, t Transport) { +func (i *InmemTransport) Connect(peer ServerAddress, t Transport) { trans := t.(*InmemTransport) i.Lock() defer i.Unlock() @@ -180,7 +178,7 @@ func (i *InmemTransport) Connect(peer string, t Transport) { } // Disconnect is used to remove the ability to route to a given peer. -func (i *InmemTransport) Disconnect(peer string) { +func (i *InmemTransport) Disconnect(peer ServerAddress) { i.Lock() defer i.Unlock() delete(i.peers, peer) @@ -202,7 +200,7 @@ func (i *InmemTransport) Disconnect(peer string) { func (i *InmemTransport) DisconnectAll() { i.Lock() defer i.Unlock() - i.peers = make(map[string]*InmemTransport) + i.peers = make(map[ServerAddress]*InmemTransport) // Handle pipelines for _, pipeline := range i.pipelines { @@ -217,7 +215,7 @@ func (i *InmemTransport) Close() error { return nil } -func newInmemPipeline(trans *InmemTransport, peer *InmemTransport, addr string) *inmemPipeline { +func newInmemPipeline(trans *InmemTransport, peer *InmemTransport, addr ServerAddress) *inmemPipeline { i := &inmemPipeline{ trans: trans, peer: peer, diff --git a/integ_test.go b/integ_test.go index 1c40fedc2..c7b4f6438 100644 --- a/integ_test.go +++ b/integ_test.go @@ -238,8 +238,8 @@ func TestRaft_Integ(t *testing.T) { } // Remove the old nodes - 
NoErr(WaitFuture(leader.raft.RemovePeer(string(rm1.raft.localAddr)), t), t) - NoErr(WaitFuture(leader.raft.RemovePeer(string(rm2.raft.localAddr)), t), t) + NoErr(WaitFuture(leader.raft.RemovePeer(rm1.raft.localAddr), t), t) + NoErr(WaitFuture(leader.raft.RemovePeer(rm2.raft.localAddr), t), t) // Shoot the leader env1.Release() diff --git a/net_transport.go b/net_transport.go index 9eb4fe054..c83d44cd6 100644 --- a/net_transport.go +++ b/net_transport.go @@ -56,7 +56,7 @@ is not known if there is an error. */ type NetworkTransport struct { - connPool map[string][]*netConn + connPool map[ServerAddress][]*netConn connPoolLock sync.Mutex consumeCh chan RPC @@ -84,11 +84,11 @@ type StreamLayer interface { net.Listener // Dial is used to create a new outgoing connection - Dial(address string, timeout time.Duration) (net.Conn, error) + Dial(address ServerAddress, timeout time.Duration) (net.Conn, error) } type netConn struct { - target string + target ServerAddress conn net.Conn r *bufio.Reader w *bufio.Writer @@ -142,7 +142,7 @@ func NewNetworkTransportWithLogger( logger = log.New(os.Stderr, "", log.LstdFlags) } trans := &NetworkTransport{ - connPool: make(map[string][]*netConn), + connPool: make(map[ServerAddress][]*netConn), consumeCh: make(chan RPC), logger: logger, maxPool: maxPool, @@ -183,8 +183,8 @@ func (n *NetworkTransport) Consumer() <-chan RPC { } // LocalAddr implements the Transport interface. -func (n *NetworkTransport) LocalAddr() string { - return n.stream.Addr().String() +func (n *NetworkTransport) LocalAddr() ServerAddress { + return ServerAddress(n.stream.Addr().String()) } // IsShutdown is used to check if the transport is shutdown. @@ -198,7 +198,7 @@ func (n *NetworkTransport) IsShutdown() bool { } // getExistingConn is used to grab a pooled connection. 
-func (n *NetworkTransport) getPooledConn(target string) *netConn { +func (n *NetworkTransport) getPooledConn(target ServerAddress) *netConn { n.connPoolLock.Lock() defer n.connPoolLock.Unlock() @@ -215,7 +215,7 @@ func (n *NetworkTransport) getPooledConn(target string) *netConn { } // getConn is used to get a connection from the pool. -func (n *NetworkTransport) getConn(target string) (*netConn, error) { +func (n *NetworkTransport) getConn(target ServerAddress) (*netConn, error) { // Check for a pooled conn if conn := n.getPooledConn(target); conn != nil { return conn, nil @@ -260,7 +260,7 @@ func (n *NetworkTransport) returnConn(conn *netConn) { // AppendEntriesPipeline returns an interface that can be used to pipeline // AppendEntries requests. -func (n *NetworkTransport) AppendEntriesPipeline(target string) (AppendPipeline, error) { +func (n *NetworkTransport) AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) { // Get a connection conn, err := n.getConn(target) if err != nil { @@ -272,17 +272,17 @@ func (n *NetworkTransport) AppendEntriesPipeline(target string) (AppendPipeline, } // AppendEntries implements the Transport interface. -func (n *NetworkTransport) AppendEntries(target string, args *AppendEntriesRequest, resp *AppendEntriesResponse) error { +func (n *NetworkTransport) AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error { return n.genericRPC(target, rpcAppendEntries, args, resp) } // RequestVote implements the Transport interface. -func (n *NetworkTransport) RequestVote(target string, args *RequestVoteRequest, resp *RequestVoteResponse) error { +func (n *NetworkTransport) RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error { return n.genericRPC(target, rpcRequestVote, args, resp) } // genericRPC handles a simple request/response RPC. 
-func (n *NetworkTransport) genericRPC(target string, rpcType uint8, args interface{}, resp interface{}) error { +func (n *NetworkTransport) genericRPC(target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error { // Get a conn conn, err := n.getConn(target) if err != nil { @@ -308,7 +308,7 @@ func (n *NetworkTransport) genericRPC(target string, rpcType uint8, args interfa } // InstallSnapshot implements the Transport interface. -func (n *NetworkTransport) InstallSnapshot(target string, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { +func (n *NetworkTransport) InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { // Get a conn, always close for InstallSnapshot conn, err := n.getConn(target) if err != nil { @@ -346,13 +346,13 @@ func (n *NetworkTransport) InstallSnapshot(target string, args *InstallSnapshotR } // EncodePeer implements the Transport interface. -func (n *NetworkTransport) EncodePeer(p string) []byte { +func (n *NetworkTransport) EncodePeer(p ServerAddress) []byte { return []byte(p) } // DecodePeer implements the Transport interface. -func (n *NetworkTransport) DecodePeer(buf []byte) string { - return string(buf) +func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress { + return ServerAddress(buf) } // listen is used to handling incoming connections. diff --git a/raft.go b/raft.go index da11afab5..05ffa12c3 100644 --- a/raft.go +++ b/raft.go @@ -353,10 +353,9 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna // Leader is used to return the current leader of the cluster. // It may return empty string if there is no current leader // or the leader is unknown. -func (r *Raft) Leader() string { - // TODO: change return type to ServerAddress? 
+func (r *Raft) Leader() ServerAddress { r.leaderLock.RLock() - leader := string(r.leader) + leader := r.leader r.leaderLock.RUnlock() return leader } @@ -452,15 +451,15 @@ func (r *Raft) VerifyLeader() Future { // AddPeer (deprecated) is used to add a new peer into the cluster. This must be // run on the leader or it will fail. Use AddVoter/AddNonvoter instead. -func (r *Raft) AddPeer(peer string) Future { - return r.AddVoter(ServerID(peer), ServerAddress(peer), 0, 0) +func (r *Raft) AddPeer(peer ServerAddress) Future { + return r.AddVoter(ServerID(peer), peer, 0, 0) } // RemovePeer (deprecated) is used to remove a peer from the cluster. If the // current leader is being removed, it will cause a new election // to occur. This must be run on the leader or it will fail. // Use RemoveServer instead. -func (r *Raft) RemovePeer(peer string) Future { +func (r *Raft) RemovePeer(peer ServerAddress) Future { return r.RemoveServer(ServerID(peer), 0, 0) } @@ -1866,7 +1865,7 @@ func (r *Raft) electSelf() <-chan *voteResult { lastIdx, lastTerm := r.getLastEntry() req := &RequestVoteRequest{ Term: r.getCurrentTerm(), - Candidate: r.trans.EncodePeer(string(r.localAddr)), + Candidate: r.trans.EncodePeer(r.localAddr), LastLogIndex: lastIdx, LastLogTerm: lastTerm, } @@ -1876,7 +1875,7 @@ func (r *Raft) electSelf() <-chan *voteResult { r.goFunc(func() { defer metrics.MeasureSince([]string{"raft", "candidate", "electSelf"}, time.Now()) resp := &voteResult{voterID: peer.ID} - err := r.trans.RequestVote(string(peer.Address), req, &resp.RequestVoteResponse) + err := r.trans.RequestVote(peer.Address, req, &resp.RequestVoteResponse) if err != nil { r.logger.Printf("[ERR] raft: Failed to make RequestVote RPC to %v: %v", peer, err) resp.Term = req.Term diff --git a/raft_test.go b/raft_test.go index 6eb0638ec..241ace20f 100644 --- a/raft_test.go +++ b/raft_test.go @@ -405,10 +405,10 @@ func (c *cluster) FullyConnect() { func (c *cluster) Disconnect(a ServerAddress) { c.logger.Printf("[DEBUG] 
Disconnecting %v", a) for _, t := range c.trans { - if t.LocalAddr() == string(a) { + if t.LocalAddr() == a { t.DisconnectAll() } else { - t.Disconnect(string(a)) + t.Disconnect(a) } } } @@ -966,7 +966,7 @@ func TestRaft_JoinNode(t *testing.T) { c.FullyConnect() // Join the new node in - future := c.Leader().AddPeer(string(c1.rafts[0].localAddr)) + future := c.Leader().AddPeer(c1.rafts[0].localAddr) if err := future.Error(); err != nil { c.FailNowf("[ERR] err: %v", err) } @@ -1002,7 +1002,7 @@ func TestRaft_RemoveFollower(t *testing.T) { // Remove a follower follower := followers[0] - future := leader.RemovePeer(string(follower.localAddr)) + future := leader.RemovePeer(follower.localAddr) if err := future.Error(); err != nil { c.FailNowf("[ERR] err: %v", err) } @@ -1039,7 +1039,7 @@ func TestRaft_RemoveLeader(t *testing.T) { } // Remove the leader - f := leader.RemovePeer(string(leader.localAddr)) + f := leader.RemovePeer(leader.localAddr) // Wait for the future to complete if f.Error() != nil { @@ -1088,7 +1088,7 @@ func TestRaft_RemoveLeader_NoShutdown(t *testing.T) { for i := byte(0); i < 100; i++ { future := leader.Apply([]byte{i}, 0) if i == 80 { - removeFuture = leader.RemovePeer(string(leader.localAddr)) + removeFuture = leader.RemovePeer(leader.localAddr) } if i > 80 { if err := future.Error(); err == nil || err != ErrNotLeader { @@ -1144,7 +1144,7 @@ func TestRaft_RemoveLeader_SplitCluster(t *testing.T) { leader := c.Leader() // Remove the leader - leader.RemovePeer(string(leader.localAddr)) + leader.RemovePeer(leader.localAddr) // Wait until we have 2 leaders limit := time.Now().Add(c.longstopTimeout) @@ -1175,7 +1175,7 @@ func TestRaft_AddKnownPeer(t *testing.T) { startingConfigIdx := leader.configurations.committedIndex // Add a follower - future := leader.AddPeer(string(followers[0].localAddr)) + future := leader.AddPeer(followers[0].localAddr) // shouldn't error, configuration should end up the same as it was. 
// Should be already added @@ -1324,7 +1324,7 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) { } // Change the peer addresses - peers := []string{leader.trans.LocalAddr()} + peers := []ServerAddress{leader.trans.LocalAddr()} for _, sec := range c2.rafts { peers = append(peers, sec.trans.LocalAddr()) } @@ -1537,7 +1537,7 @@ func TestRaft_ReJoinFollower(t *testing.T) { // Remove a follower follower := followers[0] - future := leader.RemovePeer(string(follower.localAddr)) + future := leader.RemovePeer(follower.localAddr) if err := future.Error(); err != nil { c.FailNowf("[ERR] err: %v", err) } @@ -1560,7 +1560,7 @@ func TestRaft_ReJoinFollower(t *testing.T) { // Rejoin. The follower will have a higher term than the leader, // this will cause the leader to step down, and a new round of elections // to take place. We should eventually re-stabilize. - future = leader.AddPeer(string(follower.localAddr)) + future = leader.AddPeer(follower.localAddr) if err := future.Error(); err != nil && err != ErrLeadershipLost { c.FailNowf("[ERR] err: %v", err) } @@ -1856,21 +1856,21 @@ func TestRaft_Voting(t *testing.T) { reqVote := RequestVoteRequest{ Term: ldr.getCurrentTerm() + 10, - Candidate: ldrT.EncodePeer(string(ldr.localAddr)), + Candidate: ldrT.EncodePeer(ldr.localAddr), LastLogIndex: ldr.LastIndex(), LastLogTerm: ldr.getCurrentTerm(), } // a follower that thinks there's a leader should vote for that leader. 
var resp RequestVoteResponse - if err := ldrT.RequestVote(string(followers[0].localAddr), &reqVote, &resp); err != nil { + if err := ldrT.RequestVote(followers[0].localAddr, &reqVote, &resp); err != nil { c.FailNowf("[ERR] RequestVote RPC failed %v", err) } if !resp.Granted { c.FailNowf("[ERR] expected vote to be granted, but wasn't %+v", resp) } // a follow that thinks there's a leader shouldn't vote for a different candidate - reqVote.Candidate = ldrT.EncodePeer(string(followers[0].localAddr)) - if err := ldrT.RequestVote(string(followers[1].localAddr), &reqVote, &resp); err != nil { + reqVote.Candidate = ldrT.EncodePeer(followers[0].localAddr) + if err := ldrT.RequestVote(followers[1].localAddr, &reqVote, &resp); err != nil { c.FailNowf("[ERR] RequestVote RPC failed %v", err) } if resp.Granted { diff --git a/replication.go b/replication.go index f2fb62db5..75fefc2d7 100644 --- a/replication.go +++ b/replication.go @@ -183,7 +183,7 @@ START: // Make the RPC call start = time.Now() - if err := r.trans.AppendEntries(string(s.peer.Address), &req, &resp); err != nil { + if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil { r.logger.Printf("[ERR] raft: Failed to AppendEntries to %v: %v", s.peer, err) s.failures++ return @@ -276,7 +276,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { // Setup the request req := InstallSnapshotRequest{ Term: s.currentTerm, - Leader: r.trans.EncodePeer(string(r.localAddr)), + Leader: r.trans.EncodePeer(r.localAddr), LastLogIndex: meta.Index, LastLogTerm: meta.Term, Peers: meta.Peers, @@ -288,7 +288,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { // Make the call start := time.Now() var resp InstallSnapshotResponse - if err := r.trans.InstallSnapshot(string(s.peer.Address), &req, &resp, snapshot); err != nil { + if err := r.trans.InstallSnapshot(s.peer.Address, &req, &resp, snapshot); err != nil { r.logger.Printf("[ERR] raft: Failed to install snapshot %v: 
%v", snapID, err) s.failures++ return false, err @@ -329,7 +329,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { var failures uint64 req := AppendEntriesRequest{ Term: s.currentTerm, - Leader: r.trans.EncodePeer(string(r.localAddr)), + Leader: r.trans.EncodePeer(r.localAddr), } var resp AppendEntriesResponse for { @@ -342,7 +342,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { } start := time.Now() - if err := r.trans.AppendEntries(string(s.peer.Address), &req, &resp); err != nil { + if err := r.trans.AppendEntries(s.peer.Address, &req, &resp); err != nil { r.logger.Printf("[ERR] raft: Failed to heartbeat to %v: %v", s.peer.Address, err) failures++ select { @@ -364,7 +364,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { // back to the standard replication which can handle more complex situations. func (r *Raft) pipelineReplicate(s *followerReplication) error { // Create a new pipeline - pipeline, err := r.trans.AppendEntriesPipeline(string(s.peer.Address)) + pipeline, err := r.trans.AppendEntriesPipeline(s.peer.Address) if err != nil { return err } @@ -472,7 +472,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh, // setupAppendEntries is used to setup an append entries request. func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error { req.Term = s.currentTerm - req.Leader = r.trans.EncodePeer(string(r.localAddr)) + req.Leader = r.trans.EncodePeer(r.localAddr) req.LeaderCommitIndex = r.getCommitIndex() if err := r.setPreviousLog(req, nextIndex); err != nil { return err diff --git a/tcp_transport.go b/tcp_transport.go index 50c6d15df..9281508a0 100644 --- a/tcp_transport.go +++ b/tcp_transport.go @@ -81,8 +81,8 @@ func newTCPTransport(bindAddr string, } // Dial implements the StreamLayer interface. 
-func (t *TCPStreamLayer) Dial(address string, timeout time.Duration) (net.Conn, error) { - return net.DialTimeout("tcp", address, timeout) +func (t *TCPStreamLayer) Dial(address ServerAddress, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("tcp", string(address), timeout) } // Accept implements the net.Listener interface. diff --git a/transport.go b/transport.go index 24c8405c4..c1a814967 100644 --- a/transport.go +++ b/transport.go @@ -31,27 +31,27 @@ type Transport interface { Consumer() <-chan RPC // LocalAddr is used to return our local address to distinguish from our peers. - LocalAddr() string + LocalAddr() ServerAddress // AppendEntriesPipeline returns an interface that can be used to pipeline // AppendEntries requests. - AppendEntriesPipeline(target string) (AppendPipeline, error) + AppendEntriesPipeline(target ServerAddress) (AppendPipeline, error) // AppendEntries sends the appropriate RPC to the target node. - AppendEntries(target string, args *AppendEntriesRequest, resp *AppendEntriesResponse) error + AppendEntries(target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error // RequestVote sends the appropriate RPC to the target node. - RequestVote(target string, args *RequestVoteRequest, resp *RequestVoteResponse) error + RequestVote(target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error // InstallSnapshot is used to push a snapshot down to a follower. The data is read from // the ReadCloser and streamed to the client. - InstallSnapshot(target string, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error + InstallSnapshot(target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error - // EncodePeer is used to serialize a peer name. - EncodePeer(string) []byte + // EncodePeer is used to serialize a peer's address. + EncodePeer(ServerAddress) []byte - // DecodePeer is used to deserialize a peer name. 
- DecodePeer([]byte) string + // DecodePeer is used to deserialize a peer's address. + DecodePeer([]byte) ServerAddress // SetHeartbeatHandler is used to setup a heartbeat handler // as a fast-pass. This is to avoid head-of-line blocking from @@ -79,9 +79,9 @@ type LoopbackTransport interface { // disconnection. Unless the transport is a loopback transport, the transport specified to // "Connect" is likely to be nil. type WithPeers interface { - Connect(peer string, t Transport) // Connect a peer - Disconnect(peer string) // Disconnect a given peer - DisconnectAll() // Disconnect all peers, possibly to reconnect them later + Connect(peer ServerAddress, t Transport) // Connect a peer + Disconnect(peer ServerAddress) // Disconnect a given peer + DisconnectAll() // Disconnect all peers, possibly to reconnect them later } // AppendPipeline is used for pipelining AppendEntries requests. It is used diff --git a/transport_test.go b/transport_test.go index 9d2151bf7..7104b840c 100644 --- a/transport_test.go +++ b/transport_test.go @@ -12,7 +12,7 @@ const ( TT_MAX ) -func NewTestTransport(ttype int, addr string) (string, LoopbackTransport) { +func NewTestTransport(ttype int, addr ServerAddress) (ServerAddress, LoopbackTransport) { switch ttype { case TT_INMEM: addr, lt := NewInmemTransport(addr) From b7e8a11b96add650933f0d9a2eccf9a0729c6125 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 27 Jun 2016 23:05:32 -0700 Subject: [PATCH 2/9] Removes unused leader observation. --- observer.go | 5 ----- raft.go | 4 ---- 2 files changed, 9 deletions(-) diff --git a/observer.go b/observer.go index 5bc5496e2..651c55c4e 100644 --- a/observer.go +++ b/observer.go @@ -10,11 +10,6 @@ type Observation struct { Data interface{} } -// LeaderObservation is used for the data when leadership changes. -type LeaderObservation struct { - leader string -} - // nextObserverId is used to provide a unique ID for each observer to aid in // deregistration. 
var nextObserverId uint64 diff --git a/raft.go b/raft.go index 05ffa12c3..a94339e5d 100644 --- a/raft.go +++ b/raft.go @@ -363,12 +363,8 @@ func (r *Raft) Leader() ServerAddress { // setLeader is used to modify the current leader of the cluster func (r *Raft) setLeader(leader ServerAddress) { r.leaderLock.Lock() - oldLeader := r.leader r.leader = leader r.leaderLock.Unlock() - if oldLeader != leader { - r.observe(LeaderObservation{leader: string(leader)}) - } } // Apply is used to apply a command to the FSM in a highly consistent From 571e8c3482225a093ebb4ecbb0df10cdcd33fc9b Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 27 Jun 2016 23:12:41 -0700 Subject: [PATCH 3/9] Tweaks warning message. --- raft.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/raft.go b/raft.go index a94339e5d..865d1b314 100644 --- a/raft.go +++ b/raft.go @@ -270,9 +270,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna localAddr := ServerAddress(trans.LocalAddr()) localID := conf.LocalID if localID == "" { - logger.Printf("[WARN] raft: No server ID given, using network address: %v", + logger.Printf("[WARN] raft: No server ID given, using network address: %v. This default will be removed in the future. Set server ID explicitly in config.", localAddr) - logger.Printf("[WARN] raft: This default will be removed in the future. Set server ID explicitly in Config") localID = ServerID(localAddr) } From c306a8c2aaee193867cea32e4e9af7f0b892d65e Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 28 Jun 2016 11:53:42 -0700 Subject: [PATCH 4/9] Fixes TestRaft_RemoveLeader_NoShutdown. 
--- raft_test.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/raft_test.go b/raft_test.go index 241ace20f..cac433256 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1072,7 +1072,6 @@ func TestRaft_RemoveLeader(t *testing.T) { } func TestRaft_RemoveLeader_NoShutdown(t *testing.T) { - return // TODO: fix broken test // Make a cluster conf := inmemConfig(t) conf.ShutdownOnRemove = false @@ -1084,12 +1083,14 @@ func TestRaft_RemoveLeader_NoShutdown(t *testing.T) { leader := c.Leader() // Remove the leader - var removeFuture Future for i := byte(0); i < 100; i++ { - future := leader.Apply([]byte{i}, 0) if i == 80 { - removeFuture = leader.RemovePeer(leader.localAddr) + removeFuture := leader.RemoveServer(leader.localID, 0, 0) + if err := removeFuture.Error(); err != nil { + c.FailNowf("[ERR] err: %v, remove leader failed", err) + } } + future := leader.Apply([]byte{i}, 0) if i > 80 { if err := future.Error(); err == nil || err != ErrNotLeader { c.FailNowf("[ERR] err: %v, future entries should fail", err) @@ -1097,10 +1098,6 @@ func TestRaft_RemoveLeader_NoShutdown(t *testing.T) { } } - if err := removeFuture.Error(); err != nil { - c.FailNowf("[ERR] RemovePeer failed with error %v", err) - } - // Wait a while time.Sleep(c.propagateTimeout) @@ -1110,19 +1107,25 @@ func TestRaft_RemoveLeader_NoShutdown(t *testing.T) { // Wait a bit for log application time.Sleep(c.propagateTimeout) - // Other nodes should have fewer peers - if len(newLeader.configurations.latest.Servers) != 3 { + // Other nodes should have pulled the leader. + if len(newLeader.configurations.latest.Servers) != 2 { c.FailNowf("[ERR] too many peers") } + if hasVote(newLeader.configurations.latest, leader.localID) { + c.FailNowf("[ERR] old leader should no longer have a vote") + } - // Old leader should be a follower + // Old leader should be a follower. 
if leader.State() != Follower { c.FailNowf("[ERR] leader should be shutdown") } - // Old leader should have no peers + // Old leader should not include itself in its peers. if len(leader.configurations.latest.Servers) != 2 { - c.FailNowf("[ERR] leader should have no peers") + c.FailNowf("[ERR] too many peers") + } + if hasVote(leader.configurations.latest, leader.localID) { + c.FailNowf("[ERR] old leader should no longer have a vote") } // Other nodes should have the same state From 5659e2306bb309830cfa99184587b548f57faf91 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 28 Jun 2016 12:05:17 -0700 Subject: [PATCH 5/9] Removes TestRaft_RemoveLeader_SplitCluster. Prior to the cluster membership refactor, a leader that removes itself from the cluster would have just itself as a peer, so it could continue operation on its own (it would never be able to disrupt the old quorum). Now the server will be left in a safer configuration where it doesn't have itself as a peer, so it will remain a follower and not initiate a vote. The existing TestRaft_RemoveLeader_NoShutdown test already verifies that the old leader remains in the follower state, so we remove this test since the behavior has now changed. 
--- raft_test.go | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/raft_test.go b/raft_test.go index cac433256..3717efa5c 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1132,40 +1132,6 @@ func TestRaft_RemoveLeader_NoShutdown(t *testing.T) { c.EnsureSame(t) } -func TestRaft_RemoveLeader_SplitCluster(t *testing.T) { - return // TODO: fix broken test - // Enable operation after a remove - conf := inmemConfig(t) - conf.ShutdownOnRemove = false - - // Make a cluster - c := MakeCluster(3, t, conf) - defer c.Close() - - // Get the leader - c.Followers() - leader := c.Leader() - - // Remove the leader - leader.RemovePeer(leader.localAddr) - - // Wait until we have 2 leaders - limit := time.Now().Add(c.longstopTimeout) - var leaders []*Raft - for time.Now().Before(limit) && len(leaders) != 2 { - c.WaitEvent(nil, c.conf.CommitTimeout) - leaders = c.GetInState(Leader) - } - if len(leaders) != 2 { - c.FailNowf("[ERR] expected two leader: %v", leaders) - } - - // Old leader should have no peers - if len(leader.configurations.latest.Servers) != 1 { - c.FailNowf("[ERR] leader should have no peers") - } -} - func TestRaft_AddKnownPeer(t *testing.T) { // Make a cluster c := MakeCluster(3, t, nil) From 8656156207a7cb23a11ccf5b1c5d59d08b92a955 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 28 Jun 2016 22:02:33 -0700 Subject: [PATCH 6/9] Fixes TestRaft_ReJoinFollower. --- raft_test.go | 48 ++++++++++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/raft_test.go b/raft_test.go index 3717efa5c..1f5ed7231 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1481,8 +1481,7 @@ func TestRaft_SendSnapshotAndLogsFollower(t *testing.T) { } func TestRaft_ReJoinFollower(t *testing.T) { - return // TODO: fix broken test - // Enable operation after a remove + // Enable operation after a remove. 
conf := inmemConfig(t) conf.ShutdownOnRemove = false @@ -1490,10 +1489,10 @@ func TestRaft_ReJoinFollower(t *testing.T) { c := MakeCluster(3, t, conf) defer c.Close() - // Get the leader + // Get the leader. leader := c.Leader() - // Wait until we have 2 followers + // Wait until we have 2 followers. limit := time.Now().Add(c.longstopTimeout) var followers []*Raft for time.Now().Before(limit) && len(followers) != 2 { @@ -1504,27 +1503,36 @@ func TestRaft_ReJoinFollower(t *testing.T) { c.FailNowf("[ERR] expected two followers: %v", followers) } - // Remove a follower + // Remove a follower. follower := followers[0] future := leader.RemovePeer(follower.localAddr) if err := future.Error(); err != nil { c.FailNowf("[ERR] err: %v", err) } - // Wait a while + // Other nodes should have fewer peers. time.Sleep(c.propagateTimeout) - - // Other nodes should have fewer peers - if len(leader.configurations.latest.Servers) != 3 { + if len(leader.configurations.latest.Servers) != 2 { c.FailNowf("[ERR] too many peers: %v", leader.configurations) } - if len(followers[1].configurations.latest.Servers) != 3 { + if len(followers[1].configurations.latest.Servers) != 2 { c.FailNowf("[ERR] too many peers: %v", followers[1].configurations) } - // Get the leader - time.Sleep(c.propagateTimeout) - leader = c.Leader() + // Get the leader. We can't use the normal stability checker here because + // the removed server will be trying to run an election but will be + // ignored. The stability check will think this is off nominal because + // the RequestVote RPCs won't stop firing. + limit = time.Now().Add(c.longstopTimeout) + var leaders []*Raft + for time.Now().Before(limit) && len(leaders) != 1 { + c.WaitEvent(nil, c.conf.CommitTimeout) + leaders, _ = c.pollState(Leader) + } + if len(leaders) != 1 { + c.FailNowf("[ERR] expected a leader") + } + leader = leaders[0] // Rejoin. 
The follower will have a higher term than the leader, // this will cause the leader to step down, and a new round of elections @@ -1534,18 +1542,18 @@ func TestRaft_ReJoinFollower(t *testing.T) { c.FailNowf("[ERR] err: %v", err) } - // Wait a while - time.Sleep(c.propagateTimeout) - - // Other nodes should have fewer peers - if len(leader.configurations.latest.Servers) != 4 { + // We should level back up to the proper number of peers. We add a + // stability check here to make sure the cluster gets to a state where + // there's a solid leader. + leader = c.Leader() + if len(leader.configurations.latest.Servers) != 3 { c.FailNowf("[ERR] missing peers: %v", leader.configurations) } - if len(followers[1].configurations.latest.Servers) != 4 { + if len(followers[1].configurations.latest.Servers) != 3 { c.FailNowf("[ERR] missing peers: %v", followers[1].configurations) } - // Should be a follower now + // Should be a follower now. if follower.State() != Follower { c.FailNowf("[ERR] bad state: %v", follower.State()) } From 8a42cf10f0af590be3cf0a5d6887b3040931cc6c Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 28 Jun 2016 22:03:34 -0700 Subject: [PATCH 7/9] Adds a comment about TestRaft_SnapshotRestore_PeerChange. --- raft_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/raft_test.go b/raft_test.go index 1f5ed7231..ab7d2bef4 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1250,7 +1250,10 @@ func TestRaft_SnapshotRestore(t *testing.T) { // TODO: Need a test that has a previous format Snapshot and check that it can be read/installed on the new code. func TestRaft_SnapshotRestore_PeerChange(t *testing.T) { - return // TODO: fix broken test + // TODO - Fix broken test. This needs a story about how we recover and + // manually let the operator adjust the quorum before we can proceed. 
+ return + // Make the cluster conf := inmemConfig(t) conf.TrailingLogs = 10 From 43306980d5878473ae03f691bb815ed670e9c5ff Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 28 Jun 2016 22:12:11 -0700 Subject: [PATCH 8/9] Fixes TestRaft_Integ. --- integ_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/integ_test.go b/integ_test.go index c7b4f6438..e93cb1e80 100644 --- a/integ_test.go +++ b/integ_test.go @@ -74,6 +74,17 @@ func MakeRaft(t *testing.T, conf *Config) *RaftEnv { } env.trans = trans + var configuration Configuration + configuration.Servers = append(configuration.Servers, Server{ + Suffrage: Voter, + ID: ServerID(trans.LocalAddr()), + Address: trans.LocalAddr(), + }) + err = BootstrapCluster(conf, stable, stable, snap, configuration) + if err != nil { + t.Fatalf("err: %v", err) + } + log.Printf("[INFO] Starting node at %v", trans.LocalAddr()) raft, err := NewRaft(conf, env.fsm, stable, stable, snap, trans) if err != nil { From da052a772d4cdcf25be698119708e3add0049a18 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 28 Jun 2016 22:31:24 -0700 Subject: [PATCH 9/9] Adds a partitioned cluster config change test. --- raft_test.go | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/raft_test.go b/raft_test.go index ab7d2bef4..97c08d837 100644 --- a/raft_test.go +++ b/raft_test.go @@ -413,6 +413,41 @@ func (c *cluster) Disconnect(a ServerAddress) { } } +// Partition keeps the given list of addresses connected but isolates them +// from the other members of the cluster. +func (c *cluster) Partition(far []ServerAddress) { + c.logger.Printf("[DEBUG] Partitioning %v", far) + + // Gather the set of nodes on the "near" side of the partition (we + // will call the supplied list of nodes the "far" side). 
+ near := make(map[ServerAddress]struct{}) +OUTER: + for _, t := range c.trans { + l := t.LocalAddr() + for _, a := range far { + if l == a { + continue OUTER + } + } + near[l] = struct{}{} + } + + // Now fix up all the connections. The near side will be separated from + // the far side, and vice-versa. + for _, t := range c.trans { + l := t.LocalAddr() + if _, ok := near[l]; ok { + for _, a := range far { + t.Disconnect(a) + } + } else { + for a, _ := range near { + t.Disconnect(a) + } + } + } +} + // IndexOf returns the index of the given raft instance. func (c *cluster) IndexOf(r *Raft) int { for i, n := range c.rafts { @@ -1132,6 +1167,42 @@ func TestRaft_RemoveLeader_NoShutdown(t *testing.T) { c.EnsureSame(t) } +func TestRaft_RemoveFollower_SplitCluster(t *testing.T) { + // Make a cluster. + conf := inmemConfig(t) + c := MakeCluster(4, t, conf) + defer c.Close() + + // Wait for a leader to get elected. + leader := c.Leader() + + // Wait to make sure the 4th server is known to all of the + // peers. + numServers := 0 + limit := time.Now().Add(c.longstopTimeout) + for time.Now().Before(limit) && numServers != 4 { + time.Sleep(c.propagateTimeout) + numServers = len(leader.configurations.latest.Servers) + } + if numServers != 4 { + c.FailNowf("[ERR] Leader should have 4 servers, got %d", numServers) + } + c.EnsureSamePeers(t) + + // Isolate two of the followers. + followers := c.Followers() + if len(followers) != 3 { + c.FailNowf("[ERR] Expected 3 followers, got %d", len(followers)) + } + c.Partition([]ServerAddress{followers[0].localAddr, followers[1].localAddr}) + + // Try to remove the remaining follower that was left with the leader. + future := leader.RemovePeer(followers[2].localAddr) + if err := future.Error(); err == nil { + c.FailNowf("[ERR] Should not have been able to make peer change") + } +} + func TestRaft_AddKnownPeer(t *testing.T) { // Make a cluster c := MakeCluster(3, t, nil)