Skip to content

Commit

Permalink
e2e: test alarm CORRUPT
Browse files Browse the repository at this point in the history
Signed-off-by: Gyu-Ho Lee <[email protected]>
  • Loading branch information
gyuho committed Sep 13, 2017
1 parent d6e2729 commit 6609b77
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 0 deletions.
7 changes: 7 additions & 0 deletions e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/url"
"os"
"strings"
"time"

"github.com/coreos/etcd/etcdserver"
)
Expand Down Expand Up @@ -115,6 +116,7 @@ type etcdProcessClusterConfig struct {
forceNewCluster bool
initialToken string
quotaBackendBytes int64
corruptCheckTime time.Duration
noStrictReconfig bool
}

Expand Down Expand Up @@ -221,6 +223,11 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
"--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
)
}
if cfg.corruptCheckTime > 0 {
args = append(args,
"--experimental-corrupt-check-time", cfg.corruptCheckTime.String(),
)
}
if cfg.noStrictReconfig {
args = append(args, "--strict-reconfig-check=false")
}
Expand Down
104 changes: 104 additions & 0 deletions e2e/ctl_v3_alarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,28 @@ package e2e

import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
)

// TestCtlV3Alarm runs alarmTest through testCtl with the backend quota set
// to 13 OS pages.
func TestCtlV3Alarm(t *testing.T) {
	// The boltdb minimum working set is six pages.
	quota := int64(13 * os.Getpagesize())
	testCtl(t, alarmTest, withQuota(quota))
}

func TestCtlV3AlarmCorrupt(t *testing.T) {
testCtl(t, alarmCorruptTest, withCfg(configNoTLS), withQuorum(), withCorruptCheckTime(2*time.Second))
}

func alarmTest(cx ctlCtx) {
// test small put still works
smallbuf := strings.Repeat("a", 64)
Expand Down Expand Up @@ -99,6 +108,101 @@ func alarmTest(cx ctlCtx) {
}
}

// fakeConsistentIndex is a stub that always reports the fixed index it was
// constructed with.
type fakeConsistentIndex struct {
	rev uint64
}

// ConsistentIndex returns the fixed value held in rev.
func (idx *fakeConsistentIndex) ConsistentIndex() uint64 {
	return idx.rev
}

// alarmCorruptTest checks that a member whose backend has been modified
// offline is detected as corrupt once the cluster is restarted.
//
// The test proceeds in stages:
//  1. write a few revisions through etcdctl,
//  2. stop member 0 and write extra keys directly into its backend file,
//  3. write different values for the same keys through member 1,
//  4. restart the cluster and retry a Put against member 0 until it is
//     rejected with rpctypes.ErrCorrupt,
//  5. verify the CORRUPT alarm is listed and "/health" reports it,
//  6. respawn member 0 and wait for its "corrupted" log line.
//
// Any unexpected failure aborts the test through cx.t.Fatal/Fatalf.
func alarmCorruptTest(cx ctlCtx) {
	// Number of one-second-spaced Put attempts made while waiting for the
	// corruption to be detected.
	const detectRetries = 5

	for i := 0; i < 10; i++ {
		if err := ctlV3Put(cx, "k", "v", ""); err != nil {
			// Fatal only when a dial timeout is configured and the error
			// is not a gRPC timeout; everything else is tolerated.
			if cx.dialTimeout > 0 && !isGRPCTimedout(err) {
				cx.t.Fatalf("alarmCorruptTest ctlV3Put error (%v)", err)
			}
		}
	}
	cx.epc.procs[0].Stop()

	// Corrupt member 0 by modifying backend offline.
	// NOTE(review): any results of the store calls below are discarded,
	// exactly as in the original; confirm whether they should be checked.
	fp := filepath.Join(cx.epc.procs[0].Config().dataDirPath, "member", "snap", "db")
	be := backend.NewDefaultBackend(fp)
	s := mvcc.NewStore(be, nil, &fakeConsistentIndex{13})
	s.Put([]byte("abc"), []byte("def"), 0)
	s.Put([]byte("xyz"), []byte("123"), 0)
	s.Compact(5)
	s.Commit()
	s.Close()
	be.Close()

	eps := cx.epc.EndpointsV3()

	// Wait for cluster so Puts succeed in case member 0 was the leader.
	cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second})
	if err != nil {
		cx.t.Fatal(err)
	}
	defer cli1.Close()

	if _, err = cli1.Get(context.TODO(), "k"); err != nil {
		cx.t.Fatal(err)
	}
	// These writes give the healthy members values that differ from what
	// was written into member 0's backend above; a silent failure here
	// would invalidate the rest of the test, so surface it immediately.
	if _, err = cli1.Put(context.TODO(), "xyz", "321"); err != nil {
		cx.t.Fatal(err)
	}
	if _, err = cli1.Put(context.TODO(), "abc", "fed"); err != nil {
		cx.t.Fatal(err)
	}

	// Restart with corruption checking
	cx.epc.procs[1].Stop()
	cx.epc.procs[2].Stop()
	cx.epc.Restart()

	// wait until corruption detected
	cli0, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[0]}, DialTimeout: 3 * time.Second})
	if err != nil {
		cx.t.Fatal(err)
	}
	defer cli0.Close()

	sresp, err := cli1.Status(context.TODO(), eps[0])
	if err != nil {
		cx.t.Fatal(err)
	}
	id0 := sresp.Header.GetMemberId()

	corrupted := false
	for i := 0; i < detectRetries; i++ {
		presp, perr := cli0.Put(context.TODO(), "abc", "aaa")
		if perr != nil {
			// The only acceptable failure is the corruption error.
			if perr.Error() != rpctypes.ErrCorrupt.Error() {
				cx.t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
			}
			corrupted = true
			break
		}
		time.Sleep(time.Second)
	}
	if !corrupted {
		cx.t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, detectRetries*time.Second)
	}

	// corrupt alarm should now be on
	if err = ctlV3Alarm(cx, "list", "alarm:CORRUPT"); err != nil {
		cx.t.Fatal(err)
	}

	// '/health' handler should return 'false'
	if err = cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":false,"errors":["CORRUPT"]}`}); err != nil {
		cx.t.Fatalf("failed get with curl (%v)", err)
	}

	// corrupted member should fail to restart
	cx.epc.procs[0].Stop()
	ep := cx.epc.procs[0]
	proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...))
	if err != nil {
		cx.t.Fatal(err)
	}
	defer proc.Stop()
	// Without checking this result the final assertion would be a no-op.
	if err = waitReadyExpectProc(proc, []string{fmt.Sprintf("etcdserver: corrupted %016x", id0)}); err != nil {
		cx.t.Fatal(err)
	}
}

func ctlV3Alarm(cx ctlCtx, cmd string, as ...string) error {
cmdArgs := append(cx.PrefixArgs(), "alarm", cmd)
return spawnWithExpects(cmdArgs, as...)
Expand Down
8 changes: 8 additions & 0 deletions e2e/ctl_v3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type ctlCtx struct {
t *testing.T
cfg etcdProcessClusterConfig
quotaBackendBytes int64
corruptCheckTime time.Duration
noStrictReconfig bool

epc *etcdProcessCluster
Expand Down Expand Up @@ -101,6 +102,10 @@ func withQuota(b int64) ctlOption {
return func(cx *ctlCtx) { cx.quotaBackendBytes = b }
}

// withCorruptCheckTime returns a ctlOption that stores d in the context's
// corruptCheckTime field.
func withCorruptCheckTime(d time.Duration) ctlOption {
	apply := func(cx *ctlCtx) {
		cx.corruptCheckTime = d
	}
	return apply
}

// withCompactPhysical returns a ctlOption that sets the context's
// compactPhysical flag to true.
func withCompactPhysical() ctlOption {
	enable := func(cx *ctlCtx) {
		cx.compactPhysical = true
	}
	return enable
}
Expand Down Expand Up @@ -130,6 +135,9 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
if ret.quotaBackendBytes > 0 {
ret.cfg.quotaBackendBytes = ret.quotaBackendBytes
}
if ret.corruptCheckTime > 0 {
ret.cfg.corruptCheckTime = ret.corruptCheckTime
}
ret.cfg.noStrictReconfig = ret.noStrictReconfig

epc, err := newEtcdProcessCluster(&ret.cfg)
Expand Down

0 comments on commit 6609b77

Please sign in to comment.