Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.5] backport e2e WaitLeader #17381

Merged
merged 3 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tests/e2e/cmux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ func testConnectionMultiplexing(ctx context.Context, t *testing.T, member e2e.Et
}
t.Run("etcdctl", func(t *testing.T) {
t.Run("v2", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{httpEndpoint}, connType, false, true)
etcdctl := e2e.NewEtcdctl([]string{httpEndpoint}, connType, false, true)
err := etcdctl.Set("a", "1")
assert.NoError(t, err)
})
t.Run("v3", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{grpcEndpoint}, connType, false, false)
etcdctl := e2e.NewEtcdctl([]string{grpcEndpoint}, connType, false, false)
err := etcdctl.Put("a", "1")
assert.NoError(t, err)
})
Expand Down
10 changes: 5 additions & 5 deletions tests/e2e/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestInPlaceRecovery(t *testing.T) {
//Put some data into the old cluster, so that after recovering from a blank db, the hash diverges.
t.Log("putting 10 keys...")

oldCc := NewEtcdctl(epcOld.EndpointsV3(), e2e.ClientNonTLS, false, false)
oldCc := e2e.NewEtcdctl(epcOld.EndpointsV3(), e2e.ClientNonTLS, false, false)
for i := 0; i < 10; i++ {
err := oldCc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i))
assert.NoError(t, err, "error on put")
Expand All @@ -154,7 +154,7 @@ func TestInPlaceRecovery(t *testing.T) {
}
})

newCc := NewEtcdctl(epcNew.EndpointsV3(), e2e.ClientNonTLS, false, false)
newCc := e2e.NewEtcdctl(epcNew.EndpointsV3(), e2e.ClientNonTLS, false, false)
assert.NoError(t, err)

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
}
})

cc := NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)
cc := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)

for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i))
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
}
})

cc := NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)
cc := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)

for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i))
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
// Put 200 identical keys to the cluster, so that the compaction will drop some stale values.
// We need a relatively big number here to make the compaction takes a non-trivial time, and we can interrupt it.
t.Log("putting 200 values to the identical key...")
cc := NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)
cc := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)

for i := 0; i < 200; i++ {
err = cc.Put("key", fmt.Sprint(i))
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_member_no_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestMemberReplace(t *testing.T) {
for i := 1; i < len(epc.Procs); i++ {
endpoints = append(endpoints, epc.Procs[(memberIdx+i)%len(epc.Procs)].EndpointsGRPC()...)
}
cc := NewEtcdctl(endpoints, e2e.ClientNonTLS, false, false)
cc := e2e.NewEtcdctl(endpoints, e2e.ClientNonTLS, false, false)

memberID, found, err := getMemberIdByName(ctx, cc, memberName)
require.NoError(t, err)
Expand Down
10 changes: 5 additions & 5 deletions tests/e2e/http_health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func doHealthCheckAndVerify(t *testing.T, client *http.Client, url string, expec

func triggerNoSpaceAlarm(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
buf := strings.Repeat("b", os.Getpagesize())
etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
for {
if err := etcdctl.Put("foo", buf); err != nil {
if !strings.Contains(err.Error(), "etcdserver: mvcc: database space exceeded") {
Expand All @@ -377,7 +377,7 @@ func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCl
// the following proposal will be blocked at applying stage
// because when apply index < committed index, linearizable read would time out.
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "beforeApplyOneEntryNormal", fmt.Sprintf(`sleep("%s")`, duration)))
etcdctl := NewEtcdctl(clus.Procs[1].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[1].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl.Put("foo", "bar")
}

Expand All @@ -391,12 +391,12 @@ func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _

func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "raftBeforeSaveWaitWalSync", fmt.Sprintf(`sleep("%s")`, duration)))
etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl.Put("foo", "bar")
}

func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)

_, err := etcdctl.UserAdd("root", "root")
require.NoError(t, err)
Expand All @@ -409,7 +409,7 @@ func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus
}

func triggerCorrupt(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
for i := 0; i < 10; i++ {
require.NoError(t, etcdctl.Put("foo", "bar"))
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error
return g.Wait()
}

func getMemberIdByName(ctx context.Context, c *Etcdctl, name string) (id uint64, found bool, err error) {
func getMemberIdByName(ctx context.Context, c *e2e.Etcdctl, name string) (id uint64, found bool, err error) {
resp, err := c.MemberList()
if err != nil {
return 0, false, err
Expand Down
69 changes: 69 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package e2e

import (
"context"
"fmt"
"net/url"
"os"
Expand Down Expand Up @@ -576,3 +577,71 @@ func (epc *EtcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
}
return ret
}

// WaitLeader returns index of the member in c.Members() that is leader
// or fails the test (if not established in 30s).
func (epc *EtcdProcessCluster) WaitLeader(t testing.TB) int {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return epc.WaitMembersForLeader(ctx, t, epc.Procs)
}

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []EtcdProcess) int {
cc := NewEtcdctl(epc.EndpointsV3(), epc.Cfg.ClientTLS, epc.Cfg.IsClientAutoTLS, epc.Cfg.EnableV2)

// ensure leader is up via linearizable get
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
_, err := cc.Get("0")
if err == nil || strings.Contains(err.Error(), "Key not found") {
break
}
t.Logf("WaitMembersForLeader Get err: %v", err)
}

leaders := make(map[uint64]struct{})
members := make(map[uint64]int)
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
for i := range membs {
resp, err := membs[i].Etcdctl(epc.Cfg.ClientTLS, epc.Cfg.IsClientAutoTLS, epc.Cfg.EnableV2).Status()
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
// if member[i] has stopped
continue
} else {
t.Fatal(err)
}
}
members[resp[0].Header.MemberId] = i
leaders[resp[0].Leader] = struct{}{}
}
// members agree on the same leader
if len(leaders) == 1 {
break
}
leaders = make(map[uint64]struct{})
members = make(map[uint64]int)
// From main branch 10 * config.TickDuration (10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
}
for l := range leaders {
if index, ok := members[l]; ok {
t.Logf("members agree on a leader, members:%v , leader:%v", members, l)
return index
}
t.Fatalf("members agree on a leader which is not one of members, members:%v , leader:%v", members, l)
}
t.Fatal("impossible path of execution")
return -1
}
6 changes: 6 additions & 0 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type EtcdProcess interface {
PeerProxy() proxy.Server
Failpoints() *BinaryFailpoints
IsRunning() bool

Etcdctl(connType ClientConnType, isAutoTLS bool, v2 bool) *Etcdctl
}

type LogsExpect interface {
Expand Down Expand Up @@ -249,6 +251,10 @@ func (ep *EtcdServerProcess) IsRunning() bool {
return false
}

func (ep *EtcdServerProcess) Etcdctl(connType ClientConnType, isAutoTLS, v2 bool) *Etcdctl {
return NewEtcdctl(ep.EndpointsV3(), connType, isAutoTLS, v2)
}

type BinaryFailpoints struct {
member EtcdProcess
availableCache map[string]string
Expand Down
39 changes: 27 additions & 12 deletions tests/e2e/etcdctl.go → tests/framework/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ import (
"strings"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/integration"
)

type Etcdctl struct {
connType e2e.ClientConnType
connType ClientConnType
isAutoTLS bool
endpoints []string
v2 bool
}

func NewEtcdctl(endpoints []string, connType e2e.ClientConnType, isAutoTLS bool, v2 bool) *Etcdctl {
func NewEtcdctl(endpoints []string, connType ClientConnType, isAutoTLS bool, v2 bool) *Etcdctl {
return &Etcdctl{
endpoints: endpoints,
connType: connType,
Expand All @@ -52,7 +51,7 @@ func (ctl *Etcdctl) Put(key, value string) error {
}
args := ctl.cmdArgs()
args = append(args, "put", key, value)
return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "OK")
return SpawnWithExpectWithEnv(args, ctl.env(), "OK")
}

func (ctl *Etcdctl) PutWithAuth(key, value, username, password string) error {
Expand All @@ -61,7 +60,7 @@ func (ctl *Etcdctl) PutWithAuth(key, value, username, password string) error {
}
args := ctl.cmdArgs()
args = append(args, "--user", fmt.Sprintf("%s:%s", username, password), "put", key, value)
return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "OK")
return SpawnWithExpectWithEnv(args, ctl.env(), "OK")
}

func (ctl *Etcdctl) Set(key, value string) error {
Expand All @@ -70,7 +69,7 @@ func (ctl *Etcdctl) Set(key, value string) error {
}
args := ctl.cmdArgs()
args = append(args, "set", key, value)
lines, err := e2e.RunUtilCompletion(args, ctl.env())
lines, err := RunUtilCompletion(args, ctl.env())
if err != nil {
return err
}
Expand All @@ -83,7 +82,7 @@ func (ctl *Etcdctl) Set(key, value string) error {

func (ctl *Etcdctl) AuthEnable() error {
args := ctl.cmdArgs("auth", "enable")
return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "Authentication Enabled")
return SpawnWithExpectWithEnv(args, ctl.env(), "Authentication Enabled")
}

func (ctl *Etcdctl) UserGrantRole(user string, role string) (*clientv3.AuthUserGrantRoleResponse, error) {
Expand Down Expand Up @@ -148,12 +147,28 @@ func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) {
panic("Unsupported method for v2")
}
args := ctl.cmdArgs("compact", fmt.Sprint(rev))
return nil, e2e.SpawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev))
return nil, SpawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev))
}

func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) {
var epStatus []*struct {
Endpoint string
Status *clientv3.StatusResponse
}
err := ctl.spawnJsonCmd(&epStatus, "endpoint", "status")
if err != nil {
return nil, err
}
resp := make([]*clientv3.StatusResponse, len(epStatus))
for i, e := range epStatus {
resp[i] = e.Status
}
return resp, err
}

func (ctl *Etcdctl) spawnJsonCmd(output interface{}, args ...string) error {
args = append(args, "-w", "json")
cmd, err := e2e.SpawnCmd(append(ctl.cmdArgs(), args...), ctl.env())
cmd, err := SpawnCmd(append(ctl.cmdArgs(), args...), ctl.env())
if err != nil {
return err
}
Expand All @@ -165,7 +180,7 @@ func (ctl *Etcdctl) spawnJsonCmd(output interface{}, args ...string) error {
}

func (ctl *Etcdctl) cmdArgs(args ...string) []string {
cmdArgs := []string{e2e.CtlBinPath}
cmdArgs := []string{CtlBinPath}
for k, v := range ctl.flags() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v))
}
Expand All @@ -176,13 +191,13 @@ func (ctl *Etcdctl) flags() map[string]string {
fmap := make(map[string]string)
if ctl.v2 {
fmap["no-sync"] = "true"
if ctl.connType == e2e.ClientTLS {
if ctl.connType == ClientTLS {
fmap["ca-file"] = integration.TestTLSInfo.TrustedCAFile
fmap["cert-file"] = integration.TestTLSInfo.CertFile
fmap["key-file"] = integration.TestTLSInfo.KeyFile
}
} else {
if ctl.connType == e2e.ClientTLS {
if ctl.connType == ClientTLS {
if ctl.isAutoTLS {
fmap["insecure-transport"] = "false"
fmap["insecure-skip-tls-verify"] = "true"
Expand Down
Loading