Skip to content

Commit

Permalink
[priority_connectivity_fix] xds/priority: start the init timer when a…
Browse files Browse the repository at this point in the history
… child switch to Connecting from non-transient-failure state

See grpc/proposal#296 for context.

After this change, priority will restart the failover timer when a child
reports Connecting, if that child hasn't reported TF more recently than
it reported Ready or Idle.

Also changed the priority policy to always call the centralized function
`syncPriority` to handle child switching.
  • Loading branch information
menghanl committed May 2, 2022
1 parent dc86d5d commit cfa2dea
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 182 deletions.
1 change: 1 addition & 0 deletions xds/internal/balancer/clusterresolver/eds_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) {
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc1 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh // Clear the Connecting picker from the channel.
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})

// Picks with drops.
Expand Down
26 changes: 5 additions & 21 deletions xds/internal/balancer/priority/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,6 @@ type priorityBalancer struct {
childToPriority map[string]int
// children is a map from child name to sub-balancers.
children map[string]*childBalancer
// The timer to give a priority some time to connect. And if the priority
// doesn't go into Ready/Failure, the next priority will be started.
//
// One timer is enough because there can be at most one priority in init
// state.
priorityInitTimer *timerWrapper
}

func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
Expand Down Expand Up @@ -198,27 +192,17 @@ func (b *priorityBalancer) Close() {
// Clear states of the current child in use, so if there's a race in picker
// update, it will be dropped.
b.childInUse = ""
b.stopPriorityInitTimer()
// Stop the child policies, this is necessary to stop the init timers in the
// children.
for _, child := range b.children {
child.stop()
}
}

func (b *priorityBalancer) ExitIdle() {
b.bg.ExitIdle()
}

// stopPriorityInitTimer stops the priorityInitTimer if it's not nil, and set it
// to nil.
//
// Caller must hold b.mu.
func (b *priorityBalancer) stopPriorityInitTimer() {
timerW := b.priorityInitTimer
if timerW == nil {
return
}
b.priorityInitTimer = nil
timerW.stopped = true
timerW.timer.Stop()
}

// UpdateState implements balancergroup.BalancerStateAggregator interface. The
// balancer group sends new connectivity state and picker here.
func (b *priorityBalancer) UpdateState(childName string, state balancer.State) {
Expand Down
50 changes: 49 additions & 1 deletion xds/internal/balancer/priority/balancer_child.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package priority

import (
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
Expand All @@ -36,7 +38,16 @@ type childBalancer struct {
rState resolver.State

started bool
state balancer.State
// This is set when the child reports TransientFailure, and unset when it
// reports Ready or Idle. It is used to decide whether the failover timer
// should start when the child is transitioning into Connecting. The timer
// will be restarted if the child has not reported TF more recently than it
// reported Ready or Idle.
reportedTF bool
state balancer.State
// The timer to give a priority some time to connect. And if the priority
// doesn't go into Ready/Failure, the next priority will be started.
initTimer *timerWrapper
}

// newChildBalancer creates a child balancer place holder, but doesn't
Expand Down Expand Up @@ -79,6 +90,7 @@ func (cb *childBalancer) start() {
}
cb.started = true
cb.parent.bg.Add(cb.name, cb.bb)
cb.startInitTimer()
}

// sendUpdate sends the addresses and config to the child balancer.
Expand All @@ -103,10 +115,46 @@ func (cb *childBalancer) stop() {
if !cb.started {
return
}
cb.stopInitTimer()
cb.parent.bg.Remove(cb.name)
cb.started = false
cb.state = balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
}
// Clear child.reportedTF, so that if this child is started later, it will
// be given time to connect.
cb.reportedTF = false
}

func (cb *childBalancer) startInitTimer() {
if cb.initTimer != nil {
return
}
// Need this local variable to capture timerW in the AfterFunc closure
// to check the stopped boolean.
timerW := &timerWrapper{}
cb.initTimer = timerW
timerW.timer = time.AfterFunc(DefaultPriorityInitTimeout, func() {
cb.parent.mu.Lock()
defer cb.parent.mu.Unlock()
if timerW.stopped {
return
}
cb.initTimer = nil
// Re-sync the priority. This will switch to the next priority if
// there's any. Note that it's important sync() is called after setting
// initTimer to nil.
cb.parent.syncPriority()
})
}

func (cb *childBalancer) stopInitTimer() {
timerW := cb.initTimer
if timerW == nil {
return
}
cb.initTimer = nil
timerW.stopped = true
timerW.timer.Stop()
}
187 changes: 30 additions & 157 deletions xds/internal/balancer/priority/balancer_priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package priority

import (
"errors"
"fmt"
"time"

"google.golang.org/grpc/balancer"
Expand All @@ -36,9 +37,10 @@ var (
DefaultPriorityInitTimeout = 10 * time.Second
)

// syncPriority handles priority after a config update. It makes sure the
// balancer state (started or not) is in sync with the priorities (even in
// tricky cases where a child is moved from a priority to another).
// syncPriority handles priority after a config update or a child balancer
// connectivity state update. It makes sure the balancer state (started or not)
// is in sync with the priorities (even in tricky cases where a child is moved
// from a priority to another).
//
// It's guaranteed that after this function returns:
// - If some child is READY, it is childInUse, and all lower priorities are
Expand All @@ -55,6 +57,9 @@ var (
// - For any of the following cases:
// - If balancer is not started (not built), this is either a new child
// with high priority, or a new builder for an existing child.
// - If balancer is Connecting and has non-nil initTimer (meaning it
// transitioned from Ready or Idle to connecting, not from TF, so we
// should give it init-time to connect).
// - If balancer is READY
// - If this is the lowest priority
// - do the following:
Expand All @@ -69,9 +74,6 @@ func (b *priorityBalancer) syncPriority() {
if len(b.priorities) == 0 {
b.childInUse = ""
b.priorityInUse = 0
// Stop the init timer. This can happen if the only priority is removed
// shortly after it's added.
b.stopPriorityInitTimer()
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(ErrAllPrioritiesRemoved),
Expand All @@ -89,6 +91,7 @@ func (b *priorityBalancer) syncPriority() {
if !child.started ||
child.state.ConnectivityState == connectivity.Ready ||
child.state.ConnectivityState == connectivity.Idle ||
(child.state.ConnectivityState == connectivity.Connecting && child.initTimer != nil) ||
p == len(b.priorities)-1 {
if b.childInUse != "" && b.childInUse != child.name {
// childInUse was set and is different from this child, will
Expand Down Expand Up @@ -123,8 +126,7 @@ func (b *priorityBalancer) stopSubBalancersLowerThanPriority(p int) {
// - stop all child with lower priorities
// - if childInUse is not this child
// - set childInUse to this child
// - stops init timer
// - if this child is not started, start it, and start a init timer
// - if this child is not started, start it
//
// Note that it does NOT send the current child state (picker) to the parent
// ClientConn. The caller needs to send it if necessary.
Expand Down Expand Up @@ -156,33 +158,8 @@ func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) {
b.childInUse = child.name
b.priorityInUse = priority

// Init timer is always for childInUse. Since we are switching to a
// different child, we will stop the init timer no matter what. If this
// child is not started, we will start the init timer later.
b.stopPriorityInitTimer()

if !child.started {
child.start()
// Need this local variable to capture timerW in the AfterFunc closure
// to check the stopped boolean.
timerW := &timerWrapper{}
b.priorityInitTimer = timerW
timerW.timer = time.AfterFunc(DefaultPriorityInitTimeout, func() {
b.mu.Lock()
defer b.mu.Unlock()
if timerW.stopped {
return
}
b.priorityInitTimer = nil
// Switch to the next priority if there's any.
if pNext := priority + 1; pNext < len(b.priorities) {
nameNext := b.priorities[pNext]
if childNext, ok := b.children[nameNext]; ok {
b.switchToChild(childNext, pNext)
childNext.sendUpdate()
}
}
})
}
}

Expand Down Expand Up @@ -222,141 +199,37 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S
b.logger.Warningf("priority: child balancer not found for child %v, priority %v", childName, priority)
return
}
oldState := child.state.ConnectivityState
// oldState := child.state.ConnectivityState
child.state = s

// We start/stop the init timer of this child based on the new connectivity
// state. syncPriority() later will need the init timer (to check if it's
// nil or not) to decide which child to switch to.
switch s.ConnectivityState {
case connectivity.Ready, connectivity.Idle:
// Note that idle is also handled as if it's Ready. It will close the
// lower priorities (which will be kept in a cache, not deleted), and
// new picks will use the Idle picker.
b.handlePriorityWithNewStateReady(child, priority)
child.reportedTF = false
child.stopInitTimer()
case connectivity.TransientFailure:
b.handlePriorityWithNewStateTransientFailure(child, priority)
child.reportedTF = true
child.stopInitTimer()
case connectivity.Connecting:
b.handlePriorityWithNewStateConnecting(child, priority, oldState)
if !child.reportedTF {
child.startInitTimer()
}
default:
// New state is Shutdown, should never happen. Don't forward.
}
}

// handlePriorityWithNewStateReady handles state Ready from a higher or equal
// priority.
//
// An update with state Ready:
// - If it's from higher priority:
// - Switch to this priority
// - Forward the update
// - If it's from priorityInUse:
// - Forward only
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold mu.
func (b *priorityBalancer) handlePriorityWithNewStateReady(child *childBalancer, priority int) {
// If one priority higher or equal to priorityInUse goes Ready, stop the
// init timer. If update is from higher than priorityInUse, priorityInUse
// will be closed, and the init timer will become useless.
b.stopPriorityInitTimer()

// priorityInUse is lower than this priority, switch to this.
if b.priorityInUse > priority {
b.logger.Infof("Switching priority from %v to %v, because latter became Ready", b.priorityInUse, priority)
b.switchToChild(child, priority)
}
// Forward the update since it's READY.
b.cc.UpdateState(child.state)
}

// handlePriorityWithNewStateTransientFailure handles state TransientFailure
// from a higher or equal priority.
//
// An update with state TransientFailure:
// - If it's from a higher priority:
// - Do not forward, and do nothing
// - If it's from priorityInUse:
// - If there's no lower:
// - Forward and do nothing else
// - If there's a lower priority:
// - Switch to the lower
// - Forward the lower child's state
// - Do NOT forward this update
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold mu.
func (b *priorityBalancer) handlePriorityWithNewStateTransientFailure(child *childBalancer, priority int) {
// priorityInUse is lower than this priority, do nothing.
if b.priorityInUse > priority {
return
}
// priorityInUse sends a failure. Stop its init timer.
b.stopPriorityInitTimer()
priorityNext := priority + 1
if priorityNext >= len(b.priorities) {
// Forward this update.
oldPriorityInUse := b.priorityInUse
child.parent.syncPriority()
if b.priorityInUse == oldPriorityInUse && b.priorityInUse == priority {
// Only forward this update if sync() didn't switch child, and this
// child is in use.
//
// sync() forwards the update if the child was switched, so there's no
// need to forward again.
fmt.Println(" ------ replace picker after sync()", b.priorityInUse, oldPriorityInUse, child.state.ConnectivityState)
b.cc.UpdateState(child.state)
return
}
b.logger.Infof("Switching priority from %v to %v, because former became TransientFailure", priority, priorityNext)
nameNext := b.priorities[priorityNext]
childNext := b.children[nameNext]
b.switchToChild(childNext, priorityNext)
b.cc.UpdateState(childNext.state)
childNext.sendUpdate()
}

// handlePriorityWithNewStateConnecting handles state Connecting from a higher
// than or equal priority.
//
// An update with state Connecting:
// - If it's from a higher priority
// - Do nothing
// - If it's from priorityInUse, the behavior depends on previous state.
//
// When new state is Connecting, the behavior depends on previous state. If the
// previous state was Ready, this is a transition out from Ready to Connecting.
// Assuming there are multiple backends in the same priority, this mean we are
// in a bad situation and we should failover to the next priority (Side note:
// the current connectivity state aggregating algorithm (e.g. round-robin) is
// not handling this right, because if many backends all go from Ready to
// Connecting, the overall situation is more like TransientFailure, not
// Connecting).
//
// If the previous state was Idle, we don't do anything special with failure,
// and simply forward the update. The init timer should be in process, will
// handle failover if it timeouts. If the previous state was TransientFailure,
// we do not forward, because the lower priority is in use.
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold mu.
func (b *priorityBalancer) handlePriorityWithNewStateConnecting(child *childBalancer, priority int, oldState connectivity.State) {
// priorityInUse is lower than this priority, do nothing.
if b.priorityInUse > priority {
return
}

switch oldState {
case connectivity.Ready:
// Handling transition from Ready to Connecting, is same as handling
// TransientFailure. There's no need to stop the init timer, because it
// should have been stopped when state turned Ready.
priorityNext := priority + 1
if priorityNext >= len(b.priorities) {
// Forward this update.
b.cc.UpdateState(child.state)
return
}
b.logger.Infof("Switching priority from %v to %v, because former became TransientFailure", priority, priorityNext)
nameNext := b.priorities[priorityNext]
childNext := b.children[nameNext]
b.switchToChild(childNext, priorityNext)
b.cc.UpdateState(childNext.state)
childNext.sendUpdate()
case connectivity.Idle:
b.cc.UpdateState(child.state)
default:
// Old state is Connecting, TransientFailure or Shutdown. Don't forward.
}
}
Loading

0 comments on commit cfa2dea

Please sign in to comment.