Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove explicit usage of etcd v2 (api and storage) #13791

Merged
merged 10 commits into from
Aug 22, 2023
9 changes: 9 additions & 0 deletions changelog/18.0/18.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- **[Major Changes](#major-changes)**
- **[Breaking Changes](#breaking-changes)**
- [Local examples now use etcd v3 storage and API](#local-examples-etcd-v3)
- **[New command line flags and behavior](#new-flag)**
- [VTOrc flag `--allow-emergency-reparent`](#new-flag-toggle-ers)
- [ERS sub flag `--wait-for-all-tablets`](#new-ers-subflag)
Expand All @@ -26,6 +27,14 @@

### <a id="breaking-changes"/>Breaking Changes

#### <a id="local-examples-etcd-v3"/>Local examples now use etcd v3 storage and API
In previous releases the [local examples](https://github.com/vitessio/vitess/tree/main/examples/local) were
explicitly using etcd v2 storage (`etcd --enable-v2=true`) and API (`ETCDCTL_API=2`) mode. We have now
removed this legacy etcd usage and instead use the new (default) etcd v3 storage and API. Please see
[PR #13791](https://github.com/vitessio/vitess/pull/13791) for additional info. If you are using the local
examples in any sort of long-term non-testing capacity, then you will need to explicitly use the v2 storage
and API mode or [migrate your existing data from v2 to v3](https://etcd.io/docs/v3.5/tutorials/how-to-migrate/).

### <a id="new-flag"/>New command line flags and behavior

#### <a id="new-flag-toggle-ers"/>VTOrc flag `--allow-emergency-reparent`
Expand Down
9 changes: 1 addition & 8 deletions examples/common/scripts/etcd-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,15 @@
source "$(dirname "${BASH_SOURCE[0]:-$0}")/../env.sh"

cell=${CELL:-'test'}
export ETCDCTL_API=2

# Check that etcd is not already running
curl "http://${ETCD_SERVER}" > /dev/null 2>&1 && fail "etcd is already running. Exiting."

etcd --enable-v2=true --data-dir "${VTDATAROOT}/etcd/" --listen-client-urls "http://${ETCD_SERVER}" --advertise-client-urls "http://${ETCD_SERVER}" > "${VTDATAROOT}"/tmp/etcd.out 2>&1 &
etcd --data-dir "${VTDATAROOT}/etcd/" --listen-client-urls "http://${ETCD_SERVER}" --advertise-client-urls "http://${ETCD_SERVER}" > "${VTDATAROOT}"/tmp/etcd.out 2>&1 &
PID=$!
echo $PID > "${VTDATAROOT}/tmp/etcd.pid"
sleep 5

echo "add /vitess/global"
etcdctl --endpoints "http://${ETCD_SERVER}" mkdir /vitess/global &

echo "add /vitess/$cell"
etcdctl --endpoints "http://${ETCD_SERVER}" mkdir /vitess/$cell &

# And also add the CellInfo description for the cell.
# If the node already exists, it's fine, means we used existing data.
echo "add $cell CellInfo"
Expand Down
55 changes: 46 additions & 9 deletions go/test/endtoend/cluster/topo_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package cluster

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"os/exec"
Expand All @@ -27,7 +29,10 @@ import (
"syscall"
"time"

clientv3 "go.etcd.io/etcd/client/v3"

"vitess.io/vitess/go/vt/log"
vtopo "vitess.io/vitess/go/vt/topo"
)

// TopoProcess is a generic handle for a running Topo service .
Expand All @@ -44,6 +49,7 @@ type TopoProcess struct {
VerifyURL string
PeerURL string
ZKPorts string
Client interface{}

proc *exec.Cmd
exit chan error
Expand All @@ -57,10 +63,9 @@ func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster)
case "consul":
return topo.SetupConsul(cluster)
default:
// We still rely on the etcd v2 API for things like mkdir.
// If this ENV var is not set then some tests may fail with etcd 3.4+
// where the v2 API is disabled by default in both the client and server.
os.Setenv("ETCDCTL_API", "2")
// Override any inherited ETCDCTL_API env value to
// ensure that we use the v3 API and storage.
os.Setenv("ETCDCTL_API", "3")
return topo.SetupEtcd()
}
}
Expand All @@ -77,7 +82,6 @@ func (topo *TopoProcess) SetupEtcd() (err error) {
"--initial-advertise-peer-urls", topo.PeerURL,
"--listen-peer-urls", topo.PeerURL,
"--initial-cluster", fmt.Sprintf("%s=%s", topo.Name, topo.PeerURL),
"--enable-v2=true",
)

err = createDirectory(topo.DataDirectory, 0700)
Expand Down Expand Up @@ -109,6 +113,14 @@ func (topo *TopoProcess) SetupEtcd() (err error) {
timeout := time.Now().Add(60 * time.Second)
for time.Now().Before(timeout) {
if topo.IsHealthy() {
cli, cerr := clientv3.New(clientv3.Config{
Endpoints: []string{net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port))},
DialTimeout: 5 * time.Second,
})
if cerr != nil {
return err
}
topo.Client = cli
return
}
select {
Expand All @@ -125,7 +137,6 @@ func (topo *TopoProcess) SetupEtcd() (err error) {
// SetupZookeeper spawns a new zookeeper topo service and initializes it with the defaults.
// The service is kept running in the background until TearDown() is called.
func (topo *TopoProcess) SetupZookeeper(cluster *LocalProcessCluster) (err error) {

host, err := os.Hostname()
if err != nil {
return
Expand Down Expand Up @@ -171,7 +182,6 @@ type PortsInfo struct {
// SetupConsul spawns a new consul service and initializes it with the defaults.
// The service is kept running in the background until TearDown() is called.
func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {

topo.VerifyURL = fmt.Sprintf("http://%s:%d/v1/kv/?keys", topo.Host, topo.Port)

_ = os.MkdirAll(topo.LogDirectory, os.ModePerm)
Expand Down Expand Up @@ -247,8 +257,16 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {
return fmt.Errorf("process '%s' timed out after 60s (err: %s)", topo.Binary, <-topo.exit)
}

// TearDown shutdowns the running topo service
// TearDown shutdowns the running topo service.
func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoot string, keepdata bool, topoFlavor string) error {
if topo.Client != nil {
switch cli := topo.Client.(type) {
case *clientv3.Client:
_ = cli.Close()
default:
log.Errorf("Unknown topo client type %T", cli)
}
}

if topoFlavor == "zk2" {
cmd := "shutdown"
Expand Down Expand Up @@ -324,6 +342,9 @@ func (topo *TopoProcess) ManageTopoDir(command string, directory string) (err er
url := topo.VerifyURL + directory
payload := strings.NewReader(`{"dir":"true"}`)
if command == "mkdir" {
if *topoFlavor == "etcd2" { // No need to create the empty prefix keys in v3
return nil
}
req, _ := http.NewRequest("PUT", url, payload)
req.Header.Add("content-type", "application/json")
resp, err := http.DefaultClient.Do(req)
Expand All @@ -332,6 +353,22 @@ func (topo *TopoProcess) ManageTopoDir(command string, directory string) (err er
}
return err
} else if command == "rmdir" {
if *topoFlavor == "etcd2" {
if topo.Client == nil {
return fmt.Errorf("etcd client is not initialized")
}
cli, ok := topo.Client.(*clientv3.Client)
if !ok {
return fmt.Errorf("etcd client is invalid")
}
ctx, cancel := context.WithTimeout(context.Background(), vtopo.RemoteOperationTimeout)
defer cancel()
_, err = cli.Delete(ctx, directory, clientv3.WithPrefix())
if err != nil {
return err
}
return nil
}
mattlord marked this conversation as resolved.
Show resolved Hide resolved
req, _ := http.NewRequest("DELETE", url+"?dir=true", payload)
resp, err := http.DefaultClient.Do(req)
if err == nil {
Expand Down Expand Up @@ -366,7 +403,7 @@ func TopoProcessInstance(port int, peerPort int, hostname string, flavor string,
topo.ListenClientURL = fmt.Sprintf("http://%s:%d", topo.Host, topo.Port)
topo.DataDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "topo", port))
topo.LogDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "topo", port), "logs")
topo.VerifyURL = fmt.Sprintf("http://%s:%d/v2/keys", topo.Host, topo.Port)
topo.VerifyURL = fmt.Sprintf("http://%s:%d/health", topo.Host, topo.Port)
topo.PeerURL = fmt.Sprintf("http://%s:%d", hostname, peerPort)
return topo
}
39 changes: 35 additions & 4 deletions go/test/endtoend/clustertest/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,46 @@ package clustertest

import (
"fmt"
"net"
"testing"
"time"

"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"

"vitess.io/vitess/go/test/endtoend/cluster"
)

func TestEtcdServer(t *testing.T) {
defer cluster.PanicHandler(t)
etcdURL := fmt.Sprintf("http://%s:%d/v2/keys", clusterInstance.Hostname, clusterInstance.TopoPort)
testURL(t, etcdURL, "generic etcd url")
testURL(t, etcdURL+"/vitess/global", "vitess global key")
testURL(t, etcdURL+"/vitess/zone1", "vitess zone1 key")

// Confirm the basic etcd cluster health.
etcdHealthURL := fmt.Sprintf("http://%s:%d/health", clusterInstance.Hostname, clusterInstance.TopoPort)
testURL(t, etcdHealthURL, "generic etcd health url")

// Confirm that we have a working topo server by looking for some
// expected keys.
etcdClientOptions := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithKeysOnly(),
clientv3.WithLimit(1),
}
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{net.JoinHostPort(clusterInstance.TopoProcess.Host, fmt.Sprintf("%d", clusterInstance.TopoProcess.Port))},
DialTimeout: 5 * time.Second,
})
require.NoError(t, err)
defer cli.Close()
keyPrefixes := []string{
// At a minimum, this prefix confirms that we have a functioning
// global topo server with a valid cell from the test env.
fmt.Sprintf("/vitess/global/cells/%s", cell),
}
for _, keyPrefix := range keyPrefixes {
res, err := cli.Get(cli.Ctx(), keyPrefix, etcdClientOptions...)
require.NoError(t, err)
require.NotNil(t, res)
// Confirm that we have at least one key matching the prefix.
require.Greaterf(t, len(res.Kvs), 0, "no keys found matching prefix: %s", keyPrefix)
}
}