Skip to content

Commit

Permalink
cli: add enablechecksemisync and disablechecksemisync
Browse files Browse the repository at this point in the history
  • Loading branch information
dbkernel authored and BohuTANG committed Mar 13, 2020
1 parent ad73417 commit 15f172e
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 24 deletions.
30 changes: 30 additions & 0 deletions src/cli/callx/callx.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,36 @@ func RaftDisablePurgeBinlogRPC(node string) error {
return err
}

func RaftEnableCheckSemiSyncRPC(node string) error {
cli, cleanup, err := GetClient(node)
if err != nil {
return err
}
defer cleanup()

method := model.RPCRaftEnableCheckSemiSync
req := model.NewRaftStatusRPCRequest()
rsp := model.NewRaftStatusRPCResponse(model.OK)
err = cli.Call(method, req, rsp)

return err
}

func RaftDisableCheckSemiSyncRPC(node string) error {
cli, cleanup, err := GetClient(node)
if err != nil {
return err
}
defer cleanup()

method := model.RPCRaftDisableCheckSemiSync
req := model.NewRaftStatusRPCRequest()
rsp := model.NewRaftStatusRPCResponse(model.OK)
err = cli.Call(method, req, rsp)

return err
}

// mysql
func WaitMysqlWorkingRPC(node string) error {
cli, cleanup, err := GetClient(node)
Expand Down
56 changes: 56 additions & 0 deletions src/cli/cmd/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func NewRaftCommand() *cobra.Command {
cmd.AddCommand(NewRaftStatusCommand())
cmd.AddCommand(NewRaftEnablePurgeBinlogCommand())
cmd.AddCommand(NewRaftDisablePurgeBinlogCommand())
cmd.AddCommand(NewRaftEnableCheckSemiSyncCommand())
cmd.AddCommand(NewRaftDisableCheckSemiSyncCommand())

return cmd
}
Expand Down Expand Up @@ -300,3 +302,57 @@ func raftDisablePurgeBinlogCommandFn(cmd *cobra.Command, args []string) {
log.Warning("[%v].disable.purge.binlog.done", self)
}
}

func NewRaftEnableCheckSemiSyncCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "enablechecksemisync",
Short: "enable leader to check semi-sync(default)",
Run: raftEnableCheckSemiSyncCommandFn,
}

return cmd
}

func raftEnableCheckSemiSyncCommandFn(cmd *cobra.Command, args []string) {
if len(args) > 0 {
ErrorOK(fmt.Errorf("too.many.args"))
}

// send enable
{
conf, err := GetConfig()
ErrorOK(err)
self := conf.Server.Endpoint
log.Warning("[%v].prepare.to.enable.check.semi-sync", self)
err = callx.RaftEnableCheckSemiSyncRPC(self)
ErrorOK(err)
log.Warning("[%v].enable.check.semi-sync.done", self)
}
}

func NewRaftDisableCheckSemiSyncCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "disablechecksemisync",
Short: "disable leader to check semi-sync",
Run: raftDisableCheckSemiSyncCommandFn,
}

return cmd
}

func raftDisableCheckSemiSyncCommandFn(cmd *cobra.Command, args []string) {
if len(args) > 0 {
ErrorOK(fmt.Errorf("too.many.args"))
}

// send enable
{
conf, err := GetConfig()
ErrorOK(err)
self := conf.Server.Endpoint
log.Warning("[%v].prepare.to.disable.check.semi-sync", self)
err = callx.RaftDisableCheckSemiSyncRPC(self)
ErrorOK(err)
log.Warning("[%v].disable.check.semi-sync.done", self)
}
}
14 changes: 14 additions & 0 deletions src/cli/cmd/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,19 @@ func TestCLIRaftCommand(t *testing.T) {
_, err := executeCommand(cmd, "enablepurgebinlog")
assert.Nil(t, err)
}

// disable check semi-sync
{
cmd := NewRaftCommand()
_, err := executeCommand(cmd, "disablechecksemisync")
assert.Nil(t, err)
}

// enable check semi-sync
{
cmd := NewRaftCommand()
_, err := executeCommand(cmd, "enablechecksemisync")
assert.Nil(t, err)
}
}
}
14 changes: 8 additions & 6 deletions src/model/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ const (
RAFTMYSQL_WAITUNTILAFTERGTID RAFTMYSQL_STATUS = "WaitUntilAfterGTID"
)
const (
RPCRaftPing = "RaftRPC.Ping"
RPCRaftHeartbeat = "RaftRPC.Heartbeat"
RPCRaftRequestVote = "RaftRPC.RequestVote"
RPCRaftStatus = "RaftRPC.Status"
RPCRaftEnablePurgeBinlog = "RaftRPC.EnablePurgeBinlog"
RPCRaftDisablePurgeBinlog = "RaftRPC.DisablePurgeBinlog"
RPCRaftPing = "RaftRPC.Ping"
RPCRaftHeartbeat = "RaftRPC.Heartbeat"
RPCRaftRequestVote = "RaftRPC.RequestVote"
RPCRaftStatus = "RaftRPC.Status"
RPCRaftEnablePurgeBinlog = "RaftRPC.EnablePurgeBinlog"
RPCRaftDisablePurgeBinlog = "RaftRPC.DisablePurgeBinlog"
RPCRaftEnableCheckSemiSync = "RaftRPC.EnableCheckSemiSync"
RPCRaftDisableCheckSemiSync = "RaftRPC.DisableCheckSemiSync"
)

// raft
Expand Down
2 changes: 1 addition & 1 deletion src/mysql/mysqlbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (my *MysqlBase) DisableSemiSyncMaster(db *sql.DB) error {
return ExecuteWithTimeout(db, reqTimeout, cmds)
}

// SetSemiSyncMasterTimeout useed to set semi-sync master timeout
// SetSemiSyncMasterTimeout used to set semi-sync master timeout
func (my *MysqlBase) SetSemiSyncMasterTimeout(db *sql.DB, timeout uint64) error {
cmds := fmt.Sprintf("SET GLOBAL rpl_semi_sync_master_timeout=%d", timeout)
return ExecuteWithTimeout(db, reqTimeout, cmds)
Expand Down
5 changes: 5 additions & 0 deletions src/raft/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,11 @@ func (r *Leader) checkSemiSyncStop() {

// Disable the semi-sync if the nodes number less than 3.
func (r *Leader) checkSemiSync() {
if r.skipCheckSemiSync {
r.WARNING("check.semi-sync.skipped[skipCheckSemiSync is true]")
return
}

min := 3
cur := r.getMembers()
if cur < min {
Expand Down
27 changes: 17 additions & 10 deletions src/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,24 @@ type Raft struct {
idlePeers map[string]*Peer // all SuperIDLE peers
stats model.RaftStats
skipPurgeBinlog bool // if true, purge binlog will skipped
skipCheckSemiSync bool // if true, check semi-sync will skipped
isBrainSplit bool // if true, follower can upgrade to candidate
}

// NewRaft creates the new raft.
func NewRaft(id string, conf *config.RaftConfig, log *xlog.Log, mysql *mysql.Mysql) *Raft {
r := &Raft{
id: id,
conf: conf,
log: log,
cmd: common.NewLinuxCommand(log),
mysql: mysql,
leader: noLeader,
state: FOLLOWER,
meta: &RaftMeta{},
peers: make(map[string]*Peer),
idlePeers: make(map[string]*Peer),
id: id,
conf: conf,
log: log,
cmd: common.NewLinuxCommand(log),
mysql: mysql,
leader: noLeader,
state: FOLLOWER,
meta: &RaftMeta{},
peers: make(map[string]*Peer),
idlePeers: make(map[string]*Peer),
skipCheckSemiSync: false,
}

// state handler
Expand Down Expand Up @@ -370,3 +372,8 @@ func (r *Raft) resetCheckVotesTimeout() {
func (r *Raft) SetSkipPurgeBinlog(v bool) {
r.skipPurgeBinlog = v
}

// SetSkipCheckSemiSync used to set check semi-sync or not.
func (r *Raft) SetSkipCheckSemiSync(v bool) {
r.skipCheckSemiSync = v
}
32 changes: 25 additions & 7 deletions src/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,8 +1407,26 @@ func TestRaftLeaderCheckSemiSync(t *testing.T) {
assert.Equal(t, want, got)
assert.Equal(t, 2, whoisleader)

// wait for check semi-sync to be invoked
// wait for check semi-sync to be invoked and skipCheckSemiSync changed
time.Sleep(time.Millisecond * time.Duration(rafts[0].getElectionTimeout()*16))
assert.Equal(t, false, rafts[2].skipCheckSemiSync)
}

// disable check semi-sync
{
rafts[2].SetSkipCheckSemiSync(true)
time.Sleep(time.Millisecond * time.Duration(rafts[0].getElectionTimeout()*16))
assert.Equal(t, true, rafts[2].skipCheckSemiSync)
}

// enable check semi-sync
{
rafts[2].SetSkipCheckSemiSync(false)

MockWaitLeaderEggs(rafts, 0)
MockWaitLeaderEggs(rafts, 0)

assert.Equal(t, false, rafts[2].skipCheckSemiSync)
}
}

Expand Down Expand Up @@ -1918,14 +1936,14 @@ func TestRaftElectionUnderLearnerInMinority(t *testing.T) {
// 1.1 rafts[1] with MockGTID_X3{Master_Log_File = "mysql-bin.000003", Read_Master_Log_Pos = 123}
// 1.2 rafts[2] with MockGTID_X3{Master_Log_File = "mysql-bin.000003", Read_Master_Log_Pos = 123}
// 2. start rafts[0] state as CANDIDATE
// 3. wait 20 times the election timeout
// 3. wait 30 times the election timeout
// 4. start rafts[1] as FOLLOWER and rafts[2] as IDLE
// InvalidGITD
// rafts[0]: C --------------> F --------------> C -> ... -> F
// InvalidViewID
// rafts[1]: F --------------> C --------------> F -> ... -> C

// 5. wait 4 times the election timeout
// 5. wait 8 times the election timeout
// 6. check if rafts[1] is the leader
func TestRaftElectionUnderFollowerAndCandidateAlternate(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
Expand All @@ -1946,8 +1964,8 @@ func TestRaftElectionUnderFollowerAndCandidateAlternate(t *testing.T) {
MockStateTransition(rafts[0], CANDIDATE)
}

// 3. wait 20 times the election timeout
time.Sleep(time.Millisecond * time.Duration(rafts[0].getElectionTimeout()*20))
// 3. wait 30 times the election timeout
time.Sleep(time.Millisecond * time.Duration(rafts[0].getElectionTimeout()*30))

// 4. start rafts[1] as FOLLOWER and rafts[2] as IDLE
{
Expand All @@ -1958,8 +1976,8 @@ func TestRaftElectionUnderFollowerAndCandidateAlternate(t *testing.T) {
MockStateTransition(rafts[2], IDLE)
}

// 5. wait 4 times the election timeout
time.Sleep(time.Millisecond * time.Duration(rafts[0].getElectionTimeout()*4))
// 5. wait 8 times the election timeout
time.Sleep(time.Millisecond * time.Duration(rafts[0].getElectionTimeout()*8))

//6. check if rafts[1] is the leader
{
Expand Down
12 changes: 12 additions & 0 deletions src/raft/rpc_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,15 @@ func (r *RaftRPC) DisablePurgeBinlog(req *model.RaftStatusRPCRequest, rsp *model
r.raft.SetSkipPurgeBinlog(true)
return nil
}

// EnableCheckSemiSync rpc.
func (r *RaftRPC) EnableCheckSemiSync(req *model.RaftStatusRPCRequest, rsp *model.RaftStatusRPCResponse) error {
r.raft.SetSkipCheckSemiSync(false)
return nil
}

// DisableCheckSemiSync rpc.
func (r *RaftRPC) DisableCheckSemiSync(req *model.RaftStatusRPCRequest, rsp *model.RaftStatusRPCResponse) error {
r.raft.SetSkipCheckSemiSync(true)
return nil
}
72 changes: 72 additions & 0 deletions src/raft/rpc_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,75 @@ func TestRaftRPCPurgeBinlog(t *testing.T) {
assert.NotEqual(t, want, got)
}
}

func TestRaftRPCCheckSemiSync(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
port := common.RandomPort(8000, 9000)
names, rafts, scleanup := MockRafts(log, port, 3, -1)
defer scleanup()
whoisleader := 2

// 1. set rafts GTID
// 1.0 rafts[0] with MockGTIDB{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123}
// 1.1 rafts[1] with MockGTIDB{Master_Log_File = "mysql-bin.000003", Read_Master_Log_Pos = 123}
// 1.2 rafts[2] with MockGTIDC{Master_Log_File = "mysql-bin.000005", Read_Master_Log_Pos = 123}
{
rafts[0].mysql.SetMysqlHandler(mysql.NewMockGTIDX1())
rafts[1].mysql.SetMysqlHandler(mysql.NewMockGTIDX3())
rafts[2].mysql.SetMysqlHandler(mysql.NewMockGTIDX5())
}

// 2. Start 3 rafts state as FOLLOWER
for _, raft := range rafts {
raft.Start()
}

// wait leader eggs
{
MockWaitLeaderEggs(rafts, 1)
}
// check(default is enable)
{
MockWaitLeaderEggs(rafts, 0)
MockWaitLeaderEggs(rafts, 0)
check := rafts[whoisleader].skipCheckSemiSync
assert.Equal(t, false, check)
}

// disable check semi-sync
{
c, cleanup := MockGetClient(t, names[whoisleader])
defer cleanup()

method := model.RPCRaftDisableCheckSemiSync
req := model.NewRaftStatusRPCRequest()
rsp := model.NewRaftStatusRPCResponse(model.OK)
err := c.Call(method, req, rsp)
assert.Nil(t, err)

// check
MockWaitLeaderEggs(rafts, 0)
MockWaitLeaderEggs(rafts, 0)
got := rafts[whoisleader].skipCheckSemiSync
assert.Equal(t, true, got)
}

// enable check semi-sync
{
MockWaitLeaderEggs(rafts, 1)
c, cleanup := MockGetClient(t, names[whoisleader])
defer cleanup()

method := model.RPCRaftEnableCheckSemiSync
req := model.NewRaftStatusRPCRequest()
rsp := model.NewRaftStatusRPCResponse(model.OK)
err := c.Call(method, req, rsp)
assert.Nil(t, err)

// check
MockWaitLeaderEggs(rafts, 0)
MockWaitLeaderEggs(rafts, 0)
got := rafts[whoisleader].skipCheckSemiSync
assert.Equal(t, false, got)
}
}

0 comments on commit 15f172e

Please sign in to comment.