Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: disksing <[email protected]>
  • Loading branch information
disksing committed Oct 18, 2023
1 parent d80a4f8 commit ed860a2
Showing 1 changed file with 21 additions and 24 deletions.
45 changes: 21 additions & 24 deletions pkg/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,6 @@ func (m *ModeManager) Run(ctx context.Context) {
wg.Wait()
}

func storeIDs(stores []*core.StoreInfo) []uint64 {
ids := make([]uint64, len(stores))
for i, s := range stores {
ids[i] = s.GetID()
}
return ids
}

func minimalUpVoters(rule *placement.Rule, upStores, downStores []*core.StoreInfo) int {
if rule.Role == placement.Learner {
return 0
Expand Down Expand Up @@ -411,7 +403,7 @@ func (m *ModeManager) tickUpdateState() {

drTickCounter.Inc()

stores := m.checkStoreStatus()
stores, storeIDs := m.checkStoreStatus()

var primaryHasVoter, drHasVoter bool
var totalVoter, totalUpVoter int
Expand Down Expand Up @@ -440,10 +432,10 @@ func (m *ModeManager) tickUpdateState() {
hasMajority := totalUpVoter*2 > totalVoter

log.Debug("replication store status",
zap.Uint64s("up-primary", storeIDs(stores[primaryUp])),
zap.Uint64s("up-dr", storeIDs(stores[drUp])),
zap.Uint64s("down-primary", storeIDs(stores[primaryDown])),
zap.Uint64s("down-dr", storeIDs(stores[drDown])),
zap.Uint64s("up-primary", storeIDs[primaryUp]),
zap.Uint64s("up-dr", storeIDs[drUp]),
zap.Uint64s("down-primary", storeIDs[primaryDown]),
zap.Uint64s("down-dr", storeIDs[drDown]),
zap.Bool("can-sync", canSync),
zap.Bool("has-majority", hasMajority),
)
Expand All @@ -470,31 +462,31 @@ func (m *ModeManager) tickUpdateState() {
case drStateSync:
// If hasMajority is false, the cluster is always unavailable. Switch to async won't help.
if !canSync && hasMajority {
m.drSwitchToAsyncWait(storeIDs(stores[primaryUp]))
m.drSwitchToAsyncWait(storeIDs[primaryUp])
}
case drStateAsyncWait:
if canSync {
m.drSwitchToSync()
break
}
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs(stores[primaryUp])) {
m.drSwitchToAsyncWait(storeIDs(stores[primaryUp]))
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs[primaryUp]) {
m.drSwitchToAsyncWait(storeIDs[primaryUp])
break
}
if m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
if m.drCheckStoreStateUpdated(storeIDs[primaryUp]) {
m.drSwitchToAsync(storeIDs[primaryUp])
}
case drStateAsync:
if canSync {
m.drSwitchToSyncRecover()
break
}
if !reflect.DeepEqual(m.drGetAvailableStores(), storeIDs(stores[primaryUp])) && m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
if !reflect.DeepEqual(m.drGetAvailableStores(), storeIDs[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs[primaryUp]) {
m.drSwitchToAsync(storeIDs[primaryUp])
}
case drStateSyncRecover:
if !canSync && hasMajority {
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
m.drSwitchToAsync(storeIDs[primaryUp])
} else {
m.updateProgress()
progress := m.estimateProgress()
Expand Down Expand Up @@ -569,10 +561,10 @@ const (
storeStatusTypeCount
)

func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo {
func (m *ModeManager) checkStoreStatus() ([][]*core.StoreInfo, [][]uint64) {
m.RLock()
defer m.RUnlock()
stores := make([][]*core.StoreInfo, storeStatusTypeCount)
stores, storeIDs := make([][]*core.StoreInfo, storeStatusTypeCount), make([][]uint64, storeStatusTypeCount)
for _, s := range m.cluster.GetStores() {
if s.IsRemoved() {
continue
Expand All @@ -582,22 +574,27 @@ func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo {
if labelValue == m.config.DRAutoSync.Primary {
if down {
stores[primaryDown] = append(stores[primaryDown], s)
storeIDs[primaryDown] = append(storeIDs[primaryDown], s.GetID())
} else {
stores[primaryUp] = append(stores[primaryUp], s)
storeIDs[primaryUp] = append(storeIDs[primaryUp], s.GetID())
}
}
if labelValue == m.config.DRAutoSync.DR {
if down {
stores[drDown] = append(stores[drDown], s)
storeIDs[drDown] = append(storeIDs[drDown], s.GetID())
} else {
stores[drUp] = append(stores[drUp], s)
storeIDs[drUp] = append(storeIDs[drUp], s.GetID())
}
}
}
for i := range stores {
sort.Slice(stores[i], func(a, b int) bool { return stores[i][a].GetID() < stores[i][b].GetID() })
sort.Slice(storeIDs[i], func(a, b int) bool { return storeIDs[i][a] < storeIDs[i][b] })
}
return stores
return stores, storeIDs
}

// UpdateStoreDRStatus saves the dr-autosync status of a store.
Expand Down

0 comments on commit ed860a2

Please sign in to comment.