From 849f2b2e6b8ea9ecb050119c9e085518478bfdd2 Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Thu, 3 Jun 2021 11:21:10 +0200 Subject: [PATCH] feat(runtime): towards being able to handle client requests and responses --- .../src/StuntDouble/ActorMap.hs | 42 ++++++++++++++++--- src/runtime-prototype/src/StuntDouble/Log.hs | 4 +- .../src/StuntDouble/Reference.hs | 5 ++- 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/src/runtime-prototype/src/StuntDouble/ActorMap.hs b/src/runtime-prototype/src/StuntDouble/ActorMap.hs index 0ea0dbda..6580c54e 100644 --- a/src/runtime-prototype/src/StuntDouble/ActorMap.hs +++ b/src/runtime-prototype/src/StuntDouble/ActorMap.hs @@ -67,6 +67,7 @@ data ActorF x | GetTime (UTCTime -> x) | Random (Double -> x) | SetTimer Time.NominalDiffTime (Promise -> x) + | ClientResponse ClientRef Message (() -> x) -- XXX: Log? -- XXX: Throw? deriving instance Functor ActorF @@ -101,6 +102,9 @@ random = Free (Random return) setTimer :: Time.NominalDiffTime -> Free ActorF Promise setTimer ndt = Free (SetTimer ndt return) +clientResponse :: ClientRef -> Message -> Free ActorF () +clientResponse cref msg = Free (ClientResponse cref msg return) + ------------------------------------------------------------------------ newtype ActorMap = ActorMap (Map LocalRef ActorData) @@ -134,6 +138,7 @@ data Action | AsyncIOAction (IO IOResult) Promise | OnAction Promise (Resolution -> Free ActorF ()) LocalRef | SetTimerAction Time.NominalDiffTime Promise + | ClientResponseAction ClientRef Message -- XXX: this doesn't really fit into action... -- XXX: what about exceptions? transactional in state, but also in actions?! actorMapTurn :: LocalRef -> Message -> Promise -> UTCTime -> Seed -> ActorMap @@ -176,6 +181,8 @@ actorMapTurn' p acc lref t seed (Free op) am = case op of actorMapTurn' p acc lref t seed' (k d) am SetTimer ndt k -> actorMapTurn' (p + 1) (SetTimerAction ndt p : acc) lref t seed (k p) am + ClientResponse cref msg k -> + actorMapTurn' p (ClientResponseAction cref msg : acc) lref t seed (k ()) am actorMapPeek :: LocalRef -> Message -> Promise -> UTCTime -> Seed -> ActorMap -> (Message, ActorMap) @@ -263,6 +270,20 @@ asend ls rref msg = do atomically (writeTBQueue (lsQueue ls) (Admin (AdminSend rref msg p returnVar))) async (atomically (takeTMVar returnVar)) +clientRequest :: EventLoop -> LocalRef -> Message -> IO (Message, Async Message) +clientRequest ls lref msg = do + cref <- atomically (stateTVar (lsNextPromise ls) (\p -> (ClientRef (unPromise p), p + 1))) + returnVar <- newEmptyTMVarIO + respVar <- newEmptyTMVarIO + -- XXX: assoc cref respVar ls + atomically (modifyTVar' (lsAsyncState ls) + (\s -> s { asyncStateClientResponses = + Map.insert cref respVar (asyncStateClientResponses s) })) + atomically (writeTBQueue (lsQueue ls) (ClientRequestEvent lref msg cref returnVar)) + a <- async (atomically (takeTMVar respVar)) + reply <- atomically (takeTMVar returnVar) + return (reply, a) + spawn :: EventLoop -> (Message -> Actor) -> State -> IO LocalRef spawn ls a s = do returnVar <- newEmptyTMVarIO @@ -275,10 +296,11 @@ quit ls = atomically (writeTBQueue (lsQueue ls) (Admin Quit)) ------------------------------------------------------------------------ data AsyncState = AsyncState - { asyncStateAsyncIO :: Map (Async IOResult) Promise - , asyncStateContinuations :: Map Promise (Resolution -> Free ActorF (), LocalRef) - , asyncStateAdminSend :: Map Promise (TMVar Message) - , asyncStateTimeouts :: Heap (Entry UTCTime (TimeoutKind, Promise)) + { asyncStateAsyncIO :: Map (Async IOResult) Promise + , asyncStateContinuations :: Map Promise (Resolution -> Free ActorF (), LocalRef) + , asyncStateAdminSend :: Map Promise (TMVar Message) + , asyncStateTimeouts :: Heap (Entry UTCTime (TimeoutKind, Promise)) + , asyncStateClientResponses :: Map ClientRef (TMVar Message) } data TimeoutKind @@ -286,7 +308,7 @@ data TimeoutKind | TimerTimeout emptyAsyncState :: AsyncState -emptyAsyncState = AsyncState Map.empty Map.empty Map.empty Heap.empty +emptyAsyncState = AsyncState Map.empty Map.empty Map.empty Heap.empty Map.empty madePromises :: [Action] -> Set Int madePromises = foldMap go @@ -295,6 +317,7 @@ madePromises = foldMap go go (AsyncIOAction _io (Promise i)) = Set.singleton i go OnAction {} = Set.empty go (SetTimerAction _ndt (Promise i)) = Set.singleton i + go ClientResponseAction {} = Set.empty act :: EventLoopName -> [Action] -> Time -> Transport IO -> AsyncState -> IO AsyncState act name as time transport s0 = foldM go s0 as @@ -327,6 +350,11 @@ act name as time transport s0 = foldM go s0 as let timeoutAfter = Time.addUTCTime ndt t return s { asyncStateTimeouts = Heap.insert (Entry timeoutAfter (TimerTimeout, p)) (asyncStateTimeouts s) } + go s (ClientResponseAction cref msg) = do + let respVar = asyncStateClientResponses s Map.! cref -- XXX: partial + atomically (putTMVar respVar msg) + return s + { asyncStateClientResponses = Map.delete cref (asyncStateClientResponses s)} data Reaction = Receive Promise Envelope @@ -385,6 +413,7 @@ data Event = Action Action | Reaction Reaction | Admin Command + | ClientRequestEvent LocalRef Message ClientRef (TMVar Message) data Command = Spawn (Message -> Actor) State (TMVar LocalRef) @@ -557,3 +586,6 @@ handleEvent (Admin cmd) ls = case cmd of pids <- readTVarIO (lsPids ls) threadDelay 100000 mapM_ cancel pids +handleEvent (ClientRequestEvent lref msg cref returnVar) ls = do + reply <- actorPokeIO ls lref msg -- XXX: cref needs to be fed in here... + atomically (putTMVar returnVar reply) diff --git a/src/runtime-prototype/src/StuntDouble/Log.hs b/src/runtime-prototype/src/StuntDouble/Log.hs index 6241f7d5..03ae3de7 100644 --- a/src/runtime-prototype/src/StuntDouble/Log.hs +++ b/src/runtime-prototype/src/StuntDouble/Log.hs @@ -11,8 +11,8 @@ newtype Log = Log [LogEntry] data LogEntry = Spawned LocalRef State | Turn TurnData - | ClientRequest - | ClientResponse + | ClientRequestEntry + | ClientResponseEntry data TurnData = TurnData { tdActor :: LocalRef diff --git a/src/runtime-prototype/src/StuntDouble/Reference.hs b/src/runtime-prototype/src/StuntDouble/Reference.hs index 26047a67..e1669c10 100644 --- a/src/runtime-prototype/src/StuntDouble/Reference.hs +++ b/src/runtime-prototype/src/StuntDouble/Reference.hs @@ -6,7 +6,7 @@ import Data.String ------------------------------------------------------------------------ -data LocalRef = LocalRef Int +newtype LocalRef = LocalRef Int deriving (Eq, Ord, Show) data RemoteRef = RemoteRef @@ -23,3 +23,6 @@ remoteToLocalRef = LocalRef . index newtype EventLoopName = EventLoopName { getEventLoopName :: String } deriving (Eq, Ord, Show, IsString) + +newtype ClientRef = ClientRef Int + deriving (Eq, Ord, Show)