diff --git a/tests/e2e/cmux_test.go b/tests/e2e/cmux_test.go index 3feb4c68293..630a7c30c2a 100644 --- a/tests/e2e/cmux_test.go +++ b/tests/e2e/cmux_test.go @@ -110,12 +110,12 @@ func testConnectionMultiplexing(ctx context.Context, t *testing.T, member e2e.Et } t.Run("etcdctl", func(t *testing.T) { t.Run("v2", func(t *testing.T) { - etcdctl := NewEtcdctl([]string{httpEndpoint}, connType, false, true) + etcdctl := e2e.NewEtcdctl([]string{httpEndpoint}, connType, false, true) err := etcdctl.Set("a", "1") assert.NoError(t, err) }) t.Run("v3", func(t *testing.T) { - etcdctl := NewEtcdctl([]string{grpcEndpoint}, connType, false, false) + etcdctl := e2e.NewEtcdctl([]string{grpcEndpoint}, connType, false, false) err := etcdctl.Put("a", "1") assert.NoError(t, err) }) diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index b63afee2c27..5f582af2029 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -128,7 +128,7 @@ func TestInPlaceRecovery(t *testing.T) { //Put some data into the old cluster, so that after recovering from a blank db, the hash diverges. t.Log("putting 10 keys...") - oldCc := NewEtcdctl(epcOld.EndpointsV3(), e2e.ClientNonTLS, false, false) + oldCc := e2e.NewEtcdctl(epcOld.EndpointsV3(), e2e.ClientNonTLS, false, false) for i := 0; i < 10; i++ { err := oldCc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i)) assert.NoError(t, err, "error on put") @@ -154,7 +154,7 @@ func TestInPlaceRecovery(t *testing.T) { } }) - newCc := NewEtcdctl(epcNew.EndpointsV3(), e2e.ClientNonTLS, false, false) + newCc := e2e.NewEtcdctl(epcNew.EndpointsV3(), e2e.ClientNonTLS, false, false) assert.NoError(t, err) wg := sync.WaitGroup{} @@ -211,7 +211,7 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) { } }) - cc := NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false) + cc := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false) for i := 0; i < 10; i++ { err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i)) @@ -252,7 +252,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) { } }) - cc := NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false) + cc := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false) for i := 0; i < 10; i++ { err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i)) @@ -302,7 +302,7 @@ func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) { // Put 200 identical keys to the cluster, so that the compaction will drop some stale values. // We need a relatively big number here to make the compaction takes a non-trivial time, and we can interrupt it. t.Log("putting 200 values to the identical key...") - cc := NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false) + cc := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false) for i := 0; i < 200; i++ { err = cc.Put("key", fmt.Sprint(i)) diff --git a/tests/e2e/ctl_v3_member_no_proxy_test.go b/tests/e2e/ctl_v3_member_no_proxy_test.go index 3a3f6b2d9a5..cfb42b6d26b 100644 --- a/tests/e2e/ctl_v3_member_no_proxy_test.go +++ b/tests/e2e/ctl_v3_member_no_proxy_test.go @@ -49,7 +49,7 @@ func TestMemberReplace(t *testing.T) { for i := 1; i < len(epc.Procs); i++ { endpoints = append(endpoints, epc.Procs[(memberIdx+i)%len(epc.Procs)].EndpointsGRPC()...) } - cc := NewEtcdctl(endpoints, e2e.ClientNonTLS, false, false) + cc := e2e.NewEtcdctl(endpoints, e2e.ClientNonTLS, false, false) memberID, found, err := getMemberIdByName(ctx, cc, memberName) require.NoError(t, err) diff --git a/tests/e2e/http_health_check_test.go b/tests/e2e/http_health_check_test.go index 89c13d2b044..676b59cec3b 100644 --- a/tests/e2e/http_health_check_test.go +++ b/tests/e2e/http_health_check_test.go @@ -362,7 +362,7 @@ func doHealthCheckAndVerify(t *testing.T, client *http.Client, url string, expec func triggerNoSpaceAlarm(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) { buf := strings.Repeat("b", os.Getpagesize()) - etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) + etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) for { if err := etcdctl.Put("foo", buf); err != nil { if !strings.Contains(err.Error(), "etcdserver: mvcc: database space exceeded") { @@ -377,7 +377,7 @@ func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCl // the following proposal will be blocked at applying stage // because when apply index < committed index, linearizable read would time out. require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "beforeApplyOneEntryNormal", fmt.Sprintf(`sleep("%s")`, duration))) - etcdctl := NewEtcdctl(clus.Procs[1].EndpointsV3(), e2e.ClientNonTLS, false, false) + etcdctl := e2e.NewEtcdctl(clus.Procs[1].EndpointsV3(), e2e.ClientNonTLS, false, false) etcdctl.Put("foo", "bar") } @@ -391,12 +391,12 @@ func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) { require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "raftBeforeSaveWaitWalSync", fmt.Sprintf(`sleep("%s")`, duration))) - etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) + etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) etcdctl.Put("foo", "bar") } func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) { - etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) + etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) _, err := etcdctl.UserAdd("root", "root") require.NoError(t, err) @@ -409,7 +409,7 @@ func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus } func triggerCorrupt(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) { - etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) + etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) for i := 0; i < 10; i++ { require.NoError(t, etcdctl.Put("foo", "bar")) } diff --git a/tests/e2e/utils.go b/tests/e2e/utils.go index a88fb06135e..d5cf69b5465 100644 --- a/tests/e2e/utils.go +++ b/tests/e2e/utils.go @@ -116,7 +116,7 @@ func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error return g.Wait() } -func getMemberIdByName(ctx context.Context, c *Etcdctl, name string) (id uint64, found bool, err error) { +func getMemberIdByName(ctx context.Context, c *e2e.Etcdctl, name string) (id uint64, found bool, err error) { resp, err := c.MemberList() if err != nil { return 0, false, err diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 649c9f7051e..126a6ee597e 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "fmt" "net/url" "os" @@ -576,3 +577,71 @@ func (epc *EtcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) { } return ret } + +// WaitLeader returns index of the member in c.Members() that is leader +// or fails the test (if not established in 30s). +func (epc *EtcdProcessCluster) WaitLeader(t testing.TB) int { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + return epc.WaitMembersForLeader(ctx, t, epc.Procs) +} + +// WaitMembersForLeader waits until given members agree on the same leader, +// and returns its 'index' in the 'membs' list +func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []EtcdProcess) int { + cc := NewEtcdctl(epc.EndpointsV3(), epc.Cfg.ClientTLS, epc.Cfg.IsClientAutoTLS, epc.Cfg.EnableV2) + + // ensure leader is up via linearizable get + for { + select { + case <-ctx.Done(): + t.Fatal("WaitMembersForLeader timeout") + default: + } + _, err := cc.Get("0") + if err == nil || strings.Contains(err.Error(), "Key not found") { + break + } + t.Logf("WaitMembersForLeader Get err: %v", err) + } + + leaders := make(map[uint64]struct{}) + members := make(map[uint64]int) + for { + select { + case <-ctx.Done(): + t.Fatal("WaitMembersForLeader timeout") + default: + } + for i := range membs { + resp, err := membs[i].Etcdctl(epc.Cfg.ClientTLS, epc.Cfg.IsClientAutoTLS, epc.Cfg.EnableV2).Status() + if err != nil { + if strings.Contains(err.Error(), "connection refused") { + // if member[i] has stopped + continue + } else { + t.Fatal(err) + } + } + members[resp[0].Header.MemberId] = i + leaders[resp[0].Leader] = struct{}{} + } + // members agree on the same leader + if len(leaders) == 1 { + break + } + leaders = make(map[uint64]struct{}) + members = make(map[uint64]int) + // From main branch 10 * config.TickDuration (10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) + } + for l := range leaders { + if index, ok := members[l]; ok { + t.Logf("members agree on a leader, members:%v , leader:%v", members, l) + return index + } + t.Fatalf("members agree on a leader which is not one of members, members:%v , leader:%v", members, l) + } + t.Fatal("impossible path of execution") + return -1 +} diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index c3eaa188d6a..a741998c541 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -58,6 +58,8 @@ type EtcdProcess interface { PeerProxy() proxy.Server Failpoints() *BinaryFailpoints IsRunning() bool + + Etcdctl(connType ClientConnType, isAutoTLS bool, v2 bool) *Etcdctl } type LogsExpect interface { @@ -249,6 +251,10 @@ func (ep *EtcdServerProcess) IsRunning() bool { return false } +func (ep *EtcdServerProcess) Etcdctl(connType ClientConnType, isAutoTLS, v2 bool) *Etcdctl { + return NewEtcdctl(ep.EndpointsV3(), connType, isAutoTLS, v2) +} + type BinaryFailpoints struct { member EtcdProcess availableCache map[string]string diff --git a/tests/e2e/etcdctl.go b/tests/framework/e2e/etcdctl.go similarity index 83% rename from tests/e2e/etcdctl.go rename to tests/framework/e2e/etcdctl.go index cf64419bed8..010b72b6456 100644 --- a/tests/e2e/etcdctl.go +++ b/tests/framework/e2e/etcdctl.go @@ -20,18 +20,17 @@ import ( "strings" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/integration" ) type Etcdctl struct { - connType e2e.ClientConnType + connType ClientConnType isAutoTLS bool endpoints []string v2 bool } -func NewEtcdctl(endpoints []string, connType e2e.ClientConnType, isAutoTLS bool, v2 bool) *Etcdctl { +func NewEtcdctl(endpoints []string, connType ClientConnType, isAutoTLS bool, v2 bool) *Etcdctl { return &Etcdctl{ endpoints: endpoints, connType: connType, @@ -52,7 +51,7 @@ func (ctl *Etcdctl) Put(key, value string) error { } args := ctl.cmdArgs() args = append(args, "put", key, value) - return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "OK") + return SpawnWithExpectWithEnv(args, ctl.env(), "OK") } func (ctl *Etcdctl) PutWithAuth(key, value, username, password string) error { @@ -61,7 +60,7 @@ func (ctl *Etcdctl) PutWithAuth(key, value, username, password string) error { } args := ctl.cmdArgs() args = append(args, "--user", fmt.Sprintf("%s:%s", username, password), "put", key, value) - return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "OK") + return SpawnWithExpectWithEnv(args, ctl.env(), "OK") } func (ctl *Etcdctl) Set(key, value string) error { @@ -70,7 +69,7 @@ func (ctl *Etcdctl) Set(key, value string) error { } args := ctl.cmdArgs() args = append(args, "set", key, value) - lines, err := e2e.RunUtilCompletion(args, ctl.env()) + lines, err := RunUtilCompletion(args, ctl.env()) if err != nil { return err } @@ -83,7 +82,7 @@ func (ctl *Etcdctl) Set(key, value string) error { func (ctl *Etcdctl) AuthEnable() error { args := ctl.cmdArgs("auth", "enable") - return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "Authentication Enabled") + return SpawnWithExpectWithEnv(args, ctl.env(), "Authentication Enabled") } func (ctl *Etcdctl) UserGrantRole(user string, role string) (*clientv3.AuthUserGrantRoleResponse, error) { @@ -148,12 +147,28 @@ func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) { panic("Unsupported method for v2") } args := ctl.cmdArgs("compact", fmt.Sprint(rev)) - return nil, e2e.SpawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev)) + return nil, SpawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev)) +} + +func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) { + var epStatus []*struct { + Endpoint string + Status *clientv3.StatusResponse + } + err := ctl.spawnJsonCmd(&epStatus, "endpoint", "status") + if err != nil { + return nil, err + } + resp := make([]*clientv3.StatusResponse, len(epStatus)) + for i, e := range epStatus { + resp[i] = e.Status + } + return resp, err } func (ctl *Etcdctl) spawnJsonCmd(output interface{}, args ...string) error { args = append(args, "-w", "json") - cmd, err := e2e.SpawnCmd(append(ctl.cmdArgs(), args...), ctl.env()) + cmd, err := SpawnCmd(append(ctl.cmdArgs(), args...), ctl.env()) if err != nil { return err } @@ -165,7 +180,7 @@ func (ctl *Etcdctl) spawnJsonCmd(output interface{}, args ...string) error { } func (ctl *Etcdctl) cmdArgs(args ...string) []string { - cmdArgs := []string{e2e.CtlBinPath} + cmdArgs := []string{CtlBinPath} for k, v := range ctl.flags() { cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v)) } @@ -176,13 +191,13 @@ func (ctl *Etcdctl) flags() map[string]string { fmap := make(map[string]string) if ctl.v2 { fmap["no-sync"] = "true" - if ctl.connType == e2e.ClientTLS { + if ctl.connType == ClientTLS { fmap["ca-file"] = integration.TestTLSInfo.TrustedCAFile fmap["cert-file"] = integration.TestTLSInfo.CertFile fmap["key-file"] = integration.TestTLSInfo.KeyFile } } else { - if ctl.connType == e2e.ClientTLS { + if ctl.connType == ClientTLS { if ctl.isAutoTLS { fmap["insecure-transport"] = "false" fmap["insecure-skip-tls-verify"] = "true"