Skip to content

Commit

Permalink
Stronger protection against excess scheduling [ci skip]
Browse files Browse the repository at this point in the history
A quick experiment showing that perfect scheduling has an insignificant
effect. The tryLock trick already avoids almost all duplicates due to
writer stampedes during the scheduling window. Unless the executor is
clogged there shouldn't be an impact and benchmarks didn't have any
conclusive difference.

The perfect state machine is only slightly more expensive to maintain.
So this remains as a brain dump for revisiting in case the issue
raised by user incident.
  • Loading branch information
ben-manes committed Mar 9, 2016
1 parent d8423c9 commit ccd7ae9
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -810,24 +810,53 @@ void afterWrite(@Nullable Node<K, V> node, Runnable task, long now) {
if (buffersWrites()) {
writeQueue().add(task);
}
lazySetDrainStatus(REQUIRED);
scheduleDrainBuffers();
scheduleAfterWrite();
}

/**
* Conditionally schedules the asynchronous maintenance task after a write operation. If the
* task status was IDLE or REQUIRED then the maintenance task is scheduled immediately. If it
* is already processing then it is set to transition to REQUIRED upon completion so that a new
* execution triggered by the next operation.
*/
void scheduleAfterWrite() {
for (;;) {
switch (drainStatus()) {
case IDLE:
casDrainStatus(IDLE, REQUIRED);
scheduleDrainBuffers();
return;
case REQUIRED:
scheduleDrainBuffers();
return;
case PROCESSING_TO_IDLE:
if (casDrainStatus(PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED)) {
return;
}
continue;
case PROCESSING_TO_REQUIRED:
return;
default:
throw new IllegalStateException();
}
}
}

/**
* Attempts to schedule an asynchronous task to apply the pending operations to the page
* replacement policy. If the executor rejects the task then it is run directly.
*/
void scheduleDrainBuffers() {
if (drainStatus() == PROCESSING) {
if (drainStatus() >= PROCESSING_TO_IDLE) {
return;
}
if (evictionLock.tryLock()) {
try {
if (drainStatus() == PROCESSING) {
int drainStatus = drainStatus();
if (drainStatus >= PROCESSING_TO_IDLE) {
return;
}
lazySetDrainStatus(PROCESSING);
lazySetDrainStatus(PROCESSING_TO_IDLE);
executor().execute(drainBuffersTask);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
Expand Down Expand Up @@ -855,10 +884,12 @@ public void cleanUp() {
void performCleanUp() {
evictionLock.lock();
try {
lazySetDrainStatus(PROCESSING);
lazySetDrainStatus(PROCESSING_TO_IDLE);
maintenance();
} finally {
casDrainStatus(PROCESSING, IDLE);
if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
lazySetDrainStatus(REQUIRED);
}
evictionLock.unlock();
}
}
Expand Down Expand Up @@ -3001,8 +3032,10 @@ static abstract class DrainStatusRef<K, V> extends PadDrainStatus<K, V> {
static final int IDLE = 0;
/** A drain is required due to a pending write modification. */
static final int REQUIRED = 1;
/** A drain is in progress. */
static final int PROCESSING = 2;
/** A drain is in progress and will transition to idle. */
static final int PROCESSING_TO_IDLE = 2;
/** A drain is in progress and will transition to required. */
static final int PROCESSING_TO_REQUIRED = 3;

/** The draining status of the buffers. */
volatile int drainStatus = IDLE;
Expand All @@ -3018,7 +3051,8 @@ boolean shouldDrainBuffers(boolean delayable) {
return !delayable;
case REQUIRED:
return true;
case PROCESSING:
case PROCESSING_TO_IDLE:
case PROCESSING_TO_REQUIRED:
return false;
default:
throw new IllegalStateException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
<<<<<<< edee8ed34ba79b4b021beb49d4094eb411eaf317
import java.util.concurrent.ThreadFactory;
=======
>>>>>>> More graceful executor error handling for removal notifications
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -33,10 +30,7 @@

import com.github.benmanes.caffeine.testing.ConcurrentTestHarness;
import com.google.common.base.Stopwatch;
<<<<<<< edee8ed34ba79b4b021beb49d4094eb411eaf317
import com.google.common.util.concurrent.ThreadFactoryBuilder;
=======
>>>>>>> More graceful executor error handling for removal notifications

/**
* A stress test to observe if the cache has a memory leak by not being able to drain the buffers
Expand All @@ -45,7 +39,8 @@
* @author [email protected] (Ben Manes)
*/
public final class Stresser {
private static final String[] STATUS = { "Idle", "Required", "Processing" };
private static final String[] STATUS =
{ "Idle", "Required", "Processing -> Idle", "Processing -> Required" };
private static final int THREADS = 2 * Runtime.getRuntime().availableProcessors();
private static final int WRITE_MAX_SIZE = (1 << 12);
private static final int TOTAL_KEYS = (1 << 20);
Expand All @@ -62,15 +57,11 @@ public final class Stresser {
private final boolean reads = true;

public Stresser() {
<<<<<<< edee8ed34ba79b4b021beb49d4094eb411eaf317
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setPriority(Thread.MAX_PRIORITY)
.setDaemon(true)
.build();
Executors.newSingleThreadScheduledExecutor(threadFactory)
=======
Executors.newSingleThreadScheduledExecutor()
>>>>>>> More graceful executor error handling for removal notifications
.scheduleAtFixedRate(this::status, STATUS_INTERVAL, STATUS_INTERVAL, SECONDS);
maximum = reads ? TOTAL_KEYS : WRITE_MAX_SIZE;
evictions = new LongAdder();
Expand Down

0 comments on commit ccd7ae9

Please sign in to comment.