Skip to content

Commit

Permalink
[priority_connectivity_fix] xds/priority: start the init timer when a…
Browse files Browse the repository at this point in the history
… child switch to Connecting from non-transient-failure state

See grpc/proposal#296 for context.

After this change, priority will restart the failover timer when a child
reports Connecting, if that child hasn't reported TF more recently than
it reported Ready or Idle.

Also changed the priority policy to always call the centralized function
`syncPriority` to handle child switching.
  • Loading branch information
menghanl committed May 2, 2022
1 parent dc86d5d commit 71a7cf7
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 182 deletions.
1 change: 1 addition & 0 deletions xds/internal/balancer/clusterresolver/eds_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) {
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc1 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh // Clear the Connecting picker from the channel.
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})

// Picks with drops.
Expand Down
26 changes: 5 additions & 21 deletions xds/internal/balancer/priority/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,6 @@ type priorityBalancer struct {
childToPriority map[string]int
// children is a map from child name to sub-balancers.
children map[string]*childBalancer
// The timer to give a priority some time to connect. And if the priority
// doesn't go into Ready/Failure, the next priority will be started.
//
// One timer is enough because there can be at most one priority in init
// state.
priorityInitTimer *timerWrapper
}

func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
Expand Down Expand Up @@ -198,27 +192,17 @@ func (b *priorityBalancer) Close() {
// Clear states of the current child in use, so if there's a race in picker
// update, it will be dropped.
b.childInUse = ""
b.stopPriorityInitTimer()
// Stop the child policies, this is necessary to stop the init timers in the
// children.
for _, child := range b.children {
child.stop()
}
}

func (b *priorityBalancer) ExitIdle() {
b.bg.ExitIdle()
}

// stopPriorityInitTimer stops the priorityInitTimer if it's not nil, and set it
// to nil.
//
// Caller must hold b.mu.
func (b *priorityBalancer) stopPriorityInitTimer() {
timerW := b.priorityInitTimer
if timerW == nil {
return
}
b.priorityInitTimer = nil
timerW.stopped = true
timerW.timer.Stop()
}

// UpdateState implements balancergroup.BalancerStateAggregator interface. The
// balancer group sends new connectivity state and picker here.
func (b *priorityBalancer) UpdateState(childName string, state balancer.State) {
Expand Down
50 changes: 49 additions & 1 deletion xds/internal/balancer/priority/balancer_child.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package priority

import (
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
Expand All @@ -36,7 +38,16 @@ type childBalancer struct {
rState resolver.State

started bool
state balancer.State
// This is set when the child reports TransientFailure, and unset when it
// reports Ready or Idle. It is used to decide whether the failover timer
// should start when the child is transitioning into Connecting. The timer
// will be restarted if the child has not reported TF more recently than it
// reported Ready or Idle.
reportedTF bool
state balancer.State
// The timer to give a priority some time to connect. And if the priority
// doesn't go into Ready/Failure, the next priority will be started.
initTimer *timerWrapper
}

// newChildBalancer creates a child balancer place holder, but doesn't
Expand Down Expand Up @@ -79,6 +90,7 @@ func (cb *childBalancer) start() {
}
cb.started = true
cb.parent.bg.Add(cb.name, cb.bb)
cb.startInitTimer()
}

// sendUpdate sends the addresses and config to the child balancer.
Expand All @@ -103,10 +115,46 @@ func (cb *childBalancer) stop() {
if !cb.started {
return
}
cb.stopInitTimer()
cb.parent.bg.Remove(cb.name)
cb.started = false
cb.state = balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
}
// Clear child.reportedTF, so that if this child is started later, it will
// be given time to connect.
cb.reportedTF = false
}

func (cb *childBalancer) startInitTimer() {
if cb.initTimer != nil {
return
}
// Need this local variable to capture timerW in the AfterFunc closure
// to check the stopped boolean.
timerW := &timerWrapper{}
cb.initTimer = timerW
timerW.timer = time.AfterFunc(DefaultPriorityInitTimeout, func() {
cb.parent.mu.Lock()
defer cb.parent.mu.Unlock()
if timerW.stopped {
return
}
cb.initTimer = nil
// Re-sync the priority. This will switch to the next priority if
// there's any. Note that it's important sync() is called after setting
// initTimer to nil.
cb.parent.syncPriority()
})
}

func (cb *childBalancer) stopInitTimer() {
timerW := cb.initTimer
if timerW == nil {
return
}
cb.initTimer = nil
timerW.stopped = true
timerW.timer.Stop()
}
Loading

0 comments on commit 71a7cf7

Please sign in to comment.