Skip to content

Commit

Permalink
Respect cache hit when empty Mono/Flux response is returned
Browse files Browse the repository at this point in the history
Closes gh-31868
  • Loading branch information
jhoeller committed Dec 20, 2023
1 parent d7ce13c commit dc564f3
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,11 @@ private Object findInCaches(CacheOperationContext context, Object key,
private Object evaluate(@Nullable Object cacheHit, CacheOperationInvoker invoker, Method method,
CacheOperationContexts contexts) {

// Re-invocation in reactive pipeline after late cache hit determination?
if (contexts.processed) {
return cacheHit;
}

Object cacheValue;
Object returnValue;

Expand Down Expand Up @@ -541,6 +546,9 @@ private Object evaluate(@Nullable Object cacheHit, CacheOperationInvoker invoker
returnValue = returnOverride;
}

// Mark as processed for re-invocation after late cache hit determination
contexts.processed = true;

return returnValue;
}

Expand Down Expand Up @@ -688,6 +696,8 @@ private class CacheOperationContexts {

private final boolean sync;

boolean processed;

public CacheOperationContexts(Collection<? extends CacheOperation> operations, Method method,
Object[] args, Object target, Class<?> targetClass) {

Expand Down Expand Up @@ -1082,21 +1092,25 @@ public Object findInCaches(CacheOperationContext context, Cache cache, Object ke
return null;
}
if (adapter.isMultiValue()) {
return adapter.fromPublisher(Flux.from(
Mono.fromFuture(cachedFuture)
.flatMap(value -> (Mono<?>) evaluate(Mono.justOrEmpty(unwrapCacheValue(value)), invoker, method, contexts)))
.flatMap(v -> (v instanceof Iterable<?> iv ? Flux.fromIterable(iv) : Flux.just(v)))
.switchIfEmpty(Flux.defer(() -> (Flux<?>) evaluate(null, invoker, method, contexts))));
return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture))
.switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts)))
.flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts)));
}
else {
return adapter.fromPublisher(Mono.fromFuture(cachedFuture)
.flatMap(value -> (Mono<?>) evaluate(Mono.justOrEmpty(unwrapCacheValue(value)), invoker, method, contexts))
.switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts))));
.switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts)))
.flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts)));
}
}
return NOT_HANDLED;
}

private Flux<?> valueToFlux(Object value, CacheOperationContexts contexts) {
Object data = unwrapCacheValue(value);
return (!contexts.processed && data instanceof Iterable<?> iterable ? Flux.fromIterable(iterable) :
(data != null ? Flux.just(data) : Flux.empty()));
}

@Nullable
public Object processPutRequest(CachePutRequest request, @Nullable Object result) {
ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ void spr14235AdaptsToCompletableFuture() {
assertThat(bean.findById("tb2").join()).isNotSameAs(tb);
assertThat(cache.get("tb2")).isNull();

assertThat(bean.findByIdEmpty("").join()).isNull();
assertThat(bean.findByIdEmpty("").join()).isNull();
assertThat(cache.get("").get()).isNull();
assertThat(bean.findByIdEmpty("").join()).isNull();

context.close();
}
Expand All @@ -230,9 +230,9 @@ void spr14235AdaptsToCompletableFutureWithSync() throws Exception {
assertThat(bean.findById("tb1").get()).isSameAs(tb);
assertThat(cache.get("tb1").get()).isSameAs(tb);

assertThat(bean.findById("").join()).isNull();
assertThat(bean.findById("").join()).isNull();
assertThat(cache.get("").get()).isNull();
assertThat(bean.findById("").join()).isNull();

context.close();
}
Expand Down Expand Up @@ -265,9 +265,9 @@ void spr14235AdaptsToReactorMono() {
assertThat(bean.findById("tb2").block()).isNotSameAs(tb);
assertThat(cache.get("tb2")).isNull();

assertThat(bean.findByIdEmpty("").block()).isNull();
assertThat(bean.findByIdEmpty("").block()).isNull();
assertThat(cache.get("").get()).isNull();
assertThat(bean.findByIdEmpty("").block()).isNull();

context.close();
}
Expand All @@ -293,9 +293,9 @@ void spr14235AdaptsToReactorMonoWithSync() {
assertThat(bean.findById("tb1").block()).isSameAs(tb);
assertThat(cache.get("tb1").get()).isSameAs(tb);

assertThat(bean.findById("").block()).isNull();
assertThat(bean.findById("").block()).isNull();
assertThat(cache.get("").get()).isNull();
assertThat(bean.findById("").block()).isNull();

context.close();
}
Expand Down Expand Up @@ -328,9 +328,9 @@ void spr14235AdaptsToReactorFlux() {
assertThat(bean.findById("tb2").collectList().block()).isNotEqualTo(tb);
assertThat(cache.get("tb2")).isNull();

assertThat(bean.findByIdEmpty("").collectList().block()).isEmpty();
assertThat(bean.findByIdEmpty("").collectList().block()).isEmpty();
assertThat(cache.get("").get()).isEqualTo(Collections.emptyList());
assertThat(bean.findByIdEmpty("").collectList().block()).isEmpty();

context.close();
}
Expand All @@ -356,9 +356,9 @@ void spr14235AdaptsToReactorFluxWithSync() {
assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb);
assertThat(cache.get("tb1").get()).isEqualTo(tb);

assertThat(bean.findById("").collectList().block()).isEmpty();
assertThat(bean.findById("").collectList().block()).isEmpty();
assertThat(cache.get("").get()).isEqualTo(Collections.emptyList());
assertThat(bean.findById("").collectList().block()).isEmpty();

context.close();
}
Expand Down Expand Up @@ -587,13 +587,17 @@ public Spr14230Service service() {

public static class Spr14235FutureService {

private boolean emptyCalled;

@Cacheable(value = "itemCache", unless = "#result.name == 'tb2'")
public CompletableFuture<TestBean> findById(String id) {
return CompletableFuture.completedFuture(new TestBean(id));
}

@Cacheable(value = "itemCache")
public CompletableFuture<TestBean> findByIdEmpty(String id) {
assertThat(emptyCalled).isFalse();
emptyCalled = true;
return CompletableFuture.completedFuture(null);
}

Expand All @@ -611,9 +615,16 @@ public CompletableFuture<Void> clear() {

public static class Spr14235FutureServiceSync {

private boolean emptyCalled;

@Cacheable(value = "itemCache", sync = true)
public CompletableFuture<TestBean> findById(String id) {
return CompletableFuture.completedFuture(id.isEmpty() ? null : new TestBean(id));
if (id.isEmpty()) {
assertThat(emptyCalled).isFalse();
emptyCalled = true;
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.completedFuture(new TestBean(id));
}

@CachePut(cacheNames = "itemCache", key = "#item.name")
Expand All @@ -625,13 +636,17 @@ public TestBean insertItem(TestBean item) {

public static class Spr14235MonoService {

private boolean emptyCalled;

@Cacheable(value = "itemCache", unless = "#result.name == 'tb2'")
public Mono<TestBean> findById(String id) {
return Mono.just(new TestBean(id));
}

@Cacheable(value = "itemCache")
public Mono<TestBean> findByIdEmpty(String id) {
assertThat(emptyCalled).isFalse();
emptyCalled = true;
return Mono.empty();
}

Expand All @@ -649,9 +664,16 @@ public Mono<Void> clear() {

public static class Spr14235MonoServiceSync {

private boolean emptyCalled;

@Cacheable(value = "itemCache", sync = true)
public Mono<TestBean> findById(String id) {
return (id.isEmpty() ? Mono.empty() : Mono.just(new TestBean(id)));
if (id.isEmpty()) {
assertThat(emptyCalled).isFalse();
emptyCalled = true;
return Mono.empty();
}
return Mono.just(new TestBean(id));
}

@CachePut(cacheNames = "itemCache", key = "#item.name")
Expand All @@ -665,13 +687,17 @@ public static class Spr14235FluxService {

private int counter = 0;

private boolean emptyCalled;

@Cacheable(value = "itemCache", unless = "#result[0].name == 'tb2'")
public Flux<TestBean> findById(String id) {
return Flux.just(new TestBean(id), new TestBean(id + (counter++)));
}

@Cacheable(value = "itemCache")
public Flux<TestBean> findByIdEmpty(String id) {
assertThat(emptyCalled).isFalse();
emptyCalled = true;
return Flux.empty();
}

Expand All @@ -691,9 +717,16 @@ public static class Spr14235FluxServiceSync {

private int counter = 0;

private boolean emptyCalled;

@Cacheable(value = "itemCache", sync = true)
public Flux<TestBean> findById(String id) {
return (id.isEmpty() ? Flux.empty() : Flux.just(new TestBean(id), new TestBean(id + (counter++))));
if (id.isEmpty()) {
assertThat(emptyCalled).isFalse();
emptyCalled = true;
return Flux.empty();
}
return Flux.just(new TestBean(id), new TestBean(id + (counter++)));
}

@CachePut(cacheNames = "itemCache", key = "#id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -171,6 +172,11 @@ protected Cache createConcurrentMapCache(String name) {
public CompletableFuture<?> retrieve(Object key) {
return CompletableFuture.completedFuture(lookup(key));
}
@Override
public void put(Object key, @Nullable Object value) {
assertThat(get(key) == null).as("Double put");
super.put(key, value);
}
};
}
};
Expand All @@ -193,6 +199,11 @@ public CompletableFuture<?> retrieve(Object key) {
Object value = lookup(key);
return CompletableFuture.completedFuture(value != null ? toValueWrapper(value) : null);
}
@Override
public void put(Object key, @Nullable Object value) {
assertThat(get(key) == null).as("Double put");
super.put(key, value);
}
};
}
};
Expand Down

0 comments on commit dc564f3

Please sign in to comment.