Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle lock release with SIGHUP in VTGR #8472

Merged
merged 2 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions go/vt/vtgr/controller/refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ type GRTmcClient interface {
Ping(ctx context.Context, tablet *topodatapb.Tablet) error
}

// HostnameGetter is used to get local hostname
type HostnameGetter func() (string, error)

// GRShard stores the information about a Vitess shard that's running MySQL GR
type GRShard struct {
KeyspaceShard *topo.KeyspaceShard
Expand All @@ -77,6 +74,12 @@ type GRShard struct {
tmc GRTmcClient
dbAgent db.Agent

// Every GRShard tracks an unlock function after it grabs a topo lock for the shard.
// VTGR needs to release the topo lock before shutting down gracefully.
unlock func(*error)
// mutex to protect unlock function access
unlockMu sync.Mutex

// configuration
minNumReplicas int
localDbPort int
Expand Down Expand Up @@ -131,6 +134,7 @@ func NewGRShard(
tmc: tmc,
ts: ts,
dbAgent: dbAgent,
unlock: nil,
sqlGroup: NewSQLGroup(config.GroupSize, true, keyspace, shard),
minNumReplicas: config.MinNumReplica,
disableReadOnlyProtection: config.DisableReadOnlyProtection,
Expand Down Expand Up @@ -200,14 +204,37 @@ func parseTabletInfos(tablets map[string]*topo.TabletInfo) []*grInstance {
}

// LockShard locks the keyspace-shard on topo server to prevent others from executing conflicting actions.
func (shard *GRShard) LockShard(ctx context.Context, action string) (context.Context, func(*error), error) {
func (shard *GRShard) LockShard(ctx context.Context, action string) (context.Context, error) {
if shard.KeyspaceShard.Keyspace == "" || shard.KeyspaceShard.Shard == "" {
return nil, nil, fmt.Errorf("try to grab lock with incomplete information: %v/%v", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard)
return nil, fmt.Errorf("try to grab lock with incomplete information: %v/%v", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard)
}
shard.unlockMu.Lock()
defer shard.unlockMu.Unlock()
if shard.unlock != nil {
return nil, fmt.Errorf("try to grab lock for %s/%s while the shard holds an unlock function", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard)
}
start := time.Now()
ctx, unlock, err := shard.ts.LockShard(ctx, shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard, fmt.Sprintf("VTGR repairing %s", action))
lockShardTimingsMs.Record([]string{action, strconv.FormatBool(err == nil)}, start)
return ctx, unlock, err
if err != nil {
return nil, err
}
shard.unlock = unlock
return ctx, nil
}

// UnlockShard releases the topo lock held for this keyspace-shard and clears
// the unlock function stored on the shard.
//
// It is safe to call when no lock is held: a warning is logged and nothing
// else happens. Access to the stored unlock function is serialized by unlockMu.
func (shard *GRShard) UnlockShard() {
	shard.unlockMu.Lock()
	defer shard.unlockMu.Unlock()
	if shard.unlock == nil {
		log.Warningf("Shard %s/%s does not hold a lock", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard)
		return
	}
	// The unlock function takes an *error; presumably it writes any release
	// failure back through it (topo lock convention — confirm against topo).
	var err error
	shard.unlock(&err)
	// Clear the function regardless of the outcome so that LockShard does not
	// refuse the next attempt because a stale unlock function is still stored.
	shard.unlock = nil
	if err != nil {
		// Surface the failure instead of silently dropping it.
		log.Warningf("Failed to unlock shard %s/%s: %v", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard, err)
	}
}

func (shard *GRShard) findTabletByHostAndPort(host string, port int) *grInstance {
Expand Down Expand Up @@ -245,6 +272,13 @@ func (shard *GRShard) GetCurrentShardStatuses() ShardStatus {
return status
}

// GetUnlock exposes the unlock function currently stored on the shard.
// It is intended for tests; the result is nil when no unlock function is stored.
func (shard *GRShard) GetUnlock() func(*error) {
	// Read the field under unlockMu, then release the mutex before returning.
	shard.unlockMu.Lock()
	current := shard.unlock
	shard.unlockMu.Unlock()
	return current
}

func (collector *shardStatusCollector) isUnreachable(instance *grInstance) bool {
if instance.instanceKey == nil || instance.instanceKey.Hostname == "" {
return true
Expand Down
20 changes: 20 additions & 0 deletions go/vt/vtgr/controller/refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,26 @@ func TestRefreshWithEmptyCells(t *testing.T) {
assert.Equal(t, "cell3-0000000002", instances[2].alias)
}

// TestLockRelease checks that LockShard stores an unlock function on the
// shard and that UnlockShard releases the topo lock and clears that function.
func TestLockRelease(t *testing.T) {
	ctx := context.Background()
	ts := memorytopo.NewServer("cell1", "cell2", "cell3")
	defer ts.Close()
	// Stop early if the fixture cannot be created; otherwise later failures
	// would be misattributed to the locking logic.
	if !assert.NoError(t, ts.CreateKeyspace(ctx, "ks", &topodatapb.Keyspace{})) {
		return
	}
	if !assert.NoError(t, ts.CreateShard(ctx, "ks", "0")) {
		return
	}
	cfg := &config.VTGRConfig{GroupSize: 3, MinNumReplica: 0, BackoffErrorWaitTimeSeconds: 1, BootstrapWaitTimeSeconds: 1}
	shard := NewGRShard("ks", "0", nil, nil, ts, nil, cfg, testPort0)
	ctx, err := shard.LockShard(ctx, "")
	if !assert.NoError(t, err) {
		// LockShard returns a nil context on failure; do not keep using it.
		return
	}
	// make sure we get the lock
	err = shard.checkShardLocked(ctx)
	assert.NoError(t, err)
	assert.NotNil(t, shard.unlock)
	shard.UnlockShard()
	assert.Nil(t, shard.unlock)
	// once released, the lock check must report that the lock is gone
	err = shard.checkShardLocked(ctx)
	assert.EqualError(t, err, "lost topology lock; aborting: shard ks/0 is not locked (no lockInfo in map)")
}

// buildTabletInfo is a test helper that delegates to buildTabletInfoWithCell
// using the fixed cell name "test_cell".
func buildTabletInfo(id uint32, host string, mysqlPort int, ttype topodatapb.TabletType, masterTermTime time.Time) *topo.TabletInfo {
	const cell = "test_cell"
	return buildTabletInfoWithCell(id, host, cell, mysqlPort, ttype, masterTermTime)
}
Expand Down
36 changes: 18 additions & 18 deletions go/vt/vtgr/controller/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func (shard *GRShard) Repair(ctx context.Context, status DiagnoseType) (RepairRe
}

func (shard *GRShard) repairShardHasNoGroup(ctx context.Context) (RepairResultCode, error) {
ctx, unlock, err := shard.LockShard(ctx, "repairShardHasNoGroup")
ctx, err := shard.LockShard(ctx, "repairShardHasNoGroup")
if err != nil {
log.Warningf("repairShardHasNoPrimaryTablet fails to grab lock for the shard %v: %v", shard.KeyspaceShard, err)
return Noop, err
}
defer unlock(&err)
defer shard.UnlockShard()
shard.refreshTabletsInShardLocked(ctx)
// Diagnose() will call shardAgreedGroup as the first thing
// which will update mysqlGroup stored in the shard
Expand Down Expand Up @@ -169,12 +169,12 @@ func (shard *GRShard) repairShardHasNoGroupAction(ctx context.Context) error {
}

func (shard *GRShard) repairShardHasInactiveGroup(ctx context.Context) (RepairResultCode, error) {
ctx, unlock, err := shard.LockShard(ctx, "repairShardHasInactiveGroup")
ctx, err := shard.LockShard(ctx, "repairShardHasInactiveGroup")
if err != nil {
log.Warningf("repairShardHasInactiveGroup fails to grab lock for the shard %v: %v", shard.KeyspaceShard, err)
return Noop, err
}
defer unlock(&err)
defer shard.UnlockShard()
shard.refreshTabletsInShardLocked(ctx)
// Diagnose() will call shardAgreedGroup as the first thing
// which will update mysqlGroup stored in the shard
Expand All @@ -200,12 +200,12 @@ func (shard *GRShard) repairShardHasInactiveGroup(ctx context.Context) (RepairRe
}

func (shard *GRShard) repairBackoffError(ctx context.Context, diagnose DiagnoseType) (RepairResultCode, error) {
ctx, unlock, err := shard.LockShard(ctx, "repairBackoffError")
ctx, err := shard.LockShard(ctx, "repairBackoffError")
if err != nil {
log.Warningf("repairBackoffError fails to grab lock for the shard %v: %v", shard.KeyspaceShard, err)
return Noop, err
}
defer unlock(&err)
defer shard.UnlockShard()
shard.refreshTabletsInShardLocked(ctx)
status, err := shard.diagnoseLocked(ctx)
if err != nil {
Expand Down Expand Up @@ -416,12 +416,12 @@ func (shard *GRShard) findFailoverCandidate(ctx context.Context) (*grInstance, e
}

func (shard *GRShard) repairWrongPrimaryTablet(ctx context.Context) (RepairResultCode, error) {
ctx, unlock, err := shard.LockShard(ctx, "repairWrongPrimaryTablet")
ctx, err := shard.LockShard(ctx, "repairWrongPrimaryTablet")
if err != nil {
log.Warningf("repairWrongPrimaryTablet fails to grab lock for the shard %v: %v", shard.KeyspaceShard, err)
return Noop, err
}
defer unlock(&err)
defer shard.UnlockShard()
// We grab shard level lock and check again if there is no primary
// to avoid race conditions
shard.refreshTabletsInShardLocked(ctx)
Expand Down Expand Up @@ -469,12 +469,12 @@ func (shard *GRShard) fixPrimaryTabletLocked(ctx context.Context) error {
// repairUnconnectedReplica usually handles the case when there is a DiagnoseTypeHealthy tablet and
// it is not connected to the mysql primary node
func (shard *GRShard) repairUnconnectedReplica(ctx context.Context) (RepairResultCode, error) {
ctx, unlock, err := shard.LockShard(ctx, "repairUnconnectedReplica")
ctx, err := shard.LockShard(ctx, "repairUnconnectedReplica")
if err != nil {
log.Warningf("repairUnconnectedReplica fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err)
return Noop, err
}
defer unlock(&err)
defer shard.UnlockShard()
shard.refreshTabletsInShardLocked(ctx)
status, err := shard.diagnoseLocked(ctx)
if err != nil {
Expand Down Expand Up @@ -526,12 +526,12 @@ func (shard *GRShard) repairUnconnectedReplicaAction(ctx context.Context) error
}

func (shard *GRShard) repairUnreachablePrimary(ctx context.Context) (RepairResultCode, error) {
ctx, unlock, err := shard.LockShard(ctx, "repairUnreachablePrimary")
ctx, err := shard.LockShard(ctx, "repairUnreachablePrimary")
if err != nil {
log.Warningf("repairUnreachablePrimary fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err)
return Noop, err
}
defer unlock(&err)
defer shard.UnlockShard()
shard.refreshTabletsInShardLocked(ctx)
status, err := shard.diagnoseLocked(ctx)
if err != nil {
Expand Down Expand Up @@ -559,12 +559,12 @@ func (shard *GRShard) repairUnreachablePrimary(ctx context.Context) (RepairResul
}

func (shard *GRShard) repairInsufficientGroupSize(ctx context.Context) (RepairResultCode, error) {
ctx, unlock, err := shard.LockShard(ctx, "repairInsufficientGroupSize")
ctx, err := shard.LockShard(ctx, "repairInsufficientGroupSize")
if err != nil {
log.Warningf("repairInsufficientGroupSize fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err)
return Noop, err
}
defer unlock(&err)
defer shard.UnlockShard()
shard.refreshTabletsInShardLocked(ctx)
status, err := shard.diagnoseLocked(ctx)
if err != nil {
Expand Down Expand Up @@ -594,12 +594,12 @@ func (shard *GRShard) repairInsufficientGroupSize(ctx context.Context) (RepairRe
}

func (shard *GRShard) repairReadOnlyShard(ctx context.Context) (RepairResultCode, error) {
ctx, unlock, err := shard.LockShard(ctx, "repairReadOnlyShard")
ctx, err := shard.LockShard(ctx, "repairReadOnlyShard")
if err != nil {
log.Warningf("repairReadOnlyShard fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err)
return Noop, err
}
defer unlock(&err)
defer shard.UnlockShard()
shard.refreshTabletsInShardLocked(ctx)
status, err := shard.diagnoseLocked(ctx)
if err != nil {
Expand All @@ -625,12 +625,12 @@ func (shard *GRShard) repairReadOnlyShard(ctx context.Context) (RepairResultCode

// Failover takes a shard and finds the node with the largest GTID as the mysql primary of the group
func (shard *GRShard) Failover(ctx context.Context) error {
ctx, unlock, err := shard.LockShard(ctx, "Failover")
ctx, err := shard.LockShard(ctx, "Failover")
if err != nil {
log.Warningf("Failover fails to grab lock for the shard %v: %v", formatKeyspaceShard(shard.KeyspaceShard), err)
return err
}
defer unlock(&err)
defer shard.UnlockShard()
shard.refreshTabletsInShardLocked(ctx)
return shard.failoverLocked(ctx)
}
Expand Down
42 changes: 35 additions & 7 deletions go/vt/vtgr/vtgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@ limitations under the License.
package vtgr

import (
"errors"
"flag"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"vitess.io/vitess/go/vt/vtgr/config"

"vitess.io/vitess/go/vt/vtgr/db"

"golang.org/x/net/context"

"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtgr/config"
"vitess.io/vitess/go/vt/vtgr/controller"
"vitess.io/vitess/go/vt/vtgr/db"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

Expand All @@ -54,6 +57,8 @@ type VTGR struct {
topo controller.GRTopo
tmc tmclient.TabletManagerClient
ctx context.Context

stopped sync2.AtomicBool
}

func newVTGR(ctx context.Context, ts controller.GRTopo, tmc tmclient.TabletManagerClient) *VTGR {
Expand Down Expand Up @@ -104,6 +109,7 @@ func OpenTabletDiscovery(ctx context.Context, cellsToWatch, clustersToWatch []st
}
}
}
vtgr.handleSignal(os.Exit)
vtgr.Shards = shards
log.Infof("Monitoring shards size %v", len(vtgr.Shards))
// Force refresh all tablet here to populate data for vtgr
Expand Down Expand Up @@ -143,9 +149,11 @@ func (vtgr *VTGR) ScanAndRepair() {
func() {
ctx, cancel := context.WithTimeout(vtgr.ctx, *scanAndRepairTimeout)
defer cancel()
log.Infof("Start scan and repair %v/%v", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard)
shard.ScanAndRepairShard(ctx)
log.Infof("Finished scan and repair %v/%v", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard)
if !vtgr.stopped.Get() {
log.Infof("Start scan and repair %v/%v", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard)
shard.ScanAndRepairShard(ctx)
log.Infof("Finished scan and repair %v/%v", shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard)
}
}()
}
}(shard)
Expand All @@ -159,6 +167,9 @@ func (vtgr *VTGR) Diagnose(ctx context.Context, shard *controller.GRShard) (cont

// Repair is the entry point for repairing a single shard according to the
// given diagnosis. Once VTGR has been marked as stopped it refuses to act and
// returns controller.Fail together with an error; otherwise the call is
// forwarded to the shard's own Repair.
func (vtgr *VTGR) Repair(ctx context.Context, shard *controller.GRShard, diagnose controller.DiagnoseType) (controller.RepairResultCode, error) {
	if running := !vtgr.stopped.Get(); running {
		return shard.Repair(ctx, diagnose)
	}
	return controller.Fail, errors.New("VTGR is stopped")
}

Expand All @@ -172,3 +183,20 @@ func (vtgr *VTGR) GetCurrentShardStatuses() []controller.ShardStatus {
}
return result
}

// handleSignal registers for SIGHUP and starts a goroutine that waits for it.
// When the signal arrives the goroutine marks VTGR as stopped, calls
// UnlockShard on every shard in vtgr.Shards, and finally invokes action with 1.
func (vtgr *VTGR) handleSignal(action func(int)) {
	hangup := make(chan os.Signal, 1)
	signal.Notify(hangup, syscall.SIGHUP)

	onHangup := func() {
		// Wait here until a SIGHUP is delivered.
		<-hangup
		log.Infof("Handling SIGHUP")
		// Flip the stopped flag first so that repair calls made from now on
		// become no-ops. Per the original note, repairs already in flight are
		// expected to abort through checkShardLocked if needed.
		vtgr.stopped.Set(true)
		for _, s := range vtgr.Shards {
			s.UnlockShard()
		}
		action(1)
	}
	go onHangup()
}
55 changes: 55 additions & 0 deletions go/vt/vtgr/vtgr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package vtgr

import (
"context"
"syscall"
"testing"
"time"

"vitess.io/vitess/go/sync2"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vtgr/config"
"vitess.io/vitess/go/vt/vtgr/controller"
"vitess.io/vitess/go/vt/vtgr/db"
"vitess.io/vitess/go/vt/vttablet/tmclient"

"github.com/stretchr/testify/assert"
)

// TestSighupHandle verifies that a SIGHUP delivered to the process makes VTGR
// mark itself as stopped, release the shard's topo lock, and invoke the
// supplied action callback.
func TestSighupHandle(t *testing.T) {
	ctx := context.Background()
	ts := memorytopo.NewServer("cell1")
	defer ts.Close()
	// Stop early if the fixture cannot be created.
	if !assert.NoError(t, ts.CreateKeyspace(ctx, "ks", &topodatapb.Keyspace{})) {
		return
	}
	if !assert.NoError(t, ts.CreateShard(ctx, "ks", "0")) {
		return
	}
	vtgr := newVTGR(
		ctx,
		ts,
		tmclient.NewTabletManagerClient(),
	)
	var shards []*controller.GRShard
	// Named cfg rather than config so the imported config package is not shadowed.
	cfg := &config.VTGRConfig{
		DisableReadOnlyProtection:   false,
		GroupSize:                   5,
		MinNumReplica:               3,
		BackoffErrorWaitTimeSeconds: 10,
		BootstrapWaitTimeSeconds:    10 * 60,
	}
	shards = append(shards, controller.NewGRShard("ks", "0", nil, vtgr.tmc, vtgr.topo, db.NewVTGRSqlAgent(), cfg, *localDbPort))
	vtgr.Shards = shards
	shard := vtgr.Shards[0]
	if _, err := shard.LockShard(ctx, "test"); !assert.NoError(t, err) {
		return
	}
	res := sync2.NewAtomicInt32(0)
	// done is closed by the action callback, which handleSignal invokes last,
	// so every side effect of the handler is visible once it is closed.
	done := make(chan struct{})
	vtgr.handleSignal(func(i int) {
		res.Set(1)
		close(done)
	})
	assert.NotNil(t, shard.GetUnlock())
	assert.False(t, vtgr.stopped.Get())
	if !assert.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGHUP)) {
		return
	}
	// Wait for the handler deterministically instead of sleeping a fixed time.
	select {
	case <-done:
	case <-time.After(10 * time.Second):
		t.Fatal("timed out waiting for the SIGHUP handler to run")
	}
	assert.Equal(t, int32(1), res.Get())
	assert.Nil(t, shard.GetUnlock())
	assert.True(t, vtgr.stopped.Get())
}