Skip to content

Commit

Permalink
spring-projectsGH-1032: Add consumer-side batching support
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Jun 24, 2019
1 parent c1e3179 commit 65d3c70
Show file tree
Hide file tree
Showing 22 changed files with 431 additions and 120 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.core;

import java.util.List;

/**
* Used to receive a batch of messages if the container supports it.
*
* @author Gary Russell
* @since 2.2
*
*/
public interface BatchMessageListener extends MessageListener {

@Override
default void onMessage(Message message) {
throw new UnsupportedOperationException("Should never be called by the container");
}

@Override
void onMessageBatch(List<Message> messages);


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.amqp.core;

import java.util.List;

/**
* Listener interface to receive asynchronous delivery of Amqp Messages.
*
Expand All @@ -25,6 +27,10 @@
@FunctionalInterface
public interface MessageListener {

/**
* Delivers a single message.
* @param message the message.
*/
void onMessage(Message message);

/**
Expand All @@ -37,4 +43,13 @@ default void containerAckMode(AcknowledgeMode mode) {
// NOSONAR - empty
}

/**
* Delivers a batch of messages.
* @param messages the messages.
* @since 2.2
*/
default void onMessageBatch(List<Message> messages) {
throw new UnsupportedOperationException("Should never be called by the container");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,14 @@ public class ListenerContainerFactoryBean extends AbstractFactoryBean<AbstractMe

private Long receiveTimeout;

private Integer txSize;
private Integer batchSize;

private Integer declarationRetries;

private Long retryDeclarationInterval;

private Boolean consumerBatchEnabled;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -366,8 +368,17 @@ public void setReceiveTimeout(long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}

public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

@Deprecated
public void setTxSize(int txSize) {
this.txSize = txSize;
setBatchSize(txSize);
}

protected void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
this.consumerBatchEnabled = consumerBatchEnabled;
}

public void setDeclarationRetries(int declarationRetries) {
Expand All @@ -380,8 +391,9 @@ public void setRetryDeclarationInterval(long retryDeclarationInterval) {

@Override
public Class<?> getObjectType() {
return this.listenerContainer == null ? AbstractMessageListenerContainer.class : this.listenerContainer
.getClass();
return this.listenerContainer == null ? AbstractMessageListenerContainer.class
: this.listenerContainer
.getClass();
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -446,7 +458,8 @@ private AbstractMessageListenerContainer createContainer() {
.acceptIfNotNull(this.consecutiveActiveTrigger, container::setConsecutiveActiveTrigger)
.acceptIfNotNull(this.consecutiveIdleTrigger, container::setConsecutiveIdleTrigger)
.acceptIfNotNull(this.receiveTimeout, container::setReceiveTimeout)
.acceptIfNotNull(this.txSize, container::setTxSize)
.acceptIfNotNull(this.batchSize, container::setBatchSize)
.acceptIfNotNull(this.consumerBatchEnabled, container::setConsumerBatchEnabled)
.acceptIfNotNull(this.declarationRetries, container::setDeclarationRetries)
.acceptIfNotNull(this.retryDeclarationInterval, container::setRetryDeclarationInterval);
return container;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb
super.initializeContainer(instance, endpoint);

JavaUtils javaUtils = JavaUtils.INSTANCE
.acceptIfNotNull(this.txSize, instance::setTxSize);
.acceptIfNotNull(this.txSize, instance::setBatchSize);
String concurrency = null;
if (endpoint != null) {
concurrency = endpoint.getConcurrency();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.BatchMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessagePostProcessor;
Expand All @@ -53,6 +54,7 @@
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.connection.RoutingConnectionFactory;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessagelistener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException;
import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
Expand Down Expand Up @@ -1059,6 +1061,14 @@ public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
this.batchingStrategy = batchingStrategy;
}

protected Collection<MessagePostProcessor> getAfterReceivePostProcessors() {
return this.afterReceivePostProcessors;
}

protected void setAfterReceivePostProcessors(Collection<MessagePostProcessor> afterReceivePostProcessors) {
this.afterReceivePostProcessors = afterReceivePostProcessors;
}

/**
* Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
*/
Expand Down Expand Up @@ -1321,70 +1331,84 @@ protected void invokeErrorHandler(Throwable ex) {
/**
* Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
* @param channel the Rabbit Channel to operate on
* @param messageIn the received Rabbit Message
* @param data the received Rabbit Message
* @see #invokeListener
* @see #handleListenerException
*/
protected void executeListener(Channel channel, Message messageIn) {
@SuppressWarnings("unchecked")
protected void executeListener(Channel channel, Object data) {
if (!isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);
logger.warn(
"Rejecting received message(s) because the listener container has been stopped: " + data);
}
throw new MessageRejectedWhileStoppingException();
}
try {
doExecuteListener(channel, messageIn);
doExecuteListener(channel, data);
}
catch (RuntimeException ex) {
if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) {
Message message;
if (data instanceof Message) {
message = (Message) data;
}
else {
message = ((List<Message>) data).get(0);
}
if (message.getMessageProperties().isFinalRetryForMessageWithNoId()) {
if (this.statefulRetryFatalWithNullMessageId) {
throw new FatalListenerExecutionException(
"Illegal null id in message. Failed to manage retry for message: " + messageIn, ex);
"Illegal null id in message. Failed to manage retry for message: " + message, ex);
}
else {
throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID",
new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex),
messageIn);
message);
}
}
handleListenerException(ex);
throw ex;
}
}

private void doExecuteListener(Channel channel, Message messageIn) {
Message message = messageIn;
if (this.afterReceivePostProcessors != null) {
for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
message = processor.postProcessMessage(message);
if (message == null) {
throw new ImmediateAcknowledgeAmqpException(
"Message Post Processor returned 'null', discarding message");
private void doExecuteListener(Channel channel, Object data) {
if (data instanceof Message) {
Message message = (Message) data;
if (this.afterReceivePostProcessors != null) {
for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
message = processor.postProcessMessage(message);
if (message == null) {
throw new ImmediateAcknowledgeAmqpException(
"Message Post Processor returned 'null', discarding message");
}
}
}
}
if (this.deBatchingEnabled && this.batchingStrategy.canDebatch(message.getMessageProperties())) {
this.batchingStrategy.deBatch(message, fragment -> invokeListener(channel, fragment));
if (this.deBatchingEnabled && this.batchingStrategy.canDebatch(message.getMessageProperties())) {
this.batchingStrategy.deBatch(message, fragment -> invokeListener(channel, fragment));
}
else {
invokeListener(channel, message);
}
}
else {
invokeListener(channel, message);
invokeListener(channel, data);
}
}

protected void invokeListener(Channel channel, Message message) {
this.proxy.invokeListener(channel, message);
protected void invokeListener(Channel channel, Object data) {
this.proxy.invokeListener(channel, data);
}

/**
* Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
* @param channel the Rabbit Channel to operate on
* @param message the received Rabbit Message
* @param data the received Rabbit Message or List of Message.
* @see #setMessageListener(MessageListener)
*/
protected void actualInvokeListener(Channel channel, Message message) {
protected void actualInvokeListener(Channel channel, Object data) {
Object listener = getMessageListener();
if (listener instanceof ChannelAwareMessageListener) {
doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
doInvokeListener((ChannelAwareMessageListener) listener, channel, data);
}
else if (listener instanceof MessageListener) {
boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
Expand All @@ -1395,7 +1419,7 @@ else if (listener instanceof MessageListener) {
resourceHolder);
}
try {
doInvokeListener((MessageListener) listener, message);
doInvokeListener((MessageListener) listener, data);
}
finally {
if (bindChannel) {
Expand All @@ -1419,12 +1443,14 @@ else if (listener != null) {
* An exception thrown from the listener will be wrapped in a {@link ListenerExecutionFailedException}.
* @param listener the Spring ChannelAwareMessageListener to invoke
* @param channel the Rabbit Channel to operate on
* @param message the received Rabbit Message
* @param data the received Rabbit Message or List of Message.
* @see ChannelAwareMessageListener
* @see #setExposeListenerChannel(boolean)
*/
protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message) {
@SuppressWarnings("unchecked")
protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Object data) {

Message message = null;
RabbitResourceHolder resourceHolder = null;
Channel channelToUse = channel;
boolean boundHere = false;
Expand Down Expand Up @@ -1458,7 +1484,13 @@ protected void doInvokeListener(ChannelAwareMessageListener listener, Channel ch
}
// Actually invoke the message listener...
try {
listener.onMessage(message, channelToUse);
if (data instanceof List) {
((ChannelAwareBatchMessagelistener) listener).onMessageBatch((List<Message>) data, channelToUse);
}
else {
message = (Message) data;
listener.onMessage(message, channelToUse);
}
}
catch (Exception e) {
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
Expand Down Expand Up @@ -1500,13 +1532,21 @@ private void cleanUpAfterInvoke(@Nullable RabbitResourceHolder resourceHolder, C
* Exception thrown from listener will be wrapped to {@link ListenerExecutionFailedException}.
*
* @param listener the Rabbit MessageListener to invoke
* @param message the received Rabbit Message
* @param data the received Rabbit Message or List of Message.
*
* @see org.springframework.amqp.core.MessageListener#onMessage
*/
protected void doInvokeListener(MessageListener listener, Message message) {
@SuppressWarnings("unchecked")
protected void doInvokeListener(MessageListener listener, Object data) {
Message message = null;
try {
listener.onMessage(message);
if (listener instanceof BatchMessageListener) {
listener.onMessageBatch((List<Message>) data);
}
else {
message = (Message) data;
listener.onMessage(message);
}
}
catch (Exception e) {
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
Expand Down Expand Up @@ -1789,7 +1829,7 @@ private void checkPossibleAuthenticationFailureFatalFromProperty() {
@FunctionalInterface
private interface ContainerDelegate {

void invokeListener(Channel channel, Message message);
void invokeListener(Channel channel, Object data);

}

Expand Down
Loading

0 comments on commit 65d3c70

Please sign in to comment.