diff --git a/changelog/18.0/18.0.0/summary.md b/changelog/18.0/18.0.0/summary.md index 561b209ac06..259388c00d5 100644 --- a/changelog/18.0/18.0.0/summary.md +++ b/changelog/18.0/18.0.0/summary.md @@ -4,6 +4,7 @@ - **[Major Changes](#major-changes)** - **[Breaking Changes](#breaking-changes)** + - [Local examples now use etcd v3 storage and API](#local-examples-etcd-v3) - **[New command line flags and behavior](#new-flag)** - [VTOrc flag `--allow-emergency-reparent`](#new-flag-toggle-ers) - [ERS sub flag `--wait-for-all-tablets`](#new-ers-subflag) @@ -26,6 +27,14 @@ ### Breaking Changes +#### Local examples now use etcd v3 storage and API +In previous releases the [local examples](https://github.com/vitessio/vitess/tree/main/examples/local) were +explicitly using etcd v2 storage (`etcd --enable-v2=true`) and API (`ETCDCTL_API=2`) mode. We have now +removed this legacy etcd usage and instead use the new (default) etcd v3 storage and API. Please see +[PR #13791](https://github.com/vitessio/vitess/pull/13791) for additional info. If you are using the local +examples in any sort of long-term non-testing capacity, then you will need to explicitly use the v2 storage +and API mode or [migrate your existing data from v2 to v3](https://etcd.io/docs/v3.5/tutorials/how-to-migrate/). + ### New command line flags and behavior #### VTOrc flag `--allow-emergency-reparent` diff --git a/examples/common/scripts/etcd-up.sh b/examples/common/scripts/etcd-up.sh index 20a16a42260..bd87fb0423a 100755 --- a/examples/common/scripts/etcd-up.sh +++ b/examples/common/scripts/etcd-up.sh @@ -19,22 +19,15 @@ source "$(dirname "${BASH_SOURCE[0]:-$0}")/../env.sh" cell=${CELL:-'test'} -export ETCDCTL_API=2 # Check that etcd is not already running curl "http://${ETCD_SERVER}" > /dev/null 2>&1 && fail "etcd is already running. Exiting." -etcd --enable-v2=true --data-dir "${VTDATAROOT}/etcd/" --listen-client-urls "http://${ETCD_SERVER}" --advertise-client-urls "http://${ETCD_SERVER}" > "${VTDATAROOT}"/tmp/etcd.out 2>&1 & +etcd --data-dir "${VTDATAROOT}/etcd/" --listen-client-urls "http://${ETCD_SERVER}" --advertise-client-urls "http://${ETCD_SERVER}" > "${VTDATAROOT}"/tmp/etcd.out 2>&1 & PID=$! echo $PID > "${VTDATAROOT}/tmp/etcd.pid" sleep 5 -echo "add /vitess/global" -etcdctl --endpoints "http://${ETCD_SERVER}" mkdir /vitess/global & - -echo "add /vitess/$cell" -etcdctl --endpoints "http://${ETCD_SERVER}" mkdir /vitess/$cell & - # And also add the CellInfo description for the cell. # If the node already exists, it's fine, means we used existing data. echo "add $cell CellInfo" diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go index 7326aa57a52..8fd4bd1c74c 100644 --- a/go/test/endtoend/cluster/topo_process.go +++ b/go/test/endtoend/cluster/topo_process.go @@ -17,8 +17,10 @@ limitations under the License. package cluster import ( + "context" "encoding/json" "fmt" + "net" "net/http" "os" "os/exec" @@ -27,7 +29,10 @@ import ( "syscall" "time" + clientv3 "go.etcd.io/etcd/client/v3" + "vitess.io/vitess/go/vt/log" + vtopo "vitess.io/vitess/go/vt/topo" ) // TopoProcess is a generic handle for a running Topo service . @@ -44,6 +49,7 @@ type TopoProcess struct { VerifyURL string PeerURL string ZKPorts string + Client interface{} proc *exec.Cmd exit chan error @@ -57,10 +63,9 @@ func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster) case "consul": return topo.SetupConsul(cluster) default: - // We still rely on the etcd v2 API for things like mkdir. - // If this ENV var is not set then some tests may fail with etcd 3.4+ - // where the v2 API is disabled by default in both the client and server. - os.Setenv("ETCDCTL_API", "2") + // Override any inherited ETCDCTL_API env value to + // ensure that we use the v3 API and storage. + os.Setenv("ETCDCTL_API", "3") return topo.SetupEtcd() } } @@ -77,7 +82,6 @@ func (topo *TopoProcess) SetupEtcd() (err error) { "--initial-advertise-peer-urls", topo.PeerURL, "--listen-peer-urls", topo.PeerURL, "--initial-cluster", fmt.Sprintf("%s=%s", topo.Name, topo.PeerURL), - "--enable-v2=true", ) err = createDirectory(topo.DataDirectory, 0700) @@ -109,6 +113,14 @@ func (topo *TopoProcess) SetupEtcd() (err error) { timeout := time.Now().Add(60 * time.Second) for time.Now().Before(timeout) { if topo.IsHealthy() { + cli, cerr := clientv3.New(clientv3.Config{ + Endpoints: []string{net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port))}, + DialTimeout: 5 * time.Second, + }) + if cerr != nil { + return err + } + topo.Client = cli return } select { @@ -125,7 +137,6 @@ func (topo *TopoProcess) SetupEtcd() (err error) { // SetupZookeeper spawns a new zookeeper topo service and initializes it with the defaults. // The service is kept running in the background until TearDown() is called. func (topo *TopoProcess) SetupZookeeper(cluster *LocalProcessCluster) (err error) { - host, err := os.Hostname() if err != nil { return @@ -171,7 +182,6 @@ type PortsInfo struct { // SetupConsul spawns a new consul service and initializes it with the defaults. // The service is kept running in the background until TearDown() is called. func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) { - topo.VerifyURL = fmt.Sprintf("http://%s:%d/v1/kv/?keys", topo.Host, topo.Port) _ = os.MkdirAll(topo.LogDirectory, os.ModePerm) @@ -247,8 +257,16 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) { return fmt.Errorf("process '%s' timed out after 60s (err: %s)", topo.Binary, <-topo.exit) } -// TearDown shutdowns the running topo service +// TearDown shutdowns the running topo service. func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoot string, keepdata bool, topoFlavor string) error { + if topo.Client != nil { + switch cli := topo.Client.(type) { + case *clientv3.Client: + _ = cli.Close() + default: + log.Errorf("Unknown topo client type %T", cli) + } + } if topoFlavor == "zk2" { cmd := "shutdown" @@ -324,6 +342,9 @@ func (topo *TopoProcess) ManageTopoDir(command string, directory string) (err er url := topo.VerifyURL + directory payload := strings.NewReader(`{"dir":"true"}`) if command == "mkdir" { + if *topoFlavor == "etcd2" { // No need to create the empty prefix keys in v3 + return nil + } req, _ := http.NewRequest("PUT", url, payload) req.Header.Add("content-type", "application/json") resp, err := http.DefaultClient.Do(req) @@ -332,6 +353,22 @@ func (topo *TopoProcess) ManageTopoDir(command string, directory string) (err er } return err } else if command == "rmdir" { + if *topoFlavor == "etcd2" { + if topo.Client == nil { + return fmt.Errorf("etcd client is not initialized") + } + cli, ok := topo.Client.(*clientv3.Client) + if !ok { + return fmt.Errorf("etcd client is invalid") + } + ctx, cancel := context.WithTimeout(context.Background(), vtopo.RemoteOperationTimeout) + defer cancel() + _, err = cli.Delete(ctx, directory, clientv3.WithPrefix()) + if err != nil { + return err + } + return nil + } req, _ := http.NewRequest("DELETE", url+"?dir=true", payload) resp, err := http.DefaultClient.Do(req) if err == nil { @@ -366,7 +403,7 @@ func TopoProcessInstance(port int, peerPort int, hostname string, flavor string, topo.ListenClientURL = fmt.Sprintf("http://%s:%d", topo.Host, topo.Port) topo.DataDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "topo", port)) topo.LogDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "topo", port), "logs") - topo.VerifyURL = fmt.Sprintf("http://%s:%d/v2/keys", topo.Host, topo.Port) + topo.VerifyURL = fmt.Sprintf("http://%s:%d/health", topo.Host, topo.Port) topo.PeerURL = fmt.Sprintf("http://%s:%d", hostname, peerPort) return topo } diff --git a/go/test/endtoend/clustertest/etcd_test.go b/go/test/endtoend/clustertest/etcd_test.go index 1f5e548696f..5239d960c47 100644 --- a/go/test/endtoend/clustertest/etcd_test.go +++ b/go/test/endtoend/clustertest/etcd_test.go @@ -18,15 +18,46 @@ package clustertest import ( "fmt" + "net" "testing" + "time" + + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" "vitess.io/vitess/go/test/endtoend/cluster" ) func TestEtcdServer(t *testing.T) { defer cluster.PanicHandler(t) - etcdURL := fmt.Sprintf("http://%s:%d/v2/keys", clusterInstance.Hostname, clusterInstance.TopoPort) - testURL(t, etcdURL, "generic etcd url") - testURL(t, etcdURL+"/vitess/global", "vitess global key") - testURL(t, etcdURL+"/vitess/zone1", "vitess zone1 key") + + // Confirm the basic etcd cluster health. + etcdHealthURL := fmt.Sprintf("http://%s:%d/health", clusterInstance.Hostname, clusterInstance.TopoPort) + testURL(t, etcdHealthURL, "generic etcd health url") + + // Confirm that we have a working topo server by looking for some + // expected keys. + etcdClientOptions := []clientv3.OpOption{ + clientv3.WithPrefix(), + clientv3.WithKeysOnly(), + clientv3.WithLimit(1), + } + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{net.JoinHostPort(clusterInstance.TopoProcess.Host, fmt.Sprintf("%d", clusterInstance.TopoProcess.Port))}, + DialTimeout: 5 * time.Second, + }) + require.NoError(t, err) + defer cli.Close() + keyPrefixes := []string{ + // At a minimum, this prefix confirms that we have a functioning + // global topo server with a valid cell from the test env. + fmt.Sprintf("/vitess/global/cells/%s", cell), + } + for _, keyPrefix := range keyPrefixes { + res, err := cli.Get(cli.Ctx(), keyPrefix, etcdClientOptions...) + require.NoError(t, err) + require.NotNil(t, res) + // Confirm that we have at least one key matching the prefix. + require.Greaterf(t, len(res.Kvs), 0, "no keys found matching prefix: %s", keyPrefix) + } }