Skip to content

Commit

Permalink
Keep alive is now configurable per request (#7122)
Browse files Browse the repository at this point in the history
  • Loading branch information
Verdent authored Jun 29, 2023
1 parent 48862dc commit 552458e
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ public Http2ClientRequest property(String propertyName, String propertyValue) {
return this;
}

@Override
public Http2ClientRequest keepAlive(boolean keepAlive) {
//NOOP
return this;
}

@Override
public Http2ClientRequest priority(int priority) {
if (priority < 1 || priority > 256) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ void testConnectionQueueDequeue() {
connectionNow = ConnectionCache.connection(requestImpl.clientConfig(),
null,
requestImpl.uri(),
requestImpl.headers());
requestImpl.headers(),
requestImpl.keepAlive());
request.connection(connectionNow);
Http1ClientResponse response = request.request();
// connection will be queued up
Expand All @@ -242,9 +243,10 @@ void testConnectionQueueSizeLimit() {
Http1ClientRequest request = injectedHttp1client.put("/test");
ClientRequestImpl requestImpl = (ClientRequestImpl) request;
connectionList.add(ConnectionCache.connection(requestImpl.clientConfig(),
null,
requestImpl.uri(),
requestImpl.headers()));
null,
requestImpl.uri(),
requestImpl.headers(),
requestImpl.keepAlive()));
request.connection(connectionList.get(i));
responseList.add(request.request());
}
Expand All @@ -261,9 +263,10 @@ void testConnectionQueueSizeLimit() {
Http1ClientRequest request = injectedHttp1client.put("/test");
ClientRequestImpl requestImpl = (ClientRequestImpl) request;
connection = ConnectionCache.connection(requestImpl.clientConfig(),
null,
requestImpl.uri(),
requestImpl.headers());
null,
requestImpl.uri(),
requestImpl.headers(),
requestImpl.keepAlive());
request.connection(connection);
response = request.request();
if (i < connectionQueueSize) {
Expand All @@ -280,9 +283,10 @@ void testConnectionQueueSizeLimit() {
Http1ClientRequest request = injectedHttp1client.put("/test");
ClientRequestImpl requestImpl = (ClientRequestImpl) request;
ClientConnection connectionNow = ConnectionCache.connection(requestImpl.clientConfig(),
null,
requestImpl.uri(),
requestImpl.headers());
null,
requestImpl.uri(),
requestImpl.headers(),
requestImpl.keepAlive());
request.connection(connectionNow);
Http1ClientResponse responseNow = request.request();
// Verify that the connection was dequeued
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,14 @@ default <T> T request(Class<T> type) {
*/
B property(String propertyName, String propertyValue);

/**
* Whether to use keep alive with this request.
*
* @param keepAlive use keep alive
* @return updated client request
*/
B keepAlive(boolean keepAlive);

/**
* Handle output stream.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class ClientRequestImpl implements Http1ClientRequest {
private ClientConnection connection;
private UriFragment fragment = UriFragment.empty();
private boolean skipUriEncoding = false;
private boolean keepAlive;

ClientRequestImpl(Http1ClientConfig clientConfig,
Http.Method method,
Expand All @@ -82,6 +83,7 @@ class ClientRequestImpl implements Http1ClientRequest {
this.maxRedirects = clientConfig.maxRedirects();
this.tls = clientConfig.tls().orElse(null);
this.query = query;
this.keepAlive = clientConfig.defaultKeepAlive();

this.requestId = "http1-client-" + COUNTER.getAndIncrement();
this.explicitHeaders = WritableHeaders.create(clientConfig.defaultHeaders());
Expand Down Expand Up @@ -199,7 +201,8 @@ public Http1ClientResponse outputStream(OutputStreamHandler streamHandler) {
rejectHeadWithEntity();
CompletableFuture<WebClientServiceRequest> whenSent = new CompletableFuture<>();
CompletableFuture<WebClientServiceResponse> whenComplete = new CompletableFuture<>();
WebClientService.Chain callChain = new HttpCallOutputStreamChain(clientConfig,
WebClientService.Chain callChain = new HttpCallOutputStreamChain(this,
clientConfig,
connection,
tls,
whenSent,
Expand Down Expand Up @@ -244,6 +247,12 @@ public Http1ClientRequest property(String propertyName, String propertyValue) {
return this;
}

@Override
public Http1ClientRequest keepAlive(boolean keepAlive) {
this.keepAlive = keepAlive;
return this;
}

Http1ClientConfig clientConfig() {
return clientConfig;
}
Expand All @@ -252,6 +261,10 @@ UriHelper uri() {
return uri;
}

boolean keepAlive() {
return keepAlive;
}

@Override
public ClientRequestHeaders headers() {
return ClientRequestHeaders.create(explicitHeaders);
Expand Down Expand Up @@ -306,7 +319,8 @@ private ClientResponseImpl invokeWithFollowRedirectsEntity(Object entity) {
private ClientResponseImpl invokeRequestWithEntity(Object entity) {
CompletableFuture<WebClientServiceRequest> whenSent = new CompletableFuture<>();
CompletableFuture<WebClientServiceResponse> whenComplete = new CompletableFuture<>();
WebClientService.Chain callChain = new HttpCallEntityChain(clientConfig,
WebClientService.Chain callChain = new HttpCallEntityChain(this,
clientConfig,
connection,
tls,
whenSent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ private ConnectionCache() {
static ClientConnection connection(Http1ClientConfig clientConfig,
Tls tls,
UriHelper uri,
ClientRequestHeaders headers) {
boolean keepAlive = handleKeepAlive(clientConfig.defaultKeepAlive(), headers);
ClientRequestHeaders headers,
boolean defaultKeepAlive) {
boolean keepAlive = handleKeepAlive(defaultKeepAlive, headers);
Tls effectiveTls = HTTPS.equals(uri.scheme()) ? tls : null;
if (keepAlive) {
return keepAliveConnection(clientConfig, effectiveTls, uri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ abstract class HttpCallChainBase implements WebClientService.Chain {
private final Http1ClientConfig clientConfig;
private final ClientConnection connection;
private final Tls tls;
private final boolean keepAlive;

HttpCallChainBase(Http1ClientConfig clientConfig,
ClientConnection connection,
Tls tls) {
Tls tls,
boolean keepAlive) {
this.clientConfig = clientConfig;
this.connection = connection;
this.tls = tls;
this.keepAlive = keepAlive;
}

static void writeHeaders(Headers headers, BufferData bufferData, boolean validate) {
Expand Down Expand Up @@ -105,6 +108,7 @@ private ClientConnection obtainConnection(WebClientServiceRequest request) {
return ConnectionCache.connection(clientConfig,
tls,
request.uri(),
request.headers());
request.headers(),
keepAlive);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ class HttpCallEntityChain extends HttpCallChainBase {
private final CompletableFuture<WebClientServiceResponse> whenComplete;
private final Object entity;

HttpCallEntityChain(Http1ClientConfig clientConfig,
HttpCallEntityChain(ClientRequestImpl request,
Http1ClientConfig clientConfig,
ClientConnection connection,
Tls tls,
CompletableFuture<WebClientServiceRequest> whenSent,
CompletableFuture<WebClientServiceResponse> whenComplete,
Object entity) {
super(clientConfig, connection, tls);
super(clientConfig, connection, tls, request.keepAlive());
this.clientConfig = clientConfig;
this.whenSent = whenSent;
this.whenComplete = whenComplete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ class HttpCallOutputStreamChain extends HttpCallChainBase {
private final CompletableFuture<WebClientServiceResponse> whenComplete;
private final ClientRequest.OutputStreamHandler osHandler;

HttpCallOutputStreamChain(Http1ClientConfig clientConfig,
HttpCallOutputStreamChain(ClientRequestImpl clientRequest,
Http1ClientConfig clientConfig,
ClientConnection connection,
Tls tls,
CompletableFuture<WebClientServiceRequest> whenSent,
CompletableFuture<WebClientServiceResponse> whenComplete,
ClientRequest.OutputStreamHandler osHandler) {
super(clientConfig, connection, tls);
super(clientConfig, connection, tls, clientRequest.keepAlive());
this.clientConfig = clientConfig;
this.whenSent = whenSent;
this.whenComplete = whenComplete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,10 @@ public FakeHttpClientRequest skipUriEncoding() {
public FakeHttpClientRequest property(String propertyName, String propertyValue) {
return null;
}

@Override
public FakeHttpClientRequest keepAlive(boolean keepAlive) {
return this;
}
}
}

0 comments on commit 552458e

Please sign in to comment.