Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#8077 Client keep alive fix #8101

Merged
merged 2 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void close() throws IOException {
*/
void idle() {
if (idlingThread != null) {
throw new IllegalStateException("Already in idle mode!");
return;
}
idlingThread = executor.get().submit(this::handle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private static HeaderName readHeaderName(DataReader reader,
if (col == maxLength) {
throw new IllegalStateException("Header size exceeded");
} else if (col < 0) {
throw new IllegalArgumentException("Invalid header, missing colon: " + reader.debugDataHex());
throw new IllegalArgumentException("Invalid header, missing colon:\n" + reader.debugDataHex());
}

String headerName = reader.readAsciiString(col);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.helidon.webclient.api;

import java.time.Duration;

import io.helidon.common.tls.Tls;
import io.helidon.webclient.spi.DnsResolver;

Expand All @@ -25,6 +27,7 @@
* @param scheme uri address scheme
* @param host uri address host
* @param port uri address port
* @param readTimeout SO read timeout
* @param tls TLS to be used in connection
* @param dnsResolver DNS resolver to be used
* @param dnsAddressLookup DNS address lookup strategy
Expand All @@ -33,6 +36,7 @@
public record ConnectionKey(String scheme,
String host,
int port,
Duration readTimeout,
Tls tls,
DnsResolver dnsResolver,
DnsAddressLookup dnsAddressLookup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.helidon.webclient.api;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -135,6 +136,7 @@ private ClientRequest<?> discoverHttpImplementation() {
ConnectionKey connectionKey = new ConnectionKey(resolvedUri.scheme(),
resolvedUri.host(),
resolvedUri.port(),
clientConfig().readTimeout().orElse(Duration.ZERO),
tls(),
clientConfig().dnsResolver(),
clientConfig().dnsAddressLookup(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.net.Socket;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
Expand Down Expand Up @@ -457,6 +458,8 @@ private static Socket connectToProxy(WebClient webClient,
new ConnectionKey("http",
proxyAddress.getHostName(),
proxyAddress.getPort(),
clientConfig.readTimeout()
.orElse(Duration.ZERO),
NO_TLS,
clientConfig.dnsResolver(),
clientConfig.dnsAddressLookup(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ static WebClientServiceResponse createServiceResponse(HttpClientConfig clientCon
@Override
public WebClientServiceResponse proceed(WebClientServiceRequest serviceRequest) {
// either use the explicit connection, or obtain one (keep alive or one-off)
effectiveConnection = connection == null ? obtainConnection(serviceRequest) : connection;
effectiveConnection = connection == null ? obtainConnection(serviceRequest, timeout) : connection;
effectiveConnection.readTimeout(this.timeout);

DataWriter writer = effectiveConnection.writer();
Expand All @@ -157,7 +157,7 @@ void prologue(BufferData nonEntityData, WebClientServiceRequest request, ClientU
// When CONNECT, the first line contains the remote host:port, in the same way as the HOST header.
nonEntityData.writeAscii(request.method().text()
+ " "
+ request.headers().get(HeaderNames.HOST).value()
+ request.headers().get(HeaderNames.HOST).get()
+ " HTTP/1.1\r\n");
} else {
// When proxy is set, ensure that the request uses absolute URI because of Section 5.1.2 Request-URI in
Expand Down Expand Up @@ -286,9 +286,10 @@ private static boolean mayHaveEntity(Status responseStatus, ClientResponseHeader
return true;
}

private ClientConnection obtainConnection(WebClientServiceRequest request) {
private ClientConnection obtainConnection(WebClientServiceRequest request, Duration requestReadTimeout) {
return http1Client.connectionCache()
.connection(http1Client,
requestReadTimeout,
tls,
proxy,
request.uri(),
Expand Down Expand Up @@ -525,7 +526,7 @@ private void ensureBuffer() {
BufferData chunk = reader.readBuffer(length);

if (LOGGER.isLoggable(TRACE)) {
helidonSocket.log(LOGGER, TRACE, "client read chunk %s", chunk.debugDataHex(true));
helidonSocket.log(LOGGER, TRACE, "client read chunk\n%s", chunk.debugDataHex(true));
}

reader.skip(2); // trailing CRLF after each chunk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ WebClientServiceResponse doProceed(ClientConnection connection,
throw new IllegalStateException("Output stream was not closed in handler");
}

reader = cos.reader;
connection = cos.connection;

Status responseStatus;
try {
responseStatus = Http1StatusParser.readStatus(reader, http1Client.protocolConfig().maxStatusLineLength());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ private Http1ClientResponseImpl invokeWithServices(Http1CallChainBase callChain,
});

return new Http1ClientResponseImpl(clientConfig(),
http1Client().protocolConfig(),
serviceResponse.status(),
serviceResponse.serviceRequest().headers(),
serviceResponse.headers(),
callChain.connection(),
serviceResponse.inputStream().orElse(null),
mediaContext(),
clientConfig().mediaTypeParserMode(),
resolvedUri,
complete);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,23 @@
package io.helidon.webclient.http1;

import java.io.InputStream;
import java.time.Duration;
import java.util.List;
import java.util.OptionalLong;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import io.helidon.common.GenericType;
import io.helidon.common.HelidonServiceLoader;
import io.helidon.common.LazyValue;
import io.helidon.common.buffers.BufferData;
import io.helidon.common.media.type.ParserMode;
import io.helidon.http.ClientRequestHeaders;
import io.helidon.http.ClientResponseHeaders;
import io.helidon.http.ClientResponseTrailers;
import io.helidon.http.HeaderNames;
import io.helidon.http.HeaderValues;
import io.helidon.http.Headers;
import io.helidon.http.Http1HeadersParser;
import io.helidon.http.Status;
import io.helidon.http.media.MediaContext;
Expand All @@ -57,6 +56,7 @@ class Http1ClientResponseImpl implements Http1ClientResponse {
private final AtomicBoolean closed = new AtomicBoolean();

private final HttpClientConfig clientConfig;
private final Http1ClientProtocolConfig protocolConfig;
private final Status responseStatus;
private final ClientRequestHeaders requestHeaders;
private final ClientResponseHeaders responseHeaders;
Expand All @@ -70,36 +70,45 @@ class Http1ClientResponseImpl implements Http1ClientResponse {
private final ClientUri lastEndpointUri;

private final ClientConnection connection;
private final CompletableFuture<io.helidon.http.Headers> trailers = new CompletableFuture<>();
private final LazyValue<Headers> trailers;
private boolean entityRequested;
private long entityLength;
private boolean entityFullyRead = false;

Http1ClientResponseImpl(HttpClientConfig clientConfig,
Http1ClientProtocolConfig protocolConfig,
Status responseStatus,
ClientRequestHeaders requestHeaders,
ClientResponseHeaders responseHeaders,
ClientConnection connection,
InputStream inputStream, // can be null if no entity
MediaContext mediaContext,
ParserMode parserMode,
ClientUri lastEndpointUri,
CompletableFuture<Void> whenComplete) {
this.clientConfig = clientConfig;
this.protocolConfig = protocolConfig;
this.responseStatus = responseStatus;
this.requestHeaders = requestHeaders;
this.responseHeaders = responseHeaders;
this.connection = connection;
this.inputStream = inputStream;
this.mediaContext = mediaContext;
this.parserMode = parserMode;
this.parserMode = clientConfig.mediaTypeParserMode();
this.lastEndpointUri = lastEndpointUri;
this.whenComplete = whenComplete;

if (responseHeaders.contains(HeaderNames.CONTENT_LENGTH)) {
this.entityLength = Long.parseLong(responseHeaders.get(HeaderNames.CONTENT_LENGTH).value());
this.trailers = LazyValue.create(() -> Http1HeadersParser.readHeaders(
connection.reader(),
protocolConfig.maxHeaderSize(),
protocolConfig.validateResponseHeaders()
));

OptionalLong contentLength = responseHeaders.contentLength();
if (contentLength.isPresent()) {
this.entityLength = contentLength.getAsLong();
} else if (responseHeaders.contains(HeaderValues.TRANSFER_ENCODING_CHUNKED)) {
this.entityLength = ENTITY_LENGTH_CHUNKED;
}

if (responseHeaders.contains(HeaderNames.TRAILER)) {
this.hasTrailers = true;
this.trailerNames = responseHeaders.get(HeaderNames.TRAILER).allValues(true);
Expand All @@ -122,27 +131,10 @@ public ClientResponseHeaders headers() {
@Override
public ClientResponseTrailers trailers() {
if (hasTrailers) {
// Block until trailers arrive
Duration timeout = clientConfig.readTimeout()
.orElseGet(() -> clientConfig.socketOptions().readTimeout());

if (!this.entityRequested) {
throw new IllegalStateException("Trailers requested before reading entity.");
}

try {
return ClientResponseTrailers.create(this.trailers.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
} catch (TimeoutException e) {
throw new IllegalStateException("Timeout " + timeout + " reached while waiting for trailers.", e);
} catch (InterruptedException e) {
throw new IllegalStateException("Interrupted while waiting for trailers.", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof IllegalStateException ise) {
throw ise;
} else {
throw new IllegalStateException(e.getCause());
}
}
return ClientResponseTrailers.create(this.trailers.get());
} else {
return ClientResponseTrailers.create();
}
Expand All @@ -161,15 +153,8 @@ public void close() {
if (headers().contains(HeaderValues.CONNECTION_CLOSE)) {
connection.closeResource();
} else {
if (entityLength == 0) {
if (entityFullyRead || entityLength == 0) {
connection.releaseResource();
} else if (entityLength == ENTITY_LENGTH_CHUNKED) {
if (hasTrailers) {
readTrailers();
connection.releaseResource();
} else {
connection.closeResource();
}
} else {
connection.closeResource();
}
Expand Down Expand Up @@ -208,15 +193,16 @@ private ReadableEntity entity(ClientRequestHeaders requestHeaders,
}
return ClientResponseEntity.create(
this::readBytes,
this::close,
this::entityFullyRead,
requestHeaders,
responseHeaders,
mediaContext
);
}

private void readTrailers() {
this.trailers.complete(Http1HeadersParser.readHeaders(connection.reader(), 1024, true));
private void entityFullyRead() {
this.entityFullyRead = true;
this.close();
}

private BufferData readBytes(int estimate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ static Http1ConnectionCache create() {
}

ClientConnection connection(Http1ClientImpl http1Client,
Duration requestReadTimeout,
Tls tls,
Proxy proxy,
ClientUri uri,
Expand All @@ -73,7 +74,7 @@ ClientConnection connection(Http1ClientImpl http1Client,
boolean keepAlive = handleKeepAlive(defaultKeepAlive, headers);
Tls effectiveTls = HTTPS.equals(uri.scheme()) ? tls : NO_TLS;
if (keepAlive) {
return keepAliveConnection(http1Client, effectiveTls, uri, proxy);
return keepAliveConnection(http1Client, requestReadTimeout, effectiveTls, uri, proxy);
} else {
return oneOffConnection(http1Client, effectiveTls, uri, proxy);
}
Expand Down Expand Up @@ -105,6 +106,7 @@ private boolean handleKeepAlive(boolean defaultKeepAlive, WritableHeaders<?> hea
}

private ClientConnection keepAliveConnection(Http1ClientImpl http1Client,
Duration requestReadTimeout,
Tls tls,
ClientUri uri,
Proxy proxy) {
Expand All @@ -118,6 +120,7 @@ private ClientConnection keepAliveConnection(Http1ClientImpl http1Client,
ConnectionKey connectionKey = new ConnectionKey(uri.scheme(),
uri.host(),
uri.port(),
clientConfig.readTimeout().orElse(requestReadTimeout),
tls,
clientConfig.dnsResolver(),
clientConfig.dnsAddressLookup(),
Expand Down Expand Up @@ -161,6 +164,7 @@ private ClientConnection oneOffConnection(Http1ClientImpl http1Client,
new ConnectionKey(uri.scheme(),
uri.host(),
uri.port(),
clientConfig.readTimeout().orElse(Duration.ZERO),
tls,
clientConfig.dnsResolver(),
clientConfig.dnsAddressLookup(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -266,6 +267,7 @@ private ConnectionKey connectionKey(WebClientServiceRequest serviceRequest) {
return new ConnectionKey(uri.scheme(),
uri.host(),
uri.port(),
clientConfig.readTimeout().orElse(Duration.ZERO),
"https".equals(uri.scheme()) ? clientRequest.tls() : NO_TLS,
clientConfig.dnsResolver(),
clientConfig.dnsAddressLookup(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.helidon.webclient.http2;

import java.time.Duration;

import io.helidon.common.uri.UriQueryWriteable;
import io.helidon.http.Method;
import io.helidon.webclient.api.ClientRequest;
Expand Down Expand Up @@ -67,6 +69,7 @@ public SupportLevel supports(FullClientRequest<?> clientRequest, ClientUri clien
ConnectionKey ck = new ConnectionKey(clientUri.scheme(),
clientUri.host(),
clientUri.port(),
clientConfig.readTimeout().orElse(Duration.ZERO),
clientRequest.tls(),
clientConfig.dnsResolver(),
clientConfig.dnsAddressLookup(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ void testConnectionQueueDequeue() {
connectionNow = http1Client
.connectionCache()
.connection(http1Client,
Duration.ZERO,
injectedHttp1client.prototype().tls(),
Proxy.noProxy(),
request.resolvedUri(),
Expand Down Expand Up @@ -271,6 +272,7 @@ void testConnectionQueueSizeLimit() {
connectionList.add(http1Client
.connectionCache()
.connection(http1Client,
Duration.ZERO,
clientConfig.tls(),
Proxy.noProxy(),
request.resolvedUri(),
Expand Down Expand Up @@ -298,6 +300,7 @@ void testConnectionQueueSizeLimit() {
connection = http1Client
.connectionCache()
.connection(http1Client,
Duration.ZERO,
clientConfig.tls(),
Proxy.noProxy(),
request.resolvedUri(),
Expand Down Expand Up @@ -326,6 +329,7 @@ void testConnectionQueueSizeLimit() {
ClientConnection connectionNow = http1Client
.connectionCache()
.connection(http1Client,
Duration.ZERO,
clientConfig.tls(),
Proxy.noProxy(),
request.resolvedUri(),
Expand Down
Loading