Skip to content

Commit

Permalink
raft:Add INVALID state, modify raft #67
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason authored and BohuTANG committed Jul 15, 2019
1 parent 9a80276 commit df24ca0
Show file tree
Hide file tree
Showing 24 changed files with 1,063 additions and 92 deletions.
2 changes: 1 addition & 1 deletion src/cli/cmd/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func raftNodesCommandFn(cmd *cobra.Command, args []string) {
func NewRaftStatusCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "status",
Short: "status in JSON(state(LEADER/CANDIDATE/FOLLOWER/IDLE))",
Short: "status in JSON(state(LEADER/CANDIDATE/FOLLOWER/IDLE/INVALID))",
Run: raftStatusCommandFn,
}

Expand Down
12 changes: 6 additions & 6 deletions src/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ func DefaultBackupConfig() *BackupConfig {
UseMemory: "2GB",
Parallel: 2,
MysqldMonitorInterval: 1000 * 1,
Admin: "root",
Passwd: "",
Host: "localhost",
Port: 3306,
Basedir: "/u01/mysql_20160606/",
DefaultsFile: "/etc/my3306.cnf",
Admin: "root",
Passwd: "",
Host: "localhost",
Port: 3306,
Basedir: "/u01/mysql_20160606/",
DefaultsFile: "/etc/my3306.cnf",
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/model/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type NodeRPCResponse struct {
ViewID uint64

// The State of the raft:
// FOLLOWER/CANDIDATE/LEADER/IDLE
// FOLLOWER/CANDIDATE/LEADER/IDLE/INVALID
State string

// The Leader endpoint of the cluster
Expand Down
3 changes: 2 additions & 1 deletion src/model/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
RAFTMYSQL_WAITUNTILAFTERGTID RAFTMYSQL_STATUS = "WaitUntilAfterGTID"
)
const (
RPCRaftPing = "RaftRPC.Ping"
RPCRaftHeartbeat = "RaftRPC.Heartbeat"
RPCRaftRequestVote = "RaftRPC.RequestVote"
RPCRaftStatus = "RaftRPC.Status"
Expand All @@ -39,7 +40,7 @@ type Raft struct {
// The endpoint of the rpc call to
To string

// The state string(LEADER/CANCIDATE/FOLLOWER/IDLE)
// The state string(LEADER/CANCIDATE/FOLLOWER/IDLE/INVALID)
State string
}

Expand Down
39 changes: 39 additions & 0 deletions src/mysql/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,45 @@ func (m *Mysql) GTIDGreaterThan(gtid *model.GTID) (bool, model.GTID, error) {
return cmp > 0, this, nil
}

// CheckGTID use to compare the followerGTID and candidateGTID
func (m *Mysql) CheckGTID(followerGTID *model.GTID, candidateGTID *model.GTID) bool {
log := m.log
fRetrivedGTID := followerGTID.Retrieved_GTID_Set
cRetrivedGTID := candidateGTID.Retrieved_GTID_Set

// follower never generate events, should vote, but if some one execute reset master, this may be error
// if a normal restart the follower retrived_gtid_set will be "" can't setState(INVALID)
if fRetrivedGTID == "" {
return false
}

// candidate has none RetrivedGTID, may be none retrived_gtid_set
// this means the candidate or new leader has not written, shouldnt vote
if cRetrivedGTID == "" {
return false
}

// gtid_sub is not none, means the follower gtid is bigger than candidate gtid
// if viewdiff<=0 it must be localcommitted
gtid_sub, err := m.GetGtidSubtract(fRetrivedGTID, cRetrivedGTID)
if err != nil {
log.Error("mysql.CheckGTID.error[%v]", err)
return false
} else if err == nil && gtid_sub != "" {
log.Warning("follower.gtid[%v].bigger.than.remote[%v]", followerGTID, candidateGTID)
return true
}
return false
}

func (m *Mysql) GetGtidSubtract(subsetGTID string, setGTID string) (string, error) {
db, err := m.getDB()
if err != nil {
return "", err
}
return m.mysqlHandler.GetGtidSubtract(db, subsetGTID, setGTID)
}

// StartSlaveIOThread used to start the slave io thread.
func (m *Mysql) StartSlaveIOThread() error {
db, err := m.getDB()
Expand Down
121 changes: 120 additions & 1 deletion src/mysql/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,125 @@ func TestWaitUntilAfterGTID(t *testing.T) {
assert.Nil(t, err)
}

func TestCheckGTID(t *testing.T) {
db, mock, err := sqlmock.New()
assert.Nil(t, err)
defer db.Close()
var GTID1, GTID2 model.GTID

//log
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
conf := config.DefaultMysqlConfig()
mysql := NewMysql(conf, log)
mysql.db = db

// local is a normal follower, leader Executed_Gtid_Set is ""
{
GTID1 = model.GTID{
Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160",
}
GTID2 = model.GTID{
Retrieved_GTID_Set: "",
}

want := false
got := mysql.CheckGTID(&GTID1, &GTID2)

assert.Equal(t, want, got)
}

// local is a normal follower Retrieved_GTID_Set is "", leader Executed_Gtid_Set is ""
{
GTID1 = model.GTID{
Retrieved_GTID_Set: "",
}
GTID2 = model.GTID{
Retrieved_GTID_Set: "",
}

want := false
got := mysql.CheckGTID(&GTID1, &GTID2)

assert.Equal(t, want, got)
}

// local is a normal follower Retrieved_GTID_Set is "", leader do some dml
{
GTID1 = model.GTID{
Retrieved_GTID_Set: "",
}
GTID2 = model.GTID{
Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160",
}

want := false
got := mysql.CheckGTID(&GTID1, &GTID2)

assert.Equal(t, want, got)
}

// local is a leader bug sprain, remote has leader but has none write
{
GTID1 = model.GTID{
Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160",
}
GTID2 = model.GTID{
Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160",
}

query := "SELECT GTID_SUBTRACT\\('84030605-66aa-11e6-9465-52540e7fd51c:1-160','84030605-66aa-11e6-9465-52540e7fd51c:1-160'\\) as gtid_sub"
log.Warning("%v", query)
columns := []string{"gtid_sub"}
mockRows := sqlmock.NewRows(columns).AddRow("")
mock.ExpectQuery(query).WillReturnRows(mockRows)

want := false
got := mysql.CheckGTID(&GTID1, &GTID2)

assert.Equal(t, want, got)
}

// local is a leader bug sprain, remote has leader has writen
{
GTID1 = model.GTID{
Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160",
}
GTID2 = model.GTID{
Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10",
}

query := "SELECT GTID_SUBTRACT\\('84030605-66aa-11e6-9465-52540e7fd51c:1-160','84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10'\\) as gtid_sub"
columns := []string{"gtid_sub"}
mockRows := sqlmock.NewRows(columns).AddRow("")
mock.ExpectQuery(query).WillReturnRows(mockRows)

want := false
got := mysql.CheckGTID(&GTID1, &GTID2)

assert.Equal(t, want, got)
}

// local is a leader bug sprain and localcommitted, remote has leader has writen
{
GTID1 = model.GTID{
Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-161",
}
GTID2 = model.GTID{
Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10",
}

query := "SELECT GTID_SUBTRACT\\('84030605-66aa-11e6-9465-52540e7fd51c:1-161','84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10'\\) as gtid_sub"
columns := []string{"gtid_sub"}
mockRows := sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c:161")
mock.ExpectQuery(query).WillReturnRows(mockRows)

want := true
got := mysql.CheckGTID(&GTID1, &GTID2)

assert.Equal(t, want, got)
}
}

func TestGTIDGreaterThan(t *testing.T) {
db, mock, err := sqlmock.New()
assert.Nil(t, err)
Expand Down Expand Up @@ -402,7 +521,7 @@ func TestGetGTID(t *testing.T) {

want := model.GTID{Master_Log_File: "mysql-bin.000001",
Read_Master_Log_Pos: 147,
Retrieved_GTID_Set: "",
Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160",
Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160",
Slave_IO_Running: true,
Slave_SQL_Running: true,
Expand Down
Loading

0 comments on commit df24ca0

Please sign in to comment.