Skip to content

Commit

Permalink
Flakes: Address TestMigrate Failures (#12866)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Sep 21, 2023
1 parent 66343b3 commit a5a0653
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 26 deletions.
18 changes: 17 additions & 1 deletion go/test/endtoend/cluster/mysqlctl_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ ssl_key={{.ServerKey}}
return tmpProcess, tmpProcess.Start()
}

// Stop executes mysqlctl command to stop mysql instance and kills the mysql instance if it doesn't shutdown in 30 seconds.
// Stop executes the mysqlctl command to stop the mysql instance and kills the mysql instance
// if it doesn't shut down within 30 seconds.
func (mysqlctl *MysqlctlProcess) Stop() (err error) {
log.Infof("Shutting down MySQL: %d", mysqlctl.TabletUID)
defer log.Infof("MySQL shutdown complete: %d", mysqlctl.TabletUID)
Expand Down Expand Up @@ -203,6 +204,21 @@ func (mysqlctl *MysqlctlProcess) Stop() (err error) {
if err != nil {
return err
}
// We first need to try and kill any associated mysqld_safe process or
// else it will immediately restart the mysqld process when we kill it.
mspidb, err := exec.Command("sh", "-c",
fmt.Sprintf("ps auxww | grep mysqld_safe | grep vt_%010d | awk '{print $2}'", mysqlctl.TabletUID)).Output()
if err != nil {
return err
}
mysqldSafePID, err := strconv.Atoi(strings.TrimSpace(string(mspidb)))
// If we found a valid associated mysqld_safe process then let's kill
// it first.
if err == nil && mysqldSafePID > 0 {
if err = syscall.Kill(mysqldSafePID, syscall.SIGKILL); err != nil {
return err
}
}
return syscall.Kill(pid, syscall.SIGKILL)
}

Expand Down
8 changes: 8 additions & 0 deletions go/test/endtoend/cluster/mysqlctld_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type MysqlctldProcess struct {
Name string
Binary string
LogDirectory string
ErrorLog string
Password string
TabletUID int
MySQLPort int
Expand Down Expand Up @@ -98,6 +99,7 @@ func (mysqlctld *MysqlctldProcess) Start() error {
tempProcess.Env = append(tempProcess.Env, DefaultVttestEnv)
tempProcess.Stdout = os.Stdout
tempProcess.Stderr = os.Stderr
mysqlctld.ErrorLog = errFile.Name()

log.Infof("%v", strings.Join(tempProcess.Args, " "))

Expand All @@ -112,6 +114,12 @@ func (mysqlctld *MysqlctldProcess) Start() error {
go func(mysqlctld *MysqlctldProcess) {
err := mysqlctld.process.Wait()
if !mysqlctld.exitSignalReceived {
errBytes, ferr := os.ReadFile(mysqlctld.ErrorLog)
if ferr == nil {
log.Errorf("mysqlctld error log contents:\n%s", string(errBytes))
} else {
log.Errorf("Failed to read the mysqlctld error log file %q: %v", mysqlctld.ErrorLog, ferr)
}
fmt.Printf("mysqlctld stopped unexpectedly, tabletUID %v, mysql port %v, PID %v\n", mysqlctld.TabletUID, mysqlctld.MySQLPort, mysqlctld.process.Process.Pid)
}
mysqlctld.process = nil
Expand Down
8 changes: 8 additions & 0 deletions go/test/endtoend/cluster/topo_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type TopoProcess struct {
Binary string
DataDirectory string
LogDirectory string
ErrorLog string
ListenClientURL string
AdvertiseClientURL string
Port int
Expand Down Expand Up @@ -94,6 +95,7 @@ func (topo *TopoProcess) SetupEtcd() (err error) {
}

topo.proc.Stderr = errFile
topo.ErrorLog = errFile.Name()

topo.proc.Env = append(topo.proc.Env, os.Environ()...)
topo.proc.Env = append(topo.proc.Env, DefaultVttestEnv)
Expand Down Expand Up @@ -126,6 +128,12 @@ func (topo *TopoProcess) SetupEtcd() (err error) {
}
select {
case err := <-topo.exit:
errBytes, ferr := os.ReadFile(topo.ErrorLog)
if ferr == nil {
log.Errorf("%s error log contents:\n%s", topo.Binary, string(errBytes))
} else {
log.Errorf("Failed to read the %s error log file %q: %v", topo.Binary, topo.ErrorLog, ferr)
}
return fmt.Errorf("process '%s' exited prematurely (err: %s)", topo.Binary, err)
default:
time.Sleep(300 * time.Millisecond)
Expand Down
8 changes: 8 additions & 0 deletions go/test/endtoend/cluster/vtctld_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type VtctldProcess struct {
BackupStorageImplementation string
FileBackupStorageRoot string
LogDir string
ErrorLog string
Port int
GrpcPort int
VerifyURL string
Expand Down Expand Up @@ -72,6 +73,7 @@ func (vtctld *VtctldProcess) Setup(cell string, extraArgs ...string) (err error)

errFile, _ := os.Create(path.Join(vtctld.LogDir, "vtctld-stderr.txt"))
vtctld.proc.Stderr = errFile
vtctld.ErrorLog = errFile.Name()

vtctld.proc.Env = append(vtctld.proc.Env, os.Environ()...)
vtctld.proc.Env = append(vtctld.proc.Env, DefaultVttestEnv)
Expand All @@ -96,6 +98,12 @@ func (vtctld *VtctldProcess) Setup(cell string, extraArgs ...string) (err error)
}
select {
case err := <-vtctld.exit:
errBytes, ferr := os.ReadFile(vtctld.ErrorLog)
if ferr == nil {
log.Errorf("vtctld error log contents:\n%s", string(errBytes))
} else {
log.Errorf("Failed to read the vtctld error log file %q: %v", vtctld.ErrorLog, ferr)
}
return fmt.Errorf("process '%s' exited prematurely (err: %s)", vtctld.Name, err)
default:
time.Sleep(300 * time.Millisecond)
Expand Down
7 changes: 7 additions & 0 deletions go/test/endtoend/cluster/vtgate_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type VtgateProcess struct {
Binary string
CommonArg VtctlProcess
LogDir string
ErrorLog string
FileToLogQueries string
Port int
GrpcPort int
Expand Down Expand Up @@ -150,6 +151,12 @@ func (vtgate *VtgateProcess) Setup() (err error) {
}
select {
case err := <-vtgate.exit:
errBytes, ferr := os.ReadFile(vtgate.ErrorLog)
if ferr == nil {
log.Errorf("vtgate error log contents:\n%s", string(errBytes))
} else {
log.Errorf("Failed to read the vtgate error log file %q: %v", vtgate.ErrorLog, ferr)
}
return fmt.Errorf("process '%s' exited prematurely (err: %s)", vtgate.Name, err)
default:
time.Sleep(300 * time.Millisecond)
Expand Down
8 changes: 8 additions & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type VttabletProcess struct {
Shard string
CommonArg VtctlProcess
LogDir string
ErrorLog string
TabletHostname string
Keyspace string
TabletType string
Expand Down Expand Up @@ -131,6 +132,7 @@ func (vttablet *VttabletProcess) Setup() (err error) {
fname := path.Join(vttablet.LogDir, vttablet.TabletPath+"-vttablet-stderr.txt")
errFile, _ := os.Create(fname)
vttablet.proc.Stderr = errFile
vttablet.ErrorLog = errFile.Name()

vttablet.proc.Env = append(vttablet.proc.Env, os.Environ()...)
vttablet.proc.Env = append(vttablet.proc.Env, DefaultVttestEnv)
Expand Down Expand Up @@ -307,6 +309,12 @@ func (vttablet *VttabletProcess) WaitForTabletStatusesForTimeout(expectedStatuse
}
select {
case err := <-vttablet.exit:
errBytes, ferr := os.ReadFile(vttablet.ErrorLog)
if ferr == nil {
log.Errorf("vttablet error log contents:\n%s", string(errBytes))
} else {
log.Errorf("Failed to read the vttablet error log file %q: %v", vttablet.ErrorLog, ferr)
}
return fmt.Errorf("process '%s' exited prematurely (err: %s)", vttablet.Name, err)
default:
time.Sleep(300 * time.Millisecond)
Expand Down
49 changes: 36 additions & 13 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,19 +248,32 @@ func downloadDBTypeVersion(dbType string, majorVersion string, path string) erro
if _, err := os.Stat(file); err == nil {
return nil
}
resp, err := client.Get(url)
if err != nil {
return fmt.Errorf("error downloading contents of %s to %s. Error: %v", url, file, err)
downloadFile := func() error {
resp, err := client.Get(url)
if err != nil {
return fmt.Errorf("error downloading contents of %s to %s. Error: %v", url, file, err)
}
defer resp.Body.Close()
out, err := os.Create(file)
if err != nil {
return fmt.Errorf("error creating file %s to save the contents of %s. Error: %v", file, url, err)
}
defer out.Close()
_, err = io.Copy(out, resp.Body)
if err != nil {
return fmt.Errorf("error saving contents of %s to %s. Error: %v", url, file, err)
}
return nil
}
defer resp.Body.Close()
out, err := os.Create(file)
if err != nil {
return fmt.Errorf("error creating file %s to save the contents of %s. Error: %v", file, url, err)
retries := 5
var dlerr error
for i := 0; i < retries; i++ {
if dlerr = downloadFile(); dlerr == nil {
break
}
}
defer out.Close()
_, err = io.Copy(out, resp.Body)
if err != nil {
return fmt.Errorf("error saving contents of %s to %s. Error: %v", url, file, err)
if dlerr != nil {
return dlerr
}

untarCmd := exec.Command("/bin/sh", "-c", fmt.Sprintf("tar xvf %s -C %s --strip-components=1", file, path))
Expand Down Expand Up @@ -559,7 +572,17 @@ func (vc *VitessCluster) AddShards(t *testing.T, cells []*Cell, keyspace *Keyspa
for ind, proc := range dbProcesses {
log.Infof("Waiting for mysql process for tablet %s", tablets[ind].Name)
if err := proc.Wait(); err != nil {
t.Fatalf("%v :: Unable to start mysql server for %v", err, tablets[ind].Vttablet)
// Retry starting the database process before giving up.
t.Logf("%v :: Unable to start mysql server for %v. Will retry...", err, tablets[ind].Vttablet)
tablets[ind].DbServer.CleanupFiles(tablets[ind].Vttablet.TabletUID)
time.Sleep(1 * time.Second)
dbcmd, err := tablets[ind].DbServer.StartProcess()
require.NoError(t, err)
if err = dbcmd.Wait(); err != nil {
output, _ := dbcmd.CombinedOutput()
t.Fatalf("%v :: Unable to start mysql server for %v; Output: %s", err,
tablets[ind].Vttablet, string(output))
}
}
}
for ind, tablet := range tablets {
Expand Down Expand Up @@ -666,7 +689,7 @@ func (vc *VitessCluster) teardown() {
go func(tablet2 *Tablet) {
defer wg.Done()
if tablet2.DbServer != nil && tablet2.DbServer.TabletUID > 0 {
if _, err := tablet2.DbServer.StopProcess(); err != nil {
if err := tablet2.DbServer.Stop(); err != nil {
log.Infof("Error stopping mysql process: %s", err.Error())
}
}
Expand Down
11 changes: 6 additions & 5 deletions go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,18 @@ func TestMigrate(t *testing.T) {
allCellNames = "zone1"
vc = NewVitessCluster(t, "TestMigrate", cells, mainClusterConfig)

require.NotNil(t, vc)
require.NotNil(t, vc, "failed to create VitessCluster")
defaultReplicas = 0
defaultRdonly = 0
defer vc.TearDown(t)

defaultCell = vc.Cells[defaultCellName]
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)
err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0")
require.NoError(t, err)
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)
require.NoError(t, err, "failed to create product keyspace")
err = cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0")
require.NoError(t, err, "product shard did not become healthy")
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
require.NotNil(t, vtgate, "failed to get vtgate")

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,15 +1313,15 @@ func checkVtgateHealth(t *testing.T, cell *Cell) {
for _, vtgate := range cell.Vtgates {
vtgateHealthURL := strings.Replace(vtgate.VerifyURL, "vars", "health", -1)
if !checkHealth(t, vtgateHealthURL) {
assert.Failf(t, "Vtgate not healthy: ", vtgateHealthURL)
assert.Fail(t, "Vtgate not healthy: ", vtgateHealthURL)
}
}
}

// checkTabletHealth records a test failure when the given tablet's health
// endpoint is not reported healthy by checkHealth.
//
// The health URL is derived from the tablet's VerifyURL by substituting
// "healthz" for "debug/vars".
//
// NOTE(review): this listing appears to be a diff view with both sides shown —
// the assert.Failf line looks like the pre-change call and the assert.Fail
// line its replacement; confirm against the actual file before editing.
func checkTabletHealth(t *testing.T, tablet *Tablet) {
// n = -1 makes strings.Replace substitute every occurrence of "debug/vars".
vttabletHealthURL := strings.Replace(tablet.Vttablet.VerifyURL, "debug/vars", "healthz", -1)
if !checkHealth(t, vttabletHealthURL) {
// Include the URL that was probed in the failure report.
assert.Failf(t, "Vttablet not healthy: ", vttabletHealthURL)
assert.Fail(t, "Vttablet not healthy: ", vttabletHealthURL)
}
}
}

Expand Down
10 changes: 5 additions & 5 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,7 @@
},
"vreplication_basic": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestBasicVreplicationWorkflow"],
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestBasicVreplicationWorkflow", "-timeout", "20m"],
"Command": [],
"Manual": false,
"Shard": "vreplication_basic",
Expand All @@ -1069,7 +1069,7 @@
},
"vreplication_copy_parallel": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVreplicationCopyParallel"],
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVreplicationCopyParallel", "-timeout", "20m"],
"Command": [],
"Manual": false,
"Shard": "vreplication_basic",
Expand Down Expand Up @@ -1204,7 +1204,7 @@
},
"vreplication_mariadb_to_mysql": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesMariaDBToMySQL", "-timeout", "10m"],
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesMariaDBToMySQL", "-timeout", "20m"],
"Command": [],
"Manual": false,
"Shard": "vreplication_across_db_versions",
Expand All @@ -1213,7 +1213,7 @@
},
"vreplication_migrate": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMigrate"],
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMigrate", "-timeout", "30m"],
"Command": [],
"Manual": false,
"Shard": "vreplication_migrate_vdiff2_convert_tz",
Expand All @@ -1222,7 +1222,7 @@
},
"vdiff2": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVDiff2"],
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVDiff2", "-timeout", "20m"],
"Command": [],
"Manual": false,
"Shard": "vreplication_migrate_vdiff2_convert_tz",
Expand Down

0 comments on commit a5a0653

Please sign in to comment.