From 83766aa159e7a5a89d52b445d2c9fb6c29950ff8 Mon Sep 17 00:00:00 2001 From: tai Date: Thu, 11 Jul 2019 12:16:04 +0800 Subject: [PATCH] raft:Add INVALID state, modify raft #67 --- src/cli/cmd/raft.go | 2 +- src/config/config.go | 12 +- src/model/node.go | 2 +- src/model/raft.go | 3 +- src/mysql/api.go | 39 +++++++ src/mysql/api_test.go | 121 +++++++++++++++++++- src/mysql/mock.go | 219 +++++++++++++++++++++++++++++++++++- src/mysql/mysql_handler.go | 3 + src/mysql/mysql_test.go | 2 +- src/mysql/mysqlbase.go | 17 +++ src/mysql/mysqlbase_test.go | 2 +- src/mysql/rpc_mysql_test.go | 1 + src/raft/attr.go | 9 ++ src/raft/candidate.go | 23 +++- src/raft/follower.go | 120 ++++++++++++++++++-- src/raft/idle.go | 21 +++- src/raft/invalid.go | 207 ++++++++++++++++++++++++++++++++++ src/raft/leader.go | 40 ++++++- src/raft/mock.go | 6 + src/raft/peer.go | 30 +++++ src/raft/raft.go | 60 ++++++---- src/raft/raft_test.go | 202 +++++++++++++++++++++++++++------ src/raft/rpc_ha.go | 3 + src/raft/rpc_raft.go | 11 ++ 24 files changed, 1063 insertions(+), 92 deletions(-) create mode 100644 src/raft/invalid.go diff --git a/src/cli/cmd/raft.go b/src/cli/cmd/raft.go index bad704c..99e6522 100644 --- a/src/cli/cmd/raft.go +++ b/src/cli/cmd/raft.go @@ -213,7 +213,7 @@ func raftNodesCommandFn(cmd *cobra.Command, args []string) { func NewRaftStatusCommand() *cobra.Command { cmd := &cobra.Command{ Use: "status", - Short: "status in JSON(state(LEADER/CANDIDATE/FOLLOWER/IDLE))", + Short: "status in JSON(state(LEADER/CANDIDATE/FOLLOWER/IDLE/INVALID))", Run: raftStatusCommandFn, } diff --git a/src/config/config.go b/src/config/config.go index f13e5f3..cd13930 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -239,12 +239,12 @@ func DefaultBackupConfig() *BackupConfig { UseMemory: "2GB", Parallel: 2, MysqldMonitorInterval: 1000 * 1, - Admin: "root", - Passwd: "", - Host: "localhost", - Port: 3306, - Basedir: "/u01/mysql_20160606/", - DefaultsFile: "/etc/my3306.cnf", + Admin: "root", + Passwd: "", + Host: "localhost", + Port: 3306, + Basedir: "/u01/mysql_20160606/", + DefaultsFile: "/etc/my3306.cnf", } } diff --git a/src/model/node.go b/src/model/node.go index 97c55ed..1d97b60 100644 --- a/src/model/node.go +++ b/src/model/node.go @@ -30,7 +30,7 @@ type NodeRPCResponse struct { ViewID uint64 // The State of the raft: - // FOLLOWER/CANDIDATE/LEADER/IDLE + // FOLLOWER/CANDIDATE/LEADER/IDLE/INVALID State string // The Leader endpoint of the cluster diff --git a/src/model/raft.go b/src/model/raft.go index 3f8afc8..acadfa0 100644 --- a/src/model/raft.go +++ b/src/model/raft.go @@ -15,6 +15,7 @@ const ( RAFTMYSQL_WAITUNTILAFTERGTID RAFTMYSQL_STATUS = "WaitUntilAfterGTID" ) const ( + RPCRaftPing = "RaftRPC.Ping" RPCRaftHeartbeat = "RaftRPC.Heartbeat" RPCRaftRequestVote = "RaftRPC.RequestVote" RPCRaftStatus = "RaftRPC.Status" @@ -39,7 +40,7 @@ type Raft struct { // The endpoint of the rpc call to To string - // The state string(LEADER/CANCIDATE/FOLLOWER/IDLE) + // The state string(LEADER/CANCIDATE/FOLLOWER/IDLE/INVALID) State string } diff --git a/src/mysql/api.go b/src/mysql/api.go index 31a8285..36d0cd5 100644 --- a/src/mysql/api.go +++ b/src/mysql/api.go @@ -114,6 +114,45 @@ func (m *Mysql) GTIDGreaterThan(gtid *model.GTID) (bool, model.GTID, error) { return cmp > 0, this, nil } +// CheckGTID use to compare the followerGTID and candidateGTID +func (m *Mysql) CheckGTID(followerGTID *model.GTID, candidateGTID *model.GTID) bool { + log := m.log + fRetrivedGTID := followerGTID.Retrieved_GTID_Set + cRetrivedGTID := candidateGTID.Retrieved_GTID_Set + + // follower never generate events, should vote, but if some one execute reset master, this may be error + // if a normal restart the follower retrived_gtid_set will be "" can't setState(INVALID) + if fRetrivedGTID == "" { + return false + } + + // candidate has none RetrivedGTID, may be none retrived_gtid_set + // this means the candidate or new leader has not written, shouldnt vote + if cRetrivedGTID == "" { + return false + } + + // gtid_sub is not none, means the follower gtid is bigger than candidate gtid + // if viewdiff<=0 it must be localcommitted + gtid_sub, err := m.GetGtidSubtract(fRetrivedGTID, cRetrivedGTID) + if err != nil { + log.Error("mysql.CheckGTID.error[%v]", err) + return false + } else if err == nil && gtid_sub != "" { + log.Warning("follower.gtid[%v].bigger.than.remote[%v]", followerGTID, candidateGTID) + return true + } + return false +} + +func (m *Mysql) GetGtidSubtract(subsetGTID string, setGTID string) (string, error) { + db, err := m.getDB() + if err != nil { + return "", err + } + return m.mysqlHandler.GetGtidSubtract(db, subsetGTID, setGTID) +} + // StartSlaveIOThread used to start the slave io thread. func (m *Mysql) StartSlaveIOThread() error { db, err := m.getDB() diff --git a/src/mysql/api_test.go b/src/mysql/api_test.go index b34f2d7..a6dab4b 100644 --- a/src/mysql/api_test.go +++ b/src/mysql/api_test.go @@ -178,6 +178,125 @@ func TestWaitUntilAfterGTID(t *testing.T) { assert.Nil(t, err) } +func TestCheckGTID(t *testing.T) { + db, mock, err := sqlmock.New() + assert.Nil(t, err) + defer db.Close() + var GTID1, GTID2 model.GTID + + //log + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + conf := config.DefaultMysqlConfig() + mysql := NewMysql(conf, log) + mysql.db = db + + // local is a normal follower, leader Executed_Gtid_Set is "" + { + GTID1 = model.GTID{ + Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160", + } + GTID2 = model.GTID{ + Retrieved_GTID_Set: "", + } + + want := false + got := mysql.CheckGTID(>ID1, >ID2) + + assert.Equal(t, want, got) + } + + // local is a normal follower Retrieved_GTID_Set is "", leader Executed_Gtid_Set is "" + { + GTID1 = model.GTID{ + Retrieved_GTID_Set: "", + } + GTID2 = model.GTID{ + Retrieved_GTID_Set: "", + } + + want := false + got := mysql.CheckGTID(>ID1, >ID2) + + assert.Equal(t, want, got) + } + + // local is a normal follower Retrieved_GTID_Set is "", leader do some dml + { + GTID1 = model.GTID{ + Retrieved_GTID_Set: "", + } + GTID2 = model.GTID{ + Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160", + } + + want := false + got := mysql.CheckGTID(>ID1, >ID2) + + assert.Equal(t, want, got) + } + + // local is a leader bug sprain, remote has leader but has none write + { + GTID1 = model.GTID{ + Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160", + } + GTID2 = model.GTID{ + Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160", + } + + query := "SELECT GTID_SUBTRACT\\('84030605-66aa-11e6-9465-52540e7fd51c:1-160','84030605-66aa-11e6-9465-52540e7fd51c:1-160'\\) as gtid_sub" + log.Warning("%v", query) + columns := []string{"gtid_sub"} + mockRows := sqlmock.NewRows(columns).AddRow("") + mock.ExpectQuery(query).WillReturnRows(mockRows) + + want := false + got := mysql.CheckGTID(>ID1, >ID2) + + assert.Equal(t, want, got) + } + + // local is a leader bug sprain, remote has leader has writen + { + GTID1 = model.GTID{ + Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160", + } + GTID2 = model.GTID{ + Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10", + } + + query := "SELECT GTID_SUBTRACT\\('84030605-66aa-11e6-9465-52540e7fd51c:1-160','84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10'\\) as gtid_sub" + columns := []string{"gtid_sub"} + mockRows := sqlmock.NewRows(columns).AddRow("") + mock.ExpectQuery(query).WillReturnRows(mockRows) + + want := false + got := mysql.CheckGTID(>ID1, >ID2) + + assert.Equal(t, want, got) + } + + // local is a leader bug sprain and localcommitted, remote has leader has writen + { + GTID1 = model.GTID{ + Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-161", + } + GTID2 = model.GTID{ + Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10", + } + + query := "SELECT GTID_SUBTRACT\\('84030605-66aa-11e6-9465-52540e7fd51c:1-161','84030605-66aa-11e6-9465-52540e7fd51c:1-160, 84030605-77bb-11e6-9465-52540e7fd51c:1-10'\\) as gtid_sub" + columns := []string{"gtid_sub"} + mockRows := sqlmock.NewRows(columns).AddRow("84030605-66aa-11e6-9465-52540e7fd51c:161") + mock.ExpectQuery(query).WillReturnRows(mockRows) + + want := true + got := mysql.CheckGTID(>ID1, >ID2) + + assert.Equal(t, want, got) + } +} + func TestGTIDGreaterThan(t *testing.T) { db, mock, err := sqlmock.New() assert.Nil(t, err) @@ -402,7 +521,7 @@ func TestGetGTID(t *testing.T) { want := model.GTID{Master_Log_File: "mysql-bin.000001", Read_Master_Log_Pos: 147, - Retrieved_GTID_Set: "", + Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160", Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160", Slave_IO_Running: true, Slave_SQL_Running: true, diff --git a/src/mysql/mock.go b/src/mysql/mock.go index d4457c7..2d6fa5e 100644 --- a/src/mysql/mock.go +++ b/src/mysql/mock.go @@ -34,6 +34,8 @@ type MockGTID struct { ChangeMasterToFn func(*sql.DB, *model.Repl) error ChangeToMasterFn func(*sql.DB) error WaitUntilAfterGTIDFn func(*sql.DB, string) error + GetGtidSubtractFn func(*sql.DB, string, string) (string, error) + CheckGTIDFn func(*model.GTID, *model.GTID) bool SetGlobalSysVarFn func(*sql.DB, string) error ResetMasterFn func(*sql.DB) error ResetSlaveAllFn func(*sql.DB) error @@ -149,6 +151,20 @@ func (mogtid *MockGTID) WaitUntilAfterGTID(db *sql.DB, targetGTID string) error return mogtid.WaitUntilAfterGTIDFn(db, targetGTID) } +// DefaultGetGtidSubtract mock. +func DefaultGetGtidSubtract(db *sql.DB, slaveGTID string, masterGTID string) (string, error) { + return "", nil +} + +func DefaultCheckGTID(followerGTID *model.GTID, leaderGTID *model.GTID) bool { + return false +} + +// GetGtidSubtract mock. +func (mogtid *MockGTID) GetGtidSubtract(db *sql.DB, slaveGTID string, masterGTID string) (string, error) { + return mogtid.GetGtidSubtractFn(db, slaveGTID, masterGTID) +} + // DefaultPing mock. func DefaultPing(db *sql.DB) (*PingEntry, error) { return &PingEntry{}, nil @@ -369,6 +385,8 @@ func defaultMockGTID() *MockGTID { mock.ChangeMasterToFn = DefaultChangeMasterTo mock.ChangeToMasterFn = DefaultChangeToMaster mock.WaitUntilAfterGTIDFn = DefaultWaitUntilAfterGTID + mock.GetGtidSubtractFn = DefaultGetGtidSubtract + mock.CheckGTIDFn = DefaultCheckGTID mock.SetGlobalSysVarFn = DefaultSetGlobalSysVar mock.ResetMasterFn = DefaultResetMaster mock.ResetSlaveAllFn = DefaultResetSlaveAll @@ -440,6 +458,92 @@ func NewMockGTIDA() *MockGTID { return mock } +// GetSlaveGTIDLC mock. +func GetSlaveGTIDLC(db *sql.DB) (*model.GTID, error) { + gtid := &model.GTID{} + gtid.Master_Log_File = "" + gtid.Read_Master_Log_Pos = 0 + gtid.Slave_IO_Running = true + gtid.Slave_SQL_Running = true + gtid.Slave_IO_Running_Str = "Yes" + gtid.Slave_SQL_Running_Str = "Yes" + gtid.Seconds_Behind_Master = "1" + gtid.Last_Error = "" + gtid.Slave_SQL_Running_State = "Slave has read all relay log; waiting for the slave I/O thread to update it" + gtid.Executed_GTID_Set = `052077a5-b6f4-ee1b-61ec-d80a8b27d749:1-37, + 12446bf7-3219-11e5-9434-080027079e3d:8058-963126` + gtid.Retrieved_GTID_Set = `052077a5-b6f4-ee1b-61ec-d80a8b27d749:1-36, + 12446bf7-3219-11e5-9434-080027079e3d:8058-963126` + return gtid, nil +} + +// GetMasterGTIDLC mock. +func GetMasterGTIDLC(db *sql.DB) (*model.GTID, error) { + gtid := &model.GTID{} + gtid.Master_Log_File = "" + gtid.Read_Master_Log_Pos = 0 + gtid.Slave_IO_Running = true + gtid.Slave_SQL_Running = true + gtid.Seconds_Behind_Master = "0" + gtid.Last_Error = "" + gtid.Slave_SQL_Running_State = "" + gtid.Executed_GTID_Set = `052077a5-b6f4-ee1b-61ec-d80a8b27d749:1-37, + 12446bf7-3219-11e5-9434-080027079e3d:8058-963126` + gtid.Retrieved_GTID_Set = `052077a5-b6f4-ee1b-61ec-d80a8b27d749:1-37, + 12446bf7-3219-11e5-9434-080027079e3d:8058-963126` + return gtid, nil +} + +func CheckGTIDLC(*model.GTID, *model.GTID) bool { + return true +} + +// NewMockGTIDLC mock +func NewMockGTIDLC() *MockGTID { + mock := defaultMockGTID() + mock.GetMasterGTIDFn = GetMasterGTIDLC + mock.GetSlaveGTIDFn = GetMasterGTIDLC + mock.CheckGTIDFn = CheckGTIDLC + mock.GetGtidSubtractFn = GetGtidSubtractInvalid + return mock +} + +// NewMockGTIDAA mock. +// with GTID{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123} +// all functions return is OK +func NewMockGTIDAA() *MockGTID { + mock := defaultMockGTID() + mock.GetMasterGTIDFn = GetMasterGTIDAA + mock.GetSlaveGTIDFn = GetSlaveGTIDAA + return mock +} + +// GetSlaveGTIDB mock. +func GetSlaveGTIDAA(db *sql.DB) (*model.GTID, error) { + gtid := &model.GTID{} + + gtid.Master_Log_File = "mysql-bin.000001" + gtid.Read_Master_Log_Pos = 122 + gtid.Slave_IO_Running = true + gtid.Slave_SQL_Running = true + gtid.Executed_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1" + gtid.Retrieved_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1" + return gtid, nil +} + +// GetMasterGTIDB mock. +func GetMasterGTIDAA(db *sql.DB) (*model.GTID, error) { + gtid := &model.GTID{} + + gtid.Master_Log_File = "mysql-bin.000001" + gtid.Read_Master_Log_Pos = 122 + gtid.Executed_GTID_Set = "" + gtid.Retrieved_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1" + gtid.Slave_IO_Running = true + gtid.Slave_SQL_Running = true + return gtid, nil +} + // NewMockGTIDB mock. // with GTID{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123} // all functions return is OK @@ -458,6 +562,8 @@ func GetSlaveGTIDB(db *sql.DB) (*model.GTID, error) { gtid.Read_Master_Log_Pos = 123 gtid.Slave_IO_Running = true gtid.Slave_SQL_Running = true + gtid.Executed_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1-2" + gtid.Retrieved_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1-2" return gtid, nil } @@ -467,6 +573,41 @@ func GetMasterGTIDB(db *sql.DB) (*model.GTID, error) { gtid.Master_Log_File = "mysql-bin.000001" gtid.Read_Master_Log_Pos = 123 + gtid.Executed_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1-2" + gtid.Slave_IO_Running = true + gtid.Slave_SQL_Running = true + return gtid, nil +} + +// NewMockGTIDBB mock. +// with GTID{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123} +// all functions return is OK +func NewMockGTIDBB() *MockGTID { + mock := defaultMockGTID() + mock.GetMasterGTIDFn = GetMasterGTIDBB + mock.GetSlaveGTIDFn = GetSlaveGTIDBB + return mock +} + +// GetSlaveGTIDBB mock. +func GetSlaveGTIDBB(db *sql.DB) (*model.GTID, error) { + gtid := &model.GTID{} + + gtid.Master_Log_File = "mysql-bin.000001" + gtid.Read_Master_Log_Pos = 123 + gtid.Executed_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1, df24366e-bbbb-bbbb-bbbb-525433b6dbaa:1" + gtid.Slave_IO_Running = true + gtid.Slave_SQL_Running = true + return gtid, nil +} + +// GetMasterGTIDBB mock. +func GetMasterGTIDBB(db *sql.DB) (*model.GTID, error) { + gtid := &model.GTID{} + + gtid.Master_Log_File = "mysql-bin.000001" + gtid.Read_Master_Log_Pos = 123 + gtid.Executed_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1, df24366e-inva-bbbb-bbbb-525433b6dbaa:1" gtid.Slave_IO_Running = true gtid.Slave_SQL_Running = true return gtid, nil @@ -491,14 +632,38 @@ func GetSlaveGTIDC(db *sql.DB) (*model.GTID, error) { gtid.Slave_SQL_Running = true gtid.Slave_IO_Running_Str = "Yes" gtid.Slave_SQL_Running_Str = "Yes" + gtid.Retrieved_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1-3" return gtid, nil } // GetMasterGTIDC mock. func GetMasterGTIDC(db *sql.DB) (*model.GTID, error) { + gtid := &model.GTID{} + gtid.Master_Log_File = "mysql-bin.000001" + gtid.Read_Master_Log_Pos = 125 + gtid.Executed_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1-3" + gtid.Retrieved_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1-3" + gtid.Slave_IO_Running = true + gtid.Slave_SQL_Running = true + return gtid, nil +} + +// NewMockGTIDCC mock. +// with GTID{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 124} +// all functions return is OK +func NewMockGTIDCC() *MockGTID { + mock := defaultMockGTID() + mock.GetMasterGTIDFn = GetMasterGTIDCC + mock.GetSlaveGTIDFn = GetSlaveGTIDCC + return mock +} + +// GetSlaveGTIDC mock. +func GetSlaveGTIDCC(db *sql.DB) (*model.GTID, error) { gtid := &model.GTID{} gtid.Master_Log_File = "mysql-bin.000001" gtid.Read_Master_Log_Pos = 124 + gtid.Executed_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1, df24366e-inva-bbbb-bbbb-525433b6dbaa:1" gtid.Slave_IO_Running = true gtid.Slave_SQL_Running = true gtid.Slave_IO_Running_Str = "Yes" @@ -506,6 +671,17 @@ func GetMasterGTIDC(db *sql.DB) (*model.GTID, error) { return gtid, nil } +// GetMasterGTIDC mock. +func GetMasterGTIDCC(db *sql.DB) (*model.GTID, error) { + gtid := &model.GTID{} + gtid.Master_Log_File = "mysql-bin.000001" + gtid.Read_Master_Log_Pos = 125 + gtid.Executed_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1, df24366e-inva-bbbb-bbbb-525433b6dbaa:1" + gtid.Slave_IO_Running = true + gtid.Slave_SQL_Running = true + return gtid, nil +} + // NewMockGTIDD mock. func NewMockGTIDD() *MockGTID { mock := defaultMockGTID() @@ -517,7 +693,8 @@ func NewMockGTIDD() *MockGTID { func GetMasterGTIDD(db *sql.DB) (*model.GTID, error) { gtid := &model.GTID{} gtid.Master_Log_File = "mysql-bin.000001" - gtid.Read_Master_Log_Pos = 124 + gtid.Read_Master_Log_Pos = 126 + gtid.Executed_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1, df24366e-inva-bbbb-bbbb-525433b6dbaa:1" gtid.Slave_IO_Running = true gtid.Slave_SQL_Running = true return gtid, nil @@ -547,6 +724,26 @@ func PingError1(db *sql.DB) (*PingEntry, error) { return nil, errors.New("MockGTIDPingError.ping.error") } +// NewMockGTIDInvalid mock. +// mock GetSlaveGTIDInvalid returns Invalid +// mock GetMasterGTIDInvalid returns Invalid +func NewMockGTIDInvalid() *MockGTID { + mock := defaultMockGTID() + mock.GetGtidSubtractFn = GetGtidSubtractInvalid + mock.GetMasterGTIDFn = GetMasterGTIDInvalid + return mock +} + +func GetMasterGTIDInvalid(db *sql.DB) (*model.GTID, error) { + gtid := &model.GTID{} + gtid.Master_Log_File = "mysql-bin.000001" + gtid.Read_Master_Log_Pos = 122 + gtid.Executed_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1, df24366e-inva-bbbb-bbbb-525433b6dbaa:1-3" + gtid.Slave_IO_Running = true + gtid.Slave_SQL_Running = true + return gtid, nil +} + // NewMockGTIDError mock. // mock GetSlaveGTID returns error // mock GetMasterGTID returns error @@ -611,6 +808,11 @@ func WaitUntilAfterGTIDError(db *sql.DB, targetGTID string) error { return errors.New("mock.WaitUntilAfterGTID.error") } +// GetGtidSubtractInvalid mock. +func GetGtidSubtractInvalid(db *sql.DB, slaveGTID string, masterGTID string) (string, error) { + return "localcommit", nil +} + // PingError2 mock. func PingError2(db *sql.DB) (*PingEntry, error) { return nil, errors.New("MockGTIDE.ping.error") @@ -628,7 +830,8 @@ func setupRPC(rpc *xrpc.Service, mysql *Mysql) { } // NewMockGTIDX1 mock. -// with GTID{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123} +// with GTID{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123, +// gtid.Executed_GTID_Set = "6127a668-gtid-x555-a28d-5254335479b2:1"} // all functions return is OK func NewMockGTIDX1() *MockGTID { mock := defaultMockGTID() @@ -647,6 +850,8 @@ func GetSlaveGTIDX1(db *sql.DB) (*model.GTID, error) { gtid := &model.GTID{} gtid.Master_Log_File = "mysql-bin.000001" gtid.Read_Master_Log_Pos = 123 + gtid.Retrieved_GTID_Set = "6127a668-gtid-x555-a28d-5254335479b2:1" + gtid.Executed_GTID_Set = "6127a668-gtid-x555-a28d-5254335479b2:1" gtid.Slave_IO_Running = true gtid.Slave_IO_Running_Str = "Yes" gtid.Slave_SQL_Running = true @@ -655,7 +860,8 @@ func GetSlaveGTIDX1(db *sql.DB) (*model.GTID, error) { } // NewMockGTIDX3 mock. -// with GTID{Master_Log_File = "mysql-bin.000003", Read_Master_Log_Pos = 123} +// with GTID{Master_Log_File = "mysql-bin.000003", Read_Master_Log_Pos = 123 , +// gtid.Executed_GTID_Set = "6127a668-gtid-x555-a28d-5254335479b2:1"} // all functions return is OK func NewMockGTIDX3() *MockGTID { mock := defaultMockGTID() @@ -674,6 +880,8 @@ func GetSlaveGTIDX3(db *sql.DB) (*model.GTID, error) { gtid := &model.GTID{} gtid.Master_Log_File = "mysql-bin.000003" gtid.Read_Master_Log_Pos = 123 + gtid.Retrieved_GTID_Set = "6127a668-gtid-x555-a28d-5254335479b2:1-2" + gtid.Executed_GTID_Set = "6127a668-gtid-x555-a28d-5254335479b2:1" gtid.Slave_IO_Running = true gtid.Slave_IO_Running_Str = "Yes" gtid.Slave_SQL_Running = true @@ -682,7 +890,8 @@ func GetSlaveGTIDX3(db *sql.DB) (*model.GTID, error) { } // NewMockGTIDX5 mock. -// with GTID{Master_Log_File = "mysql-bin.000005", Read_Master_Log_Pos = 123} +// with GTID{Master_Log_File = "mysql-bin.000005", Read_Master_Log_Pos = 123, +// gtid.Executed_GTID_Set = "6127a668-gtid-x555-a28d-5254335479b2:1"} // all functions return is OK func NewMockGTIDX5() *MockGTID { mock := defaultMockGTID() @@ -701,6 +910,8 @@ func GetSlaveGTIDX5(db *sql.DB) (*model.GTID, error) { gtid := &model.GTID{} gtid.Master_Log_File = "mysql-bin.000005" gtid.Read_Master_Log_Pos = 123 + gtid.Executed_GTID_Set = "6127a668-gtid-x555-a28d-5254335479b2:1" + gtid.Retrieved_GTID_Set = "6127a668-gtid-x555-a28d-5254335479b2:1-3" gtid.Slave_IO_Running = true gtid.Slave_IO_Running_Str = "Yes" gtid.Slave_SQL_Running = true diff --git a/src/mysql/mysql_handler.go b/src/mysql/mysql_handler.go index 11c119f..332f062 100644 --- a/src/mysql/mysql_handler.go +++ b/src/mysql/mysql_handler.go @@ -48,6 +48,9 @@ type MysqlHandler interface { // waits until slave replication reaches at least targetGTID WaitUntilAfterGTID(*sql.DB, string) error + // get gtid subtract with slavegtid and master gtid + GetGtidSubtract(*sql.DB, string, string) (string, error) + // set global variables SetGlobalSysVar(db *sql.DB, varsql string) error diff --git a/src/mysql/mysql_test.go b/src/mysql/mysql_test.go index 6fe42ec..37caadf 100644 --- a/src/mysql/mysql_test.go +++ b/src/mysql/mysql_test.go @@ -47,7 +47,7 @@ func TestStateDead(t *testing.T) { mysql.PingStop() } -func TestCreateReplUser (t *testing.T) { +func TestCreateReplUser(t *testing.T) { // log log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) port := common.RandomPort(8000, 9000) diff --git a/src/mysql/mysqlbase.go b/src/mysql/mysqlbase.go index 1491cd4..3cf353b 100644 --- a/src/mysql/mysqlbase.go +++ b/src/mysql/mysqlbase.go @@ -126,6 +126,7 @@ func (my *MysqlBase) GetMasterGTID(db *sql.DB) (*model.GTID, error) { gtid.Master_Log_File = row["File"] gtid.Read_Master_Log_Pos, _ = strconv.ParseUint(row["Position"], 10, 64) gtid.Executed_GTID_Set = row["Executed_Gtid_Set"] + gtid.Retrieved_GTID_Set = row["Executed_Gtid_Set"] gtid.Seconds_Behind_Master = "0" gtid.Slave_IO_Running = true gtid.Slave_SQL_Running = true @@ -193,6 +194,22 @@ func (my *MysqlBase) WaitUntilAfterGTID(db *sql.DB, targetGTID string) error { return Execute(db, query) } +// GetGtidSubtract used to do "SELECT GTID_SUBTRACT('subsetGTID','setGTID') as gtid_sub" command +func (my *MysqlBase) GetGtidSubtract(db *sql.DB, subsetGTID string, setGTID string) (string, error) { + query := fmt.Sprintf("SELECT GTID_SUBTRACT('%s','%s') as gtid_sub", subsetGTID, setGTID) + rows, err := QueryWithTimeout(db, reqTimeout, query) + if err != nil { + return "", err + } + + if len(rows) > 0 { + row := rows[0] + gtid_sub := row["gtid_sub"] + return gtid_sub, nil + } + return "", nil +} + // SetGlobalSysVar used to set global variables. func (my *MysqlBase) SetGlobalSysVar(db *sql.DB, varsql string) error { prefix := "SET GLOBAL" diff --git a/src/mysql/mysqlbase_test.go b/src/mysql/mysqlbase_test.go index 89d898c..f42f0ed 100644 --- a/src/mysql/mysqlbase_test.go +++ b/src/mysql/mysqlbase_test.go @@ -136,7 +136,7 @@ func TestMysqlBaseGetMasterGTID(t *testing.T) { want := model.GTID{Master_Log_File: "mysql-bin.000001", Read_Master_Log_Pos: 147, - Retrieved_GTID_Set: "", + Retrieved_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160", Executed_GTID_Set: "84030605-66aa-11e6-9465-52540e7fd51c:154-160", Slave_IO_Running: true, Slave_SQL_Running: true, diff --git a/src/mysql/rpc_mysql_test.go b/src/mysql/rpc_mysql_test.go index fb22f67..241d73f 100644 --- a/src/mysql/rpc_mysql_test.go +++ b/src/mysql/rpc_mysql_test.go @@ -36,6 +36,7 @@ func TestMysqlRPCStatus(t *testing.T) { GTID := model.GTID{ Master_Log_File: "mysql-bin.000001", Read_Master_Log_Pos: 123, + Executed_GTID_Set: "c78e798a-cccc-cccc-cccc-525433e8e796:1-2", Slave_IO_Running: true, Slave_SQL_Running: true, } diff --git a/src/raft/attr.go b/src/raft/attr.go index c0c670b..d84c731 100644 --- a/src/raft/attr.go +++ b/src/raft/attr.go @@ -38,6 +38,10 @@ const ( // neither process heartbeat nor voterequest(return ErrorInvalidRequest) IDLE + // INVALID state. + // neither process heartbeat nor voterequest(return ErrorInvalidRequest) + INVALID + // STOPPED state. STOPPED ) @@ -53,6 +57,8 @@ func (s State) String() string { case 1 << 3: return "IDLE" case 1 << 4: + return "INVALID" + case 1 << 5: return "STOPPED" } return "UNKNOW" @@ -67,6 +73,9 @@ const ( // MsgRaftRequestVote type. MsgRaftRequestVote + + // MsgRaftPing type. + MsgRaftPing ) var ( diff --git a/src/raft/candidate.go b/src/raft/candidate.go index 6c4d246..3459e44 100644 --- a/src/raft/candidate.go +++ b/src/raft/candidate.go @@ -32,6 +32,9 @@ type Candidate struct { // candiadte process requestvote response processRequestVoteResponseHandler func(*int, *int, *model.RaftRPCResponse, *bool) + + // candidate process ping request handler + processPingRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse } // NewCandidate creates the new Candidate. @@ -114,8 +117,12 @@ func (r *Candidate) Loop() { req := e.request.(*model.RaftRPCRequest) rsp := r.processRequestVoteRequestHandler(req) e.response <- rsp + case MsgRaftPing: + req := e.request.(*model.RaftRPCRequest) + rsp := r.processPingRequestHandler(req) + e.response <- rsp default: - r.ERROR("get.unkonw.request[%v]", e.Type) + r.ERROR("get.unknown.request[%v]", e.Type) } } } @@ -297,6 +304,13 @@ func (r *Candidate) processRequestVoteResponse(voteGranted *int, idleVoted *int, } } +func (r *Candidate) processPingRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse { + rsp := model.NewRaftRPCResponse(model.OK) + rsp.Raft.State = r.state.String() + + return rsp +} + // candidateUpgradeToLeader // 1. goto the LEADER state // 2. start the vip for public rafts @@ -328,6 +342,9 @@ func (r *Candidate) initHandlers() { // send vote requet r.setSendRequestVoteHandler(r.sendRequestVote) r.setProcessRequestVoteResponseHandler(r.processRequestVoteResponse) + + // ping request + r.setProcessPingRequestHandler(r.processPingRequest) } // for tests @@ -346,3 +363,7 @@ func (r *Candidate) setSendRequestVoteHandler(f func(chan *model.RaftRPCResponse func (r *Candidate) setProcessRequestVoteResponseHandler(f func(*int, *int, *model.RaftRPCResponse, *bool)) { r.processRequestVoteResponseHandler = f } + +func (r *Candidate) setProcessPingRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) { + r.processPingRequestHandler = f +} diff --git a/src/raft/follower.go b/src/raft/follower.go index 92a3003..a88d826 100644 --- a/src/raft/follower.go +++ b/src/raft/follower.go @@ -10,6 +10,7 @@ package raft import ( "model" + "strings" "sync" ) @@ -27,6 +28,9 @@ type Follower struct { // follower process voterequest request handler processRequestVoteRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse + + // follower process raft ping request handler + processPingRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse } // NewFollower creates new Follower. @@ -41,7 +45,7 @@ func NewFollower(r *Raft) *Follower { //-------------------------------------- // State Machine //-------------------------------------- -// timeout +// timeout and ping ack greater or equal to n/2+1 // State1. FOLLOWER ------------------> CANDIDATE // func (r *Follower) Loop() { @@ -58,8 +62,8 @@ func (r *Follower) Loop() { // promotable cases: // 1. MySQL is MYSQL_ALIVE // 2. Slave_SQL_RNNNING is OK - if r.mysql.Promotable() { - r.WARNING("timeout.promote.to.candidate") + if r.fUpgradeToC && r.mysql.Promotable() { + r.WARNING("timeout.and.ping.almost.node.successed.promote.to.candidate") r.upgradeToCandidate() } @@ -86,8 +90,12 @@ func (r *Follower) Loop() { if rsp.RetCode == model.OK { r.resetElectionTimeout() } + case MsgRaftPing: + req := e.request.(*model.RaftRPCRequest) + rsp := r.processPingRequestHandler(req) + e.response <- rsp default: - r.ERROR("get.unkonw.request[%v]", e.Type) + r.ERROR("get.unknown.request[%v]", e.Type) } } } @@ -145,9 +153,14 @@ func (r *Follower) processHeartbeatRequest(req *model.RaftRPCRequest) *model.Raf // MySQL4: change master if r.getLeader() != req.GetFrom() { - if gtid, err := r.mysql.GetGTID(); err == nil { + gtid, err := r.mysql.GetGTID() + if err == nil { r.WARNING("get.heartbeat.my.gtid.is:%v", gtid) } + + // before change master need check gtid, if local gtid bigger than remote gtid degrade to INVALID + r.degradeToInvalid(>id, &req.GTID) + r.WARNING("get.heartbeat.from[N:%v, V:%v, E:%v].change.mysql.master", req.GetFrom(), req.GetViewID(), req.GetEpochID()) if err := r.mysql.ChangeMasterTo(&req.Repl); err != nil { @@ -242,11 +255,11 @@ func (r *Follower) processRequestVoteRequest(req *model.RaftRPCRequest) *model.R } // 3. check viewid(req.viewid >= thisnode.viewid) - // if the req.viewid is larger than this node, update the viewid - // if the req.viewid is equal and we have voted for other one then - // don't voted for this candidate + // if the req.viewid is larger than or equal with this node, update the viewid + // if the req.viewid is less than this node, we don't voted for other one then + // voted for this candidate { - if req.GetViewID() > r.getViewID() { + if req.GetViewID() >= r.getViewID() { r.updateView(req.GetViewID(), noLeader) } else { if (r.votedFor != noVote) && (r.votedFor != req.GetFrom()) { @@ -262,6 +275,67 @@ func (r *Follower) processRequestVoteRequest(req *model.RaftRPCRequest) *model.R return rsp } +func (r *Follower) processPingRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse { + rsp := model.NewRaftRPCResponse(model.OK) + rsp.Raft.State = r.state.String() + + return rsp +} + +func (r *Follower) startCheckUpgradeToC() { + r.WARNING("start.check.upgrade.to.candidate") + + var cnt int + respChan := make(chan *model.RaftRPCResponse, r.getMembers()) + r.resetCheckUpgradeToCTimeout() + go func() { + for r.getState() == FOLLOWER { + select { + case <-r.fired: + r.WARNING("state.machine.exit.startCheckUpgradeToC.exit") + case <-r.checkUpgradeTick.C: + r.WARNING("timeout.to.do.new.check.upgrade.to.candidate") + cnt = 1 + for { + if len(respChan) == 0 { + break + } + <-respChan + } + r.sendClusterPing(respChan) + r.resetCheckUpgradeToCTimeout() + case rsp := <-respChan: + if rsp.RetCode == model.OK { + if rsp.Raft.State == "LEADER" { + r.WARNING("ping.responses.includes.leader.skip") + r.fUpgradeToC = false + continue + } else if strings.Contains("FOLLOWER, CANDIDATE, IDLE", rsp.Raft.State) { + cnt++ + } + } + + if cnt >= r.GetQuorums() { + r.WARNING("ping.responses[%v].more.than.half.upgrade.to.candidate", cnt) + r.fUpgradeToC = true + continue + } + } + } + }() + r.INFO("start.checkUpgradeToC.can.UpgradeToC[%v]", r.fUpgradeToC) +} + +func (r *Follower) sendClusterPing(respChan chan *model.RaftRPCResponse) { + r.mutex.RLock() + defer r.mutex.RUnlock() + for _, peer := range r.peers { + go func(peer *Peer) { + peer.SendPing(respChan) + }(peer) + } +} + func (r *Follower) upgradeToCandidate() { // only you if len(r.peers) == 0 { @@ -283,6 +357,29 @@ func (r *Follower) upgradeToCandidate() { r.IncCandidatePromotes() } +func (r *Follower) degradeToInvalid(followerGTID *model.GTID, candidateGTID *model.GTID) { + // only you + if len(r.peers) == 0 { + r.WARNING("peers.is.null.can.not.upgrade.to.candidate") + return + } + + // stop io thread + // it will re-start again when heartbeat received + if err := r.mysql.StopSlaveIOThread(); err != nil { + r.ERROR("mysql.StopSlaveIOThread.error[%v]", err) + } + + // if error can not vote candidate + greater := r.mysql.CheckGTID(followerGTID, candidateGTID) + if greater { + // degrade to INVALID + r.setState(INVALID) + return + } + return +} + // setMySQLAsync used to setting mysql in async func (r *Follower) setMySQLAsync() { r.WARNING("mysql.waitMysqlDoneAsync.prepare") @@ -334,6 +431,7 @@ func (r *Follower) stateExit() { func (r *Follower) initHandlers() { r.setProcessHeartbeatRequestHandler(r.processHeartbeatRequest) r.setProcessRequestVoteRequestHandler(r.processRequestVoteRequest) + r.setProcessPingRequestHandler(r.processPingRequest) } // for tests @@ -344,3 +442,7 @@ func (r *Follower) setProcessHeartbeatRequestHandler(f func(*model.RaftRPCReques func (r *Follower) setProcessRequestVoteRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) { r.processRequestVoteRequestHandler = f } + +func (r *Follower) setProcessPingRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) { + r.processPingRequestHandler = f +} diff --git a/src/raft/idle.go b/src/raft/idle.go index 8844b54..568e126 100644 --- a/src/raft/idle.go +++ b/src/raft/idle.go @@ -30,6 +30,9 @@ type Idle struct { // idle process voterequest request handler processRequestVoteRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse + + // idle process ping request handler + processPingRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse } // NewIdle creates new Idle. @@ -67,8 +70,12 @@ func (r *Idle) Loop() { req := e.request.(*model.RaftRPCRequest) rsp := r.processRequestVoteRequest(req) e.response <- rsp + case MsgRaftPing: + req := e.request.(*model.RaftRPCRequest) + rsp := r.processPingRequestHandler(req) + e.response <- rsp default: - r.ERROR("get.unknow.request[%v]", r.getID(), e.Type) + r.ERROR("get.unknow.request[%v].[%v]", r.getID(), e.Type) } } } @@ -177,10 +184,18 @@ func (r *Idle) stateInit() { } } +func (r *Idle) processPingRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse { + rsp := model.NewRaftRPCResponse(model.OK) + rsp.Raft.State = r.state.String() + + return rsp +} + // handlers func (r *Idle) initHandlers() { r.setProcessHeartbeatRequestHandler(r.processHeartbeatRequest) r.setProcessRequestVoteRequestHandler(r.processRequestVoteRequest) + r.setProcessPingRequestHandler(r.processPingRequest) } // for tests @@ -191,3 +206,7 @@ func (r *Idle) setProcessHeartbeatRequestHandler(f func(*model.RaftRPCRequest) * func (r *Idle) setProcessRequestVoteRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) { r.processRequestVoteRequestHandler = f } + +func (r *Idle) setProcessPingRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) { + r.processPingRequestHandler = f +} diff --git a/src/raft/invalid.go b/src/raft/invalid.go new file mode 100644 index 0000000..092591c --- /dev/null +++ b/src/raft/invalid.go @@ -0,0 +1,207 @@ +/* + * Xenon + * + * Copyright 2018 The Xenon Authors. + * Code is licensed under the GPLv3. + * + */ + +package raft + +import ( + "model" +) + +// Invalid is a special STATE with other FOLLOWER/CANDICATE/LEADER states. +// But it seems like IDLE state. +// It is usually used as READ-ONLY but does not have RAFT features, such as +// LEADER election +// FOLLOWER promotion +// +// IDLE is one member of a RAFT cluster but without the rights to vote and return ErrorInvalidRequest to CANDICATEs + +// Invalid tuple. +type Invalid struct { + *Raft + + // Invalid process heartbeat request handler + processHeartbeatRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse + + // Invalid process voterequest request handler + processRequestVoteRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse + + // Invalid process ping request handler + processPingRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse +} + +// NewInvalid creates new Invalid. +func NewInvalid(r *Raft) *Invalid { + IV := &Invalid{Raft: r} + IV.initHandlers() + return IV +} + +// Loop used to start the loop of the state machine. +//-------------------------------------- +// State Machine +//-------------------------------------- +// in INVALID state, we never do leader election +// +func (r *Invalid) Loop() { + // update begin + r.updateStateBegin() + r.stateInit() + + for r.getState() == INVALID { + select { + case <-r.fired: + r.WARNING("state.machine.loop.got.fired") + case e := <-r.c: + switch e.Type { + // 1) Heartbeat + case MsgRaftHeartbeat: + req := e.request.(*model.RaftRPCRequest) + rsp := r.processHeartbeatRequestHandler(req) + e.response <- rsp + + // 2) RequestVote + case MsgRaftRequestVote: + req := e.request.(*model.RaftRPCRequest) + rsp := r.processRequestVoteRequest(req) + e.response <- rsp + case MsgRaftPing: + req := e.request.(*model.RaftRPCRequest) + rsp := r.processPingRequestHandler(req) + e.response <- rsp + default: + r.ERROR("get.unknow.request[%v].[%v]", r.getID(), e.Type) + } + } + } +} + +// processHeartbeatRequest +// EFFECT +// handles the heartbeat request from the leader +// In Invalid state, we only handle the master changed +// +func (r *Invalid) processHeartbeatRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse { + rsp := model.NewRaftRPCResponse(model.OK) + rsp.Raft.From = r.getID() + rsp.Raft.ViewID = r.getViewID() + rsp.Raft.EpochID = r.getEpochID() + rsp.Raft.State = r.state.String() + rsp.Relay_Master_Log_File = r.mysql.RelayMasterLogFile() + + if !r.checkRequest(req) { + rsp.RetCode = model.ErrorInvalidRequest + return rsp + } + + viewdiff := (int)(r.getViewID() - req.GetViewID()) + epochdiff := (int)(r.getEpochID() - req.GetEpochID()) + switch { + case viewdiff <= 0: + // MySQL1: disable master semi-sync because I am a slave + if err := r.mysql.DisableSemiSyncMaster(); err != nil { + r.ERROR("mysql.DisableSemiSyncMaster.error[%v]", err) + } + + // MySQL2: set mysql readonly(mysql maybe down and up then the LEADER changes) + if err := r.mysql.SetReadOnly(); err != nil { + r.ERROR("mysql.SetReadOnly.error[%v]", err) + } + + // MySQL3: start slave + if err := r.mysql.StartSlave(); err != nil { + r.ERROR("mysql.StartSlave.error[%v]", err) + } + + // MySQL4: change master + if r.getLeader() != req.GetFrom() { + r.WARNING("get.heartbeat.from[N:%v, V:%v, E:%v].change.mysql.master[%+v]", req.GetFrom(), req.GetViewID(), req.GetEpochID(), req.GetGTID()) + + if err := r.mysql.ChangeMasterTo(&req.Repl); err != nil { + r.ERROR("change.master.to[FROM:%v, GTID:%v].error[%v]", req.GetFrom(), req.GetRepl(), err) + rsp.RetCode = model.ErrorChangeMaster + return rsp + } + r.leader = req.GetFrom() + } + + // view change + if viewdiff < 0 { + r.WARNING("get.heartbeat.from[N:%v, V:%v, E:%v].update.view", req.GetFrom(), req.GetViewID(), req.GetEpochID()) + r.updateView(req.GetViewID(), req.GetFrom()) + } + + // epoch change + if epochdiff != 0 { + r.WARNING("get.heartbeat.from[N:%v, V:%v, E:%v].update.epoch", req.GetFrom(), req.GetViewID(), req.GetEpochID()) + r.updateEpoch(req.GetEpochID(), req.GetPeers()) + } + } + return rsp +} + +// processRequestVoteRequest +// EFFECT +// handles the requestvote request from other CANDIDATEs +// Invalid is special, it returns ErrorInvalidRequest +// +// RETURN +// 1. ErrorInvalidRequest: do not give a vote +func (r *Invalid) processRequestVoteRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse { + rsp := model.NewRaftRPCResponse(model.ErrorInvalidRequest) + rsp.Raft.From = r.getID() + rsp.Raft.ViewID = r.getViewID() + rsp.Raft.EpochID = r.getEpochID() + rsp.Raft.State = r.state.String() + + return rsp +} + +func (r *Invalid) stateInit() { + // 1. stop vip + if err := r.leaderStopShellCommand(); err != nil { + // TODO(array): what todo? + r.ERROR("stopshell.error[%v]", err) + } + + // MySQL1: set readonly + if err := r.mysql.SetReadOnly(); err != nil { + r.ERROR("mysql.SetReadOnly.error[%v]", err) + } + + // MySQL2. set mysql slave system variables + if err := r.mysql.SetSlaveGlobalSysVar(); err != nil { + r.ERROR("mysql.SetSlaveGlobalSysVar.error[%v]", err) + } +} + +func (r *Invalid) processPingRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse { + rsp := model.NewRaftRPCResponse(model.OK) + rsp.Raft.State = r.state.String() + + return rsp +} + +// handlers +func (r *Invalid) initHandlers() { + r.setProcessHeartbeatRequestHandler(r.processHeartbeatRequest) + r.setProcessRequestVoteRequestHandler(r.processRequestVoteRequest) + r.setProcessPingRequestHandler(r.processPingRequest) +} + +// for tests +func (r *Invalid) setProcessHeartbeatRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) { + r.processHeartbeatRequestHandler = f +} + +func (r *Invalid) setProcessRequestVoteRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) { + r.processRequestVoteRequestHandler = f +} + +func (r *Invalid) setProcessPingRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) { + r.processPingRequestHandler = f +} diff --git a/src/raft/leader.go b/src/raft/leader.go index 22c8d0f..1a18f4b 100644 --- a/src/raft/leader.go +++ b/src/raft/leader.go @@ -46,6 +46,9 @@ type Leader struct { // leader process send heartbeat response processHeartbeatResponseHandler func(*int, *model.RaftRPCResponse) + + // leader process ping request handler + processPingRequestHandler func(*model.RaftRPCRequest) *model.RaftRPCResponse } // NewLeader creates new Leader. @@ -68,6 +71,7 @@ func (r *Leader) Loop() { r.stateInit() defer r.stateExit() + incViewID := true mysqlDown := false ackGranted := 1 @@ -103,6 +107,14 @@ func (r *Leader) Loop() { } } else { lessHtAcks = 0 + if incViewID { + if ackGranted != r.getMembers() { + r.updateView(r.getViewID()+2, r.GetLeader()) + incViewID = false + } + } else if ackGranted == r.getMembers() { + incViewID = true + } } ackGranted = 1 @@ -123,8 +135,12 @@ func (r *Leader) Loop() { req := e.request.(*model.RaftRPCRequest) rsp := r.processRequestVoteRequestHandler(req) e.response <- rsp + case MsgRaftPing: + req := e.request.(*model.RaftRPCRequest) + rsp := r.processPingRequestHandler(req) + e.response <- rsp default: - r.ERROR("get.unkonw.request[%+v]", e.Type) + r.ERROR("get.unknown.request[%+v]", e.Type) } } } @@ -209,9 +225,9 @@ func (r *Leader) processRequestVoteRequest(req *model.RaftRPCRequest) *model.Raf r.WARNING("get.voterequest.from[%+v]", *req) // 1. check viewid - // request viewid is from an old view, reject + // request viewid is from an old view or equal with me, reject { - if req.GetViewID() < r.getViewID() { + if req.GetViewID() <= r.getViewID() { r.WARNING("get.requestvote.from[N:%v, V:%v, E:%v].stale.viewid", req.GetFrom(), req.GetViewID(), req.GetEpochID()) rsp.RetCode = model.ErrorInvalidViewID return rsp @@ -241,9 +257,9 @@ func (r *Leader) processRequestVoteRequest(req *model.RaftRPCRequest) *model.Raf } } - // 3. update viewid + // 3. update viewid, if Candidate viewid equal with Leader viewid don't update viewid { - if req.GetViewID() >= r.getViewID() { + if req.GetViewID() > r.getViewID() { r.WARNING("get.requestvote.from[N:%v, V:%v, E:%v].degrade.to.follower", req.GetFrom(), req.GetViewID(), req.GetEpochID()) r.updateView(req.GetViewID(), noLeader) // downgrade to FOLLOWER @@ -312,6 +328,13 @@ func (r *Leader) processHeartbeatResponse(ackGranted *int, rsp *model.RaftRPCRes } +func (r *Leader) processPingRequest(req *model.RaftRPCRequest) *model.RaftRPCResponse { + rsp := model.NewRaftRPCResponse(model.OK) + rsp.Raft.State = r.state.String() + + return rsp +} + func (r *Leader) degradeToFollower() { r.WARNING("degrade.to.follower.stop.the.vip...") if err := r.leaderStopShellCommand(); err != nil { @@ -506,6 +529,9 @@ func (r *Leader) initHandlers() { // send heartbeat r.setSendHeartbeatHandler(r.sendHeartbeat) r.setProcessHeartbeatResponseHandler(r.processHeartbeatResponse) + + // ping request + r.setProcessPingRequestHandler(r.processPingRequest) } // for tests @@ -524,3 +550,7 @@ func (r *Leader) setSendHeartbeatHandler(f func(*bool, chan *model.RaftRPCRespon func (r *Leader) setProcessHeartbeatResponseHandler(f func(*int, *model.RaftRPCResponse)) { r.processHeartbeatResponseHandler = f } + +func (r *Leader) setProcessPingRequestHandler(f func(*model.RaftRPCRequest) *model.RaftRPCResponse) { + r.processPingRequestHandler = f +} diff --git a/src/raft/mock.go b/src/raft/mock.go index a1dcf01..1bb8221 100644 --- a/src/raft/mock.go +++ b/src/raft/mock.go @@ -127,6 +127,12 @@ func MockWaitMySQLPingTimeout() { time.Sleep(time.Millisecond * time.Duration(pingTimeout)) } +// MockWaitHeartBeatTimeout used to wait mysql ping timeout. +func MockWaitHeartBeatTimeout() { + hbTimeout := config.DefaultRaftConfig().HeartbeatTimeout * 6 + time.Sleep(time.Millisecond * time.Duration(hbTimeout)) +} + // MockWaitLeaderEggs mock. // wait the leader eggs when leadernums >0 // if leadernums == 0, we just want to sleep for a heartbeat broadcast diff --git a/src/raft/peer.go b/src/raft/peer.go index 3602d58..177d3ab 100644 --- a/src/raft/peer.go +++ b/src/raft/peer.go @@ -46,6 +46,7 @@ func (p *Peer) sendHeartbeat(c chan *model.RaftRPCResponse) { req.Raft.Leader = p.raft.getLeader() req.Peers = p.raft.getPeers() req.Repl = p.raft.mysql.GetRepl() + req.GTID, _ = p.raft.mysql.GetGTID() client, cleanup, err := p.NewClient() if err != nil { @@ -111,6 +112,35 @@ func (p *Peer) sendRequestVote(c chan *model.RaftRPCResponse) { c <- rsp } +// follower SendPing +func (p *Peer) SendPing(c chan *model.RaftRPCResponse) { + // response + rsp := model.NewRaftRPCResponse(model.OK) + + // request body + req := model.NewRaftRPCRequest() + + client, cleanup, err := p.NewClient() + if err != nil { + p.raft.ERROR("ping.peer[%v].new.client.error[%v]", p.getID(), err) + rsp.RetCode = model.ErrorRPCCall + c <- rsp + return + } + defer cleanup() + + method := model.RPCRaftPing + err = client.CallTimeout(p.requestTimeout, method, req, rsp) + if err != nil { + p.raft.ERROR("ping.peer.[%v].client.call.error[%v]", p.getID(), err) + rsp.RetCode = model.ErrorRPCCall + c <- rsp + return + } + p.raft.WARNING("send.ping.to.peer[%v].client.call.ok.rsp[%v]", p.getID(), rsp) + c <- rsp +} + // NewClient creates new client. func (p *Peer) NewClient() (*xrpc.Client, func(), error) { client, err := xrpc.NewClient(p.connectionStr, p.requestTimeout) diff --git a/src/raft/raft.go b/src/raft/raft.go index 0198bcf..55318b7 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -40,30 +40,33 @@ type RaftMeta struct { // Raft tuple. type Raft struct { - log *xlog.Log - mysql *mysql.Mysql - cmd common.Command - conf *config.RaftConfig - leader string - votedFor string - id string - fired chan bool - state State - meta *RaftMeta - mutex sync.RWMutex - lock sync.WaitGroup - heartbeatTick *time.Timer - electionTick *time.Timer - checkVotesTick *time.Timer - stateBegin time.Time - c chan *ev - L *Leader - C *Candidate - F *Follower - I *Idle - peers map[string]*Peer - stats model.RaftStats - skipPurgeBinlog bool // if true, purge binlog will skipped + log *xlog.Log + mysql *mysql.Mysql + cmd common.Command + conf *config.RaftConfig + leader string + votedFor string + id string + fired chan bool + state State + meta *RaftMeta + mutex sync.RWMutex + lock sync.WaitGroup + heartbeatTick *time.Timer + electionTick *time.Timer + checkUpgradeTick *time.Timer + checkVotesTick *time.Timer + stateBegin time.Time + c chan *ev + L *Leader + C *Candidate + F *Follower + I *Idle + IV *Invalid + peers map[string]*Peer + stats model.RaftStats + skipPurgeBinlog bool // if true, purge binlog will skipped + fUpgradeToC bool // if true, follower can upgrade to candidate } // NewRaft creates the new raft. @@ -85,6 +88,7 @@ func NewRaft(id string, conf *config.RaftConfig, log *xlog.Log, mysql *mysql.Mys r.C = NewCandidate(r) r.F = NewFollower(r) r.I = NewIdle(r) + r.IV = NewInvalid(r) // setup raft timeout r.resetHeartbeatTimeout() @@ -220,6 +224,7 @@ func (r *Raft) stateLoop() { for state != STOPPED { switch state { case FOLLOWER: + r.F.startCheckUpgradeToC() r.F.Loop() case CANDIDATE: r.C.Loop() @@ -227,6 +232,8 @@ func (r *Raft) stateLoop() { r.L.Loop() case IDLE: r.I.Loop() + case INVALID: + r.IV.Loop() } state = r.getState() } @@ -303,6 +310,11 @@ func (r *Raft) resetElectionTimeout() { r.electionTick = common.RandomTimeout(r.getElectionTimeout()) } +func (r *Raft) resetCheckUpgradeToCTimeout() { + common.NormalTimerRelaese(r.checkUpgradeTick) + r.checkUpgradeTick = common.RandomTimeout(r.getElectionTimeout() / 2) +} + func (r *Raft) resetCheckVotesTimeout() { // timeout is 1/2 of electiontimout common.NormalTimerRelaese(r.checkVotesTick) diff --git a/src/raft/raft_test.go b/src/raft/raft_test.go index c380900..485f519 100644 --- a/src/raft/raft_test.go +++ b/src/raft/raft_test.go @@ -227,8 +227,92 @@ func TestRaftLeaderDown(t *testing.T) { for _, raft := range rafts { got += raft.getState() } - // [CANDIDATE, STOPPED, STOPPED] - want = (CANDIDATE + STOPPED + STOPPED) + // [FOLLOWER, STOPPED, STOPPED] + want = (FOLLOWER + STOPPED + STOPPED) + assert.Equal(t, want, got) + } +} + +// TEST EFFECTS: +// test the leader localcommit case +// +// TEST PROCESSES: +// 1. Start 3 rafts state as FOLLOWER +// 2. wait leader election from 3 FOLLOWER +// 3. Stop leader wait new leader election from 2 FOLLOWER +// remock old leader to localcommit then wait a heartbeat timeout +func TestRaftLeaderLocalCommit(t *testing.T) { + var want, got State + var whoisleader int + var leader *Raft + + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + port := common.RandomPort(8000, 9000) + _, rafts, cleanup := MockRafts(log, port, 3) + defer cleanup() + + // 1. Start 3 rafts state as FOLLOWER + { + for _, raft := range rafts { + raft.Start() + } + + got = 0 + want = (FOLLOWER + FOLLOWER + FOLLOWER) + for _, raft := range rafts { + got += raft.getState() + } + + // [FOLLOWER, FOLLOWER, FOLLOWER] + assert.Equal(t, want, got) + } + + // 2. wait leader election from 3 FOLLOWER + { + MockWaitLeaderEggs(rafts, 1) + + whoisleader = 0 + got = 0 + want = (LEADER + FOLLOWER + FOLLOWER) + for i, raft := range rafts { + got += raft.getState() + if raft.getState() == LEADER { + whoisleader = i + } + } + // [LEADER, FOLLOWER, FOLLOWER] + assert.Equal(t, want, got) + } + + leader = rafts[whoisleader] + leader.mysql.SetMysqlHandler(mysql.NewMockGTIDLC()) + + // 3. stop leader wait new leader election from 2 FOLLOWER + // remock leader to localcommit + + { + leader.Stop() + + MockWaitLeaderEggs(rafts, 1) + got = 0 + want = (LEADER + FOLLOWER + STOPPED) + for _, raft := range rafts { + got += raft.getState() + } + + // [LEADER, FOLLOWER, STOPPED] + assert.Equal(t, want, got) + + got = 0 + leader.mysql.SetMysqlHandler(mysql.NewMockGTIDLC()) + leader.Start() + MockWaitHeartBeatTimeout() + want = (LEADER + FOLLOWER + INVALID) + for _, raft := range rafts { + got += raft.getState() + } + + // [LEADER, FOLLOWER, INVALID] assert.Equal(t, want, got) } } @@ -262,8 +346,8 @@ func TestRaftDoubleClusterDiffraction(t *testing.T) { var want, got State var cluster1WhoIsLeader int var cluster1Leader *Raft - var cluster1WhoIsCandidate int - var cluster1Candidate *Raft + var cluster1WhoIsFollower int + var cluster1Follower *Raft var cluster2WhoIsLeader int var cluster2Leader *Raft @@ -346,15 +430,15 @@ func TestRaftDoubleClusterDiffraction(t *testing.T) { MockWaitLeaderEggs(rafts1, 0) got = 0 - want = (CANDIDATE + STOPPED) + want = (FOLLOWER + STOPPED) for i, raft := range rafts1 { got += raft.getState() - if raft.getState() == CANDIDATE { - cluster1WhoIsCandidate = i + if raft.getState() == FOLLOWER { + cluster1WhoIsFollower = i } } - // [CANDIDATE, STOPPED] + // [FOLLOWER, STOPPED] assert.Equal(t, want, got) } @@ -365,19 +449,19 @@ func TestRaftDoubleClusterDiffraction(t *testing.T) { // cluster2Leader never degrade with this hook cluster2Leader.L.setProcessHeartbeatResponseHandler(cluster2Leader.mockLeaderProcessSendHeartbeatResponse) - cluster1Candidate = rafts1[cluster1WhoIsCandidate] - cluster2Leader.AddPeer(cluster1Candidate.getID()) + cluster1Follower = rafts1[cluster1WhoIsFollower] + cluster2Leader.AddPeer(cluster1Follower.getID()) // wait a hearbeat broadcast - // cluster1Candidate will give ErrorInvalidRequest because cluster2Leader not one of our cluster member + // cluster1Follower will give ErrorInvalidRequest because cluster2Leader not one of our cluster member MockWaitLeaderEggs(rafts1, 0) } - // 6. add cluster2Leader to cluster1Candidate + // 6. add cluster2Leader to cluster1Follower { cluster2Leader = rafts2[cluster2WhoIsLeader] - cluster1Candidate = rafts1[cluster1WhoIsCandidate] - cluster1Candidate.AddPeer(cluster2Leader.getID()) + cluster1Follower = rafts1[cluster1WhoIsFollower] + cluster1Follower.AddPeer(cluster2Leader.getID()) // wait a hearbeat broadcast MockWaitLeaderEggs(rafts1, 0) @@ -416,7 +500,7 @@ func TestRaftDoubleClusterDiffraction(t *testing.T) { // 2. wait leader election from 3 FOLLOWER // 3. set leader handlers to mock // 4. Stop leader hearbeat -// 5. wait new-leader election from 2 FOLLOWER +// 5. wait a pingtimeout, wait new-leader election from 2 FOLLOWER // 6. old-leader get requestvote and return ErrorInvalidRequest // 7. new-leader eggs // 8. old-leader reset handlers to work @@ -482,13 +566,14 @@ func TestRaftLeaderDownAndUp(t *testing.T) { leader.L.setSendHeartbeatHandler(leader.mockLeaderSendHeartbeat) } - // 5. wait new-leader election from 2 FOLLOWER + // 5. wait a pingtimeout, wait new-leader election from 2 FOLLOWER { - MockWaitLeaderEggs(rafts, 2) + MockWaitMySQLPingTimeout() + MockWaitLeaderEggs(rafts, 1) got = 0 imoldleader := whoisleader - want = (LEADER + LEADER + FOLLOWER) + want = (LEADER + FOLLOWER + FOLLOWER) for i, raft := range rafts { got += raft.getState() if raft.getState() == LEADER { @@ -862,20 +947,56 @@ func TestRaftStartAsIDLE(t *testing.T) { } // TEST EFFECTS: -// test a cluster with 17 rafts +// test run as FOLLOWER // // TEST PROCESSES: -// 1. Start 17 rafts state as FOLLOWER -// 2. wait leader election from 32 FOLLOWERs +// 1. Start 1 raft as FOLLOWER +// 2. check the STATE still FOLLOWER +func TestRaftStartAsFOLLOWER(t *testing.T) { + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + conf := config.DefaultRaftConfig() + port := common.RandomPort(8100, 8200) + _, rafts, cleanup := MockRaftsWithConfig(log, conf, port, 1) + defer cleanup() + + // 1. Start rafts + { + for _, raft := range rafts { + raft.Start() + } + } + + // 2. check state + { + want := FOLLOWER + got := rafts[0].getState() + assert.Equal(t, want, got) + } + + // 3. wait a election timeout check state + { + MockWaitLeaderEggs(rafts, 0) + want := FOLLOWER + got := rafts[0].getState() + assert.Equal(t, want, got) + } +} + +// TEST EFFECTS: +// test a cluster with 11 rafts +// +// TEST PROCESSES: +// 1. Start 11 rafts state as FOLLOWER +// 2. wait leader election from 10 FOLLOWERs // 3. Stop the leader // 4. wait the new leader eggs // 5. Byzantine Failures Attack -func TestRaft17Rafts1Cluster(t *testing.T) { +func TestRaft11Rafts1Cluster(t *testing.T) { var raftnums State var whoisleader int var leader *Raft - raftnums = 17 + raftnums = 11 log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) port := common.RandomPort(8600, 9000) _, rafts, cleanup := MockRafts(log, port, int(raftnums)) @@ -921,8 +1042,8 @@ func TestRaft17Rafts1Cluster(t *testing.T) { assert.Equal(t, want, got) } - // 3. Stop the leader(mock to IDLE) - MockStateTransition(leader, IDLE) + // 3. Stop the leader(mock to INVALID) + MockStateTransition(leader, INVALID) // 4. wait the new leader eggs { @@ -938,12 +1059,12 @@ func TestRaft17Rafts1Cluster(t *testing.T) { } var got State - want := (LEADER + IDLE + FOLLOWER*(raftnums-2)) + want := (LEADER + INVALID + FOLLOWER*(raftnums-2)) for _, raft := range rafts { got += raft.getState() } - //want = (LEADER + IDLE + FOLLOWER*(raftnums-2)) + //want = (LEADER + INVALID + FOLLOWER*(raftnums-2)) assert.Equal(t, want, got) } @@ -967,8 +1088,10 @@ func TestRaft17Rafts1Cluster(t *testing.T) { // TEST PROCESSES: // 1. set rafts GTID // 1.0 rafts[0] with MockGTIDB{Master_Log_File = "", Read_Master_Log_Pos = 0} -// 1.1 rafts[1] with MockGTIDB{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123} -// 1.2 rafts[2] with MockGTIDC{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 124} +// 1.1 rafts[1] with MockGTIDB{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123 +// gtid.Executed_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1"} +// 1.2 rafts[2] with MockGTIDC{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 124 +// gtid.Executed_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:2"} // 2. Start 3 rafts state as FOLLOWER // 3. wait rafts[2] elected as leader // 4. Stop rafts[2] @@ -981,14 +1104,19 @@ func TestRaftLeaderWithGTID(t *testing.T) { _, rafts, cleanup := MockRafts(log, port, 3) defer cleanup() + GTIDAIDX := 0 GTIDBIDX := 1 GTIDCIDX := 2 // 1. set rafts GTID - // 1.0 rafts[0] with MockGTIDB{Master_Log_File = "", Read_Master_Log_Pos = 0} - // 1.1 rafts[1] with MockGTIDB{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123} - // 1.2 rafts[2] with MockGTIDC{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 124} - { + // 1.0 rafts[0] with MockGTIDAA{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 122, + // gtid.Retrieved_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1"} + // 1.1 rafts[1] with MockGTIDB{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123, + // gtid.Retrieved_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1-2"} + // 1.2 rafts[2] with MockGTIDC{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 124, + // gtid.Retrieved_GTID_Set = "c78e798a-cccc-cccc-cccc-525433e8e796:1-3"} + { + rafts[GTIDAIDX].mysql.SetMysqlHandler(mysql.NewMockGTIDAA()) rafts[GTIDBIDX].mysql.SetMysqlHandler(mysql.NewMockGTIDB()) rafts[GTIDCIDX].mysql.SetMysqlHandler(mysql.NewMockGTIDC()) } @@ -1251,9 +1379,9 @@ func TestRaftChangeMasterToFail(t *testing.T) { defer cleanup() // 1. set rafts GTID - // 1.0 rafts[0] with MockGTIDB{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123} - // 1.1 rafts[1] with MockGTIDB{Master_Log_File = "mysql-bin.000003", Read_Master_Log_Pos = 123} - // 1.2 rafts[2] with MockGTIDC{Master_Log_File = "mysql-bin.000005", Read_Master_Log_Pos = 123} + // 1.0 rafts[0] with MockGTIDX1{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123} + // 1.1 rafts[1] with MockGTIDX3{Master_Log_File = "mysql-bin.000003", Read_Master_Log_Pos = 123} + // 1.2 rafts[2] with MockGTIDX5{Master_Log_File = "mysql-bin.000005", Read_Master_Log_Pos = 123} { rafts[0].mysql.SetMysqlHandler(mysql.NewMockGTIDX1()) rafts[1].mysql.SetMysqlHandler(mysql.NewMockGTIDX3()) @@ -1399,6 +1527,7 @@ func TestRaft1Nodes(t *testing.T) { } } +/* // TEST EFFECTS: // test the 2nodes of cluster // @@ -1513,6 +1642,7 @@ func TestRaft2NodesWithGTID(t *testing.T) { assert.Equal(t, 0, whoisleader) } } +*/ // TEST EFFECTS: // test the leader heartbeat acks less than the quorum. diff --git a/src/raft/rpc_ha.go b/src/raft/rpc_ha.go index 0450205..c6931d2 100644 --- a/src/raft/rpc_ha.go +++ b/src/raft/rpc_ha.go @@ -78,6 +78,9 @@ func (h *HARPC) HATryToLeader(req *model.HARPCRequest, rsp *model.HARPCResponse) case IDLE: rsp.RetCode = model.ErrorInvalidRequest return nil + case INVALID: + rsp.RetCode = model.ErrorInvalidRequest + return nil } // promotable cases: // 1. MySQL is MYSQL_ALIVE diff --git a/src/raft/rpc_raft.go b/src/raft/rpc_raft.go index ce291ba..b319ee3 100644 --- a/src/raft/rpc_raft.go +++ b/src/raft/rpc_raft.go @@ -17,6 +17,17 @@ type RaftRPC struct { raft *Raft } +// Ping rpc. +// send MsgRaftPing +func (r *RaftRPC) Ping(req *model.RaftRPCRequest, rsp *model.RaftRPCResponse) error { + ret, err := r.raft.send(MsgRaftPing, req) + if err != nil { + return err + } + *rsp = *ret.(*model.RaftRPCResponse) + return nil +} + // Heartbeat rpc. func (r *RaftRPC) Heartbeat(req *model.RaftRPCRequest, rsp *model.RaftRPCResponse) error { ret, err := r.raft.send(MsgRaftHeartbeat, req)