Unify critical session running in hls (#4256)
* add thread to do shake restart
* run session loader in thread

---------

Co-authored-by: Michael Peyton Jones <[email protected]>
soulomoon and michaelpj authored Jun 8, 2024
1 parent 7563439 commit 82da337
Showing 8 changed files with 159 additions and 78 deletions.
1 change: 1 addition & 0 deletions ghcide/ghcide.cabal
@@ -148,6 +148,7 @@ library
Development.IDE.Core.Shake
Development.IDE.Core.Tracing
Development.IDE.Core.UseStale
Development.IDE.Core.WorkerThread
Development.IDE.GHC.Compat
Development.IDE.GHC.Compat.Core
Development.IDE.GHC.Compat.CmdLine
51 changes: 22 additions & 29 deletions ghcide/session-loader/Development/IDE/Session.hs
@@ -7,21 +7,19 @@ The logic for setting up a ghcide session by tapping into hie-bios.
module Development.IDE.Session
(SessionLoadingOptions(..)
,CacheDirs(..)
,loadSession
,loadSessionWithOptions
,setInitialDynFlags
,getHieDbLoc
,runWithDb
,retryOnSqliteBusy
,retryOnException
,Log(..)
,runWithDb
) where

-- Unfortunately, we cannot use loadSession with ghc-lib since hie-bios uses
-- the real GHC library and the types are incompatible. Furthermore, when
-- building with ghc-lib we need to make this Haskell agnostic, so no hie-bios!

import Control.Concurrent.Async
import Control.Concurrent.Strict
import Control.Exception.Safe as Safe
import Control.Monad
@@ -100,14 +98,19 @@ import Control.Concurrent.STM.TQueue
import Control.DeepSeq
import Control.Exception (evaluate)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Trans.Cont (ContT (ContT, runContT))
import Data.Foldable (for_)
import Data.HashMap.Strict (HashMap)
import Data.HashSet (HashSet)
import qualified Data.HashSet as Set
import Database.SQLite.Simple
import Development.IDE.Core.Tracing (withTrace)
import Development.IDE.Core.WorkerThread (awaitRunInThread,
withWorkerQueue)
import Development.IDE.Session.Diagnostics (renderCradleError)
import Development.IDE.Types.Shake (WithHieDb, toNoFileKey)
import Development.IDE.Types.Shake (WithHieDb,
WithHieDbShield (..),
toNoFileKey)
import HieDb.Create
import HieDb.Types
import HieDb.Utils
@@ -375,8 +378,10 @@ makeWithHieDbRetryable recorder rng hieDb f =
-- writing. Actions are picked off one by one from the `HieWriterChan` and executed in serial
-- by a worker thread using a dedicated database connection.
-- This is done in order to serialize writes to the database, or else SQLite becomes unhappy
runWithDb :: Recorder (WithPriority Log) -> FilePath -> (WithHieDb -> IndexQueue -> IO ()) -> IO ()
runWithDb recorder fp k = do
--
-- Also see Note [Serializing runs in separate thread]
runWithDb :: Recorder (WithPriority Log) -> FilePath -> ContT () IO (WithHieDbShield, IndexQueue)
runWithDb recorder fp = ContT $ \k -> do
-- use a non-deterministic seed because multiple HLS instances may start at the same time
-- and send bursts of requests
rng <- Random.newStdGen
@@ -394,18 +399,15 @@ runWithDb recorder fp k = do
withWriteDbRetryable = makeWithHieDbRetryable recorder rng writedb
withWriteDbRetryable initConn

chan <- newTQueueIO

withAsync (writerThread withWriteDbRetryable chan) $ \_ -> do
withHieDb fp (\readDb -> k (makeWithHieDbRetryable recorder rng readDb) chan)
-- Clear the index of any files that might have been deleted since the last run
_ <- withWriteDbRetryable deleteMissingRealFiles
_ <- withWriteDbRetryable garbageCollectTypeNames

runContT (withWorkerQueue (writer withWriteDbRetryable)) $ \chan ->
withHieDb fp (\readDb -> k (WithHieDbShield $ makeWithHieDbRetryable recorder rng readDb, chan))
where
writerThread :: WithHieDb -> IndexQueue -> IO ()
writerThread withHieDbRetryable chan = do
-- Clear the index of any files that might have been deleted since the last run
_ <- withHieDbRetryable deleteMissingRealFiles
_ <- withHieDbRetryable garbageCollectTypeNames
forever $ do
l <- atomically $ readTQueue chan
writer withHieDbRetryable l = do
-- TODO: probably should let exceptions be caught/logged/handled by top level handler
l withHieDbRetryable
`Safe.catch` \e@SQLError{} -> do
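The reworked runWithDb above no longer takes a continuation argument; it hands its resources back through ContT, so call sites can stack it with other with-style resources and with the worker queues. A minimal caller sketch follows. It is hedged: startWithHieDb is a hypothetical name, the real call site is not part of the hunks shown here, and it assumes WithHieDbShield simply wraps the retry-enabled read function while IndexQueue carries the write jobs.

-- Sketch only: a hypothetical caller of the ContT-shaped runWithDb.
-- Assumes runWithDb, WithHieDbShield and IndexQueue from the modules in this
-- diff, plus Control.Monad.Trans.Cont (runContT) and Control.Concurrent.STM.
startWithHieDb :: Recorder (WithPriority Log) -> FilePath -> IO ()
startWithHieDb recorder dbPath =
  runContT (runWithDb recorder dbPath) $ \(WithHieDbShield withReadDb, indexQueue) -> do
    -- reads go through withReadDb, which already wraps the retry-on-busy logic
    withReadDb $ \_readDb -> pure ()
    -- writes are queued and drained serially by the worker thread started by runWithDb
    atomically $ writeTQueue indexQueue $ \withWriteDb -> withWriteDb $ \_writeDb -> pure ()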
@@ -435,11 +437,9 @@ getHieDbLoc dir = do
-- This is the key function which implements multi-component support. All
-- components mapping to the same hie.yaml file are mapped to the same
-- HscEnv which is updated as new components are discovered.
loadSession :: Recorder (WithPriority Log) -> FilePath -> IO (Action IdeGhcSession)
loadSession recorder = loadSessionWithOptions recorder def

loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> IO (Action IdeGhcSession)
loadSessionWithOptions recorder SessionLoadingOptions{..} rootDir = do
loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> TQueue (IO ()) -> IO (Action IdeGhcSession)
loadSessionWithOptions recorder SessionLoadingOptions{..} rootDir que = do
let toAbsolutePath = toAbsolute rootDir -- see Note [Root Directory]
cradle_files <- newIORef []
-- Mapping from hie.yaml file to HscEnv, one per hie.yaml file
@@ -464,9 +464,6 @@ loadSessionWithOptions recorder SessionLoadingOptions{..} rootDir = do
let res' = toAbsolutePath <$> res
return $ normalise <$> res'

dummyAs <- async $ return (error "Uninitialised")
runningCradle <- newVar dummyAs :: IO (Var (Async (IdeResult HscEnvEq,[FilePath])))

return $ do
clientConfig <- getClientConfigAction
extras@ShakeExtras{restartShakeSession, ideNc, knownTargetsVar, lspEnv
@@ -739,12 +736,8 @@ loadSessionWithOptions recorder SessionLoadingOptions{..} rootDir = do
return (([renderPackageSetupException file e], Nothing), maybe [] pure hieYaml)

returnWithVersion $ \file -> do
opts <- join $ mask_ $ modifyVar runningCradle $ \as -> do
-- If the cradle is not finished, then wait for it to finish.
void $ wait as
asyncRes <- async $ getOptions file
return (asyncRes, wait asyncRes)
pure opts
-- see Note [Serializing runs in separate thread]
awaitRunInThread que $ getOptions file

-- | Run the specific cradle on a specific FilePath via hie-bios.
-- This then builds dependencies or whatever based on the cradle, gets the
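The returnWithVersion change above replaces the per-cradle async and MVar juggling with a single awaitRunInThread call on the loader queue, so cradle loads run one at a time while each caller simply blocks for its own result. An illustrative sketch of that pattern, with made-up names (loadSerially and the length-based "load" are not from the commit; only awaitRunInThread is), assuming the queue was created with withWorkerQueue id from the new WorkerThread module shown further down:

-- Illustrative only: many concurrent requests, but the expensive jobs run one
-- at a time on the worker thread draining loaderQueue.
import Control.Concurrent.Async (mapConcurrently)
import Control.Concurrent.STM (TQueue)
import Development.IDE.Core.WorkerThread (awaitRunInThread)

loadSerially :: TQueue (IO ()) -> [FilePath] -> IO [Int]
loadSerially loaderQueue files =
  mapConcurrently (\f -> awaitRunInThread loaderQueue (length <$> readFile f)) files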
3 changes: 2 additions & 1 deletion ghcide/src/Development/IDE/Core/Service.hs
@@ -53,6 +53,7 @@ instance Pretty Log where
LogOfInterest msg -> pretty msg
LogFileExists msg -> pretty msg


------------------------------------------------------------
-- Exposed API

@@ -65,7 +66,7 @@ initialise :: Recorder (WithPriority Log)
-> Debouncer LSP.NormalizedUri
-> IdeOptions
-> WithHieDb
-> IndexQueue
-> ThreadQueue
-> Monitoring
-> FilePath -- ^ Root directory see Note [Root Directory]
-> IO IdeState
64 changes: 41 additions & 23 deletions ghcide/src/Development/IDE/Core/Shake.hs
@@ -73,6 +73,7 @@ module Development.IDE.Core.Shake(
garbageCollectDirtyKeysOlderThan,
Log(..),
VFSModified(..), getClientConfigAction,
ThreadQueue(..)
) where

import Control.Concurrent.Async
@@ -123,6 +124,7 @@ import Development.IDE.Core.PositionMapping
import Development.IDE.Core.ProgressReporting
import Development.IDE.Core.RuleTypes
import Development.IDE.Core.Tracing
import Development.IDE.Core.WorkerThread
import Development.IDE.GHC.Compat (NameCache,
initNameCache,
knownKeyNames)
@@ -262,6 +264,12 @@ data HieDbWriter
-- with (currently) retry functionality
type IndexQueue = TQueue (((HieDb -> IO ()) -> IO ()) -> IO ())

data ThreadQueue = ThreadQueue {
tIndexQueue :: IndexQueue
, tRestartQueue :: TQueue (IO ())
, tLoaderQueue :: TQueue (IO ())
}

-- Note [Semantic Tokens Cache Location]
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- storing semantic tokens cache for each file in shakeExtras might
@@ -334,6 +342,10 @@ data ShakeExtras = ShakeExtras
-- ^ Default HLS config, only relevant if the client does not provide any Config
, dirtyKeys :: TVar KeySet
-- ^ Set of dirty rule keys since the last Shake run
, restartQueue :: TQueue (IO ())
-- ^ Queue of restart actions to be run.
, loaderQueue :: TQueue (IO ())
-- ^ Queue of loader actions to be run.
}

type WithProgressFunc = forall a.
@@ -648,7 +660,7 @@ shakeOpen :: Recorder (WithPriority Log)
-> IdeReportProgress
-> IdeTesting
-> WithHieDb
-> IndexQueue
-> ThreadQueue
-> ShakeOptions
-> Monitoring
-> Rules ()
@@ -658,8 +670,12 @@ shakeOpen :: Recorder (WithPriority Log)
-> IO IdeState
shakeOpen recorder lspEnv defaultConfig idePlugins debouncer
shakeProfileDir (IdeReportProgress reportProgress)
ideTesting@(IdeTesting testing)
withHieDb indexQueue opts monitoring rules rootDir = mdo
ideTesting
withHieDb threadQueue opts monitoring rules rootDir = mdo
-- see Note [Serializing runs in separate thread]
let indexQueue = tIndexQueue threadQueue
restartQueue = tRestartQueue threadQueue
loaderQueue = tLoaderQueue threadQueue

#if MIN_VERSION_ghc(9,3,0)
ideNc <- initNameCache 'r' knownKeyNames
@@ -784,31 +800,33 @@ delayedAction a = do
extras <- ask
liftIO $ shakeEnqueue extras a


-- | Restart the current 'ShakeSession' with the given system actions.
-- Any actions running in the current session will be aborted,
-- but actions added via 'shakeEnqueue' will be requeued.
shakeRestart :: Recorder (WithPriority Log) -> IdeState -> VFSModified -> String -> [DelayedAction ()] -> IO [Key] -> IO ()
shakeRestart recorder IdeState{..} vfs reason acts ioActionBetweenShakeSession =
withMVar'
shakeSession
(\runner -> do
(stopTime,()) <- duration $ logErrorAfter 10 $ cancelShakeSession runner
keys <- ioActionBetweenShakeSession
-- it is very important to update the dirty keys after we enter the critical section
-- see Note [Housekeeping rule cache and dirty key outside of hls-graph]
atomically $ modifyTVar' (dirtyKeys shakeExtras) $ \x -> foldl' (flip insertKeySet) x keys
res <- shakeDatabaseProfile shakeDb
backlog <- readTVarIO $ dirtyKeys shakeExtras
queue <- atomicallyNamed "actionQueue - peek" $ peekInProgress $ actionQueue shakeExtras

-- this log is required by tests
logWith recorder Debug $ LogBuildSessionRestart reason queue backlog stopTime res
)
-- It is crucial to be masked here, otherwise we can get killed
-- between spawning the new thread and updating shakeSession.
-- See https://github.com/haskell/ghcide/issues/79
(\() -> do
(,()) <$> newSession recorder shakeExtras vfs shakeDb acts reason)
void $ awaitRunInThread (restartQueue shakeExtras) $ do
withMVar'
shakeSession
(\runner -> do
(stopTime,()) <- duration $ logErrorAfter 10 $ cancelShakeSession runner
keys <- ioActionBetweenShakeSession
-- it is very important to update the dirty keys after we enter the critical section
-- see Note [Housekeeping rule cache and dirty key outside of hls-graph]
atomically $ modifyTVar' (dirtyKeys shakeExtras) $ \x -> foldl' (flip insertKeySet) x keys
res <- shakeDatabaseProfile shakeDb
backlog <- readTVarIO $ dirtyKeys shakeExtras
queue <- atomicallyNamed "actionQueue - peek" $ peekInProgress $ actionQueue shakeExtras

-- this log is required by tests
logWith recorder Debug $ LogBuildSessionRestart reason queue backlog stopTime res
)
-- It is crucial to be masked here, otherwise we can get killed
-- between spawning the new thread and updating shakeSession.
-- See https://github.com/haskell/ghcide/issues/79
(\() -> do
(,()) <$> newSession recorder shakeExtras vfs shakeDb acts reason)
where
logErrorAfter :: Seconds -> IO () -> IO ()
logErrorAfter seconds action = flip withAsync (const action) $ do
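shakeOpen now merely unpacks the three queues from a ThreadQueue, so the worker threads themselves have to be started further up the call chain and passed down through initialise. A hedged guess at the shape of that wiring: the composition below is an assumption and mkThreadQueues is a hypothetical name, while ThreadQueue, runWithDb, withWorkerQueue and WithHieDbShield are the names touched by this commit.

-- Hypothetical sketch only: assembling the three serialization queues in ContT.
mkThreadQueues :: Recorder (WithPriority Log) -> FilePath -> ContT () IO (WithHieDbShield, ThreadQueue)
mkThreadQueues recorder dbPath = do
  restartQ <- withWorkerQueue id                 -- drains shake-restart jobs (restartQueue)
  loaderQ  <- withWorkerQueue id                 -- drains session-loader jobs (loaderQueue)
  (shield, indexQ) <- runWithDb recorder dbPath  -- drains HieDb writes (indexQueue)
  pure (shield, ThreadQueue indexQ restartQ loaderQ)

The resulting ThreadQueue is what initialise and shakeOpen accept after this change, and tLoaderQueue is the extra TQueue (IO ()) argument that loadSessionWithOptions gained above.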
54 changes: 54 additions & 0 deletions ghcide/src/Development/IDE/Core/WorkerThread.hs
@@ -0,0 +1,54 @@
{-
Module : Development.IDE.Core.WorkerThread
Author : @soulomoon
SPDX-License-Identifier: Apache-2.0
Description : This module provides an API for managing worker threads in the IDE.
see Note [Serializing runs in separate thread]
-}
module Development.IDE.Core.WorkerThread
(withWorkerQueue, awaitRunInThread)
where

import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM
import Control.Concurrent.Strict (newBarrier, signalBarrier,
waitBarrier)
import Control.Monad (forever)
import Control.Monad.Cont (ContT (ContT))

{-
Note [Serializing runs in separate thread]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
We often want to run long-running actions against a resource that cannot be shared.
In such cases it is useful to have a queue of jobs that a single worker thread runs against that resource.
Examples are the HieDb writes, session loading in the session loader, and Shake session restarts.
Originally these were implemented in several ad-hoc ways, which were hard to maintain and error prone.
Moreover, we could not stop those threads uniformly when shutting down the server.
-}

-- | 'withWorkerQueue' creates a new 'TQueue', and launches a worker
-- thread which polls the queue for requests and runs the given worker
-- function on them.
withWorkerQueue :: (t -> IO a) -> ContT () IO (TQueue t)
withWorkerQueue workerAction = ContT $ \mainAction -> do
q <- newTQueueIO
withAsync (writerThread q) $ \_ -> mainAction q
where
writerThread q =
forever $ do
l <- atomically $ readTQueue q
workerAction l

-- | 'awaitRunInThread' queues up an 'IO' action to be run by a worker thread,
-- and then blocks until the result is computed.
awaitRunInThread :: TQueue (IO ()) -> IO result -> IO result
awaitRunInThread q act = do
-- Enqueue the action for the worker thread to run, and
-- use a barrier to wait for the result
barrier <- newBarrier
atomically $ writeTQueue q $ do
res <- act
signalBarrier barrier res
waitBarrier barrier
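For completeness, a minimal self-contained usage sketch of the new module (not part of the commit): one worker drains the queue, other code either fire-and-forgets a job or blocks on its result via awaitRunInThread. Because the queue is FIFO and there is a single worker, the first job below finishes before the second job's result arrives; the sketch also assumes the queued actions do not throw, since awaitRunInThread as written does not catch exceptions.

module Main where

import Control.Concurrent.STM (atomically, writeTQueue)
import Control.Monad.Trans.Cont (runContT)
import Development.IDE.Core.WorkerThread (awaitRunInThread, withWorkerQueue)

main :: IO ()
main = runContT (withWorkerQueue id) $ \q -> do
  -- fire-and-forget: enqueue an IO () for the worker to run
  atomically $ writeTQueue q (putStrLn "hello from the worker")
  -- request/response: block until the worker has produced the result
  n <- awaitRunInThread q (pure (2 + 2 :: Int))
  print n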