Skip to content

Commit

Permalink
[Fix quarkiverse#281] Adding integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Sep 24, 2024
1 parent 5ace968 commit 57e2c2b
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ private void initSerializers(List<String> serializers, String className, Class<?

try (MethodCreator init = factory.getMethodCreator("initAdditionalSerializers", void.class)) {
init.setModifiers(Modifier.PROTECTED);
MethodDescriptor addSerializer = MethodDescriptor.ofMethod(SerializerFactoryBase.class, "addSerializer",
MethodDescriptor addSerializer = MethodDescriptor.ofMethod(baseClass, "addSerializer",
void.class, String.class, type);

for (String serializerName : serializers) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.reactivemessaging.http;

import io.quarkus.reactivemessaging.http.runtime.serializers.Deserializer;
import io.vertx.core.buffer.Buffer;

public class FirstLowerCaseDeserializer implements Deserializer<String> {

@Override
public String deserialize(Buffer payload) {
StringBuilder sb = new StringBuilder(payload.toString());
if (sb.length() > 0) {
sb.setCharAt(0, Character.toLowerCase(sb.charAt(0)));
}
return sb.toString();
}
}
2 changes: 2 additions & 0 deletions integration-tests/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mp.messaging.incoming.websocket-source.connector=quarkus-websocket
mp.messaging.incoming.websocket-source.path=/my-websocket
mp.messaging.incoming.websocket-source.deserializer=io.quarkus.reactivemessaging.http.FirstLowerCaseDeserializer

mp.messaging.outgoing.websocket-sink.connector=quarkus-websocket
mp.messaging.outgoing.websocket-sink.url=ws://localhost:${quarkus.http.test-port:8081}/my-websocket
Expand All @@ -8,6 +9,7 @@ mp.messaging.outgoing.websocket-sink.serializer=io.quarkus.reactivemessaging.htt
mp.messaging.incoming.http-source.connector=quarkus-http
mp.messaging.incoming.http-source.path=/my-http-resource
mp.messaging.incoming.http-source.method=PUT
mp.messaging.incoming.http-source.deserializer=io.quarkus.reactivemessaging.http.FirstLowerCaseDeserializer

mp.messaging.outgoing.http-sink.connector=quarkus-http
mp.messaging.outgoing.http-sink.method=PUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void shouldSendAndConsumeWebSocketAndUseCustomSerializer() {

await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> get("/websocket-helper").getBody().asString(), Predicate.isEqual("TEST-MESSAGE"));
.until(() -> get("/websocket-helper").getBody().asString(), Predicate.isEqual("tEST-MESSAGE"));
}

@Test
Expand All @@ -84,7 +84,7 @@ public void shouldSendAndConsumeHttpAndUseCustomSerializer() throws Exception {

await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> get("/http-helper").getBody().asString(), Predicate.isEqual("TEST-MESSAGE"));
.until(() -> get("/http-helper").getBody().asString(), Predicate.isEqual("tEST-MESSAGE"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import io.quarkus.reactivemessaging.http.runtime.serializers.DeserializerFactoryBase;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.RoutingContext;

Expand Down Expand Up @@ -62,16 +61,16 @@ protected void handleRequest(RoutingContext event, MultiEmitter<? super HttpMess
"No consumer subscribed for messages sent to Reactive Messaging HTTP endpoint on path: " + path);
} else if (guard.prepareToEmit()) {
try {
HttpMessage<Buffer> message = new HttpMessage<>(event.getBody(), new IncomingHttpMetadata(event),
emitter.emit(new HttpMessage<>(
deserializerFactory.getDeserializer(deserializerName).map(d -> d.deserialize(event.getBody()))
.orElse(event.getBody()),
new IncomingHttpMetadata(event),
() -> {
if (!event.response().ended()) {
event.response().setStatusCode(202).end();
}
},
error -> onUnexpectedError(event, error, "Failed to process message."));
deserializerFactory.getDeserializer(deserializerName)
.ifPresent(d -> message.withPayload(d.deserialize(message.getPayload())));
emitter.emit(message);
error -> onUnexpectedError(event, error, "Failed to process message.")));
} catch (Exception any) {
guard.dequeue();
onUnexpectedError(event, any, "Emitting message failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@
import java.util.Map;
import java.util.Optional;

import org.jboss.logging.Logger;

/**
* a base superclass for a SerializerFactory that is generated in build time
*/
public abstract class DeserializerFactoryBase {
private static final Logger log = Logger.getLogger(DeserializerFactoryBase.class);

private final Map<String, Deserializer<?>> deserializersByClassName = new HashMap<>();

Expand All @@ -35,7 +32,7 @@ public void addSerializer(String className, Deserializer<?> serializer) {
deserializersByClassName.put(className, serializer);
}

/**
/*
* method that initializes additional serializers (used by user's config).
* Implemented in the generated subclass
*/
Expand Down

0 comments on commit 57e2c2b

Please sign in to comment.