From 69b7117de9ffa3f6e4c652dc80123f58563d4fb8 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Tue, 29 Nov 2016 10:39:54 -0800 Subject: [PATCH] functional-test: add advance network failure cases add more network failures such as packet corruption, reordering, loss, and network partition. resolve #5614 --- pkg/netutil/isolate_linux.go | 82 +++++++- pkg/netutil/isolate_stub.go | 10 + tools/functional-tester/docker/Dockerfile | 2 +- .../docker/docker-compose.yml | 7 +- tools/functional-tester/etcd-agent/agent.go | 38 +++- .../etcd-agent/client/client.go | 50 ++++- tools/functional-tester/etcd-agent/rpc.go | 86 +++++++- .../etcd-tester/failure_agent.go | 188 +++++++++++++++++- tools/functional-tester/etcd-tester/main.go | 12 ++ 9 files changed, 457 insertions(+), 18 deletions(-) diff --git a/pkg/netutil/isolate_linux.go b/pkg/netutil/isolate_linux.go index c0128a5a50b..8e8af8f1bc6 100644 --- a/pkg/netutil/isolate_linux.go +++ b/pkg/netutil/isolate_linux.go @@ -41,6 +41,82 @@ func RecoverPort(port int) error { return err } +// SetPacketCorruption corrupts packets at p% +func SetPacketCorruption(p int) error { + if p < 0 || p > 100 { + return fmt.Errorf("packets corruption percentage must be between 0 and 100 but got %v", p) + } + ifce, err := GetDefaultInterface() + if err != nil { + return err + } + cmdStr1 := fmt.Sprintf("sudo tc qdisc add dev %s root handle 1:1 netem corrupt %d%%", ifce, p) + cmdStr2 := fmt.Sprintf("sudo tc qdisc add dev %s parent 1:1 handle 10:1 netem corrupt %d%%", ifce, p) + cmdStrs := []string{cmdStr1, cmdStr2} + + for _, cmdStr := range cmdStrs { + _, err = exec.Command("/bin/sh", "-c", cmdStr).Output() + if err != nil { + return err + } + } + + return nil + +} + +// SetPacketReordering reorders packets. rp% of packets (with a correlation of cp%) gets send immediately. The rest will be delayed for ms millisecond +func SetPacketReordering(rp int, cp int, ms int) error { + ifce, err := GetDefaultInterface() + if err != nil { + return err + } + cmdStr1 := fmt.Sprintf("sudo tc qdisc add dev %s root handle 1:1 netem delay %dms reorder %d%% %d%%", ifce, ms, rp, cp) + cmdStr2 := fmt.Sprintf("sudo tc qdisc add dev %s parent 1:1 handle 10:1 delay %dms reorder %d%% %d%%", ifce, ms, rp, cp) + cmdStrs := []string{cmdStr1, cmdStr2} + + for _, cmdStr := range cmdStrs { + _, err = exec.Command("/bin/sh", "-c", cmdStr).Output() + if err != nil { + return err + } + } + + return nil + +} + +// SetPackLoss randomly drop packet at p% probability +func SetPackLoss(p int) error { + ifce, err := GetDefaultInterface() + if err != nil { + return err + } + cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem loss %d%%", ifce, p) + _, err = exec.Command("/bin/sh", "-c", cmdStr).Output() + if err != nil { + return err + } + + return nil +} + +// SetPartitioning sets a very long delay of ms scale with random variations rv to isolate this node +func SetPartitioning(ms int, rv int) error { + ifce, err := GetDefaultInterface() + if err != nil { + return err + } + + cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv) + _, err = exec.Command("/bin/sh", "-c", cmdStr).Output() + if err != nil { + return err + } + + return nil +} + // SetLatency adds latency in millisecond scale with random variations. func SetLatency(ms, rv int) error { ifce, err := GetDefaultInterface() @@ -64,12 +140,12 @@ func SetLatency(ms, rv int) error { return nil } -// RemoveLatency resets latency configurations. -func RemoveLatency() error { +func ResetDefaultInterface() error { ifce, err := GetDefaultInterface() if err != nil { return err } - _, err = exec.Command("/bin/sh", "-c", fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)).Output() + cmdStr := fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce) + _, err = exec.Command("/bin/sh", "-c", cmdStr).Output() return err } diff --git a/pkg/netutil/isolate_stub.go b/pkg/netutil/isolate_stub.go index 7f4c3e67c2a..af9eca911ca 100644 --- a/pkg/netutil/isolate_stub.go +++ b/pkg/netutil/isolate_stub.go @@ -20,6 +20,16 @@ func DropPort(port int) error { return nil } func RecoverPort(port int) error { return nil } +func SetPacketCorruption(cp int) error { return nil } + +func SetPacketReordering(ms int, rp int, cp int) error { return nil } + +func SetPackLoss(p int) error { return nil } + +func SetPartitioning(ms int, rv int) error { return nil } + func SetLatency(ms, rv int) error { return nil } func RemoveLatency() error { return nil } + +func ResetDefaultInterface() error { return nil } diff --git a/tools/functional-tester/docker/Dockerfile b/tools/functional-tester/docker/Dockerfile index 0c8e49f788b..2a75555855a 100644 --- a/tools/functional-tester/docker/Dockerfile +++ b/tools/functional-tester/docker/Dockerfile @@ -1,6 +1,6 @@ FROM alpine RUN apk update -RUN apk add -v iptables sudo +RUN apk --update add iptables bash iproute2 ADD bin/etcd-agent / ADD bin/etcd / ADD bin/etcd-tester / diff --git a/tools/functional-tester/docker/docker-compose.yml b/tools/functional-tester/docker/docker-compose.yml index 5aa7659bf1d..5a30e804f0c 100644 --- a/tools/functional-tester/docker/docker-compose.yml +++ b/tools/functional-tester/docker/docker-compose.yml @@ -19,10 +19,5 @@ tester: - /etcd-tester - -agent-endpoints - "172.20.0.2:9027,172.20.0.3:9027,172.20.0.4:9027" - - -limit - - "1" - - -stress-key-count - - "1" - - -stress-key-size - - "1" + diff --git a/tools/functional-tester/etcd-agent/agent.go b/tools/functional-tester/etcd-agent/agent.go index 7302dc27583..f1423a30dd3 100644 --- a/tools/functional-tester/etcd-agent/agent.go +++ b/tools/functional-tester/etcd-agent/agent.go @@ -191,16 +191,48 @@ func (a *Agent) recoverPort(port int) error { return netutil.RecoverPort(port) } -func (a *Agent) setLatency(ms, rv int) error { +func (a *Agent) setPacketCorruption(cp int) error { + if !a.cfg.UseRoot { + return nil + } + return netutil.SetPacketCorruption(cp) +} + +func (a *Agent) setPacketReordering(rp, cp, ms int) error { + if !a.cfg.UseRoot { + return nil + } + return netutil.SetPacketReordering(rp, cp, ms) +} + +func (a *Agent) setPackLoss(p int) error { + if !a.cfg.UseRoot { + return nil + } + return netutil.SetPackLoss(p) +} + +func (a *Agent) setPartitioning(ms, rv int) error { if !a.cfg.UseRoot { return nil } - if ms == 0 { - return netutil.RemoveLatency() + return netutil.SetPartitioning(ms, rv) +} + +func (a *Agent) setLatency(ms, rv int) error { + if !a.cfg.UseRoot { + return nil } return netutil.SetLatency(ms, rv) } +func (a *Agent) resetDefaultInterface() error { + if !a.cfg.UseRoot { + return nil + } + return netutil.ResetDefaultInterface() +} + func (a *Agent) status() client.Status { return client.Status{State: a.state} } diff --git a/tools/functional-tester/etcd-agent/client/client.go b/tools/functional-tester/etcd-agent/client/client.go index 53d28d0343d..fd16405b023 100644 --- a/tools/functional-tester/etcd-agent/client/client.go +++ b/tools/functional-tester/etcd-agent/client/client.go @@ -40,9 +40,25 @@ type Agent interface { DropPort(port int) error // RecoverPort stops dropping all network packets at the given port. RecoverPort(port int) error + // SetPacketCorruption corrupts packets at a probability of p% + SetPacketCorruption(p int) error + // RemovePacketCorruption removes packet corruption. + RemovePacketCorruption() error + // SetPacketReordering reorders packets. rp% of packets (with a correlation of cp%) gets send immediately. The rest will be delayed for ms millisecond + SetPacketReordering(rp, cp, ms int) error + // RemovePacketReordering removes reordering of packets + RemovePacketReordering() error + // SetPackLoss randomly drop packet at a probability of p% + SetPacketLoss(p int) error + // RemovePacketLoss removes dropping of packets + RemovePacketLoss() error + // SetPartitioning sets a very long network delay of ms scale with random variations rv to isolate a node + SetPartitioning(ms, rv int) error + // SetPartitioning removes partition of a node + RemovePartitioning() error // SetLatency slows down network by introducing latency. SetLatency(ms, rv int) error - // RemoveLatency removes latency introduced by SetLatency. + // RemoveLatency removes latency. RemoveLatency() error // Status returns the status of etcd on the agent Status() (Status, error) @@ -99,6 +115,38 @@ func (a *agent) RecoverPort(port int) error { return a.rpcClient.Call("Agent.RPCRecoverPort", port, nil) } +func (a *agent) SetPacketCorruption(p int) error { + return a.rpcClient.Call("Agent.RPCSetPacketCorruption", []int{p}, nil) +} + +func (a *agent) RemovePacketCorruption() error { + return a.rpcClient.Call("Agent.RPCRemovePacketCorruption", struct{}{}, nil) +} + +func (a *agent) SetPacketReordering(rp, cp, ms int) error { + return a.rpcClient.Call("Agent.RPCSetPacketReordering", []int{rp, cp, ms}, nil) +} + +func (a *agent) RemovePacketReordering() error { + return a.rpcClient.Call("Agent.RPCRemovePacketCorruption", struct{}{}, nil) +} + +func (a *agent) SetPacketLoss(p int) error { + return a.rpcClient.Call("Agent.RPCSetPacketLoss", []int{p}, nil) +} + +func (a *agent) RemovePacketLoss() error { + return a.rpcClient.Call("Agent.RPCRemovePacketLoss", struct{}{}, nil) +} + +func (a *agent) SetPartitioning(ms, rv int) error { + return a.rpcClient.Call("Agent.RPCSetPartitioning", []int{ms, rv}, nil) +} + +func (a *agent) RemovePartitioning() error { + return a.rpcClient.Call("Agent.RPCRemovePartitioning", struct{}{}, nil) +} + func (a *agent) SetLatency(ms, rv int) error { return a.rpcClient.Call("Agent.RPCSetLatency", []int{ms, rv}, nil) } diff --git a/tools/functional-tester/etcd-agent/rpc.go b/tools/functional-tester/etcd-agent/rpc.go index c9cc7a3905b..ff881a372e0 100644 --- a/tools/functional-tester/etcd-agent/rpc.go +++ b/tools/functional-tester/etcd-agent/rpc.go @@ -104,6 +104,90 @@ func (a *Agent) RPCRecoverPort(port int, reply *struct{}) error { return nil } +func (a *Agent) RPCSetPacketCorruption(args []int, reply *struct{}) error { + if len(args) != 1 { + return fmt.Errorf("SetPacketCorruption needs 1 arg, got (%v)", args) + } + plog.Printf("set packet corruption at %d%%)", args[0]) + err := a.setPacketCorruption(args[0]) + if err != nil { + plog.Println("error setting packet corruption", err) + } + return nil +} + +func (a *Agent) RPCRemovePacketCorruption(args struct{}, reply *struct{}) error { + plog.Println("removing packet corruption") + err := a.resetDefaultInterface() + if err != nil { + plog.Println("error removing packet corruption") + } + return nil +} + +func (a *Agent) RPCSetPacketReordering(args []int, reply *struct{}) error { + if len(args) != 3 { + return fmt.Errorf("SetPacketReordering needs 3 args, got (%v)", args) + } + plog.Printf("SetPacketReordering reorders packets. %d%% of packets (with a correlation of %d%%) gets send immediately. The rest will be delayed for %d millisecond", args[0], args[1], args[2]) + err := a.setPacketCorruption(args[0]) + if err != nil { + plog.Println("error setting packet reordering", err) + } + return nil +} + +func (a *Agent) RPCRemovePacketReordering(args struct{}, reply *struct{}) error { + plog.Println("removing packet reordering") + err := a.resetDefaultInterface() + if err != nil { + plog.Println("error removing packet reordering") + } + return nil +} + +func (a *Agent) RPCSetPacketLoss(args []int, reply *struct{}) error { + if len(args) != 1 { + return fmt.Errorf("SetPacketLoss needs 1 arg, got (%v)", args) + } + plog.Printf("SetPackLoss randomly drop packet at %d probability", args[0]) + err := a.setPackLoss(args[0]) + if err != nil { + plog.Println("error setting packet loss", err) + } + return nil +} + +func (a *Agent) RPCRemovePacketLoss(args struct{}, reply *struct{}) error { + plog.Println("removing packet loss") + err := a.resetDefaultInterface() + if err != nil { + plog.Println("error removing packet loss") + } + return nil +} + +func (a *Agent) RPCSetPartitioning(args []int, reply *struct{}) error { + if len(args) != 1 { + return fmt.Errorf("SetPartitioning needs 2 args, got (%v)", args) + } + plog.Printf("SetPartitioning sets %dms (+/- %dms)", args[0], args[1]) + err := a.setPartitioning(args[0], args[1]) + if err != nil { + plog.Println("error setting packet loss", err) + } + return nil +} + +func (a *Agent) RPCRemovePartitioning(args struct{}, reply *struct{}) error { + plog.Println("removing packet partitioning") + err := a.resetDefaultInterface() + if err != nil { + plog.Println("error removing packet partitioning") + } + return nil +} + func (a *Agent) RPCSetLatency(args []int, reply *struct{}) error { if len(args) != 2 { return fmt.Errorf("SetLatency needs two args, got (%v)", args) @@ -118,7 +202,7 @@ func (a *Agent) RPCSetLatency(args []int, reply *struct{}) error { func (a *Agent) RPCRemoveLatency(args struct{}, reply *struct{}) error { plog.Println("removing latency") - err := a.setLatency(0, 0) + err := a.resetDefaultInterface() if err != nil { plog.Println("error removing latency") } diff --git a/tools/functional-tester/etcd-tester/failure_agent.go b/tools/functional-tester/etcd-tester/failure_agent.go index 5dddec5302f..4163a657abe 100644 --- a/tools/functional-tester/etcd-tester/failure_agent.go +++ b/tools/functional-tester/etcd-tester/failure_agent.go @@ -20,9 +20,15 @@ import ( ) const ( - snapshotCount = 10000 - slowNetworkLatency = 500 // 500 millisecond - randomVariation = 50 + snapshotCount = 10000 + slowNetworkLatency = 500 // 500 millisecond + partitionNetworkLatency = 30000 // 30000 millisecond + randomVariation = 50 + packetCorruptionPercentage = 25 + packetLossPercentage = 25 + packetReorderingPercentage = 25 + packetReorderingCorrelation = 50 + packetReorderingDelay = 50 // 50 millisecond // Wait more when it recovers from slow network, because network layer // needs extra time to propagate traffic control (tc command) change. @@ -97,6 +103,70 @@ func newFailureIsolateAll() failure { } } +func injectPacketCorruption(m *member) error { + if err := m.Agent.SetPacketCorruption(25); err != nil { + m.Agent.RemovePacketCorruption() + return err + } + return nil +} + +func recoverPacketCorruption(m *member) error { + if err := m.Agent.RemovePacketCorruption(); err != nil { + return err + } + time.Sleep(waitRecover) + return nil +} + +func injectPacketReordering(m *member) error { + if err := m.Agent.SetPacketReordering(packetReorderingPercentage, packetReorderingCorrelation, packetReorderingDelay); err != nil { + m.Agent.RemovePacketReordering() + return err + } + return nil +} + +func recoverPacketReordering(m *member) error { + if err := m.Agent.RemovePacketReordering(); err != nil { + return err + } + time.Sleep(waitRecover) + return nil +} + +func injectPacketLoss(m *member) error { + if err := m.Agent.SetPacketLoss(packetLossPercentage); err != nil { + m.Agent.RemovePacketLoss() + return err + } + return nil +} + +func recoverPacketLoss(m *member) error { + if err := m.Agent.RemovePacketLoss(); err != nil { + return err + } + time.Sleep(waitRecover) + return nil +} + +func injectPartitioning(m *member) error { + if err := m.Agent.SetPartitioning(partitionNetworkLatency, randomVariation); err != nil { + m.Agent.RemovePartitioning() + return err + } + return nil +} + +func recoverPartitioning(m *member) error { + if err := m.Agent.RemovePartitioning(); err != nil { + return err + } + time.Sleep(waitRecover) + return nil +} + func injectLatency(m *member) error { if err := m.Agent.SetLatency(slowNetworkLatency, randomVariation); err != nil { m.Agent.RemoveLatency() @@ -140,6 +210,118 @@ func newFailureSlowNetworkAll() failure { } } +func newFailureCorruptedNetworkOneMember() failure { + desc := fmt.Sprintf("inject packet corruption to one member's network with a probablity of %d", packetCorruptionPercentage) + return &failureOne{ + description: description(desc), + injectMember: injectPacketCorruption, + recoverMember: recoverPacketCorruption, + } +} + +func newFailureCorruptedNetworkLeader() failure { + desc := fmt.Sprintf("inject packet corruption to leader's network with a probablity of %d", packetCorruptionPercentage) + ff := failureByFunc{ + description: description(desc), + injectMember: injectPacketCorruption, + recoverMember: recoverPacketCorruption, + } + return &failureLeader{ff, 0} +} + +func newFailureCorruptedNetworkAll() failure { + desc := fmt.Sprintf("inject packet corruption to all member's network with a probablity of %d", packetCorruptionPercentage) + return &failureAll{ + description: description(desc), + injectMember: injectPacketCorruption, + recoverMember: recoverPacketCorruption, + } +} + +func newFailurePacketReorderingOneMember() failure { + desc := fmt.Sprintf("inject packet reordering to one member's network with a probablity of %d", packetCorruptionPercentage) + return &failureOne{ + description: description(desc), + injectMember: injectPacketReordering, + recoverMember: recoverPacketReordering, + } +} + +func newFailurePacketReorderingLeader() failure { + desc := fmt.Sprintf("inject packet reordering to leader's network with a probablity of %d", packetCorruptionPercentage) + ff := failureByFunc{ + description: description(desc), + injectMember: injectPacketReordering, + recoverMember: recoverPacketReordering, + } + return &failureLeader{ff, 0} +} + +func newFailurePacketReorderingAll() failure { + desc := fmt.Sprintf("inject packet reordering to all member's network with a probablity of %d", packetCorruptionPercentage) + return &failureAll{ + description: description(desc), + injectMember: injectPacketReordering, + recoverMember: recoverPacketReordering, + } +} + +func newFailurePacketLossOneMember() failure { + desc := fmt.Sprintf("inject packet loss to one member's network with a probablity of %d", packetLossPercentage) + return &failureOne{ + description: description(desc), + injectMember: injectPacketLoss, + recoverMember: recoverPacketLoss, + } +} + +func newFailurePacketLossLeader() failure { + desc := fmt.Sprintf("inject packet loss to leader's network with a probablity of %d", packetLossPercentage) + ff := failureByFunc{ + description: description(desc), + injectMember: injectPacketLoss, + recoverMember: recoverPacketLoss, + } + return &failureLeader{ff, 0} +} + +func newFailurePacketLossAll() failure { + desc := fmt.Sprintf("inject packet loss to all member's network with a probablity of %d", packetLossPercentage) + return &failureAll{ + description: description(desc), + injectMember: injectPacketLoss, + recoverMember: recoverPacketLoss, + } +} + +func newFailureNetworkPartitionOneMember() failure { + desc := fmt.Sprintf("create network partition to one member's network by adding %d ms latency", partitionNetworkLatency) + return &failureOne{ + description: description(desc), + injectMember: injectPartitioning, + recoverMember: recoverPartitioning, + } +} + +func newFailureNetworkPartitionLeader() failure { + desc := fmt.Sprintf("create network partition to one member's network by adding %d ms latency", partitionNetworkLatency) + ff := failureByFunc{ + description: description(desc), + injectMember: injectPartitioning, + recoverMember: recoverPartitioning, + } + return &failureLeader{ff, 0} +} + +func newFailureNetworkPartitionAll() failure { + desc := fmt.Sprintf("create network partition to one member's network by adding %d ms latency", partitionNetworkLatency) + return &failureAll{ + description: description(desc), + injectMember: injectPartitioning, + recoverMember: recoverPartitioning, + } +} + func newFailureNop() failure { return &failureNop{ description: "no failure", diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go index 18ecfcb53f1..05109e4a3d4 100644 --- a/tools/functional-tester/etcd-tester/main.go +++ b/tools/functional-tester/etcd-tester/main.go @@ -178,6 +178,18 @@ func makeFailures(types string, c *cluster) []failure { newFailureSlowNetworkOneMember(), newFailureSlowNetworkLeader(), newFailureSlowNetworkAll(), + newFailureCorruptedNetworkOneMember(), + newFailureCorruptedNetworkLeader(), + newFailureCorruptedNetworkAll(), + newFailurePacketReorderingOneMember(), + newFailurePacketReorderingLeader(), + newFailurePacketReorderingAll(), + newFailurePacketLossOneMember(), + newFailurePacketLossLeader(), + newFailurePacketLossAll(), + newFailureNetworkPartitionOneMember(), + newFailureNetworkPartitionLeader(), + newFailureNetworkPartitionAll(), } failures = append(failures, defaultFailures...)