diff --git a/src/runtime-prototype/app/Main.hs b/src/runtime-prototype/app/Main.hs index cf9c4cba..1d181c7b 100644 --- a/src/runtime-prototype/app/Main.hs +++ b/src/runtime-prototype/app/Main.hs @@ -1,15 +1,38 @@ +{-# LANGUAGE ScopedTypeVariables #-} + module Main where -import StuntDouble +import Control.Concurrent.Async (cancel) +import Control.Exception (throwIO) +import System.Environment (getEnv) +import System.FilePath (()) +import System.IO.Error (catchIOError, isDoesNotExistError) + import Scheduler +import StuntDouble ------------------------------------------------------------------------ +getDbPath :: IO FilePath +getDbPath = do + getEnv "DETSYS_DB" + `catchIOError` \(e :: catchIOError) -> + if isDoesNotExistError e + then do + home <- getEnv "HOME" + return (home ".detsys.db") + else throwIO e + main :: IO () main = do let executorPort = 3004 executorRef = RemoteRef ("http://localhost:" ++ show executorPort) 0 - el <- makeEventLoop realTime (makeSeed 0) HttpSync (EventLoopName "scheduler") - lref <- spawn el (fakeScheduler executorRef) initState - putStrLn "Scheduler is running..." - waitForEventLoopQuit el + schedulerPort = 3005 + fp <- getDbPath + el <- makeEventLoop realTime (makeSeed 0) HttpSync (RealDisk fp) (EventLoopName "scheduler") + now <- getCurrentTime realTime + lref <- spawn el (fakeScheduler executorRef) (initState now (makeSeed 0)) + withHttpFrontend el lref schedulerPort $ \pid -> do + putStrLn ("Scheduler is listening on port: " ++ show schedulerPort) + waitForEventLoopQuit el + cancel pid diff --git a/src/runtime-prototype/src/Scheduler.hs b/src/runtime-prototype/src/Scheduler.hs index 88274b82..ce24c3c8 100644 --- a/src/runtime-prototype/src/Scheduler.hs +++ b/src/runtime-prototype/src/Scheduler.hs @@ -1,12 +1,16 @@ {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE DeriveGeneric #-} module Scheduler where import Control.Concurrent.Async import Control.Exception +import Data.Aeson +import GHC.Generics (Generic) import Data.Heap (Entry(Entry), Heap) import qualified Data.Heap as Heap import Data.Text (Text) +import qualified Data.Text.Encoding as Text import qualified Data.Text as Text import Data.Time (UTCTime) import Database.SQLite.Simple @@ -15,10 +19,20 @@ import StuntDouble ------------------------------------------------------------------------ -data SchedulerCommand = SchedulerCommand +data SchedulerEvent = SchedulerEvent + { kind :: String + , event :: String + , args :: Data.Aeson.Value + , to :: String + , from :: String + , at :: UTCTime + } + deriving (Generic, Eq, Ord, Show) + +instance FromJSON SchedulerEvent where data SchedulerState = SchedulerState - { heap :: Heap (Entry UTCTime SchedulerCommand) + { heap :: Heap (Entry UTCTime SchedulerEvent) , time :: UTCTime , seed :: Seed } @@ -30,19 +44,21 @@ initState t s = SchedulerState , seed = s } -data Agenda = Agenda Int -- XXX +data Agenda = Agenda [SchedulerEvent] instance ParseRow Agenda where - parseRow [FInt i] = Just (Agenda i) - parseRow _ = Nothing + parseRow [FText t] = case eitherDecodeStrict (Text.encodeUtf8 t) of + Right es -> Just (Agenda es) + Left err -> error (show err) + parseRow x = error (show x) fakeScheduler :: RemoteRef -> Message -> Actor SchedulerState fakeScheduler executorRef (ClientRequest' "CreateTest" [SInt tid] cid) = Actor $ do - -- load from db. XXX: need to extend IO module to be able to return Datatype? p <- asyncIO (IOQuery "SELECT agenda FROM test_info WHERE test_id = :tid" [":tid" := tid]) - on p (\(IOResultR (IORows entries)) -> case parseRows entries of - Just [Agenda i] -> undefined) - undefined + on p (\(IOResultR (IORows rs)) -> case parseRows rs of + Nothing -> clientResponse cid (InternalMessage "parse error") + Just [Agenda es] -> clientResponse cid (InternalMessage (show es))) + return (InternalMessage "ok") fakeScheduler executorRef (ClientRequest "Start" cid) = Actor $ do -- pop agenda end send to executorRef r <- Heap.uncons . heap <$> get @@ -56,7 +72,7 @@ fakeScheduler executorRef (ClientRequest "Start" cid) = Actor $ do undefined Nothing -> return (InternalMessage "Done") -- XXX: reply to client id?! where - prettyCommand :: SchedulerCommand -> String + prettyCommand :: SchedulerEvent -> String prettyCommand = undefined fakeScheduler executorRef msg@(InternalMessage "Ack") = Actor $ do undefined diff --git a/src/runtime-prototype/src/StuntDouble/ActorMap.hs b/src/runtime-prototype/src/StuntDouble/ActorMap.hs index 59ae11dd..4f5772e4 100644 --- a/src/runtime-prototype/src/StuntDouble/ActorMap.hs +++ b/src/runtime-prototype/src/StuntDouble/ActorMap.hs @@ -584,19 +584,21 @@ spawnIOWorkers (ThreadPoolOfSize n) ls error "spawnIOWorkers: thread pool size is bigger than the available CPU capabilities" mapM (\c -> asyncOn c (asyncIOWorker ls)) [ i | i <- [0..n], i /= cap ] -makeEventLoop :: Time -> Seed -> TransportKind -> EventLoopName -> IO EventLoop +makeEventLoop :: Time -> Seed -> TransportKind -> DiskKind -> EventLoopName -> IO EventLoop makeEventLoop = makeEventLoopThreaded SingleThreaded NoThreadPool -makeEventLoopThreaded :: Threaded -> ThreadPool -> Time -> Seed -> TransportKind +makeEventLoopThreaded :: Threaded -> ThreadPool -> Time -> Seed -> TransportKind -> DiskKind -> EventLoopName -> IO EventLoop -makeEventLoopThreaded threaded threadpool time seed tk name = do +makeEventLoopThreaded threaded threadpool time seed tk dk name = do t <- case tk of NamedPipe fp -> namedPipeTransport fp name Http port -> httpTransport port HttpSync -> httpSyncTransport Stm -> stmTransport - disk <- fakeDisk - ls <- initLoopState name time seed t disk + d <- case dk of + FakeDisk -> fakeDisk + RealDisk fp -> realDisk fp + ls <- initLoopState name time seed t d workerPids <- spawnIOWorkers threadpool ls pids <- case threaded of SingleThreaded -> @@ -798,7 +800,7 @@ handleEvent (Admin cmd) ls = case cmd of threadDelay 100000 mapM_ cancel pids handleEvent (ClientRequestEvent lref msg cref returnVar) ls = do - reply <- actorPokeIO ls lref (ClientRequest (getMessage msg) cref) + reply <- actorPokeIO ls lref (ClientRequest' (getMessage msg) (getArgs msg) cref) atomically (putTMVar returnVar reply) waitForEventLoopQuit :: EventLoop -> IO () diff --git a/src/runtime-prototype/src/StuntDouble/IO.hs b/src/runtime-prototype/src/StuntDouble/IO.hs index 3ae0c0be..a41dce9d 100644 --- a/src/runtime-prototype/src/StuntDouble/IO.hs +++ b/src/runtime-prototype/src/StuntDouble/IO.hs @@ -6,6 +6,7 @@ module StuntDouble.IO where import Control.Monad +import Data.String import Data.HashMap.Strict (HashMap) import qualified Data.HashMap.Strict as HashMap import Data.Hashable @@ -37,8 +38,8 @@ data IOOp | IODeletes [Key] | IOIterate Key Key - | IOExecute Query [NamedParam] - | IOQuery Query [NamedParam] + | IOExecute String [NamedParam] + | IOQuery String [NamedParam] | IOReturn IOResult @@ -52,8 +53,8 @@ data Disk m = Disk , ioIterate :: Maybe Key -> Maybe Key -> m [(Key, Value)] -- SQLite. - , ioExecute :: Query -> [NamedParam] -> m () - , ioQuery :: Query -> [NamedParam] -> m [[FieldValue]] + , ioExecute :: String -> [NamedParam] -> m () + , ioQuery :: String -> [NamedParam] -> m [[FieldValue]] } data FieldValue @@ -106,8 +107,8 @@ fakeDisk = do slowFakeDisk :: IO (Disk IO) slowFakeDisk = undefined -realSqlite :: FilePath -> IO (Disk IO) -realSqlite fp = do +realDisk :: FilePath -> IO (Disk IO) +realDisk fp = do conn <- open fp return Disk { ioGet = undefined @@ -117,19 +118,26 @@ realSqlite fp = do , ioDeletes = undefined , ioIterate = undefined - , ioExecute = executeNamed conn - , ioQuery = queryNamed conn + , ioExecute = \q -> executeNamed conn (fromString q) + , ioQuery = \q -> queryNamed conn (fromString q) } class ParseRow a where parseRow :: [FieldValue] -> Maybe a instance ParseRow [FieldValue] where + parseRow :: [FieldValue] -> Maybe [FieldValue] parseRow = Just instance ParseRow FieldValue where + parseRow :: [FieldValue] -> Maybe FieldValue parseRow [fv] = Just fv parseRow _ = Nothing parseRows :: ParseRow a => [[FieldValue]] -> Maybe [a] parseRows = sequence . map parseRow + + +data DiskKind + = FakeDisk + | RealDisk FilePath diff --git a/src/runtime-prototype/src/StuntDouble/Message.hs b/src/runtime-prototype/src/StuntDouble/Message.hs index 0781cc57..70752e2a 100644 --- a/src/runtime-prototype/src/StuntDouble/Message.hs +++ b/src/runtime-prototype/src/StuntDouble/Message.hs @@ -31,3 +31,9 @@ getMessage (InternalMessage msg) = msg getMessage (InternalMessage' msg _args) = msg getMessage (ClientRequest msg _cid) = msg getMessage (ClientRequest' msg _args _cid) = msg + +getArgs :: Message -> Args +getArgs InternalMessage {} = [] +getArgs (InternalMessage' _msg args) = args +getArgs ClientRequest {} = [] +getArgs (ClientRequest' _msg args _cref) = args diff --git a/src/runtime-prototype/stunt-double.cabal b/src/runtime-prototype/stunt-double.cabal index 29cd5424..37b9a81b 100644 --- a/src/runtime-prototype/stunt-double.cabal +++ b/src/runtime-prototype/stunt-double.cabal @@ -107,7 +107,9 @@ test-suite test executable scheduler hs-source-dirs: app/ main-is: Main.hs - build-depends: base ==4.13.*, + build-depends: async, + base ==4.13.*, + filepath, stunt-double ghc-options: -threaded -rtsopts -with-rtsopts=-N default-language: Haskell2010