Skip to content

Commit

Permalink
Merge pull request hashicorp#134 from hashicorp/b-racy-configuration
Browse files Browse the repository at this point in the history
Fixes races with Raft configurations member, adds GetConfiguration accessor.
  • Loading branch information
slackpad authored Jul 9, 2016
2 parents 522b2cc + a6ec80e commit 78e25a2
Show file tree
Hide file tree
Showing 7 changed files with 1,923 additions and 1,782 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)

test:
go test -timeout=30s ./...
go test -timeout=45s ./...

integ: test
INTEG_TESTS=yes go test -timeout=3s -run=Integ ./...
Expand Down
105 changes: 102 additions & 3 deletions configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ type Configuration struct {
Servers []Server
}

// Clone makes a deep copy of a Configuration.
func (c *Configuration) Clone() (copy Configuration) {
copy.Servers = append(copy.Servers, c.Servers...)
return
}

// ConfigurationChangeCommand is the different ways to change the cluster
// configuration.
type ConfigurationChangeCommand uint8
Expand Down Expand Up @@ -127,9 +133,12 @@ type configurations struct {
latestIndex uint64
}

// cloneConfiguration makes a deep copy of a Configuration.
func cloneConfiguration(old Configuration) (copy Configuration) {
copy.Servers = append(copy.Servers, old.Servers...)
// Clone makes a deep copy of a configurations object.
func (c *configurations) Clone() (copy configurations) {
copy.committed = c.committed.Clone()
copy.committedIndex = c.committedIndex
copy.latest = c.latest.Clone()
copy.latestIndex = c.latestIndex
return
}

Expand Down Expand Up @@ -175,6 +184,96 @@ func checkConfiguration(configuration Configuration) error {
return nil
}

// nextConfiguration generates a new Configuration from the current one and a
// configuration change request. It's split from appendConfigurationEntry so
// that it can be unit tested easily.
func nextConfiguration(current Configuration, currentIndex uint64, change configurationChangeRequest) (Configuration, error) {
if change.prevIndex > 0 && change.prevIndex != currentIndex {
return Configuration{}, fmt.Errorf("Configuration changed since %v (latest is %v)", change.prevIndex, currentIndex)
}

configuration := current.Clone()
switch change.command {
case AddStaging:
// TODO: barf on new address?
newServer := Server{
// TODO: This should add the server as Staging, to be automatically
// promoted to Voter later. However, the promoton to Voter is not yet
// implemented, and doing so is not trivial with the way the leader loop
// coordinates with the replication goroutines today. So, for now, the
// server will have a vote right away, and the Promote case below is
// unused.
Suffrage: Voter,
ID: change.serverID,
Address: change.serverAddress,
}
found := false
for i, server := range configuration.Servers {
if server.ID == change.serverID {
if server.Suffrage == Voter {
configuration.Servers[i].Address = change.serverAddress
} else {
configuration.Servers[i] = newServer
}
found = true
break
}
}
if !found {
configuration.Servers = append(configuration.Servers, newServer)
}
case AddNonvoter:
newServer := Server{
Suffrage: Nonvoter,
ID: change.serverID,
Address: change.serverAddress,
}
found := false
for i, server := range configuration.Servers {
if server.ID == change.serverID {
if server.Suffrage != Nonvoter {
configuration.Servers[i].Address = change.serverAddress
} else {
configuration.Servers[i] = newServer
}
found = true
break
}
}
if !found {
configuration.Servers = append(configuration.Servers, newServer)
}
case DemoteVoter:
for i, server := range configuration.Servers {
if server.ID == change.serverID {
configuration.Servers[i].Suffrage = Nonvoter
break
}
}
case RemoveServer:
for i, server := range configuration.Servers {
if server.ID == change.serverID {
configuration.Servers = append(configuration.Servers[:i], configuration.Servers[i+1:]...)
break
}
}
case Promote:
for i, server := range configuration.Servers {
if server.ID == change.serverID && server.Suffrage == Staging {
configuration.Servers[i].Suffrage = Voter
break
}
}
}

// Make sure we didn't do something bad like remove the last voter
if err := checkConfiguration(configuration); err != nil {
return Configuration{}, err
}

return configuration, nil
}

// decodePeers is used to deserialize an old list of peers into a Configuration.
// This is here for backwards compatibility with old log entries and snapshots;
// it should be removed eventually.
Expand Down
190 changes: 188 additions & 2 deletions configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ var sampleConfiguration Configuration = Configuration{
},
}

func TestConfiguration_cloneConfiguration(t *testing.T) {
cloned := cloneConfiguration(sampleConfiguration)
func TestConfiguration_Configuration_Clone(t *testing.T) {
cloned := sampleConfiguration.Clone()
if !reflect.DeepEqual(sampleConfiguration, cloned) {
t.Fatalf("mismatch %v %v", sampleConfiguration, cloned)
}
Expand All @@ -38,6 +38,25 @@ func TestConfiguration_cloneConfiguration(t *testing.T) {
}
}

func TestConfiguration_configurations_Clone(t *testing.T) {
configurations := configurations{
committed: sampleConfiguration,
committedIndex: 1,
latest: sampleConfiguration,
latestIndex: 2,
}
cloned := configurations.Clone()
if !reflect.DeepEqual(configurations, cloned) {
t.Fatalf("mismatch %v %v", configurations, cloned)
}
cloned.committed.Servers[1].ID = "scribble"
cloned.latest.Servers[1].ID = "scribble"
if configurations.committed.Servers[1].ID == "scribble" ||
configurations.latest.Servers[1].ID == "scribble" {
t.Fatalf("cloned configuration shouldn't alias Servers")
}
}

func TestConfiguration_hasVote(t *testing.T) {
if hasVote(sampleConfiguration, "id0") {
t.Fatalf("id0 should not have vote")
Expand Down Expand Up @@ -97,6 +116,173 @@ func TestConfiguration_checkConfiguration(t *testing.T) {
}
}

var singleServer = Configuration{
Servers: []Server{
Server{
Suffrage: Voter,
ID: ServerID("id1"),
Address: ServerAddress("addr1x"),
},
},
}

var oneOfEach = Configuration{
Servers: []Server{
Server{
Suffrage: Voter,
ID: ServerID("id1"),
Address: ServerAddress("addr1x"),
},
Server{
Suffrage: Staging,
ID: ServerID("id2"),
Address: ServerAddress("addr2x"),
},
Server{
Suffrage: Nonvoter,
ID: ServerID("id3"),
Address: ServerAddress("addr3x"),
},
},
}

var voterPair = Configuration{
Servers: []Server{
Server{
Suffrage: Voter,
ID: ServerID("id1"),
Address: ServerAddress("addr1x"),
},
Server{
Suffrage: Voter,
ID: ServerID("id2"),
Address: ServerAddress("addr2x"),
},
},
}

var nextConfigurationTests = []struct {
current Configuration
command ConfigurationChangeCommand
serverID int
next string
}{
// AddStaging: was missing.
{Configuration{}, AddStaging, 1, "{[{Voter id1 addr1}]}"},
{singleServer, AddStaging, 2, "{[{Voter id1 addr1x} {Voter id2 addr2}]}"},
// AddStaging: was Voter.
{singleServer, AddStaging, 1, "{[{Voter id1 addr1}]}"},
// AddStaging: was Staging.
{oneOfEach, AddStaging, 2, "{[{Voter id1 addr1x} {Voter id2 addr2} {Nonvoter id3 addr3x}]}"},
// AddStaging: was Nonvoter.
{oneOfEach, AddStaging, 3, "{[{Voter id1 addr1x} {Staging id2 addr2x} {Voter id3 addr3}]}"},

// AddNonvoter: was missing.
{singleServer, AddNonvoter, 2, "{[{Voter id1 addr1x} {Nonvoter id2 addr2}]}"},
// AddNonvoter: was Voter.
{singleServer, AddNonvoter, 1, "{[{Voter id1 addr1}]}"},
// AddNonvoter: was Staging.
{oneOfEach, AddNonvoter, 2, "{[{Voter id1 addr1x} {Staging id2 addr2} {Nonvoter id3 addr3x}]}"},
// AddNonvoter: was Nonvoter.
{oneOfEach, AddNonvoter, 3, "{[{Voter id1 addr1x} {Staging id2 addr2x} {Nonvoter id3 addr3}]}"},

// DemoteVoter: was missing.
{singleServer, DemoteVoter, 2, "{[{Voter id1 addr1x}]}"},
// DemoteVoter: was Voter.
{voterPair, DemoteVoter, 2, "{[{Voter id1 addr1x} {Nonvoter id2 addr2x}]}"},
// DemoteVoter: was Staging.
{oneOfEach, DemoteVoter, 2, "{[{Voter id1 addr1x} {Nonvoter id2 addr2x} {Nonvoter id3 addr3x}]}"},
// DemoteVoter: was Nonvoter.
{oneOfEach, DemoteVoter, 3, "{[{Voter id1 addr1x} {Staging id2 addr2x} {Nonvoter id3 addr3x}]}"},

// RemoveServer: was missing.
{singleServer, RemoveServer, 2, "{[{Voter id1 addr1x}]}"},
// RemoveServer: was Voter.
{voterPair, RemoveServer, 2, "{[{Voter id1 addr1x}]}"},
// RemoveServer: was Staging.
{oneOfEach, RemoveServer, 2, "{[{Voter id1 addr1x} {Nonvoter id3 addr3x}]}"},
// RemoveServer: was Nonvoter.
{oneOfEach, RemoveServer, 3, "{[{Voter id1 addr1x} {Staging id2 addr2x}]}"},

// Promote: was missing.
{singleServer, Promote, 2, "{[{Voter id1 addr1x}]}"},
// Promote: was Voter.
{singleServer, Promote, 1, "{[{Voter id1 addr1x}]}"},
// Promote: was Staging.
{oneOfEach, Promote, 2, "{[{Voter id1 addr1x} {Voter id2 addr2x} {Nonvoter id3 addr3x}]}"},
// Promote: was Nonvoter.
{oneOfEach, Promote, 3, "{[{Voter id1 addr1x} {Staging id2 addr2x} {Nonvoter id3 addr3x}]}"},
}

func TestConfiguration_nextConfiguration_table(t *testing.T) {
for i, tt := range nextConfigurationTests {
req := configurationChangeRequest{
command: tt.command,
serverID: ServerID(fmt.Sprintf("id%d", tt.serverID)),
serverAddress: ServerAddress(fmt.Sprintf("addr%d", tt.serverID)),
}
next, err := nextConfiguration(tt.current, 1, req)
if err != nil {
t.Errorf("nextConfiguration %d should have succeeded, got %v", i, err)
continue
}
if fmt.Sprintf("%v", next) != tt.next {
t.Errorf("nextConfiguration %d returned %v, expected %s", i, next, tt.next)
continue
}
}
}

func TestConfiguration_nextConfiguration_prevIndex(t *testing.T) {
// Stale prevIndex.
req := configurationChangeRequest{
command: AddStaging,
serverID: ServerID("id1"),
serverAddress: ServerAddress("addr1"),
prevIndex: 1,
}
_, err := nextConfiguration(singleServer, 2, req)
if err == nil || !strings.Contains(err.Error(), "changed") {
t.Fatalf("nextConfiguration should have failed due to intervening configuration change")
}

// Current prevIndex.
req = configurationChangeRequest{
command: AddStaging,
serverID: ServerID("id2"),
serverAddress: ServerAddress("addr2"),
prevIndex: 2,
}
_, err = nextConfiguration(singleServer, 2, req)
if err != nil {
t.Fatalf("nextConfiguration should have succeeded, got %v", err)
}

// Zero prevIndex.
req = configurationChangeRequest{
command: AddStaging,
serverID: ServerID("id3"),
serverAddress: ServerAddress("addr3"),
prevIndex: 0,
}
_, err = nextConfiguration(singleServer, 2, req)
if err != nil {
t.Fatalf("nextConfiguration should have succeeded, got %v", err)
}
}

func TestConfiguration_nextConfiguration_checkConfiguration(t *testing.T) {
req := configurationChangeRequest{
command: AddNonvoter,
serverID: ServerID("id1"),
serverAddress: ServerAddress("addr1"),
}
_, err := nextConfiguration(Configuration{}, 1, req)
if err == nil || !strings.Contains(err.Error(), "at least one voter") {
t.Fatalf("nextConfiguration should have failed for not having a voter")
}
}

func TestConfiguration_decodePeers(t *testing.T) {
var configuration Configuration
_, trans := NewInmemTransport("")
Expand Down
15 changes: 10 additions & 5 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,9 @@ type reqSnapshotFuture struct {
deferError

// snapshot details provided by the FSM runner before responding
index uint64
term uint64
configuration Configuration
configurationIndex uint64
snapshot FSMSnapshot
index uint64
term uint64
snapshot FSMSnapshot
}

// restoreFuture is used for requesting an FSM to perform a
Expand All @@ -165,6 +163,13 @@ type verifyFuture struct {
voteLock sync.Mutex
}

// configurationsFuture is used to retrieve the current configurations. This is
// used to allow safe access to this information outside of the main thread.
type configurationsFuture struct {
deferError
configurations configurations
}

// vote is used to respond to a verifyFuture.
// This may block when responding on the notifyCh.
func (v *verifyFuture) vote(leader bool) {
Expand Down
Loading

0 comments on commit 78e25a2

Please sign in to comment.