From bb1272d26d30060b205b2effe3b7f35a410134a5 Mon Sep 17 00:00:00 2001 From: Jaroslav Tulach Date: Sat, 3 Feb 2024 07:44:31 +0100 Subject: [PATCH] Removing Thread.sleep from Serialization manager code --- .../runtime/SerializationPool.java | 64 ++++++------------- 1 file changed, 20 insertions(+), 44 deletions(-) diff --git a/engine/runtime/src/main/java/org/enso/interpreter/runtime/SerializationPool.java b/engine/runtime/src/main/java/org/enso/interpreter/runtime/SerializationPool.java index e01e755781b2..d71635785c4a 100644 --- a/engine/runtime/src/main/java/org/enso/interpreter/runtime/SerializationPool.java +++ b/engine/runtime/src/main/java/org/enso/interpreter/runtime/SerializationPool.java @@ -4,23 +4,14 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import org.enso.pkg.QualifiedName; final class SerializationPool { - /** The maximum number of serialization threads allowed. */ - private static final int maximumThreadCount = 2; - - /** The number of threads at compiler start. */ - private static final int startingThreadCount = maximumThreadCount; - - /** The thread keep-alive time in seconds. */ - private static final long threadKeepalive = 3; - /** The debug logging level. */ private static final Level debugLogLevel = Level.FINE; @@ -43,27 +34,13 @@ final class SerializationPool { new ConcurrentHashMap<>(); /** The thread pool that handles serialization. */ - private final ThreadPoolExecutor pool; + private final ExecutorService pool; SerializationPool(TruffleCompilerContext context) { this.context = context; - this.pool = - new ThreadPoolExecutor( - startingThreadCount, - maximumThreadCount, - threadKeepalive, - TimeUnit.SECONDS, - new LinkedBlockingDeque(), - context::createSystemThread); + this.pool = Executors.newSingleThreadExecutor(context::createSystemThread); } - /* - Future doSerialize(Cache cache, E entry, boolean useGlobalCacheLocations) { - context.saveCache(cache, entry, useGlobalCacheLocations); - return null; - } - */ - void prestartAllCoreThreads() {} /** @@ -85,10 +62,11 @@ void shutdown(boolean waitForPendingJobCompletion) throws InterruptedException { if (!pool.isShutdown()) { if (waitForPendingJobCompletion && this.hasJobsRemaining()) { int waitingCount; + int jobCount; synchronized (isWaitingForSerialization) { waitingCount = isWaitingForSerialization.size(); + jobCount = waitingCount + isSerializing.size(); } - var jobCount = waitingCount + isSerializing.size(); context.logSerializationManager( debugLogLevel, "Waiting for #{0} serialization jobs to complete.", jobCount); @@ -97,7 +75,9 @@ void shutdown(boolean waitForPendingJobCompletion) throws InterruptedException { int counter = 0; while (this.hasJobsRemaining() && counter < maxCount) { counter += 1; - Thread.sleep(1 * 1000); + synchronized (isWaitingForSerialization) { + isWaitingForSerialization.wait(1000); + } } } @@ -112,21 +92,10 @@ void shutdown(boolean waitForPendingJobCompletion) throws InterruptedException { } pool.shutdownNow(); - Thread.sleep(100); context.logSerializationManager(debugLogLevel, "Serialization manager has been shut down."); } } - /** - * Checks if the provided module is in the process of being serialized. - * - * @param key the module to check - * @return `true` if `module` is currently being serialized, `false` otherwise - */ - boolean isSerializing(QualifiedName key) { - return isSerializing.containsKey(key); - } - boolean isWaitingForSerialization(QualifiedName key) { synchronized (isWaitingForSerialization) { return isWaitingForSerialization.containsKey(key); @@ -143,6 +112,7 @@ boolean abort(QualifiedName key) { synchronized (isWaitingForSerialization) { if (isWaitingForSerialization(key)) { var prev = isWaitingForSerialization.remove(key); + isWaitingForSerialization.notifyAll(); if (prev != null) { return prev.cancel(false); } else { @@ -157,8 +127,9 @@ boolean abort(QualifiedName key) { void startSerializing(QualifiedName name) { synchronized (isWaitingForSerialization) { isWaitingForSerialization.remove(name); + isSerializing.put(name, true); + isWaitingForSerialization.notifyAll(); } - isSerializing.put(name, true); } /** @@ -167,7 +138,10 @@ void startSerializing(QualifiedName name) { * @param name the name of the module to set as having finished serialization */ void finishSerializing(QualifiedName name) { - isSerializing.remove(name); + synchronized (isWaitingForSerialization) { + isSerializing.remove(name); + isWaitingForSerialization.notifyAll(); + } } Future submitTask( @@ -190,8 +164,10 @@ Future submitTask( } void waitWhileSerializing(QualifiedName name) throws InterruptedException { - while (isSerializing.containsKey(name)) { - Thread.sleep(100); + synchronized (isWaitingForSerialization) { + while (isSerializing.containsKey(name)) { + isWaitingForSerialization.wait(100); + } } } }