diff --git a/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineReactiveCachingTests.java b/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineReactiveCachingTests.java index 24ac179c47df..f92e69c2f628 100644 --- a/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineReactiveCachingTests.java +++ b/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineReactiveCachingTests.java @@ -107,6 +107,28 @@ void cacheHitDetermination(Class configClass) { } + @ParameterizedTest + @ValueSource(classes = {AsyncCacheModeConfig.class, AsyncCacheModeConfig.class}) + void fluxCacheDoesntDependOnFirstRequest(Class configClass) { + AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class); + ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); + + Object key = new Object(); + + List l1 = service.cacheFlux(key).take(1L, true).collectList().block(); + List l2 = service.cacheFlux(key).take(3L, true).collectList().block(); + List l3 = service.cacheFlux(key).collectList().block(); + + Long first = l1.get(0); + + assertThat(l1).as("l1").containsExactly(first); + assertThat(l2).as("l2").containsExactly(first, 0L, -1L); + assertThat(l3).as("l3").containsExactly(first, 0L, -1L, -2L, -3L); + + ctx.close(); + } + + @CacheConfig(cacheNames = "first") static class ReactiveCacheableService { @@ -119,12 +141,16 @@ CompletableFuture cacheFuture(Object arg) { @Cacheable Mono cacheMono(Object arg) { - return Mono.just(this.counter.getAndIncrement()); + // here counter not only reflects invocations of cacheMono but subscriptions to + // the returned Mono as well. See https://github.com/spring-projects/spring-framework/issues/32370 + return Mono.defer(() -> Mono.just(this.counter.getAndIncrement())); } @Cacheable Flux cacheFlux(Object arg) { - return Flux.just(this.counter.getAndIncrement(), 0L); + // here counter not only reflects invocations of cacheFlux but subscriptions to + // the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370 + return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L, -1L, -2L, -3L)); } } diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java index 8342eb3a44b8..be356dbf6a31 100644 --- a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java +++ b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java @@ -26,12 +26,12 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import reactor.core.observability.DefaultSignalListener; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -90,7 +90,6 @@ * @author Sam Brannen * @author Stephane Nicoll * @author Sebastien Deleuze - * @author Simon Baslé * @since 3.1 */ public abstract class CacheAspectSupport extends AbstractCacheInvoker @@ -1037,45 +1036,34 @@ public void performCachePut(@Nullable Object value) { /** - * Reactor stateful SignalListener for collecting a List to cache. + * Reactive Streams Subscriber for exhausting the Flux and collecting a List + * to cache. */ - private class CachePutSignalListener extends DefaultSignalListener { + private final class CachePutListSubscriber implements Subscriber { - private final AtomicReference request; + private final CachePutRequest request; private final List cacheValue = new ArrayList<>(); - public CachePutSignalListener(CachePutRequest request) { - this.request = new AtomicReference<>(request); + public CachePutListSubscriber(CachePutRequest request) { + this.request = request; } @Override - public void doOnNext(Object o) { - this.cacheValue.add(o); + public void onSubscribe(Subscription s) { + s.request(Integer.MAX_VALUE); } - @Override - public void doOnComplete() { - CachePutRequest r = this.request.get(); - if (this.request.compareAndSet(r, null)) { - r.performCachePut(this.cacheValue); - } + public void onNext(Object o) { + this.cacheValue.add(o); } - @Override - public void doOnCancel() { - // Note: we don't use doFinally as we want to propagate the signal after cache put, not before - CachePutRequest r = this.request.get(); - if (this.request.compareAndSet(r, null)) { - r.performCachePut(this.cacheValue); - } + public void onError(Throwable t) { + this.cacheValue.clear(); } - @Override - public void doOnError(Throwable error) { - if (this.request.getAndSet(null) != null) { - this.cacheValue.clear(); - } + public void onComplete() { + this.request.performCachePut(this.cacheValue); } } @@ -1159,8 +1147,10 @@ public Object processPutRequest(CachePutRequest request, @Nullable Object result ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null); if (adapter != null) { if (adapter.isMultiValue()) { - return adapter.fromPublisher(Flux.from(adapter.toPublisher(result)) - .tap(() -> new CachePutSignalListener(request))); + Flux source = Flux.from(adapter.toPublisher(result)) + .publish().refCount(2); + source.subscribe(new CachePutListSubscriber(request)); + return adapter.fromPublisher(source); } else { return adapter.fromPublisher(Mono.from(adapter.toPublisher(result)) diff --git a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java index 7300ee59d467..5c04f80a519e 100644 --- a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java +++ b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java @@ -111,6 +111,30 @@ void cacheHitDetermination(Class configClass) { ctx.close(); } + @ParameterizedTest + @ValueSource(classes = {EarlyCacheHitDeterminationConfig.class, + EarlyCacheHitDeterminationWithoutNullValuesConfig.class, + LateCacheHitDeterminationConfig.class, + LateCacheHitDeterminationWithValueWrapperConfig.class}) + void fluxCacheDoesntDependOnFirstRequest(Class configClass) { + AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class); + ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); + + Object key = new Object(); + + List l1 = service.cacheFlux(key).take(1L, true).collectList().block(); + List l2 = service.cacheFlux(key).take(3L, true).collectList().block(); + List l3 = service.cacheFlux(key).collectList().block(); + + Long first = l1.get(0); + + assertThat(l1).as("l1").containsExactly(first); + assertThat(l2).as("l2").containsExactly(first, 0L, -1L); + assertThat(l3).as("l3").containsExactly(first, 0L, -1L, -2L, -3L); + + ctx.close(); + } + @CacheConfig(cacheNames = "first") static class ReactiveCacheableService { @@ -132,7 +156,7 @@ Mono cacheMono(Object arg) { Flux cacheFlux(Object arg) { // here counter not only reflects invocations of cacheFlux but subscriptions to // the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370 - return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L)); + return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L, -1L, -2L, -3L)); } }