diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 85d7bc86584..7765408a627 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -846,6 +846,15 @@ public void run() { } } + private static final boolean isExperimentalRetryJitterEnabled = GrpcUtil + .getFlag("GRPC_EXPERIMENTAL_XDS_RLS_LB", true); + + public static long intervalWithJitter(long intervalNanos) { + double inverseJitterFactor = isExperimentalRetryJitterEnabled + ? 0.8 * random.nextDouble() + 0.4 : random.nextDouble(); + return (long) (intervalNanos * inverseJitterFactor); + } + private static final class SavedCloseMasterListenerReason { private final Status status; private final RpcProgress progress; @@ -1066,7 +1075,7 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) { if (pushbackMillis == null) { if (isRetryableStatusCode) { shouldRetry = true; - backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble()); + backoffNanos = intervalWithJitter(nextBackoffIntervalNanos); nextBackoffIntervalNanos = Math.min( (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier), retryPolicy.maxBackoffNanos); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 98900cecf2b..21ccf1095df 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -3875,7 +3875,7 @@ public double nextDouble() { Status.UNAVAILABLE, PROCESSED, new Metadata()); // in backoff - timer.forwardTime(5, TimeUnit.SECONDS); + timer.forwardTime(6, TimeUnit.SECONDS); assertThat(timer.getPendingTasks()).hasSize(1); verify(mockStream2, never()).start(any(ClientStreamListener.class)); @@ -3894,7 +3894,7 @@ public double nextDouble() { assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription()); // backoff ends - timer.forwardTime(5, TimeUnit.SECONDS); + timer.forwardTime(6, TimeUnit.SECONDS); assertThat(timer.getPendingTasks()).isEmpty(); verify(mockStream2).start(streamListenerCaptor.capture()); verify(mockLoadBalancer, never()).shutdown(); diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 658ed70a135..9b1ec343bb7 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -147,6 +147,17 @@ public double nextDouble() { private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter(); private final FakeClock fakeClock = new FakeClock(); + private static long calculateBackoffWithRetries(int retryCount) { + // Calculate the exponential backoff delay with jitter + double exponent = retryCount > 0 ? Math.pow(BACKOFF_MULTIPLIER, retryCount) : 1; + long delay = (long) (INITIAL_BACKOFF_IN_SECONDS * exponent); + return RetriableStream.intervalWithJitter(delay); + } + + private static long calculateMaxBackoff() { + return RetriableStream.intervalWithJitter(MAX_BACKOFF_IN_SECONDS); + } + private final class RecordedRetriableStream extends RetriableStream { RecordedRetriableStream(MethodDescriptor method, Metadata headers, ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit, @@ -307,7 +318,7 @@ public Void answer(InvocationOnMock in) { retriableStream.sendMessage("msg1 during backoff1"); retriableStream.sendMessage("msg2 during backoff1"); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0) - 1L, TimeUnit.SECONDS); inOrder.verifyNoMoreInteractions(); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); @@ -364,9 +375,7 @@ public Void answer(InvocationOnMock in) { retriableStream.sendMessage("msg2 during backoff2"); retriableStream.sendMessage("msg3 during backoff2"); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L, - TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(1) - 1L, TimeUnit.SECONDS); inOrder.verifyNoMoreInteractions(); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); @@ -459,7 +468,7 @@ public void retry_headersRead_cancel() { sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -518,7 +527,7 @@ public void retry_headersRead_closed() { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -584,7 +593,7 @@ public void retry_cancel_closed() { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -687,7 +696,7 @@ public void retry_unretriableClosed_cancel() { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -821,7 +830,7 @@ public boolean isReady() { // send more requests during backoff retriableStream.request(789); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); inOrder.verify(mockStream2).start(sublistenerCaptor2.get()); inOrder.verify(mockStream2).request(3); @@ -875,7 +884,7 @@ public void request(int numMessages) { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).request(3); @@ -920,7 +929,7 @@ public void start(ClientStreamListener listener) { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(retriableStreamRecorder).postCommit(); @@ -1028,7 +1037,7 @@ public boolean isReady() { retriableStream.request(789); readiness.add(retriableStream.isReady()); // expected false b/c in backoff - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); verify(mockStream2).start(any(ClientStreamListener.class)); readiness.add(retriableStream.isReady()); // expected true @@ -1110,7 +1119,7 @@ public void addPrevRetryAttemptsToRespHeaders() { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -1160,13 +1169,12 @@ public void start(ClientStreamListener listener) { listener1.closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); // send requests during backoff retriableStream.request(3); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(1), TimeUnit.SECONDS); retriableStream.request(1); verify(mockStream1, never()).request(anyInt()); @@ -1207,7 +1215,7 @@ public void start(ClientStreamListener listener) { // retry listener1.closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); verify(mockStream2).start(any(ClientStreamListener.class)); verify(retriableStreamRecorder).postCommit(); @@ -1260,7 +1268,7 @@ public void perRpcBufferLimitExceededDuringBackoff() { bufferSizeTracer.outboundWireSize(2); verify(retriableStreamRecorder, never()).postCommit(); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); verify(mockStream2).start(any(ClientStreamListener.class)); verify(mockStream2).isReady(); @@ -1332,7 +1340,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0) - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1347,9 +1355,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor2.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L, - TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(1) - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1364,10 +1370,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor3.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM) - - 1L, - TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(2) - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1382,7 +1385,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor4.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateMaxBackoff() - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1397,7 +1400,7 @@ public void expBackoff_maxBackoff_maxRetryAttempts() { sublistenerCaptor5.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateMaxBackoff() - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1480,7 +1483,7 @@ public void pushback() { sublistenerCaptor3.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0) - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1495,9 +1498,7 @@ public void pushback() { sublistenerCaptor4.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L, - TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(1) - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1512,10 +1513,7 @@ public void pushback() { sublistenerCaptor5.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM) - - 1L, - TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(2) - 1L, TimeUnit.SECONDS); assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); @@ -1804,7 +1802,7 @@ public void transparentRetry_onlyOnceOnRefused() { .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); inOrder.verify(retriableStreamRecorder).newSubstream(1); ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -1907,7 +1905,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); inOrder.verify(retriableStreamRecorder).newSubstream(1); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -1923,8 +1921,7 @@ public void normalRetry_thenNoTransparentRetry_butNormalRetry() { .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime( - (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(1), TimeUnit.SECONDS); inOrder.verify(retriableStreamRecorder).newSubstream(2); ArgumentCaptor sublistenerCaptor3 = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -1960,7 +1957,7 @@ public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() { .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); assertEquals(1, fakeClock.numPendingTasks()); - fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + fakeClock.forwardTime(calculateBackoffWithRetries(0), TimeUnit.SECONDS); inOrder.verify(retriableStreamRecorder).newSubstream(1); ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java index edd2a57ab9d..669ce1c69db 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java @@ -303,7 +303,7 @@ public void retryUntilBufferLimitExceeded() throws Exception { serverCall.close( Status.UNAVAILABLE.withDescription("original attempt failed"), new Metadata()); - elapseBackoff(10, SECONDS); + elapseBackoff(12, SECONDS); // 2nd attempt received serverCall = serverCalls.poll(5, SECONDS); serverCall.request(2); @@ -348,7 +348,7 @@ public void statsRecorded() throws Exception { Status.UNAVAILABLE.withDescription("original attempt failed"), new Metadata()); assertRpcStatusRecorded(Status.Code.UNAVAILABLE, 1000, 1); - elapseBackoff(10, SECONDS); + elapseBackoff(12, SECONDS); assertRpcStartedRecorded(); assertOutboundMessageRecorded(); serverCall = serverCalls.poll(5, SECONDS); @@ -366,7 +366,7 @@ public void statsRecorded() throws Exception { call.request(1); assertInboundMessageRecorded(); assertInboundWireSizeRecorded(1); - assertRpcStatusRecorded(Status.Code.OK, 12000, 2); + assertRpcStatusRecorded(Status.Code.OK, 14000, 2); assertRetryStatsRecorded(1, 0, 0); } @@ -418,7 +418,7 @@ public void streamClosed(Status status) { Status.UNAVAILABLE.withDescription("original attempt failed"), new Metadata()); assertRpcStatusRecorded(Code.UNAVAILABLE, 5000, 1); - elapseBackoff(10, SECONDS); + elapseBackoff(12, SECONDS); assertRpcStartedRecorded(); assertOutboundMessageRecorded(); serverCall = serverCalls.poll(5, SECONDS); @@ -431,7 +431,7 @@ public void streamClosed(Status status) { streamClosedLatch.countDown(); // The call listener is closed. verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class)); - assertRpcStatusRecorded(Code.CANCELLED, 17_000, 1); + assertRpcStatusRecorded(Code.CANCELLED, 19_000, 1); assertRetryStatsRecorded(1, 0, 0); } diff --git a/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java b/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java index 9e93ee1155c..40b84717160 100644 --- a/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java +++ b/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java @@ -33,9 +33,6 @@ */ class NettyWritableBufferAllocator implements WritableBufferAllocator { - // Use 4k as our minimum buffer size. - private static final int MIN_BUFFER = 4 * 1024; - // Set the maximum buffer size to 1MB. private static final int MAX_BUFFER = 1024 * 1024; @@ -47,7 +44,7 @@ class NettyWritableBufferAllocator implements WritableBufferAllocator { @Override public WritableBuffer allocate(int capacityHint) { - capacityHint = Math.min(MAX_BUFFER, Math.max(MIN_BUFFER, capacityHint)); + capacityHint = Math.min(MAX_BUFFER, capacityHint); return new NettyWritableBuffer(allocator.buffer(capacityHint, capacityHint)); } } diff --git a/netty/src/test/java/io/grpc/netty/NettyWritableBufferAllocatorTest.java b/netty/src/test/java/io/grpc/netty/NettyWritableBufferAllocatorTest.java index d577ec46b03..0b741ae24b3 100644 --- a/netty/src/test/java/io/grpc/netty/NettyWritableBufferAllocatorTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyWritableBufferAllocatorTest.java @@ -40,13 +40,6 @@ protected WritableBufferAllocator allocator() { return allocator; } - @Test - public void testCapacityHasMinimum() { - WritableBuffer buffer = allocator().allocate(100); - assertEquals(0, buffer.readableBytes()); - assertEquals(4096, buffer.writableBytes()); - } - @Test public void testCapacityIsExactAboveMinimum() { WritableBuffer buffer = allocator().allocate(9000); diff --git a/xds/src/main/java/io/grpc/xds/FaultFilter.java b/xds/src/main/java/io/grpc/xds/FaultFilter.java index d46b3d30f5a..b7f7fa9c226 100644 --- a/xds/src/main/java/io/grpc/xds/FaultFilter.java +++ b/xds/src/main/java/io/grpc/xds/FaultFilter.java @@ -190,94 +190,102 @@ public ClientInterceptor buildClientInterceptor( config = overrideConfig; } FaultConfig faultConfig = (FaultConfig) config; - Long delayNanos = null; - Status abortStatus = null; - if (faultConfig.maxActiveFaults() == null - || activeFaultCounter.get() < faultConfig.maxActiveFaults()) { - Metadata headers = args.getHeaders(); - if (faultConfig.faultDelay() != null) { - delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers); - } - if (faultConfig.faultAbort() != null) { - abortStatus = determineFaultAbortStatus(faultConfig.faultAbort(), headers); - } - } - if (delayNanos == null && abortStatus == null) { - return null; - } - final Long finalDelayNanos = delayNanos; - final Status finalAbortStatus = getAbortStatusWithDescription(abortStatus); final class FaultInjectionInterceptor implements ClientInterceptor { @Override public ClientCall interceptCall( final MethodDescriptor method, final CallOptions callOptions, final Channel next) { - Executor callExecutor = callOptions.getExecutor(); - if (callExecutor == null) { // This should never happen in practice because - // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with - // a callExecutor. - // TODO(https://github.com/grpc/grpc-java/issues/7868) - callExecutor = MoreExecutors.directExecutor(); + boolean checkFault = false; + if (faultConfig.maxActiveFaults() == null + || activeFaultCounter.get() < faultConfig.maxActiveFaults()) { + checkFault = faultConfig.faultDelay() != null || faultConfig.faultAbort() != null; } - if (finalDelayNanos != null) { - Supplier> callSupplier; - if (finalAbortStatus != null) { - callSupplier = Suppliers.ofInstance( - new FailingClientCall(finalAbortStatus, callExecutor)); - } else { - callSupplier = new Supplier>() { - @Override - public ClientCall get() { - return next.newCall(method, callOptions); - } - }; + if (!checkFault) { + return next.newCall(method, callOptions); + } + final class DeadlineInsightForwardingCall extends ForwardingClientCall { + private ClientCall delegate; + + @Override + protected ClientCall delegate() { + return delegate; } - final DelayInjectedCall delayInjectedCall = new DelayInjectedCall<>( - finalDelayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier); - final class DeadlineInsightForwardingCall extends ForwardingClientCall { - @Override - protected ClientCall delegate() { - return delayInjectedCall; + @Override + public void start(Listener listener, Metadata headers) { + Executor callExecutor = callOptions.getExecutor(); + if (callExecutor == null) { // This should never happen in practice because + // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with + // a callExecutor. + // TODO(https://github.com/grpc/grpc-java/issues/7868) + callExecutor = MoreExecutors.directExecutor(); } - @Override - public void start(Listener listener, Metadata headers) { - Listener finalListener = - new SimpleForwardingClientCallListener(listener) { - @Override - public void onClose(Status status, Metadata trailers) { - if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) { - // TODO(zdapeng:) check effective deadline locally, and - // do the following only if the local deadline is exceeded. - // (If the server sends DEADLINE_EXCEEDED for its own deadline, then the - // injected delay does not contribute to the error, because the request is - // only sent out after the delay. There could be a race between local and - // remote, but it is rather rare.) - String description = String.format( - Locale.US, - "Deadline exceeded after up to %d ns of fault-injected delay", - finalDelayNanos); - if (status.getDescription() != null) { - description = description + ": " + status.getDescription(); - } - status = Status.DEADLINE_EXCEEDED - .withDescription(description).withCause(status.getCause()); - // Replace trailers to prevent mixing sources of status and trailers. - trailers = new Metadata(); + Long delayNanos; + Status abortStatus = null; + if (faultConfig.faultDelay() != null) { + delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers); + } else { + delayNanos = null; + } + if (faultConfig.faultAbort() != null) { + abortStatus = getAbortStatusWithDescription( + determineFaultAbortStatus(faultConfig.faultAbort(), headers)); + } + + Supplier> callSupplier; + if (abortStatus != null) { + callSupplier = Suppliers.ofInstance( + new FailingClientCall(abortStatus, callExecutor)); + } else { + callSupplier = new Supplier>() { + @Override + public ClientCall get() { + return next.newCall(method, callOptions); + } + }; + } + if (delayNanos == null) { + delegate = callSupplier.get(); + delegate().start(listener, headers); + return; + } + + delegate = new DelayInjectedCall<>( + delayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier); + + Listener finalListener = + new SimpleForwardingClientCallListener(listener) { + @Override + public void onClose(Status status, Metadata trailers) { + if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) { + // TODO(zdapeng:) check effective deadline locally, and + // do the following only if the local deadline is exceeded. + // (If the server sends DEADLINE_EXCEEDED for its own deadline, then the + // injected delay does not contribute to the error, because the request is + // only sent out after the delay. There could be a race between local and + // remote, but it is rather rare.) + String description = String.format( + Locale.US, + "Deadline exceeded after up to %d ns of fault-injected delay", + delayNanos); + if (status.getDescription() != null) { + description = description + ": " + status.getDescription(); } - delegate().onClose(status, trailers); + status = Status.DEADLINE_EXCEEDED + .withDescription(description).withCause(status.getCause()); + // Replace trailers to prevent mixing sources of status and trailers. + trailers = new Metadata(); } - }; - delegate().start(finalListener, headers); - } + delegate().onClose(status, trailers); + } + }; + delegate().start(finalListener, headers); } - - return new DeadlineInsightForwardingCall(); - } else { - return new FailingClientCall<>(finalAbortStatus, callExecutor); } + + return new DeadlineInsightForwardingCall(); } }