Skip to content

Commit

Permalink
Stronger protection against excess scheduling (fixes #57)
Browse files Browse the repository at this point in the history
The simpler state machine and tryLock guard protected against most
unnecessary scheduling of the maintenance task. This could occur though
due to some benign races, a high write rate, and pronounce the effect
of a clogged executor. This perfect scheduling has a negligible cost
while ensuring that even in synthetic stress tests no excess occurs.
  • Loading branch information
ben-manes committed Mar 9, 2016
1 parent af36fba commit 03af211
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -810,24 +810,53 @@ void afterWrite(@Nullable Node<K, V> node, Runnable task, long now) {
if (buffersWrites()) {
writeQueue().add(task);
}
lazySetDrainStatus(REQUIRED);
scheduleDrainBuffers();
scheduleAfterWrite();
}

/**
* Conditionally schedules the asynchronous maintenance task after a write operation. If the
* task status was IDLE or REQUIRED then the maintenance task is scheduled immediately. If it
* is already processing then it is set to transition to REQUIRED upon completion so that a new
* execution triggered by the next operation.
*/
void scheduleAfterWrite() {
for (;;) {
switch (drainStatus()) {
case IDLE:
casDrainStatus(IDLE, REQUIRED);
scheduleDrainBuffers();
return;
case REQUIRED:
scheduleDrainBuffers();
return;
case PROCESSING_TO_IDLE:
if (casDrainStatus(PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED)) {
return;
}
continue;
case PROCESSING_TO_REQUIRED:
return;
default:
throw new IllegalStateException();
}
}
}

/**
* Attempts to schedule an asynchronous task to apply the pending operations to the page
* replacement policy. If the executor rejects the task then it is run directly.
*/
void scheduleDrainBuffers() {
if (drainStatus() == PROCESSING) {
if (drainStatus() >= PROCESSING_TO_IDLE) {
return;
}
if (evictionLock.tryLock()) {
try {
if (drainStatus() == PROCESSING) {
int drainStatus = drainStatus();
if (drainStatus >= PROCESSING_TO_IDLE) {
return;
}
lazySetDrainStatus(PROCESSING);
lazySetDrainStatus(PROCESSING_TO_IDLE);
executor().execute(drainBuffersTask);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
Expand Down Expand Up @@ -855,10 +884,12 @@ public void cleanUp() {
void performCleanUp() {
evictionLock.lock();
try {
lazySetDrainStatus(PROCESSING);
lazySetDrainStatus(PROCESSING_TO_IDLE);
maintenance();
} finally {
casDrainStatus(PROCESSING, IDLE);
if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
lazySetDrainStatus(REQUIRED);
}
evictionLock.unlock();
}
}
Expand Down Expand Up @@ -3001,8 +3032,10 @@ static abstract class DrainStatusRef<K, V> extends PadDrainStatus<K, V> {
static final int IDLE = 0;
/** A drain is required due to a pending write modification. */
static final int REQUIRED = 1;
/** A drain is in progress. */
static final int PROCESSING = 2;
/** A drain is in progress and will transition to idle. */
static final int PROCESSING_TO_IDLE = 2;
/** A drain is in progress and will transition to required. */
static final int PROCESSING_TO_REQUIRED = 3;

/** The draining status of the buffers. */
volatile int drainStatus = IDLE;
Expand All @@ -3018,7 +3051,8 @@ boolean shouldDrainBuffers(boolean delayable) {
return !delayable;
case REQUIRED:
return true;
case PROCESSING:
case PROCESSING_TO_IDLE:
case PROCESSING_TO_REQUIRED:
return false;
default:
throw new IllegalStateException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package com.github.benmanes.caffeine.cache;

import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.IDLE;
import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.PROCESSING_TO_IDLE;
import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.PROCESSING_TO_REQUIRED;
import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.REQUIRED;
import static com.github.benmanes.caffeine.cache.testing.HasRemovalNotifications.hasRemovalNotifications;
import static com.github.benmanes.caffeine.cache.testing.HasStats.hasEvictionCount;
Expand All @@ -28,13 +31,16 @@
import static org.hamcrest.Matchers.nullValue;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.mockito.Matchers;
import org.mockito.Mockito;
import org.testng.annotations.Listeners;
import org.testng.annotations.Test;

Expand All @@ -58,6 +64,7 @@
import com.github.benmanes.caffeine.testing.Awaits;
import com.github.benmanes.caffeine.testing.ConcurrentTestHarness;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

Expand All @@ -76,6 +83,46 @@ static BoundedLocalCache<Integer, Integer> asBoundedLocalCache(Cache<Integer, In
return (BoundedLocalCache<Integer, Integer>) cache.asMap();
}

@Test
public void scheduleAfterWrite() {
BoundedLocalCache<?, ?> cache = new BoundedLocalCache<Object, Object>(
Caffeine.newBuilder(), /* loader */ null, /* async */ false) {
@Override void scheduleDrainBuffers() {}
};
Map<Integer, Integer> transitions = ImmutableMap.of(
IDLE, REQUIRED,
REQUIRED, REQUIRED,
PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED,
PROCESSING_TO_REQUIRED, PROCESSING_TO_REQUIRED);
transitions.forEach((start, end) -> {
cache.drainStatus = start;
cache.scheduleAfterWrite();
assertThat(cache.drainStatus, is(end));
});
}

@Test
public void scheduleDrainBuffers() {
Executor executor = Mockito.mock(Executor.class);
BoundedLocalCache<?, ?> cache = new BoundedLocalCache<Object, Object>(
Caffeine.newBuilder().executor(executor), /* loader */ null, /* async */ false) {};
Map<Integer, Integer> transitions = ImmutableMap.of(
IDLE, PROCESSING_TO_IDLE,
REQUIRED, PROCESSING_TO_IDLE,
PROCESSING_TO_IDLE, PROCESSING_TO_IDLE,
PROCESSING_TO_REQUIRED, PROCESSING_TO_REQUIRED);
transitions.forEach((start, end) -> {
cache.drainStatus = start;
cache.scheduleDrainBuffers();
assertThat(cache.drainStatus, is(end));

if (start != end) {
Mockito.verify(executor).execute(Matchers.any());
Mockito.reset(executor);
}
});
}

@Test
public void putWeighted_noOverflow() {
Cache<Integer, Integer> cache = Caffeine.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
* @author [email protected] (Ben Manes)
*/
public final class Stresser {
private static final String[] STATUS = { "Idle", "Required", "Processing" };
private static final String[] STATUS =
{ "Idle", "Required", "Processing -> Idle", "Processing -> Required" };
private static final int THREADS = 2 * Runtime.getRuntime().availableProcessors();
private static final int WRITE_MAX_SIZE = (1 << 12);
private static final int TOTAL_KEYS = (1 << 20);
Expand Down
180 changes: 0 additions & 180 deletions config/checkstyle/checkstyle.xsl

This file was deleted.

1 change: 1 addition & 0 deletions config/pmd/rulesSets.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<rule ref="rulesets/java/clone.xml"/>

<rule ref="rulesets/java/codesize.xml">
<exclude name="NcssTypeCount"/>
<exclude name="TooManyMethods"/>
<exclude name="ExcessivePublicCount"/>
<exclude name="ExcessiveClassLength"/>
Expand Down
7 changes: 3 additions & 4 deletions gradle/code_quality.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,9 @@ tasks.withType(Test) {
tasks.withType(Checkstyle) {
enabled = System.properties.containsKey('checkstyle')
group = 'Checkstyle'
doLast {
ant.xslt(in: "${buildDir}/reports/checkstyle/main.xml",
style: "//${rootDir}/config/checkstyle/checkstyle.xsl",
out:"${buildDir}/reports/checkstyle/checkstyle.html")
reports {
xml.enabled = false
html.enabled = true
}
}

Expand Down
Loading

0 comments on commit 03af211

Please sign in to comment.