Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove explicit usage of etcd v2 (api and storage) #13791

Merged
merged 10 commits into from
Aug 22, 2023
9 changes: 1 addition & 8 deletions examples/common/scripts/etcd-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,15 @@
source "$(dirname "${BASH_SOURCE[0]:-$0}")/../env.sh"

cell=${CELL:-'test'}
export ETCDCTL_API=2

# Check that etcd is not already running
curl "http://${ETCD_SERVER}" > /dev/null 2>&1 && fail "etcd is already running. Exiting."

etcd --enable-v2=true --data-dir "${VTDATAROOT}/etcd/" --listen-client-urls "http://${ETCD_SERVER}" --advertise-client-urls "http://${ETCD_SERVER}" > "${VTDATAROOT}"/tmp/etcd.out 2>&1 &
etcd --data-dir "${VTDATAROOT}/etcd/" --listen-client-urls "http://${ETCD_SERVER}" --advertise-client-urls "http://${ETCD_SERVER}" > "${VTDATAROOT}"/tmp/etcd.out 2>&1 &
PID=$!
echo $PID > "${VTDATAROOT}/tmp/etcd.pid"
sleep 5

echo "add /vitess/global"
etcdctl --endpoints "http://${ETCD_SERVER}" mkdir /vitess/global &

echo "add /vitess/$cell"
etcdctl --endpoints "http://${ETCD_SERVER}" mkdir /vitess/$cell &

# And also add the CellInfo description for the cell.
# If the node already exists, it's fine, means we used existing data.
echo "add $cell CellInfo"
Expand Down
55 changes: 46 additions & 9 deletions go/test/endtoend/cluster/topo_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package cluster

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"os/exec"
Expand All @@ -27,7 +29,10 @@ import (
"syscall"
"time"

clientv3 "go.etcd.io/etcd/client/v3"

"vitess.io/vitess/go/vt/log"
vtopo "vitess.io/vitess/go/vt/topo"
)

// TopoProcess is a generic handle for a running Topo service .
Expand All @@ -44,6 +49,7 @@ type TopoProcess struct {
VerifyURL string
PeerURL string
ZKPorts string
Client interface{}

proc *exec.Cmd
exit chan error
Expand All @@ -57,10 +63,9 @@ func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster)
case "consul":
return topo.SetupConsul(cluster)
default:
// We still rely on the etcd v2 API for things like mkdir.
// If this ENV var is not set then some tests may fail with etcd 3.4+
// where the v2 API is disabled by default in both the client and server.
os.Setenv("ETCDCTL_API", "2")
// Override any inherited ETCDCTL_API env value to
// ensure that we use the v3 API and storage.
os.Setenv("ETCDCTL_API", "3")
return topo.SetupEtcd()
}
}
Expand All @@ -77,7 +82,6 @@ func (topo *TopoProcess) SetupEtcd() (err error) {
"--initial-advertise-peer-urls", topo.PeerURL,
"--listen-peer-urls", topo.PeerURL,
"--initial-cluster", fmt.Sprintf("%s=%s", topo.Name, topo.PeerURL),
"--enable-v2=true",
)

err = createDirectory(topo.DataDirectory, 0700)
Expand Down Expand Up @@ -125,7 +129,6 @@ func (topo *TopoProcess) SetupEtcd() (err error) {
// SetupZookeeper spawns a new zookeeper topo service and initializes it with the defaults.
// The service is kept running in the background until TearDown() is called.
func (topo *TopoProcess) SetupZookeeper(cluster *LocalProcessCluster) (err error) {

host, err := os.Hostname()
if err != nil {
return
Expand Down Expand Up @@ -171,7 +174,6 @@ type PortsInfo struct {
// SetupConsul spawns a new consul service and initializes it with the defaults.
// The service is kept running in the background until TearDown() is called.
func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {

topo.VerifyURL = fmt.Sprintf("http://%s:%d/v1/kv/?keys", topo.Host, topo.Port)

_ = os.MkdirAll(topo.LogDirectory, os.ModePerm)
Expand Down Expand Up @@ -234,6 +236,14 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {
timeout := time.Now().Add(60 * time.Second)
for time.Now().Before(timeout) {
if topo.IsHealthy() {
cli, cerr := clientv3.New(clientv3.Config{
Endpoints: []string{net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port))},
DialTimeout: 5 * time.Second,
})
if cerr != nil {
return err
}
topo.Client = cli
return
}
select {
Expand All @@ -249,7 +259,6 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {

// TearDown shutdowns the running topo service
func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoot string, keepdata bool, topoFlavor string) error {

if topoFlavor == "zk2" {
cmd := "shutdown"
if keepdata {
Expand All @@ -275,6 +284,15 @@ func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoo
topo.removeTopoDirectories(Cell)
}

if topo.Client != nil {
switch cli := topo.Client.(type) {
case *clientv3.Client:
_ = cli.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should Close() be a deferred function earlier on?

Copy link
Contributor Author

@mattlord mattlord Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I was creating/opening a new Client every time in the rmdir function. I then switched to the current method of creating/opening the client when we start the topo server and closing it when we shut down the topo server. I'm actually not sure which is better or preferable...

But I could certainly move this code to a defer that we put in place earlier in TearDown, if that's what you mean.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not too important, since this is just testing code.

default:
log.Errorf("Unknown topo client type %T", cli)
}
}

// Attempt graceful shutdown with SIGTERM first
_ = topo.proc.Process.Signal(syscall.SIGTERM)

Expand Down Expand Up @@ -324,6 +342,9 @@ func (topo *TopoProcess) ManageTopoDir(command string, directory string) (err er
url := topo.VerifyURL + directory
payload := strings.NewReader(`{"dir":"true"}`)
if command == "mkdir" {
if *topoFlavor == "etcd2" { // No need to create the empty prefix keys in v3
return nil
}
req, _ := http.NewRequest("PUT", url, payload)
req.Header.Add("content-type", "application/json")
resp, err := http.DefaultClient.Do(req)
Expand All @@ -332,6 +353,22 @@ func (topo *TopoProcess) ManageTopoDir(command string, directory string) (err er
}
return err
} else if command == "rmdir" {
if *topoFlavor == "etcd2" {
if topo.Client == nil {
return fmt.Errorf("etcd client is not initialized")
}
cli, ok := topo.Client.(*clientv3.Client)
if !ok {
return fmt.Errorf("etcd client is invalid")
}
ctx, cancel := context.WithTimeout(context.Background(), vtopo.RemoteOperationTimeout)
defer cancel()
_, err = cli.Delete(ctx, directory, clientv3.WithPrefix())
if err != nil {
return err
}
return nil
}
mattlord marked this conversation as resolved.
Show resolved Hide resolved
req, _ := http.NewRequest("DELETE", url+"?dir=true", payload)
resp, err := http.DefaultClient.Do(req)
if err == nil {
Expand Down Expand Up @@ -366,7 +403,7 @@ func TopoProcessInstance(port int, peerPort int, hostname string, flavor string,
topo.ListenClientURL = fmt.Sprintf("http://%s:%d", topo.Host, topo.Port)
topo.DataDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "topo", port))
topo.LogDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "topo", port), "logs")
topo.VerifyURL = fmt.Sprintf("http://%s:%d/v2/keys", topo.Host, topo.Port)
topo.VerifyURL = fmt.Sprintf("http://%s:%d/health", topo.Host, topo.Port)
topo.PeerURL = fmt.Sprintf("http://%s:%d", hostname, peerPort)
return topo
}
39 changes: 35 additions & 4 deletions go/test/endtoend/clustertest/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,46 @@ package clustertest

import (
"fmt"
"net"
"testing"
"time"

"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"

"vitess.io/vitess/go/test/endtoend/cluster"
)

func TestEtcdServer(t *testing.T) {
defer cluster.PanicHandler(t)
etcdURL := fmt.Sprintf("http://%s:%d/v2/keys", clusterInstance.Hostname, clusterInstance.TopoPort)
testURL(t, etcdURL, "generic etcd url")
testURL(t, etcdURL+"/vitess/global", "vitess global key")
testURL(t, etcdURL+"/vitess/zone1", "vitess zone1 key")

// Confirm the basic etcd cluster health.
etcdHealthURL := fmt.Sprintf("http://%s:%d/health", clusterInstance.Hostname, clusterInstance.TopoPort)
testURL(t, etcdHealthURL, "generic etcd health url")

// Confirm that we have a working topo server by looking for some
// expected keys.
etcdClientOptions := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithKeysOnly(),
clientv3.WithLimit(1),
}
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{net.JoinHostPort(clusterInstance.TopoProcess.Host, fmt.Sprintf("%d", clusterInstance.TopoProcess.Port))},
DialTimeout: 5 * time.Second,
})
require.NoError(t, err)
defer cli.Close()
keyPrefixes := []string{
// At a minimum, this prefix confirms that we have a functioning
// global topo server with a valid cell from the test env.
fmt.Sprintf("/vitess/global/cells/%s", cell),
}
for _, keyPrefix := range keyPrefixes {
res, err := cli.Get(cli.Ctx(), keyPrefix, etcdClientOptions...)
require.NoError(t, err)
require.NotNil(t, res)
// Confirm that we have at least one key matching the prefix.
require.Greaterf(t, len(res.Kvs), 0, "no keys found matching prefix: %s", keyPrefix)
}
}