Skip to content

Commit

Permalink
graceful shutdown using ctr+c (#5855)
Browse files Browse the repository at this point in the history
* created teardown handler on interupt signal. also added panic handler
in each testcase to handle the unwanted panics.

Signed-off-by: pradip parmar <[email protected]>

* handled nil cancelFun issue.

Signed-off-by: pradip parmar <[email protected]>

* panic handler to each testcase.

Signed-off-by: pradip parmar <[email protected]>

* panic handler at starting of each main test.

Signed-off-by: pradip parmar <[email protected]>

* teardown fixes when process exits because of panic.

Signed-off-by: pradip parmar <[email protected]>

* comment for mutex in cluster.

Signed-off-by: pradip parmar <[email protected]>
  • Loading branch information
princeparmar authored Mar 2, 2020
1 parent 1eb4277 commit 16bf503
Show file tree
Hide file tree
Showing 60 changed files with 200 additions and 8 deletions.
3 changes: 3 additions & 0 deletions go/test/endtoend/backup/transform/backup_transform_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var (
)

func TestMainSetup(m *testing.M, useMysqlctld bool) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitCode, err := func() (int, error) {
Expand Down Expand Up @@ -183,6 +184,7 @@ var vtInsertTest = `create table vt_insert_test (

func TestBackupTransformImpl(t *testing.T) {
// insert data in master, validate same in slave
defer cluster.PanicHandler(t)
verifyInitialReplication(t)

// restart the replica with transform hook parameter
Expand Down Expand Up @@ -268,6 +270,7 @@ func TestBackupTransformImpl(t *testing.T) {
// backup_storage_hook, which should fail.
func TestBackupTransformErrorImpl(t *testing.T) {
// restart the replica with transform hook parameter
defer cluster.PanicHandler(t)
err := replica1.VttabletProcess.TearDown()
require.Nil(t, err)

Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/backup/vtbackup/backup_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestTabletInitialBackup(t *testing.T) {
// - Take a Second Backup
// - Bring up a second replica, and restore from the second backup
// - list the backups, remove them
defer cluster.PanicHandler(t)

vtBackup(t, true)
verifyBackupCount(t, shardKsName, 1)
Expand Down Expand Up @@ -82,6 +83,7 @@ func TestTabletBackupOnly(t *testing.T) {
// - Take a Second Backup
// - Bring up a second replica, and restore from the second backup
// - list the backups, remove them
defer cluster.PanicHandler(t)

// Reset the tablet object values in order on init tablet in the next step.
master.VttabletProcess.ServingStatus = "NOT_SERVING"
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/backup/vtbackup/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitCode, err := func() (int, error) {
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int) {
}, //
}

defer cluster.PanicHandler(t)

// setup cluster for the testing
code, err := LaunchCluster(setupType, streamMode, stripes)
require.Nilf(t, err, "setup failed with status code %d", code)
Expand Down
4 changes: 4 additions & 0 deletions go/test/endtoend/binlog/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ var (
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitcode, err := func() (int, error) {
Expand Down Expand Up @@ -259,6 +260,7 @@ func TestMain(m *testing.M) {
// pretend it's latin1. If the binlog player doesn't also pretend it's
// latin1, it will be inserted as utf8, which will change its value.
func TestCharset(t *testing.T) {
defer cluster.PanicHandler(t)
position, _ := cluster.GetMasterPosition(t, *destReplica, hostname)

_, err := queryTablet(t, *srcMaster, fmt.Sprintf(insertSql, tableName, 1, "Šṛ́rỏé"), "latin1")
Expand All @@ -272,6 +274,7 @@ func TestCharset(t *testing.T) {
// Enable binlog_checksum, which will also force a log rotation that should
// cause binlog streamer to notice the new checksum setting.
func TestChecksumEnabled(t *testing.T) {
defer cluster.PanicHandler(t)
position, _ := cluster.GetMasterPosition(t, *destReplica, hostname)
_, err := queryTablet(t, *destReplica, "SET @@global.binlog_checksum=1", "")
require.Nil(t, err)
Expand All @@ -290,6 +293,7 @@ func TestChecksumEnabled(t *testing.T) {
// Disable binlog_checksum to make sure we can also talk to a server without
// checksums enabled, in case they are enabled by default
func TestChecksumDisabled(t *testing.T) {
defer cluster.PanicHandler(t)
position, _ := cluster.GetMasterPosition(t, *destReplica, hostname)

_, err := queryTablet(t, *destReplica, "SET @@global.binlog_checksum=0", "")
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/cellalias/cell_alias_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ var (
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitcode, err := func() (int, error) {
Expand Down Expand Up @@ -236,6 +237,7 @@ func TestMain(m *testing.M) {
}

func TestAlias(t *testing.T) {
defer cluster.PanicHandler(t)
insertInitialValues(t)
err := localCluster.VtctlclientProcess.ExecuteCommand("RebuildKeyspaceGraph", keyspaceName)
require.Nil(t, err)
Expand Down
38 changes: 37 additions & 1 deletion go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@ limitations under the License.
package cluster

import (
"context"
"flag"
"fmt"
"io/ioutil"
"math/rand"
"os"
"os/exec"
"os/signal"
"path"
"strconv"
"sync"
"syscall"
"time"

"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -79,6 +83,13 @@ type LocalProcessCluster struct {
VtctldExtraArgs []string

EnableSemiSync bool

// mutex added to handle the parallel teardowns
mx *sync.Mutex
teardownCompleted bool

context.Context
context.CancelFunc
}

// Vttablet stores the properties needed to start a vttablet process
Expand Down Expand Up @@ -137,6 +148,20 @@ func (shard *Shard) Replica() *Vttablet {
return nil
}

// CtrlCHandler handles the teardown for the ctrl-c.
func (cluster *LocalProcessCluster) CtrlCHandler() {
cluster.Context, cluster.CancelFunc = context.WithCancel(context.Background())

c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
select {
case <-c:
cluster.Teardown()
os.Exit(0)
case <-cluster.Done():
}
}

// StartTopo starts topology server
func (cluster *LocalProcessCluster) StartTopo() (err error) {
if cluster.Cell == "" {
Expand Down Expand Up @@ -401,7 +426,8 @@ func (cluster *LocalProcessCluster) GetVtgateInstance() *VtgateProcess {

// NewCluster instantiates a new cluster
func NewCluster(cell string, hostname string) *LocalProcessCluster {
cluster := &LocalProcessCluster{Cell: cell, Hostname: hostname}
cluster := &LocalProcessCluster{Cell: cell, Hostname: hostname, mx: new(sync.Mutex)}
go cluster.CtrlCHandler()
cluster.OriginalVTDATAROOT = os.Getenv("VTDATAROOT")
cluster.CurrentVTDATAROOT = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("vtroot_%d", cluster.GetAndReservePort()))
_ = createDirectory(cluster.CurrentVTDATAROOT, 0700)
Expand Down Expand Up @@ -455,6 +481,15 @@ func (cluster *LocalProcessCluster) WaitForTabletsToHealthyInVtgate() (err error

// Teardown brings down the cluster by invoking teardown for individual processes
func (cluster *LocalProcessCluster) Teardown() {
PanicHandler(nil)
cluster.mx.Lock()
defer cluster.mx.Unlock()
if cluster.teardownCompleted {
return
}
if cluster.CancelFunc != nil {
cluster.CancelFunc()
}
if err := cluster.VtgateProcess.TearDown(); err != nil {
log.Errorf("Error in vtgate teardown - %s", err.Error())
}
Expand Down Expand Up @@ -497,6 +532,7 @@ func (cluster *LocalProcessCluster) Teardown() {
log.Errorf("Error in topo server teardown - %s", err.Error())
}

cluster.teardownCompleted = true
}

// StartVtworker starts a vtworker
Expand Down
9 changes: 9 additions & 0 deletions go/test/endtoend/cluster/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ func VerifyRowsInTablet(t *testing.T, vttablet *Vttablet, ksName string, expecte
assert.Fail(t, "expected rows not found.")
}

// PanicHandler handles the panic in the testcase.
func PanicHandler(t *testing.T) {
err := recover()
if t == nil {
return
}
require.Nilf(t, err, "panic occured in testcase %v", t.Name())
}

// VerifyLocalMetadata Verify Local Metadata of a tablet
func VerifyLocalMetadata(t *testing.T, tablet *Vttablet, ksName string, shardName string, cell string) {
qr, err := tablet.VttabletProcess.QueryTablet("select * from _vt.local_metadata", ksName, false)
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/clustertest/add_keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ primary key (id)
)

func TestAddKeyspace(t *testing.T) {
defer cluster.PanicHandler(t)
if err := clusterInstance.StartKeyspace(*testKeyspace, []string{"-80", "80-"}, 1, true); err != nil {
println(err.Error())
t.Fatal(err)
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/clustertest/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package clustertest
import (
"fmt"
"testing"

"vitess.io/vitess/go/test/endtoend/cluster"
)

func TestEtcdServer(t *testing.T) {
defer cluster.PanicHandler(t)
etcdURL := fmt.Sprintf("http://%s:%d/v2/keys", clusterInstance.Hostname, clusterInstance.TopoPort)
testURL(t, etcdURL, "generic etcd url")
testURL(t, etcdURL+"/vitess/global", "vitess global key")
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/clustertest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var (
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitCode := func() int {
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/clustertest/vtcltd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/test/endtoend/cluster"
)

var (
Expand All @@ -41,6 +42,7 @@ var (
)

func TestVtctldProcess(t *testing.T) {
defer cluster.PanicHandler(t)
url := fmt.Sprintf("http://%s:%d/api/keyspaces/", clusterInstance.Hostname, clusterInstance.VtctldHTTPPort)
testURL(t, url, "keyspace url")

Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/clustertest/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
)

func TestVtgateProcess(t *testing.T) {
defer cluster.PanicHandler(t)
verifyVtgateVariables(t, clusterInstance.VtgateProcess.VerifyURL)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/clustertest/vttablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ import (
"io/ioutil"
"net/http"
"testing"

"vitess.io/vitess/go/test/endtoend/cluster"
)

func TestVttabletProcess(t *testing.T) {
defer cluster.PanicHandler(t)
firstTabletPort := clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].HTTPPort
testURL(t, fmt.Sprintf("http://localhost:%d/debug/vars/", firstTabletPort), "tablet debug var url")
resp, _ := http.Get(fmt.Sprintf("http://localhost:%d/debug/vars", firstTabletPort))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (

// This test makes sure that we can use SSL replication with Vitess
func TestSecure(t *testing.T) {
defer cluster.PanicHandler(t)
testReplicationBase(t, true)
testReplicationBase(t, false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ var (
)

func TestSecureTransport(t *testing.T) {
defer cluster.PanicHandler(t)
flag.Parse()

// initialize cluster
Expand Down
11 changes: 10 additions & 1 deletion go/test/endtoend/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ var (
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitCode := func() int {
clusterForKSTest = &cluster.LocalProcessCluster{Cell: cell, Hostname: hostname}
clusterForKSTest = cluster.NewCluster(cell, hostname)
defer clusterForKSTest.Teardown()

// Start topo server
Expand Down Expand Up @@ -137,13 +138,15 @@ func TestMain(m *testing.M) {
}

func TestGetSrvKeyspaceNames(t *testing.T) {
defer cluster.PanicHandler(t)
output, err := clusterForKSTest.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvKeyspaceNames", cell)
require.Nil(t, err)
assert.Contains(t, strings.Split(output, "\n"), keyspaceUnshardedName)
assert.Contains(t, strings.Split(output, "\n"), keyspaceShardedName)
}

func TestGetSrvKeyspacePartitions(t *testing.T) {
defer cluster.PanicHandler(t)
shardedSrvKeyspace := getSrvKeyspace(t, cell, keyspaceShardedName)
otherShardRefFound := false
for _, partition := range shardedSrvKeyspace.Partitions {
Expand Down Expand Up @@ -172,6 +175,7 @@ func TestGetSrvKeyspacePartitions(t *testing.T) {
}

func TestShardNames(t *testing.T) {
defer cluster.PanicHandler(t)
output, err := clusterForKSTest.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvKeyspace", cell, keyspaceShardedName)
require.Nil(t, err)
var srvKeyspace topodata.SrvKeyspace
Expand All @@ -181,6 +185,7 @@ func TestShardNames(t *testing.T) {
}

func TestGetKeyspace(t *testing.T) {
defer cluster.PanicHandler(t)
output, err := clusterForKSTest.VtctlclientProcess.ExecuteCommandWithOutput("GetKeyspace", keyspaceUnshardedName)
require.Nil(t, err)

Expand All @@ -194,6 +199,7 @@ func TestGetKeyspace(t *testing.T) {
}

func TestDeleteKeyspace(t *testing.T) {
defer cluster.PanicHandler(t)
_ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("CreateKeyspace", "test_delete_keyspace")
_ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("CreateShard", "test_delete_keyspace/0")
_ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("InitTablet", "-keyspace=test_delete_keyspace", "-shard=0", "zone1-0000000100", "master")
Expand Down Expand Up @@ -311,6 +317,7 @@ func RemoveKeyspaceCell(t *testing.T) {
}

func TestShardCountForAllKeyspaces(t *testing.T) {
defer cluster.PanicHandler(t)
testShardCountForKeyspace(t, keyspaceUnshardedName, 1)
testShardCountForKeyspace(t, keyspaceShardedName, 2)
}
Expand All @@ -327,6 +334,7 @@ func testShardCountForKeyspace(t *testing.T, keyspace string, count int) {
}

func TestShardNameForAllKeyspaces(t *testing.T) {
defer cluster.PanicHandler(t)
testShardNameForKeyspace(t, keyspaceUnshardedName, []string{"test_ks_unsharded"})
testShardNameForKeyspace(t, keyspaceShardedName, []string{"-80", "80-"})
}
Expand All @@ -345,6 +353,7 @@ func testShardNameForKeyspace(t *testing.T, keyspace string, shardNames []string
}

func TestKeyspaceToShardName(t *testing.T) {
defer cluster.PanicHandler(t)
var id []byte
srvKeyspace := getSrvKeyspace(t, cell, keyspaceShardedName)

Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/messaging/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var (
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitcode, err := func() (int, error) {
Expand Down
Loading

0 comments on commit 16bf503

Please sign in to comment.