Skip to content

Commit

Permalink
Vertx: make the address in ConsumeEvent annotation configurable
Browse files Browse the repository at this point in the history
- resolves quarkusio#36851
  • Loading branch information
mkouba committed Nov 20, 2023
1 parent 668874e commit ad70cb5
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 2 deletions.
16 changes: 16 additions & 0 deletions docs/src/main/asciidoc/vertx-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,22 @@ public String consume(String name) {
----
<1> Receive the messages sent to the `greeting` address

The address value can be a property expression.
In this case, the configured value is used instead: `@ConsumeEvent("${my.consumer.address}")`.
Additionally, the property expression can specify a default value: `@ConsumeEvent("${my.consumer.address:defaultAddress}")`.

.Config Property Example
[source, java]
----
@ConsumeEvent("${my.consumer.address}") // <1>
public String consume(String name) {
return name.toLowerCase();
}
----
<1> Receive the messages sent to the address configured with the `my.consumer.address` key.

NOTE: If no config property with the specified key exists and no default value is set then the application startup fails.

=== Process events asynchronously

The previous examples use synchronous processing.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.quarkus.vertx.deployment;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import jakarta.enterprise.context.ApplicationScoped;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.runtime.util.ExceptionUtil;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.ConsumeEvent;
import io.vertx.mutiny.core.eventbus.Message;

public class ConsumerNonexistingConfigPropertyTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot(root -> root.addClasses(MessageConsumers.class))
.assertException(t -> {
Throwable root = ExceptionUtil.getRootCause(t);
assertTrue(
root.getMessage().contains(
"Could not expand value address.does.not.exist in property ${address.does.not.exist}"),
t.toString());
});

@Test
public void test() throws InterruptedException {
fail();
}

@ApplicationScoped
static class MessageConsumers {

@ConsumeEvent("${address.does.not.exist}")
void pub(Message<String> name) {
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public class MessageConsumerMethodTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar.addClasses(SimpleBean.class, Transformer.class));
.withApplicationRoot(root -> root.addClasses(SimpleBean.class, Transformer.class))
.overrideConfigKey("foo", "foo-config");

@Inject
SimpleBean simpleBean;
Expand Down Expand Up @@ -200,6 +201,40 @@ public void testBlockingConsumerUsingSmallRyeBlocking() throws InterruptedExcept
assertTrue(message.contains("hello::true"));
}

@Test
public void testConfiguredAddress() throws InterruptedException {
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("foo-config", "hello", ar -> {
if (ar.succeeded()) {
try {
synchronizer.put(ar.result().body());
} catch (InterruptedException e) {
fail(e);
}
} else {
fail(ar.cause());
}
});
assertEquals("HELLO!", synchronizer.poll(2, TimeUnit.SECONDS));
}

@Test
public void testConfiguredAddressDefault() throws InterruptedException {
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("foo-config-default", "hello", ar -> {
if (ar.succeeded()) {
try {
synchronizer.put(ar.result().body());
} catch (InterruptedException e) {
fail(e);
}
} else {
fail(ar.cause());
}
});
assertEquals("hello!", synchronizer.poll(2, TimeUnit.SECONDS));
}

static class SimpleBean {

static volatile CountDownLatch latch;
Expand Down Expand Up @@ -275,6 +310,17 @@ String blockingRequestContextActive(String message) {
int reply(List<Integer> numbers) {
return numbers.stream().collect(Collectors.summingInt(Integer::intValue));
}

@ConsumeEvent("${foo}")
String replyFooConfig(String message) {
return (message + "!").toUpperCase();
}

@ConsumeEvent("${non-existent.address:foo-config-default}")
String replyFooConfigDefault(String message) {
return (message + "!").toLowerCase();
}

}

@RequestScoped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@
/**
* The address the consumer will be registered to. By default, the fully qualified name of the declaring bean class is
* assumed.
* <p>
* The value can be a config property expression. In this case, the configured value is used instead:
* {@code @ConsumeEvent("${my.consumer.address}")}. Additionally, the property expression can specify a default value:
* {@code @ConsumeEvent("${my.consumer.address:defaultAddress}")}.
*
* @return the address
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@

import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe;
import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setCurrentContextSafe;
import static io.smallrye.common.expression.Expression.Flag.LENIENT_SYNTAX;
import static io.smallrye.common.expression.Expression.Flag.NO_TRIM;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.spi.ConfigProviderResolver;
import org.jboss.logging.Logger;

import io.quarkus.arc.CurrentContextFactory;
Expand All @@ -25,6 +32,8 @@
import io.quarkus.vertx.ConsumeEvent;
import io.quarkus.vertx.LocalEventBusCodec;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.expression.Expression;
import io.smallrye.common.expression.ResolveContext;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
Expand Down Expand Up @@ -92,7 +101,7 @@ void registerMessageConsumers(Map<String, ConsumeEvent> messageConsumerConfigura
final List<Throwable> registrationFailures = new ArrayList<>();
for (Entry<String, ConsumeEvent> entry : messageConsumerConfigurations.entrySet()) {
EventConsumerInvoker invoker = createInvoker(entry.getKey());
String address = entry.getValue().value();
String address = lookUpPropertyValue(entry.getValue().value());
// Create a context attached to each consumer
// If we don't all consumers will use the same event loop and so published messages (dispatched to all
// consumers) delivery is serialized.
Expand Down Expand Up @@ -293,4 +302,55 @@ public String apply(Object messageBody) {
public RuntimeValue<Vertx> forceStart(Supplier<Vertx> vertx) {
return new RuntimeValue<>(vertx.get());
}

/**
* Looks up the property value by checking whether the value is a configuration key and resolves it if so.
*
* @param propertyValue property value to look up.
* @return the resolved property value.
*/
private static String lookUpPropertyValue(String propertyValue) {
String value = propertyValue.stripLeading();
if (!value.isEmpty() && isConfigExpression(value)) {
value = resolvePropertyExpression(value);
}
return value;
}

/**
* Adapted from {@link io.smallrye.config.ExpressionConfigSourceInterceptor}
*/
private static String resolvePropertyExpression(String expr) {
// Force the runtime CL in order to make the DEV UI page work
final ClassLoader cl = VertxEventBusConsumerRecorder.class.getClassLoader();
final Config config = ConfigProviderResolver.instance().getConfig(cl);
final Expression expression = Expression.compile(expr, LENIENT_SYNTAX, NO_TRIM);
final String expanded = expression.evaluate(new BiConsumer<ResolveContext<RuntimeException>, StringBuilder>() {
@Override
public void accept(ResolveContext<RuntimeException> resolveContext, StringBuilder stringBuilder) {
final Optional<String> resolve = config.getOptionalValue(resolveContext.getKey(), String.class);
if (resolve.isPresent()) {
stringBuilder.append(resolve.get());
} else if (resolveContext.hasDefault()) {
resolveContext.expandDefault();
} else {
throw new NoSuchElementException(String.format("Could not expand value %s in property %s",
resolveContext.getKey(), expr));
}
}
});
return expanded;
}

private static boolean isConfigExpression(String val) {
if (val == null) {
return false;
}
int exprStart = val.indexOf("${");
int exprEnd = -1;
if (exprStart >= 0) {
exprEnd = val.indexOf('}', exprStart + 2);
}
return exprEnd > 0;
}
}

0 comments on commit ad70cb5

Please sign in to comment.