From 143fef2c9f8824d2e0d97804fe5096ded64feeb6 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Tue, 13 Apr 2021 14:53:33 +0200 Subject: [PATCH 1/3] Add support for Apicurio Registry 2.x Avro library --- .../java/io/quarkus/deployment/Feature.java | 1 + docs/src/main/asciidoc/native-and-ssl.adoc | 1 + .../quarkus/avro/graal/AvroSubstitutions.java | 15 ++++-- .../client/deployment/KafkaProcessor.java | 53 +++++++++++++++++-- .../runtime/KafkaStreamsRuntimeConfig.java | 10 ++-- 5 files changed, 69 insertions(+), 11 deletions(-) diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java index a370c6e9dac75..0870fb8903ec3 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java @@ -53,6 +53,7 @@ public enum Feature { JDBC_ORACLE, JGIT, JSCH, + KAFKA_CLIENT, KAFKA_STREAMS, KEYCLOAK_AUTHORIZATION, KOTLIN, diff --git a/docs/src/main/asciidoc/native-and-ssl.adoc b/docs/src/main/asciidoc/native-and-ssl.adoc index 352647f88bac5..05a45e6433fca 100644 --- a/docs/src/main/asciidoc/native-and-ssl.adoc +++ b/docs/src/main/asciidoc/native-and-ssl.adoc @@ -81,6 +81,7 @@ As SSL is de facto the standard nowadays, we decided to enable its support autom * the Infinispan Client extension (`quarkus-infinispan-client`). * the Jaeger extension (`quarkus-jaeger`), * the JGit extension (`quarkus-jgit`), + * the Kafka Client extension (`quarkus-kafka-client`), if Apicurio Registry 2.x Avro library is used * the Keycloak extension (`quarkus-keycloak`), * the Kubernetes client extension (`quarkus-kubernetes-client`), * the Mailer extension (`quarkus-mailer`), diff --git a/extensions/avro/runtime/src/main/java/io/quarkus/avro/graal/AvroSubstitutions.java b/extensions/avro/runtime/src/main/java/io/quarkus/avro/graal/AvroSubstitutions.java index 009db1593e49e..d329f152e49d8 100644 --- a/extensions/avro/runtime/src/main/java/io/quarkus/avro/graal/AvroSubstitutions.java +++ b/extensions/avro/runtime/src/main/java/io/quarkus/avro/graal/AvroSubstitutions.java @@ -4,6 +4,7 @@ import java.util.HashMap; import java.util.IdentityHashMap; import java.util.Map; +import java.util.function.BooleanSupplier; import java.util.function.Function; import org.apache.avro.Schema; @@ -12,6 +13,7 @@ import org.apache.avro.io.DatumReader; import org.apache.avro.io.ResolvingDecoder; import org.apache.avro.util.WeakIdentityHashMap; +import org.graalvm.home.Version; import com.oracle.svm.core.annotate.Alias; import com.oracle.svm.core.annotate.Inject; @@ -19,7 +21,7 @@ import com.oracle.svm.core.annotate.Substitute; import com.oracle.svm.core.annotate.TargetClass; -@TargetClass(className = "org.apache.avro.reflect.ReflectionUtil") +@TargetClass(className = "org.apache.avro.reflect.ReflectionUtil", onlyWith = GraalVM20OrEarlier.class) final class Target_org_apache_avro_reflect_ReflectionUtil { /** @@ -47,7 +49,7 @@ public R apply(V v) { } -@TargetClass(className = "org.apache.avro.reflect.ReflectData") +@TargetClass(className = "org.apache.avro.reflect.ReflectData", onlyWith = GraalVM20OrEarlier.class) final class Target_org_apache_avro_reflect_ReflectData { @Inject @@ -74,7 +76,7 @@ private Target_org_apache_avro_reflect_ReflectData_ClassAccessorData getClassAcc } } -@TargetClass(className = "org.apache.avro.reflect.ReflectData", innerClass = "ClassAccessorData") +@TargetClass(className = "org.apache.avro.reflect.ReflectData", innerClass = "ClassAccessorData", onlyWith = GraalVM20OrEarlier.class) final class Target_org_apache_avro_reflect_ReflectData_ClassAccessorData { // Just provide access to "ReflectData.ClassAccessorData" @@ -125,5 +127,12 @@ protected Target_org_apache_avro_generic_GenericDatumReader(GenericData data) { } +class GraalVM20OrEarlier implements BooleanSupplier { + @Override + public boolean getAsBoolean() { + return Version.getCurrent().compareTo(21) < 0; + } +} + class AvroSubstitutions { } diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java index 6b90114830ec7..1ada9b57459ac 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java @@ -52,12 +52,15 @@ import io.quarkus.arc.deployment.UnremovableBeanBuildItem; import io.quarkus.deployment.Capabilities; import io.quarkus.deployment.Capability; +import io.quarkus.deployment.Feature; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.annotations.ExecutionTime; import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.AdditionalIndexedClassesBuildItem; import io.quarkus.deployment.builditem.CombinedIndexBuildItem; +import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; +import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.IndexDependencyBuildItem; import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem; import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; @@ -103,6 +106,11 @@ public class KafkaProcessor { static final DotName OBJECT_MAPPER = DotName.createSimple("com.fasterxml.jackson.databind.ObjectMapper"); + @BuildStep + FeatureBuildItem feature() { + return new FeatureBuildItem(Feature.KAFKA_CLIENT); + } + @BuildStep void contributeClassesToIndex(BuildProducer additionalIndexedClasses, BuildProducer indexDependency) { @@ -124,7 +132,8 @@ public void build( BuildProducer serviceProviders, BuildProducer proxies, Capabilities capabilities, BuildProducer beans, - BuildProducer nativeLibs, NativeConfig nativeConfig) { + BuildProducer nativeLibs, NativeConfig nativeConfig, + BuildProducer sslNativeSupport) { final Set toRegister = new HashSet<>(); collectImplementors(toRegister, indexBuildItem, Serializer.class); @@ -177,7 +186,7 @@ public void build( reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, "java.nio.DirectByteBuffer")); reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, "sun.misc.Cleaner")); - handleAvro(reflectiveClass, proxies, serviceProviders); + handleAvro(reflectiveClass, proxies, serviceProviders, sslNativeSupport); handleOpenTracing(reflectiveClass, capabilities); handleStrimziOAuth(reflectiveClass); if (config.snappyEnabled) { @@ -259,8 +268,11 @@ private void handleStrimziOAuth(BuildProducer reflecti private void handleAvro(BuildProducer reflectiveClass, BuildProducer proxies, - BuildProducer serviceProviders) { + BuildProducer serviceProviders, + BuildProducer sslNativeSupport) { // Avro - for both Confluent and Apicurio + + // --- Confluent --- try { Class.forName("io.confluent.kafka.serializers.KafkaAvroDeserializer", false, Thread.currentThread().getContextClassLoader()); @@ -310,6 +322,8 @@ private void handleAvro(BuildProducer reflectiveClass, } catch (ClassNotFoundException e) { // ignore, Confluent schema registry client not in the classpath } + + // --- Apicurio Registry 1.x --- try { Class.forName("io.apicurio.registry.utils.serde.AvroKafkaDeserializer", false, Thread.currentThread().getContextClassLoader()); @@ -319,6 +333,7 @@ private void handleAvro(BuildProducer reflectiveClass, "io.apicurio.registry.utils.serde.AvroKafkaSerializer")); reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false, + "io.apicurio.registry.utils.serde.avro.DefaultAvroDatumProvider", "io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider", "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy", "io.apicurio.registry.utils.serde.strategy.CachedSchemaIdStrategy", @@ -337,6 +352,38 @@ private void handleAvro(BuildProducer reflectiveClass, } catch (ClassNotFoundException e) { //ignore, Apicurio Avro is not in the classpath } + + // --- Apicurio Registry 2.x --- + try { + Class.forName("io.apicurio.registry.serde.avro.AvroKafkaDeserializer", false, + Thread.currentThread().getContextClassLoader()); + reflectiveClass.produce( + new ReflectiveClassBuildItem(true, true, false, + "io.apicurio.registry.serde.avro.AvroKafkaDeserializer", + "io.apicurio.registry.serde.avro.AvroKafkaSerializer")); + + reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false, + "io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy", + "io.apicurio.registry.serde.strategy.TopicIdStrategy", + "io.apicurio.registry.serde.avro.DefaultAvroDatumProvider", + "io.apicurio.registry.serde.avro.ReflectAvroDatumProvider", + "io.apicurio.registry.serde.avro.strategy.RecordIdStrategy", + "io.apicurio.registry.serde.avro.strategy.TopicRecordIdStrategy")); + + reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false, + "io.apicurio.registry.serde.DefaultSchemaResolver", + "io.apicurio.registry.serde.DefaultIdHandler", + "io.apicurio.registry.serde.Legacy4ByteIdHandler", + "io.apicurio.registry.serde.DefaultSchemaResolver", + "io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider", + "io.apicurio.registry.serde.headers.DefaultHeadersHandler")); + + // Apicurio Registry 2.x uses the JDK 11 HTTP client, which unconditionally requires SSL + sslNativeSupport.produce(new ExtensionSslNativeSupportBuildItem(Feature.KAFKA_CLIENT)); + + } catch (ClassNotFoundException e) { + //ignore, Apicurio Avro is not in the classpath + } } @BuildStep diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java index c42fc5a58f488..c1d263aca0e93 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java @@ -47,17 +47,17 @@ public class KafkaStreamsRuntimeConfig { /** * The schema registry key. * - * e.g. to diff between different registry impls / instances - * as they have this registry url under different property key. + * Different schema registry libraries expect a registry URL + * in different configuration properties. * - * Red Hat / Apicurio - apicurio.registry.url - * Confluent - schema.registry.url + * For Apicurio Registry, use {@code apicurio.registry.url}. + * For Confluent schema registry, use {@code schema.registry.url}. */ @ConfigItem(defaultValue = "schema.registry.url") public String schemaRegistryKey; /** - * The schema registry url. + * The schema registry URL. */ @ConfigItem public Optional schemaRegistryUrl; From 2fdc0bf75f63142a5a86e9d200eaa93302c8dae3 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Mon, 19 Apr 2021 15:09:11 +0200 Subject: [PATCH 2/3] Add tests for Apicurio Registry 2.x Avro library --- .github/native-tests.json | 2 +- .../kafka-avro-apicurio2/pom.xml | 280 ++++++++++++++++++ .../src/main/avro/pet.avsc | 15 + .../quarkus/it/kafka/avro/AvroEndpoint.java | 71 +++++ .../it/kafka/avro/AvroKafkaCreator.java | 146 +++++++++ .../src/main/resources/application.properties | 10 + .../KafkaAndSchemaRegistryTestResource.java | 50 ++++ .../java/io/quarkus/it/kafka/KafkaAvroIT.java | 10 + .../io/quarkus/it/kafka/KafkaAvroTest.java | 90 ++++++ .../it/kafka/avro/AvroKafkaCreator.java | 12 +- .../KafkaAndSchemaRegistryTestResource.java | 5 +- integration-tests/pom.xml | 1 + 12 files changed, 680 insertions(+), 12 deletions(-) create mode 100644 integration-tests/kafka-avro-apicurio2/pom.xml create mode 100644 integration-tests/kafka-avro-apicurio2/src/main/avro/pet.avsc create mode 100644 integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java create mode 100644 integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java create mode 100644 integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties create mode 100644 integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java create mode 100644 integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java create mode 100644 integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java diff --git a/.github/native-tests.json b/.github/native-tests.json index 618c179493223..8cb869a545ae6 100644 --- a/.github/native-tests.json +++ b/.github/native-tests.json @@ -48,7 +48,7 @@ { "category": "Messaging1", "timeout": 100, - "test-modules": "kafka kafka-ssl kafka-sasl kafka-avro kafka-snappy kafka-streams reactive-messaging-kafka" + "test-modules": "kafka kafka-ssl kafka-sasl kafka-avro kafka-avro-apicurio2 kafka-snappy kafka-streams reactive-messaging-kafka" }, { "category": "Messaging2", diff --git a/integration-tests/kafka-avro-apicurio2/pom.xml b/integration-tests/kafka-avro-apicurio2/pom.xml new file mode 100644 index 0000000000000..f5f066742f615 --- /dev/null +++ b/integration-tests/kafka-avro-apicurio2/pom.xml @@ -0,0 +1,280 @@ + + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + 4.0.0 + + quarkus-integration-test-kafka-avro-apicurio2 + Quarkus - Integration Tests - Kafka Avro with Apicurio 2.x + The Apache Kafka Avro with Apicurio Registry 2.x integration tests module + + + + + 2.0.0.Final + + + + + io.quarkus + quarkus-integration-test-class-transformer + + + io.quarkus + quarkus-integration-test-shared-library + + + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-resteasy-jackson + + + io.quarkus + quarkus-resteasy-jsonb + + + + + io.quarkus + quarkus-kafka-client + + + + + io.quarkus + quarkus-avro + + + + + io.confluent + kafka-avro-serializer + 6.1.1 + + + jakarta.ws.rs + jakarta.ws.rs-api + + + + + + io.apicurio + apicurio-registry-serdes-avro-serde + ${apicurio.version} + + + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + jakarta.xml.bind + jakarta.xml.bind-api + + + + + io.strimzi + strimzi-test-container + test + + + org.apache.logging.log4j + log4j-core + + + + + org.testcontainers + testcontainers + test + + + + + io.quarkus + quarkus-avro-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-integration-test-class-transformer-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-kafka-client-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-jsonb-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + confluent + https://packages.confluent.io/maven/ + + false + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + generate-code + build + + + + + + + maven-failsafe-plugin + + true + + + + + maven-surefire-plugin + + true + + + + + + + + test-kafka + + + test-containers + + + + + + maven-surefire-plugin + + false + + + + maven-failsafe-plugin + + false + + + + + + + + native-image + + + native + + + + + true + + + + + diff --git a/integration-tests/kafka-avro-apicurio2/src/main/avro/pet.avsc b/integration-tests/kafka-avro-apicurio2/src/main/avro/pet.avsc new file mode 100644 index 0000000000000..ab16e83ff438c --- /dev/null +++ b/integration-tests/kafka-avro-apicurio2/src/main/avro/pet.avsc @@ -0,0 +1,15 @@ +{ + "namespace": "io.quarkus.it.kafka.avro", + "type": "record", + "name": "Pet", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "color", + "type": "string" + } + ] +} \ No newline at end of file diff --git a/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java b/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java new file mode 100644 index 0000000000000..1960ffe1df280 --- /dev/null +++ b/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java @@ -0,0 +1,71 @@ +package io.quarkus.it.kafka.avro; + +import java.time.Duration; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import io.vertx.core.json.JsonObject; + +/** + * Endpoint to test the Avro support + */ +@Path("/avro") +public class AvroEndpoint { + + @Inject + AvroKafkaCreator creator; + + @GET + @Path("/confluent") + public JsonObject getConfluent() { + return get(creator.createConfluentConsumer("test-avro-confluent-consumer", "test-avro-confluent-consumer")); + } + + @POST + @Path("/confluent") + public void sendConfluent(Pet pet) { + KafkaProducer p = creator.createConfluentProducer("test-avro-confluent"); + send(p, pet, "test-avro-confluent-producer"); + } + + @GET + @Path("/apicurio") + public JsonObject getApicurio() { + return get(creator.createApicurioConsumer("test-avro-apicurio-consumer", "test-avro-apicurio-consumer")); + } + + @POST + @Path("/apicurio") + public void sendApicurio(Pet pet) { + KafkaProducer p = creator.createApicurioProducer("test-avro-apicurio"); + send(p, pet, "test-avro-apicurio-producer"); + } + + private JsonObject get(KafkaConsumer consumer) { + final ConsumerRecords records = consumer.poll(Duration.ofMillis(60000)); + if (records.isEmpty()) { + return null; + } + ConsumerRecord consumerRecord = records.iterator().next(); + Pet p = consumerRecord.value(); + // We cannot serialize the returned Pet directly, it contains non-serializable object such as the schema. + JsonObject result = new JsonObject(); + result.put("name", p.getName()); + result.put("color", p.getColor()); + return result; + } + + private void send(KafkaProducer producer, Pet pet, String topic) { + producer.send(new ProducerRecord<>(topic, 0, pet)); + producer.flush(); + } +} diff --git a/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java b/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java new file mode 100644 index 0000000000000..065092035cb61 --- /dev/null +++ b/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java @@ -0,0 +1,146 @@ +package io.quarkus.it.kafka.avro; + +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; + +import javax.enterprise.context.ApplicationScoped; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.apicurio.registry.serde.SerdeConfig; +import io.apicurio.registry.serde.avro.AvroKafkaDeserializer; +import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig; +import io.apicurio.registry.serde.avro.AvroKafkaSerializer; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; + +/** + * Create Avro Kafka Consumers and Producers + */ +@ApplicationScoped +public class AvroKafkaCreator { + + @ConfigProperty(name = "kafka.bootstrap.servers") + String bootstrap; + @ConfigProperty(name = "schema.url.confluent") + String confluent; + @ConfigProperty(name = "schema.url.apicurio") + String apicurio; + + public KafkaConsumer createConfluentConsumer(String groupdIdConfig, String subscribtionName) { + return createConfluentConsumer(bootstrap, confluent, groupdIdConfig, subscribtionName); + } + + public KafkaProducer createConfluentProducer(String clientId) { + return createConfluentProducer(bootstrap, confluent, clientId); + } + + public KafkaConsumer createApicurioConsumer(String groupdIdConfig, String subscribtionName) { + return createApicurioConsumer(bootstrap, apicurio, groupdIdConfig, subscribtionName); + } + + public KafkaProducer createApicurioProducer(String clientId) { + return createApicurioProducer(bootstrap, apicurio, clientId); + } + + public static KafkaConsumer createConfluentConsumer(String bootstrap, String confluent, + String groupdIdConfig, String subscribtionName) { + Properties p = getConfluentConsumerProperties(bootstrap, confluent, groupdIdConfig); + return createConsumer(p, subscribtionName); + } + + public static KafkaConsumer createApicurioConsumer(String bootstrap, String apicurio, + String groupdIdConfig, String subscribtionName) { + Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig); + return createConsumer(p, subscribtionName); + } + + public static KafkaProducer createConfluentProducer(String bootstrap, String confluent, + String clientId) { + Properties p = getConfluentProducerProperties(bootstrap, confluent, clientId); + return createProducer(p); + } + + public static KafkaProducer createApicurioProducer(String bootstrap, String apicurio, + String clientId) { + Properties p = getApicurioProducerProperties(bootstrap, apicurio, clientId); + return createProducer(p); + } + + private static KafkaConsumer createConsumer(Properties props, String subscribtionName) { + if (!props.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) { + props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + } + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList(subscribtionName)); + return consumer; + } + + private static KafkaProducer createProducer(Properties props) { + if (!props.containsKey(ProducerConfig.CLIENT_ID_CONFIG)) { + props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + } + return new KafkaProducer<>(props); + } + + private static Properties getConfluentConsumerProperties(String bootstrap, String confluent, + String groupdIdConfig) { + Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluent); + props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); + return props; + } + + public static Properties getApicurioConsumerProperties(String bootstrap, String apicurio, String groupdIdConfig) { + Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName()); + props.put(SerdeConfig.REGISTRY_URL, apicurio); + props.put(AvroKafkaSerdeConfig.USE_SPECIFIC_AVRO_READER, true); + return props; + } + + private static Properties getGenericConsumerProperties(String bootstrap, String groupdIdConfig) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupdIdConfig); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + return props; + } + + private static Properties getConfluentProducerProperties(String bootstrap, String confluent, String clientId) { + Properties props = getGenericProducerProperties(bootstrap, clientId); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluent); + return props; + } + + private static Properties getApicurioProducerProperties(String bootstrap, String apicurio, String clientId) { + Properties props = getGenericProducerProperties(bootstrap, clientId); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); + props.put(SerdeConfig.REGISTRY_URL, apicurio); + props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true); + return props; + } + + private static Properties getGenericProducerProperties(String bootstrap, String clientId) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); + props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); + return props; + } +} diff --git a/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties new file mode 100644 index 0000000000000..1713f0b4061ea --- /dev/null +++ b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties @@ -0,0 +1,10 @@ +quarkus.log.category.kafka.level=WARN +quarkus.log.category.\"org.apache.kafka\".level=WARN +quarkus.log.category.\"org.apache.zookeeper\".level=WARN + +# enable health check +quarkus.kafka.health.enabled=true + +# TODO: this should not be needed, but Avro does not seem to use the correct CL +# This will also cause dev mode issues +quarkus.test.flat-class-path=true diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java new file mode 100644 index 0000000000000..56e1cc5c56674 --- /dev/null +++ b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java @@ -0,0 +1,50 @@ +package io.quarkus.it.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.testcontainers.containers.GenericContainer; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import io.strimzi.StrimziKafkaContainer; + +public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager { + + private static final StrimziKafkaContainer kafka = new StrimziKafkaContainer(); + + private static GenericContainer registry; + + public static String getBootstrapServers() { + return kafka.getBootstrapServers(); + } + + public static String getConfluentSchemaRegistryUrl() { + return "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/ccompat/v6"; + } + + public static String getApicurioSchemaRegistryUrl() { + return "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2"; + } + + @Override + public Map start() { + kafka.start(); + registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.0.0.Final") + .withExposedPorts(8080) + .withEnv("QUARKUS_PROFILE", "prod"); + registry.start(); + Map properties = new HashMap<>(); + properties.put("schema.url.confluent", + "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/ccompat/v6"); + properties.put("schema.url.apicurio", + "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2"); + properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers()); + return properties; + } + + @Override + public void stop() { + registry.stop(); + kafka.close(); + } +} diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java new file mode 100644 index 0000000000000..d2467dca37b48 --- /dev/null +++ b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java @@ -0,0 +1,10 @@ +package io.quarkus.it.kafka; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.NativeImageTest; + +@NativeImageTest +@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class) +public class KafkaAvroIT extends KafkaAvroTest { + +} diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java new file mode 100644 index 0000000000000..418243f5d778d --- /dev/null +++ b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java @@ -0,0 +1,90 @@ +package io.quarkus.it.kafka; + +import java.time.Duration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.quarkus.it.kafka.avro.AvroKafkaCreator; +import io.quarkus.it.kafka.avro.Pet; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.RestAssured; + +@QuarkusTest +@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class) +public class KafkaAvroTest { + + private static final String CONFLUENT_PATH = "/avro/confluent"; + private static final String APICURIO_PATH = "/avro/apicurio"; + + @Test + public void testConfluentAvroProducer() { + KafkaConsumer consumer = AvroKafkaCreator.createConfluentConsumer( + KafkaAndSchemaRegistryTestResource.getBootstrapServers(), + KafkaAndSchemaRegistryTestResource.getConfluentSchemaRegistryUrl(), + "test-avro-confluent", + "test-avro-confluent-producer"); + testAvroProducer(consumer, CONFLUENT_PATH); + } + + @Test + public void testConfluentAvroConsumer() { + KafkaProducer producer = AvroKafkaCreator.createConfluentProducer( + KafkaAndSchemaRegistryTestResource.getBootstrapServers(), + KafkaAndSchemaRegistryTestResource.getConfluentSchemaRegistryUrl(), + "test-avro-confluent-test"); + testAvroConsumer(producer, CONFLUENT_PATH, "test-avro-confluent-consumer"); + } + + @Test + public void testApicurioAvroProducer() { + KafkaConsumer consumer = AvroKafkaCreator.createApicurioConsumer( + KafkaAndSchemaRegistryTestResource.getBootstrapServers(), + KafkaAndSchemaRegistryTestResource.getApicurioSchemaRegistryUrl(), + "test-avro-apicurio", + "test-avro-apicurio-producer"); + testAvroProducer(consumer, APICURIO_PATH); + } + + @Test + public void testApicurioAvroConsumer() { + KafkaProducer producer = AvroKafkaCreator.createApicurioProducer( + KafkaAndSchemaRegistryTestResource.getBootstrapServers(), + KafkaAndSchemaRegistryTestResource.getApicurioSchemaRegistryUrl(), + "test-avro-apicurio-test"); + testAvroConsumer(producer, APICURIO_PATH, "test-avro-apicurio-consumer"); + } + + private void testAvroProducer(KafkaConsumer consumer, String path) { + RestAssured.given() + .header("content-type", "application/json") + .body("{\"name\":\"neo\", \"color\":\"tricolor\"}") + .post(path); + ConsumerRecord records = consumer.poll(Duration.ofMillis(20000)).iterator().next(); + Assertions.assertEquals(records.key(), (Integer) 0); + Pet pet = records.value(); + Assertions.assertEquals("neo", pet.getName()); + Assertions.assertEquals("tricolor", pet.getColor()); + consumer.close(); + } + + private void testAvroConsumer(KafkaProducer producer, String path, String topic) { + producer.send(new ProducerRecord<>(topic, 1, createPet())); + Pet retrieved = RestAssured.when().get(path).as(Pet.class); + Assertions.assertEquals("neo", retrieved.getName()); + Assertions.assertEquals("white", retrieved.getColor()); + producer.close(); + } + + private Pet createPet() { + Pet pet = new Pet(); + pet.setName("neo"); + pet.setColor("white"); + return pet; + } +} diff --git a/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java b/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java index 96c0e0ff27077..2a6e9d809378d 100644 --- a/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java +++ b/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java @@ -19,9 +19,8 @@ import io.apicurio.registry.utils.serde.AvroKafkaDeserializer; import io.apicurio.registry.utils.serde.AvroKafkaSerializer; import io.apicurio.registry.utils.serde.avro.AvroDatumProvider; -import io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider; +import io.apicurio.registry.utils.serde.avro.DefaultAvroDatumProvider; import io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy; -import io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; @@ -109,8 +108,11 @@ public static Properties getApicurioConsumerProperties(String bootstrap, String Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName()); props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, apicurio); + // this is a workaround for Apicurio Registry 1.2.2.Final bug: if `avro-datum-provider` + // isn't set to `DefaultAvroDatumProvider` explicitly, `use-specific-avro-reader` isn't processed props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM, - ReflectAvroDatumProvider.class.getName()); + DefaultAvroDatumProvider.class.getName()); + props.put(AvroDatumProvider.REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM, true); return props; } @@ -137,12 +139,8 @@ private static Properties getApicurioProducerProperties(String bootstrap, String props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, apicurio); - props.put(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, - SimpleTopicIdStrategy.class.getName()); props.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, GetOrCreateIdStrategy.class.getName()); - props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM, - ReflectAvroDatumProvider.class.getName()); return props; } diff --git a/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java index e4a50a1d5963a..308b3e7a2d739 100644 --- a/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java +++ b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java @@ -31,10 +31,7 @@ public Map start() { kafka.start(); registry = new GenericContainer<>("apicurio/apicurio-registry-mem:1.2.2.Final") .withExposedPorts(8080) - .withEnv("QUARKUS_PROFILE", "prod") - .withEnv("KAFKA_BOOTSTRAP_SERVERS", kafka.getBootstrapServers()) - .withEnv("APPLICATION_ID", "registry_id") - .withEnv("APPLICATION_SERVER", "localhost:9000"); + .withEnv("QUARKUS_PROFILE", "prod"); registry.start(); Map properties = new HashMap<>(); properties.put("schema.url.confluent", diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 101f092fc900a..bd71007b1a8e7 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -43,6 +43,7 @@ kafka-sasl kafka-snappy kafka-avro + kafka-avro-apicurio2 kafka-streams jpa jpa-db2 From 6691294c9b0cc75f8fc96b890b97a5f10767361a Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Tue, 20 Apr 2021 17:20:52 +0200 Subject: [PATCH 3/3] Update Kafka + Avro documentation to use Apicurio Registry 2.x --- .../asciidoc/kafka-schema-registry-avro.adoc | 185 ++++++++---------- 1 file changed, 79 insertions(+), 106 deletions(-) diff --git a/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc b/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc index dc6cd8338bace..4dbddc0afce18 100644 --- a/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc +++ b/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc @@ -27,7 +27,7 @@ To complete this guide, you need: == Architecture -In this guide we are going to implement a REST resource, namely `MovieResource` that +In this guide we are going to implement a REST resource, namely `MovieResource`, that will consume movie DTOs and put them in a Kafka topic. Then, we will implement a consumer that will consume and collect messages from the same topic. @@ -35,7 +35,7 @@ The collected messages will be then exposed by another resource, `ConsumedMovieR https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events[Server-Sent Events]. The _Movies_ will be serialized and deserialized using Avro. -The schema, describing the _Movie_, is stored in an Apicurio schema registry. +The schema, describing the _Movie_, is stored in Apicurio Registry. The same concept applies if you are using the Confluent Avro _serde_ and Confluent Schema Registry. == Solution @@ -57,13 +57,10 @@ mvn io.quarkus:quarkus-maven-plugin:{quarkus-version}:create \ -DprojectArtifactId=kafka-avro-schema-quickstart \ -DclassName="org.acme.kafka.MovieResource" \ -Dpath="/movies" \ - -Dextensions="resteasy-reactive,resteasy-reactive-jackson,rest-client,smallrye-reactive-messaging-kafka,avro" + -Dextensions="resteasy-reactive,resteasy-reactive-jackson,smallrye-reactive-messaging-kafka,avro" cd kafka-avro-schema-quickstart ---- -NOTE: Even though our application will not use a REST client directly, it is used under the hood by -the Apicurio serializer (to interact with the registry) and, at the moment it is required as a dependency. - Additionally, we need a serializer and deserializer for Avro. In this guide, we will use the ones provided by Apicurio. @@ -71,49 +68,29 @@ In this guide, we will use the ones provided by Apicurio. ---- io.apicurio - apicurio-registry-utils-serde - 1.2.2.Final - - - org.jboss.spec.javax.interceptor - jboss-interceptors-api_1.2_spec - - + apicurio-registry-serdes-avro-serde + 2.0.0.Final ---- [TIP] ==== -If you use Confluent Schema Registry, you need the following dependencies and the confluent repository added +If you use Confluent Schema Registry, you need the following dependencies and the Confluent Maven repository added to your `pom.xml`: [source,xml] ---- ... + - com.google.guava - guava - 30.1.1-jre - - - org.checkerframework - checker-qual - - + io.quarkus + quarkus-rest-client io.confluent kafka-avro-serializer - 5.5.0 + 6.1.1 - - io.swagger - swagger-core - - - jakarta.xml.bind - jakarta.xml.bind-api - jakarta.ws.rs jakarta.ws.rs-api @@ -121,6 +98,7 @@ to your `pom.xml`: + confluent @@ -137,7 +115,7 @@ to your `pom.xml`: Apache Avro is a data serialization system. Data structures are described using schemas. The first thing we need to do is to create a schema describing the `Movie` structure. Create a file called `src/main/avro/movie.avsc` with the schema for our record (Kafka message): -[source,javascript] +[source,json] ---- { "namespace": "org.acme.kafka.quarkus", @@ -162,7 +140,7 @@ placed in the `target/generated-sources/avsc` directory. Take a look at the https://avro.apache.org/docs/current/spec.html#schemas[Avro specification] to learn more about the Avro syntax and supported types. -TIP: With Quarkus, no need to a specific plugin to process the Avro schema, this is all done for you! +TIP: With Quarkus, there's no need to use a specific Maven plugin to process the Avro schema, this is all done for you! If you run the project with `mvn compile quarkus:dev`, the changes you do to the schema file will be automatically applied to the generated Java files. @@ -171,7 +149,7 @@ automatically applied to the generated Java files. == The `Movie` producer Having defined the schema, we can now jump to implementing the `MovieResource`. -Let's open the `MovieResource`, inject an Let's open the `MovieResource`, inject an https://quarkus.io/blog/reactive-messaging-emitter/[`Emitter`] of `Movie` DTO and implement a `@POST` method of `Movie` DTO and implement a `@POST` method +Let's open the `MovieResource`, inject an https://quarkus.io/blog/reactive-messaging-emitter/[`Emitter`] of `Movie` DTO and implement a `@POST` method that consumes `Movie` and sends it through the `Emitter`: [source,java] @@ -185,21 +163,18 @@ import org.jboss.logging.Logger; import javax.ws.rs.POST; import javax.ws.rs.Path; -import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @Path("/movies") public class MovieResource { - private static final Logger LOGGER = Logger.getLogger(MovieResource.class); - @Channel("movies") Emitter emitter; + @Channel("movies") + Emitter emitter; @POST public Response enqueueMovie(Movie movie) { - LOGGER.infof("Sending movie %s to Kafka", - movie.getTitle() - ); + LOGGER.infof("Sending movie %s to Kafka", movie.getTitle()); emitter.send(movie); return Response.accepted().build(); } @@ -211,8 +186,8 @@ Now, we need to _map_ the `movies` channel (the `Emitter` emits to this channel) To achieve this, edit the `application.properties` file, and add the following content: [source,properties] ---- -# set the URL of the Apicurio Schema Registry, a global setting shared between producers and consumers -mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/api +# set the URL of the Apicurio Schema Registry, a global setting shared between all Kafka producers and consumers +mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 # set the connector to use for the `movies` channel to smallrye-kafka mp.messaging.outgoing.movies.connector=smallrye-kafka @@ -220,13 +195,11 @@ mp.messaging.outgoing.movies.connector=smallrye-kafka # the name of the corresponding Kafka topic to `movies` mp.messaging.outgoing.movies.topic=movies -# set the serializer for the `movies` channel to the Avro Serializer for Apicurio -mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer +# set the serializer for the `movies` channel to the Apicurio Avro Serializer +mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.serde.avro.AvroKafkaSerializer -# Apicurio schema specific settings: -mp.messaging.outgoing.movies.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy -mp.messaging.outgoing.movies.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy -mp.messaging.outgoing.movies.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider +# automatically register the schema with the registry, if not present +mp.messaging.outgoing.movies.apicurio.registry.auto-register=true ---- == The `Movie` consumer @@ -249,7 +222,7 @@ import javax.ws.rs.core.MediaType; import org.acme.kafka.quarkus.Movie; import org.eclipse.microprofile.reactive.messaging.Channel; -import org.jboss.resteasy.annotations.SseElementType; +import org.jboss.resteasy.reactive.RestSseElementType; import io.smallrye.mutiny.Multi; @@ -262,7 +235,7 @@ public class ConsumedMovieResource { @GET @Produces(MediaType.SERVER_SENT_EVENTS) - @SseElementType(MediaType.TEXT_PLAIN) + @RestSseElementType(MediaType.TEXT_PLAIN) public Multi stream() { return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear())); } @@ -279,14 +252,14 @@ mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka # set the topic name for the channel to `movies` mp.messaging.incoming.movies-from-kafka.topic=movies -# set the deserializer for the `movies-from-kafka` channel to the Avro Deserializer for Apicurio -mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer +# set the deserializer for the `movies-from-kafka` channel to the Apicurio Avro Deserializer +mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer # disable auto-commit, Reactive Messaging handles it itself mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest -mp.messaging.incoming.movies-from-kafka.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider +mp.messaging.incoming.movies-from-kafka.apicurio.registry.use-specific-avro-reader=true ---- @@ -303,10 +276,10 @@ version: '2' services: zookeeper: - image: strimzi/kafka:0.20.1-kafka-2.5.0 + image: quay.io/strimzi/kafka:0.22.1-kafka-2.7.0 command: [ - "sh", "-c", - "bin/zookeeper-server-start.sh config/zookeeper.properties" + "sh", "-c", + "bin/zookeeper-server-start.sh config/zookeeper.properties" ] ports: - "2181:2181" @@ -314,10 +287,10 @@ services: LOG_DIR: /tmp/logs kafka: - image: strimzi/kafka:0.20.1-kafka-2.5.0 + image: quay.io/strimzi/kafka:0.22.1-kafka-2.7.0 command: [ - "sh", "-c", - "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" + "sh", "-c", + "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" ] depends_on: - zookeeper @@ -328,17 +301,15 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + schema-registry: - image: apicurio/apicurio-registry-mem:1.3.2.Final + image: apicurio/apicurio-registry-mem:2.0.0.Final ports: - 8081:8080 depends_on: - kafka environment: QUARKUS_PROFILE: prod - KAFKA_BOOTSTRAP_SERVERS: localhost:9092 - APPLICATION_ID: registry_id - APPLICATION_SERVER: localhost:9000 ---- == Running the application @@ -401,7 +372,7 @@ data:'12 Angry Men' from 1957 == Building a native executable Building a native executable -You can build a native executable with the usual command ./mvnw package -Dnative. +You can build a native executable with the usual command `./mvnw package -Dnative`. Running it is as simple as executing `./target/kafka-avro-schema-quickstart-1.0.0-SNAPSHOT-runner`. @@ -410,12 +381,18 @@ Running it is as simple as executing `./target/kafka-avro-schema-quickstart-1.0. === Infrastructure for tests We will now use Testcontainers to set up Kafka and Apicurio Schema Registry for tests. -First, let's add test dependencies on Awaitility and Strimzi to `pom.xml`, Testcontainers will be pulled +First, let's add test dependencies on REST Client, Awaitility and Strimzi to `pom.xml`. Testcontainers will be pulled in transitively by `strimzi-test-container`: [source,xml] ---- ... + + + io.quarkus + quarkus-rest-client + test + io.strimzi strimzi-test-container @@ -459,16 +436,13 @@ public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLi @Override public Map start() { kafka.start(); - registry = new GenericContainer<>("apicurio/apicurio-registry-mem:1.2.2.Final") + registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.0.0.Final") .withExposedPorts(8080) - .withEnv("QUARKUS_PROFILE", "prod") - .withEnv("KAFKA_BOOTSTRAP_SERVERS", kafka.getBootstrapServers()) - .withEnv("APPLICATION_ID", "registry_id") - .withEnv("APPLICATION_SERVER", "localhost:9000"); + .withEnv("QUARKUS_PROFILE", "prod"); registry.start(); Map properties = new HashMap<>(); properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url", - "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/api"); + "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2"); properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers()); return properties; } @@ -500,7 +474,6 @@ import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.WebTarget; import javax.ws.rs.sse.SseEventSource; - import java.net.URI; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -509,11 +482,9 @@ import java.util.concurrent.Executors; import static io.restassured.RestAssured.given; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; -import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.equalToIgnoringCase; @QuarkusTest // register the class that sets up Testcontainers: @@ -524,7 +495,7 @@ public class MovieResourceTest { URI consumedMovies; @Test - public void testHelloEndpoint() { + public void testHelloEndpoint() throws InterruptedException { // create a client for `ConsumedMovieResource` and collect the consumed resources in a list Client client = ClientBuilder.newClient(); WebTarget target = client.target(consumedMovies); @@ -540,41 +511,43 @@ public class MovieResourceTest { source.open(); // check if, after at most 5 seconds, we have at last 2 items collected, and they are what we expect: - await().atMost(5000, MILLISECONDS).until(() -> received.size() >= 2); + await().atMost(5, SECONDS).until(() -> received.size() >= 2); assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994", "'12 Angry Men' from 1957")); source.close(); // shutdown the executor that is feeding the `MovieResource` - movieSender.shutdown(); + movieSender.shutdownNow(); + movieSender.awaitTermination(5, SECONDS); } private ExecutorService startSendingMovies() { ExecutorService executorService = Executors.newSingleThreadExecutor(); - executorService - .execute( - () -> { - while (true) { - given() - .contentType(ContentType.JSON) - .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}") - .when().post("/movies") - .then() - .statusCode(202); - given() - .contentType(ContentType.JSON) - .body("{\"title\":\"12 Angry Men\",\"year\":1957}") - .when().post("/movies") - .then() - .statusCode(202); - try { - Thread.sleep(200L); - } catch (InterruptedException e) { - break; - } - } - } - ); + executorService.execute(() -> { + while (true) { + given() + .contentType(ContentType.JSON) + .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}") + .when() + .post("/movies") + .then() + .statusCode(202); + + given() + .contentType(ContentType.JSON) + .body("{\"title\":\"12 Angry Men\",\"year\":1957}") + .when() + .post("/movies") + .then() + .statusCode(202); + + try { + Thread.sleep(200L); + } catch (InterruptedException e) { + break; + } + } + }); return executorService; } @@ -583,7 +556,7 @@ public class MovieResourceTest { NOTE: We modified the `MovieResourceTest` that was generated together with the project. This test class has a subclass, `NativeMovieResourceIT`, that runs the same test against the native executable. -To run it, execute `mvn package verify -Dnative`, or `mvn clean install -Dnative` +To run it, execute `mvn verify -Dnative`, or `mvn clean install -Dnative` == Avro code generation details @@ -614,4 +587,4 @@ regular getter will be generated. Defaults to `false` * link:https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.9/kafka/kafka.html[SmallRye Reactive Messaging Kafka] documentation * link:https://quarkus.io/blog/kafka-avro/[How to Use Kafka, Schema Registry and Avro with Quarkus] - a blog post on which -the guide is based. It gives a good introduction to Avro and the concept of Schema Registry \ No newline at end of file +the guide is based. It gives a good introduction to Avro and the concept of Schema Registry