Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

functional-test: add advance network failure cases #6918

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 79 additions & 3 deletions pkg/netutil/isolate_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,82 @@ func RecoverPort(port int) error {
return err
}

// SetPacketCorruption corrupts packets at p%
func SetPacketCorruption(p int) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

most of this will be corrected by tcp checksums, for the packets that aren't, I don't see how etcd would be able to pass its checks (e.g., suppose a lease key is corrupted and when the lease checker looks for the intended key, it's gone)

if p < 0 || p > 100 {
return fmt.Errorf("packets corruption percentage must be between 0 and 100 but got %v", p)
}
ifce, err := GetDefaultInterface()
if err != nil {
return err
}
cmdStr1 := fmt.Sprintf("sudo tc qdisc add dev %s root handle 1:1 netem corrupt %d%%", ifce, p)
cmdStr2 := fmt.Sprintf("sudo tc qdisc add dev %s parent 1:1 handle 10:1 netem corrupt %d%%", ifce, p)
cmdStrs := []string{cmdStr1, cmdStr2}

for _, cmdStr := range cmdStrs {
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}
}

return nil

}

// SetPacketReordering reorders packets. rp% of packets (with a correlation of cp%) gets send immediately. The rest will be delayed for ms millisecond
func SetPacketReordering(rp int, cp int, ms int) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this only tests the tcp stack; etcd will still see everything in order, so why have it?

ifce, err := GetDefaultInterface()
if err != nil {
return err
}
cmdStr1 := fmt.Sprintf("sudo tc qdisc add dev %s root handle 1:1 netem delay %dms reorder %d%% %d%%", ifce, ms, rp, cp)
cmdStr2 := fmt.Sprintf("sudo tc qdisc add dev %s parent 1:1 handle 10:1 delay %dms reorder %d%% %d%%", ifce, ms, rp, cp)
cmdStrs := []string{cmdStr1, cmdStr2}

for _, cmdStr := range cmdStrs {
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}
}

return nil

}

// SetPackLoss randomly drop packet at p% probability
func SetPackLoss(p int) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is this any different from injecting random latencies? the tcp stack will retransmit

ifce, err := GetDefaultInterface()
if err != nil {
return err
}
cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem loss %d%%", ifce, p)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}

return nil
}

// SetPartitioning sets a very long delay of ms scale with random variations rv to isolate this node
func SetPartitioning(ms int, rv int) error {
ifce, err := GetDefaultInterface()
if err != nil {
return err
}

cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}

return nil
}

// SetLatency adds latency in millisecond scale with random variations.
func SetLatency(ms, rv int) error {
ifce, err := GetDefaultInterface()
Expand All @@ -64,12 +140,12 @@ func SetLatency(ms, rv int) error {
return nil
}

// RemoveLatency resets latency configurations.
func RemoveLatency() error {
func ResetDefaultInterface() error {
ifce, err := GetDefaultInterface()
if err != nil {
return err
}
_, err = exec.Command("/bin/sh", "-c", fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)).Output()
cmdStr := fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
return err
}
10 changes: 10 additions & 0 deletions pkg/netutil/isolate_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ func DropPort(port int) error { return nil }

func RecoverPort(port int) error { return nil }

func SetPacketCorruption(cp int) error { return nil }

func SetPacketReordering(ms int, rp int, cp int) error { return nil }

func SetPackLoss(p int) error { return nil }

func SetPartitioning(ms int, rv int) error { return nil }

func SetLatency(ms, rv int) error { return nil }

func RemoveLatency() error { return nil }

func ResetDefaultInterface() error { return nil }
2 changes: 1 addition & 1 deletion tools/functional-tester/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM alpine
RUN apk update
RUN apk add -v iptables sudo
RUN apk --update add iptables bash iproute2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why add bash?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably don't need.

ADD bin/etcd-agent /
ADD bin/etcd /
ADD bin/etcd-tester /
Expand Down
7 changes: 1 addition & 6 deletions tools/functional-tester/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,5 @@ tester:
- /etcd-tester
- -agent-endpoints
- "172.20.0.2:9027,172.20.0.3:9027,172.20.0.4:9027"
- -limit
- "1"
- -stress-key-count
- "1"
- -stress-key-size
- "1"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the functional-tester run on docker image mirrors the one we run using goreman?


38 changes: 35 additions & 3 deletions tools/functional-tester/etcd-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,48 @@ func (a *Agent) recoverPort(port int) error {
return netutil.RecoverPort(port)
}

func (a *Agent) setLatency(ms, rv int) error {
func (a *Agent) setPacketCorruption(cp int) error {
if !a.cfg.UseRoot {
return nil
}
return netutil.SetPacketCorruption(cp)
}

func (a *Agent) setPacketReordering(rp, cp, ms int) error {
if !a.cfg.UseRoot {
return nil
}
return netutil.SetPacketReordering(rp, cp, ms)
}

func (a *Agent) setPackLoss(p int) error {
if !a.cfg.UseRoot {
return nil
}
return netutil.SetPackLoss(p)
}

func (a *Agent) setPartitioning(ms, rv int) error {
if !a.cfg.UseRoot {
return nil
}
if ms == 0 {
return netutil.RemoveLatency()
return netutil.SetPartitioning(ms, rv)
}

func (a *Agent) setLatency(ms, rv int) error {
if !a.cfg.UseRoot {
return nil
}
return netutil.SetLatency(ms, rv)
}

func (a *Agent) resetDefaultInterface() error {
if !a.cfg.UseRoot {
return nil
}
return netutil.ResetDefaultInterface()
}

func (a *Agent) status() client.Status {
return client.Status{State: a.state}
}
Expand Down
50 changes: 49 additions & 1 deletion tools/functional-tester/etcd-agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,25 @@ type Agent interface {
DropPort(port int) error
// RecoverPort stops dropping all network packets at the given port.
RecoverPort(port int) error
// SetPacketCorruption corrupts packets at a probability of p%
SetPacketCorruption(p int) error
// RemovePacketCorruption removes packet corruption.
RemovePacketCorruption() error
// SetPacketReordering reorders packets. rp% of packets (with a correlation of cp%) gets send immediately. The rest will be delayed for ms millisecond
SetPacketReordering(rp, cp, ms int) error
// RemovePacketReordering removes reordering of packets
RemovePacketReordering() error
// SetPackLoss randomly drop packet at a probability of p%
SetPacketLoss(p int) error
// RemovePacketLoss removes dropping of packets
RemovePacketLoss() error
// SetPartitioning sets a very long network delay of ms scale with random variations rv to isolate a node
SetPartitioning(ms, rv int) error
// SetPartitioning removes partition of a node
RemovePartitioning() error
// SetLatency slows down network by introducing latency.
SetLatency(ms, rv int) error
// RemoveLatency removes latency introduced by SetLatency.
// RemoveLatency removes latency.
RemoveLatency() error
// Status returns the status of etcd on the agent
Status() (Status, error)
Expand Down Expand Up @@ -99,6 +115,38 @@ func (a *agent) RecoverPort(port int) error {
return a.rpcClient.Call("Agent.RPCRecoverPort", port, nil)
}

func (a *agent) SetPacketCorruption(p int) error {
return a.rpcClient.Call("Agent.RPCSetPacketCorruption", []int{p}, nil)
}

func (a *agent) RemovePacketCorruption() error {
return a.rpcClient.Call("Agent.RPCRemovePacketCorruption", struct{}{}, nil)
}

func (a *agent) SetPacketReordering(rp, cp, ms int) error {
return a.rpcClient.Call("Agent.RPCSetPacketReordering", []int{rp, cp, ms}, nil)
}

func (a *agent) RemovePacketReordering() error {
return a.rpcClient.Call("Agent.RPCRemovePacketCorruption", struct{}{}, nil)
}

func (a *agent) SetPacketLoss(p int) error {
return a.rpcClient.Call("Agent.RPCSetPacketLoss", []int{p}, nil)
}

func (a *agent) RemovePacketLoss() error {
return a.rpcClient.Call("Agent.RPCRemovePacketLoss", struct{}{}, nil)
}

func (a *agent) SetPartitioning(ms, rv int) error {
return a.rpcClient.Call("Agent.RPCSetPartitioning", []int{ms, rv}, nil)
}

func (a *agent) RemovePartitioning() error {
return a.rpcClient.Call("Agent.RPCRemovePartitioning", struct{}{}, nil)
}

func (a *agent) SetLatency(ms, rv int) error {
return a.rpcClient.Call("Agent.RPCSetLatency", []int{ms, rv}, nil)
}
Expand Down
86 changes: 85 additions & 1 deletion tools/functional-tester/etcd-agent/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,90 @@ func (a *Agent) RPCRecoverPort(port int, reply *struct{}) error {
return nil
}

func (a *Agent) RPCSetPacketCorruption(args []int, reply *struct{}) error {
if len(args) != 1 {
return fmt.Errorf("SetPacketCorruption needs 1 arg, got (%v)", args)
}
plog.Printf("set packet corruption at %d%%)", args[0])
err := a.setPacketCorruption(args[0])
if err != nil {
plog.Println("error setting packet corruption", err)
}
return nil
}

func (a *Agent) RPCRemovePacketCorruption(args struct{}, reply *struct{}) error {
plog.Println("removing packet corruption")
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing packet corruption")
}
return nil
}

func (a *Agent) RPCSetPacketReordering(args []int, reply *struct{}) error {
if len(args) != 3 {
return fmt.Errorf("SetPacketReordering needs 3 args, got (%v)", args)
}
plog.Printf("SetPacketReordering reorders packets. %d%% of packets (with a correlation of %d%%) gets send immediately. The rest will be delayed for %d millisecond", args[0], args[1], args[2])
err := a.setPacketCorruption(args[0])
if err != nil {
plog.Println("error setting packet reordering", err)
}
return nil
}

func (a *Agent) RPCRemovePacketReordering(args struct{}, reply *struct{}) error {
plog.Println("removing packet reordering")
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing packet reordering")
}
return nil
}

func (a *Agent) RPCSetPacketLoss(args []int, reply *struct{}) error {
if len(args) != 1 {
return fmt.Errorf("SetPacketLoss needs 1 arg, got (%v)", args)
}
plog.Printf("SetPackLoss randomly drop packet at %d probability", args[0])
err := a.setPackLoss(args[0])
if err != nil {
plog.Println("error setting packet loss", err)
}
return nil
}

func (a *Agent) RPCRemovePacketLoss(args struct{}, reply *struct{}) error {
plog.Println("removing packet loss")
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing packet loss")
}
return nil
}

func (a *Agent) RPCSetPartitioning(args []int, reply *struct{}) error {
if len(args) != 1 {
return fmt.Errorf("SetPartitioning needs 2 args, got (%v)", args)
}
plog.Printf("SetPartitioning sets %dms (+/- %dms)", args[0], args[1])
err := a.setPartitioning(args[0], args[1])
if err != nil {
plog.Println("error setting packet loss", err)
}
return nil
}

func (a *Agent) RPCRemovePartitioning(args struct{}, reply *struct{}) error {
plog.Println("removing packet partitioning")
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing packet partitioning")
}
return nil
}

func (a *Agent) RPCSetLatency(args []int, reply *struct{}) error {
if len(args) != 2 {
return fmt.Errorf("SetLatency needs two args, got (%v)", args)
Expand All @@ -118,7 +202,7 @@ func (a *Agent) RPCSetLatency(args []int, reply *struct{}) error {

func (a *Agent) RPCRemoveLatency(args struct{}, reply *struct{}) error {
plog.Println("removing latency")
err := a.setLatency(0, 0)
err := a.resetDefaultInterface()
if err != nil {
plog.Println("error removing latency")
}
Expand Down
Loading