Skip to content

Commit

Permalink
core:Have acceptResolvedAddresses() do a seek when in CONNECTING stat…
Browse files Browse the repository at this point in the history
…e and cleanup removed subchannels when a seek was successful (#11849)

* Have acceptResolvedAddresses() do a seek when in CONNECTING state and cleanup removed subchannels when a seek was successful.
Move cleanup of removed subchannels into a method so it can be called from 2 places in acceptResolvedAddresses.
Since the seek could mean we never looked at the first address, if we go off the end of the index and haven't looked at all of the addresses, then instead of calling scheduleBackoff() we reset the index and request a connection.
  • Loading branch information
larry-safran authored Jan 25, 2025
1 parent 67351c0 commit 87aa6de
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 25 deletions.
59 changes: 40 additions & 19 deletions core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,38 +137,27 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build();

if (rawConnectivityState == READY) {
// If the previous ready subchannel exists in new address list,
if (rawConnectivityState == READY || rawConnectivityState == CONNECTING) {
// If the previous ready (or connecting) subchannel exists in new address list,
// keep this connection and don't create new subchannels
SocketAddress previousAddress = addressIndex.getCurrentAddress();
addressIndex.updateGroups(newImmutableAddressGroups);
if (addressIndex.seekTo(previousAddress)) {
SubchannelData subchannelData = subchannels.get(previousAddress);
subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList());
shutdownRemovedAddresses(newImmutableAddressGroups);
return Status.OK;
}
// Previous ready subchannel not in the new list of addresses
} else {
addressIndex.updateGroups(newImmutableAddressGroups);
}

// remove old subchannels that were not in new address list
Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());

// Flatten the new EAGs addresses
Set<SocketAddress> newAddrs = new HashSet<>();
for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
newAddrs.addAll(endpoint.getAddresses());
}

// Shut them down and remove them
for (SocketAddress oldAddr : oldAddrs) {
if (!newAddrs.contains(oldAddr)) {
subchannels.remove(oldAddr).getSubchannel().shutdown();
}
}
// No old addresses means first time through, so we will do an explicit move to CONNECTING
// which is what we implicitly started with
boolean noOldAddrs = shutdownRemovedAddresses(newImmutableAddressGroups);

if (oldAddrs.size() == 0) {
if (noOldAddrs) {
// Make tests happy; they don't properly assume starting in CONNECTING
rawConnectivityState = CONNECTING;
updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
Expand All @@ -188,6 +177,31 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
return Status.OK;
}

/**
 * Shuts down and forgets every existing subchannel whose address does not appear anywhere in
 * the supplied address groups.
 *
 * @param newImmutableAddressGroups the address groups whose flattened addresses are kept
 * @return true if no subchannels existed when this method was called
 */
private boolean shutdownRemovedAddresses(
    ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups) {
  // Work on a snapshot of the keys so entries can be removed from the map while looping.
  Set<SocketAddress> previousAddresses = new HashSet<>(subchannels.keySet());
  boolean hadNoPreviousAddresses = previousAddresses.isEmpty();

  // Collect every individual address contained in the new groups.
  Set<SocketAddress> retainedAddresses = new HashSet<>();
  for (EquivalentAddressGroup group : newImmutableAddressGroups) {
    retainedAddresses.addAll(group.getAddresses());
  }

  for (SocketAddress candidate : previousAddresses) {
    if (retainedAddresses.contains(candidate)) {
      continue; // still listed, keep its subchannel
    }
    // No longer listed: drop it from the map, then shut the subchannel down.
    subchannels.remove(candidate).getSubchannel().shutdown();
  }

  return hadNoPreviousAddresses;
}

private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) {
Set<SocketAddress> seenAddresses = new HashSet<>();
List<EquivalentAddressGroup> newGroups = new ArrayList<>();
Expand Down Expand Up @@ -290,7 +304,14 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
cancelScheduleTask();
requestConnection(); // is recursive so might hit the end of the addresses
} else {
scheduleBackoff();
if (subchannels.size() >= addressIndex.size()) {
scheduleBackoff();
} else {
// We must have done a seek to the middle of the list lets start over from the
// beginning
addressIndex.reset();
requestConnection();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2133,18 +2133,20 @@ public void lastAddressFailingNotTransientFailure() {
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build());

// Verify that no new subchannels were created or started
// Subchannel 2 should be reused since it was trying to connect and is present.
inOrder.verify(mockSubchannel1).shutdown();
inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener3 = stateListenerCaptor.getValue();
inOrder.verify(mockSubchannel3).requestConnection();
inOrder.verify(mockSubchannel3, never()).start(stateListenerCaptor.capture());
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());

// Second address connection attempt is unsuccessful, but should not go into transient failure
// Second address connection attempt is unsuccessful, so since at end, but don't have all
// subchannels, schedule a backoff for the first address
stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));
fakeClock.forwardTime(1, TimeUnit.SECONDS);
inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener3 = stateListenerCaptor.getValue();
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());

// Third address connection attempt is unsuccessful, now we enter transient failure
// Third address connection attempt is unsuccessful, now we enter TF, do name resolution
stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());

Expand Down

0 comments on commit 87aa6de

Please sign in to comment.