Skip to content

Commit

Permalink
Flakes: Use new healthy shard check in vreplication e2e tests (#12502)
Browse files Browse the repository at this point in the history
* Use new healthy shard check in vreplication e2e tests

This is needed because checking that there's a primary tablet for
the shard in vtgate's healtcheck is no longer a reliable indicator
that the shard has a healthy serving primary, because now a
primary needs to initialize its sidecar database and wait for
that to replicate via semi-sync before it becomes serving and can
proceed to perform normal functions. So this delay could cause
test flakiness if you required a healthy shard before continuing
with the test.

Signed-off-by: Matt Lord <[email protected]>

* Try to address unit test race flakes around log size

They looked like this:
WARNING: DATA RACE
Write at 0x000005bf9b60 by goroutine 27141:
  github.com/spf13/pflag.newUint64Value()
      /home/runner/go/pkg/mod/github.com/spf13/[email protected]/uint64.go:9 +0x5a
  github.com/spf13/pflag.(*FlagSet).Uint64Var()
      /home/runner/go/pkg/mod/github.com/spf13/[email protected]/uint64.go:45 +0x55
  vitess.io/vitess/go/vt/log.RegisterFlags()
      /home/runner/work/vitess/vitess/go/vt/log/log.go:81 +0x64
  vitess.io/vitess/go/vt/servenv.GetFlagSetFor()
      /home/runner/work/vitess/vitess/go/vt/servenv/servenv.go:347 +0x183
  vitess.io/vitess/go/vt/servenv.ParseFlags()
      /home/runner/work/vitess/vitess/go/vt/servenv/servenv.go:326 +0x49
...
Previous read at 0x000005bf9b60 by goroutine 27136:
1744
  github.com/golang/glog.(*syncBuffer).Write()
...

And they most often occurred in the wrangler unit tests, which makes sense
because it creates a log of loggers.

Signed-off-by: Matt Lord <[email protected]>

* Revert "Try to address unit test race flakes around log size"

This reverts commit 51992b8.

Signed-off-by: Matt Lord <[email protected]>

* Use external cluster vtctld in TestMigrate

Signed-off-by: Matt Lord <[email protected]>

* Use subshell vs command output interpolation

Signed-off-by: Matt Lord <[email protected]>

* Ingnore any config files in mysql alias

Signed-off-by: Matt Lord <[email protected]>

---------

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Mar 1, 2023
1 parent cdebb7e commit 96c3dca
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 68 deletions.
2 changes: 1 addition & 1 deletion examples/common/env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ mkdir -p "${VTDATAROOT}/tmp"
# In your own environment you may prefer to use config files,
# such as ~/.my.cnf

alias mysql="command mysql -h 127.0.0.1 -P 15306"
alias mysql="command mysql --no-defaults -h 127.0.0.1 -P 15306"
alias vtctlclient="command vtctlclient --server localhost:15999 --log_dir ${VTDATAROOT}/tmp --alsologtostderr"
alias vtctldclient="command vtctldclient --server localhost:15999"

Expand Down
48 changes: 48 additions & 0 deletions go/test/endtoend/cluster/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"testing"
"time"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -41,6 +42,8 @@ var (
tmClient = tmc.NewClient()
dbCredentialFile string
InsertTabletTemplateKsID = `insert into %s (id, msg) values (%d, '%s') /* id:%d */`
defaultOperationTimeout = 60 * time.Second
defeaultRetryDelay = 1 * time.Second
)

// Restart restarts vttablet and mysql.
Expand Down Expand Up @@ -381,3 +384,48 @@ func WaitForTabletSetup(vtctlClientProcess *VtctlClientProcess, expectedTablets

return fmt.Errorf("all %d tablet are not in expected state %s", expectedTablets, expectedStatus)
}

// WaitForHealthyShard waits for the given shard info record in the topo
// server to list a tablet (alias and uid) as the primary serving tablet
// for the shard. This is done using "vtctldclient GetShard" and parsing
// its JSON output. All other watchers should then also see this shard
// info status as well.
func WaitForHealthyShard(vtctldclient *VtctldClientProcess, keyspace, shard string) error {
var (
tmr = time.NewTimer(defaultOperationTimeout)
res string
err error
json []byte
cell string
uid int64
)
for {
res, err = vtctldclient.ExecuteCommandWithOutput("GetShard", fmt.Sprintf("%s/%s", keyspace, shard))
if err != nil {
return err
}
json = []byte(res)

cell, err = jsonparser.GetString(json, "shard", "primary_alias", "cell")
if err != nil && err != jsonparser.KeyPathNotFoundError {
return err
}
uid, err = jsonparser.GetInt(json, "shard", "primary_alias", "uid")
if err != nil && err != jsonparser.KeyPathNotFoundError {
return err
}

if cell != "" && uid > 0 {
return nil
}

select {
case <-tmr.C:
return fmt.Errorf("timed out waiting for the %s/%s shard to become healthy in the topo after %v; last seen status: %s; last seen error: %v",
keyspace, shard, defaultOperationTimeout, res, err)
default:
}

time.Sleep(defeaultRetryDelay)
}
}
23 changes: 15 additions & 8 deletions go/test/endtoend/vreplication/materialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ limitations under the License.
package vreplication

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
)

const smSchema = `
Expand Down Expand Up @@ -68,6 +69,7 @@ func testShardedMaterialize(t *testing.T) {
vc = NewVitessCluster(t, "TestShardedMaterialize", allCells, mainClusterConfig)
ks1 := "ks1"
ks2 := "ks2"
shard := "0"
require.NotNil(t, vc)
defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets
defer func() { defaultReplicas = 1 }()
Expand All @@ -78,15 +80,17 @@ func testShardedMaterialize(t *testing.T) {
vc.AddKeyspace(t, []*Cell{defaultCell}, ks1, "0", smVSchema, smSchema, defaultReplicas, defaultRdonly, 100, nil)
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", ks1, "0"), 1)
err := cluster.WaitForHealthyShard(vc.VtctldClient, ks1, shard)
require.NoError(t, err)

vc.AddKeyspace(t, []*Cell{defaultCell}, ks2, "0", smVSchema, smSchema, defaultReplicas, defaultRdonly, 200, nil)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", ks2, "0"), 1)
err = cluster.WaitForHealthyShard(vc.VtctldClient, ks2, shard)
require.NoError(t, err)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)
_, err := vtgateConn.ExecuteFetch(initDataQuery, 0, false)
_, err = vtgateConn.ExecuteFetch(initDataQuery, 0, false)
require.NoError(t, err)
materialize(t, smMaterializeSpec)
tab := vc.getPrimaryTablet(t, ks2, "0")
Expand Down Expand Up @@ -184,6 +188,7 @@ func testMaterialize(t *testing.T) {
vc = NewVitessCluster(t, "TestMaterialize", allCells, mainClusterConfig)
sourceKs := "source"
targetKs := "target"
shard := "0"
require.NotNil(t, vc)
defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets
defer func() { defaultReplicas = 1 }()
Expand All @@ -194,19 +199,21 @@ func testMaterialize(t *testing.T) {
vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, "0", smMaterializeVSchemaSource, smMaterializeSchemaSource, defaultReplicas, defaultRdonly, 300, nil)
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKs, "0"), 1)
err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKs, shard)
require.NoError(t, err)

vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, "0", smMaterializeVSchemaTarget, smMaterializeSchemaTarget, defaultReplicas, defaultRdonly, 400, nil)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKs, "0"), 1)
err = cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard)
require.NoError(t, err)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

_, err := vtgateConn.ExecuteFetch(materializeInitDataQuery, 0, false)
_, err = vtgateConn.ExecuteFetch(materializeInitDataQuery, 0, false)
require.NoError(t, err)

ks2Primary := vc.getPrimaryTablet(t, targetKs, "0")
ks2Primary := vc.getPrimaryTablet(t, targetKs, shard)
_, err = ks2Primary.QueryTablet(customFunc, targetKs, true)
require.NoError(t, err)

Expand Down
8 changes: 5 additions & 3 deletions go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
)

func insertInitialDataIntoExternalCluster(t *testing.T, conn *mysql.Conn) {
Expand Down Expand Up @@ -55,9 +56,10 @@ func TestMigrate(t *testing.T) {

defaultCell = vc.Cells[defaultCellName]
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)
err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0")
require.NoError(t, err)
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
Expand All @@ -76,12 +78,12 @@ func TestMigrate(t *testing.T) {
extVtgate := extCell2.Vtgates[0]
require.NotNil(t, extVtgate)

extVtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "rating", "0"), 1)
err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "0")
require.NoError(t, err)
verifyClusterHealth(t, extVc)
extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort)
insertInitialDataIntoExternalCluster(t, extVtgateConn)

var err error
var output, expected string
ksWorkflow := "product.e1"

Expand Down
3 changes: 2 additions & 1 deletion go/test/endtoend/vreplication/performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ create table customer(cid int, name varbinary(128), meta json default null, typ
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)

vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1)
err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0")
require.NoError(t, err)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
Expand Down
23 changes: 10 additions & 13 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ func setupCluster(t *testing.T) *VitessCluster {

vtgate = zone1.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1)
err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0")
require.NoError(t, err)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "product", "0"), 2)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", "product", "0"), 1)

Expand All @@ -590,12 +591,10 @@ func setupCustomerKeyspace(t *testing.T) {
customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, nil); err != nil {
t.Fatal(err)
}
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "-80"), 1); err != nil {
t.Fatal(err)
}
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "80-"), 1); err != nil {
t.Fatal(err)
}
err := cluster.WaitForHealthyShard(vc.VtctldClient, "customer", "-80")
require.NoError(t, err)
err = cluster.WaitForHealthyShard(vc.VtctldClient, "customer", "80-")
require.NoError(t, err)
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "customer", "-80"), 2); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -623,9 +622,8 @@ func setupCustomer2Keyspace(t *testing.T) {
t.Fatal(err)
}
for _, c2shard := range c2shards {
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", c2keyspace, c2shard), 1); err != nil {
t.Fatal(err)
}
err := cluster.WaitForHealthyShard(vc.VtctldClient, c2keyspace, c2shard)
require.NoError(t, err)
if defaultReplicas > 0 {
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", c2keyspace, c2shard), defaultReplicas); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -758,9 +756,8 @@ func createAdditionalCustomerShards(t *testing.T, shards string) {
arrTargetShardNames := strings.Split(shards, ",")

for _, shardName := range arrTargetShardNames {
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", ksName, shardName), 1); err != nil {
require.NoError(t, err)
}
err := cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shardName)
require.NoError(t, err)
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shardName), 2); err != nil {
require.NoError(t, err)
}
Expand Down
9 changes: 5 additions & 4 deletions go/test/endtoend/vreplication/time_zone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestMoveTablesTZ(t *testing.T) {
workflow := "tz"
sourceKs := "product"
targetKs := "customer"
shard := "0"
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
ksReverseWorkflow := fmt.Sprintf("%s.%s_reverse", sourceKs, workflow)

Expand All @@ -51,7 +52,8 @@ func TestMoveTablesTZ(t *testing.T) {

vtgate = cell1.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1)
err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKs, shard)
require.NoError(t, err)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
Expand Down Expand Up @@ -87,9 +89,8 @@ func TestMoveTablesTZ(t *testing.T) {
if _, err := vc.AddKeyspace(t, cells, targetKs, "0", customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, targetKsOpts); err != nil {
t.Fatal(err)
}
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "0"), 1); err != nil {
t.Fatal(err)
}
err = cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard)
require.NoError(t, err)

defaultCell := vc.Cells["zone1"]
custKs := vc.Cells[defaultCell.Name].Keyspaces[targetKs]
Expand Down
8 changes: 5 additions & 3 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
)

type testCase struct {
Expand Down Expand Up @@ -121,7 +123,7 @@ func TestVDiff2(t *testing.T) {
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
for _, shard := range sourceShards {
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKs, shard), 1))
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, sourceKs, shard))
}

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
Expand All @@ -139,7 +141,7 @@ func TestVDiff2(t *testing.T) {
_, err := vc.AddKeyspace(t, cells, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts)
require.NoError(t, err)
for _, shard := range targetShards {
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKs, shard), 1))
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard))
}

for _, tc := range testCases {
Expand All @@ -155,7 +157,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
tks := vc.Cells[cells[0].Name].Keyspaces[tc.targetKs]
require.NoError(t, vc.AddShards(t, cells, tks, tc.targetShards, 0, 0, tc.tabletBaseID, targetKsOpts))
for _, shard := range arrTargetShards {
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", tc.targetKs, shard), 1))
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, tc.targetKs, shard))
}
}
ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow)
Expand Down
Loading

0 comments on commit 96c3dca

Please sign in to comment.