Skip to content

Commit

Permalink
feat(runtime): make client request for "CreateTest" do db I/O and res…
Browse files Browse the repository at this point in the history
…pond
  • Loading branch information
symbiont-stevan-andjelkovic committed Sep 10, 2021
1 parent c65312d commit 437ebd6
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 30 deletions.
33 changes: 28 additions & 5 deletions src/runtime-prototype/app/Main.hs
Original file line number Diff line number Diff line change
@@ -1,15 +1,38 @@
{-# LANGUAGE ScopedTypeVariables #-}

module Main where

import StuntDouble
import Control.Concurrent.Async (cancel)
import Control.Exception (throwIO)
import System.Environment (getEnv)
import System.FilePath ((</>))
import System.IO.Error (catchIOError, isDoesNotExistError)

import Scheduler
import StuntDouble

------------------------------------------------------------------------

getDbPath :: IO FilePath
getDbPath = do
getEnv "DETSYS_DB"
`catchIOError` \(e :: catchIOError) ->
if isDoesNotExistError e
then do
home <- getEnv "HOME"
return (home </> ".detsys.db")
else throwIO e

main :: IO ()
main = do
let executorPort = 3004
executorRef = RemoteRef ("http://localhost:" ++ show executorPort) 0
el <- makeEventLoop realTime (makeSeed 0) HttpSync (EventLoopName "scheduler")
lref <- spawn el (fakeScheduler executorRef) initState
putStrLn "Scheduler is running..."
waitForEventLoopQuit el
schedulerPort = 3005
fp <- getDbPath
el <- makeEventLoop realTime (makeSeed 0) HttpSync (RealDisk fp) (EventLoopName "scheduler")
now <- getCurrentTime realTime
lref <- spawn el (fakeScheduler executorRef) (initState now (makeSeed 0))
withHttpFrontend el lref schedulerPort $ \pid -> do
putStrLn ("Scheduler is listening on port: " ++ show schedulerPort)
waitForEventLoopQuit el
cancel pid
36 changes: 26 additions & 10 deletions src/runtime-prototype/src/Scheduler.hs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE DeriveGeneric #-}

module Scheduler where

import Control.Concurrent.Async
import Control.Exception
import Data.Aeson
import GHC.Generics (Generic)
import Data.Heap (Entry(Entry), Heap)
import qualified Data.Heap as Heap
import Data.Text (Text)
import qualified Data.Text.Encoding as Text
import qualified Data.Text as Text
import Data.Time (UTCTime)
import Database.SQLite.Simple
Expand All @@ -15,10 +19,20 @@ import StuntDouble

------------------------------------------------------------------------

data SchedulerCommand = SchedulerCommand
data SchedulerEvent = SchedulerEvent
{ kind :: String
, event :: String
, args :: Data.Aeson.Value
, to :: String
, from :: String
, at :: UTCTime
}
deriving (Generic, Eq, Ord, Show)

instance FromJSON SchedulerEvent where

data SchedulerState = SchedulerState
{ heap :: Heap (Entry UTCTime SchedulerCommand)
{ heap :: Heap (Entry UTCTime SchedulerEvent)
, time :: UTCTime
, seed :: Seed
}
Expand All @@ -30,19 +44,21 @@ initState t s = SchedulerState
, seed = s
}

data Agenda = Agenda Int -- XXX
data Agenda = Agenda [SchedulerEvent]

instance ParseRow Agenda where
parseRow [FInt i] = Just (Agenda i)
parseRow _ = Nothing
parseRow [FText t] = case eitherDecodeStrict (Text.encodeUtf8 t) of
Right es -> Just (Agenda es)
Left err -> error (show err)
parseRow x = error (show x)

fakeScheduler :: RemoteRef -> Message -> Actor SchedulerState
fakeScheduler executorRef (ClientRequest' "CreateTest" [SInt tid] cid) = Actor $ do
-- load from db. XXX: need to extend IO module to be able to return Datatype?
p <- asyncIO (IOQuery "SELECT agenda FROM test_info WHERE test_id = :tid" [":tid" := tid])
on p (\(IOResultR (IORows entries)) -> case parseRows entries of
Just [Agenda i] -> undefined)
undefined
on p (\(IOResultR (IORows rs)) -> case parseRows rs of
Nothing -> clientResponse cid (InternalMessage "parse error")
Just [Agenda es] -> clientResponse cid (InternalMessage (show es)))
return (InternalMessage "ok")
fakeScheduler executorRef (ClientRequest "Start" cid) = Actor $ do
-- pop agenda end send to executorRef
r <- Heap.uncons . heap <$> get
Expand All @@ -56,7 +72,7 @@ fakeScheduler executorRef (ClientRequest "Start" cid) = Actor $ do
undefined
Nothing -> return (InternalMessage "Done") -- XXX: reply to client id?!
where
prettyCommand :: SchedulerCommand -> String
prettyCommand :: SchedulerEvent -> String
prettyCommand = undefined
fakeScheduler executorRef msg@(InternalMessage "Ack") = Actor $ do
undefined
Expand Down
14 changes: 8 additions & 6 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -584,19 +584,21 @@ spawnIOWorkers (ThreadPoolOfSize n) ls
error "spawnIOWorkers: thread pool size is bigger than the available CPU capabilities"
mapM (\c -> asyncOn c (asyncIOWorker ls)) [ i | i <- [0..n], i /= cap ]

makeEventLoop :: Time -> Seed -> TransportKind -> EventLoopName -> IO EventLoop
makeEventLoop :: Time -> Seed -> TransportKind -> DiskKind -> EventLoopName -> IO EventLoop
makeEventLoop = makeEventLoopThreaded SingleThreaded NoThreadPool

makeEventLoopThreaded :: Threaded -> ThreadPool -> Time -> Seed -> TransportKind
makeEventLoopThreaded :: Threaded -> ThreadPool -> Time -> Seed -> TransportKind -> DiskKind
-> EventLoopName -> IO EventLoop
makeEventLoopThreaded threaded threadpool time seed tk name = do
makeEventLoopThreaded threaded threadpool time seed tk dk name = do
t <- case tk of
NamedPipe fp -> namedPipeTransport fp name
Http port -> httpTransport port
HttpSync -> httpSyncTransport
Stm -> stmTransport
disk <- fakeDisk
ls <- initLoopState name time seed t disk
d <- case dk of
FakeDisk -> fakeDisk
RealDisk fp -> realDisk fp
ls <- initLoopState name time seed t d
workerPids <- spawnIOWorkers threadpool ls
pids <- case threaded of
SingleThreaded ->
Expand Down Expand Up @@ -798,7 +800,7 @@ handleEvent (Admin cmd) ls = case cmd of
threadDelay 100000
mapM_ cancel pids
handleEvent (ClientRequestEvent lref msg cref returnVar) ls = do
reply <- actorPokeIO ls lref (ClientRequest (getMessage msg) cref)
reply <- actorPokeIO ls lref (ClientRequest' (getMessage msg) (getArgs msg) cref)
atomically (putTMVar returnVar reply)

waitForEventLoopQuit :: EventLoop -> IO ()
Expand Down
24 changes: 16 additions & 8 deletions src/runtime-prototype/src/StuntDouble/IO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
module StuntDouble.IO where

import Control.Monad
import Data.String
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.Hashable
Expand Down Expand Up @@ -37,8 +38,8 @@ data IOOp
| IODeletes [Key]
| IOIterate Key Key

| IOExecute Query [NamedParam]
| IOQuery Query [NamedParam]
| IOExecute String [NamedParam]
| IOQuery String [NamedParam]

| IOReturn IOResult

Expand All @@ -52,8 +53,8 @@ data Disk m = Disk
, ioIterate :: Maybe Key -> Maybe Key -> m [(Key, Value)]

-- SQLite.
, ioExecute :: Query -> [NamedParam] -> m ()
, ioQuery :: Query -> [NamedParam] -> m [[FieldValue]]
, ioExecute :: String -> [NamedParam] -> m ()
, ioQuery :: String -> [NamedParam] -> m [[FieldValue]]
}

data FieldValue
Expand Down Expand Up @@ -106,8 +107,8 @@ fakeDisk = do
slowFakeDisk :: IO (Disk IO)
slowFakeDisk = undefined

realSqlite :: FilePath -> IO (Disk IO)
realSqlite fp = do
realDisk :: FilePath -> IO (Disk IO)
realDisk fp = do
conn <- open fp
return Disk
{ ioGet = undefined
Expand All @@ -117,19 +118,26 @@ realSqlite fp = do
, ioDeletes = undefined
, ioIterate = undefined

, ioExecute = executeNamed conn
, ioQuery = queryNamed conn
, ioExecute = \q -> executeNamed conn (fromString q)
, ioQuery = \q -> queryNamed conn (fromString q)
}

class ParseRow a where
parseRow :: [FieldValue] -> Maybe a

instance ParseRow [FieldValue] where
parseRow :: [FieldValue] -> Maybe [FieldValue]
parseRow = Just

instance ParseRow FieldValue where
parseRow :: [FieldValue] -> Maybe FieldValue
parseRow [fv] = Just fv
parseRow _ = Nothing

parseRows :: ParseRow a => [[FieldValue]] -> Maybe [a]
parseRows = sequence . map parseRow


data DiskKind
= FakeDisk
| RealDisk FilePath
6 changes: 6 additions & 0 deletions src/runtime-prototype/src/StuntDouble/Message.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ getMessage (InternalMessage msg) = msg
getMessage (InternalMessage' msg _args) = msg
getMessage (ClientRequest msg _cid) = msg
getMessage (ClientRequest' msg _args _cid) = msg

getArgs :: Message -> Args
getArgs InternalMessage {} = []
getArgs (InternalMessage' _msg args) = args
getArgs ClientRequest {} = []
getArgs (ClientRequest' _msg args _cref) = args
4 changes: 3 additions & 1 deletion src/runtime-prototype/stunt-double.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ test-suite test
executable scheduler
hs-source-dirs: app/
main-is: Main.hs
build-depends: base ==4.13.*,
build-depends: async,
base ==4.13.*,
filepath,
stunt-double
ghc-options: -threaded -rtsopts -with-rtsopts=-N
default-language: Haskell2010
Expand Down

0 comments on commit 437ebd6

Please sign in to comment.