Skip to content

Commit

Permalink
Fix maxMessagesPerPoll for reactive poll endpoint
Browse files Browse the repository at this point in the history
The current `Flux.take()` doesn't allow an arg `< 0` treating
it as an unbound request.

* Change `take()` to `limitRequest()` according strict `MessageSource.receive()`
producing expectations
* Treat `maxMessagesPerPoll < 0` as a `Long.MAX_VALUE` for unbound requests;
`0` is treated in the `limitRequest()` as "no more requests - cancel"
* Revise `AbstractPollingEndpoint` for `LogAccessor` usage
* Add `AbstractPollingEndpoint` class JavaDocs
* Fix tests according `AbstractPollingEndpoint` changes

**Cherry-pick to `5.4.x`**
  • Loading branch information
artembilan authored and garyrussell committed Feb 1, 2021
1 parent d3ce5b5 commit ab12b6b
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.aopalliance.aop.Advice;
Expand All @@ -48,6 +49,7 @@
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.scheduling.support.SimpleTriggerContext;
Expand All @@ -65,6 +67,13 @@
import reactor.core.scheduler.Schedulers;

/**
* An {@link AbstractEndpoint} extension for Polling Consumer pattern basics.
* The standard polling logic is based on a periodic task scheduling according the provided
* {@link Trigger}.
* When this endpoint is treated as {@link #isReactive()}, a polling logic is turned into a
* {@link Flux#generate(Consumer)} and {@link Mono#delay(Duration)} combination based on the
* {@link SimpleTriggerContext} state.
*
* @author Mark Fisher
* @author Oleg Zhurakousky
* @author Gary Russell
Expand All @@ -90,8 +99,6 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement

private Trigger trigger = new PeriodicTrigger(DEFAULT_POLLING_PERIOD);

private long maxMessagesPerPoll = -1;

private ErrorHandler errorHandler;

private boolean errorHandlerIsDefault;
Expand All @@ -100,6 +107,8 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement

private TransactionSynchronizationFactory transactionSynchronizationFactory;

private volatile long maxMessagesPerPoll = -1;

private volatile Callable<Message<?>> pollingTask;

private volatile Flux<Message<?>> pollingFlux;
Expand Down Expand Up @@ -204,8 +213,8 @@ protected void applyReceiveOnlyAdviceChain(Collection<Advice> chain) {
}
this.appliedAdvices.clear();
this.appliedAdvices.addAll(chain);
if (!(isSyncExecutor()) && logger.isWarnEnabled()) {
logger.warn(getComponentName() + ": A task executor is supplied and " + chain.size()
if (!(isSyncExecutor())) {
logger.warn(() -> getComponentName() + ": A task executor is supplied and " + chain.size()
+ "ReceiveMessageAdvice(s) is/are provided. If an advice mutates the source, such "
+ "mutations are not thread safe and could cause unexpected results, especially with "
+ "high frequency pollers. Consider using a downstream ExecutorChannel instead of "
Expand Down Expand Up @@ -261,8 +270,8 @@ protected void onInit() {
try {
super.onInit();
}
catch (Exception e) {
throw new BeanInitializationException("Cannot initialize: " + this, e);
catch (Exception ex) {
throw new BeanInitializationException("Cannot initialize: " + this, ex);
}
}

Expand All @@ -280,11 +289,9 @@ protected void doStart() {
this.pollingFlux = createFluxGenerator();
}
else {
Assert.state(getTaskScheduler() != null, "unable to start polling, no taskScheduler available");

this.runningTask =
getTaskScheduler()
.schedule(createPoller(), this.trigger);
TaskScheduler taskScheduler = getTaskScheduler();
Assert.state(taskScheduler != null, "unable to start polling, no taskScheduler available");
this.runningTask = taskScheduler.schedule(createPoller(), this.trigger);
}
}

Expand Down Expand Up @@ -360,7 +367,10 @@ private Flux<Message<?>> createFluxGenerator() {
fluxSink.complete();
}
})
.take(this.maxMessagesPerPoll)
.limitRequest(
this.maxMessagesPerPoll < 0
? Long.MAX_VALUE
: this.maxMessagesPerPoll)
.subscribeOn(Schedulers.fromExecutor(this.taskExecutor))
.doOnComplete(() ->
triggerContext
Expand Down Expand Up @@ -407,22 +417,18 @@ private Message<?> doPoll() {
try {
message = receiveMessage();
}
catch (Exception e) {
catch (Exception ex) {
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Poll interrupted - during stop()? : " + e.getMessage());
}
logger.debug(() -> "Poll interrupted - during stop()? : " + ex.getMessage());
return null;
}
else {
ReflectionUtils.rethrowRuntimeException(e);
ReflectionUtils.rethrowRuntimeException(ex);
}
}

if (message == null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Received no Message during the poll, returning 'false'");
}
this.logger.debug("Received no Message during the poll, returning 'false'");
return null;
}
else {
Expand All @@ -433,9 +439,7 @@ private Message<?> doPoll() {
}

private void messageReceived(IntegrationResourceHolder holder, Message<?> message) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Poll resulted in Message: " + message);
}
this.logger.debug(() -> "Poll resulted in Message: " + message);
if (holder != null) {
holder.setMessage(message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand All @@ -32,12 +31,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.aopalliance.aop.Advice;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanFactory;
Expand Down Expand Up @@ -188,7 +189,9 @@ public void testInterrupted() throws Exception {
taskScheduler.shutdown();

verifyNoInteractions(errorHandlerLogger);
verify(adapterLogger).debug(contains("Poll interrupted - during stop()?"));
verify(adapterLogger)
.debug(ArgumentMatchers.<Supplier<String>>argThat(logMessage ->
logMessage.get().contains("Poll interrupted - during stop()?")));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,7 +36,6 @@
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.test.condition.LongRunningTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.annotation.DirtiesContext;
Expand All @@ -52,7 +51,6 @@
*/
@SpringJUnitConfig
@DirtiesContext
@LongRunningTest
public class ReactiveInboundChannelAdapterTests {

@Autowired
Expand Down Expand Up @@ -111,7 +109,7 @@ public TaskExecutor taskExecutor() {

@Bean
@InboundChannelAdapter(value = "fluxChannel",
poller = @Poller(fixedDelay = "100", maxMessagesPerPoll = "3", taskExecutor = "taskExecutor"))
poller = @Poller(fixedDelay = "100", maxMessagesPerPoll = "-1", taskExecutor = "taskExecutor"))
public Supplier<Integer> counterMessageSupplier() {
return () -> {
int i = counter().incrementAndGet();
Expand Down

0 comments on commit ab12b6b

Please sign in to comment.