diff --git a/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/AsyncResourcesInitialization.java b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/AsyncResourcesInitialization.java new file mode 100644 index 000000000000..492e8ae268ff --- /dev/null +++ b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/AsyncResourcesInitialization.java @@ -0,0 +1,37 @@ +package org.enso.languageserver.boot.resource; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; + +/** Component that initializes resources in parallel. */ +public class AsyncResourcesInitialization implements InitializationComponent { + + private final InitializationComponent[] resources; + + /** + * Create async initialization component. + * + * @param resources the list of resources to initialize + */ + public AsyncResourcesInitialization(InitializationComponent... resources) { + this.resources = resources; + } + + @Override + public boolean isInitialized() { + return Arrays.stream(resources).allMatch(InitializationComponent::isInitialized); + } + + @Override + public CompletableFuture init() { + return CompletableFuture.allOf( + Arrays.stream(resources) + .map( + component -> + component.isInitialized() + ? CompletableFuture.completedFuture(null) + : component.init()) + .toArray(CompletableFuture[]::new)) + .thenRun(() -> {}); + } +} diff --git a/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/BlockingInitialization.java b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/BlockingInitialization.java new file mode 100644 index 000000000000..c56692857d33 --- /dev/null +++ b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/BlockingInitialization.java @@ -0,0 +1,35 @@ +package org.enso.languageserver.boot.resource; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; + +/** Initialization component ensuring that only one initialization sequence is running at a time. */ +public final class BlockingInitialization implements InitializationComponent { + + private final InitializationComponent component; + private final Semaphore lock = new Semaphore(1); + + /** + * Create blocking initialization component. + * + * @param component the underlying initialization component to run + */ + public BlockingInitialization(InitializationComponent component) { + this.component = component; + } + + @Override + public boolean isInitialized() { + return component.isInitialized(); + } + + @Override + public CompletableFuture init() { + try { + lock.acquire(); + } catch (InterruptedException e) { + return CompletableFuture.failedFuture(e); + } + return component.init().whenComplete((res, err) -> lock.release()); + } +} diff --git a/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/DirectoriesInitialization.java b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/DirectoriesInitialization.java new file mode 100644 index 000000000000..5b1b79aa79f5 --- /dev/null +++ b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/DirectoriesInitialization.java @@ -0,0 +1,46 @@ +package org.enso.languageserver.boot.resource; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import org.enso.languageserver.data.ProjectDirectoriesConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Directories initialization. */ +public class DirectoriesInitialization implements InitializationComponent { + + private final Executor executor; + private final ProjectDirectoriesConfig projectDirectoriesConfig; + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private volatile boolean isInitialized = false; + + /** + * Creates the directories initialization component. + * + * @param executor the executor that runs the initialization + * @param projectDirectoriesConfig the directories config + */ + public DirectoriesInitialization( + Executor executor, ProjectDirectoriesConfig projectDirectoriesConfig) { + this.executor = executor; + this.projectDirectoriesConfig = projectDirectoriesConfig; + } + + @Override + public boolean isInitialized() { + return isInitialized; + } + + @Override + public CompletableFuture init() { + return CompletableFuture.runAsync( + () -> { + logger.info("Initializing directories..."); + projectDirectoriesConfig.createDirectories(); + logger.info("Initialized directories."); + isInitialized = true; + }, + executor); + } +} diff --git a/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/InitializationComponent.java b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/InitializationComponent.java new file mode 100644 index 000000000000..8fb1d65ee6c2 --- /dev/null +++ b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/InitializationComponent.java @@ -0,0 +1,13 @@ +package org.enso.languageserver.boot.resource; + +import java.util.concurrent.CompletableFuture; + +/** A component that should be initialized. */ +public interface InitializationComponent { + + /** @return `true` if the component is initialized */ + boolean isInitialized(); + + /** Initialize the component. */ + CompletableFuture init(); +} diff --git a/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/InitializationComponentInitialized.java b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/InitializationComponentInitialized.java new file mode 100644 index 000000000000..075069fcf804 --- /dev/null +++ b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/InitializationComponentInitialized.java @@ -0,0 +1,19 @@ +package org.enso.languageserver.boot.resource; + +/** Object indicating that the initialization is complete. */ +public final class InitializationComponentInitialized { + + private static final class InstanceHolder { + private static final InitializationComponentInitialized INSTANCE = + new InitializationComponentInitialized(); + } + + /** + * Get the initialized marker object. + * + * @return the instance of {@link InitializationComponentInitialized}. + */ + public static InitializationComponentInitialized getInstance() { + return InstanceHolder.INSTANCE; + } +} diff --git a/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/JsonRpcInitialization.java b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/JsonRpcInitialization.java new file mode 100644 index 000000000000..356f7c13714b --- /dev/null +++ b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/JsonRpcInitialization.java @@ -0,0 +1,45 @@ +package org.enso.languageserver.boot.resource; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import org.enso.jsonrpc.ProtocolFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Initialization of JSON-RPC protocol. */ +public class JsonRpcInitialization implements InitializationComponent { + + private final Executor executor; + private final ProtocolFactory protocolFactory; + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private volatile boolean isInitialized = false; + + /** + * Create an instance of JSON-RPC initialization component. + * + * @param executor the executor that runs the initialization + * @param protocolFactory the JSON-RPC protocol factory + */ + public JsonRpcInitialization(Executor executor, ProtocolFactory protocolFactory) { + this.executor = executor; + this.protocolFactory = protocolFactory; + } + + @Override + public boolean isInitialized() { + return isInitialized; + } + + @Override + public CompletableFuture init() { + return CompletableFuture.runAsync( + () -> { + logger.info("Initializing JSON-RPC protocol."); + protocolFactory.init(); + logger.info("JSON-RPC protocol initialized."); + isInitialized = true; + }, + executor); + } +} diff --git a/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/RepoInitialization.java b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/RepoInitialization.java new file mode 100644 index 000000000000..b17fa85b29eb --- /dev/null +++ b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/RepoInitialization.java @@ -0,0 +1,176 @@ +package org.enso.languageserver.boot.resource; + +import akka.event.EventStream; +import java.io.IOException; +import java.nio.file.FileSystemException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import org.apache.commons.io.FileUtils; +import org.enso.languageserver.data.ProjectDirectoriesConfig; +import org.enso.languageserver.event.InitializedEvent; +import org.enso.logger.masking.MaskedPath; +import org.enso.searcher.sql.SqlDatabase; +import org.enso.searcher.sql.SqlSuggestionsRepo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.jdk.javaapi.FutureConverters; + +/** Initialization of the Language Server suggestions database. */ +public class RepoInitialization implements InitializationComponent { + + private static final int MAX_RETRIES = 3; + private static final long RETRY_DELAY_MILLIS = 1000; + + private final Executor executor; + + private final ProjectDirectoriesConfig projectDirectoriesConfig; + private final EventStream eventStream; + private final SqlDatabase sqlDatabase; + private final SqlSuggestionsRepo sqlSuggestionsRepo; + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private volatile boolean isInitialized = false; + + /** + * Create an instance of repo initialization component. + * + * @param executor the executor that runs the initialization + * @param projectDirectoriesConfig configuration of language server directories + * @param eventStream the events stream + * @param sqlDatabase the sql database + * @param sqlSuggestionsRepo the suggestions repo + */ + public RepoInitialization( + Executor executor, + ProjectDirectoriesConfig projectDirectoriesConfig, + EventStream eventStream, + SqlDatabase sqlDatabase, + SqlSuggestionsRepo sqlSuggestionsRepo) { + this.executor = executor; + this.projectDirectoriesConfig = projectDirectoriesConfig; + this.eventStream = eventStream; + this.sqlDatabase = sqlDatabase; + this.sqlSuggestionsRepo = sqlSuggestionsRepo; + } + + @Override + public boolean isInitialized() { + return isInitialized; + } + + @Override + public CompletableFuture init() { + return initSqlDatabase() + .thenComposeAsync(v -> initSuggestionsRepo(), executor) + .thenRun(() -> isInitialized = true); + } + + private CompletableFuture initSqlDatabase() { + return CompletableFuture.runAsync( + () -> { + logger.info("Initializing sql database [{}]...", sqlDatabase); + sqlDatabase.open(); + logger.info("Initialized sql database [{}].", sqlDatabase); + }, + executor) + .whenCompleteAsync( + (res, err) -> { + if (err != null) { + logger.error("Failed to initialize sql database [{}].", sqlDatabase, err); + } + }, + executor); + } + + private CompletableFuture initSuggestionsRepo() { + return CompletableFuture.runAsync( + () -> logger.info("Initializing suggestions repo [{}]...", sqlDatabase), executor) + .thenComposeAsync( + v -> + doInitSuggestionsRepo().exceptionallyComposeAsync(this::recoverInitializationError), + executor) + .thenRunAsync( + () -> logger.info("Initialized Suggestions repo [{}].", sqlDatabase), executor) + .whenCompleteAsync( + (res, err) -> { + if (err != null) { + logger.error("Failed to initialize SQL suggestions repo [{}].", sqlDatabase, err); + } else { + eventStream.publish(InitializedEvent.SuggestionsRepoInitialized$.MODULE$); + } + }); + } + + private CompletableFuture recoverInitializationError(Throwable error) { + return CompletableFuture.runAsync( + () -> + logger.warn( + "Failed to initialize the suggestions database [{}].", sqlDatabase, error), + executor) + .thenRunAsync(sqlDatabase::close, executor) + .thenComposeAsync(v -> clearDatabaseFile(0), executor) + .thenRunAsync(sqlDatabase::open, executor) + .thenRunAsync(() -> logger.info("Retrying database initialization."), executor) + .thenComposeAsync(v -> doInitSuggestionsRepo(), executor); + } + + private CompletableFuture clearDatabaseFile(int retries) { + return CompletableFuture.runAsync( + () -> { + logger.info("Clear database file. Attempt #{}.", retries + 1); + try { + Files.delete(projectDirectoriesConfig.suggestionsDatabaseFile().toPath()); + } catch (IOException e) { + throw new CompletionException(e); + } + }, + executor) + .exceptionallyComposeAsync(error -> recoverClearDatabaseFile(error, retries), executor); + } + + private CompletableFuture recoverClearDatabaseFile(Throwable error, int retries) { + if (error instanceof CompletionException) { + return recoverClearDatabaseFile(error.getCause(), retries); + } else if (error instanceof NoSuchFileException) { + logger.warn( + "Failed to delete the database file. Attempt #{}. File does not exist [{}].", + retries + 1, + new MaskedPath(projectDirectoriesConfig.suggestionsDatabaseFile().toPath())); + return CompletableFuture.completedFuture(null); + } else if (error instanceof FileSystemException) { + logger.error( + "Failed to delete the database file. Attempt #{}. The file will be removed during the shutdown.", + retries + 1, + error); + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> + FileUtils.deleteQuietly(projectDirectoriesConfig.suggestionsDatabaseFile()))); + return CompletableFuture.failedFuture(error); + } else if (error instanceof IOException) { + logger.error("Failed to delete the database file. Attempt #{}.", retries + 1, error); + if (retries < MAX_RETRIES) { + try { + Thread.sleep(RETRY_DELAY_MILLIS); + } catch (InterruptedException e) { + throw new CompletionException(e); + } + return clearDatabaseFile(retries + 1); + } else { + return CompletableFuture.failedFuture(error); + } + } + + return CompletableFuture.completedFuture(null); + } + + private CompletionStage doInitSuggestionsRepo() { + return FutureConverters.asJava(sqlSuggestionsRepo.init()).thenAccept(res -> {}); + } +} diff --git a/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/SequentialResourcesInitialization.java b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/SequentialResourcesInitialization.java new file mode 100644 index 000000000000..afa8e59ae09a --- /dev/null +++ b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/SequentialResourcesInitialization.java @@ -0,0 +1,46 @@ +package org.enso.languageserver.boot.resource; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** Initializes resources in sequence. */ +public class SequentialResourcesInitialization implements InitializationComponent { + + private final InitializationComponent[] resources; + private final Executor executor; + + /** + * Create an instance of sequential initialization component. + * + * @param executor the executor that runs the initialization + * @param resources the list of resources to initialize + */ + public SequentialResourcesInitialization( + Executor executor, InitializationComponent... resources) { + this.resources = resources; + this.executor = executor; + } + + @Override + public boolean isInitialized() { + return Arrays.stream(resources).allMatch(InitializationComponent::isInitialized); + } + + @Override + public CompletableFuture init() { + CompletableFuture result = CompletableFuture.completedFuture(null); + + for (InitializationComponent component : resources) { + result = + result.thenComposeAsync( + res -> + component.isInitialized() + ? CompletableFuture.completedFuture(null) + : component.init(), + executor); + } + + return result; + } +} diff --git a/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/TruffleContextInitialization.java b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/TruffleContextInitialization.java new file mode 100644 index 000000000000..22e5e7d218a8 --- /dev/null +++ b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/TruffleContextInitialization.java @@ -0,0 +1,54 @@ +package org.enso.languageserver.boot.resource; + +import akka.event.EventStream; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import org.enso.languageserver.event.InitializedEvent; +import org.enso.polyglot.LanguageInfo; +import org.graalvm.polyglot.Context; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Initialize the Truffle context. */ +public class TruffleContextInitialization implements InitializationComponent { + + private final Executor executor; + private final Context truffleContext; + private final EventStream eventStream; + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private volatile boolean isInitialized = false; + + /** + * Creates an instance of Truffle initialization component. + * + * @param executor the executor that runs the initialization + * @param eventStream the events stream + * @param truffleContext the Truffle context + */ + public TruffleContextInitialization( + Executor executor, Context truffleContext, EventStream eventStream) { + this.executor = executor; + this.truffleContext = truffleContext; + this.eventStream = eventStream; + } + + @Override + public boolean isInitialized() { + return isInitialized; + } + + @Override + public CompletableFuture init() { + return CompletableFuture.runAsync( + () -> { + logger.info("Initializing Runtime context [{}]...", truffleContext); + truffleContext.initialize(LanguageInfo.ID); + eventStream.publish(InitializedEvent.TruffleContextInitialized$.MODULE$); + logger.info("Initialized Runtime context [{}].", truffleContext); + isInitialized = true; + }, + executor); + } +} diff --git a/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/ZioRuntimeInitialization.java b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/ZioRuntimeInitialization.java new file mode 100644 index 000000000000..4ea63f128217 --- /dev/null +++ b/engine/language-server/src/main/java/org/enso/languageserver/boot/resource/ZioRuntimeInitialization.java @@ -0,0 +1,51 @@ +package org.enso.languageserver.boot.resource; + +import akka.event.EventStream; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import org.enso.languageserver.effect.Runtime; +import org.enso.languageserver.event.InitializedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Initialization of ZIO runtime. */ +public class ZioRuntimeInitialization implements InitializationComponent { + + private final Executor executor; + private final Runtime runtime; + private final EventStream eventStream; + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private volatile boolean isInitialized = false; + + /** + * Create an instance of ZIO runtime initialization component. + * + * @param executor the executor that runs the initialization + * @param runtime the runtime to initialize + * @param eventStream the events stream + */ + public ZioRuntimeInitialization(Executor executor, Runtime runtime, EventStream eventStream) { + this.executor = executor; + this.runtime = runtime; + this.eventStream = eventStream; + } + + @Override + public boolean isInitialized() { + return isInitialized; + } + + @Override + public CompletableFuture init() { + return CompletableFuture.runAsync( + () -> { + logger.info("Initializing ZIO runtime..."); + runtime.init(); + logger.info("ZIO runtime initialized [{}].", runtime); + isInitialized = true; + eventStream.publish(InitializedEvent.ZioRuntimeInitialized$.MODULE$); + }, + executor); + } +} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/ResourcesInitialization.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/ResourcesInitialization.scala index 9a4dfefc0fa4..36a93359ad69 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/ResourcesInitialization.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/boot/ResourcesInitialization.scala @@ -4,9 +4,10 @@ import akka.event.EventStream import org.enso.jsonrpc.ProtocolFactory import org.enso.languageserver.boot.resource.{ AsyncResourcesInitialization, + BlockingInitialization, DirectoriesInitialization, InitializationComponent, - JsonRpcInitializationComponent, + JsonRpcInitialization, RepoInitialization, SequentialResourcesInitialization, TruffleContextInitialization, @@ -17,7 +18,7 @@ import org.enso.languageserver.effect import org.enso.searcher.sql.{SqlDatabase, SqlSuggestionsRepo} import org.graalvm.polyglot.Context -import scala.concurrent.ExecutionContext +import scala.concurrent.ExecutionContextExecutor /** Helper object for the initialization of the Language Server resources. * Creates the directories, initializes the databases, and the Truffle context. @@ -43,19 +44,23 @@ object ResourcesInitialization { suggestionsRepo: SqlSuggestionsRepo, truffleContext: Context, runtime: effect.Runtime - )(implicit ec: ExecutionContext): InitializationComponent = { - SequentialResourcesInitialization( - new DirectoriesInitialization(directoriesConfig), - AsyncResourcesInitialization( - new JsonRpcInitializationComponent(protocolFactory), - new ZioRuntimeInitialization(runtime, eventStream), - new RepoInitialization( - directoriesConfig, - eventStream, - sqlDatabase, - suggestionsRepo - ), - new TruffleContextInitialization(eventStream, truffleContext) + )(implicit ec: ExecutionContextExecutor): InitializationComponent = { + new BlockingInitialization( + new SequentialResourcesInitialization( + ec, + new DirectoriesInitialization(ec, directoriesConfig), + new AsyncResourcesInitialization( + new JsonRpcInitialization(ec, protocolFactory), + new ZioRuntimeInitialization(ec, runtime, eventStream), + new RepoInitialization( + ec, + directoriesConfig, + eventStream, + sqlDatabase, + suggestionsRepo + ), + new TruffleContextInitialization(ec, truffleContext, eventStream) + ) ) ) } diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/AsyncResourcesInitialization.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/AsyncResourcesInitialization.scala deleted file mode 100644 index bcf5e722c608..000000000000 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/AsyncResourcesInitialization.scala +++ /dev/null @@ -1,34 +0,0 @@ -package org.enso.languageserver.boot.resource - -import scala.concurrent.{ExecutionContext, Future} - -/** Initializes resources in parallel. - * - * @param resources the list of resources to initialize - * @param ec the execution context - */ -class AsyncResourcesInitialization( - resources: Iterable[InitializationComponent] -)(implicit ec: ExecutionContext) - extends InitializationComponent { - - /** @inheritdoc */ - override def init(): Future[InitializationComponent.Initialized.type] = - Future - .traverse(resources)(_.init()) - .map { _ => InitializationComponent.Initialized } -} - -object AsyncResourcesInitialization { - - /** Create [[AsyncResourcesInitialization]] component. - * - * @param resources the list of resources to initialize - * @param ec the execution context - * @return new async initialization component - */ - def apply(resources: InitializationComponent*)(implicit - ec: ExecutionContext - ): AsyncResourcesInitialization = - new AsyncResourcesInitialization(resources) -} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/DirectoriesInitialization.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/DirectoriesInitialization.scala deleted file mode 100644 index 37881062dc09..000000000000 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/DirectoriesInitialization.scala +++ /dev/null @@ -1,25 +0,0 @@ -package org.enso.languageserver.boot.resource - -import com.typesafe.scalalogging.LazyLogging -import org.enso.languageserver.data.ProjectDirectoriesConfig - -import scala.concurrent.{ExecutionContext, Future} - -/** Directories initialization. - * - * @param directoriesConfig the directories config - */ -class DirectoriesInitialization(directoriesConfig: ProjectDirectoriesConfig)( - implicit ec: ExecutionContext -) extends InitializationComponent - with LazyLogging { - - /** @inheritdoc */ - override def init(): Future[InitializationComponent.Initialized.type] = - Future { - logger.info("Initializing directories...") - directoriesConfig.createDirectories() - logger.info("Initialized directories.") - InitializationComponent.Initialized - } -} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/InitializationComponent.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/InitializationComponent.scala deleted file mode 100644 index 02d946876d02..000000000000 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/InitializationComponent.scala +++ /dev/null @@ -1,15 +0,0 @@ -package org.enso.languageserver.boot.resource - -import scala.concurrent.Future - -/** A component that should be initialized. */ -trait InitializationComponent { - - /** Initialize the component. */ - def init(): Future[InitializationComponent.Initialized.type] -} - -object InitializationComponent { - - case object Initialized -} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/JsonRpcInitializationComponent.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/JsonRpcInitializationComponent.scala deleted file mode 100644 index 900567a47959..000000000000 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/JsonRpcInitializationComponent.scala +++ /dev/null @@ -1,25 +0,0 @@ -package org.enso.languageserver.boot.resource - -import com.typesafe.scalalogging.LazyLogging -import org.enso.jsonrpc.ProtocolFactory - -import scala.concurrent.{ExecutionContext, Future} - -/** Initialization of JSON-RPC protocol. - * - * @param protocolFactory the JSON-RPC protocol factory - */ -class JsonRpcInitializationComponent(protocolFactory: ProtocolFactory)(implicit - ec: ExecutionContext -) extends InitializationComponent - with LazyLogging { - - /** @inheritdoc */ - override def init(): Future[InitializationComponent.Initialized.type] = - Future { - logger.info("Initializing JSON-RPC protocol.") - protocolFactory.init() - logger.info("JSON-RPC protocol initialized.") - InitializationComponent.Initialized - } -} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/RepoInitialization.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/RepoInitialization.scala deleted file mode 100644 index 3835c2ac4e2b..000000000000 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/RepoInitialization.scala +++ /dev/null @@ -1,152 +0,0 @@ -package org.enso.languageserver.boot.resource - -import java.io.IOException -import java.nio.file.{FileSystemException, Files, NoSuchFileException} - -import akka.event.EventStream -import com.typesafe.scalalogging.LazyLogging -import org.apache.commons.io.FileUtils -import org.enso.languageserver.data.ProjectDirectoriesConfig -import org.enso.languageserver.event.InitializedEvent -import org.enso.logger.masking.MaskedPath -import org.enso.searcher.sql.{SqlDatabase, SqlSuggestionsRepo} - -import scala.concurrent.{ExecutionContext, Future} -import scala.util.control.NonFatal -import scala.util.{Failure, Success} - -/** Initialization of the Language Server repositories. - * - * @param directoriesConfig configuration of language server directories - * @param eventStream akka events stream - * @param sqlDatabase the sql database - * @param suggestionsRepo the suggestions repo - */ -class RepoInitialization( - directoriesConfig: ProjectDirectoriesConfig, - eventStream: EventStream, - sqlDatabase: SqlDatabase, - suggestionsRepo: SqlSuggestionsRepo -)(implicit ec: ExecutionContext) - extends InitializationComponent - with LazyLogging { - - /** @inheritdoc */ - override def init(): Future[InitializationComponent.Initialized.type] = - for { - _ <- sqlDatabaseInit - _ <- suggestionsRepoInit - } yield InitializationComponent.Initialized - - private def sqlDatabaseInit: Future[Unit] = { - val initAction = Future { - logger.info("Initializing sql database [{}]...", sqlDatabase) - sqlDatabase.open() - logger.info("Initialized sql database [{}].", sqlDatabase) - } - initAction.onComplete { - case Success(()) => - case Failure(ex) => - logger.error("Failed to initialize sql database [{}].", sqlDatabase, ex) - } - initAction - } - - private def suggestionsRepoInit: Future[Unit] = { - val initAction = - for { - _ <- Future { - logger.info( - "Initializing suggestions repo [{}]...", - MaskedPath(directoriesConfig.suggestionsDatabaseFile.toPath) - ) - } - _ <- suggestionsRepo.init.recoverWith { case NonFatal(error) => - recoverInitError(error, suggestionsRepo.db) - } - _ <- Future { - logger.info( - "Initialized Suggestions repo [{}].", - MaskedPath(directoriesConfig.suggestionsDatabaseFile.toPath) - ) - } - } yield () - initAction.onComplete { - case Success(()) => - eventStream.publish(InitializedEvent.SuggestionsRepoInitialized) - case Failure(ex) => - logger.error( - "Failed to initialize SQL suggestions repo [{}].", - MaskedPath(directoriesConfig.suggestionsDatabaseFile.toPath), - ex - ) - } - initAction - } - - private def recoverInitError( - error: Throwable, - db: SqlDatabase - ): Future[Unit] = - for { - _ <- Future { - logger.warn( - "Failed to initialize the suggestions database [{}].", - MaskedPath(directoriesConfig.suggestionsDatabaseFile.toPath), - error - ) - } - _ <- Future(db.close()) - _ <- clearDatabaseFile() - _ <- Future(db.open()) - _ <- Future { - logger.info("Retrying database initialization.") - } - _ <- suggestionsRepo.init - } yield () - - private def clearDatabaseFile(retries: Int = 0): Future[Unit] = { - Future { - logger.info("Clear database file. Attempt #{}.", retries + 1) - Files.delete(directoriesConfig.suggestionsDatabaseFile.toPath) - }.recoverWith { - case _: NoSuchFileException => - logger.warn( - "Failed to delete the database file. Attempt #{}. " + - "File does not exist [{}].", - retries + 1, - MaskedPath(directoriesConfig.suggestionsDatabaseFile.toPath) - ) - Future.successful(()) - case error: FileSystemException => - logger.error( - "Failed to delete the database file. Attempt #{}. " + - "The file will be removed during the shutdown.", - retries + 1, - error - ) - sys.addShutdownHook( - FileUtils.deleteQuietly(directoriesConfig.suggestionsDatabaseFile) - ) - Future.failed(error) - case error: IOException => - logger.error( - "Failed to delete the database file. Attempt #{}.", - retries + 1, - error - ) - if (retries < RepoInitialization.MaxRetries) { - Thread.sleep(1000) - clearDatabaseFile(retries + 1) - } else { - Future.failed(error) - } - } - } - -} - -object RepoInitialization { - - val MaxRetries = 3 -} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/SequentialResourcesInitialization.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/SequentialResourcesInitialization.scala deleted file mode 100644 index cd9b9819e2f1..000000000000 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/SequentialResourcesInitialization.scala +++ /dev/null @@ -1,26 +0,0 @@ -package org.enso.languageserver.boot.resource -import scala.concurrent.{ExecutionContext, Future} - -/** Initializes resources in sequence. - * - * @param resources the list of resources to initialize - */ -class SequentialResourcesInitialization( - resources: Seq[InitializationComponent] -)(implicit ec: ExecutionContext) - extends InitializationComponent { - - /** @inheritdoc */ - override def init(): Future[InitializationComponent.Initialized.type] = - resources.foldLeft(Future.successful(InitializationComponent.Initialized)) { - (action, resource) => action.flatMap(_ => resource.init()) - } -} - -object SequentialResourcesInitialization { - - def apply(resources: InitializationComponent*)(implicit - ec: ExecutionContext - ): SequentialResourcesInitialization = - new SequentialResourcesInitialization(resources) -} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/TruffleContextInitialization.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/TruffleContextInitialization.scala deleted file mode 100644 index 90003320fa78..000000000000 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/TruffleContextInitialization.scala +++ /dev/null @@ -1,33 +0,0 @@ -package org.enso.languageserver.boot.resource - -import akka.event.EventStream -import com.typesafe.scalalogging.LazyLogging -import org.enso.languageserver.event.InitializedEvent -import org.enso.polyglot.LanguageInfo -import org.graalvm.polyglot.Context - -import scala.concurrent.{ExecutionContext, Future} - -/** Initialize the Truffle context. - * - * @param eventStream akka events stream - * @param truffleContext the Truffle context - */ -class TruffleContextInitialization( - eventStream: EventStream, - truffleContext: Context -)(implicit - ec: ExecutionContext -) extends InitializationComponent - with LazyLogging { - - /** @inheritdoc */ - override def init(): Future[InitializationComponent.Initialized.type] = - Future { - logger.info("Initializing Runtime context...") - truffleContext.initialize(LanguageInfo.ID) - eventStream.publish(InitializedEvent.TruffleContextInitialized) - logger.info("Initialized Runtime context.") - InitializationComponent.Initialized - } -} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/ZioRuntimeInitialization.scala b/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/ZioRuntimeInitialization.scala deleted file mode 100644 index 6b99ba56d933..000000000000 --- a/engine/language-server/src/main/scala/org/enso/languageserver/boot/resource/ZioRuntimeInitialization.scala +++ /dev/null @@ -1,33 +0,0 @@ -package org.enso.languageserver.boot.resource - -import akka.event.EventStream -import com.typesafe.scalalogging.LazyLogging -import org.enso.languageserver.effect -import org.enso.languageserver.event.InitializedEvent - -import scala.concurrent.{ExecutionContext, Future} - -/** Initialization of ZIO runtime. - * - * @param runtime the runtime to initialize - * @param eventStream events stream - * @param ec the execution context - */ -class ZioRuntimeInitialization( - runtime: effect.Runtime, - eventStream: EventStream -)(implicit - ec: ExecutionContext -) extends InitializationComponent - with LazyLogging { - - /** @inheritdoc */ - override def init(): Future[InitializationComponent.Initialized.type] = - Future { - logger.info("Initializing ZIO runtime...") - runtime.init() - logger.info("ZIO runtime initialized [{}].", runtime) - eventStream.publish(InitializedEvent.ZioRuntimeInitialized) - InitializationComponent.Initialized - } -} diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/protocol/json/JsonConnectionController.scala b/engine/language-server/src/main/scala/org/enso/languageserver/protocol/json/JsonConnectionController.scala index 2a82b5278b87..bd18305bdc6c 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/protocol/json/JsonConnectionController.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/protocol/json/JsonConnectionController.scala @@ -1,14 +1,17 @@ package org.enso.languageserver.protocol.json import akka.actor.{Actor, ActorRef, Cancellable, Props, Stash, Status} -import akka.pattern.pipe +import akka.pattern.pipeCompletionStage import akka.util.Timeout import com.typesafe.scalalogging.LazyLogging import org.enso.cli.task.ProgressUnit import org.enso.cli.task.notifications.TaskNotificationApi import org.enso.jsonrpc._ import org.enso.languageserver.ai.AICompletion -import org.enso.languageserver.boot.resource.InitializationComponent +import org.enso.languageserver.boot.resource.{ + InitializationComponent, + InitializationComponentInitialized +} import org.enso.languageserver.capability.CapabilityApi.{ AcquireCapability, ForceReleaseCapability, @@ -173,8 +176,15 @@ class JsonConnectionController( _, InitProtocolConnection.Params(clientId) ) => - logger.info("Initializing resources.") - mainComponent.init().pipeTo(self) + logger.info( + "Initializing resources for [{}] [{}].", + clientId, + mainComponent + ) + mainComponent + .init() + .thenApply(_ => InitializationComponentInitialized.getInstance) + .pipeTo(self) context.become(initializing(webActor, clientId, req, sender())) case Request(_, id, _) => @@ -190,7 +200,7 @@ class JsonConnectionController( request: Request[_, _], receiver: ActorRef ): Receive = { - case InitializationComponent.Initialized => + case _: InitializationComponentInitialized => logger.info("RPC session initialized for client [{}].", clientId) val session = JsonSession(clientId, self) context.system.eventStream.publish(JsonSessionInitialized(session)) diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/search/SuggestionsHandler.scala b/engine/language-server/src/main/scala/org/enso/languageserver/search/SuggestionsHandler.scala index 874eaaf65009..86e58510f5d1 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/search/SuggestionsHandler.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/search/SuggestionsHandler.scala @@ -331,17 +331,14 @@ final class SuggestionsHandler( handlerAction.pipeTo(handler) - if (state.shouldStartBackgroundProcessing) { - runtimeConnector ! Api.Request(Api.StartBackgroundProcessing()) - context.become( - initialized( - projectName, - graph, - clients, - state.backgroundProcessingStarted() - ) + context.become( + initialized( + projectName, + graph, + clients, + state.backgroundProcessingStarted() ) - } + ) case Completion(path, pos, selfType, returnType, tags, isStatic) => val selfTypes = selfType.toList.flatMap(ty => ty :: graph.getParents(ty)) @@ -426,14 +423,7 @@ final class SuggestionsHandler( ) ) action.pipeTo(handler)(sender()) - context.become( - initialized( - projectName, - graph, - clients, - state.backgroundProcessingStopped() - ) - ) + context.become(initialized(projectName, graph, clients, state)) case ProjectNameUpdated(name, updates) => updates.foreach(sessionRouter ! _) diff --git a/engine/language-server/src/test/scala/org/enso/languageserver/boot/resource/RepoInitializationSpec.scala b/engine/language-server/src/test/scala/org/enso/languageserver/boot/resource/RepoInitializationSpec.scala index b1420f8b74ef..4c47a92db198 100644 --- a/engine/language-server/src/test/scala/org/enso/languageserver/boot/resource/RepoInitializationSpec.scala +++ b/engine/language-server/src/test/scala/org/enso/languageserver/boot/resource/RepoInitializationSpec.scala @@ -8,7 +8,7 @@ import org.enso.languageserver.data._ import org.enso.languageserver.event.InitializedEvent import org.enso.languageserver.filemanager.{ContentRoot, ContentRootWithFile} import org.enso.searcher.sql.{SchemaVersion, SqlDatabase, SqlSuggestionsRepo} -import org.enso.testkit.FlakySpec +import org.enso.testkit.{FlakySpec, ToScalaFutureConversions} import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike @@ -16,6 +16,7 @@ import org.sqlite.SQLiteException import java.nio.file.{Files, StandardOpenOption} import java.util.UUID + import scala.concurrent.Await import scala.concurrent.duration._ @@ -25,6 +26,7 @@ class RepoInitializationSpec with AnyWordSpecLike with Matchers with BeforeAndAfterAll + with ToScalaFutureConversions with FlakySpec { import system.dispatcher @@ -43,6 +45,7 @@ class RepoInitializationSpec val component = new RepoInitialization( + system.dispatcher, config.directories, system.eventStream, sqlDatabase, @@ -68,6 +71,7 @@ class RepoInitializationSpec val testSchemaVersion = Long.MaxValue val component = new RepoInitialization( + system.dispatcher, config.directories, system.eventStream, sqlDatabase, @@ -96,6 +100,7 @@ class RepoInitializationSpec val component = new RepoInitialization( + system.dispatcher, config.directories, system.eventStream, sqlDatabase, @@ -132,6 +137,7 @@ class RepoInitializationSpec withRepos(config) { (sqlDatabase, suggestionsRepo) => val component = new RepoInitialization( + system.dispatcher, config.directories, system.eventStream, sqlDatabase, @@ -168,6 +174,7 @@ class RepoInitializationSpec withRepos(config) { (sqlDatabase, suggestionsRepo) => val component = new RepoInitialization( + system.dispatcher, config.directories, system.eventStream, sqlDatabase, diff --git a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/BaseServerTest.scala b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/BaseServerTest.scala index 1a6ae580daaf..6f2e5964b32f 100644 --- a/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/BaseServerTest.scala +++ b/engine/language-server/src/test/scala/org/enso/languageserver/websocket/json/BaseServerTest.scala @@ -60,7 +60,6 @@ import org.slf4j.event.Level import java.nio.file.{Files, Path} import java.util.UUID -import scala.concurrent.Await import scala.concurrent.duration._ class BaseServerTest @@ -70,8 +69,6 @@ class BaseServerTest with WithTemporaryDirectory with FakeEnvironment { - import system.dispatcher - val timeout: FiniteDuration = 10.seconds def isFileWatcherEnabled: Boolean = false @@ -141,16 +138,23 @@ class BaseServerTest val sqlDatabase = SqlDatabase(config.directories.suggestionsDatabaseFile) val suggestionsRepo = new SqlSuggestionsRepo(sqlDatabase)(system.dispatcher) - val initializationComponent = SequentialResourcesInitialization( - new DirectoriesInitialization(config.directories), - new ZioRuntimeInitialization(zioRuntime, system.eventStream), - new RepoInitialization( - config.directories, - system.eventStream, - sqlDatabase, - suggestionsRepo + private def initializationComponent = + new SequentialResourcesInitialization( + system.dispatcher, + new DirectoriesInitialization(system.dispatcher, config.directories), + new ZioRuntimeInitialization( + system.dispatcher, + zioRuntime, + system.eventStream + ), + new RepoInitialization( + system.dispatcher, + config.directories, + system.eventStream, + sqlDatabase, + suggestionsRepo + ) ) - ) val contentRootManagerActor = system.actorOf(ContentRootManagerActor.props(config)) @@ -262,7 +266,7 @@ class BaseServerTest UUID.randomUUID(), Api.GetTypeGraphResponse(typeGraph) ) - Await.ready(initializationComponent.init(), timeout) + initializationComponent.init().get(timeout.length, timeout.unit) suggestionsHandler ! ProjectNameUpdated("Test") val environment = fakeInstalledEnvironment() diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/InvalidateModulesIndexCommand.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/InvalidateModulesIndexCommand.java index e30714100f4b..7edaa2710840 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/InvalidateModulesIndexCommand.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/InvalidateModulesIndexCommand.java @@ -5,6 +5,7 @@ import java.util.logging.Level; import org.enso.interpreter.instrument.execution.RuntimeContext; import org.enso.interpreter.instrument.job.DeserializeLibrarySuggestionsJob; +import org.enso.interpreter.instrument.job.StartBackgroundProcessingJob; import org.enso.interpreter.runtime.EnsoContext; import org.enso.polyglot.runtime.Runtime$Api$InvalidateModulesIndexResponse; import scala.Option; @@ -25,17 +26,17 @@ public InvalidateModulesIndexCommand(Option maybeRequestId) { } @Override + @SuppressWarnings("unchecked") public Future executeAsynchronously(RuntimeContext ctx, ExecutionContext ec) { return Future.apply( () -> { TruffleLogger logger = ctx.executionService().getLogger(); long writeCompilationLockTimestamp = ctx.locking().acquireWriteCompilationLock(); try { - ctx.jobControlPlane().abortAllJobs(); + ctx.jobControlPlane().abortBackgroundJobs(DeserializeLibrarySuggestionsJob.class); EnsoContext context = ctx.executionService().getContext(); context.getTopScope().getModules().forEach(module -> module.setIndexed(false)); - ctx.jobControlPlane().stopBackgroundJobs(); context .getPackageRepository() @@ -47,6 +48,7 @@ public Future executeAsynchronously(RuntimeContext ctx, ExecutionCont return BoxedUnit.UNIT; }); + StartBackgroundProcessingJob.startBackgroundJobs(ctx); reply(new Runtime$Api$InvalidateModulesIndexResponse(), ctx); } finally { ctx.locking().releaseWriteCompilationLock(); diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/BackgroundJob.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/BackgroundJob.java index 6a7e5035b159..39a5bfe4d4b5 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/BackgroundJob.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/BackgroundJob.java @@ -1,19 +1,24 @@ package org.enso.interpreter.instrument.job; +import java.util.Comparator; import scala.collection.immutable.List$; /** The job that runs in the background. */ -public abstract class BackgroundJob extends Job implements Comparable> { +public abstract class BackgroundJob extends Job { private final int priority; + /** Comparator defining the order of jobs in the background jobs queue. */ + public static final Comparator> BACKGROUND_JOBS_QUEUE_ORDER = + Comparator.comparingInt(BackgroundJob::getPriority); + /** * Create a background job with priority. * * @param priority the job priority. Lower number indicates higher priority. */ public BackgroundJob(int priority) { - super(List$.MODULE$.empty(), false, false); + super(List$.MODULE$.empty(), true, false); this.priority = priority; } @@ -21,9 +26,4 @@ public BackgroundJob(int priority) { public int getPriority() { return priority; } - - @Override - public int compareTo(BackgroundJob that) { - return Integer.compare(this.priority, that.getPriority()); - } } diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/SerializeModuleJob.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/SerializeModuleJob.java index c26570ed4d24..0887c2047d1f 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/SerializeModuleJob.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/SerializeModuleJob.java @@ -55,4 +55,9 @@ public Void run(RuntimeContext ctx) { } return null; } + + @Override + public String toString() { + return "SerializeModuleJob(" + moduleName.toString() + ")"; + } } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/ModifyVisualizationCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/ModifyVisualizationCmd.scala index fc77777c2f98..303c764179a5 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/ModifyVisualizationCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/ModifyVisualizationCmd.scala @@ -38,7 +38,7 @@ class ModifyVisualizationCmd( val jobFilter: PartialFunction[Job[_], Option[ExpressionId]] = { case upsert: UpsertVisualizationJob if upsert.visualizationId == request.visualizationId => - Some(upsert.key) + Some(upsert.expressionId) } ctx.jobControlPlane.jobInProgress(jobFilter) } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobControlPlane.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobControlPlane.scala index 55fb88fd5ffd..73ecb9458b86 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobControlPlane.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobControlPlane.scala @@ -4,6 +4,8 @@ import org.enso.interpreter.instrument.job.Job import java.util.UUID +import scala.annotation.varargs + /** Controls running jobs. */ trait JobControlPlane { @@ -15,6 +17,7 @@ trait JobControlPlane { * * @param ignoredJobs the list of jobs to keep in the execution queue */ + @varargs def abortAllExcept(ignoredJobs: Class[_ <: Job[_]]*): Unit /** Aborts all jobs that relates to the specified execution context. @@ -23,6 +26,13 @@ trait JobControlPlane { */ def abortJobs(contextId: UUID): Unit + /** Abort provided background jobs. + * + * @param toAbort the list of jobs to abort + */ + @varargs + def abortBackgroundJobs(toAbort: Class[_ <: Job[_]]*): Unit + /** Starts background jobs processing. * * @return `true` if the background jobs were started and `false` if they are diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala index a3d6eb473853..b1cc10c3b1e3 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala @@ -66,27 +66,38 @@ final class JobExecutionEngine( override def runBackground[A](job: BackgroundJob[A]): Unit = synchronized { if (isBackgroundJobsStarted) { + cancelDuplicateJobs(job, backgroundJobsRef) runInternal(job, backgroundJobExecutor, backgroundJobsRef) } else { + job match { + case job: UniqueJob[_] => + delayedBackgroundJobsQueue.removeIf { + case that: UniqueJob[_] => that.equalsTo(job) + case _ => false + } + case _ => + } delayedBackgroundJobsQueue.add(job) } } /** @inheritdoc */ override def run[A](job: Job[A]): Future[A] = { - cancelDuplicateJobs(job) + cancelDuplicateJobs(job, runningJobsRef) runInternal(job, jobExecutor, runningJobsRef) } - private def cancelDuplicateJobs[A](job: Job[A]): Unit = { + private def cancelDuplicateJobs[A]( + job: Job[A], + runningJobsRef: AtomicReference[Vector[RunningJob]] + ): Unit = { job match { case job: UniqueJob[_] => val allJobs = runningJobsRef.updateAndGet(_.filterNot(_.future.isCancelled)) allJobs.foreach { runningJob => runningJob.job match { - case jobRef: UniqueJob[_] - if jobRef.getClass == job.getClass && jobRef.key == job.key => + case jobRef: UniqueJob[_] if jobRef.equalsTo(job) => runtimeContext.executionService.getLogger .log(Level.FINEST, s"Cancelling duplicate job [$jobRef].") runningJob.future.cancel(jobRef.mayInterruptIfRunning) @@ -164,6 +175,19 @@ final class JobExecutionEngine( .interruptThreads() } + override def abortBackgroundJobs(toAbort: Class[_ <: Job[_]]*): Unit = { + val allJobs = + backgroundJobsRef.updateAndGet(_.filterNot(_.future.isCancelled)) + val cancellableJobs = allJobs + .filter { runningJob => + runningJob.job.isCancellable && + toAbort.contains(runningJob.job.getClass) + } + cancellableJobs.foreach { runningJob => + runningJob.future.cancel(runningJob.job.mayInterruptIfRunning) + } + } + /** @inheritdoc */ override def startBackgroundJobs(): Boolean = synchronized { @@ -193,7 +217,18 @@ final class JobExecutionEngine( /** Submit background jobs preserving the stable order. */ private def submitBackgroundJobsOrdered(): Unit = { - Collections.sort(delayedBackgroundJobsQueue) + Collections.sort( + delayedBackgroundJobsQueue, + BackgroundJob.BACKGROUND_JOBS_QUEUE_ORDER + ) + runtimeContext.executionService.getLogger.log( + Level.FINE, + "Submitting {0} background jobs [{1}]", + Array[AnyRef]( + delayedBackgroundJobsQueue.size(): Integer, + delayedBackgroundJobsQueue + ) + ) delayedBackgroundJobsQueue.forEach(job => runBackground(job)) delayedBackgroundJobsQueue.clear() } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/DeserializeLibrarySuggestionsJob.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/DeserializeLibrarySuggestionsJob.scala index 9497874a18c3..22f03753bb05 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/DeserializeLibrarySuggestionsJob.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/DeserializeLibrarySuggestionsJob.scala @@ -13,8 +13,17 @@ import scala.jdk.CollectionConverters._ * @param libraryName the name of loaded library */ final class DeserializeLibrarySuggestionsJob( - libraryName: LibraryName -) extends BackgroundJob[Unit](DeserializeLibrarySuggestionsJob.Priority) { + val libraryName: LibraryName +) extends BackgroundJob[Unit](DeserializeLibrarySuggestionsJob.Priority) + with UniqueJob[Unit] { + + /** @inheritdoc */ + override def equalsTo(that: UniqueJob[_]): Boolean = + that match { + case that: DeserializeLibrarySuggestionsJob => + this.libraryName == that.libraryName + case _ => false + } /** @inheritdoc */ override def run(implicit ctx: RuntimeContext): Unit = { diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/DetachVisualizationJob.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/DetachVisualizationJob.scala index 6ba2e8cef330..e583a8236f79 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/DetachVisualizationJob.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/DetachVisualizationJob.scala @@ -17,9 +17,18 @@ import java.util.logging.Level */ class DetachVisualizationJob( visualizationId: VisualizationId, - expressionId: ExpressionId, + val expressionId: ExpressionId, contextId: ContextId -) extends UniqueJob[Unit](expressionId, List(contextId), false) { +) extends Job[Unit](List(contextId), false, false) + with UniqueJob[Unit] { + + /** @inheritdoc */ + override def equalsTo(that: UniqueJob[_]): Boolean = + that match { + case that: DetachVisualizationJob => + this.expressionId == that.expressionId + case _ => false + } /** @inheritdoc */ override def run(implicit ctx: RuntimeContext): Unit = { diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ExecuteJob.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ExecuteJob.scala index 329b5768a046..86bb4e7849e2 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ExecuteJob.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ExecuteJob.scala @@ -88,9 +88,7 @@ class ExecuteJob( Level.FINEST, s"Kept context lock [ExecuteJob] for ${contextId} for ${System.currentTimeMillis() - acquiredLock} milliseconds" ) - } - StartBackgroundProcessingJob.startBackgroundJobs() } override def toString(): String = { diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala index 078fee5943d8..6a154daef530 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala @@ -25,17 +25,16 @@ abstract class Job[+A]( def run(implicit ctx: RuntimeContext): A } -/** The job queue can contain only one job of this type with the same `key`. - * When a job of this type is added to the job queue, previous duplicate jobs - * are cancelled. - * - * @param key a unique job key - * @param contextIds affected executions contests' ids - * @param mayInterruptIfRunning determines if the job may be interruptd when - * running +/** The job queue can contain only one job of this type decided by the + * `equalsTo` method. When a job of this type is added to the job queue, + * previous duplicate jobs are cancelled. */ -abstract class UniqueJob[+A]( - val key: UUID, - contextIds: List[UUID], - mayInterruptIfRunning: Boolean -) extends Job[A](contextIds, isCancellable = false, mayInterruptIfRunning) +trait UniqueJob[A] { self: Job[A] => + + /** Decide if this job is the same as the other job. + * + * @param that the other job to compare with + * @return `true` if `this` job is considered the same as `that` job + */ + def equalsTo(that: UniqueJob[_]): Boolean +} diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/UpsertVisualizationJob.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/UpsertVisualizationJob.scala index 0780ccb83122..d8ee7a82dc52 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/UpsertVisualizationJob.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/UpsertVisualizationJob.scala @@ -38,13 +38,22 @@ import java.util.logging.Level class UpsertVisualizationJob( requestId: Option[Api.RequestId], val visualizationId: Api.VisualizationId, - expressionId: Api.ExpressionId, + val expressionId: Api.ExpressionId, config: Api.VisualizationConfiguration -) extends UniqueJob[Option[Executable]]( - expressionId, +) extends Job[Option[Executable]]( List(config.executionContextId), + false, false - ) { + ) + with UniqueJob[Option[Executable]] { + + /** @inheritdoc */ + override def equalsTo(that: UniqueJob[_]): Boolean = + that match { + case that: UpsertVisualizationJob => + this.expressionId == that.expressionId + case _ => false + } /** @inheritdoc */ override def run(implicit ctx: RuntimeContext): Option[Executable] = { diff --git a/engine/runtime-with-instruments/src/test/java/org/enso/interpreter/test/instrument/IncrementalUpdatesTest.java b/engine/runtime-with-instruments/src/test/java/org/enso/interpreter/test/instrument/IncrementalUpdatesTest.java index 27a72d887ac0..51cd6b28b3d3 100644 --- a/engine/runtime-with-instruments/src/test/java/org/enso/interpreter/test/instrument/IncrementalUpdatesTest.java +++ b/engine/runtime-with-instruments/src/test/java/org/enso/interpreter/test/instrument/IncrementalUpdatesTest.java @@ -12,7 +12,6 @@ import org.enso.interpreter.test.Metadata; import org.enso.interpreter.test.NodeCountingTestInstrument; import org.enso.interpreter.test.instrument.RuntimeServerTest.TestContext; -import org.enso.polyglot.runtime.Runtime$Api$BackgroundJobsStartedNotification; import org.enso.polyglot.runtime.Runtime$Api$CreateContextRequest; import org.enso.polyglot.runtime.Runtime$Api$CreateContextResponse; import org.enso.polyglot.runtime.Runtime$Api$EditFileNotification; @@ -220,12 +219,11 @@ private static String extractPositions(String code, String chars, Map + case None => compiler.context.logSerializationManager( - Level.FINEST, + Level.FINE, "Unable to load suggestions for library [{0}].", libraryName ) diff --git a/lib/scala/testkit/src/main/scala/org/enso/testkit/ToScalaFutureConversions.scala b/lib/scala/testkit/src/main/scala/org/enso/testkit/ToScalaFutureConversions.scala new file mode 100644 index 000000000000..509349fafc28 --- /dev/null +++ b/lib/scala/testkit/src/main/scala/org/enso/testkit/ToScalaFutureConversions.scala @@ -0,0 +1,15 @@ +package org.enso.testkit + +import org.scalatest.TestSuite + +import java.util.concurrent.CompletableFuture + +import scala.concurrent.Future +import scala.jdk.FutureConverters._ + +trait ToScalaFutureConversions extends TestSuite { + + /** Convert Java future to Scala. */ + implicit final def toScalaFuture[A](f: CompletableFuture[A]): Future[A] = + f.asScala +}