Skip to content

Commit

Permalink
Merge pull request #6140 from gyuho/network-partition
Browse files Browse the repository at this point in the history
*: add network partition tests
  • Loading branch information
gyuho authored Aug 12, 2016
2 parents 82a3d90 + 0a00328 commit f975fe8
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 0 deletions.
16 changes: 16 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,22 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}()
}

// CutPeer drops messages to the specified peer.
func (s *EtcdServer) CutPeer(id types.ID) {
tr, ok := s.r.transport.(*rafthttp.Transport)
if ok {
tr.CutPeer(id)
}
}

// MendPeer recovers the message dropping behavior of the given peer.
func (s *EtcdServer) MendPeer(id types.ID) {
tr, ok := s.r.transport.(*rafthttp.Transport)
if ok {
tr.MendPeer(id)
}
}

func (s *EtcdServer) PauseSending() { s.r.pauseSending() }

func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
Expand Down
43 changes: 43 additions & 0 deletions integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {

func (c *cluster) WaitLeader(t *testing.T) int { return c.waitLeader(t, c.Members) }

// waitLeader waits until given members agree on the same leader.
func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
possibleLead := make(map[uint64]bool)
var lead uint64
Expand Down Expand Up @@ -369,6 +370,28 @@ func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
return -1
}

func (c *cluster) WaitNoLeader(t *testing.T) { c.waitNoLeader(t, c.Members) }

// waitNoLeader waits until given members lose leader.
func (c *cluster) waitNoLeader(t *testing.T, membs []*member) {
noLeader := false
for !noLeader {
noLeader = true
for _, m := range membs {
select {
case <-m.s.StopNotify():
continue
default:
}
if m.s.Lead() != 0 {
noLeader = false
time.Sleep(10 * tickDuration)
break
}
}
}
}

func (c *cluster) waitVersion() {
for _, m := range c.Members {
for {
Expand Down Expand Up @@ -502,6 +525,10 @@ func (m *member) listenGRPC() error {
return nil
}

func (m *member) electionTimeout() time.Duration {
return time.Duration(m.s.Cfg.ElectionTicks) * time.Millisecond
}

func (m *member) DropConnections() { m.grpcBridge.Reset() }

// NewClientV3 creates a new grpc client connection to the member
Expand Down Expand Up @@ -741,6 +768,22 @@ func (m *member) Metric(metricName string) (string, error) {
return "", nil
}

// InjectPartition drops connections from m to others, vice versa.
func (m *member) InjectPartition(t *testing.T, others []*member) {
for _, other := range others {
m.s.CutPeer(other.s.ID())
other.s.CutPeer(m.s.ID())
}
}

// RecoverPartition recovers connections from m to others, vice versa.
func (m *member) RecoverPartition(t *testing.T, others []*member) {
for _, other := range others {
m.s.MendPeer(other.s.ID())
other.s.MendPeer(m.s.ID())
}
}

func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
cfgtls := transport.TLSInfo{}
if tls != nil {
Expand Down
136 changes: 136 additions & 0 deletions integration/network_partition_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"testing"
"time"

"github.com/coreos/etcd/pkg/testutil"
)

func TestNetworkPartition5MembersLeaderInMinority(t *testing.T) {
defer testutil.AfterTest(t)

clus := NewClusterV3(t, &ClusterConfig{Size: 5})
defer clus.Terminate(t)

leadIndex := clus.WaitLeader(t)

// minority: leader, follower / majority: follower, follower, follower
minority := []int{leadIndex, (leadIndex + 1) % 5}
majority := []int{(leadIndex + 2) % 5, (leadIndex + 3) % 5, (leadIndex + 4) % 5}

minorityMembers := getMembersByIndexSlice(clus.cluster, minority)
majorityMembers := getMembersByIndexSlice(clus.cluster, majority)

// network partition (bi-directional)
injectPartition(t, minorityMembers, majorityMembers)

// minority leader must be lost
clus.waitNoLeader(t, minorityMembers)

// wait extra election timeout
time.Sleep(2 * majorityMembers[0].electionTimeout())

// new leader must be from majority
clus.waitLeader(t, majorityMembers)

// recover network partition (bi-directional)
recoverPartition(t, minorityMembers, majorityMembers)
clusterMustProgress(t, clus.Members)
}

func TestNetworkPartition5MembersLeaderInMajority(t *testing.T) {
defer testutil.AfterTest(t)

clus := NewClusterV3(t, &ClusterConfig{Size: 5})
defer clus.Terminate(t)

leadIndex := clus.WaitLeader(t)

// majority: leader, follower, follower / minority: follower, follower
majority := []int{leadIndex, (leadIndex + 1) % 5, (leadIndex + 2) % 5}
minority := []int{(leadIndex + 3) % 5, (leadIndex + 4) % 5}

majorityMembers := getMembersByIndexSlice(clus.cluster, majority)
minorityMembers := getMembersByIndexSlice(clus.cluster, minority)

// network partition (bi-directional)
injectPartition(t, majorityMembers, minorityMembers)

// minority leader must be lost
clus.waitNoLeader(t, minorityMembers)

// wait extra election timeout
time.Sleep(2 * majorityMembers[0].electionTimeout())

// leader must be hold in majority
leadIndex2 := clus.waitLeader(t, majorityMembers)
leadID, leadID2 := clus.Members[leadIndex].s.ID(), majorityMembers[leadIndex2].s.ID()
if leadID != leadID2 {
t.Fatalf("unexpected leader change from %s, got %s", leadID, leadID2)
}

// recover network partition (bi-directional)
recoverPartition(t, majorityMembers, minorityMembers)
clusterMustProgress(t, clus.Members)
}

func TestNetworkPartition4Members(t *testing.T) {
defer testutil.AfterTest(t)

clus := NewClusterV3(t, &ClusterConfig{Size: 4})
defer clus.Terminate(t)

leadIndex := clus.WaitLeader(t)

// groupA: leader, follower / groupB: follower, follower
groupA := []int{leadIndex, (leadIndex + 1) % 4}
groupB := []int{(leadIndex + 2) % 4, (leadIndex + 3) % 4}

leaderPartition := getMembersByIndexSlice(clus.cluster, groupA)
followerPartition := getMembersByIndexSlice(clus.cluster, groupB)

// network partition (bi-directional)
injectPartition(t, leaderPartition, followerPartition)

// no group has quorum, so leader must be lost in all members
clus.WaitNoLeader(t)

// recover network partition (bi-directional)
recoverPartition(t, leaderPartition, followerPartition)
clusterMustProgress(t, clus.Members)
}

func getMembersByIndexSlice(clus *cluster, idxs []int) []*member {
ms := make([]*member, len(idxs))
for i, idx := range idxs {
ms[i] = clus.Members[idx]
}
return ms
}

func injectPartition(t *testing.T, src, others []*member) {
for _, m := range src {
m.InjectPartition(t, others)
}
}

func recoverPartition(t *testing.T, src, others []*member) {
for _, m := range src {
m.RecoverPartition(t, others)
}
}

0 comments on commit f975fe8

Please sign in to comment.