Skip to content

Commit

Permalink
Merge pull request hashicorp#117 from ongardie/membership
Browse files Browse the repository at this point in the history
Reworks membership changes for one in process at any time.
  • Loading branch information
slackpad authored Jun 28, 2016
2 parents eea86d7 + 33b1b0e commit 0eb014d
Show file tree
Hide file tree
Showing 23 changed files with 1,299 additions and 893 deletions.
12 changes: 8 additions & 4 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ type RequestVoteResponse struct {
// Newer term if leader is out of date
Term uint64

// Return the peers, so that a node can shutdown on removal
Peers []byte

// Is the vote granted
Granted bool
}
Expand All @@ -69,9 +66,16 @@ type InstallSnapshotRequest struct {
LastLogIndex uint64
LastLogTerm uint64

// Peer Set in the snapshot
// Peer Set in the snapshot. This is deprecated in favor of Configuration
// but remains here in case we receive an InstallSnapshot from a leader
// that's running old code.
Peers []byte

// Cluster membership.
Configuration []byte
// Log index where 'Configuration' entry was originally written.
ConfigurationIndex uint64

// Size of the snapshot
Size int64
}
Expand Down
32 changes: 17 additions & 15 deletions commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ type commitment struct {
sync.Mutex
// notified when commitIndex increases
commitCh chan struct{}
// voter to log index: the server stores up through this log entry
matchIndexes map[string]uint64
// voter ID to log index: the server stores up through this log entry
matchIndexes map[ServerID]uint64
// a quorum stores up through this log entry. monotonically increases.
commitIndex uint64
// the first index of this leader's term: this needs to be replicated to a
Expand All @@ -26,14 +26,15 @@ type commitment struct {
// newCommitment returns an commitment struct that notifies the provided
// channel when log entries have been committed. A new commitment struct is
// created each time this server becomes leader for a particular term.
// 'voters' are the voting members of the cluster, including the
// local server except when it's removed itself from the cluster.
// 'configuration' is the servers in the cluster.
// 'startIndex' is the first index created in this term (see
// its description above).
func newCommitment(commitCh chan struct{}, voters []string, startIndex uint64) *commitment {
matchIndexes := make(map[string]uint64, len(voters))
for _, voter := range voters {
matchIndexes[voter] = 0
func newCommitment(commitCh chan struct{}, configuration Configuration, startIndex uint64) *commitment {
matchIndexes := make(map[ServerID]uint64)
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
matchIndexes[server.ID] = 0
}
}
return &commitment{
commitCh: commitCh,
Expand All @@ -44,16 +45,17 @@ func newCommitment(commitCh chan struct{}, voters []string, startIndex uint64) *
}

// Called when a new cluster membership configuration is created: it will be
// used to determine commitment from now on. 'voters' are the voting members of
// the cluster, including the local server except when it's removed itself from
// used to determine commitment from now on. 'configuration' is the servers in
// the cluster.
func (c *commitment) setVoters(voters []string) {
func (c *commitment) setConfiguration(configuration Configuration) {
c.Lock()
defer c.Unlock()
oldMatchIndexes := c.matchIndexes
c.matchIndexes = make(map[string]uint64, len(voters))
for _, voter := range voters {
c.matchIndexes[voter] = oldMatchIndexes[voter] // defaults to 0
c.matchIndexes = make(map[ServerID]uint64)
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
c.matchIndexes[server.ID] = oldMatchIndexes[server.ID] // defaults to 0
}
}
c.recalculate()
}
Expand All @@ -69,7 +71,7 @@ func (c *commitment) getCommitIndex() uint64 {
// leader has written the new entry or a follower has replied to an
// AppendEntries RPC. The given server's disk agrees with this server's log up
// through the given index.
func (c *commitment) match(server string, matchIndex uint64) {
func (c *commitment) match(server ServerID, matchIndex uint64) {
c.Lock()
defer c.Unlock()
if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev {
Expand Down
34 changes: 23 additions & 11 deletions commitment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,38 @@ import (
"testing"
)

func makeConfiguration(voters []string) Configuration {
var configuration Configuration
for _, voter := range voters {
configuration.Servers = append(configuration.Servers, Server{
Suffrage: Voter,
Address: ServerAddress(voter + "addr"),
ID: ServerID(voter),
})
}
return configuration
}

// Returns a slice of server names of size n.
func voters(n int) []string {
func voters(n int) Configuration {
if n > 7 {
panic("only up to 7 servers implemented")
}
return []string{"s1", "s2", "s3", "s4", "s5", "s6", "s7"}[:n]
return makeConfiguration([]string{"s1", "s2", "s3", "s4", "s5", "s6", "s7"}[:n])
}

// Tests setVoters() keeps matchIndexes where possible.
func TestCommitment_setVoters(t *testing.T) {
commitCh := make(chan struct{}, 1)
c := newCommitment(commitCh, []string{"a", "b", "c"}, 0)
c := newCommitment(commitCh, makeConfiguration([]string{"a", "b", "c"}), 0)
c.match("a", 10)
c.match("b", 20)
c.match("c", 30)
// commitIndex: 20
if !drainNotifyCh(commitCh) {
t.Fatalf("expected commit notify")
}
c.setVoters([]string{"c", "d", "e"})
c.setConfiguration(makeConfiguration([]string{"c", "d", "e"}))
// c: 30, d: 0, e: 0
c.match("e", 40)
if c.getCommitIndex() != 30 {
Expand Down Expand Up @@ -107,7 +119,7 @@ func TestCommitment_recalculate(t *testing.T) {
t.Fatalf("expected commit notify")
}

c.setVoters(voters(3))
c.setConfiguration(voters(3))
// s1: 30, s2: 20, s3: 10
if c.getCommitIndex() != 20 {
t.Fatalf("expected 20 entries committed, found %d",
Expand All @@ -117,7 +129,7 @@ func TestCommitment_recalculate(t *testing.T) {
t.Fatalf("expected commit notify")
}

c.setVoters(voters(4))
c.setConfiguration(voters(4))
// s1: 30, s2: 20, s3: 10, s4: 0
c.match("s2", 25)
if c.getCommitIndex() != 20 {
Expand Down Expand Up @@ -169,9 +181,9 @@ func TestCommitment_recalculate_startIndex(t *testing.T) {
// to not mark anything committed.
func TestCommitment_noVoterSanity(t *testing.T) {
commitCh := make(chan struct{}, 1)
c := newCommitment(commitCh, []string{}, 4)
c := newCommitment(commitCh, makeConfiguration([]string{}), 4)
c.match("s1", 10)
c.setVoters([]string{})
c.setConfiguration(makeConfiguration([]string{}))
c.match("s1", 10)
if c.getCommitIndex() != 0 {
t.Fatalf("no voting servers: shouldn't be able to commit")
Expand All @@ -181,7 +193,7 @@ func TestCommitment_noVoterSanity(t *testing.T) {
}

// add a voter so we can commit something and then remove it
c.setVoters(voters(1))
c.setConfiguration(voters(1))
c.match("s1", 10)
if c.getCommitIndex() != 10 {
t.Fatalf("expected 10 entries committed, found %d",
Expand All @@ -191,7 +203,7 @@ func TestCommitment_noVoterSanity(t *testing.T) {
t.Fatalf("expected commit notify")
}

c.setVoters([]string{})
c.setConfiguration(makeConfiguration([]string{}))
c.match("s1", 20)
if c.getCommitIndex() != 10 {
t.Fatalf("expected 10 entries committed, found %d",
Expand All @@ -215,7 +227,7 @@ func TestCommitment_singleVoter(t *testing.T) {
if !drainNotifyCh(commitCh) {
t.Fatalf("expected commit notify")
}
c.setVoters(voters(1))
c.setConfiguration(voters(1))
if drainNotifyCh(commitCh) {
t.Fatalf("unexpected commit notify")
}
Expand Down
37 changes: 14 additions & 23 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ type Config struct {
// we can become a leader of a cluster containing only this node.
ShutdownOnRemove bool

// DisableBootstrapAfterElect is used to turn off EnableSingleNode
// after the node is elected. This is used to prevent self-election
// if the node is removed from the Raft cluster via RemovePeer. Setting
// it to false will keep the bootstrap mode, allowing the node to self-elect
// and potentially bootstrap a separate cluster.
DisableBootstrapAfterElect bool

// TrailingLogs controls how many logs we leave after a snapshot. This is
// used so that we can quickly replay logs on a follower instead of being
// forced to send an entire snapshot.
Expand All @@ -55,11 +48,6 @@ type Config struct {
// just replay a small set of logs.
SnapshotThreshold uint64

// EnableSingleNode allows for a single node mode of operation. This
// is false by default, which prevents a lone node from electing itself.
// leader.
EnableSingleNode bool

// LeaderLeaseTimeout is used to control how long the "lease" lasts
// for being the leader without being able to contact a quorum
// of nodes. If we reach this interval without contact, we will
Expand All @@ -70,6 +58,11 @@ type Config struct {
// never be used except for testing purposes, as it can cause a split-brain.
StartAsLeader bool

// The unique ID for this server across all time. For now, this defaults to
// an empty string, indicating the server's network address should be used
// here. That default will be removed in the future.
LocalID ServerID

// NotifyCh is used to provide a channel that will be notified of leadership
// changes. Raft will block writing to this channel, so it should either be
// buffered or aggressively consumed.
Expand All @@ -87,17 +80,15 @@ type Config struct {
// DefaultConfig returns a Config with usable defaults.
func DefaultConfig() *Config {
return &Config{
HeartbeatTimeout: 1000 * time.Millisecond,
ElectionTimeout: 1000 * time.Millisecond,
CommitTimeout: 50 * time.Millisecond,
MaxAppendEntries: 64,
ShutdownOnRemove: true,
DisableBootstrapAfterElect: true,
TrailingLogs: 10240,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 8192,
EnableSingleNode: false,
LeaderLeaseTimeout: 500 * time.Millisecond,
HeartbeatTimeout: 1000 * time.Millisecond,
ElectionTimeout: 1000 * time.Millisecond,
CommitTimeout: 50 * time.Millisecond,
MaxAppendEntries: 64,
ShutdownOnRemove: true,
TrailingLogs: 10240,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 8192,
LeaderLeaseTimeout: 500 * time.Millisecond,
}
}

Expand Down
Loading

0 comments on commit 0eb014d

Please sign in to comment.