diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index 95995903a83..bf810a5a319 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -378,103 +378,9 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames // Create the keyspace if it doesn't already exist. _ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy) for _, shardName := range shardNames { - shard := &Shard{ - Name: shardName, - } - log.Infof("Starting shard: %v", shardName) - var mysqlctlProcessList []*exec.Cmd - for i := 0; i < totalTabletsRequired; i++ { - // instantiate vttablet object with reserved ports - tabletUID := cluster.GetAndReserveTabletUID() - tablet := &Vttablet{ - TabletUID: tabletUID, - Type: "replica", - HTTPPort: cluster.GetAndReservePort(), - GrpcPort: cluster.GetAndReservePort(), - MySQLPort: cluster.GetAndReservePort(), - Alias: fmt.Sprintf("%s-%010d", cluster.Cell, tabletUID), - } - if i == 0 { // Make the first one as primary - tablet.Type = "primary" - } else if i == totalTabletsRequired-1 && rdonly { // Make the last one as rdonly if rdonly flag is passed - tablet.Type = "rdonly" - } - // Start Mysqlctl process - log.Infof("Starting mysqlctl for table uid %d, mysql port %d", tablet.TabletUID, tablet.MySQLPort) - mysqlctlProcess, err := MysqlCtlProcessInstanceOptionalInit(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory, !cluster.ReusingVTDATAROOT) - if err != nil { - return err - } - switch tablet.Type { - case "primary": - mysqlctlProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX") - case "replica": - mysqlctlProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX") - } - tablet.MysqlctlProcess = *mysqlctlProcess - proc, err := tablet.MysqlctlProcess.StartProcess() - if err != nil { - log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err) - return err - } - mysqlctlProcessList = append(mysqlctlProcessList, proc) - - // start vttablet process - tablet.VttabletProcess = VttabletProcessInstance( - tablet.HTTPPort, - tablet.GrpcPort, - tablet.TabletUID, - cluster.Cell, - shardName, - keyspace.Name, - cluster.VtctldProcess.Port, - tablet.Type, - cluster.TopoProcess.Port, - cluster.Hostname, - cluster.TmpDirectory, - cluster.VtTabletExtraArgs, - cluster.DefaultCharset) - switch tablet.Type { - case "primary": - tablet.VttabletProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX") - case "replica": - tablet.VttabletProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX") - } - tablet.Alias = tablet.VttabletProcess.TabletPath - if cluster.ReusingVTDATAROOT { - tablet.VttabletProcess.ServingStatus = "SERVING" - } - shard.Vttablets = append(shard.Vttablets, tablet) - // Apply customizations - for _, customizer := range customizers { - if f, ok := customizer.(func(*VttabletProcess)); ok { - f(tablet.VttabletProcess) - } else { - return fmt.Errorf("type mismatch on customizer: %T", customizer) - } - } - } - - // wait till all mysqlctl is instantiated - for _, proc := range mysqlctlProcessList { - if err = proc.Wait(); err != nil { - log.Errorf("unable to start mysql process %v: %v", proc, err) - return err - } - } - for _, tablet := range shard.Vttablets { - log.Infof("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort) - - if err = tablet.VttabletProcess.Setup(); err != nil { - log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %v", tablet.TabletUID, tablet.GrpcPort, err) - return - } - } - - // Make first tablet as primary - if err = cluster.VtctldClientProcess.InitializeShard(keyspace.Name, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil { - log.Errorf("error running InitializeShard on keyspace %v, shard %v: %v", keyspace.Name, shardName, err) - return + shard, err := cluster.AddShard(keyspace.Name, shardName, totalTabletsRequired, rdonly, customizers) + if err != nil { + return err } keyspace.Shards = append(keyspace.Shards, *shard) } @@ -488,33 +394,135 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames } if !existingKeyspace { cluster.Keyspaces = append(cluster.Keyspaces, keyspace) - } - // Apply Schema SQL - if keyspace.SchemaSQL != "" { - if err = cluster.VtctldClientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil { - log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err) - return + // Apply Schema SQL + if keyspace.SchemaSQL != "" { + if err = cluster.VtctldClientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil { + log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err) + return + } + } + + // Apply VSchema + if keyspace.VSchema != "" { + if err = cluster.VtctldClientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil { + log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err) + return + } + } + + log.Infof("Done creating keyspace: %v ", keyspace.Name) + + err = cluster.StartVTOrc(keyspace.Name) + if err != nil { + log.Errorf("Error starting VTOrc - %v", err) + return err } } - // Apply VSchema - if keyspace.VSchema != "" { - if err = cluster.VtctldClientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil { - log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err) - return + return +} + +func (cluster *LocalProcessCluster) AddShard(keyspaceName string, shardName string, totalTabletsRequired int, rdonly bool, customizers []any) (*Shard, error) { + shard := &Shard{ + Name: shardName, + } + log.Infof("Starting shard: %v", shardName) + var mysqlctlProcessList []*exec.Cmd + for i := 0; i < totalTabletsRequired; i++ { + // instantiate vttablet object with reserved ports + tabletUID := cluster.GetAndReserveTabletUID() + tablet := &Vttablet{ + TabletUID: tabletUID, + Type: "replica", + HTTPPort: cluster.GetAndReservePort(), + GrpcPort: cluster.GetAndReservePort(), + MySQLPort: cluster.GetAndReservePort(), + Alias: fmt.Sprintf("%s-%010d", cluster.Cell, tabletUID), + } + if i == 0 { // Make the first one as primary + tablet.Type = "primary" + } else if i == totalTabletsRequired-1 && rdonly { // Make the last one as rdonly if rdonly flag is passed + tablet.Type = "rdonly" + } + // Start Mysqlctl process + log.Infof("Starting mysqlctl for table uid %d, mysql port %d", tablet.TabletUID, tablet.MySQLPort) + mysqlctlProcess, err := MysqlCtlProcessInstanceOptionalInit(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory, !cluster.ReusingVTDATAROOT) + if err != nil { + return nil, err + } + switch tablet.Type { + case "primary": + mysqlctlProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX") + case "replica": + mysqlctlProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX") + } + tablet.MysqlctlProcess = *mysqlctlProcess + proc, err := tablet.MysqlctlProcess.StartProcess() + if err != nil { + log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err) + return nil, err + } + mysqlctlProcessList = append(mysqlctlProcessList, proc) + + // start vttablet process + tablet.VttabletProcess = VttabletProcessInstance( + tablet.HTTPPort, + tablet.GrpcPort, + tablet.TabletUID, + cluster.Cell, + shardName, + keyspaceName, + cluster.VtctldProcess.Port, + tablet.Type, + cluster.TopoProcess.Port, + cluster.Hostname, + cluster.TmpDirectory, + cluster.VtTabletExtraArgs, + cluster.DefaultCharset) + switch tablet.Type { + case "primary": + tablet.VttabletProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX") + case "replica": + tablet.VttabletProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX") + } + tablet.Alias = tablet.VttabletProcess.TabletPath + if cluster.ReusingVTDATAROOT { + tablet.VttabletProcess.ServingStatus = "SERVING" + } + shard.Vttablets = append(shard.Vttablets, tablet) + // Apply customizations + for _, customizer := range customizers { + if f, ok := customizer.(func(*VttabletProcess)); ok { + f(tablet.VttabletProcess) + } else { + return nil, fmt.Errorf("type mismatch on customizer: %T", customizer) + } } } - log.Infof("Done creating keyspace: %v ", keyspace.Name) + // wait till all mysqlctl is instantiated + for _, proc := range mysqlctlProcessList { + if err := proc.Wait(); err != nil { + log.Errorf("unable to start mysql process %v: %v", proc, err) + return nil, err + } + } + for _, tablet := range shard.Vttablets { + log.Infof("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort) - err = cluster.StartVTOrc(keyspace.Name) - if err != nil { - log.Errorf("Error starting VTOrc - %v", err) - return err + if err := tablet.VttabletProcess.Setup(); err != nil { + log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %v", tablet.TabletUID, tablet.GrpcPort, err) + return nil, err + } } - return + // Make first tablet as primary + if err := cluster.VtctldClientProcess.InitializeShard(keyspaceName, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil { + log.Errorf("error running InitializeShard on keyspace %v, shard %v: %v", keyspaceName, shardName, err) + return nil, err + } + return shard, nil } // StartUnshardedKeyspaceLegacy starts unshared keyspace with shard name as "0" diff --git a/go/test/endtoend/cluster/reshard.go b/go/test/endtoend/cluster/reshard.go new file mode 100644 index 00000000000..af36d4543c8 --- /dev/null +++ b/go/test/endtoend/cluster/reshard.go @@ -0,0 +1,102 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "fmt" + "slices" + "strings" + "testing" + "time" +) + +// ReshardWorkflow is used to store the information needed to run +// Reshard commands. +type ReshardWorkflow struct { + t *testing.T + clusterInstance *LocalProcessCluster + workflowName string + targetKs string + sourceShards string + targetShards string +} + +// NewReshard creates a new ReshardWorkflow. +func NewReshard(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, targetShards, srcShards string) *ReshardWorkflow { + return &ReshardWorkflow{ + t: t, + clusterInstance: clusterInstance, + workflowName: workflowName, + targetKs: targetKs, + sourceShards: srcShards, + targetShards: targetShards, + } +} + +func (rw *ReshardWorkflow) Create() (string, error) { + args := []string{"Create"} + if rw.sourceShards != "" { + args = append(args, "--source-shards="+rw.sourceShards) + } + if rw.targetShards != "" { + args = append(args, "--target-shards="+rw.targetShards) + } + + return rw.exec(args...) +} + +func (rw *ReshardWorkflow) exec(args ...string) (string, error) { + args2 := []string{"Reshard", "--workflow=" + rw.workflowName, "--target-keyspace=" + rw.targetKs} + args2 = append(args2, args...) + return rw.clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args2...) +} + +func (rw *ReshardWorkflow) SwitchReadsAndWrites() (string, error) { + return rw.exec("SwitchTraffic") +} + +func (rw *ReshardWorkflow) ReverseReadsAndWrites() (string, error) { + return rw.exec("ReverseTraffic") +} + +func (rw *ReshardWorkflow) Cancel() (string, error) { + return rw.exec("Cancel") +} + +func (rw *ReshardWorkflow) Complete() (string, error) { + return rw.exec("Complete") +} + +func (rw *ReshardWorkflow) Show() (string, error) { + return rw.exec("Show") +} + +func (rw *ReshardWorkflow) WaitForVreplCatchup(timeToWait time.Duration) { + targetShards := strings.Split(rw.targetShards, ",") + for _, ks := range rw.clusterInstance.Keyspaces { + if ks.Name != rw.targetKs { + continue + } + for _, shard := range ks.Shards { + if !slices.Contains(targetShards, shard.Name) { + continue + } + vttablet := shard.PrimaryTablet().VttabletProcess + vttablet.WaitForVReplicationToCatchup(rw.t, rw.workflowName, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", timeToWait) + } + } +} diff --git a/go/test/endtoend/transaction/twopc/fuzz/fuzzer_test.go b/go/test/endtoend/transaction/twopc/fuzz/fuzzer_test.go index 5bbb484ec1e..c908a99e631 100644 --- a/go/test/endtoend/transaction/twopc/fuzz/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/fuzz/fuzzer_test.go @@ -105,12 +105,12 @@ func TestTwoPCFuzzTest(t *testing.T) { timeForTesting: 5 * time.Second, }, { - name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL, MoveTables disruptions", + name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL, MoveTables, Reshard disruptions", threads: 4, updateSets: 4, timeForTesting: 5 * time.Second, - clusterDisruptions: []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer, moveTablesFuzzer}, - disruptionProbability: []int{5, 5, 5, 5, 5, 5}, + clusterDisruptions: []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer, moveTablesFuzzer, reshardFuzzer}, + disruptionProbability: []int{5, 5, 5, 5, 5, 5, 5}, }, } @@ -486,6 +486,23 @@ func moveTablesFuzzer(t *testing.T) { assert.NoError(t, err, output) } +// reshardFuzzer runs a Reshard workflow. +func reshardFuzzer(t *testing.T) { + var srcShards, targetShards string + shardCount := len(clusterInstance.Keyspaces[0].Shards) + if shardCount == 2 { + srcShards = "40-" + targetShards = "40-80,80-" + } else { + srcShards = "40-80,80-" + targetShards = "40-" + } + log.Errorf("Reshard from - \"%v\" to \"%v\"", srcShards, targetShards) + twopcutil.AddShards(t, clusterInstance, keyspaceName, strings.Split(targetShards, ",")) + err := twopcutil.RunReshard(t, clusterInstance, "TestTwoPCFuzzTest", keyspaceName, srcShards, targetShards) + require.NoError(t, err) +} + func mysqlRestarts(t *testing.T) { shards := clusterInstance.Keyspaces[0].Shards shard := shards[rand.Intn(len(shards))] diff --git a/go/test/endtoend/transaction/twopc/fuzz/main_test.go b/go/test/endtoend/transaction/twopc/fuzz/main_test.go index 86e524e648f..1b05615d51a 100644 --- a/go/test/endtoend/transaction/twopc/fuzz/main_test.go +++ b/go/test/endtoend/transaction/twopc/fuzz/main_test.go @@ -67,6 +67,7 @@ func TestMain(m *testing.M) { clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--transaction_mode", "TWOPC", "--grpc_use_effective_callerid", + "--tablet_refresh_interval", "2s", ) clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--twopc_enable", diff --git a/go/test/endtoend/transaction/twopc/stress/main_test.go b/go/test/endtoend/transaction/twopc/stress/main_test.go index 05525171d2d..782ffe1ec12 100644 --- a/go/test/endtoend/transaction/twopc/stress/main_test.go +++ b/go/test/endtoend/transaction/twopc/stress/main_test.go @@ -67,6 +67,7 @@ func TestMain(m *testing.M) { clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--transaction_mode", "TWOPC", "--grpc_use_effective_callerid", + "--tablet_refresh_interval", "2s", ) clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--twopc_enable", diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go index 819e7ea48d3..acb8146975b 100644 --- a/go/test/endtoend/transaction/twopc/stress/stress_test.go +++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go @@ -47,6 +47,7 @@ func TestDisruptions(t *testing.T) { testcases := []struct { disruptionName string commitDelayTime string + setupFunc func(t *testing.T) disruption func(t *testing.T) error resetFunc func(t *testing.T) }{ @@ -57,6 +58,13 @@ func TestDisruptions(t *testing.T) { return nil }, }, + { + disruptionName: "Resharding", + commitDelayTime: "20", + setupFunc: createShard, + disruption: mergeShards, + resetFunc: splitShardsBack, + }, { disruptionName: "PlannedReparentShard", commitDelayTime: "5", @@ -98,6 +106,9 @@ func TestDisruptions(t *testing.T) { t.Run(fmt.Sprintf("%s-%ss delay", tt.disruptionName, tt.commitDelayTime), func(t *testing.T) { // Reparent all the shards to first tablet being the primary. reparentToFirstTablet(t) + if tt.setupFunc != nil { + tt.setupFunc(t) + } // cleanup all the old data. conn, closer := start(t) defer closer() @@ -156,6 +167,23 @@ func TestDisruptions(t *testing.T) { } } +func mergeShards(t *testing.T) error { + return twopcutil.RunReshard(t, clusterInstance, "TestDisruptions", keyspaceName, "40-80,80-", "40-") +} + +func splitShardsBack(t *testing.T) { + t.Helper() + twopcutil.AddShards(t, clusterInstance, keyspaceName, []string{"40-80", "80-"}) + err := twopcutil.RunReshard(t, clusterInstance, "TestDisruptions", keyspaceName, "40-", "40-80,80-") + require.NoError(t, err) +} + +// createShard creates a new shard in the keyspace that we'll use for Resharding. +func createShard(t *testing.T) { + t.Helper() + twopcutil.AddShards(t, clusterInstance, keyspaceName, []string{"40-"}) +} + // threadToWrite is a helper function to write to the database in a loop. func threadToWrite(t *testing.T, ctx context.Context, id int) { for { diff --git a/go/test/endtoend/transaction/twopc/utils/utils.go b/go/test/endtoend/transaction/twopc/utils/utils.go index ecc91245a67..14f3214ae00 100644 --- a/go/test/endtoend/transaction/twopc/utils/utils.go +++ b/go/test/endtoend/transaction/twopc/utils/utils.go @@ -21,6 +21,8 @@ import ( "fmt" "os" "path" + "slices" + "strings" "testing" "time" @@ -170,3 +172,52 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, ks string, } } } + +func RunReshard(t *testing.T, clusterInstance *cluster.LocalProcessCluster, workflowName, keyspaceName string, sourceShards, targetShards string) error { + rw := cluster.NewReshard(t, clusterInstance, workflowName, keyspaceName, targetShards, sourceShards) + // Initiate Reshard. + output, err := rw.Create() + require.NoError(t, err, output) + // Wait for vreplication to catchup. Should be very fast since we don't have a lot of rows. + rw.WaitForVreplCatchup(10 * time.Second) + // SwitchTraffic + output, err = rw.SwitchReadsAndWrites() + require.NoError(t, err, output) + output, err = rw.Complete() + require.NoError(t, err, output) + + // When Reshard completes, it has already deleted the source shards from the topo server. + // We just need to shutdown the vttablets, and remove them from the cluster. + removeShards(t, clusterInstance, keyspaceName, sourceShards) + return nil +} + +func removeShards(t *testing.T, clusterInstance *cluster.LocalProcessCluster, keyspaceName string, shards string) { + sourceShardsList := strings.Split(shards, ",") + var remainingShards []cluster.Shard + for idx, keyspace := range clusterInstance.Keyspaces { + if keyspace.Name != keyspaceName { + continue + } + for _, shard := range keyspace.Shards { + if slices.Contains(sourceShardsList, shard.Name) { + for _, vttablet := range shard.Vttablets { + err := vttablet.VttabletProcess.TearDown() + require.NoError(t, err) + } + continue + } + remainingShards = append(remainingShards, shard) + } + clusterInstance.Keyspaces[idx].Shards = remainingShards + } +} + +func AddShards(t *testing.T, clusterInstance *cluster.LocalProcessCluster, keyspaceName string, shardNames []string) { + for _, shardName := range shardNames { + t.Helper() + shard, err := clusterInstance.AddShard(keyspaceName, shardName, 3, false, nil) + require.NoError(t, err) + clusterInstance.Keyspaces[0].Shards = append(clusterInstance.Keyspaces[0].Shards, *shard) + } +} diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 1bd05493a59..545eb1471bd 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -31,6 +31,7 @@ import ( "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver" replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -354,7 +355,7 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string // If semi-sync is enabled, we need to set two pc to be allowed. // Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness.. - tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet) + tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_SemiSync, semiSyncAction == SemiSyncActionSet) // Setting super_read_only `OFF` so that we can run the DDL commands if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, false); err != nil { @@ -600,7 +601,7 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context, semiSync bool) e // If semi-sync is enabled, we need to set two pc to be allowed. // Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness.. - tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet) + tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_SemiSync, semiSyncAction == SemiSyncActionSet) // If using semi-sync, we need to enable source-side. if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { @@ -925,7 +926,7 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context, semiSync bool) (str // If semi-sync is enabled, we need to set two pc to be allowed. // Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness.. - tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet) + tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_SemiSync, semiSyncAction == SemiSyncActionSet) pos, err := tm.MysqlDaemon.Promote(ctx, tm.hookExtraEnv()) if err != nil { diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index 5ab49ebfbcf..fbba2b6c713 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -549,13 +549,17 @@ func (tm *TabletManager) createKeyspaceShard(ctx context.Context) (*topo.ShardIn return nil, err } - tm.tmState.RefreshFromTopoInfo(ctx, shardInfo, nil) + if err := tm.tmState.RefreshFromTopoInfo(ctx, shardInfo, nil); err != nil { + return nil, err + } // Rebuild keyspace if this the first tablet in this keyspace/cell srvKeyspace, err := tm.TopoServer.GetSrvKeyspace(ctx, tm.tabletAlias.Cell, tablet.Keyspace) switch { case err == nil: - tm.tmState.RefreshFromTopoInfo(ctx, nil, srvKeyspace) + if err := tm.tmState.RefreshFromTopoInfo(ctx, nil, srvKeyspace); err != nil { + return nil, err + } case topo.IsErrType(err, topo.NoNode): var rebuildKsCtx context.Context rebuildKsCtx, tm._rebuildKeyspaceCancel = context.WithCancel(tm.BatchCtx) @@ -607,7 +611,10 @@ func (tm *TabletManager) rebuildKeyspace(ctx context.Context, done chan<- struct defer func() { log.Infof("Keyspace rebuilt: %v", keyspace) if ctx.Err() == nil { - tm.tmState.RefreshFromTopoInfo(tm.BatchCtx, nil, srvKeyspace) + err := tm.tmState.RefreshFromTopoInfo(tm.BatchCtx, nil, srvKeyspace) + if err != nil { + log.Errorf("Error refreshing topo information - %v", err) + } } close(done) }() diff --git a/go/vt/vttablet/tabletmanager/tm_state.go b/go/vt/vttablet/tabletmanager/tm_state.go index cf56c515cfc..75917cb2065 100644 --- a/go/vt/vttablet/tabletmanager/tm_state.go +++ b/go/vt/vttablet/tabletmanager/tm_state.go @@ -37,6 +37,7 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver" "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -135,11 +136,10 @@ func (ts *tmState) RefreshFromTopo(ctx context.Context) error { if err != nil { return err } - ts.RefreshFromTopoInfo(ctx, shardInfo, srvKeyspace) - return nil + return ts.RefreshFromTopoInfo(ctx, shardInfo, srvKeyspace) } -func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.ShardInfo, srvKeyspace *topodatapb.SrvKeyspace) { +func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.ShardInfo, srvKeyspace *topodatapb.SrvKeyspace) error { ts.mu.Lock() defer ts.mu.Unlock() @@ -157,6 +157,7 @@ func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.Shar if srvKeyspace != nil { ts.isShardServing = make(map[topodatapb.TabletType]bool) ts.tabletControls = make(map[topodatapb.TabletType]bool) + ts.tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_TabletControls, true) for _, partition := range srvKeyspace.GetPartitions() { @@ -169,7 +170,10 @@ func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.Shar for _, tabletControl := range partition.GetShardTabletControls() { if key.KeyRangeEqual(tabletControl.GetKeyRange(), ts.KeyRange()) { if tabletControl.QueryServiceDisabled { - ts.tabletControls[partition.GetServedType()] = true + err := ts.prepareForDisableQueryService(ctx, partition.GetServedType()) + if err != nil { + return err + } } break } @@ -177,7 +181,20 @@ func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.Shar } } - _ = ts.updateLocked(ctx) + return ts.updateLocked(ctx) +} + +// prepareForDisableQueryService prepares the tablet for disabling query service. +func (ts *tmState) prepareForDisableQueryService(ctx context.Context, servType topodatapb.TabletType) error { + if servType == topodatapb.TabletType_PRIMARY { + ts.tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_TabletControls, false) + err := ts.tm.QueryServiceControl.WaitForPreparedTwoPCTransactions(ctx) + if err != nil { + return err + } + } + ts.tabletControls[servType] = true + return nil } func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.TabletType, action DBAction) error { diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index c54ab4e20dd..abb7b390e0d 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -97,8 +97,9 @@ type Controller interface { // RedoPreparedTransactions recreates the transactions with stored prepared transaction log. RedoPreparedTransactions() - // SetTwoPCAllowed sets whether TwoPC is allowed or not. - SetTwoPCAllowed(bool) + // SetTwoPCAllowed sets whether TwoPC is allowed or not. It also takes the reason of why it is being set. + // The reason should be an enum value defined in the tabletserver. + SetTwoPCAllowed(int, bool) // UnresolvedTransactions returns all unresolved transactions list UnresolvedTransactions(ctx context.Context, target *querypb.Target, abandonAgeSeconds int64) ([]*querypb.TransactionMetadata, error) @@ -111,6 +112,9 @@ type Controller interface { // RollbackPrepared rolls back the prepared transaction and removes the transaction log. RollbackPrepared(ctx context.Context, target *querypb.Target, dtid string, originalID int64) error + + // WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved. + WaitForPreparedTwoPCTransactions(ctx context.Context) error } // Ensure TabletServer satisfies Controller interface. diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index b14e4d65d16..1aaf75edc9e 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -57,7 +57,7 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { if !dte.te.twopcEnabled { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } - if !dte.te.twopcAllowed { + if !dte.te.IsTwoPCAllowed() { return vterrors.VT10002("two-pc is enabled, but semi-sync is not") } defer dte.te.env.Stats().QueryTimings.Record("PREPARE", time.Now()) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 1a0c6a79e67..624042ca9cd 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -186,7 +186,7 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer) tsv.tableGC = gc.NewTableGC(tsv, topoServer, tsv.lagThrottler) - tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer, tsv.tableGC.RequestChecks, tsv.te.preparedPool.IsEmpty) + tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer, tsv.tableGC.RequestChecks, tsv.te.preparedPool.IsEmptyForTable) tsv.sm = &stateManager{ statelessql: tsv.statelessql, @@ -700,6 +700,26 @@ func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.T ) } +// WaitForPreparedTwoPCTransactions waits for all the prepared transactions to complete. +func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) error { + if tsv.te.preparedPool.IsEmpty() { + return nil + } + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + // Return an error if we run out of time. + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Prepared transactions have not been resolved yet") + case <-ticker.C: + if tsv.te.preparedPool.IsEmpty() { + return nil + } + } + } +} + // CreateTransaction creates the metadata for a 2PC transaction. func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) { return tsv.execRequest( @@ -1699,9 +1719,10 @@ func (tsv *TabletServer) RedoPreparedTransactions() { tsv.te.RedoPreparedTransactions() } -// SetTwoPCAllowed sets whether TwoPC is allowed or not. -func (tsv *TabletServer) SetTwoPCAllowed(allowed bool) { - tsv.te.twopcAllowed = allowed +// SetTwoPCAllowed sets whether TwoPC is allowed or not. It also takes the reason of why it is being set. +// The reason should be an enum value defined in the tabletserver. +func (tsv *TabletServer) SetTwoPCAllowed(reason int, allowed bool) { + tsv.te.twopcAllowed[reason] = allowed } // HandlePanic is part of the queryservice.QueryService interface diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index 549851790ed..42bec29dfa3 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -77,9 +77,11 @@ type TxEngine struct { // twopcEnabled is the flag value of whether the user has enabled twopc or not. twopcEnabled bool - // twopcAllowed is wether it is safe to allow two pc transactions or not. - // If the primary tablet doesn't run with semi-sync we set this to false, and disallow any prepared calls. - twopcAllowed bool + // twopcAllowed is whether it is safe to allow two pc transactions or not. + // There are multiple reasons to disallow TwoPC: + // 1. If the primary tablet doesn't run with semi-sync we set this to false, and disallow any prepared calls. + // 2. TabletControls have been set in the tablet record, and Query service is going to be disabled. + twopcAllowed []bool shutdownGracePeriod time.Duration coordinatorAddress string abandonAge time.Duration @@ -94,6 +96,14 @@ type TxEngine struct { dxNotify func() } +// TwoPC can be disallowed for various reasons. These are the reasons we keep track off +// when deciding if new prepared transactions should be allowed or not. +const ( + TwoPCAllowed_SemiSync = iota + TwoPCAllowed_TabletControls + TwoPCAllowed_Len +) + // NewTxEngine creates a new TxEngine. func NewTxEngine(env tabletenv.Env, dxNotifier func()) *TxEngine { config := env.Config() @@ -105,8 +115,13 @@ func NewTxEngine(env tabletenv.Env, dxNotifier func()) *TxEngine { limiter := txlimiter.New(env) te.txPool = NewTxPool(env, limiter) // We initially allow twoPC (handles vttablet restarts). - // We will disallow them, when a new tablet is promoted if semi-sync is turned off. - te.twopcAllowed = true + // We will disallow them for a few reasons - + // 1. when a new tablet is promoted if semi-sync is turned off. + // 2. TabletControls have been set by a Resharding workflow. + te.twopcAllowed = make([]bool, TwoPCAllowed_Len) + for idx := range te.twopcAllowed { + te.twopcAllowed[idx] = true + } te.twopcEnabled = config.TwoPCEnable if te.twopcEnabled { if config.TwoPCAbandonAge <= 0 { @@ -708,3 +723,13 @@ func (te *TxEngine) beginNewDbaConnection(ctx context.Context) (*StatefulConnect _, _, err = te.txPool.begin(ctx, nil, false, sc, nil) return sc, err } + +// IsTwoPCAllowed checks if TwoPC is allowed. +func (te *TxEngine) IsTwoPCAllowed() bool { + for _, allowed := range te.twopcAllowed { + if !allowed { + return false + } + } + return true +} diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index 43916dab3c2..a9958525587 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -678,3 +678,46 @@ func TestCheckReceivedError(t *testing.T) { }) } } + +func TestIsTwoPCAllowed(t *testing.T) { + testcases := []struct { + semiSyncAllowed bool + tabletControllsAllowed bool + wantAllowed bool + }{ + { + semiSyncAllowed: true, + tabletControllsAllowed: true, + wantAllowed: true, + }, + { + semiSyncAllowed: false, + tabletControllsAllowed: true, + wantAllowed: false, + }, + { + semiSyncAllowed: true, + tabletControllsAllowed: false, + wantAllowed: false, + }, + { + semiSyncAllowed: false, + tabletControllsAllowed: false, + wantAllowed: false, + }, + } + + for _, tt := range testcases { + t.Run(fmt.Sprintf("SemiSyncAllowed - %v, TabletControlsAllowed - %v", tt.semiSyncAllowed, tt.tabletControllsAllowed), func(t *testing.T) { + te := &TxEngine{ + twopcAllowed: []bool{true, true}, + } + tsv := TabletServer{ + te: te, + } + tsv.SetTwoPCAllowed(TwoPCAllowed_SemiSync, tt.semiSyncAllowed) + tsv.SetTwoPCAllowed(TwoPCAllowed_TabletControls, tt.tabletControllsAllowed) + require.Equal(t, tt.wantAllowed, te.IsTwoPCAllowed()) + }) + } +} diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool.go b/go/vt/vttablet/tabletserver/tx_prep_pool.go index 468c160c002..8e766062a92 100644 --- a/go/vt/vttablet/tabletserver/tx_prep_pool.go +++ b/go/vt/vttablet/tabletserver/tx_prep_pool.go @@ -173,7 +173,8 @@ func (pp *TxPreparedPool) FetchAllForRollback() []*StatefulConnection { return conns } -func (pp *TxPreparedPool) IsEmpty(tableName string) bool { +// IsEmptyForTable returns true if no prepared transactions are found for the table. +func (pp *TxPreparedPool) IsEmptyForTable(tableName string) bool { pp.mu.Lock() defer pp.mu.Unlock() if !pp.twoPCEnabled { @@ -194,3 +195,17 @@ func (pp *TxPreparedPool) IsEmpty(tableName string) bool { } return true } + +// IsEmpty returns true if the pool is empty. +func (pp *TxPreparedPool) IsEmpty() bool { + pp.mu.Lock() + defer pp.mu.Unlock() + if !pp.twoPCEnabled { + return true + } + // If the pool is shutdown, we do not know the correct state of prepared transactions. + if !pp.open { + return false + } + return len(pp.conns) == 0 +} diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go index cf6d2b61093..e8c889990f0 100644 --- a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go @@ -21,6 +21,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" ) func TestEmptyPrep(t *testing.T) { @@ -116,3 +118,135 @@ func createAndOpenPreparedPool(capacity int) *TxPreparedPool { pp.Open() return pp } + +func TestTxPreparedPoolIsEmptyForTable(t *testing.T) { + tests := []struct { + name string + setupFunc func(pp *TxPreparedPool) + wantIsEmpty bool + }{ + { + name: "Closed prepared pool", + setupFunc: func(pp *TxPreparedPool) { + pp.mu.Lock() + defer pp.mu.Unlock() + pp.open = false + }, + wantIsEmpty: false, + }, + { + name: "Two PC Disabled", + setupFunc: func(pp *TxPreparedPool) { + pp.mu.Lock() + defer pp.mu.Unlock() + pp.twoPCEnabled = false + }, + wantIsEmpty: true, + }, + { + name: "No prepared transactions", + setupFunc: func(pp *TxPreparedPool) { + pp.mu.Lock() + defer pp.mu.Unlock() + pp.open = true + }, + wantIsEmpty: true, + }, + { + name: "Prepared transactions for table t1", + setupFunc: func(pp *TxPreparedPool) { + pp.mu.Lock() + pp.open = true + pp.mu.Unlock() + pp.Put(&StatefulConnection{ + txProps: &tx.Properties{ + Queries: []tx.Query{ + { + Tables: []string{"t1", "t2"}, + }, + }, + }, + }, "dtid1") + }, + wantIsEmpty: false, + }, + { + name: "Prepared transactions for other tables", + setupFunc: func(pp *TxPreparedPool) { + pp.mu.Lock() + pp.open = true + pp.mu.Unlock() + pp.Put(&StatefulConnection{ + txProps: &tx.Properties{ + Queries: []tx.Query{ + { + Tables: []string{"t3", "t2"}, + }, + }, + }, + }, "dtid1") + }, + wantIsEmpty: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pp := NewTxPreparedPool(1, true) + tt.setupFunc(pp) + assert.Equalf(t, tt.wantIsEmpty, pp.IsEmptyForTable("t1"), "IsEmptyForTable()") + }) + } +} + +func TestTxPreparedPoolIsEmpty(t *testing.T) { + tests := []struct { + name string + setupFunc func(pp *TxPreparedPool) + wantIsEmpty bool + }{ + { + name: "Closed prepared pool", + setupFunc: func(pp *TxPreparedPool) { + pp.mu.Lock() + defer pp.mu.Unlock() + pp.open = false + }, + wantIsEmpty: false, + }, + { + name: "Two PC Disabled", + setupFunc: func(pp *TxPreparedPool) { + pp.mu.Lock() + defer pp.mu.Unlock() + pp.twoPCEnabled = false + }, + wantIsEmpty: true, + }, + { + name: "No prepared transactions", + setupFunc: func(pp *TxPreparedPool) { + pp.mu.Lock() + defer pp.mu.Unlock() + pp.open = true + }, + wantIsEmpty: true, + }, + { + name: "Prepared transactions exist", + setupFunc: func(pp *TxPreparedPool) { + pp.mu.Lock() + pp.open = true + pp.mu.Unlock() + pp.Put(&StatefulConnection{}, "dtid1") + }, + wantIsEmpty: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pp := NewTxPreparedPool(1, true) + tt.setupFunc(pp) + assert.Equalf(t, tt.wantIsEmpty, pp.IsEmpty(), "IsEmpty()") + }) + } +} diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index bb0ce05ed19..9e97c033776 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -232,8 +232,9 @@ func (tqsc *Controller) GetThrottlerStatus(ctx context.Context) *throttle.Thrott // RedoPreparedTransactions is part of the tabletserver.Controller interface func (tqsc *Controller) RedoPreparedTransactions() {} -// SetTwoPCAllowed sets whether TwoPC is allowed or not. -func (tqsc *Controller) SetTwoPCAllowed(bool) { +// SetTwoPCAllowed sets whether TwoPC is allowed or not. It also takes the reason of why it is being set. +// The reason should be an enum value defined in the tabletserver. +func (tqsc *Controller) SetTwoPCAllowed(int, bool) { } // UnresolvedTransactions is part of the tabletserver.Controller interface @@ -260,6 +261,12 @@ func (tqsc *Controller) RollbackPrepared(context.Context, *querypb.Target, strin return nil } +// WaitForPreparedTwoPCTransactions is part of the tabletserver.Controller interface +func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error { + tqsc.MethodCalled["WaitForPreparedTwoPCTransactions"] = true + return nil +} + // EnterLameduck implements tabletserver.Controller. func (tqsc *Controller) EnterLameduck() { tqsc.mu.Lock() diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index 4f508766330..92996c9d931 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -325,6 +325,18 @@ func TestPartialMoveTables(t *testing.T) { tme := newTestTablePartialMigrater(ctx, t, shards, shards[0:1], "select * %s") defer tme.stopTablets(t) + // Add the schema for the primary tablets, so that we don't fail while applying the denied table rules. + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + }, { + Name: "t2", + }}, + } + for _, primary := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + primary.FakeMysqlDaemon.Schema = schm + } + // Save some unrelated shard routing rules to be sure that // they don't interfere in any way. srr, err := tme.ts.GetShardRoutingRules(ctx) @@ -400,6 +412,17 @@ func TestPartialMoveTablesShardSubset(t *testing.T) { } tme := newTestTablePartialMigrater(ctx, t, shards, shardsToMove, "select * %s") defer tme.stopTablets(t) + // Add the schema for the primary tablets, so that we don't fail while applying the denied table rules. + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + }, { + Name: "t2", + }}, + } + for _, primary := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + primary.FakeMysqlDaemon.Schema = schm + } // Save some unrelated shard routing rules to be sure that // they don't interfere in any way.