Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#7222
Browse files Browse the repository at this point in the history
close tikv#7221

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
disksing authored and ti-chi-bot committed Feb 1, 2024
1 parent 2a2b949 commit 9f67bab
Show file tree
Hide file tree
Showing 2 changed files with 281 additions and 5 deletions.
96 changes: 91 additions & 5 deletions server/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,48 @@ func (m *ModeManager) Run(ctx context.Context) {
}
}

<<<<<<< HEAD:server/replication/replication_mode.go
func (m *ModeManager) tickDR() {
=======
func minimalUpVoters(rule *placement.Rule, upStores, downStores []*core.StoreInfo) int {
if rule.Role == placement.Learner {
return 0
}
var up, down int
for _, s := range upStores {
if placement.MatchLabelConstraints(s, rule.LabelConstraints) {
up++
}
}
for _, s := range downStores {
if placement.MatchLabelConstraints(s, rule.LabelConstraints) {
down++
}
}
minimalUp := rule.Count - down
if minimalUp < 0 {
minimalUp = 0
}
if minimalUp > up {
minimalUp = up
}
return minimalUp
}

func (m *ModeManager) tickUpdateState() {
>>>>>>> cb9c70c6e (replication mode: fix wrong available store list (#7222)):pkg/replication/replication_mode.go
if m.getModeName() != modeDRAutoSync {
return
}

drTickCounter.Inc()

<<<<<<< HEAD:server/replication/replication_mode.go
totalPrimaryPeers, totalDrPeers := m.config.DRAutoSync.PrimaryReplicas, m.config.DRAutoSync.DRReplicas
stores := m.checkStoreStatus()
=======
stores, storeIDs := m.checkStoreStatus()
>>>>>>> cb9c70c6e (replication mode: fix wrong available store list (#7222)):pkg/replication/replication_mode.go

// canSync is true when every region has at least 1 replica in each DC.
canSync := len(stores[primaryDown]) < totalPrimaryPeers && len(stores[drDown]) < totalDrPeers &&
Expand All @@ -416,10 +449,17 @@ func (m *ModeManager) tickDR() {
hasMajority := upPeers*2 > totalPrimaryPeers+totalDrPeers

log.Debug("replication store status",
<<<<<<< HEAD:server/replication/replication_mode.go
zap.Uint64s("up-primary", stores[primaryUp]),
zap.Uint64s("up-dr", stores[drUp]),
zap.Uint64s("down-primary", stores[primaryDown]),
zap.Uint64s("down-dr", stores[drDown]),
=======
zap.Uint64s("up-primary", storeIDs[primaryUp]),
zap.Uint64s("up-dr", storeIDs[drUp]),
zap.Uint64s("down-primary", storeIDs[primaryDown]),
zap.Uint64s("down-dr", storeIDs[drDown]),
>>>>>>> cb9c70c6e (replication mode: fix wrong available store list (#7222)):pkg/replication/replication_mode.go
zap.Bool("can-sync", canSync),
zap.Int("up-peers", upPeers),
zap.Bool("has-majority", hasMajority),
Expand Down Expand Up @@ -447,31 +487,53 @@ func (m *ModeManager) tickDR() {
case drStateSync:
// If hasMajority is false, the cluster is always unavailable. Switch to async won't help.
if !canSync && hasMajority {
<<<<<<< HEAD:server/replication/replication_mode.go
m.drSwitchToAsyncWait(stores[primaryUp])
=======
m.drSwitchToAsyncWait(storeIDs[primaryUp])
>>>>>>> cb9c70c6e (replication mode: fix wrong available store list (#7222)):pkg/replication/replication_mode.go
}
case drStateAsyncWait:
if canSync {
m.drSwitchToSync()
break
}
<<<<<<< HEAD:server/replication/replication_mode.go
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, stores[primaryUp]) {
m.drSwitchToAsyncWait(stores[primaryUp])
break
}
if m.drCheckStoreStateUpdated(stores[primaryUp]) {
m.drSwitchToAsync(stores[primaryUp])
=======
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs[primaryUp]) {
m.drSwitchToAsyncWait(storeIDs[primaryUp])
break
}
if m.drCheckStoreStateUpdated(storeIDs[primaryUp]) {
m.drSwitchToAsync(storeIDs[primaryUp])
>>>>>>> cb9c70c6e (replication mode: fix wrong available store list (#7222)):pkg/replication/replication_mode.go
}
case drStateAsync:
if canSync {
m.drSwitchToSyncRecover()
break
}
<<<<<<< HEAD:server/replication/replication_mode.go
if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(stores[primaryUp]) {
m.drSwitchToAsync(stores[primaryUp])
}
case drStateSyncRecover:
if !canSync && hasMajority {
m.drSwitchToAsync(stores[primaryUp])
=======
if !reflect.DeepEqual(m.drGetAvailableStores(), storeIDs[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs[primaryUp]) {
m.drSwitchToAsync(storeIDs[primaryUp])
}
case drStateSyncRecover:
if !canSync && hasMajority {
m.drSwitchToAsync(storeIDs[primaryUp])
>>>>>>> cb9c70c6e (replication mode: fix wrong available store list (#7222)):pkg/replication/replication_mode.go
} else {
m.updateProgress()
progress := m.estimateProgress()
Expand All @@ -496,39 +558,63 @@ const (
storeStatusTypeCount
)

<<<<<<< HEAD:server/replication/replication_mode.go
func (m *ModeManager) checkStoreStatus() [][]uint64 {
m.RLock()
defer m.RUnlock()
stores := make([][]uint64, storeStatusTypeCount)
=======
func (m *ModeManager) checkStoreStatus() ([][]*core.StoreInfo, [][]uint64) {
m.RLock()
defer m.RUnlock()
stores, storeIDs := make([][]*core.StoreInfo, storeStatusTypeCount), make([][]uint64, storeStatusTypeCount)
>>>>>>> cb9c70c6e (replication mode: fix wrong available store list (#7222)):pkg/replication/replication_mode.go
for _, s := range m.cluster.GetStores() {
if s.IsRemoved() {
continue
}
// learner peers do not participate in major commit or vote, so it should not count in primary/dr as a normal store.
if s.GetRegionCount() == s.GetLearnerCount() {
continue
}
down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
if labelValue == m.config.DRAutoSync.Primary {
if down {
<<<<<<< HEAD:server/replication/replication_mode.go
stores[primaryDown] = append(stores[primaryDown], s.GetID())
} else {
stores[primaryUp] = append(stores[primaryUp], s.GetID())
=======
stores[primaryDown] = append(stores[primaryDown], s)
storeIDs[primaryDown] = append(storeIDs[primaryDown], s.GetID())
} else {
stores[primaryUp] = append(stores[primaryUp], s)
storeIDs[primaryUp] = append(storeIDs[primaryUp], s.GetID())
>>>>>>> cb9c70c6e (replication mode: fix wrong available store list (#7222)):pkg/replication/replication_mode.go
}
}
if labelValue == m.config.DRAutoSync.DR {
if down {
<<<<<<< HEAD:server/replication/replication_mode.go
stores[drDown] = append(stores[drDown], s.GetID())
} else {
stores[drUp] = append(stores[drUp], s.GetID())
=======
stores[drDown] = append(stores[drDown], s)
storeIDs[drDown] = append(storeIDs[drDown], s.GetID())
} else {
stores[drUp] = append(stores[drUp], s)
storeIDs[drUp] = append(storeIDs[drUp], s.GetID())
>>>>>>> cb9c70c6e (replication mode: fix wrong available store list (#7222)):pkg/replication/replication_mode.go
}
}
}
for i := range stores {
<<<<<<< HEAD:server/replication/replication_mode.go
sort.Slice(stores[i], func(a, b int) bool { return stores[i][a] < stores[i][b] })
=======
sort.Slice(stores[i], func(a, b int) bool { return stores[i][a].GetID() < stores[i][b].GetID() })
sort.Slice(storeIDs[i], func(a, b int) bool { return storeIDs[i][a] < storeIDs[i][b] })
>>>>>>> cb9c70c6e (replication mode: fix wrong available store list (#7222)):pkg/replication/replication_mode.go
}
return stores
return stores, storeIDs
}

// UpdateStoreDRStatus saves the dr-autosync status of a store.
Expand Down
Loading

0 comments on commit 9f67bab

Please sign in to comment.