Skip to content

Commit

Permalink
Adds basic protocol versioning for in-place upgrades.
Browse files Browse the repository at this point in the history
  • Loading branch information
James Phillips committed Jul 18, 2016
1 parent 1521771 commit 19587a7
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 67 deletions.
93 changes: 66 additions & 27 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,21 @@ var (

// ErrNothingNewToSnapshot is returned when trying to create a snapshot
// but there's nothing new commited to the FSM since we started.
ErrNothingNewToSnapshot = errors.New("Nothing new to snapshot")
ErrNothingNewToSnapshot = errors.New("nothing new to snapshot")

// ErrUnsupportedProtocol is returned when an operation is attempted
// that's not supported by the current protocol version.
ErrUnsupportedProtocol = errors.New("operation not supported with current protocol version")
)

// Raft implements a Raft node.
type Raft struct {
raftState

// protocolVersion is used to inter-operate with older Raft servers. See
// ProtocolVersion in Config for more details.
protocolVersion uint8

// applyCh is used to async send logs to the main thread to
// be committed and applied to the FSM.
applyCh chan *logFuture
Expand Down Expand Up @@ -207,12 +215,12 @@ func BootstrapCluster(config *Config, logs LogStore, stable StableStore, snaps S
// Raft node.
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore,
trans Transport) (*Raft, error) {
// Validate the configuration
// Validate the configuration.
if err := ValidateConfig(conf); err != nil {
return nil, err
}

// Ensure we have a LogOutput
// Ensure we have a LogOutput.
var logger *log.Logger
if conf.Logger != nil {
logger = conf.Logger
Expand All @@ -223,47 +231,53 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
logger = log.New(conf.LogOutput, "", log.LstdFlags)
}

// Try to restore the current term
// Try to restore the current term.
currentTerm, err := stable.GetUint64(keyCurrentTerm)
if err != nil && err.Error() != "not found" {
return nil, fmt.Errorf("failed to load current term: %v", err)
}

// Read the last log value
// Read the last log value.
lastIdx, err := logs.LastIndex()
if err != nil {
return nil, fmt.Errorf("failed to find last log: %v", err)
}

// Get the log
// Get the log.
var lastLog Log
if lastIdx > 0 {
if err = logs.GetLog(lastIdx, &lastLog); err != nil {
return nil, fmt.Errorf("failed to get last log: %v", err)
}
}

// Make sure we have a valid server address and ID.
protocolVersion := conf.ProtocolVersion
localAddr := ServerAddress(trans.LocalAddr())
localID := conf.LocalID
if localID == "" {
logger.Printf("[WARN] raft: No server ID given, using network address: %v. This default will be removed in the future. Set server ID explicitly in config.",
if protocolVersion < 1 || localID == "" {
// During the transition to the new ID system, keep this as an
// INFO level message. Once the new scheme has been out for a
// while, change this to a deprecation WARN message.
logger.Printf("[INFO] raft: No server ID given, or ProtocolVersion < 1, using network address as server ID: %v",
localAddr)
localID = ServerID(localAddr)
}

// Create Raft struct
// Create Raft struct.
r := &Raft{
applyCh: make(chan *logFuture),
conf: conf,
fsm: fsm,
fsmCommitCh: make(chan commitTuple, 128),
fsmRestoreCh: make(chan *restoreFuture),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
localAddr: localAddr,
logger: logger,
logs: logs,
protocolVersion: protocolVersion,
applyCh: make(chan *logFuture),
conf: conf,
fsm: fsm,
fsmCommitCh: make(chan commitTuple, 128),
fsmRestoreCh: make(chan *restoreFuture),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
localAddr: localAddr,
logger: logger,
logs: logs,
configurationChangeCh: make(chan *configurationChangeFuture),
configurations: configurations{},
rpcCh: trans.Consumer(),
Expand All @@ -277,7 +291,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
observers: make(map[uint64]*Observer),
}

// Initialize as a follower
// Initialize as a follower.
r.setState(Follower)

// Start as leader if specified. This should only be used
Expand All @@ -287,16 +301,16 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
r.setLeader(r.localAddr)
}

// Restore the current term and the last log
// Restore the current term and the last log.
r.setCurrentTerm(currentTerm)
r.setLastLog(lastLog.Index, lastLog.Term)

// Attempt to restore a snapshot if there are any
// Attempt to restore a snapshot if there are any.
if err := r.restoreSnapshot(); err != nil {
return nil, err
}

// Scan through the log for any configuration change entries
// Scan through the log for any configuration change entries.
snapshotIndex, _ := r.getLastSnapshot()
for index := snapshotIndex + 1; index <= lastLog.Index; index++ {
var entry Log
Expand All @@ -319,7 +333,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
// to be called concurrently with a blocking RPC.
trans.SetHeartbeatHandler(r.processHeartbeat)

// Start the background work
// Start the background work.
r.goFunc(r.run)
r.goFunc(r.runFSM)
r.goFunc(r.runSnapshots)
Expand Down Expand Up @@ -486,15 +500,24 @@ func (r *Raft) GetConfiguration() (Configuration, uint64, error) {
// AddPeer (deprecated) is used to add a new peer into the cluster. This must be
// run on the leader or it will fail. Use AddVoter/AddNonvoter instead.
func (r *Raft) AddPeer(peer ServerAddress) Future {
return r.AddVoter(ServerID(peer), peer, 0, 0)
return r.requestConfigChange(configurationChangeRequest{
command: AddStaging,
serverID: ServerID(peer),
serverAddress: peer,
prevIndex: 0,
}, 0)
}

// RemovePeer (deprecated) is used to remove a peer from the cluster. If the
// current leader is being removed, it will cause a new election
// to occur. This must be run on the leader or it will fail.
// Use RemoveServer instead.
func (r *Raft) RemovePeer(peer ServerAddress) Future {
return r.RemoveServer(ServerID(peer), 0, 0)
return r.requestConfigChange(configurationChangeRequest{
command: RemoveServer,
serverID: ServerID(peer),
prevIndex: 0,
}, 0)
}

// AddVoter will add the given server to the cluster as a staging server. If the
Expand All @@ -506,6 +529,10 @@ func (r *Raft) RemovePeer(peer ServerAddress) Future {
// If nonzero, timeout is how long this server should wait before the
// configuration change log entry is appended.
func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 1 {
return errorFuture{ErrUnsupportedProtocol}
}

return r.requestConfigChange(configurationChangeRequest{
command: AddStaging,
serverID: id,
Expand All @@ -520,6 +547,10 @@ func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, ti
// a staging server or voter, this does nothing. This must be run on the leader
// or it will fail. For prevIndex and timeout, see AddVoter.
func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 1 {
return errorFuture{ErrUnsupportedProtocol}
}

return r.requestConfigChange(configurationChangeRequest{
command: AddNonvoter,
serverID: id,
Expand All @@ -532,6 +563,10 @@ func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64,
// leader is being removed, it will cause a new election to occur. This must be
// run on the leader or it will fail. For prevIndex and timeout, see AddVoter.
func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 1 {
return errorFuture{ErrUnsupportedProtocol}
}

return r.requestConfigChange(configurationChangeRequest{
command: RemoveServer,
serverID: id,
Expand All @@ -545,6 +580,10 @@ func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration
// does nothing. This must be run on the leader or it will fail. For prevIndex
// and timeout, see AddVoter.
func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 1 {
return errorFuture{ErrUnsupportedProtocol}
}

return r.requestConfigChange(configurationChangeRequest{
command: DemoteVoter,
serverID: id,
Expand Down
30 changes: 30 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,35 @@ import (
"time"
)

// These are the versions of the protocol (which includes wire protocol as
// well as Raft-specific log entries) that this server can _understand_. Use
// the ProtocolVersion member of the Config object to control the version of
// the protocol to use when _speaking_ to other servers.
//
// Version History
//
// 0: Unversioned original protocol spoken until Q2 2016.
// 1: Added server IDs and a new peer change mechanism via a new LogConfiguration
// log entry type. All servers must be running >= 1 in order to support new
// staging and nonvoter modes for servers.
const (
ProtocolVersionMin uint8 = 0
ProtocolVersionMax = 1
)

// Config provides any necessary configuration to
// the Raft server
type Config struct {
// ProtocolVersion allows a Raft server to inter-operate with older
// Raft servers running an older version of the code. This is used to
// version the wire protocol as well as Raft-specific log entries that
// the server uses when _speaking_ to other servers. There is currently
// no auto-negotiation of versions so all servers must be manually
// configured with compatible versions. See ProtocolVersionMin and
// ProtocolVersionMax for the versions of the protocol that this server
// can _understand_.
ProtocolVersion uint8

// HeartbeatTimeout specifies the time in follower state without
// a leader before we attempt an election.
HeartbeatTimeout time.Duration
Expand Down Expand Up @@ -82,6 +108,7 @@ type Config struct {
// DefaultConfig returns a Config with usable defaults.
func DefaultConfig() *Config {
return &Config{
ProtocolVersion: ProtocolVersionMax,
HeartbeatTimeout: 1000 * time.Millisecond,
ElectionTimeout: 1000 * time.Millisecond,
CommitTimeout: 50 * time.Millisecond,
Expand All @@ -96,6 +123,9 @@ func DefaultConfig() *Config {

// ValidateConfig is used to validate a sane configuration
func ValidateConfig(config *Config) error {
if config.ProtocolVersion > ProtocolVersionMax {
return fmt.Errorf("Protocol version cannot be > %d", ProtocolVersionMax)
}
if config.HeartbeatTimeout < 5*time.Millisecond {
return fmt.Errorf("Heartbeat timeout is too low")
}
Expand Down
26 changes: 24 additions & 2 deletions configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,17 +274,39 @@ func nextConfiguration(current Configuration, currentIndex uint64, change config
return configuration, nil
}

// encodePeers is used to serialize a Configuration into the old peers format.
// This is here for backwards compatibility when operating with a mix of old
// servers and should be removed eventually.
func encodePeers(configuration Configuration, trans Transport) []byte {
// Gather up all the voters, other suffrage types are not supported by
// this data format.
var encPeers [][]byte
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
encPeers = append(encPeers, trans.EncodePeer(server.Address))
}
}

// Encode the entire array.
buf, err := encodeMsgPack(encPeers)
if err != nil {
panic(fmt.Errorf("failed to encode peers: %v", err))
}

return buf.Bytes()
}

// decodePeers is used to deserialize an old list of peers into a Configuration.
// This is here for backwards compatibility with old log entries and snapshots;
// it should be removed eventually.
func decodePeers(buf []byte, trans Transport) Configuration {
// Decode the buffer first
// Decode the buffer first.
var encPeers [][]byte
if err := decodeMsgPack(buf, &encPeers); err != nil {
panic(fmt.Errorf("failed to decode peers: %v", err))
}

// Deserialize each peer
// Deserialize each peer.
var servers []Server
for _, enc := range encPeers {
p := trans.DecodePeer(enc)
Expand Down
19 changes: 7 additions & 12 deletions configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,29 +283,24 @@ func TestConfiguration_nextConfiguration_checkConfiguration(t *testing.T) {
}
}

func TestConfiguration_decodePeers(t *testing.T) {
func TestConfiguration_encodeDecodePeers(t *testing.T) {
// Set up configuration.
var configuration Configuration
_, trans := NewInmemTransport("")

// Set up configuration and encode into old format
var encPeers [][]byte
for i := 0; i < 3; i++ {
address := NewInmemAddr()
configuration.Servers = append(configuration.Servers, Server{
Suffrage: Voter,
ID: ServerID(address),
Address: ServerAddress(address),
})
encPeers = append(encPeers, trans.EncodePeer(address))
}
buf, err := encodeMsgPack(encPeers)
if err != nil {
panic(fmt.Errorf("failed to encode peers: %v", err))
}

// Decode from old format, as if reading an old log entry
decoded := decodePeers(buf.Bytes(), trans)
// Encode into the old format.
_, trans := NewInmemTransport("")
buf := encodePeers(configuration, trans)

// Decode from old format, as if reading an old log entry.
decoded := decodePeers(buf, trans)
if !reflect.DeepEqual(configuration, decoded) {
t.Fatalf("mismatch %v %v", configuration, decoded)
}
Expand Down
3 changes: 2 additions & 1 deletion log.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const (
LogBarrier

// LogConfiguration establishes a membership change configuration. It is
// created when a server is added, removed, promoted, etc.
// created when a server is added, removed, promoted, etc. Only used
// when protocol version 1 or greater is in use.
LogConfiguration
)

Expand Down
Loading

0 comments on commit 19587a7

Please sign in to comment.