From 19587a7352c8f41901f424d9c274d657daf48f59 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 18 Jul 2016 16:04:41 -0700 Subject: [PATCH] Adds basic protocol versioning for in-place upgrades. --- api.go | 93 ++++++++++++++++++++++++++++++------------- config.go | 30 ++++++++++++++ configuration.go | 26 +++++++++++- configuration_test.go | 19 ++++----- log.go | 3 +- raft.go | 48 +++++++++++++++++----- raft_test.go | 91 ++++++++++++++++++++++++++++++++++-------- 7 files changed, 243 insertions(+), 67 deletions(-) diff --git a/api.go b/api.go index 5fde8339a..6bb22b2da 100644 --- a/api.go +++ b/api.go @@ -42,13 +42,21 @@ var ( // ErrNothingNewToSnapshot is returned when trying to create a snapshot // but there's nothing new commited to the FSM since we started. - ErrNothingNewToSnapshot = errors.New("Nothing new to snapshot") + ErrNothingNewToSnapshot = errors.New("nothing new to snapshot") + + // ErrUnsupportedProtocol is returned when an operation is attempted + // that's not supported by the current protocol version. + ErrUnsupportedProtocol = errors.New("operation not supported with current protocol version") ) // Raft implements a Raft node. type Raft struct { raftState + // protocolVersion is used to inter-operate with older Raft servers. See + // ProtocolVersion in Config for more details. + protocolVersion uint8 + // applyCh is used to async send logs to the main thread to // be committed and applied to the FSM. applyCh chan *logFuture @@ -207,12 +215,12 @@ func BootstrapCluster(config *Config, logs LogStore, stable StableStore, snaps S // Raft node. func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) { - // Validate the configuration + // Validate the configuration. if err := ValidateConfig(conf); err != nil { return nil, err } - // Ensure we have a LogOutput + // Ensure we have a LogOutput. var logger *log.Logger if conf.Logger != nil { logger = conf.Logger @@ -223,19 +231,19 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna logger = log.New(conf.LogOutput, "", log.LstdFlags) } - // Try to restore the current term + // Try to restore the current term. currentTerm, err := stable.GetUint64(keyCurrentTerm) if err != nil && err.Error() != "not found" { return nil, fmt.Errorf("failed to load current term: %v", err) } - // Read the last log value + // Read the last log value. lastIdx, err := logs.LastIndex() if err != nil { return nil, fmt.Errorf("failed to find last log: %v", err) } - // Get the log + // Get the log. var lastLog Log if lastIdx > 0 { if err = logs.GetLog(lastIdx, &lastLog); err != nil { @@ -243,27 +251,33 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna } } + // Make sure we have a valid server address and ID. + protocolVersion := conf.ProtocolVersion localAddr := ServerAddress(trans.LocalAddr()) localID := conf.LocalID - if localID == "" { - logger.Printf("[WARN] raft: No server ID given, using network address: %v. This default will be removed in the future. Set server ID explicitly in config.", + if protocolVersion < 1 || localID == "" { + // During the transition to the new ID system, keep this as an + // INFO level message. Once the new scheme has been out for a + // while, change this to a deprecation WARN message. + logger.Printf("[INFO] raft: No server ID given, or ProtocolVersion < 1, using network address as server ID: %v", localAddr) localID = ServerID(localAddr) } - // Create Raft struct + // Create Raft struct. r := &Raft{ - applyCh: make(chan *logFuture), - conf: conf, - fsm: fsm, - fsmCommitCh: make(chan commitTuple, 128), - fsmRestoreCh: make(chan *restoreFuture), - fsmSnapshotCh: make(chan *reqSnapshotFuture), - leaderCh: make(chan bool), - localID: localID, - localAddr: localAddr, - logger: logger, - logs: logs, + protocolVersion: protocolVersion, + applyCh: make(chan *logFuture), + conf: conf, + fsm: fsm, + fsmCommitCh: make(chan commitTuple, 128), + fsmRestoreCh: make(chan *restoreFuture), + fsmSnapshotCh: make(chan *reqSnapshotFuture), + leaderCh: make(chan bool), + localID: localID, + localAddr: localAddr, + logger: logger, + logs: logs, configurationChangeCh: make(chan *configurationChangeFuture), configurations: configurations{}, rpcCh: trans.Consumer(), @@ -277,7 +291,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna observers: make(map[uint64]*Observer), } - // Initialize as a follower + // Initialize as a follower. r.setState(Follower) // Start as leader if specified. This should only be used @@ -287,16 +301,16 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna r.setLeader(r.localAddr) } - // Restore the current term and the last log + // Restore the current term and the last log. r.setCurrentTerm(currentTerm) r.setLastLog(lastLog.Index, lastLog.Term) - // Attempt to restore a snapshot if there are any + // Attempt to restore a snapshot if there are any. if err := r.restoreSnapshot(); err != nil { return nil, err } - // Scan through the log for any configuration change entries + // Scan through the log for any configuration change entries. snapshotIndex, _ := r.getLastSnapshot() for index := snapshotIndex + 1; index <= lastLog.Index; index++ { var entry Log @@ -319,7 +333,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna // to be called concurrently with a blocking RPC. trans.SetHeartbeatHandler(r.processHeartbeat) - // Start the background work + // Start the background work. r.goFunc(r.run) r.goFunc(r.runFSM) r.goFunc(r.runSnapshots) @@ -486,7 +500,12 @@ func (r *Raft) GetConfiguration() (Configuration, uint64, error) { // AddPeer (deprecated) is used to add a new peer into the cluster. This must be // run on the leader or it will fail. Use AddVoter/AddNonvoter instead. func (r *Raft) AddPeer(peer ServerAddress) Future { - return r.AddVoter(ServerID(peer), peer, 0, 0) + return r.requestConfigChange(configurationChangeRequest{ + command: AddStaging, + serverID: ServerID(peer), + serverAddress: peer, + prevIndex: 0, + }, 0) } // RemovePeer (deprecated) is used to remove a peer from the cluster. If the @@ -494,7 +513,11 @@ func (r *Raft) AddPeer(peer ServerAddress) Future { // to occur. This must be run on the leader or it will fail. // Use RemoveServer instead. func (r *Raft) RemovePeer(peer ServerAddress) Future { - return r.RemoveServer(ServerID(peer), 0, 0) + return r.requestConfigChange(configurationChangeRequest{ + command: RemoveServer, + serverID: ServerID(peer), + prevIndex: 0, + }, 0) } // AddVoter will add the given server to the cluster as a staging server. If the @@ -506,6 +529,10 @@ func (r *Raft) RemovePeer(peer ServerAddress) Future { // If nonzero, timeout is how long this server should wait before the // configuration change log entry is appended. func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture { + if r.protocolVersion < 1 { + return errorFuture{ErrUnsupportedProtocol} + } + return r.requestConfigChange(configurationChangeRequest{ command: AddStaging, serverID: id, @@ -520,6 +547,10 @@ func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, ti // a staging server or voter, this does nothing. This must be run on the leader // or it will fail. For prevIndex and timeout, see AddVoter. func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture { + if r.protocolVersion < 1 { + return errorFuture{ErrUnsupportedProtocol} + } + return r.requestConfigChange(configurationChangeRequest{ command: AddNonvoter, serverID: id, @@ -532,6 +563,10 @@ func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, // leader is being removed, it will cause a new election to occur. This must be // run on the leader or it will fail. For prevIndex and timeout, see AddVoter. func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture { + if r.protocolVersion < 1 { + return errorFuture{ErrUnsupportedProtocol} + } + return r.requestConfigChange(configurationChangeRequest{ command: RemoveServer, serverID: id, @@ -545,6 +580,10 @@ func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration // does nothing. This must be run on the leader or it will fail. For prevIndex // and timeout, see AddVoter. func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture { + if r.protocolVersion < 1 { + return errorFuture{ErrUnsupportedProtocol} + } + return r.requestConfigChange(configurationChangeRequest{ command: DemoteVoter, serverID: id, diff --git a/config.go b/config.go index 8f7ccca3c..b9313de9b 100644 --- a/config.go +++ b/config.go @@ -7,9 +7,35 @@ import ( "time" ) +// These are the versions of the protocol (which includes wire protocol as +// well as Raft-specific log entries) that this server can _understand_. Use +// the ProtocolVersion member of the Config object to control the version of +// the protocol to use when _speaking_ to other servers. +// +// Version History +// +// 0: Unversioned original protocol spoken until Q2 2016. +// 1: Added server IDs and a new peer change mechanism via a new LogConfiguration +// log entry type. All servers must be running >= 1 in order to support new +// staging and nonvoter modes for servers. +const ( + ProtocolVersionMin uint8 = 0 + ProtocolVersionMax = 1 +) + // Config provides any necessary configuration to // the Raft server type Config struct { + // ProtocolVersion allows a Raft server to inter-operate with older + // Raft servers running an older version of the code. This is used to + // version the wire protocol as well as Raft-specific log entries that + // the server uses when _speaking_ to other servers. There is currently + // no auto-negotiation of versions so all servers must be manually + // configured with compatible versions. See ProtocolVersionMin and + // ProtocolVersionMax for the versions of the protocol that this server + // can _understand_. + ProtocolVersion uint8 + // HeartbeatTimeout specifies the time in follower state without // a leader before we attempt an election. HeartbeatTimeout time.Duration @@ -82,6 +108,7 @@ type Config struct { // DefaultConfig returns a Config with usable defaults. func DefaultConfig() *Config { return &Config{ + ProtocolVersion: ProtocolVersionMax, HeartbeatTimeout: 1000 * time.Millisecond, ElectionTimeout: 1000 * time.Millisecond, CommitTimeout: 50 * time.Millisecond, @@ -96,6 +123,9 @@ func DefaultConfig() *Config { // ValidateConfig is used to validate a sane configuration func ValidateConfig(config *Config) error { + if config.ProtocolVersion > ProtocolVersionMax { + return fmt.Errorf("Protocol version cannot be > %d", ProtocolVersionMax) + } if config.HeartbeatTimeout < 5*time.Millisecond { return fmt.Errorf("Heartbeat timeout is too low") } diff --git a/configuration.go b/configuration.go index 11866c50a..d605ce461 100644 --- a/configuration.go +++ b/configuration.go @@ -274,17 +274,39 @@ func nextConfiguration(current Configuration, currentIndex uint64, change config return configuration, nil } +// encodePeers is used to serialize a Configuration into the old peers format. +// This is here for backwards compatibility when operating with a mix of old +// servers and should be removed eventually. +func encodePeers(configuration Configuration, trans Transport) []byte { + // Gather up all the voters, other suffrage types are not supported by + // this data format. + var encPeers [][]byte + for _, server := range configuration.Servers { + if server.Suffrage == Voter { + encPeers = append(encPeers, trans.EncodePeer(server.Address)) + } + } + + // Encode the entire array. + buf, err := encodeMsgPack(encPeers) + if err != nil { + panic(fmt.Errorf("failed to encode peers: %v", err)) + } + + return buf.Bytes() +} + // decodePeers is used to deserialize an old list of peers into a Configuration. // This is here for backwards compatibility with old log entries and snapshots; // it should be removed eventually. func decodePeers(buf []byte, trans Transport) Configuration { - // Decode the buffer first + // Decode the buffer first. var encPeers [][]byte if err := decodeMsgPack(buf, &encPeers); err != nil { panic(fmt.Errorf("failed to decode peers: %v", err)) } - // Deserialize each peer + // Deserialize each peer. var servers []Server for _, enc := range encPeers { p := trans.DecodePeer(enc) diff --git a/configuration_test.go b/configuration_test.go index 9adeb4193..e6b1b521e 100644 --- a/configuration_test.go +++ b/configuration_test.go @@ -283,12 +283,9 @@ func TestConfiguration_nextConfiguration_checkConfiguration(t *testing.T) { } } -func TestConfiguration_decodePeers(t *testing.T) { +func TestConfiguration_encodeDecodePeers(t *testing.T) { + // Set up configuration. var configuration Configuration - _, trans := NewInmemTransport("") - - // Set up configuration and encode into old format - var encPeers [][]byte for i := 0; i < 3; i++ { address := NewInmemAddr() configuration.Servers = append(configuration.Servers, Server{ @@ -296,16 +293,14 @@ func TestConfiguration_decodePeers(t *testing.T) { ID: ServerID(address), Address: ServerAddress(address), }) - encPeers = append(encPeers, trans.EncodePeer(address)) - } - buf, err := encodeMsgPack(encPeers) - if err != nil { - panic(fmt.Errorf("failed to encode peers: %v", err)) } - // Decode from old format, as if reading an old log entry - decoded := decodePeers(buf.Bytes(), trans) + // Encode into the old format. + _, trans := NewInmemTransport("") + buf := encodePeers(configuration, trans) + // Decode from old format, as if reading an old log entry. + decoded := decodePeers(buf, trans) if !reflect.DeepEqual(configuration, decoded) { t.Fatalf("mismatch %v %v", configuration, decoded) } diff --git a/log.go b/log.go index 2e9cab6e3..19b8710fb 100644 --- a/log.go +++ b/log.go @@ -26,7 +26,8 @@ const ( LogBarrier // LogConfiguration establishes a membership change configuration. It is - // created when a server is added, removed, promoted, etc. + // created when a server is added, removed, promoted, etc. Only used + // when protocol version 1 or greater is in use. LogConfiguration ) diff --git a/raft.go b/raft.go index 3d0fd086a..e15037ef3 100644 --- a/raft.go +++ b/raft.go @@ -628,10 +628,26 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) { r.logger.Printf("[INFO] raft: Updating configuration with %s (%v, %v) to %v", future.req.command, future.req.serverAddress, future.req.serverID, configuration) - future.log = Log{ - Type: LogConfiguration, - Data: encodeConfiguration(configuration), + + // In pre-ID compatibility mode we translate all configuration changes + // in to an old remove peer message, which can handle all supported + // cases for peer changes in the pre-ID world (adding and removing + // voters). Both add peer and remove peer log entries are handled + // similarly on old Raft servers, but remove peer does extra checks to + // see if a leader needs to step down. Since they both assert the full + // configuration, then we can safely call remove peer for everything. + if r.protocolVersion < 1 { + future.log = Log{ + Type: LogRemovePeerDeprecated, + Data: encodePeers(configuration, r.trans), + } + } else { + future.log = Log{ + Type: LogConfiguration, + Data: encodeConfiguration(configuration), + } } + r.dispatchLogs([]*logFuture{&future.logFuture}) index := future.Index() r.configurations.latest = configuration @@ -740,6 +756,7 @@ func (r *Raft) processLog(l *Log, future *logFuture) { case LogRemovePeerDeprecated: case LogNoop: // Ignore the no-op + default: r.logger.Printf("[ERR] raft: Got unrecognized log type: %#v", l) } @@ -891,13 +908,9 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { return } + // Handle any new configuration changes for _, newEntry := range newEntries { - if newEntry.Type == LogConfiguration { - r.configurations.committed = r.configurations.latest - r.configurations.committedIndex = r.configurations.latestIndex - r.configurations.latest = decodeConfiguration(newEntry.Data) - r.configurations.latestIndex = newEntry.Index - } + r.checkAndProcessConfigurationLog(newEntry) } // Update the lastLog @@ -927,6 +940,23 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { return } +// checkAndProcessConfigurationLog takes a log entry and updates the latest +// configuration if the entry results in a new configuration. This must only be +// called from the main thread. +func (r *Raft) checkAndProcessConfigurationLog(entry *Log) { + if entry.Type == LogConfiguration { + r.configurations.committed = r.configurations.latest + r.configurations.committedIndex = r.configurations.latestIndex + r.configurations.latest = decodeConfiguration(entry.Data) + r.configurations.latestIndex = entry.Index + } else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated { + r.configurations.committed = r.configurations.latest + r.configurations.committedIndex = r.configurations.latestIndex + r.configurations.latest = decodePeers(entry.Data, r.trans) + r.configurations.latestIndex = entry.Index + } +} + // requestVote is invoked when we get an request vote RPC call. func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { defer metrics.MeasureSince([]string{"raft", "rpc", "requestVote"}, time.Now()) diff --git a/raft_test.go b/raft_test.go index 7b56c56b6..94012af1e 100644 --- a/raft_test.go +++ b/raft_test.go @@ -523,28 +523,22 @@ WAIT: goto CHECK } -// raftToPeerSet returns the set of peers as a map. -func (c *cluster) raftToPeerSet(r *Raft) map[ServerID]struct{} { - peers := make(map[ServerID]struct{}) - +// getConfiguration returns the configuration of the given Raft instance, or +// fails the test if there's an error +func (c *cluster) getConfiguration(r *Raft) Configuration { configuration, _, err := r.GetConfiguration() if err != nil { c.FailNowf("[ERR] failed to get configuration: %v", err) - return peers + return Configuration{} } - for _, p := range configuration.Servers { - if p.Suffrage == Voter { - peers[p.ID] = struct{}{} - } - } - return peers + return configuration } // EnsureSamePeers makes sure all the rafts have the same set of peers. func (c *cluster) EnsureSamePeers(t *testing.T) { limit := time.Now().Add(c.longstopTimeout) - peerSet := c.raftToPeerSet(c.rafts[0]) + peerSet := c.getConfiguration(c.rafts[0]) CHECK: for i, raft := range c.rafts { @@ -552,10 +546,10 @@ CHECK: continue } - otherSet := c.raftToPeerSet(raft) + otherSet := c.getConfiguration(raft) if !reflect.DeepEqual(peerSet, otherSet) { if time.Now().After(limit) { - c.FailNowf("[ERR] peer mismatch: %v %v", peerSet, otherSet) + c.FailNowf("[ERR] peer mismatch: %+v %+v", peerSet, otherSet) } else { goto WAIT } @@ -1578,8 +1572,6 @@ func TestRaft_ReJoinFollower(t *testing.T) { // Enable operation after a remove. conf := inmemConfig(t) conf.ShutdownOnRemove = false - - // Make a cluster c := MakeCluster(3, t, conf) defer c.Close() @@ -1949,6 +1941,73 @@ func TestRaft_Voting(t *testing.T) { } } +func TestRaft_ProtocolVersion_0(t *testing.T) { + // Make a cluster on the old protocol. + conf := inmemConfig(t) + conf.ProtocolVersion = 0 + c := MakeCluster(2, t, conf) + defer c.Close() + + // Set up another server, also speaking the old protocol. + c1 := MakeClusterNoBootstrap(1, t, conf) + + // Merge clusters. + c.Merge(c1) + c.FullyConnect() + + // Make sure the new id-based operations aren't supported in the old + // protocol. + future := c.Leader().AddVoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second) + if err := future.Error(); err != ErrUnsupportedProtocol { + c.FailNowf("[ERR] err: %v", err) + } + future = c.Leader().AddNonvoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second) + if err := future.Error(); err != ErrUnsupportedProtocol { + c.FailNowf("[ERR] err: %v", err) + } + future = c.Leader().RemoveServer(c1.rafts[0].localID, 0, 1*time.Second) + if err := future.Error(); err != ErrUnsupportedProtocol { + c.FailNowf("[ERR] err: %v", err) + } + future = c.Leader().DemoteVoter(c1.rafts[0].localID, 0, 1*time.Second) + if err := future.Error(); err != ErrUnsupportedProtocol { + c.FailNowf("[ERR] err: %v", err) + } + + // Now do the join using the old address-based API (this returns a + // different type of future). + { + future := c.Leader().AddPeer(c1.rafts[0].localAddr) + if err := future.Error(); err != nil { + c.FailNowf("[ERR] err: %v", err) + } + } + + // Set up another server, this time speaking the new protocol. + c2 := MakeClusterNoBootstrap(1, t, nil) + + // Merge this one in. + c.Merge(c2) + c.FullyConnect() + + // Make sure this can join in and inter-operate with the old servers. + { + future := c.Leader().AddPeer(c2.rafts[0].localAddr) + if err := future.Error(); err != nil { + c.FailNowf("[ERR] err: %v", err) + } + } + + // Check the FSMs + c.EnsureSame(t) + + // Check the peers + c.EnsureSamePeers(t) + + // Ensure one leader + c.EnsureLeader(t, c.Leader().localAddr) +} + // TODO: These are test cases we'd like to write for appendEntries(). // Unfortunately, it's difficult to do so with the current way this file is // tested.