Skip to content

Commit

Permalink
feat(runtime): add the first parts of the admin commands
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Sep 22, 2021
1 parent 6f9aa3e commit 1cc79d5
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 21 deletions.
1 change: 1 addition & 0 deletions src/runtime-prototype/src/Scheduler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ fakeScheduler executorRef (ClientRequest' "Start" [] cid) =
p <- send executorRef (InternalMessage (prettyEvent e))
on p (\(InternalMessageR (InternalMessage' "Events" args)) -> do
-- XXX: we should generate an arrival time here using the seed.
-- XXX: with some probability duplicate the event?
let Just evs = sequence (map (fromSDatatype time) args)
evs' = filter (\e -> kind e /= "ok") (concat evs)
heap' = Heap.fromList (map (\e -> Entry (at e) e) evs')
Expand Down
72 changes: 51 additions & 21 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import StuntDouble.Metrics
import StuntDouble.Random
import StuntDouble.Reference
import StuntDouble.Time
import StuntDouble.AdminTransport
import StuntDouble.AdminTransport.NamedPipe
import StuntDouble.Transport
import StuntDouble.Transport.Http
import StuntDouble.Transport.HttpSync
Expand Down Expand Up @@ -450,6 +452,7 @@ data Event
= Action Action
| Reaction Reaction
| Admin Command
| AdminCommands [AdminCommand]
| ClientRequestEvent LocalRef Message ClientRef (TMVar Message)

data Command
Expand All @@ -460,23 +463,25 @@ data Command
| Quit

data EventLoop = EventLoop
{ lsName :: EventLoopName
, lsActorMap :: TVar ActorMap
, lsAsyncState :: TVar AsyncState
, lsQueue :: TBQueue Event
, lsTime :: Time
, lsSeed :: TVar Seed
, lsTransport :: Transport IO
, lsDisk :: Disk IO
, lsPids :: TVar [Async ()]
, lsNextPromise :: TVar Promise
, lsLog :: TVar Log
, lsMetrics :: Metrics
, lsIOQueue :: TBQueue (IOOp, Promise)
{ lsName :: EventLoopName
, lsActorMap :: TVar ActorMap
, lsAsyncState :: TVar AsyncState
, lsQueue :: TBQueue Event
, lsTime :: Time
, lsSeed :: TVar Seed
, lsTransport :: Transport IO
, lsAdminTransport :: AdminTransport
, lsDisk :: Disk IO
, lsPids :: TVar [Async ()]
, lsNextPromise :: TVar Promise
, lsLog :: TVar Log
, lsMetrics :: Metrics
, lsIOQueue :: TBQueue (IOOp, Promise)
}

initLoopState :: EventLoopName -> Time -> Seed -> Transport IO -> Disk IO -> IO EventLoop
initLoopState name time seed transport disk =
initLoopState :: EventLoopName -> Time -> Seed -> Transport IO -> AdminTransport -> Disk IO
-> IO EventLoop
initLoopState name time seed transport adminTransport disk =
EventLoop
<$> pure name
<*> newTVarIO emptyActorMap
Expand All @@ -485,6 +490,7 @@ initLoopState name time seed transport disk =
<*> pure time
<*> newTVarIO seed
<*> pure transport
<*> pure adminTransport
<*> pure disk
<*> newTVarIO []
<*> newTVarIO (Promise 0)
Expand Down Expand Up @@ -591,34 +597,39 @@ spawnIOWorkers (ThreadPoolOfSize n) ls
error "spawnIOWorkers: thread pool size is bigger than the available CPU capabilities"
mapM (\c -> asyncOn c (asyncIOWorker ls)) [ i | i <- [0..n], i /= cap ]

makeEventLoop :: Time -> Seed -> TransportKind -> Codec -> DiskKind -> EventLoopName
-> IO EventLoop
makeEventLoop :: Time -> Seed -> TransportKind -> AdminTransportKind -> Codec -> DiskKind
-> EventLoopName -> IO EventLoop
makeEventLoop = makeEventLoopThreaded SingleThreaded NoThreadPool

makeEventLoopThreaded :: Threaded -> ThreadPool -> Time -> Seed -> TransportKind
-> Codec -> DiskKind -> EventLoopName -> IO EventLoop
makeEventLoopThreaded threaded threadpool time seed tk codec dk name = do
-> AdminTransportKind -> Codec -> DiskKind -> EventLoopName
-> IO EventLoop
makeEventLoopThreaded threaded threadpool time seed tk atk codec dk name = do
t <- case tk of
NamedPipe fp -> namedPipeTransport fp name
Http port -> httpTransport port
HttpSync -> httpSyncTransport codec
Stm -> stmTransport
at <- case atk of
AdminNamedPipe fp -> namedPipeAdminTransport fp name
d <- case dk of
FakeDisk -> fakeDisk
RealDisk fp -> realDisk fp
ls <- initLoopState name time seed t d
ls <- initLoopState name time seed t at d
workerPids <- spawnIOWorkers threadpool ls
pids <- case threaded of
SingleThreaded ->
fmap (: [])
(async (runHandlers seed -- XXX: or do we want `TVar Seed` here?
[ handleInbound1 ls
, handleAdminCommands1 ls
, handleAsyncIO1 ls
, handleEvents1 ls
, handleTimeouts1 ls
]))
MultiThreaded ->
mapM async [ handleInbound ls
, handleAdminCommands ls
, handleAsyncIO ls
, handleEvents ls
, handleTimeouts ls
Expand All @@ -627,6 +638,7 @@ makeEventLoopThreaded threaded threadpool time seed tk codec dk name = do
mapM_ link pids
return ls

{-
withEventLoop :: EventLoopName -> Codec -> (EventLoop -> FakeTimeHandle -> IO a) -> IO a
withEventLoop name codec k =
withTransport (NamedPipe "/tmp") codec name $ \t -> do
Expand All @@ -644,6 +656,7 @@ withEventLoop name codec k =
x <- k ls h
quit ls
return x
-}

runHandlers :: Seed -> [IO ()] -> IO ()
runHandlers seed0 hs = go seed0
Expand All @@ -666,14 +679,24 @@ handleInbound = forever . handleInbound1
handleInbound1 :: EventLoop -> IO ()
handleInbound1 ls = do
-- XXX: instead of just reading one message from the transport queue we could
-- read the whole queue here...
-- read the whole queue here... See `adminTransportReceive`.
me <- transportReceive (lsTransport ls)
case me of
Nothing -> return ()
Just e -> do
let p = Promise (getCorrelationId (envelopeCorrelationId e))
atomically (writeTBQueue (lsQueue ls) (Reaction (Receive p e)))

handleAdminCommands :: EventLoop -> IO ()
handleAdminCommands = forever . handleAdminCommands1

handleAdminCommands1 :: EventLoop -> IO ()
handleAdminCommands1 ls = do
cmds <- adminTransportReceive (lsAdminTransport ls)
case cmds of
[] -> return ()
_ : _ -> atomically (writeTBQueue (lsQueue ls) (AdminCommands cmds))

handleAsyncIO :: EventLoop -> IO ()
handleAsyncIO ls = forever (handleAsyncIO1 ls >> threadDelay 1000 {- 1 ms -})

Expand Down Expand Up @@ -807,6 +830,13 @@ handleEvent (Admin cmd) ls = case cmd of
pids <- readTVarIO (lsPids ls)
threadDelay 100000
mapM_ cancel pids
handleEvent (AdminCommands cmds) ls = mapM_ go cmds
where
go :: AdminCommand -> IO ()
go AdminQuit = handleEvent (Admin Quit) ls
go AdminDumpLog = adminTransportSend (lsAdminTransport ls) "XXX: log"
go AdminResetLog = undefined -- XXX

handleEvent (ClientRequestEvent lref msg cref returnVar) ls = do
reply <- actorPokeIO ls lref (ClientRequest' (getMessage msg) (getArgs msg) cref)
atomically (putTMVar returnVar reply)
Expand Down
17 changes: 17 additions & 0 deletions src/runtime-prototype/src/StuntDouble/AdminTransport.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module StuntDouble.AdminTransport where

------------------------------------------------------------------------

data AdminTransportKind = AdminNamedPipe FilePath

data AdminCommand
= AdminQuit
| AdminDumpLog
| AdminResetLog
deriving (Read, Show)

data AdminTransport = AdminTransport
{ adminTransportSend :: String -> IO ()
, adminTransportReceive :: IO [AdminCommand]
, adminTransportShutdown :: IO ()
}
54 changes: 54 additions & 0 deletions src/runtime-prototype/src/StuntDouble/AdminTransport/NamedPipe.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
module StuntDouble.AdminTransport.NamedPipe where

import Control.Monad
import Text.Read (readMaybe)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import System.Directory
import System.FilePath
import System.IO
import System.IO.Error
import System.Posix.Files

import StuntDouble.Envelope
import StuntDouble.Message
import StuntDouble.Queue
import StuntDouble.Reference
import StuntDouble.AdminTransport
import StuntDouble.Transport.NamedPipe (safeCreateNamedPipe)

------------------------------------------------------------------------

namedPipeAdminTransport :: FilePath -> EventLoopName -> IO AdminTransport
namedPipeAdminTransport fp name = do
queue <- newTBQueueIO 128 -- This queue grows if input is produced more often
-- than `transportReceive` is called.
let pipe = fp </> getEventLoopName name </> "admin"
safeCreateNamedPipe pipe
h <- openFile pipe ReadWriteMode
hSetBuffering h LineBuffering
pid <- async (producer h queue)
return AdminTransport
{ adminTransportSend = hPutStrLn h
, adminTransportReceive = atomically (flushTBQueue queue)
, adminTransportShutdown = do
adminCleanUpNamedPipe fp name
cancel pid -- XXX: Potentially resource leak?
}
where
producer :: Handle -> TBQueue AdminCommand -> IO ()
producer h queue = forever $ do
l <- hGetLine h
case readMaybe l of
Just cmd -> atomically (writeTBQueue queue cmd)
Nothing -> return () -- XXX: Perhaps we should log something here?

adminCleanUpNamedPipe :: FilePath -> EventLoopName -> IO ()
adminCleanUpNamedPipe fp name =
catchJust
(\e -> if isDoesNotExistErrorType (ioeGetErrorType e)
then Just ()
else Nothing)
(removeFile (fp </> getEventLoopName name </> "admin"))
return
2 changes: 2 additions & 0 deletions src/runtime-prototype/stunt-double.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ library
StuntDouble.Transport.HttpSync
StuntDouble.Transport.NamedPipe
StuntDouble.Transport.Stm
StuntDouble.AdminTransport
StuntDouble.AdminTransport.NamedPipe
StuntDouble.Queue

-- GHC boot library dependencies:
Expand Down

0 comments on commit 1cc79d5

Please sign in to comment.