Rework membership changes
This is the main patch for hashicorp#117. It still has several outstanding problems that
we'll work through in later commits, but it's good enough to discuss.
ongardie committed Jun 16, 2016
1 parent 406a5b2 commit 687db02
Showing 22 changed files with 1,023 additions and 811 deletions.
12 changes: 8 additions & 4 deletions commands.go
@@ -52,9 +52,6 @@ type RequestVoteResponse struct {
// Newer term if leader is out of date
Term uint64

// Return the peers, so that a node can shutdown on removal
Peers []byte

// Is the vote granted
Granted bool
}
@@ -69,9 +66,16 @@ type InstallSnapshotRequest struct {
LastLogIndex uint64
LastLogTerm uint64

// Peer Set in the snapshot
// Peer Set in the snapshot. This is deprecated in favor of Configuration
// but remains here in case we receive an InstallSnapshot from a leader
// that's running old code.
Peers []byte

// Cluster membership.
Configuration []byte
// Log index where 'Configuration' entry was originally written.
ConfigurationIndex uint64

// Size of the snapshot
Size int64
}
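Not shown in this commit: how a follower consumes these two fields. Since a leader running old code sends only Peers while new leaders fill in Configuration, the receiver presumably prefers the latter. A minimal sketch under that assumption, using the decodePeers and decodeConfiguration helpers from configuration.go below; snapshotMembership is a hypothetical name, and falling back to LastLogIndex for the configuration's index is an assumption:

// Editor's sketch, not part of this commit.
func snapshotMembership(req *InstallSnapshotRequest, trans Transport) (Configuration, uint64) {
	if len(req.Configuration) > 0 {
		// New leader: use the explicit configuration and its original index.
		return decodeConfiguration(req.Configuration), req.ConfigurationIndex
	}
	// Old leader: decodePeers treats every legacy peer as a voter. Using
	// LastLogIndex as the configuration's index is an assumption here.
	return decodePeers(req.Peers, trans), req.LastLogIndex
}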
28 changes: 15 additions & 13 deletions commitment.go
@@ -13,7 +13,7 @@ type commitment struct {
sync.Mutex
// notified when commitIndex increases
commitCh chan struct{}
// voter to log index: the server stores up through this log entry
// voter GUID to log index: the server stores up through this log entry
matchIndexes map[string]uint64
// a quorum stores up through this log entry. monotonically increases.
commitIndex uint64
@@ -26,14 +26,15 @@ type commitment struct {
// newCommitment returns a commitment struct that notifies the provided
// channel when log entries have been committed. A new commitment struct is
// created each time this server becomes leader for a particular term.
// 'voters' are the voting members of the cluster, including the
// local server except when it's removed itself from the cluster.
// 'configuration' is the servers in the cluster.
// 'startIndex' is the first index created in this term (see
// its description above).
func newCommitment(commitCh chan struct{}, voters []string, startIndex uint64) *commitment {
matchIndexes := make(map[string]uint64, len(voters))
for _, voter := range voters {
matchIndexes[voter] = 0
func newCommitment(commitCh chan struct{}, configuration Configuration, startIndex uint64) *commitment {
matchIndexes := make(map[string]uint64)
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
matchIndexes[server.GUID] = 0
}
}
return &commitment{
commitCh: commitCh,
@@ -44,16 +45,17 @@ func newCommitment(commitCh chan struct{}, voters []string, startIndex uint64) *
}

// Called when a new cluster membership configuration is created: it will be
// used to determine commitment from now on. 'voters' are the voting members of
// the cluster, including the local server except when it's removed itself from
// used to determine commitment from now on. 'configuration' is the servers in
// the cluster.
func (c *commitment) setVoters(voters []string) {
func (c *commitment) setConfiguration(configuration Configuration) {
c.Lock()
defer c.Unlock()
oldMatchIndexes := c.matchIndexes
c.matchIndexes = make(map[string]uint64, len(voters))
for _, voter := range voters {
c.matchIndexes[voter] = oldMatchIndexes[voter] // defaults to 0
c.matchIndexes = make(map[string]uint64)
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
c.matchIndexes[server.GUID] = oldMatchIndexes[server.GUID] // defaults to 0
}
}
c.recalculate()
}
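The recalculate() call above does the actual quorum math but falls outside this excerpt. A sketch of the rule it implements, assuming a sort import and the package's asyncNotifyCh helper; the caller holds the mutex:

// Editor's sketch of the quorum rule; not part of the diff shown here.
func (c *commitment) recalculate() {
	if len(c.matchIndexes) == 0 {
		return // no voters: nothing can be marked committed
	}
	matched := make([]uint64, 0, len(c.matchIndexes))
	for _, idx := range c.matchIndexes {
		matched = append(matched, idx)
	}
	sort.Slice(matched, func(i, j int) bool { return matched[i] < matched[j] })
	// With n voters, a majority (n/2+1) stores at least the value at
	// position (n-1)/2 of the ascending sort.
	quorumMatchIndex := matched[(len(matched)-1)/2]
	// Advance monotonically, and only past startIndex: a leader may only
	// count entries from its own term toward commitment.
	if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex {
		c.commitIndex = quorumMatchIndex
		asyncNotifyCh(c.commitCh)
	}
}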
34 changes: 23 additions & 11 deletions commitment_test.go
@@ -4,26 +4,38 @@ import (
"testing"
)

func makeConfiguration(voters []string) Configuration {
var configuration Configuration
for _, voter := range voters {
configuration.Servers = append(configuration.Servers, Server{
Suffrage: Voter,
Address: voter,
GUID: voter,
})
}
return configuration
}

// Returns a configuration of n voting servers.
func voters(n int) []string {
func voters(n int) Configuration {
if n > 7 {
panic("only up to 7 servers implemented")
}
return []string{"s1", "s2", "s3", "s4", "s5", "s6", "s7"}[:n]
return makeConfiguration([]string{"s1", "s2", "s3", "s4", "s5", "s6", "s7"}[:n])
}

// Tests that setConfiguration() keeps matchIndexes where possible.
func TestCommitment_setVoters(t *testing.T) {
commitCh := make(chan struct{}, 1)
c := newCommitment(commitCh, []string{"a", "b", "c"}, 0)
c := newCommitment(commitCh, makeConfiguration([]string{"a", "b", "c"}), 0)
c.match("a", 10)
c.match("b", 20)
c.match("c", 30)
// commitIndex: 20
if !drainNotifyCh(commitCh) {
t.Fatalf("expected commit notify")
}
c.setVoters([]string{"c", "d", "e"})
c.setConfiguration(makeConfiguration([]string{"c", "d", "e"}))
// c: 30, d: 0, e: 0
c.match("e", 40)
if c.getCommitIndex() != 30 {
@@ -107,7 +119,7 @@ func TestCommitment_recalculate(t *testing.T) {
t.Fatalf("expected commit notify")
}

c.setVoters(voters(3))
c.setConfiguration(voters(3))
// s1: 30, s2: 20, s3: 10
if c.getCommitIndex() != 20 {
t.Fatalf("expected 20 entries committed, found %d",
@@ -117,7 +129,7 @@ func TestCommitment_recalculate(t *testing.T) {
t.Fatalf("expected commit notify")
}

c.setVoters(voters(4))
c.setConfiguration(voters(4))
// s1: 30, s2: 20, s3: 10, s4: 0
c.match("s2", 25)
if c.getCommitIndex() != 20 {
@@ -169,9 +181,9 @@ func TestCommitment_recalculate_startIndex(t *testing.T) {
// to not mark anything committed.
func TestCommitment_noVoterSanity(t *testing.T) {
commitCh := make(chan struct{}, 1)
c := newCommitment(commitCh, []string{}, 4)
c := newCommitment(commitCh, makeConfiguration([]string{}), 4)
c.match("s1", 10)
c.setVoters([]string{})
c.setConfiguration(makeConfiguration([]string{}))
c.match("s1", 10)
if c.getCommitIndex() != 0 {
t.Fatalf("no voting servers: shouldn't be able to commit")
@@ -181,7 +193,7 @@ func TestCommitment_noVoterSanity(t *testing.T) {
}

// add a voter so we can commit something and then remove it
c.setVoters(voters(1))
c.setConfiguration(voters(1))
c.match("s1", 10)
if c.getCommitIndex() != 10 {
t.Fatalf("expected 10 entries committed, found %d",
@@ -191,7 +203,7 @@ func TestCommitment_noVoterSanity(t *testing.T) {
t.Fatalf("expected commit notify")
}

c.setVoters([]string{})
c.setConfiguration(makeConfiguration([]string{}))
c.match("s1", 20)
if c.getCommitIndex() != 10 {
t.Fatalf("expected 10 entries committed, found %d",
@@ -215,7 +227,7 @@ func TestCommitment_singleVoter(t *testing.T) {
if !drainNotifyCh(commitCh) {
t.Fatalf("expected commit notify")
}
c.setVoters(voters(1))
c.setConfiguration(voters(1))
if drainNotifyCh(commitCh) {
t.Fatalf("unexpected commit notify")
}
32 changes: 9 additions & 23 deletions config.go
@@ -33,13 +33,6 @@ type Config struct {
// we can become a leader of a cluster containing only this node.
ShutdownOnRemove bool

// DisableBootstrapAfterElect is used to turn off EnableSingleNode
// after the node is elected. This is used to prevent self-election
// if the node is removed from the Raft cluster via RemovePeer. Setting
// it to false will keep the bootstrap mode, allowing the node to self-elect
// and potentially bootstrap a separate cluster.
DisableBootstrapAfterElect bool

// TrailingLogs controls how many logs we leave after a snapshot. This is
// used so that we can quickly replay logs on a follower instead of being
// forced to send an entire snapshot.
@@ -55,11 +48,6 @@ type Config struct {
// just replay a small set of logs.
SnapshotThreshold uint64

// EnableSingleNode allows for a single node mode of operation. This
// is false by default, which prevents a lone node from electing itself leader.
EnableSingleNode bool

// LeaderLeaseTimeout is used to control how long the "lease" lasts
// for being the leader without being able to contact a quorum
// of nodes. If we reach this interval without contact, we will
@@ -87,17 +75,15 @@ type Config struct {
// DefaultConfig returns a Config with usable defaults.
func DefaultConfig() *Config {
return &Config{
HeartbeatTimeout: 1000 * time.Millisecond,
ElectionTimeout: 1000 * time.Millisecond,
CommitTimeout: 50 * time.Millisecond,
MaxAppendEntries: 64,
ShutdownOnRemove: true,
DisableBootstrapAfterElect: true,
TrailingLogs: 10240,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 8192,
EnableSingleNode: false,
LeaderLeaseTimeout: 500 * time.Millisecond,
HeartbeatTimeout: 1000 * time.Millisecond,
ElectionTimeout: 1000 * time.Millisecond,
CommitTimeout: 50 * time.Millisecond,
MaxAppendEntries: 64,
ShutdownOnRemove: true,
TrailingLogs: 10240,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 8192,
LeaderLeaseTimeout: 500 * time.Millisecond,
}
}

156 changes: 156 additions & 0 deletions configuration.go
@@ -0,0 +1,156 @@
package raft

import "fmt"

// ServerSuffrage determines whether a Server in a Configuration gets a vote.
type ServerSuffrage int

const (
// Voter is a server whose vote is counted in elections and whose match index
// is used in advancing the leader's commit index.
Voter ServerSuffrage = 1
// Nonvoter is a server that receives log entries but is not considered for
// elections or commitment purposes.
Nonvoter ServerSuffrage = 2
// Staging is a server that acts like a nonvoter with one exception: once a
// staging server receives enough log entries to be sufficiently caught up to
// the leader's log, the leader will invoke a membership change to change
// the Staging server to a Voter.
Staging ServerSuffrage = 3
)
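The Staging comment above implies a promotion step the leader takes once replication catches up; that logic is not in this file. Purely for illustration, a sketch where caughtUp and proposeMembershipChange are hypothetical names (cloneConfiguration is defined below):

// Editor's sketch, not part of configuration.go: promote caught-up
// Staging servers. caughtUp and proposeMembershipChange are hypothetical.
for i, server := range configuration.Servers {
	if server.Suffrage == Staging && caughtUp(server) {
		next := cloneConfiguration(configuration)
		next.Servers[i].Suffrage = Voter
		proposeMembershipChange(next)
	}
}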

// Server tracks the information about a single server in a configuration.
type Server struct {
// Suffrage determines whether the server gets a vote.
Suffrage ServerSuffrage
// GUID is a unique string identifying this server for all time.
GUID string
// Address is its network address that a transport can contact.
Address string
}

// Configuration tracks which servers are in the cluster, and whether they have
// votes. This should include the local server, if it's a member of the cluster.
// The servers are listed in no particular order, but each should only appear once.
// These entries are appended to the log during membership changes.
type Configuration struct {
Servers []Server
}

// configurations is state tracked on every server about its Configurations.
// Note that, per Diego's dissertation, there can be at most one uncommitted
// configuration at a time (the next configuration may not be created until the
// prior one has been committed).
//
// One downside to storing just two configurations is that if you try to take a
// snapshot when your state machine hasn't yet applied the committedIndex, we
// have no record of the configuration that would logically fit into that
// snapshot. We disallow snapshots in that case for now. An alternative approach,
// which LogCabin uses, is to track every configuration change in the
// log.
type configurations struct {
// committed is the latest configuration in the log/snapshot that has been
// committed (the one with the largest index).
committed Configuration
// committedIndex is the log index where 'committed' was written.
committedIndex uint64
// latest is the latest configuration in the log/snapshot (may be committed
// or uncommitted)
latest Configuration
// latestIndex is the log index where 'latest' was written.
latestIndex uint64
}
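To make the two-slot scheme concrete: a new configuration entry becomes 'latest' as soon as it is appended, and becomes 'committed' once the commit index reaches it. A sketch with hypothetical helpers, not part of this commit:

// Editor's sketch: maintaining the two slots. setLatest and commitTo are
// hypothetical names.
func (c *configurations) setLatest(configuration Configuration, index uint64) {
	c.latest = cloneConfiguration(configuration)
	c.latestIndex = index
}

func (c *configurations) commitTo(commitIndex uint64) {
	// At most one configuration is uncommitted, so once commitIndex reaches
	// latestIndex, 'latest' is exactly the configuration that committed.
	if c.latestIndex > c.committedIndex && commitIndex >= c.latestIndex {
		c.committed = c.latest
		c.committedIndex = c.latestIndex
	}
}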

// cloneConfiguration makes a deep copy of a Configuration.
func cloneConfiguration(old Configuration) (copy Configuration) {
copy.Servers = append(copy.Servers, old.Servers...)
return
}

// hasVote returns true if the server identified by 'guid' is a Voter in the
// provided Configuration.
func hasVote(configuration Configuration, guid string) bool {
for _, server := range configuration.Servers {
if server.GUID == guid {
return server.Suffrage == Voter
}
}
return false
}

// checkConfiguration tests a cluster membership configuration for common
// errors.
func checkConfiguration(configuration Configuration) error {
guidSet := make(map[string]bool)
addressSet := make(map[string]bool)
var voters int
for _, server := range configuration.Servers {
if server.GUID == "" {
return fmt.Errorf("Empty GUID in configuration: %v", configuration)
}
if server.Address == "" {
return fmt.Errorf("Empty address in configuration: %v", server)
}
if guidSet[server.GUID] {
return fmt.Errorf("Found duplicate GUID in configuration: %v", server.GUID)
}
guidSet[server.GUID] = true
if addressSet[server.Address] {
return fmt.Errorf("Found duplicate Address in configuration: %v", server.Address)
}
addressSet[server.Address] = true
if server.Suffrage == Voter {
voters++
}
}
if voters == 0 {
return fmt.Errorf("Need at least one voter in configuration: %v", configuration)
}
return nil
}
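A quick hypothetical use of checkConfiguration, showing the duplicate-GUID case:

// Editor's example, not part of this file.
cfg := Configuration{Servers: []Server{
	{Suffrage: Voter, GUID: "s1", Address: "10.0.0.1:7000"},
	{Suffrage: Voter, GUID: "s1", Address: "10.0.0.2:7000"},
}}
if err := checkConfiguration(cfg); err != nil {
	fmt.Println(err) // Found duplicate GUID in configuration: s1
}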

// decodePeers is used to deserialize an old list of peers into a Configuration.
// This is here for backwards compatibility with old log entries and snapshots;
// it should be removed eventually.
func decodePeers(buf []byte, trans Transport) Configuration {
// Decode the buffer first
var encPeers [][]byte
if err := decodeMsgPack(buf, &encPeers); err != nil {
panic(fmt.Errorf("failed to decode peers: %v", err))
}

// Deserialize each peer
var servers []Server
for _, enc := range encPeers {
p := trans.DecodePeer(enc)
servers = append(servers, Server{
Suffrage: Voter,
GUID: p,
Address: p,
})
}

return Configuration{
Servers: servers,
}
}

// encodeConfiguration serializes a Configuration using MsgPack, or panics on
// errors.
func encodeConfiguration(configuration Configuration) []byte {
buf, err := encodeMsgPack(configuration)
if err != nil {
panic(fmt.Errorf("failed to encode peers: %v", err))
}
return buf.Bytes()
}

// decodeConfiguration deserializes a Configuration using MsgPack, or panics on
// errors.
func decodeConfiguration(buf []byte) Configuration {
var configuration Configuration
if err := decodeMsgPack(buf, &configuration); err != nil {
panic(fmt.Errorf("failed to decode configuration: %v", err))
}
return configuration
}
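The two MsgPack helpers are inverses, so a configuration survives a round trip. A hypothetical example:

// Editor's example: encode/decode round trip preserves the configuration.
original := Configuration{Servers: []Server{
	{Suffrage: Voter, GUID: "s1", Address: "127.0.0.1:7000"},
}}
restored := decodeConfiguration(encodeConfiguration(original))
// reflect.DeepEqual(original, restored) holds.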