Skip to content

Commit

Permalink
Merge branch 'main' into bazel-merge
Browse files Browse the repository at this point in the history
  • Loading branch information
jkciesluk committed Jan 2, 2024
2 parents 96071fa + bce412e commit e5ec695
Show file tree
Hide file tree
Showing 17 changed files with 582 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ final class BspServers(
client,
newConnection,
tables.dismissedNotifications.ReconnectBsp,
tables.dismissedNotifications.RequestTimeout,
config,
details.getName(),
bspStatusOpt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ final class BloopServers(
() =>
connectToLauncher(bloopVersion, config.bloopPort, userConfiguration),
tables.dismissedNotifications.ReconnectBsp,
tables.dismissedNotifications.RequestTimeout,
config,
name,
bspStatusOpt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContextExecutorService
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import scala.util.Success
import scala.util.Try

import scala.meta.internal.bsp.ConnectionBspStatus
import scala.meta.internal.builds.BazelBuildTool
Expand All @@ -30,6 +30,8 @@ import scala.meta.internal.metals.MetalsEnrichments._
import scala.meta.internal.metals.ammonite.Ammonite
import scala.meta.internal.metals.clients.language.MetalsLanguageClient
import scala.meta.internal.metals.scalacli.ScalaCli
import scala.meta.internal.metals.utils.RequestRegistry
import scala.meta.internal.metals.utils.Timeout
import scala.meta.internal.pc.InterruptException
import scala.meta.internal.semver.SemVer
import scala.meta.io.AbsolutePath
Expand All @@ -52,12 +54,17 @@ class BuildServerConnection private (
initialConnection: BuildServerConnection.LauncherConnection,
languageClient: LanguageClient,
reconnectNotification: DismissedNotifications#Notification,
requestTimeOutNotification: DismissedNotifications#Notification,
config: MetalsServerConfig,
workspace: AbsolutePath,
supportsWrappedSources: Boolean,
)(implicit ec: ExecutionContextExecutorService)
extends Cancelable {

private val defaultTimeout = Some(
Timeout.default(FiniteDuration(3, TimeUnit.MINUTES))
)

@volatile private var connection = Future.successful(initialConnection)
initialConnection.setReconnect(() => reconnect().ignoreValue)
private def reestablishConnection(
Expand All @@ -67,6 +74,13 @@ class BuildServerConnection private (
setupConnection()
}

val requestRegistry =
new RequestRegistry(
initialConnection.cancelables,
languageClient,
Some(requestTimeOutNotification),
)

private val isShuttingDown = new AtomicBoolean(false)
private val onReconnection =
new AtomicReference[BuildServerConnection => Future[Unit]](_ =>
Expand All @@ -75,9 +89,6 @@ class BuildServerConnection private (

private val _version = new AtomicReference(initialConnection.version)

private val ongoingRequests =
new MutableCancelable().addAll(initialConnection.cancelables)

def version: String = _version.get()

// the name is set before when establishing connection
Expand Down Expand Up @@ -173,7 +184,10 @@ class BuildServerConnection private (
}
}

def compile(params: CompileParams): CompletableFuture[CompileResult] = {
def compile(
params: CompileParams,
timeout: Option[Timeout],
): CompletableFuture[CompileResult] = {
register(
server => server.buildTargetCompile(params),
onFail = Some(
Expand All @@ -182,6 +196,7 @@ class BuildServerConnection private (
s"Cancelling compilation on ${name} server",
)
),
timeout = timeout,
)
}

Expand Down Expand Up @@ -214,6 +229,7 @@ class BuildServerConnection private (
register(
server => server.buildTargetScalaMainClasses(params),
onFail,
defaultTimeout,
).asScala
} else Future.successful(resultOnUnsupported)

Expand All @@ -233,6 +249,7 @@ class BuildServerConnection private (
register(
server => server.buildTargetScalaTestClasses(params),
onFail,
defaultTimeout,
).asScala
} else Future.successful(resultOnUnsupported)
}
Expand Down Expand Up @@ -365,7 +382,7 @@ class BuildServerConnection private (

override def cancel(): Unit = {
if (cancelled.compareAndSet(false, true)) {
ongoingRequests.cancel()
requestRegistry.cancel()
}
}

Expand Down Expand Up @@ -401,7 +418,7 @@ class BuildServerConnection private (
connection = askUser(original).map { conn =>
// version can change when reconnecting
_version.set(conn.version)
ongoingRequests.addAll(conn.cancelables)
requestRegistry.addOngoingRequest(conn.cancelables)
conn.setReconnect(() => reconnect().ignoreValue)
conn
}
Expand All @@ -418,26 +435,21 @@ class BuildServerConnection private (
private def register[T: ClassTag](
action: MetalsBuildServer => CompletableFuture[T],
onFail: => Option[(T, String)] = None,
timeout: Option[Timeout] = None,
): CompletableFuture[T] = {
val localCancelable = new MutableCancelable()
def runWithCanceling(
launcherConnection: BuildServerConnection.LauncherConnection
): Future[T] = {
val resultFuture = action(launcherConnection.server)
val cancelable = Cancelable { () =>
Try(resultFuture.cancel(true))
}
ongoingRequests.add(cancelable)
val CancelableFuture(result, cancelable) = requestRegistry.register(
action = () => action(launcherConnection.server),
timeout = timeout,
)
localCancelable.add(cancelable)

val result = resultFuture.asScala

result.onComplete { _ =>
ongoingRequests.remove(cancelable)
localCancelable.remove(cancelable)
}
result.onComplete(_ => localCancelable.remove(cancelable))
result
}

val original = connection
val actionFuture = original
.flatMap { launcherConnection =>
Expand Down Expand Up @@ -500,6 +512,7 @@ object BuildServerConnection {
localClient: MetalsBuildClient,
languageClient: MetalsLanguageClient,
connect: () => Future[SocketConnection],
requestTimeOutNotification: DismissedNotifications#Notification,
reconnectNotification: DismissedNotifications#Notification,
config: MetalsServerConfig,
serverName: String,
Expand Down Expand Up @@ -572,6 +585,7 @@ object BuildServerConnection {
setupServer,
connection,
languageClient,
requestTimeOutNotification,
reconnectNotification,
config,
projectRoot,
Expand All @@ -588,6 +602,7 @@ object BuildServerConnection {
languageClient,
connect,
reconnectNotification,
requestTimeOutNotification,
config,
serverName,
bspStatusOpt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ package scala.meta.internal.metals

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Try

case class CancelableFuture[T](
future: Future[T],
cancelable: Cancelable = Cancelable.empty,
) extends Cancelable {
def map[U](f: T => U)(implicit ec: ExecutionContext): CancelableFuture[U] =
CancelableFuture(future.map(f), cancelable)
def transform[U](f: Try[T] => Try[U])(implicit
ec: ExecutionContext
): CancelableFuture[U] =
CancelableFuture(future.transform(f), cancelable)
def cancel(): Unit = {
cancelable.cancel()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package scala.meta.internal.metals

import java.util.UUID
import java.util.concurrent.TimeUnit

import scala.collection.concurrent.TrieMap
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.Failure
import scala.util.Success
import scala.util.Try

import scala.meta.internal.metals.MetalsEnrichments._
import scala.meta.internal.metals.clients.language.MetalsLanguageClient
import scala.meta.internal.metals.debug.BuildTargetClasses
import scala.meta.internal.metals.utils.Timeout
import scala.meta.io.AbsolutePath

import ch.epfl.scala.bsp4j.BuildTargetIdentifier
Expand All @@ -29,18 +32,31 @@ final class Compilations(
onStartCompilation: () => Unit,
userConfiguration: () => UserConfiguration,
)(implicit ec: ExecutionContext) {

private val compileTimeout: Timeout =
Timeout("compile", Duration(10, TimeUnit.MINUTES))
private val cascadeTimeout: Timeout =
Timeout("cascade compile", Duration(15, TimeUnit.MINUTES))
// we are maintaining a separate queue for cascade compilation since those must happen ASAP
private val compileBatch =
new BatchedFunction[
b.BuildTargetIdentifier,
Map[BuildTargetIdentifier, b.CompileResult],
](compile, "compileBatch", shouldLogQueue = true, Some(Map.empty))
](
compile(timeout = Some(compileTimeout)),
"compileBatch",
shouldLogQueue = true,
Some(Map.empty),
)
private val cascadeBatch =
new BatchedFunction[
b.BuildTargetIdentifier,
Map[BuildTargetIdentifier, b.CompileResult],
](compile, "cascadeBatch", shouldLogQueue = true, Some(Map.empty))
](
compile(timeout = Some(cascadeTimeout)),
"cascadeBatch",
shouldLogQueue = true,
Some(Map.empty),
)
def pauseables: List[Pauseable] = List(compileBatch, cascadeBatch)

private val isCompiling = TrieMap.empty[b.BuildTargetIdentifier, Boolean]
Expand Down Expand Up @@ -152,7 +168,7 @@ final class Compilations(
for {
cleanResult <- cleaned
if cleanResult.getCleaned() == true
_ <- compile(targetIds).future
_ <- compile(timeout = None)(targetIds).future
} yield ()
}

Expand Down Expand Up @@ -190,7 +206,7 @@ final class Compilations(
Future.sequence(expansions).map(_.flatten)
}

private def compile(
private def compile(timeout: Option[Timeout])(
targets: Seq[b.BuildTargetIdentifier]
): CancelableFuture[Map[BuildTargetIdentifier, b.CompileResult]] = {

Expand All @@ -215,11 +231,11 @@ final class Compilations(
.successful(Map.empty[BuildTargetIdentifier, b.CompileResult])
.asCancelable
case (buildServer, targets) :: Nil =>
compile(buildServer, targets)
compile(buildServer, targets, timeout)
.map(res => targets.map(target => target -> res).toMap)
case targetList =>
val futures = targetList.map { case (buildServer, targets) =>
compile(buildServer, targets).map(res =>
compile(buildServer, targets, timeout).map(res =>
targets.map(target => target -> res)
)
}
Expand All @@ -229,6 +245,7 @@ final class Compilations(
private def compile(
connection: BuildServerConnection,
targets: Seq[b.BuildTargetIdentifier],
timeout: Option[Timeout],
): CancelableFuture[b.CompileResult] = {
val originId = "METALS-$" + UUID.randomUUID().toString
val params = new b.CompileParams(targets.asJava)
Expand All @@ -239,7 +256,7 @@ final class Compilations(
params.setArguments(List("--verbose").asJava)
}
targets.foreach(target => isCompiling(target) = true)
val compilation = connection.compile(params)
val compilation = connection.compile(params, timeout)

onStartCompilation()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ final class DismissedNotifications(conn: () => Connection, time: Time) {
val ReconnectScalaCli = new Notification(13)
val ScalaCliImportAuto = new Notification(14)
val BspErrors = new Notification(15)
val RequestTimeout = new Notification(16)

val all: List[Notification] = List(
Only212Navigation,
Expand Down
23 changes: 23 additions & 0 deletions metals/src/main/scala/scala/meta/internal/metals/Messages.scala
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,29 @@ object Messages {
}
}

object RequestTimeout {

val cancel = new MessageActionItem("Cancel")
val waitAction = new MessageActionItem("Wait")
val waitAlways = new MessageActionItem("WaitAlways")

def params(actionName: String, minutes: Int): ShowMessageRequestParams = {
val params = new ShowMessageRequestParams()
params.setMessage(
s"$actionName request is taking longer than expected (over $minutes minutes), do you want to cancel and rerun it?"
)
params.setType(MessageType.Info)
params.setActions(
List(
cancel,
waitAction,
waitAlways,
).asJava
)
params
}
}

}

object FileOutOfScalaCliBspScope {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,12 @@ object MetalsEnrichments

def withTimeout(length: Int, unit: TimeUnit)(implicit
ec: ExecutionContext
): Future[A] = {
Future(Await.result(future, FiniteDuration(length, unit)))
): Future[A] = withTimeout(FiniteDuration(length, unit))

def withTimeout(
duration: FiniteDuration
)(implicit ec: ExecutionContext): Future[A] = {
Future(Await.result(future, duration))
}

def onTimeout(length: Int, unit: TimeUnit)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ final class Ammonite(
workspace(),
),
tables.dismissedNotifications.ReconnectAmmonite,
tables.dismissedNotifications.RequestTimeout,
config,
"Ammonite",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class ScalaCli(
languageClient,
() => ScalaCli.socketConn(command, connDir),
tables.dismissedNotifications.ReconnectScalaCli,
tables.dismissedNotifications.RequestTimeout,
config(),
"Scala CLI",
supportsWrappedSources = Some(true),
Expand Down
Loading

0 comments on commit e5ec695

Please sign in to comment.