Skip to content

Commit

Permalink
Change PickFirstLeafLoadBalancer to only have 1 subchannel at a time …
Browse files Browse the repository at this point in the history
…if environment variable GRPC_SERIALIZE_RETRIES == true.

Cache serializingRetries value so that it doesn't have to look up the flag every time.

Clear the correct task when READY in processSubchannelState and move the logic to cancelScheduledTasks

Cleanup based on PR review

remove unneeded checks for shutdown.
  • Loading branch information
larry-safran committed Sep 24, 2024
1 parent 9b0c19e commit df8e78c
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 9 deletions.
91 changes: 84 additions & 7 deletions core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,26 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
private int numTf = 0;
private boolean firstPass = true;
@Nullable
private ScheduledHandle scheduleConnectionTask;
private ScheduledHandle scheduleConnectionTask = null;
private ConnectivityState rawConnectivityState = IDLE;
private ConnectivityState concludedState = IDLE;
private final boolean enableHappyEyeballs =
PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private final boolean enableHappyEyeballs = !isSerializingRetries()
&& PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private boolean notAPetiolePolicy = true; // means not under a petiole policy
private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
private BackoffPolicy reconnectPolicy;
@Nullable
private ScheduledHandle reconnectTask = null;
private boolean serializingRetries = isSerializingRetries();

PickFirstLeafLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
}

static boolean isSerializingRetries() {
return GrpcUtil.getFlag("GRPC_SERIALIZE_RETRIES", false);
}

@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
if (rawConnectivityState == SHUTDOWN) {
Expand Down Expand Up @@ -277,10 +286,12 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
if (addressIndex.increment()) {
cancelScheduleTask();
requestConnection(); // is recursive so might hit the end of the addresses
} else {
scheduleBackoff();
}
}

if (isPassComplete()) {
if ( isPassComplete()) {
rawConnectivityState = TRANSIENT_FAILURE;
updateBalancingState(TRANSIENT_FAILURE,
new Picker(PickResult.withError(stateInfo.getStatus())));
Expand All @@ -304,6 +315,42 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
}
}

/**
* Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
*/
private void scheduleBackoff() {
if (!serializingRetries) {
return;
}

class EndOfCurrentBackoff implements Runnable {

Check warning on line 326 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L326

Added line #L326 was not covered by tests
@Override
public void run() {
reconnectTask = null;

Check warning on line 329 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L329

Added line #L329 was not covered by tests
if (rawConnectivityState == SHUTDOWN) {
return;

Check warning on line 331 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L331

Added line #L331 was not covered by tests
}
addressIndex.reset();
requestConnection();
}

Check warning on line 335 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L333-L335

Added lines #L333 - L335 were not covered by tests
}

// Just allow the previous one to trigger when ready if we're already in backoff
if (reconnectTask != null) {
return;

Check warning on line 340 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L340

Added line #L340 was not covered by tests
}

if (reconnectPolicy == null) {
reconnectPolicy = bkoffPolProvider.get();

Check warning on line 344 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L344

Added line #L344 was not covered by tests
}
long delayNanos = reconnectPolicy.nextBackoffNanos();
reconnectTask = helper.getSynchronizationContext().schedule(

Check warning on line 347 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L346-L347

Added lines #L346 - L347 were not covered by tests
new EndOfCurrentBackoff(),
delayNanos,
TimeUnit.NANOSECONDS,
helper.getScheduledExecutorService());
}

Check warning on line 352 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L351-L352

Added lines #L351 - L352 were not covered by tests

private void updateHealthCheckedState(SubchannelData subchannelData) {
if (subchannelData.state != READY) {
return;
Expand Down Expand Up @@ -337,6 +384,11 @@ public void shutdown() {
rawConnectivityState = SHUTDOWN;
concludedState = SHUTDOWN;
cancelScheduleTask();
if (reconnectTask != null) {
reconnectTask.cancel();
reconnectTask = null;

Check warning on line 389 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L388-L389

Added lines #L388 - L389 were not covered by tests
}
reconnectPolicy = null;

for (SubchannelData subchannelData : subchannels.values()) {
subchannelData.getSubchannel().shutdown();
Expand All @@ -350,6 +402,12 @@ public void shutdown() {
* that all other subchannels must be shutdown.
*/
private void shutdownRemaining(SubchannelData activeSubchannelData) {
if (reconnectTask != null) {
reconnectTask.cancel();
reconnectTask = null;

Check warning on line 407 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L406-L407

Added lines #L406 - L407 were not covered by tests
}
reconnectPolicy = null;

cancelScheduleTask();
for (SubchannelData subchannelData : subchannels.values()) {
if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) {
Expand All @@ -370,7 +428,12 @@ private void shutdownRemaining(SubchannelData activeSubchannelData) {
*/
@Override
public void requestConnection() {
if (!addressIndex.isValid() || rawConnectivityState == SHUTDOWN) {
if (rawConnectivityState == SHUTDOWN) {
return;

Check warning on line 432 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L432

Added line #L432 was not covered by tests
}

if (!addressIndex.isValid()) {
scheduleBackoff();
return;
}

Expand All @@ -391,8 +454,21 @@ public void requestConnection() {
scheduleNextConnection();
break;
case TRANSIENT_FAILURE:
if (scheduleConnectionTask != null) {
break; // let the already scheduled task do its job

Check warning on line 458 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L458

Added line #L458 was not covered by tests
}
addressIndex.increment();
requestConnection();
if (!serializingRetries) {
requestConnection();
} else {
if (!addressIndex.isValid()) {
scheduleBackoff();

Check warning on line 465 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L465

Added line #L465 was not covered by tests
} else {
subchannelData.subchannel.shutdown(); // shutdown the previous subchannel
subchannels.remove(currentAddress);
requestConnection();

Check warning on line 469 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L467-L469

Added lines #L467 - L469 were not covered by tests
}
}
break;
default:
// Wait for current subchannel to change state
Expand Down Expand Up @@ -458,7 +534,8 @@ private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs)
}

private boolean isPassComplete() {
if (addressIndex.isValid() || subchannels.size() < addressIndex.size()) {
if ((!serializingRetries && addressIndex.isValid())
|| subchannels.size() < addressIndex.size()) {
return false;
}
for (SubchannelData sc : subchannels.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static io.grpc.LoadBalancer.HEALTH_CONSUMER_LISTENER_ARG_KEY;
import static io.grpc.LoadBalancer.IS_PETIOLE_POLICY;
import static io.grpc.internal.PickFirstLeafLoadBalancer.CONNECTION_DELAY_INTERVAL_MS;
import static io.grpc.internal.PickFirstLeafLoadBalancer.isSerializingRetries;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -95,7 +96,11 @@ public class PickFirstLeafLoadBalancerTest {

@Parameterized.Parameters(name = "{0}")
public static List<Boolean> enableHappyEyeballs() {
return Arrays.asList(true, false);
if (PickFirstLeafLoadBalancer.isSerializingRetries()) {
return Arrays.asList(false);
} else {
return Arrays.asList(false, true);
}
}

@Parameterized.Parameter
Expand Down Expand Up @@ -143,7 +148,8 @@ public void setUp() {
originalHappyEyeballsEnabledValue =
System.getProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS);
System.setProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS,
enableHappyEyeballs ? "true" : "false");
!PickFirstLeafLoadBalancer.isSerializingRetries() && enableHappyEyeballs
? "true" : "false");

for (int i = 1; i <= 5; i++) {
SocketAddress addr = new FakeSocketAddress("server" + i);
Expand Down Expand Up @@ -498,6 +504,9 @@ public void healthCheckFlow() {
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs)
.getSubchannel()).isSameInstanceAs(mockSubchannel1);
verify(mockHelper, atLeast(0)).getSynchronizationContext();
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);

healthListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
verifyNoMoreInteractions(mockHelper);
Expand Down Expand Up @@ -589,6 +598,8 @@ public void pickAfterResolutionAfterTransientValue() {

// Transition from TRANSIENT_ERROR to CONNECTING should also be ignored.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
verify(mockHelper, atLeast(0)).getSynchronizationContext();
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
}
Expand Down Expand Up @@ -619,6 +630,8 @@ public void pickWithDupAddressesUpDownUp() {

// Transition from TRANSIENT_ERROR to CONNECTING should also be ignored.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
verify(mockHelper, atLeast(0)).getSynchronizationContext();
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

Expand Down Expand Up @@ -651,6 +664,8 @@ public void pickWithDupEagsUpDownUp() {

// Transition from TRANSIENT_ERROR to CONNECTING should also be ignored.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
verify(mockHelper, atLeast(0)).getSynchronizationContext();
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

Expand Down Expand Up @@ -1518,6 +1533,8 @@ public void updateAddresses_intersecting_ready() {

@Test
public void updateAddresses_intersecting_transient_failure() {
Assume.assumeTrue(!isSerializingRetries());

// Starting first connection attempt
InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2,
mockSubchannel3, mockSubchannel4); // captor: captures
Expand Down Expand Up @@ -1782,6 +1799,8 @@ public void updateAddresses_identical_ready() {

@Test
public void updateAddresses_identical_transient_failure() {
Assume.assumeTrue(!isSerializingRetries());

InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2,
mockSubchannel3, mockSubchannel4);
// Creating first set of endpoints/addresses
Expand Down

0 comments on commit df8e78c

Please sign in to comment.