diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index 7201fa70652..9af1a2ba5ab 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -23,9 +23,9 @@ import ( "math/rand" "os" "path" + "runtime" "strings" "sync" - "sync/atomic" "testing" "time" @@ -135,12 +135,14 @@ var ( writeMetrics WriteMetrics ) +var ( + countIterations = 5 +) + const ( - maxTableRows = 4096 - maxConcurrency = 20 - singleConnectionSleepInterval = 2 * time.Millisecond - countIterations = 5 - migrationWaitTimeout = 60 * time.Second + maxTableRows = 4096 + workloadDuration = 5 * time.Second + migrationWaitTimeout = 60 * time.Second ) func resetOpOrder() { @@ -173,7 +175,7 @@ func TestMain(m *testing.M) { clusterInstance.VtctldExtraArgs = []string{ "--schema_change_dir", schemaChangeDirectory, "--schema_change_controller", "local", - "--schema_change_check_interval", "1", + "--schema_change_check_interval", "1s", } clusterInstance.VtTabletExtraArgs = []string{ @@ -220,15 +222,15 @@ func TestMain(m *testing.M) { if err != nil { fmt.Printf("%v\n", err) os.Exit(1) - } else { - os.Exit(exitcode) } - + os.Exit(exitcode) } func TestSchemaChange(t *testing.T) { defer cluster.PanicHandler(t) + ctx := context.Background() + shards = clusterInstance.Keyspaces[0].Shards require.Equal(t, 1, len(shards)) @@ -251,16 +253,17 @@ func TestSchemaChange(t *testing.T) { // that our testing/metrics logic is sound in the first place. testName := fmt.Sprintf("workload without ALTER TABLE %d/%d", (i + 1), countIterations) t.Run(testName, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) initTable(t) + + ctx, cancel := context.WithTimeout(ctx, workloadDuration) + defer cancel() + var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() runMultipleConnections(ctx, t) }() - time.Sleep(5 * time.Second) - cancel() // will cause runMultipleConnections() to terminate wg.Wait() testSelectTableMetrics(t) }) @@ -285,7 +288,7 @@ func TestSchemaChange(t *testing.T) { // the vreplication/ALTER TABLE did not corrupt our data and we are happy. testName := fmt.Sprintf("ALTER TABLE with workload %d/%d", (i + 1), countIterations) t.Run(testName, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() t.Run("create schema", func(t *testing.T) { testWithInitialSchema(t) }) @@ -293,6 +296,9 @@ func TestSchemaChange(t *testing.T) { initTable(t) }) t.Run("migrate", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup wg.Add(1) go func() { @@ -302,7 +308,7 @@ func TestSchemaChange(t *testing.T) { hint := fmt.Sprintf("hint-alter-with-workload-%d", i) uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, hint), onlineDDLStrategy, "vtgate", hint) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) - cancel() // will cause runMultipleConnections() to terminate + cancel() // Now that the migration is complete, we can stop the workload. wg.Wait() }) t.Run("validate metrics", func(t *testing.T) { @@ -371,7 +377,11 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri query := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + rowcount := 0 + for { queryResult, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true) require.Nil(t, err) @@ -381,11 +391,15 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri } select { - case <-time.After(time.Second): + case <-ticker.C: + continue // Keep looping case <-ctx.Done(): - break + // Break below to the assertion } + + break } + assert.Equal(t, expectCount, rowcount) } @@ -480,7 +494,7 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error { return err } -func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { +func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.Duration) { log.Infof("Running single connection") conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) @@ -491,11 +505,10 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { _, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true) require.Nil(t, err) + ticker := time.NewTicker(sleepInterval) + defer ticker.Stop() + for { - if atomic.LoadInt64(done) == 1 { - log.Infof("Terminating single connection") - return - } switch rand.Int31n(3) { case 0: err = generateInsert(t, conn) @@ -504,27 +517,39 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { case 2: err = generateDelete(t, conn) } + select { + case <-ctx.Done(): + log.Infof("Terminating single connection") + return + case <-ticker.C: + } assert.Nil(t, err) - time.Sleep(singleConnectionSleepInterval) } } func runMultipleConnections(ctx context.Context, t *testing.T) { - log.Infof("Running multiple connections") - var done int64 + // The workload for a 16 vCPU machine is: + // - Concurrency of 16 + // - 2ms interval between queries for each connection + // As the number of vCPUs decreases, so do we decrease concurrency, and increase intervals. For example, on a 8 vCPU machine + // we run concurrency of 8 and interval of 4ms. On a 4 vCPU machine we run concurrency of 4 and interval of 8ms. + maxConcurrency := runtime.NumCPU() + sleepModifier := 16.0 / float64(maxConcurrency) + baseSleepInterval := 2 * time.Millisecond + singleConnectionSleepIntervalNanoseconds := float64(baseSleepInterval.Nanoseconds()) * sleepModifier + sleepInterval := time.Duration(int64(singleConnectionSleepIntervalNanoseconds)) + + log.Infof("Running multiple connections: maxConcurrency=%v, sleep interval=%v", maxConcurrency, sleepInterval) var wg sync.WaitGroup for i := 0; i < maxConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() - runSingleConnection(ctx, t, &done) + runSingleConnection(ctx, t, sleepInterval) }() } - <-ctx.Done() - atomic.StoreInt64(&done, 1) - log.Infof("Running multiple connections: done") wg.Wait() - log.Infof("All connections cancelled") + log.Infof("Running multiple connections: done") } func initTable(t *testing.T) { diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go index 5052065082b..b93c43bb7eb 100644 --- a/go/test/endtoend/onlineddl/vtgate_util.go +++ b/go/test/endtoend/onlineddl/vtgate_util.go @@ -246,9 +246,13 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c for _, status := range expectStatuses { statusesMap[string(status)] = true } - startTime := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + lastKnownStatus := "" - for time.Since(startTime) < timeout { + for { countMatchedShards := 0 r := VtgateExecQuery(t, vtParams, query, "") for _, row := range r.Named().Rows { @@ -265,9 +269,12 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c if countMatchedShards == len(shards) { return schema.OnlineDDLStatus(lastKnownStatus) } - time.Sleep(1 * time.Second) + select { + case <-ctx.Done(): + return schema.OnlineDDLStatus(lastKnownStatus) + case <-ticker.C: + } } - return schema.OnlineDDLStatus(lastKnownStatus) } // CheckMigrationArtifacts verifies given migration exists, and checks if it has artifacts