Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vttestserver: persist vschema changes in --persistent_mode #13065

Merged
merged 4 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,13 @@ var (
mysqlPort = flags.Int("mysql_port", 3306, "mysql port")
externalTopoServer = flags.Bool("external_topo_server", false, "Should vtcombo use an external topology server instead of starting its own in-memory topology server. "+
"If true, vtcombo will use the flags defined in topo/server.go to open topo server")
plannerName = flags.String("planner-version", "", "Sets the default planner to use when the session has not changed it. Valid values are: V3, V3Insert, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.")
plannerName = flags.String("planner-version", "", "Sets the default planner to use when the session has not changed it. Valid values are: V3, V3Insert, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.")
vschemaPersistenceDir = flags.String("vschema-persistence-dir", "", "If set, per-keyspace vschema will be persisted in this directory "+
"and reloaded into the in-memory topology server across restarts. Bookkeeping is performed using a simple watcher goroutine. "+
"This is useful when running vtcombo as an application development container (e.g. vttestserver) where you want to keep the same "+
"vschema even if developer's machine reboots. This works in tandem with vttestserver's --persistent_mode flag. Needless to say, "+
"this is neither a perfect nor a production solution for vschema persistence. Consider using the --external_topo_server flag if "+
"you require a more complete solution. This flag is ignored if --external_topo_server is set.")

tpb vttestpb.VTTestTopology
ts *topo.Server
Expand Down Expand Up @@ -292,6 +298,10 @@ func main() {
exit.Return(1)
}

if *vschemaPersistenceDir != "" && !*externalTopoServer {
startVschemaWatcher(*vschemaPersistenceDir, tpb.Keyspaces, ts)
}

servenv.OnRun(func() {
addStatusParts(vtg)
})
Expand Down
111 changes: 111 additions & 0 deletions go/cmd/vtcombo/vschema_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

hkdsun marked this conversation as resolved.
Show resolved Hide resolved
import (
"context"
"encoding/json"
"os"
"path"

"vitess.io/vitess/go/vt/log"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vttestpb "vitess.io/vitess/go/vt/proto/vttest"
"vitess.io/vitess/go/vt/topo"
)

func startVschemaWatcher(vschemaPersistenceDir string, keyspaces []*vttestpb.Keyspace, ts *topo.Server) {
// Create the directory if it doesn't exist.
if err := createDirectoryIfNotExists(vschemaPersistenceDir); err != nil {
log.Fatalf("Unable to create vschema persistence directory %v: %v", vschemaPersistenceDir, err)
}

// If there are keyspace files, load them.
loadKeyspacesFromDir(vschemaPersistenceDir, keyspaces, ts)

// Rebuild the SrvVSchema object in case we loaded vschema from file
if err := ts.RebuildSrvVSchema(context.Background(), tpb.Cells); err != nil {
log.Fatalf("RebuildSrvVSchema failed: %v", err)
}

// Now watch for changes in the SrvVSchema object and persist them to disk.
go watchSrvVSchema(context.Background(), ts, tpb.Cells[0])
}

func loadKeyspacesFromDir(dir string, keyspaces []*vttestpb.Keyspace, ts *topo.Server) {
for _, ks := range tpb.Keyspaces {
ksFile := path.Join(dir, ks.Name+".json")
if _, err := os.Stat(ksFile); err == nil {
jsonData, err := os.ReadFile(ksFile)
if err != nil {
log.Fatalf("Unable to read keyspace file %v: %v", ksFile, err)
}

keyspace := &vschemapb.Keyspace{}
err = json.Unmarshal(jsonData, keyspace)
if err != nil {
log.Fatalf("Unable to parse keyspace file %v: %v", ksFile, err)
}

ts.SaveVSchema(context.Background(), ks.Name, keyspace)
log.Infof("Loaded keyspace %v from %v\n", ks.Name, ksFile)
}
}
}

func watchSrvVSchema(ctx context.Context, ts *topo.Server, cell string) {
data, ch, err := ts.WatchSrvVSchema(context.Background(), tpb.Cells[0])
if err != nil {
log.Fatalf("WatchSrvVSchema failed: %v", err)
}

if data.Err != nil {
log.Fatalf("WatchSrvVSchema could not retrieve initial vschema: %v", data.Err)
}
persistNewSrvVSchema(data.Value)

for update := range ch {
if update.Err != nil {
log.Errorf("WatchSrvVSchema returned an error: %v", update.Err)
} else {
persistNewSrvVSchema(update.Value)
}
}
}

func persistNewSrvVSchema(srvVSchema *vschemapb.SrvVSchema) {
for ksName, ks := range srvVSchema.Keyspaces {
jsonBytes, err := json.MarshalIndent(ks, "", " ")
if err != nil {
log.Errorf("Error marshaling keyspace: %v", err)
continue
}

err = os.WriteFile(path.Join(*vschemaPersistenceDir, ksName+".json"), jsonBytes, 0644)
if err != nil {
log.Errorf("Error writing keyspace file: %v", err)
}
log.Infof("Persisted keyspace %v to %v", ksName, *vschemaPersistenceDir)
}
}

func createDirectoryIfNotExists(dir string) error {
if _, err := os.Stat(dir); os.IsNotExist(err) {
return os.Mkdir(dir, 0755)
}
return nil
}
6 changes: 3 additions & 3 deletions go/cmd/vttestserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func registerFlags(fs *pflag.FlagSet) {
" vttestserver as a database container in local developer environments. Note"+
" that db migration files (--schema_dir option) and seeding of"+
" random data (--initialize_with_random_data option) will only run during"+
" cluster startup if the data directory does not already exist. vschema"+
" migrations are run every time the cluster starts, since persistence"+
" for the topology server has not been implemented yet")
" cluster startup if the data directory does not already exist. "+
" Changes to VSchema are persisted across cluster restarts using a simple"+
" watcher if the --data_dir argument is specified.")

fs.BoolVar(&doSeed, "initialize_with_random_data", false,
"If this flag is each table-shard will be initialized"+
Expand Down
14 changes: 11 additions & 3 deletions go/cmd/vttestserver/vttestserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,14 @@ func TestPersistentMode(t *testing.T) {
cluster, err := startPersistentCluster(dir)
assert.NoError(t, err)

// basic sanity checks similar to TestRunsVschemaMigrations
// Add a new "ad-hoc" vindex via vtgate once the cluster is up, to later make sure it is persisted across teardowns
err = addColumnVindex(cluster, "test_keyspace", "alter vschema on persistence_test add vindex my_vdx(id)")
assert.NoError(t, err)

// Basic sanity checks similar to TestRunsVschemaMigrations
// See go/cmd/vttestserver/data/schema/app_customer/* and go/cmd/vttestserver/data/schema/test_keyspace/*
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "persistence_test", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})

// insert some data to ensure persistence across teardowns
Expand Down Expand Up @@ -111,8 +117,9 @@ func TestPersistentMode(t *testing.T) {
defer cluster.TearDown()
assert.NoError(t, err)

// rerun our sanity checks to make sure vschema migrations are run during every startup
// rerun our sanity checks to make sure vschema is persisted correctly
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "persistence_test", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})

// ensure previous data was successfully persisted
Expand Down Expand Up @@ -316,7 +323,8 @@ func startCluster(flags ...string) (vttest.LocalCluster, error) {
keyspaceArg := "--keyspaces=" + strings.Join(clusterKeyspaces, ",")
numShardsArg := "--num_shards=2,2"
vschemaDDLAuthorizedUsers := "--vschema_ddl_authorized_users=%"
os.Args = append(os.Args, []string{schemaDirArg, keyspaceArg, numShardsArg, tabletHostname, vschemaDDLAuthorizedUsers}...)
alsoLogToStderr := "--alsologtostderr" // better debugging
os.Args = append(os.Args, []string{schemaDirArg, keyspaceArg, numShardsArg, tabletHostname, vschemaDDLAuthorizedUsers, alsoLogToStderr}...)
os.Args = append(os.Args, flags...)
return runCluster()
}
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttestserver.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Usage of vttestserver:
--num_shards strings Comma separated shard count (one per keyspace) (default [2])
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s)
--onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s)
--persistent_mode If this flag is set, the MySQL data directory is not cleaned up when LocalCluster.TearDown() is called. This is useful for running vttestserver as a database container in local developer environments. Note that db migration files (--schema_dir option) and seeding of random data (--initialize_with_random_data option) will only run during cluster startup if the data directory does not already exist. vschema migrations are run every time the cluster starts, since persistence for the topology server has not been implemented yet
--persistent_mode If this flag is set, the MySQL data directory is not cleaned up when LocalCluster.TearDown() is called. This is useful for running vttestserver as a database container in local developer environments. Note that db migration files (--schema_dir option) and seeding of random data (--initialize_with_random_data option) will only run during cluster startup if the data directory does not already exist. Changes to VSchema are persisted across cluster restarts using a simple watcher if the --data_dir argument is specified.
--pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown.
--planner-version string Sets the default planner to use when the session has not changed it. Valid values are: V3, V3Insert, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the new gen4 planner and falls back to the V3 planner if the gen4 fails.
--pool_hostname_resolve_interval duration if set force an update to all hostnames and reconnect if changed, defaults to 0 (disabled)
Expand Down
34 changes: 16 additions & 18 deletions go/vt/vttest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,28 +491,26 @@ func (db *LocalCluster) loadSchema(shouldRunDatabaseMigrations bool) error {
}
}

glob, _ := filepath.Glob(path.Join(schemaDir, "*.sql"))
for _, filepath := range glob {
cmds, err := LoadSQLFile(filepath, schemaDir)
if err != nil {
return err
}

// One single vschema migration per file
if !db.OnlyMySQL && len(cmds) == 1 && strings.HasPrefix(strings.ToUpper(cmds[0]), "ALTER VSCHEMA") {
if err = db.applyVschema(keyspace, cmds[0]); err != nil {
if shouldRunDatabaseMigrations {
glob, _ := filepath.Glob(path.Join(schemaDir, "*.sql"))
for _, filepath := range glob {
cmds, err := LoadSQLFile(filepath, schemaDir)
if err != nil {
return err
}
continue
}

if !shouldRunDatabaseMigrations {
continue
}
// One single vschema migration per file
if !db.OnlyMySQL && len(cmds) == 1 && strings.HasPrefix(strings.ToUpper(cmds[0]), "ALTER VSCHEMA") {
if err = db.applyVschema(keyspace, cmds[0]); err != nil {
return err
}
continue
}

for _, dbname := range db.shardNames(kpb) {
if err := db.Execute(cmds, dbname); err != nil {
return err
for _, dbname := range db.shardNames(kpb) {
if err := db.Execute(cmds, dbname); err != nil {
return err
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttest/vtprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"os"
"os/exec"
"path"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -242,6 +243,9 @@ func VtcomboProcess(environment Environment, args *Config, mysql MySQLManager) (
if args.SchemaDir != "" {
vt.ExtraArgs = append(vt.ExtraArgs, []string{"--schema_dir", args.SchemaDir}...)
}
if args.PersistentMode && args.DataDir != "" {
vt.ExtraArgs = append(vt.ExtraArgs, []string{"--vschema-persistence-dir", path.Join(args.DataDir, "vschema_data")}...)
}
if args.TransactionMode != "" {
vt.ExtraArgs = append(vt.ExtraArgs, []string{"--transaction_mode", args.TransactionMode}...)
}
Expand Down