Skip to content

Commit

Permalink
functional-test: add advance network failure cases
Browse files Browse the repository at this point in the history
add more network failures such as packet corruption, reordering, loss, and network partition.

resolve #5614
  • Loading branch information
fanminshi committed Dec 1, 2016
1 parent aea9c66 commit 69b7117
Show file tree
Hide file tree
Showing 9 changed files with 457 additions and 18 deletions.
82 changes: 79 additions & 3 deletions pkg/netutil/isolate_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,82 @@ func RecoverPort(port int) error {
return err
}

// SetPacketCorruption corrupts packets at p%
func SetPacketCorruption(p int) error {
if p < 0 || p > 100 {
return fmt.Errorf("packets corruption percentage must be between 0 and 100 but got %v", p)
}
ifce, err := GetDefaultInterface()
if err != nil {
return err
}
cmdStr1 := fmt.Sprintf("sudo tc qdisc add dev %s root handle 1:1 netem corrupt %d%%", ifce, p)
cmdStr2 := fmt.Sprintf("sudo tc qdisc add dev %s parent 1:1 handle 10:1 netem corrupt %d%%", ifce, p)
cmdStrs := []string{cmdStr1, cmdStr2}

for _, cmdStr := range cmdStrs {
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}
}

return nil

}

// SetPacketReordering reorders packets. rp% of packets (with a correlation of cp%) gets send immediately. The rest will be delayed for ms millisecond
func SetPacketReordering(rp int, cp int, ms int) error {
ifce, err := GetDefaultInterface()
if err != nil {
return err
}
cmdStr1 := fmt.Sprintf("sudo tc qdisc add dev %s root handle 1:1 netem delay %dms reorder %d%% %d%%", ifce, ms, rp, cp)
cmdStr2 := fmt.Sprintf("sudo tc qdisc add dev %s parent 1:1 handle 10:1 delay %dms reorder %d%% %d%%", ifce, ms, rp, cp)
cmdStrs := []string{cmdStr1, cmdStr2}

for _, cmdStr := range cmdStrs {
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}
}

return nil

}

// SetPackLoss randomly drop packet at p% probability
func SetPackLoss(p int) error {
ifce, err := GetDefaultInterface()
if err != nil {
return err
}
cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem loss %d%%", ifce, p)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}

return nil
}

// SetPartitioning sets a very long delay of ms scale with random variations rv to isolate this node
func SetPartitioning(ms int, rv int) error {
ifce, err := GetDefaultInterface()
if err != nil {
return err
}

cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}

return nil
}

// SetLatency adds latency in millisecond scale with random variations.
func SetLatency(ms, rv int) error {
ifce, err := GetDefaultInterface()
Expand All @@ -64,12 +140,12 @@ func SetLatency(ms, rv int) error {
return nil
}

// RemoveLatency resets latency configurations.
func RemoveLatency() error {
func ResetDefaultInterface() error {
ifce, err := GetDefaultInterface()
if err != nil {
return err
}
_, err = exec.Command("/bin/sh", "-c", fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)).Output()
cmdStr := fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
return err
}
10 changes: 10 additions & 0 deletions pkg/netutil/isolate_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ func DropPort(port int) error { return nil }

func RecoverPort(port int) error { return nil }

func SetPacketCorruption(cp int) error { return nil }

func SetPacketReordering(ms int, rp int, cp int) error { return nil }

func SetPackLoss(p int) error { return nil }

func SetPartitioning(ms int, rv int) error { return nil }

func SetLatency(ms, rv int) error { return nil }

func RemoveLatency() error { return nil }

func ResetDefaultInterface() error { return nil }
2 changes: 1 addition & 1 deletion tools/functional-tester/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM alpine
RUN apk update
RUN apk add -v iptables sudo
RUN apk --update add iptables bash iproute2
ADD bin/etcd-agent /
ADD bin/etcd /
ADD bin/etcd-tester /
Expand Down
7 changes: 1 addition & 6 deletions tools/functional-tester/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,5 @@ tester:
- /etcd-tester
- -agent-endpoints
- "172.20.0.2:9027,172.20.0.3:9027,172.20.0.4:9027"
- -limit
- "1"
- -stress-key-count
- "1"
- -stress-key-size
- "1"


38 changes: 35 additions & 3 deletions tools/functional-tester/etcd-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,48 @@ func (a *Agent) recoverPort(port int) error {
return netutil.RecoverPort(port)
}

func (a *Agent) setLatency(ms, rv int) error {
func (a *Agent) setPacketCorruption(cp int) error {
if !a.cfg.UseRoot {
return nil
}
return netutil.SetPacketCorruption(cp)
}

func (a *Agent) setPacketReordering(rp, cp, ms int) error {
if !a.cfg.UseRoot {
return nil
}
return netutil.SetPacketReordering(rp, cp, ms)
}

func (a *Agent) setPackLoss(p int) error {
if !a.cfg.UseRoot {
return nil
}
return netutil.SetPackLoss(p)
}

func (a *Agent) setPartitioning(ms, rv int) error {
if !a.cfg.UseRoot {
return nil
}
if ms == 0 {
return netutil.RemoveLatency()
return netutil.SetPartitioning(ms, rv)
}

func (a *Agent) setLatency(ms, rv int) error {
if !a.cfg.UseRoot {
return nil
}
return netutil.SetLatency(ms, rv)
}

func (a *Agent) resetDefaultInterface() error {
if !a.cfg.UseRoot {
return nil
}
return netutil.ResetDefaultInterface()
}

func (a *Agent) status() client.Status {
return client.Status{State: a.state}
}
Expand Down
50 changes: 49 additions & 1 deletion tools/functional-tester/etcd-agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,25 @@ type Agent interface {
DropPort(port int) error
// RecoverPort stops dropping all network packets at the given port.
RecoverPort(port int) error
// SetPacketCorruption corrupts packets at a probability of p%
SetPacketCorruption(p int) error
// RemovePacketCorruption removes packet corruption.
RemovePacketCorruption() error
// SetPacketReordering reorders packets. rp% of packets (with a correlation of cp%) gets send immediately. The rest will be delayed for ms millisecond
SetPacketReordering(rp, cp, ms int) error
// RemovePacketReordering removes reordering of packets
RemovePacketReordering() error
// SetPackLoss randomly drop packet at a probability of p%
SetPacketLoss(p int) error
// RemovePacketLoss removes dropping of packets
RemovePacketLoss() error
// SetPartitioning sets a very long network delay of ms scale with random variations rv to isolate a node
SetPartitioning(ms, rv int) error
// SetPartitioning removes partition of a node
RemovePartitioning() error
// SetLatency slows down network by introducing latency.
SetLatency(ms, rv int) error
// RemoveLatency removes latency introduced by SetLatency.
// RemoveLatency removes latency.
RemoveLatency() error
// Status returns the status of etcd on the agent
Status() (Status, error)
Expand Down Expand Up @@ -99,6 +115,38 @@ func (a *agent) RecoverPort(port int) error {
return a.rpcClient.Call("Agent.RPCRecoverPort", port, nil)
}

func (a *agent) SetPacketCorruption(p int) error {
return a.rpcClient.Call("Agent.RPCSetPacketCorruption", []int{p}, nil)
}

func (a *agent) RemovePacketCorruption() error {
return a.rpcClient.Call("Agent.RPCRemovePacketCorruption", struct{}{}, nil)
}

func (a *agent) SetPacketReordering(rp, cp, ms int) error {
return a.rpcClient.Call("Agent.RPCSetPacketReordering", []int{rp, cp, ms}, nil)
}

func (a *agent) RemovePacketReordering() error {
return a.rpcClient.Call("Agent.RPCRemovePacketCorruption", struct{}{}, nil)
}

func (a *agent) SetPacketLoss(p int) error {
return a.rpcClient.Call("Agent.RPCSetPacketLoss", []int{p}, nil)
}

func (a *agent) RemovePacketLoss() error {
return a.rpcClient.Call("Agent.RPCRemovePacketLoss", struct{}{}, nil)
}

func (a *agent) SetPartitioning(ms, rv int) error {
return a.rpcClient.Call("Agent.RPCSetPartitioning", []int{ms, rv}, nil)
}

func (a *agent) RemovePartitioning() error {
return a.rpcClient.Call("Agent.RPCRemovePartitioning", struct{}{}, nil)
}

func (a *agent) SetLatency(ms, rv int) error {
return a.rpcClient.Call("Agent.RPCSetLatency", []int{ms, rv}, nil)
}
Expand Down
86 changes: 85 additions & 1 deletion tools/functional-tester/etcd-agent/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,90 @@ func (a *Agent) RPCRecoverPort(port int, reply *struct{}) error {
return nil
}

func (a *Agent) RPCSetPacketCorruption(args []int, reply *struct{}) error {
if len(args) != 1 {
return fmt.Errorf("SetPacketCorruption needs 1 arg, got (%v)", args)
}
plog.Printf("set packet corruption at %d%%)", args[0])
err := a.setPacketCorruption(args[0])
if err != nil {
plog.Println("error setting packet corruption", err)
}
return nil
}

func (a *Agent) RPCRemovePacketCorruption(args struct{}, reply *struct{}) error {
plog.Println("removing packet corruption")
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing packet corruption")
}
return nil
}

func (a *Agent) RPCSetPacketReordering(args []int, reply *struct{}) error {
if len(args) != 3 {
return fmt.Errorf("SetPacketReordering needs 3 args, got (%v)", args)
}
plog.Printf("SetPacketReordering reorders packets. %d%% of packets (with a correlation of %d%%) gets send immediately. The rest will be delayed for %d millisecond", args[0], args[1], args[2])
err := a.setPacketCorruption(args[0])
if err != nil {
plog.Println("error setting packet reordering", err)
}
return nil
}

func (a *Agent) RPCRemovePacketReordering(args struct{}, reply *struct{}) error {
plog.Println("removing packet reordering")
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing packet reordering")
}
return nil
}

func (a *Agent) RPCSetPacketLoss(args []int, reply *struct{}) error {
if len(args) != 1 {
return fmt.Errorf("SetPacketLoss needs 1 arg, got (%v)", args)
}
plog.Printf("SetPackLoss randomly drop packet at %d probability", args[0])
err := a.setPackLoss(args[0])
if err != nil {
plog.Println("error setting packet loss", err)
}
return nil
}

func (a *Agent) RPCRemovePacketLoss(args struct{}, reply *struct{}) error {
plog.Println("removing packet loss")
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing packet loss")
}
return nil
}

func (a *Agent) RPCSetPartitioning(args []int, reply *struct{}) error {
if len(args) != 1 {
return fmt.Errorf("SetPartitioning needs 2 args, got (%v)", args)
}
plog.Printf("SetPartitioning sets %dms (+/- %dms)", args[0], args[1])
err := a.setPartitioning(args[0], args[1])
if err != nil {
plog.Println("error setting packet loss", err)
}
return nil
}

func (a *Agent) RPCRemovePartitioning(args struct{}, reply *struct{}) error {
plog.Println("removing packet partitioning")
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing packet partitioning")
}
return nil
}

func (a *Agent) RPCSetLatency(args []int, reply *struct{}) error {
if len(args) != 2 {
return fmt.Errorf("SetLatency needs two args, got (%v)", args)
Expand All @@ -118,7 +202,7 @@ func (a *Agent) RPCSetLatency(args []int, reply *struct{}) error {

func (a *Agent) RPCRemoveLatency(args struct{}, reply *struct{}) error {
plog.Println("removing latency")
err := a.setLatency(0, 0)
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing latency")
}
Expand Down
Loading

0 comments on commit 69b7117

Please sign in to comment.