Skip to content

Commit

Permalink
fix: automatic fix master-slave replication relationship after master…
Browse files Browse the repository at this point in the history
… or slave service restarted (#2373, #2038, #1950, #1967, #2351))
  • Loading branch information
liuchengyu committed Feb 4, 2024
1 parent 3be4d52 commit 2ea0b92
Show file tree
Hide file tree
Showing 13 changed files with 484 additions and 277 deletions.
7 changes: 4 additions & 3 deletions codis/config/dashboard.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ migration_async_numkeys = 500
migration_timeout = "30s"

# Set configs for redis sentinel.
sentinel_check_server_state_interval = "5s"
sentinel_check_master_failover_interval = "1s"
sentinel_master_dead_check_times = 5
sentinel_check_server_state_interval = "10s"
sentinel_check_master_failover_interval = "2s"
sentinel_master_dead_check_times = 10
sentinel_check_offline_server_interval = "2s"
sentinel_client_timeout = "10s"
sentinel_quorum = 2
sentinel_parallel_syncs = 1
Expand Down
3 changes: 3 additions & 0 deletions codis/pkg/models/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ const (
ActionMigrating = "migrating"
ActionFinished = "finished"
ActionSyncing = "syncing"
ActionSynced = "synced"

ActionSyncedFailed = "synced_failed"
)
45 changes: 43 additions & 2 deletions codis/pkg/models/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,38 @@ func (g *Group) GetServersMap() map[string]*GroupServer {
return results
}

// SelectNewMaster picks the server that should become the new master of the group.
//
// The server at index 0 is never considered (presumably it is the current
// master -- confirm against callers), and neither is any server whose State is
// not GroupServerStateNormal. Among the remaining servers the one with the
// highest DbBinlogFileNum wins; equal file numbers are broken by the higher
// DbBinlogOffset, and a complete tie keeps the server that appears first.
//
// It returns the chosen server's address and its index in g.Servers, or
// ("", -1) when no server qualifies.
func (g *Group) SelectNewMaster() (string, int) {
	best := -1

	for i, candidate := range g.Servers {
		if i == 0 || candidate.State != GroupServerStateNormal {
			continue
		}

		// The first eligible server becomes the baseline to beat.
		if best < 0 {
			best = i
			continue
		}

		current := g.Servers[best]
		switch {
		case candidate.DbBinlogFileNum > current.DbBinlogFileNum:
			// A newer binlog file always wins.
			best = i
		case candidate.DbBinlogFileNum == current.DbBinlogFileNum &&
			candidate.DbBinlogOffset > current.DbBinlogOffset:
			// Same binlog file: the larger offset is further ahead.
			best = i
		}
	}

	if best < 0 {
		return "", best
	}

	return g.Servers[best].Addr, best
}

// GroupServerState is the monitoring status recorded for a server in a group
// (normal, subjectively offline, or actually offline).
type GroupServerState int8

const (
Expand All @@ -33,6 +65,13 @@ const (
GroupServerStateOffline
)

// GroupServerRole is the replication role of a server within its group.
type GroupServerRole string

// Roles a group server can take.
const (
	// RoleMaster marks a server acting as master.
	RoleMaster = GroupServerRole("master")
	// RoleSlave marks a server acting as slave.
	RoleSlave = GroupServerRole("slave")
)

type GroupServer struct {
Addr string `json:"server"`
DataCenter string `json:"datacenter"`
Expand All @@ -43,9 +82,11 @@ type GroupServer struct {
} `json:"action"`

// master or slave
Role string `json:"role"`
Role GroupServerRole `json:"role"`
// If it is a master node, take the master_repl_offset field, otherwise take the slave_repl_offset field
ReplyOffset int `json:"reply_offset"`
DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
DbBinlogOffset uint64 `json:"binlog_offset"` // db0

// Monitoring status, 0 normal, 1 subjective offline, 2 actual offline
// If marked as 2 , no service is provided
State GroupServerState `json:"state"`
Expand Down
8 changes: 5 additions & 3 deletions codis/pkg/topom/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ migration_async_numkeys = 500
migration_timeout = "30s"
# Set configs for redis sentinel.
sentinel_check_server_state_interval = "5s"
sentinel_check_master_failover_interval = "1s"
sentinel_master_dead_check_times = 5
sentinel_check_server_state_interval = "10s"
sentinel_check_master_failover_interval = "2s"
sentinel_master_dead_check_times = 10
sentinel_check_offline_server_interval = "2s"
sentinel_client_timeout = "10s"
sentinel_quorum = 2
sentinel_parallel_syncs = 1
Expand Down Expand Up @@ -86,6 +87,7 @@ type Config struct {
// The JSON key mirrors the TOML key. It must not reuse "sentinel_client_timeout":
// encoding/json silently omits every field involved in a duplicate-name conflict.
SentinelCheckServerStateInterval timesize.Duration `toml:"sentinel_check_server_state_interval" json:"sentinel_check_server_state_interval"`
SentinelCheckMasterFailoverInterval timesize.Duration `toml:"sentinel_check_master_failover_interval" json:"sentinel_check_master_failover_interval"`
SentinelMasterDeadCheckTimes int8 `toml:"sentinel_master_dead_check_times" json:"sentinel_master_dead_check_times"`
SentinelCheckOfflineServerInterval timesize.Duration `toml:"sentinel_check_offline_server_interval" json:"sentinel_check_offline_server_interval"`
SentinelClientTimeout timesize.Duration `toml:"sentinel_client_timeout" json:"sentinel_client_timeout"`
SentinelQuorum int `toml:"sentinel_quorum" json:"sentinel_quorum"`
SentinelParallelSyncs int `toml:"sentinel_parallel_syncs" json:"sentinel_parallel_syncs"`
Expand Down
2 changes: 1 addition & 1 deletion codis/pkg/topom/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (ctx *context) getSlotMapping(sid int) (*models.SlotMapping, error) {
}

func (ctx *context) getSlotMappingsByGroupId(gid int) []*models.SlotMapping {
var slots = []*models.SlotMapping{}
var slots []*models.SlotMapping
for _, m := range ctx.slots {
if m.GroupId == gid || m.Action.TargetId == gid {
slots = append(slots, m)
Expand Down
18 changes: 16 additions & 2 deletions codis/pkg/topom/topom.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,12 @@ func (s *Topom) Start(routines bool) error {
}
}, nil, true, 0)

// Check the status of the pre-offline master every 1 second
// Check the status of the pre-offline master every 2 seconds
// to determine whether to automatically switch master and slave
gxruntime.GoUnterminated(func() {
for !s.IsClosed() {
if s.IsOnline() {
w, _ := s.CheckPreOffineMastersState(5 * time.Second)
w, _ := s.CheckPreOfflineMastersState(5 * time.Second)
if w != nil {
w.Wait()
}
Expand All @@ -224,6 +224,20 @@ func (s *Topom) Start(routines bool) error {
}
}, nil, true, 0)

// Check the status of offline masters and slaves at the configured
// sentinel_check_offline_server_interval (2s by default) to determine
// whether to automatically restore the correct master-slave replication relationship
gxruntime.GoUnterminated(func() {
for !s.IsClosed() {
if s.IsOnline() {
w, _ := s.CheckOfflineMastersAndSlavesState(5 * time.Second)
if w != nil {
w.Wait()
}
}
time.Sleep(s.Config().SentinelCheckOfflineServerInterval.Duration())
}
}, nil, true, 0)

gxruntime.GoUnterminated(func() {
for !s.IsClosed() {
if s.IsOnline() {
Expand Down
Loading

0 comments on commit 2ea0b92

Please sign in to comment.