From 542c308fb89a2cd3bdf35698b47d9c9f6d34741e Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Wed, 30 Jun 2021 21:57:44 -0700 Subject: [PATCH] Improve robustness in racy scenarios (fixes #568) 1. When an entry is updated then a concurrent reader should either observe the old or new value. This operation replaces the j.l.Reference instance stored on the entry and the old referent becomes eligible for garbage collection. A reader holding the stale Reference may therefore return a null value, which is more likely due to the cache proactively clearing the referent to assist the garbage collector. When a null value is read then the an extra volatile read is used to validate that the Reference instance is still held by the entry. This retry loop has negligible cost. 2. When an entry is eligible for removal due to its value being garbage collected, during the eviction's atomic map operation this eligibility must be verified. If concurrently the entry was resurrected and a new value set, then the cache writer has already dispatched the removal notification and established a live mapping. If the evictor does not detect that the cause is no longer valid, then it would incorrectly discard the mapping with a removal notification containing a non-null key, non-null value, and collected removal cause. Like expiration and size policies, the reference eviction policy will now validate and no-op if the entry is no longer eligible. 3. When the fixed expiration setting is dynamically adjusted, an expired entry may be resurrected as no longer eligible for removoal. While the map operation detected this case, stemming from the entry itself being updated and its lifetime reset, the outer eviction loop could retry indefinitely. A stale read of the fixed duration caused the loop to retry the ineligible entry, but instead it can terminatee as it scans a queue ordered by the expiration timestamp. Co-authored-by: jhorvitz@google.com --- .../caffeine/cache/node/AddValue.java | 30 +++- .../caffeine/cache/BoundedLocalCache.java | 26 ++-- .../caffeine/cache/BoundedLocalCacheTest.java | 139 ++++++++++++++++++ .../caffeine/cache/issues/Issue568Test.java | 127 ++++++++++++++++ checksum.xml | 3 + gradle/dependencies.gradle | 2 +- 6 files changed, 315 insertions(+), 12 deletions(-) create mode 100644 caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java diff --git a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java index ab271ed2e0..f7a7df2c73 100644 --- a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java +++ b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java @@ -24,6 +24,7 @@ import javax.lang.model.element.Modifier; import com.squareup.javapoet.ClassName; +import com.squareup.javapoet.CodeBlock; import com.squareup.javapoet.FieldSpec; import com.squareup.javapoet.MethodSpec; @@ -44,7 +45,7 @@ protected boolean applies() { protected void execute() { context.nodeSubtype .addField(newValueField()) - .addMethod(newGetter(valueStrength(), vTypeVar, "value", Visibility.PLAIN)) + .addMethod(makeGetValue()) .addMethod(newGetRef("value")) .addMethod(makeSetValue()) .addMethod(makeContainsValue()); @@ -60,6 +61,30 @@ private FieldSpec newValueField() { return fieldSpec.build(); } + /** Creates the getValue method. */ + private MethodSpec makeGetValue() { + MethodSpec.Builder getter = MethodSpec.methodBuilder("getValue") + .addModifiers(context.publicFinalModifiers()) + .returns(vTypeVar); + if (valueStrength() == Strength.STRONG) { + getter.addStatement("return ($T) $L.get(this)", vTypeVar, varHandleName("value")); + return getter.build(); + } + + CodeBlock code = CodeBlock.builder() + .beginControlFlow("for (;;)") + .addStatement("$1T ref = ($1T) $2L.get(this)", + Reference.class, varHandleName("value")) + .addStatement("V value = ref.get()") + .beginControlFlow( + "if ((value != null) || (ref == $L.getVolatile(this)))", varHandleName("value")) + .addStatement("return value") + .endControlFlow() + .endControlFlow() + .build(); + return getter.addCode(code).build(); + } + /** Creates the setValue method. */ private MethodSpec makeSetValue() { MethodSpec.Builder setter = MethodSpec.methodBuilder("setValue") @@ -70,9 +95,10 @@ private MethodSpec makeSetValue() { if (isStrongValues()) { setter.addStatement("$L.set(this, $N)", varHandleName("value"), "value"); } else { - setter.addStatement("(($T) getValueReference()).clear()", Reference.class); + setter.addStatement("$1T ref = ($1T) getValueReference()", Reference.class); setter.addStatement("$L.set(this, new $T($L, $N, referenceQueue))", varHandleName("value"), valueReferenceType(), "getKeyReference()", "value"); + setter.addStatement("ref.clear()"); } return setter.build(); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index e6a9e2b094..6cfa68d50f 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -870,10 +870,10 @@ void expireAfterAccessEntries(AccessOrderDeque> accessOrderDeque, lon long duration = expiresAfterAccessNanos(); for (;;) { Node node = accessOrderDeque.peekFirst(); - if ((node == null) || ((now - node.getAccessTime()) < duration)) { + if ((node == null) || ((now - node.getAccessTime()) < duration) + || !evictEntry(node, RemovalCause.EXPIRED, now)) { return; } - evictEntry(node, RemovalCause.EXPIRED, now); } } @@ -885,11 +885,11 @@ void expireAfterWriteEntries(long now) { } long duration = expiresAfterWriteNanos(); for (;;) { - final Node node = writeOrderDeque().peekFirst(); - if ((node == null) || ((now - node.getWriteTime()) < duration)) { + Node node = writeOrderDeque().peekFirst(); + if ((node == null) || ((now - node.getWriteTime()) < duration) + || !evictEntry(node, RemovalCause.EXPIRED, now)) { break; } - evictEntry(node, RemovalCause.EXPIRED, now); } } @@ -942,8 +942,8 @@ boolean hasExpired(Node node, long now) { } /** - * Attempts to evict the entry based on the given removal cause. A removal due to expiration or - * size may be ignored if the entry was updated and is no longer eligible for eviction. + * Attempts to evict the entry based on the given removal cause. A removal due to may be ignored + * if the entry was updated and is no longer eligible for eviction. * * @param node the entry to evict * @param cause the reason to evict @@ -958,8 +958,8 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { V[] value = (V[]) new Object[1]; boolean[] removed = new boolean[1]; boolean[] resurrect = new boolean[1]; - RemovalCause[] actualCause = new RemovalCause[1]; Object keyReference = node.getKeyReference(); + RemovalCause[] actualCause = new RemovalCause[1]; data.computeIfPresent(keyReference, (k, n) -> { if (n != node) { @@ -968,7 +968,15 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { synchronized (n) { value[0] = n.getValue(); - actualCause[0] = (key == null) || (value[0] == null) ? RemovalCause.COLLECTED : cause; + if ((key == null) || (value[0] == null)) { + actualCause[0] = RemovalCause.COLLECTED; + } else if (cause == RemovalCause.COLLECTED) { + resurrect[0] = true; + return n; + } else { + actualCause[0] = cause; + } + if (actualCause[0] == RemovalCause.EXPIRED) { boolean expired = false; if (expiresAfterAccess()) { diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java index 70a5e757d4..38b45498c5 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java @@ -44,6 +44,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.lang.Thread.State; +import java.lang.ref.Reference; +import java.time.Duration; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -409,6 +413,141 @@ public void evict_update_entryTooBig_protected(Cache cache, CacheConte Int.valueOf(1), Int.valueOf(20), RemovalCause.SIZE))); } + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, values = {ReferenceType.WEAK, ReferenceType.SOFT}, + removalListener = Listener.CONSUMING) + public void evict_resurrect_collected(Cache cache, CacheContext context) { + Int key = Int.valueOf(1); + Int oldValue = Int.valueOf(2); + Int newValue = Int.valueOf(2); + var localCache = asBoundedLocalCache(cache); + + cache.put(key, oldValue); + var node = localCache.data.get(localCache.referenceKey(key)); + @SuppressWarnings("unchecked") + var ref = (Reference) node.getValueReference(); + ref.enqueue(); + + var evictor = new Thread(localCache::cleanUp); + evictor.setDaemon(true); + + localCache.compute(key, (k, v) -> { + assertThat(v, is(nullValue())); + + evictor.start(); + var threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.getState())); + + return newValue; + }); + await().until(() -> evictor.getState() == State.TERMINATED); + + assertThat(node.getValue(), is(newValue)); + assertThat(context.removalNotifications(), is(equalTo(List.of( + new RemovalNotification<>(key, null, RemovalCause.COLLECTED))))); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, maximumSize = Maximum.UNREACHABLE, + weigher = CacheWeigher.COLLECTION) + public void evict_resurrect_weight(Cache> cache, CacheContext context) { + Int key = Int.valueOf(1); + cache.put(key, List.of(key)); + + var evictor = new Thread(() -> cache.policy().eviction().get().setMaximum(0)); + evictor.setDaemon(true); + + cache.asMap().compute(key, (k, v) -> { + evictor.start(); + var threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.getState())); + + return List.of(); + }); + await().until(() -> evictor.getState() == State.TERMINATED); + + assertThat(cache.getIfPresent(key), is(List.of())); + verifyRemovalListener(context, verifier -> verifier.hasCount(0, RemovalCause.SIZE)); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expireAfterAccess = Expire.FOREVER) + public void evict_resurrect_expireAfterAccess(Cache cache, CacheContext context) { + Int key = Int.valueOf(1); + cache.put(key, key); + + var evictor = new Thread(() -> + cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ZERO)); + evictor.setDaemon(true); + + context.ticker().advance(Duration.ofMinutes(1)); + cache.asMap().compute(key, (k, v) -> { + evictor.start(); + var threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.getState())); + cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ofHours(1)); + return v; + }); + await().until(() -> evictor.getState() == State.TERMINATED); + + assertThat(cache.getIfPresent(key), is(key)); + verifyRemovalListener(context, verifier -> verifier.noInteractions()); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expireAfterWrite = Expire.FOREVER) + public void evict_resurrect_expireAfterWrite(Cache cache, CacheContext context) { + Int key = Int.valueOf(1); + cache.put(key, key); + + var evictor = new Thread(() -> + cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ZERO)); + evictor.setDaemon(true); + + context.ticker().advance(Duration.ofMinutes(1)); + cache.asMap().compute(key, (k, v) -> { + evictor.start(); + var threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.getState())); + cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ofHours(1)); + return v; + }); + await().until(() -> evictor.getState() == State.TERMINATED); + + assertThat(cache.getIfPresent(key), is(key)); + verifyRemovalListener(context, verifier -> verifier.noInteractions()); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expiry = CacheExpiry.CREATE, expiryTime = Expire.ONE_MINUTE) + public void evict_resurrect_expireAfterVar(Cache cache, CacheContext context) { + var localCache = asBoundedLocalCache(cache); + Int key = Int.valueOf(1); + cache.put(key, key); + var node = localCache.data.get(localCache.referenceKey(key)); + + var evictor = new Thread(cache::cleanUp); + evictor.setDaemon(true); + + synchronized (node) { + context.ticker().advance(Duration.ofHours(1)); + + evictor.start(); + var threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.getState())); + cache.policy().expireVariably().get().setExpiresAfter(key, Duration.ofDays(1)); + } + await().until(() -> evictor.getState() == State.TERMINATED); + + assertThat(cache.getIfPresent(key), is(key)); + verifyRemovalListener(context, verifier -> verifier.noInteractions()); + } + @Test(dataProvider = "caches") @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, population = Population.FULL, maximumSize = Maximum.FULL) diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java new file mode 100644 index 0000000000..6354d5a6b8 --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java @@ -0,0 +1,127 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache.issues; + +import java.lang.ref.Reference; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; + +/** + * Issue #568: Incorrect handling of weak/soft reference caching. + * + * @author justinhorvitz (Justin Horvitz) + */ +@Test(groups = "isolated") +public class Issue568Test { + + /** + * When an entry is updated then a concurrent reader should either observe the old or new value. + * This operation replaces the {@link Reference} instance stored on the entry and the old referent + * becomes eligible for garbage collection. A reader holding the stale Reference may therefore + * return a null value, which is more likely due to the cache proactively clearing the referent to + * assist the garbage collector. + */ + @Test + public void intermittentNull() throws InterruptedException { + Cache cache = Caffeine.newBuilder().weakValues().build(); + + String key = "key"; + String val = "val"; + cache.put("key", "val"); + + var error = new AtomicReference(); + var threads = new ArrayList(); + for (int i = 0; i < 10; i++) { + int name = i; + var thread = new Thread(() -> { + for (int j = 0; j < 1000; j++) { + if (Math.random() < .5) { + cache.put(key, val); + } else if (cache.getIfPresent(key) == null) { + error.compareAndSet(null, new IllegalStateException( + "Thread " + name + " observed null on iteration " + j)); + break; + } + } + }); + threads.add(thread); + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + if (error.get() != null) { + throw error.get(); + } + } + + /** + * When an entry is eligible for removal due to its value being garbage collected, during the + * eviction's atomic map operation this eligibility must be verified. If concurrently the entry + * was resurrected and a new value set, then the cache writer has already dispatched the removal + * notification and established a live mapping. If the evictor does not detect that the cause is + * no longer valid, then it would incorrectly discard the mapping with a removal notification + * containing a non-null key, non-null value, and collected removal cause. + */ + @Test + public void resurrect() throws InterruptedException { + var error = new AtomicReference(); + Cache cache = Caffeine.newBuilder() + .weakValues() + .removalListener((k, v, cause) -> { + if (cause == RemovalCause.COLLECTED && (v != null)) { + error.compareAndSet(null, new IllegalStateException("Evicted a live value: " + v)); + } + }).build(); + + String key = "key"; + cache.put(key, new Object()); + + var missing = new AtomicBoolean(); + var threads = new ArrayList(); + for (int i = 0; i < 100; i++) { + var thread = new Thread(() -> { + for (int j = 0; j < 1000; j++) { + if (error.get() != null) { + break; + } + if (Math.random() < .01) { + System.gc(); + cache.cleanUp(); + } else if ((cache.getIfPresent(key) == null) && !missing.getAndSet(true)) { + cache.put(key, new Object()); + missing.set(false); + } + } + }); + threads.add(thread); + thread.start(); + } + + for (var thread : threads) { + thread.join(); + } + if (error.get() != null) { + throw error.get(); + } + } +} diff --git a/checksum.xml b/checksum.xml index cbbe8b47a4..949e1e0563 100644 --- a/checksum.xml +++ b/checksum.xml @@ -483,6 +483,9 @@ 7F18348A8EDC88BB4B0D6EB5412BC4F6D79D1E5843F28CA72F717BE50BF0BF6A19AE1C3416E5EAB40CC1FD27C697CB0F9B5E10A51CFA574093EF0A4F57CB93CF + + 5EA9CA94A3682E090E28895ECAAE1E020C48DD249EE5040FB6EBE4B01DA027B86F94450E30692253DFA787371D4B4286FE257CEBF02E184AC149A746952D669C + 779F0D784A11834392C65DA375CF5F9612FD89B0540C665BCE1009EFAA7C35642E38D381AF7362703378306C95D24669B02CA27A2CCAFB574173BF9FA273F625 diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index c194f51d58..f9051f8b8c 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -85,7 +85,7 @@ ext { bnd: '5.3.0', checkstyle: '8.44', coveralls: '2.12.0', - errorprone: '2.0.1', + errorprone: '2.0.2', findsecbugs: '1.11.0', jacoco: '0.8.6', jmh: '0.6.5',