From 160724afd985c3c4fdc5e5b01637fc92df276481 Mon Sep 17 00:00:00 2001 From: Jeremy Grelle Date: Fri, 27 Oct 2023 01:20:07 -0400 Subject: [PATCH] Add support for reactive PubSub consumers (#958) --- ...ronaut.build.internal.gcp-testsuite.gradle | 29 ++ gcp-function-http/build.gradle | 2 +- .../gcp/function/http/ReactiveController.java | 2 +- gcp-pubsub/build.gradle | 3 +- .../gcp/pubsub/bind/PubSubBodyBinder.java | 49 ++- .../bind/PubSubDefaultArgumentBinder.java | 2 +- .../intercept/PubSubConsumerAdvice.java | 59 +-- .../gcp/pubsub/bind/PubSubBinderSpec.groovy | 181 ++++++++- .../pubsub/bind/ReactiveConsumerSpec.groovy | 374 ++++++++++++++++++ .../PublisherIntroductionAdviceSpec.groovy | 2 +- gradle/libs.versions.toml | 15 +- settings.gradle | 4 +- .../pullConsumer/subscriberReactive.adoc | 14 + src/main/docs/guide/toc.yml | 1 + test-suite-groovy/build.gradle | 17 +- .../pubsub/quickstart/AnimalListener.groovy | 9 +- .../gcp/pubsub/serdes/XmlMessageSerDes.groovy | 41 ++ .../AcknowledgementSubscriber.groovy | 40 +- .../subscriber/ContentTypeSubscriber.groovy | 4 +- .../CustomConfigurationSubscriber.groovy | 13 +- .../subscriber/CustomHeaderSubscriber.groovy | 13 +- .../subscriber/ErrorHandlingSubscriber.groovy | 6 + .../pubsub/subscriber/MessageProcessor.groovy | 13 + .../subscriber/ReactiveSubscriber.groovy | 63 +++ .../pubsub/subscriber/SimpleSubscriber.groovy | 13 +- .../micronaut/gcp/pubsub/support/Animal.java | 3 + .../AcknowledgementSubscriberSpec.groovy | 147 +++++++ .../ContentTypeSubscriberSpec.groovy | 141 +++++++ .../subscriber/ReactiveSubscriberSpec.groovy | 131 ++++++ .../src/test/resources/application-test.yml | 19 + .../src/test/resources/logback.xml | 15 + test-suite-kotlin/build.gradle | 22 +- .../gcp/pubsub/quickstart/AnimalListener.kt | 6 +- .../gcp/pubsub/serdes/XmlMessageSerDes.kt | 32 ++ .../subscriber/AcknowledgementSubscriber.kt | 34 +- .../subscriber/ContentTypeSubscriber.kt | 8 +- .../CustomConfigurationSubscriber.kt | 4 + .../subscriber/CustomHeaderSubscriber.kt | 4 + .../subscriber/ErrorHandlingSubscriber.kt | 5 + .../gcp/pubsub/subscriber/MessageProcessor.kt | 17 + .../pubsub/subscriber/ReactiveSubscriber.kt | 62 +++ .../gcp/pubsub/subscriber/SimpleSubscriber.kt | 6 +- .../io/micronaut/gcp/pubsub/support/Animal.kt | 5 +- .../AcknowledgementSubscriberSpec.groovy | 144 +++++++ .../ContentTypeSubscriberSpec.groovy | 134 +++++++ .../subscriber/ReactiveSubscriberSpec.groovy | 132 +++++++ .../src/test/resources/application-test.yml | 19 + .../src/test/resources/logback.xml | 15 + test-suite/build.gradle | 20 +- .../gcp/pubsub/quickstart/AnimalListener.java | 4 + .../gcp/pubsub/serdes/XmlMessageSerDes.java | 43 ++ .../subscriber/AcknowledgementSubscriber.java | 28 +- .../subscriber/ContentTypeSubscriber.java | 5 +- .../CustomConfigurationSubscriber.java | 4 + .../subscriber/CustomHeaderSubscriber.java | 4 + .../subscriber/ErrorHandlingSubscriber.java | 4 + .../pubsub/subscriber/MessageProcessor.java | 19 + .../pubsub/subscriber/ReactiveSubscriber.java | 59 +++ .../pubsub/subscriber/SimpleSubscriber.java | 4 + .../micronaut/gcp/pubsub/support/Animal.java | 3 + .../AcknowledgementSubscriberTest.java | 137 +++++++ .../subscriber/ContentTypeSubscriberTest.java | 106 +++++ .../subscriber/ReactiveSubscriberTest.java | 116 ++++++ .../src/test/resources/application-test.yml | 19 + test-suite/src/test/resources/logback.xml | 15 + 65 files changed, 2571 insertions(+), 93 deletions(-) create mode 100644 buildSrc/src/main/groovy/io.micronaut.build.internal.gcp-testsuite.gradle create mode 100644 gcp-pubsub/src/test/groovy/io/micronaut/gcp/pubsub/bind/ReactiveConsumerSpec.groovy create mode 100644 src/main/docs/guide/pubsub/pullConsumer/subscriberReactive.adoc create mode 100644 test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/serdes/XmlMessageSerDes.groovy create mode 100644 test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/MessageProcessor.groovy create mode 100644 test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriber.groovy create mode 100644 test-suite-groovy/src/test/groovy/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriberSpec.groovy create mode 100644 test-suite-groovy/src/test/groovy/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriberSpec.groovy create mode 100644 test-suite-groovy/src/test/groovy/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriberSpec.groovy create mode 100644 test-suite-groovy/src/test/resources/application-test.yml create mode 100644 test-suite-groovy/src/test/resources/logback.xml create mode 100644 test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/serdes/XmlMessageSerDes.kt create mode 100644 test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/MessageProcessor.kt create mode 100644 test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriber.kt create mode 100644 test-suite-kotlin/src/test/groovy/subscriber/AcknowledgementSubscriberSpec.groovy create mode 100644 test-suite-kotlin/src/test/groovy/subscriber/ContentTypeSubscriberSpec.groovy create mode 100644 test-suite-kotlin/src/test/groovy/subscriber/ReactiveSubscriberSpec.groovy create mode 100644 test-suite-kotlin/src/test/resources/application-test.yml create mode 100644 test-suite-kotlin/src/test/resources/logback.xml create mode 100644 test-suite/src/main/java/io/micronaut/gcp/pubsub/serdes/XmlMessageSerDes.java create mode 100644 test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/MessageProcessor.java create mode 100644 test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriber.java create mode 100644 test-suite/src/test/java/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriberTest.java create mode 100644 test-suite/src/test/java/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriberTest.java create mode 100644 test-suite/src/test/java/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriberTest.java create mode 100644 test-suite/src/test/resources/application-test.yml create mode 100644 test-suite/src/test/resources/logback.xml diff --git a/buildSrc/src/main/groovy/io.micronaut.build.internal.gcp-testsuite.gradle b/buildSrc/src/main/groovy/io.micronaut.build.internal.gcp-testsuite.gradle new file mode 100644 index 000000000..1d5656af6 --- /dev/null +++ b/buildSrc/src/main/groovy/io.micronaut.build.internal.gcp-testsuite.gradle @@ -0,0 +1,29 @@ +plugins { + id "io.micronaut.build.internal.gcp-base" + id "io.micronaut.test-resources" +} + +tasks.withType(Test) { + useJUnitPlatform() +} + +dependencies { + annotationProcessor(mn.micronaut.inject.java) + + implementation(projects.micronautGcpFunctionHttp) + implementation(projects.micronautGcpPubsub) + implementation(projects.micronautGcpSecretManager) + implementation(mn.reactor) + implementation mnSerde.micronaut.serde.jackson + implementation mnJacksonXml.micronaut.jackson.xml + + compileOnly(libs.managed.functions.framework.api) + + testRuntimeOnly(libs.junit.jupiter.engine) + testRuntimeOnly mnLogging.logback.classic + testRuntimeOnly mn.snakeyaml +} + +micronaut { + importMicronautPlatform.set(false) +} diff --git a/gcp-function-http/build.gradle b/gcp-function-http/build.gradle index cde1b3b6d..a14dc4aae 100644 --- a/gcp-function-http/build.gradle +++ b/gcp-function-http/build.gradle @@ -16,7 +16,7 @@ dependencies { testAnnotationProcessor(mnSerde.micronaut.serde.processor) testImplementation(mnSerde.micronaut.serde.jackson) - testImplementation(mnRxjava2.micronaut.rxjava2) + testImplementation(mnRxjava3.micronaut.rxjava3) testAnnotationProcessor(mn.micronaut.inject.java) testImplementation(libs.managed.functions.framework.api) diff --git a/gcp-function-http/src/test/java/io/micronaut/gcp/function/http/ReactiveController.java b/gcp-function-http/src/test/java/io/micronaut/gcp/function/http/ReactiveController.java index 233585e7f..9ddd8fb6c 100644 --- a/gcp-function-http/src/test/java/io/micronaut/gcp/function/http/ReactiveController.java +++ b/gcp-function-http/src/test/java/io/micronaut/gcp/function/http/ReactiveController.java @@ -3,7 +3,7 @@ import io.micronaut.http.annotation.Body; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Post; -import io.reactivex.Flowable; +import io.reactivex.rxjava3.core.Flowable; @Controller("/reactive") public class ReactiveController { diff --git a/gcp-pubsub/build.gradle b/gcp-pubsub/build.gradle index 95a292d24..a6d158738 100644 --- a/gcp-pubsub/build.gradle +++ b/gcp-pubsub/build.gradle @@ -13,7 +13,8 @@ dependencies { testAnnotationProcessor(mn.micronaut.inject.java) testRuntimeOnly(mn.micronaut.discovery.core) - testImplementation(mnRxjava2.micronaut.rxjava2) + testImplementation(mnReactor.micronaut.reactor) + testImplementation(mnRxjava3.micronaut.rxjava3) testImplementation libs.testcontainers.spock testImplementation(mn.micronaut.http.server.netty) testImplementation(testFixtures(project(":micronaut-gcp-common"))) diff --git a/gcp-pubsub/src/main/java/io/micronaut/gcp/pubsub/bind/PubSubBodyBinder.java b/gcp-pubsub/src/main/java/io/micronaut/gcp/pubsub/bind/PubSubBodyBinder.java index 5f383ff11..f9099d9d9 100644 --- a/gcp-pubsub/src/main/java/io/micronaut/gcp/pubsub/bind/PubSubBodyBinder.java +++ b/gcp-pubsub/src/main/java/io/micronaut/gcp/pubsub/bind/PubSubBodyBinder.java @@ -16,15 +16,20 @@ package io.micronaut.gcp.pubsub.bind; import com.google.pubsub.v1.PubsubMessage; +import io.micronaut.core.async.publisher.Publishers; import io.micronaut.core.convert.ArgumentConversionContext; +import io.micronaut.core.convert.ConversionService; +import io.micronaut.core.convert.MutableConversionService; import io.micronaut.core.type.Argument; import io.micronaut.core.util.StringUtils; import io.micronaut.gcp.pubsub.exception.PubSubListenerException; import io.micronaut.gcp.pubsub.serdes.PubSubMessageSerDes; import io.micronaut.gcp.pubsub.serdes.PubSubMessageSerDesRegistry; - import io.micronaut.messaging.annotation.MessageBody; +import jakarta.inject.Inject; import jakarta.inject.Singleton; +import reactor.core.publisher.Mono; + import java.util.Optional; /** @@ -37,9 +42,33 @@ @Singleton public class PubSubBodyBinder implements PubSubAnnotatedArgumentBinder { + private final ConversionService conversionService; + private final PubSubMessageSerDesRegistry serDesRegistry; + /** + * Constructs a PubSub body binder instance. + * + * @deprecated An instance of {@link ConversionService} is needed for binding the full range of supported + * types (including reactive) to PubSub subscriber methods.

{@link #PubSubBodyBinder(ConversionService, PubSubMessageSerDesRegistry)} should be used instead. + * + * @param serDesRegistry the SerDe registry + */ + @Deprecated(since = "5.2.0", forRemoval = true) public PubSubBodyBinder(PubSubMessageSerDesRegistry serDesRegistry) { + this.conversionService = MutableConversionService.create(); + this.serDesRegistry = serDesRegistry; + } + + /** + * Constructs a PubSub body binder instance. + * + * @param conversionService the conversion service + * @param serDesRegistry the SerDe registry + */ + @Inject + public PubSubBodyBinder(ConversionService conversionService, PubSubMessageSerDesRegistry serDesRegistry) { + this.conversionService = conversionService; this.serDesRegistry = serDesRegistry; } @@ -50,7 +79,10 @@ public Class getAnnotationType() { @Override public BindingResult bind(ArgumentConversionContext context, PubSubConsumerState state) { - Argument bodyType = context.getArgument(); + boolean isPublisher = Publishers.isConvertibleToPublisher(context.getArgument().getType()); + Argument bodyType = isPublisher ? + context.getArgument().getFirstTypeVariable().orElseThrow(() -> new PubSubListenerException("Could not determine publisher's argument type for PubSub message deserialization")) : + context.getArgument(); Object result = null; if (bodyType.getType().equals(byte[].class)) { result = state.getPubsubMessage().getData().toByteArray(); @@ -58,13 +90,18 @@ public BindingResult bind(ArgumentConversionContext context, Pub result = state.getPubsubMessage(); } else { if (StringUtils.isEmpty(state.getContentType()) && !state.getPubsubMessage().containsAttributes("Content-Type")) { - throw new PubSubListenerException("Could not detect Content-Type header at message and no Content-Type specified on method."); + throw new PubSubListenerException("Could not detect Content-Type header at message and no Content-Type specified on method."); } PubSubMessageSerDes serDes = serDesRegistry.find(state.getContentType()) - .orElseThrow(() -> new PubSubListenerException("Could not locate a valid SerDes implementation for type: " + state.getContentType())); + .orElseThrow(() -> new PubSubListenerException("Could not locate a valid SerDes implementation for type: " + state.getContentType())); result = serDes.deserialize(state.getPubsubMessage().getData().toByteArray(), bodyType); } - Object finalResult = result; - return () -> Optional.ofNullable(finalResult); + + if (isPublisher && result.getClass().isArray()) { + result = Mono.just(result); + } + + Optional finalResult = conversionService.convert(result, context); + return () -> finalResult; } } diff --git a/gcp-pubsub/src/main/java/io/micronaut/gcp/pubsub/bind/PubSubDefaultArgumentBinder.java b/gcp-pubsub/src/main/java/io/micronaut/gcp/pubsub/bind/PubSubDefaultArgumentBinder.java index 636e49894..8a9b08993 100644 --- a/gcp-pubsub/src/main/java/io/micronaut/gcp/pubsub/bind/PubSubDefaultArgumentBinder.java +++ b/gcp-pubsub/src/main/java/io/micronaut/gcp/pubsub/bind/PubSubDefaultArgumentBinder.java @@ -20,7 +20,7 @@ import jakarta.inject.Singleton; /** - * Default body binder of PubSub consumers. If no @{@link io.micronaut.messaging.annotation.Body} arguments are annotated. + * Default body binder of PubSub consumers. If no @{@link io.micronaut.messaging.annotation.MessageBody} arguments are annotated. * Delegates to {@link PubSubBodyBinder} * @author Vinicius Carvalho */ diff --git a/gcp-pubsub/src/main/java/io/micronaut/gcp/pubsub/intercept/PubSubConsumerAdvice.java b/gcp-pubsub/src/main/java/io/micronaut/gcp/pubsub/intercept/PubSubConsumerAdvice.java index a1d75e1b1..24c36c893 100644 --- a/gcp-pubsub/src/main/java/io/micronaut/gcp/pubsub/intercept/PubSubConsumerAdvice.java +++ b/gcp-pubsub/src/main/java/io/micronaut/gcp/pubsub/intercept/PubSubConsumerAdvice.java @@ -22,8 +22,10 @@ import io.micronaut.context.BeanContext; import io.micronaut.context.processor.ExecutableMethodProcessor; import io.micronaut.core.annotation.AnnotationValue; +import io.micronaut.core.async.publisher.Publishers; import io.micronaut.core.bind.BoundExecutable; import io.micronaut.core.bind.DefaultExecutableBinder; +import io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException; import io.micronaut.core.convert.ConversionService; import io.micronaut.core.util.StringUtils; import io.micronaut.gcp.GoogleCloudConfiguration; @@ -49,10 +51,13 @@ import jakarta.annotation.PreDestroy; import jakarta.inject.Qualifier; import jakarta.inject.Singleton; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import java.util.Arrays; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -73,6 +78,7 @@ public class PubSubConsumerAdvice implements ExecutableMethodProcessor beanDefinition, ExecutableMethod method) { if (beanDefinition.hasDeclaredAnnotation(PubSubListener.class)) { AnnotationValue subscriptionAnnotation = method.getAnnotation(Subscription.class); @@ -134,27 +142,14 @@ public void process(BeanDefinition beanDefinition, ExecutableMethod met projectSubscriptionName, contentType); boolean autoAcknowledge = !hasAckArg; try { - BoundExecutable executable = null; - try { - executable = binder.bind(method, binderRegistry, consumerState); - } catch (Exception ex) { - handleException(new PubSubMessageReceiverException("Error binding message to the method", ex, bean, consumerState, autoAcknowledge)); - } - executable.invoke(bean); // Discard result - if (autoAcknowledge) { // if manual ack is not specified we auto ack message after method execution - pubSubAcknowledgement.ack(); - } else { - Optional boundAck = Arrays - .stream(executable.getBoundArguments()) - .filter(o -> (o instanceof DefaultPubSubAcknowledgement)) - .findFirst(); - if (boundAck.isPresent()) { - DefaultPubSubAcknowledgement manualAck = (DefaultPubSubAcknowledgement) boundAck.get(); - if (!manualAck.isClientAck()) { - logger.warn("Method {} was executed and no message acknowledge detected. Did you forget to invoke ack()/nack()?", method.getName()); - } - } - } + @SuppressWarnings("rawtypes") + BoundExecutable executable = binder.bind(method, binderRegistry, consumerState); + Flux resultPublisher = resultAsFlux(Objects.requireNonNull(executable).invoke(bean)); + resultPublisher.subscribe(data -> { }, //no-op + ex -> handleException(new PubSubMessageReceiverException("Error handling message", ex, bean, consumerState, autoAcknowledge)), + autoAcknowledge ? pubSubAcknowledgement::ack : () -> this.verifyManualAcknowledgment(executable, method.getName())); + } catch (UnsatisfiedArgumentException e) { + handleException(new PubSubMessageReceiverException("Error binding message to the method", e, bean, consumerState, autoAcknowledge)); } catch (Exception e) { handleException(new PubSubMessageReceiverException("Error handling message", e, bean, consumerState, autoAcknowledge)); } @@ -162,11 +157,23 @@ public void process(BeanDefinition beanDefinition, ExecutableMethod met try { this.subscriberFactory.createSubscriber(new SubscriberFactoryConfig(projectSubscriptionName, receiver, configuration, pubSubConfigurationProperties.getSubscribingExecutor())); } catch (Exception e) { - throw new PubSubListenerException("Failed to create subscriber", e); + throw new PubSubListenerException("Failed to create subscriber for %s with subscription method %s".formatted(beanDefinition.getBeanType(), method.getName()), e); } } } + } + private void verifyManualAcknowledgment(@SuppressWarnings("rawtypes") BoundExecutable executable, String methodName) { + Optional boundAck = Arrays + .stream(executable.getBoundArguments()) + .filter(o -> (o instanceof DefaultPubSubAcknowledgement)) + .findFirst(); + if (boundAck.isPresent()) { + DefaultPubSubAcknowledgement manualAck = (DefaultPubSubAcknowledgement) boundAck.get(); + if (!manualAck.isClientAck()) { + logger.warn("Method {} was executed and no message acknowledge detected. Did you forget to invoke ack()/nack()?", methodName); + } + } } @PreDestroy @@ -182,4 +189,12 @@ private void handleException(PubSubMessageReceiverException ex) { } } + @SuppressWarnings("unchecked") + private Flux resultAsFlux(T result) { + if (!Publishers.isConvertibleToPublisher(result)) { + return Flux.empty(); + } + return Flux.from(Publishers.convertPublisher(conversionService, result, Publisher.class)); + } + } diff --git a/gcp-pubsub/src/test/groovy/io/micronaut/gcp/pubsub/bind/PubSubBinderSpec.groovy b/gcp-pubsub/src/test/groovy/io/micronaut/gcp/pubsub/bind/PubSubBinderSpec.groovy index d4de88ad3..f7b158928 100644 --- a/gcp-pubsub/src/test/groovy/io/micronaut/gcp/pubsub/bind/PubSubBinderSpec.groovy +++ b/gcp-pubsub/src/test/groovy/io/micronaut/gcp/pubsub/bind/PubSubBinderSpec.groovy @@ -13,12 +13,86 @@ import io.micronaut.gcp.pubsub.annotation.MessageId import io.micronaut.inject.BeanDefinition import io.micronaut.inject.ExecutableMethod import io.micronaut.messaging.Acknowledgement +import io.micronaut.serde.annotation.Serdeable +import io.reactivex.rxjava3.core.Flowable +import jakarta.inject.Singleton +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import spock.lang.Specification -import jakarta.inject.Singleton +import java.nio.charset.StandardCharsets class PubSubBinderSpec extends Specification{ + String animalJson = """ + { + "name" : "dog" + } + """ + + void "can bind to supported message body types"(String methodName, Class argType) { + ApplicationContext applicationContext = ApplicationContext.run(["spec.name" : getClass().simpleName]) + TestBinderBean bean = applicationContext.getBean(TestBinderBean) + BeanDefinition beanDefinition = applicationContext.getBeanDefinition(TestBinderBean) + ExecutableMethod method = beanDefinition.findMethod(methodName, argType).get() + PubSubBinderRegistry binderRegistry = applicationContext.getBean(PubSubBinderRegistry) + DefaultExecutableBinder binder = new DefaultExecutableBinder<>() + AckReplyConsumer ackReplyConsumer = Mock(AckReplyConsumer) + ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of("test-project","test-subscription") + PubsubMessage message = PubsubMessage.newBuilder().setData(ByteString.copyFrom(animalJson.getBytes())).setMessageId("1234").build() + PubSubConsumerState consumerState = new PubSubConsumerState(message, ackReplyConsumer, subscriptionName, "application/json") + BoundExecutable executable = binder.bind(method, binderRegistry, consumerState) + + when: + executable.invoke(bean) + + then: + Map result = bean.dataHolder["receive"] + result["body"] != null && argType.isAssignableFrom(result["body"].getClass()) + verifyAnimalPayload(result["body"]) + + where: + methodName | argType + "bindByteArrayBody" | byte[] + "bindMonoByteArrayBody" | Mono + "bindFluxByteArrayBody" | Flux + "bindFlowableByteArrayBody" | Flowable + "bindPubsubMessageBody" | PubsubMessage + "bindMonoPubsubMessageBody" | Mono + "bindFluxPubsubMessageBody" | Flux + "bindFlowablePubsubMessageBody" | Flowable + "bindPojoBody" | Animal + "bindMonoPojoBody" | Mono + "bindFluxPojoBody" | Flux + "bindFlowablePojoBody" | Flowable + } + + void verifyAnimalPayload(Object result) { + Object unwrappedResult; + if (result instanceof Publisher) { + unwrappedResult = Mono.from(result).block() + } else { + unwrappedResult = result + } + switch(unwrappedResult) { + case byte[]: + byte[] message = (byte[]) unwrappedResult + assert animalJson == new String(message, StandardCharsets.UTF_8) + break + case PubsubMessage: + PubsubMessage message = (PubsubMessage) unwrappedResult + assert animalJson == new String(message.getData().toByteArray(), StandardCharsets.UTF_8) + break + case Animal: + Animal message = (Animal) unwrappedResult + assert message.getName() == "dog" + break + default: + throw new IllegalStateException("Unhandled result type "+unwrappedResult.getClass()) + } + } + void "test messageId argument"() { ApplicationContext applicationContext = ApplicationContext.run(["spec.name" : getClass().simpleName]) TestBinderBean bean = applicationContext.getBean(TestBinderBean) @@ -103,4 +177,107 @@ class TestBinderBean { data["ack"] = ack dataHolder["receive"] = data } -} \ No newline at end of file + + @Executable + void bindByteArrayBody(byte[] body) { + Map data = new HashMap<>() + data["body"] = body + dataHolder["receive"] = data + } + + @Executable + void bindMonoByteArrayBody(Mono body) { + Map data = new HashMap<>() + data["body"] = body + dataHolder["receive"] = data + } + + @Executable + void bindFluxByteArrayBody(Flux body) { + Map data = new HashMap<>() + data["body"] = body + dataHolder["receive"] = data + } + + @Executable + void bindFlowableByteArrayBody(Flowable body) { + Map data = new HashMap<>() + data["body"] = body + dataHolder["receive"] = data + } + + @Executable + void bindPubsubMessageBody(PubsubMessage body) { + Map data = new HashMap<>() + data["body"] = body + dataHolder["receive"] = data + } + + @Executable + void bindMonoPubsubMessageBody(Mono body) { + Map data = new HashMap<>() + data["body"] = body + dataHolder["receive"] = data + } + + @Executable + void bindFluxPubsubMessageBody(Flux body) { + Map data = new HashMap<>() + data["body"] = body + dataHolder["receive"] = data + } + + @Executable + void bindFlowablePubsubMessageBody(Flowable body) { + Map data = new HashMap<>() + data["body"] = body + dataHolder["receive"] = data + } + + @Executable + void bindPojoBody(Animal body) { + Map data = new HashMap<>() + data["body"] = body + dataHolder["receive"] = data + } + + @Executable + void bindMonoPojoBody(Mono body) { + Map data = new HashMap<>() + data["body"] = body + dataHolder["receive"] = data + } + + @Executable + void bindFluxPojoBody(Flux body) { + Map data = new HashMap<>() + data["body"] = body + dataHolder["receive"] = data + } + + @Executable + void bindFlowablePojoBody(Flowable body) { + Map data = new HashMap<>() + data["body"] = body + dataHolder["receive"] = data + } +} + +@Serdeable +final class Animal { + private String name; + + Animal(String name) { + this.name = name; + } + + Animal() { } + + String getName() { + return name; + } + + void setName(String name) { + this.name = name; + } +} diff --git a/gcp-pubsub/src/test/groovy/io/micronaut/gcp/pubsub/bind/ReactiveConsumerSpec.groovy b/gcp-pubsub/src/test/groovy/io/micronaut/gcp/pubsub/bind/ReactiveConsumerSpec.groovy new file mode 100644 index 000000000..e3398de12 --- /dev/null +++ b/gcp-pubsub/src/test/groovy/io/micronaut/gcp/pubsub/bind/ReactiveConsumerSpec.groovy @@ -0,0 +1,374 @@ +package io.micronaut.gcp.pubsub.bind + +import com.google.pubsub.v1.PubsubMessage +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.Requires +import io.micronaut.gcp.pubsub.AbstractConsumerSpec +import io.micronaut.gcp.pubsub.MockPubSubEngine +import io.micronaut.gcp.pubsub.annotation.PubSubClient +import io.micronaut.gcp.pubsub.annotation.PubSubListener +import io.micronaut.gcp.pubsub.annotation.Subscription +import io.micronaut.gcp.pubsub.annotation.Topic +import io.micronaut.messaging.Acknowledgement +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import io.reactivex.rxjava3.core.Flowable +import jakarta.inject.Inject +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import spock.util.concurrent.PollingConditions + +import java.time.Duration +import java.util.concurrent.atomic.AtomicBoolean + +@MicronautTest +@Property(name = "spec.name", value = "ReactiveConsumerSpec") +@Property(name = "gcp.projectId", value = "test-project") +class ReactiveConsumerSpec extends AbstractConsumerSpec { + + @Inject + TestMessagePublisher publisher; + + @Inject + ReactiveConsumer consumer; + + @Inject + MockPubSubEngine mockPubSubEngine + + void setup() { + consumer.msg = null + consumer.finished.set(false) + } + + void verifyAck() { + def conditions = new PollingConditions(timeout: 5); + conditions.eventually { + def message = consumer.msg + assert mockPubSubEngine.acknowledgements.containsKey(message) + assert mockPubSubEngine.acknowledgements.get(message) == MockPubSubEngine.ACK + } + assert consumer.finished.get() + } + + void verifyNack() { + def conditions = new PollingConditions(timeout: 5); + conditions.eventually { + def message = consumer.msg + assert mockPubSubEngine.acknowledgements.containsKey(message) + assert mockPubSubEngine.acknowledgements.get(message) == MockPubSubEngine.NACK + } + assert consumer.finished.get() + } + + void "a message is auto-acknowledged when a returned publisher completes successfully"() { + given: + publisher."$publisherMethod"(payload.getBytes()) + + expect: + verifyAck() + + where: + payload | publisherMethod + "ping-mono-result" | "publishMessageMonoResult" + "ping-flux-result" | "publishMessageFluxResult" + "ping-flowable-result" | "publishMessageFlowableResult" + } + + void "a message is auto-nacked when a returned publisher completes with an error"() { + given: + publisher."$publisherMethod"(payload.getBytes()) + + expect: + verifyNack() + + where: + payload | publisherMethod + "ping-mono-result-error" | "publishMessageMonoResultError" + "ping-flux-result-error" | "publishMessageFluxResultError" + "ping-flowable-result-error" | "publishMessageFlowableResultError" + } + + void "a message can be manually acknowledged when returning a publisher"() { + given: + publisher."$publisherMethod"(payload.getBytes()) + + expect: + verifyAck() + + where: + payload | publisherMethod + "ping-mono-result-manual-ack" | "publishMessageMonoResultManualAck" + "ping-flux-result-manual-ack" | "publishMessageFluxResultManualAck" + "ping-flowable-result-manual-ack" | "publishMessageFlowableResultManualAck" + } + + void "a message can be manually nacked when returning a publisher"() { + given: + publisher."$publisherMethod"(payload.getBytes()) + + expect: + verifyNack() + + where: + payload | publisherMethod + "ping-mono-result-manual-nack" | "publishMessageMonoResultManualNack" + "ping-flux-result-manual-nack" | "publishMessageFluxResultManualNack" + "ping-flowable-result-manual-nack" | "publishMessageFlowableResultManualNack" + } + + void "a message can be consumed as a reactive type and auto-acknowledged"() { + given: + publisher."$publisherMethod"(payload.getBytes()) + + expect: + verifyAck() + + where: + payload | publisherMethod + "ping-mono-payload-and-result" | "publishMessageMonoPayloadAndResult" + "ping-flux-payload-and-result" | "publishMessageFluxPayloadAndResult" + "ping-flowable-payload-and-result" | "publishMessageFlowablePayloadAndResult" + } + + void "a message can be consumed as a reactive type and auto-nacked on error"() { + given: + publisher."$publisherMethod"(payload.getBytes()) + + expect: + verifyNack() + + where: + payload | publisherMethod + "ping-mono-payload-and-result-error" | "publishMessageMonoPayloadAndResultError" + "ping-flux-payload-and-result-error" | "publishMessageFluxPayloadAndResultError" + "ping-flowable-payload-and-result-error" | "publishMessageFlowablePayloadAndResultError" + } + + void "a message can be consumed as a reactive type and manually acknowledged"() { + given: + publisher."$publisherMethod"(payload.getBytes()) + + expect: + verifyAck() + + where: + payload | publisherMethod + "ping-mono-payload-and-result-manual-ack" | "publishMessageMonoPayloadAndResultManualAck" + "ping-flux-payload-and-result-manual-ack" | "publishMessageFluxPayloadAndResultManualAck" + "ping-flowable-payload-and-result-manual-ack" | "publishMessageFlowablePayloadAndResultManualAck" + } + + void "a message can be consumed as a reactive type and manually nacked"() { + given: + publisher."$publisherMethod"(payload.getBytes()) + + expect: + verifyNack() + + where: + payload | publisherMethod + "ping-mono-payload-and-result-manual-nack" | "publishMessageMonoPayloadAndResultManualNack" + "ping-flux-payload-and-result-manual-nack" | "publishMessageFluxPayloadAndResultManualNack" + "ping-flowable-payload-and-result-manual-nack" | "publishMessageFlowablePayloadAndResultManualNack" + } +} + +@Requires(property = "spec.name", value = "ReactiveConsumerSpec") +@PubSubClient +interface TestMessagePublisher { + + @Topic("mono-result") void publishMessageMonoResult(byte[] message) + @Topic("flux-result") void publishMessageFluxResult(byte[] message) + @Topic("flowable-result") void publishMessageFlowableResult(byte[] message) + @Topic("mono-result-error") void publishMessageMonoResultError(byte[] message) + @Topic("flux-result-error") void publishMessageFluxResultError(byte[] message) + @Topic("flowable-result-error") void publishMessageFlowableResultError(byte[] message) + @Topic("mono-result-manual-ack") void publishMessageMonoResultManualAck(byte[] message) + @Topic("flux-result-manual-ack") void publishMessageFluxResultManualAck(byte[] message) + @Topic("flowable-result-manual-ack") void publishMessageFlowableResultManualAck(byte[] message) + @Topic("mono-result-manual-nack") void publishMessageMonoResultManualNack(byte[] message) + @Topic("flux-result-manual-nack") void publishMessageFluxResultManualNack(byte[] message) + @Topic("flowable-result-manual-nack") void publishMessageFlowableResultManualNack(byte[] message) + @Topic("mono-payload-and-result") void publishMessageMonoPayloadAndResult(byte[] message) + @Topic("flux-payload-and-result") void publishMessageFluxPayloadAndResult(byte[] message) + @Topic("flowable-payload-and-result") void publishMessageFlowablePayloadAndResult(byte[] message) + @Topic("mono-payload-and-result-error") void publishMessageMonoPayloadAndResultError(byte[] message) + @Topic("flux-payload-and-result-error") void publishMessageFluxPayloadAndResultError(byte[] message) + @Topic("flowable-payload-and-result-error") void publishMessageFlowablePayloadAndResultError(byte[] message) + @Topic("mono-payload-and-result-manual-ack") void publishMessageMonoPayloadAndResultManualAck(byte[] message) + @Topic("flux-payload-and-result-manual-ack") void publishMessageFluxPayloadAndResultManualAck(byte[] message) + @Topic("flowable-payload-and-result-manual-ack") void publishMessageFlowablePayloadAndResultManualAck(byte[] message) + @Topic("mono-payload-and-result-manual-nack") void publishMessageMonoPayloadAndResultManualNack(byte[] message) + @Topic("flux-payload-and-result-manual-nack") void publishMessageFluxPayloadAndResultManualNack(byte[] message) + @Topic("flowable-payload-and-result-manual-nack") void publishMessageFlowablePayloadAndResultManualNack(byte[] message) +} + +@Requires(property = "spec.name", value = "ReactiveConsumerSpec") +@PubSubListener +class ReactiveConsumer { + + PubsubMessage msg + AtomicBoolean finished = new AtomicBoolean(false) + + @Subscription("mono-result") + Mono onMessage1(PubsubMessage message) { + this.msg = message + return Mono.just(message).delayElement(Duration.ofSeconds(2)).then(Mono.just("success")).doOnTerminate { finished.set(true) } + } + + @Subscription("flux-result") + Flux onMessage2(PubsubMessage message) { + this.msg = message + return Flux.just(message).thenMany(Flux.just("1", "2")).delayElements(Duration.ofSeconds(1)).doOnTerminate { finished.set(true) }; + } + + @Subscription("flowable-result") + Flowable onMessage2Rx(PubsubMessage message) { + return Flowable.fromPublisher(onMessage2(message)) + } + + @Subscription("mono-result-error") + Mono onMessage3(PubsubMessage message) { + this.msg = message + return Mono.just(message).delayElement(Duration.ofSeconds(2)).then(Mono.error(new RuntimeException("Message processing error"))).doOnTerminate { finished.set(true) } + } + + @Subscription("flux-result-error") + Flux onMessage4(PubsubMessage message) { + this.msg = message + return Flux.just(message).thenMany(Flux.just("1", "2")).delayElements(Duration.ofSeconds(1)).thenMany(Flux.error(new RuntimeException("Message processing error"))).doOnTerminate { finished.set(true) }; + } + + @Subscription("flowable-result-error") + Flowable onMessage4Rx(PubsubMessage message) { + return Flowable.fromPublisher(onMessage4(message)) + } + + @Subscription("mono-result-manual-ack") + Mono onMessage5(PubsubMessage message, Acknowledgement acknowledgement) { + this.msg = message + return Mono.just(message).delayElement(Duration.ofSeconds(2)).then(Mono.just("success")) + .doOnSuccess { + finished.set(true) + acknowledgement.ack() + } + } + + @Subscription("flux-result-manual-ack") + Flux onMessage6(PubsubMessage message, Acknowledgement acknowledgement) { + this.msg = message + return Flux.just(message).thenMany(Flux.just("1", "2")).delayElements(Duration.ofSeconds(1)) + .doOnComplete(() -> { + finished.set(true) + acknowledgement.ack() + }) + } + + @Subscription("flowable-result-manual-ack") + Flowable onMessage6Rx(PubsubMessage message, Acknowledgement acknowledgement) { + return Flowable.fromPublisher(onMessage6(message, acknowledgement)) + } + + @Subscription("mono-result-manual-nack") + Mono onMessage7(PubsubMessage message, Acknowledgement acknowledgement) { + this.msg = message + return Mono.just(message).delayElement(Duration.ofSeconds(2)).then(Mono.just("success")).doOnSuccess { + finished.set(true) + acknowledgement.nack() + } + } + + @Subscription("flux-result-manual-nack") + Flux onMessage8(PubsubMessage message, Acknowledgement acknowledgement) { + this.msg = message + return Flux.just(message).thenMany(Flux.just("1", "2")).delayElements(Duration.ofSeconds(1)) + .doOnComplete(() -> { + finished.set(true) + acknowledgement.nack() + }) + } + + @Subscription("flowable-result-manual-nack") + Flowable onMessage8Rx(PubsubMessage message, Acknowledgement acknowledgement) { + return Flowable.fromPublisher(onMessage8(message, acknowledgement)) + } + + @Subscription("mono-payload-and-result") + Mono onMessage9(Mono message) { + return message.doOnNext {this.msg = it }.delayElement(Duration.ofSeconds(2)).then(Mono.just("success")).doOnTerminate { finished.set(true) } + } + + @Subscription("flux-payload-and-result") + Flux onMessage10(Flux message) { + return message.doOnNext { + this.msg = it + }.thenMany(Flux.just("1", "2")).delayElements(Duration.ofSeconds(1)).doOnTerminate { finished.set(true) }; + } + + @Subscription("flowable-payload-and-result") + Flowable onMessage10Rx(Flowable message) { + return Flowable.fromPublisher(onMessage10(Flux.from(message))) + } + + @Subscription("mono-payload-and-result-error") + Mono onMessage11(Mono message) { + return message.doOnNext {this.msg = it }.delayElement(Duration.ofSeconds(2)).then(Mono.error(new RuntimeException("Message processing error"))).doOnTerminate { finished.set(true) } + } + + @Subscription("flux-payload-and-result-error") + Flux onMessage12(Flux message) { + return message.doOnNext { + this.msg = it + }.thenMany(Flux.just("1", "2")).delayElements(Duration.ofSeconds(1)).thenMany(Flux.error(new RuntimeException("Message processing error"))).doOnTerminate { finished.set(true) }; + } + + @Subscription("flowable-payload-and-result-error") + Flowable onMessage12Rx(Flowable message) { + return Flowable.fromPublisher(onMessage12(Flux.from(message))) + } + + @Subscription("mono-payload-and-result-manual-ack") + Mono onMessage13(Mono message, Acknowledgement acknowledgement) { + return message.doOnNext {this.msg = it }.delayElement(Duration.ofSeconds(2)).then(Mono.just("success")) + .doOnSuccess { + finished.set(true) + acknowledgement.ack() + } + } + + @Subscription("flux-payload-and-result-manual-ack") + Flux onMessage14(Flux message, Acknowledgement acknowledgement) { + return message.doOnNext {this.msg = it }.thenMany(Flux.just("1", "2")).delayElements(Duration.ofSeconds(1)) + .doOnComplete { + finished.set(true) + acknowledgement.ack() + } + } + + @Subscription("flowable-payload-and-result-manual-ack") + Flowable onMessage14Rx(Flowable message, Acknowledgement acknowledgement) { + return Flowable.fromPublisher(onMessage14(Flux.from(message), acknowledgement)) + } + + @Subscription("mono-payload-and-result-manual-nack") + Mono onMessage15(Mono message, Acknowledgement acknowledgement) { + return message.doOnNext {this.msg = it }.delayElement(Duration.ofSeconds(2)).then(Mono.just("success")) + .doOnSuccess { + finished.set(true) + acknowledgement.nack() + } + } + + @Subscription("flux-payload-and-result-manual-nack") + Flux onMessage16(Flux message, Acknowledgement acknowledgement) { + return message.doOnNext {this.msg = it }.thenMany(Flux.just("1", "2")).delayElements(Duration.ofSeconds(1)) + .doOnComplete { + finished.set(true) + acknowledgement.nack() + } + } + + @Subscription("flowable-payload-and-result-manual-nack") + Flowable onMessage16Rx(Flowable message, Acknowledgement acknowledgement) { + return Flowable.fromPublisher(onMessage16(Flux.from(message), acknowledgement)) + } +} diff --git a/gcp-pubsub/src/test/groovy/io/micronaut/gcp/pubsub/support/PublisherIntroductionAdviceSpec.groovy b/gcp-pubsub/src/test/groovy/io/micronaut/gcp/pubsub/support/PublisherIntroductionAdviceSpec.groovy index 69310f307..e1114f523 100644 --- a/gcp-pubsub/src/test/groovy/io/micronaut/gcp/pubsub/support/PublisherIntroductionAdviceSpec.groovy +++ b/gcp-pubsub/src/test/groovy/io/micronaut/gcp/pubsub/support/PublisherIntroductionAdviceSpec.groovy @@ -15,7 +15,7 @@ import io.micronaut.messaging.annotation.MessageHeader import io.micronaut.messaging.annotation.MessageHeaders import io.micronaut.serde.annotation.Serdeable import io.micronaut.test.extensions.spock.annotation.MicronautTest -import io.reactivex.Single +import io.reactivex.rxjava3.core.Single import jakarta.inject.Inject @MicronautTest diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index c3ff6affd..5a06d1b96 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -25,9 +25,14 @@ testcontainers = "1.18.3" zipkin-sender-stackdriver = "1.0.4" system-stubs-core = "2.1.3" +awaitility = '4.2.0' +mockito = '5.6.0' + +micronaut-jackson-xml = "4.0.1" micronaut-logging = "1.0.0" -micronaut-rxjava2 = "2.0.1" -micronaut-serde = "2.2.6" +micronaut-reactor = "3.0.2" +micronaut-rxjava3 = "3.0.1" +micronaut-serde = "2.2.5" micronaut-servlet = "4.0.3" micronaut-tracing = "5.0.2" micronaut-test = "4.0.1" @@ -42,7 +47,9 @@ micronaut-gradle-plugin = "4.0.3" micronaut-core = { module = 'io.micronaut:micronaut-core-bom', version.ref = 'micronaut' } # micronaut boms -micronaut-rxjava2 = { module = "io.micronaut.rxjava2:micronaut-rxjava2-bom", version.ref = "micronaut-rxjava2" } +micronaut-jackson-xml = { module = "io.micronaut.xml:micronaut-jackson-xml-bom", version.ref = "micronaut-jackson-xml" } +micronaut-reactor = { module = "io.micronaut.reactor:micronaut-reactor-bom", version.ref = "micronaut-reactor" } +micronaut-rxjava3 = { module = "io.micronaut.rxjava3:micronaut-rxjava3-bom", version.ref = "micronaut-rxjava3" } micronaut-serde = { module = "io.micronaut.serde:micronaut-serde-bom", version.ref = "micronaut-serde" } micronaut-servlet = { module = "io.micronaut.servlet:micronaut-servlet-bom", version.ref = "micronaut-servlet" } micronaut-tracing = { module = "io.micronaut.tracing:micronaut-tracing-bom", version.ref = "micronaut-tracing" } @@ -74,6 +81,8 @@ kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect", version.ref = logback-json-classic = { module = "ch.qos.logback.contrib:logback-json-classic", version.ref = "logback-json-classic" } testcontainers-spock = { module = "org.testcontainers:spock", version.ref = "testcontainers" } zipkin-sender-stackdriver = { module = "io.zipkin.gcp:zipkin-sender-stackdriver", version.ref = "zipkin-sender-stackdriver" } +awaitility = { module = 'org.awaitility:awaitility', version.ref = 'awaitility' } +mockito = { module = 'org.mockito:mockito-core', version.ref = 'mockito' } system-stubs-core = { module = "uk.org.webcompere:system-stubs-core", version.ref = "system-stubs-core" } # Plugins diff --git a/settings.gradle b/settings.gradle index 66de39940..165eb578f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -36,7 +36,9 @@ micronautBuild { useStandardizedProjectNames=true importMicronautCatalog() - importMicronautCatalog("micronaut-rxjava2") + importMicronautCatalog("micronaut-jackson-xml") + importMicronautCatalog("micronaut-reactor") + importMicronautCatalog("micronaut-rxjava3") importMicronautCatalog("micronaut-serde") importMicronautCatalog("micronaut-servlet") importMicronautCatalog("micronaut-tracing") diff --git a/src/main/docs/guide/pubsub/pullConsumer/subscriberReactive.adoc b/src/main/docs/guide/pubsub/pullConsumer/subscriberReactive.adoc new file mode 100644 index 000000000..417e2e14b --- /dev/null +++ b/src/main/docs/guide/pubsub/pullConsumer/subscriberReactive.adoc @@ -0,0 +1,14 @@ +In addition to byte[], PubsubMessage, and POJOs you can also define listener methods that receive a Reactive type such as a Reactor reactor:Mono[] or a RxJava rx:Single[]. The same deserialization rules as above will be applied using the type parameter of the Reactive type. + +For the conversion to Reactive types to work correctly, you must add either the library https://micronaut-projects.github.io/micronaut-reactor/latest/guide/[Micronaut Reactor] or https://micronaut-projects.github.io/micronaut-rxjava3/latest/guide/[Micronaut RxJava 3] to your application's dependencies. + +For example, using Reactor: + +.Using Reactive Types +snippet::io.micronaut.gcp.pubsub.subscriber.ReactiveSubscriber[tags="imports, clazz", source="main"] +<1> Bytes are copied and wrapped in a `Mono`, SerDes is bypassed, message id injected for usage +<2> SerDes is bypassed, `PubSubMessage` object is copied and wrapped in a `Mono`, no need to use `@MessageId` +<3> The framework will try to deserialize this payload into `Mono`. If no `Content-Type` header is found, will default to `application/json` +<4> Uses a custom SerDes and the framework will find a api:serdes.PubSubMessageSerDes[] that can handle `application/xml` and then pass the deserialized payload as `Mono`. + +Note that the above examples all return a `Mono` to allow for a fully non-blocking reactive message processing pipeline. When a `Publisher` is returned from a `@Subscription` method, it will be subscribed to by the framework and the message will not be auto-acknowledged until the `Publisher` completes successfully. If the `Publisher` completes with an error, the framework will `nack()` the message for re-delivery. diff --git a/src/main/docs/guide/toc.yml b/src/main/docs/guide/toc.yml index d14e63edc..ac8b997e3 100644 --- a/src/main/docs/guide/toc.yml +++ b/src/main/docs/guide/toc.yml @@ -24,6 +24,7 @@ pubsub: pullConsumer: title: Receiving messages via @PubSubListener methods subscriberContentType: Content-Type and message deserialization + subscriberReactive: Receiving and Returning Reactive Types subscriberHeaders: Message Headers subscriberProperties: Subscriber properties messageAcknowledge: Handling message acknowledgement diff --git a/test-suite-groovy/build.gradle b/test-suite-groovy/build.gradle index 1c38dd49f..8303918dc 100644 --- a/test-suite-groovy/build.gradle +++ b/test-suite-groovy/build.gradle @@ -1,14 +1,19 @@ plugins { id 'groovy' - id "io.micronaut.build.internal.gcp-tests" + id "io.micronaut.build.internal.gcp-testsuite" } dependencies { compileOnly(mn.micronaut.inject.groovy) compileOnly(mn.groovy) - implementation(projects.micronautGcpPubsub) - implementation(projects.micronautGcpFunctionHttp) - implementation(projects.micronautGcpSecretManager) - implementation(mn.reactor) - compileOnly(libs.managed.functions.framework.api) + testImplementation(platform(mn.micronaut.core.bom)) + testCompileOnly(mn.micronaut.inject.groovy) + testImplementation(mnTest.micronaut.test.spock) +} + +micronaut { + testResources { + // 10 minutes as this image is massive + clientTimeout = 600 + } } diff --git a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/quickstart/AnimalListener.groovy b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/quickstart/AnimalListener.groovy index e37d093cc..081704d0a 100644 --- a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/quickstart/AnimalListener.groovy +++ b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/quickstart/AnimalListener.groovy @@ -13,12 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.gcp.pubsub.quickstart; +package io.micronaut.gcp.pubsub.quickstart + +import io.micronaut.context.annotation.Requires; //tag::imports[] import io.micronaut.gcp.pubsub.annotation.PubSubListener; import io.micronaut.gcp.pubsub.annotation.Subscription; // end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = "test") // tag::clazz[] @PubSubListener // <1> class AnimalListener { @@ -29,4 +34,4 @@ class AnimalListener { } } -// end::clazz[] \ No newline at end of file +// end::clazz[] diff --git a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/serdes/XmlMessageSerDes.groovy b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/serdes/XmlMessageSerDes.groovy new file mode 100644 index 000000000..d5bbab222 --- /dev/null +++ b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/serdes/XmlMessageSerDes.groovy @@ -0,0 +1,41 @@ +package io.micronaut.gcp.pubsub.serdes + +import com.fasterxml.jackson.dataformat.xml.XmlMapper +import io.micronaut.core.serialize.exceptions.SerializationException +import io.micronaut.core.type.Argument +import io.micronaut.http.MediaType +import jakarta.inject.Named +import jakarta.inject.Singleton + +@Singleton +class XmlMessageSerDes implements PubSubMessageSerDes { + + private final XmlMapper xmlMapper + + XmlMessageSerDes(@Named("xml") XmlMapper xmlMapper) { + this.xmlMapper = xmlMapper + } + + @Override + Object deserialize(byte[] data, Argument type) { + try { + return xmlMapper.readValue(data, type.getType()) + } catch (IOException e) { + throw new SerializationException("Failed to deserialize PubSub message as XML", e) + } + } + + @Override + byte[] serialize(Object data) { + try { + return xmlMapper.writeValueAsBytes(data) + } catch (IOException e) { + throw new SerializationException("Failed to serialize PubSub message as XML", e) + } + } + + @Override + String supportedType() { + return MediaType.APPLICATION_XML; + } +} diff --git a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriber.groovy b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriber.groovy index a1a8afcd9..a3a3f3d6f 100644 --- a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriber.groovy +++ b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriber.groovy @@ -13,21 +13,49 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.gcp.pubsub.subscriber; -//tag::imports[] +package io.micronaut.gcp.pubsub.subscriber + +import io.micronaut.context.annotation.Requires import io.micronaut.gcp.pubsub.annotation.PubSubListener; -import io.micronaut.gcp.pubsub.annotation.Subscription; -import io.micronaut.gcp.pubsub.support.Animal; -import io.micronaut.messaging.Acknowledgement; +//tag::imports[] + +import io.micronaut.gcp.pubsub.annotation.Subscription +import io.micronaut.gcp.pubsub.support.Animal +import io.micronaut.messaging.Acknowledgement +import reactor.core.publisher.Mono + // end::imports[] +@Requires(property = "spec.name", value = "AcknowledgementSubscriberSpec") // tag::clazz[] @PubSubListener class AcknowledgementSubscriber { + MessageProcessor messageProcessor + + AcknowledgementSubscriber(MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor + } + @Subscription("animals") - void onMessage(Animal animal, Acknowledgement acknowledgement) { // <1> + void onMessage(Animal animal, Acknowledgement acknowledgement) { + if (Boolean.TRUE == messageProcessor.handleAnimalMessage(animal).block()) { + acknowledgement.ack() + } else { + acknowledgement.nack() + } + } + @Subscription("animals-async") + Mono onReactiveMessage(Mono animal, Acknowledgement acknowledgement) { + return animal.flatMap(messageProcessor::handleAnimalMessage) + .doOnNext(result -> { + if (Boolean.TRUE == result) { + acknowledgement.ack() + } else { + acknowledgement.nack() + } + }) } } diff --git a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriber.groovy b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriber.groovy index 18e0c4732..667eae15c 100644 --- a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriber.groovy +++ b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriber.groovy @@ -15,13 +15,15 @@ */ package io.micronaut.gcp.pubsub.subscriber; //tag::imports[] -import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PubsubMessage +import io.micronaut.context.annotation.Requires; import io.micronaut.gcp.pubsub.annotation.MessageId; import io.micronaut.gcp.pubsub.annotation.PubSubListener; import io.micronaut.gcp.pubsub.annotation.Subscription; import io.micronaut.gcp.pubsub.support.Animal; // end::imports[] +@Requires(property = "spec.name", value = "ContentTypeSubscriberSpec") // tag::clazz[] @PubSubListener class ContentTypeSubscriber { diff --git a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/CustomConfigurationSubscriber.groovy b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/CustomConfigurationSubscriber.groovy index a905d1572..4ad5a8f11 100644 --- a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/CustomConfigurationSubscriber.groovy +++ b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/CustomConfigurationSubscriber.groovy @@ -13,13 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.gcp.pubsub.subscriber; +package io.micronaut.gcp.pubsub.subscriber + +import io.micronaut.context.annotation.Requires //tag::imports[] -import io.micronaut.gcp.pubsub.annotation.PubSubListener; -import io.micronaut.gcp.pubsub.annotation.Subscription; -import io.micronaut.gcp.pubsub.support.Animal; +import io.micronaut.gcp.pubsub.annotation.PubSubListener +import io.micronaut.gcp.pubsub.annotation.Subscription +import io.micronaut.gcp.pubsub.support.Animal //end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = "test") //tag::clazz[] @PubSubListener class CustomConfigurationSubscriber { diff --git a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/CustomHeaderSubscriber.groovy b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/CustomHeaderSubscriber.groovy index b84a87397..315e20b31 100644 --- a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/CustomHeaderSubscriber.groovy +++ b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/CustomHeaderSubscriber.groovy @@ -13,14 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.gcp.pubsub.subscriber; +package io.micronaut.gcp.pubsub.subscriber + +import io.micronaut.context.annotation.Requires //tag::imports[] -import io.micronaut.gcp.pubsub.annotation.PubSubListener; -import io.micronaut.gcp.pubsub.annotation.Subscription; +import io.micronaut.gcp.pubsub.annotation.PubSubListener +import io.micronaut.gcp.pubsub.annotation.Subscription import io.micronaut.gcp.pubsub.support.Animal -import io.micronaut.messaging.annotation.MessageHeader; +import io.micronaut.messaging.annotation.MessageHeader // end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = "test") // tag::clazz[] @PubSubListener class CustomHeaderSubscriber { diff --git a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/ErrorHandlingSubscriber.groovy b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/ErrorHandlingSubscriber.groovy index 480dd6e37..a9e33a72d 100644 --- a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/ErrorHandlingSubscriber.groovy +++ b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/ErrorHandlingSubscriber.groovy @@ -14,6 +14,9 @@ * limitations under the License. */ package io.micronaut.gcp.pubsub.subscriber + +import io.micronaut.context.annotation.Requires + //tag::imports[] import io.micronaut.gcp.pubsub.annotation.PubSubListener import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException @@ -21,6 +24,9 @@ import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler import io.micronaut.gcp.pubsub.support.Animal // end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = "test") // tag::clazz[] @PubSubListener class ErrorHandlingSubscriber implements PubSubMessageReceiverExceptionHandler { // <1> diff --git a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/MessageProcessor.groovy b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/MessageProcessor.groovy new file mode 100644 index 000000000..f5a00c43b --- /dev/null +++ b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/MessageProcessor.groovy @@ -0,0 +1,13 @@ +package io.micronaut.gcp.pubsub.subscriber + +import com.google.pubsub.v1.PubsubMessage +import io.micronaut.gcp.pubsub.support.Animal +import reactor.core.publisher.Mono + +interface MessageProcessor { + default Mono handleByteArrayMessage(byte[] message) { return Mono.just(Boolean.TRUE) } + + default Mono handlePubSubMessage(PubsubMessage pubsubMessage) { return Mono.just(Boolean.TRUE) } + + default Mono handleAnimalMessage(Animal message) { return Mono.just(Boolean.TRUE) } +} diff --git a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriber.groovy b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriber.groovy new file mode 100644 index 000000000..16ade5a60 --- /dev/null +++ b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriber.groovy @@ -0,0 +1,63 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.gcp.pubsub.subscriber + +import com.google.pubsub.v1.PubsubMessage +import io.micronaut.context.annotation.Requires; +//tag::imports[] + +import io.micronaut.gcp.pubsub.annotation.MessageId +import io.micronaut.gcp.pubsub.annotation.PubSubListener +import io.micronaut.gcp.pubsub.annotation.Subscription +import io.micronaut.gcp.pubsub.support.Animal +import reactor.core.publisher.Mono + +// end::imports[] + +@Requires(property = "spec.name", value = "ReactiveSubscriberSpec") +// tag::clazz[] +@PubSubListener +class ReactiveSubscriber { + + private final MessageProcessor messageProcessor + + ReactiveSubscriber(MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor + } + + @Subscription("raw-subscription") // <1> + Mono receiveRaw(Mono data, @MessageId String id) { + return data.flatMap(messageProcessor::handleByteArrayMessage) + } + + @Subscription("native-subscription") // <2> + Mono receiveNative(Mono message) { + return message.flatMap(messageProcessor::handlePubSubMessage) + } + + @Subscription("animals") // <3> + Mono receivePojo(Mono animal, @MessageId String id) { + return animal.flatMap(messageProcessor::handleAnimalMessage) + } + + @Subscription(value = "animals-legacy", contentType = "application/xml") // <4> + Mono receiveXML(Mono animal, @MessageId String id) { + return animal.flatMap(messageProcessor::handleAnimalMessage) + } + +} +// end::clazz[] + diff --git a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/SimpleSubscriber.groovy b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/SimpleSubscriber.groovy index d11673a54..60ac7500e 100644 --- a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/SimpleSubscriber.groovy +++ b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/subscriber/SimpleSubscriber.groovy @@ -13,14 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.gcp.pubsub.subscriber; +package io.micronaut.gcp.pubsub.subscriber + +import io.micronaut.context.annotation.Requires //tag::imports[] -import io.micronaut.gcp.pubsub.annotation.PubSubListener; -import io.micronaut.gcp.pubsub.annotation.Subscription; -import io.micronaut.gcp.pubsub.support.Animal; +import io.micronaut.gcp.pubsub.annotation.PubSubListener +import io.micronaut.gcp.pubsub.annotation.Subscription +import io.micronaut.gcp.pubsub.support.Animal // end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = "test") // tag::clazz[] @PubSubListener // <1> class SimpleSubscriber { diff --git a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/support/Animal.java b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/support/Animal.java index a7023746e..01e60558b 100644 --- a/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/support/Animal.java +++ b/test-suite-groovy/src/main/groovy/io/micronaut/gcp/pubsub/support/Animal.java @@ -15,6 +15,9 @@ */ package io.micronaut.gcp.pubsub.support; +import io.micronaut.serde.annotation.Serdeable; + +@Serdeable public final class Animal { private String name; diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriberSpec.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriberSpec.groovy new file mode 100644 index 000000000..39c2a3791 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriberSpec.groovy @@ -0,0 +1,147 @@ +package io.micronaut.gcp.pubsub.subscriber + +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.Requires +import io.micronaut.gcp.pubsub.annotation.PubSubClient +import io.micronaut.gcp.pubsub.annotation.Topic +import io.micronaut.gcp.pubsub.bind.DefaultPubSubAcknowledgement +import io.micronaut.gcp.pubsub.support.Animal +import io.micronaut.messaging.Acknowledgement +import io.micronaut.test.annotation.MockBean +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import jakarta.inject.Inject +import jakarta.inject.Singleton +import reactor.core.publisher.Mono +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +@MicronautTest +@Property(name = "spec.name", value = "AcknowledgementSubscriberSpec") +class AcknowledgementSubscriberSpec extends Specification { + + @Inject + TestPublisher publisher + + MessageProcessor messageProcessor + + @Inject + AcknowledgementSubscriber subscriber + + Object message + + Acknowledgement acknowledgement + + def setup() { + message = null + acknowledgement = null + } + + void "blocking subscriber with manual ack"() { + setup: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + messageProcessor.handleAnimalMessage(_ as Animal) >> Mono.just(Boolean.TRUE) + + when: + publisher.publishAnimal(dog) + + then: + conditions.eventually { + assert message != null + assert message instanceof Animal + assert (message as Animal).name == "dog" + assert acknowledgement instanceof DefaultPubSubAcknowledgement + assert (acknowledgement as DefaultPubSubAcknowledgement).isClientAck() + } + } + + void "blocking subscriber with manual nack"() { + setup: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + messageProcessor.handleAnimalMessage(_ as Animal) >>> [Mono.just(Boolean.FALSE), Mono.just(Boolean.TRUE)] + + when: + publisher.publishAnimal(dog) + + then: + conditions.eventually { + assert message != null + assert message instanceof Animal + assert (message as Animal).name == "dog" + assert acknowledgement instanceof DefaultPubSubAcknowledgement + assert (acknowledgement as DefaultPubSubAcknowledgement).isClientAck() + } + } + + void "async subscriber with manual ack"() { + setup: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + messageProcessor.handleAnimalMessage(_ as Animal) >> Mono.just(Boolean.TRUE) + + when: + publisher.publishAnimalAsync(dog) + + then: + conditions.eventually { + assert message != null + assert message instanceof Mono + assert acknowledgement instanceof DefaultPubSubAcknowledgement + assert (acknowledgement as DefaultPubSubAcknowledgement).isClientAck() + } + } + + void "async subscriber with manual nack"() { + setup: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + messageProcessor.handleAnimalMessage(_ as Animal) >>> [Mono.just(Boolean.FALSE), Mono.just(Boolean.TRUE)] + + when: + publisher.publishAnimalAsync(dog) + + then: + conditions.eventually { + assert message != null + assert message instanceof Mono + assert acknowledgement instanceof DefaultPubSubAcknowledgement + assert (acknowledgement as DefaultPubSubAcknowledgement).isClientAck() + } + } + + @MockBean(AcknowledgementSubscriber) + AcknowledgementSubscriber subscriberForTest() { + messageProcessor = Mock(MessageProcessor) + return new TestAcknowledgementSubscriber(messageProcessor) + } + + class TestAcknowledgementSubscriber extends AcknowledgementSubscriber { + + TestAcknowledgementSubscriber(MessageProcessor messageProcessor) { + super(messageProcessor) + } + + @Override + void onMessage(Animal animal, Acknowledgement ack) { + message = animal + acknowledgement = ack + super.onMessage(animal, ack) + } + + @Override + Mono onReactiveMessage(Mono animal, Acknowledgement ack) { + message = animal + acknowledgement = ack + return super.onReactiveMessage(animal, ack) + } + } + + @Singleton + @PubSubClient + @Requires(property = "spec.name", value = "AcknowledgementSubscriberSpec") + static interface TestPublisher { + @Topic("animals") void publishAnimal(Animal animal) + @Topic("animals-async") void publishAnimalAsync(Animal animal) + } +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriberSpec.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriberSpec.groovy new file mode 100644 index 000000000..99e610ff0 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriberSpec.groovy @@ -0,0 +1,141 @@ +package io.micronaut.gcp.pubsub.subscriber + +import com.google.pubsub.v1.PubsubMessage +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.Requires +import io.micronaut.gcp.pubsub.annotation.PubSubClient +import io.micronaut.gcp.pubsub.annotation.Topic +import io.micronaut.gcp.pubsub.support.Animal +import io.micronaut.http.MediaType +import io.micronaut.test.annotation.MockBean +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import jakarta.inject.Inject +import jakarta.inject.Singleton +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +import java.nio.charset.StandardCharsets + +@MicronautTest +@Property(name = "spec.name", value = "ContentTypeSubscriberSpec") +class ContentTypeSubscriberSpec extends Specification { + + @Inject + TestPublisher publisher + + @Inject + ContentTypeSubscriber subscriber + + Object message + + def setup() { + message = null + } + + void "receive raw bytes"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + byte[] bytesSent = "foo".getBytes(StandardCharsets.UTF_8) + + when: + publisher.publishRaw(bytesSent) + + then: + conditions.eventually { + assert message != null + assert message instanceof byte[] + String decodedMessage = new String((message as byte[]), StandardCharsets.UTF_8) + assert "foo" == decodedMessage + } + } + + void "receive native message"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + byte[] bytesSent = "foo".getBytes(StandardCharsets.UTF_8) + + when: + publisher.publishNative(bytesSent) + + then: + conditions.eventually { + assert message != null + assert message instanceof PubsubMessage + String decodedMessage = (message as PubsubMessage).getData().toString(StandardCharsets.UTF_8) + assert "foo" == decodedMessage + } + } + + void "receive pojo message from json"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + + when: + publisher.publishAnimal(dog) + + then: + conditions.eventually { + assert message != null + assert message instanceof Animal + assert "dog" == (message as Animal).getName() + } + } + + void "receive pojo message from xml"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + + when: + publisher.publishAnimalAsXml(dog) + + then: + conditions.eventually { + assert message != null + assert message instanceof Animal + assert "dog" == (message as Animal).getName() + } + } + + @MockBean(ContentTypeSubscriber) + ContentTypeSubscriber testSubscriber() { + return new TestContentTypeSubscriber() + } + + class TestContentTypeSubscriber extends ContentTypeSubscriber { + @Override + void receiveRaw(byte[] data, String id) { + message = data + super.receiveRaw(data, id) + } + + @Override + void receiveNative(PubsubMessage pubsubMessage) { + message = pubsubMessage + super.receiveNative(pubsubMessage) + } + + @Override + void receivePojo(Animal animal, String id) { + message = animal + super.receivePojo(animal, id) + } + + @Override + void receiveXML(Animal animal, String id) { + message = animal + super.receiveXML(animal, id) + } + } + + @Singleton + @PubSubClient + @Requires(property = "spec.name", value = "ContentTypeSubscriberSpec") + static interface TestPublisher { + @Topic("raw-subscription") void publishRaw(byte[] payload) + @Topic("native-subscription") void publishNative(byte[] payload) + @Topic("animals") void publishAnimal(Animal animal) + @Topic(value = "animals-legacy", contentType = MediaType.APPLICATION_XML) void publishAnimalAsXml(Animal animal) + } +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriberSpec.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriberSpec.groovy new file mode 100644 index 000000000..4d5708160 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriberSpec.groovy @@ -0,0 +1,131 @@ +package io.micronaut.gcp.pubsub.subscriber + +import com.google.pubsub.v1.PubsubMessage +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.Requires +import io.micronaut.gcp.pubsub.annotation.PubSubClient +import io.micronaut.gcp.pubsub.annotation.Topic +import io.micronaut.gcp.pubsub.support.Animal +import io.micronaut.http.MediaType +import io.micronaut.test.annotation.MockBean +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import jakarta.inject.Inject +import jakarta.inject.Singleton +import reactor.core.publisher.Mono +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +import java.nio.charset.StandardCharsets + +@MicronautTest +@Property(name = "spec.name", value = "ReactiveSubscriberSpec") +class ReactiveSubscriberSpec extends Specification { + + @Inject + TestPublisher publisher + + Object unwrappedResult + + def setup() { + unwrappedResult = null + } + + void "receive raw bytes"() { + given: + def conditions = new PollingConditions(timeout: 1) + byte[] bytesSent = "foo".getBytes(StandardCharsets.UTF_8) + + when: + publisher.publishRaw(bytesSent) + + then: + conditions.eventually { + assert unwrappedResult != null + assert unwrappedResult instanceof byte[] + String decodedMessage = new String((unwrappedResult as byte[]), StandardCharsets.UTF_8) + assert "foo" == decodedMessage + } + } + + void "receive native message"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + byte[] bytesSent = "foo".getBytes(StandardCharsets.UTF_8) + + when: + publisher.publishNative(bytesSent) + + then: + conditions.eventually { + assert unwrappedResult != null + assert unwrappedResult instanceof PubsubMessage + String decodedMessage = (unwrappedResult as PubsubMessage).getData().toString(StandardCharsets.UTF_8) + assert "foo" == decodedMessage + } + } + + void "receive pojo message from json"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + + when: + publisher.publishAnimal(dog) + + then: + conditions.eventually { + assert unwrappedResult != null + assert unwrappedResult instanceof Animal + assert "dog" == (unwrappedResult as Animal).getName() + } + } + + void "receive pojo message from xml"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + + when: + publisher.publishAnimalAsXml(dog) + + then: + conditions.eventually { + assert unwrappedResult != null + assert unwrappedResult instanceof Animal + assert "dog" == (unwrappedResult as Animal).getName() + } + } + + @MockBean(MessageProcessor.class) + MessageProcessor mockMessageProcessor() { + return new MessageProcessor() { + @Override + Mono handleByteArrayMessage(byte[] message) { + unwrappedResult = message + return MessageProcessor.super.handleByteArrayMessage(message) + } + + @Override + Mono handlePubSubMessage(PubsubMessage pubsubMessage) { + unwrappedResult = pubsubMessage + return MessageProcessor.super.handlePubSubMessage(pubsubMessage) + } + + @Override + Mono handleAnimalMessage(Animal message) { + unwrappedResult = message + return MessageProcessor.super.handleAnimalMessage(message) + } + } + } + + @Singleton + @PubSubClient + @Requires(property = "spec.name", value = "ReactiveSubscriberSpec") + static interface TestPublisher { + @Topic("raw-subscription") void publishRaw(byte[] payload); + @Topic("native-subscription") void publishNative(byte[] payload); + @Topic("animals") void publishAnimal(Animal animal); + @Topic(value = "animals-legacy", contentType = MediaType.APPLICATION_XML) void publishAnimalAsXml(Animal animal); + } +} diff --git a/test-suite-groovy/src/test/resources/application-test.yml b/test-suite-groovy/src/test/resources/application-test.yml new file mode 100644 index 000000000..b951cd16a --- /dev/null +++ b/test-suite-groovy/src/test/resources/application-test.yml @@ -0,0 +1,19 @@ +gcp: + project-id: gcp-test-suite + +test-resources: + containers: + pubsub-emulator: + image-name: thekevjames/gcloud-pubsub-emulator:446.0.0 + startup-timeout: 600s # 10 minutes as this image is massive + hostnames: + - pubsub.host + exposed-ports: + - pubsub.port: 8681 + - pubsub.subscription.port: 8682 + env: + - PUBSUB_PROJECT1: gcp-test-suite,animals:animals,animals-async:animals-async,raw-subscription:raw-subscription,native-subscription:native-subscription,animals-legacy:animals-legacy + +pubsub: + emulator: + host: ${pubsub.host}:${pubsub.port} diff --git a/test-suite-groovy/src/test/resources/logback.xml b/test-suite-groovy/src/test/resources/logback.xml new file mode 100644 index 000000000..dd4d87f3f --- /dev/null +++ b/test-suite-groovy/src/test/resources/logback.xml @@ -0,0 +1,15 @@ + + + + false + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + diff --git a/test-suite-kotlin/build.gradle b/test-suite-kotlin/build.gradle index b8049e262..a4c494ee5 100644 --- a/test-suite-kotlin/build.gradle +++ b/test-suite-kotlin/build.gradle @@ -1,17 +1,23 @@ plugins { id "org.jetbrains.kotlin.jvm" id "org.jetbrains.kotlin.kapt" - id "io.micronaut.build.internal.gcp-tests" + id "groovy" + id "io.micronaut.build.internal.gcp-testsuite" } dependencies { - implementation(projects.micronautGcpPubsub) - implementation(projects.micronautGcpFunctionHttp) - implementation(projects.micronautGcpSecretManager) - implementation(mn.reactor) - implementation libs.kotlin.stdlib.jdk8 - implementation libs.kotlin.reflect - compileOnly(libs.managed.functions.framework.api) + kapt(platform(mn.micronaut.core.bom)) + kapt(mn.micronaut.inject.java) + testImplementation(platform(mn.micronaut.core.bom)) + testCompileOnly(mn.micronaut.inject.groovy) + testImplementation(mnTest.micronaut.test.spock) +} + +micronaut { + testResources { + // 10 minutes as this image is massive + clientTimeout = 600 + } } kotlin { diff --git a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/quickstart/AnimalListener.kt b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/quickstart/AnimalListener.kt index f5733bcf4..a74db997b 100644 --- a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/quickstart/AnimalListener.kt +++ b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/quickstart/AnimalListener.kt @@ -16,10 +16,14 @@ package io.micronaut.gcp.pubsub.quickstart //tag::imports[] +import io.micronaut.context.annotation.Requires import io.micronaut.gcp.pubsub.annotation.PubSubListener import io.micronaut.gcp.pubsub.annotation.Subscription // end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = ["test"]) // tag::clazz[] @PubSubListener // <1> class AnimalListener { @@ -30,4 +34,4 @@ class AnimalListener { } } -// end::clazz[] \ No newline at end of file +// end::clazz[] diff --git a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/serdes/XmlMessageSerDes.kt b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/serdes/XmlMessageSerDes.kt new file mode 100644 index 000000000..83f8fdb55 --- /dev/null +++ b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/serdes/XmlMessageSerDes.kt @@ -0,0 +1,32 @@ +package io.micronaut.gcp.pubsub.serdes + +import com.fasterxml.jackson.dataformat.xml.XmlMapper +import io.micronaut.core.serialize.exceptions.SerializationException +import io.micronaut.core.type.Argument +import io.micronaut.http.MediaType +import jakarta.inject.Named +import jakarta.inject.Singleton +import java.io.IOException + +@Singleton +class XmlMessageSerDes(@param:Named("xml") private val xmlMapper: XmlMapper) : PubSubMessageSerDes { + override fun deserialize(data: ByteArray, type: Argument<*>): Any { + return try { + xmlMapper.readValue(data, type.type) + } catch (e: IOException) { + throw SerializationException("Failed to deserialize PubSub message as XML", e) + } + } + + override fun serialize(data: Any): ByteArray { + return try { + xmlMapper.writeValueAsBytes(data) + } catch (e: IOException) { + throw SerializationException("Failed to serialize PubSub message as XML", e) + } + } + + override fun supportedType(): String { + return MediaType.APPLICATION_XML + } +} diff --git a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriber.kt b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriber.kt index ba1528c8a..15bc8e60f 100644 --- a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriber.kt +++ b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriber.kt @@ -16,19 +16,43 @@ package io.micronaut.gcp.pubsub.subscriber //tag::imports[] +import io.micronaut.context.annotation.Requires import io.micronaut.gcp.pubsub.annotation.PubSubListener import io.micronaut.gcp.pubsub.annotation.Subscription import io.micronaut.gcp.pubsub.support.Animal import io.micronaut.messaging.Acknowledgement +import reactor.core.publisher.Mono + // end::imports[] +@Requires(property = "spec.name", value = "AcknowledgementSubscriberSpec") // tag::clazz[] @PubSubListener -class AcknowledgementSubscriber { +class AcknowledgementSubscriber(private val messageProcessor: MessageProcessor) { - @Subscription("animals") - fun onMessage(animal: Animal, acknowledgement: Acknowledgement) { // <1> + @Subscription("animals") + fun onMessage(animal: Animal, acknowledgement: Acknowledgement) { + if (messageProcessor.handleAnimalMessage(animal).block() == true) { + acknowledgement.ack() + messageProcessor.recordAcknowledgement(acknowledgement) + } else { + acknowledgement.nack() + messageProcessor.recordAcknowledgement(acknowledgement) + } + } - } + @Subscription("animals-async") + fun onReactiveMessage(message: Mono, acknowledgement: Acknowledgement): Mono { + return message.flatMap { animal -> messageProcessor.handleAnimalMessage(animal) } + .doOnNext { result -> + if (result) { + acknowledgement.ack() + messageProcessor.recordAcknowledgement(acknowledgement) + } else { + acknowledgement.nack() + messageProcessor.recordAcknowledgement(acknowledgement) + } + } + } } -// tag::clazz[] \ No newline at end of file +// tag::clazz[] diff --git a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriber.kt b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriber.kt index d9e648912..63168eba5 100644 --- a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriber.kt +++ b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriber.kt @@ -16,29 +16,35 @@ package io.micronaut.gcp.pubsub.subscriber //tag::imports[] import com.google.pubsub.v1.PubsubMessage +import io.micronaut.context.annotation.Requires import io.micronaut.gcp.pubsub.annotation.MessageId import io.micronaut.gcp.pubsub.annotation.PubSubListener import io.micronaut.gcp.pubsub.annotation.Subscription import io.micronaut.gcp.pubsub.support.Animal // end::imports[] +@Requires(property = "spec.name", value = "ContentTypeSubscriberSpec") // tag::clazz[] @PubSubListener -class ContentTypeSubscriber { +class ContentTypeSubscriber(private val messageProcessor: MessageProcessor) { @Subscription("raw-subscription") fun receiveRaw(data: ByteArray, @MessageId id: String) { // <1> + messageProcessor.handleByteArrayMessage(data).block() } @Subscription("native-subscription") fun receiveNative(message: PubsubMessage) { // <2> + messageProcessor.handlePubsubMessage(message).block() } @Subscription("animals") fun receivePojo(animal: Animal, @MessageId id: String) { // <3> + messageProcessor.handleAnimalMessage(animal).block() } @Subscription(value = "animals-legacy", contentType = "application/xml") fun receiveXML(animal: Animal, @MessageId id: String) { // <4> + messageProcessor.handleAnimalMessage(animal).block() } } // end::clazz[] diff --git a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/CustomConfigurationSubscriber.kt b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/CustomConfigurationSubscriber.kt index 59ef3f023..307e030dd 100644 --- a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/CustomConfigurationSubscriber.kt +++ b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/CustomConfigurationSubscriber.kt @@ -15,11 +15,15 @@ */ package io.micronaut.gcp.pubsub.subscriber //tag::imports[] +import io.micronaut.context.annotation.Requires import io.micronaut.gcp.pubsub.annotation.PubSubListener import io.micronaut.gcp.pubsub.annotation.Subscription import io.micronaut.gcp.pubsub.support.Animal //end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = ["test"]) //tag::clazz[] @PubSubListener class CustomConfigurationSubscriber { diff --git a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/CustomHeaderSubscriber.kt b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/CustomHeaderSubscriber.kt index 461ff5337..11e37526a 100644 --- a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/CustomHeaderSubscriber.kt +++ b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/CustomHeaderSubscriber.kt @@ -15,12 +15,16 @@ */ package io.micronaut.gcp.pubsub.subscriber //tag::imports[] +import io.micronaut.context.annotation.Requires import io.micronaut.gcp.pubsub.annotation.PubSubListener import io.micronaut.gcp.pubsub.annotation.Subscription import io.micronaut.gcp.pubsub.support.Animal import io.micronaut.messaging.annotation.MessageHeader // end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = ["test"]) // tag::clazz[] @PubSubListener class CustomHeaderSubscriber { diff --git a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/ErrorHandlingSubscriber.kt b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/ErrorHandlingSubscriber.kt index 90831941c..253a355ac 100644 --- a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/ErrorHandlingSubscriber.kt +++ b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/ErrorHandlingSubscriber.kt @@ -15,12 +15,17 @@ */ package io.micronaut.gcp.pubsub.subscriber //tag::imports[] +import io.micronaut.context.annotation.Requires import io.micronaut.gcp.pubsub.annotation.PubSubListener import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler import io.micronaut.gcp.pubsub.support.Animal // end::imports[] + +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = ["test"]) // tag::clazz[] @PubSubListener class ErrorHandlingSubscriber : PubSubMessageReceiverExceptionHandler { // <1> diff --git a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/MessageProcessor.kt b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/MessageProcessor.kt new file mode 100644 index 000000000..c2e7977d1 --- /dev/null +++ b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/MessageProcessor.kt @@ -0,0 +1,17 @@ +package io.micronaut.gcp.pubsub.subscriber + +import com.google.pubsub.v1.PubsubMessage +import io.micronaut.gcp.pubsub.support.Animal +import io.micronaut.messaging.Acknowledgement +import reactor.core.publisher.Mono + +open class MessageProcessor { + + open fun handleByteArrayMessage(message: ByteArray) = Mono.just(java.lang.Boolean.TRUE) + + open fun handlePubsubMessage(pubsubMessage: PubsubMessage) = Mono.just(java.lang.Boolean.TRUE) + + open fun handleAnimalMessage(message: Animal) = Mono.just(java.lang.Boolean.TRUE) + + open fun recordAcknowledgement(acknowledgement: Acknowledgement) = Unit +} diff --git a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriber.kt b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriber.kt new file mode 100644 index 000000000..cf3edec8b --- /dev/null +++ b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriber.kt @@ -0,0 +1,62 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.gcp.pubsub.subscriber +//tag::imports[] +import com.google.pubsub.v1.PubsubMessage +import io.micronaut.context.annotation.Requires +import io.micronaut.gcp.pubsub.annotation.MessageId +import io.micronaut.gcp.pubsub.annotation.PubSubListener +import io.micronaut.gcp.pubsub.annotation.Subscription +import io.micronaut.gcp.pubsub.support.Animal +import reactor.core.publisher.Mono + +// end::imports[] + +@Requires(property = "spec.name", value = "ReactiveSubscriberSpec") +// tag::clazz[] +@PubSubListener +class ReactiveSubscriber(private val messageProcessor: MessageProcessor) { + + @Subscription("raw-subscription") + fun receiveRaw(data: Mono, @MessageId id: String): Mono { // <1> + return data.flatMap { payload -> + messageProcessor.handleByteArrayMessage(payload) + } + } + + @Subscription("native-subscription") + fun receiveNative(message: Mono): Mono { // <2> + return message.flatMap { payload -> + messageProcessor.handlePubsubMessage(payload) + } + } + + @Subscription("animals") + fun receivePojo(message: Mono, @MessageId id: String): Mono { // <3> + return message.flatMap { animal -> + messageProcessor.handleAnimalMessage(animal) + } + } + + @Subscription(value = "animals-legacy", contentType = "application/xml") + fun receiveXML(message: Mono, @MessageId id: String): Mono { // <4> + return message.flatMap { animal -> + messageProcessor.handleAnimalMessage(animal) + } + } +} +// end::clazz[] + diff --git a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/SimpleSubscriber.kt b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/SimpleSubscriber.kt index 33a74a3bf..498e008fd 100644 --- a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/SimpleSubscriber.kt +++ b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/subscriber/SimpleSubscriber.kt @@ -15,11 +15,15 @@ */ package io.micronaut.gcp.pubsub.subscriber //tag::imports[] +import io.micronaut.context.annotation.Requires import io.micronaut.gcp.pubsub.annotation.PubSubListener import io.micronaut.gcp.pubsub.annotation.Subscription import io.micronaut.gcp.pubsub.support.Animal // end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = ["test"]) // tag::clazz[] @PubSubListener // <1> class SimpleSubscriber { @@ -34,4 +38,4 @@ class SimpleSubscriber { } -} \ No newline at end of file +} diff --git a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/support/Animal.kt b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/support/Animal.kt index 42bca7372..f5e9a4d6b 100644 --- a/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/support/Animal.kt +++ b/test-suite-kotlin/src/main/kotlin/io/micronaut/gcp/pubsub/support/Animal.kt @@ -15,5 +15,8 @@ */ package io.micronaut.gcp.pubsub.support +import io.micronaut.serde.annotation.Serdeable + +@Serdeable data class Animal(val name: String) { -} \ No newline at end of file +} diff --git a/test-suite-kotlin/src/test/groovy/subscriber/AcknowledgementSubscriberSpec.groovy b/test-suite-kotlin/src/test/groovy/subscriber/AcknowledgementSubscriberSpec.groovy new file mode 100644 index 000000000..36b941e05 --- /dev/null +++ b/test-suite-kotlin/src/test/groovy/subscriber/AcknowledgementSubscriberSpec.groovy @@ -0,0 +1,144 @@ +package subscriber + +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.Requires +import io.micronaut.gcp.pubsub.annotation.PubSubClient +import io.micronaut.gcp.pubsub.annotation.Topic +import io.micronaut.gcp.pubsub.bind.DefaultPubSubAcknowledgement +import io.micronaut.gcp.pubsub.subscriber.MessageProcessor +import io.micronaut.gcp.pubsub.support.Animal +import io.micronaut.messaging.Acknowledgement +import io.micronaut.test.annotation.MockBean +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import jakarta.inject.Inject +import jakarta.inject.Singleton +import org.jetbrains.annotations.NotNull +import reactor.core.publisher.Mono +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +import java.util.concurrent.atomic.AtomicInteger + +@MicronautTest +@Property(name = "spec.name", value = "AcknowledgementSubscriberSpec") +class AcknowledgementSubscriberSpec extends Specification { + + @Inject + TestPublisher publisher + + Animal receivedMessage + + Acknowledgement recordedAcknowledgement + + AtomicInteger messageCount + + boolean isNackTest + + def setup() { + receivedMessage = null + recordedAcknowledgement = null + messageCount = new AtomicInteger(0) + isNackTest = false + } + + void "blocking subscriber with manual ack"() { + setup: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + + when: + publisher.publishAnimal(dog) + + then: + conditions.eventually { + assert receivedMessage != null + assert receivedMessage.name == "dog" + assert messageCount.get() == 1 + assert recordedAcknowledgement instanceof DefaultPubSubAcknowledgement + assert (recordedAcknowledgement as DefaultPubSubAcknowledgement).isClientAck() + } + } + + void "blocking subscriber with manual nack"() { + setup: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + isNackTest = true + + when: + publisher.publishAnimal(dog) + + then: + conditions.eventually { + assert receivedMessage != null + assert receivedMessage.name == "dog" + assert messageCount.get() == 2 + assert recordedAcknowledgement instanceof DefaultPubSubAcknowledgement + assert (recordedAcknowledgement as DefaultPubSubAcknowledgement).isClientAck() + } + } + + void "async subscriber with manual ack"() { + setup: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + + when: + publisher.publishAnimalAsync(dog) + + then: + conditions.eventually { + assert receivedMessage != null + assert receivedMessage.name == "dog" + assert messageCount.get() == 1 + assert recordedAcknowledgement instanceof DefaultPubSubAcknowledgement + assert (recordedAcknowledgement as DefaultPubSubAcknowledgement).isClientAck() + } + } + + void "async subscriber with manual nack"() { + setup: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + isNackTest = true + + when: + publisher.publishAnimalAsync(dog) + + then: + conditions.eventually { + assert receivedMessage != null + assert receivedMessage.name == "dog" + assert messageCount.get() == 2 + assert recordedAcknowledgement instanceof DefaultPubSubAcknowledgement + assert (recordedAcknowledgement as DefaultPubSubAcknowledgement).isClientAck() + } + } + + @MockBean(MessageProcessor) + MessageProcessor subscriberForTest() { + return new MessageProcessor() { + @Override + Mono handleAnimalMessage(@NotNull Animal message) { + receivedMessage = message + if (messageCount.getAndIncrement() == 0 && isNackTest) { + return Mono.just(Boolean.FALSE) + } + return Mono.just(Boolean.TRUE) + } + + @Override + void recordAcknowledgement(@NotNull Acknowledgement acknowledgement) { + recordedAcknowledgement = acknowledgement + } + } + } + + @Singleton + @PubSubClient + @Requires(property = "spec.name", value = "AcknowledgementSubscriberSpec") + static interface TestPublisher { + @Topic("animals") void publishAnimal(Animal animal) + @Topic("animals-async") void publishAnimalAsync(Animal animal) + } +} diff --git a/test-suite-kotlin/src/test/groovy/subscriber/ContentTypeSubscriberSpec.groovy b/test-suite-kotlin/src/test/groovy/subscriber/ContentTypeSubscriberSpec.groovy new file mode 100644 index 000000000..621d7682d --- /dev/null +++ b/test-suite-kotlin/src/test/groovy/subscriber/ContentTypeSubscriberSpec.groovy @@ -0,0 +1,134 @@ +package subscriber + +import com.google.pubsub.v1.PubsubMessage +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.Requires +import io.micronaut.gcp.pubsub.annotation.PubSubClient +import io.micronaut.gcp.pubsub.annotation.Topic +import io.micronaut.gcp.pubsub.subscriber.MessageProcessor +import io.micronaut.gcp.pubsub.support.Animal +import io.micronaut.http.MediaType +import io.micronaut.test.annotation.MockBean +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import jakarta.inject.Inject +import jakarta.inject.Singleton +import org.jetbrains.annotations.NotNull +import reactor.core.publisher.Mono +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +import java.nio.charset.StandardCharsets + +@MicronautTest +@Property(name = "spec.name", value = "ContentTypeSubscriberSpec") +class ContentTypeSubscriberSpec extends Specification { + + @Inject + TestPublisher publisher + + Object receivedMessage + + def setup() { + receivedMessage = null + } + + void "receive raw bytes"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + byte[] bytesSent = "foo".getBytes(StandardCharsets.UTF_8) + + when: + publisher.publishRaw(bytesSent) + + then: + conditions.eventually { + assert receivedMessage != null + assert receivedMessage instanceof byte[] + String decodedMessage = new String((receivedMessage as byte[]), StandardCharsets.UTF_8) + assert "foo" == decodedMessage + } + } + + void "receive native message"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + byte[] bytesSent = "foo".getBytes(StandardCharsets.UTF_8) + + when: + publisher.publishNative(bytesSent) + + then: + conditions.eventually { + assert receivedMessage != null + assert receivedMessage instanceof PubsubMessage + String decodedMessage = (receivedMessage as PubsubMessage).getData().toString(StandardCharsets.UTF_8) + assert "foo" == decodedMessage + } + } + + void "receive pojo message from json"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + + when: + publisher.publishAnimal(dog) + + then: + conditions.eventually { + assert receivedMessage != null + assert receivedMessage instanceof Animal + assert "dog" == (receivedMessage as Animal).getName() + } + } + + void "receive pojo message from xml"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + + when: + publisher.publishAnimalAsXml(dog) + + then: + conditions.eventually { + assert receivedMessage != null + assert receivedMessage instanceof Animal + assert "dog" == (receivedMessage as Animal).getName() + } + } + + @MockBean(MessageProcessor) + MessageProcessor subscriberForTest() { + return new MessageProcessor() { + + @Override + Mono handleByteArrayMessage(@NotNull byte[] message) { + receivedMessage = message + return Mono.just(Boolean.TRUE) + } + + @Override + Mono handlePubsubMessage(@NotNull PubsubMessage pubsubMessage) { + receivedMessage = pubsubMessage + return Mono.just(Boolean.TRUE) + } + + @Override + Mono handleAnimalMessage(@NotNull Animal message) { + receivedMessage = message + return Mono.just(Boolean.TRUE) + } + } + } + + @Singleton + @PubSubClient + @Requires(property = "spec.name", value = "ContentTypeSubscriberSpec") + static interface TestPublisher { + @Topic("raw-subscription") void publishRaw(byte[] payload) + @Topic("native-subscription") void publishNative(byte[] payload) + @Topic("animals") void publishAnimal(Animal animal) + @Topic(value = "animals-legacy", contentType = MediaType.APPLICATION_XML) void publishAnimalAsXml(Animal animal) + } +} diff --git a/test-suite-kotlin/src/test/groovy/subscriber/ReactiveSubscriberSpec.groovy b/test-suite-kotlin/src/test/groovy/subscriber/ReactiveSubscriberSpec.groovy new file mode 100644 index 000000000..71b2910e2 --- /dev/null +++ b/test-suite-kotlin/src/test/groovy/subscriber/ReactiveSubscriberSpec.groovy @@ -0,0 +1,132 @@ +package subscriber + +import com.google.pubsub.v1.PubsubMessage +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.Requires +import io.micronaut.gcp.pubsub.annotation.PubSubClient +import io.micronaut.gcp.pubsub.annotation.Topic +import io.micronaut.gcp.pubsub.subscriber.MessageProcessor +import io.micronaut.gcp.pubsub.support.Animal +import io.micronaut.http.MediaType +import io.micronaut.test.annotation.MockBean +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import jakarta.inject.Inject +import jakarta.inject.Singleton +import reactor.core.publisher.Mono +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +import java.nio.charset.StandardCharsets + +@MicronautTest +@Property(name = "spec.name", value = "ReactiveSubscriberSpec") +class ReactiveSubscriberSpec extends Specification { + + @Inject + TestPublisher publisher + + Object unwrappedResult + + def setup() { + unwrappedResult = null + } + + void "receive raw bytes"() { + given: + def conditions = new PollingConditions(timeout: 1) + byte[] bytesSent = "foo".getBytes(StandardCharsets.UTF_8) + + when: + publisher.publishRaw(bytesSent) + + then: + conditions.eventually { + assert unwrappedResult != null + assert unwrappedResult instanceof byte[] + String decodedMessage = new String((unwrappedResult as byte[]), StandardCharsets.UTF_8) + assert "foo" == decodedMessage + } + } + + void "receive native message"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + byte[] bytesSent = "foo".getBytes(StandardCharsets.UTF_8) + + when: + publisher.publishNative(bytesSent) + + then: + conditions.eventually { + assert unwrappedResult != null + assert unwrappedResult instanceof PubsubMessage + String decodedMessage = (unwrappedResult as PubsubMessage).getData().toString(StandardCharsets.UTF_8) + assert "foo" == decodedMessage + } + } + + void "receive pojo message from json"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + + when: + publisher.publishAnimal(dog) + + then: + conditions.eventually { + assert unwrappedResult != null + assert unwrappedResult instanceof Animal + assert "dog" == (unwrappedResult as Animal).getName() + } + } + + void "receive pojo message from xml"() { + given: + def conditions = new PollingConditions(initialDelay: 1) + Animal dog = new Animal("dog") + + when: + publisher.publishAnimalAsXml(dog) + + then: + conditions.eventually { + assert unwrappedResult != null + assert unwrappedResult instanceof Animal + assert "dog" == (unwrappedResult as Animal).getName() + } + } + + @MockBean(MessageProcessor.class) + MessageProcessor mockMessageProcessor() { + return new MessageProcessor() { + @Override + Mono handleByteArrayMessage(byte[] message) { + unwrappedResult = message + return super.handleByteArrayMessage(message) + } + + @Override + Mono handlePubsubMessage(PubsubMessage pubsubMessage) { + unwrappedResult = pubsubMessage + return super.handlePubsubMessage(pubsubMessage) + } + + @Override + Mono handleAnimalMessage(Animal message) { + unwrappedResult = message + return super.handleAnimalMessage(message) + } + } + } + + @Singleton + @PubSubClient + @Requires(property = "spec.name", value = "ReactiveSubscriberSpec") + static interface TestPublisher { + @Topic("raw-subscription") void publishRaw(byte[] payload) + @Topic("native-subscription") void publishNative(byte[] payload) + @Topic("animals") void publishAnimal(Animal animal) + @Topic(value = "animals-legacy", contentType = MediaType.APPLICATION_XML) void publishAnimalAsXml(Animal animal) + } +} diff --git a/test-suite-kotlin/src/test/resources/application-test.yml b/test-suite-kotlin/src/test/resources/application-test.yml new file mode 100644 index 000000000..b951cd16a --- /dev/null +++ b/test-suite-kotlin/src/test/resources/application-test.yml @@ -0,0 +1,19 @@ +gcp: + project-id: gcp-test-suite + +test-resources: + containers: + pubsub-emulator: + image-name: thekevjames/gcloud-pubsub-emulator:446.0.0 + startup-timeout: 600s # 10 minutes as this image is massive + hostnames: + - pubsub.host + exposed-ports: + - pubsub.port: 8681 + - pubsub.subscription.port: 8682 + env: + - PUBSUB_PROJECT1: gcp-test-suite,animals:animals,animals-async:animals-async,raw-subscription:raw-subscription,native-subscription:native-subscription,animals-legacy:animals-legacy + +pubsub: + emulator: + host: ${pubsub.host}:${pubsub.port} diff --git a/test-suite-kotlin/src/test/resources/logback.xml b/test-suite-kotlin/src/test/resources/logback.xml new file mode 100644 index 000000000..dd4d87f3f --- /dev/null +++ b/test-suite-kotlin/src/test/resources/logback.xml @@ -0,0 +1,15 @@ + + + + false + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + diff --git a/test-suite/build.gradle b/test-suite/build.gradle index b3e875dcc..5990214e3 100644 --- a/test-suite/build.gradle +++ b/test-suite/build.gradle @@ -1,15 +1,19 @@ plugins { id 'java-library' - id "io.micronaut.build.internal.gcp-tests" + id "io.micronaut.build.internal.gcp-testsuite" } dependencies { - annotationProcessor(mn.micronaut.inject.java) - - implementation(projects.micronautGcpFunctionHttp) - implementation(projects.micronautGcpPubsub) - implementation(projects.micronautGcpSecretManager) - implementation(mn.reactor) + testAnnotationProcessor(platform(mn.micronaut.core.bom)) + testAnnotationProcessor(mn.micronaut.inject.java) + testImplementation(mnTest.micronaut.test.junit5) + testImplementation(libs.awaitility) + testImplementation(libs.mockito) +} - compileOnly(libs.managed.functions.framework.api) +micronaut { + testResources { + // 10 minutes as this image is massive + clientTimeout = 600 + } } diff --git a/test-suite/src/main/java/io/micronaut/gcp/pubsub/quickstart/AnimalListener.java b/test-suite/src/main/java/io/micronaut/gcp/pubsub/quickstart/AnimalListener.java index d65e8a50e..cf0fbb832 100644 --- a/test-suite/src/main/java/io/micronaut/gcp/pubsub/quickstart/AnimalListener.java +++ b/test-suite/src/main/java/io/micronaut/gcp/pubsub/quickstart/AnimalListener.java @@ -15,10 +15,14 @@ */ package io.micronaut.gcp.pubsub.quickstart; //tag::imports[] +import io.micronaut.context.annotation.Requires; import io.micronaut.gcp.pubsub.annotation.PubSubListener; import io.micronaut.gcp.pubsub.annotation.Subscription; // end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = "test") // tag::clazz[] @PubSubListener // <1> public class AnimalListener { diff --git a/test-suite/src/main/java/io/micronaut/gcp/pubsub/serdes/XmlMessageSerDes.java b/test-suite/src/main/java/io/micronaut/gcp/pubsub/serdes/XmlMessageSerDes.java new file mode 100644 index 000000000..ea8c562e3 --- /dev/null +++ b/test-suite/src/main/java/io/micronaut/gcp/pubsub/serdes/XmlMessageSerDes.java @@ -0,0 +1,43 @@ +package io.micronaut.gcp.pubsub.serdes; + +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import io.micronaut.core.serialize.exceptions.SerializationException; +import io.micronaut.core.type.Argument; +import io.micronaut.http.MediaType; +import jakarta.inject.Named; +import jakarta.inject.Singleton; + +import java.io.IOException; + +@Singleton +public class XmlMessageSerDes implements PubSubMessageSerDes { + + private final XmlMapper xmlMapper; + + public XmlMessageSerDes(@Named("xml") XmlMapper xmlMapper) { + this.xmlMapper = xmlMapper; + } + + @Override + public Object deserialize(byte[] data, Argument type) { + try { + return xmlMapper.readValue(data, type.getType()); + } catch (IOException e) { + throw new SerializationException("Failed to deserialize PubSub message as XML", e); + } + } + + @Override + public byte[] serialize(Object data) { + try { + return xmlMapper.writeValueAsBytes(data); + } catch (IOException e) { + throw new SerializationException("Failed to serialize PubSub message as XML", e); + } + } + + @Override + public String supportedType() { + return MediaType.APPLICATION_XML; + } +} diff --git a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriber.java b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriber.java index b18098f61..f166e02d4 100644 --- a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriber.java +++ b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriber.java @@ -15,19 +15,45 @@ */ package io.micronaut.gcp.pubsub.subscriber; //tag::imports[] + +import io.micronaut.context.annotation.Requires; import io.micronaut.gcp.pubsub.annotation.PubSubListener; import io.micronaut.gcp.pubsub.annotation.Subscription; import io.micronaut.gcp.pubsub.support.Animal; import io.micronaut.messaging.Acknowledgement; +import reactor.core.publisher.Mono; // end::imports[] +@Requires(property = "spec.name", value = "AcknowledgementSubscriberTest") // tag::clazz[] @PubSubListener public class AcknowledgementSubscriber { + private final MessageProcessor messageProcessor; + + public AcknowledgementSubscriber(MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor; + } + @Subscription("animals") - public void onMessage(Animal animal, Acknowledgement acknowledgement) { // <1> + public void onMessage(Animal animal, Acknowledgement acknowledgement) { + if (Boolean.TRUE.equals(messageProcessor.handleAnimalMessage(animal).block())) { + acknowledgement.ack(); + } else { + acknowledgement.nack(); + } + } + @Subscription("animals-async") + public Mono onReactiveMessage(Mono animal, Acknowledgement acknowledgement) { + return animal.flatMap(messageProcessor::handleAnimalMessage) + .doOnNext(result -> { + if (Boolean.TRUE.equals(result)) { + acknowledgement.ack(); + } else { + acknowledgement.nack(); + } + }); } } diff --git a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriber.java b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriber.java index 0f0cb1a0b..bbc3491d2 100644 --- a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriber.java +++ b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriber.java @@ -16,12 +16,15 @@ package io.micronaut.gcp.pubsub.subscriber; //tag::imports[] import com.google.pubsub.v1.PubsubMessage; +import io.micronaut.context.annotation.Requires; import io.micronaut.gcp.pubsub.annotation.MessageId; import io.micronaut.gcp.pubsub.annotation.PubSubListener; import io.micronaut.gcp.pubsub.annotation.Subscription; import io.micronaut.gcp.pubsub.support.Animal; +import io.micronaut.http.MediaType; // end::imports[] +@Requires(property = "spec.name", value = "ContentTypeSubscriberTest") // tag::clazz[] @PubSubListener public class ContentTypeSubscriber { @@ -38,7 +41,7 @@ void receiveNative(PubsubMessage message) { void receivePojo(Animal animal, @MessageId String id) { } - @Subscription(value = "animals-legacy", contentType = "application/xml") // <4> + @Subscription(value = "animals-legacy", contentType = MediaType.APPLICATION_XML) // <4> void receiveXML(Animal animal, @MessageId String id) { } diff --git a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/CustomConfigurationSubscriber.java b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/CustomConfigurationSubscriber.java index 0b3b1dddc..bfb21037f 100644 --- a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/CustomConfigurationSubscriber.java +++ b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/CustomConfigurationSubscriber.java @@ -15,11 +15,15 @@ */ package io.micronaut.gcp.pubsub.subscriber; //tag::imports[] +import io.micronaut.context.annotation.Requires; import io.micronaut.gcp.pubsub.annotation.PubSubListener; import io.micronaut.gcp.pubsub.annotation.Subscription; import io.micronaut.gcp.pubsub.support.Animal; //end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = "test") //tag::clazz[] @PubSubListener public class CustomConfigurationSubscriber { diff --git a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/CustomHeaderSubscriber.java b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/CustomHeaderSubscriber.java index f61754dce..346be6a3c 100644 --- a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/CustomHeaderSubscriber.java +++ b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/CustomHeaderSubscriber.java @@ -15,12 +15,16 @@ */ package io.micronaut.gcp.pubsub.subscriber; //tag::imports[] +import io.micronaut.context.annotation.Requires; import io.micronaut.gcp.pubsub.annotation.PubSubListener; import io.micronaut.gcp.pubsub.annotation.Subscription; import io.micronaut.gcp.pubsub.support.Animal; import io.micronaut.messaging.annotation.MessageHeader; // end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = "test") // tag::clazz[] @PubSubListener public class CustomHeaderSubscriber { diff --git a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/ErrorHandlingSubscriber.java b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/ErrorHandlingSubscriber.java index 766381e83..498ec036f 100644 --- a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/ErrorHandlingSubscriber.java +++ b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/ErrorHandlingSubscriber.java @@ -16,6 +16,7 @@ package io.micronaut.gcp.pubsub.subscriber; //tag::imports[] import com.google.pubsub.v1.PubsubMessage; +import io.micronaut.context.annotation.Requires; import io.micronaut.gcp.pubsub.annotation.PubSubListener; import io.micronaut.gcp.pubsub.bind.PubSubConsumerState; import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException; @@ -23,6 +24,9 @@ import io.micronaut.gcp.pubsub.support.Animal; // end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = "test") // tag::clazz[] @PubSubListener public class ErrorHandlingSubscriber implements PubSubMessageReceiverExceptionHandler { // <1> diff --git a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/MessageProcessor.java b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/MessageProcessor.java new file mode 100644 index 000000000..ca0d91345 --- /dev/null +++ b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/MessageProcessor.java @@ -0,0 +1,19 @@ +package io.micronaut.gcp.pubsub.subscriber; + +import com.google.pubsub.v1.PubsubMessage; +import io.micronaut.gcp.pubsub.support.Animal; +import reactor.core.publisher.Mono; + +public interface MessageProcessor { + default Mono handleByteArrayMessage(byte[] message) { + return Mono.just(Boolean.TRUE); + } + + default Mono handlePubSubMessage(PubsubMessage pubsubMessage) { + return Mono.just(Boolean.TRUE); + } + + default Mono handleAnimalMessage(Animal message) { + return Mono.just(Boolean.TRUE); + } +} diff --git a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriber.java b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriber.java new file mode 100644 index 000000000..cb1f8047b --- /dev/null +++ b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriber.java @@ -0,0 +1,59 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.gcp.pubsub.subscriber; +//tag::imports[] + +import com.google.pubsub.v1.PubsubMessage; +import io.micronaut.context.annotation.Requires; +import io.micronaut.gcp.pubsub.annotation.MessageId; +import io.micronaut.gcp.pubsub.annotation.PubSubListener; +import io.micronaut.gcp.pubsub.annotation.Subscription; +import io.micronaut.gcp.pubsub.support.Animal; +import reactor.core.publisher.Mono; +// end::imports[] + +@Requires(property = "spec.name", value = "ReactiveSubscriberTest") +// tag::clazz[] +@PubSubListener +public class ReactiveSubscriber { + + private final MessageProcessor messageProcessor; + + public ReactiveSubscriber(MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor; + } + + @Subscription("raw-subscription") // <1> + Mono receiveRaw(Mono data, @MessageId String id) { + return data.flatMap(messageProcessor::handleByteArrayMessage); + } + + @Subscription("native-subscription") // <2> + Mono receiveNative(Mono message) { + return message.flatMap(messageProcessor::handlePubSubMessage); + } + + @Subscription("animals") // <3> + Mono receivePojo(Mono animal, @MessageId String id) { + return animal.flatMap(messageProcessor::handleAnimalMessage); + } + + @Subscription(value = "animals-legacy", contentType = "application/xml") // <4> + Mono receiveXML(Mono animal, @MessageId String id) { + return animal.flatMap(messageProcessor::handleAnimalMessage); + } +} +// end::clazz[] diff --git a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/SimpleSubscriber.java b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/SimpleSubscriber.java index 789ed4837..903c581bf 100644 --- a/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/SimpleSubscriber.java +++ b/test-suite/src/main/java/io/micronaut/gcp/pubsub/subscriber/SimpleSubscriber.java @@ -16,11 +16,15 @@ package io.micronaut.gcp.pubsub.subscriber; //tag::imports[] +import io.micronaut.context.annotation.Requires; import io.micronaut.gcp.pubsub.annotation.PubSubListener; import io.micronaut.gcp.pubsub.annotation.Subscription; import io.micronaut.gcp.pubsub.support.Animal; // end::imports[] +// There are currently no tests for this class. It is disabled in the test environment +// in order to prevent clashes with other subscribers. +@Requires(notEnv = "test") // tag::clazz[] @PubSubListener // <1> public class SimpleSubscriber { diff --git a/test-suite/src/main/java/io/micronaut/gcp/pubsub/support/Animal.java b/test-suite/src/main/java/io/micronaut/gcp/pubsub/support/Animal.java index f0881e05f..bc0eafe48 100644 --- a/test-suite/src/main/java/io/micronaut/gcp/pubsub/support/Animal.java +++ b/test-suite/src/main/java/io/micronaut/gcp/pubsub/support/Animal.java @@ -15,8 +15,11 @@ */ package io.micronaut.gcp.pubsub.support; +import io.micronaut.serde.annotation.Serdeable; + import java.io.Serializable; +@Serdeable public final class Animal implements Serializable { private String name; diff --git a/test-suite/src/test/java/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriberTest.java b/test-suite/src/test/java/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriberTest.java new file mode 100644 index 000000000..4fce0e14c --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/gcp/pubsub/subscriber/AcknowledgementSubscriberTest.java @@ -0,0 +1,137 @@ +package io.micronaut.gcp.pubsub.subscriber; + +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Requires; +import io.micronaut.context.event.BeanCreatedEvent; +import io.micronaut.context.event.BeanCreatedEventListener; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.gcp.pubsub.annotation.PubSubClient; +import io.micronaut.gcp.pubsub.annotation.Topic; +import io.micronaut.gcp.pubsub.bind.DefaultPubSubAcknowledgement; +import io.micronaut.gcp.pubsub.support.Animal; +import io.micronaut.messaging.Acknowledgement; +import io.micronaut.test.annotation.MockBean; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@MicronautTest +@Property(name = "spec.name", value = "AcknowledgementSubscriberTest") +class AcknowledgementSubscriberTest { + + @Inject + TestPublisher publisher; + + @Inject + MessageProcessor messageProcessor; + + @Inject + AcknowledgementSubscriber subscriber; + + @BeforeEach + void setup() { + Mockito.reset(messageProcessor, subscriber); + } + + @Test + void testBlockingAck() { + when(messageProcessor.handleAnimalMessage(any())).thenReturn(Mono.just(Boolean.TRUE)); + + Animal dog = new Animal("dog"); + publisher.publishAnimal(dog); + + ArgumentCaptor animalArg = ArgumentCaptor.forClass(Animal.class); + ArgumentCaptor ackArg = ArgumentCaptor.forClass(Acknowledgement.class); + + verify(subscriber, timeout(5000)).onMessage(animalArg.capture(), ackArg.capture()); + Assertions.assertEquals("dog", animalArg.getValue().getName()); + Assertions.assertInstanceOf(DefaultPubSubAcknowledgement.class, ackArg.getValue()); + Assertions.assertTrue(((DefaultPubSubAcknowledgement)ackArg.getValue()).isClientAck()); + } + + @Test + void testBlockingNack() { + when(messageProcessor.handleAnimalMessage(any())).thenReturn(Mono.just(Boolean.FALSE), Mono.just(Boolean.TRUE)); + + Animal cat = new Animal("cat"); + publisher.publishAnimal(cat); + + ArgumentCaptor animalArg = ArgumentCaptor.forClass(Animal.class); + ArgumentCaptor ackArg = ArgumentCaptor.forClass(Acknowledgement.class); + + verify(subscriber, timeout(5000).times(2)).onMessage(animalArg.capture(), ackArg.capture()); + Assertions.assertEquals("cat", animalArg.getValue().getName()); + Assertions.assertInstanceOf(DefaultPubSubAcknowledgement.class, ackArg.getValue()); + Assertions.assertTrue(((DefaultPubSubAcknowledgement)ackArg.getValue()).isClientAck()); + } + + @Test + void testAsyncAck() { + when(messageProcessor.handleAnimalMessage(any())).thenReturn(Mono.just(Boolean.TRUE)); + + Animal dog = new Animal("dog"); + publisher.publishAnimalAsync(dog); + + ArgumentCaptor> animalArg = ArgumentCaptor.forClass(Mono.class); + ArgumentCaptor ackArg = ArgumentCaptor.forClass(Acknowledgement.class); + + verify(subscriber, timeout(3000)).onReactiveMessage(animalArg.capture(), ackArg.capture()); + Assertions.assertNotNull(animalArg.getValue()); + Assertions.assertInstanceOf(DefaultPubSubAcknowledgement.class, ackArg.getValue()); + await().atMost(Duration.ofSeconds(3)).until(() -> ((DefaultPubSubAcknowledgement)ackArg.getValue()).isClientAck()); + } + + @Test + void testAsyncNack() { + when(messageProcessor.handleAnimalMessage(any())).thenReturn(Mono.just(Boolean.FALSE), Mono.just(Boolean.TRUE)); + + Animal cat = new Animal("cat"); + publisher.publishAnimalAsync(cat); + + ArgumentCaptor> animalArg = ArgumentCaptor.forClass(Mono.class); + ArgumentCaptor ackArg = ArgumentCaptor.forClass(Acknowledgement.class); + + verify(subscriber, timeout(5000).times(2)).onReactiveMessage(animalArg.capture(), ackArg.capture()); + Assertions.assertNotNull(animalArg.getValue()); + Assertions.assertInstanceOf(DefaultPubSubAcknowledgement.class, ackArg.getValue()); + await().atMost(Duration.ofSeconds(3)).until(() -> ((DefaultPubSubAcknowledgement)ackArg.getValue()).isClientAck()); + } + + @MockBean(MessageProcessor.class) + MessageProcessor mockMessageProcessor() { + return mock(MessageProcessor.class); + } + + @Singleton + @PubSubClient + @Requires(property = "spec.name", value = "AcknowledgementSubscriberTest") + interface TestPublisher { + @Topic("animals") void publishAnimal(Animal animal); + @Topic("animals-async") void publishAnimalAsync(Animal animal); + } + + @Singleton + @Requires(property = "spec.name", value = "AcknowledgementSubscriberTest") + static class SubscriberCreatedListener implements BeanCreatedEventListener { + @Override + public AcknowledgementSubscriber onCreated(@NonNull BeanCreatedEvent event) { + return spy(event.getBean()); + } + } +} diff --git a/test-suite/src/test/java/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriberTest.java b/test-suite/src/test/java/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriberTest.java new file mode 100644 index 000000000..0448471a8 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/gcp/pubsub/subscriber/ContentTypeSubscriberTest.java @@ -0,0 +1,106 @@ +package io.micronaut.gcp.pubsub.subscriber; + +import com.google.pubsub.v1.PubsubMessage; +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Requires; +import io.micronaut.context.event.BeanCreatedEvent; +import io.micronaut.context.event.BeanCreatedEventListener; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.gcp.pubsub.annotation.PubSubClient; +import io.micronaut.gcp.pubsub.annotation.Topic; +import io.micronaut.gcp.pubsub.support.Animal; +import io.micronaut.http.MediaType; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.nio.charset.StandardCharsets; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +@MicronautTest +@Property(name = "spec.name", value = "ContentTypeSubscriberTest") +class ContentTypeSubscriberTest { + + @Inject + TestPublisher publisher; + + @Inject + ContentTypeSubscriber subscriber; + + @BeforeEach + void setup() { + Mockito.reset(subscriber); + } + + @Test + void testRawBytes() { + byte[] bytesSent = "foo".getBytes(StandardCharsets.UTF_8); + publisher.publishRaw(bytesSent); + + ArgumentCaptor bytesReceived = ArgumentCaptor.forClass(byte[].class); + verify(subscriber, timeout(3000)).receiveRaw(bytesReceived.capture(), any()); + String decodedMessage = new String(bytesReceived.getValue(), StandardCharsets.UTF_8); + Assertions.assertEquals("foo", decodedMessage); + } + + @Test + void testNativeMessage() { + byte[] bytesSent = "foo".getBytes(StandardCharsets.UTF_8); + publisher.publishNative(bytesSent); + + ArgumentCaptor messageReceived = ArgumentCaptor.forClass(PubsubMessage.class); + verify(subscriber, timeout(3000)).receiveNative(messageReceived.capture()); + String decodedMessage = messageReceived.getValue().getData().toString(StandardCharsets.UTF_8); + Assertions.assertEquals("foo", decodedMessage); + } + + @Test + void testJsonPojo() { + Animal dog = new Animal("dog"); + publisher.publishAnimal(dog); + + ArgumentCaptor messageReceived = ArgumentCaptor.forClass(Animal.class); + verify(subscriber, timeout(3000)).receivePojo(messageReceived.capture(), any()); + Assertions.assertNotNull(messageReceived.getValue()); + Assertions.assertEquals("dog", messageReceived.getValue().getName()); + } + + @Test + void testXmlPojo() { + Animal dog = new Animal("cat"); + publisher.publishAnimalAsXml(dog); + + ArgumentCaptor messageReceived = ArgumentCaptor.forClass(Animal.class); + verify(subscriber, timeout(3000)).receiveXML(messageReceived.capture(), any()); + Assertions.assertNotNull(messageReceived.getValue()); + Assertions.assertEquals("cat", messageReceived.getValue().getName()); + } + + @Singleton + @PubSubClient + @Requires(property = "spec.name", value = "ContentTypeSubscriberTest") + interface TestPublisher { + @Topic("raw-subscription") void publishRaw(byte[] payload); + @Topic("native-subscription") void publishNative(byte[] payload); + @Topic("animals") void publishAnimal(Animal animal); + @Topic(value = "animals-legacy", contentType = MediaType.APPLICATION_XML) void publishAnimalAsXml(Animal animal); + } + + @Singleton + @Requires(property = "spec.name", value = "ContentTypeSubscriberTest") + static class SubscriberCreatedListener implements BeanCreatedEventListener { + @Override + public ContentTypeSubscriber onCreated(@NonNull BeanCreatedEvent event) { + return spy(event.getBean()); + } + } +} diff --git a/test-suite/src/test/java/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriberTest.java b/test-suite/src/test/java/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriberTest.java new file mode 100644 index 000000000..64d5b32c2 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/gcp/pubsub/subscriber/ReactiveSubscriberTest.java @@ -0,0 +1,116 @@ +package io.micronaut.gcp.pubsub.subscriber; + +import com.google.pubsub.v1.PubsubMessage; +import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Requires; +import io.micronaut.gcp.pubsub.annotation.PubSubClient; +import io.micronaut.gcp.pubsub.annotation.Topic; +import io.micronaut.gcp.pubsub.support.Animal; +import io.micronaut.http.MediaType; +import io.micronaut.test.annotation.MockBean; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; + +import static org.awaitility.Awaitility.await; + +@MicronautTest +@Property(name = "spec.name", value = "ReactiveSubscriberTest") +class ReactiveSubscriberTest { + + @Inject + TestPublisher publisher; + + Object unwrappedResult; + + @BeforeEach + void setup() { + unwrappedResult = null; + } + + @Test + void testRawBytes() { + byte[] bytesSent = "foo".getBytes(StandardCharsets.UTF_8); + publisher.publishRaw(bytesSent); + + await().atMost(Duration.ofSeconds(2)).until(() -> unwrappedResult != null); + Assertions.assertInstanceOf(byte[].class, unwrappedResult); + byte[] convertedResult = (byte[]) unwrappedResult; + String decodedMessage = new String(convertedResult, StandardCharsets.UTF_8); + Assertions.assertEquals("foo", decodedMessage); + } + + @Test + void testNativeMessage() { + byte[] bytesSent = "foo".getBytes(StandardCharsets.UTF_8); + publisher.publishNative(bytesSent); + + await().atMost(Duration.ofSeconds(2)).until(() -> unwrappedResult != null); + Assertions.assertInstanceOf(PubsubMessage.class, unwrappedResult); + PubsubMessage convertedResult = (PubsubMessage) unwrappedResult; + String decodedMessage = convertedResult.getData().toString(StandardCharsets.UTF_8); + Assertions.assertEquals("foo", decodedMessage); + } + + @Test + void testJsonPojo() { + Animal dog = new Animal("dog"); + publisher.publishAnimal(dog); + + await().atMost(Duration.ofSeconds(2)).until(() -> unwrappedResult != null); + Assertions.assertInstanceOf(Animal.class, unwrappedResult); + Animal convertedResult = (Animal) unwrappedResult; + Assertions.assertEquals("dog", convertedResult.getName()); + } + + @Test + void testXmlPojo() { + Animal dog = new Animal("cat"); + publisher.publishAnimalAsXml(dog); + + await().atMost(Duration.ofSeconds(2)).until(() -> unwrappedResult != null); + Assertions.assertInstanceOf(Animal.class, unwrappedResult); + Animal convertedResult = (Animal) unwrappedResult; + Assertions.assertEquals("cat", convertedResult.getName()); + } + + @MockBean(MessageProcessor.class) + MessageProcessor mockMessageProcessor() { + return new MessageProcessor() { + @Override + public Mono handleByteArrayMessage(byte[] message) { + unwrappedResult = message; + return MessageProcessor.super.handleByteArrayMessage(message); + } + + @Override + public Mono handlePubSubMessage(PubsubMessage pubsubMessage) { + unwrappedResult = pubsubMessage; + return MessageProcessor.super.handlePubSubMessage(pubsubMessage); + } + + @Override + public Mono handleAnimalMessage(Animal message) { + unwrappedResult = message; + return MessageProcessor.super.handleAnimalMessage(message); + } + }; + } + + @Singleton + @PubSubClient + @Requires(property = "spec.name", value = "ReactiveSubscriberTest") + interface TestPublisher { + @Topic("raw-subscription") void publishRaw(byte[] payload); + @Topic("native-subscription") void publishNative(byte[] payload); + @Topic("animals") void publishAnimal(Animal animal); + @Topic(value = "animals-legacy", contentType = MediaType.APPLICATION_XML) void publishAnimalAsXml(Animal animal); + } +} diff --git a/test-suite/src/test/resources/application-test.yml b/test-suite/src/test/resources/application-test.yml new file mode 100644 index 000000000..b951cd16a --- /dev/null +++ b/test-suite/src/test/resources/application-test.yml @@ -0,0 +1,19 @@ +gcp: + project-id: gcp-test-suite + +test-resources: + containers: + pubsub-emulator: + image-name: thekevjames/gcloud-pubsub-emulator:446.0.0 + startup-timeout: 600s # 10 minutes as this image is massive + hostnames: + - pubsub.host + exposed-ports: + - pubsub.port: 8681 + - pubsub.subscription.port: 8682 + env: + - PUBSUB_PROJECT1: gcp-test-suite,animals:animals,animals-async:animals-async,raw-subscription:raw-subscription,native-subscription:native-subscription,animals-legacy:animals-legacy + +pubsub: + emulator: + host: ${pubsub.host}:${pubsub.port} diff --git a/test-suite/src/test/resources/logback.xml b/test-suite/src/test/resources/logback.xml new file mode 100644 index 000000000..dd4d87f3f --- /dev/null +++ b/test-suite/src/test/resources/logback.xml @@ -0,0 +1,15 @@ + + + + false + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + +