Early queue connection and message consumption causes issues on application startup #125

Closed
3 tasks done
StefanSchmidtOz opened this issue Jul 2, 2021 · 1 comment

@StefanSchmidtOz

Is there a way to delay the listener so that it does not start consuming messages immediately?

In our environments the listener starts consuming messages before the application has fully initialised, e.g. before all beans have been created. Message consumption during this phase can delay the overall application startup.
Another issue: when writing tests with Testcontainers, the listener is instantiated before Testcontainers brings up LocalStack.

Task List

  • Steps to reproduce provided
  • Stacktrace (if present) provided
  • Full description of the issue provided (see below)

Steps to Reproduce

  1. Listener
@JMSListener(CONNECTION_FACTORY_BEAN_NAME)
public class TransactionEventListener {

    @Queue(value = "${aws.sqs.queue.name.events}", acknowledgeMode = CLIENT_ACKNOWLEDGE, executor = SQS_EXECUTOR_BEAN_NAME)
    void receive(@Message SQSTextMessage message) throws JMSException {
    ...
    }
}
  2. Test setup
@JMSProducer(CONNECTION_FACTORY_BEAN_NAME)
public interface TransactionEventProducer {

    @Queue(value = "${aws.sqs.queue.name.events}")
    void send(@MessageBody String message);
}
@MicronautTest(transactional = false, environments = {"local", "test"})
@Slf4j
public class TransactionEventPipelineIntegrationTest {

    @Container
    public static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack"), false)
            .withServices(LocalStackContainer.Service.SQS);

    @Inject
    private TransactionEventProducer eventProducer;

    @Test
    void test() {
        localstack.start();
        String payload = "some message";
        eventProducer.send(payload);
    }
}

Expected Behaviour

Delay connection to queue and message consumption until certain criteria are met, e.g. startup checks have been performed or TestContainers has been initialised.
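
To make the requested behaviour concrete, here is a minimal sketch of a startup gate in plain Java. It is not part of micronaut-jms: `StartupGate`, `markReady` and `handle` are hypothetical names invented for illustration. It also only holds back message handling. It does not delay the queue connection that the framework makes when it registers the listener, which is what fails in the test below.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical wrapper (not a micronaut-jms API): holds back message handling until markReady() is called. */
final class StartupGate<T> {

    private final CountDownLatch ready = new CountDownLatch(1);
    private final Consumer<T> delegate;

    StartupGate(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    /** Call once the startup criteria are met, e.g. after startup checks have passed. */
    void markReady() {
        ready.countDown();
    }

    /**
     * Blocks the calling (listener) thread until the gate opens, then passes the message on.
     * Returns false, without handling the message, if the gate stays closed past the timeout
     * or the thread is interrupted while waiting.
     */
    boolean handle(T message, long timeout, TimeUnit unit) {
        try {
            if (!ready.await(timeout, unit)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        delegate.accept(message);
        return true;
    }
}
```

A listener method could call `gate.handle(message, 30, TimeUnit.SECONDS)` and skip the acknowledgement when it returns false. With CLIENT_ACKNOWLEDGE an unacknowledged message should become visible on the queue again later, but that is an assumption about the broker configuration and worth verifying.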

Actual Behaviour

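Test application startup fails: the listener tries to look up the queue at localhost:4566 while the application context is starting, before LocalStack is running, and the connection is refused.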

TransactionEventPipelineIntegrationTest > initializationError FAILED
    io.micronaut.messaging.exceptions.MessageListenerException: Problem registering a MessageConsumer for events
        at io.micronaut.jms.listener.JMSListenerContainer.registerListener(JMSListenerContainer.java:193)
        at io.micronaut.jms.listener.JMSListenerContainerFactory.registerListener(JMSListenerContainerFactory.java:104)
        at io.micronaut.jms.configuration.AbstractJMSListenerMethodProcessor.registerListener(AbstractJMSListenerMethodProcessor.java:169)
        at io.micronaut.jms.configuration.AbstractJMSListenerMethodProcessor.process(AbstractJMSListenerMethodProcessor.java:89)
        at io.micronaut.context.DefaultBeanContext.lambda$null$31(DefaultBeanContext.java:1643)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at io.micronaut.context.DefaultBeanContext.lambda$initializeContext$32(DefaultBeanContext.java:1616)
        at java.base/java.util.HashMap.forEach(HashMap.java:1336)
        at io.micronaut.context.DefaultBeanContext.initializeContext(DefaultBeanContext.java:1614)
        at io.micronaut.context.DefaultApplicationContext.initializeContext(DefaultApplicationContext.java:234)
        at io.micronaut.context.DefaultBeanContext.readAllBeanDefinitionClasses(DefaultBeanContext.java:2905)
        at io.micronaut.context.DefaultBeanContext.start(DefaultBeanContext.java:231)
        at io.micronaut.context.DefaultApplicationContext.start(DefaultApplicationContext.java:180)
        at io.micronaut.test.extensions.AbstractMicronautExtension.startApplicationContext(AbstractMicronautExtension.java:360)
        at io.micronaut.test.extensions.AbstractMicronautExtension.beforeClass(AbstractMicronautExtension.java:239)
        at io.micronaut.test.extensions.junit5.MicronautJunit5Extension.beforeAll(MicronautJunit5Extension.java:62)
        at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllCallbacks$8(ClassBasedTestDescriptor.java:368)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllCallbacks(ClassBasedTestDescriptor.java:368)
        at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:192)
        at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:78)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:136)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
        at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
        at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
        at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
        at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
        at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
        at com.sun.proxy.$Proxy2.stop(Unknown Source)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:135)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
        at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
        at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414)
        at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
        at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
        at java.base/java.lang.Thread.run(Thread.java:834)
        Caused by:
        javax.jms.JMSException: AmazonClientException: getQueueUrl.
            at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:436)
            at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.getQueueUrl(AmazonSQSMessagingClientWrapper.java:296)
            at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.getQueueUrl(AmazonSQSMessagingClientWrapper.java:265)
            at com.amazon.sqs.javamessaging.SQSSession.createQueue(SQSSession.java:636)
            at io.micronaut.jms.listener.JMSListenerContainer.lookupDestination(JMSListenerContainer.java:232)
            at io.micronaut.jms.listener.JMSListenerContainer.registerListener(JMSListenerContainer.java:170)
            ... 85 more
            Caused by:
            com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to localhost:4566 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
                at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
                at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
                at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
                at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
                at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
                at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
                at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
                at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
                at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
                at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:2283)
                at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2250)
                at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2239)
                at com.amazonaws.services.sqs.AmazonSQSClient.executeGetQueueUrl(AmazonSQSClient.java:1229)
                at com.amazonaws.services.sqs.AmazonSQSClient.getQueueUrl(AmazonSQSClient.java:1198)
                at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.getQueueUrl(AmazonSQSMessagingClientWrapper.java:294)
                ... 89 more
                Caused by:
                org.apache.http.conn.HttpHostConnectException: Connect to localhost:4566 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
                    at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156)
                    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
                    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
                    at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
                    at com.amazonaws.http.conn.$Proxy69.connect(Unknown Source)
                    at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
                    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
                    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
                    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
                    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
                    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
                    at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
                    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1331)
                    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
                    ... 102 more
                    Caused by:
                    java.net.ConnectException: Connection refused (Connection refused)
                        at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
                        at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
                        at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
                        at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
                        at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
                        at java.base/java.net.Socket.connect(Socket.java:609)
                        at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
                        at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
                        ... 118 more
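
The root cause at the bottom of the trace is a plain TCP "Connection refused" on localhost:4566. A quick way to confirm that nothing is listening at that moment is a small port probe like the sketch below. `PortProbe` and `waitForPort` are hypothetical helper names, not part of Micronaut or Testcontainers. This is a diagnostic aid only: as far as I know, `LocalStackContainer.start()` already blocks until the container is ready, and Testcontainers maps the container to a random host port, so the real problem is the order in which things start and which endpoint the client is pointed at.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;

/** Hypothetical helper: polls a TCP port until it accepts connections or a deadline passes. */
final class PortProbe {

    private PortProbe() {
    }

    /**
     * Returns true as soon as host:port accepts a TCP connection,
     * or false if it stays unreachable until the timeout elapses.
     */
    static boolean waitForPort(String host, int port, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        do {
            try (Socket socket = new Socket()) {
                // Short per-attempt connect timeout so a single attempt cannot block for long.
                socket.connect(new InetSocketAddress(host, port), 250);
                return true;
            } catch (IOException notReachableYet) {
                // Same situation as in the trace: nothing is accepting connections yet.
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        } while (System.nanoTime() - deadline < 0);
        return false;
    }
}
```

For example, `PortProbe.waitForPort("localhost", 4566, Duration.ofSeconds(5))` returning false just before the application context starts would match the failure shown above.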

Environment Information

  • Operating System: Mac 11.2.3
  • Micronaut Version: 2.5.7
  • JDK Version: openjdk version "11.0.10"

@StefanSchmidtOz (Author)

I was able to take the test in your repo (https://github.com/micronaut-projects/micronaut-jms/blob/master/tests/tasks-sqs/src/test/groovy/example/TasksSpec.groovy)
and translate it for our setup with JUnit 5 (https://micronaut-projects.github.io/micronaut-test/latest/guide/#junit5).

For others with the same problem:

@MicronautTest(transactional = false, environments = {"local", "test"})
// When using TestPropertyProvider, the test must be declared with JUnit's @TestInstance(TestInstance.Lifecycle.PER_CLASS) annotation
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Slf4j
public class TransactionEventPipelineIntegrationTest implements TestPropertyProvider {

    @Container
    public static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack"), false)
            .withServices(LocalStackContainer.Service.SQS);

    @Inject
    private TransactionEventProducer eventProducer;

    @Test
    void test() {
        String payload = "some message";
        eventProducer.send(payload);
    }

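    // TestPropertyProvider callback: runs before the application context starts,
    // so LocalStack is up and its endpoint is known before the listener connects.
    @Override
    public Map<String, String> getProperties() {
        localstack.start();
        return Map.of(
                "aws.sqs.endpoint", localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString(),
                "aws.region", localstack.getRegion()
        );
    }
}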

Next, I'll have to create the queue before the listener starts.
That is also solved in your test setup (SqsClientFactory.java), and I should be able to replicate it in some form.
