diff --git a/.github/native-tests.json b/.github/native-tests.json
index 618c179493223e..8cb869a545ae62 100644
--- a/.github/native-tests.json
+++ b/.github/native-tests.json
@@ -48,7 +48,7 @@
{
"category": "Messaging1",
"timeout": 100,
- "test-modules": "kafka kafka-ssl kafka-sasl kafka-avro kafka-snappy kafka-streams reactive-messaging-kafka"
+ "test-modules": "kafka kafka-ssl kafka-sasl kafka-avro kafka-avro-apicurio2 kafka-snappy kafka-streams reactive-messaging-kafka"
},
{
"category": "Messaging2",
diff --git a/integration-tests/kafka-avro-apicurio2/pom.xml b/integration-tests/kafka-avro-apicurio2/pom.xml
new file mode 100644
index 00000000000000..f5f066742f615d
--- /dev/null
+++ b/integration-tests/kafka-avro-apicurio2/pom.xml
@@ -0,0 +1,256 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>quarkus-integration-tests-parent</artifactId>
+        <groupId>io.quarkus</groupId>
+        <version>999-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>quarkus-integration-test-kafka-avro-apicurio2</artifactId>
+    <name>Quarkus - Integration Tests - Kafka Avro with Apicurio 2.x</name>
+    <description>The Apache Kafka Avro with Apicurio Registry 2.x integration tests module</description>
+
+    <properties>
+        <apicurio.version>2.0.0.Final</apicurio.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-integration-test-class-transformer</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-integration-test-shared-library</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-jackson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-jsonb</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-kafka-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-avro</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>6.1.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>jakarta.ws.rs</groupId>
+                    <artifactId>jakarta.ws.rs-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.apicurio</groupId>
+            <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
+            <version>${apicurio.version}</version>
+        </dependency>
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-junit5</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.rest-assured</groupId>
+            <artifactId>rest-assured</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>jakarta.xml.bind</groupId>
+                    <artifactId>jakarta.xml.bind-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.strimzi</groupId>
+            <artifactId>strimzi-test-container</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-avro-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-integration-test-class-transformer-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-kafka-client-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-jackson-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-jsonb-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <url>https://packages.confluent.io/maven/</url>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>io.quarkus</groupId>
+                <artifactId>quarkus-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate-code</goal>
+                            <goal>build</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>test-kafka</id>
+            <activation>
+                <property>
+                    <name>test-containers</name>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <skip>false</skip>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <artifactId>maven-failsafe-plugin</artifactId>
+                        <configuration>
+                            <skip>false</skip>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>native-image</id>
+            <activation>
+                <property>
+                    <name>native</name>
+                </property>
+            </activation>
+            <properties>
+                <!-- NOTE(review): extraction dropped the tag name here; only the value "true" survived - confirm exact property against upstream -->
+                <quarkus.native.enabled>true</quarkus.native.enabled>
+            </properties>
+        </profile>
+    </profiles>
+</project>
diff --git a/integration-tests/kafka-avro-apicurio2/src/main/avro/pet.avsc b/integration-tests/kafka-avro-apicurio2/src/main/avro/pet.avsc
new file mode 100644
index 00000000000000..ab16e83ff438c0
--- /dev/null
+++ b/integration-tests/kafka-avro-apicurio2/src/main/avro/pet.avsc
@@ -0,0 +1,15 @@
+{
+ "namespace": "io.quarkus.it.kafka.avro",
+ "type": "record",
+ "name": "Pet",
+ "fields": [
+ {
+ "name": "name",
+ "type": "string"
+ },
+ {
+ "name": "color",
+ "type": "string"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java b/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java
new file mode 100644
index 00000000000000..1960ffe1df2802
--- /dev/null
+++ b/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java
@@ -0,0 +1,71 @@
+package io.quarkus.it.kafka.avro;
+
+import java.time.Duration;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import io.vertx.core.json.JsonObject;
+
+/**
+ * Endpoint to test the Avro support
+ */
+@Path("/avro")
+public class AvroEndpoint {
+
+    @Inject
+    AvroKafkaCreator creator;
+
+    @GET
+    @Path("/confluent")
+    public JsonObject getConfluent() {
+        return get(creator.createConfluentConsumer("test-avro-confluent-consumer", "test-avro-confluent-consumer"));
+    }
+
+    @POST
+    @Path("/confluent")
+    public void sendConfluent(Pet pet) {
+        KafkaProducer<Integer, Pet> p = creator.createConfluentProducer("test-avro-confluent");
+        send(p, pet, "test-avro-confluent-producer");
+    }
+
+    @GET
+    @Path("/apicurio")
+    public JsonObject getApicurio() {
+        return get(creator.createApicurioConsumer("test-avro-apicurio-consumer", "test-avro-apicurio-consumer"));
+    }
+
+    @POST
+    @Path("/apicurio")
+    public void sendApicurio(Pet pet) {
+        KafkaProducer<Integer, Pet> p = creator.createApicurioProducer("test-avro-apicurio");
+        send(p, pet, "test-avro-apicurio-producer");
+    }
+
+    private JsonObject get(KafkaConsumer<Integer, Pet> consumer) {
+        final ConsumerRecords<Integer, Pet> records = consumer.poll(Duration.ofMillis(60000));
+        if (records.isEmpty()) {
+            return null;
+        }
+        ConsumerRecord<Integer, Pet> consumerRecord = records.iterator().next();
+        Pet p = consumerRecord.value();
+        // We cannot serialize the returned Pet directly, it contains non-serializable object such as the schema.
+        JsonObject result = new JsonObject();
+        result.put("name", p.getName());
+        result.put("color", p.getColor());
+        return result;
+    }
+
+    private void send(KafkaProducer<Integer, Pet> producer, Pet pet, String topic) {
+        producer.send(new ProducerRecord<>(topic, 0, pet));
+        producer.flush();
+    }
+}
diff --git a/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java b/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java
new file mode 100644
index 00000000000000..065092035cb61d
--- /dev/null
+++ b/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java
@@ -0,0 +1,146 @@
+package io.quarkus.it.kafka.avro;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+
+import javax.enterprise.context.ApplicationScoped;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+import io.apicurio.registry.serde.SerdeConfig;
+import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;
+import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig;
+import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+
+/**
+ * Create Avro Kafka Consumers and Producers
+ */
+@ApplicationScoped
+public class AvroKafkaCreator {
+
+    @ConfigProperty(name = "kafka.bootstrap.servers")
+    String bootstrap;
+    @ConfigProperty(name = "schema.url.confluent")
+    String confluent;
+    @ConfigProperty(name = "schema.url.apicurio")
+    String apicurio;
+
+    public KafkaConsumer<Integer, Pet> createConfluentConsumer(String groupdIdConfig, String subscribtionName) {
+        return createConfluentConsumer(bootstrap, confluent, groupdIdConfig, subscribtionName);
+    }
+
+    public KafkaProducer<Integer, Pet> createConfluentProducer(String clientId) {
+        return createConfluentProducer(bootstrap, confluent, clientId);
+    }
+
+    public KafkaConsumer<Integer, Pet> createApicurioConsumer(String groupdIdConfig, String subscribtionName) {
+        return createApicurioConsumer(bootstrap, apicurio, groupdIdConfig, subscribtionName);
+    }
+
+    public KafkaProducer<Integer, Pet> createApicurioProducer(String clientId) {
+        return createApicurioProducer(bootstrap, apicurio, clientId);
+    }
+
+    public static KafkaConsumer<Integer, Pet> createConfluentConsumer(String bootstrap, String confluent,
+            String groupdIdConfig, String subscribtionName) {
+        Properties p = getConfluentConsumerProperties(bootstrap, confluent, groupdIdConfig);
+        return createConsumer(p, subscribtionName);
+    }
+
+    public static KafkaConsumer<Integer, Pet> createApicurioConsumer(String bootstrap, String apicurio,
+            String groupdIdConfig, String subscribtionName) {
+        Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig);
+        return createConsumer(p, subscribtionName);
+    }
+
+    public static KafkaProducer<Integer, Pet> createConfluentProducer(String bootstrap, String confluent,
+            String clientId) {
+        Properties p = getConfluentProducerProperties(bootstrap, confluent, clientId);
+        return createProducer(p);
+    }
+
+    public static KafkaProducer<Integer, Pet> createApicurioProducer(String bootstrap, String apicurio,
+            String clientId) {
+        Properties p = getApicurioProducerProperties(bootstrap, apicurio, clientId);
+        return createProducer(p);
+    }
+
+    private static KafkaConsumer<Integer, Pet> createConsumer(Properties props, String subscribtionName) {
+        if (!props.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) {
+            props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+        }
+        KafkaConsumer<Integer, Pet> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(Collections.singletonList(subscribtionName));
+        return consumer;
+    }
+
+    private static KafkaProducer<Integer, Pet> createProducer(Properties props) {
+        if (!props.containsKey(ProducerConfig.CLIENT_ID_CONFIG)) {
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+        }
+        return new KafkaProducer<>(props);
+    }
+
+    private static Properties getConfluentConsumerProperties(String bootstrap, String confluent,
+            String groupdIdConfig) {
+        Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
+        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluent);
+        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
+        return props;
+    }
+
+    public static Properties getApicurioConsumerProperties(String bootstrap, String apicurio, String groupdIdConfig) {
+        Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
+        props.put(SerdeConfig.REGISTRY_URL, apicurio);
+        props.put(AvroKafkaSerdeConfig.USE_SPECIFIC_AVRO_READER, true);
+        return props;
+    }
+
+    private static Properties getGenericConsumerProperties(String bootstrap, String groupdIdConfig) {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupdIdConfig);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        return props;
+    }
+
+    private static Properties getConfluentProducerProperties(String bootstrap, String confluent, String clientId) {
+        Properties props = getGenericProducerProperties(bootstrap, clientId);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
+        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluent);
+        return props;
+    }
+
+    private static Properties getApicurioProducerProperties(String bootstrap, String apicurio, String clientId) {
+        Properties props = getGenericProducerProperties(bootstrap, clientId);
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
+        props.put(SerdeConfig.REGISTRY_URL, apicurio);
+        props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true);
+        return props;
+    }
+
+    private static Properties getGenericProducerProperties(String bootstrap, String clientId) {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+        return props;
+    }
+}
diff --git a/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties
new file mode 100644
index 00000000000000..1713f0b4061eac
--- /dev/null
+++ b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties
@@ -0,0 +1,10 @@
+quarkus.log.category.kafka.level=WARN
+quarkus.log.category.\"org.apache.kafka\".level=WARN
+quarkus.log.category.\"org.apache.zookeeper\".level=WARN
+
+# enable health check
+quarkus.kafka.health.enabled=true
+
+# TODO: this should not be needed, but Avro does not seem to use the correct CL
+# This will also cause dev mode issues
+quarkus.test.flat-class-path=true
diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java
new file mode 100644
index 00000000000000..56e1cc5c566747
--- /dev/null
+++ b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java
@@ -0,0 +1,50 @@
+package io.quarkus.it.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.testcontainers.containers.GenericContainer;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import io.strimzi.StrimziKafkaContainer;
+
+public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
+
+    private static final StrimziKafkaContainer kafka = new StrimziKafkaContainer();
+
+    private static GenericContainer<?> registry;
+
+    public static String getBootstrapServers() {
+        return kafka.getBootstrapServers();
+    }
+
+    public static String getConfluentSchemaRegistryUrl() {
+        return "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/ccompat/v6";
+    }
+
+    public static String getApicurioSchemaRegistryUrl() {
+        return "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2";
+    }
+
+    @Override
+    public Map<String, String> start() {
+        kafka.start();
+        registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.0.0.Final")
+                .withExposedPorts(8080)
+                .withEnv("QUARKUS_PROFILE", "prod");
+        registry.start();
+        Map<String, String> properties = new HashMap<>();
+        properties.put("schema.url.confluent",
+                "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/ccompat/v6");
+        properties.put("schema.url.apicurio",
+                "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
+        properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
+        return properties;
+    }
+
+    @Override
+    public void stop() {
+        registry.stop();
+        kafka.close();
+    }
+}
diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java
new file mode 100644
index 00000000000000..d2467dca37b48a
--- /dev/null
+++ b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java
@@ -0,0 +1,13 @@
+package io.quarkus.it.kafka;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.NativeImageTest;
+
+/**
+ * Runs {@link KafkaAvroTest} against the application packaged as a native image.
+ */
+@NativeImageTest
+@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
+public class KafkaAvroIT extends KafkaAvroTest {
+
+}
diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java
new file mode 100644
index 00000000000000..418243f5d778d9
--- /dev/null
+++ b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java
@@ -0,0 +1,90 @@
+package io.quarkus.it.kafka;
+
+import java.time.Duration;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.it.kafka.avro.AvroKafkaCreator;
+import io.quarkus.it.kafka.avro.Pet;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.restassured.RestAssured;
+
+@QuarkusTest
+@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
+public class KafkaAvroTest {
+
+    private static final String CONFLUENT_PATH = "/avro/confluent";
+    private static final String APICURIO_PATH = "/avro/apicurio";
+
+    @Test
+    public void testConfluentAvroProducer() {
+        KafkaConsumer<Integer, Pet> consumer = AvroKafkaCreator.createConfluentConsumer(
+                KafkaAndSchemaRegistryTestResource.getBootstrapServers(),
+                KafkaAndSchemaRegistryTestResource.getConfluentSchemaRegistryUrl(),
+                "test-avro-confluent",
+                "test-avro-confluent-producer");
+        testAvroProducer(consumer, CONFLUENT_PATH);
+    }
+
+    @Test
+    public void testConfluentAvroConsumer() {
+        KafkaProducer<Integer, Pet> producer = AvroKafkaCreator.createConfluentProducer(
+                KafkaAndSchemaRegistryTestResource.getBootstrapServers(),
+                KafkaAndSchemaRegistryTestResource.getConfluentSchemaRegistryUrl(),
+                "test-avro-confluent-test");
+        testAvroConsumer(producer, CONFLUENT_PATH, "test-avro-confluent-consumer");
+    }
+
+    @Test
+    public void testApicurioAvroProducer() {
+        KafkaConsumer<Integer, Pet> consumer = AvroKafkaCreator.createApicurioConsumer(
+                KafkaAndSchemaRegistryTestResource.getBootstrapServers(),
+                KafkaAndSchemaRegistryTestResource.getApicurioSchemaRegistryUrl(),
+                "test-avro-apicurio",
+                "test-avro-apicurio-producer");
+        testAvroProducer(consumer, APICURIO_PATH);
+    }
+
+    @Test
+    public void testApicurioAvroConsumer() {
+        KafkaProducer<Integer, Pet> producer = AvroKafkaCreator.createApicurioProducer(
+                KafkaAndSchemaRegistryTestResource.getBootstrapServers(),
+                KafkaAndSchemaRegistryTestResource.getApicurioSchemaRegistryUrl(),
+                "test-avro-apicurio-test");
+        testAvroConsumer(producer, APICURIO_PATH, "test-avro-apicurio-consumer");
+    }
+
+    private void testAvroProducer(KafkaConsumer<Integer, Pet> consumer, String path) {
+        RestAssured.given()
+                .header("content-type", "application/json")
+                .body("{\"name\":\"neo\", \"color\":\"tricolor\"}")
+                .post(path);
+        ConsumerRecord<Integer, Pet> records = consumer.poll(Duration.ofMillis(20000)).iterator().next();
+        Assertions.assertEquals((Integer) 0, records.key());
+        Pet pet = records.value();
+        Assertions.assertEquals("neo", pet.getName());
+        Assertions.assertEquals("tricolor", pet.getColor());
+        consumer.close();
+    }
+
+    private void testAvroConsumer(KafkaProducer<Integer, Pet> producer, String path, String topic) {
+        producer.send(new ProducerRecord<>(topic, 1, createPet()));
+        Pet retrieved = RestAssured.when().get(path).as(Pet.class);
+        Assertions.assertEquals("neo", retrieved.getName());
+        Assertions.assertEquals("white", retrieved.getColor());
+        producer.close();
+    }
+
+    private Pet createPet() {
+        Pet pet = new Pet();
+        pet.setName("neo");
+        pet.setColor("white");
+        return pet;
+    }
+}
diff --git a/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java b/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java
index 96c0e0ff27077b..2a6e9d809378dd 100644
--- a/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java
+++ b/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java
@@ -19,9 +19,8 @@
import io.apicurio.registry.utils.serde.AvroKafkaDeserializer;
import io.apicurio.registry.utils.serde.AvroKafkaSerializer;
import io.apicurio.registry.utils.serde.avro.AvroDatumProvider;
-import io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider;
+import io.apicurio.registry.utils.serde.avro.DefaultAvroDatumProvider;
import io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy;
-import io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
@@ -109,8 +108,11 @@ public static Properties getApicurioConsumerProperties(String bootstrap, String
Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, apicurio);
+ // this is a workaround for Apicurio Registry 1.2.2.Final bug: if `avro-datum-provider`
+ // isn't set to `DefaultAvroDatumProvider` explicitly, `use-specific-avro-reader` isn't processed
props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM,
- ReflectAvroDatumProvider.class.getName());
+ DefaultAvroDatumProvider.class.getName());
+ props.put(AvroDatumProvider.REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM, true);
return props;
}
@@ -137,12 +139,8 @@ private static Properties getApicurioProducerProperties(String bootstrap, String
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, apicurio);
- props.put(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM,
- SimpleTopicIdStrategy.class.getName());
props.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM,
GetOrCreateIdStrategy.class.getName());
- props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM,
- ReflectAvroDatumProvider.class.getName());
return props;
}
diff --git a/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java
index e4a50a1d5963ad..308b3e7a2d739c 100644
--- a/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java
+++ b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java
@@ -31,10 +31,7 @@ public Map<String, String> start() {
kafka.start();
registry = new GenericContainer<>("apicurio/apicurio-registry-mem:1.2.2.Final")
.withExposedPorts(8080)
- .withEnv("QUARKUS_PROFILE", "prod")
- .withEnv("KAFKA_BOOTSTRAP_SERVERS", kafka.getBootstrapServers())
- .withEnv("APPLICATION_ID", "registry_id")
- .withEnv("APPLICATION_SERVER", "localhost:9000");
+ .withEnv("QUARKUS_PROFILE", "prod");
registry.start();
Map properties = new HashMap<>();
properties.put("schema.url.confluent",
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 101f092fc900a1..bd71007b1a8e77 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -43,6 +43,7 @@
kafka-sasl
kafka-snappy
kafka-avro
+ kafka-avro-apicurio2
kafka-streams
jpa
jpa-db2