Skip to content

Commit

Permalink
GH-9524: Expose SourcePollingChannelAdapterSpec.taskScheduler
Browse files Browse the repository at this point in the history
Fixes: #9524
Issue link: #9524

It is useful in some use-cases to be able to inject a custom `TaskScheduler`
(e.g. with a `TaskDecorator`) into a source polling channel adapter.

* Add `SourcePollingChannelAdapterFactoryBean.setTaskScheduler()`
 and call it from the `SourcePollingChannelAdapterSpec.taskScheduler()`
* Fix JavaDocs typos in the `ConsumerEndpointSpec`
* Test custom `TaskScheduler` usage and mention new option in the `whats-new.adoc`
  • Loading branch information
artembilan committed Oct 14, 2024
1 parent be1156a commit d5ab03c
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.core.BeanFactoryMessageChannelDestinationResolver;
import org.springframework.messaging.core.DestinationResolver;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

Expand Down Expand Up @@ -75,6 +76,8 @@ public class SourcePollingChannelAdapterFactoryBean implements FactoryBean<Sourc

private String role;

private TaskScheduler taskScheduler;

private volatile SourcePollingChannelAdapter adapter;

private volatile boolean initialized;
Expand Down Expand Up @@ -111,6 +114,15 @@ public void setRole(String role) {
this.role = role;
}

/**
* Set a {@link TaskScheduler} for polling tasks.
* @param taskScheduler the {@link TaskScheduler} for polling tasks.
* @since 6.4
*/
public void setTaskScheduler(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}

/**
* Specify the {@link DestinationResolver} strategy to use.
* The default is a BeanFactoryChannelResolver.
Expand Down Expand Up @@ -208,6 +220,9 @@ private void initializeAdapter() {
spca.setBeanName(this.beanName);
spca.setBeanFactory(this.beanFactory);
spca.setTransactionSynchronizationFactory(this.pollerMetadata.getTransactionSynchronizationFactory());
if (this.taskScheduler != null) {
spca.setTaskScheduler(this.taskScheduler);
}
spca.afterPropertiesSet();
this.adapter = spca;
this.initialized = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,7 +128,7 @@ public S role(String role) {

/**
* Configure a {@link TaskScheduler} for scheduling tasks, for example in the
* Polling Consumer. By default the global {@code ThreadPoolTaskScheduler} bean is used.
* Polling Consumer. By default, the global {@code ThreadPoolTaskScheduler} bean is used.
* This configuration is useful when there are requirements to dedicate particular threads
* for polling task, for example.
* @param taskScheduler the {@link TaskScheduler} to use.
Expand All @@ -144,7 +144,7 @@ public S taskScheduler(TaskScheduler taskScheduler) {
/**
* Configure a list of {@link MethodInterceptor} objects to be applied, in nested order, to the
* endpoint's handler. The advice objects are applied to the {@code handleMessage()} method
* and therefore to the whole sub-flow afterwards.
* and therefore to the whole sub-flow afterward.
* @param interceptors the advice chain.
* @return the endpoint spec.
* @since 5.3
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;

/**
* @author Artem Bilan
Expand Down Expand Up @@ -60,4 +61,15 @@ public SourcePollingChannelAdapterSpec role(String role) {
return this;
}

/**
* Set a {@link TaskScheduler} for polling tasks.
* @param taskScheduler the {@link TaskScheduler} for polling tasks.
* @return the spec
* @since 6.4
*/
public SourcePollingChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) {
this.endpointFactoryBean.setTaskScheduler(taskScheduler);
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.MutableMessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.integration.transformer.PayloadSerializingTransformer;
import org.springframework.integration.util.NoBeansOverrideAnnotationConfigContextLoader;
import org.springframework.messaging.Message;
Expand All @@ -94,6 +95,7 @@
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -187,10 +189,15 @@ public class IntegrationFlowTests {
@Autowired
AbstractEndpoint stringSupplierEndpoint;

@Autowired
TaskScheduler customScheduler;

@Test
public void testWithSupplierMessageSourceImpliedPoller() {
assertThat(this.stringSupplierEndpoint.isAutoStartup()).isFalse();
assertThat(this.stringSupplierEndpoint.isRunning()).isFalse();
assertThat(TestUtils.getPropertyValue(this.stringSupplierEndpoint, "taskScheduler"))
.isSameAs(this.customScheduler);
this.stringSupplierEndpoint.start();
assertThat(this.suppliedChannel.receive(10000).getPayload()).isEqualTo("FOO");
}
Expand Down Expand Up @@ -569,8 +576,14 @@ public Supplier<String> stringSupplier() {
}

@Bean
public IntegrationFlow supplierFlow() {
return IntegrationFlow.fromSupplier(stringSupplier(), c -> c.id("stringSupplierEndpoint"))
public TaskScheduler customScheduler() {
return new SimpleAsyncTaskScheduler();
}

@Bean
public IntegrationFlow supplierFlow(TaskScheduler customScheduler) {
return IntegrationFlow.fromSupplier(stringSupplier(),
c -> c.id("stringSupplierEndpoint").taskScheduler(customScheduler))
.transform(toUpperCaseFunction())
.channel("suppliedChannel")
.get();
Expand Down
2 changes: 2 additions & 0 deletions src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ See xref:spel.adoc[SpEL Support] for more information.
[[x6.4-general]]
=== General Changes

The Java DSL `SourcePollingChannelAdapterSpec` can now be configured with a custom `TaskScheduler`

[[x6.4-remote-files-changes]]
=== Remote File Adapters Changes

Expand Down

0 comments on commit d5ab03c

Please sign in to comment.