Skip to content

Commit

Permalink
Use Vert.x based HTTP client for Apicurio Registry
Browse files Browse the repository at this point in the history
  • Loading branch information
Ladicek committed Jun 17, 2021
1 parent ea6c9b4 commit 29c4624
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 5 deletions.
7 changes: 6 additions & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@
<log4j2-jboss-logmanager.version>1.0.0.Final</log4j2-jboss-logmanager.version>
<log4j-jboss-logmanager.version>1.2.0.Final</log4j-jboss-logmanager.version>
<avro.version>1.10.2</avro.version>
<apicurio-registry.version>2.0.0.Final</apicurio-registry.version>
<apicurio-registry.version>2.1.0-SNAPSHOT</apicurio-registry.version>
<jacoco.version>0.8.7</jacoco.version>
<testcontainers.version>1.15.3</testcontainers.version>
<docker-java.version>3.2.8</docker-java.version> <!-- must be the version Testcontainers use -->
Expand Down Expand Up @@ -2855,6 +2855,11 @@
<artifactId>agroal-pool</artifactId>
<version>${agroal.version}</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client-common</artifactId>
<version>${apicurio-registry.version}</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions extensions/apicurio-registry-avro/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apache-httpclient-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-deployment</artifactId>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.vertx.deployment.VertxBuildItem;

public class ApicurioRegistryAvroProcessor {
@BuildStep
Expand Down Expand Up @@ -35,10 +38,11 @@ public void apicurioRegistryAvro(BuildProducer<ReflectiveClassBuildItem> reflect
"io.apicurio.registry.serde.Legacy4ByteIdHandler",
"io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider",
"io.apicurio.registry.serde.headers.DefaultHeadersHandler"));
}

// Apicurio Registry 2.x uses the JDK 11 HTTP client, which unconditionally requires SSL
// TODO when the new HTTP client SPI in Apicurio Registry client appears, this will no longer be needed
// (but we'll have to make sure that the Vert.x HTTP client is used)
sslNativeSupport.produce(new ExtensionSslNativeSupportBuildItem(Feature.APICURIO_REGISTRY_AVRO));
@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
public void apicurioRegistryClient(VertxBuildItem vertx, ApicurioRegistryClient client) {
client.setup(vertx.getVertx());
}
}
26 changes: 26 additions & 0 deletions extensions/apicurio-registry-avro/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,28 @@
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<!--
- when we remove our own implementation of the HTTP client SPI (see ApicurioRegistryClient),
- this dependency should become `apicurio-registry-client-vertx` (n.b. must update the BOM too!)
-->
<artifactId>apicurio-registry-client-common</artifactId>
</dependency>
<dependency>
<!--
- when we remove our own implementation of the HTTP client SPI (see ApicurioRegistryClient),
- this dependency should be removed
-->
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
</dependency>

<dependency>
Expand All @@ -32,6 +54,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apache-httpclient</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.apicurio.registry.avro;

import io.apicurio.registry.rest.client.RegistryClientFactory;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
import io.vertx.core.Vertx;

@Recorder
public class ApicurioRegistryClient {
public void setup(RuntimeValue<Vertx> vertx) {
// currently, we have our own copy of the RegistryHttpClient SPI
// implementation, because the implementation in Apicurio Registry
// is compiled against Vert.x 3.9 (because Apicurio Registry is based
// on Quarkus 1.13)
//
// when Apicurio Registry is updated to Quarkus 2 and hence Vert.x 4,
// we should remove our implementation of the SPI and just use the one
// from Apicurio Registry
RegistryClientFactory.setProvider(new VertxHttpClientProvider(vertx.getValue()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.quarkus.apicurio.registry.avro;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.apicurio.registry.rest.client.impl.ErrorHandler;
import io.apicurio.registry.utils.IoUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;

public class ResponseHandler<T> implements Handler<AsyncResult<HttpResponse<Buffer>>> {

final CompletableFuture<T> resultHolder;
final TypeReference<T> targetType;
private static final ObjectMapper mapper = new ObjectMapper();

public ResponseHandler(CompletableFuture<T> resultHolder, TypeReference<T> targetType) {
this.resultHolder = resultHolder;
this.targetType = targetType;
}

@SuppressWarnings("unchecked")
@Override
public void handle(AsyncResult<HttpResponse<Buffer>> event) {

try {
if (isFailure(event.result().statusCode())) {
resultHolder.completeExceptionally(ErrorHandler
.handleErrorResponse(IoUtil.toStream(event.result().body().getBytes()), event.result().statusCode()));
} else if (event.succeeded()) {
final HttpResponse<Buffer> result = event.result();
final String typeName = targetType.getType().getTypeName();
if (typeName.contains("InputStream")) {
resultHolder.complete((T) IoUtil.toStream(result.body().getBytes()));
} else if (typeName.contains("Void")) {
//Intended null return
resultHolder.complete(null);
} else {
resultHolder.complete(mapper.readValue(result.body().getBytes(), targetType));
}
} else {
resultHolder.completeExceptionally(event.cause());
}
} catch (IOException e) {
resultHolder.completeExceptionally(e);
}
}

private static boolean isFailure(int statusCode) {
return statusCode / 100 != 2;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package io.quarkus.apicurio.registry.avro;

import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URIBuilder;
import org.keycloak.authorization.client.util.HttpResponseException;

import io.apicurio.registry.auth.Auth;
import io.apicurio.registry.rest.client.config.ClientConfig;
import io.apicurio.registry.rest.client.exception.RestClientException;
import io.apicurio.registry.rest.client.impl.ErrorHandler;
import io.apicurio.registry.rest.client.request.Request;
import io.apicurio.registry.rest.client.spi.RegistryHttpClient;
import io.apicurio.registry.utils.ConcurrentUtil;
import io.apicurio.registry.utils.IoUtil;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.WebClient;

/**
* @author Carles Arnal '[email protected]'
*/
public class VertxHttpClient implements RegistryHttpClient {

private final WebClient webClient;
private final Auth auth;
private final String basePath;

private static final Map<String, String> DEFAULT_HEADERS = new HashMap<>();
private static final ThreadLocal<Map<String, String>> requestHeaders = ThreadLocal.withInitial(Collections::emptyMap);

public VertxHttpClient(Vertx vertx, String basePath, Map<String, Object> options, Auth auth) {
if (!basePath.endsWith("/")) {
basePath += "/";
}
this.webClient = WebClient.create(vertx);
this.auth = auth;
this.basePath = basePath;
addHeaders(options);
}

private static void addHeaders(Map<String, Object> configs) {

Map<String, String> requestHeaders = configs.entrySet().stream()
.filter(map -> map.getKey().startsWith(ClientConfig.REGISTRY_REQUEST_HEADERS_PREFIX))
.collect(Collectors.toMap(map -> map.getKey()
.replace(ClientConfig.REGISTRY_REQUEST_HEADERS_PREFIX, ""), map -> map.getValue().toString()));

if (!requestHeaders.isEmpty()) {
requestHeaders.forEach(DEFAULT_HEADERS::put);
}
}

@Override
public <T> T sendRequest(Request<T> request) {
if (Context.isOnEventLoopThread()) {
throw new UnsupportedOperationException("Must not be called on event loop");
}

try {
final URI uri = buildURI(basePath + request.getRequestPath(), request.getPathParams());
final RequestOptions requestOptions = new RequestOptions();

requestOptions.setHost(uri.getHost());
requestOptions.setURI(uri.getPath());
requestOptions.setPort(uri.getPort());

DEFAULT_HEADERS.forEach(requestOptions::addHeader);

//Add current request headers
requestHeaders.get().forEach(requestOptions::addHeader);
requestHeaders.remove();

Map<String, String> headers = request.getHeaders();
if (this.auth != null) {
//make headers mutable...
headers = new HashMap<>(headers);
this.auth.apply(headers);
}
headers.forEach(requestOptions::addHeader);

CompletableFuture<T> resultHolder;

switch (request.getOperation()) {
case GET:
resultHolder = executeGet(request, requestOptions);
break;
case PUT:
resultHolder = executePut(request, requestOptions);
break;
case POST:
resultHolder = executePost(request, requestOptions);
break;
case DELETE:
resultHolder = executeDelete(request, requestOptions);
break;
default:
throw new IllegalStateException("Operation not allowed");
}

return ConcurrentUtil.result(resultHolder);

} catch (URISyntaxException | HttpResponseException e) {
if (e.getCause() != null && e.getCause() instanceof RestClientException) {
throw (RestClientException) e.getCause();
} else {
throw ErrorHandler.parseError(e);
}
}
}

private <T> CompletableFuture<T> executeGet(Request<T> request, RequestOptions requestOptions) {
return sendRequestWithoutPayload(HttpMethod.GET, request, requestOptions);
}

private <T> CompletableFuture<T> executeDelete(Request<T> request, RequestOptions requestOptions) {
return sendRequestWithoutPayload(HttpMethod.DELETE, request, requestOptions);
}

private <T> CompletableFuture<T> executePost(Request<T> request, RequestOptions requestOptions) {
return sendRequestWithPayload(HttpMethod.POST, request, requestOptions);
}

private <T> CompletableFuture<T> executePut(Request<T> request, RequestOptions requestOptions) {
return sendRequestWithPayload(HttpMethod.PUT, request, requestOptions);
}

private <T> CompletableFuture<T> sendRequestWithoutPayload(HttpMethod httpMethod, Request<T> request,
RequestOptions requestOptions) {
final HttpRequest<Buffer> httpClientRequest = webClient.request(httpMethod, requestOptions);

//Iterate over query params list so we can add multiple query params with the same key
request.getQueryParams().forEach((key, paramList) -> paramList
.forEach(value -> httpClientRequest.setQueryParam(key, value)));

final CompletableFuture<T> resultHolder = new CompletableFuture<T>();
final ResponseHandler<T> responseHandler = new ResponseHandler<>(resultHolder, request.getResponseType());
httpClientRequest.send(responseHandler);
return resultHolder;
}

private <T> CompletableFuture<T> sendRequestWithPayload(HttpMethod httpMethod, Request<T> request,
RequestOptions requestOptions) {
final HttpRequest<Buffer> httpClientRequest = webClient.request(httpMethod, requestOptions);
final CompletableFuture<T> resultHolder = new CompletableFuture<T>();

//Iterate over query params list so we can add multiple query params with the same key
request.getQueryParams().forEach((key, paramList) -> paramList
.forEach(value -> httpClientRequest.setQueryParam(key, value)));

final ResponseHandler<T> responseHandler = new ResponseHandler<>(resultHolder, request.getResponseType());

Buffer buffer = Buffer.buffer(IoUtil.toBytes(request.getData()));
httpClientRequest.sendBuffer(buffer, responseHandler);

return resultHolder;
}

private static URI buildURI(String basePath, List<String> pathParams) throws URISyntaxException {
Object[] encodedPathParams = pathParams
.stream()
.map(VertxHttpClient::encodeURIComponent)
.toArray();
final URIBuilder uriBuilder = new URIBuilder(String.format(basePath, encodedPathParams));
final List<NameValuePair> queryParamsExpanded = new ArrayList<>();
uriBuilder.setParameters(queryParamsExpanded);
return uriBuilder.build();
}

private static String encodeURIComponent(String value) {
try {
return URLEncoder.encode(value, StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void setNextRequestHeaders(Map<String, String> headers) {
requestHeaders.set(headers);
}

@Override
public Map<String, String> getHeaders() {
return requestHeaders.get();
}
}
Loading

0 comments on commit 29c4624

Please sign in to comment.