Skip to content

Commit

Permalink
Merge pull request #16473 from Ladicek/apicurio-registry-2.x
Browse files Browse the repository at this point in the history
Add support for Apicurio Registry 2.x Avro library
  • Loading branch information
cescoffier authored Apr 25, 2021
2 parents c5d116a + 6691294 commit 398a7f9
Show file tree
Hide file tree
Showing 18 changed files with 828 additions and 129 deletions.
2 changes: 1 addition & 1 deletion .github/native-tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
{
"category": "Messaging1",
"timeout": 100,
"test-modules": "kafka kafka-ssl kafka-sasl kafka-avro kafka-snappy kafka-streams reactive-messaging-kafka"
"test-modules": "kafka kafka-ssl kafka-sasl kafka-avro kafka-avro-apicurio2 kafka-snappy kafka-streams reactive-messaging-kafka"
},
{
"category": "Messaging2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public enum Feature {
JDBC_ORACLE,
JGIT,
JSCH,
KAFKA_CLIENT,
KAFKA_STREAMS,
KEYCLOAK_AUTHORIZATION,
KOTLIN,
Expand Down
185 changes: 79 additions & 106 deletions docs/src/main/asciidoc/kafka-schema-registry-avro.adoc

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/src/main/asciidoc/native-and-ssl.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ As SSL is de facto the standard nowadays, we decided to enable its support autom
* the Infinispan Client extension (`quarkus-infinispan-client`).
* the Jaeger extension (`quarkus-jaeger`),
* the JGit extension (`quarkus-jgit`),
* the Kafka Client extension (`quarkus-kafka-client`), if Apicurio Registry 2.x Avro library is used
* the Keycloak extension (`quarkus-keycloak`),
* the Kubernetes client extension (`quarkus-kubernetes-client`),
* the Mailer extension (`quarkus-mailer`),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;
import java.util.function.Function;

import org.apache.avro.Schema;
Expand All @@ -12,14 +13,15 @@
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.util.WeakIdentityHashMap;
import org.graalvm.home.Version;

import com.oracle.svm.core.annotate.Alias;
import com.oracle.svm.core.annotate.Inject;
import com.oracle.svm.core.annotate.RecomputeFieldValue;
import com.oracle.svm.core.annotate.Substitute;
import com.oracle.svm.core.annotate.TargetClass;

@TargetClass(className = "org.apache.avro.reflect.ReflectionUtil")
@TargetClass(className = "org.apache.avro.reflect.ReflectionUtil", onlyWith = GraalVM20OrEarlier.class)
final class Target_org_apache_avro_reflect_ReflectionUtil {

/**
Expand Down Expand Up @@ -47,7 +49,7 @@ public R apply(V v) {

}

@TargetClass(className = "org.apache.avro.reflect.ReflectData")
@TargetClass(className = "org.apache.avro.reflect.ReflectData", onlyWith = GraalVM20OrEarlier.class)
final class Target_org_apache_avro_reflect_ReflectData {

@Inject
Expand All @@ -74,7 +76,7 @@ private Target_org_apache_avro_reflect_ReflectData_ClassAccessorData getClassAcc
}
}

@TargetClass(className = "org.apache.avro.reflect.ReflectData", innerClass = "ClassAccessorData")
@TargetClass(className = "org.apache.avro.reflect.ReflectData", innerClass = "ClassAccessorData", onlyWith = GraalVM20OrEarlier.class)
final class Target_org_apache_avro_reflect_ReflectData_ClassAccessorData<T> {
// Just provide access to "ReflectData.ClassAccessorData"

Expand Down Expand Up @@ -125,5 +127,12 @@ protected Target_org_apache_avro_generic_GenericDatumReader(GenericData data) {

}

class GraalVM20OrEarlier implements BooleanSupplier {
@Override
public boolean getAsBoolean() {
return Version.getCurrent().compareTo(21) < 0;
}
}

class AvroSubstitutions {
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,15 @@
import io.quarkus.arc.deployment.UnremovableBeanBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.AdditionalIndexedClassesBuildItem;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.IndexDependencyBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
Expand Down Expand Up @@ -103,6 +106,11 @@ public class KafkaProcessor {

static final DotName OBJECT_MAPPER = DotName.createSimple("com.fasterxml.jackson.databind.ObjectMapper");

@BuildStep
FeatureBuildItem feature() {
return new FeatureBuildItem(Feature.KAFKA_CLIENT);
}

@BuildStep
void contributeClassesToIndex(BuildProducer<AdditionalIndexedClassesBuildItem> additionalIndexedClasses,
BuildProducer<IndexDependencyBuildItem> indexDependency) {
Expand All @@ -124,7 +132,8 @@ public void build(
BuildProducer<ServiceProviderBuildItem> serviceProviders,
BuildProducer<NativeImageProxyDefinitionBuildItem> proxies,
Capabilities capabilities, BuildProducer<UnremovableBeanBuildItem> beans,
BuildProducer<NativeImageResourceBuildItem> nativeLibs, NativeConfig nativeConfig) {
BuildProducer<NativeImageResourceBuildItem> nativeLibs, NativeConfig nativeConfig,
BuildProducer<ExtensionSslNativeSupportBuildItem> sslNativeSupport) {
final Set<DotName> toRegister = new HashSet<>();

collectImplementors(toRegister, indexBuildItem, Serializer.class);
Expand Down Expand Up @@ -177,7 +186,7 @@ public void build(
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, "java.nio.DirectByteBuffer"));
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, "sun.misc.Cleaner"));

handleAvro(reflectiveClass, proxies, serviceProviders);
handleAvro(reflectiveClass, proxies, serviceProviders, sslNativeSupport);
handleOpenTracing(reflectiveClass, capabilities);
handleStrimziOAuth(reflectiveClass);
if (config.snappyEnabled) {
Expand Down Expand Up @@ -259,8 +268,11 @@ private void handleStrimziOAuth(BuildProducer<ReflectiveClassBuildItem> reflecti

private void handleAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<NativeImageProxyDefinitionBuildItem> proxies,
BuildProducer<ServiceProviderBuildItem> serviceProviders) {
BuildProducer<ServiceProviderBuildItem> serviceProviders,
BuildProducer<ExtensionSslNativeSupportBuildItem> sslNativeSupport) {
// Avro - for both Confluent and Apicurio

// --- Confluent ---
try {
Class.forName("io.confluent.kafka.serializers.KafkaAvroDeserializer", false,
Thread.currentThread().getContextClassLoader());
Expand Down Expand Up @@ -310,6 +322,8 @@ private void handleAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
} catch (ClassNotFoundException e) {
// ignore, Confluent schema registry client not in the classpath
}

// --- Apicurio Registry 1.x ---
try {
Class.forName("io.apicurio.registry.utils.serde.AvroKafkaDeserializer", false,
Thread.currentThread().getContextClassLoader());
Expand All @@ -319,6 +333,7 @@ private void handleAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
"io.apicurio.registry.utils.serde.AvroKafkaSerializer"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false,
"io.apicurio.registry.utils.serde.avro.DefaultAvroDatumProvider",
"io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider",
"io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
"io.apicurio.registry.utils.serde.strategy.CachedSchemaIdStrategy",
Expand All @@ -337,6 +352,38 @@ private void handleAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
} catch (ClassNotFoundException e) {
//ignore, Apicurio Avro is not in the classpath
}

// --- Apicurio Registry 2.x ---
try {
Class.forName("io.apicurio.registry.serde.avro.AvroKafkaDeserializer", false,
Thread.currentThread().getContextClassLoader());
reflectiveClass.produce(
new ReflectiveClassBuildItem(true, true, false,
"io.apicurio.registry.serde.avro.AvroKafkaDeserializer",
"io.apicurio.registry.serde.avro.AvroKafkaSerializer"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false,
"io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy",
"io.apicurio.registry.serde.strategy.TopicIdStrategy",
"io.apicurio.registry.serde.avro.DefaultAvroDatumProvider",
"io.apicurio.registry.serde.avro.ReflectAvroDatumProvider",
"io.apicurio.registry.serde.avro.strategy.RecordIdStrategy",
"io.apicurio.registry.serde.avro.strategy.TopicRecordIdStrategy"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false,
"io.apicurio.registry.serde.DefaultSchemaResolver",
"io.apicurio.registry.serde.DefaultIdHandler",
"io.apicurio.registry.serde.Legacy4ByteIdHandler",
"io.apicurio.registry.serde.DefaultSchemaResolver",
"io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider",
"io.apicurio.registry.serde.headers.DefaultHeadersHandler"));

// Apicurio Registry 2.x uses the JDK 11 HTTP client, which unconditionally requires SSL
sslNativeSupport.produce(new ExtensionSslNativeSupportBuildItem(Feature.KAFKA_CLIENT));

} catch (ClassNotFoundException e) {
//ignore, Apicurio Avro is not in the classpath
}
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,17 @@ public class KafkaStreamsRuntimeConfig {
/**
* The schema registry key.
*
* e.g. to diff between different registry impls / instances
* as they have this registry url under different property key.
* Different schema registry libraries expect a registry URL
* in different configuration properties.
*
* Red Hat / Apicurio - apicurio.registry.url
* Confluent - schema.registry.url
* For Apicurio Registry, use {@code apicurio.registry.url}.
* For Confluent schema registry, use {@code schema.registry.url}.
*/
@ConfigItem(defaultValue = "schema.registry.url")
public String schemaRegistryKey;

/**
* The schema registry url.
* The schema registry URL.
*/
@ConfigItem
public Optional<String> schemaRegistryUrl;
Expand Down
Loading

0 comments on commit 398a7f9

Please sign in to comment.