Skip to content

Commit

Permalink
Merge pull request #7718 from Shopify/persistent-vttest
Browse files Browse the repository at this point in the history
Make vttestserver compatible with persistent data directories
  • Loading branch information
deepthi authored Mar 24, 2021
2 parents 0fec0b9 + 0698a01 commit 92584e9
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 57 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,13 @@ docker_local:
docker_mini:
${call build_docker_image,docker/mini/Dockerfile,vitess/mini}

DOCKER_VTTESTSERVER_SUFFIX = mysql57 mysql80
DOCKER_VTTESTSERVER_TARGETS = $(addprefix docker_vttestserver_,$(DOCKER_VTTESTSERVER_SUFFIX))
$(DOCKER_VTTESTSERVER_TARGETS): docker_vttestserver_%:
${call build_docker_image,docker/vttestserver/Dockerfile.$*,vitess/vttestserver:$*}

docker_vttestserver: $(DOCKER_VTTESTSERVER_TARGETS)

# This rule loads the working copy of the code into a bootstrap image,
# and then runs the tests inside Docker.
# Example: $ make docker_test flavor=mariadb
Expand Down
16 changes: 15 additions & 1 deletion go/cmd/vttestserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"flag"
"fmt"
"os"
"os/signal"
"strconv"
"strings"
"syscall"

"github.com/golang/protobuf/proto"

Expand Down Expand Up @@ -81,6 +83,16 @@ func init() {
" Also, the output specifies the mysql unix socket"+
" instead of the vtgate port.")

flag.BoolVar(&config.PersistentMode, "persistent_mode", false,
"If this flag is set, the MySQL data directory is not cleaned up"+
" when LocalCluster.TearDown() is called. This is useful for running"+
" vttestserver as a database container in local developer environments. Note"+
" that db migration files (-schema_dir option) and seeding of"+
" random data (-initialize_with_random_data option) will only run during"+
" cluster startup if the data directory does not already exist. vschema"+
" migrations are run every time the cluster starts, since persistence"+
" for the topology server has not been implemented yet")

flag.BoolVar(&doSeed, "initialize_with_random_data", false,
"If this flag is each table-shard will be initialized"+
" with random data. See also the 'rng_seed' and 'min_shard_size'"+
Expand Down Expand Up @@ -229,7 +241,9 @@ func main() {
log.Fatal(err)
}

select {}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
}

func runCluster() (vttest.LocalCluster, error) {
Expand Down
109 changes: 98 additions & 11 deletions go/cmd/vttestserver/vttestserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path"
"strings"
"testing"
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/tlstest"

"github.com/stretchr/testify/assert"
Expand All @@ -52,10 +54,12 @@ type columnVindex struct {
}

func TestRunsVschemaMigrations(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)

cluster, err := startCluster()
defer cluster.TearDown()
args := os.Args
defer resetFlags(args)

assert.NoError(t, err)
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
Expand All @@ -67,12 +71,69 @@ func TestRunsVschemaMigrations(t *testing.T) {
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table1", vindex: "my_vdx", vindexType: "hash", column: "id"})
}

func TestPersistentMode(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)

dir, err := ioutil.TempDir("/tmp", "vttestserver_persistent_mode_")
assert.NoError(t, err)
defer os.RemoveAll(dir)

cluster, err := startPersistentCluster(dir)
assert.NoError(t, err)

// basic sanity checks similar to TestRunsVschemaMigrations
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})

// insert some data to ensure persistence across teardowns
err = execOnCluster(cluster, "app_customer", func(conn *mysql.Conn) error {
_, err := conn.ExecuteFetch("insert into customers (id, name) values (1, 'gopherson')", 1, false)
return err
})
assert.NoError(t, err)

expectedRows := [][]sqltypes.Value{
{sqltypes.NewInt64(1), sqltypes.NewVarChar("gopherson"), sqltypes.NULL},
}

// ensure data was actually inserted
var res *sqltypes.Result
err = execOnCluster(cluster, "app_customer", func(conn *mysql.Conn) (err error) {
res, err = conn.ExecuteFetch("SELECT * FROM customers", 1, false)
return err
})
assert.NoError(t, err)
assert.Equal(t, expectedRows, res.Rows)

// reboot the persistent cluster
cluster.TearDown()
cluster, err = startPersistentCluster(dir)
defer cluster.TearDown()
assert.NoError(t, err)

// rerun our sanity checks to make sure vschema migrations are run during every startup
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})

// ensure previous data was successfully persisted
err = execOnCluster(cluster, "app_customer", func(conn *mysql.Conn) (err error) {
res, err = conn.ExecuteFetch("SELECT * FROM customers", 1, false)
return err
})
assert.NoError(t, err)
assert.Equal(t, expectedRows, res.Rows)
}

func TestCanVtGateExecute(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)

cluster, err := startCluster()
assert.NoError(t, err)
defer cluster.TearDown()
args := os.Args
defer resetFlags(args)

client, err := vtctlclient.New(fmt.Sprintf("localhost:%v", cluster.GrpcPort()))
assert.NoError(t, err)
Expand Down Expand Up @@ -109,6 +170,10 @@ Out:
}

func TestMtlsAuth(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)

// Our test root.
root, err := ioutil.TempDir("", "tlstest")
if err != nil {
Expand Down Expand Up @@ -141,15 +206,17 @@ func TestMtlsAuth(t *testing.T) {
fmt.Sprintf("-grpc_auth_mtls_allowed_substrings=%s", "CN=ClientApp"))
assert.NoError(t, err)
defer cluster.TearDown()
args := os.Args
defer resetFlags(args)

// startCluster will apply vschema migrations using vtctl grpc and the clientCert.
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})
}

func TestMtlsAuthUnauthorizedFails(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)

// Our test root.
root, err := ioutil.TempDir("", "tlstest")
if err != nil {
Expand Down Expand Up @@ -182,13 +249,21 @@ func TestMtlsAuthUnauthorizedFails(t *testing.T) {
fmt.Sprintf("-vtctld_grpc_ca=%s", caCert),
fmt.Sprintf("-grpc_auth_mtls_allowed_substrings=%s", "CN=ClientApp"))
defer cluster.TearDown()
args := os.Args
defer resetFlags(args)

assert.Error(t, err)
assert.Contains(t, err.Error(), "code = Unauthenticated desc = client certificate not authorized")
}

func startPersistentCluster(dir string, flags ...string) (vttest.LocalCluster, error) {
flags = append(flags, []string{
"-persistent_mode",
// FIXME: if port is not provided, data_dir is not respected
fmt.Sprintf("-port=%d", randomPort()),
fmt.Sprintf("-data_dir=%s", dir),
}...)
return startCluster(flags...)
}

func startCluster(flags ...string) (vttest.LocalCluster, error) {
schemaDirArg := "-schema_dir=data/schema"
tabletHostname := "-tablet_hostname=localhost"
Expand All @@ -201,6 +276,13 @@ func startCluster(flags ...string) (vttest.LocalCluster, error) {
}

func addColumnVindex(cluster vttest.LocalCluster, keyspace string, vschemaMigration string) error {
return execOnCluster(cluster, keyspace, func(conn *mysql.Conn) error {
_, err := conn.ExecuteFetch(vschemaMigration, 1, false)
return err
})
}

func execOnCluster(cluster vttest.LocalCluster, keyspace string, f func(*mysql.Conn) error) error {
ctx := context.Background()
vtParams := mysql.ConnParams{
Host: "localhost",
Expand All @@ -213,8 +295,7 @@ func addColumnVindex(cluster vttest.LocalCluster, keyspace string, vschemaMigrat
return err
}
defer conn.Close()
_, err = conn.ExecuteFetch(vschemaMigration, 1, false)
return err
return f(conn)
}

func assertColumnVindex(t *testing.T, cluster vttest.LocalCluster, expected columnVindex) {
Expand Down Expand Up @@ -243,6 +324,12 @@ func assertEqual(t *testing.T, actual string, expected string, message string) {
}
}

func resetFlags(args []string) {
func resetFlags(args []string, conf vttest.Config) {
os.Args = args
config = conf
}

func randomPort() int {
v := rand.Int31n(20000)
return int(v + 10000)
}
62 changes: 43 additions & 19 deletions go/test/endtoend/vtcombo/vttest_sample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestMain(m *testing.M) {
cfg.Topology = topology
cfg.SchemaDir = os.Getenv("VTROOT") + "/test/vttest_schema"
cfg.DefaultSchemaDir = os.Getenv("VTROOT") + "/test/vttest_schema/default"
cfg.PersistentMode = true

localCluster = &vttest.LocalCluster{
Config: cfg,
Expand Down Expand Up @@ -116,27 +117,24 @@ func TestStandalone(t *testing.T) {
conn, err := vtgateconn.Dial(ctx, grpcAddress)
require.Nil(t, err)
defer conn.Close()
cur := conn.Session(ks1+":-80@master", nil)

idStart, rowCount := 1000, 500
query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)"
_, err = cur.Execute(ctx, "begin", nil)
require.Nil(t, err)
insertManyRows(ctx, t, conn, idStart, rowCount)
assertInsertedRowsExist(ctx, t, conn, idStart, rowCount)
assertCanInsertRow(ctx, t, conn)
assertTablesPresent(t)

for i := idStart; i < idStart+rowCount; i++ {
bindVariables := map[string]*querypb.BindVariable{
"id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
"msg": {Type: querypb.Type_VARCHAR, Value: []byte("test" + strconv.FormatInt(int64(i), 10))},
"keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
}
_, err = cur.Execute(ctx, query, bindVariables)
require.Nil(t, err)
}

_, err = cur.Execute(ctx, "commit", nil)
err = localCluster.TearDown()
require.Nil(t, err)
err = localCluster.Setup()
require.Nil(t, err)

assertInsertedRowsExist(ctx, t, conn, idStart, rowCount)
assertTablesPresent(t)
}

cur = conn.Session(ks1+":-80@rdonly", nil)
func assertInsertedRowsExist(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn, idStart, rowCount int) {
cur := conn.Session(ks1+":-80@rdonly", nil)
bindVariables := map[string]*querypb.BindVariable{
"id_start": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(idStart), 10))},
}
Expand All @@ -153,23 +151,49 @@ func TestStandalone(t *testing.T) {
require.Nil(t, err)
require.Equal(t, 1, len(res.Rows))
assert.Equal(t, "VARCHAR(\"test1000\")", res.Rows[0][1].String())
}

cur = conn.Session(ks1+":80-@master", nil)
_, err = cur.Execute(ctx, "begin", nil)
func assertCanInsertRow(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn) {
cur := conn.Session(ks1+":80-@master", nil)
_, err := cur.Execute(ctx, "begin", nil)
require.Nil(t, err)

i := 0x810000000000000
bindVariables = map[string]*querypb.BindVariable{
bindVariables := map[string]*querypb.BindVariable{
"id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
"msg": {Type: querypb.Type_VARCHAR, Value: []byte("test" + strconv.FormatInt(int64(i), 10))},
"keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
}
query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)"
_, err = cur.Execute(ctx, query, bindVariables)
require.Nil(t, err)

_, err = cur.Execute(ctx, "commit", nil)
require.Nil(t, err)
}

func insertManyRows(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn, idStart, rowCount int) {
cur := conn.Session(ks1+":-80@master", nil)

query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)"
_, err := cur.Execute(ctx, "begin", nil)
require.Nil(t, err)

for i := idStart; i < idStart+rowCount; i++ {
bindVariables := map[string]*querypb.BindVariable{
"id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
"msg": {Type: querypb.Type_VARCHAR, Value: []byte("test" + strconv.FormatInt(int64(i), 10))},
"keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
}
_, err = cur.Execute(ctx, query, bindVariables)
require.Nil(t, err)
}

_, err = cur.Execute(ctx, "commit", nil)
require.Nil(t, err)
}

func assertTablesPresent(t *testing.T) {
tmpCmd := exec.Command("vtctlclient", "-vtctl_client_protocol", "grpc", "-server", grpcAddress, "-stderrthreshold", "0", "ListAllTablets", "test")

log.Infof("Running vtctlclient with command: %v", tmpCmd.Args)
Expand Down
7 changes: 6 additions & 1 deletion go/vt/mysqlctl/mycnf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ func TabletDir(uid uint32) string {
if *tabletDir != "" {
return fmt.Sprintf("%s/%s", env.VtDataRoot(), *tabletDir)
}
return fmt.Sprintf("%s/vt_%010d", env.VtDataRoot(), uid)
return DefaultTabletDirAtRoot(env.VtDataRoot(), uid)
}

// DefaultTabletDirAtRoot returns the default directory for a tablet given a UID and a VtDataRoot variable
func DefaultTabletDirAtRoot(dataRoot string, uid uint32) string {
return fmt.Sprintf("%s/vt_%010d", dataRoot, uid)
}

// MycnfFile returns the default location of the my.cnf file.
Expand Down
9 changes: 6 additions & 3 deletions go/vt/vttest/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (env *LocalTestEnv) MySQLManager(mycnf []string, snapshot string) (MySQLMan
Port: env.PortForProtocol("mysql", ""),
MyCnf: append(env.DefaultMyCnf, mycnf...),
Env: env.EnvVars(),
UID: 1,
}, nil
}

Expand Down Expand Up @@ -241,9 +242,11 @@ func NewLocalTestEnv(flavor string, basePort int) (*LocalTestEnv, error) {
// NewLocalTestEnvWithDirectory returns a new instance of the default test
// environment with a directory explicitly specified.
func NewLocalTestEnvWithDirectory(flavor string, basePort int, directory string) (*LocalTestEnv, error) {
err := os.Mkdir(path.Join(directory, "logs"), 0700)
if err != nil {
return nil, err
if _, err := os.Stat(directory); os.IsNotExist(err) {
err := os.Mkdir(path.Join(directory, "logs"), 0700)
if err != nil {
return nil, err
}
}

flavor, mycnf, err := GetMySQLOptions(flavor)
Expand Down
Loading

0 comments on commit 92584e9

Please sign in to comment.