Skip to content

Commit

Permalink
Change PickFirstLeafLoadBalancer to only have 1 subchannel at a time (g…
Browse files Browse the repository at this point in the history
…rpc#11520)

* Change PickFirstLeafLoadBalancer to only have 1 subchannel at a time if environment variable GRPC_SERIALIZE_RETRIES == true.

Cache serializingRetries value so that it doesn't have to look up the flag every time.

Clear the correct task when READY in processSubchannelState and move the logic to cancelScheduledTasks

Cleanup based on PR review

remove unneeded checks for shutdown.

* Fix previously broken tests

* Shut down the previous subchannel when running off the end of the address index.

* Provide option to disable subchannel retries to let PFLeafLB take control of retries.

* InternalSubchannel internally goes to IDLE when it sees TRANSIENT_FAILURE (TF) while reconnect is disabled.
Remove an extra index.increment in LeafLB
  • Loading branch information
larry-safran authored Oct 3, 2024
1 parent 6f35422 commit 9bb06af
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 53 deletions.
6 changes: 6 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ public abstract class LoadBalancer {
HEALTH_CONSUMER_LISTENER_ARG_KEY =
LoadBalancer.CreateSubchannelArgs.Key.create("internal:health-check-consumer-listener");

/**
 * Internal subchannel-creation option carrying a {@link Boolean} under the name
 * {@code "internal:disable-subchannel-reconnect"}. The key is created with a default of
 * {@link Boolean#FALSE}, so subchannels built without this option keep reconnecting on their
 * own. A load balancer that wants to drive retries itself sets it to {@code true}.
 */
@Internal
public static final LoadBalancer.CreateSubchannelArgs.Key<Boolean>
DISABLE_SUBCHANNEL_RECONNECT_KEY =
LoadBalancer.CreateSubchannelArgs.Key.createWithDefault(
"internal:disable-subchannel-reconnect", Boolean.FALSE);

@Internal
public static final Attributes.Key<Boolean>
HAS_HEALTH_PRODUCER_LISTENER_KEY =
Expand Down
30 changes: 22 additions & 8 deletions core/src/main/java/io/grpc/internal/InternalSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
Expand Down Expand Up @@ -77,6 +78,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
private final CallTracer callsTracer;
private final ChannelTracer channelTracer;
private final ChannelLogger channelLogger;
private final boolean reconnectDisabled;

private final List<ClientTransportFilter> transportFilters;

Expand Down Expand Up @@ -159,13 +161,15 @@ protected void handleNotInUse() {

private volatile Attributes connectedAddressAttributes;

InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext, Callback callback,
InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer,
InternalLogId logId, ChannelLogger channelLogger,
List<ClientTransportFilter> transportFilters) {
InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory,
ScheduledExecutorService scheduledExecutor,
Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext,
Callback callback, InternalChannelz channelz, CallTracer callsTracer,
ChannelTracer channelTracer, InternalLogId logId,
ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters) {
List<EquivalentAddressGroup> addressGroups = args.getAddresses();
Preconditions.checkNotNull(addressGroups, "addressGroups");
Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
Expand All @@ -187,6 +191,7 @@ protected void handleNotInUse() {
this.logId = Preconditions.checkNotNull(logId, "logId");
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.transportFilters = transportFilters;
this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
}

ChannelLogger getChannelLogger() {
Expand Down Expand Up @@ -289,6 +294,11 @@ public void run() {
}

gotoState(ConnectivityStateInfo.forTransientFailure(status));

if (reconnectDisabled) {
return;
}

if (reconnectPolicy == null) {
reconnectPolicy = backoffPolicyProvider.get();
}
Expand Down Expand Up @@ -337,7 +347,11 @@ private void gotoState(final ConnectivityStateInfo newState) {
if (state.getState() != newState.getState()) {
Preconditions.checkState(state.getState() != SHUTDOWN,
"Cannot transition out of SHUTDOWN to " + newState);
state = newState;
if (reconnectDisabled && newState.getState() == TRANSIENT_FAILURE) {
state = ConnectivityStateInfo.forNonError(IDLE);
} else {
state = newState;
}
callback.onStateChange(InternalSubchannel.this, newState);
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
}

final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroup,
CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(),
authority, userAgent, backoffPolicyProvider, oobTransportFactory,
oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
// All callback methods are run from syncContext
Expand Down Expand Up @@ -1915,7 +1915,7 @@ void onNotInUse(InternalSubchannel is) {
}

final InternalSubchannel internalSubchannel = new InternalSubchannel(
args.getAddresses(),
args,
authority(),
userAgent,
backoffPolicyProvider,
Expand Down
96 changes: 86 additions & 10 deletions core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,26 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
private int numTf = 0;
private boolean firstPass = true;
@Nullable
private ScheduledHandle scheduleConnectionTask;
private ScheduledHandle scheduleConnectionTask = null;
private ConnectivityState rawConnectivityState = IDLE;
private ConnectivityState concludedState = IDLE;
private final boolean enableHappyEyeballs =
PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private final boolean enableHappyEyeballs = !isSerializingRetries()
&& PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private boolean notAPetiolePolicy = true; // means not under a petiole policy
private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
private BackoffPolicy reconnectPolicy;
@Nullable
private ScheduledHandle reconnectTask = null;
private final boolean serializingRetries = isSerializingRetries();

/**
 * Creates the load balancer around the given helper.
 *
 * @param helper the channel-provided helper; passed through {@code checkNotNull}, so a null
 *     value is rejected at construction time
 */
PickFirstLeafLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
}

/**
 * Looks up the {@code GRPC_SERIALIZE_RETRIES} flag through {@code GrpcUtil.getFlag}, passing
 * {@code false} as the fallback value.
 *
 * <p>NOTE(review): presumably {@code getFlag} reads an environment variable / system property
 * on every call; instances cache the result in a field to avoid repeated lookups — confirm
 * against {@code GrpcUtil}.
 *
 * @return whether serialized retries (a single subchannel at a time) are enabled
 */
static boolean isSerializingRetries() {
return GrpcUtil.getFlag("GRPC_SERIALIZE_RETRIES", false);
}

@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
if (rawConnectivityState == SHUTDOWN) {
Expand Down Expand Up @@ -225,9 +234,10 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
return;
}

if (newState == IDLE) {
if (newState == IDLE && subchannelData.state == READY) {
helper.refreshNameResolution();
}

// If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
// transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
// transient failure". Only a subchannel state change to READY will get the LB out of
Expand Down Expand Up @@ -277,6 +287,8 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
if (addressIndex.increment()) {
cancelScheduleTask();
requestConnection(); // is recursive so might hit the end of the addresses
} else {
scheduleBackoff();
}
}

Expand Down Expand Up @@ -304,6 +316,39 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
}
}

/**
 * Arms a single backoff timer that restarts the connection pass from the first address.
 *
 * <p>Only called after all addresses attempted and failed (TRANSIENT_FAILURE). Does nothing
 * unless retries are being serialized, and never stacks a second timer on top of a pending one.
 */
private void scheduleBackoff() {
  // Backoff is only driven from here in serializing-retries mode. If a timer is already
  // pending, just let that one trigger when it is ready.
  if (!serializingRetries || reconnectTask != null) {
    return;
  }

  // Runs when the backoff interval elapses: forget the handle, rewind to the first address
  // and start connecting again.
  class EndOfCurrentBackoff implements Runnable {
    @Override
    public void run() {
      reconnectTask = null;
      addressIndex.reset();
      requestConnection();
    }
  }

  // Lazily obtain a policy; it is kept across calls so successive delays come from the same
  // policy instance until it is cleared elsewhere.
  if (reconnectPolicy == null) {
    reconnectPolicy = bkoffPolProvider.get();
  }
  final long backoffNanos = reconnectPolicy.nextBackoffNanos();

  reconnectTask = helper.getSynchronizationContext().schedule(
      new EndOfCurrentBackoff(),
      backoffNanos,
      TimeUnit.NANOSECONDS,
      helper.getScheduledExecutorService());
}

private void updateHealthCheckedState(SubchannelData subchannelData) {
if (subchannelData.state != READY) {
return;
Expand Down Expand Up @@ -337,6 +382,11 @@ public void shutdown() {
rawConnectivityState = SHUTDOWN;
concludedState = SHUTDOWN;
cancelScheduleTask();
if (reconnectTask != null) {
reconnectTask.cancel();
reconnectTask = null;
}
reconnectPolicy = null;

for (SubchannelData subchannelData : subchannels.values()) {
subchannelData.getSubchannel().shutdown();
Expand All @@ -350,6 +400,12 @@ public void shutdown() {
* that all other subchannels must be shutdown.
*/
private void shutdownRemaining(SubchannelData activeSubchannelData) {
if (reconnectTask != null) {
reconnectTask.cancel();
reconnectTask = null;
}
reconnectPolicy = null;

cancelScheduleTask();
for (SubchannelData subchannelData : subchannels.values()) {
if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) {
Expand Down Expand Up @@ -391,8 +447,17 @@ public void requestConnection() {
scheduleNextConnection();
break;
case TRANSIENT_FAILURE:
addressIndex.increment();
requestConnection();
if (!serializingRetries) {
addressIndex.increment();
requestConnection();
} else {
if (!addressIndex.isValid()) {
scheduleBackoff();
} else {
subchannelData.subchannel.requestConnection();
subchannelData.updateState(CONNECTING);
}
}
break;
default:
// Wait for current subchannel to change state
Expand Down Expand Up @@ -438,9 +503,10 @@ private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs)
HealthListener hcListener = new HealthListener();
final Subchannel subchannel = helper.createSubchannel(
CreateSubchannelArgs.newBuilder()
.setAddresses(Lists.newArrayList(
new EquivalentAddressGroup(addr, attrs)))
.addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
.setAddresses(Lists.newArrayList(
new EquivalentAddressGroup(addr, attrs)))
.addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
.addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, serializingRetries)
.build());
if (subchannel == null) {
log.warning("Was not able to create subchannel for " + addr);
Expand All @@ -458,7 +524,7 @@ private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs)
}

private boolean isPassComplete() {
if (addressIndex.isValid() || subchannels.size() < addressIndex.size()) {
if (subchannels.size() < addressIndex.size()) {
return false;
}
for (SubchannelData sc : subchannels.values()) {
Expand Down Expand Up @@ -646,6 +712,16 @@ public int size() {
}
}

/**
 * Test-only accessor exposing the {@code groupIndex} field of the current address index.
 *
 * @return the value of {@code addressIndex.groupIndex}
 */
@VisibleForTesting
int getGroupIndex() {
return addressIndex.groupIndex;
}

/**
 * Test-only accessor reporting whether the address index is currently valid.
 *
 * @return the result of {@code addressIndex.isValid()}
 */
@VisibleForTesting
boolean isIndexValid() {
return addressIndex.isValid();
}

private static final class SubchannelData {
private final Subchannel subchannel;
private ConnectivityState state;
Expand Down
65 changes: 63 additions & 2 deletions core/src/test/java/io/grpc/internal/InternalSubchannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.grpc.InternalChannelz;
import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.InternalSubchannel.CallTracingTransport;
Expand Down Expand Up @@ -309,10 +310,57 @@ public void constructor_eagListWithNull_throws() {
verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffNanos();
}

/**
 * With reconnect disabled and two addresses in one group, the subchannel tries each address
 * once, reports the failure through the callback, ends up in IDLE, and never obtains or
 * consults a back-off policy or schedules a task. A later explicit request connects again.
 */
@Test public void twoAddressesReconnectDisabled() {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
createInternalSubchannel(true,
new EquivalentAddressGroup(Arrays.asList(addr1, addr2)));
assertEquals(IDLE, internalSubchannel.getState());

// Requesting a transport kicks off the first connection attempt (addr1).
assertNull(internalSubchannel.obtainActiveTransport());
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory).newClientTransport(eq(addr1), any(), any());
// Fail the first address before it ever becomes ready
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Still in CONNECTING, and no state callback, because addr2 has not been tried yet
assertNull(internalSubchannel.obtainActiveTransport());
assertNoCallbackInvoke();
assertEquals(CONNECTING, internalSubchannel.getState());

// Second attempt will start immediately. Still no back-off policy.
verify(mockBackoffPolicyProvider, times(0)).get();
verify(mockTransportFactory, times(1))
.newClientTransport(
eq(addr2),
eq(createClientTransportOptions()),
isA(TransportLogger.class));
assertNull(internalSubchannel.obtainActiveTransport());
// Fail this one too
assertNoCallbackInvoke();
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// All addresses have failed. Because reconnect is disabled the subchannel itself reports IDLE,
// while the callback is still notified with UNAVAILABLE_STATE.
assertEquals(IDLE, internalSubchannel.getState());
assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
// No back-off policy is obtained or consulted, and no reconnect task is scheduled
verify(mockBackoffPolicy1, never()).nextBackoffNanos();
verify(mockBackoffPolicyProvider, never()).get();
assertTrue("Nothing should have been scheduled", fakeClock.getPendingTasks().isEmpty());

// An explicit request from the caller starts a new connection attempt.
internalSubchannel.obtainActiveTransport();
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(CONNECTING, internalSubchannel.getState());

// Shouldn't have anything scheduled, so shouldn't do anything
assertTrue("Nothing should have been scheduled 2", fakeClock.getPendingTasks().isEmpty());
}

@Test public void twoAddressesReconnect() {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
createInternalSubchannel(addr1, addr2);
createInternalSubchannel(false,
new EquivalentAddressGroup(Arrays.asList(addr1, addr2)));
assertEquals(IDLE, internalSubchannel.getState());
// Invocation counters
int transportsAddr1 = 0;
Expand Down Expand Up @@ -1377,11 +1425,24 @@ private void createInternalSubchannel(SocketAddress ... addrs) {
}

/**
 * Convenience overload that builds the subchannel under test for the given address groups
 * with {@code reconnectDisabled} set to {@code false}.
 */
private void createInternalSubchannel(EquivalentAddressGroup ... addrs) {
createInternalSubchannel(false, addrs);
}

private void createInternalSubchannel(boolean reconnectDisabled,
EquivalentAddressGroup ... addrs) {
List<EquivalentAddressGroup> addressGroups = Arrays.asList(addrs);
InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY);
ChannelTracer subchannelTracer = new ChannelTracer(logId, 10,
fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel");
internalSubchannel = new InternalSubchannel(addressGroups, AUTHORITY, USER_AGENT,
LoadBalancer.CreateSubchannelArgs.Builder argBuilder =
LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups);
if (reconnectDisabled) {
argBuilder.addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, reconnectDisabled);
}
LoadBalancer.CreateSubchannelArgs createSubchannelArgs = argBuilder.build();
internalSubchannel = new InternalSubchannel(
createSubchannelArgs,
AUTHORITY, USER_AGENT,
mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(),
fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback,
channelz, CallTracer.getDefaultFactory().create(),
Expand Down
Loading

0 comments on commit 9bb06af

Please sign in to comment.