Skip to content

Commit

Permalink
Synchronize suggestions loading after the reconnect (#9043)
Browse files Browse the repository at this point in the history
related #8689

Fixes a race between the language server SQL updating logic and the engine `DeserializeLibrarySuggestionsJob`s when the library suggestions may start loading before the database is properly cleaned up after the reconnect.
  • Loading branch information
4e6 authored Feb 13, 2024
1 parent 88c1cc1 commit 9c982e0
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,9 @@ object SearchProtocol {
updates: Seq[SuggestionsDatabaseUpdate]
)

/** A request to clean the suggestions database. */
case object CleanSuggestionsDatabase

/** The request to receive contents of the suggestions database. */
case object GetSuggestionsDatabase

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ final class SuggestionsHandler(
)
context.system.eventStream
.subscribe(self, classOf[Api.LibraryLoaded])
context.system.eventStream
.subscribe(self, classOf[Api.BackgroundJobsStartedNotification])
context.system.eventStream.subscribe(self, classOf[FileDeletedEvent])
context.system.eventStream
.subscribe(self, InitializedEvent.SuggestionsRepoInitialized.getClass)
Expand Down Expand Up @@ -186,13 +188,29 @@ final class SuggestionsHandler(

case msg: Api.SuggestionsDatabaseSuggestionsLoadedNotification
if state.isSuggestionLoadingRunning =>
state.suggestionLoadingQueue.enqueue(msg)
logger.trace(
"SuggestionsDatabaseSuggestionsLoadedNotification [shouldStartBackgroundProcessing={}].",
state.shouldStartBackgroundProcessing
)
if (state.shouldStartBackgroundProcessing) {
state.suggestionLoadingQueue.clear()
} else {
state.suggestionLoadingQueue.enqueue(msg)
}

case msg: Api.SuggestionsDatabaseSuggestionsLoadedNotification =>
logger.debug(
"Starting loading suggestions for library [{}].",
msg.libraryName
)
context.become(
initialized(
projectName,
graph,
clients,
state.suggestionLoadingRunning()
)
)
applyLoadedSuggestions(msg.suggestions)
.onComplete {
case Success(notification) =>
Expand All @@ -214,14 +232,6 @@ final class SuggestionsHandler(
)
self ! SuggestionsHandler.SuggestionLoadingCompleted
}
context.become(
initialized(
projectName,
graph,
clients,
state.suggestionLoadingRunning()
)
)

case msg: Api.SuggestionsDatabaseModuleUpdateNotification
if state.isSuggestionUpdatesRunning =>
Expand Down Expand Up @@ -304,39 +314,64 @@ final class SuggestionsHandler(
)
)

case Api.BackgroundJobsStartedNotification() =>
context.become(
initialized(
projectName,
graph,
clients,
state.backgroundProcessingStarted()
)
)

case GetSuggestionsDatabaseVersion =>
suggestionsRepo.currentVersion
.map(GetSuggestionsDatabaseVersionResult)
.pipeTo(sender())

case GetSuggestionsDatabase =>
val responseAction = for {
_ <- suggestionsRepo.clean
version <- suggestionsRepo.currentVersion
} yield GetSuggestionsDatabaseResult(version, Seq())

responseAction.pipeTo(sender())

val handlerAction = for {
_ <- responseAction
} yield SearchProtocol.InvalidateModulesIndex
case CleanSuggestionsDatabase =>
if (state.isSuggestionLoadingRunning) stash()
else {
context.become(
initialized(
projectName,
graph,
clients,
state.suggestionLoadingRunning()
)
)
for {
_ <- suggestionsRepo.clean
} yield {
logger.trace(
"CleanSuggestionsDatabase [{}].",
state.suggestionLoadingQueue
)
state.suggestionLoadingQueue.clear()
runtimeConnector ! Api.Request(Api.StartBackgroundProcessing())
}
}

case GetSuggestionsDatabase =>
val handler = context.system.actorOf(
InvalidateModulesIndexHandler.props(
RuntimeFailureMapper(contentRootManager),
timeout,
runtimeConnector
runtimeConnector,
self
)
)

handlerAction.pipeTo(handler)
handler ! SearchProtocol.InvalidateModulesIndex

sender() ! GetSuggestionsDatabaseResult(0, Seq())

context.become(
initialized(
projectName,
graph,
clients,
state.backgroundProcessingStarted()
state.backgroundProcessingStopped()
)
)

Expand Down Expand Up @@ -419,7 +454,8 @@ final class SuggestionsHandler(
InvalidateModulesIndexHandler.props(
runtimeFailureMapper,
timeout,
runtimeConnector
runtimeConnector,
self
)
)
action.pipeTo(handler)(sender())
Expand Down Expand Up @@ -450,6 +486,7 @@ final class SuggestionsHandler(
)

case SuggestionLoadingCompleted =>
unstashAll()
if (state.suggestionLoadingQueue.nonEmpty) {
self ! state.suggestionLoadingQueue.dequeue()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ import scala.concurrent.duration.FiniteDuration
* @param runtimeFailureMapper mapper for runtime failures
* @param timeout request timeout
* @param runtime reference to the runtime connector
* @param suggestionsHandler reference to the suggestions handler
*/
final class InvalidateModulesIndexHandler(
runtimeFailureMapper: RuntimeFailureMapper,
timeout: FiniteDuration,
runtime: ActorRef
runtime: ActorRef,
suggestionsHandler: ActorRef
) extends Actor
with LazyLogging
with UnhandledLogging {
Expand Down Expand Up @@ -50,6 +52,7 @@ final class InvalidateModulesIndexHandler(
context.stop(self)

case Api.Response(_, Api.InvalidateModulesIndexResponse()) =>
suggestionsHandler ! SearchProtocol.CleanSuggestionsDatabase
replyTo ! SearchProtocol.InvalidateSuggestionsDatabaseResult
cancellable.cancel()
context.stop(self)
Expand All @@ -67,14 +70,21 @@ object InvalidateModulesIndexHandler {
*
* @param runtimeFailureMapper mapper for runtime failures
* @param timeout request timeout
* @param runtime reference to the runtime conector
* @param runtime reference to the runtime connector
* @param suggestionsHandler reference to the suggestions handler
*/
def props(
runtimeFailureMapper: RuntimeFailureMapper,
timeout: FiniteDuration,
runtime: ActorRef
runtime: ActorRef,
suggestionsHandler: ActorRef
): Props =
Props(
new InvalidateModulesIndexHandler(runtimeFailureMapper, timeout, runtime)
new InvalidateModulesIndexHandler(
runtimeFailureMapper,
timeout,
runtime,
suggestionsHandler
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.logging.Level;
import org.enso.interpreter.instrument.execution.RuntimeContext;
import org.enso.interpreter.instrument.job.DeserializeLibrarySuggestionsJob;
import org.enso.interpreter.instrument.job.StartBackgroundProcessingJob;
import org.enso.interpreter.runtime.EnsoContext;
import org.enso.polyglot.runtime.Runtime$Api$InvalidateModulesIndexResponse;
import scala.Option;
Expand Down Expand Up @@ -33,6 +32,7 @@ public Future<BoxedUnit> executeAsynchronously(RuntimeContext ctx, ExecutionCont
TruffleLogger logger = ctx.executionService().getLogger();
long writeCompilationLockTimestamp = ctx.locking().acquireWriteCompilationLock();
try {
ctx.jobControlPlane().stopBackgroundJobs();
ctx.jobControlPlane().abortBackgroundJobs(DeserializeLibrarySuggestionsJob.class);

EnsoContext context = ctx.executionService().getContext();
Expand All @@ -48,7 +48,6 @@ public Future<BoxedUnit> executeAsynchronously(RuntimeContext ctx, ExecutionCont
return BoxedUnit.UNIT;
});

StartBackgroundProcessingJob.startBackgroundJobs(ctx);
reply(new Runtime$Api$InvalidateModulesIndexResponse(), ctx);
} finally {
ctx.locking().releaseWriteCompilationLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ public BackgroundJob(int priority) {
this.priority = priority;
}

/**
* Create a background job with priority.
*
* @param priority the job priority. Lower number indicates higher priority.
* @param mayInterruptIfRunning the flag indicating if the running job may be interrupted.
*/
public BackgroundJob(int priority, boolean mayInterruptIfRunning) {
super(List$.MODULE$.empty(), true, mayInterruptIfRunning);
this.priority = priority;
}

/**
* @return the job priority.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ final class StartBackgroundProcessingCmd(
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] = {
StartBackgroundProcessingJob.startBackgroundJobs()
Future.successful(())
Future(StartBackgroundProcessingJob.startBackgroundJobs())
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import scala.jdk.CollectionConverters._
*/
final class DeserializeLibrarySuggestionsJob(
val libraryName: LibraryName
) extends BackgroundJob[Unit](DeserializeLibrarySuggestionsJob.Priority)
) extends BackgroundJob[Unit](DeserializeLibrarySuggestionsJob.Priority, true)
with UniqueJob[Unit] {

/** @inheritdoc */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,7 @@ final class SqlSuggestionsRepo(val db: SqlDatabase)(implicit

/** The query to clean the repo. */
private def cleanQuery: DBIO[Unit] = {
for {
_ <- Suggestions.delete
_ <- SuggestionsVersion.delete
} yield ()
DBIO.seq(Suggestions.delete, SuggestionsVersion.delete)
}

/** The query to get all suggestions.
Expand Down

0 comments on commit 9c982e0

Please sign in to comment.