Skip to content

Commit

Permalink
Merge pull request #284 from fjtirado/Fix_#281
Browse files Browse the repository at this point in the history
[Fix #281] Adding deserializer support
  • Loading branch information
fjtirado authored Sep 24, 2024
2 parents eae96c6 + 87014f2 commit 39e7392
Show file tree
Hide file tree
Showing 15 changed files with 159 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import io.quarkus.reactivemessaging.http.runtime.converters.JsonObjectConverter;
import io.quarkus.reactivemessaging.http.runtime.converters.ObjectConverter;
import io.quarkus.reactivemessaging.http.runtime.converters.StringConverter;
import io.quarkus.reactivemessaging.http.runtime.serializers.Deserializer;
import io.quarkus.reactivemessaging.http.runtime.serializers.DeserializerFactoryBase;
import io.quarkus.reactivemessaging.http.runtime.serializers.Serializer;
import io.quarkus.reactivemessaging.http.runtime.serializers.SerializerFactoryBase;
import io.quarkus.vertx.http.deployment.BodyHandlerBuildItem;
Expand Down Expand Up @@ -135,7 +137,12 @@ void registerHttpConnector(BuildProducer<AdditionalBeanBuildItem> beanProducer,
.forEach(path -> routeProducer.produce(RouteBuildItem.builder().route(path).handler(handler).build()));
}

initSerializers(ReactiveHttpConfig.readSerializers(), generatedBeanProducer);
initSerializers(ReactiveHttpConfig.readSerializers(),
"io.quarkus.reactivemessaging.http.runtime.serializers.SerializerFactory", Serializer.class,
SerializerFactoryBase.class, generatedBeanProducer);
initSerializers(ReactiveHttpConfig.readDeserializers(),
"io.quarkus.reactivemessaging.http.runtime.serializers.DeserializerFactory", Deserializer.class,
DeserializerFactoryBase.class, generatedBeanProducer);
}

@BuildStep
Expand Down Expand Up @@ -201,18 +208,19 @@ private void collectPayloadType(Set<String> payloadClasses, Type type) {
}
}

private void initSerializers(List<String> serializers, BuildProducer<GeneratedBeanBuildItem> generatedBeans) {
private void initSerializers(List<String> serializers, String className, Class<?> type, Class<?> baseClass,
BuildProducer<GeneratedBeanBuildItem> generatedBeans) {
ClassOutput classOutput = new GeneratedBeanGizmoAdaptor(generatedBeans);
try (ClassCreator factory = ClassCreator.builder().classOutput(classOutput)
.className("io.quarkus.reactivemessaging.http.runtime.serializers.SerializerFactory")
.superClass(SerializerFactoryBase.class)
.className(className)
.superClass(baseClass)
.build()) {
factory.addAnnotation(ApplicationScoped.class);

try (MethodCreator init = factory.getMethodCreator("initAdditionalSerializers", void.class)) {
init.setModifiers(Modifier.PROTECTED);
MethodDescriptor addSerializer = MethodDescriptor.ofMethod(SerializerFactoryBase.class, "addSerializer",
void.class, String.class, Serializer.class);
MethodDescriptor addSerializer = MethodDescriptor.ofMethod(baseClass, "addSerializer",
void.class, String.class, type);

for (String serializerName : serializers) {
ResultHandle serializer = init.newInstance(MethodDescriptor.ofConstructor(serializerName));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.reactivemessaging.http;

import io.quarkus.reactivemessaging.http.runtime.serializers.Deserializer;
import io.vertx.core.buffer.Buffer;

public class FirstLowerCaseDeserializer implements Deserializer<String> {

@Override
public String deserialize(Buffer payload) {
StringBuilder sb = new StringBuilder(payload.toString());
if (sb.length() > 0) {
sb.setCharAt(0, Character.toLowerCase(sb.charAt(0)));
}
return sb.toString();
}
}
2 changes: 2 additions & 0 deletions integration-tests/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mp.messaging.incoming.websocket-source.connector=quarkus-websocket
mp.messaging.incoming.websocket-source.path=/my-websocket
mp.messaging.incoming.websocket-source.deserializer=io.quarkus.reactivemessaging.http.FirstLowerCaseDeserializer

mp.messaging.outgoing.websocket-sink.connector=quarkus-websocket
mp.messaging.outgoing.websocket-sink.url=ws://localhost:${quarkus.http.test-port:8081}/my-websocket
Expand All @@ -8,6 +9,7 @@ mp.messaging.outgoing.websocket-sink.serializer=io.quarkus.reactivemessaging.htt
mp.messaging.incoming.http-source.connector=quarkus-http
mp.messaging.incoming.http-source.path=/my-http-resource
mp.messaging.incoming.http-source.method=PUT
mp.messaging.incoming.http-source.deserializer=io.quarkus.reactivemessaging.http.FirstLowerCaseDeserializer

mp.messaging.outgoing.http-sink.connector=quarkus-http
mp.messaging.outgoing.http-sink.method=PUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void shouldSendAndConsumeWebSocketAndUseCustomSerializer() {

await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> get("/websocket-helper").getBody().asString(), Predicate.isEqual("TEST-MESSAGE"));
.until(() -> get("/websocket-helper").getBody().asString(), Predicate.isEqual("tEST-MESSAGE"));
}

@Test
Expand All @@ -84,7 +84,7 @@ public void shouldSendAndConsumeHttpAndUseCustomSerializer() throws Exception {

await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> get("/http-helper").getBody().asString(), Predicate.isEqual("TEST-MESSAGE"));
.until(() -> get("/http-helper").getBody().asString(), Predicate.isEqual("tEST-MESSAGE"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void handle(RoutingContext event) {
if (bundle != null) {
MultiEmitter<? super MessageType> emitter = bundle.emitter;
StrictQueueSizeGuard guard = bundle.guard;
handleRequest(event, emitter, guard, bundle.path);
handleRequest(event, emitter, guard, bundle.path, bundle.deserializerName);
} else {
event.response().setStatusCode(404).end();
}
Expand All @@ -42,6 +42,7 @@ private void addProcessor(ConfigType streamConfig) {
.onItem().invoke(guard::dequeue);
bundle.setProcessor(processor);
bundle.setPath(streamConfig.path);
bundle.setDeserializerName(streamConfig.deserializerName);

Bundle<MessageType> previousProcessor = processors.put(key(streamConfig), bundle);
if (previousProcessor != null) {
Expand All @@ -50,7 +51,7 @@ private void addProcessor(ConfigType streamConfig) {
}

protected abstract void handleRequest(RoutingContext event, MultiEmitter<? super MessageType> emitter,
StrictQueueSizeGuard guard, String path);
StrictQueueSizeGuard guard, String path, String deseralizerName);

protected abstract String description(ConfigType streamConfig);

Expand All @@ -65,6 +66,7 @@ protected class Bundle<MessageType> {
private Multi<MessageType> processor; // effectively final
private MultiEmitter<? super MessageType> emitter; // effectively final
private String path;
private String deserializerName;

private Bundle(StrictQueueSizeGuard guard) {
this.guard = guard;
Expand All @@ -85,5 +87,13 @@ public Multi<MessageType> getProcessor() {
public void setPath(String path) {
this.path = path;
}

public String getDeserializerName() {
return deserializerName;
}

public void setDeserializerName(String deserializerName) {
this.deserializerName = deserializerName;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

import io.quarkus.reactivemessaging.http.runtime.config.HttpStreamConfig;
import io.quarkus.reactivemessaging.http.runtime.config.ReactiveHttpConfig;
import io.quarkus.reactivemessaging.http.runtime.serializers.DeserializerFactoryBase;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.RoutingContext;

Expand All @@ -26,6 +26,9 @@ public class ReactiveHttpHandlerBean extends ReactiveHandlerBeanBase<HttpStreamC
@Inject
ReactiveHttpConfig config;

@Inject
DeserializerFactoryBase deserializerFactory;

Multi<HttpMessage<?>> getProcessor(String path, HttpMethod method) {
return processors.get(key(path, method)).getProcessor();
}
Expand All @@ -52,20 +55,22 @@ protected String description(HttpStreamConfig streamConfig) {

@Override
protected void handleRequest(RoutingContext event, MultiEmitter<? super HttpMessage<?>> emitter,
StrictQueueSizeGuard guard, String path) {
StrictQueueSizeGuard guard, String path, String deserializerName) {
if (emitter == null) {
onUnexpectedError(event, null,
"No consumer subscribed for messages sent to Reactive Messaging HTTP endpoint on path: " + path);
} else if (guard.prepareToEmit()) {
try {
HttpMessage<Buffer> message = new HttpMessage<>(event.getBody(), new IncomingHttpMetadata(event),
emitter.emit(new HttpMessage<>(
deserializerFactory.getDeserializer(deserializerName).map(d -> d.deserialize(event.getBody()))
.orElse(event.getBody()),
new IncomingHttpMetadata(event),
() -> {
if (!event.response().ended()) {
event.response().setStatusCode(202).end();
}
},
error -> onUnexpectedError(event, error, "Failed to process message."));
emitter.emit(message);
error -> onUnexpectedError(event, error, "Failed to process message.")));
} catch (Exception any) {
guard.dequeue();
onUnexpectedError(event, any, "Emitting message failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.quarkus.reactivemessaging.http.runtime.config.ReactiveHttpConfig;
import io.quarkus.reactivemessaging.http.runtime.config.WebSocketStreamConfig;
import io.quarkus.reactivemessaging.http.runtime.serializers.DeserializerFactoryBase;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.core.buffer.Buffer;
Expand All @@ -26,9 +27,12 @@ public class ReactiveWebSocketHandlerBean extends ReactiveHandlerBeanBase<WebSoc
@Inject
ReactiveHttpConfig config;

@Inject
DeserializerFactoryBase deserializerFactory;

@Override
protected void handleRequest(RoutingContext event, MultiEmitter<? super WebSocketMessage<?>> emitter,
StrictQueueSizeGuard guard, String path) {
StrictQueueSizeGuard guard, String path, String deserializerName) {
event.request().toWebSocket(
webSocket -> {
if (webSocket.failed()) {
Expand All @@ -43,7 +47,9 @@ protected void handleRequest(RoutingContext event, MultiEmitter<? super WebSocke
"Reactive Messaging WebSocket endpoint on path: " + path);
} else if (guard.prepareToEmit()) {
try {
emitter.emit(new WebSocketMessage<>(b,
emitter.emit(new WebSocketMessage<>(
deserializerFactory.getDeserializer(deserializerName)
.map(d -> d.deserialize(b)).orElse(b),
new RequestMetadata(event),
() -> serverWebSocket.write(Buffer.buffer("ACK")),
error -> onUnexpectedError(serverWebSocket, error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
public class HttpStreamConfig extends StreamConfigBase {
public final HttpMethod method;

public HttpStreamConfig(String path, String method, String name, int bufferSize) {
super(bufferSize, path);
public HttpStreamConfig(String path, String method, String name, int bufferSize, String deserializerName) {
super(bufferSize, path, deserializerName);
this.method = toHttpMethod(method, name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public static List<HttpStreamConfig> readIncomingHttpConfigs() {
String path = getConfigProperty(IN_KEY, connectorName, "path", String.class);
int bufferSize = getConfigProperty(IN_KEY, connectorName, "buffer-size",
QuarkusHttpConnector.DEFAULT_SOURCE_BUFFER, Integer.class);
streamConfigs.add(new HttpStreamConfig(path, method, connectorName, bufferSize));
String deserializerName = getConfigProperty(IN_KEY, connectorName, "deserializer", null, String.class);
streamConfigs.add(new HttpStreamConfig(path, method, connectorName, bufferSize, deserializerName));
}
}
return streamConfigs;
Expand All @@ -93,7 +94,8 @@ public static List<WebSocketStreamConfig> readIncomingWebSocketConfigs() {
String path = getConfigProperty(IN_KEY, connectorName, "path", String.class);
int bufferSize = getConfigProperty(IN_KEY, connectorName, "buffer-size",
QuarkusWebSocketConnector.DEFAULT_SOURCE_BUFFER, Integer.class);
streamConfigs.add(new WebSocketStreamConfig(path, bufferSize));
String deserializerName = getConfigProperty(IN_KEY, connectorName, "deserializer", null, String.class);
streamConfigs.add(new WebSocketStreamConfig(path, bufferSize, deserializerName));
}
}
return streamConfigs;
Expand All @@ -105,17 +107,25 @@ public static List<WebSocketStreamConfig> readIncomingWebSocketConfigs() {
* @return list of custom serializer class names
*/
public static List<String> readSerializers() {
return readSerializers(OUT_PATTERN, OUT_KEY, MP_MSG_OUT, "serializer");
}

public static List<String> readDeserializers() {
return readSerializers(IN_PATTERN, IN_KEY, MP_MSG_IN, "deserializer");
}

private static List<String> readSerializers(Pattern pattern, String key, String message, String serializerKey) {
List<String> result = new ArrayList<>();
Config config = ConfigProviderResolver.instance().getConfig();
for (String propertyName : config.getPropertyNames()) {
String connectorName = getConnectorNameIfMatching(OUT_PATTERN, propertyName, OUT_KEY, MP_MSG_OUT,
String connectorName = getConnectorNameIfMatching(pattern, propertyName, key, message,
QuarkusWebSocketConnector.NAME);
if (connectorName == null) {
connectorName = getConnectorNameIfMatching(OUT_PATTERN, propertyName, OUT_KEY, MP_MSG_OUT,
connectorName = getConnectorNameIfMatching(pattern, propertyName, key, message,
QuarkusHttpConnector.NAME);
}
if (connectorName != null) {
String serializer = getConfigProperty(OUT_KEY, connectorName, "serializer", null, String.class);
String serializer = getConfigProperty(key, connectorName, serializerKey, null, String.class);
if (serializer != null) {
result.add(serializer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
public class StreamConfigBase {
public final int bufferSize;
public final String path;
public final String deserializerName;

public StreamConfigBase(int bufferSize, String path) {
public StreamConfigBase(int bufferSize, String path, String deserializerName) {
this.path = path;
this.bufferSize = bufferSize;
this.deserializerName = deserializerName;
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.quarkus.reactivemessaging.http.runtime.config;

public class WebSocketStreamConfig extends StreamConfigBase {
public WebSocketStreamConfig(String path, int bufferSize) {
super(bufferSize, path);
public WebSocketStreamConfig(String path, int bufferSize, String deserializerName) {
super(bufferSize, path, deserializerName);
}

public String path() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ public Message<?> convert(Message<?> in, Type target) {
}
}

@Override
public int getPriority() {
return CONVERTER_DEFAULT_PRIORITY - 1;
}

protected abstract Message<?> doConvert(Message<?> in, Type target);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/
@ApplicationScoped
public class StringConverter implements MessageConverter {

@Override
public boolean canConvert(Message<?> in, Type target) {
return in.getPayload() instanceof Buffer && target == String.class;
Expand All @@ -24,4 +23,9 @@ public boolean canConvert(Message<?> in, Type target) {
public Message<String> convert(Message<?> in, Type target) {
return in.withPayload(((Buffer) in.getPayload()).toString());
}

@Override
public int getPriority() {
return CONVERTER_DEFAULT_PRIORITY - 1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactivemessaging.http.runtime.serializers;

import io.vertx.core.buffer.Buffer;

/**
* Reactive http connector deserializer.
* Deserializes given payload from a {@link Buffer}
*
*
* @param <PayloadType> type of the payload to deserialize
*/
public interface Deserializer<PayloadType> {

/**
* Deserialize the payload
*
* @param payload buffer to deserialize
* @return deserialized object
*/
PayloadType deserialize(Buffer payload);

}
Loading

0 comments on commit 39e7392

Please sign in to comment.