From 2c1aef911085cf2e926d6cd9df205cd6a702e871 Mon Sep 17 00:00:00 2001 From: Hormoz Kheradmand Date: Tue, 9 May 2023 05:48:38 +0000 Subject: [PATCH 1/4] vttestserver: persist vschema changes to data_dir Signed-off-by: Hormoz Kheradmand --- go/cmd/vtcombo/main.go | 12 +++- go/cmd/vtcombo/vschema_watcher.go | 96 +++++++++++++++++++++++++++++++ go/cmd/vttestserver/main.go | 6 +- go/vt/vttest/vtprocess.go | 4 ++ 4 files changed, 114 insertions(+), 4 deletions(-) create mode 100644 go/cmd/vtcombo/vschema_watcher.go diff --git a/go/cmd/vtcombo/main.go b/go/cmd/vtcombo/main.go index affbf0520e7..9ab94776bef 100644 --- a/go/cmd/vtcombo/main.go +++ b/go/cmd/vtcombo/main.go @@ -61,7 +61,13 @@ var ( mysqlPort = flags.Int("mysql_port", 3306, "mysql port") externalTopoServer = flags.Bool("external_topo_server", false, "Should vtcombo use an external topology server instead of starting its own in-memory topology server. "+ "If true, vtcombo will use the flags defined in topo/server.go to open topo server") - plannerName = flags.String("planner-version", "", "Sets the default planner to use when the session has not changed it. Valid values are: V3, V3Insert, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.") + plannerName = flags.String("planner-version", "", "Sets the default planner to use when the session has not changed it. Valid values are: V3, V3Insert, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.") + vschemaPersistenceDir = flags.String("vschema_persistence_dir", "", "If set, per-keyspace vschema will be persisted in this directory "+ + "and reloaded into the in-memory topology server across restarts. Bookkeeping is performed using a simple watcher goroutine. "+ + "This is useful when running vtcombo as an application development container (e.g. vttestserver) where you want to keep the same "+ + "vschema even if developer's machine reboots. This works in tandem with vttestserver's --persistent_mode flag. Needless to say, "+ + "this is neither a perfect nor a production solution for vschema persistence. Consider using the --external_topo_server flag if "+ + "you require a more complete solution. This flag is ignored if --external_topo_server is set.") tpb vttestpb.VTTestTopology ts *topo.Server @@ -292,6 +298,10 @@ func main() { exit.Return(1) } + if *vschemaPersistenceDir != "" && !*externalTopoServer { + startVschemaWatcher(*vschemaPersistenceDir, tpb.Keyspaces, ts) + } + servenv.OnRun(func() { addStatusParts(vtg) }) diff --git a/go/cmd/vtcombo/vschema_watcher.go b/go/cmd/vtcombo/vschema_watcher.go new file mode 100644 index 00000000000..c061a9fd243 --- /dev/null +++ b/go/cmd/vtcombo/vschema_watcher.go @@ -0,0 +1,96 @@ +package main + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "path" + + "vitess.io/vitess/go/vt/log" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vttestpb "vitess.io/vitess/go/vt/proto/vttest" + "vitess.io/vitess/go/vt/topo" +) + +func startVschemaWatcher(vschemaPersistenceDir string, keyspaces []*vttestpb.Keyspace, ts *topo.Server) { + // Create the directory if it doesn't exist. + if err := createDirectoryIfNotExists(vschemaPersistenceDir); err != nil { + log.Fatalf("Unable to create vschema persistence directory %v: %v", vschemaPersistenceDir, err) + } + + // If there are keyspace files, load them. + loadKeyspacesFromDir(vschemaPersistenceDir, keyspaces, ts) + + // Rebuild the SrvVSchema object in case we loaded vschema from file + if err := ts.RebuildSrvVSchema(context.Background(), tpb.Cells); err != nil { + log.Fatalf("RebuildSrvVSchema failed: %v", err) + } + + // Now watch for changes in the SrvVSchema object and persist them to disk. + go watchSrvVSchema(context.Background(), ts, tpb.Cells[0]) +} + +func loadKeyspacesFromDir(dir string, keyspaces []*vttestpb.Keyspace, ts *topo.Server) { + for _, ks := range tpb.Keyspaces { + ksFile := path.Join(dir, ks.Name+".json") + if _, err := os.Stat(ksFile); err == nil { + jsonData, err := ioutil.ReadFile(ksFile) + if err != nil { + log.Fatalf("Unable to read keyspace file %v: %v", ksFile, err) + } + + keyspace := &vschemapb.Keyspace{} + err = json.Unmarshal(jsonData, keyspace) + if err != nil { + log.Fatalf("Unable to parse keyspace file %v: %v", ksFile, err) + } + + ts.SaveVSchema(context.Background(), ks.Name, keyspace) + log.Infof("Loaded keyspace %v from %v\n", ks.Name, ksFile) + } + } +} + +func watchSrvVSchema(ctx context.Context, ts *topo.Server, cell string) { + data, ch, err := ts.WatchSrvVSchema(context.Background(), tpb.Cells[0]) + if err != nil { + log.Fatalf("WatchSrvVSchema failed: %v", err) + } + + if data.Err != nil { + log.Fatalf("WatchSrvVSchema could not retrieve initial vschema: %v", data.Err) + } + persistNewSrvVSchema(data.Value) + + for update := range ch { + if update.Err != nil { + log.Errorf("WatchSrvVSchema returned an error: %v", update.Err) + } else { + persistNewSrvVSchema(update.Value) + } + } +} + +func persistNewSrvVSchema(srvVSchema *vschemapb.SrvVSchema) { + for ksName, ks := range srvVSchema.Keyspaces { + jsonBytes, err := json.MarshalIndent(ks, "", " ") + if err != nil { + log.Errorf("Error marshaling keyspace: %v", err) + continue + } + + err = ioutil.WriteFile(path.Join(*vschemaPersistenceDir, ksName+".json"), jsonBytes, 0644) + if err != nil { + log.Errorf("Error writing keyspace file: %v", err) + } + log.Infof("Persisted keyspace %v to %v", ksName, *vschemaPersistenceDir) + } +} + +func createDirectoryIfNotExists(dir string) error { + if _, err := os.Stat(dir); os.IsNotExist(err) { + return os.Mkdir(dir, 0755) + } + return nil +} diff --git a/go/cmd/vttestserver/main.go b/go/cmd/vttestserver/main.go index e73b722d154..3246f730aa9 100644 --- a/go/cmd/vttestserver/main.go +++ b/go/cmd/vttestserver/main.go @@ -93,9 +93,9 @@ func registerFlags(fs *pflag.FlagSet) { " vttestserver as a database container in local developer environments. Note"+ " that db migration files (--schema_dir option) and seeding of"+ " random data (--initialize_with_random_data option) will only run during"+ - " cluster startup if the data directory does not already exist. vschema"+ - " migrations are run every time the cluster starts, since persistence"+ - " for the topology server has not been implemented yet") + " cluster startup if the data directory does not already exist. "+ + " Changes to VSchema are persisted across cluster restarts using a simple"+ + " watcher if the --data_dir argument is specified.") fs.BoolVar(&doSeed, "initialize_with_random_data", false, "If this flag is each table-shard will be initialized"+ diff --git a/go/vt/vttest/vtprocess.go b/go/vt/vttest/vtprocess.go index 37582542c02..f829b85b133 100644 --- a/go/vt/vttest/vtprocess.go +++ b/go/vt/vttest/vtprocess.go @@ -23,6 +23,7 @@ import ( "net/http" "os" "os/exec" + "path" "strings" "syscall" "time" @@ -242,6 +243,9 @@ func VtcomboProcess(environment Environment, args *Config, mysql MySQLManager) ( if args.SchemaDir != "" { vt.ExtraArgs = append(vt.ExtraArgs, []string{"--schema_dir", args.SchemaDir}...) } + if args.PersistentMode && args.DataDir != "" { + vt.ExtraArgs = append(vt.ExtraArgs, []string{"--vschema_persistence_dir", path.Join(args.DataDir, "vschema_data")}...) + } if args.TransactionMode != "" { vt.ExtraArgs = append(vt.ExtraArgs, []string{"--transaction_mode", args.TransactionMode}...) } From fa1a669369da623de4db5ea9a1aa61b264c4d69b Mon Sep 17 00:00:00 2001 From: Hormoz Kheradmand Date: Wed, 10 May 2023 16:32:49 +0000 Subject: [PATCH 2/4] do not run vschema migrations if not initializing database Signed-off-by: Hormoz Kheradmand --- go/cmd/vtcombo/vschema_watcher.go | 5 ++-- go/cmd/vttestserver/vttestserver_test.go | 14 +++++++--- go/flags/endtoend/vttestserver.txt | 2 +- go/vt/vttest/local_cluster.go | 34 +++++++++++------------- 4 files changed, 30 insertions(+), 25 deletions(-) diff --git a/go/cmd/vtcombo/vschema_watcher.go b/go/cmd/vtcombo/vschema_watcher.go index c061a9fd243..be0191fe6ac 100644 --- a/go/cmd/vtcombo/vschema_watcher.go +++ b/go/cmd/vtcombo/vschema_watcher.go @@ -3,7 +3,6 @@ package main import ( "context" "encoding/json" - "io/ioutil" "os" "path" @@ -35,7 +34,7 @@ func loadKeyspacesFromDir(dir string, keyspaces []*vttestpb.Keyspace, ts *topo.S for _, ks := range tpb.Keyspaces { ksFile := path.Join(dir, ks.Name+".json") if _, err := os.Stat(ksFile); err == nil { - jsonData, err := ioutil.ReadFile(ksFile) + jsonData, err := os.ReadFile(ksFile) if err != nil { log.Fatalf("Unable to read keyspace file %v: %v", ksFile, err) } @@ -80,7 +79,7 @@ func persistNewSrvVSchema(srvVSchema *vschemapb.SrvVSchema) { continue } - err = ioutil.WriteFile(path.Join(*vschemaPersistenceDir, ksName+".json"), jsonBytes, 0644) + err = os.WriteFile(path.Join(*vschemaPersistenceDir, ksName+".json"), jsonBytes, 0644) if err != nil { log.Errorf("Error writing keyspace file: %v", err) } diff --git a/go/cmd/vttestserver/vttestserver_test.go b/go/cmd/vttestserver/vttestserver_test.go index 0665d5f9c46..9313182d9b8 100644 --- a/go/cmd/vttestserver/vttestserver_test.go +++ b/go/cmd/vttestserver/vttestserver_test.go @@ -81,8 +81,14 @@ func TestPersistentMode(t *testing.T) { cluster, err := startPersistentCluster(dir) assert.NoError(t, err) - // basic sanity checks similar to TestRunsVschemaMigrations + // Add a new "ad-hoc" vindex via vtgate once the cluster is up, to later make sure it is persisted across teardowns + err = addColumnVindex(cluster, "test_keyspace", "alter vschema on persistence_test add vindex my_vdx(id)") + assert.NoError(t, err) + + // Basic sanity checks similar to TestRunsVschemaMigrations + // See go/cmd/vttestserver/data/schema/app_customer/* and go/cmd/vttestserver/data/schema/test_keyspace/* assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"}) + assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "persistence_test", vindex: "my_vdx", vindexType: "hash", column: "id"}) assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"}) // insert some data to ensure persistence across teardowns @@ -111,8 +117,9 @@ func TestPersistentMode(t *testing.T) { defer cluster.TearDown() assert.NoError(t, err) - // rerun our sanity checks to make sure vschema migrations are run during every startup + // rerun our sanity checks to make sure vschema is persisted correctly assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"}) + assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "persistence_test", vindex: "my_vdx", vindexType: "hash", column: "id"}) assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"}) // ensure previous data was successfully persisted @@ -316,7 +323,8 @@ func startCluster(flags ...string) (vttest.LocalCluster, error) { keyspaceArg := "--keyspaces=" + strings.Join(clusterKeyspaces, ",") numShardsArg := "--num_shards=2,2" vschemaDDLAuthorizedUsers := "--vschema_ddl_authorized_users=%" - os.Args = append(os.Args, []string{schemaDirArg, keyspaceArg, numShardsArg, tabletHostname, vschemaDDLAuthorizedUsers}...) + alsoLogToStderr := "--alsologtostderr" // better debugging + os.Args = append(os.Args, []string{schemaDirArg, keyspaceArg, numShardsArg, tabletHostname, vschemaDDLAuthorizedUsers, alsoLogToStderr}...) os.Args = append(os.Args, flags...) return runCluster() } diff --git a/go/flags/endtoend/vttestserver.txt b/go/flags/endtoend/vttestserver.txt index eac9e817418..ec2c2187d47 100644 --- a/go/flags/endtoend/vttestserver.txt +++ b/go/flags/endtoend/vttestserver.txt @@ -87,7 +87,7 @@ Usage of vttestserver: --num_shards strings Comma separated shard count (one per keyspace) (default [2]) --onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s) --onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s) - --persistent_mode If this flag is set, the MySQL data directory is not cleaned up when LocalCluster.TearDown() is called. This is useful for running vttestserver as a database container in local developer environments. Note that db migration files (--schema_dir option) and seeding of random data (--initialize_with_random_data option) will only run during cluster startup if the data directory does not already exist. vschema migrations are run every time the cluster starts, since persistence for the topology server has not been implemented yet + --persistent_mode If this flag is set, the MySQL data directory is not cleaned up when LocalCluster.TearDown() is called. This is useful for running vttestserver as a database container in local developer environments. Note that db migration files (--schema_dir option) and seeding of random data (--initialize_with_random_data option) will only run during cluster startup if the data directory does not already exist. Changes to VSchema are persisted across cluster restarts using a simple watcher if the --data_dir argument is specified. --pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown. --planner-version string Sets the default planner to use when the session has not changed it. Valid values are: V3, V3Insert, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the new gen4 planner and falls back to the V3 planner if the gen4 fails. --pool_hostname_resolve_interval duration if set force an update to all hostnames and reconnect if changed, defaults to 0 (disabled) diff --git a/go/vt/vttest/local_cluster.go b/go/vt/vttest/local_cluster.go index 3a095eb8b2d..726726e2615 100644 --- a/go/vt/vttest/local_cluster.go +++ b/go/vt/vttest/local_cluster.go @@ -491,28 +491,26 @@ func (db *LocalCluster) loadSchema(shouldRunDatabaseMigrations bool) error { } } - glob, _ := filepath.Glob(path.Join(schemaDir, "*.sql")) - for _, filepath := range glob { - cmds, err := LoadSQLFile(filepath, schemaDir) - if err != nil { - return err - } - - // One single vschema migration per file - if !db.OnlyMySQL && len(cmds) == 1 && strings.HasPrefix(strings.ToUpper(cmds[0]), "ALTER VSCHEMA") { - if err = db.applyVschema(keyspace, cmds[0]); err != nil { + if shouldRunDatabaseMigrations { + glob, _ := filepath.Glob(path.Join(schemaDir, "*.sql")) + for _, filepath := range glob { + cmds, err := LoadSQLFile(filepath, schemaDir) + if err != nil { return err } - continue - } - if !shouldRunDatabaseMigrations { - continue - } + // One single vschema migration per file + if !db.OnlyMySQL && len(cmds) == 1 && strings.HasPrefix(strings.ToUpper(cmds[0]), "ALTER VSCHEMA") { + if err = db.applyVschema(keyspace, cmds[0]); err != nil { + return err + } + continue + } - for _, dbname := range db.shardNames(kpb) { - if err := db.Execute(cmds, dbname); err != nil { - return err + for _, dbname := range db.shardNames(kpb) { + if err := db.Execute(cmds, dbname); err != nil { + return err + } } } } From 602a8c121cfdd851a8a43b102a65a813b3f5ecda Mon Sep 17 00:00:00 2001 From: Hormoz Kheradmand Date: Wed, 24 May 2023 17:51:45 +0000 Subject: [PATCH 3/4] flags: use dashes instead of underscores Signed-off-by: Hormoz Kheradmand --- go/cmd/vtcombo/main.go | 2 +- go/vt/vttest/vtprocess.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/cmd/vtcombo/main.go b/go/cmd/vtcombo/main.go index 9ab94776bef..8557d5575e8 100644 --- a/go/cmd/vtcombo/main.go +++ b/go/cmd/vtcombo/main.go @@ -62,7 +62,7 @@ var ( externalTopoServer = flags.Bool("external_topo_server", false, "Should vtcombo use an external topology server instead of starting its own in-memory topology server. "+ "If true, vtcombo will use the flags defined in topo/server.go to open topo server") plannerName = flags.String("planner-version", "", "Sets the default planner to use when the session has not changed it. Valid values are: V3, V3Insert, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.") - vschemaPersistenceDir = flags.String("vschema_persistence_dir", "", "If set, per-keyspace vschema will be persisted in this directory "+ + vschemaPersistenceDir = flags.String("vschema-persistence-dir", "", "If set, per-keyspace vschema will be persisted in this directory "+ "and reloaded into the in-memory topology server across restarts. Bookkeeping is performed using a simple watcher goroutine. "+ "This is useful when running vtcombo as an application development container (e.g. vttestserver) where you want to keep the same "+ "vschema even if developer's machine reboots. This works in tandem with vttestserver's --persistent_mode flag. Needless to say, "+ diff --git a/go/vt/vttest/vtprocess.go b/go/vt/vttest/vtprocess.go index f829b85b133..a6c03176f1b 100644 --- a/go/vt/vttest/vtprocess.go +++ b/go/vt/vttest/vtprocess.go @@ -244,7 +244,7 @@ func VtcomboProcess(environment Environment, args *Config, mysql MySQLManager) ( vt.ExtraArgs = append(vt.ExtraArgs, []string{"--schema_dir", args.SchemaDir}...) } if args.PersistentMode && args.DataDir != "" { - vt.ExtraArgs = append(vt.ExtraArgs, []string{"--vschema_persistence_dir", path.Join(args.DataDir, "vschema_data")}...) + vt.ExtraArgs = append(vt.ExtraArgs, []string{"--vschema-persistence-dir", path.Join(args.DataDir, "vschema_data")}...) } if args.TransactionMode != "" { vt.ExtraArgs = append(vt.ExtraArgs, []string{"--transaction_mode", args.TransactionMode}...) From ead867687c9630304f4878b04dee95eb060dce6d Mon Sep 17 00:00:00 2001 From: Hormoz Kheradmand Date: Mon, 26 Jun 2023 18:05:48 +0000 Subject: [PATCH 4/4] add license header Signed-off-by: Hormoz Kheradmand --- go/cmd/vtcombo/vschema_watcher.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/go/cmd/vtcombo/vschema_watcher.go b/go/cmd/vtcombo/vschema_watcher.go index be0191fe6ac..9194f61e774 100644 --- a/go/cmd/vtcombo/vschema_watcher.go +++ b/go/cmd/vtcombo/vschema_watcher.go @@ -1,3 +1,19 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package main import (