diff --git a/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc b/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc index dc6cd8338bace..4dbddc0afce18 100644 --- a/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc +++ b/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc @@ -27,7 +27,7 @@ To complete this guide, you need: == Architecture -In this guide we are going to implement a REST resource, namely `MovieResource` that +In this guide we are going to implement a REST resource, namely `MovieResource`, that will consume movie DTOs and put them in a Kafka topic. Then, we will implement a consumer that will consume and collect messages from the same topic. @@ -35,7 +35,7 @@ The collected messages will be then exposed by another resource, `ConsumedMovieR https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events[Server-Sent Events]. The _Movies_ will be serialized and deserialized using Avro. -The schema, describing the _Movie_, is stored in an Apicurio schema registry. +The schema, describing the _Movie_, is stored in Apicurio Registry. The same concept applies if you are using the Confluent Avro _serde_ and Confluent Schema Registry. == Solution @@ -57,13 +57,10 @@ mvn io.quarkus:quarkus-maven-plugin:{quarkus-version}:create \ -DprojectArtifactId=kafka-avro-schema-quickstart \ -DclassName="org.acme.kafka.MovieResource" \ -Dpath="/movies" \ - -Dextensions="resteasy-reactive,resteasy-reactive-jackson,rest-client,smallrye-reactive-messaging-kafka,avro" + -Dextensions="resteasy-reactive,resteasy-reactive-jackson,smallrye-reactive-messaging-kafka,avro" cd kafka-avro-schema-quickstart ---- -NOTE: Even though our application will not use a REST client directly, it is used under the hood by -the Apicurio serializer (to interact with the registry) and, at the moment it is required as a dependency. - Additionally, we need a serializer and deserializer for Avro. In this guide, we will use the ones provided by Apicurio. @@ -71,49 +68,29 @@ In this guide, we will use the ones provided by Apicurio. ---- io.apicurio - apicurio-registry-utils-serde - 1.2.2.Final - - - org.jboss.spec.javax.interceptor - jboss-interceptors-api_1.2_spec - - + apicurio-registry-serdes-avro-serde + 2.0.0.Final ---- [TIP] ==== -If you use Confluent Schema Registry, you need the following dependencies and the confluent repository added +If you use Confluent Schema Registry, you need the following dependencies and the Confluent Maven repository added to your `pom.xml`: [source,xml] ---- ... + - com.google.guava - guava - 30.1.1-jre - - - org.checkerframework - checker-qual - - + io.quarkus + quarkus-rest-client io.confluent kafka-avro-serializer - 5.5.0 + 6.1.1 - - io.swagger - swagger-core - - - jakarta.xml.bind - jakarta.xml.bind-api - jakarta.ws.rs jakarta.ws.rs-api @@ -121,6 +98,7 @@ to your `pom.xml`: + confluent @@ -137,7 +115,7 @@ to your `pom.xml`: Apache Avro is a data serialization system. Data structures are described using schemas. The first thing we need to do is to create a schema describing the `Movie` structure. Create a file called `src/main/avro/movie.avsc` with the schema for our record (Kafka message): -[source,javascript] +[source,json] ---- { "namespace": "org.acme.kafka.quarkus", @@ -162,7 +140,7 @@ placed in the `target/generated-sources/avsc` directory. Take a look at the https://avro.apache.org/docs/current/spec.html#schemas[Avro specification] to learn more about the Avro syntax and supported types. -TIP: With Quarkus, no need to a specific plugin to process the Avro schema, this is all done for you! +TIP: With Quarkus, there's no need to use a specific Maven plugin to process the Avro schema, this is all done for you! If you run the project with `mvn compile quarkus:dev`, the changes you do to the schema file will be automatically applied to the generated Java files. @@ -171,7 +149,7 @@ automatically applied to the generated Java files. == The `Movie` producer Having defined the schema, we can now jump to implementing the `MovieResource`. -Let's open the `MovieResource`, inject an Let's open the `MovieResource`, inject an https://quarkus.io/blog/reactive-messaging-emitter/[`Emitter`] of `Movie` DTO and implement a `@POST` method of `Movie` DTO and implement a `@POST` method +Let's open the `MovieResource`, inject an https://quarkus.io/blog/reactive-messaging-emitter/[`Emitter`] of `Movie` DTO and implement a `@POST` method that consumes `Movie` and sends it through the `Emitter`: [source,java] @@ -185,21 +163,18 @@ import org.jboss.logging.Logger; import javax.ws.rs.POST; import javax.ws.rs.Path; -import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @Path("/movies") public class MovieResource { - private static final Logger LOGGER = Logger.getLogger(MovieResource.class); - @Channel("movies") Emitter emitter; + @Channel("movies") + Emitter emitter; @POST public Response enqueueMovie(Movie movie) { - LOGGER.infof("Sending movie %s to Kafka", - movie.getTitle() - ); + LOGGER.infof("Sending movie %s to Kafka", movie.getTitle()); emitter.send(movie); return Response.accepted().build(); } @@ -211,8 +186,8 @@ Now, we need to _map_ the `movies` channel (the `Emitter` emits to this channel) To achieve this, edit the `application.properties` file, and add the following content: [source,properties] ---- -# set the URL of the Apicurio Schema Registry, a global setting shared between producers and consumers -mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/api +# set the URL of the Apicurio Schema Registry, a global setting shared between all Kafka producers and consumers +mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 # set the connector to use for the `movies` channel to smallrye-kafka mp.messaging.outgoing.movies.connector=smallrye-kafka @@ -220,13 +195,11 @@ mp.messaging.outgoing.movies.connector=smallrye-kafka # the name of the corresponding Kafka topic to `movies` mp.messaging.outgoing.movies.topic=movies -# set the serializer for the `movies` channel to the Avro Serializer for Apicurio -mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer +# set the serializer for the `movies` channel to the Apicurio Avro Serializer +mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.serde.avro.AvroKafkaSerializer -# Apicurio schema specific settings: -mp.messaging.outgoing.movies.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy -mp.messaging.outgoing.movies.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy -mp.messaging.outgoing.movies.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider +# automatically register the schema with the registry, if not present +mp.messaging.outgoing.movies.apicurio.registry.auto-register=true ---- == The `Movie` consumer @@ -249,7 +222,7 @@ import javax.ws.rs.core.MediaType; import org.acme.kafka.quarkus.Movie; import org.eclipse.microprofile.reactive.messaging.Channel; -import org.jboss.resteasy.annotations.SseElementType; +import org.jboss.resteasy.reactive.RestSseElementType; import io.smallrye.mutiny.Multi; @@ -262,7 +235,7 @@ public class ConsumedMovieResource { @GET @Produces(MediaType.SERVER_SENT_EVENTS) - @SseElementType(MediaType.TEXT_PLAIN) + @RestSseElementType(MediaType.TEXT_PLAIN) public Multi stream() { return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear())); } @@ -279,14 +252,14 @@ mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka # set the topic name for the channel to `movies` mp.messaging.incoming.movies-from-kafka.topic=movies -# set the deserializer for the `movies-from-kafka` channel to the Avro Deserializer for Apicurio -mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer +# set the deserializer for the `movies-from-kafka` channel to the Apicurio Avro Deserializer +mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer # disable auto-commit, Reactive Messaging handles it itself mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest -mp.messaging.incoming.movies-from-kafka.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider +mp.messaging.incoming.movies-from-kafka.apicurio.registry.use-specific-avro-reader=true ---- @@ -303,10 +276,10 @@ version: '2' services: zookeeper: - image: strimzi/kafka:0.20.1-kafka-2.5.0 + image: quay.io/strimzi/kafka:0.22.1-kafka-2.7.0 command: [ - "sh", "-c", - "bin/zookeeper-server-start.sh config/zookeeper.properties" + "sh", "-c", + "bin/zookeeper-server-start.sh config/zookeeper.properties" ] ports: - "2181:2181" @@ -314,10 +287,10 @@ services: LOG_DIR: /tmp/logs kafka: - image: strimzi/kafka:0.20.1-kafka-2.5.0 + image: quay.io/strimzi/kafka:0.22.1-kafka-2.7.0 command: [ - "sh", "-c", - "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" + "sh", "-c", + "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" ] depends_on: - zookeeper @@ -328,17 +301,15 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + schema-registry: - image: apicurio/apicurio-registry-mem:1.3.2.Final + image: apicurio/apicurio-registry-mem:2.0.0.Final ports: - 8081:8080 depends_on: - kafka environment: QUARKUS_PROFILE: prod - KAFKA_BOOTSTRAP_SERVERS: localhost:9092 - APPLICATION_ID: registry_id - APPLICATION_SERVER: localhost:9000 ---- == Running the application @@ -401,7 +372,7 @@ data:'12 Angry Men' from 1957 == Building a native executable Building a native executable -You can build a native executable with the usual command ./mvnw package -Dnative. +You can build a native executable with the usual command `./mvnw package -Dnative`. Running it is as simple as executing `./target/kafka-avro-schema-quickstart-1.0.0-SNAPSHOT-runner`. @@ -410,12 +381,18 @@ Running it is as simple as executing `./target/kafka-avro-schema-quickstart-1.0. === Infrastructure for tests We will now use Testcontainers to set up Kafka and Apicurio Schema Registry for tests. -First, let's add test dependencies on Awaitility and Strimzi to `pom.xml`, Testcontainers will be pulled +First, let's add test dependencies on REST Client, Awaitility and Strimzi to `pom.xml`. Testcontainers will be pulled in transitively by `strimzi-test-container`: [source,xml] ---- ... + + + io.quarkus + quarkus-rest-client + test + io.strimzi strimzi-test-container @@ -459,16 +436,13 @@ public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLi @Override public Map start() { kafka.start(); - registry = new GenericContainer<>("apicurio/apicurio-registry-mem:1.2.2.Final") + registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.0.0.Final") .withExposedPorts(8080) - .withEnv("QUARKUS_PROFILE", "prod") - .withEnv("KAFKA_BOOTSTRAP_SERVERS", kafka.getBootstrapServers()) - .withEnv("APPLICATION_ID", "registry_id") - .withEnv("APPLICATION_SERVER", "localhost:9000"); + .withEnv("QUARKUS_PROFILE", "prod"); registry.start(); Map properties = new HashMap<>(); properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url", - "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/api"); + "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2"); properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers()); return properties; } @@ -500,7 +474,6 @@ import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.WebTarget; import javax.ws.rs.sse.SseEventSource; - import java.net.URI; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -509,11 +482,9 @@ import java.util.concurrent.Executors; import static io.restassured.RestAssured.given; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; -import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.equalToIgnoringCase; @QuarkusTest // register the class that sets up Testcontainers: @@ -524,7 +495,7 @@ public class MovieResourceTest { URI consumedMovies; @Test - public void testHelloEndpoint() { + public void testHelloEndpoint() throws InterruptedException { // create a client for `ConsumedMovieResource` and collect the consumed resources in a list Client client = ClientBuilder.newClient(); WebTarget target = client.target(consumedMovies); @@ -540,41 +511,43 @@ public class MovieResourceTest { source.open(); // check if, after at most 5 seconds, we have at last 2 items collected, and they are what we expect: - await().atMost(5000, MILLISECONDS).until(() -> received.size() >= 2); + await().atMost(5, SECONDS).until(() -> received.size() >= 2); assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994", "'12 Angry Men' from 1957")); source.close(); // shutdown the executor that is feeding the `MovieResource` - movieSender.shutdown(); + movieSender.shutdownNow(); + movieSender.awaitTermination(5, SECONDS); } private ExecutorService startSendingMovies() { ExecutorService executorService = Executors.newSingleThreadExecutor(); - executorService - .execute( - () -> { - while (true) { - given() - .contentType(ContentType.JSON) - .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}") - .when().post("/movies") - .then() - .statusCode(202); - given() - .contentType(ContentType.JSON) - .body("{\"title\":\"12 Angry Men\",\"year\":1957}") - .when().post("/movies") - .then() - .statusCode(202); - try { - Thread.sleep(200L); - } catch (InterruptedException e) { - break; - } - } - } - ); + executorService.execute(() -> { + while (true) { + given() + .contentType(ContentType.JSON) + .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}") + .when() + .post("/movies") + .then() + .statusCode(202); + + given() + .contentType(ContentType.JSON) + .body("{\"title\":\"12 Angry Men\",\"year\":1957}") + .when() + .post("/movies") + .then() + .statusCode(202); + + try { + Thread.sleep(200L); + } catch (InterruptedException e) { + break; + } + } + }); return executorService; } @@ -583,7 +556,7 @@ public class MovieResourceTest { NOTE: We modified the `MovieResourceTest` that was generated together with the project. This test class has a subclass, `NativeMovieResourceIT`, that runs the same test against the native executable. -To run it, execute `mvn package verify -Dnative`, or `mvn clean install -Dnative` +To run it, execute `mvn verify -Dnative`, or `mvn clean install -Dnative` == Avro code generation details @@ -614,4 +587,4 @@ regular getter will be generated. Defaults to `false` * link:https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.9/kafka/kafka.html[SmallRye Reactive Messaging Kafka] documentation * link:https://quarkus.io/blog/kafka-avro/[How to Use Kafka, Schema Registry and Avro with Quarkus] - a blog post on which -the guide is based. It gives a good introduction to Avro and the concept of Schema Registry \ No newline at end of file +the guide is based. It gives a good introduction to Avro and the concept of Schema Registry