Skip to content

Commit

Permalink
feat(runtime): move scheduler to library and make it possible to buil…
Browse files Browse the repository at this point in the history
…d executable
  • Loading branch information
symbiont-stevan-andjelkovic committed Aug 31, 2021
1 parent 85b74f3 commit 973a8a7
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 59 deletions.
13 changes: 11 additions & 2 deletions src/runtime-prototype/app/Main.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
module Main where

import Control.Concurrent
import StuntDouble
import Scheduler

------------------------------------------------------------------------

main :: IO ()
main = print =<< getNumCapabilities
main = do
let executorPort = 3004
executorRef = RemoteRef ("http://localhost:" ++ show executorPort) 0
el <- makeEventLoop realTime (makeSeed 0) HttpSync (EventLoopName "scheduler")
lref <- spawn el (fakeScheduler executorRef) initState
putStrLn "Scheduler is running..."
waitForEventLoopQuit el
53 changes: 53 additions & 0 deletions src/runtime-prototype/src/Scheduler.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{-# LANGUAGE OverloadedStrings #-}

module Scheduler where

import Data.Text (Text)
import qualified Data.Text as Text
import Control.Exception
import Control.Concurrent.Async

import StuntDouble

------------------------------------------------------------------------

initState :: State
initState = stateFromList [ ("heap", heapFromList [(Integer 1, Text "cmd1")])
, ("time", epoch)
, ("seed", Integer 0)
]

fakeScheduler :: RemoteRef -> Message -> Actor
fakeScheduler executorRef (ClientRequest "CreateTest" cid) = Actor $ do
-- load from db
undefined
fakeScheduler executorRef (ClientRequest "Start" cid) = Actor $ do
-- pop agenda end send to executorRef
r <- pop <$> gets "heap"
case r of
Some (Pair cmd heap') -> do
update "heap" heap'
p <- send executorRef (InternalMessage (prettyCommand cmd))
on p (\(InternalMessageR (InternalMessage "Ack")) -> undefined)
undefined
None -> return (InternalMessage "Done") -- XXX: reply to client id?!
_otherwise -> error "scheduler: start: impossible"
where
prettyCommand :: Datatype -> String
prettyCommand = undefined
fakeScheduler executorRef msg@(InternalMessage "Ack") = Actor $ do
undefined
-- does executor send back anything else?
-- schedule the responses from the executor back on the agenda

-- XXX: we need to make messages be able to have args/fields/payload
-- cmds <- parseCommands (payload msg)
-- if no cmds and agenda is empty then stop (how do we contact client? need to save cid?)
-- else
-- now <- gets "time"
-- seed <- gets "seed"
-- arrivalTime <- genArrivalTime now seed
-- op2 push arrivalTime (parseCommand resp) %= "heap"
-- where
-- parseCommand :: Message -> Datatype
-- parseCommand (InternalMessage m) = Pair (Text (Text.pack (show m))) (List []) -- XXX: args
1 change: 1 addition & 0 deletions src/runtime-prototype/src/StuntDouble.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ import StuntDouble.Reference as X
import StuntDouble.Time as X
import StuntDouble.Transport as X
import StuntDouble.Transport.Http as X
import StuntDouble.Transport.HttpSync as X
import StuntDouble.Transport.NamedPipe as X
import StuntDouble.Transport.Stm as X
8 changes: 8 additions & 0 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import StuntDouble.Reference
import StuntDouble.Time
import StuntDouble.Transport
import StuntDouble.Transport.Http
import StuntDouble.Transport.HttpSync
import StuntDouble.Transport.NamedPipe
import StuntDouble.Transport.Stm

Expand Down Expand Up @@ -536,6 +537,7 @@ withTransport tk name k =
(case tk of
NamedPipe fp -> namedPipeTransport fp name
Http port -> httpTransport port
HttpSync -> httpSyncTransport
Stm -> stmTransport)
transportShutdown
k
Expand Down Expand Up @@ -578,6 +580,7 @@ makeEventLoopThreaded threaded threadpool time seed tk name = do
t <- case tk of
NamedPipe fp -> namedPipeTransport fp name
Http port -> httpTransport port
HttpSync -> httpSyncTransport
Stm -> stmTransport
disk <- fakeDisk
ls <- initLoopState name time seed t disk
Expand Down Expand Up @@ -780,3 +783,8 @@ handleEvent (Admin cmd) ls = case cmd of
handleEvent (ClientRequestEvent lref msg cref returnVar) ls = do
reply <- actorPokeIO ls lref (ClientRequest (getMessage msg) cref)
atomically (putTMVar returnVar reply)

waitForEventLoopQuit :: EventLoop -> IO ()
waitForEventLoopQuit ls = do
pids <- readTVarIO (lsPids ls)
mapM_ wait pids
2 changes: 1 addition & 1 deletion src/runtime-prototype/src/StuntDouble/Transport.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import StuntDouble.Envelope

------------------------------------------------------------------------

data TransportKind = NamedPipe FilePath | Http Int | Stm
data TransportKind = NamedPipe FilePath | Http Int | HttpSync | Stm

data Transport m = Transport
{ transportSend :: Envelope -> m ()
Expand Down
6 changes: 3 additions & 3 deletions src/runtime-prototype/src/StuntDouble/Transport/HttpSync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ httpSyncTransport = do
-- `transportReceive` are made to retrieve their replies.
queue <- newQueue 128
manager <- newManager defaultManagerSettings
return Transport { transportSend = transportSend' manager queue
return Transport { transportSend = transportSyncSend manager queue
, transportReceive = dequeue queue
, transportShutdown = return ()
}

transportSend' :: Manager -> Queue Envelope -> Envelope -> IO ()
transportSend' manager queue e = do
transportSyncSend :: Manager -> Queue Envelope -> Envelope -> IO ()
transportSyncSend manager queue e = do
request <- envelopeToRequest e
-- XXX: Instead of sending right away here, we could batch instead and only
-- send ever 10 ms or whatever, we could also send concurrently (we would need
Expand Down
11 changes: 4 additions & 7 deletions src/runtime-prototype/stunt-double.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ tested-with: GHC ==8.10.4
library
hs-source-dirs: src/
exposed-modules:
Scheduler
StuntDouble
StuntDouble.Actor.State
StuntDouble.ActorMap
Expand Down Expand Up @@ -102,15 +103,11 @@ test-suite test
ghc-options: -threaded -rtsopts -with-rtsopts=-N -fno-ignore-asserts
default-language: Haskell2010

executable stunt-double
executable scheduler
hs-source-dirs: app/
main-is: Main.hs

-- other-modules:
-- other-extensions:
build-depends: base ==4.13.*

-- hs-source-dirs:
build-depends: base ==4.13.*,
stunt-double
ghc-options: -threaded -rtsopts -with-rtsopts=-N
default-language: Haskell2010

Expand Down
46 changes: 0 additions & 46 deletions src/runtime-prototype/test/StuntDouble/SchedulerTest.hs
Original file line number Diff line number Diff line change
@@ -1,53 +1,7 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module StuntDouble.SchedulerTest where

import Data.Text (Text)
import qualified Data.Text as Text
import Control.Exception
import Control.Concurrent.Async
import Test.HUnit

import StuntDouble

------------------------------------------------------------------------

fakeScheduler :: RemoteRef -> Message -> Actor
fakeScheduler executorRef (ClientRequest "CreateTest" cid) = Actor $ do
-- load from db
undefined
fakeScheduler executorRef (ClientRequest "Start" cid) = Actor $ do
-- pop agenda end send to executorRef
r <- pop <$> gets "heap"
case r of
Some (Pair cmd heap') -> do
update "heap" heap'
_ <- send executorRef (InternalMessage (prettyCommand cmd))
undefined
None -> return (InternalMessage "Done") -- XXX: reply to client id?!
_otherwise -> error "scheduler: start: impossible"
where
prettyCommand :: Datatype -> String
prettyCommand = undefined
fakeScheduler executorRef msg@(InternalMessage "Ack") = Actor $ do
undefined
-- does executor send back anything else?
-- schedule the responses from the executor back on the agenda

-- XXX: we need to make messages be able to have args/fields/payload
-- cmds <- parseCommands (payload msg)
-- if no cmds and agenda is empty then stop (how do we contact client? need to save cid?)
-- else
-- now <- gets "time"
-- seed <- gets "seed"
-- arrivalTime <- genArrivalTime now seed
-- op2 push arrivalTime (parseCommand resp) %= "heap"
-- where
-- parseCommand :: Message -> Datatype
-- parseCommand (InternalMessage m) = Pair (Text (Text.pack (show m))) (List []) -- XXX: args


{-
unit_scheduler :: Assertion
unit_scheduler = do
Expand Down

0 comments on commit 973a8a7

Please sign in to comment.