Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to run vtcombo and vttest local cluster with real topo server #9176

Merged
merged 4 commits into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ var (

mysqlPort = flag.Int("mysql_port", 3306, "mysql port")

externalTopoServer = flag.Bool("external_topo_server", false, "Should vtcombo use an external topology server instead of starting its own in-memory topology server. "+
"If true, vtcombo will use the flags defined in topo/server.go to open topo server")

ts *topo.Server
resilientServer *srvtopo.ResilientServer
)
Expand Down Expand Up @@ -135,8 +138,14 @@ func main() {
flag.Set("log_dir", "$VTDATAROOT/tmp")
}

// Create topo server. We use a 'memorytopo' implementation.
ts = memorytopo.NewServer(tpb.Cells...)
if *externalTopoServer {
// Open topo server based on the command line flags defined at topo/server.go
zhongr3n marked this conversation as resolved.
Show resolved Hide resolved
// do not create cell info as it should be done by whoever sets up the external topo server
ts = topo.Open()
} else {
// Create topo server. We use a 'memorytopo' implementation.
ts = memorytopo.NewServer(tpb.Cells...)
}
servenv.Init()
tabletenv.Init()

Expand Down
5 changes: 5 additions & 0 deletions go/cmd/vttestserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ func init() {
flag.StringVar(&config.ForeignKeyMode, "foreign_key_mode", "allow", "This is to provide how to handle foreign key constraint in create/alter table. Valid values are: allow, disallow")
flag.BoolVar(&config.EnableOnlineDDL, "enable_online_ddl", true, "Allow users to submit, review and control Online DDL")
flag.BoolVar(&config.EnableDirectDDL, "enable_direct_ddl", true, "Allow users to submit direct DDL statements")

// flags for using an actual topo implementation for vtcombo instead of in-memory topo. useful for test setup where an external topo server is shared across multiple vtcombo processes or other components
flag.StringVar(&config.ExternalTopoImplementation, "external_topo_implementation", "", "the topology implementation to use for vtcombo process")
flag.StringVar(&config.ExternalTopoGlobalServerAddress, "external_topo_global_server_address", "", "the address of the global topology server for vtcombo process")
flag.StringVar(&config.ExternalTopoGlobalRoot, "external_topo_global_root", "", "the path of the global topology data in the global topology server for vtcombo process")
}

func (t *topoFlags) buildTopology() (*vttestpb.VTTestTopology, error) {
Expand Down
36 changes: 36 additions & 0 deletions go/cmd/vttestserver/vttestserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"testing"
"time"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo/test"

"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -308,6 +311,39 @@ func TestMtlsAuthUnauthorizedFails(t *testing.T) {
assert.Contains(t, err.Error(), "code = Unauthenticated desc = client certificate not authorized")
}

func TestExternalTopoServerConsul(t *testing.T) {
// Start a single consul in the background.
cmd, configFilename, serverAddr := test.StartConsul(t, "")
defer func() {
// Alerts command did not run successful
if err := cmd.Process.Kill(); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we use consul agent -dev i think we could do consul leave for a graceful shutdown and don't need to call os.Remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried consul leave and it adds 10 seconds for grace shutdown. keeping process.kill

log.Errorf("cmd process kill has an error: %v", err)
}
// Alerts command did not run successful
if err := cmd.Wait(); err != nil {
log.Errorf("cmd process wait has an error: %v", err)
}
os.Remove(configFilename)
}()

args := os.Args
conf := config
defer resetFlags(args, conf)

cluster, err := startCluster("-external_topo_implementation=consul",
fmt.Sprintf("-external_topo_global_server_address=%s", serverAddr), "-external_topo_global_root=consul_test/global")
defer cluster.TearDown()

assert.NoError(t, err)
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})

// Add Hash vindex via vtgate execution on table
err = addColumnVindex(cluster, "test_keyspace", "alter vschema on test_table1 add vindex my_vdx (id)")
assert.NoError(t, err)
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table1", vindex: "my_vdx", vindexType: "hash", column: "id"})
}

func startPersistentCluster(dir string, flags ...string) (vttest.LocalCluster, error) {
flags = append(flags, []string{
"-persistent_mode",
Expand Down
100 changes: 4 additions & 96 deletions go/vt/topo/consultopo/server_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ limitations under the License.
package consultopo

import (
"encoding/json"
"fmt"
"os"
"os/exec"
"path"
"testing"
"time"
Expand All @@ -29,108 +27,18 @@ import (

"context"

"github.com/hashicorp/consul/api"

"vitess.io/vitess/go/testfiles"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/test"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// startConsul starts a consul subprocess, and waits for it to be ready.
// Returns the exec.Cmd forked, the config file to remove after the test,
// and the server address to RPC-connect to.
func startConsul(t *testing.T, authToken string) (*exec.Cmd, string, string) {
// Create a temporary config file, as ports cannot all be set
// via command line. The file name has to end with '.json' so
// we're not using TempFile.
configDir, err := os.MkdirTemp("", "consul")
if err != nil {
t.Fatalf("cannot create temp dir: %v", err)
}
defer os.RemoveAll(configDir)

configFilename := path.Join(configDir, "consul.json")
configFile, err := os.OpenFile(configFilename, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
t.Fatalf("cannot create tempfile: %v", err)
}

// Create the JSON config, save it.
port := testfiles.GoVtTopoConsultopoPort
config := map[string]interface{}{
"ports": map[string]int{
"dns": port,
"http": port + 1,
"serf_lan": port + 2,
"serf_wan": port + 3,
},
}

if authToken != "" {
config["datacenter"] = "vitess"
config["acl_datacenter"] = "vitess"
config["acl_master_token"] = authToken
config["acl_default_policy"] = "deny"
config["acl_down_policy"] = "extend-cache"
}

data, err := json.Marshal(config)
if err != nil {
t.Fatalf("cannot json-encode config: %v", err)
}
if _, err := configFile.Write(data); err != nil {
t.Fatalf("cannot write config: %v", err)
}
if err := configFile.Close(); err != nil {
t.Fatalf("cannot close config: %v", err)
}

cmd := exec.Command("consul",
"agent",
"-dev",
"-config-file", configFilename)
err = cmd.Start()
if err != nil {
t.Fatalf("failed to start consul: %v", err)
}

// Create a client to connect to the created consul.
serverAddr := fmt.Sprintf("localhost:%v", port+1)
cfg := api.DefaultConfig()
cfg.Address = serverAddr
if authToken != "" {
cfg.Token = authToken
}
c, err := api.NewClient(cfg)
if err != nil {
t.Fatalf("api.NewClient(%v) failed: %v", serverAddr, err)
}

// Wait until we can list "/", or timeout.
start := time.Now()
kv := c.KV()
for {
_, _, err := kv.List("/", nil)
if err == nil {
break
}
if time.Since(start) > 10*time.Second {
t.Fatalf("Failed to start consul daemon in time. Consul is returning error: %v", err)
}
time.Sleep(10 * time.Millisecond)
}

return cmd, configFilename, serverAddr
}

func TestConsulTopo(t *testing.T) {
// One test is going to wait that full period, so make it shorter.
*watchPollDuration = 100 * time.Millisecond

// Start a single consul in the background.
cmd, configFilename, serverAddr := startConsul(t, "")
cmd, configFilename, serverAddr := test.StartConsul(t, "")
defer func() {
// Alerts command did not run successful
if err := cmd.Process.Kill(); err != nil {
Expand Down Expand Up @@ -176,7 +84,7 @@ func TestConsulTopoWithChecks(t *testing.T) {
*consulLockSessionTTL = "15s"

// Start a single consul in the background.
cmd, configFilename, serverAddr := startConsul(t, "")
cmd, configFilename, serverAddr := test.StartConsul(t, "")
defer func() {
// Alerts command did not run successful
if err := cmd.Process.Kill(); err != nil {
Expand Down Expand Up @@ -220,7 +128,7 @@ func TestConsulTopoWithAuth(t *testing.T) {
*watchPollDuration = 100 * time.Millisecond

// Start a single consul in the background.
cmd, configFilename, serverAddr := startConsul(t, "123456")
cmd, configFilename, serverAddr := test.StartConsul(t, "123456")
defer func() {
// Alerts command did not run successful
if err := cmd.Process.Kill(); err != nil {
Expand Down Expand Up @@ -277,7 +185,7 @@ func TestConsulTopoWithAuthFailure(t *testing.T) {
*watchPollDuration = 100 * time.Millisecond

// Start a single consul in the background.
cmd, configFilename, serverAddr := startConsul(t, "123456")
cmd, configFilename, serverAddr := test.StartConsul(t, "123456")
defer func() {
cmd.Process.Kill()
cmd.Wait()
Expand Down
97 changes: 97 additions & 0 deletions go/vt/topo/test/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,17 @@ limitations under the License.
package test

import (
"encoding/json"
"fmt"
"os"
"os/exec"
"path"
"testing"
"time"

"github.com/hashicorp/consul/api"

"vitess.io/vitess/go/testfiles"

"vitess.io/vitess/go/vt/topo"

Expand Down Expand Up @@ -112,3 +122,90 @@ func TopoServerTestSuite(t *testing.T, factory func() *topo.Server) {
checkWatchInterrupt(t, ts)
ts.Close()
}

// StartConsul starts a consul subprocess, and waits for it to be ready.
// Returns the exec.Cmd forked, the config file to remove after the test,
// and the server address to RPC-connect to.
func StartConsul(t *testing.T, authToken string) (*exec.Cmd, string, string) {
// Create a temporary config file, as ports cannot all be set
// via command line. The file name has to end with '.json' so
// we're not using TempFile.
configDir, err := os.MkdirTemp("", "consul")
if err != nil {
t.Fatalf("cannot create temp dir: %v", err)
}
defer os.RemoveAll(configDir)

configFilename := path.Join(configDir, "consul.json")
configFile, err := os.OpenFile(configFilename, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
t.Fatalf("cannot create tempfile: %v", err)
}

// Create the JSON config, save it.
port := testfiles.GoVtTopoConsultopoPort
config := map[string]interface{}{
"ports": map[string]int{
"dns": port,
"http": port + 1,
"serf_lan": port + 2,
"serf_wan": port + 3,
},
}

if authToken != "" {
config["datacenter"] = "vitess"
config["acl_datacenter"] = "vitess"
config["acl_master_token"] = authToken
config["acl_default_policy"] = "deny"
config["acl_down_policy"] = "extend-cache"
zhongr3n marked this conversation as resolved.
Show resolved Hide resolved
}

data, err := json.Marshal(config)
if err != nil {
t.Fatalf("cannot json-encode config: %v", err)
}
if _, err := configFile.Write(data); err != nil {
t.Fatalf("cannot write config: %v", err)
}
if err := configFile.Close(); err != nil {
t.Fatalf("cannot close config: %v", err)
}

cmd := exec.Command("consul",
"agent",
"-dev",
"-config-file", configFilename)
err = cmd.Start()
if err != nil {
t.Fatalf("failed to start consul: %v", err)
}

// Create a client to connect to the created consul.
serverAddr := fmt.Sprintf("localhost:%v", port+1)
cfg := api.DefaultConfig()
cfg.Address = serverAddr
if authToken != "" {
cfg.Token = authToken
}
c, err := api.NewClient(cfg)
if err != nil {
t.Fatalf("api.NewClient(%v) failed: %v", serverAddr, err)
}

// Wait until we can list "/", or timeout.
start := time.Now()
kv := c.KV()
for {
_, _, err := kv.List("/", nil)
if err == nil {
break
}
if time.Since(start) > 10*time.Second {
t.Fatalf("Failed to start consul daemon in time. Consul is returning error: %v", err)
}
time.Sleep(10 * time.Millisecond)
}

return cmd, configFilename, serverAddr
}
18 changes: 18 additions & 0 deletions go/vt/vttest/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
"time"

"vitess.io/vitess/go/vt/proto/vttest"

// we use gRPC everywhere, so import the vtgate client.
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
)
Expand All @@ -45,6 +47,12 @@ type Environment interface {
// See: vttest.MySQLManager for the interface the manager must implement
MySQLManager(mycnf []string, snapshot string) (MySQLManager, error)

// TopoManager is the constructor for the Topology manager that will
// be used by the cluster. It's only used when we run the local cluster with
// a remote topo server instead of in-memory topo server within vtcombo process
// See: vttest.TopoManager for the interface of topo manager
TopoManager(topoImplementation, topoServerAddress, topoRoot string, topology *vttest.VTTestTopology) TopoManager

// Directory is the path where the local cluster will store all its
// data and metadata. For local testing, this should probably be an
// unique temporary directory.
Expand Down Expand Up @@ -147,6 +155,16 @@ func (env *LocalTestEnv) MySQLManager(mycnf []string, snapshot string) (MySQLMan
}, nil
}

// TopoManager implements TopoManager for LocalTestEnv
func (env *LocalTestEnv) TopoManager(topoImplementation, topoServerAddress, topoRoot string, topology *vttest.VTTestTopology) TopoManager {
return &Topoctl{
TopoImplementation: topoImplementation,
TopoGlobalServerAddress: topoServerAddress,
TopoGlobalRoot: topoRoot,
Topology: topology,
}
}

// DefaultProtocol implements DefaultProtocol for LocalTestEnv.
// It is always GRPC.
func (env *LocalTestEnv) DefaultProtocol() string {
Expand Down
Loading