Skip to content

Commit

Permalink
Use json schema autoregister in apicurio integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
carlesarnal committed Dec 15, 2023
1 parent 0eec917 commit 99d4de1
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 28 deletions.
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@
<log4j2-api.version>2.22.0</log4j2-api.version>
<log4j-jboss-logmanager.version>1.3.0.Final</log4j-jboss-logmanager.version>
<avro.version>1.11.3</avro.version>
<apicurio-registry.version>2.5.3.Final</apicurio-registry.version>
<apicurio-registry.version>2.5.6.Final</apicurio-registry.version>
<apicurio-common-rest-client.version>0.1.18.Final</apicurio-common-rest-client.version> <!-- must be the version Apicurio Registry uses -->
<testcontainers.version>1.19.3</testcontainers.version> <!-- Make sure to also update docker-java.version to match its needs -->
<docker-java.version>3.3.4</docker-java.version> <!-- must be the version Testcontainers use -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,8 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T
}
}

//TODO autodiscovery of json serdes

// Jackson-based serializer/deserializer
// note that Jackson is always present with Kafka, so no need to check
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.quarkus.it.kafka.jsonschema;

import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand All @@ -12,9 +14,9 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer;

/**
* Create Json Schema Kafka Consumers and Producers
Expand Down Expand Up @@ -49,13 +51,13 @@ public KafkaProducer<Integer, Pet> createApicurioProducer(String clientId) {
}

public static KafkaConsumer<Integer, Pet> createApicurioConsumer(String bootstrap, String apicurio,
String groupdIdConfig, String subscribtionName) {
String groupdIdConfig, String subscribtionName) {
Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig);
return createConsumer(p, subscribtionName);
}

public static KafkaProducer<Integer, Pet> createApicurioProducer(String bootstrap, String apicurio,
String clientId) {
String clientId) {
Properties p = getApicurioProducerProperties(bootstrap, apicurio, clientId);
return createProducer(p);
}
Expand Down Expand Up @@ -98,6 +100,9 @@ private static Properties getApicurioProducerProperties(String bootstrap, String
Properties props = getGenericProducerProperties(bootstrap, clientId);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSchemaKafkaSerializer.class.getName());
props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true);
props.put(SerdeConfig.SCHEMA_LOCATION, "/io/quarkus/it/kafka/json-schema.json");
props.put(SerdeConfig.VALIDATION_ENABLED, "true");
props.put(SerdeConfig.REGISTRY_URL, apicurio);
return props;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package io.quarkus.it.kafka;

import org.junit.jupiter.api.BeforeAll;

import io.apicurio.registry.rest.client.RegistryClientFactory;
import io.apicurio.rest.client.VertxHttpClientProvider;
import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.vertx.core.Vertx;
import org.junit.jupiter.api.BeforeAll;

@QuarkusIntegrationTest
@QuarkusTestResource(value = KafkaResource.class, restrictToAnnotatedClass = true)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package io.quarkus.it.kafka;

import jakarta.inject.Inject;

import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;

@QuarkusTest
public class KafkaJsonSchemaTest extends KafkaJsonSchemaTestBase {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
package io.quarkus.it.kafka;

import io.apicurio.registry.rest.client.RegistryClientFactory;
import io.apicurio.registry.types.ArtifactType;
import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator;
import io.quarkus.it.kafka.jsonschema.Pet;
import io.restassured.RestAssured;
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.InputStream;
import java.time.Duration;
import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator;
import io.quarkus.it.kafka.jsonschema.Pet;
import io.restassured.RestAssured;

public abstract class KafkaJsonSchemaTestBase {

Expand All @@ -30,10 +28,6 @@ public void testUrls() {
public void testApicurioJsonSchemaProducer() {
String subscriptionName = "test-json-schema-apicurio-producer";

//Since autoregister is not supported for json schema, we must register the schema beforehand.
InputStream jsonSchema = getClass().getResourceAsStream("/io/quarkus/it/kafka/json-schema.json");
RegistryClientFactory.create(creator().getApicurioRegistryUrl()).createArtifact(null, subscriptionName + "-value", ArtifactType.JSON, jsonSchema);

KafkaConsumer<Integer, Pet> consumer = creator().createApicurioConsumer(
"test-json-schema-apicurio",
subscriptionName);
Expand All @@ -45,9 +39,6 @@ public void testApicurioJsonSchemaConsumer() {
String topic = "test-json-schema-apicurio-consumer";

//Since autoregister is not supported for json schema, we must register the schema beforehand.
InputStream jsonSchema = getClass().getResourceAsStream("/io/quarkus/it/kafka/json-schema.json");
RegistryClientFactory.create(creator().getApicurioRegistryUrl()).createArtifact(null, topic + "-value", ArtifactType.JSON, jsonSchema);

KafkaProducer<Integer, Pet> producer = creator().createApicurioProducer("test-json-schema-apicurio-test");
testJsonSchemaConsumer(producer, APICURIO_PATH, topic);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package io.quarkus.it.kafka;

import java.util.Collections;
import java.util.Map;

import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator;
import io.quarkus.test.common.DevServicesContext;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;

import java.util.Collections;
import java.util.Map;

public class KafkaResource implements QuarkusTestResourceLifecycleManager, DevServicesContext.ContextAware {

JsonSchemaKafkaCreator creator;
Expand Down

0 comments on commit 99d4de1

Please sign in to comment.