Skip to content

Commit

Permalink
Reuse topo client in endtoend tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Aug 15, 2023
1 parent 166d0d2 commit 99d75c5
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
32 changes: 24 additions & 8 deletions go/test/endtoend/cluster/topo_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type TopoProcess struct {
VerifyURL string
PeerURL string
ZKPorts string
Client interface{}

proc *exec.Cmd
exit chan error
Expand Down Expand Up @@ -128,7 +129,6 @@ func (topo *TopoProcess) SetupEtcd() (err error) {
// SetupZookeeper spawns a new zookeeper topo service and initializes it with the defaults.
// The service is kept running in the background until TearDown() is called.
func (topo *TopoProcess) SetupZookeeper(cluster *LocalProcessCluster) (err error) {

host, err := os.Hostname()
if err != nil {
return
Expand Down Expand Up @@ -174,7 +174,6 @@ type PortsInfo struct {
// SetupConsul spawns a new consul service and initializes it with the defaults.
// The service is kept running in the background until TearDown() is called.
func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {

topo.VerifyURL = fmt.Sprintf("http://%s:%d/v1/kv/?keys", topo.Host, topo.Port)

_ = os.MkdirAll(topo.LogDirectory, os.ModePerm)
Expand Down Expand Up @@ -237,6 +236,14 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {
timeout := time.Now().Add(60 * time.Second)
for time.Now().Before(timeout) {
if topo.IsHealthy() {
cli, cerr := clientv3.New(clientv3.Config{
Endpoints: []string{net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port))},
DialTimeout: 5 * time.Second,
})
if cerr != nil {
return err
}
topo.Client = cli
return
}
select {
Expand Down Expand Up @@ -277,6 +284,15 @@ func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoo
topo.removeTopoDirectories(Cell)
}

if topo.Client != nil {
switch cli := topo.Client.(type) {
case *clientv3.Client:
_ = cli.Close()
default:
log.Errorf("Unknown topo client type %T", cli)
}
}

// Attempt graceful shutdown with SIGTERM first
_ = topo.proc.Process.Signal(syscall.SIGTERM)

Expand Down Expand Up @@ -338,12 +354,12 @@ func (topo *TopoProcess) ManageTopoDir(command string, directory string) (err er
return err
} else if command == "rmdir" {
if *topoFlavor == "etcd2" {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port))},
DialTimeout: 5 * time.Second,
})
if err != nil {
return err
if topo.Client == nil {
return fmt.Errorf("etcd client is not initialized")
}
cli, ok := topo.Client.(*clientv3.Client)
if !ok {
return fmt.Errorf("etcd client is invalid")
}
ctx, cancel := context.WithTimeout(context.Background(), vtopo.RemoteOperationTimeout)
defer cancel()
Expand Down
13 changes: 9 additions & 4 deletions go/test/endtoend/clustertest/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ import (
func TestEtcdServer(t *testing.T) {
defer cluster.PanicHandler(t)

// Confirm basic cluster health.
// Confirm the basic etcd cluster health.
etcdHealthURL := fmt.Sprintf("http://%s:%d/health", clusterInstance.Hostname, clusterInstance.TopoPort)
testURL(t, etcdHealthURL, "generic etcd health url")

// Confirm that we have the expected global keys for the
// cluster's cell.
// Confirm that we have a working topo server by looking for some
// expected keys.
etcdClientOptions := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithKeysOnly(),
Expand All @@ -48,10 +48,15 @@ func TestEtcdServer(t *testing.T) {
})
require.NoError(t, err)
defer cli.Close()
keyPrefixes := []string{fmt.Sprintf("/vitess/global/cells/%s", cell)}
keyPrefixes := []string{
// At a minimum, this prefix confirms that we have a functioning
// global topo server with a valid cell from the test env.
fmt.Sprintf("/vitess/global/cells/%s", cell),
}
for _, keyPrefix := range keyPrefixes {
res, err := cli.Get(cli.Ctx(), keyPrefix, etcdClientOptions...)
require.NoError(t, err)
require.NotNil(t, res)
// Confirm that we have at least one key matching the prefix.
require.Greaterf(t, len(res.Kvs), 0, "no keys found matching prefix: %s", keyPrefix)
}
Expand Down

0 comments on commit 99d75c5

Please sign in to comment.