Unify critical session running in hls #4256

Merged 38 commits on Jun 8, 2024
Changes from 24 commits
Commits
15f9892
add thread to do shake restart
soulomoon May 26, 2024
3ba47f6
fix
soulomoon May 26, 2024
5d66041
run session loader in thread
soulomoon May 26, 2024
d7946a0
fix 9.2
soulomoon May 26, 2024
da56bfb
rename
soulomoon May 27, 2024
fb0a370
use evalContT
soulomoon May 27, 2024
d1775e6
add doc
soulomoon May 27, 2024
39bdf6a
fix doc
soulomoon May 27, 2024
b06186b
fix import
soulomoon May 27, 2024
1a9374b
export explicit
soulomoon May 27, 2024
c9bdc87
add comment
soulomoon May 27, 2024
cb131e3
cleanup
soulomoon May 27, 2024
96d6d07
cleanup
soulomoon May 27, 2024
b552c80
fix note
soulomoon May 27, 2024
aef173a
add `blockRunInThread`
soulomoon May 27, 2024
f231648
Merge branch 'master' into soulomoon/add-threads
soulomoon May 27, 2024
d08f175
fix
soulomoon May 27, 2024
60839b0
fix 9.2 import
soulomoon May 27, 2024
aba6a88
Merge branch 'master' into soulomoon/add-threads
soulomoon May 28, 2024
981724e
Merge branch 'master' into soulomoon/add-threads
soulomoon May 29, 2024
8f9ef7a
Merge branch 'master' into soulomoon/add-threads
soulomoon May 29, 2024
0e3a6e8
Merge branch 'master' into soulomoon/add-threads
soulomoon Jun 3, 2024
a1b0a69
Merge branch 'master' into soulomoon/add-threads
soulomoon Jun 4, 2024
78e9fc1
Merge branch 'master' into soulomoon/add-threads
soulomoon Jun 6, 2024
b2be89f
Update ghcide/src/Development/IDE/Core/Thread.hs
soulomoon Jun 8, 2024
8aea82e
refactor to withWorkerQueue
soulomoon Jun 8, 2024
8c3773f
typo
soulomoon Jun 8, 2024
5f27fad
ident
soulomoon Jun 8, 2024
e800cac
Improve Note
soulomoon Jun 8, 2024
86d7fb9
add comment
soulomoon Jun 8, 2024
027e5be
format
soulomoon Jun 8, 2024
99322fa
Update WorkerThread.hs
soulomoon Jun 8, 2024
c1b3e7d
Update WorkerThread.hs
soulomoon Jun 8, 2024
a16d04a
rename to await
soulomoon Jun 8, 2024
c832da3
use block comment
soulomoon Jun 8, 2024
6bdba37
merge
soulomoon Jun 8, 2024
442e776
Remove duplicated comment
soulomoon Jun 8, 2024
5d657b6
add file header comment
soulomoon Jun 8, 2024
1 change: 1 addition & 0 deletions ghcide/ghcide.cabal
@@ -148,6 +148,7 @@ library
Development.IDE.Core.Shake
Development.IDE.Core.Tracing
Development.IDE.Core.UseStale
Development.IDE.Core.Thread
Development.IDE.GHC.Compat
Development.IDE.GHC.Compat.Core
Development.IDE.GHC.Compat.CmdLine
87 changes: 36 additions & 51 deletions ghcide/session-loader/Development/IDE/Session.hs
@@ -7,21 +7,19 @@
module Development.IDE.Session
(SessionLoadingOptions(..)
,CacheDirs(..)
,loadSession
,loadSessionWithOptions
,setInitialDynFlags
,getHieDbLoc
,runWithDb
,retryOnSqliteBusy
,retryOnException
,Log(..)
,dbThread
) where

-- Unfortunately, we cannot use loadSession with ghc-lib since hie-bios uses
-- the real GHC library and the types are incompatible. Furthermore, when
-- building with ghc-lib we need to make this Haskell agnostic, so no hie-bios!

import Control.Concurrent.Async
import Control.Concurrent.Strict
import Control.Exception.Safe as Safe
import Control.Monad
@@ -100,14 +98,19 @@
import Control.DeepSeq
import Control.Exception (evaluate)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Trans.Cont (ContT (ContT), evalContT)
import Data.Foldable (for_)
import Data.HashMap.Strict (HashMap)
import Data.HashSet (HashSet)
import qualified Data.HashSet as Set
import Database.SQLite.Simple
import Development.IDE.Core.Thread (ThreadRun (..),
blockRunInThread)
import Development.IDE.Core.Tracing (withTrace)
import Development.IDE.Session.Diagnostics (renderCradleError)
import Development.IDE.Types.Shake (WithHieDb, toNoFileKey)
import Development.IDE.Types.Shake (WithHieDb,
WithHieDbShield (..),
toNoFileKey)
import HieDb.Create
import HieDb.Types
import HieDb.Utils
@@ -375,44 +378,35 @@
-- writing. Actions are picked off one by one from the `HieWriterChan` and executed in serial
-- by a worker thread using a dedicated database connection.
-- This is done in order to serialize writes to the database, or else SQLite becomes unhappy
runWithDb :: Recorder (WithPriority Log) -> FilePath -> (WithHieDb -> IndexQueue -> IO ()) -> IO ()
runWithDb recorder fp k = do
-- use non-deterministic seed because maybe multiple HLS start at same time
-- and send bursts of requests
rng <- Random.newStdGen
-- Delete the database if it has an incompatible schema version
retryOnSqliteBusy
recorder
rng
(withHieDb fp (const $ pure ()) `Safe.catch` \IncompatibleSchemaVersion{} -> removeFile fp)

withHieDb fp $ \writedb -> do
-- the type signature is necessary to avoid concretizing the tyvar
-- e.g. `withWriteDbRetryable initConn` without type signature will
-- instantiate tyvar `a` to `()`
let withWriteDbRetryable :: WithHieDb
withWriteDbRetryable = makeWithHieDbRetryable recorder rng writedb
withWriteDbRetryable initConn

chan <- newTQueueIO

withAsync (writerThread withWriteDbRetryable chan) $ \_ -> do
withHieDb fp (\readDb -> k (makeWithHieDbRetryable recorder rng readDb) chan)
where
writerThread :: WithHieDb -> IndexQueue -> IO ()
writerThread withHieDbRetryable chan = do
-- Clear the index of any files that might have been deleted since the last run
_ <- withHieDbRetryable deleteMissingRealFiles
_ <- withHieDbRetryable garbageCollectTypeNames
forever $ do
l <- atomically $ readTQueue chan
-- TODO: probably should let exceptions be caught/logged/handled by top level handler
l withHieDbRetryable
--
-- see Note [Serializing runs in separate thread]
dbThread ::
ThreadRun
(Recorder (WithPriority Log), FilePath)
WithHieDbShield -- ^ writer resource
WithHieDbShield -- ^ reader resource
(((HieDb -> IO a) -> IO a) -> IO ())
dbThread = ThreadRun {
tWorker = \(recorder, _fp) (WithHieDbShield withWriter) l -> l withWriter
`Safe.catch` \e@SQLError{} -> do
logWith recorder Error $ LogHieDbWriterThreadSQLiteError e
`Safe.catchAny` \f -> do
logWith recorder Error $ LogHieDbWriterThreadException f

,
tRunWithResource = \(recorder, fp) f -> do
rng <- Random.newStdGen
retryOnSqliteBusy
recorder
rng
(withHieDb fp (const $ pure ()) `Safe.catch` \IncompatibleSchemaVersion{} -> removeFile fp)
evalContT $ do
writeDb <- ContT $ withHieDb fp
readDb <- ContT $ withHieDb fp
let withWriteDbRetryable :: WithHieDb
withWriteDbRetryable = makeWithHieDbRetryable recorder rng writeDb
liftIO $ withWriteDbRetryable initConn
liftIO $ f (WithHieDbShield withWriteDbRetryable) (WithHieDbShield (makeWithHieDbRetryable recorder rng readDb))
}

getHieDbLoc :: FilePath -> IO FilePath
getHieDbLoc dir = do
@@ -435,11 +429,9 @@
-- This is the key function which implements multi-component support. All
-- components mapping to the same hie.yaml file are mapped to the same
-- HscEnv which is updated as new components are discovered.
loadSession :: Recorder (WithPriority Log) -> FilePath -> IO (Action IdeGhcSession)
loadSession recorder = loadSessionWithOptions recorder def

loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> IO (Action IdeGhcSession)
loadSessionWithOptions recorder SessionLoadingOptions{..} rootDir = do
loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> TQueue (IO ()) -> IO (Action IdeGhcSession)
loadSessionWithOptions recorder SessionLoadingOptions{..} rootDir que = do
let toAbsolutePath = toAbsolute rootDir -- see Note [Root Directory]
cradle_files <- newIORef []
-- Mapping from hie.yaml file to HscEnv, one per hie.yaml file
@@ -464,9 +456,6 @@
let res' = toAbsolutePath <$> res
return $ normalise <$> res'

dummyAs <- async $ return (error "Uninitialised")
runningCradle <- newVar dummyAs :: IO (Var (Async (IdeResult HscEnvEq,[FilePath])))

return $ do
clientConfig <- getClientConfigAction
extras@ShakeExtras{restartShakeSession, ideNc, knownTargetsVar, lspEnv
@@ -665,7 +654,7 @@
InstallationMismatch{..} ->
return (([renderPackageSetupException cfp GhcVersionMismatch{..}], Nothing),[])
InstallationChecked _compileTime _ghcLibCheck -> do
atomicModifyIORef' cradle_files (\xs -> (cfp:xs,()))

Hlint check warning (GitHub Actions / Hlint check run) on line 657 in ghcide/session-loader/Development/IDE/Session.hs, in loadSessionWithOptions in module Development.IDE.Session: Use atomicModifyIORef'_. Found: "atomicModifyIORef' cradle_files (\ xs -> (cfp : xs, ()))". Perhaps: "atomicModifyIORef'_ cradle_files ((:) cfp)".
session (hieYaml, toNormalizedFilePath' cfp, opts, libDir)
-- Failure case, either a cradle error or the none cradle
Left err -> do
@@ -739,12 +728,8 @@
return (([renderPackageSetupException file e], Nothing), maybe [] pure hieYaml)

returnWithVersion $ \file -> do
opts <- join $ mask_ $ modifyVar runningCradle $ \as -> do
-- If the cradle is not finished, then wait for it to finish.
void $ wait as
asyncRes <- async $ getOptions file
return (asyncRes, wait asyncRes)
pure opts
-- see Note [Serializing runs in separate thread]
blockRunInThread que $ getOptions file

-- | Run the specific cradle on a specific FilePath via hie-bios.
-- This then builds dependencies or whatever based on the cradle, gets the
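The `tRunWithResource` field in the `Session.hs` diff above acquires the write and read connections inside `evalContT`, which flattens what would otherwise be nested `withHieDb` callbacks. A minimal sketch of that pattern follows. It assumes a hypothetical `withFakeDb` helper standing in for `withHieDb` and records open/close events in an `IORef`; neither name comes from the PR.

```haskell
import Control.Exception (finally)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Cont (ContT (ContT), evalContT)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Hypothetical stand-in for a bracket-style function such as withHieDb:
-- it logs "open", runs the continuation, then logs "close".
withFakeDb :: IORef [String] -> String -> (String -> IO a) -> IO a
withFakeDb logRef name k = do
  modifyIORef' logRef (("open " ++ name) :)
  k name `finally` modifyIORef' logRef (("close " ++ name) :)

-- | Acquire two resources in sequence without nesting lambdas.
-- Each 'ContT' line wraps the rest of the do-block as the continuation.
demo :: IO [String]
demo = do
  logRef <- newIORef []
  evalContT $ do
    writer <- ContT $ withFakeDb logRef "writer"
    reader <- ContT $ withFakeDb logRef "reader"
    liftIO $ modifyIORef' logRef (("use " ++ writer ++ "+" ++ reader) :)
  -- Events were prepended, so reverse to get chronological order.
  reverse <$> readIORef logRef
```

In this sketch the resources are released in reverse order of acquisition, the same order that nested callbacks would give.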
4 changes: 3 additions & 1 deletion ghcide/src/Development/IDE/Core/Service.hs
@@ -31,6 +31,7 @@ import Ide.Plugin.Config
import qualified Language.LSP.Protocol.Types as LSP
import qualified Language.LSP.Server as LSP

import Control.Concurrent.STM (TQueue)
import Control.Monad
import qualified Development.IDE.Core.FileExists as FileExists
import qualified Development.IDE.Core.OfInterest as OfInterest
@@ -53,6 +54,7 @@ instance Pretty Log where
LogOfInterest msg -> pretty msg
LogFileExists msg -> pretty msg


------------------------------------------------------------
-- Exposed API

@@ -65,7 +67,7 @@ initialise :: Recorder (WithPriority Log)
-> Debouncer LSP.NormalizedUri
-> IdeOptions
-> WithHieDb
-> IndexQueue
-> ThreadQueue
-> Monitoring
-> FilePath -- ^ Root directory see Note [Root Directory]
-> IO IdeState
64 changes: 41 additions & 23 deletions ghcide/src/Development/IDE/Core/Shake.hs
@@ -73,6 +73,7 @@
garbageCollectDirtyKeysOlderThan,
Log(..),
VFSModified(..), getClientConfigAction,
ThreadQueue(..)
) where

import Control.Concurrent.Async
@@ -122,8 +123,9 @@
import Development.IDE.Core.PositionMapping
import Development.IDE.Core.ProgressReporting
import Development.IDE.Core.RuleTypes
import Development.IDE.Core.Thread
import Development.IDE.Core.Tracing
import Development.IDE.GHC.Compat (NameCache,

Hlint check warning (GitHub Actions / Hlint check run) on line 128 in ghcide/src/Development/IDE/Core/Shake.hs, in module Development.IDE.Core.Shake: Use fewer imports. Found two separate imports of Development.IDE.GHC.Compat, (NameCache, initNameCache, knownKeyNames) and (NameCacheUpdater(NCU), mkSplitUniqSupply, upNameCache). Perhaps a single import of Development.IDE.GHC.Compat listing NameCache, initNameCache, knownKeyNames, NameCacheUpdater(NCU), mkSplitUniqSupply, upNameCache.
initNameCache,
knownKeyNames)
import Development.IDE.GHC.Orphans ()
@@ -262,6 +264,12 @@
-- with (currently) retry functionality
type IndexQueue = TQueue (((HieDb -> IO ()) -> IO ()) -> IO ())

data ThreadQueue = ThreadQueue {
tIndexQueue :: IndexQueue
, tRestartQueue :: TQueue (IO ())
, tLoaderQueue :: TQueue (IO ())
}

-- Note [Semantic Tokens Cache Location]
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- storing semantic tokens cache for each file in shakeExtras might
@@ -334,6 +342,10 @@
-- ^ Default HLS config, only relevant if the client does not provide any Config
, dirtyKeys :: TVar KeySet
-- ^ Set of dirty rule keys since the last Shake run
, restartQueue :: TQueue (IO ())
-- ^ Queue of restart actions to be run.
, loaderQueue :: TQueue (IO ())
-- ^ Queue of loader actions to be run.
}

type WithProgressFunc = forall a.
@@ -648,7 +660,7 @@
-> IdeReportProgress
-> IdeTesting
-> WithHieDb
-> IndexQueue
-> ThreadQueue
-> ShakeOptions
-> Monitoring
-> Rules ()
@@ -658,8 +670,12 @@
-> IO IdeState
shakeOpen recorder lspEnv defaultConfig idePlugins debouncer
shakeProfileDir (IdeReportProgress reportProgress)
ideTesting@(IdeTesting testing)
withHieDb indexQueue opts monitoring rules rootDir = mdo
ideTesting
withHieDb threadQueue opts monitoring rules rootDir = mdo
-- see Note [Serializing runs in separate thread]
let indexQueue = tIndexQueue threadQueue
restartQueue = tRestartQueue threadQueue
loaderQueue = tLoaderQueue threadQueue

#if MIN_VERSION_ghc(9,3,0)
ideNc <- initNameCache 'r' knownKeyNames
@@ -784,31 +800,33 @@
extras <- ask
liftIO $ shakeEnqueue extras a


-- | Restart the current 'ShakeSession' with the given system actions.
-- Any actions running in the current session will be aborted,
-- but actions added via 'shakeEnqueue' will be requeued.
shakeRestart :: Recorder (WithPriority Log) -> IdeState -> VFSModified -> String -> [DelayedAction ()] -> IO [Key] -> IO ()
shakeRestart recorder IdeState{..} vfs reason acts ioActionBetweenShakeSession =
withMVar'
shakeSession
(\runner -> do
(stopTime,()) <- duration $ logErrorAfter 10 $ cancelShakeSession runner
keys <- ioActionBetweenShakeSession
-- it is very important to update the dirty keys after we enter the critical section
-- see Note [Housekeeping rule cache and dirty key outside of hls-graph]
atomically $ modifyTVar' (dirtyKeys shakeExtras) $ \x -> foldl' (flip insertKeySet) x keys
res <- shakeDatabaseProfile shakeDb
backlog <- readTVarIO $ dirtyKeys shakeExtras
queue <- atomicallyNamed "actionQueue - peek" $ peekInProgress $ actionQueue shakeExtras

-- this log is required by tests
logWith recorder Debug $ LogBuildSessionRestart reason queue backlog stopTime res
)
-- It is crucial to be masked here, otherwise we can get killed
-- between spawning the new thread and updating shakeSession.
-- See https://github.com/haskell/ghcide/issues/79
(\() -> do
(,()) <$> newSession recorder shakeExtras vfs shakeDb acts reason)
void $ blockRunInThread (restartQueue shakeExtras) $ do
withMVar'
shakeSession
(\runner -> do
(stopTime,()) <- duration $ logErrorAfter 10 $ cancelShakeSession runner
keys <- ioActionBetweenShakeSession
-- it is very important to update the dirty keys after we enter the critical section
-- see Note [Housekeeping rule cache and dirty key outside of hls-graph]
atomically $ modifyTVar' (dirtyKeys shakeExtras) $ \x -> foldl' (flip insertKeySet) x keys
res <- shakeDatabaseProfile shakeDb
backlog <- readTVarIO $ dirtyKeys shakeExtras
queue <- atomicallyNamed "actionQueue - peek" $ peekInProgress $ actionQueue shakeExtras

-- this log is required by tests
logWith recorder Debug $ LogBuildSessionRestart reason queue backlog stopTime res
)
-- It is crucial to be masked here, otherwise we can get killed
-- between spawning the new thread and updating shakeSession.
-- See https://github.com/haskell/ghcide/issues/79
(\() -> do
(,()) <$> newSession recorder shakeExtras vfs shakeDb acts reason)
where
logErrorAfter :: Seconds -> IO () -> IO ()
logErrorAfter seconds action = flip withAsync (const action) $ do
67 changes: 67 additions & 0 deletions ghcide/src/Development/IDE/Core/Thread.hs
@@ -0,0 +1,67 @@
module Development.IDE.Core.Thread
Review comment (Collaborator):
I think a more typical name for something like this would be a "work queue" or "job queue".

( ThreadRun(..), runWithThread, blockRunInThread)
where

import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.Strict (newBarrier, signalBarrier,
waitBarrier)
import Control.Monad (forever)
import Control.Monad.Cont (ContT (ContT))

-- Note [Serializing runs in separate thread]
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- In a lot of cases we want to have a separate thread that will serialize the runs of the actions.
-- Like the db writes, session loading in session loader, shake session restarts.
--
-- Originally we used various ways to implement this, but it was hard to maintain and error prone.
-- Moreover, we can not stop these threads uniformly when we are shutting down the server.
--
-- `Development.IDE.Core.Thread` module provides a declarative api to implement this easily.
-- In `ThreadRun` data type:
-- * `tRunWithResource`: is used to create the resources needed to perform the long running action.
-- * `tWorker`: is the action we want to run in separate thread serially.
--
-- runWithThread will create a worker thread to run along with the main thread.
-- runWithThread provides `resource` created by `tRunWithResource` and a `TQueue` to send the actions to run.
-- The worker thread will serialize the runs of the actions from the TQueue.


data ThreadRun input workerResource resource arg = ThreadRun {
tRunWithResource ::
input -- ^ input of running
-> (workerResource -> resource -> IO ()) -- ^ the long running action need to run with resource
-> IO (),
tWorker -- ^ A single action we want to run in separate thread serially
:: input -- ^ input of running
-> workerResource -- ^ writer resource
-> arg -- ^ argument to run
-> IO ()
}

-- | runWithThread
-- Run a long running action with an additional running thread
-- The additional thread will serialize runs of the actions from the TQueue.
-- Return ContT to run the action
runWithThread :: ThreadRun input workerResource resource arg -> input -> ContT () IO (resource, TQueue arg)
runWithThread ThreadRun{..} ip = ContT $ \f -> do
tRunWithResource ip $ \w r -> do
Review comment (Collaborator):
I guess this kind of looks like two separate things to me:

  1. A domain-specific withResource which creates the resource and passes it to a continuation.
  2. A standard "worker queue"

My instinct is that we could focus on defining the worker queue, and just leave the creation of the resource, where necessary, to the call site. Indeed, many of the call sites don't create a resource at all! I think we could then quite naturally write:

withMyResource $ \resource -> withWorkerQueue (\arg -> doSomething resource arg) $ \queue -> ...

or something?

Reply (Collaborator Author, @soulomoon, Jun 8, 2024):

Makes sense, let's implement withWorkerQueue instead and see if that works better.
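The `withWorkerQueue` shape proposed in this thread could be sketched roughly as below. This is an illustration under stated assumptions, not the code that was merged: it uses `forkIOWithUnmask` and `bracket` from `base` where the diff uses `withAsync`, and the `demo` function with its `IORef` "resource" is hypothetical.

```haskell
import Control.Concurrent (forkIOWithUnmask, killThread)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (TQueue, atomically, newTQueueIO, readTQueue, writeTQueue)
import Control.Exception (bracket)
import Control.Monad (forM_, forever)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- | Hypothetical sketch: start one worker thread that applies 'handler' to
-- queued arguments in FIFO order, and hand the queue to the continuation.
-- The worker is killed when the continuation returns or throws.
withWorkerQueue :: (arg -> IO ()) -> (TQueue arg -> IO a) -> IO a
withWorkerQueue handler k = do
  q <- newTQueueIO
  let worker = forever $ atomically (readTQueue q) >>= handler
  -- bracket runs the fork masked, so unmask inside the worker thread.
  bracket (forkIOWithUnmask $ \unmask -> unmask worker) killThread (\_ -> k q)

-- | The resource is created at the call site and closed over by the handler.
demo :: IO [Int]
demo = do
  resource <- newIORef []          -- stands in for e.g. a database handle
  done <- newEmptyMVar
  withWorkerQueue (\job -> job resource) $ \queue -> do
    forM_ [1, 2, 3] $ \n ->
      atomically $ writeTQueue queue (\ref -> modifyIORef' ref (n :))
    -- The queue is FIFO with a single worker, so this job runs last.
    atomically $ writeTQueue queue (\_ -> putMVar done ())
    takeMVar done
  reverse <$> readIORef resource
```

Note that in this sketch an exception thrown by `handler` ends the worker thread; the `dbThread` worker in the diff above wraps each job in `Safe.catch` handlers for that reason.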

q <- newTQueueIO
withAsync (writerThread w q) $ \_ -> f (r, q)
where
writerThread r q =
forever $ do
l <- atomically $ readTQueue q
tWorker ip r l


-- | blockRunInThread run and wait for the result
-- Take an action from TQueue, run it and
-- use barrier to wait for the result
blockRunInThread :: TQueue (IO ()) -> IO result -> IO result
blockRunInThread q act = do
barrier <- newBarrier
atomically $ writeTQueue q $ do
res <- act
signalBarrier barrier res
waitBarrier barrier
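
The enqueue-and-wait idea behind `blockRunInThread` can be sketched with only `base` and `stm`. The helper name `awaitRun` is hypothetical, and an `MVar` replaces the barrier from `Control.Concurrent.Strict`. Unlike the diff above, this sketch also captures an exception from the action and rethrows it in the caller, which is a design choice of the sketch and not something the PR does.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (TQueue, atomically, newTQueueIO, readTQueue, writeTQueue)
import Control.Exception (SomeException, throwIO, try)
import Control.Monad (forever, join)

-- | Hypothetical helper: enqueue 'act' for the worker thread and block
-- until the worker has run it, returning its result to the caller.
awaitRun :: TQueue (IO ()) -> IO result -> IO result
awaitRun q act = do
  box <- newEmptyMVar
  -- The queued job always fills the box, with a result or an exception.
  -- Caveat: 'try' at SomeException also intercepts asynchronous exceptions.
  atomically $ writeTQueue q $ try act >>= putMVar box
  outcome <- takeMVar box
  case outcome of
    Left e  -> throwIO (e :: SomeException)
    Right x -> pure x

-- | Two calls are served one after another by a single worker thread.
demo :: IO (Int, Int)
demo = do
  q <- newTQueueIO
  worker <- forkIO $ forever $ join $ atomically $ readTQueue q
  a <- awaitRun q (pure (6 * 7))
  b <- awaitRun q (pure (a + 1))
  killThread worker
  pure (a, b)
```

Because every job goes through one queue and one worker, callers on different threads are serialized without holding a lock themselves.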