Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce 'idleReceivesPerTaskLimit' in DefaultMessageListenerContainer #26442

Merged
merged 1 commit into from
Mar 12, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe

private int maxMessagesPerTask = Integer.MIN_VALUE;

private int idleReceivesPerTaskLimit = Integer.MIN_VALUE;

private int idleConsumerLimit = 1;

private int idleTaskExecutionLimit = 1;
Expand Down Expand Up @@ -439,6 +441,44 @@ public final int getMaxMessagesPerTask() {
}
}

/**
* Return the maximum number of subsequent idle (or null) messages to receive in a single task.
*/
public int getIdleReceivesPerTaskLimit() {
synchronized (this.lifecycleMonitor) {
return idleReceivesPerTaskLimit;
}
}

/**
* Marks the consumer as 'idle' after the specified number of idle receives
* have been reached. An idle receive is counted from the moment a null message
* is returned by the receiver after the potential {@link #setReceiveTimeout(long)}
* elapsed. This gives the opportunity to check if the idle task count exceeds
* {@link #setIdleTaskExecutionLimit(int)} and based on that decide if the task needs
* to be re-scheduled or not, saving resources that would otherwise be held.
* <p> This setting differs from {@link #setMaxMessagesPerTask(int)} where the task
* is released and re-scheduled after this limit is reached, no matter if the the received
* messages were null or non-null messages. This setting alone can be inflexible if one
* desires to have a large enough batch for each task but requires a quick(er) release
* from the moment there are no more messages to process. <p> This setting differs from
* {@link #setIdleTaskExecutionLimit(int)} where this limit decides after how many iterations
* of being marked as idle, a task is released. <p> For example; if
* {@link #setMaxMessagesPerTask(int)} is set to '500' and
* {@link #setIdleReceivesPerTaskLimit(int)} is set to '60' and {@link #setReceiveTimeout(long)}
* is set to '1000' and {@link #setIdleTaskExecutionLimit(int)} is set to '1', then 500 messages
* per task would be processed unless there is a subsequent number of 60 idle messages received,
* the task would be marked as idle and released. This also means that after the last message was
* processed, the task would be released after 60seconds as long as no new messages appear.
* @param idleReceivesPerTaskLimit {@link Integer#MIN_VALUE} to disable, any value > 0 to enable releasing the
*/
public void setIdleReceivesPerTaskLimit(int idleReceivesPerTaskLimit) {
Assert.isTrue(idleReceivesPerTaskLimit != 0, "'idleReceivesPerTaskLimit' must not be 0)");
synchronized (this.lifecycleMonitor) {
this.idleReceivesPerTaskLimit = idleReceivesPerTaskLimit;
}
}

/**
* Specify the limit for the number of consumers that are allowed to be idle
* at any given time.
Expand Down Expand Up @@ -1072,13 +1112,16 @@ public void run() {
}
boolean messageReceived = false;
try {
if (maxMessagesPerTask < 0) {
if (maxMessagesPerTask < 0 && idleReceivesPerTaskLimit < 0) {
messageReceived = executeOngoingLoop();
}
else {
int idleMessagesReceived = 0;
int messageCount = 0;
while (isRunning() && messageCount < maxMessagesPerTask) {
messageReceived = (invokeListener() || messageReceived);
while (isRunning() && (messageCount < maxMessagesPerTask) && (idleMessagesReceived < idleReceivesPerTaskLimit)) {
boolean messageReceivedThisInvocation = invokeListener();
idleMessagesReceived = messageReceivedThisInvocation ? 0 : idleMessagesReceived + 1;
messageReceived |= messageReceivedThisInvocation;
messageCount++;
}
}
Expand Down