From 96c3dca7c4b4252e2ac7046374a415003215c579 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Wed, 1 Mar 2023 03:09:03 -0500
Subject: [PATCH] Flakes: Use new healthy shard check in vreplication e2e tests (#12502)

* Use new healthy shard check in vreplication e2e tests

This is needed because the presence of a primary tablet for the shard in
vtgate's healthcheck is no longer a reliable indicator that the shard has a
healthy serving primary: a primary now needs to initialize its sidecar
database and wait for that to replicate via semi-sync before it becomes
serving and can perform its normal functions. This delay can cause test
flakiness when a test requires a healthy shard before continuing (see the
usage sketch below).

Signed-off-by: Matt Lord

* Try to address unit test race flakes around log size

They looked like this:

WARNING: DATA RACE
Write at 0x000005bf9b60 by goroutine 27141:
  github.com/spf13/pflag.newUint64Value()
      /home/runner/go/pkg/mod/github.com/spf13/pflag@v1.0.5/uint64.go:9 +0x5a
  github.com/spf13/pflag.(*FlagSet).Uint64Var()
      /home/runner/go/pkg/mod/github.com/spf13/pflag@v1.0.5/uint64.go:45 +0x55
  vitess.io/vitess/go/vt/log.RegisterFlags()
      /home/runner/work/vitess/vitess/go/vt/log/log.go:81 +0x64
  vitess.io/vitess/go/vt/servenv.GetFlagSetFor()
      /home/runner/work/vitess/vitess/go/vt/servenv/servenv.go:347 +0x183
  vitess.io/vitess/go/vt/servenv.ParseFlags()
      /home/runner/work/vitess/vitess/go/vt/servenv/servenv.go:326 +0x49
  ...

Previous read at 0x000005bf9b60 by goroutine 27136:
  github.com/golang/glog.(*syncBuffer).Write()
  ...

And they most often occurred in the wrangler unit tests, which makes sense
because they create a lot of loggers.

Signed-off-by: Matt Lord

* Revert "Try to address unit test race flakes around log size"

This reverts commit 51992b8d8390648f2f22657633aec6f21614b266.
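For context, a minimal usage sketch (not part of the patch itself) of the substitution each test below makes; it assumes the vreplication e2e harness's package-level vc and vtgate variables and the cluster.WaitForHealthyShard helper added in go/test/endtoend/cluster/cluster_util.go, and the helper name testWaitForServingPrimary is hypothetical:

package vreplication

import (
	"testing"

	"github.com/stretchr/testify/require"

	"vitess.io/vitess/go/test/endtoend/cluster"
)

// testWaitForServingPrimary is a hypothetical helper illustrating the
// pattern this patch applies throughout the vreplication e2e tests.
func testWaitForServingPrimary(t *testing.T, keyspace, shard string) {
	// Old, flaky wait: only confirmed that vtgate's healthcheck saw a
	// primary tablet, which can race with the primary still initializing
	// its sidecar database:
	//
	//   vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", keyspace, shard), 1)
	//
	// New wait: poll the topo via "vtctldclient GetShard" until the shard
	// record lists a serving primary (cell and uid set in primary_alias).
	err := cluster.WaitForHealthyShard(vc.VtctldClient, keyspace, shard)
	require.NoError(t, err)
}

As added in this patch, the helper retries roughly once per second and gives up after a 60-second timeout, returning an error that the tests surface with require.NoError.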
Signed-off-by: Matt Lord * Use external cluster vtctld in TestMigrate Signed-off-by: Matt Lord * Use subshell vs command output interpolation Signed-off-by: Matt Lord * Ingnore any config files in mysql alias Signed-off-by: Matt Lord --------- Signed-off-by: Matt Lord --- examples/common/env.sh | 2 +- go/test/endtoend/cluster/cluster_util.go | 48 +++++++++++++ .../endtoend/vreplication/materialize_test.go | 23 ++++--- go/test/endtoend/vreplication/migrate_test.go | 8 ++- .../endtoend/vreplication/performance_test.go | 3 +- .../resharding_workflows_v2_test.go | 23 +++---- .../endtoend/vreplication/time_zone_test.go | 9 +-- go/test/endtoend/vreplication/vdiff2_test.go | 8 ++- .../vreplication/vreplication_test.go | 68 +++++++++++-------- .../vreplication/vschema_load_test.go | 4 +- go/test/endtoend/vreplication/vstream_test.go | 4 +- test/local_example.sh | 6 +- 12 files changed, 138 insertions(+), 68 deletions(-) diff --git a/examples/common/env.sh b/examples/common/env.sh index 3b6a23a26cb..adee0f34d3f 100644 --- a/examples/common/env.sh +++ b/examples/common/env.sh @@ -78,7 +78,7 @@ mkdir -p "${VTDATAROOT}/tmp" # In your own environment you may prefer to use config files, # such as ~/.my.cnf -alias mysql="command mysql -h 127.0.0.1 -P 15306" +alias mysql="command mysql --no-defaults -h 127.0.0.1 -P 15306" alias vtctlclient="command vtctlclient --server localhost:15999 --log_dir ${VTDATAROOT}/tmp --alsologtostderr" alias vtctldclient="command vtctldclient --server localhost:15999" diff --git a/go/test/endtoend/cluster/cluster_util.go b/go/test/endtoend/cluster/cluster_util.go index b7f3e33e716..ea2dd0d7e20 100644 --- a/go/test/endtoend/cluster/cluster_util.go +++ b/go/test/endtoend/cluster/cluster_util.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/buger/jsonparser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -41,6 +42,8 @@ var ( tmClient = tmc.NewClient() dbCredentialFile string InsertTabletTemplateKsID = `insert into %s (id, msg) values (%d, '%s') /* id:%d */` + defaultOperationTimeout = 60 * time.Second + defeaultRetryDelay = 1 * time.Second ) // Restart restarts vttablet and mysql. @@ -381,3 +384,48 @@ func WaitForTabletSetup(vtctlClientProcess *VtctlClientProcess, expectedTablets return fmt.Errorf("all %d tablet are not in expected state %s", expectedTablets, expectedStatus) } + +// WaitForHealthyShard waits for the given shard info record in the topo +// server to list a tablet (alias and uid) as the primary serving tablet +// for the shard. This is done using "vtctldclient GetShard" and parsing +// its JSON output. All other watchers should then also see this shard +// info status as well. 
+func WaitForHealthyShard(vtctldclient *VtctldClientProcess, keyspace, shard string) error { + var ( + tmr = time.NewTimer(defaultOperationTimeout) + res string + err error + json []byte + cell string + uid int64 + ) + for { + res, err = vtctldclient.ExecuteCommandWithOutput("GetShard", fmt.Sprintf("%s/%s", keyspace, shard)) + if err != nil { + return err + } + json = []byte(res) + + cell, err = jsonparser.GetString(json, "shard", "primary_alias", "cell") + if err != nil && err != jsonparser.KeyPathNotFoundError { + return err + } + uid, err = jsonparser.GetInt(json, "shard", "primary_alias", "uid") + if err != nil && err != jsonparser.KeyPathNotFoundError { + return err + } + + if cell != "" && uid > 0 { + return nil + } + + select { + case <-tmr.C: + return fmt.Errorf("timed out waiting for the %s/%s shard to become healthy in the topo after %v; last seen status: %s; last seen error: %v", + keyspace, shard, defaultOperationTimeout, res, err) + default: + } + + time.Sleep(defeaultRetryDelay) + } +} diff --git a/go/test/endtoend/vreplication/materialize_test.go b/go/test/endtoend/vreplication/materialize_test.go index 0016a0771dd..a13ec1d0da6 100644 --- a/go/test/endtoend/vreplication/materialize_test.go +++ b/go/test/endtoend/vreplication/materialize_test.go @@ -17,10 +17,11 @@ limitations under the License. package vreplication import ( - "fmt" "testing" "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/cluster" ) const smSchema = ` @@ -68,6 +69,7 @@ func testShardedMaterialize(t *testing.T) { vc = NewVitessCluster(t, "TestShardedMaterialize", allCells, mainClusterConfig) ks1 := "ks1" ks2 := "ks2" + shard := "0" require.NotNil(t, vc) defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets defer func() { defaultReplicas = 1 }() @@ -78,15 +80,17 @@ func testShardedMaterialize(t *testing.T) { vc.AddKeyspace(t, []*Cell{defaultCell}, ks1, "0", smVSchema, smSchema, defaultReplicas, defaultRdonly, 100, nil) vtgate = defaultCell.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", ks1, "0"), 1) + err := cluster.WaitForHealthyShard(vc.VtctldClient, ks1, shard) + require.NoError(t, err) vc.AddKeyspace(t, []*Cell{defaultCell}, ks2, "0", smVSchema, smSchema, defaultReplicas, defaultRdonly, 200, nil) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", ks2, "0"), 1) + err = cluster.WaitForHealthyShard(vc.VtctldClient, ks2, shard) + require.NoError(t, err) vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() verifyClusterHealth(t, vc) - _, err := vtgateConn.ExecuteFetch(initDataQuery, 0, false) + _, err = vtgateConn.ExecuteFetch(initDataQuery, 0, false) require.NoError(t, err) materialize(t, smMaterializeSpec) tab := vc.getPrimaryTablet(t, ks2, "0") @@ -184,6 +188,7 @@ func testMaterialize(t *testing.T) { vc = NewVitessCluster(t, "TestMaterialize", allCells, mainClusterConfig) sourceKs := "source" targetKs := "target" + shard := "0" require.NotNil(t, vc) defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets defer func() { defaultReplicas = 1 }() @@ -194,19 +199,21 @@ func testMaterialize(t *testing.T) { vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, "0", smMaterializeVSchemaSource, smMaterializeSchemaSource, defaultReplicas, defaultRdonly, 300, nil) vtgate = defaultCell.Vtgates[0] require.NotNil(t, vtgate) - 
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKs, "0"), 1) + err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKs, shard) + require.NoError(t, err) vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, "0", smMaterializeVSchemaTarget, smMaterializeSchemaTarget, defaultReplicas, defaultRdonly, 400, nil) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKs, "0"), 1) + err = cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard) + require.NoError(t, err) vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() verifyClusterHealth(t, vc) - _, err := vtgateConn.ExecuteFetch(materializeInitDataQuery, 0, false) + _, err = vtgateConn.ExecuteFetch(materializeInitDataQuery, 0, false) require.NoError(t, err) - ks2Primary := vc.getPrimaryTablet(t, targetKs, "0") + ks2Primary := vc.getPrimaryTablet(t, targetKs, shard) _, err = ks2Primary.QueryTablet(customFunc, targetKs, true) require.NoError(t, err) diff --git a/go/test/endtoend/vreplication/migrate_test.go b/go/test/endtoend/vreplication/migrate_test.go index 18745aea4cd..0c83658cee8 100644 --- a/go/test/endtoend/vreplication/migrate_test.go +++ b/go/test/endtoend/vreplication/migrate_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" ) func insertInitialDataIntoExternalCluster(t *testing.T, conn *mysql.Conn) { @@ -55,9 +56,10 @@ func TestMigrate(t *testing.T) { defaultCell = vc.Cells[defaultCellName] vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil) + err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0") + require.NoError(t, err) vtgate = defaultCell.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1) vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() @@ -76,12 +78,12 @@ func TestMigrate(t *testing.T) { extVtgate := extCell2.Vtgates[0] require.NotNil(t, extVtgate) - extVtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "rating", "0"), 1) + err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "0") + require.NoError(t, err) verifyClusterHealth(t, extVc) extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort) insertInitialDataIntoExternalCluster(t, extVtgateConn) - var err error var output, expected string ksWorkflow := "product.e1" diff --git a/go/test/endtoend/vreplication/performance_test.go b/go/test/endtoend/vreplication/performance_test.go index 1cd0d46bc3b..ce47e027f2d 100644 --- a/go/test/endtoend/vreplication/performance_test.go +++ b/go/test/endtoend/vreplication/performance_test.go @@ -63,7 +63,8 @@ create table customer(cid int, name varbinary(128), meta json default null, typ vtgate = defaultCell.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1) + err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0") + require.NoError(t, err) vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index fada3b621c2..58b92e0b65c 100644 --- 
a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -570,7 +570,8 @@ func setupCluster(t *testing.T) *VitessCluster { vtgate = zone1.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1) + err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0") + require.NoError(t, err) vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "product", "0"), 2) vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", "product", "0"), 1) @@ -590,12 +591,10 @@ func setupCustomerKeyspace(t *testing.T) { customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, nil); err != nil { t.Fatal(err) } - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "-80"), 1); err != nil { - t.Fatal(err) - } - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "80-"), 1); err != nil { - t.Fatal(err) - } + err := cluster.WaitForHealthyShard(vc.VtctldClient, "customer", "-80") + require.NoError(t, err) + err = cluster.WaitForHealthyShard(vc.VtctldClient, "customer", "80-") + require.NoError(t, err) if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "customer", "-80"), 2); err != nil { t.Fatal(err) } @@ -623,9 +622,8 @@ func setupCustomer2Keyspace(t *testing.T) { t.Fatal(err) } for _, c2shard := range c2shards { - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", c2keyspace, c2shard), 1); err != nil { - t.Fatal(err) - } + err := cluster.WaitForHealthyShard(vc.VtctldClient, c2keyspace, c2shard) + require.NoError(t, err) if defaultReplicas > 0 { if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", c2keyspace, c2shard), defaultReplicas); err != nil { t.Fatal(err) @@ -758,9 +756,8 @@ func createAdditionalCustomerShards(t *testing.T, shards string) { arrTargetShardNames := strings.Split(shards, ",") for _, shardName := range arrTargetShardNames { - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", ksName, shardName), 1); err != nil { - require.NoError(t, err) - } + err := cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shardName) + require.NoError(t, err) if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shardName), 2); err != nil { require.NoError(t, err) } diff --git a/go/test/endtoend/vreplication/time_zone_test.go b/go/test/endtoend/vreplication/time_zone_test.go index b10cd55e048..f5d57eac9df 100644 --- a/go/test/endtoend/vreplication/time_zone_test.go +++ b/go/test/endtoend/vreplication/time_zone_test.go @@ -36,6 +36,7 @@ func TestMoveTablesTZ(t *testing.T) { workflow := "tz" sourceKs := "product" targetKs := "customer" + shard := "0" ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) ksReverseWorkflow := fmt.Sprintf("%s.%s_reverse", sourceKs, workflow) @@ -51,7 +52,8 @@ func TestMoveTablesTZ(t *testing.T) { vtgate = cell1.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1) + err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKs, shard) + require.NoError(t, err) vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() @@ -87,9 +89,8 @@ func TestMoveTablesTZ(t *testing.T) { if _, err := vc.AddKeyspace(t, cells, targetKs, "0", customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, 
targetKsOpts); err != nil { t.Fatal(err) } - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "0"), 1); err != nil { - t.Fatal(err) - } + err = cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard) + require.NoError(t, err) defaultCell := vc.Cells["zone1"] custKs := vc.Cells[defaultCell.Name].Keyspaces[targetKs] diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index c20fc435b84..06eddf95c9b 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -23,6 +23,8 @@ import ( "time" "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/cluster" ) type testCase struct { @@ -121,7 +123,7 @@ func TestVDiff2(t *testing.T) { vtgate = defaultCell.Vtgates[0] require.NotNil(t, vtgate) for _, shard := range sourceShards { - require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKs, shard), 1)) + require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, sourceKs, shard)) } vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) @@ -139,7 +141,7 @@ func TestVDiff2(t *testing.T) { _, err := vc.AddKeyspace(t, cells, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts) require.NoError(t, err) for _, shard := range targetShards { - require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKs, shard), 1)) + require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard)) } for _, tc := range testCases { @@ -155,7 +157,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell) tks := vc.Cells[cells[0].Name].Keyspaces[tc.targetKs] require.NoError(t, vc.AddShards(t, cells, tks, tc.targetShards, 0, 0, tc.tabletBaseID, targetKsOpts)) for _, shard := range arrTargetShards { - require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", tc.targetKs, shard), 1)) + require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, tc.targetKs, shard)) } } ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 8844b738156..3f8f7f91997 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -137,8 +137,10 @@ func TestVReplicationDDLHandling(t *testing.T) { } vtgate = defaultCell.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKs, shard), 1) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKs, shard), 1) + err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKs, shard) + require.NoError(t, err) + err = cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard) + require.NoError(t, err) verifyClusterHealth(t, vc) vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) @@ -148,7 +150,7 @@ func TestVReplicationDDLHandling(t *testing.T) { insertInitialData(t) - _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", sourceKs), 1, false) + _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", sourceKs), 1, false) require.NoError(t, err) addColDDL := fmt.Sprintf("alter table %s add column %s varchar(64)", table, newColumn) @@ -231,8 +233,10 @@ func TestVreplicationCopyThrottling(t *testing.T) { } vtgate = 
defaultCell.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKs, shard), 1) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKs, shard), 1) + err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKs, shard) + require.NoError(t, err) + err = cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard) + require.NoError(t, err) // Confirm that the initial copy table phase does not proceed until the source tablet(s) // have an InnoDB History List length that is less than specified in the tablet's config. @@ -290,7 +294,8 @@ func testVreplicationWorkflows(t *testing.T, minimal bool) { vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, sourceKsOpts) vtgate = defaultCell.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1) + err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0") + require.NoError(t, err) vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() @@ -360,17 +365,20 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) { require.NotNil(t, vc) defaultCellName := "zone1" defaultCell = vc.Cells[defaultCellName] + keyspace := "product" + shard := "0" defer vc.TearDown(t) cell1 := vc.Cells["zone1"] cell2 := vc.Cells["zone2"] - vc.AddKeyspace(t, []*Cell{cell1, cell2}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, sourceKsOpts) + vc.AddKeyspace(t, []*Cell{cell1, cell2}, keyspace, shard, initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, sourceKsOpts) vtgate = cell1.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "product", "0"), 2) + err := cluster.WaitForHealthyShard(vc.VtctldClient, keyspace, shard) + require.NoError(t, err) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspace, shard), 2) vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() @@ -405,8 +413,10 @@ func TestVStreamFlushBinlog(t *testing.T) { } vtgate = defaultCell.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKs, shard), 1) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKs, shard), 1) + err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKs, shard) + require.NoError(t, err) + err = cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard) + require.NoError(t, err) verifyClusterHealth(t, vc) vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) @@ -551,12 +561,14 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) { allCellNames = "zone1,zone2" defaultCellName := "zone1" defaultCell = vc.Cells[defaultCellName] + keyspace := "product" + shard := "0" defer vc.TearDown(t) cell1 := vc.Cells["zone1"] cell2 := vc.Cells["zone2"] - vc.AddKeyspace(t, []*Cell{cell1, cell2}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, sourceKsOpts) + vc.AddKeyspace(t, []*Cell{cell1, cell2}, keyspace, shard, initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, sourceKsOpts) // Add cell alias containing only zone2 
result, err := vc.VtctlClient.ExecuteCommandWithOutput("AddCellsAlias", "--", "--cells", "zone2", "alias") @@ -564,8 +576,9 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) { vtgate = cell1.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "product", "0"), 2) + err = cluster.WaitForHealthyShard(vc.VtctldClient, keyspace, shard) + require.NoError(t, err) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspace, shard), 2) vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() @@ -573,7 +586,7 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) { insertInitialData(t) t.Run("VStreamFrom", func(t *testing.T) { - testVStreamFrom(t, "product", 2) + testVStreamFrom(t, keyspace, 2) }) shardCustomer(t, true, []*Cell{cell1, cell2}, "alias", false) } @@ -698,12 +711,10 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl if _, err := vc.AddKeyspace(t, cells, "customer", "-80,80-", customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, targetKsOpts); err != nil { t.Fatal(err) } - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "-80"), 1); err != nil { - t.Fatal(err) - } - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "80-"), 1); err != nil { - t.Fatal(err) - } + err := cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, "-80") + require.NoError(t, err) + err = cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, "80-") + require.NoError(t, err) // Assume we are operating on first cell defaultCell := cells[0] @@ -976,9 +987,8 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou arrTargetShardNames := strings.Split(targetShards, ",") for _, shardName := range arrTargetShardNames { - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", ksName, shardName), 1); err != nil { - t.Fatal(err) - } + err := cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shardName) + require.NoError(t, err) } tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "primary") @@ -1075,12 +1085,10 @@ func shardMerchant(t *testing.T) { if _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, merchantKeyspace, "-80,80-", merchantVSchema, "", defaultReplicas, defaultRdonly, 400, targetKsOpts); err != nil { t.Fatal(err) } - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", merchantKeyspace, "-80"), 1); err != nil { - t.Fatal(err) - } - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", merchantKeyspace, "80-"), 1); err != nil { - t.Fatal(err) - } + err := cluster.WaitForHealthyShard(vc.VtctldClient, merchantKeyspace, "-80") + require.NoError(t, err) + err = cluster.WaitForHealthyShard(vc.VtctldClient, merchantKeyspace, "80-") + require.NoError(t, err) moveTablesAction(t, "Create", cell, workflow, sourceKs, targetKs, tables) merchantKs := vc.Cells[defaultCell.Name].Keyspaces[merchantKeyspace] merchantTab1 := merchantKs.Shards["-80"].Tablets["zone1-400"].Vttablet diff --git a/go/test/endtoend/vreplication/vschema_load_test.go b/go/test/endtoend/vreplication/vschema_load_test.go index 731679e1eba..5d20d7f2d32 100644 --- a/go/test/endtoend/vreplication/vschema_load_test.go +++ b/go/test/endtoend/vreplication/vschema_load_test.go @@ -26,6 +26,7 @@ import ( 
"github.com/stretchr/testify/require" + "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -52,7 +53,8 @@ func TestVSchemaChangesUnderLoad(t *testing.T) { vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, 1, 0, 100, sourceKsOpts) vtgate = defaultCell.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1) + err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0") + require.NoError(t, err) vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "product", "0"), 1) vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index d011b23dbec..fa95a28dbb7 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -245,7 +246,8 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil) vtgate = defaultCell.Vtgates[0] require.NotNil(t, vtgate) - vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "unsharded", "0"), 1) + err := cluster.WaitForHealthyShard(vc.VtctldClient, "unsharded", "0") + require.NoError(t, err) vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() diff --git a/test/local_example.sh b/test/local_example.sh index e71855f7960..792c20d4e09 100755 --- a/test/local_example.sh +++ b/test/local_example.sh @@ -38,7 +38,7 @@ mysql --table < ../common/select_commerce_data.sql for shard in "customer/0"; do while true; do - if $(mysql "$shard" -e 'show tables' &>/dev/null); then + if (mysql "$shard" -e 'show tables' &>/dev/null); then break fi echo -e "waiting for shard: $shard ..." @@ -62,7 +62,7 @@ mysql --table < ../common/select_customer0_data.sql ./205_clean_commerce.sh # We expect this to fail as the keyspace is now gone. -$(mysql --table < ../common/select_commerce_data.sql &>/dev/null || true) +(mysql --table < ../common/select_commerce_data.sql &>/dev/null || true) ./301_customer_sharded.sh ./302_new_shards.sh @@ -71,7 +71,7 @@ $(mysql --table < ../common/select_commerce_data.sql &>/dev/null || true) # TODO: Eliminate this race in the examples' scripts for shard in "customer/-80" "customer/80-"; do while true; do - if $(mysql "$shard" -e 'show tables' &>/dev/null); then + if (mysql "$shard" -e 'show tables' &>/dev/null); then break fi echo -e "waiting for shard: $shard ..."