From ad70cb5dd2ec0a33323a16097296e0d4e8b1d290 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Mon, 20 Nov 2023 09:47:52 +0100 Subject: [PATCH] Vertx: make the address in ConsumeEvent annotation configurable - resolves #36851 --- docs/src/main/asciidoc/vertx-reference.adoc | 16 +++++ ...ConsumerNonexistingConfigPropertyTest.java | 43 +++++++++++++ .../deployment/MessageConsumerMethodTest.java | 48 +++++++++++++- .../java/io/quarkus/vertx/ConsumeEvent.java | 4 ++ .../VertxEventBusConsumerRecorder.java | 62 ++++++++++++++++++- 5 files changed, 171 insertions(+), 2 deletions(-) create mode 100644 extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/ConsumerNonexistingConfigPropertyTest.java diff --git a/docs/src/main/asciidoc/vertx-reference.adoc b/docs/src/main/asciidoc/vertx-reference.adoc index eed2eadb4d316..e08da1e8ce0ea 100644 --- a/docs/src/main/asciidoc/vertx-reference.adoc +++ b/docs/src/main/asciidoc/vertx-reference.adoc @@ -515,6 +515,22 @@ public String consume(String name) { ---- <1> Receive the messages sent to the `greeting` address +The address value can be a property expression. +In this case, the configured value is used instead: `@ConsumeEvent("${my.consumer.address}")`. +Additionally, the property expression can specify a default value: `@ConsumeEvent("${my.consumer.address:defaultAddress}")`. + +.Config Property Example +[source, java] +---- +@ConsumeEvent("${my.consumer.address}") // <1> +public String consume(String name) { + return name.toLowerCase(); +} +---- +<1> Receive the messages sent to the address configured with the `my.consumer.address` key. + +NOTE: If no config property with the specified key exists and no default value is set then the application startup fails. + === Process events asynchronously The previous examples use synchronous processing. diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/ConsumerNonexistingConfigPropertyTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/ConsumerNonexistingConfigPropertyTest.java new file mode 100644 index 0000000000000..7a44591d0c578 --- /dev/null +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/ConsumerNonexistingConfigPropertyTest.java @@ -0,0 +1,43 @@ +package io.quarkus.vertx.deployment; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.runtime.util.ExceptionUtil; +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.vertx.ConsumeEvent; +import io.vertx.mutiny.core.eventbus.Message; + +public class ConsumerNonexistingConfigPropertyTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .withApplicationRoot(root -> root.addClasses(MessageConsumers.class)) + .assertException(t -> { + Throwable root = ExceptionUtil.getRootCause(t); + assertTrue( + root.getMessage().contains( + "Could not expand value address.does.not.exist in property ${address.does.not.exist}"), + t.toString()); + }); + + @Test + public void test() throws InterruptedException { + fail(); + } + + @ApplicationScoped + static class MessageConsumers { + + @ConsumeEvent("${address.does.not.exist}") + void pub(Message name) { + } + + } + +} diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerMethodTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerMethodTest.java index abb459f515f87..6c62711ec84f1 100644 --- a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerMethodTest.java +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerMethodTest.java @@ -32,7 +32,8 @@ public class MessageConsumerMethodTest { @RegisterExtension static final QuarkusUnitTest config = new QuarkusUnitTest() - .withApplicationRoot((jar) -> jar.addClasses(SimpleBean.class, Transformer.class)); + .withApplicationRoot(root -> root.addClasses(SimpleBean.class, Transformer.class)) + .overrideConfigKey("foo", "foo-config"); @Inject SimpleBean simpleBean; @@ -200,6 +201,40 @@ public void testBlockingConsumerUsingSmallRyeBlocking() throws InterruptedExcept assertTrue(message.contains("hello::true")); } + @Test + public void testConfiguredAddress() throws InterruptedException { + BlockingQueue synchronizer = new LinkedBlockingQueue<>(); + eventBus.request("foo-config", "hello", ar -> { + if (ar.succeeded()) { + try { + synchronizer.put(ar.result().body()); + } catch (InterruptedException e) { + fail(e); + } + } else { + fail(ar.cause()); + } + }); + assertEquals("HELLO!", synchronizer.poll(2, TimeUnit.SECONDS)); + } + + @Test + public void testConfiguredAddressDefault() throws InterruptedException { + BlockingQueue synchronizer = new LinkedBlockingQueue<>(); + eventBus.request("foo-config-default", "hello", ar -> { + if (ar.succeeded()) { + try { + synchronizer.put(ar.result().body()); + } catch (InterruptedException e) { + fail(e); + } + } else { + fail(ar.cause()); + } + }); + assertEquals("hello!", synchronizer.poll(2, TimeUnit.SECONDS)); + } + static class SimpleBean { static volatile CountDownLatch latch; @@ -275,6 +310,17 @@ String blockingRequestContextActive(String message) { int reply(List numbers) { return numbers.stream().collect(Collectors.summingInt(Integer::intValue)); } + + @ConsumeEvent("${foo}") + String replyFooConfig(String message) { + return (message + "!").toUpperCase(); + } + + @ConsumeEvent("${non-existent.address:foo-config-default}") + String replyFooConfigDefault(String message) { + return (message + "!").toLowerCase(); + } + } @RequestScoped diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/ConsumeEvent.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/ConsumeEvent.java index 7e18bfb55dfbb..c25c9f683076f 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/ConsumeEvent.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/ConsumeEvent.java @@ -95,6 +95,10 @@ /** * The address the consumer will be registered to. By default, the fully qualified name of the declaring bean class is * assumed. + *

+ * The value can be a config property expression. In this case, the configured value is used instead: + * {@code @ConsumeEvent("${my.consumer.address}")}. Additionally, the property expression can specify a default value: + * {@code @ConsumeEvent("${my.consumer.address:defaultAddress}")}. * * @return the address */ diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java index 7a3edda524766..71adb62608645 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java @@ -2,18 +2,25 @@ import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe; import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setCurrentContextSafe; +import static io.smallrye.common.expression.Expression.Flag.LENIENT_SYNTAX; +import static io.smallrye.common.expression.Expression.Flag.NO_TRIM; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.spi.ConfigProviderResolver; import org.jboss.logging.Logger; import io.quarkus.arc.CurrentContextFactory; @@ -25,6 +32,8 @@ import io.quarkus.vertx.ConsumeEvent; import io.quarkus.vertx.LocalEventBusCodec; import io.quarkus.virtual.threads.VirtualThreadsRecorder; +import io.smallrye.common.expression.Expression; +import io.smallrye.common.expression.ResolveContext; import io.smallrye.common.vertx.VertxContext; import io.vertx.core.AsyncResult; import io.vertx.core.Context; @@ -92,7 +101,7 @@ void registerMessageConsumers(Map messageConsumerConfigura final List registrationFailures = new ArrayList<>(); for (Entry entry : messageConsumerConfigurations.entrySet()) { EventConsumerInvoker invoker = createInvoker(entry.getKey()); - String address = entry.getValue().value(); + String address = lookUpPropertyValue(entry.getValue().value()); // Create a context attached to each consumer // If we don't all consumers will use the same event loop and so published messages (dispatched to all // consumers) delivery is serialized. @@ -293,4 +302,55 @@ public String apply(Object messageBody) { public RuntimeValue forceStart(Supplier vertx) { return new RuntimeValue<>(vertx.get()); } + + /** + * Looks up the property value by checking whether the value is a configuration key and resolves it if so. + * + * @param propertyValue property value to look up. + * @return the resolved property value. + */ + private static String lookUpPropertyValue(String propertyValue) { + String value = propertyValue.stripLeading(); + if (!value.isEmpty() && isConfigExpression(value)) { + value = resolvePropertyExpression(value); + } + return value; + } + + /** + * Adapted from {@link io.smallrye.config.ExpressionConfigSourceInterceptor} + */ + private static String resolvePropertyExpression(String expr) { + // Force the runtime CL in order to make the DEV UI page work + final ClassLoader cl = VertxEventBusConsumerRecorder.class.getClassLoader(); + final Config config = ConfigProviderResolver.instance().getConfig(cl); + final Expression expression = Expression.compile(expr, LENIENT_SYNTAX, NO_TRIM); + final String expanded = expression.evaluate(new BiConsumer, StringBuilder>() { + @Override + public void accept(ResolveContext resolveContext, StringBuilder stringBuilder) { + final Optional resolve = config.getOptionalValue(resolveContext.getKey(), String.class); + if (resolve.isPresent()) { + stringBuilder.append(resolve.get()); + } else if (resolveContext.hasDefault()) { + resolveContext.expandDefault(); + } else { + throw new NoSuchElementException(String.format("Could not expand value %s in property %s", + resolveContext.getKey(), expr)); + } + } + }); + return expanded; + } + + private static boolean isConfigExpression(String val) { + if (val == null) { + return false; + } + int exprStart = val.indexOf("${"); + int exprEnd = -1; + if (exprStart >= 0) { + exprEnd = val.indexOf('}', exprStart + 2); + } + return exprEnd > 0; + } }