diff --git a/src/config/config.go b/src/config/config.go index 513b1a4..0253507 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -48,6 +48,9 @@ type RaftConfig struct { // leader heartbeat interval(ms) HeartbeatTimeout int `json:"heartbeat-timeout"` + // admit defeat count for hearbeat + AdmitDefeatHtCnt int `json:"admit-defeat-hearbeat-count"` + // election timeout(ms) ElectionTimeout int `json:"election-timeout"` @@ -76,6 +79,7 @@ func DefaultRaftConfig() *RaftConfig { return &RaftConfig{ MetaDatadir: ".", HeartbeatTimeout: 1000, + AdmitDefeatHtCnt: 10, ElectionTimeout: 3000, PurgeBinlogInterval: 1000 * 60 * 5, LeaderStartCommand: "nop", @@ -117,6 +121,9 @@ type MysqlConfig struct { // ping mysql interval(ms) PingTimeout int `json:"ping-timeout"` + // admit defeat count for ping mysql + AdmitDefeatPingCnt int `json:"admit-defeat-ping-count"` + // master system variables configure(separated by ;) MasterSysVars string `json:"master-sysvars"` @@ -135,16 +142,17 @@ type MysqlConfig struct { func DefaultMysqlConfig() *MysqlConfig { return &MysqlConfig{ - Admin: "root", - Passwd: "", - Host: "localhost", - Port: 3306, - PingTimeout: 1000, - Basedir: "/u01/mysql_20160606/", - DefaultsFile: "/etc/my3306.cnf", - ReplHost: "127.0.0.1", - ReplUser: "repl", - ReplPasswd: "repl", + Admin: "root", + Passwd: "", + Host: "localhost", + Port: 3306, + PingTimeout: 1000, + AdmitDefeatPingCnt: 2, + Basedir: "/u01/mysql_20160606/", + DefaultsFile: "/etc/my3306.cnf", + ReplHost: "127.0.0.1", + ReplUser: "repl", + ReplPasswd: "repl", } } diff --git a/src/mysql/mysql.go b/src/mysql/mysql.go index 01d6e18..0c89613 100644 --- a/src/mysql/mysql.go +++ b/src/mysql/mysql.go @@ -39,10 +39,6 @@ const ( MysqlReadwrite Option = "READWRITE" ) -var ( - downsLimits = 2 -) - // PingEntry tuple. type PingEntry struct { Relay_Master_Log_File string @@ -95,6 +91,8 @@ func (m *Mysql) Ping() { var pe *PingEntry log := m.log + downsLimits := m.conf.AdmitDefeatPingCnt + if db, err = m.getDB(); err != nil { log.Error("mysql[%v].ping.getdb.error[%v].downs:%v,downslimits:%v", m.getConnStr(), err, m.downs, downsLimits) if m.downs > downsLimits { diff --git a/src/raft/leader.go b/src/raft/leader.go index c583b13..e8b57ce 100644 --- a/src/raft/leader.go +++ b/src/raft/leader.go @@ -39,7 +39,7 @@ type Leader struct { processRequestVoteRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse // leader send heartbeat request to other followers - sendHeartbeatHandler func(*int, chan *model.RaftRPCResponse) + sendHeartbeatHandler func(*bool, chan *model.RaftRPCResponse) // leader process send heartbeat response processHeartbeatResponseHandler func(*int, *model.RaftRPCResponse) @@ -65,18 +65,23 @@ func (r *Leader) Loop() { r.stateInit() defer r.stateExit() - mysqlDowns := 0 + mysqlDown := false ackGranted := 1 lessHtAcks := 0 - maxLessHtAcks := 10 + maxLessHtAcks := r.Raft.conf.AdmitDefeatHtCnt // send heartbeat respChan := make(chan *model.RaftRPCResponse, r.getMembers()) - r.sendHeartbeatHandler(&mysqlDowns, respChan) + r.sendHeartbeatHandler(&mysqlDown, respChan) r.resetHeartbeatTimeout() for r.getState() == LEADER { + if mysqlDown { + r.WARNING("feel.mysql.down.degrade.to.follower") + r.degradeToFollower() + } + select { case <-r.fired: r.WARNING("state.machine.loop.got.fired") @@ -97,7 +102,7 @@ func (r *Leader) Loop() { ackGranted = 1 respChan = make(chan *model.RaftRPCResponse, r.getMembers()) - r.sendHeartbeatHandler(&mysqlDowns, respChan) + r.sendHeartbeatHandler(&mysqlDown, respChan) r.resetHeartbeatTimeout() case rsp := <-respChan: r.processHeartbeatResponseHandler(&ackGranted, rsp) @@ -248,10 +253,10 @@ func (r *Leader) processRequestVoteRequest(req *model.RaftRPCRequest) *model.Raf // leaderSendHeartbeatHandler // broadcast hearbeat requests to other peers of the cluster -func (r *Leader) sendHeartbeat(mysqlDowns *int, c chan *model.RaftRPCResponse) { +func (r *Leader) sendHeartbeat(mysqlDown *bool, c chan *model.RaftRPCResponse) { // check MySQL down if r.mysql.GetState() == mysql.MysqlDead { - r.WARNING("feel.mysql.down") + *mysqlDown = true return } @@ -495,7 +500,7 @@ func (r *Leader) setProcessRequestVoteRequestHandler(f func(*model.RaftRPCReques r.processRequestVoteRequestHandler = f } -func (r *Leader) setSendHeartbeatHandler(f func(*int, chan *model.RaftRPCResponse)) { +func (r *Leader) setSendHeartbeatHandler(f func(*bool, chan *model.RaftRPCResponse)) { r.sendHeartbeatHandler = f } diff --git a/src/raft/mock.go b/src/raft/mock.go index b9c6640..0ee0c59 100644 --- a/src/raft/mock.go +++ b/src/raft/mock.go @@ -208,7 +208,7 @@ func (r *Raft) mockLeaderProcessRequestVoteRequest(req *model.RaftRPCRequest) *m // mock leader send heartbeat request // nop here, so other followers will start a new leader election -func (r *Raft) mockLeaderSendHeartbeat(mysqlDownLimits *int, c chan *model.RaftRPCResponse) { +func (r *Raft) mockLeaderSendHeartbeat(mysqlDown *bool, c chan *model.RaftRPCResponse) { r.DEBUG("mock.send.nop.heartbeat.request") }