Skip to content

Commit

Permalink
raft: add learner state for rebuildme
Browse files Browse the repository at this point in the history
  • Loading branch information
dbkernel authored and BohuTANG committed Nov 29, 2019
1 parent d2d871a commit 2026588
Show file tree
Hide file tree
Showing 9 changed files with 628 additions and 8 deletions.
16 changes: 16 additions & 0 deletions src/cli/callx/callx.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,22 @@ func DisableRaftRPC(node string) (*model.HARPCResponse, error) {
return rsp, err
}

func SetLearnerRPC(node string) (*model.HARPCResponse, error) {
cli, cleanup, err := GetClient(node)

if err != nil {
return nil, err
}
defer cleanup()

method := model.RPCHASetLearner
req := model.NewHARPCRequest()
rsp := model.NewHARPCResponse(model.OK)
err = cli.Call(method, req, rsp)

return rsp, err
}

func TryToLeaderRPC(node string) (*model.HARPCResponse, error) {
cli, cleanup, err := GetClient(node)
if err != nil {
Expand Down
25 changes: 17 additions & 8 deletions src/cli/cmd/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) {
log.Warning("S3-->check.bestone[%v].is.OK....", bestone)
}

// 4. disable raft
// 4. set learner
{
log.Warning("S4-->disable.raft")
if _, err := callx.DisableRaftRPC(self); err != nil {
log.Error("disableRaftRPC.error[%v]", err)
log.Warning("S4-->set.learner")
if _, err := callx.SetLearnerRPC(self); err != nil {
log.Error("SetLearnerRPC.error[%v]", err)
}
}

Expand Down Expand Up @@ -328,11 +328,20 @@ func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) {

// 16. enable raft
{
log.Warning("S16-->enable.raft.begin...")
if _, err := callx.EnableRaftRPC(self); err != nil {
log.Error("enbleRaftRPC.error[%v]", err)
// check whether the state is IDLE or not
if conf.Raft.SuperIDLE {
log.Warning("S16-->disable.raft.again...")
if _, err := callx.DisableRaftRPC(self); err != nil {
log.Error("enbleRaftRPC.error[%v]", err)
}
log.Warning("S16-->run.as.IDLE...")
} else {
log.Warning("S16-->enable.raft.begin...")
if _, err := callx.EnableRaftRPC(self); err != nil {
log.Error("enbleRaftRPC.error[%v]", err)
}
log.Warning("S16-->enable.raft.done...")
}
log.Warning("S16-->enable.raft.done...")
}

// 17. wait change to master
Expand Down
1 change: 1 addition & 0 deletions src/model/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package model

const (
RPCHASetLearner = "HARPC.HASetLearner"
RPCHADisable = "HARPC.HADisable"
RPCHAEnable = "HARPC.HAEnable"
RPCHATryToLeader = "HARPC.HATryToLeader"
Expand Down
5 changes: 5 additions & 0 deletions src/raft/attr.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ const (
// neither process heartbeat nor voterequest(return ErrorInvalidRequest)
INVALID

// LEARNER state.
LEARNER

// STOPPED state.
STOPPED
)
Expand All @@ -59,6 +62,8 @@ func (s State) String() string {
case 1 << 4:
return "INVALID"
case 1 << 5:
return "LEARNER"
case 1 << 6:
return "STOPPED"
}
return "UNKNOW"
Expand Down
214 changes: 214 additions & 0 deletions src/raft/learner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Xenon
*
* Copyright 2018 The Xenon Authors.
* Code is licensed under the GPLv3.
*
*/

package raft

import (
"model"
)

// LEARNER is a special STATE with other FOLLOWER/CANDICATE/LEADER states.
// It is usually used as READ-ONLY but does not have RAFT features, such as
// LEADER election
// FOLLOWER promotion
//
// Because of we bring LEARNER state in RaftRPCResponse as vote-request response,
// the LEARNER vote will be filtered out by other CANDIDATEs.
// LEARNER is one member of a RAFT cluster but without the rights to vote.

// Learner tuple.
type Learner struct {
*Raft

// learner process heartbeat request handler
processHeartbeatRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse

// learner process voterequest request handler
processRequestVoteRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse

// learner process raft ping request handler
processPingRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse
}

// NewLearner creates new learner.
func NewLearner(r *Raft) *Learner {
B := &Learner{Raft: r}
B.initHandlers()
return B
}

// Loop used to start the loop of the state machine.
//--------------------------------------
// State Machine
//--------------------------------------
// in LEARNER state, we never do leader election
//
func (r *Learner) Loop() {
// update begin
r.updateStateBegin()
r.stateInit()

for r.getState() == LEARNER {
select {
case <-r.fired:
r.WARNING("state.machine.loop.got.fired")
case e := <-r.c:
switch e.Type {
// 1) Heartbeat
case MsgRaftHeartbeat:
req := e.request.(*model.RaftRPCRequest)
rsp := r.processHeartbeatRequestHandler(req)
e.response <- rsp

// 2) RequestVote
case MsgRaftRequestVote:
req := e.request.(*model.RaftRPCRequest)
rsp := r.processRequestVoteRequest(req)
e.response <- rsp

// 3) Ping
case MsgRaftPing:
req := e.request.(*model.RaftRPCRequest)
rsp := r.processPingRequestHandler(req)
e.response <- rsp

default:
r.ERROR("get.unknown.request[%v]", e.Type)
}
}
}
}

// processHeartbeatRequest
// EFFECT
// handles the heartbeat request from the leader
// In LEARNER state, we only handle the master changed
//
func (r *Learner) processHeartbeatRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse {
rsp := model.NewRaftRPCResponse(model.OK)
rsp.Raft.From = r.getID()
rsp.Raft.ViewID = r.getViewID()
rsp.Raft.EpochID = r.getEpochID()
rsp.Raft.State = r.state.String()
rsp.Relay_Master_Log_File = r.mysql.RelayMasterLogFile()

if !r.checkRequest(req) {
rsp.RetCode = model.ErrorInvalidRequest
return rsp
}

viewdiff := (int)(r.getViewID() - req.GetViewID())
epochdiff := (int)(r.getEpochID() - req.GetEpochID())
switch {
case viewdiff <= 0:
// MySQL1: disable master semi-sync because I am a slave
if err := r.mysql.DisableSemiSyncMaster(); err != nil {
r.ERROR("mysql.DisableSemiSyncMaster.error[%v]", err)
}

// MySQL2: set mysql readonly(mysql maybe down and up then the LEADER changes)
if err := r.mysql.SetReadOnly(); err != nil {
r.ERROR("mysql.SetReadOnly.error[%v]", err)
}

// MySQL3: start slave
if err := r.mysql.StartSlave(); err != nil {
r.ERROR("mysql.StartSlave.error[%v]", err)
}

// MySQL4: change master
if r.getLeader() != req.GetFrom() {
r.WARNING("get.heartbeat.from[N:%v, V:%v, E:%v].change.mysql.master[%+v]", req.GetFrom(), req.GetViewID(), req.GetEpochID(), req.GetGTID())

if err := r.mysql.ChangeMasterTo(&req.Repl); err != nil {
r.ERROR("change.master.to[FROM:%v, GTID:%v].error[%v]", req.GetFrom(), req.GetRepl(), err)
rsp.RetCode = model.ErrorChangeMaster
return rsp
}
r.leader = req.GetFrom()
}

// view change
if viewdiff < 0 {
r.WARNING("get.heartbeat.from[N:%v, V:%v, E:%v].update.view", req.GetFrom(), req.GetViewID(), req.GetEpochID())
r.updateView(req.GetViewID(), req.GetFrom())
}

// epoch change
if epochdiff != 0 {
r.WARNING("get.heartbeat.from[N:%v, V:%v, E:%v].update.epoch", req.GetFrom(), req.GetViewID(), req.GetEpochID())
r.updateEpoch(req.GetEpochID(), req.GetPeers())
}
}
return rsp
}

// processRequestVoteRequest
// EFFECT
// handles the requestvote request from other CANDIDATEs
// LEARNER is special, it returns ErrorVoteNotGranted expect Request Denied
//
// RETURN
// 1. ErrorVoteNotGranted: dont give any vote
func (r *Learner) processRequestVoteRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse {
rsp := model.NewRaftRPCResponse(model.ErrorVoteNotGranted)
rsp.Raft.From = r.getID()
rsp.Raft.ViewID = r.getViewID()
rsp.Raft.EpochID = r.getEpochID()
rsp.Raft.State = r.state.String()

if !r.checkRequest(req) {
rsp.RetCode = model.ErrorInvalidRequest
return rsp
}
return rsp
}

func (r *Learner) processPingRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse {
rsp := model.NewRaftRPCResponse(model.OK)
rsp.Raft.State = r.state.String()
return rsp
}

func (r *Learner) stateInit() {
// 1. stop vip
if err := r.leaderStopShellCommand(); err != nil {
// TODO(array): what todo?
r.ERROR("stopshell.error[%v]", err)
}

// MySQL1: set readonly
if err := r.mysql.SetReadOnly(); err != nil {
r.ERROR("mysql.SetReadOnly.error[%v]", err)
}

// MySQL2. set mysql slave system variables
if err := r.mysql.SetSlaveGlobalSysVar(); err != nil {
r.ERROR("mysql.SetSlaveGlobalSysVar.error[%v]", err)
}
}

// handlers
func (r *Learner) initHandlers() {
r.setProcessHeartbeatRequestHandler(r.processHeartbeatRequest)
r.setProcessRequestVoteRequestHandler(r.processRequestVoteRequest)
r.setProcessPingRequestHandler(r.processPingRequest)
}

// for tests
func (r *Learner) setProcessHeartbeatRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) {
r.processHeartbeatRequestHandler = f
}

func (r *Learner) setProcessRequestVoteRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) {
r.processRequestVoteRequestHandler = f
}

func (r *Learner) setProcessPingRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) {
r.processPingRequestHandler = f
}
4 changes: 4 additions & 0 deletions src/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Raft struct {
F *Follower
I *Idle
IV *Invalid
LN *Learner
peers map[string]*Peer
stats model.RaftStats
skipPurgeBinlog bool // if true, purge binlog will skipped
Expand All @@ -89,6 +90,7 @@ func NewRaft(id string, conf *config.RaftConfig, log *xlog.Log, mysql *mysql.Mys
r.F = NewFollower(r)
r.I = NewIdle(r)
r.IV = NewInvalid(r)
r.LN = NewLearner(r)

// setup raft timeout
r.resetHeartbeatTimeout()
Expand Down Expand Up @@ -234,6 +236,8 @@ func (r *Raft) stateLoop() {
r.I.Loop()
case INVALID:
r.IV.Loop()
case LEARNER:
r.LN.Loop()
}
state = r.getState()
}
Expand Down
Loading

0 comments on commit 2026588

Please sign in to comment.