From cfa2deaff3a88e0db7a25860148def1dd0a035b5 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 28 Apr 2022 13:52:11 -0700 Subject: [PATCH] [priority_connectivity_fix] xds/priority: start the init timer when a child switch to Connecting from non-transient-failure state See https://github.com/grpc/proposal/pull/296 for context. After this change, priority will restart the failover timer when a child reports Connecting, if that child hasn't reported TF more recently than it reported Ready or Idle. Also changed the priority policy to always call the centralized function `syncPriority` to handle child switching. --- .../balancer/clusterresolver/eds_impl_test.go | 1 + xds/internal/balancer/priority/balancer.go | 26 +-- .../balancer/priority/balancer_child.go | 50 ++++- .../balancer/priority/balancer_priority.go | 187 +++--------------- .../balancer/priority/balancer_test.go | 48 ++++- 5 files changed, 130 insertions(+), 182 deletions(-) diff --git a/xds/internal/balancer/clusterresolver/eds_impl_test.go b/xds/internal/balancer/clusterresolver/eds_impl_test.go index 7f2bfa8a75d1..12928e795b9c 100644 --- a/xds/internal/balancer/clusterresolver/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/eds_impl_test.go @@ -478,6 +478,7 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) sc1 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Picks with drops. diff --git a/xds/internal/balancer/priority/balancer.go b/xds/internal/balancer/priority/balancer.go index 98fd0672af42..d82bce761751 100644 --- a/xds/internal/balancer/priority/balancer.go +++ b/xds/internal/balancer/priority/balancer.go @@ -100,12 +100,6 @@ type priorityBalancer struct { childToPriority map[string]int // children is a map from child name to sub-balancers. children map[string]*childBalancer - // The timer to give a priority some time to connect. And if the priority - // doesn't go into Ready/Failure, the next priority will be started. - // - // One timer is enough because there can be at most one priority in init - // state. - priorityInitTimer *timerWrapper } func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) error { @@ -198,27 +192,17 @@ func (b *priorityBalancer) Close() { // Clear states of the current child in use, so if there's a race in picker // update, it will be dropped. b.childInUse = "" - b.stopPriorityInitTimer() + // Stop the child policies, this is necessary to stop the init timers in the + // children. + for _, child := range b.children { + child.stop() + } } func (b *priorityBalancer) ExitIdle() { b.bg.ExitIdle() } -// stopPriorityInitTimer stops the priorityInitTimer if it's not nil, and set it -// to nil. -// -// Caller must hold b.mu. -func (b *priorityBalancer) stopPriorityInitTimer() { - timerW := b.priorityInitTimer - if timerW == nil { - return - } - b.priorityInitTimer = nil - timerW.stopped = true - timerW.timer.Stop() -} - // UpdateState implements balancergroup.BalancerStateAggregator interface. The // balancer group sends new connectivity state and picker here. func (b *priorityBalancer) UpdateState(childName string, state balancer.State) { diff --git a/xds/internal/balancer/priority/balancer_child.go b/xds/internal/balancer/priority/balancer_child.go index 600705da01af..95bb34f26252 100644 --- a/xds/internal/balancer/priority/balancer_child.go +++ b/xds/internal/balancer/priority/balancer_child.go @@ -19,6 +19,8 @@ package priority import ( + "time" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/connectivity" @@ -36,7 +38,16 @@ type childBalancer struct { rState resolver.State started bool - state balancer.State + // This is set when the child reports TransientFailure, and unset when it + // reports Ready or Idle. It is used to decide whether the failover timer + // should start when the child is transitioning into Connecting. The timer + // will be restarted if the child has not reported TF more recently than it + // reported Ready or Idle. + reportedTF bool + state balancer.State + // The timer to give a priority some time to connect. And if the priority + // doesn't go into Ready/Failure, the next priority will be started. + initTimer *timerWrapper } // newChildBalancer creates a child balancer place holder, but doesn't @@ -79,6 +90,7 @@ func (cb *childBalancer) start() { } cb.started = true cb.parent.bg.Add(cb.name, cb.bb) + cb.startInitTimer() } // sendUpdate sends the addresses and config to the child balancer. @@ -103,10 +115,46 @@ func (cb *childBalancer) stop() { if !cb.started { return } + cb.stopInitTimer() cb.parent.bg.Remove(cb.name) cb.started = false cb.state = balancer.State{ ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), } + // Clear child.reportedTF, so that if this child is started later, it will + // be given time to connect. + cb.reportedTF = false +} + +func (cb *childBalancer) startInitTimer() { + if cb.initTimer != nil { + return + } + // Need this local variable to capture timerW in the AfterFunc closure + // to check the stopped boolean. + timerW := &timerWrapper{} + cb.initTimer = timerW + timerW.timer = time.AfterFunc(DefaultPriorityInitTimeout, func() { + cb.parent.mu.Lock() + defer cb.parent.mu.Unlock() + if timerW.stopped { + return + } + cb.initTimer = nil + // Re-sync the priority. This will switch to the next priority if + // there's any. Note that it's important sync() is called after setting + // initTimer to nil. + cb.parent.syncPriority() + }) +} + +func (cb *childBalancer) stopInitTimer() { + timerW := cb.initTimer + if timerW == nil { + return + } + cb.initTimer = nil + timerW.stopped = true + timerW.timer.Stop() } diff --git a/xds/internal/balancer/priority/balancer_priority.go b/xds/internal/balancer/priority/balancer_priority.go index 3a18f6e10d83..379de362d2ab 100644 --- a/xds/internal/balancer/priority/balancer_priority.go +++ b/xds/internal/balancer/priority/balancer_priority.go @@ -20,6 +20,7 @@ package priority import ( "errors" + "fmt" "time" "google.golang.org/grpc/balancer" @@ -36,9 +37,10 @@ var ( DefaultPriorityInitTimeout = 10 * time.Second ) -// syncPriority handles priority after a config update. It makes sure the -// balancer state (started or not) is in sync with the priorities (even in -// tricky cases where a child is moved from a priority to another). +// syncPriority handles priority after a config update or a child balancer +// connectivity state update. It makes sure the balancer state (started or not) +// is in sync with the priorities (even in tricky cases where a child is moved +// from a priority to another). // // It's guaranteed that after this function returns: // - If some child is READY, it is childInUse, and all lower priorities are @@ -55,6 +57,9 @@ var ( // - For any of the following cases: // - If balancer is not started (not built), this is either a new child // with high priority, or a new builder for an existing child. +// - If balancer is Connecting and has non-nil initTimer (meaning it +// transitioned from Ready or Idle to connecting, not from TF, so we +// should give it init-time to connect). // - If balancer is READY // - If this is the lowest priority // - do the following: @@ -69,9 +74,6 @@ func (b *priorityBalancer) syncPriority() { if len(b.priorities) == 0 { b.childInUse = "" b.priorityInUse = 0 - // Stop the init timer. This can happen if the only priority is removed - // shortly after it's added. - b.stopPriorityInitTimer() b.cc.UpdateState(balancer.State{ ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(ErrAllPrioritiesRemoved), @@ -89,6 +91,7 @@ func (b *priorityBalancer) syncPriority() { if !child.started || child.state.ConnectivityState == connectivity.Ready || child.state.ConnectivityState == connectivity.Idle || + (child.state.ConnectivityState == connectivity.Connecting && child.initTimer != nil) || p == len(b.priorities)-1 { if b.childInUse != "" && b.childInUse != child.name { // childInUse was set and is different from this child, will @@ -123,8 +126,7 @@ func (b *priorityBalancer) stopSubBalancersLowerThanPriority(p int) { // - stop all child with lower priorities // - if childInUse is not this child // - set childInUse to this child -// - stops init timer -// - if this child is not started, start it, and start a init timer +// - if this child is not started, start it // // Note that it does NOT send the current child state (picker) to the parent // ClientConn. The caller needs to send it if necessary. @@ -156,33 +158,8 @@ func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) { b.childInUse = child.name b.priorityInUse = priority - // Init timer is always for childInUse. Since we are switching to a - // different child, we will stop the init timer no matter what. If this - // child is not started, we will start the init timer later. - b.stopPriorityInitTimer() - if !child.started { child.start() - // Need this local variable to capture timerW in the AfterFunc closure - // to check the stopped boolean. - timerW := &timerWrapper{} - b.priorityInitTimer = timerW - timerW.timer = time.AfterFunc(DefaultPriorityInitTimeout, func() { - b.mu.Lock() - defer b.mu.Unlock() - if timerW.stopped { - return - } - b.priorityInitTimer = nil - // Switch to the next priority if there's any. - if pNext := priority + 1; pNext < len(b.priorities) { - nameNext := b.priorities[pNext] - if childNext, ok := b.children[nameNext]; ok { - b.switchToChild(childNext, pNext) - childNext.sendUpdate() - } - } - }) } } @@ -222,141 +199,37 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S b.logger.Warningf("priority: child balancer not found for child %v, priority %v", childName, priority) return } - oldState := child.state.ConnectivityState + // oldState := child.state.ConnectivityState child.state = s + // We start/stop the init timer of this child based on the new connectivity + // state. syncPriority() later will need the init timer (to check if it's + // nil or not) to decide which child to switch to. switch s.ConnectivityState { case connectivity.Ready, connectivity.Idle: - // Note that idle is also handled as if it's Ready. It will close the - // lower priorities (which will be kept in a cache, not deleted), and - // new picks will use the Idle picker. - b.handlePriorityWithNewStateReady(child, priority) + child.reportedTF = false + child.stopInitTimer() case connectivity.TransientFailure: - b.handlePriorityWithNewStateTransientFailure(child, priority) + child.reportedTF = true + child.stopInitTimer() case connectivity.Connecting: - b.handlePriorityWithNewStateConnecting(child, priority, oldState) + if !child.reportedTF { + child.startInitTimer() + } default: // New state is Shutdown, should never happen. Don't forward. } -} - -// handlePriorityWithNewStateReady handles state Ready from a higher or equal -// priority. -// -// An update with state Ready: -// - If it's from higher priority: -// - Switch to this priority -// - Forward the update -// - If it's from priorityInUse: -// - Forward only -// -// Caller must make sure priorityInUse is not higher than priority. -// -// Caller must hold mu. -func (b *priorityBalancer) handlePriorityWithNewStateReady(child *childBalancer, priority int) { - // If one priority higher or equal to priorityInUse goes Ready, stop the - // init timer. If update is from higher than priorityInUse, priorityInUse - // will be closed, and the init timer will become useless. - b.stopPriorityInitTimer() - - // priorityInUse is lower than this priority, switch to this. - if b.priorityInUse > priority { - b.logger.Infof("Switching priority from %v to %v, because latter became Ready", b.priorityInUse, priority) - b.switchToChild(child, priority) - } - // Forward the update since it's READY. - b.cc.UpdateState(child.state) -} -// handlePriorityWithNewStateTransientFailure handles state TransientFailure -// from a higher or equal priority. -// -// An update with state TransientFailure: -// - If it's from a higher priority: -// - Do not forward, and do nothing -// - If it's from priorityInUse: -// - If there's no lower: -// - Forward and do nothing else -// - If there's a lower priority: -// - Switch to the lower -// - Forward the lower child's state -// - Do NOT forward this update -// -// Caller must make sure priorityInUse is not higher than priority. -// -// Caller must hold mu. -func (b *priorityBalancer) handlePriorityWithNewStateTransientFailure(child *childBalancer, priority int) { - // priorityInUse is lower than this priority, do nothing. - if b.priorityInUse > priority { - return - } - // priorityInUse sends a failure. Stop its init timer. - b.stopPriorityInitTimer() - priorityNext := priority + 1 - if priorityNext >= len(b.priorities) { - // Forward this update. + oldPriorityInUse := b.priorityInUse + child.parent.syncPriority() + if b.priorityInUse == oldPriorityInUse && b.priorityInUse == priority { + // Only forward this update if sync() didn't switch child, and this + // child is in use. + // + // sync() forwards the update if the child was switched, so there's no + // need to forward again. + fmt.Println(" ------ replace picker after sync()", b.priorityInUse, oldPriorityInUse, child.state.ConnectivityState) b.cc.UpdateState(child.state) - return - } - b.logger.Infof("Switching priority from %v to %v, because former became TransientFailure", priority, priorityNext) - nameNext := b.priorities[priorityNext] - childNext := b.children[nameNext] - b.switchToChild(childNext, priorityNext) - b.cc.UpdateState(childNext.state) - childNext.sendUpdate() -} - -// handlePriorityWithNewStateConnecting handles state Connecting from a higher -// than or equal priority. -// -// An update with state Connecting: -// - If it's from a higher priority -// - Do nothing -// - If it's from priorityInUse, the behavior depends on previous state. -// -// When new state is Connecting, the behavior depends on previous state. If the -// previous state was Ready, this is a transition out from Ready to Connecting. -// Assuming there are multiple backends in the same priority, this mean we are -// in a bad situation and we should failover to the next priority (Side note: -// the current connectivity state aggregating algorithm (e.g. round-robin) is -// not handling this right, because if many backends all go from Ready to -// Connecting, the overall situation is more like TransientFailure, not -// Connecting). -// -// If the previous state was Idle, we don't do anything special with failure, -// and simply forward the update. The init timer should be in process, will -// handle failover if it timeouts. If the previous state was TransientFailure, -// we do not forward, because the lower priority is in use. -// -// Caller must make sure priorityInUse is not higher than priority. -// -// Caller must hold mu. -func (b *priorityBalancer) handlePriorityWithNewStateConnecting(child *childBalancer, priority int, oldState connectivity.State) { - // priorityInUse is lower than this priority, do nothing. - if b.priorityInUse > priority { - return } - switch oldState { - case connectivity.Ready: - // Handling transition from Ready to Connecting, is same as handling - // TransientFailure. There's no need to stop the init timer, because it - // should have been stopped when state turned Ready. - priorityNext := priority + 1 - if priorityNext >= len(b.priorities) { - // Forward this update. - b.cc.UpdateState(child.state) - return - } - b.logger.Infof("Switching priority from %v to %v, because former became TransientFailure", priority, priorityNext) - nameNext := b.priorities[priorityNext] - childNext := b.children[nameNext] - b.switchToChild(childNext, priorityNext) - b.cc.UpdateState(childNext.state) - childNext.sendUpdate() - case connectivity.Idle: - b.cc.UpdateState(child.state) - default: - // Old state is Connecting, TransientFailure or Shutdown. Don't forward. - } } diff --git a/xds/internal/balancer/priority/balancer_test.go b/xds/internal/balancer/priority/balancer_test.go index e8963898727c..77a57bd66bdf 100644 --- a/xds/internal/balancer/priority/balancer_test.go +++ b/xds/internal/balancer/priority/balancer_test.go @@ -115,6 +115,7 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) { // p0 is ready. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. @@ -222,6 +223,7 @@ func (s) TestPriority_SwitchPriority(t *testing.T) { // p0 is ready. pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. @@ -250,6 +252,7 @@ func (s) TestPriority_SwitchPriority(t *testing.T) { } sc1 := <-cc.NewSubConnCh pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 1. @@ -310,6 +313,7 @@ func (s) TestPriority_SwitchPriority(t *testing.T) { } sc2 := <-cc.NewSubConnCh pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 2. @@ -405,6 +409,7 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) { // p0 is ready. pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. @@ -414,9 +419,8 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) { t.Fatalf("want %v, got %v", want, err) } - // Turn 0 to Connecting, will start and use 1. Because 0 changing from Ready - // to Connecting is a failure. - pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + // Turn 0 to TransientFailure, will start and use 1. + pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs // will retry. @@ -434,6 +438,7 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) { } sc1 := <-cc.NewSubConnCh pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 1. @@ -561,6 +566,7 @@ func (s) TestPriority_HigherDownWhileAddingLower(t *testing.T) { } sc2 := <-cc.NewSubConnCh pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 2. @@ -645,6 +651,7 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) { } sc2 := <-cc.NewSubConnCh pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 2. @@ -741,7 +748,11 @@ func (s) TestPriority_InitTimeout(t *testing.T) { } sc1 := <-cc.NewSubConnCh + // After the init timer of p0, when switching to p1, a connecting picker + // will be sent to the parent. Clear it here. + <-cc.NewPickerCh pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 1. @@ -795,6 +806,7 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { } sc0 := <-cc.NewSubConnCh pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. @@ -859,6 +871,7 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { // Don't send any update to p0, so to not override the old state of p0. // Later, connect to p1 and then remove p1. This will fallback to p0, and // will send p0's old picker if they are not correctly removed. + <-cc.NewPickerCh // Clear the picker from old p0. // p1 will be used after priority init timeout. addrs11 := <-cc.NewSubConnAddrsCh @@ -867,6 +880,7 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { } sc11 := <-cc.NewSubConnCh pb.UpdateSubConnState(sc11, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc11, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. @@ -910,6 +924,7 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { // Send an ready update for the p0 sc that was received when re-adding // priorities. pb.UpdateSubConnState(sc01, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc01, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. @@ -965,6 +980,7 @@ func (s) TestPriority_HighPriorityNoEndpoints(t *testing.T) { // p0 is ready. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. @@ -992,6 +1008,8 @@ func (s) TestPriority_HighPriorityNoEndpoints(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } + fmt.Print("\n\n\n\n\n") + // p0 will remove the subconn, and ClientConn will send a sc update to // shutdown. scToRemove := <-cc.RemoveSubConnCh @@ -1014,6 +1032,7 @@ func (s) TestPriority_HighPriorityNoEndpoints(t *testing.T) { // p1 is ready. pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. @@ -1107,6 +1126,7 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) { // p0 is ready. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. @@ -1161,6 +1181,7 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) { // New p0 child is ready. pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only new subconns. @@ -1223,6 +1244,7 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) { } sc1 := <-cc.NewSubConnCh pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. @@ -1323,6 +1345,7 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) { } sc1 := <-cc.NewSubConnCh pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. @@ -1414,6 +1437,7 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) { // p0 is ready. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. @@ -1517,6 +1541,7 @@ func (s) TestPriority_ChildPolicyChange(t *testing.T) { // p0 is ready. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. @@ -1557,6 +1582,7 @@ func (s) TestPriority_ChildPolicyChange(t *testing.T) { } sc2 := <-cc.NewSubConnCh pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + <-cc.NewPickerCh // Clear the Connecting picker from the channel. pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pickfirst with the new subconns. @@ -1851,6 +1877,22 @@ func (s) TestPriority_HighPriorityInitIdle(t *testing.T) { t.Fatalf("pick returned %v, %v, want _, %v", pr, err, errsTestInitIdle[0]) } + // The ClientConn state update triggers a priority switch, from p0 -> p0 + // (since p0 is still in use). Along with this the update, p0 also gets a + // ClientConn state update, with the addresses, which didn't change in this + // test (this update to the child is necessary in case the addresses are + // different). + // + // The test child policy, initIdleBalancer, blindly calls NewSubConn with + // all the addresses it receives, so this will trigger a NewSubConn with the + // old p0 addresses. (Note that in a real balancer, like roundrobin, no new + // SubConn will be created because the addresses didn't change). + // + // Clear those from the channel so the rest of the test can get the expected + // behavior. + <-cc.NewSubConnAddrsCh + <-cc.NewSubConnCh + // Turn p0 down, to start p1. pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs