Skip to content

Commit

Permalink
Watermarked response backpressure #3136
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Kec <[email protected]>
  • Loading branch information
danielkec committed Oct 3, 2022
1 parent 7121859 commit d335c0f
Show file tree
Hide file tree
Showing 19 changed files with 745 additions and 222 deletions.
16 changes: 16 additions & 0 deletions docs/config/io_helidon_reactive_webserver_WebServer.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ This is a standalone configuration type, prefix from configuration root: `server
|`max-upgrade-content-length` |int |`65536` |Set a maximum length of the content of an upgrade request.
Default is `64*1024`
|`backpressure-buffer-size` |long |`5242880` |Set a maximum length of the unflushed response data sending buffer can keep without applying backpressure.
Depends on `backpressure-policy` what happens if max buffer size is reached.
Default is `5*1024*1024` - 5Mb
|`backpressure-policy` | String | `LINEAR` |Sets the strategy for applying backpressure to the reactive stream
of response data.
* LINEAR - Data chunks are requested one-by-one after previous data chunk has been written to Netty's buffer, when
`backpressure-buffer-size` watermark is reached, new chunks are not requested until buffer size decrease under
the watermark value.
* PREFETCH - After first data chunk arrives, expected number of chunks needed to fill the buffer up
to watermark is calculated and requested.
* AUTO_FLUSH - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested and extra flush is initiated.
* UNBOUNDED - No backpressure is applied, Long.MAX_VALUE(unbounded) is requested from upstream.
Default is `LINEAR`
|`port` |int |`0` |Configures a server port to listen on with the server socket. If port is
`0` then any available ephemeral port will be used.
|`receive-buffer-size` |int |{nbsp} |Configures proposed value of the TCP receive window that is advertised to the remote peer on the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.microprofile.server;

import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.StreamingOutput;

import java.io.IOException;
import java.io.InputStream;
import java.util.Random;

import io.helidon.microprofile.tests.junit5.AddBean;
import io.helidon.microprofile.tests.junit5.AddConfig;
import io.helidon.microprofile.tests.junit5.AddExtension;
import io.helidon.microprofile.tests.junit5.DisableDiscovery;
import io.helidon.microprofile.tests.junit5.HelidonTest;

import org.glassfish.jersey.ext.cdi1x.internal.CdiComponentProvider;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;

@HelidonTest
@DisableDiscovery
@AddBean(StreamingOutputLeakTest.DownloadResource.class)
@AddExtension(ServerCdiExtension.class)
@AddExtension(JaxRsCdiExtension.class)
@AddExtension(CdiComponentProvider.class)
@AddConfig(key = "server.backpressure-buffer-size", value = "20971520")//20Mb
class StreamingOutputLeakTest {

private static final int SIZE10MB = 10 * 1024 * 1024;
private static final int SIZE = SIZE10MB;
private static final long NUMBER_OF_BUFS = 20;
private static final byte[] DATA_10MB = new byte[SIZE];

static {
Random r = new Random();
r.nextBytes(DATA_10MB);
}

/**
* Reproducer for issue #4643
*/
@Test
void streamingOutput(WebTarget target) throws IOException {

InputStream is = target.path("/download")
.request()
.get(InputStream.class);
long size = 0;
while (is.read() != -1) {
size++;
}
is.close();

// Make sure all data has been read
assertThat(size, is(NUMBER_OF_BUFS * SIZE));
}

@Path("/download")
public static class DownloadResource {

@GET
@Produces(MediaType.MULTIPART_FORM_DATA)
public Response getPayload(
@NotNull @QueryParam("fileName") String fileName) {
StreamingOutput fileStream = output -> {

// 2gb
for (int i = 0; i < NUMBER_OF_BUFS; i++) {
output.write(DATA_10MB);
output.flush();
}

};
return Response
.ok(fileStream, MediaType.MULTIPART_FORM_DATA)
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.helidon.common.reactive.Single;
import io.helidon.reactive.media.common.MediaContext;
import io.helidon.reactive.media.common.MediaSupport;
import io.helidon.reactive.webserver.BackpressureStrategy;
import io.helidon.reactive.webserver.BareRequest;
import io.helidon.reactive.webserver.BareResponse;
import io.helidon.reactive.webserver.RequestHeaders;
Expand Down Expand Up @@ -294,6 +295,11 @@ public Single<BareResponse> whenCompleted() {
return Single.create(completionStage);
}

@Override
public void backpressureStrategy(BackpressureStrategy backpressureStrategy) {
//noop
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
Expand Down
23 changes: 18 additions & 5 deletions reactive/webserver/webserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -220,17 +220,30 @@
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>tck-test</id>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<includes>
<include>**/*TckTest.java</include>
</includes>
</configuration>
</execution>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<includes>
<include>**/*IT.java</include>
</includes>
</configuration>
</execution>
</executions>
<configuration>
<includes>
<include>**/*TckTest.java</include>
</includes>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.reactive.webserver;

import java.util.concurrent.Flow;

import io.helidon.reactive.webserver.ServerResponseSubscription.Unbounded;
import io.helidon.reactive.webserver.ServerResponseSubscription.WatermarkAutoFlush;
import io.helidon.reactive.webserver.ServerResponseSubscription.WatermarkLinear;
import io.helidon.reactive.webserver.ServerResponseSubscription.WatermarkPrefetch;

/**
* Strategy for applying backpressure to the reactive stream of response data.
*/
public enum BackpressureStrategy {
/**
* Data chunks are requested one-by-one after previous data chunk has been given to Netty for writing.
* When backpressure-buffer-size watermark is reached new chunks are not requested until buffer size
* decrease under the watermark value.
*/
LINEAR(1),
/**
* Data are requested one-by-one, in case buffer reaches watermark,
* no other data is requested and extra flush is initiated.
*/
AUTO_FLUSH(2),
/**
* After first data chunk arrives, expected number of chunks needed
* to fill the buffer up to watermark is calculated and requested.
*/
PREFETCH(3),
/**
* No backpressure is applied, Long.MAX_VALUE(unbounded) is requested from upstream.
*/
UNBOUNDED(4);

private final int type;

BackpressureStrategy(int type) {
this.type = type;
}

ServerResponseSubscription createSubscription(Flow.Subscription subscription,
long backpressureBufferSize) {
switch (type) {
case 1: return new WatermarkLinear(subscription, backpressureBufferSize);
case 2: return new WatermarkAutoFlush(subscription, backpressureBufferSize);
case 3: return new WatermarkPrefetch(subscription, backpressureBufferSize);
case 4: return new Unbounded(subscription);
default: throw new IllegalStateException("Unknown backpressure strategy.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ void writeStatusAndHeaders(Http.Status status, Map<String, List<String>> headers
*/
Single<BareResponse> whenCompleted();

/**
* Set the backpressure strategy used for requesting response data.
*
* @param backpressureStrategy strategy used for requesting response data
*/
void backpressureStrategy(BackpressureStrategy backpressureStrategy);

/**
* Each response is subscribed up to a single publisher and AFTER {@link #writeStatusAndHeaders(Http.Status, Map)}
* method is called and returned.
Expand Down
Loading

0 comments on commit d335c0f

Please sign in to comment.