From 7166859fc0bd40d17aea3dfbdf4258a37272f31b Mon Sep 17 00:00:00 2001 From: laststem Date: Fri, 26 Jan 2024 00:52:13 +0900 Subject: [PATCH] GH-2601: Add a batchReceiveTimeout (#2605) Fixes: #2601 Stop to waiting next message and execute listener when `batchReceiveTimeout` is timed out. * Add `batchReceiveTimeout` to the `SimpleMessageListenerContainer` configuration. --- .../config/ListenerContainerFactoryBean.java | 18 ++++++- .../SimpleRabbitListenerContainerFactory.java | 21 +++++++- .../SimpleMessageListenerContainer.java | 30 ++++++++++- .../SimpleMessageListenerContainerTests.java | 54 +++++++++++++++++++ .../ROOT/pages/amqp/containerAttributes.adoc | 14 ++++- 5 files changed, 131 insertions(+), 6 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java index a23d29ca83..0a070e1eca 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,6 +55,7 @@ * @author Gary Russell * @author Artem Bilan * @author Johno Crawford + * @author Jeonggi Kim * * @since 2.0 * @@ -166,6 +167,8 @@ public class ListenerContainerFactoryBean extends AbstractFactoryBean @@ -552,6 +567,7 @@ private AbstractMessageListenerContainer createContainer() { .acceptIfNotNull(this.consecutiveActiveTrigger, container::setConsecutiveActiveTrigger) .acceptIfNotNull(this.consecutiveIdleTrigger, container::setConsecutiveIdleTrigger) .acceptIfNotNull(this.receiveTimeout, container::setReceiveTimeout) + .acceptIfNotNull(this.batchReceiveTimeout, container::setBatchReceiveTimeout) .acceptIfNotNull(this.batchSize, container::setBatchSize) .acceptIfNotNull(this.consumerBatchEnabled, container::setConsumerBatchEnabled) .acceptIfNotNull(this.declarationRetries, container::setDeclarationRetries) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java index 2e8c5afdd6..baa4c975df 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ * @author Gary Russell * @author Artem Bilan * @author Dustin Schultz + * @author Jeonggi Kim * * @since 1.4 */ @@ -54,6 +55,8 @@ public class SimpleRabbitListenerContainerFactory private Long receiveTimeout; + private Long batchReceiveTimeout; + private Boolean consumerBatchEnabled; /** @@ -121,6 +124,19 @@ public void setReceiveTimeout(Long receiveTimeout) { this.receiveTimeout = receiveTimeout; } + /** + * The number of milliseconds of timeout for gathering batch messages. + * It limits the time to wait to fill batchSize. + * Default is 0 (no timeout). + * @param batchReceiveTimeout the timeout for gathering batch messages. + * @since 3.1.2 + * @see SimpleMessageListenerContainer#setBatchReceiveTimeout + * @see #setBatchSize(Integer) + */ + public void setBatchReceiveTimeout(Long batchReceiveTimeout) { + this.batchReceiveTimeout = batchReceiveTimeout; + } + /** * Set to true to present a list of messages based on the {@link #setBatchSize(Integer)}, * if the listener supports it. Starting with version 3.0, setting this to true will @@ -163,7 +179,8 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb .acceptIfNotNull(this.stopConsumerMinInterval, instance::setStopConsumerMinInterval) .acceptIfNotNull(this.consecutiveActiveTrigger, instance::setConsecutiveActiveTrigger) .acceptIfNotNull(this.consecutiveIdleTrigger, instance::setConsecutiveIdleTrigger) - .acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout); + .acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout) + .acceptIfNotNull(this.batchReceiveTimeout, instance::setBatchReceiveTimeout); if (Boolean.TRUE.equals(this.consumerBatchEnabled)) { instance.setConsumerBatchEnabled(true); /* diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 9007bfda0d..737ee14716 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -80,6 +80,7 @@ * @author Mat Jaggard * @author Yansong Ren * @author Tim Bourquin + * @author Jeonggi Kim * * @since 1.0 */ @@ -121,6 +122,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT; + private long batchReceiveTimeout; + private Set consumers; private Integer declarationRetries; @@ -330,6 +333,19 @@ public void setReceiveTimeout(long receiveTimeout) { this.receiveTimeout = receiveTimeout; } + /** + * The number of milliseconds of timeout for gathering batch messages. + * It limits the time to wait to fill batchSize. + * Default is 0 (no timeout). + * @param batchReceiveTimeout the timeout for gathering batch messages. + * @since 3.1.2 + * @see #setBatchSize(int) + */ + public void setBatchReceiveTimeout(long batchReceiveTimeout) { + Assert.isTrue(batchReceiveTimeout >= 0, "'batchReceiveTimeout' must be >= 0"); + this.batchReceiveTimeout = batchReceiveTimeout; + } + /** * This property has several functions. *

@@ -996,8 +1012,18 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep List messages = null; long deliveryTag = 0; - + boolean isBatchReceiveTimeoutEnabled = this.batchReceiveTimeout > 0; + long startTime = isBatchReceiveTimeoutEnabled ? System.currentTimeMillis() : 0; for (int i = 0; i < this.batchSize; i++) { + boolean batchTimedOut = isBatchReceiveTimeoutEnabled && + (System.currentTimeMillis() - startTime) > this.batchReceiveTimeout; + if (batchTimedOut) { + if (logger.isTraceEnabled()) { + long gathered = messages != null ? messages.size() : 0; + logger.trace("Timed out for gathering batch messages. gathered size is " + gathered); + } + break; + } logger.trace("Waiting for message from consumer."); Message message = consumer.nextMessage(this.receiveTimeout); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java index 7044f5ee6c..26c72b14e3 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java @@ -110,6 +110,7 @@ * @author Mohammad Hewedy * @author Yansong Ren * @author Tim Bourquin + * @author Jeonggi Kim */ public class SimpleMessageListenerContainerTests { @@ -784,6 +785,59 @@ void testWithConsumerStartWhenNotActive() { assertThat(start.getCount()).isEqualTo(0L); } + @Test + public void testBatchReceiveTimedOut() throws Exception { + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + Connection connection = mock(Connection.class); + Channel channel = mock(Channel.class); + given(connectionFactory.createConnection()).willReturn(connection); + given(connection.createChannel(false)).willReturn(channel); + final AtomicReference consumer = new AtomicReference<>(); + willAnswer(invocation -> { + consumer.set(invocation.getArgument(6)); + consumer.get().handleConsumeOk("1"); + return "1"; + }).given(channel) + .basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(), + any(Consumer.class)); + final CountDownLatch latch = new CountDownLatch(2); + willAnswer(invocation -> { + latch.countDown(); + return null; + }).given(channel).basicAck(anyLong(), anyBoolean()); + + final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + container.setAfterReceivePostProcessors(msg -> null); + container.setQueueNames("foo"); + MessageListener listener = mock(BatchMessageListener.class); + container.setMessageListener(listener); + container.setBatchSize(3); + container.setConsumerBatchEnabled(true); + container.setReceiveTimeout(10); + container.setBatchReceiveTimeout(20); + container.start(); + + BasicProperties props = new BasicProperties(); + byte[] payload = "baz".getBytes(); + Envelope envelope = new Envelope(1L, false, "foo", "bar"); + consumer.get().handleDelivery("1", envelope, props, payload); + envelope = new Envelope(2L, false, "foo", "bar"); + consumer.get().handleDelivery("1", envelope, props, payload); + // waiting for batch receive timed out + Thread.sleep(20); + envelope = new Envelope(3L, false, "foo", "bar"); + consumer.get().handleDelivery("1", envelope, props, payload); + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + verify(channel, never()).basicAck(eq(1), anyBoolean()); + verify(channel).basicAck(2, true); + verify(channel, never()).basicAck(eq(2), anyBoolean()); + verify(channel).basicAck(3, true); + container.stop(); + verify(listener).containerAckMode(AcknowledgeMode.AUTO); + verify(listener).isAsyncReplies(); + verifyNoMoreInteractions(listener); + } + private Answer messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container, final boolean cancel, final CountDownLatch latch) { return invocation -> { diff --git a/src/reference/antora/modules/ROOT/pages/amqp/containerAttributes.adoc b/src/reference/antora/modules/ROOT/pages/amqp/containerAttributes.adoc index 4f9d827f33..9aec7d7c9b 100644 --- a/src/reference/antora/modules/ROOT/pages/amqp/containerAttributes.adoc +++ b/src/reference/antora/modules/ROOT/pages/amqp/containerAttributes.adoc @@ -198,7 +198,7 @@ a| |[[consumerBatchEnabled]]<> + (batch-enabled) -|If the `MessageListener` supports it, setting this to true enables batching of discrete messages, up to `batchSize`; a partial batch will be delivered if no new messages arrive in `receiveTimeout`. +|If the `MessageListener` supports it, setting this to true enables batching of discrete messages, up to `batchSize`; a partial batch will be delivered if no new messages arrive in `receiveTimeout` or gathering batch messages time exceeded `batchReceiveTimeout`. When this is false, batching is only supported for batches created by a producer; see xref:amqp/sending-messages.adoc#template-batching[Batching]. a|image::tickmark.png[] @@ -611,6 +611,18 @@ a|image::tickmark.png[] a| a| +|[[batchReceiveTimeout]]<> + +(batch-receive-timeout) + +|The number of milliseconds of timeout for gathering batch messages. +It limits the time to wait to fill batchSize. +When `batchSize > 1` and the time to gathering batch messages is greater than `batchReceiveTime`, batch will be delivered. +Default is 0 (no timeout). + +a|image::tickmark.png[] +a| +a| + |[[recoveryBackOff]]<> + (recovery-back-off)