Discussion: reworking membership changes #117

Merged: 17 commits, Jun 28, 2016
commands.go: 12 changes (8 additions, 4 deletions)
@@ -52,9 +52,6 @@ type RequestVoteResponse struct {
// Newer term if leader is out of date
Term uint64

// Return the peers, so that a node can shutdown on removal
Peers []byte

// Is the vote granted
Granted bool
}
@@ -69,9 +66,16 @@ type InstallSnapshotRequest struct {
LastLogIndex uint64
LastLogTerm uint64

// Peer Set in the snapshot
// Peer Set in the snapshot. This is deprecated in favor of Configuration
// but remains here in case we receive an InstallSnapshot from a leader
// that's running old code.
Peers []byte

// Cluster membership.
Configuration []byte
// Log index where 'Configuration' entry was originally written.
ConfigurationIndex uint64

// Size of the snapshot
Size int64
}
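
With both fields present, a snapshot receiver can prefer the new Configuration encoding and fall back to the deprecated Peers blob only when the leader is running old code. A minimal sketch of that fallback, assuming hypothetical decodeConfiguration and decodePeers helpers that turn the wire encodings back into a Configuration:

// Sketch only: decodeConfiguration and decodePeers are assumed helpers,
// not part of this diff.
func snapshotConfiguration(req *InstallSnapshotRequest) (Configuration, uint64) {
	if len(req.Configuration) > 0 {
		// Newer leaders send the full membership plus the index it was written at.
		return decodeConfiguration(req.Configuration), req.ConfigurationIndex
	}
	// Older leaders only send the legacy peer list; treat it as current as of
	// the last index covered by the snapshot.
	return decodePeers(req.Peers), req.LastLogIndex
}
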
commitment.go: 32 changes (17 additions, 15 deletions)
@@ -13,8 +13,8 @@ type commitment struct {
sync.Mutex
// notified when commitIndex increases
commitCh chan struct{}
// voter to log index: the server stores up through this log entry
matchIndexes map[string]uint64
// voter ID to log index: the server stores up through this log entry
matchIndexes map[ServerID]uint64
// a quorum stores up through this log entry. monotonically increases.
commitIndex uint64
// the first index of this leader's term: this needs to be replicated to a
@@ -26,14 +26,15 @@ type commitment struct {
// newCommitment returns a commitment struct that notifies the provided
// channel when log entries have been committed. A new commitment struct is
// created each time this server becomes leader for a particular term.
// 'voters' are the voting members of the cluster, including the
// local server except when it's removed itself from the cluster.
// 'configuration' is the servers in the cluster.
// 'startIndex' is the first index created in this term (see
// its description above).
func newCommitment(commitCh chan struct{}, voters []string, startIndex uint64) *commitment {
matchIndexes := make(map[string]uint64, len(voters))
for _, voter := range voters {
matchIndexes[voter] = 0
func newCommitment(commitCh chan struct{}, configuration Configuration, startIndex uint64) *commitment {
matchIndexes := make(map[ServerID]uint64)
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
matchIndexes[server.ID] = 0
}
}
return &commitment{
commitCh: commitCh,
@@ -44,16 +45,17 @@ func newCommitment(commitCh chan struct{}, voters []string, startIndex uint64) *
}

// Called when a new cluster membership configuration is created: it will be
// used to determine commitment from now on. 'voters' are the voting members of
// the cluster, including the local server except when it's removed itself from
// used to determine commitment from now on. 'configuration' is the servers in
// the cluster.
func (c *commitment) setVoters(voters []string) {
func (c *commitment) setConfiguration(configuration Configuration) {
c.Lock()
defer c.Unlock()
oldMatchIndexes := c.matchIndexes
c.matchIndexes = make(map[string]uint64, len(voters))
for _, voter := range voters {
c.matchIndexes[voter] = oldMatchIndexes[voter] // defaults to 0
c.matchIndexes = make(map[ServerID]uint64)
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
c.matchIndexes[server.ID] = oldMatchIndexes[server.ID] // defaults to 0
}
}
c.recalculate()
}
@@ -69,7 +71,7 @@ func (c *commitment) getCommitIndex() uint64 {
// leader has written the new entry or a follower has replied to an
// AppendEntries RPC. The given server's disk agrees with this server's log up
// through the given index.
func (c *commitment) match(server string, matchIndex uint64) {
func (c *commitment) match(server ServerID, matchIndex uint64) {
c.Lock()
defer c.Unlock()
if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev {
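
For context, the commitIndex this struct tracks is the largest log index stored on a quorum of voters. A rough, standalone sketch of the majority rule that recalculate() applies over matchIndexes (the real method also enforces startIndex and notifies commitCh); the helper name here is illustrative:

// Sketch only. Requires "sort" from the standard library.
func quorumMatchIndex(matchIndexes map[ServerID]uint64) uint64 {
	if len(matchIndexes) == 0 {
		return 0
	}
	matched := make([]uint64, 0, len(matchIndexes))
	for _, idx := range matchIndexes {
		matched = append(matched, idx)
	}
	// After an ascending sort, the entry at (n-1)/2 is the highest index
	// already stored by a majority of the voters.
	sort.Slice(matched, func(i, j int) bool { return matched[i] < matched[j] })
	return matched[(len(matched)-1)/2]
}
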
commitment_test.go: 34 changes (23 additions, 11 deletions)
@@ -4,26 +4,38 @@ import (
"testing"
)

func makeConfiguration(voters []string) Configuration {
var configuration Configuration
for _, voter := range voters {
configuration.Servers = append(configuration.Servers, Server{
Suffrage: Voter,
Address: ServerAddress(voter + "addr"),
ID: ServerID(voter),
})
}
return configuration
}

// Returns a Configuration with n voting servers.
func voters(n int) []string {
func voters(n int) Configuration {
if n > 7 {
panic("only up to 7 servers implemented")
}
return []string{"s1", "s2", "s3", "s4", "s5", "s6", "s7"}[:n]
return makeConfiguration([]string{"s1", "s2", "s3", "s4", "s5", "s6", "s7"}[:n])
}

// Tests that setConfiguration() keeps matchIndexes where possible.
func TestCommitment_setVoters(t *testing.T) {
commitCh := make(chan struct{}, 1)
c := newCommitment(commitCh, []string{"a", "b", "c"}, 0)
c := newCommitment(commitCh, makeConfiguration([]string{"a", "b", "c"}), 0)
c.match("a", 10)
c.match("b", 20)
c.match("c", 30)
// commitIndex: 20
if !drainNotifyCh(commitCh) {
t.Fatalf("expected commit notify")
}
c.setVoters([]string{"c", "d", "e"})
c.setConfiguration(makeConfiguration([]string{"c", "d", "e"}))
// c: 30, d: 0, e: 0
c.match("e", 40)
if c.getCommitIndex() != 30 {
@@ -107,7 +119,7 @@ func TestCommitment_recalculate(t *testing.T) {
t.Fatalf("expected commit notify")
}

c.setVoters(voters(3))
c.setConfiguration(voters(3))
// s1: 30, s2: 20, s3: 10
if c.getCommitIndex() != 20 {
t.Fatalf("expected 20 entries committed, found %d",
@@ -117,7 +129,7 @@ func TestCommitment_recalculate(t *testing.T) {
t.Fatalf("expected commit notify")
}

c.setVoters(voters(4))
c.setConfiguration(voters(4))
// s1: 30, s2: 20, s3: 10, s4: 0
c.match("s2", 25)
if c.getCommitIndex() != 20 {
@@ -169,9 +181,9 @@ func TestCommitment_recalculate_startIndex(t *testing.T) {
// to not mark anything committed.
func TestCommitment_noVoterSanity(t *testing.T) {
commitCh := make(chan struct{}, 1)
c := newCommitment(commitCh, []string{}, 4)
c := newCommitment(commitCh, makeConfiguration([]string{}), 4)
c.match("s1", 10)
c.setVoters([]string{})
c.setConfiguration(makeConfiguration([]string{}))
c.match("s1", 10)
if c.getCommitIndex() != 0 {
t.Fatalf("no voting servers: shouldn't be able to commit")
@@ -181,7 +193,7 @@ func TestCommitment_noVoterSanity(t *testing.T) {
}

// add a voter so we can commit something and then remove it
c.setVoters(voters(1))
c.setConfiguration(voters(1))
c.match("s1", 10)
if c.getCommitIndex() != 10 {
t.Fatalf("expected 10 entries committed, found %d",
@@ -191,7 +203,7 @@ func TestCommitment_noVoterSanity(t *testing.T) {
t.Fatalf("expected commit notify")
}

c.setVoters([]string{})
c.setConfiguration(makeConfiguration([]string{}))
c.match("s1", 20)
if c.getCommitIndex() != 10 {
t.Fatalf("expected 10 entries committed, found %d",
@@ -215,7 +227,7 @@ func TestCommitment_singleVoter(t *testing.T) {
if !drainNotifyCh(commitCh) {
t.Fatalf("expected commit notify")
}
c.setVoters(voters(1))
c.setConfiguration(voters(1))
if drainNotifyCh(commitCh) {
t.Fatalf("unexpected commit notify")
}
config.go: 37 changes (14 additions, 23 deletions)
@@ -33,13 +33,6 @@ type Config struct {
// we can become a leader of a cluster containing only this node.
ShutdownOnRemove bool

// DisableBootstrapAfterElect is used to turn off EnableSingleNode
// after the node is elected. This is used to prevent self-election
// if the node is removed from the Raft cluster via RemovePeer. Setting
// it to false will keep the bootstrap mode, allowing the node to self-elect
// and potentially bootstrap a separate cluster.
DisableBootstrapAfterElect bool

// TrailingLogs controls how many logs we leave after a snapshot. This is
// used so that we can quickly replay logs on a follower instead of being
// forced to send an entire snapshot.
@@ -55,11 +48,6 @@ type Config struct {
// just replay a small set of logs.
SnapshotThreshold uint64

// EnableSingleNode allows for a single node mode of operation. This
// is false by default, which prevents a lone node from electing itself
// leader.
EnableSingleNode bool

// LeaderLeaseTimeout is used to control how long the "lease" lasts
// for being the leader without being able to contact a quorum
// of nodes. If we reach this interval without contact, we will
@@ -70,6 +58,11 @@ type Config struct {
// never be used except for testing purposes, as it can cause a split-brain.
StartAsLeader bool

// The unique ID for this server across all time. For now, this defaults to
// an empty string, indicating the server's network address should be used
// here. That default will be removed in the future.
LocalID ServerID

// NotifyCh is used to provide a channel that will be notified of leadership
// changes. Raft will block writing to this channel, so it should either be
// buffered or aggressively consumed.
@@ -87,17 +80,15 @@ type Config struct {
// DefaultConfig returns a Config with usable defaults.
func DefaultConfig() *Config {
return &Config{
HeartbeatTimeout: 1000 * time.Millisecond,
ElectionTimeout: 1000 * time.Millisecond,
CommitTimeout: 50 * time.Millisecond,
MaxAppendEntries: 64,
ShutdownOnRemove: true,
DisableBootstrapAfterElect: true,
TrailingLogs: 10240,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 8192,
EnableSingleNode: false,
LeaderLeaseTimeout: 500 * time.Millisecond,
HeartbeatTimeout: 1000 * time.Millisecond,
ElectionTimeout: 1000 * time.Millisecond,
CommitTimeout: 50 * time.Millisecond,
MaxAppendEntries: 64,
ShutdownOnRemove: true,
TrailingLogs: 10240,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 8192,
LeaderLeaseTimeout: 500 * time.Millisecond,
}
}

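
Since LocalID is new and defaults to the empty string (meaning the server's network address is used), callers that want stable IDs now set it explicitly. A minimal sketch, with the ID value purely illustrative:

// Sketch only: pick a stable ID rather than relying on the temporary
// default of falling back to the network address.
conf := DefaultConfig()
conf.LocalID = ServerID("node-1") // illustrative value, not from this diff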