Skip to content

Commit

Permalink
Removing Thread.sleep from Serialization manager code
Browse files Browse the repository at this point in the history
  • Loading branch information
JaroslavTulach committed Feb 3, 2024
1 parent 0f53896 commit bb1272d
Showing 1 changed file with 20 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,14 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.enso.pkg.QualifiedName;

final class SerializationPool {
/** The maximum number of serialization threads allowed. */
private static final int maximumThreadCount = 2;

/** The number of threads at compiler start. */
private static final int startingThreadCount = maximumThreadCount;

/** The thread keep-alive time in seconds. */
private static final long threadKeepalive = 3;

/** The debug logging level. */
private static final Level debugLogLevel = Level.FINE;

Expand All @@ -43,27 +34,13 @@ final class SerializationPool {
new ConcurrentHashMap<>();

/** The thread pool that handles serialization. */
private final ThreadPoolExecutor pool;
private final ExecutorService pool;

SerializationPool(TruffleCompilerContext context) {
this.context = context;
this.pool =
new ThreadPoolExecutor(
startingThreadCount,
maximumThreadCount,
threadKeepalive,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(),
context::createSystemThread);
this.pool = Executors.newSingleThreadExecutor(context::createSystemThread);
}

/*
<E, M > Future<Boolean> doSerialize(Cache<E, M> cache, E entry, boolean useGlobalCacheLocations) {
context.saveCache(cache, entry, useGlobalCacheLocations);
return null;
}
*/

void prestartAllCoreThreads() {}

/**
Expand All @@ -85,10 +62,11 @@ void shutdown(boolean waitForPendingJobCompletion) throws InterruptedException {
if (!pool.isShutdown()) {
if (waitForPendingJobCompletion && this.hasJobsRemaining()) {
int waitingCount;
int jobCount;
synchronized (isWaitingForSerialization) {
waitingCount = isWaitingForSerialization.size();
jobCount = waitingCount + isSerializing.size();
}
var jobCount = waitingCount + isSerializing.size();
context.logSerializationManager(
debugLogLevel, "Waiting for #{0} serialization jobs to complete.", jobCount);

Expand All @@ -97,7 +75,9 @@ void shutdown(boolean waitForPendingJobCompletion) throws InterruptedException {
int counter = 0;
while (this.hasJobsRemaining() && counter < maxCount) {
counter += 1;
Thread.sleep(1 * 1000);
synchronized (isWaitingForSerialization) {
isWaitingForSerialization.wait(1000);
}
}
}

Expand All @@ -112,21 +92,10 @@ void shutdown(boolean waitForPendingJobCompletion) throws InterruptedException {
}

pool.shutdownNow();
Thread.sleep(100);
context.logSerializationManager(debugLogLevel, "Serialization manager has been shut down.");
}
}

/**
* Checks if the provided module is in the process of being serialized.
*
* @param key the module to check
* @return `true` if `module` is currently being serialized, `false` otherwise
*/
boolean isSerializing(QualifiedName key) {
return isSerializing.containsKey(key);
}

boolean isWaitingForSerialization(QualifiedName key) {
synchronized (isWaitingForSerialization) {
return isWaitingForSerialization.containsKey(key);
Expand All @@ -143,6 +112,7 @@ boolean abort(QualifiedName key) {
synchronized (isWaitingForSerialization) {
if (isWaitingForSerialization(key)) {
var prev = isWaitingForSerialization.remove(key);
isWaitingForSerialization.notifyAll();
if (prev != null) {
return prev.cancel(false);
} else {
Expand All @@ -157,8 +127,9 @@ boolean abort(QualifiedName key) {
void startSerializing(QualifiedName name) {
synchronized (isWaitingForSerialization) {
isWaitingForSerialization.remove(name);
isSerializing.put(name, true);
isWaitingForSerialization.notifyAll();
}
isSerializing.put(name, true);
}

/**
Expand All @@ -167,7 +138,10 @@ void startSerializing(QualifiedName name) {
* @param name the name of the module to set as having finished serialization
*/
void finishSerializing(QualifiedName name) {
isSerializing.remove(name);
synchronized (isWaitingForSerialization) {
isSerializing.remove(name);
isWaitingForSerialization.notifyAll();
}
}

Future<scala.Boolean> submitTask(
Expand All @@ -190,8 +164,10 @@ Future<scala.Boolean> submitTask(
}

void waitWhileSerializing(QualifiedName name) throws InterruptedException {
while (isSerializing.containsKey(name)) {
Thread.sleep(100);
synchronized (isWaitingForSerialization) {
while (isSerializing.containsKey(name)) {
isWaitingForSerialization.wait(100);
}
}
}
}

0 comments on commit bb1272d

Please sign in to comment.