From bafbe397f351f51a7ca5f74e8f54387711581e42 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Wed, 20 Dec 2023 18:01:09 +0100 Subject: [PATCH 1/5] Add confluent json serde support --- extensions/schema-registry/confluent/pom.xml | 2 +- integration-tests/kafka-json-schema-apicurio2/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/schema-registry/confluent/pom.xml b/extensions/schema-registry/confluent/pom.xml index 8331593118dbf..f1f3fd770436f 100644 --- a/extensions/schema-registry/confluent/pom.xml +++ b/extensions/schema-registry/confluent/pom.xml @@ -25,7 +25,7 @@ org.jetbrains.kotlin kotlin-scripting-compiler-embeddable - 1.9.22 + 1.6.0 org.json diff --git a/integration-tests/kafka-json-schema-apicurio2/pom.xml b/integration-tests/kafka-json-schema-apicurio2/pom.xml index 3a1713af0cc62..7fa3b388a9ff8 100644 --- a/integration-tests/kafka-json-schema-apicurio2/pom.xml +++ b/integration-tests/kafka-json-schema-apicurio2/pom.xml @@ -23,7 +23,7 @@ org.jetbrains.kotlin kotlin-scripting-compiler-embeddable - 1.9.22 + 1.6.0 org.json From 46165fff695d64e75371b5f90fcc47aae80ea237 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Wed, 20 Dec 2023 18:01:09 +0100 Subject: [PATCH 2/5] Add protobuf extension for apicurio registry --- .../apicurio/protobuf/deployment/pom.xml | 45 ++++++++++++ .../ApicurioRegistryProtobufProcessor.java | 48 +++++++++++++ .../schema-registry/apicurio/protobuf/pom.xml | 31 +++++++++ .../apicurio/protobuf/runtime/pom.xml | 69 +++++++++++++++++++ .../resources/META-INF/quarkus-extension.yaml | 12 ++++ 5 files changed, 205 insertions(+) create mode 100644 extensions/schema-registry/apicurio/protobuf/deployment/pom.xml create mode 100644 extensions/schema-registry/apicurio/protobuf/deployment/src/main/java/io/quarkus/apicurio/registry/protobuf/ApicurioRegistryProtobufProcessor.java create mode 100644 extensions/schema-registry/apicurio/protobuf/pom.xml create mode 100644 extensions/schema-registry/apicurio/protobuf/runtime/pom.xml create mode 100644 extensions/schema-registry/apicurio/protobuf/runtime/src/main/resources/META-INF/quarkus-extension.yaml diff --git a/extensions/schema-registry/apicurio/protobuf/deployment/pom.xml b/extensions/schema-registry/apicurio/protobuf/deployment/pom.xml new file mode 100644 index 0000000000000..3418339af4b67 --- /dev/null +++ b/extensions/schema-registry/apicurio/protobuf/deployment/pom.xml @@ -0,0 +1,45 @@ + + + 4.0.0 + + + io.quarkus + quarkus-apicurio-registry-protobuf-parent + 999-SNAPSHOT + + + quarkus-apicurio-registry-protobuf-deployment + Quarkus - Apicurio Registry - Protobuf - Deployment + + + + io.quarkus + quarkus-apicurio-registry-protobuf + + + + io.quarkus + quarkus-apicurio-registry-common-deployment + + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + + diff --git a/extensions/schema-registry/apicurio/protobuf/deployment/src/main/java/io/quarkus/apicurio/registry/protobuf/ApicurioRegistryProtobufProcessor.java b/extensions/schema-registry/apicurio/protobuf/deployment/src/main/java/io/quarkus/apicurio/registry/protobuf/ApicurioRegistryProtobufProcessor.java new file mode 100644 index 0000000000000..b50b38a5b7ed7 --- /dev/null +++ b/extensions/schema-registry/apicurio/protobuf/deployment/src/main/java/io/quarkus/apicurio/registry/protobuf/ApicurioRegistryProtobufProcessor.java @@ -0,0 +1,48 @@ +package io.quarkus.apicurio.registry.protobuf; + +import io.quarkus.bootstrap.classloading.QuarkusClassLoader; +import io.quarkus.deployment.Feature; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; + +public class ApicurioRegistryProtobufProcessor { + @BuildStep + FeatureBuildItem feature() { + return new FeatureBuildItem(Feature.APICURIO_REGISTRY_PROTOBUF); + } + + @BuildStep + public void apicurioRegistryProtobuf(BuildProducer reflectiveClass, + BuildProducer sslNativeSupport) { + + reflectiveClass + .produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.protobug.ProtobufKafkaDeserializer", + "io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer").methods().build()); + + reflectiveClass.produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy", + "io.apicurio.registry.serde.strategy.TopicIdStrategy").methods().fields() + .build()); + + reflectiveClass.produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.DefaultIdHandler", + "io.apicurio.registry.serde.Legacy4ByteIdHandler", + "io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider", + "io.apicurio.registry.serde.headers.DefaultHeadersHandler").methods().fields() + .build()); + + String defaultSchemaResolver = "io.apicurio.registry.serde.DefaultSchemaResolver"; + if (QuarkusClassLoader.isClassPresentAtRuntime(defaultSchemaResolver)) { + // Class not present after 2.2.0.Final + reflectiveClass.produce(ReflectiveClassBuildItem.builder(defaultSchemaResolver).methods() + .fields().build()); + } + } + + @BuildStep + ExtensionSslNativeSupportBuildItem enableSslInNative() { + return new ExtensionSslNativeSupportBuildItem(Feature.APICURIO_REGISTRY_PROTOBUF); + } + +} diff --git a/extensions/schema-registry/apicurio/protobuf/pom.xml b/extensions/schema-registry/apicurio/protobuf/pom.xml new file mode 100644 index 0000000000000..d2e6866f46a43 --- /dev/null +++ b/extensions/schema-registry/apicurio/protobuf/pom.xml @@ -0,0 +1,31 @@ + + + + quarkus-apicurio-registry-parent + io.quarkus + 999-SNAPSHOT + ../pom.xml + + + 4.0.0 + quarkus-apicurio-registry-protobuf-parent + Quarkus - Apicurio Registry - Protobuf + pom + + + + + kotlinx-serialization-core-jvm + org.jetbrains.kotlinx + 1.6.0 + + + + + + deployment + runtime + + diff --git a/extensions/schema-registry/apicurio/protobuf/runtime/pom.xml b/extensions/schema-registry/apicurio/protobuf/runtime/pom.xml new file mode 100644 index 0000000000000..d256a8a6cf183 --- /dev/null +++ b/extensions/schema-registry/apicurio/protobuf/runtime/pom.xml @@ -0,0 +1,69 @@ + + + 4.0.0 + + + io.quarkus + quarkus-apicurio-registry-protobuf-parent + 999-SNAPSHOT + + + quarkus-apicurio-registry-protobuf + Quarkus - Apicurio Registry - Protobuf - Runtime + Use Apicurio as Protobuf schema registry + + + + io.apicurio + apicurio-registry-serdes-protobuf-serde + + + io.apicurio + apicurio-common-rest-client-jdk + + + checker-qual + org.checkerframework + + + slf4j-jboss-logging + org.jboss.slf4j + + + + + + io.quarkus + quarkus-apicurio-registry-common + + + + + + + + io.quarkus + quarkus-extension-maven-plugin + + + io.quarkus.apicurio.registry.protobuf + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + diff --git a/extensions/schema-registry/apicurio/protobuf/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/schema-registry/apicurio/protobuf/runtime/src/main/resources/META-INF/quarkus-extension.yaml new file mode 100644 index 0000000000000..f3cde6421af57 --- /dev/null +++ b/extensions/schema-registry/apicurio/protobuf/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -0,0 +1,12 @@ +--- +artifact: ${project.groupId}:${project.artifactId}:${project.version} +name: "Apicurio Registry - Protobuf" +metadata: + keywords: + - "apicurio" + - "protobuf" + - "kafka" + guide: "https://quarkus.io/guides/kafka-schema-registry-protobuf" + categories: + - "serialization" + status: "draft" From 710f24159dac176f45d9b37df6db33d9d6a80c34 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Thu, 21 Dec 2023 13:12:24 +0100 Subject: [PATCH 3/5] Add apicurio protobuf integration tests --- bom/application/pom.xml | 15 + .../java/io/quarkus/deployment/Feature.java | 1 + devtools/bom-descriptor-json/pom.xml | 13 + docs/pom.xml | 13 + extensions/schema-registry/apicurio/pom.xml | 1 + .../schema-registry/apicurio/protobuf/pom.xml | 2 +- .../kafka-protobuf-apicurio2/pom.xml | 274 ++++++++++++++++++ .../io/quarkus/it/kafka/protobuf/Pet.java | 31 ++ .../it/kafka/protobuf/ProtobufEndpoint.java | 65 +++++ .../kafka/protobuf/ProtobufKafkaCreator.java | 117 ++++++++ .../src/main/proto/pet.proto | 10 + .../src/main/resources/application.properties | 8 + .../quarkus/it/kafka/KafkaProtobufTest.java | 18 ++ .../it/kafka/KafkaProtobufTestBase.java | 66 +++++ .../io/quarkus/it/kafka/KafkaResource.java | 39 +++ integration-tests/pom.xml | 1 + 16 files changed, 673 insertions(+), 1 deletion(-) create mode 100644 integration-tests/kafka-protobuf-apicurio2/pom.xml create mode 100644 integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/Pet.java create mode 100644 integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufEndpoint.java create mode 100644 integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufKafkaCreator.java create mode 100644 integration-tests/kafka-protobuf-apicurio2/src/main/proto/pet.proto create mode 100644 integration-tests/kafka-protobuf-apicurio2/src/main/resources/application.properties create mode 100644 integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTest.java create mode 100644 integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestBase.java create mode 100644 integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java diff --git a/bom/application/pom.xml b/bom/application/pom.xml index f7a3e52c128ee..61501795eb62f 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -1405,6 +1405,16 @@ quarkus-apicurio-registry-json-schema-deployment ${project.version} + + io.quarkus + quarkus-apicurio-registry-protobuf + ${project.version} + + + io.quarkus + quarkus-apicurio-registry-protobuf-deployment + ${project.version} + io.quarkus quarkus-confluent-registry-common @@ -3420,6 +3430,11 @@ apicurio-registry-serdes-jsonschema-serde ${apicurio-registry.version} + + io.apicurio + apicurio-registry-serdes-protobuf-serde + ${apicurio-registry.version} + io.apicurio apicurio-common-rest-client-vertx diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java index a504a565e058f..d0662022bcc34 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java @@ -14,6 +14,7 @@ public enum Feature { AZURE_FUNCTIONS, APICURIO_REGISTRY_AVRO, APICURIO_REGISTRY_JSON_SCHEMA, + APICURIO_REGISTRY_PROTOBUF, AWT, CACHE, CDI, diff --git a/devtools/bom-descriptor-json/pom.xml b/devtools/bom-descriptor-json/pom.xml index bdc9ab6b66b46..a2651d3613f6f 100644 --- a/devtools/bom-descriptor-json/pom.xml +++ b/devtools/bom-descriptor-json/pom.xml @@ -213,6 +213,19 @@ + + io.quarkus + quarkus-apicurio-registry-protobuf + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-arc diff --git a/docs/pom.xml b/docs/pom.xml index b64f175513abd..a0b0fce8fa75b 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -229,6 +229,19 @@ + + io.quarkus + quarkus-apicurio-registry-protobuf-deployment + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-arc-deployment diff --git a/extensions/schema-registry/apicurio/pom.xml b/extensions/schema-registry/apicurio/pom.xml index 48249736defb5..1de3e8ff5643f 100644 --- a/extensions/schema-registry/apicurio/pom.xml +++ b/extensions/schema-registry/apicurio/pom.xml @@ -18,5 +18,6 @@ common avro json-schema + protobuf diff --git a/extensions/schema-registry/apicurio/protobuf/pom.xml b/extensions/schema-registry/apicurio/protobuf/pom.xml index d2e6866f46a43..45a1f5cdd660a 100644 --- a/extensions/schema-registry/apicurio/protobuf/pom.xml +++ b/extensions/schema-registry/apicurio/protobuf/pom.xml @@ -19,7 +19,7 @@ kotlinx-serialization-core-jvm org.jetbrains.kotlinx - 1.6.0 + 1.6.1 diff --git a/integration-tests/kafka-protobuf-apicurio2/pom.xml b/integration-tests/kafka-protobuf-apicurio2/pom.xml new file mode 100644 index 0000000000000..d15378d82f7b4 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/pom.xml @@ -0,0 +1,274 @@ + + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + 4.0.0 + + quarkus-integration-test-kafka-protobuf-apicurio2 + Quarkus - Integration Tests - Kafka Protobuf with Apicurio 2.x + The Apache Kafka Protobuf with Apicurio Registry 2.x integration tests module + + + + + kotlinx-serialization-core-jvm + org.jetbrains.kotlinx + 1.6.0 + + + com.squareup.okio + okio + 3.6.0 + + + + + + + io.quarkus + quarkus-integration-test-class-transformer + + + io.quarkus + quarkus-integration-test-shared-library + + + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-resteasy-jackson + + + + + io.quarkus + quarkus-kafka-client + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-csv + + + + io.quarkus + quarkus-apicurio-registry-protobuf + + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + jakarta.xml.bind + jakarta.xml.bind-api + + + + + io.strimzi + strimzi-test-container + test + + + org.apache.logging.log4j + log4j-core + + + + + org.testcontainers + testcontainers + test + + + + + io.quarkus + quarkus-integration-test-class-transformer-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-kafka-client-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-apicurio-registry-protobuf-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + confluent + https://packages.confluent.io/maven/ + + false + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + generate-code + build + + + + + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + initialize + + detect + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + true + + + gencode + generate-sources + + compile + test-compile + + + ./src/main/proto + + com.google.protobuf:protoc:${protobuf-java.version}:exe:${os.detected.classifier} + + + + + + + + maven-failsafe-plugin + + true + + + + + maven-surefire-plugin + + true + + + + + + + + test-kafka + + + test-containers + + + + + + maven-surefire-plugin + + false + + + + maven-failsafe-plugin + + false + + + + + + + + diff --git a/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/Pet.java b/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/Pet.java new file mode 100644 index 0000000000000..97eb9b1696118 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/Pet.java @@ -0,0 +1,31 @@ +package io.quarkus.it.kafka.protobuf; + +public class Pet { + + private String name; + private String color; + + public Pet() { + } + + public Pet(String name, String color) { + this.name = name; + this.color = color; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getColor() { + return color; + } + + public void setColor(String color) { + this.color = color; + } +} diff --git a/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufEndpoint.java b/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufEndpoint.java new file mode 100644 index 0000000000000..15244de51be67 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufEndpoint.java @@ -0,0 +1,65 @@ +package io.quarkus.it.kafka.protobuf; + +import java.time.Duration; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import io.vertx.core.json.JsonObject; + +/** + * Endpoint to test the Protobuf support + */ +@Path("/protobuf") +public class ProtobufEndpoint { + + @Inject + ProtobufKafkaCreator creator; + + @GET + @Path("/apicurio") + public JsonObject getApicurio() { + return get(creator.createApicurioConsumer("test-protobuf-apicurio-consumer", "test-protobuf-apicurio-consumer")); + } + + @POST + @Path("/apicurio") + public void sendApicurio(io.quarkus.it.kafka.protobuf.Pet pet) { + KafkaProducer p = creator + .createApicurioProducer("test-protobuf-apicurio"); + send(p, pet, "test-protobuf-apicurio-producer"); + } + + private JsonObject get(KafkaConsumer consumer) { + final ConsumerRecords records = consumer + .poll(Duration.ofMillis(60000)); + if (records.isEmpty()) { + return null; + } + ConsumerRecord consumerRecord = records.iterator().next(); + com.example.tutorial.PetOuterClass.Pet p = consumerRecord.value(); + // We cannot serialize the returned Pet directly, it contains non-serializable object such as the schema. + JsonObject result = new JsonObject(); + result.put("name", p.getName()); + result.put("color", p.getColor()); + return result; + } + + private void send(KafkaProducer producer, Pet pet, String topic) { + com.example.tutorial.PetOuterClass.Pet protoPet = com.example.tutorial.PetOuterClass.Pet.newBuilder() + .setColor(pet.getColor()) + .setName(pet.getName()) + .build(); + + producer.send(new ProducerRecord<>(topic, 0, protoPet)); + producer.flush(); + } +} diff --git a/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufKafkaCreator.java b/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufKafkaCreator.java new file mode 100644 index 0000000000000..f576b763183ed --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/main/java/io/quarkus/it/kafka/protobuf/ProtobufKafkaCreator.java @@ -0,0 +1,117 @@ +package io.quarkus.it.kafka.protobuf; + +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import com.example.tutorial.PetOuterClass.Pet; + +import io.apicurio.registry.serde.SerdeConfig; +import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer; +import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer; + +/** + * Create Protobuf Kafka Consumers and Producers + */ +@ApplicationScoped +public class ProtobufKafkaCreator { + + @ConfigProperty(name = "kafka.bootstrap.servers") + String bootstrap; + + @ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url") + String apicurioRegistryUrl; + + public ProtobufKafkaCreator() { + } + + public ProtobufKafkaCreator(String bootstrap, String apicurioRegistryUrl) { + this.bootstrap = bootstrap; + this.apicurioRegistryUrl = apicurioRegistryUrl; + } + + public String getApicurioRegistryUrl() { + return apicurioRegistryUrl; + } + + public KafkaConsumer createApicurioConsumer(String groupdIdConfig, String subscribtionName) { + return createApicurioConsumer(bootstrap, getApicurioRegistryUrl(), groupdIdConfig, subscribtionName); + } + + public KafkaProducer createApicurioProducer(String clientId) { + return createApicurioProducer(bootstrap, getApicurioRegistryUrl(), clientId); + } + + public static KafkaConsumer createApicurioConsumer(String bootstrap, String apicurio, + String groupdIdConfig, String subscribtionName) { + Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig); + return createConsumer(p, subscribtionName); + } + + public static KafkaProducer createApicurioProducer(String bootstrap, String apicurio, + String clientId) { + Properties p = getApicurioProducerProperties(bootstrap, apicurio, clientId); + return createProducer(p); + } + + private static KafkaConsumer createConsumer(Properties props, String subscribtionName) { + if (!props.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) { + props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + } + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList(subscribtionName)); + return consumer; + } + + private static KafkaProducer createProducer(Properties props) { + if (!props.containsKey(ProducerConfig.CLIENT_ID_CONFIG)) { + props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + } + return new KafkaProducer<>(props); + } + + public static Properties getApicurioConsumerProperties(String bootstrap, String apicurio, String groupdIdConfig) { + Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufKafkaDeserializer.class.getName()); + props.put(SerdeConfig.REGISTRY_URL, apicurio); + return props; + } + + private static Properties getGenericConsumerProperties(String bootstrap, String groupdIdConfig) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupdIdConfig); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + return props; + } + + private static Properties getApicurioProducerProperties(String bootstrap, String apicurio, String clientId) { + Properties props = getGenericProducerProperties(bootstrap, clientId); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufKafkaSerializer.class.getName()); + props.put(SerdeConfig.REGISTRY_URL, apicurio); + props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true); + return props; + } + + private static Properties getGenericProducerProperties(String bootstrap, String clientId) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); + props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); + return props; + } +} diff --git a/integration-tests/kafka-protobuf-apicurio2/src/main/proto/pet.proto b/integration-tests/kafka-protobuf-apicurio2/src/main/proto/pet.proto new file mode 100644 index 0000000000000..548f4bb384692 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/main/proto/pet.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +package tutorial; + +option java_package = "com.example.tutorial"; + +message Pet { + string name = 1; + string color = 2; +} \ No newline at end of file diff --git a/integration-tests/kafka-protobuf-apicurio2/src/main/resources/application.properties b/integration-tests/kafka-protobuf-apicurio2/src/main/resources/application.properties new file mode 100644 index 0000000000000..da7eb6b7cfb26 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/main/resources/application.properties @@ -0,0 +1,8 @@ +quarkus.log.category.kafka.level=WARN +quarkus.log.category.\"org.apache.kafka\".level=WARN +quarkus.log.category.\"org.apache.zookeeper\".level=WARN + +# enable health check +quarkus.kafka.health.enabled=true + +quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.4.2.Final diff --git a/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTest.java b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTest.java new file mode 100644 index 0000000000000..b632a7df73205 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTest.java @@ -0,0 +1,18 @@ +package io.quarkus.it.kafka; + +import jakarta.inject.Inject; + +import io.quarkus.it.kafka.protobuf.ProtobufKafkaCreator; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +public class KafkaProtobufTest extends KafkaProtobufTestBase { + + @Inject + ProtobufKafkaCreator creator; + + @Override + ProtobufKafkaCreator creator() { + return creator; + } +} diff --git a/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestBase.java b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestBase.java new file mode 100644 index 0000000000000..ed87f1cfa083b --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestBase.java @@ -0,0 +1,66 @@ +package io.quarkus.it.kafka; + +import java.time.Duration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.example.tutorial.PetOuterClass.Pet; + +import io.quarkus.it.kafka.protobuf.ProtobufKafkaCreator; +import io.restassured.RestAssured; + +public abstract class KafkaProtobufTestBase { + + static final String APICURIO_PATH = "/protobuf/apicurio"; + + abstract ProtobufKafkaCreator creator(); + + @Test + public void testUrls() { + Assertions.assertTrue(creator().getApicurioRegistryUrl().endsWith("/apis/registry/v2")); + } + + @Test + public void testApicurioProtobufProducer() { + KafkaConsumer consumer = creator().createApicurioConsumer( + "test-protobuf-apicurio", + "test-protobuf-apicurio-producer"); + testProtobufProducer(consumer, APICURIO_PATH); + } + + @Test + public void testApicurioProtobufConsumer() { + KafkaProducer producer = creator().createApicurioProducer("test-protobuf-apicurio-test"); + testProtobufConsumer(producer, APICURIO_PATH, "test-protobuf-apicurio-consumer"); + } + + protected void testProtobufProducer(KafkaConsumer consumer, String path) { + RestAssured.given() + .header("content-type", "application/json") + .body("{\"name\":\"neo\", \"color\":\"tricolor\"}") + .post(path); + + ConsumerRecord records = consumer.poll(Duration.ofMillis(20000)).iterator().next(); + Assertions.assertEquals(records.key(), (Integer) 0); + Pet pet = records.value(); + Assertions.assertEquals("neo", pet.getName()); + Assertions.assertEquals("tricolor", pet.getColor()); + consumer.close(); + } + + protected void testProtobufConsumer(KafkaProducer producer, String path, String topic) { + producer.send(new ProducerRecord<>(topic, 1, createPet())); + producer.close(); + } + + private Pet createPet() { + return Pet.newBuilder() + .setName("neo") + .setColor("white").build(); + } +} diff --git a/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java new file mode 100644 index 0000000000000..edbd3647cb306 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java @@ -0,0 +1,39 @@ +package io.quarkus.it.kafka; + +import java.util.Collections; +import java.util.Map; + +import io.quarkus.it.kafka.protobuf.ProtobufKafkaCreator; +import io.quarkus.test.common.DevServicesContext; +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class KafkaResource implements QuarkusTestResourceLifecycleManager, DevServicesContext.ContextAware { + + ProtobufKafkaCreator creator; + + @Override + public void setIntegrationTestContext(DevServicesContext context) { + Map devServicesProperties = context.devServicesProperties(); + String bootstrapServers = devServicesProperties.get("kafka.bootstrap.servers"); + if (bootstrapServers != null) { + String apicurioUrl = devServicesProperties.get("mp.messaging.connector.smallrye-kafka.apicurio.registry.url"); + creator = new ProtobufKafkaCreator(bootstrapServers, apicurioUrl); + } + } + + @Override + public Map start() { + return Collections.emptyMap(); + } + + @Override + public void stop() { + } + + @Override + public void inject(TestInjector testInjector) { + testInjector.injectIntoFields( + creator, + new TestInjector.MatchesType(ProtobufKafkaCreator.class)); + } +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index f2ab116a3cbd2..b22b4b652f495 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -233,6 +233,7 @@ kafka-snappy kafka-avro-apicurio2 kafka-json-schema-apicurio2 + kafka-protobuf-apicurio2 kafka-streams kafka-devservices jpa From ede45bd6969a02d0e5ffd068fb657b47046ebb35 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Wed, 31 Jan 2024 11:49:11 +0100 Subject: [PATCH 4/5] Add protobuf native integration tests for apicurio serdes support --- .../quarkus/it/kafka/KafkaProtobufTestIT.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestIT.java diff --git a/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestIT.java b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestIT.java new file mode 100644 index 0000000000000..88d526393d442 --- /dev/null +++ b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestIT.java @@ -0,0 +1,28 @@ +package io.quarkus.it.kafka; + +import io.apicurio.registry.rest.client.RegistryClientFactory; +import io.apicurio.rest.client.VertxHttpClientProvider; +import io.quarkus.it.kafka.protobuf.ProtobufKafkaCreator; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusIntegrationTest; +import io.vertx.core.Vertx; +import org.junit.jupiter.api.BeforeAll; + +@QuarkusIntegrationTest +@QuarkusTestResource(value = KafkaResource.class, restrictToAnnotatedClass = true) +public class KafkaProtobufTestIT extends KafkaProtobufTestBase { + + ProtobufKafkaCreator creator; + + @Override + ProtobufKafkaCreator creator() { + return creator; + } + + @BeforeAll + public static void setUp() { + // this is for the test JVM, which also uses Kafka client, which in turn also interacts with the registry + RegistryClientFactory.setProvider(new VertxHttpClientProvider(Vertx.vertx())); + } + +} From 267407bca3001e88414a2a54140d3dff33e26b70 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Mon, 19 Feb 2024 17:22:15 +0100 Subject: [PATCH 5/5] Make test error reproducible --- bom/application/pom.xml | 2 -- extensions/schema-registry/apicurio/protobuf/runtime/pom.xml | 4 ++++ .../enforcer-rules/quarkus-banned-dependencies-okhttp.xml | 1 - .../test/java/io/quarkus/it/kafka/KafkaProtobufTestIT.java | 3 ++- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 61501795eb62f..67333cbe93cfe 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -6330,8 +6330,6 @@ com.squareup.okhttp3:* com.squareup.okhttp:* - com.squareup.okio:* - diff --git a/extensions/schema-registry/apicurio/protobuf/runtime/pom.xml b/extensions/schema-registry/apicurio/protobuf/runtime/pom.xml index d256a8a6cf183..a5b5c12c4002a 100644 --- a/extensions/schema-registry/apicurio/protobuf/runtime/pom.xml +++ b/extensions/schema-registry/apicurio/protobuf/runtime/pom.xml @@ -31,6 +31,10 @@ slf4j-jboss-logging org.jboss.slf4j + + okhttp + com.squareup.okhttp3 + diff --git a/independent-projects/enforcer-rules/src/main/resources/enforcer-rules/quarkus-banned-dependencies-okhttp.xml b/independent-projects/enforcer-rules/src/main/resources/enforcer-rules/quarkus-banned-dependencies-okhttp.xml index 2b6932e5d6f95..430f1b29dbc9a 100644 --- a/independent-projects/enforcer-rules/src/main/resources/enforcer-rules/quarkus-banned-dependencies-okhttp.xml +++ b/independent-projects/enforcer-rules/src/main/resources/enforcer-rules/quarkus-banned-dependencies-okhttp.xml @@ -5,7 +5,6 @@ com.squareup.okhttp3:* com.squareup.okhttp:* - com.squareup.okio:* diff --git a/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestIT.java b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestIT.java index 88d526393d442..250fde57e6111 100644 --- a/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestIT.java +++ b/integration-tests/kafka-protobuf-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaProtobufTestIT.java @@ -1,12 +1,13 @@ package io.quarkus.it.kafka; +import org.junit.jupiter.api.BeforeAll; + import io.apicurio.registry.rest.client.RegistryClientFactory; import io.apicurio.rest.client.VertxHttpClientProvider; import io.quarkus.it.kafka.protobuf.ProtobufKafkaCreator; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusIntegrationTest; import io.vertx.core.Vertx; -import org.junit.jupiter.api.BeforeAll; @QuarkusIntegrationTest @QuarkusTestResource(value = KafkaResource.class, restrictToAnnotatedClass = true)