diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 4812c5c207d91..6f29977a0c332 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -207,7 +207,7 @@ 2.22.0 1.3.0.Final 1.11.3 - 2.5.3.Final + 2.5.6.Final 0.1.18.Final 1.19.3 3.3.4 diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index b19dd413f2076..503b81f253dfb 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -908,6 +908,8 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T } } + //TODO autodiscovery of json serdes + // Jackson-based serializer/deserializer // note that Jackson is always present with Kafka, so no need to check { diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java index 20b3933c8a3ef..e04adaaaf1fe6 100644 --- a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java @@ -1,9 +1,11 @@ package io.quarkus.it.kafka.jsonschema; -import io.apicurio.registry.serde.SerdeConfig; -import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer; -import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer; +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; + import jakarta.enterprise.context.ApplicationScoped; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -12,9 +14,9 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.eclipse.microprofile.config.inject.ConfigProperty; -import java.util.Collections; -import java.util.Properties; -import java.util.UUID; +import io.apicurio.registry.serde.SerdeConfig; +import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer; +import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer; /** * Create Json Schema Kafka Consumers and Producers @@ -49,13 +51,13 @@ public KafkaProducer createApicurioProducer(String clientId) { } public static KafkaConsumer createApicurioConsumer(String bootstrap, String apicurio, - String groupdIdConfig, String subscribtionName) { + String groupdIdConfig, String subscribtionName) { Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig); return createConsumer(p, subscribtionName); } public static KafkaProducer createApicurioProducer(String bootstrap, String apicurio, - String clientId) { + String clientId) { Properties p = getApicurioProducerProperties(bootstrap, apicurio, clientId); return createProducer(p); } @@ -98,6 +100,9 @@ private static Properties getApicurioProducerProperties(String bootstrap, String Properties props = getGenericProducerProperties(bootstrap, clientId); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSchemaKafkaSerializer.class.getName()); + props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true); + props.put(SerdeConfig.SCHEMA_LOCATION, "/io/quarkus/it/kafka/json-schema.json"); + props.put(SerdeConfig.VALIDATION_ENABLED, "true"); props.put(SerdeConfig.REGISTRY_URL, apicurio); return props; } diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/resources/io/quarkus/it/kafka/json-schema.json b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/io/quarkus/it/kafka/json-schema.json similarity index 100% rename from integration-tests/kafka-json-schema-apicurio2/src/test/resources/io/quarkus/it/kafka/json-schema.json rename to integration-tests/kafka-json-schema-apicurio2/src/main/resources/io/quarkus/it/kafka/json-schema.json diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java index 3e51516254338..31ddb23296938 100644 --- a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java @@ -1,12 +1,13 @@ package io.quarkus.it.kafka; +import org.junit.jupiter.api.BeforeAll; + import io.apicurio.registry.rest.client.RegistryClientFactory; import io.apicurio.rest.client.VertxHttpClientProvider; import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusIntegrationTest; import io.vertx.core.Vertx; -import org.junit.jupiter.api.BeforeAll; @QuarkusIntegrationTest @QuarkusTestResource(value = KafkaResource.class, restrictToAnnotatedClass = true) diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java index 4bcfe8c6982b2..606ded95dadfb 100644 --- a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java @@ -1,8 +1,9 @@ package io.quarkus.it.kafka; +import jakarta.inject.Inject; + import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; import io.quarkus.test.junit.QuarkusTest; -import jakarta.inject.Inject; @QuarkusTest public class KafkaJsonSchemaTest extends KafkaJsonSchemaTestBase { diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java index 71444631ba03e..1d92e57cdb404 100644 --- a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java @@ -1,10 +1,7 @@ package io.quarkus.it.kafka; -import io.apicurio.registry.rest.client.RegistryClientFactory; -import io.apicurio.registry.types.ArtifactType; -import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; -import io.quarkus.it.kafka.jsonschema.Pet; -import io.restassured.RestAssured; +import java.time.Duration; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -12,8 +9,9 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.io.InputStream; -import java.time.Duration; +import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; +import io.quarkus.it.kafka.jsonschema.Pet; +import io.restassured.RestAssured; public abstract class KafkaJsonSchemaTestBase { @@ -30,10 +28,6 @@ public void testUrls() { public void testApicurioJsonSchemaProducer() { String subscriptionName = "test-json-schema-apicurio-producer"; - //Since autoregister is not supported for json schema, we must register the schema beforehand. - InputStream jsonSchema = getClass().getResourceAsStream("/io/quarkus/it/kafka/json-schema.json"); - RegistryClientFactory.create(creator().getApicurioRegistryUrl()).createArtifact(null, subscriptionName + "-value", ArtifactType.JSON, jsonSchema); - KafkaConsumer consumer = creator().createApicurioConsumer( "test-json-schema-apicurio", subscriptionName); @@ -45,9 +39,6 @@ public void testApicurioJsonSchemaConsumer() { String topic = "test-json-schema-apicurio-consumer"; //Since autoregister is not supported for json schema, we must register the schema beforehand. - InputStream jsonSchema = getClass().getResourceAsStream("/io/quarkus/it/kafka/json-schema.json"); - RegistryClientFactory.create(creator().getApicurioRegistryUrl()).createArtifact(null, topic + "-value", ArtifactType.JSON, jsonSchema); - KafkaProducer producer = creator().createApicurioProducer("test-json-schema-apicurio-test"); testJsonSchemaConsumer(producer, APICURIO_PATH, topic); } diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java index 3ea4a51d86b6b..dabe27a7715ed 100644 --- a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java @@ -1,12 +1,12 @@ package io.quarkus.it.kafka; +import java.util.Collections; +import java.util.Map; + import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; import io.quarkus.test.common.DevServicesContext; import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; -import java.util.Collections; -import java.util.Map; - public class KafkaResource implements QuarkusTestResourceLifecycleManager, DevServicesContext.ContextAware { JsonSchemaKafkaCreator creator;