Skip to content

Commit

Permalink
More aggressive tests for vitess migration cut-over (#9956)
Browse files Browse the repository at this point in the history
* More aggressive tests for vitess migration cut-over

Signed-off-by: Shlomi Noach <[email protected]>

* restore original concurrency

Signed-off-by: Shlomi Noach <[email protected]>

* kick build

Signed-off-by: Shlomi Noach <[email protected]>

Co-authored-by: Florent Poinsard <[email protected]>
  • Loading branch information
shlomi-noach and frouioui authored Mar 27, 2022
1 parent 270cf96 commit 1836b8d
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,20 @@ deletesAttempts=%d, deletesFailures=%d, deletesNoops=%d, deletes=%d,
}

var (
clusterInstance *cluster.LocalProcessCluster
shards []cluster.Shard
vtParams mysql.ConnParams
evaluatedMysqlParams *mysql.ConnParams
clusterInstance *cluster.LocalProcessCluster
shards []cluster.Shard
vtParams mysql.ConnParams

opOrder int64
opOrderMutex sync.Mutex
onlineDDLStrategy = "online -vreplication-test-suite"
onlineDDLStrategy = "vitess"
hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
schemaChangeDirectory = ""
tableName = `stress_test`
afterTableName = `stress_test_after`
cleanupStatements = []string{
`DROP TABLE IF EXISTS stress_test`,
`DROP TABLE IF EXISTS stress_test_before`,
`DROP TABLE IF EXISTS stress_test_after`,
}
createStatement = `
CREATE TABLE stress_test (
Expand All @@ -126,32 +122,13 @@ var (
deleteRowStatement = `
DELETE FROM stress_test WHERE id=%d AND updates=1
`
selectMaxOpOrder = `
SELECT MAX(op_order) as m FROM stress_test
`
// We use CAST(SUM(updates) AS SIGNED) because SUM() returns a DECIMAL datatype, and we want to read a SIGNED INTEGER type
selectCountRowsStatement = `
SELECT COUNT(*) AS num_rows, CAST(SUM(updates) AS SIGNED) AS sum_updates FROM stress_test
`
// We use CAST(SUM(updates) AS SIGNED) because SUM() returns a DECIMAL datatype, and we want to read a SIGNED INTEGER type
selectCountRowsFromAfterTableStatement = `
SELECT COUNT(*) AS num_rows, CAST(SUM(updates) AS SIGNED) AS sum_updates FROM stress_test_after
`
selectCountFromTableBefore = `
SELECT count(*) as c FROM stress_test_before
`
selectCountFromTableAfter = `
SELECT count(*) as c FROM stress_test_after
`
selectMaxOpOrderFromTableBefore = `
SELECT MAX(op_order) as m FROM stress_test_before
`
selectMaxOpOrderFromTableAfter = `
SELECT MAX(op_order) as m FROM stress_test_after
`
selectBeforeTable = `
SELECT * FROM stress_test_before order by id
`
selectAfterTable = `
SELECT * FROM stress_test_after order by id
`
truncateStatement = `
TRUNCATE TABLE stress_test
`
Expand All @@ -178,22 +155,6 @@ func nextOpOrder() int64 {
return opOrder
}

func getTablet() *cluster.Vttablet {
return clusterInstance.Keyspaces[0].Shards[0].Vttablets[0]
}

func mysqlParams() *mysql.ConnParams {
if evaluatedMysqlParams != nil {
return evaluatedMysqlParams
}
evaluatedMysqlParams = &mysql.ConnParams{
Uname: "vt_dba",
UnixSocket: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", getTablet().TabletUID), "/mysql.sock"),
DbName: fmt.Sprintf("vt_%s", keyspaceName),
}
return evaluatedMysqlParams
}

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
Expand Down Expand Up @@ -311,7 +272,7 @@ func TestSchemaChange(t *testing.T) {
hint := "hint-alter-without-workload"
uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, hint), onlineDDLStrategy, "vtgate", hint)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
testSelectTableMetricsAfterMigration(t)
testSelectTableMetrics(t)
})

for i := 0; i < countIterations; i++ {
Expand Down Expand Up @@ -344,7 +305,7 @@ func TestSchemaChange(t *testing.T) {
wg.Wait()
})
t.Run("validate metrics", func(t *testing.T) {
testSelectTableMetricsAfterMigration(t)
testSelectTableMetrics(t)
})
})
}
Expand Down Expand Up @@ -388,7 +349,7 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
}

if expectHint != "" {
checkMigratedTable(t, afterTableName, expectHint)
checkMigratedTable(t, tableName, expectHint)
}
return uuid
}
Expand Down Expand Up @@ -523,12 +484,6 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
case 2:
err = generateDelete(t, conn)
}
if err != nil {
if strings.Contains(err.Error(), "doesn't exist") {
// Table renamed to _before, due to -vreplication-test-suite flag
err = nil
}
}
assert.Nil(t, err)
time.Sleep(singleConnectionSleepInterval)
}
Expand Down Expand Up @@ -585,99 +540,39 @@ func initTable(t *testing.T) {
}
}

func testSelectTableMetricsWithStatement(t *testing.T, statement string) {
func testSelectTableMetrics(t *testing.T) {
writeMetrics.mu.Lock()
defer writeMetrics.mu.Unlock()

{
rs := onlineddl.VtgateExecQuery(t, &vtParams, selectMaxOpOrder, "")
row := rs.Named().Row()
require.NotNil(t, row)

maxOpOrder := row.AsInt64("m", 0)
fmt.Printf("# max op_order in table: %d\n", maxOpOrder)
}

log.Infof("%s", writeMetrics.String())

ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
defer conn.Close()

rs, err := conn.ExecuteFetch(statement, 1000, true)
rs, err := conn.ExecuteFetch(selectCountRowsStatement, 1000, true)
require.Nil(t, err)

row := rs.Named().Row()
require.NotNil(t, row)
log.Infof("testSelectTableMetrics, row: %v", row)
numRows := row.AsInt64("num_rows", 0)
sumUpdates := row.AsInt64("sum_updates", 0)

assert.NotZero(t, numRows)
assert.NotZero(t, sumUpdates)
assert.NotZero(t, writeMetrics.inserts)
assert.NotZero(t, writeMetrics.deletes)
assert.NotZero(t, writeMetrics.updates)
assert.Equal(t, writeMetrics.inserts-writeMetrics.deletes, numRows)
}

func testSelectTableMetrics(t *testing.T) {
testSelectTableMetricsWithStatement(t, selectCountRowsStatement)
}

func testSelectTableMetricsAfterMigration(t *testing.T) {
writeMetrics.mu.Lock()
defer writeMetrics.mu.Unlock()

var countBefore int64
{
// Validate after table is populated
rs := onlineddl.VtgateExecQuery(t, &vtParams, selectCountFromTableBefore, "")
row := rs.Named().Row()
require.NotNil(t, row)

countBefore = row.AsInt64("c", 0)
require.NotZero(t, countBefore)
require.Less(t, countBefore, int64(maxTableRows))

fmt.Printf("# count rows in table (before): %d\n", countBefore)
}
var countAfter int64
{
// Validate after table is populated
rs := onlineddl.VtgateExecQuery(t, &vtParams, selectCountFromTableAfter, "")
row := rs.Named().Row()
require.NotNil(t, row)

countAfter = row.AsInt64("c", 0)
require.NotZero(t, countAfter)
require.Less(t, countAfter, int64(maxTableRows))

fmt.Printf("# count rows in table (after): %d\n", countAfter)
}
{
rs := onlineddl.VtgateExecQuery(t, &vtParams, selectMaxOpOrderFromTableBefore, "")
row := rs.Named().Row()
require.NotNil(t, row)

maxOpOrder := row.AsInt64("m", 0)
fmt.Printf("# max op_order in table (before): %d\n", maxOpOrder)
}
{
rs := onlineddl.VtgateExecQuery(t, &vtParams, selectMaxOpOrderFromTableAfter, "")
row := rs.Named().Row()
require.NotNil(t, row)

maxOpOrder := row.AsInt64("m", 0)
fmt.Printf("# max op_order in table (after): %d\n", maxOpOrder)
}

testSelectTableMetricsWithStatement(t, selectCountRowsFromAfterTableStatement)

{
selectBeforeFile := onlineddl.CreateTempScript(t, selectBeforeTable)
defer os.Remove(selectBeforeFile)
beforeOutput := onlineddl.MysqlClientExecFile(t, mysqlParams(), os.TempDir(), "", selectBeforeFile)
beforeOutput = strings.TrimSpace(beforeOutput)
require.NotEmpty(t, beforeOutput)
assert.Equal(t, countBefore, int64(len(strings.Split(beforeOutput, "\n"))))

selectAfterFile := onlineddl.CreateTempScript(t, selectAfterTable)
defer os.Remove(selectAfterFile)
afterOutput := onlineddl.MysqlClientExecFile(t, mysqlParams(), os.TempDir(), "", selectAfterFile)
afterOutput = strings.TrimSpace(afterOutput)
require.NotEmpty(t, afterOutput)
assert.Equal(t, countAfter, int64(len(strings.Split(afterOutput, "\n"))))

require.Equal(t, beforeOutput, afterOutput, "results mismatch: (%s) and (%s)", selectBeforeTable, selectAfterTable)
}
assert.Equal(t, writeMetrics.updates-writeMetrics.deletes, sumUpdates) // because we DELETE WHERE updates=1
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type testcase struct {
expectAddedUniqueKeys int64
// expectRemovedUniqueKeys is the number of alleviated constraints
expectRemovedUniqueKeys int64
// autoIncInsert is a special case where we don't generate id values. It's a specific test case.
autoIncInsert bool
}

var (
Expand Down Expand Up @@ -108,6 +110,12 @@ var (
prepareStatement: "",
alterStatement: "engine=innodb",
},
{
name: "autoinc PK",
prepareStatement: "modify id bigint not null auto_increment",
alterStatement: "engine=innodb",
autoIncInsert: true,
},
{
name: "UK similar to PK, no PK",
prepareStatement: "add unique key id_uidx(id)",
Expand Down Expand Up @@ -321,6 +329,9 @@ var (
alter table stress_test modify hint_col varchar(64) not null default '%s'
`

insertRowAutoIncStatement = `
INSERT IGNORE INTO stress_test (id, id_negative, rand_text, rand_num, op_order) VALUES (NULL, %d, concat(left(md5(%d), 8), '_', %d), floor(rand()*1000000), %d)
`
insertRowStatement = `
INSERT IGNORE INTO stress_test (id, id_negative, rand_text, rand_num, op_order) VALUES (%d, %d, concat(left(md5(%d), 8), '_', %d), floor(rand()*1000000), %d)
`
Expand Down Expand Up @@ -506,7 +517,7 @@ func TestSchemaChange(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
runMultipleConnections(ctx, t)
runMultipleConnections(ctx, t, testcase.autoIncInsert)
}()
uuid := testOnlineDDLStatement(t, fullStatement, onlineDDLStrategy, "vtgate", hintText)
expectStatus := schema.OnlineDDLStatusComplete
Expand All @@ -517,7 +528,7 @@ func TestSchemaChange(t *testing.T) {
cancel() // will cause runMultipleConnections() to terminate
wg.Wait()
if !testcase.expectFailure {
testCompareBeforeAfterTables(t)
testCompareBeforeAfterTables(t, testcase.autoIncInsert)
}

rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
Expand Down Expand Up @@ -608,9 +619,13 @@ func getCreateTableStatement(t *testing.T, tablet *cluster.Vttablet, tableName s
return statement
}

func generateInsert(t *testing.T, conn *mysql.Conn) error {
func generateInsert(t *testing.T, conn *mysql.Conn, autoIncInsert bool) error {
id := rand.Int31n(int32(maxTableRows))
query := fmt.Sprintf(insertRowStatement, id, -id, id, id, nextOpOrder())
if autoIncInsert {
id = rand.Int31()
query = fmt.Sprintf(insertRowAutoIncStatement, -id, id, id, nextOpOrder())
}
qr, err := conn.ExecuteFetch(query, 1000, true)
if err == nil && qr != nil {
assert.Less(t, qr.RowsAffected, uint64(2))
Expand Down Expand Up @@ -638,7 +653,7 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error {
return err
}

func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
func runSingleConnection(ctx context.Context, t *testing.T, autoIncInsert bool, done *int64) {
log.Infof("Running single connection")
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
Expand All @@ -656,7 +671,7 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
}
switch rand.Int31n(3) {
case 0:
err = generateInsert(t, conn)
err = generateInsert(t, conn, autoIncInsert)
case 1:
err = generateUpdate(t, conn)
case 2:
Expand All @@ -673,15 +688,15 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
}
}

func runMultipleConnections(ctx context.Context, t *testing.T) {
func runMultipleConnections(ctx context.Context, t *testing.T, autoIncInsert bool) {
log.Infof("Running multiple connections")
var done int64
var wg sync.WaitGroup
for i := 0; i < maxConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
runSingleConnection(ctx, t, &done)
runSingleConnection(ctx, t, autoIncInsert, &done)
}()
}
<-ctx.Done()
Expand All @@ -706,7 +721,7 @@ func initTable(t *testing.T) {
require.Nil(t, err)

for i := 0; i < maxTableRows/2; i++ {
generateInsert(t, conn)
generateInsert(t, conn, false)
}
for i := 0; i < maxTableRows/4; i++ {
generateUpdate(t, conn)
Expand All @@ -730,7 +745,7 @@ func initTable(t *testing.T) {
}

// testCompareBeforeAfterTables validates that stress_test_before and stress_test_after contents are non empty and completely identical
func testCompareBeforeAfterTables(t *testing.T) {
func testCompareBeforeAfterTables(t *testing.T, autoIncInsert bool) {
var countBefore int64
{
// Validate after table is populated
Expand All @@ -740,8 +755,9 @@ func testCompareBeforeAfterTables(t *testing.T) {

countBefore = row.AsInt64("c", 0)
require.NotZero(t, countBefore)
require.Less(t, countBefore, int64(maxTableRows))

if !autoIncInsert {
require.Less(t, countBefore, int64(maxTableRows))
}
fmt.Printf("# count rows in table (before): %d\n", countBefore)
}
var countAfter int64
Expand All @@ -753,8 +769,9 @@ func testCompareBeforeAfterTables(t *testing.T) {

countAfter = row.AsInt64("c", 0)
require.NotZero(t, countAfter)
require.Less(t, countAfter, int64(maxTableRows))

if !autoIncInsert {
require.Less(t, countAfter, int64(maxTableRows))
}
fmt.Printf("# count rows in table (after): %d\n", countAfter)
}
{
Expand Down

0 comments on commit 1836b8d

Please sign in to comment.