Skip to content

Commit

Permalink
Merge branch '6.1.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Mar 7, 2024
2 parents 92f4e88 + 6d9a2eb commit 7f0ab22
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,28 @@ void cacheHitDetermination(Class<?> configClass) {
}


@ParameterizedTest
@ValueSource(classes = {AsyncCacheModeConfig.class, AsyncCacheModeConfig.class})
void fluxCacheDoesntDependOnFirstRequest(Class<?> configClass) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class);
ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class);

Object key = new Object();

List<Long> l1 = service.cacheFlux(key).take(1L, true).collectList().block();
List<Long> l2 = service.cacheFlux(key).take(3L, true).collectList().block();
List<Long> l3 = service.cacheFlux(key).collectList().block();

Long first = l1.get(0);

assertThat(l1).as("l1").containsExactly(first);
assertThat(l2).as("l2").containsExactly(first, 0L, -1L);
assertThat(l3).as("l3").containsExactly(first, 0L, -1L, -2L, -3L);

ctx.close();
}


@CacheConfig(cacheNames = "first")
static class ReactiveCacheableService {

Expand All @@ -119,12 +141,16 @@ CompletableFuture<Long> cacheFuture(Object arg) {

@Cacheable
Mono<Long> cacheMono(Object arg) {
return Mono.just(this.counter.getAndIncrement());
// here counter not only reflects invocations of cacheMono but subscriptions to
// the returned Mono as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Mono.defer(() -> Mono.just(this.counter.getAndIncrement()));
}

@Cacheable
Flux<Long> cacheFlux(Object arg) {
return Flux.just(this.counter.getAndIncrement(), 0L);
// here counter not only reflects invocations of cacheFlux but subscriptions to
// the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L, -1L, -2L, -3L));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.observability.DefaultSignalListener;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -90,7 +90,6 @@
* @author Sam Brannen
* @author Stephane Nicoll
* @author Sebastien Deleuze
* @author Simon Baslé
* @since 3.1
*/
public abstract class CacheAspectSupport extends AbstractCacheInvoker
Expand Down Expand Up @@ -1037,45 +1036,34 @@ public void performCachePut(@Nullable Object value) {


/**
* Reactor stateful SignalListener for collecting a List to cache.
* Reactive Streams Subscriber for exhausting the Flux and collecting a List
* to cache.
*/
private class CachePutSignalListener extends DefaultSignalListener<Object> {
private final class CachePutListSubscriber implements Subscriber<Object> {

private final AtomicReference<CachePutRequest> request;
private final CachePutRequest request;

private final List<Object> cacheValue = new ArrayList<>();

public CachePutSignalListener(CachePutRequest request) {
this.request = new AtomicReference<>(request);
public CachePutListSubscriber(CachePutRequest request) {
this.request = request;
}

@Override
public void doOnNext(Object o) {
this.cacheValue.add(o);
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}

@Override
public void doOnComplete() {
CachePutRequest r = this.request.get();
if (this.request.compareAndSet(r, null)) {
r.performCachePut(this.cacheValue);
}
public void onNext(Object o) {
this.cacheValue.add(o);
}

@Override
public void doOnCancel() {
// Note: we don't use doFinally as we want to propagate the signal after cache put, not before
CachePutRequest r = this.request.get();
if (this.request.compareAndSet(r, null)) {
r.performCachePut(this.cacheValue);
}
public void onError(Throwable t) {
this.cacheValue.clear();
}

@Override
public void doOnError(Throwable error) {
if (this.request.getAndSet(null) != null) {
this.cacheValue.clear();
}
public void onComplete() {
this.request.performCachePut(this.cacheValue);
}
}

Expand Down Expand Up @@ -1159,8 +1147,10 @@ public Object processPutRequest(CachePutRequest request, @Nullable Object result
ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
if (adapter != null) {
if (adapter.isMultiValue()) {
return adapter.fromPublisher(Flux.from(adapter.toPublisher(result))
.tap(() -> new CachePutSignalListener(request)));
Flux<?> source = Flux.from(adapter.toPublisher(result))
.publish().refCount(2);
source.subscribe(new CachePutListSubscriber(request));
return adapter.fromPublisher(source);
}
else {
return adapter.fromPublisher(Mono.from(adapter.toPublisher(result))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,30 @@ void cacheHitDetermination(Class<?> configClass) {
ctx.close();
}

@ParameterizedTest
@ValueSource(classes = {EarlyCacheHitDeterminationConfig.class,
EarlyCacheHitDeterminationWithoutNullValuesConfig.class,
LateCacheHitDeterminationConfig.class,
LateCacheHitDeterminationWithValueWrapperConfig.class})
void fluxCacheDoesntDependOnFirstRequest(Class<?> configClass) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class);
ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class);

Object key = new Object();

List<Long> l1 = service.cacheFlux(key).take(1L, true).collectList().block();
List<Long> l2 = service.cacheFlux(key).take(3L, true).collectList().block();
List<Long> l3 = service.cacheFlux(key).collectList().block();

Long first = l1.get(0);

assertThat(l1).as("l1").containsExactly(first);
assertThat(l2).as("l2").containsExactly(first, 0L, -1L);
assertThat(l3).as("l3").containsExactly(first, 0L, -1L, -2L, -3L);

ctx.close();
}

@CacheConfig(cacheNames = "first")
static class ReactiveCacheableService {

Expand All @@ -132,7 +156,7 @@ Mono<Long> cacheMono(Object arg) {
Flux<Long> cacheFlux(Object arg) {
// here counter not only reflects invocations of cacheFlux but subscriptions to
// the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L));
return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L, -1L, -2L, -3L));
}
}

Expand Down

0 comments on commit 7f0ab22

Please sign in to comment.