Skip to content

Commit

Permalink
Merge pull request #21992 from cescoffier/serde-generation
Browse files Browse the repository at this point in the history
Generate serializer and deserializer for Kafka when none are found
  • Loading branch information
cescoffier authored Dec 7, 2021
2 parents 99ea971 + b23b986 commit fa6184d
Show file tree
Hide file tree
Showing 15 changed files with 318 additions and 106 deletions.
52 changes: 10 additions & 42 deletions docs/src/main/asciidoc/kafka-reactive-getting-started.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,6 @@ In this case, we need to configure the channel in the `application.properties` f
# Configure the incoming `quote-requests` Kafka topic
mp.messaging.incoming.requests.topic=quote-requests
mp.messaging.incoming.requests.auto.offset.reset=earliest
# Configure the outgoing `quotes` Kafka topic
mp.messaging.outgoing.quotes.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
----

Note that in this case we have one incoming and one outgoing connector configuration, each one distinctly named.
Expand All @@ -286,14 +283,8 @@ An exhaustive list of configuration properties is available in xref:kafka.adoc#k
`mp.messaging.incoming.requests.auto.offset.reset=earliest` instructs the application to start reading the topics from the first offset, when there is no committed offset for the consumer group.
In other words, it will also process messages sent before we start the processor application.

Also, for the outgoing configuration we specified the serializer because we are sending a `Quote` object as the message payload.

Quarkus provides default implementations for Kafka serializer/deserializer pairs using Jackson `ObjectMapper`.
`ObjectMapperSerializer` can be used to serialize all objects via Jackson.

TIP: Quarkus detects serializers and deserializers.
However, in this case, there is no serializer on the classpath that handle `Quote` objects, so Quarkus would not find one.
Thus, we must configure the serializer attribute to indicate that we want to send the value as JSON objects.
There is no need to set serializers or deserializers.
Quarkus detects them, and if none are found, generates them using JSON serialization.

== Receiving quotes

Expand Down Expand Up @@ -323,37 +314,7 @@ public Multi<Quote> stream() {
<3> Returns the stream (_Reactive Stream_)

No need to configure anything, as Quarkus will automatically associate the `quotes` channel to the `quotes` Kafka topic.

[NOTE]
====
In this guide we explore Smallrye Reactive Messaging framework to interact with Apache Kafka.
Quarkus extension for Kafka also allows
xref:kafka.adoc#kafka-bare-clients[using Kafka clients directly].
====

== JSON serialization via Jackson

Finally, we will configure JSON serialization for messages using Jackson.
Previously, we've seen the usage of `ObjectMapperSerializer` to serialize objects via Jackson.
For the corresponding deserializer class, we need to create a subclass of `ObjectMapperDeserializer`.

So, let's create it inside _producer_ project on `src/main/java/org/acme/kafka/model/QuoteDeserializer.java`

[source,java]
----
package org.acme.kafka.model;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
public class QuoteDeserializer extends ObjectMapperDeserializer<Quote> {
public QuoteDeserializer() {
super(Quote.class);
}
}
----

No need to add any configuration for this inside `application.properties` file.
Quarkus will automatically detect this deserializer.
It will also generate a deserializer for the `Quote` class.

[TIP]
====
Expand Down Expand Up @@ -552,3 +513,10 @@ This guide has shown how you can interact with Kafka using Quarkus.
It utilizes https://smallrye.io/smallrye-reactive-messaging[SmallRye Reactive Messaging] to build data streaming applications.

For the exhaustive list of features and configuration options, check the xref:kafka.adoc[Reference guide for Apache Kafka Extension].

[NOTE]
====
In this guide we explore Smallrye Reactive Messaging framework to interact with Apache Kafka.
Quarkus extension for Kafka also allows
xref:kafka.adoc#kafka-bare-clients[using Kafka clients directly].
====
33 changes: 22 additions & 11 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1237,15 +1237,9 @@ NOTE: With JSON serialization correctly configured, you can also use `Publisher<
[[jackson-serialization]]
=== Serializing via Jackson

First, you need to include the `quarkus-jackson` extension.

[source, xml]
----
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson</artifactId>
</dependency>
----
Quarkus has built-in support for JSON serialization and deserialization based on Jackson.
It will also <<serialization-generation, generate>> the serializer and deserializer for you, so you do not have to configure anything.
When generation is disabled, you can use the provided `ObjectMapperSerializer` and `ObjectMapperDeserializer` as explained below.

There is an existing `ObjectMapperSerializer` that can be used to serialize all data objects via Jackson.
You may create an empty subclass if you want to use <<serialization-autodetection>>.
Expand Down Expand Up @@ -1275,12 +1269,10 @@ Finally, configure your channels to use the Jackson serializer and deserializer.
[source,properties]
----
# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer
# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
----
Expand Down Expand Up @@ -1453,6 +1445,25 @@ If a serializer/deserializer is set by configuration, it won't be replaced by th
In case you have any issues with serializer autodetection, you can switch it off completely by setting `quarkus.reactive-messaging.kafka.serializer-autodetection.enabled=false`.
If you find you need to do this, please file a bug in the link:https://github.com/quarkusio/quarkus/issues[Quarkus issue tracker] so we can fix whatever problem you have.

[[serialization-generation]]
== JSON Serializer/deserializer generation
Quarkus automatically generates serializers and deserializers for channels where:

1. the serializer/deserializer is not configured
2. the auto-detection did not find a matching serializer/deserializer

It uses Jackson underneath.

This generation can be disabled using:

[source, properties]
----
quarkus.reactive-messaging.kafka.serializer-generation.enabled=false
----

IMPORTANT: Generation does not support collections such as `List<Fruit>`.
Refer to <<jackson-serialization>> to write your own serializer/deserializer for this case.

== Using Schema Registry

This is described in a dedicated guide: link:kafka-schema-registry-avro[Using Apache Kafka with Schema Registry and Avro].
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.quarkus.smallrye.reactivemessaging.kafka.deployment;

import java.util.UUID;

import org.jboss.jandex.Type;

import io.quarkus.deployment.GeneratedClassGizmoAdaptor;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
import io.quarkus.gizmo.ClassCreator;
import io.quarkus.gizmo.ClassOutput;
import io.quarkus.gizmo.MethodCreator;
import io.quarkus.gizmo.MethodDescriptor;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
import io.quarkus.kafka.client.serialization.ObjectMapperSerializer;
import io.quarkus.runtime.util.HashUtil;

public class JacksonSerdeGenerator {

private JacksonSerdeGenerator() {
// Avoid direct instantiation
}

public static String generateSerializer(BuildProducer<GeneratedClassBuildItem> generatedClass, Type type) {
ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true);
String baseName = type.name().withoutPackagePrefix();
String targetPackage = io.quarkus.arc.processor.DotNames
.internalPackageNameWithTrailingSlash(type.name());
String out = baseName + "_Serializer_" + HashUtil.sha1(UUID.randomUUID().toString());
String generatedName = targetPackage + out;
ClassCreator creator = ClassCreator.builder().classOutput(classOutput).className(generatedName)
.superClass(ObjectMapperSerializer.class).build();
creator.close();
return type.name().packagePrefix() + "." + out;
}

public static String generateDeserializer(BuildProducer<GeneratedClassBuildItem> generatedClass, Type type) {
ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true);
String baseName = type.name().withoutPackagePrefix();
String targetPackage = io.quarkus.arc.processor.DotNames
.internalPackageNameWithTrailingSlash(type.name());
String out = baseName + "_Deserializer_" + HashUtil.sha1(Long.toString(UUID.randomUUID().getMostSignificantBits()));
String generatedName = targetPackage + out;
ClassCreator creator = ClassCreator.builder().classOutput(classOutput).className(generatedName)
.superClass(ObjectMapperDeserializer.class).build();
MethodCreator constructor = creator.getMethodCreator("<init>", void.class);
MethodDescriptor superConstructor = MethodDescriptor.ofConstructor(ObjectMapperDeserializer.class, Class.class);
constructor.invokeSpecialMethod(superConstructor, constructor.getThis(), constructor.loadClass(type.name().toString()));
constructor.returnValue(null);
constructor.close();
creator.close();
return type.name().packagePrefix() + "." + out;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,15 @@
@ConfigRoot(name = "reactive-messaging.kafka", phase = ConfigPhase.BUILD_TIME)
public class ReactiveMessagingKafkaBuildTimeConfig {
/**
* Whether or not Kafka serializer/deserializer autodetection is enabled.
* Whether or not Kafka serializer/deserializer auto-detection is enabled.
*/
@ConfigItem(name = "serializer-autodetection.enabled", defaultValue = "true")
public boolean serializerAutodetectionEnabled;

/**
* Whether or not Kafka serializer/deserializer generation is enabled.
* When no serializer/deserializer are found and not set, Quarkus generates a Jackson-based serde.
*/
@ConfigItem(name = "serializer-generation.enabled", defaultValue = "true")
public boolean serializerGenerationEnabled;
}
Loading

0 comments on commit fa6184d

Please sign in to comment.