diff --git a/go/test/endtoend/backup/transform/backup_transform_utils.go b/go/test/endtoend/backup/transform/backup_transform_utils.go index a07a1c84fdd..cfd20d88e8a 100644 --- a/go/test/endtoend/backup/transform/backup_transform_utils.go +++ b/go/test/endtoend/backup/transform/backup_transform_utils.go @@ -250,7 +250,7 @@ func TestBackupTransformImpl(t *testing.T) { replica2.VttabletProcess.ServingStatus = "" err = replica2.VttabletProcess.Setup() require.Nil(t, err) - err = replica2.VttabletProcess.WaitForTabletTypesForTimeout([]string{"SERVING"}, 25*time.Second) + err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 25*time.Second) require.Nil(t, err) defer replica2.VttabletProcess.TearDown() diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go index 896e6e4038d..49ca0efe6df 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go @@ -295,7 +295,7 @@ func masterBackup(t *testing.T) { require.Nil(t, err) restoreWaitForBackup(t, "replica") - err = replica2.VttabletProcess.WaitForTabletTypesForTimeout([]string{"SERVING"}, 25*time.Second) + err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 25*time.Second) require.Nil(t, err) cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2) @@ -327,7 +327,7 @@ func masterReplicaSameBackup(t *testing.T) { // now bring up the other replica, letting it restore from backup. 
restoreWaitForBackup(t, "replica") - err = replica2.VttabletProcess.WaitForTabletTypesForTimeout([]string{"SERVING"}, 25*time.Second) + err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 25*time.Second) require.Nil(t, err) // check the new replica has the data @@ -563,7 +563,7 @@ func vtctlBackup(t *testing.T, tabletType string) { _, err = master.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) require.Nil(t, err) - err = replica2.VttabletProcess.WaitForTabletTypesForTimeout([]string{"SERVING"}, 25*time.Second) + err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 25*time.Second) require.Nil(t, err) cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2) @@ -626,7 +626,7 @@ func verifyRestoreTablet(t *testing.T, tablet *cluster.Vttablet, status string) err := tablet.VttabletProcess.Setup() require.Nil(t, err) if status != "" { - err = tablet.VttabletProcess.WaitForTabletTypesForTimeout([]string{status}, 25*time.Second) + err = tablet.VttabletProcess.WaitForTabletStatusesForTimeout([]string{status}, 25*time.Second) require.Nil(t, err) } diff --git a/go/test/endtoend/cluster/vtctlclient_process.go b/go/test/endtoend/cluster/vtctlclient_process.go index beaa704d5ef..4bd73b6fe5f 100644 --- a/go/test/endtoend/cluster/vtctlclient_process.go +++ b/go/test/endtoend/cluster/vtctlclient_process.go @@ -156,8 +156,6 @@ func (vtctlclient *VtctlClientProcess) ExecuteCommand(args ...string) (err error if output != "" { if err != nil { log.Errorf("Output:\n%v", output) - } else { - log.Infof("Output:\n%v", output) } } return err diff --git a/go/test/endtoend/cluster/vtorc_process.go b/go/test/endtoend/cluster/vtorc_process.go index 6bc00e1dfad..895ff94219f 100644 --- a/go/test/endtoend/cluster/vtorc_process.go +++ b/go/test/endtoend/cluster/vtorc_process.go @@ -18,6 +18,7 @@ limitations under the License. 
package cluster import ( + "fmt" "os" "os/exec" "path" @@ -52,6 +53,7 @@ func (orc *VtorcProcess) Setup() (err error) { "-topo_global_server_address", orc.TopoGlobalAddress, "-topo_global_root", orc.TopoGlobalRoot, "-config", orc.Config, + "-orc_web_dir", path.Join(os.Getenv("VTROOT"), "web", "orchestrator"), ) if *isCoverage { orc.proc.Args = append(orc.proc.Args, "-test.coverprofile="+getCoveragePath("orc.out")) @@ -60,7 +62,7 @@ func (orc *VtorcProcess) Setup() (err error) { orc.proc.Args = append(orc.proc.Args, orc.ExtraArgs...) orc.proc.Args = append(orc.proc.Args, "-alsologtostderr", "http") - errFile, _ := os.Create(path.Join(orc.LogDir, "orc-stderr.txt")) + errFile, _ := os.Create(path.Join(orc.LogDir, fmt.Sprintf("orc-stderr-%d.txt", time.Now().UnixNano()))) orc.proc.Stderr = errFile orc.proc.Env = append(orc.proc.Env, os.Environ()...) @@ -95,7 +97,7 @@ func (orc *VtorcProcess) TearDown() error { orc.proc = nil return nil - case <-time.After(10 * time.Second): + case <-time.After(30 * time.Second): _ = orc.proc.Process.Kill() orc.proc = nil return <-orc.exit diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index 967aeadfcb9..5b5d18a877b 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -140,7 +140,7 @@ func (vttablet *VttabletProcess) Setup() (err error) { }() if vttablet.ServingStatus != "" { - if err = vttablet.WaitForTabletType(vttablet.ServingStatus); err != nil { + if err = vttablet.WaitForTabletStatus(vttablet.ServingStatus); err != nil { errFileContent, _ := ioutil.ReadFile(fname) if errFileContent != nil { log.Infof("vttablet error:\n%s\n", string(errFileContent)) @@ -208,23 +208,37 @@ func (vttablet *VttabletProcess) GetTabletStatus() string { return "" } -// WaitForTabletType waits for 10 second till expected type reached -func (vttablet *VttabletProcess) WaitForTabletType(expectedType string) error { - return 
vttablet.WaitForTabletTypesForTimeout([]string{expectedType}, 10*time.Second) +// GetTabletType returns the tablet type as seen in /debug/vars TabletType +func (vttablet *VttabletProcess) GetTabletType() string { + resultMap := vttablet.GetVars() + if resultMap != nil { + return reflect.ValueOf(resultMap["TabletType"]).String() + } + return "" +} + +// WaitForTabletStatus waits for 10 second till expected status is reached +func (vttablet *VttabletProcess) WaitForTabletStatus(expectedStatus string) error { + return vttablet.WaitForTabletStatusesForTimeout([]string{expectedStatus}, 10*time.Second) } -// WaitForTabletTypes waits for 10 second till expected type reached +// WaitForTabletStatuses waits for 10 second till one of expected statuses is reached +func (vttablet *VttabletProcess) WaitForTabletStatuses(expectedStatuses []string) error { + return vttablet.WaitForTabletStatusesForTimeout(expectedStatuses, 10*time.Second) +} + +// WaitForTabletTypes waits for 10 second till one of expected statuses is reached func (vttablet *VttabletProcess) WaitForTabletTypes(expectedTypes []string) error { return vttablet.WaitForTabletTypesForTimeout(expectedTypes, 10*time.Second) } -// WaitForTabletTypesForTimeout waits till the tablet reaches to any of the provided status -func (vttablet *VttabletProcess) WaitForTabletTypesForTimeout(expectedTypes []string, timeout time.Duration) error { - timeToWait := time.Now().Add(timeout) +// WaitForTabletStatusesForTimeout waits till the tablet reaches to any of the provided statuses +func (vttablet *VttabletProcess) WaitForTabletStatusesForTimeout(expectedStatuses []string, timeout time.Duration) error { + waitUntil := time.Now().Add(timeout) var status string - for time.Now().Before(timeToWait) { + for time.Now().Before(waitUntil) { status = vttablet.GetTabletStatus() - if contains(expectedTypes, status) { + if contains(expectedStatuses, status) { return nil } select { @@ -235,7 +249,27 @@ func (vttablet *VttabletProcess) 
WaitForTabletTypesForTimeout(expectedTypes []st } } return fmt.Errorf("Vttablet %s, current status = %s, expected status [%s] not reached, details: %v", - vttablet.TabletPath, status, strings.Join(expectedTypes, ","), vttablet.GetStatusDetails()) + vttablet.TabletPath, status, strings.Join(expectedStatuses, ","), vttablet.GetStatusDetails()) +} + +// WaitForTabletTypesForTimeout waits till the tablet reaches to any of the provided types +func (vttablet *VttabletProcess) WaitForTabletTypesForTimeout(expectedTypes []string, timeout time.Duration) error { + waitUntil := time.Now().Add(timeout) + var tabletType string + for time.Now().Before(waitUntil) { + tabletType = vttablet.GetTabletType() + if contains(expectedTypes, tabletType) { + return nil + } + select { + case err := <-vttablet.exit: + return fmt.Errorf("process '%s' exited prematurely (err: %s)", vttablet.Name, err) + default: + time.Sleep(300 * time.Millisecond) + } + } + return fmt.Errorf("Vttablet %s, current type = %s, expected type [%s] not reached, status details: %v", + vttablet.TabletPath, tabletType, strings.Join(expectedTypes, ","), vttablet.GetStatusDetails()) } func contains(arr []string, str string) bool { @@ -472,6 +506,11 @@ func (vttablet *VttabletProcess) BulkLoad(t testing.TB, db, table string, bulkIn bufFinish.Sub(bufStart), end.Sub(bufFinish), end.Sub(bufStart)) } +// IsShutdown returns whether a vttablet is shutdown or not +func (vttablet *VttabletProcess) IsShutdown() bool { + return vttablet.proc == nil +} + // VttabletProcessInstance returns a VttabletProcess handle for vttablet process // configured with the given Config. 
// The process must be manually started by calling setup() diff --git a/go/test/endtoend/recovery/pitr/shardedpitr_test.go b/go/test/endtoend/recovery/pitr/shardedpitr_test.go index c7c89630651..4e9cceaa3a8 100644 --- a/go/test/endtoend/recovery/pitr/shardedpitr_test.go +++ b/go/test/endtoend/recovery/pitr/shardedpitr_test.go @@ -538,5 +538,5 @@ func launchRecoveryTablet(t *testing.T, tablet *cluster.Vttablet, binlogServer * err = tablet.VttabletProcess.Setup() require.NoError(t, err) - tablet.VttabletProcess.WaitForTabletTypesForTimeout([]string{"SERVING"}, 20*time.Second) + tablet.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 20*time.Second) } diff --git a/go/test/endtoend/recovery/pitrtls/shardedpitr_tls_test.go b/go/test/endtoend/recovery/pitrtls/shardedpitr_tls_test.go index 259db0d1835..64a7bce652f 100644 --- a/go/test/endtoend/recovery/pitrtls/shardedpitr_tls_test.go +++ b/go/test/endtoend/recovery/pitrtls/shardedpitr_tls_test.go @@ -522,7 +522,7 @@ func tlsLaunchRecoveryTablet(t *testing.T, tablet *cluster.Vttablet, tabletForBi err = tablet.VttabletProcess.Setup() require.NoError(t, err) - tablet.VttabletProcess.WaitForTabletTypesForTimeout([]string{"SERVING"}, 20*time.Second) + tablet.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 20*time.Second) } func getCNFromCertPEM(filename string) string { diff --git a/go/test/endtoend/recovery/recovery_util.go b/go/test/endtoend/recovery/recovery_util.go index c48320d487f..cde3af4dac0 100644 --- a/go/test/endtoend/recovery/recovery_util.go +++ b/go/test/endtoend/recovery/recovery_util.go @@ -83,7 +83,7 @@ func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tabl err = tablet.VttabletProcess.Setup() require.Nil(t, err) - err = tablet.VttabletProcess.WaitForTabletTypesForTimeout([]string{"SERVING"}, 20*time.Second) + err = tablet.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 20*time.Second) require.Nil(t, err) } diff --git 
a/go/test/endtoend/recovery/shardedrecovery/sharded_recovery_test.go b/go/test/endtoend/recovery/shardedrecovery/sharded_recovery_test.go index 612428219b1..83a8098b92f 100644 --- a/go/test/endtoend/recovery/shardedrecovery/sharded_recovery_test.go +++ b/go/test/endtoend/recovery/shardedrecovery/sharded_recovery_test.go @@ -149,7 +149,7 @@ func TestUnShardedRecoveryAfterSharding(t *testing.T) { shardedTablets := []*cluster.Vttablet{shard0Master, shard0Replica, shard0RdOnly, shard1Master, shard1Replica, shard1RdOnly} for _, tablet := range shardedTablets { - _ = tablet.VttabletProcess.WaitForTabletType("SERVING") + _ = tablet.VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) } @@ -288,7 +288,7 @@ func TestShardedRecovery(t *testing.T) { require.NoError(t, err) for _, tablet := range shardedTablets { - _ = tablet.VttabletProcess.WaitForTabletType("SERVING") + _ = tablet.VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) } diff --git a/go/test/endtoend/reparent/utils_test.go b/go/test/endtoend/reparent/utils_test.go index 44bf8ec918f..bc62e529906 100644 --- a/go/test/endtoend/reparent/utils_test.go +++ b/go/test/endtoend/reparent/utils_test.go @@ -163,7 +163,7 @@ func setupShard(ctx context.Context, t *testing.T, shardName string, tablets []* } for _, tablet := range tablets { - err := tablet.VttabletProcess.WaitForTabletTypes([]string{"SERVING", "NOT_SERVING"}) + err := tablet.VttabletProcess.WaitForTabletStatuses([]string{"SERVING", "NOT_SERVING"}) require.NoError(t, err) } diff --git a/go/test/endtoend/sharding/initialsharding/sharding_util.go b/go/test/endtoend/sharding/initialsharding/sharding_util.go index d7cf5f1414c..07fefbade85 100644 --- a/go/test/endtoend/sharding/initialsharding/sharding_util.go +++ b/go/test/endtoend/sharding/initialsharding/sharding_util.go @@ -271,15 +271,15 @@ func TestInitialSharding(t *testing.T, keyspace *cluster.Keyspace, keyType query err = 
ClusterInstance.VtctlclientProcess.InitShardMaster(keyspace.Name, shard1.Name, cell, shard1MasterTablet.TabletUID) require.NoError(t, err) } else { - err = shard1.Replica().VttabletProcess.WaitForTabletType("SERVING") + err = shard1.Replica().VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) _, err = ClusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("TabletExternallyReparented", shard1MasterTablet.Alias) require.NoError(t, err) } - err = shard1.Replica().VttabletProcess.WaitForTabletType("SERVING") + err = shard1.Replica().VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) - err = shard1.Rdonly().VttabletProcess.WaitForTabletType("SERVING") + err = shard1.Rdonly().VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) for _, vttablet := range shard1.Vttablets { assert.Equal(t, vttablet.VttabletProcess.GetTabletStatus(), "SERVING") @@ -352,8 +352,8 @@ func TestInitialSharding(t *testing.T, keyspace *cluster.Keyspace, keyType query _ = ClusterInstance.VtctlclientProcess.ApplyVSchema(keyspaceName, fmt.Sprintf(vSchema, tableName, "id")) for _, shard := range []cluster.Shard{shard21, shard22} { - _ = shard.Replica().VttabletProcess.WaitForTabletType("SERVING") - _ = shard.Rdonly().VttabletProcess.WaitForTabletType("SERVING") + _ = shard.Replica().VttabletProcess.WaitForTabletStatus("SERVING") + _ = shard.Rdonly().VttabletProcess.WaitForTabletStatus("SERVING") } for _, shard := range []cluster.Shard{shard21, shard22} { @@ -526,8 +526,8 @@ func TestInitialSharding(t *testing.T, keyspace *cluster.Keyspace, keyType query expectedPartitions[topodata.TabletType_RDONLY] = []string{shard21.Name, shard22.Name} checkSrvKeyspaceForSharding(t, keyspaceName, expectedPartitions) - _ = shard21.Rdonly().VttabletProcess.WaitForTabletType("SERVING") - _ = shard22.Rdonly().VttabletProcess.WaitForTabletType("SERVING") + _ = shard21.Rdonly().VttabletProcess.WaitForTabletStatus("SERVING") + _ = 
shard22.Rdonly().VttabletProcess.WaitForTabletStatus("SERVING") _ = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspaceName, shard21.Name), 1) _ = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspaceName, shard22.Name), 1) diff --git a/go/test/endtoend/sharding/mergesharding/mergesharding_base.go b/go/test/endtoend/sharding/mergesharding/mergesharding_base.go index 7263e2cca42..fb731dd9c58 100644 --- a/go/test/endtoend/sharding/mergesharding/mergesharding_base.go +++ b/go/test/endtoend/sharding/mergesharding/mergesharding_base.go @@ -214,13 +214,13 @@ func TestMergesharding(t *testing.T, useVarbinaryShardingKeyType bool) { require.NoError(t, err) // Wait for tablets to come in Service state - err = shard0Master.VttabletProcess.WaitForTabletType("SERVING") + err = shard0Master.VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) - err = shard1Master.VttabletProcess.WaitForTabletType("SERVING") + err = shard1Master.VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) - err = shard2Master.VttabletProcess.WaitForTabletType("SERVING") + err = shard2Master.VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) - err = shard3Master.VttabletProcess.WaitForTabletType("SERVING") + err = shard3Master.VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) // keyspace/shard name fields @@ -348,7 +348,7 @@ func TestMergesharding(t *testing.T, useVarbinaryShardingKeyType bool) { // The tested behavior is a safeguard to prevent that somebody can // accidentally modify data on the destination masters while they are not // migrated yet and the source shards are still the source of truth. 
- err = shard3Master.VttabletProcess.WaitForTabletType("NOT_SERVING") + err = shard3Master.VttabletProcess.WaitForTabletStatus("NOT_SERVING") require.NoError(t, err) // check that binlog server exported the stats vars diff --git a/go/test/endtoend/sharding/resharding/resharding_base.go b/go/test/endtoend/sharding/resharding/resharding_base.go index fabc1a0734b..9b04ca89c7e 100644 --- a/go/test/endtoend/sharding/resharding/resharding_base.go +++ b/go/test/endtoend/sharding/resharding/resharding_base.go @@ -305,13 +305,13 @@ func TestResharding(t *testing.T, useVarbinaryShardingKeyType bool) { require.Nil(t, err) // Wait for tablets to come in Service state - err = shard0Master.VttabletProcess.WaitForTabletType("SERVING") + err = shard0Master.VttabletProcess.WaitForTabletStatus("SERVING") require.Nil(t, err) - err = shard1Master.VttabletProcess.WaitForTabletType("SERVING") + err = shard1Master.VttabletProcess.WaitForTabletStatus("SERVING") require.Nil(t, err) - err = shard2Master.VttabletProcess.WaitForTabletType("SERVING") + err = shard2Master.VttabletProcess.WaitForTabletStatus("SERVING") require.Nil(t, err) - err = shard3Master.VttabletProcess.WaitForTabletType("SERVING") + err = shard3Master.VttabletProcess.WaitForTabletStatus("SERVING") require.Nil(t, err) // keyspace/shard name fields @@ -375,7 +375,7 @@ func TestResharding(t *testing.T, useVarbinaryShardingKeyType bool) { err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", shard1Replica2.Alias, "spare") require.Nil(t, err) - err = shard1Replica2.VttabletProcess.WaitForTabletType("NOT_SERVING") + err = shard1Replica2.VttabletProcess.WaitForTabletStatus("NOT_SERVING") require.Nil(t, err) // we need to create the schema, and the worker will do data copying @@ -522,9 +522,9 @@ func TestResharding(t *testing.T, useVarbinaryShardingKeyType bool) { // The tested behavior is a safeguard to prevent that somebody can // accidentally modify data on the destination masters while they are not // 
migrated yet and the source shards are still the source of truth. - err = shard2Master.VttabletProcess.WaitForTabletType("NOT_SERVING") + err = shard2Master.VttabletProcess.WaitForTabletStatus("NOT_SERVING") require.Nil(t, err) - err = shard3Master.VttabletProcess.WaitForTabletType("NOT_SERVING") + err = shard3Master.VttabletProcess.WaitForTabletStatus("NOT_SERVING") require.Nil(t, err) // check that binlog server exported the stats vars @@ -596,9 +596,9 @@ func TestResharding(t *testing.T, useVarbinaryShardingKeyType bool) { err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", shard1Replica1.Alias, "spare") require.Nil(t, err) - err = shard1Replica2.VttabletProcess.WaitForTabletType("SERVING") + err = shard1Replica2.VttabletProcess.WaitForTabletStatus("SERVING") require.Nil(t, err) - err = shard1Replica1.VttabletProcess.WaitForTabletType("NOT_SERVING") + err = shard1Replica1.VttabletProcess.WaitForTabletStatus("NOT_SERVING") require.Nil(t, err) err = clusterInstance.VtctlclientProcess.ExecuteCommand("RunHealthCheck", shard1Replica2.Alias) diff --git a/go/test/endtoend/sharding/verticalsplit/vertical_split_test.go b/go/test/endtoend/sharding/verticalsplit/vertical_split_test.go index ad1c73ccaa1..2de8bfe7338 100644 --- a/go/test/endtoend/sharding/verticalsplit/vertical_split_test.go +++ b/go/test/endtoend/sharding/verticalsplit/vertical_split_test.go @@ -141,11 +141,11 @@ func TestVerticalSplit(t *testing.T) { destinationMasterTablet.Type = "master" for _, tablet := range []cluster.Vttablet{sourceReplicaTablet, destinationReplicaTablet} { - _ = tablet.VttabletProcess.WaitForTabletType("SERVING") + _ = tablet.VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) } for _, tablet := range []cluster.Vttablet{sourceRdOnlyTablet1, sourceRdOnlyTablet2, destinationRdOnlyTablet1, destinationRdOnlyTablet2} { - _ = tablet.VttabletProcess.WaitForTabletType("SERVING") + _ = tablet.VttabletProcess.WaitForTabletStatus("SERVING") 
require.NoError(t, err) } diff --git a/go/test/endtoend/tabletmanager/master/tablet_master_test.go b/go/test/endtoend/tabletmanager/master/tablet_master_test.go index 07dcdd4b899..31cd0631e78 100644 --- a/go/test/endtoend/tabletmanager/master/tablet_master_test.go +++ b/go/test/endtoend/tabletmanager/master/tablet_master_test.go @@ -169,7 +169,7 @@ func TestMasterRestartSetsTERTimestamp(t *testing.T) { err := clusterInstance.VtctlclientProcess.InitShardMaster(keyspaceName, shardName, cell, replicaTablet.TabletUID) require.Nil(t, err) - err = replicaTablet.VttabletProcess.WaitForTabletType("SERVING") + err = replicaTablet.VttabletProcess.WaitForTabletStatus("SERVING") require.Nil(t, err) // Capture the current TER. @@ -224,7 +224,7 @@ func TestMasterRestartSetsTERTimestamp(t *testing.T) { // Reset master err = clusterInstance.VtctlclientProcess.InitShardMaster(keyspaceName, shardName, cell, masterTablet.TabletUID) require.Nil(t, err) - err = masterTablet.VttabletProcess.WaitForTabletType("SERVING") + err = masterTablet.VttabletProcess.WaitForTabletStatus("SERVING") require.Nil(t, err) } diff --git a/go/test/endtoend/tabletmanager/tablet_health_test.go b/go/test/endtoend/tabletmanager/tablet_health_test.go index a31b9b82d63..0d7906ab259 100644 --- a/go/test/endtoend/tabletmanager/tablet_health_test.go +++ b/go/test/endtoend/tabletmanager/tablet_health_test.go @@ -205,7 +205,7 @@ func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) { //Wait if tablet is not in service state defer cluster.PanicHandler(t) - err := rdonlyTablet.VttabletProcess.WaitForTabletType("SERVING") + err := rdonlyTablet.VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) // Check tablet health @@ -231,7 +231,7 @@ func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) { checkTabletType(t, rdonlyTablet.Alias, "DRAINED") // Query service is still running. 
- err = rdonlyTablet.VttabletProcess.WaitForTabletType("SERVING") + err = rdonlyTablet.VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) // Restart replication. Tablet will become healthy again. diff --git a/go/test/endtoend/tabletmanager/tablet_test.go b/go/test/endtoend/tabletmanager/tablet_test.go index 0ce8cd94276..778583102d0 100644 --- a/go/test/endtoend/tabletmanager/tablet_test.go +++ b/go/test/endtoend/tabletmanager/tablet_test.go @@ -53,7 +53,7 @@ func TestEnsureDB(t *testing.T) { // Switch to read-write and verify that that we go serving. _ = clusterInstance.VtctlclientProcess.ExecuteCommand("SetReadWrite", tablet.Alias) - err = tablet.VttabletProcess.WaitForTabletType("SERVING") + err = tablet.VttabletProcess.WaitForTabletStatus("SERVING") require.NoError(t, err) killTablets(t, tablet) } diff --git a/go/test/endtoend/vtorc/vtorc_test.go b/go/test/endtoend/vtorc/vtorc_test.go index f6e8690e8d2..9f005d6311a 100644 --- a/go/test/endtoend/vtorc/vtorc_test.go +++ b/go/test/endtoend/vtorc/vtorc_test.go @@ -25,6 +25,12 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/topo" + _ "vitess.io/vitess/go/vt/topo/consultopo" + _ "vitess.io/vitess/go/vt/topo/etcd2topo" + _ "vitess.io/vitess/go/vt/topo/k8stopo" + _ "vitess.io/vitess/go/vt/topo/zk2topo" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" @@ -40,84 +46,276 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) -func createCluster(t *testing.T, numReplicas int, numRdonly int, orcExtraArgs []string) *cluster.LocalProcessCluster { - keyspaceName := "ks" - shardName := "0" - keyspace := &cluster.Keyspace{Name: keyspaceName} - shard0 := &cluster.Shard{Name: shardName} - //dbName = "vt_" + keyspaceName - //username = "vt_dba" - hostname := "localhost" - cell1 := "zone1" - cell2 := "zone2" - tablets := []*cluster.Vttablet{} - clusterInstance := cluster.NewCluster(cell1, hostname) +var ( + clusterInstance *cluster.LocalProcessCluster + ts *topo.Server + replicaTablets 
[]*cluster.Vttablet + rdonlyTablets []*cluster.Vttablet + uidBase = 100 +) + +const ( + keyspaceName = "ks" + shardName = "0" + hostname = "localhost" + cell1 = "zone1" + numReplicas = 4 + numRdonly = 1 +) + +// createClusterAndStartTopo starts the cluster and topology service +func createClusterAndStartTopo() error { + clusterInstance = cluster.NewCluster(cell1, hostname) // Start topo server err := clusterInstance.StartTopo() - require.NoError(t, err) + if err != nil { + return err + } - // Adding another cell in the same cluster - err = clusterInstance.TopoProcess.ManageTopoDir("mkdir", "/vitess/"+cell2) - require.NoError(t, err) - err = clusterInstance.VtctlProcess.AddCellInfo(cell2) - require.NoError(t, err) + // create the vttablets + err = createVttablets() + if err != nil { + return err + } + + // create topo server connection + ts, err = topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) + return err +} + +// createVttablets is used to create the vttablets for all the tests +func createVttablets() error { + keyspace := &cluster.Keyspace{Name: keyspaceName} + shard0 := &cluster.Shard{Name: shardName} // creating tablets by hand instead of using StartKeyspace because we don't want to call InitShardMaster - uidBase := 100 + var tablets []*cluster.Vttablet for i := 0; i < numReplicas; i++ { - tablets = append(tablets, clusterInstance.NewVttabletInstance("replica", uidBase+i, cell1)) + vttabletInstance := clusterInstance.NewVttabletInstance("replica", uidBase, cell1) + uidBase++ + tablets = append(tablets, vttabletInstance) + replicaTablets = append(replicaTablets, vttabletInstance) } for i := 0; i < numRdonly; i++ { - tablets = append(tablets, clusterInstance.NewVttabletInstance("rdonly", uidBase+numReplicas+i, cell1)) + vttabletInstance := clusterInstance.NewVttabletInstance("rdonly", uidBase, cell1) + uidBase++ + tablets = append(tablets, vttabletInstance) + 
rdonlyTablets = append(rdonlyTablets, vttabletInstance) } - clusterInstance.VtTabletExtraArgs = []string{ "-lock_tables_timeout", "5s", "-disable_active_reparents", } - // Initialize Cluster shard0.Vttablets = tablets - err = clusterInstance.SetupCluster(keyspace, []cluster.Shard{*shard0}) - require.NoError(t, err) - + err := clusterInstance.SetupCluster(keyspace, []cluster.Shard{*shard0}) + if err != nil { + return err + } //Start MySql var mysqlCtlProcessList []*exec.Cmd for _, tablet := range shard0.Vttablets { log.Infof("Starting MySql for tablet %v", tablet.Alias) proc, err := tablet.MysqlctlProcess.StartProcess() - require.NoError(t, err) + if err != nil { + return err + } mysqlCtlProcessList = append(mysqlCtlProcessList, proc) } - // Wait for mysql processes to start for _, proc := range mysqlCtlProcessList { err := proc.Wait() - require.NoError(t, err) + if err != nil { + return err + } } - for _, tablet := range shard0.Vttablets { // Reset status, don't wait for the tablet status. We will check it later tablet.VttabletProcess.ServingStatus = "" - // Start the tablet err := tablet.VttabletProcess.Setup() - require.NoError(t, err) + if err != nil { + return err + } + } + for _, tablet := range shard0.Vttablets { + err := tablet.VttabletProcess.WaitForTabletStatuses([]string{"SERVING", "NOT_SERVING"}) + if err != nil { + return err + } + } + // we also need to wait for the tablet type to change from restore to replica, before we delete a tablet from the topology + // otherwise it will notice that there is no record for the tablet in the topology when it tries to update its state and shutdown itself! 
for _, tablet := range shard0.Vttablets { - err := tablet.VttabletProcess.WaitForTabletTypes([]string{"SERVING", "NOT_SERVING"}) - require.NoError(t, err) + err := tablet.VttabletProcess.WaitForTabletTypes([]string{"replica", "rdonly"}) + if err != nil { + return err + } + } + + return nil +} + +// shutdownVttablets shuts down all the vttablets and removes them from the topology +func shutdownVttablets() error { + // demote the primary tablet if there is + err := demotePrimaryTablet() + if err != nil { + return err + } + + for _, vttablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + // we need to stop a vttablet only if it is not shutdown + if !vttablet.VttabletProcess.IsShutdown() { + // wait for primary tablet to demote. For all others, it will not wait + err = vttablet.VttabletProcess.WaitForTabletTypes([]string{vttablet.Type}) + if err != nil { + return err + } + // Stop the vttablets + err := vttablet.VttabletProcess.TearDown() + if err != nil { + return err + } + // Remove the tablet record for this tablet + } + err = clusterInstance.VtctlclientProcess.ExecuteCommand("DeleteTablet", vttablet.Alias) + if err != nil { + return err + } + } + clusterInstance.Keyspaces[0].Shards[0].Vttablets = nil + return nil +} + +// demotePrimaryTablet demotes the primary tablet for our shard +func demotePrimaryTablet() (err error) { + // lock the shard + ctx, unlock, lockErr := ts.LockShard(context.Background(), keyspaceName, shardName, "demotePrimaryTablet-vtorc-endtoend-test") + if lockErr != nil { + return lockErr } + defer unlock(&err) + + // update the shard record's master + if _, err = ts.UpdateShardFields(ctx, keyspaceName, shardName, func(si *topo.ShardInfo) error { + si.MasterAlias = nil + si.SetMasterTermStartTime(time.Now()) + return nil + }); err != nil { + return err + } + return +} +// startVtorc is used to start the orchestrator with the given extra arguments +func startVtorc(t *testing.T, orcExtraArgs []string) { // Start vtorc 
clusterInstance.VtorcProcess = clusterInstance.NewOrcProcess(path.Join(os.Getenv("PWD"), "test_config.json")) clusterInstance.VtorcProcess.ExtraArgs = orcExtraArgs - err = clusterInstance.VtorcProcess.Setup() + err := clusterInstance.VtorcProcess.Setup() + require.NoError(t, err) +} + +// stopVtorc is used to stop the orchestrator +func stopVtorc(t *testing.T) { + // Stop vtorc + if clusterInstance.VtorcProcess != nil { + err := clusterInstance.VtorcProcess.TearDown() + require.NoError(t, err) + } + clusterInstance.VtorcProcess = nil +} + +// setupVttabletsAndVtorc is used to setup the vttablets and start the orchestrator +func setupVttabletsAndVtorc(t *testing.T, numReplicasReq int, numRdonlyReq int, orcExtraArgs []string) { + // stop vtorc if it is running + stopVtorc(t) + + // remove all the vttablets so that each test can add the amount that they require + err := shutdownVttablets() + require.NoError(t, err) + + for _, tablet := range replicaTablets { + if numReplicasReq == 0 { + break + } + err = cleanAndStartVttablet(t, tablet) + require.NoError(t, err) + numReplicasReq-- + } + + for _, tablet := range rdonlyTablets { + if numRdonlyReq == 0 { + break + } + err = cleanAndStartVttablet(t, tablet) + require.NoError(t, err) + numRdonlyReq-- + } + + if numRdonlyReq > 0 || numReplicasReq > 0 { + t.Fatalf("more than available tablets requested. 
Please increase the constants numReplicas or numRdonly") + } + + // wait for the tablets to come up properly + for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + err := tablet.VttabletProcess.WaitForTabletStatuses([]string{"SERVING", "NOT_SERVING"}) + require.NoError(t, err) + } + for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { + err := tablet.VttabletProcess.WaitForTabletTypes([]string{"replica", "rdonly"}) + require.NoError(t, err) + } + + // start vtorc + startVtorc(t, orcExtraArgs) +} + +func cleanAndStartVttablet(t *testing.T, vttablet *cluster.Vttablet) error { + // remove the databases if they exist + _, err := runSQL(t, "DROP DATABASE IF EXISTS vt_ks", vttablet, "") + require.NoError(t, err) + _, err = runSQL(t, "DROP DATABASE IF EXISTS _vt", vttablet, "") + require.NoError(t, err) + // stop the replication + _, err = runSQL(t, "STOP SLAVE", vttablet, "") + require.NoError(t, err) + // reset the binlog + _, err = runSQL(t, "RESET MASTER", vttablet, "") + require.NoError(t, err) + + // start the vttablet + err = vttablet.VttabletProcess.Setup() require.NoError(t, err) - return clusterInstance + clusterInstance.Keyspaces[0].Shards[0].Vttablets = append(clusterInstance.Keyspaces[0].Shards[0].Vttablets, vttablet) + return nil +} + +func TestMain(m *testing.M) { + exitcode, err := func() (int, error) { + err := createClusterAndStartTopo() + if err != nil { + return 1, err + } + return m.Run(), nil + }() + + cluster.PanicHandler(nil) + killTablets(replicaTablets) + killTablets(rdonlyTablets) + clusterInstance.Keyspaces[0].Shards[0].Vttablets = nil + clusterInstance.Teardown() + + if err != nil { + fmt.Printf("%v\n", err) + os.Exit(1) + } else { + os.Exit(exitcode) + } } // Cases to test: @@ -126,15 +324,10 @@ func createCluster(t *testing.T, numReplicas int, numRdonly int, orcExtraArgs [] // verify replication is setup func TestMasterElection(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := 
createCluster(t, 1, 1, nil) + setupVttabletsAndVtorc(t, 1, 1, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] - defer func() { - clusterInstance.Teardown() - killTablets(t, shard0) - }() - //log.Exitf("error") checkMasterTablet(t, clusterInstance, shard0.Vttablets[0]) checkReplication(t, clusterInstance, shard0.Vttablets[0], shard0.Vttablets[1:]) } @@ -145,15 +338,10 @@ func TestMasterElection(t *testing.T) { // verify replication is setup func TestSingleKeyspace(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 1, 1, []string{"-clusters_to_watch", "ks"}) + setupVttabletsAndVtorc(t, 1, 1, []string{"-clusters_to_watch", "ks"}) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] - defer func() { - clusterInstance.Teardown() - killTablets(t, shard0) - }() - //log.Exitf("error") checkMasterTablet(t, clusterInstance, shard0.Vttablets[0]) checkReplication(t, clusterInstance, shard0.Vttablets[0], shard0.Vttablets[1:]) } @@ -164,15 +352,10 @@ func TestSingleKeyspace(t *testing.T) { // verify replication is setup func TestKeyspaceShard(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 1, 1, []string{"-clusters_to_watch", "ks/0"}) + setupVttabletsAndVtorc(t, 1, 1, []string{"-clusters_to_watch", "ks/0"}) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] - defer func() { - clusterInstance.Teardown() - killTablets(t, shard0) - }() - //log.Exitf("error") checkMasterTablet(t, clusterInstance, shard0.Vttablets[0]) checkReplication(t, clusterInstance, shard0.Vttablets[0], shard0.Vttablets[1:]) } @@ -180,13 +363,9 @@ func TestKeyspaceShard(t *testing.T) { // 2. 
bring down master, let orc promote replica func TestDownMaster(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 2, 0, nil) + setupVttabletsAndVtorc(t, 2, 0, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] - defer func() { - clusterInstance.Teardown() - killTablets(t, shard0) - }() // find master from topo curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0) assert.NotNil(t, curMaster, "should have elected a master") @@ -194,6 +373,17 @@ func TestDownMaster(t *testing.T) { // Make the current master database unavailable. err := curMaster.MysqlctlProcess.Stop() require.NoError(t, err) + defer func() { + // we remove the tablet from our global list since its mysqlctl process has stopped and cannot be reused for other tests + for i, tablet := range replicaTablets { + if tablet == curMaster { + // remove this tablet since its mysql has stopped + replicaTablets = append(replicaTablets[:i], replicaTablets[i+1:]...) + killTablets([]*cluster.Vttablet{curMaster}) + return + } + } + }() for _, tablet := range shard0.Vttablets { // we know we have only two tablets, so the "other" one must be the new master @@ -208,7 +398,8 @@ func waitForReadOnlyValue(t *testing.T, curMaster *cluster.Vttablet, expectValue timeout := 15 * time.Second startTime := time.Now() for time.Since(startTime) < timeout { - qr := runSQL(t, "select @@global.read_only as read_only", curMaster, "") + qr, err := runSQL(t, "select @@global.read_only as read_only", curMaster, "") + require.NoError(t, err) require.NotNil(t, qr) row := qr.Named().Row() require.NotNil(t, row) @@ -225,21 +416,17 @@ func waitForReadOnlyValue(t *testing.T, curMaster *cluster.Vttablet, expectValue // 3. 
make master readonly, let orc repair func TestMasterReadOnly(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 2, 0, nil) + setupVttabletsAndVtorc(t, 2, 0, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] - defer func() { - clusterInstance.Teardown() - // Kill tablets - killTablets(t, shard0) - }() // find master from topo curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0) assert.NotNil(t, curMaster, "should have elected a master") // Make the current master database read-only. - runSQL(t, "set global read_only=ON", curMaster, "") + _, err := runSQL(t, "set global read_only=ON", curMaster, "") + require.NoError(t, err) // wait for repair match := waitForReadOnlyValue(t, curMaster, 0) @@ -249,14 +436,9 @@ func TestMasterReadOnly(t *testing.T) { // 4. make replica ReadWrite, let orc repair func TestReplicaReadWrite(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 2, 0, nil) + setupVttabletsAndVtorc(t, 2, 0, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] - defer func() { - clusterInstance.Teardown() - // Kill tablets - killTablets(t, shard0) - }() // find master from topo curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0) @@ -271,7 +453,8 @@ func TestReplicaReadWrite(t *testing.T) { } } // Make the replica database read-write. - runSQL(t, "set global read_only=OFF", replica, "") + _, err := runSQL(t, "set global read_only=OFF", replica, "") + require.NoError(t, err) // wait for repair match := waitForReadOnlyValue(t, replica, 1) @@ -281,14 +464,9 @@ func TestReplicaReadWrite(t *testing.T) { // 5. 
stop replication, let orc repair func TestStopReplication(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 2, 0, nil) + setupVttabletsAndVtorc(t, 2, 0, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] - defer func() { - clusterInstance.Teardown() - // Kill tablets - killTablets(t, shard0) - }() // find master from topo curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0) @@ -320,14 +498,9 @@ func TestStopReplication(t *testing.T) { // 6. setup replication from non-master, let orc repair func TestReplicationFromOtherReplica(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 3, 0, nil) + setupVttabletsAndVtorc(t, 3, 0, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] - defer func() { - clusterInstance.Teardown() - // Kill tablets - killTablets(t, shard0) - }() // find master from topo curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0) @@ -371,14 +544,9 @@ func TestRepairAfterTER(t *testing.T) { // test fails intermittently on CI, skip until it can be fixed. t.SkipNow() defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 2, 0, nil) + setupVttabletsAndVtorc(t, 2, 0, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] - defer func() { - clusterInstance.Teardown() - // Kill tablets - killTablets(t, shard0) - }() // find master from topo curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0) @@ -408,6 +576,40 @@ func TestRepairAfterTER(t *testing.T) { checkReplication(t, clusterInstance, newMaster, []*cluster.Vttablet{curMaster}) } +// 7. 
make instance A replicate from B and B from A, wait for repair +func TestCircularReplication(t *testing.T) { + defer cluster.PanicHandler(t) + setupVttabletsAndVtorc(t, 2, 0, nil) + keyspace := &clusterInstance.Keyspaces[0] + shard0 := &keyspace.Shards[0] + + // find master from topo + master := shardMasterTablet(t, clusterInstance, keyspace, shard0) + assert.NotNil(t, master, "should have elected a master") + + var replica *cluster.Vttablet + for _, tablet := range shard0.Vttablets { + // we know we have only two tablets, so the "other" one must be the replica + if tablet.Alias != master.Alias { + replica = tablet + break + } + } + + changeMasterCommands := fmt.Sprintf("RESET SLAVE;"+ + "CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+ + "START SLAVE;", replica.VttabletProcess.TabletHostname, replica.MySQLPort) + + _, err := runSQL(t, changeMasterCommands, master, "") + require.NoError(t, err) + + // wait for repair + err = waitForReplicationToStop(t, master) + require.NoError(t, err) + // check replication is setup correctly + checkReplication(t, clusterInstance, master, []*cluster.Vttablet{replica}) +} + func shardMasterTablet(t *testing.T, cluster *cluster.LocalProcessCluster, keyspace *cluster.Keyspace, shard *cluster.Shard) *cluster.Vttablet { start := time.Now() for { @@ -455,7 +657,7 @@ func checkMasterTablet(t *testing.T, cluster *cluster.LocalProcessCluster, table continue } else { // allow time for tablet state to be updated after topo is updated - time.Sleep(time.Second) + time.Sleep(2 * time.Second) // make sure the health stream is updated result, err = cluster.VtctlclientProcess.ExecuteCommandWithOutput("VtTabletStreamHealth", "-count", "1", tablet.Alias) require.NoError(t, err) @@ -485,7 +687,8 @@ func checkReplication(t *testing.T, clusterInstance *cluster.LocalProcessCluster primary key (id) ) Engine=InnoDB ` - runSQL(t, sqlSchema, master, 
"vt_ks") + require.NoError(t, err) confirmReplication(t, master, replicas) } @@ -494,7 +697,8 @@ func confirmReplication(t *testing.T, master *cluster.Vttablet, replicas []*clus n := 2 // random value ... // insert data into the new master, check the connected replica work insertSQL := fmt.Sprintf("insert into vt_insert_test(id, msg) values (%d, 'test %d')", n, n) - runSQL(t, insertSQL, master, "vt_ks") + _, err := runSQL(t, insertSQL, master, "vt_ks") + require.NoError(t, err) time.Sleep(100 * time.Millisecond) for _, tab := range replicas { err := checkInsertedValues(t, tab, n) @@ -507,8 +711,10 @@ func checkInsertedValues(t *testing.T, tablet *cluster.Vttablet, index int) erro timeout := time.Now().Add(10 * time.Second) for time.Now().Before(timeout) { selectSQL := fmt.Sprintf("select msg from vt_insert_test where id=%d", index) - qr := runSQL(t, selectSQL, tablet, "vt_ks") - if len(qr.Rows) == 1 { + qr, err := runSQL(t, selectSQL, tablet, "vt_ks") + // The error may not be nil if the replication has not caught up to the point where the table exists. 
+ // We can safely skip this error and retry reading after wait + if err == nil && len(qr.Rows) == 1 { return nil } time.Sleep(300 * time.Millisecond) @@ -516,21 +722,52 @@ func checkInsertedValues(t *testing.T, tablet *cluster.Vttablet, index int) erro return fmt.Errorf("data is not yet replicated") } +func waitForReplicationToStop(t *testing.T, vttablet *cluster.Vttablet) error { + timeout := time.After(15 * time.Second) + for { + select { + case <-timeout: + return fmt.Errorf("timedout: waiting for master to stop replication") + default: + res, err := runSQL(t, "SHOW SLAVE STATUS", vttablet, "") + if err != nil { + return err + } + if len(res.Rows) == 0 { + return nil + } + time.Sleep(1 * time.Second) + } + } +} + func validateTopology(t *testing.T, cluster *cluster.LocalProcessCluster, pingTablets bool) { - if pingTablets { - out, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("Validate", "-ping-tablets=true") - require.NoError(t, err, out) - } else { - err := cluster.VtctlclientProcess.ExecuteCommand("Validate") - require.NoError(t, err) + ch := make(chan interface{}) + go func() { + if pingTablets { + out, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("Validate", "-ping-tablets=true") + require.NoError(t, err, out) + } else { + err := cluster.VtctlclientProcess.ExecuteCommand("Validate") + require.NoError(t, err) + } + ch <- true + }() + + select { + case <-ch: + return + case <-time.After(120 * time.Second): + t.Fatal("time out waiting for validation to finish") } } -func killTablets(t *testing.T, shard *cluster.Shard) { - for _, tablet := range shard.Vttablets { +func killTablets(vttablets []*cluster.Vttablet) { + for _, tablet := range vttablets { + log.Infof("Shutting down MySQL for %v", tablet.Alias) + _ = tablet.MysqlctlProcess.Stop() log.Infof("Calling TearDown on tablet %v", tablet.Alias) - err := tablet.VttabletProcess.TearDown() - require.NoError(t, err) + _ = tablet.VttabletProcess.TearDown() } } @@ -545,7 +782,7 @@ func 
getMysqlConnParam(tablet *cluster.Vttablet, db string) mysql.ConnParams { return connParams } -func runSQL(t *testing.T, sql string, tablet *cluster.Vttablet, db string) *sqltypes.Result { +func runSQL(t *testing.T, sql string, tablet *cluster.Vttablet, db string) (*sqltypes.Result, error) { // Get Connection tabletParams := getMysqlConnParam(tablet, db) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -558,9 +795,7 @@ func runSQL(t *testing.T, sql string, tablet *cluster.Vttablet, db string) *sqlt return execute(t, conn, sql) } -func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { +func execute(t *testing.T, conn *mysql.Conn, query string) (*sqltypes.Result, error) { t.Helper() - qr, err := conn.ExecuteFetch(query, 1000, true) - require.Nil(t, err) - return qr + return conn.ExecuteFetch(query, 1000, true) } diff --git a/go/test/endtoend/worker/worker_test.go b/go/test/endtoend/worker/worker_test.go index 4fdb163ec3e..5e834f2314d 100644 --- a/go/test/endtoend/worker/worker_test.go +++ b/go/test/endtoend/worker/worker_test.go @@ -509,7 +509,7 @@ func runShardTablets(t *testing.T, shardName string, tabletArr []*cluster.Vttabl // set a replica or rdonly tablet back to NOT_SERVING. 
for _, tablet := range tabletArr { - err = tablet.VttabletProcess.WaitForTabletType("SERVING") + err = tablet.VttabletProcess.WaitForTabletStatus("SERVING") require.Nil(t, err) } diff --git a/go/vt/orchestrator/logic/orchestrator.go b/go/vt/orchestrator/logic/orchestrator.go index f8d05577a1c..28c4d2062d2 100644 --- a/go/vt/orchestrator/logic/orchestrator.go +++ b/go/vt/orchestrator/logic/orchestrator.go @@ -49,6 +49,7 @@ const ( var discoveryQueue *discovery.Queue var snapshotDiscoveryKeys chan inst.InstanceKey var snapshotDiscoveryKeysMutex sync.Mutex +var hasReceivedSIGTERM int32 var discoveriesCounter = metrics.NewCounter() var failedDiscoveriesCounter = metrics.NewCounter() @@ -120,10 +121,28 @@ func acceptSignals() { config.Reload() discoveryMetrics.SetExpirePeriod(time.Duration(config.Config.DiscoveryCollectionRetentionSeconds) * time.Second) case syscall.SIGTERM: - log.Infof("Received SIGTERM. Shutting down orchestrator") + log.Infof("Received SIGTERM. Starting shutdown") + atomic.StoreInt32(&hasReceivedSIGTERM, 1) discoveryMetrics.StopAutoExpiration() // probably should poke other go routines to stop cleanly here ... inst.AuditOperation("shutdown", nil, "Triggered via SIGTERM") + timeout := time.After(*shutdownWaitTime) + func() { + for { + count := atomic.LoadInt32(&shardsLockCounter) + if count == 0 { + return + } + select { + case <-timeout: + log.Infof("wait for lock release timed out. 
Some locks might not have been released.") + return + default: + time.Sleep(100 * time.Millisecond) + } + } + }() + log.Infof("Shutting down orchestrator") os.Exit(0) } } diff --git a/go/vt/orchestrator/logic/tablet_discovery.go b/go/vt/orchestrator/logic/tablet_discovery.go index 0bf55069753..d62c51ef566 100644 --- a/go/vt/orchestrator/logic/tablet_discovery.go +++ b/go/vt/orchestrator/logic/tablet_discovery.go @@ -23,6 +23,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" "google.golang.org/protobuf/encoding/prototext" @@ -41,8 +42,10 @@ import ( ) var ( - ts *topo.Server - clustersToWatch = flag.String("clusters_to_watch", "", "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") + ts *topo.Server + clustersToWatch = flag.String("clusters_to_watch", "", "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. 
Example: \"ks1,ks2/-80\"") + shutdownWaitTime = flag.Duration("shutdown_wait_time", 30*time.Second, "maximum time to wait for vtorc to release all the locks that it is holding before shutting down on SIGTERM") + shardsLockCounter int32 ) // OpenTabletDiscovery opens the vitess topo if enables and returns a ticker @@ -240,6 +243,10 @@ func LockShard(instanceKey inst.InstanceKey) (func(*error), error) { if instanceKey.Hostname == "" { return nil, errors.New("Can't lock shard: instance is unspecified") } + val := atomic.LoadInt32(&hasReceivedSIGTERM) + if val > 0 { + return nil, errors.New("Can't lock shard: SIGTERM received") + } tablet, err := inst.ReadTablet(instanceKey) if err != nil { @@ -247,8 +254,16 @@ func LockShard(instanceKey inst.InstanceKey) (func(*error), error) { } ctx, cancel := context.WithTimeout(context.Background(), time.Duration(config.Config.LockShardTimeoutSeconds)*time.Second) defer cancel() + atomic.AddInt32(&shardsLockCounter, 1) _, unlock, err := ts.LockShard(ctx, tablet.Keyspace, tablet.Shard, "Orc Recovery") - return unlock, err + if err != nil { + atomic.AddInt32(&shardsLockCounter, -1) + return nil, err + } + return func(e *error) { + defer atomic.AddInt32(&shardsLockCounter, -1) + unlock(e) + }, nil } // TabletRefresh refreshes the tablet info.