Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to run vtcombo and vttest local cluster with real topo server #9176

Merged
merged 4 commits into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ var (

mysqlPort = flag.Int("mysql_port", 3306, "mysql port")

externalTopoServer = flag.Bool("external_topo_server", false, "Should vtcombo use an external topology server instead of starting its own in-memory topology server. "+
"If true, vtcombo will use the flags defined in topo/server.go to open topo server")

ts *topo.Server
resilientServer *srvtopo.ResilientServer
)
Expand Down Expand Up @@ -135,8 +138,14 @@ func main() {
flag.Set("log_dir", "$VTDATAROOT/tmp")
}

// Create topo server. We use a 'memorytopo' implementation.
ts = memorytopo.NewServer(tpb.Cells...)
if *externalTopoServer {
// Open topo server based on the command line flags defined at topo/server.go
zhongr3n marked this conversation as resolved.
Show resolved Hide resolved
// do not create cell info as it should be done by whoever sets up the external topo server
ts = topo.Open()
} else {
// Create topo server. We use a 'memorytopo' implementation.
ts = memorytopo.NewServer(tpb.Cells...)
}
servenv.Init()
tabletenv.Init()

Expand Down
5 changes: 5 additions & 0 deletions go/cmd/vttestserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ func init() {
flag.StringVar(&config.ForeignKeyMode, "foreign_key_mode", "allow", "This is to provide how to handle foreign key constraint in create/alter table. Valid values are: allow, disallow")
flag.BoolVar(&config.EnableOnlineDDL, "enable_online_ddl", true, "Allow users to submit, review and control Online DDL")
flag.BoolVar(&config.EnableDirectDDL, "enable_direct_ddl", true, "Allow users to submit direct DDL statements")

// flags for using an actual topo implementation for vtcombo instead of in-memory topo. useful for test setup where an external topo server is shared across multiple vtcombo processes or other components
flag.StringVar(&config.ExternalTopoImplementation, "external_topo_implementation", "", "the topology implementation to use for vtcombo process")
flag.StringVar(&config.ExternalTopoGlobalServerAddress, "external_topo_global_server_address", "", "the address of the global topology server for vtcombo process")
flag.StringVar(&config.ExternalTopoGlobalRoot, "external_topo_global_root", "", "the path of the global topology data in the global topology server for vtcombo process")
}

func (t *topoFlags) buildTopology() (*vttestpb.VTTestTopology, error) {
Expand Down
130 changes: 102 additions & 28 deletions go/cmd/vttestserver/vttestserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ import (
"io"
"math/rand"
"os"
"os/exec"
"path"
"strings"
"testing"
"time"

"github.com/hashicorp/consul/api"
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/tlstest"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -189,38 +192,33 @@ func TestCanVtGateExecute(t *testing.T) {
assert.NoError(t, err)
defer cluster.TearDown()

client, err := vtctlclient.New(fmt.Sprintf("localhost:%v", cluster.GrpcPort()))
assert.NoError(t, err)
defer client.Close()
stream, err := client.ExecuteVtctlCommand(
context.Background(),
[]string{
"VtGateExecute",
"-server",
fmt.Sprintf("localhost:%v", cluster.GrpcPort()),
"select 'success';",
},
30*time.Second,
)
assert.NoError(t, err)
assertVtGateExecute(t, cluster)
}

var b strings.Builder
b.Grow(1024)
func TestExternalTopoServerConsul(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)

Out:
for {
e, err := stream.Recv()
switch err {
case nil:
b.WriteString(e.Value)
case io.EOF:
break Out
default:
assert.FailNow(t, err.Error())
// Start a single consul in the background.
cmd, serverAddr := startConsul(t)
defer func() {
// Alerts command did not run successful
if err := cmd.Process.Kill(); err != nil {
log.Errorf("cmd process kill has an error: %v", err)
}
}
// Alerts command did not run successful
if err := cmd.Wait(); err != nil {
log.Errorf("cmd process wait has an error: %v", err)
}
}()

assert.Contains(t, b.String(), "success")
cluster, err := startCluster("-external_topo_implementation=consul",
fmt.Sprintf("-external_topo_global_server_address=%s", serverAddr), "-external_topo_global_root=consul_test/global")
assert.NoError(t, err)
defer cluster.TearDown()

assertVtGateExecute(t, cluster)
}

func TestMtlsAuth(t *testing.T) {
Expand Down Expand Up @@ -387,3 +385,79 @@ func randomPort() int {
v := rand.Int31n(20000)
return int(v + 10000)
}

func assertVtGateExecute(t *testing.T, cluster vttest.LocalCluster) {
client, err := vtctlclient.New(fmt.Sprintf("localhost:%v", cluster.GrpcPort()))
assert.NoError(t, err)
defer client.Close()
stream, err := client.ExecuteVtctlCommand(
context.Background(),
[]string{
"VtGateExecute",
"-server",
fmt.Sprintf("localhost:%v", cluster.GrpcPort()),
"select 'success';",
},
30*time.Second,
)
assert.NoError(t, err)

var b strings.Builder
b.Grow(1024)

Out:
for {
e, err := stream.Recv()
switch err {
case nil:
b.WriteString(e.Value)
case io.EOF:
break Out
default:
assert.FailNow(t, err.Error())
}
}

assert.Contains(t, b.String(), "success")
}

// startConsul starts a consul subprocess, and waits for it to be ready.
// Returns the exec.Cmd forked, and the server address to RPC-connect to.
func startConsul(t *testing.T) (*exec.Cmd, string) {
// pick a random port to make sure things work with non-default port
port := randomPort()

cmd := exec.Command("consul",
"agent",
"-dev",
"-http-port", fmt.Sprintf("%d", port))
err := cmd.Start()
if err != nil {
t.Fatalf("failed to start consul: %v", err)
}

// Create a client to connect to the created consul.
serverAddr := fmt.Sprintf("localhost:%v", port)
cfg := api.DefaultConfig()
cfg.Address = serverAddr
c, err := api.NewClient(cfg)
if err != nil {
t.Fatalf("api.NewClient(%v) failed: %v", serverAddr, err)
}

// Wait until we can list "/", or timeout.
start := time.Now()
kv := c.KV()
for {
_, _, err := kv.List("/", nil)
if err == nil {
break
}
if time.Since(start) > 10*time.Second {
t.Fatalf("Failed to start consul daemon in time. Consul is returning error: %v", err)
}
time.Sleep(10 * time.Millisecond)
}

return cmd, serverAddr
}
18 changes: 18 additions & 0 deletions go/vt/vttest/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
"time"

"vitess.io/vitess/go/vt/proto/vttest"

// we use gRPC everywhere, so import the vtgate client.
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
)
Expand All @@ -45,6 +47,12 @@ type Environment interface {
// See: vttest.MySQLManager for the interface the manager must implement
MySQLManager(mycnf []string, snapshot string) (MySQLManager, error)

// TopoManager is the constructor for the Topology manager that will
// be used by the cluster. It's only used when we run the local cluster with
// a remote topo server instead of in-memory topo server within vtcombo process
// See: vttest.TopoManager for the interface of topo manager
TopoManager(topoImplementation, topoServerAddress, topoRoot string, topology *vttest.VTTestTopology) TopoManager

// Directory is the path where the local cluster will store all its
// data and metadata. For local testing, this should probably be an
// unique temporary directory.
Expand Down Expand Up @@ -147,6 +155,16 @@ func (env *LocalTestEnv) MySQLManager(mycnf []string, snapshot string) (MySQLMan
}, nil
}

// TopoManager implements TopoManager for LocalTestEnv
func (env *LocalTestEnv) TopoManager(topoImplementation, topoServerAddress, topoRoot string, topology *vttest.VTTestTopology) TopoManager {
return &Topoctl{
TopoImplementation: topoImplementation,
TopoGlobalServerAddress: topoServerAddress,
TopoGlobalRoot: topoRoot,
Topology: topology,
}
}

// DefaultProtocol implements DefaultProtocol for LocalTestEnv.
// It is always GRPC.
func (env *LocalTestEnv) DefaultProtocol() string {
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vttest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ type Config struct {

// Allow users to submit direct DDL statements
EnableDirectDDL bool

// Allow users to start a local cluster using a remote topo server
ExternalTopoImplementation string

ExternalTopoGlobalServerAddress string

ExternalTopoGlobalRoot string
}

// InitSchemas is a shortcut for tests that just want to setup a single
Expand Down Expand Up @@ -200,6 +207,7 @@ type LocalCluster struct {
Env Environment

mysql MySQLManager
topo TopoManager
vt *VtProcess
}

Expand Down Expand Up @@ -237,6 +245,16 @@ func (db *LocalCluster) Setup() error {

log.Infof("LocalCluster environment: %+v", db.Env)

// Set up topo manager if we are using a remote topo server
if db.ExternalTopoImplementation != "" {
db.topo = db.Env.TopoManager(db.ExternalTopoImplementation, db.ExternalTopoGlobalServerAddress, db.ExternalTopoGlobalRoot, db.Topology)
log.Infof("Initializing Topo Manager: %+v", db.topo)
if err := db.topo.Setup(); err != nil {
log.Errorf("Failed to set up Topo Manager: %v", err)
return err
}
}

db.mysql, err = db.Env.MySQLManager(db.ExtraMyCnf, db.SnapshotFile)
if err != nil {
return err
Expand Down
23 changes: 23 additions & 0 deletions go/vt/vttest/plugin_consultopo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreedto in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vttest

// This plugin imports consultopo to register the consul implementation of TopoServer.

import (
_ "vitess.io/vitess/go/vt/topo/consultopo"
)
23 changes: 23 additions & 0 deletions go/vt/vttest/plugin_etcd2topo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vttest

// This plugin imports etcd2topo to register the etcd2 implementation of TopoServer.

import (
_ "vitess.io/vitess/go/vt/topo/etcd2topo"
)
23 changes: 23 additions & 0 deletions go/vt/vttest/plugin_grpctmclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vttest

// Imports and register the gRPC tabletmanager client

import (
_ "vitess.io/vitess/go/vt/vttablet/grpctmclient"
)
23 changes: 23 additions & 0 deletions go/vt/vttest/plugin_zk2topo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreedto in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vttest

// Imports and register the zk2 TopologyServer

import (
_ "vitess.io/vitess/go/vt/topo/zk2topo"
)
Loading