diff --git a/deployment/src/main/java/io/quarkus/reactivemessaging/http/deployment/ReactiveHttpProcessor.java b/deployment/src/main/java/io/quarkus/reactivemessaging/http/deployment/ReactiveHttpProcessor.java index 9a3eaff..caa2853 100644 --- a/deployment/src/main/java/io/quarkus/reactivemessaging/http/deployment/ReactiveHttpProcessor.java +++ b/deployment/src/main/java/io/quarkus/reactivemessaging/http/deployment/ReactiveHttpProcessor.java @@ -219,7 +219,7 @@ private void initSerializers(List serializers, String className, Class { + + @Override + public String deserialize(Buffer payload) { + StringBuilder sb = new StringBuilder(payload.toString()); + if (sb.length() > 0) { + sb.setCharAt(0, Character.toLowerCase(sb.charAt(0))); + } + return sb.toString(); + } +} diff --git a/integration-tests/src/main/resources/application.properties b/integration-tests/src/main/resources/application.properties index d045abe..ec8f829 100644 --- a/integration-tests/src/main/resources/application.properties +++ b/integration-tests/src/main/resources/application.properties @@ -1,5 +1,6 @@ mp.messaging.incoming.websocket-source.connector=quarkus-websocket mp.messaging.incoming.websocket-source.path=/my-websocket +mp.messaging.incoming.websocket-source.deserializer=io.quarkus.reactivemessaging.http.FirstLowerCaseDeserializer mp.messaging.outgoing.websocket-sink.connector=quarkus-websocket mp.messaging.outgoing.websocket-sink.url=ws://localhost:${quarkus.http.test-port:8081}/my-websocket @@ -8,6 +9,7 @@ mp.messaging.outgoing.websocket-sink.serializer=io.quarkus.reactivemessaging.htt mp.messaging.incoming.http-source.connector=quarkus-http mp.messaging.incoming.http-source.path=/my-http-resource mp.messaging.incoming.http-source.method=PUT +mp.messaging.incoming.http-source.deserializer=io.quarkus.reactivemessaging.http.FirstLowerCaseDeserializer mp.messaging.outgoing.http-sink.connector=quarkus-http mp.messaging.outgoing.http-sink.method=PUT diff --git a/integration-tests/src/test/java/io/quarkus/reactivemessaging/http/ReactiveMessagingHttpTest.java b/integration-tests/src/test/java/io/quarkus/reactivemessaging/http/ReactiveMessagingHttpTest.java index fb631d4..060c1fb 100644 --- a/integration-tests/src/test/java/io/quarkus/reactivemessaging/http/ReactiveMessagingHttpTest.java +++ b/integration-tests/src/test/java/io/quarkus/reactivemessaging/http/ReactiveMessagingHttpTest.java @@ -68,7 +68,7 @@ public void shouldSendAndConsumeWebSocketAndUseCustomSerializer() { await() .atMost(10, TimeUnit.SECONDS) - .until(() -> get("/websocket-helper").getBody().asString(), Predicate.isEqual("TEST-MESSAGE")); + .until(() -> get("/websocket-helper").getBody().asString(), Predicate.isEqual("tEST-MESSAGE")); } @Test @@ -84,7 +84,7 @@ public void shouldSendAndConsumeHttpAndUseCustomSerializer() throws Exception { await() .atMost(10, TimeUnit.SECONDS) - .until(() -> get("/http-helper").getBody().asString(), Predicate.isEqual("TEST-MESSAGE")); + .until(() -> get("/http-helper").getBody().asString(), Predicate.isEqual("tEST-MESSAGE")); } @Test diff --git a/runtime/src/main/java/io/quarkus/reactivemessaging/http/runtime/ReactiveHttpHandlerBean.java b/runtime/src/main/java/io/quarkus/reactivemessaging/http/runtime/ReactiveHttpHandlerBean.java index fbf1397..3106954 100644 --- a/runtime/src/main/java/io/quarkus/reactivemessaging/http/runtime/ReactiveHttpHandlerBean.java +++ b/runtime/src/main/java/io/quarkus/reactivemessaging/http/runtime/ReactiveHttpHandlerBean.java @@ -12,7 +12,6 @@ import io.quarkus.reactivemessaging.http.runtime.serializers.DeserializerFactoryBase; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.subscription.MultiEmitter; -import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; import io.vertx.ext.web.RoutingContext; @@ -62,16 +61,16 @@ protected void handleRequest(RoutingContext event, MultiEmitter message = new HttpMessage<>(event.getBody(), new IncomingHttpMetadata(event), + emitter.emit(new HttpMessage<>( + deserializerFactory.getDeserializer(deserializerName).map(d -> d.deserialize(event.getBody())) + .orElse(event.getBody()), + new IncomingHttpMetadata(event), () -> { if (!event.response().ended()) { event.response().setStatusCode(202).end(); } }, - error -> onUnexpectedError(event, error, "Failed to process message.")); - deserializerFactory.getDeserializer(deserializerName) - .ifPresent(d -> message.withPayload(d.deserialize(message.getPayload()))); - emitter.emit(message); + error -> onUnexpectedError(event, error, "Failed to process message."))); } catch (Exception any) { guard.dequeue(); onUnexpectedError(event, any, "Emitting message failed"); diff --git a/runtime/src/main/java/io/quarkus/reactivemessaging/http/runtime/serializers/DeserializerFactoryBase.java b/runtime/src/main/java/io/quarkus/reactivemessaging/http/runtime/serializers/DeserializerFactoryBase.java index 6f65cdb..99411ae 100644 --- a/runtime/src/main/java/io/quarkus/reactivemessaging/http/runtime/serializers/DeserializerFactoryBase.java +++ b/runtime/src/main/java/io/quarkus/reactivemessaging/http/runtime/serializers/DeserializerFactoryBase.java @@ -4,13 +4,10 @@ import java.util.Map; import java.util.Optional; -import org.jboss.logging.Logger; - /** * a base superclass for a SerializerFactory that is generated in build time */ public abstract class DeserializerFactoryBase { - private static final Logger log = Logger.getLogger(DeserializerFactoryBase.class); private final Map> deserializersByClassName = new HashMap<>(); @@ -35,7 +32,7 @@ public void addSerializer(String className, Deserializer serializer) { deserializersByClassName.put(className, serializer); } - /** + /* * method that initializes additional serializers (used by user's config). * Implemented in the generated subclass */