Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Apicurio Registry 2.x Avro library #16473

Merged
merged 3 commits into from
Apr 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/native-tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
{
"category": "Messaging1",
"timeout": 100,
"test-modules": "kafka kafka-ssl kafka-sasl kafka-avro kafka-snappy kafka-streams reactive-messaging-kafka"
"test-modules": "kafka kafka-ssl kafka-sasl kafka-avro kafka-avro-apicurio2 kafka-snappy kafka-streams reactive-messaging-kafka"
},
{
"category": "Messaging2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public enum Feature {
JDBC_ORACLE,
JGIT,
JSCH,
KAFKA_CLIENT,
KAFKA_STREAMS,
KEYCLOAK_AUTHORIZATION,
KOTLIN,
Expand Down
185 changes: 79 additions & 106 deletions docs/src/main/asciidoc/kafka-schema-registry-avro.adoc

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/src/main/asciidoc/native-and-ssl.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ As SSL is de facto the standard nowadays, we decided to enable its support autom
* the Infinispan Client extension (`quarkus-infinispan-client`).
* the Jaeger extension (`quarkus-jaeger`),
* the JGit extension (`quarkus-jgit`),
* the Kafka Client extension (`quarkus-kafka-client`), if Apicurio Registry 2.x Avro library is used
* the Keycloak extension (`quarkus-keycloak`),
* the Kubernetes client extension (`quarkus-kubernetes-client`),
* the Mailer extension (`quarkus-mailer`),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;
import java.util.function.Function;

import org.apache.avro.Schema;
Expand All @@ -12,14 +13,15 @@
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.util.WeakIdentityHashMap;
import org.graalvm.home.Version;

import com.oracle.svm.core.annotate.Alias;
import com.oracle.svm.core.annotate.Inject;
import com.oracle.svm.core.annotate.RecomputeFieldValue;
import com.oracle.svm.core.annotate.Substitute;
import com.oracle.svm.core.annotate.TargetClass;

@TargetClass(className = "org.apache.avro.reflect.ReflectionUtil")
@TargetClass(className = "org.apache.avro.reflect.ReflectionUtil", onlyWith = GraalVM20OrEarlier.class)
final class Target_org_apache_avro_reflect_ReflectionUtil {

/**
Expand Down Expand Up @@ -47,7 +49,7 @@ public R apply(V v) {

}

@TargetClass(className = "org.apache.avro.reflect.ReflectData")
@TargetClass(className = "org.apache.avro.reflect.ReflectData", onlyWith = GraalVM20OrEarlier.class)
final class Target_org_apache_avro_reflect_ReflectData {

@Inject
Expand All @@ -74,7 +76,7 @@ private Target_org_apache_avro_reflect_ReflectData_ClassAccessorData getClassAcc
}
}

@TargetClass(className = "org.apache.avro.reflect.ReflectData", innerClass = "ClassAccessorData")
@TargetClass(className = "org.apache.avro.reflect.ReflectData", innerClass = "ClassAccessorData", onlyWith = GraalVM20OrEarlier.class)
final class Target_org_apache_avro_reflect_ReflectData_ClassAccessorData<T> {
// Just provide access to "ReflectData.ClassAccessorData"

Expand Down Expand Up @@ -125,5 +127,12 @@ protected Target_org_apache_avro_generic_GenericDatumReader(GenericData data) {

}

class GraalVM20OrEarlier implements BooleanSupplier {
@Override
public boolean getAsBoolean() {
return Version.getCurrent().compareTo(21) < 0;
}
}

class AvroSubstitutions {
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,15 @@
import io.quarkus.arc.deployment.UnremovableBeanBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.AdditionalIndexedClassesBuildItem;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.IndexDependencyBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
Expand Down Expand Up @@ -103,6 +106,11 @@ public class KafkaProcessor {

static final DotName OBJECT_MAPPER = DotName.createSimple("com.fasterxml.jackson.databind.ObjectMapper");

@BuildStep
FeatureBuildItem feature() {
return new FeatureBuildItem(Feature.KAFKA_CLIENT);
}

@BuildStep
void contributeClassesToIndex(BuildProducer<AdditionalIndexedClassesBuildItem> additionalIndexedClasses,
BuildProducer<IndexDependencyBuildItem> indexDependency) {
Expand All @@ -124,7 +132,8 @@ public void build(
BuildProducer<ServiceProviderBuildItem> serviceProviders,
BuildProducer<NativeImageProxyDefinitionBuildItem> proxies,
Capabilities capabilities, BuildProducer<UnremovableBeanBuildItem> beans,
BuildProducer<NativeImageResourceBuildItem> nativeLibs, NativeConfig nativeConfig) {
BuildProducer<NativeImageResourceBuildItem> nativeLibs, NativeConfig nativeConfig,
BuildProducer<ExtensionSslNativeSupportBuildItem> sslNativeSupport) {
final Set<DotName> toRegister = new HashSet<>();

collectImplementors(toRegister, indexBuildItem, Serializer.class);
Expand Down Expand Up @@ -177,7 +186,7 @@ public void build(
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, "java.nio.DirectByteBuffer"));
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, "sun.misc.Cleaner"));

handleAvro(reflectiveClass, proxies, serviceProviders);
handleAvro(reflectiveClass, proxies, serviceProviders, sslNativeSupport);
handleOpenTracing(reflectiveClass, capabilities);
handleStrimziOAuth(reflectiveClass);
if (config.snappyEnabled) {
Expand Down Expand Up @@ -259,8 +268,11 @@ private void handleStrimziOAuth(BuildProducer<ReflectiveClassBuildItem> reflecti

private void handleAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<NativeImageProxyDefinitionBuildItem> proxies,
BuildProducer<ServiceProviderBuildItem> serviceProviders) {
BuildProducer<ServiceProviderBuildItem> serviceProviders,
BuildProducer<ExtensionSslNativeSupportBuildItem> sslNativeSupport) {
// Avro - for both Confluent and Apicurio

// --- Confluent ---
try {
Class.forName("io.confluent.kafka.serializers.KafkaAvroDeserializer", false,
Thread.currentThread().getContextClassLoader());
Expand Down Expand Up @@ -310,6 +322,8 @@ private void handleAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
} catch (ClassNotFoundException e) {
// ignore, Confluent schema registry client not in the classpath
}

// --- Apicurio Registry 1.x ---
try {
Class.forName("io.apicurio.registry.utils.serde.AvroKafkaDeserializer", false,
Thread.currentThread().getContextClassLoader());
Expand All @@ -319,6 +333,7 @@ private void handleAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
"io.apicurio.registry.utils.serde.AvroKafkaSerializer"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false,
"io.apicurio.registry.utils.serde.avro.DefaultAvroDatumProvider",
"io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider",
"io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
"io.apicurio.registry.utils.serde.strategy.CachedSchemaIdStrategy",
Expand All @@ -337,6 +352,38 @@ private void handleAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
} catch (ClassNotFoundException e) {
//ignore, Apicurio Avro is not in the classpath
}

// --- Apicurio Registry 2.x ---
try {
Class.forName("io.apicurio.registry.serde.avro.AvroKafkaDeserializer", false,
Thread.currentThread().getContextClassLoader());
reflectiveClass.produce(
new ReflectiveClassBuildItem(true, true, false,
"io.apicurio.registry.serde.avro.AvroKafkaDeserializer",
"io.apicurio.registry.serde.avro.AvroKafkaSerializer"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false,
"io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy",
"io.apicurio.registry.serde.strategy.TopicIdStrategy",
"io.apicurio.registry.serde.avro.DefaultAvroDatumProvider",
"io.apicurio.registry.serde.avro.ReflectAvroDatumProvider",
"io.apicurio.registry.serde.avro.strategy.RecordIdStrategy",
"io.apicurio.registry.serde.avro.strategy.TopicRecordIdStrategy"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false,
"io.apicurio.registry.serde.DefaultSchemaResolver",
"io.apicurio.registry.serde.DefaultIdHandler",
"io.apicurio.registry.serde.Legacy4ByteIdHandler",
"io.apicurio.registry.serde.DefaultSchemaResolver",
"io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider",
"io.apicurio.registry.serde.headers.DefaultHeadersHandler"));

// Apicurio Registry 2.x uses the JDK 11 HTTP client, which unconditionally requires SSL
sslNativeSupport.produce(new ExtensionSslNativeSupportBuildItem(Feature.KAFKA_CLIENT));

} catch (ClassNotFoundException e) {
//ignore, Apicurio Avro is not in the classpath
}
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,17 @@ public class KafkaStreamsRuntimeConfig {
/**
* The schema registry key.
*
* e.g. to diff between different registry impls / instances
* as they have this registry url under different property key.
* Different schema registry libraries expect a registry URL
* in different configuration properties.
*
* Red Hat / Apicurio - apicurio.registry.url
* Confluent - schema.registry.url
* For Apicurio Registry, use {@code apicurio.registry.url}.
* For Confluent schema registry, use {@code schema.registry.url}.
*/
@ConfigItem(defaultValue = "schema.registry.url")
public String schemaRegistryKey;

/**
* The schema registry url.
* The schema registry URL.
*/
@ConfigItem
public Optional<String> schemaRegistryUrl;
Expand Down
Loading