diff --git a/kafka-admin/pom.xml b/kafka-admin/pom.xml
index a94c1873..677c44f1 100644
--- a/kafka-admin/pom.xml
+++ b/kafka-admin/pom.xml
@@ -223,6 +223,10 @@
io.quarkus
quarkus-hibernate-validator
+
+ io.quarkus
+ quarkus-kafka-client
+
io.smallrye.common
smallrye-common-annotation
diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/AdminClientFactory.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/AdminClientFactory.java
index 16b241cc..3601b4d7 100644
--- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/AdminClientFactory.java
+++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/AdminClientFactory.java
@@ -1,6 +1,5 @@
package org.bf2.admin.kafka.admin.handlers;
-import io.vertx.core.Vertx;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -25,6 +24,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
@RequestScoped
public class AdminClientFactory {
@@ -39,9 +40,6 @@ public class AdminClientFactory {
@Inject
Logger log;
- @Inject
- Vertx vertx;
-
@Inject
KafkaAdminConfigRetriever config;
@@ -60,7 +58,7 @@ public class AdminClientFactory {
* map will be placed in the context under the key identified by the
* {@link #ADMIN_CLIENT_CONFIG} constant.
*/
- public AdminClient createAdminClient() {
+ public CompletionStage createAdminClient() {
Map acConfig = config.getAcConfig();
if (config.isOauthEnabled()) {
@@ -83,7 +81,7 @@ public AdminClient createAdminClient() {
log.debug("OAuth is disabled - no attempt to set access token in Admin Client config");
}
- return AdminClient.create(acConfig);
+ return CompletableFuture.supplyAsync(() -> AdminClient.create(acConfig));
}
Optional extractCredentials(Optional authorizationHeader) {
diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/OperationsHandler.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/OperationsHandler.java
index 773606e7..16b52009 100644
--- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/OperationsHandler.java
+++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/OperationsHandler.java
@@ -1,5 +1,6 @@
package org.bf2.admin.kafka.admin.handlers;
+import io.quarkus.runtime.annotations.RegisterForReflection;
import org.bf2.admin.kafka.admin.Operations;
import org.bf2.admin.kafka.admin.model.Types;
import org.eclipse.microprofile.openapi.annotations.Operation;
@@ -29,6 +30,7 @@
import java.util.Optional;
import java.util.concurrent.CompletionStage;
+@RegisterForReflection
public interface OperationsHandler {
@POST
diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java
index 14ff7581..3996a1b1 100644
--- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java
+++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java
@@ -128,9 +128,9 @@ public CompletionStage listTopics(String filter, Types.DeprecatedPageR
.thenApply(topicList -> Response.ok().entity(topicList).build());
}
- @Blocking
@Counted("consume_records_requests")
@Timed("consume_records_request_time")
+ @Blocking
public Response consumeRecords(String topicName,
RecordFilterParams params) {
@@ -292,16 +292,16 @@ private Pattern filterPattern(String filter) {
}
CompletionStage withAdminClient(Function> function) {
- final AdminClient client = clientFactory.createAdminClient();
-
- return threadContext.withContextCapture(function.apply(client))
- .whenComplete((result, error) -> {
- try {
- client.close();
- } catch (Exception e) {
- log.warnf("Exception closing Kafka AdminClient", e);
- }
- });
+ return threadContext.withContextCapture(clientFactory.createAdminClient())
+ .thenCompose(client -> function.apply(client).whenComplete((result, error) -> close(client)));
+ }
+
+ void close(AdminClient client) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ log.warnf("Exception closing Kafka AdminClient", e);
+ }
}
CompletionStage badRequest(String message) {
diff --git a/kafka-admin/src/main/resources/application.properties b/kafka-admin/src/main/resources/application.properties
index ff6e9978..e6c403ff 100644
--- a/kafka-admin/src/main/resources/application.properties
+++ b/kafka-admin/src/main/resources/application.properties
@@ -21,14 +21,13 @@ quarkus.http.ssl.certificate.key-file=${kafka.admin.tls.key:}
# See https://quarkus.io/guides/kafka-dev-services
# Enable when using quarkus-kafka-client
-#quarkus.kafka.devservices.enabled=false
+quarkus.kafka.devservices.enabled=false
-# Remove when quarkus-kafka-client supports Kafka client 3.0
-quarkus.index-dependency.kafka-clients.group-id=org.apache.kafka
-quarkus.index-dependency.kafka-clients.artifact-id=kafka-clients
+quarkus.vertx.max-event-loop-execute-time=4000
# The following properties will be used when adding JWT RBAC provided by quarkus-smallrye-jwt
quarkus.smallrye-jwt.enabled=true
+quarkus.smallrye-jwt.blocking-authentication=true
mp.jwt.verify.publickey.location=${kafka.admin.oauth.jwks.endpoint.uri: }
mp.jwt.verify.issuer=${kafka.admin.oauth.valid.issuer.uri: }
smallrye.jwt.client.tls.certificate=${kafka.admin.oauth.trusted.cert:}
@@ -44,7 +43,7 @@ quarkus.swagger-ui.theme=monokai
quarkus.log.category."org.apache.kafka".level=WARN
-kafka.admin.oauth.enabled=${quarkus.smallrye-jwt.enabled}
+kafka.admin.oauth.enabled=${quarkus.smallrye-jwt.enabled:true}
# Default limit to the number of partitions that new topics may have configured.
kafka.admin.num.partitions.max=100
# Default resource/operations mapping
diff --git a/pom.xml b/pom.xml
index 21551ec0..98ee0a70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,7 +28,6 @@
2.7.6.Final
1.2.1
- 3.0.0
0.8.1
3.5.3
2.1
@@ -70,11 +69,6 @@
kafka-admin
${project.version}
-
- org.apache.kafka
- kafka-clients
- ${kafka.version}
-
io.strimzi
kafka-oauth-client