diff --git a/.github/native-tests.json b/.github/native-tests.json index 4308dea0d13c3..577be470bb8eb 100644 --- a/.github/native-tests.json +++ b/.github/native-tests.json @@ -57,7 +57,7 @@ { "category": "Messaging1", "timeout": 115, - "test-modules": "kafka, kafka-ssl, kafka-sasl, kafka-avro-apicurio2, kafka-snappy, kafka-streams, reactive-messaging-kafka, kafka-oauth-keycloak", + "test-modules": "kafka, kafka-ssl, kafka-sasl, kafka-avro-apicurio2, kafka-json-schema-apicurio2, kafka-snappy, kafka-streams, reactive-messaging-kafka, kafka-oauth-keycloak", "os-name": "ubuntu-latest" }, { diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 745784680990f..ceaf1e633c032 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -207,7 +207,7 @@ 2.22.0 1.3.0.Final 1.11.3 - 2.5.3.Final + 2.5.7.Final 0.1.18.Final 1.19.3 3.3.4 @@ -1379,6 +1379,16 @@ quarkus-apicurio-registry-avro-deployment ${project.version} + + io.quarkus + quarkus-apicurio-registry-json-schema + ${project.version} + + + io.quarkus + quarkus-apicurio-registry-json-schema-deployment + ${project.version} + io.quarkus quarkus-confluent-registry-common @@ -3379,6 +3389,11 @@ apicurio-registry-serdes-avro-serde ${apicurio-registry.version} + + io.apicurio + apicurio-registry-serdes-jsonschema-serde + ${apicurio-registry.version} + io.apicurio apicurio-common-rest-client-vertx diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java index 6afbe631768b1..6cd1937771799 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java @@ -130,6 +130,7 @@ public interface Capability { String APICURIO_REGISTRY = QUARKUS_PREFIX + ".apicurio.registry"; String APICURIO_REGISTRY_AVRO = APICURIO_REGISTRY + ".avro"; + String APICURIO_REGISTRY_JSON_SCHEMA = APICURIO_REGISTRY + ".json"; String CONFLUENT_REGISTRY = QUARKUS_PREFIX + ".confluent.registry"; String CONFLUENT_REGISTRY_AVRO = CONFLUENT_REGISTRY + ".avro"; diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java index 1f52df61cac15..3c7fa9726a664 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java @@ -13,6 +13,7 @@ public enum Feature { AMAZON_LAMBDA, AZURE_FUNCTIONS, APICURIO_REGISTRY_AVRO, + APICURIO_REGISTRY_JSON_SCHEMA, AWT, CACHE, CDI, diff --git a/devtools/bom-descriptor-json/pom.xml b/devtools/bom-descriptor-json/pom.xml index 79f1e15561eed..66b67abc53c13 100644 --- a/devtools/bom-descriptor-json/pom.xml +++ b/devtools/bom-descriptor-json/pom.xml @@ -200,6 +200,19 @@ + + io.quarkus + quarkus-apicurio-registry-json-schema + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-arc diff --git a/docs/pom.xml b/docs/pom.xml index 274c6effd891e..07bf78b6b8df0 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -216,6 +216,19 @@ + + io.quarkus + quarkus-apicurio-registry-json-schema-deployment + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-arc-deployment diff --git a/docs/src/main/asciidoc/kafka-reactive-getting-started.adoc b/docs/src/main/asciidoc/kafka-reactive-getting-started.adoc index b6813bfba3ae0..0cda3a44255a1 100644 --- a/docs/src/main/asciidoc/kafka-reactive-getting-started.adoc +++ b/docs/src/main/asciidoc/kafka-reactive-getting-started.adoc @@ -314,7 +314,8 @@ For more options on message serialization, see xref:kafka.adoc#kafka-serializati We strongly suggest adopting a contract-first approach using a schema registry. To learn more about how to use Apache Kafka with the schema registry and Avro, follow the -xref:kafka-schema-registry-avro.adoc[Using Apache Kafka with Schema Registry and Avro] guide. +xref:kafka-schema-registry-avro.adoc[Using Apache Kafka with Schema Registry and Avro] guide for Avro +or you can follow the xref:kafka-schema-registry-json-schema.adoc[Using Apache Kafka with Schema Registry and JSON Schema] guide.. ==== == The HTML page diff --git a/docs/src/main/asciidoc/kafka-schema-registry-json-schema.adoc b/docs/src/main/asciidoc/kafka-schema-registry-json-schema.adoc new file mode 100644 index 0000000000000..ac3270f611662 --- /dev/null +++ b/docs/src/main/asciidoc/kafka-schema-registry-json-schema.adoc @@ -0,0 +1,851 @@ +//// +This guide is maintained in the main Quarkus repository +and pull requests should be submitted there: +https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc +//// += Using Apache Kafka with Schema Registry and JSON Schema +include::_attributes.adoc[] +:categories: messaging +:summary: Use Apache Kafka, Json Schema serialized records, and connect to a schema registry. +:topics: messaging,kafka,apicurio,registry +:extensions: io.quarkus:quarkus-apicurio-registry-json-schema,io.quarkus:quarkus-smallrye-reactive-messaging-kafka + +This guide shows how your Quarkus application can use Apache Kafka, https://json-schema.org/[JSON Schema] serialized +records, and connect to a schema registry (such as the https://docs.confluent.io/platform/current/schema-registry/index.html[Confluent Schema Registry] or https://www.apicur.io/registry/[Apicurio Registry]). + +If you are not familiar with Kafka and Kafka in Quarkus in particular, consider +first going through the xref:kafka.adoc[Using Apache Kafka with Reactive Messaging] guide. + +== Prerequisites + +:prerequisites-time: 30 minutes +:prerequisites-docker-compose: +include::{includes}/prerequisites.adoc[] + +== Architecture + +In this guide we are going to implement a REST resource, namely `MovieResource`, that +will consume movie DTOs and put them in a Kafka topic. + +Then, we will implement a consumer that will consume and collect messages from the same topic. +The collected messages will be then exposed by another resource, `ConsumedMovieResource`, via +https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events[Server-Sent Events]. + +The _Movies_ will be serialized and deserialized using JSON Schema. +The schema, describing the _Movie_, is stored in Apicurio Registry. +The same concept applies if you are using the Confluent JSON Schema _serde_ and Confluent Schema Registry. + +== Solution + +We recommend that you follow the instructions in the next sections and create the application step by step. +However, you can go right to the completed example. + +Clone the Git repository: `git clone {quickstarts-clone-url}`, or download an {quickstarts-archive-url}[archive]. + +The solution is located in the `kafka-json-schema-quickstart` link:{quickstarts-tree-url}/kafka-json-schema-quickstart[directory]. + +== Creating the Maven Project + +First, we need a new project. Create a new project with the following command: + +:create-app-artifact-id: kafka-json-schema-quickstart +:create-app-extensions: resteasy-reactive-jackson,smallrye-reactive-messaging-kafka,apicurio-registry-json-schema +include::{includes}/devtools/create-app.adoc[] + +[TIP] +==== +If you use Confluent Schema Registry, you don't need the `quarkus-apicurio-registry-json-schema` extension. +Instead, you need the `quarkus-confluent-registry-json-schema` extension and a few more dependencies. +See <> for details. +==== + +== Json Schema + +Json Schema is a data serialization system. Data structures are described using schemas. +The first thing we need to do is to create a schema describing the `Movie` structure. +Create a file called `src/main/resources/json-schema.json` with the schema for our record (Kafka message): +[source,json] +---- +{ + "$id": "https://example.com/person.schema.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Movie", + "type": "object", + "properties": { + "title": { + "type": "string", + "description": "The movie's title." + }, + "yeay": { + "type": "integer", + "description": "The movie's year." + } + } +} +---- + +Note that auto-generating the Java class from the JSON Schema definition is not possible. Therefore, you must define the Java class as follows so it can be used by the serialization process: + +[source,java] +---- +package org.acme.kafka; + +public class Movie { + + private String title; + private Integer year; + + public Movie() { + } + + public Movie(String title, Integer year) { + this.title = title; + this.year = year; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public Integer getYear() { + return year; + } + + public void setYear(Integer year) { + this.year = year; + } +} +---- + +== The `Movie` producer + +Having defined the schema, we can now jump to implementing the `MovieResource`. + +Let's open the `MovieResource`, inject an https://quarkus.io/blog/reactive-messaging-emitter/[`Emitter`] of `Movie` DTO and implement a `@POST` method +that consumes `Movie` and sends it through the `Emitter`: + +[source,java] +---- +package org.acme.kafka; + +import org.acme.kafka.quarkus.Movie; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.jboss.logging.Logger; + +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.Response; + +@Path("/movies") +public class MovieResource { + private static final Logger LOGGER = Logger.getLogger(MovieResource.class); + + @Channel("movies") + Emitter emitter; + + @POST + public Response enqueueMovie(Movie movie) { + LOGGER.infof("Sending movie %s to Kafka", movie.getTitle()); + emitter.send(movie); + return Response.accepted().build(); + } + +} +---- + +Now, we need to _map_ the `movies` channel (the `Emitter` emits to this channel) to a Kafka topic and also _map_ the schema to be used on this channel. +To achieve this, edit the `application.properties` file, and add the following content: + +[source,properties] +---- +# set the connector for the outgoing channel to `smallrye-kafka` +mp.messaging.outgoing.movies.connector=smallrye-kafka + +# disable automatic detection of the serializers +quarkus.reactive-messaging.kafka.serializer-autodetection.enabled=false + +# Set the value serializer for the channel `movies` +mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer + +# set the topic name for the channel to `movies` +mp.messaging.outgoing.movies.topic=movies + +# set the schema to be used for the channel `movies`. Note that this property accepts just a name or a path and the serializer will look for the resource on the classpath. +mp.messaging.outgoing.movies.apicurio.registry.artifact.schema.location=json-schema.json + +# automatically register the schema with the registry, if not present +mp.messaging.outgoing.movies.apicurio.registry.auto-register=true +---- + +[TIP] +==== +Note that unlike in the avro serialization, xref:kafka.adoc#serialization-autodetection[autodetect] can't be used with JSON Schema, so we must define the `value.serializer`. +Just like with avro, we still have to define the `apicurio.registry.auto-register` property. + +If you use Confluent Schema Registry, in this case you must define the `value.serializer` as well with the value `io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer`. +It is also detected automatically. +The Confluent Schema Registry analogue of `apicurio.registry.auto-register` is called `auto.register.schemas`. +It defaults to `true`, so it doesn't have to be configured in this example. +It can be explicitly set to `false` if you want to disable automatic schema registration. +==== + +== The `Movie` consumer + +So, we can write records into Kafka containing our `Movie` data. +That data is serialized using JSON Schema. +Now, it's time to implement a consumer for them. + +Let's create `ConsumedMovieResource` that will consume `Movie` messages +from the `movies-from-kafka` channel and will expose it via Server-Sent Events: + +[source,java] +---- +package org.acme.kafka; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; + +import org.acme.kafka.quarkus.Movie; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.jboss.resteasy.reactive.RestStreamElementType; + +import io.smallrye.mutiny.Multi; + +@ApplicationScoped +@Path("/consumed-movies") +public class ConsumedMovieResource { + + @Channel("movies-from-kafka") + Multi movies; + + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + @RestStreamElementType(MediaType.TEXT_PLAIN) + public Multi stream() { + return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear())); + } +} +---- + +The last bit of the application's code is the configuration of the `movies-from-kafka` channel in +`application.properties`: + +[source,properties] +---- +# set the connector for the incoming channel to `smallrye-kafka` +mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka + +# set the topic name for the channel to `movies` +mp.messaging.incoming.movies-from-kafka.topic=movies + +# set the deserializer for the incoming channel +mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer + +# disable auto-commit, Reactive Messaging handles it itself +mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false + +mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest +---- + +[TIP] +==== +Again, unlike with Avro, we have to define the `value.deserializer`. + +If you use Confluent Schema Registry, you must configure `value.deserializer` as well with the value ´io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer´. +They are both detected automatically. +==== + +== Running the application + +Start the application in dev mode: + +include::{includes}/devtools/dev.adoc[] + +Kafka broker and Apicurio Registry instance are started automatically thanks to Dev Services. +See xref:kafka-dev-services.adoc[Dev Services for Kafka] and xref:apicurio-registry-dev-services.adoc[Dev Services for Apicurio Registry] for more details. + +[TIP] +==== +You might have noticed that we didn't configure the schema registry URL anywhere. +This is because Dev Services for Apicurio Registry configures all Kafka channels in SmallRye Reactive Messaging to use the automatically started registry instance. + +Apicurio Registry, in addition to its native API, also exposes an endpoint that is API-compatible with Confluent Schema Registry. +Therefore, this automatic configuration works both for Apicurio Registry serde and Confluent Schema Registry serde. + +However, note that there's no Dev Services support for running Confluent Schema Registry itself. +If you want to use a running instance of Confluent Schema Registry, configure its URL, together with the URL of a Kafka broker: + +[source,properties] +---- +kafka.bootstrap.servers=PLAINTEXT://localhost:9092 +mp.messaging.connector.smallrye-kafka.schema.registry.url=http://localhost:8081 +---- +==== + +In the second terminal, query the `ConsumedMovieResource` resource with `curl`: + +[source,bash] +---- +curl -N http://localhost:8080/consumed-movies +---- + +In the third one, post a few movies: + +[source,bash] +---- +curl --header "Content-Type: application/json" \ + --request POST \ + --data '{"title":"The Shawshank Redemption","year":1994}' \ + http://localhost:8080/movies + +curl --header "Content-Type: application/json" \ + --request POST \ + --data '{"title":"The Godfather","year":1972}' \ + http://localhost:8080/movies + +curl --header "Content-Type: application/json" \ + --request POST \ + --data '{"title":"The Dark Knight","year":2008}' \ + http://localhost:8080/movies + +curl --header "Content-Type: application/json" \ + --request POST \ + --data '{"title":"12 Angry Men","year":1957}' \ + http://localhost:8080/movies +---- + +Observe what is printed in the second terminal. You should see something along the lines of: + +[source] +---- +data:'The Shawshank Redemption' from 1994 + +data:'The Godfather' from 1972 + +data:'The Dark Knight' from 2008 + +data:'12 Angry Men' from 1957 +---- + +== Running in JVM or Native mode + +When not running in dev or test mode, you will need to start your own Kafka broker and Apicurio Registry. +The easiest way to get them running is to use `docker-compose` to start the appropriate containers. + +TIP: If you use Confluent Schema Registry, you already have a Kafka broker and Confluent Schema Registry instance running and configured. +You can ignore the `docker-compose` instructions here, as well as the Apicurio Registry configuration. + +Create a `docker-compose.yaml` file at the root of the project with the following content: + +[source,yaml] +---- +version: '2' + +services: + + zookeeper: + image: quay.io/strimzi/kafka:0.22.1-kafka-2.7.0 + command: [ + "sh", "-c", + "bin/zookeeper-server-start.sh config/zookeeper.properties" + ] + ports: + - "2181:2181" + environment: + LOG_DIR: /tmp/logs + + kafka: + image: quay.io/strimzi/kafka:0.22.1-kafka-2.7.0 + command: [ + "sh", "-c", + "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" + ] + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + LOG_DIR: "/tmp/logs" + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + + schema-registry: + image: apicurio/apicurio-registry-mem:2.4.2.Final + ports: + - 8081:8080 + depends_on: + - kafka + environment: + QUARKUS_PROFILE: prod +---- + +Before starting the application, let's first start the Kafka broker and Apicurio Registry: + +[source,bash] +---- +docker-compose up +---- + +NOTE: To stop the containers, use `docker-compose down`. You can also clean up +the containers with `docker-compose rm` + +You can build the application with: + +include::{includes}/devtools/build.adoc[] + +And run it in JVM mode with: + +[source, bash] +---- +java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar +---- + +NOTE: By default, the application tries to connect to a Kafka broker listening at `localhost:9092`. +You can configure the bootstrap server using: `java -Dkafka.bootstrap.servers=\... -jar target/quarkus-app/quarkus-run.jar` + +Specifying the registry URL on the command line is not very convenient, so you can add a configuration property only for the `prod` profile: + +[source,properties] +---- +%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 +---- + +You can build a native executable with: + +include::{includes}/devtools/build-native.adoc[] + +and run it with: + +[source,bash] +---- +./target/kafka-json-schema-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092 +---- + +== Testing the application + +As mentioned above, Dev Services for Kafka and Apicurio Registry automatically start and configure a Kafka broker and Apicurio Registry instance in dev mode and for tests. +Hence, we don't have to set up Kafka and Apicurio Registry ourselves. +We can just focus on writing the test. + +First, let's add test dependencies on REST Client and Awaitility to the build file: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + + io.quarkus + quarkus-rest-client-reactive + test + + + org.awaitility + awaitility + test + +---- + +[source,gradle,role="secondary asciidoc-tabs-target-sync-gradle"] +.build.gradle +---- +testImplementation("io.quarkus:quarkus-rest-client-reactive") +testImplementation("org.awaitility:awaitility") +---- + +In the test, we will send movies in a loop and check if the `ConsumedMovieResource` returns +what we send. + +[source,java] +---- +package org.acme.kafka; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.http.ContentType; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; + +import jakarta.ws.rs.client.Client; +import jakarta.ws.rs.client.ClientBuilder; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.sse.SseEventSource; +import java.net.URI; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static io.restassured.RestAssured.given; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; + +@QuarkusTest +public class MovieResourceTest { + + @TestHTTPResource("/consumed-movies") + URI consumedMovies; + + @Test + public void testHelloEndpoint() throws InterruptedException { + // create a client for `ConsumedMovieResource` and collect the consumed resources in a list + Client client = ClientBuilder.newClient(); + WebTarget target = client.target(consumedMovies); + + List received = new CopyOnWriteArrayList<>(); + + SseEventSource source = SseEventSource.target(target).build(); + source.register(inboundSseEvent -> received.add(inboundSseEvent.readData())); + + // in a separate thread, feed the `MovieResource` + ExecutorService movieSender = startSendingMovies(); + + source.open(); + + // check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect + await().atMost(5, SECONDS).until(() -> received.size() >= 2); + assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994", + "'12 Angry Men' from 1957")); + source.close(); + + // shutdown the executor that is feeding the `MovieResource` + movieSender.shutdownNow(); + movieSender.awaitTermination(5, SECONDS); + } + + private ExecutorService startSendingMovies() { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.execute(() -> { + while (true) { + given() + .contentType(ContentType.JSON) + .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}") + .when() + .post("/movies") + .then() + .statusCode(202); + + given() + .contentType(ContentType.JSON) + .body("{\"title\":\"12 Angry Men\",\"year\":1957}") + .when() + .post("/movies") + .then() + .statusCode(202); + + try { + Thread.sleep(200L); + } catch (InterruptedException e) { + break; + } + } + }); + return executorService; + } + +} +---- + +NOTE: We modified the `MovieResourceTest` that was generated together with the project. This test class has a +subclass, `NativeMovieResourceIT`, that runs the same test against the native executable. +To run it, execute: + +include::{includes}/devtools/build-native.adoc[] + +=== Manual setup + +If we couldn't use Dev Services and wanted to start a Kafka broker and Apicurio Registry instance manually, we would define a xref:getting-started-testing.adoc#quarkus-test-resource[QuarkusTestResourceLifecycleManager]. + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + io.strimzi + strimzi-test-container + 0.22.1 + test + + + org.apache.logging.log4j + log4j-core + + + +---- + +[source,gradle,role="secondary asciidoc-tabs-target-sync-gradle"] +.build.gradle +---- +testImplementation("io.strimzi:strimzi-test-container:0.22.1") { + exclude group: "org.apache.logging.log4j", module: "log4j-core" +} +---- + +[source,java] +---- +package org.acme.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.testcontainers.containers.GenericContainer; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import io.strimzi.StrimziKafkaContainer; + +public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager { + + private final StrimziKafkaContainer kafka = new StrimziKafkaContainer(); + + private GenericContainer registry; + + @Override + public Map start() { + kafka.start(); + registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final") + .withExposedPorts(8080) + .withEnv("QUARKUS_PROFILE", "prod"); + registry.start(); + Map properties = new HashMap<>(); + properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url", + "http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2"); + properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers()); + return properties; + } + + @Override + public void stop() { + registry.stop(); + kafka.stop(); + } +} +---- + +[source,java] +---- +@QuarkusTest +@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class) +public class MovieResourceTest { + ... +} +---- + +[[apicurio-versions-compatibility]] +== Using compatible versions of the Apicurio Registry + +The `quarkus-apicurio-registry-json-schema` extension depends on recent versions of Apicurio Registry client, +and most versions of Apicurio Registry server and client are backwards compatible. +For some you need to make sure that the client used by Serdes is compatible with the server. + +For example, with Apicurio dev service if you set the image name to use version `2.1.5.Final`: + +[source,properties] +---- +quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final +---- + +You need to make sure that `apicurio-registry-serdes-json-schema-serde` dependency +and the REST client `apicurio-common-rest-client-vertx` dependency are set to compatible versions: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + io.quarkus + quarkus-apicurio-registry-json-schema + + + io.apicurio + apicurio-common-rest-client-vertx + + + io.apicurio + apicurio-registry-serdes-json-schema-serde + + + + + io.apicurio + apicurio-registry-client + 2.1.5.Final + + + io.apicurio + apicurio-registry-common + 2.1.5.Final + + + io.apicurio + apicurio-registry-serdes-json-schema-serde + 2.1.5.Final + + + io.apicurio + apicurio-common-rest-client-jdk + + + io.apicurio + apicurio-registry-client + + + io.apicurio + apicurio-registry-common + + + + + io.apicurio + apicurio-common-rest-client-vertx + 0.1.5.Final + +---- + +[source,gradle,role="secondary asciidoc-tabs-target-sync-gradle",subs=attributes+] +.build.gradle +---- +dependencies { + implementation(platform("{quarkus-platform-groupid}:quarkus-bom:2.12.3.Final")) + + ... + + implementation("io.quarkus:quarkus-apicurio-registry-json-schema") + implementation("io.apicurio:apicurio-registry-serdes-json-schema-serde") { + exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk" + exclude group: "io.apicurio", module: "apicurio-registry-client" + exclude group: "io.apicurio", module: "apicurio-registry-common" + version { + strictly "2.1.5.Final" + } + } + implementation("io.apicurio:apicurio-registry-client") { + version { + strictly "2.1.5.Final" + } + } + implementation("io.apicurio:apicurio-registry-common") { + version { + strictly "2.1.5.Final" + } + } + implementation("io.apicurio:apicurio-common-rest-client-vertx") { + version { + strictly "0.1.5.Final" + } + } +} +---- + +Known previous compatible versions for `apicurio-registry-client` and `apicurio-common-rest-client-vertx` are the following + +- `apicurio-registry-client` 2.1.5.Final with `apicurio-common-rest-client-vertx` 0.1.5.Final +- `apicurio-registry-client` 2.3.1.Final with `apicurio-common-rest-client-vertx` 0.1.13.Final + +[[confluent]] +== Using the Confluent Schema Registry + +If you want to use the Confluent Schema Registry, you need the `quarkus-confluent-registry-json-schema` extension, instead of the `quarkus-apicurio-registry-json-schema` extension. +Also, you need to add a few dependencies and a custom Maven repository to your `pom.xml` / `build.gradle` file: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + ... + + + io.quarkus + quarkus-confluent-registry-json-schema + + + + io.quarkus + quarkus-rest-client-reactive + + + io.confluent + kafka-json-schema-serializer + 7.2.0 + + + jakarta.ws.rs + jakarta.ws.rs-api + + + + + + + + + confluent + https://packages.confluent.io/maven/ + + false + + + +---- + +[source,gradle,role="secondary asciidoc-tabs-target-sync-gradle"] +.build.gradle +---- +repositories { + ... + + maven { + url "https://packages.confluent.io/maven/" + } +} + +dependencies { + ... + + implementation("io.quarkus:quarkus-confluent-registry-json-schema") + + // Confluent registry libraries use Jakarta REST client + implementation("io.quarkus:quarkus-rest-client-reactive") + + implementation("io.confluent:kafka-json-schema-serializer:7.2.0") { + exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api" + } +} +---- + +In JVM mode, any version of `io.confluent:kafka-json-schema-serializer` can be used. +In native mode, Quarkus supports the following versions: `6.2.x`, `7.0.x`, `7.1.x`, `7.2.x`, `7.3.x`. + +For version `7.4.x` and `7.5.x`, due to an issue with the Confluent Schema Serializer, you need to add another dependency: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + com.fasterxml.jackson.dataformat + jackson-dataformat-csv + +---- +[source,gradle,role="secondary asciidoc-tabs-target-sync-gradle"] +.build.gradle +---- +dependencies { + implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv") +} +---- + +For any other versions, the native configuration may need to be adjusted. + + +== Further reading + +* link:https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.4/kafka/kafka.html[SmallRye Reactive Messaging Kafka] documentation diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index b559acfa4d85d..c6487f654f687 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -1794,6 +1794,10 @@ that will deserialize to a `io.vertx.core.json.JsonObject`. The corresponding se This is described in a dedicated guide: xref:kafka-schema-registry-avro.adoc[Using Apache Kafka with Schema Registry and Avro]. +== JSON Schema Serialization + +This is described in a dedicated guide: xref:kafka-schema-registry-json-schema.adoc[Using Apache Kafka with Schema Registry and JSON Schema]. + [[serialization-autodetection]] == Serializer/deserializer autodetection @@ -1886,7 +1890,9 @@ Refer to <> to write your own serializer/deserializer for == Using Schema Registry -This is described in a dedicated guide: xref:kafka-schema-registry-avro.adoc[Using Apache Kafka with Schema Registry and Avro]. +This is described in a dedicated guide for Avro: xref:kafka-schema-registry-avro.adoc[Using Apache Kafka with Schema Registry and Avro]. +And a different one for JSON Schema: xref:kafka-schema-registry-json-schema.adoc[Using Apache Kafka with Schema Registry and JSON Schema]. + [[kafka-health-check]] == Health Checks diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java index 37ea59286d602..3a574b4424f85 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java @@ -356,12 +356,19 @@ private void handleAvro(BuildProducer reflectiveClass, "java.lang.AutoCloseable")); } - // --- Apicurio Registry 2.x --- + // --- Apicurio Registry 2.x Avro --- if (QuarkusClassLoader.isClassPresentAtRuntime("io.apicurio.registry.serde.avro.AvroKafkaDeserializer") && !capabilities.isPresent(Capability.APICURIO_REGISTRY_AVRO)) { throw new RuntimeException( "Apicurio Registry 2.x Avro classes detected, please use the quarkus-apicurio-registry-avro extension"); } + + // --- Apicurio Registry 2.x Json Schema --- + if (QuarkusClassLoader.isClassPresentAtRuntime("io.apicurio.registry.serde.avro.JsonKafkaDeserializer") + && !capabilities.isPresent(Capability.APICURIO_REGISTRY_JSON_SCHEMA)) { + throw new RuntimeException( + "Apicurio Registry 2.x Json classes detected, please use the quarkus-apicurio-registry-json extension"); + } } @BuildStep diff --git a/extensions/schema-registry/apicurio/json-schema/deployment/pom.xml b/extensions/schema-registry/apicurio/json-schema/deployment/pom.xml new file mode 100644 index 0000000000000..0fb9a83544704 --- /dev/null +++ b/extensions/schema-registry/apicurio/json-schema/deployment/pom.xml @@ -0,0 +1,45 @@ + + + 4.0.0 + + + io.quarkus + quarkus-apicurio-registry-json-schema-parent + 999-SNAPSHOT + + + quarkus-apicurio-registry-json-schema-deployment + Quarkus - Apicurio Registry - Json Schema - Deployment + + + + io.quarkus + quarkus-apicurio-registry-json-schema + + + + io.quarkus + quarkus-apicurio-registry-common-deployment + + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + + diff --git a/extensions/schema-registry/apicurio/json-schema/deployment/src/main/java/io/quarkus/apicurio/registry/jsonschema/ApicurioRegistryJsonSchemaProcessor.java b/extensions/schema-registry/apicurio/json-schema/deployment/src/main/java/io/quarkus/apicurio/registry/jsonschema/ApicurioRegistryJsonSchemaProcessor.java new file mode 100644 index 0000000000000..83c6f0886bba9 --- /dev/null +++ b/extensions/schema-registry/apicurio/json-schema/deployment/src/main/java/io/quarkus/apicurio/registry/jsonschema/ApicurioRegistryJsonSchemaProcessor.java @@ -0,0 +1,51 @@ +package io.quarkus.apicurio.registry.jsonschema; + +import io.quarkus.bootstrap.classloading.QuarkusClassLoader; +import io.quarkus.deployment.Feature; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; + +public class ApicurioRegistryJsonSchemaProcessor { + @BuildStep + FeatureBuildItem feature() { + return new FeatureBuildItem(Feature.APICURIO_REGISTRY_JSON_SCHEMA); + } + + @BuildStep + public void apicurioRegistryJsonSchema(BuildProducer reflectiveClass, + BuildProducer sslNativeSupport) { + + reflectiveClass + .produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer", + "io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer").methods().build()); + + reflectiveClass.produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy", + "io.apicurio.registry.serde.strategy.TopicIdStrategy", + "io.apicurio.registry.serde.strategy.QualifiedRecordIdStrategy", + "io.apicurio.registry.serde.strategy.RecordIdStrategy", + "io.apicurio.registry.serde.jsonschema.strategy.TopicRecordIdStrategy").methods().fields() + .build()); + + reflectiveClass.produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.DefaultIdHandler", + "io.apicurio.registry.serde.Legacy4ByteIdHandler", + "io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider", + "io.apicurio.registry.serde.headers.DefaultHeadersHandler").methods().fields() + .build()); + + String defaultSchemaResolver = "io.apicurio.registry.serde.DefaultSchemaResolver"; + if (QuarkusClassLoader.isClassPresentAtRuntime(defaultSchemaResolver)) { + // Class not present after 2.2.0.Final + reflectiveClass.produce(ReflectiveClassBuildItem.builder(defaultSchemaResolver).methods() + .fields().build()); + } + } + + @BuildStep + ExtensionSslNativeSupportBuildItem enableSslInNative() { + return new ExtensionSslNativeSupportBuildItem(Feature.APICURIO_REGISTRY_JSON_SCHEMA); + } + +} diff --git a/extensions/schema-registry/apicurio/json-schema/pom.xml b/extensions/schema-registry/apicurio/json-schema/pom.xml new file mode 100644 index 0000000000000..946f8f2d857f2 --- /dev/null +++ b/extensions/schema-registry/apicurio/json-schema/pom.xml @@ -0,0 +1,21 @@ + + + + quarkus-apicurio-registry-parent + io.quarkus + 999-SNAPSHOT + ../pom.xml + + + 4.0.0 + quarkus-apicurio-registry-json-schema-parent + Quarkus - Apicurio Registry - Json Schema + pom + + + deployment + runtime + + \ No newline at end of file diff --git a/extensions/schema-registry/apicurio/json-schema/runtime/pom.xml b/extensions/schema-registry/apicurio/json-schema/runtime/pom.xml new file mode 100644 index 0000000000000..cdb6a28929e45 --- /dev/null +++ b/extensions/schema-registry/apicurio/json-schema/runtime/pom.xml @@ -0,0 +1,61 @@ + + + 4.0.0 + + + io.quarkus + quarkus-apicurio-registry-json-schema-parent + 999-SNAPSHOT + + + quarkus-apicurio-registry-json-schema + Quarkus - Apicurio Registry - Json Schema - Runtime + Use Apicurio as Json schema registry + + + + io.apicurio + apicurio-registry-serdes-jsonschema-serde + + + io.apicurio + apicurio-common-rest-client-jdk + + + + + + io.quarkus + quarkus-apicurio-registry-common + + + + + + + + io.quarkus + quarkus-extension-maven-plugin + + + io.quarkus.apicurio.registry.json + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + diff --git a/extensions/schema-registry/apicurio/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/schema-registry/apicurio/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml new file mode 100644 index 0000000000000..439f9158e3cda --- /dev/null +++ b/extensions/schema-registry/apicurio/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -0,0 +1,12 @@ +--- +artifact: ${project.groupId}:${project.artifactId}:${project.version} +name: "Apicurio Registry - Json Schema" +metadata: + keywords: + - "apicurio" + - "json-schema" + - "kafka" + guide: "" + categories: + - "serialization" + status: "draft" diff --git a/extensions/schema-registry/apicurio/pom.xml b/extensions/schema-registry/apicurio/pom.xml index ce3c9d4de020f..48249736defb5 100644 --- a/extensions/schema-registry/apicurio/pom.xml +++ b/extensions/schema-registry/apicurio/pom.xml @@ -17,5 +17,6 @@ common avro + json-schema diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java index dbe9ede291e33..3582c5bccfa37 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java @@ -36,7 +36,7 @@ class DefaultSerdeDiscoveryState { private Boolean hasConfluent; private Boolean hasApicurio1; - private Boolean hasApicurio2; + private Boolean hasApicurio2Avro; private Boolean hasJsonb; DefaultSerdeDiscoveryState(IndexView index) { @@ -155,18 +155,18 @@ boolean hasApicurio1() { return hasApicurio1; } - boolean hasApicurio2() { - if (hasApicurio2 == null) { + boolean hasApicurio2Avro() { + if (hasApicurio2Avro == null) { try { Class.forName("io.apicurio.registry.serde.avro.AvroKafkaDeserializer", false, Thread.currentThread().getContextClassLoader()); - hasApicurio2 = true; + hasApicurio2Avro = true; } catch (ClassNotFoundException e) { - hasApicurio2 = false; + hasApicurio2Avro = false; } } - return hasApicurio2; + return hasApicurio2Avro; } boolean hasJsonb() { diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index a14824a45801f..503b81f253dfb 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -880,7 +880,7 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T int avroLibraries = 0; avroLibraries += discovery.hasConfluent() ? 1 : 0; avroLibraries += discovery.hasApicurio1() ? 1 : 0; - avroLibraries += discovery.hasApicurio2() ? 1 : 0; + avroLibraries += discovery.hasApicurio2Avro() ? 1 : 0; if (avroLibraries > 1) { LOGGER.debugf("Skipping Avro serde autodetection for %s, because multiple Avro serde libraries are present", typeName); @@ -897,7 +897,7 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T ? Result.of("io.apicurio.registry.utils.serde.AvroKafkaSerializer") : Result.of("io.apicurio.registry.utils.serde.AvroKafkaDeserializer") .with(isAvroGenerated, "apicurio.registry.use-specific-avro-reader", "true"); - } else if (discovery.hasApicurio2()) { + } else if (discovery.hasApicurio2Avro()) { return serializer ? Result.of("io.apicurio.registry.serde.avro.AvroKafkaSerializer") : Result.of("io.apicurio.registry.serde.avro.AvroKafkaDeserializer") @@ -908,6 +908,8 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T } } + //TODO autodiscovery of json serdes + // Jackson-based serializer/deserializer // note that Jackson is always present with Kafka, so no need to check { diff --git a/integration-tests/kafka-json-schema-apicurio2/pom.xml b/integration-tests/kafka-json-schema-apicurio2/pom.xml new file mode 100644 index 0000000000000..88a9216d54975 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/pom.xml @@ -0,0 +1,222 @@ + + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + 4.0.0 + + quarkus-integration-test-kafka-json-schema-apicurio2 + Quarkus - Integration Tests - Kafka Json Schema with Apicurio 2.x + The Apache Kafka Json Schema with Apicurio Registry 2.x integration tests module + + + + io.quarkus + quarkus-integration-test-class-transformer + + + io.quarkus + quarkus-integration-test-shared-library + + + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-resteasy-jackson + + + + + io.quarkus + quarkus-kafka-client + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-csv + + + + io.quarkus + quarkus-apicurio-registry-json-schema + + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + jakarta.xml.bind + jakarta.xml.bind-api + + + + + io.strimzi + strimzi-test-container + test + + + org.apache.logging.log4j + log4j-core + + + + + org.testcontainers + testcontainers + test + + + + + io.quarkus + quarkus-integration-test-class-transformer-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-kafka-client-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-apicurio-registry-json-schema-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + confluent + https://packages.confluent.io/maven/ + + false + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + generate-code + build + + + + + + + maven-failsafe-plugin + + true + + + + + maven-surefire-plugin + + true + + + + + + + + test-kafka + + + test-containers + + + + + + maven-surefire-plugin + + false + + + + maven-failsafe-plugin + + false + + + + + + + + diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java new file mode 100644 index 0000000000000..f65ff696a15a2 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java @@ -0,0 +1,58 @@ +package io.quarkus.it.kafka.jsonschema; + +import java.time.Duration; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import io.vertx.core.json.JsonObject; + +/** + * Endpoint to test the Json Schema support + */ +@Path("/json-schema") +public class JsonSchemaEndpoint { + + @Inject + JsonSchemaKafkaCreator creator; + + @GET + @Path("/apicurio") + public JsonObject getApicurio() { + return get(creator.createApicurioConsumer("test-json-schema-apicurio-consumer", "test-json-schema-apicurio-consumer")); + } + + @POST + @Path("/apicurio") + public void sendApicurio(Pet pet) { + KafkaProducer p = creator.createApicurioProducer("test-json-schema-apicurio"); + send(p, pet, "test-json-schema-apicurio-producer"); + } + + private JsonObject get(KafkaConsumer consumer) { + final ConsumerRecords records = consumer.poll(Duration.ofMillis(60000)); + if (records.isEmpty()) { + return null; + } + ConsumerRecord consumerRecord = records.iterator().next(); + Pet p = consumerRecord.value(); + // We cannot serialize the returned Pet directly, it contains non-serializable object such as the schema. + JsonObject result = new JsonObject(); + result.put("name", p.getName()); + result.put("color", p.getColor()); + return result; + } + + private void send(KafkaProducer producer, Pet pet, String topic) { + producer.send(new ProducerRecord<>(topic, 0, pet)); + producer.flush(); + } +} diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java new file mode 100644 index 0000000000000..989d2f0e10667 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java @@ -0,0 +1,117 @@ +package io.quarkus.it.kafka.jsonschema; + +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.apicurio.registry.serde.SerdeConfig; +import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer; +import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer; + +/** + * Create Json Schema Kafka Consumers and Producers + */ +@ApplicationScoped +public class JsonSchemaKafkaCreator { + + @ConfigProperty(name = "kafka.bootstrap.servers") + String bootstrap; + + @ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url") + String apicurioRegistryUrl; + + public JsonSchemaKafkaCreator() { + } + + public JsonSchemaKafkaCreator(String bootstrap, String apicurioRegistryUrl) { + this.bootstrap = bootstrap; + this.apicurioRegistryUrl = apicurioRegistryUrl; + } + + public String getApicurioRegistryUrl() { + return apicurioRegistryUrl; + } + + public KafkaConsumer createApicurioConsumer(String groupdIdConfig, String subscribtionName) { + return createApicurioConsumer(bootstrap, getApicurioRegistryUrl(), groupdIdConfig, subscribtionName); + } + + public KafkaProducer createApicurioProducer(String clientId) { + return createApicurioProducer(bootstrap, getApicurioRegistryUrl(), clientId); + } + + public static KafkaConsumer createApicurioConsumer(String bootstrap, String apicurio, + String groupdIdConfig, String subscribtionName) { + Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig); + return createConsumer(p, subscribtionName); + } + + public static KafkaProducer createApicurioProducer(String bootstrap, String apicurio, + String clientId) { + Properties p = getApicurioProducerProperties(bootstrap, apicurio, clientId); + return createProducer(p); + } + + private static KafkaConsumer createConsumer(Properties props, String subscribtionName) { + if (!props.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) { + props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + } + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList(subscribtionName)); + return consumer; + } + + private static KafkaProducer createProducer(Properties props) { + if (!props.containsKey(ProducerConfig.CLIENT_ID_CONFIG)) { + props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + } + return new KafkaProducer<>(props); + } + + public static Properties getApicurioConsumerProperties(String bootstrap, String apicurio, String groupdIdConfig) { + Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSchemaKafkaDeserializer.class.getName()); + props.put(SerdeConfig.REGISTRY_URL, apicurio); + return props; + } + + private static Properties getGenericConsumerProperties(String bootstrap, String groupdIdConfig) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupdIdConfig); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + return props; + } + + private static Properties getApicurioProducerProperties(String bootstrap, String apicurio, String clientId) { + Properties props = getGenericProducerProperties(bootstrap, clientId); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSchemaKafkaSerializer.class.getName()); + props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true); + props.put(SerdeConfig.SCHEMA_LOCATION, "json-schema.json"); + props.put(SerdeConfig.VALIDATION_ENABLED, "true"); + props.put(SerdeConfig.REGISTRY_URL, apicurio); + return props; + } + + private static Properties getGenericProducerProperties(String bootstrap, String clientId) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); + props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); + return props; + } +} diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java new file mode 100644 index 0000000000000..ee47fb2fe9482 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java @@ -0,0 +1,31 @@ +package io.quarkus.it.kafka.jsonschema; + +public class Pet { + + private String name; + private String color; + + public Pet() { + } + + public Pet(String name, String color) { + this.name = name; + this.color = color; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getColor() { + return color; + } + + public void setColor(String color) { + this.color = color; + } +} diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/resources/application.properties b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/application.properties new file mode 100644 index 0000000000000..69d4364f6b1c5 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/application.properties @@ -0,0 +1,10 @@ +quarkus.log.category.kafka.level=WARN +quarkus.log.category.\"org.apache.kafka\".level=WARN +quarkus.log.category.\"org.apache.zookeeper\".level=WARN + +quarkus.native.resources.includes=json-schema.json + +# enable health check +quarkus.kafka.health.enabled=true + +quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.4.2.Final diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/resources/json-schema.json b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/json-schema.json new file mode 100644 index 0000000000000..18a1c5f482bd6 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/json-schema.json @@ -0,0 +1,16 @@ +{ + "$id": "https://example.com/person.schema.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Pet", + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "The pet's name." + }, + "color": { + "type": "string", + "description": "The pet's color." + } + } +} \ No newline at end of file diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java new file mode 100644 index 0000000000000..31ddb23296938 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java @@ -0,0 +1,29 @@ +package io.quarkus.it.kafka; + +import org.junit.jupiter.api.BeforeAll; + +import io.apicurio.registry.rest.client.RegistryClientFactory; +import io.apicurio.rest.client.VertxHttpClientProvider; +import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusIntegrationTest; +import io.vertx.core.Vertx; + +@QuarkusIntegrationTest +@QuarkusTestResource(value = KafkaResource.class, restrictToAnnotatedClass = true) +public class KafkaJsonSchemaIT extends KafkaJsonSchemaTestBase { + + JsonSchemaKafkaCreator creator; + + @Override + JsonSchemaKafkaCreator creator() { + return creator; + } + + @BeforeAll + public static void setUp() { + // this is for the test JVM, which also uses Kafka client, which in turn also interacts with the registry + RegistryClientFactory.setProvider(new VertxHttpClientProvider(Vertx.vertx())); + } + +} diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java new file mode 100644 index 0000000000000..606ded95dadfb --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java @@ -0,0 +1,18 @@ +package io.quarkus.it.kafka; + +import jakarta.inject.Inject; + +import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +public class KafkaJsonSchemaTest extends KafkaJsonSchemaTestBase { + + @Inject + JsonSchemaKafkaCreator creator; + + @Override + JsonSchemaKafkaCreator creator() { + return creator; + } +} diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java new file mode 100644 index 0000000000000..796540becc0a7 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java @@ -0,0 +1,71 @@ +package io.quarkus.it.kafka; + +import java.time.Duration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; +import io.quarkus.it.kafka.jsonschema.Pet; +import io.restassured.RestAssured; + +public abstract class KafkaJsonSchemaTestBase { + + static final String APICURIO_PATH = "/json-schema/apicurio"; + + abstract JsonSchemaKafkaCreator creator(); + + @Test + public void testUrls() { + Assertions.assertTrue(creator().getApicurioRegistryUrl().endsWith("/apis/registry/v2")); + } + + @Test + public void testApicurioJsonSchemaProducer() { + String subscriptionName = "test-json-schema-apicurio-producer"; + + KafkaConsumer consumer = creator().createApicurioConsumer( + "test-json-schema-apicurio", + subscriptionName); + testJsonSchemaProducer(consumer, APICURIO_PATH); + } + + @Test + public void testApicurioJsonSchemaConsumer() { + String topic = "test-json-schema-apicurio-consumer"; + KafkaProducer producer = creator().createApicurioProducer("test-json-schema-apicurio-test"); + testJsonSchemaConsumer(producer, APICURIO_PATH, topic); + } + + protected void testJsonSchemaProducer(KafkaConsumer consumer, String path) { + RestAssured.given() + .header("content-type", "application/json") + .body("{\"name\":\"neo\", \"color\":\"tricolor\"}") + .post(path); + ConsumerRecord records = consumer.poll(Duration.ofMillis(20000)).iterator().next(); + Assertions.assertEquals(records.key(), (Integer) 0); + Pet pet = records.value(); + Assertions.assertEquals("neo", pet.getName()); + Assertions.assertEquals("tricolor", pet.getColor()); + consumer.close(); + } + + protected void testJsonSchemaConsumer(KafkaProducer producer, String path, String topic) { + producer.send(new ProducerRecord<>(topic, 1, createPet())); + Pet retrieved = RestAssured.when().get(path).as(Pet.class); + Assertions.assertEquals("neo", retrieved.getName()); + Assertions.assertEquals("white", retrieved.getColor()); + producer.close(); + } + + private Pet createPet() { + Pet pet = new Pet(); + pet.setName("neo"); + pet.setColor("white"); + return pet; + } +} diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java new file mode 100644 index 0000000000000..dabe27a7715ed --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java @@ -0,0 +1,39 @@ +package io.quarkus.it.kafka; + +import java.util.Collections; +import java.util.Map; + +import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; +import io.quarkus.test.common.DevServicesContext; +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class KafkaResource implements QuarkusTestResourceLifecycleManager, DevServicesContext.ContextAware { + + JsonSchemaKafkaCreator creator; + + @Override + public void setIntegrationTestContext(DevServicesContext context) { + Map devServicesProperties = context.devServicesProperties(); + String bootstrapServers = devServicesProperties.get("kafka.bootstrap.servers"); + if (bootstrapServers != null) { + String apicurioUrl = devServicesProperties.get("mp.messaging.connector.smallrye-kafka.apicurio.registry.url"); + creator = new JsonSchemaKafkaCreator(bootstrapServers, apicurioUrl); + } + } + + @Override + public Map start() { + return Collections.emptyMap(); + } + + @Override + public void stop() { + } + + @Override + public void inject(TestInjector testInjector) { + testInjector.injectIntoFields( + creator, + new TestInjector.MatchesType(JsonSchemaKafkaCreator.class)); + } +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 3e9da6012ddb9..7b9728847c268 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -8,6 +8,7 @@ 999-SNAPSHOT ../build-parent/pom.xml + 4.0.0 quarkus-integration-tests-parent @@ -220,6 +221,7 @@ kafka-oauth-keycloak kafka-snappy kafka-avro-apicurio2 + kafka-json-schema-apicurio2 kafka-streams kafka-devservices jpa