Skip to content

Commit

Permalink
100 continue triggered by content request - reactive (#5714)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkec committed Feb 1, 2023
1 parent 8a041bc commit 32a5381
Show file tree
Hide file tree
Showing 15 changed files with 550 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
* Copyright (c) 2017, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -58,6 +59,7 @@ public class SocketHttpClient implements AutoCloseable {

private Socket socket;
private boolean connected;
private BufferedReader socketReader;

protected SocketHttpClient(String host, int port, Duration timeout) {
this.host = host;
Expand Down Expand Up @@ -320,6 +322,45 @@ public String receive() {
}
}

/**
* Execute immediately given consumer with this
* socket client as an argument.
*
* @param exec consumer to execute
* @return this http client
*/
public SocketHttpClient then(Consumer<SocketHttpClient> exec){
exec.accept(this);
return this;
}

/**
* Wait for text coming from socket.
*
* @param expectedStartsWith expected beginning
* @param expectedEndsWith expected end
* @return this http client
*/
public SocketHttpClient awaitResponse(String expectedStartsWith, String expectedEndsWith) {
StringBuilder sb = new StringBuilder();
String t;
while (true) {
try {
t = socketReader.readLine();
if (t == null) {
break;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
sb.append(t).append("\n");
if (sb.toString().startsWith(expectedStartsWith) && sb.toString().endsWith(expectedEndsWith)) {
break;
}
}
return this;
}

/**
* Sends a request to the server.
*
Expand Down Expand Up @@ -449,6 +490,7 @@ public void connect(Duration timeout) {
try {
socket = new Socket(host, port);
socket.setSoTimeout((int) timeout.toMillis());
socketReader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
connected = true;
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand All @@ -464,6 +506,41 @@ public boolean connected() {
return connected;
}

/**
* Send supplied text manually to socket.
*
* @param formatString text to send
* @param args format arguments
* @return this http client
* @throws IOException
*/
public SocketHttpClient manualRequest(String formatString, Object... args) throws IOException {
if (socket == null) {
connect();
}
PrintWriter pw = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
pw.printf(formatString.replaceAll("\\n", EOL), args);
pw.flush();
return this;
}


/**
* Continue sending more to text to socket.
* @param payload text to be sent
* @return this http client
* @throws IOException
*/
public SocketHttpClient continuePayload(String payload)
throws IOException {
PrintWriter pw = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
if (payload != null) {
pw.print(payload);
}
pw.flush();
return this;
}

/**
* Override this to send a specific payload.
*
Expand Down
20 changes: 17 additions & 3 deletions docs/config/io_helidon_reactive_webserver_SocketConfiguration.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
///////////////////////////////////////////////////////////////////////////////

Copyright (c) 2022 Oracle and/or its affiliates.
Copyright (c) 2023 Oracle and/or its affiliates.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,7 @@ ifndef::rootdir[:rootdir: {docdir}/..]
:basic-table-intro: The table below lists the configuration keys that configure io.helidon.reactive.webserver.SocketConfiguration
include::{rootdir}/includes/attributes.adoc[]
= SocketConfiguration (webserver) Configuration
= SocketConfiguration (reactive.webserver) Configuration
// tag::config[]
Expand Down Expand Up @@ -55,8 +55,22 @@ Type: link:{javadoc-base-url}/io.helidon.reactive.webserver/io/helidon/reactive/
socket.
Default value is #DEFAULT_BACKLOG_SIZE.
|`backpressure-buffer-size` |long |{nbsp} |Maximum length of the response data sending buffer can keep without flushing.
Depends on `backpressure-policy` what happens if max buffer size is reached.
|`backpressure-strategy` |BackpressureStrategy (LINEAR, AUTO_FLUSH, PREFETCH, UNBOUNDED) |`LINEAR` |Sets a backpressure strategy for the server to apply against user provided response upstream.
- LINEAR - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested.
- AUTO_FLUSH - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested.
- PREFETCH - After first data chunk arrives, probable number of chunks needed to fill the buffer up to watermark is calculated and requested.
- NONE - No backpressure is applied, Long.MAX_VALUE(unbounded) is requested from upstream.
|[.line-through]#`bind-address`# |string |{nbsp} |*Deprecated* Configures local address where the server listens on with the server socket.
If not configured, then listens an all local addresses.
|`continue-immediately` |boolean |`false` |When true WebServer answers to expect continue with 100 continue immediately,
not waiting for user to actually request the data.
Default is `false`
|`enable-compression` |boolean |`false` |Enable negotiation for gzip/deflate content encodings. Clients can
request compression using the "Accept-Encoding" header.
Expand All @@ -66,7 +80,7 @@ Type: link:{javadoc-base-url}/io.helidon.reactive.webserver/io/helidon/reactive/
io.helidon.common.http.Http.Status#BAD_REQUEST_400
is returned.
Default is `8192`
Default is `16384`
|`max-initial-line-length` |int |`4096` |Maximal number of characters in the initial HTTP line.
Default is `4096`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
///////////////////////////////////////////////////////////////////////////////

Copyright (c) 2022 Oracle and/or its affiliates.
Copyright (c) 2023 Oracle and/or its affiliates.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,7 @@ ifndef::rootdir[:rootdir: {docdir}/..]
:basic-table-intro: The table below lists the configuration keys that configure io.helidon.reactive.webserver.SocketConfiguration.SocketConfigurationBuilder
include::{rootdir}/includes/attributes.adoc[]
= SocketConfigurationBuilder (webserver.SocketConfiguration) Configuration
= SocketConfigurationBuilder (reactive.webserver.SocketConfiguration) Configuration
// tag::config[]
Expand All @@ -46,8 +46,22 @@ Type: link:{javadoc-base-url}/io.helidon.reactive.webserver.SocketConfiguration/
socket.
Default value is #DEFAULT_BACKLOG_SIZE.
|`backpressure-buffer-size` |long |{nbsp} |Maximum length of the response data sending buffer can keep without flushing.
Depends on `backpressure-policy` what happens if max buffer size is reached.
|`backpressure-strategy` |BackpressureStrategy (LINEAR, AUTO_FLUSH, PREFETCH, UNBOUNDED) |`LINEAR` |Sets a backpressure strategy for the server to apply against user provided response upstream.
- LINEAR - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested.
- AUTO_FLUSH - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested.
- PREFETCH - After first data chunk arrives, probable number of chunks needed to fill the buffer up to watermark is calculated and requested.
- NONE - No backpressure is applied, Long.MAX_VALUE(unbounded) is requested from upstream.
|[.line-through]#`bind-address`# |string |{nbsp} |*Deprecated* Configures local address where the server listens on with the server socket.
If not configured, then listens an all local addresses.
|`continue-immediately` |boolean |`false` |When true WebServer answers to expect continue with 100 continue immediately,
not waiting for user to actually request the data.
Default is `false`
|`enable-compression` |boolean |`false` |Enable negotiation for gzip/deflate content encodings. Clients can
request compression using the "Accept-Encoding" header.
Expand All @@ -57,7 +71,7 @@ Type: link:{javadoc-base-url}/io.helidon.reactive.webserver.SocketConfiguration/
io.helidon.common.http.Http.Status#BAD_REQUEST_400
is returned.
Default is `8192`
Default is `16384`
|`max-initial-line-length` |int |`4096` |Maximal number of characters in the initial HTTP line.
Default is `4096`
Expand Down
38 changes: 18 additions & 20 deletions docs/config/io_helidon_reactive_webserver_WebServer.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
///////////////////////////////////////////////////////////////////////////////

Copyright (c) 2022 Oracle and/or its affiliates.
Copyright (c) 2023 Oracle and/or its affiliates.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,7 @@ ifndef::rootdir[:rootdir: {docdir}/..]
:basic-table-intro: The table below lists the configuration keys that configure io.helidon.reactive.webserver.WebServer
include::{rootdir}/includes/attributes.adoc[]
= WebServer (webserver) Configuration
= WebServer (reactive.webserver) Configuration
// tag::config[]
Expand Down Expand Up @@ -50,8 +50,22 @@ This is a standalone configuration type, prefix from configuration root: `server
socket.
Default value is #DEFAULT_BACKLOG_SIZE.
|`backpressure-buffer-size` |long |{nbsp} |Maximum length of the response data sending buffer can keep without flushing.
Depends on `backpressure-policy` what happens if max buffer size is reached.
|`backpressure-strategy` |BackpressureStrategy (LINEAR, AUTO_FLUSH, PREFETCH, UNBOUNDED) |`LINEAR` |Sets a backpressure strategy for the server to apply against user provided response upstream.
- LINEAR - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested.
- AUTO_FLUSH - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested.
- PREFETCH - After first data chunk arrives, probable number of chunks needed to fill the buffer up to watermark is calculated and requested.
- NONE - No backpressure is applied, Long.MAX_VALUE(unbounded) is requested from upstream.
|[.line-through]#`bind-address`# |string |{nbsp} |*Deprecated* Configures local address where the server listens on with the server socket.
If not configured, then listens an all local addresses.
|`continue-immediately` |boolean |`false` |When true WebServer answers to expect continue with 100 continue immediately,
not waiting for user to actually request the data.
Default is `false`
|`enable-compression` |boolean |`false` |Enable negotiation for gzip/deflate content encodings. Clients can
request compression using the "Accept-Encoding" header.
Expand All @@ -62,7 +76,7 @@ This is a standalone configuration type, prefix from configuration root: `server
io.helidon.common.http.Http.Status#BAD_REQUEST_400
is returned.
Default is `8192`
Default is `16384`
|`max-initial-line-length` |int |`4096` |Maximal number of characters in the initial HTTP line.
Default is `4096`
Expand All @@ -71,22 +85,6 @@ This is a standalone configuration type, prefix from configuration root: `server
|`max-upgrade-content-length` |int |`65536` |Set a maximum length of the content of an upgrade request.
Default is `64*1024`
|`backpressure-buffer-size` |long |`5242880` |Set a maximum length of the unflushed response data sending buffer can keep without applying backpressure.
Depends on `backpressure-policy` what happens if max buffer size is reached.
Default is `5*1024*1024` - 5Mb
|`backpressure-policy` | String | `LINEAR` |Sets the strategy for applying backpressure to the reactive stream
of response data.
* LINEAR - Data chunks are requested one-by-one after previous data chunk has been written to Netty's buffer, when
`backpressure-buffer-size` watermark is reached, new chunks are not requested until buffer size decrease under
the watermark value.
* PREFETCH - After first data chunk arrives, expected number of chunks needed to fill the buffer up
to watermark is calculated and requested.
* AUTO_FLUSH - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested and extra flush is initiated.
* UNBOUNDED - No backpressure is applied, Long.MAX_VALUE(unbounded) is requested from upstream.
Default is `LINEAR`
|`port` |int |`0` |Configures a server port to listen on with the server socket. If port is
`0` then any available ephemeral port will be used.
|`receive-buffer-size` |int |{nbsp} |Configures proposed value of the TCP receive window that is advertised to the remote peer on the
Expand All @@ -97,7 +95,7 @@ to watermark is calculated and requested.
on multiple ports.
An additional named server socket may have a dedicated Routing configured
through io.helidon.reactive.webserver.WebServer.Builder#addNamedRouting(String, Routing).
through WebServer.Builder#addNamedRouting(String, Routing).
|`timeout-millis` |long |`0` |Socket timeout in milliseconds
|`tls` |xref:{rootdir}/config/io_helidon_reactive_webserver_WebServerTls.adoc[WebServerTls] |{nbsp} |Configures SSL for this socket. When configured, the server enforces SSL
configuration.
Expand Down
4 changes: 2 additions & 2 deletions docs/config/io_helidon_reactive_webserver_WebServerTls.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
///////////////////////////////////////////////////////////////////////////////

Copyright (c) 2022 Oracle and/or its affiliates.
Copyright (c) 2023 Oracle and/or its affiliates.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,7 @@ ifndef::rootdir[:rootdir: {docdir}/..]
:basic-table-intro: The table below lists the configuration keys that configure io.helidon.reactive.webserver.WebServerTls
include::{rootdir}/includes/attributes.adoc[]
= WebServerTls (webserver) Configuration
= WebServerTls (reactive.webserver) Configuration
// tag::config[]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
* Copyright (c) 2017, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,6 +69,7 @@ class BareResponseImpl implements BareResponse {
private final AtomicBoolean internallyClosed = new AtomicBoolean(false);
private final CompletableFuture<BareResponse> responseFuture;
private final CompletableFuture<BareResponse> headersFuture;
private final CompletableFuture<Boolean> entityRequested;
private final RequestContext requestContext;
private final long requestId;
private final String http2StreamId;
Expand Down Expand Up @@ -98,21 +99,22 @@ class BareResponseImpl implements BareResponse {
* @param requestId the correlation ID that is added to the log statements
*/
BareResponseImpl(ChannelHandlerContext ctx,
CompletableFuture<Boolean> entityRequested,
HttpRequest request,
RequestContext requestContext,
CompletableFuture<?> prevRequestChunk,
CompletableFuture<ChannelFutureListener> requestEntityAnalyzed,
long backpressureBufferSize,
BackpressureStrategy backpressureStrategy,
SocketConfiguration soConfig,
long requestId) {
this.entityRequested = entityRequested;
this.requestContext = requestContext;
this.originalEntityAnalyzed = requestEntityAnalyzed;
this.requestEntityAnalyzed = requestEntityAnalyzed;
this.backpressureStrategy = backpressureStrategy;
this.backpressureBufferSize = backpressureBufferSize;
this.backpressureStrategy = soConfig.backpressureStrategy();
this.backpressureBufferSize = soConfig.backpressureBufferSize();
this.responseFuture = new CompletableFuture<>();
this.headersFuture = new CompletableFuture<>();
this.channel = new NettyChannel(ctx.channel());
this.channel = new NettyChannel(ctx);
this.requestId = requestId;
this.keepAlive = HttpUtil.isKeepAlive(request);
this.requestHeaders = request.headers();
Expand Down Expand Up @@ -168,6 +170,14 @@ public void writeStatusAndHeaders(Http.Status status, Map<String, List<String>>
throw new IllegalStateException("Status and headers were already sent");
}

if (!requestContext.socketConfiguration().continueImmediately()
&& HttpUtil.is100ContinueExpected(requestContext.request())
&& !requestContext.isDataRequested()) {
channel.expectationFailed();
entityRequested.complete(false);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE_ON_FAILURE);
}

HttpResponseStatus nettyStatus;

if (status.reasonPhrase() == null) {
Expand Down
Loading

0 comments on commit 32a5381

Please sign in to comment.