diff --git a/framework-docs/modules/ROOT/pages/web/webflux/config.adoc b/framework-docs/modules/ROOT/pages/web/webflux/config.adoc index 10e87907b6dc..144a65de46b6 100644 --- a/framework-docs/modules/ROOT/pages/web/webflux/config.adoc +++ b/framework-docs/modules/ROOT/pages/web/webflux/config.adoc @@ -700,9 +700,8 @@ Java:: @Override public void configurePathMatch(PathMatchConfigurer configurer) { - configurer - .setUseCaseSensitiveMatch(true) - .addPathPrefix("/api", HandlerTypePredicate.forAnnotation(RestController.class)); + configurer.addPathPrefix( + "/api", HandlerTypePredicate.forAnnotation(RestController.class)); } } ---- @@ -717,9 +716,8 @@ Kotlin:: @Override fun configurePathMatch(configurer: PathMatchConfigurer) { - configurer - .setUseCaseSensitiveMatch(true) - .addPathPrefix("/api", HandlerTypePredicate.forAnnotation(RestController::class.java)) + configurer.addPathPrefix( + "/api", HandlerTypePredicate.forAnnotation(RestController::class.java)) } } ---- @@ -740,6 +738,59 @@ reliance on it. + +[[webflux-config-blocking-execution]] +== Blocking Execution + +The WebFlux Java config lets you to customize blocking execution in WebFlux. + +You can have blocking controller methods called on a separate thread by providing +an `Executor` such as the +{api-spring-framework}/core/task/VirtualThreadTaskExecutor.html[`VirtualThreadTaskExecutor`] +as follows: + +[tabs] +====== +Java:: ++ +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +---- + @Configuration + @EnableWebFlux + public class WebConfig implements WebFluxConfigurer { + + @Override + public void configureBlockingExecution(BlockingExecutionConfigurer configurer) { + Executor executor = ... + configurer.setExecutor(executor); + } + } +---- + +Kotlin:: ++ +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +---- + @Configuration + @EnableWebFlux + class WebConfig : WebFluxConfigurer { + + @Override + fun configureBlockingExecution(configurer: BlockingExecutionConfigurer) { + val executor = ... + configurer.setExecutor(executor) + } + } +---- +====== + +By default, controller methods whose return type is not recognized by the configured +`ReactiveAdapterRegistry` are considered blocking, but you can set a custom controller +method predicate via `BlockingExecutionConfigurer`. + + + + [[webflux-config-websocket-service]] == WebSocketService diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultControllerSpec.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultControllerSpec.java index a120acf622e1..900a985c25a4 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultControllerSpec.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultControllerSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import org.springframework.util.ObjectUtils; import org.springframework.validation.Validator; import org.springframework.web.reactive.accept.RequestedContentTypeResolverBuilder; +import org.springframework.web.reactive.config.BlockingExecutionConfigurer; import org.springframework.web.reactive.config.CorsRegistry; import org.springframework.web.reactive.config.DelegatingWebFluxConfiguration; import org.springframework.web.reactive.config.PathMatchConfigurer; @@ -122,6 +123,11 @@ public DefaultControllerSpec viewResolvers(Consumer consum return this; } + @Override + public WebTestClient.ControllerSpec blockingExecution(Consumer consumer) { + this.configurer.executionConsumer = consumer; + return this; + } @Override protected WebHttpHandlerBuilder initHttpHandlerBuilder() { @@ -145,7 +151,7 @@ private ApplicationContext initApplicationContext() { } - private class TestWebFluxConfigurer implements WebFluxConfigurer { + private static class TestWebFluxConfigurer implements WebFluxConfigurer { @Nullable private Consumer contentTypeResolverConsumer; @@ -171,6 +177,9 @@ private class TestWebFluxConfigurer implements WebFluxConfigurer { @Nullable private Consumer viewResolversConsumer; + @Nullable + private Consumer executionConsumer; + @Override public void configureContentTypeResolver(RequestedContentTypeResolverBuilder builder) { if (this.contentTypeResolverConsumer != null) { @@ -225,6 +234,13 @@ public void configureViewResolvers(ViewResolverRegistry registry) { this.viewResolversConsumer.accept(registry); } } + + @Override + public void configureBlockingExecution(BlockingExecutionConfigurer configurer) { + if (this.executionConsumer != null) { + this.executionConsumer.accept(configurer); + } + } } } diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WebTestClient.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WebTestClient.java index e9e824aeb76e..5b264b1c9de5 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WebTestClient.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WebTestClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ import org.springframework.util.MultiValueMap; import org.springframework.validation.Validator; import org.springframework.web.reactive.accept.RequestedContentTypeResolverBuilder; +import org.springframework.web.reactive.config.BlockingExecutionConfigurer; import org.springframework.web.reactive.config.CorsRegistry; import org.springframework.web.reactive.config.PathMatchConfigurer; import org.springframework.web.reactive.config.ViewResolverRegistry; @@ -353,6 +354,13 @@ interface ControllerSpec extends MockServerSpec { * @see WebFluxConfigurer#configureViewResolvers */ ControllerSpec viewResolvers(Consumer consumer); + + /** + * Configure blocking execution options. + * @since 6.1 + * @see WebFluxConfigurer#configureBlockingExecution + */ + ControllerSpec blockingExecution(Consumer consumer); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/config/BlockingExecutionConfigurer.java b/spring-webflux/src/main/java/org/springframework/web/reactive/config/BlockingExecutionConfigurer.java new file mode 100644 index 000000000000..c04cfec6cc43 --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/config/BlockingExecutionConfigurer.java @@ -0,0 +1,76 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.config; + +import java.util.function.Predicate; + +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.lang.Nullable; +import org.springframework.web.method.HandlerMethod; + +/** + * Helps to configure options related to blocking execution in WebFlux. + * + * @author Rossen Stoyanchev + * @since 6.1 + */ +public class BlockingExecutionConfigurer { + + @Nullable + private AsyncTaskExecutor executor; + + @Nullable + private Predicate blockingControllerMethodPredicate; + + + /** + * Configure an executor to invoke blocking controller methods with. + *

By default, this is not set in which case controller methods are + * invoked without the use of an Executor. + * @param executor the task executor to use + */ + public BlockingExecutionConfigurer setExecutor(AsyncTaskExecutor executor) { + this.executor = executor; + return this; + } + + /** + * Configure a predicate to decide if a controller method is blocking and + * should be called on a separate thread if an executor is + * {@link #setExecutor configured}. + *

The default predicate matches controller methods whose return type is + * not recognized by the configured + * {@link org.springframework.core.ReactiveAdapterRegistry}. + * @param predicate the predicate to use + */ + public BlockingExecutionConfigurer setControllerMethodPredicate(Predicate predicate) { + this.blockingControllerMethodPredicate = predicate; + return this; + } + + + @Nullable + protected AsyncTaskExecutor getExecutor() { + return this.executor; + } + + @Nullable + protected Predicate getBlockingControllerMethodPredicate() { + return this.blockingControllerMethodPredicate; + } + +} diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/config/DelegatingWebFluxConfiguration.java b/spring-webflux/src/main/java/org/springframework/web/reactive/config/DelegatingWebFluxConfiguration.java index 5db8000d3c4a..e657b87c0b44 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/config/DelegatingWebFluxConfiguration.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/config/DelegatingWebFluxConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -110,4 +110,8 @@ protected void configureViewResolvers(ViewResolverRegistry registry) { this.configurers.configureViewResolvers(registry); } + @Override + protected void configureBlockingExecution(BlockingExecutionConfigurer configurer) { + this.configurers.configureBlockingExecution(configurer); + } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurationSupport.java b/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurationSupport.java index dfcee4b21ca0..ac71dc84675d 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurationSupport.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurationSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,6 +95,9 @@ public class WebFluxConfigurationSupport implements ApplicationContextAware { @Nullable private PathMatchConfigurer pathMatchConfigurer; + @Nullable + private BlockingExecutionConfigurer blockingExecutionConfigurer; + @Nullable private ViewResolverRegistry viewResolverRegistry; @@ -282,6 +285,14 @@ public RequestMappingHandlerAdapter requestMappingHandlerAdapter( adapter.setWebBindingInitializer(getConfigurableWebBindingInitializer(conversionService, validator)); adapter.setReactiveAdapterRegistry(reactiveAdapterRegistry); + BlockingExecutionConfigurer executorConfigurer = getBlockingExecutionConfigurer(); + if (executorConfigurer.getExecutor() != null) { + adapter.setBlockingExecutor(executorConfigurer.getExecutor()); + } + if (executorConfigurer.getBlockingControllerMethodPredicate() != null) { + adapter.setBlockingMethodPredicate(executorConfigurer.getBlockingControllerMethodPredicate()); + } + ArgumentResolverConfigurer configurer = new ArgumentResolverConfigurer(); configureArgumentResolvers(configurer); adapter.setArgumentResolverConfigurer(configurer); @@ -419,6 +430,27 @@ protected MessageCodesResolver getMessageCodesResolver() { return null; } + /** + * Callback to build and cache the {@link BlockingExecutionConfigurer}. + * This method is final, but subclasses can override + * {@link #configureBlockingExecution}. + * @since 6.1 + */ + protected final BlockingExecutionConfigurer getBlockingExecutionConfigurer() { + if (this.blockingExecutionConfigurer == null) { + this.blockingExecutionConfigurer = new BlockingExecutionConfigurer(); + configureBlockingExecution(this.blockingExecutionConfigurer); + } + return this.blockingExecutionConfigurer; + } + + /** + * Override this method to configure blocking execution. + * @since 6.1 + */ + protected void configureBlockingExecution(BlockingExecutionConfigurer configurer) { + } + @Bean public HandlerFunctionAdapter handlerFunctionAdapter() { return new HandlerFunctionAdapter(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurer.java b/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurer.java index b459b564cdb4..6ee2c68c2aa8 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurer.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -155,4 +155,11 @@ default WebSocketService getWebSocketService() { default void configureViewResolvers(ViewResolverRegistry registry) { } + /** + * Configure settings related to blocking execution in WebFlux. + * @since 6.1 + */ + default void configureBlockingExecution(BlockingExecutionConfigurer configurer) { + } + } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurerComposite.java b/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurerComposite.java index 67487b32390b..866dabe48b29 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurerComposite.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/config/WebFluxConfigurerComposite.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -106,6 +106,11 @@ public void configureViewResolvers(ViewResolverRegistry registry) { this.delegates.forEach(delegate -> delegate.configureViewResolvers(registry)); } + @Override + public void configureBlockingExecution(BlockingExecutionConfigurer configurer) { + this.delegates.forEach(delegate -> delegate.configureBlockingExecution(configurer)); + } + @Nullable private T createSingleBean(Function factory, Class beanType) { List result = this.delegates.stream().map(factory).filter(Objects::nonNull).toList(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestMappingHandlerAdapter.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestMappingHandlerAdapter.java index 5d35eff509da..6a1130c09c47 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestMappingHandlerAdapter.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestMappingHandlerAdapter.java @@ -19,10 +19,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.Executor; +import java.util.function.Predicate; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; @@ -66,6 +70,12 @@ public class RequestMappingHandlerAdapter @Nullable private ArgumentResolverConfigurer argumentResolverConfigurer; + @Nullable + private Scheduler scheduler; + + @Nullable + private Predicate blockingMethodPredicate; + @Nullable private ReactiveAdapterRegistry reactiveAdapterRegistry; @@ -126,6 +136,30 @@ public ArgumentResolverConfigurer getArgumentResolverConfigurer() { return this.argumentResolverConfigurer; } + /** + * Configure an executor to invoke blocking controller methods with. + *

By default, this is not set in which case controller methods are + * invoked without the use of an Executor. + * @param executor the task executor to use + * @since 6.1 + */ + public void setBlockingExecutor(@Nullable Executor executor) { + this.scheduler = (executor != null ? Schedulers.fromExecutor(executor) : null); + } + + /** + * Provide a predicate to decide which controller methods to invoke through + * the configured {@link #setBlockingExecutor blockingExecutor}. + *

If an executor is configured, the default predicate matches controller + * methods whose return type not recognized by the configured + * {@link org.springframework.core.ReactiveAdapterRegistry}. + * @param predicate the predicate to use + * @since 6.1 + */ + public void setBlockingMethodPredicate(Predicate predicate) { + this.blockingMethodPredicate = predicate; + } + /** * Configure the registry for adapting various reactive types. *

By default this is an instance of {@link ReactiveAdapterRegistry} with @@ -164,13 +198,19 @@ public void afterPropertiesSet() throws Exception { ServerCodecConfigurer codecConfigurer = ServerCodecConfigurer.create(); this.messageReaders = codecConfigurer.getReaders(); } + if (this.argumentResolverConfigurer == null) { this.argumentResolverConfigurer = new ArgumentResolverConfigurer(); } + if (this.reactiveAdapterRegistry == null) { this.reactiveAdapterRegistry = ReactiveAdapterRegistry.getSharedInstance(); } + if (this.scheduler != null && this.blockingMethodPredicate == null) { + this.blockingMethodPredicate = new NonReactiveHandlerMethodPredicate(this.reactiveAdapterRegistry); + } + this.methodResolver = new ControllerMethodResolver( this.argumentResolverConfigurer, this.reactiveAdapterRegistry, this.applicationContext, this.messageReaders, this.webBindingInitializer); @@ -202,11 +242,20 @@ public Mono handle(ServerWebExchange exchange, Object handler) { DispatchExceptionHandler exceptionHandler = (exchange2, ex) -> handleException(exchange, ex, handlerMethod, bindingContext); - return this.modelInitializer + Mono resultMono = this.modelInitializer .initModel(handlerMethod, bindingContext, exchange) .then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext))) .doOnNext(result -> result.setExceptionHandler(exceptionHandler)) .onErrorResume(ex -> exceptionHandler.handleError(exchange, ex)); + + if (this.scheduler != null) { + Assert.state(this.blockingMethodPredicate != null, "Expected HandlerMethod Predicate"); + if (this.blockingMethodPredicate.test(handlerMethod)) { + resultMono = resultMono.subscribeOn(this.scheduler); + } + } + + return resultMono; } private Mono handleException( @@ -264,4 +313,18 @@ public Mono handleError(ServerWebExchange exchange, Throwable ex) return handleException(exchange, ex, null, null); } + + /** + * Match methods with a return type without an adapter in {@link ReactiveAdapterRegistry}. + */ + private record NonReactiveHandlerMethodPredicate(ReactiveAdapterRegistry adapterRegistry) + implements Predicate { + + @Override + public boolean test(HandlerMethod handlerMethod) { + Class returnType = handlerMethod.getReturnType().getParameterType(); + return (this.adapterRegistry.getAdapter(returnType) == null); + } + } + } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java index 7f879e586522..b650ebe8850b 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java @@ -18,6 +18,8 @@ import java.net.URI; import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import org.reactivestreams.Publisher; @@ -25,6 +27,8 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.ReactiveAdapterRegistry; +import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.http.HttpHeaders; import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; @@ -33,7 +37,10 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.method.HandlerMethod; +import org.springframework.web.reactive.config.BlockingExecutionConfigurer; import org.springframework.web.reactive.config.EnableWebFlux; +import org.springframework.web.reactive.config.WebFluxConfigurer; import org.springframework.web.server.adapter.ForwardedHeaderTransformer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; @@ -70,6 +77,9 @@ void emptyMapping(HttpServer httpServer) throws Exception { url += "/"; assertThat(getRestTemplate().getForObject(url, String.class)).isEqualTo("root"); + + assertThat(getApplicationContext().getBean(TestExecutor.class).invocationCount.get()).isEqualTo(2); + assertThat(getApplicationContext().getBean(TestPredicate.class).invocationCount.get()).isEqualTo(2); } @ParameterizedHttpServerTest @@ -109,7 +119,34 @@ void stream(HttpServer httpServer) throws Exception { @Configuration @EnableWebFlux - static class WebConfig { + static class WebConfig implements WebFluxConfigurer { + + @Override + public void configureBlockingExecution(BlockingExecutionConfigurer configurer) { + configurer.setExecutor(executor()); + configurer.setControllerMethodPredicate(predicate()); + } + + @Bean + TestExecutor executor() { + return new TestExecutor(); + } + + @Bean + TestPredicate predicate() { + return new TestPredicate(); + } + + } + + + @Configuration + static class LocalConfig { + + @Bean + public ForwardedHeaderTransformer forwardedHeaderTransformer() { + return new ForwardedHeaderTransformer(); + } } @@ -145,12 +182,27 @@ public Publisher stream() { } - @Configuration - static class LocalConfig { + private static class TestExecutor implements AsyncTaskExecutor { - @Bean - public ForwardedHeaderTransformer forwardedHeaderTransformer() { - return new ForwardedHeaderTransformer(); + private final AtomicInteger invocationCount = new AtomicInteger(); + + @Override + public void execute(Runnable task) { + this.invocationCount.incrementAndGet(); + task.run(); + } + } + + + private static class TestPredicate implements Predicate { + + private final AtomicInteger invocationCount = new AtomicInteger(); + + @Override + public boolean test(HandlerMethod handlerMethod) { + this.invocationCount.incrementAndGet(); + Class returnType = handlerMethod.getReturnType().getParameterType(); + return (ReactiveAdapterRegistry.getSharedInstance().getAdapter(returnType) == null); } }