Skip to content

Commit

Permalink
Merge pull request #31192 from Sgitario/13969
Browse files Browse the repository at this point in the history
Support HTTP/2 connections in REST Client Reactive
  • Loading branch information
gsmet authored Mar 2, 2023
2 parents a74aa01 + a9a6c97 commit cd3ad1f
Show file tree
Hide file tree
Showing 26 changed files with 1,439 additions and 3,810 deletions.
21 changes: 21 additions & 0 deletions docs/src/main/asciidoc/rest-client-reactive.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,27 @@ quarkus.rest-client.extensions-api.verify-host=false
This setting should not be used in production as it will disable the SSL hostname verification.
====

=== HTTP/2 Support

HTTP/2 is disabled by default in REST Client. If you want to enable it, you can set:

[source, properties]
----
// for all REST Clients:
quarkus.rest-client.http2=true
// or for a single REST Client:
quarkus.rest-client.extensions-api.http2=true
----

Alternatively, you can enable the Application-Layer Protocol Negotiation (alpn) TLS extension and the client will negotiate which HTTP version to use over the ones compatible by the server. By default, it will try to use HTTP/2 first and if it's not enabled, it will use HTTP/1.1. If you want to enable it, you can set:

[source, properties]
----
quarkus.rest-client.alpn=true
// or for a single REST Client:
quarkus.rest-client.extensions-api.alpn=true
----

== Create the JAX-RS resource

Create the `src/main/java/org/acme/rest/client/ExtensionsResource.java` file with the following content:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class RestClientConfig {
EMPTY.shared = Optional.empty();
EMPTY.name = Optional.empty();
EMPTY.userAgent = Optional.empty();
EMPTY.http2 = Optional.empty();
EMPTY.alpn = Optional.empty();
}

/**
Expand Down Expand Up @@ -245,6 +247,21 @@ public class RestClientConfig {
@ConfigItem
public Optional<String> userAgent;

/**
* If this is true then HTTP/2 will be enabled.
*/
@ConfigItem
public Optional<Boolean> http2;

/**
* If the Application-Layer Protocol Negotiation is enabled, the client will negotiate which protocol to use over the
* protocols exposed by the server. By default, it will try to use HTTP/2 first and if it's not enabled, it will
* use HTTP/1.1.
* When the property `http2` is enabled, this flag will be automatically enabled.
*/
@ConfigItem
public Optional<Boolean> alpn;

public static RestClientConfig load(String configKey) {
final RestClientConfig instance = new RestClientConfig();

Expand Down Expand Up @@ -276,6 +293,7 @@ public static RestClientConfig load(String configKey) {
instance.shared = getConfigValue(configKey, "shared", Boolean.class);
instance.name = getConfigValue(configKey, "name", String.class);
instance.userAgent = getConfigValue(configKey, "user-agent", String.class);
instance.http2 = getConfigValue(configKey, "http2", Boolean.class);

return instance;
}
Expand Down Expand Up @@ -311,6 +329,8 @@ public static RestClientConfig load(Class<?> interfaceClass) {
instance.shared = getConfigValue(interfaceClass, "shared", Boolean.class);
instance.name = getConfigValue(interfaceClass, "name", String.class);
instance.userAgent = getConfigValue(interfaceClass, "user-agent", String.class);
instance.http2 = getConfigValue(interfaceClass, "http2", Boolean.class);
instance.alpn = getConfigValue(interfaceClass, "alpn", Boolean.class);

return instance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,21 @@ public class RestClientsConfig {
@ConfigItem
public Optional<String> keyStoreType;

/**
* If this is true then HTTP/2 will be enabled.
*/
@ConfigItem(defaultValue = "false")
public boolean http2;

/**
* If the Application-Layer Protocol Negotiation is enabled, the client will negotiate which protocol to use over the
* protocols exposed by the server. By default, it will try to use HTTP/2 first and if it's not enabled, it will
* use HTTP/1.1.
* When the property `http2` is enabled, this flag will be automatically enabled.
*/
@ConfigItem
public Optional<Boolean> alpn;

public RestClientConfig getClientConfig(String configKey) {
if (configKey == null) {
return RestClientConfig.EMPTY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ public <T> T build(Class<T> aClass) throws IllegalStateException, RestClientDefi
clientBuilder.setUserAgent(restClientsConfig.userAgent.get());
}

if (getConfiguration().hasProperty(QuarkusRestClientProperties.HTTP2)) {
clientBuilder.http2((Boolean) getConfiguration().getProperty(QuarkusRestClientProperties.HTTP2));
}

if (proxyHost != null) {
configureProxy(proxyHost, proxyPort, proxyUser, proxyPassword, nonProxyHosts);
} else if (restClientsConfig.proxyAddress.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ private void configureCustomProperties(RestClientBuilder builder) {
if (userAgent.isPresent()) {
builder.property(QuarkusRestClientProperties.USER_AGENT, userAgent.get());
}

Boolean http2 = oneOf(clientConfigByClassName().http2,
clientConfigByConfigKey().http2).orElse(configRoot.http2);
builder.property(QuarkusRestClientProperties.HTTP2, http2);

Optional<Boolean> alpn = oneOf(clientConfigByClassName().alpn,
clientConfigByConfigKey().alpn, configRoot.alpn);
if (alpn.isPresent()) {
builder.property(QuarkusRestClientProperties.ALPN, alpn.get());
}
}

private void configureProxy(RestClientBuilderImpl builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,14 @@ public class QuarkusRestClientProperties {

public static final String USER_AGENT = "io.quarkus.rest.client.user-agent";

/**
* Set to true to explicitly use the HTTP/2 version.
*/
public static final String HTTP2 = "io.quarkus.rest.client.http2";

/**
* Set to true to explicitly use the Application-Layer Protocol Negotiation extension.
*/
public static final String ALPN = "io.quarkus.rest.client.alpn";

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.Pump;
Expand Down Expand Up @@ -114,6 +115,9 @@ public void handle(Void ignored) {
future.subscribe().with(new Consumer<>() {
@Override
public void accept(HttpClientRequest httpClientRequest) {
// adapt headers to HTTP/2 depending on the underlying HTTP connection
ClientSendRequestHandler.this.adaptRequest(httpClientRequest);

if (requestContext.isMultipart()) {
Promise<HttpClientRequest> requestPromise = Promise.promise();
QuarkusMultipartFormUpload actualEntity;
Expand Down Expand Up @@ -458,6 +462,7 @@ private QuarkusMultipartFormUpload setMultipartHeadersAndPrepareBody(HttpClientR
throw new IllegalArgumentException(
"Multipart form upload expects an entity of type MultipartForm, got: " + state.getEntity().getEntity());
}

MultivaluedMap<String, String> headerMap = state.getRequestHeaders().asMap();
updateRequestHeadersFromConfig(state, headerMap);
QuarkusMultipartForm multipartForm = (QuarkusMultipartForm) state.getEntity().getEntity();
Expand All @@ -470,6 +475,7 @@ private QuarkusMultipartFormUpload setMultipartHeadersAndPrepareBody(HttpClientR
}
QuarkusMultipartFormUpload multipartFormUpload = new QuarkusMultipartFormUpload(Vertx.currentContext(), multipartForm,
true, mode);
httpClientRequest.setChunked(multipartFormUpload.isChunked());
setEntityRelatedHeaders(headerMap, state.getEntity());

// multipart has its own headers:
Expand Down Expand Up @@ -507,6 +513,25 @@ private Buffer setRequestHeadersAndPrepareBody(HttpClientRequest httpClientReque
return actualEntity;
}

private void adaptRequest(HttpClientRequest request) {
if (request.version() == HttpVersion.HTTP_2) {
// When using the protocol HTTP/2, Netty which is internally used by Vert.x will validate the headers and reject
// the requests with invalid metadata.
// When we start a new connection, the Vert.x client will automatically upgrade the first request we make to be
// valid in HTTP/2.
// The problem is that in next requests, the Vert.x client reuses the same connection within the same window time
// and hence does not upgrade the following requests. Therefore, even though the first request works fine, the
// next requests won't work.
// This has been reported in https://github.com/eclipse-vertx/vert.x/issues/4618.
// To workaround this issue, we need to "upgrade" the next requests by ourselves when the version is already set
// to HTTP/2:
if (request.path() == null || request.path().length() == 0) {
// HTTP/2 does not allow empty paths
request.setURI(request.getURI() + "/");
}
}
}

private void updateRequestHeadersFromConfig(RestClientRequestContext state, MultivaluedMap<String, String> headerMap) {
Object staticHeaders = state.getConfiguration().getProperty(QuarkusRestClientProperties.STATIC_HEADERS);
if (staticHeaders instanceof Map) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.ProxyOptions;

Expand All @@ -54,6 +55,8 @@ public class ClientBuilderImpl extends ClientBuilder {
private SSLContext sslContext;
private KeyStore trustStore;
private char[] trustStorePassword;
private boolean http2;
private boolean alpn;

private String proxyHost;
private int proxyPort;
Expand Down Expand Up @@ -134,6 +137,16 @@ public ClientBuilder readTimeout(long timeout, TimeUnit unit) {
return this;
}

public ClientBuilder http2(boolean http2) {
this.http2 = http2;
return this;
}

public ClientBuilder alpn(boolean alpn) {
this.alpn = alpn;
return this;
}

public ClientBuilder proxy(String proxyHost, int proxyPort) {
this.proxyPort = proxyPort;
this.proxyHost = proxyHost;
Expand Down Expand Up @@ -182,6 +195,14 @@ public ClientImpl build() {

HttpClientOptions options = Optional.ofNullable(configuration.getFromContext(HttpClientOptions.class))
.orElseGet(HttpClientOptions::new);
if (http2) {
options.setProtocolVersion(HttpVersion.HTTP_2);
}

if (http2 || alpn) {
options.setUseAlpn(true);
options.setAlpnVersions(List.of(HttpVersion.HTTP_2, HttpVersion.HTTP_1_1));
}

options.setVerifyHost(verifyHost);
if (trustAll) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public Vertx get() {
Object connectionTTL = configuration.getProperty(CONNECTION_TTL);
if (connectionTTL != null) {
options.setKeepAliveTimeout((int) connectionTTL);
options.setHttp2KeepAliveTimeout((int) connectionTTL);
}

Object connectionPoolSize = configuration.getProperty(CONNECTION_POOL_SIZE);
Expand All @@ -156,6 +157,7 @@ public Vertx get() {
log.debugf("Setting connectionPoolSize to %d", connectionPoolSize);
}
options.setMaxPoolSize((int) connectionPoolSize);
options.setHttp2MaxPoolSize((int) connectionPoolSize);

Object keepAliveEnabled = configuration.getProperty(KEEP_ALIVE_ENABLED);
if (keepAliveEnabled != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ restClientRequestContext.properties, restClientRequestContext, getStringHeaders(
throw new ProcessingException(e);
}
}

public String getHttpVersion() {
return restClientRequestContext.getVertxClientResponse().version().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,11 @@ public void setContentTransferEncoding(String contentTransferEncoding) {
this.contentTransferEncoding = contentTransferEncoding;
}

@Override
public long length() {
return buffer.readableBytes() + super.length();
}

@Override
public String toString() {
return HttpHeaderNames.CONTENT_DISPOSITION + ": " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.multipart.Attribute;
Expand Down Expand Up @@ -855,7 +854,6 @@ public HttpRequest finalizeRequest() throws ErrorDataEncoderException {
}
}
}
HttpUtil.setTransferEncodingChunked(request, true);

// wrap to hide the possible content
return new WrappedHttpRequest(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.File;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
Expand All @@ -11,6 +10,7 @@
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.FileUpload;
import io.netty.handler.codec.http.multipart.MemoryFileUpload;
Expand Down Expand Up @@ -148,25 +148,24 @@ public void run() {
throw new IllegalArgumentException("Wrong Vert.x context used for multipart upload. Expected: " + context +
", actual: " + Vertx.currentContext());
}
AtomicInteger counter = new AtomicInteger();
while (!ended) {
if (encoder.isChunked()) {
try {
HttpContent chunk = encoder.readChunk(ALLOC);
if (chunk == PausableHttpPostRequestEncoder.WAIT_MARKER) {
return; // resumption will be scheduled by encoder
}
ByteBuf content = chunk.content();
Buffer buff = Buffer.buffer(content);
counter.incrementAndGet();
boolean writable = pending.write(buff);
if (encoder.isEndOfInput()) {
} else if (chunk == LastHttpContent.EMPTY_LAST_CONTENT || encoder.isEndOfInput()) {
ended = true;
request = null;
encoder = null;
pending.write(InboundBuffer.END_SENTINEL);
} else if (!writable) {
break;
} else {
ByteBuf content = chunk.content();
Buffer buff = Buffer.buffer(content);
boolean writable = pending.write(buff);
if (!writable) {
break;
}
}
} catch (Exception e) {
handleError(e);
Expand All @@ -184,6 +183,10 @@ public void run() {
}
}

public boolean isChunked() {
return encoder.isChunked();
}

private void handleError(Throwable e) {
ended = true;
request = null;
Expand Down
1 change: 1 addition & 0 deletions integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@
<module>rest-client</module>
<module>resteasy-reactive-kotlin</module>
<module>rest-client-reactive</module>
<module>rest-client-reactive-http2</module>
<module>rest-client-reactive-kotlin-serialization</module>
<module>rest-client-reactive-multipart</module>
<module>rest-client-reactive-stork</module>
Expand Down
Loading

0 comments on commit cd3ad1f

Please sign in to comment.