Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #318 from justinsb/wait_for_leader
Browse files Browse the repository at this point in the history
e2e tests should wait for cluster readiness
  • Loading branch information
justinsb authored Apr 28, 2020
2 parents 9f13a11 + a3813d4 commit 9418d08
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 14 deletions.
4 changes: 4 additions & 0 deletions pkg/etcdclient/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type EtcdClient interface {
// CopyTo traverses every key and writes it to dest
CopyTo(ctx context.Context, dest NodeSink) (int, error)

// LeaderID returns the ID of the current leader, or "" if there is no leader
// NOTE: This is currently only used in end-to-end tests
LeaderID(ctx context.Context) (string, error)

ListMembers(ctx context.Context) ([]*EtcdProcessMember, error)
AddMember(ctx context.Context, peerURLs []string) error
RemoveMember(ctx context.Context, member *EtcdProcessMember) error
Expand Down
11 changes: 11 additions & 0 deletions pkg/etcdclient/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,17 @@ func (c *V2Client) ListMembers(ctx context.Context) ([]*EtcdProcessMember, error
return members, nil
}

func (c *V2Client) LeaderID(ctx context.Context) (string, error) {
leader, err := c.members.Leader(ctx)
if err != nil {
return "", err
}
if leader == nil {
return "", nil
}
return leader.ID, nil
}

func (c *V2Client) AddMember(ctx context.Context, peerURLs []string) error {
if len(peerURLs) == 0 {
return fmt.Errorf("AddMember with empty peerURLs")
Expand Down
36 changes: 26 additions & 10 deletions pkg/etcdclient/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
)

type V3Client struct {
endpoints []string
client *etcd_client_v3.Client
kv etcd_client_v3.KV
cluster etcd_client_v3.Cluster
tlsConfig *tls.Config
endpoints []string
client *etcd_client_v3.Client
kv etcd_client_v3.KV
cluster etcd_client_v3.Cluster
maintenance etcd_client_v3.Maintenance
tlsConfig *tls.Config
}

var _ EtcdClient = &V3Client{}
Expand All @@ -46,12 +47,14 @@ func NewV3Client(endpoints []string, tlsConfig *tls.Config) (EtcdClient, error)
}

kv := etcd_client_v3.NewKV(etcdClient)
maintenance := etcd_client_v3.NewMaintenance(etcdClient)
return &V3Client{
endpoints: endpoints,
client: etcdClient,
kv: kv,
cluster: etcd_client_v3.NewCluster(etcdClient),
tlsConfig: tlsConfig,
endpoints: endpoints,
client: etcdClient,
kv: kv,
maintenance: maintenance,
cluster: etcd_client_v3.NewCluster(etcdClient),
tlsConfig: tlsConfig,
}, nil
}

Expand Down Expand Up @@ -204,6 +207,19 @@ func (c *V3Client) ListMembers(ctx context.Context) ([]*EtcdProcessMember, error
return members, nil
}

func (c *V3Client) LeaderID(ctx context.Context) (string, error) {
response, err := c.maintenance.Status(ctx, c.endpoints[0])
if err != nil {
return "", err
}
leaderID := response.Leader
if leaderID == 0 {
return "", nil
}

return strconv.FormatUint(leaderID, 10), nil
}

func (c *V3Client) AddMember(ctx context.Context, peerURLs []string) error {
_, err := c.cluster.MemberAdd(ctx, peerURLs)
return err
Expand Down
6 changes: 6 additions & 0 deletions test/integration/harness/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ func (h *TestHarness) WaitForHealthy(nodes ...*TestHarnessNode) {
}
}

func (h *TestHarness) WaitForHasLeader(nodes ...*TestHarnessNode) {
for _, node := range nodes {
node.WaitForHasLeader(10 * time.Second)
}
}

func (h *TestHarness) WaitForVersion(timeout time.Duration, expectedVersion string, nodes ...*TestHarnessNode) {
for _, n := range nodes {
h.WaitFor(timeout, func() error {
Expand Down
19 changes: 19 additions & 0 deletions test/integration/harness/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,25 @@ func (n *TestHarnessNode) WaitForHealthy(timeout time.Duration) {
})
}

func (n *TestHarnessNode) WaitForHasLeader(timeout time.Duration) {
n.TestHarness.WaitFor(timeout, func() error {
client, err := n.NewClient()
if err != nil {
return fmt.Errorf("error building etcd client: %v", err)
}
defer client.Close()

leaderID, err := client.LeaderID(context.Background())
if err != nil {
return err
}
if leaderID == "" {
return fmt.Errorf("node did not have leader")
}
return nil
})
}

type MockDNSProvider struct {
}

Expand Down
9 changes: 5 additions & 4 deletions test/integration/upgradedowngrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,18 @@ func TestUpgradeDowngrade(t *testing.T) {
{
n1.WaitForListMembers(60 * time.Second)
h.WaitForHealthy(n1, n2, n3)
h.WaitForHasLeader(n1, n2, n3)
members1, err := n1.ListMembers(ctx)
if err != nil {
t.Errorf("error doing etcd ListMembers: %v", err)
t.Errorf("error doing etcd ListMembers (before upgrade): %v", err)
} else if len(members1) != 3 {
t.Errorf("members was not as expected: %v", members1)
t.Errorf("members was not as expected (before upgrade): %v", members1)
} else {
klog.Infof("got members from #1: %v", members1)
}

if err := n1.Put(ctx, testKey, "worldv2"); err != nil {
t.Fatalf("unable to set test key: %v", err)
t.Fatalf("unable to set test key before upgrade: %v", err)
}

n1.AssertVersion(t, fromVersion)
Expand Down Expand Up @@ -98,7 +99,7 @@ func TestUpgradeDowngrade(t *testing.T) {
}

if err := n1.Put(ctx, testKey, "worldv3"); err != nil {
t.Fatalf("unable to set test key: %v", err)
t.Fatalf("unable to set test key after upgrade: %v", err)
}

n1.AssertVersion(t, toVersion)
Expand Down

0 comments on commit 9418d08

Please sign in to comment.