Skip to content

Commit

Permalink
Ensure backpressure's task is executed last (fixes #124)
Browse files Browse the repository at this point in the history
In most workloads writes are relatively rare and spaced apart. The write
buffer is a handoff between the writer and the async maintenance task
being triggered. That buffer may grow to a limit, at which point the
cache forces backpressure to cope with the high write rate. In this case
the writer blocks and tries to help out by performing the maintenace work.

In the above case the writer's task could not be scheduled, so when it
performs the clean-up it must also run its pending task. Previously this
was done prior to the buffers being drained, as order shouldn't matter. But
in a unit test a user could observe a premature eviction, because the last
addition was pushed to the front of the admission window.

This fix now keeps the writer's task at the tail, so the writer won't evict
the entry early. That won't have much of a real-world impact, but does play
nicer in unit tests.
  • Loading branch information
ben-manes committed Sep 30, 2016
1 parent eb6426b commit 48e8c89
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -996,10 +996,7 @@ void performCleanUp(@Nullable Runnable task) {
evictionLock.lock();
try {
lazySetDrainStatus(PROCESSING_TO_IDLE);
if (task != null) {
task.run();
}
maintenance();
maintenance(task);
} finally {
if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
lazySetDrainStatus(REQUIRED);
Expand All @@ -1011,12 +1008,18 @@ void performCleanUp(@Nullable Runnable task) {
/**
* Performs the pending maintenance work. The read buffer, write buffer, and reference queues are
* drained, followed by expiration, and size-based eviction.
*
* @param task an additional pending task to run, or {@code null} if not present
*/
@GuardedBy("evictionLock")
void maintenance() {
void maintenance(@Nullable Runnable task) {
drainReadBuffer();

drainWriteBuffer();
if (task != null) {
task.run();
}

drainKeyReferences();
drainValueReferences();

Expand Down Expand Up @@ -2314,7 +2317,7 @@ Map<K, V> snapshot(Iterator<Node<K, V>> iterator, Function<V, V> transformer, in
requireArgument(limit >= 0);
evictionLock.lock();
try {
maintenance();
maintenance(/* ignored */ null);

int initialCapacity =
isWeighted() ? 16 : Math.min(limit, evicts() ? (int) adjustedWeightedSize() : size());
Expand Down Expand Up @@ -3012,7 +3015,7 @@ final class BoundedEviction implements Eviction<K, V> {
cache.evictionLock.lock();
try {
cache.setMaximum(maximum);
cache.maintenance();
cache.maintenance(/* ignored */ null);
} finally {
cache.evictionLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -305,7 +303,7 @@ private static void updateRecency(BoundedLocalCache<Integer, Integer> cache,
Node<Integer, Integer> first = firstBeforeAccess(cache, context);

operation.run();
cache.maintenance();
cache.maintenance(/* ignored */ null);

if (context.isZeroWeighted()) {
assertThat(cache.accessOrderEdenDeque().peekFirst(), is(not(first)));
Expand Down Expand Up @@ -377,16 +375,22 @@ public void fastpath(Cache<Integer, Integer> cache, CacheContext context) {
population = Population.FULL, maximumSize = Maximum.FULL)
public void afterWrite_drainFullWriteBuffer(Cache<Integer, Integer> cache, CacheContext context) {
BoundedLocalCache<Integer, Integer> localCache = asBoundedLocalCache(cache);
Runnable task = Mockito.mock(Runnable.class);
localCache.drainStatus = PROCESSING_TO_IDLE;
int expectedCount = 1;

while (localCache.writeBuffer().offer(task)) {
expectedCount++;
int[] processed = { 0 };
Runnable pendingTask = () -> processed[0]++;

int[] expectedCount = { 0 };
while (localCache.writeBuffer().offer(pendingTask)) {
expectedCount[0]++;
}

localCache.afterWrite(null, task, 0L);
verify(task, times(expectedCount)).run();
int[] triggered = { 0 };
Runnable triggerTask = () -> triggered[0] = 1 + expectedCount[0];
localCache.afterWrite(null, triggerTask, 0L);

assertThat(processed[0], is(expectedCount[0]));
assertThat(triggered[0], is(expectedCount[0] + 1));
}

@Test(dataProvider = "caches")
Expand Down
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ext {
jcache_tck: '1.0.1',
jctools: '1.2.1',
junit: '4.12',
mockito: '2.1.0-RC.1',
mockito: '2.1.0-RC.2',
pax_exam: '4.9.1',
testng: '6.9.12',
truth: '0.24',
Expand Down

0 comments on commit 48e8c89

Please sign in to comment.