Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize suggestions loading after the reconnect 2 #9142

Merged
merged 2 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,9 @@ object SearchProtocol {
updates: Seq[SuggestionsDatabaseUpdate]
)

/** A request to clean the suggestions database. */
case object CleanSuggestionsDatabase
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since it is a request, I'd rather we name it ClearSuggestionsDatabase


/** The request to receive contents of the suggestions database. */
case object GetSuggestionsDatabase

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ final class SuggestionsHandler(
)
context.system.eventStream
.subscribe(self, classOf[Api.LibraryLoaded])
context.system.eventStream
.subscribe(self, classOf[Api.BackgroundJobsStartedNotification])
context.system.eventStream.subscribe(self, classOf[FileDeletedEvent])
context.system.eventStream
.subscribe(self, InitializedEvent.SuggestionsRepoInitialized.getClass)
Expand Down Expand Up @@ -186,13 +188,29 @@ final class SuggestionsHandler(

case msg: Api.SuggestionsDatabaseSuggestionsLoadedNotification
if state.isSuggestionLoadingRunning =>
state.suggestionLoadingQueue.enqueue(msg)
logger.trace(
"SuggestionsDatabaseSuggestionsLoadedNotification [shouldStartBackgroundProcessing={}].",
state.shouldStartBackgroundProcessing
)
if (state.shouldStartBackgroundProcessing) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this was the main source of the problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to properly synchronize the state of background notifications in runtime

case Api.BackgroundJobsStartedNotification() =>
self ! SuggestionLoadingCompleted
context.become(
initialized(
projectName,
graph,
clients,
state.backgroundProcessingStarted()
)
)

state.suggestionLoadingQueue.clear()
} else {
state.suggestionLoadingQueue.enqueue(msg)
}

case msg: Api.SuggestionsDatabaseSuggestionsLoadedNotification =>
logger.debug(
"Starting loading suggestions for library [{}].",
msg.libraryName
)
context.become(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change isn't strictly necessary since applyLoadedSuggestions is a Future, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need to change the state before starting loading the suggestions

initialized(
projectName,
graph,
clients,
state.suggestionLoadingRunning()
)
)
applyLoadedSuggestions(msg.suggestions)
.onComplete {
case Success(notification) =>
Expand All @@ -214,14 +232,6 @@ final class SuggestionsHandler(
)
self ! SuggestionsHandler.SuggestionLoadingCompleted
}
context.become(
initialized(
projectName,
graph,
clients,
state.suggestionLoadingRunning()
)
)

case msg: Api.SuggestionsDatabaseModuleUpdateNotification
if state.isSuggestionUpdatesRunning =>
Expand Down Expand Up @@ -304,39 +314,65 @@ final class SuggestionsHandler(
)
)

case Api.BackgroundJobsStartedNotification() =>
self ! SuggestionLoadingCompleted
context.become(
initialized(
projectName,
graph,
clients,
state.backgroundProcessingStarted()
)
)

case GetSuggestionsDatabaseVersion =>
suggestionsRepo.currentVersion
.map(GetSuggestionsDatabaseVersionResult)
.pipeTo(sender())

case GetSuggestionsDatabase =>
val responseAction = for {
_ <- suggestionsRepo.clean
version <- suggestionsRepo.currentVersion
} yield GetSuggestionsDatabaseResult(version, Seq())

responseAction.pipeTo(sender())

val handlerAction = for {
_ <- responseAction
} yield SearchProtocol.InvalidateModulesIndex
case CleanSuggestionsDatabase =>
if (state.isSuggestionLoadingRunning) stash()
else {
context.become(
initialized(
projectName,
graph,
clients,
state.suggestionLoadingRunning()
)
)
for {
_ <- suggestionsRepo.clean
} yield {
logger.trace(
"CleanSuggestionsDatabase [{}].",
state.suggestionLoadingQueue
)
state.suggestionLoadingQueue.clear()
runtimeConnector ! Api.Request(Api.StartBackgroundProcessing())
}
}

case GetSuggestionsDatabase =>
val handler = context.system.actorOf(
InvalidateModulesIndexHandler.props(
RuntimeFailureMapper(contentRootManager),
timeout,
runtimeConnector
runtimeConnector,
self
)
)

handlerAction.pipeTo(handler)
handler ! SearchProtocol.InvalidateModulesIndex

sender() ! GetSuggestionsDatabaseResult(0, Seq())

context.become(
initialized(
projectName,
graph,
clients,
state.backgroundProcessingStarted()
state.backgroundProcessingStopped()
)
)

Expand Down Expand Up @@ -419,7 +455,8 @@ final class SuggestionsHandler(
InvalidateModulesIndexHandler.props(
runtimeFailureMapper,
timeout,
runtimeConnector
runtimeConnector,
self
)
)
action.pipeTo(handler)(sender())
Expand Down Expand Up @@ -450,6 +487,7 @@ final class SuggestionsHandler(
)

case SuggestionLoadingCompleted =>
unstashAll()
if (state.suggestionLoadingQueue.nonEmpty) {
self ! state.suggestionLoadingQueue.dequeue()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ import scala.concurrent.duration.FiniteDuration
* @param runtimeFailureMapper mapper for runtime failures
* @param timeout request timeout
* @param runtime reference to the runtime connector
* @param suggestionsHandler reference to the suggestions handler
*/
final class InvalidateModulesIndexHandler(
runtimeFailureMapper: RuntimeFailureMapper,
timeout: FiniteDuration,
runtime: ActorRef
runtime: ActorRef,
suggestionsHandler: ActorRef
) extends Actor
with LazyLogging
with UnhandledLogging {
Expand Down Expand Up @@ -50,6 +52,7 @@ final class InvalidateModulesIndexHandler(
context.stop(self)

case Api.Response(_, Api.InvalidateModulesIndexResponse()) =>
suggestionsHandler ! SearchProtocol.CleanSuggestionsDatabase
replyTo ! SearchProtocol.InvalidateSuggestionsDatabaseResult
cancellable.cancel()
context.stop(self)
Expand All @@ -67,14 +70,21 @@ object InvalidateModulesIndexHandler {
*
* @param runtimeFailureMapper mapper for runtime failures
* @param timeout request timeout
* @param runtime reference to the runtime conector
* @param runtime reference to the runtime connector
* @param suggestionsHandler reference to the suggestions handler
*/
def props(
runtimeFailureMapper: RuntimeFailureMapper,
timeout: FiniteDuration,
runtime: ActorRef
runtime: ActorRef,
suggestionsHandler: ActorRef
): Props =
Props(
new InvalidateModulesIndexHandler(runtimeFailureMapper, timeout, runtime)
new InvalidateModulesIndexHandler(
runtimeFailureMapper,
timeout,
runtime,
suggestionsHandler
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.logging.Level;
import org.enso.interpreter.instrument.execution.RuntimeContext;
import org.enso.interpreter.instrument.job.DeserializeLibrarySuggestionsJob;
import org.enso.interpreter.instrument.job.StartBackgroundProcessingJob;
import org.enso.interpreter.runtime.EnsoContext;
import org.enso.polyglot.runtime.Runtime$Api$InvalidateModulesIndexResponse;
import scala.Option;
Expand Down Expand Up @@ -33,6 +32,7 @@ public Future<BoxedUnit> executeAsynchronously(RuntimeContext ctx, ExecutionCont
TruffleLogger logger = ctx.executionService().getLogger();
long writeCompilationLockTimestamp = ctx.locking().acquireWriteCompilationLock();
try {
ctx.jobControlPlane().stopBackgroundJobs();
ctx.jobControlPlane().abortBackgroundJobs(DeserializeLibrarySuggestionsJob.class);

EnsoContext context = ctx.executionService().getContext();
Expand All @@ -48,7 +48,6 @@ public Future<BoxedUnit> executeAsynchronously(RuntimeContext ctx, ExecutionCont
return BoxedUnit.UNIT;
});

StartBackgroundProcessingJob.startBackgroundJobs(ctx);
reply(new Runtime$Api$InvalidateModulesIndexResponse(), ctx);
} finally {
ctx.locking().releaseWriteCompilationLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ public BackgroundJob(int priority) {
this.priority = priority;
}

/**
* Create a background job with priority.
*
* @param priority the job priority. Lower number indicates higher priority.
* @param mayInterruptIfRunning the flag indicating if the running job may be interrupted.
*/
public BackgroundJob(int priority, boolean mayInterruptIfRunning) {
super(List$.MODULE$.empty(), true, mayInterruptIfRunning);
this.priority = priority;
}

/**
* @return the job priority.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ final class StartBackgroundProcessingCmd(
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] = {
StartBackgroundProcessingJob.startBackgroundJobs()
Future.successful(())
Future(StartBackgroundProcessingJob.startBackgroundJobs())
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import scala.jdk.CollectionConverters._
*/
final class DeserializeLibrarySuggestionsJob(
val libraryName: LibraryName
) extends BackgroundJob[Unit](DeserializeLibrarySuggestionsJob.Priority)
) extends BackgroundJob[Unit](DeserializeLibrarySuggestionsJob.Priority, true)
with UniqueJob[Unit] {

/** @inheritdoc */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,7 @@ final class SqlSuggestionsRepo(val db: SqlDatabase)(implicit

/** The query to clean the repo. */
private def cleanQuery: DBIO[Unit] = {
for {
_ <- Suggestions.delete
_ <- SuggestionsVersion.delete
} yield ()
DBIO.seq(Suggestions.delete, SuggestionsVersion.delete)
}

/** The query to get all suggestions.
Expand Down
Loading