Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/priority: start the init timer when a child switch to Connecting from non-transient-failure state #5334

Merged
merged 3 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 5 additions & 21 deletions xds/internal/balancer/priority/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,6 @@ type priorityBalancer struct {
childToPriority map[string]int
// children is a map from child name to sub-balancers.
children map[string]*childBalancer
// The timer to give a priority some time to connect. And if the priority
// doesn't go into Ready/Failure, the next priority will be started.
//
// One timer is enough because there can be at most one priority in init
// state.
priorityInitTimer *timerWrapper
}

func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
Expand Down Expand Up @@ -198,27 +192,17 @@ func (b *priorityBalancer) Close() {
// Clear states of the current child in use, so if there's a race in picker
// update, it will be dropped.
b.childInUse = ""
b.stopPriorityInitTimer()
// Stop the child policies, this is necessary to stop the init timers in the
// children.
for _, child := range b.children {
child.stop()
}
}

func (b *priorityBalancer) ExitIdle() {
b.bg.ExitIdle()
}

// stopPriorityInitTimer stops the priorityInitTimer if it's not nil, and set it
// to nil.
//
// Caller must hold b.mu.
func (b *priorityBalancer) stopPriorityInitTimer() {
timerW := b.priorityInitTimer
if timerW == nil {
return
}
b.priorityInitTimer = nil
timerW.stopped = true
timerW.timer.Stop()
}

// UpdateState implements balancergroup.BalancerStateAggregator interface. The
// balancer group sends new connectivity state and picker here.
func (b *priorityBalancer) UpdateState(childName string, state balancer.State) {
Expand Down
50 changes: 49 additions & 1 deletion xds/internal/balancer/priority/balancer_child.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package priority

import (
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
Expand All @@ -36,7 +38,16 @@ type childBalancer struct {
rState resolver.State

started bool
state balancer.State
// This is set when the child reports TransientFailure, and unset when it
// reports Ready or Idle. It is used to decide whether the failover timer
// should start when the child is transitioning into Connecting. The timer
// will be restarted if the child has not reported TF more recently than it
// reported Ready or Idle.
reportedTF bool
state balancer.State
// The timer to give a priority some time to connect. And if the priority
// doesn't go into Ready/Failure, the next priority will be started.
initTimer *timerWrapper
}

// newChildBalancer creates a child balancer place holder, but doesn't
Expand Down Expand Up @@ -79,6 +90,7 @@ func (cb *childBalancer) start() {
}
cb.started = true
cb.parent.bg.Add(cb.name, cb.bb)
cb.startInitTimer()
}

// sendUpdate sends the addresses and config to the child balancer.
Expand All @@ -103,10 +115,46 @@ func (cb *childBalancer) stop() {
if !cb.started {
return
}
cb.stopInitTimer()
cb.parent.bg.Remove(cb.name)
cb.started = false
cb.state = balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
}
// Clear child.reportedTF, so that if this child is started later, it will
// be given time to connect.
cb.reportedTF = false
}

func (cb *childBalancer) startInitTimer() {
if cb.initTimer != nil {
return
}
// Need this local variable to capture timerW in the AfterFunc closure
// to check the stopped boolean.
timerW := &timerWrapper{}
cb.initTimer = timerW
timerW.timer = time.AfterFunc(DefaultPriorityInitTimeout, func() {
cb.parent.mu.Lock()
defer cb.parent.mu.Unlock()
if timerW.stopped {
return
}
cb.initTimer = nil
// Re-sync the priority. This will switch to the next priority if
// there's any. Note that it's important sync() is called after setting
// initTimer to nil.
cb.parent.syncPriority()
})
}

func (cb *childBalancer) stopInitTimer() {
timerW := cb.initTimer
if timerW == nil {
return
}
cb.initTimer = nil
timerW.stopped = true
timerW.timer.Stop()
}
204 changes: 48 additions & 156 deletions xds/internal/balancer/priority/balancer_priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ var (
DefaultPriorityInitTimeout = 10 * time.Second
)

// syncPriority handles priority after a config update. It makes sure the
// balancer state (started or not) is in sync with the priorities (even in
// tricky cases where a child is moved from a priority to another).
// syncPriority handles priority after a config update or a child balancer
// connectivity state update. It makes sure the balancer state (started or not)
// is in sync with the priorities (even in tricky cases where a child is moved
// from a priority to another).
//
// It's guaranteed that after this function returns:
// - If some child is READY, it is childInUse, and all lower priorities are
Expand All @@ -55,6 +56,9 @@ var (
// - For any of the following cases:
// - If balancer is not started (not built), this is either a new child
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix indent here? (Remove tab?) Or is this just a github problem??

Copy link
Contributor Author

@menghanl menghanl May 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Seems there were tabs??? Who uses tabs???

// with high priority, or a new builder for an existing child.
// - If balancer is Connecting and has non-nil initTimer (meaning it
// transitioned from Ready or Idle to connecting, not from TF, so we
// should give it init-time to connect).
// - If balancer is READY
// - If this is the lowest priority
// - do the following:
Expand All @@ -69,9 +73,6 @@ func (b *priorityBalancer) syncPriority() {
if len(b.priorities) == 0 {
b.childInUse = ""
b.priorityInUse = 0
// Stop the init timer. This can happen if the only priority is removed
// shortly after it's added.
b.stopPriorityInitTimer()
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(ErrAllPrioritiesRemoved),
Expand All @@ -89,6 +90,7 @@ func (b *priorityBalancer) syncPriority() {
if !child.started ||
child.state.ConnectivityState == connectivity.Ready ||
child.state.ConnectivityState == connectivity.Idle ||
(child.state.ConnectivityState == connectivity.Connecting && child.initTimer != nil) ||
p == len(b.priorities)-1 {
if b.childInUse != "" && b.childInUse != child.name {
// childInUse was set and is different from this child, will
Expand Down Expand Up @@ -123,8 +125,7 @@ func (b *priorityBalancer) stopSubBalancersLowerThanPriority(p int) {
// - stop all child with lower priorities
// - if childInUse is not this child
// - set childInUse to this child
// - stops init timer
// - if this child is not started, start it, and start a init timer
// - if this child is not started, start it
//
// Note that it does NOT send the current child state (picker) to the parent
// ClientConn. The caller needs to send it if necessary.
Expand Down Expand Up @@ -156,33 +157,8 @@ func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) {
b.childInUse = child.name
b.priorityInUse = priority

// Init timer is always for childInUse. Since we are switching to a
// different child, we will stop the init timer no matter what. If this
// child is not started, we will start the init timer later.
b.stopPriorityInitTimer()

if !child.started {
child.start()
// Need this local variable to capture timerW in the AfterFunc closure
// to check the stopped boolean.
timerW := &timerWrapper{}
b.priorityInitTimer = timerW
timerW.timer = time.AfterFunc(DefaultPriorityInitTimeout, func() {
b.mu.Lock()
defer b.mu.Unlock()
if timerW.stopped {
return
}
b.priorityInitTimer = nil
// Switch to the next priority if there's any.
if pNext := priority + 1; pNext < len(b.priorities) {
nameNext := b.priorities[pNext]
if childNext, ok := b.children[nameNext]; ok {
b.switchToChild(childNext, pNext)
childNext.sendUpdate()
}
}
})
}
}

Expand Down Expand Up @@ -222,141 +198,57 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S
b.logger.Warningf("priority: child balancer not found for child %v, priority %v", childName, priority)
return
}
oldState := child.state.ConnectivityState
oldChildState := child.state
child.state = s

// We start/stop the init timer of this child based on the new connectivity
// state. syncPriority() later will need the init timer (to check if it's
// nil or not) to decide which child to switch to.
switch s.ConnectivityState {
case connectivity.Ready, connectivity.Idle:
// Note that idle is also handled as if it's Ready. It will close the
// lower priorities (which will be kept in a cache, not deleted), and
// new picks will use the Idle picker.
b.handlePriorityWithNewStateReady(child, priority)
child.reportedTF = false
child.stopInitTimer()
case connectivity.TransientFailure:
b.handlePriorityWithNewStateTransientFailure(child, priority)
child.reportedTF = true
child.stopInitTimer()
case connectivity.Connecting:
b.handlePriorityWithNewStateConnecting(child, priority, oldState)
if !child.reportedTF {
child.startInitTimer()
}
default:
// New state is Shutdown, should never happen. Don't forward.
}
}

// handlePriorityWithNewStateReady handles state Ready from a higher or equal
// priority.
//
// An update with state Ready:
// - If it's from higher priority:
// - Switch to this priority
// - Forward the update
// - If it's from priorityInUse:
// - Forward only
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold mu.
func (b *priorityBalancer) handlePriorityWithNewStateReady(child *childBalancer, priority int) {
// If one priority higher or equal to priorityInUse goes Ready, stop the
// init timer. If update is from higher than priorityInUse, priorityInUse
// will be closed, and the init timer will become useless.
b.stopPriorityInitTimer()

// priorityInUse is lower than this priority, switch to this.
if b.priorityInUse > priority {
b.logger.Infof("Switching priority from %v to %v, because latter became Ready", b.priorityInUse, priority)
b.switchToChild(child, priority)
}
// Forward the update since it's READY.
b.cc.UpdateState(child.state)
}

// handlePriorityWithNewStateTransientFailure handles state TransientFailure
// from a higher or equal priority.
//
// An update with state TransientFailure:
// - If it's from a higher priority:
// - Do not forward, and do nothing
// - If it's from priorityInUse:
// - If there's no lower:
// - Forward and do nothing else
// - If there's a lower priority:
// - Switch to the lower
// - Forward the lower child's state
// - Do NOT forward this update
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold mu.
func (b *priorityBalancer) handlePriorityWithNewStateTransientFailure(child *childBalancer, priority int) {
// priorityInUse is lower than this priority, do nothing.
if b.priorityInUse > priority {
return
}
// priorityInUse sends a failure. Stop its init timer.
b.stopPriorityInitTimer()
priorityNext := priority + 1
if priorityNext >= len(b.priorities) {
// Forward this update.
b.cc.UpdateState(child.state)
return
}
b.logger.Infof("Switching priority from %v to %v, because former became TransientFailure", priority, priorityNext)
nameNext := b.priorities[priorityNext]
childNext := b.children[nameNext]
b.switchToChild(childNext, priorityNext)
b.cc.UpdateState(childNext.state)
childNext.sendUpdate()
}

// handlePriorityWithNewStateConnecting handles state Connecting from a higher
// than or equal priority.
//
// An update with state Connecting:
// - If it's from a higher priority
// - Do nothing
// - If it's from priorityInUse, the behavior depends on previous state.
//
// When new state is Connecting, the behavior depends on previous state. If the
// previous state was Ready, this is a transition out from Ready to Connecting.
// Assuming there are multiple backends in the same priority, this mean we are
// in a bad situation and we should failover to the next priority (Side note:
// the current connectivity state aggregating algorithm (e.g. round-robin) is
// not handling this right, because if many backends all go from Ready to
// Connecting, the overall situation is more like TransientFailure, not
// Connecting).
//
// If the previous state was Idle, we don't do anything special with failure,
// and simply forward the update. The init timer should be in process, will
// handle failover if it timeouts. If the previous state was TransientFailure,
// we do not forward, because the lower priority is in use.
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold mu.
func (b *priorityBalancer) handlePriorityWithNewStateConnecting(child *childBalancer, priority int, oldState connectivity.State) {
// priorityInUse is lower than this priority, do nothing.
if b.priorityInUse > priority {
return
}

switch oldState {
case connectivity.Ready:
// Handling transition from Ready to Connecting, is same as handling
// TransientFailure. There's no need to stop the init timer, because it
// should have been stopped when state turned Ready.
priorityNext := priority + 1
if priorityNext >= len(b.priorities) {
// Forward this update.
b.cc.UpdateState(child.state)
oldPriorityInUse := b.priorityInUse
child.parent.syncPriority()
// If child is switched by syncPriority(), it also sends the update from the
// new child to overwrite the old picker used by the parent.
//
// But no update is sent if the child is not switches. That means if this
// update is from childInUse, and this child is still childInUse after
// syncing, the update being handled here is not sent to the parent. In that
// case, we need to do an explicit check here to forward the update.
if b.priorityInUse == oldPriorityInUse && b.priorityInUse == priority {
// Special handling for Connecting. If child was not switched, and this
// is a Connecting->Connecting transition, do not send the redundant
// update, since all Connecting pickers are the same (they tell the RPCs
// to repick).
//
// This can happen because the initial state of a child (before any
// update is received) is Connecting. When the child is started, it's
// picker is sent to the parent by syncPriority (to overwrite the old
// picker if there's any). When it reports Connecting after being
// started, it will send a Connecting update (handled here), causing a
// Connecting->Connecting transition.
if oldChildState.ConnectivityState == connectivity.Connecting && s.ConnectivityState == connectivity.Connecting {
return
}
b.logger.Infof("Switching priority from %v to %v, because former became TransientFailure", priority, priorityNext)
nameNext := b.priorities[priorityNext]
childNext := b.children[nameNext]
b.switchToChild(childNext, priorityNext)
b.cc.UpdateState(childNext.state)
childNext.sendUpdate()
case connectivity.Idle:
// Only forward this update if sync() didn't switch child, and this
// child is in use.
//
// sync() forwards the update if the child was switched, so there's no
// need to forward again.
b.cc.UpdateState(child.state)
default:
// Old state is Connecting, TransientFailure or Shutdown. Don't forward.
}

}
Loading