From a9ba3836b2f0dc2e2687f19ee1f6c465ea5d21d8 Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Tue, 25 May 2021 12:29:28 +0200 Subject: [PATCH] refactor(runtime): finish admin send --- .../src/StuntDouble/ActorMap.hs | 157 +++++++++++------- 1 file changed, 98 insertions(+), 59 deletions(-) diff --git a/src/runtime-prototype/src/StuntDouble/ActorMap.hs b/src/runtime-prototype/src/StuntDouble/ActorMap.hs index 0938cc6c..fb8a777b 100644 --- a/src/runtime-prototype/src/StuntDouble/ActorMap.hs +++ b/src/runtime-prototype/src/StuntDouble/ActorMap.hs @@ -33,6 +33,9 @@ import StuntDouble.Reference newtype Promise = Promise Int deriving (Eq, Ord, Num) +unPromise :: Promise -> Int +unPromise (Promise i) = i + newtype Actor = Actor { unActor :: Free ActorF Message } data ActorF x @@ -72,97 +75,97 @@ data Action | OnAction Promise (Either IOResult Message -> Free ActorF ()) LocalRef -- XXX: what about exceptions? transactional in state, but also in actions?! -actorMapTurn :: LocalRef -> Message -> ActorMap -> ((Message, ActorMap, [Action]), ActorMap) +actorMapTurn :: LocalRef -> Message -> ActorMap + -> ((Message, Promise, ActorMap, [Action]), ActorMap) actorMapTurn lref0 msg0 am0 = let a = fst (actorMapUnsafeLookup lref0 am0) in -- XXX: Promises should not always start from 0, or they will overlap each -- other if more than one turn happens... - (actorMapTurn' 0 [] lref0 (unActor (a msg0)) am0, am0) + (actorMapTurn' (Promise 0) [] lref0 (unActor (a msg0)) am0, am0) -actorMapTurn' :: Int -> [Action] -> LocalRef -> Free ActorF a -> ActorMap - -> (a, ActorMap, [Action]) -actorMapTurn' _pc acc _lref (Pure msg) am = (msg, am, reverse acc) -actorMapTurn' pc acc lref (Free op) am = case op of +actorMapTurn' :: Promise -> [Action] -> LocalRef -> Free ActorF a -> ActorMap + -> (a, Promise, ActorMap, [Action]) +actorMapTurn' p acc _lref (Pure msg) am = (msg, p, am, reverse acc) +actorMapTurn' p acc lref (Free op) am = case op of Invoke lref' msg k -> let a' = fst (actorMapUnsafeLookup lref' am) - (reply, am', acc') = actorMapTurn' pc acc lref' (unActor (a' msg)) am + (reply, p', am', acc') = actorMapTurn' p acc lref' (unActor (a' msg)) am in - actorMapTurn' pc acc' lref (k reply) am' + actorMapTurn' p' acc' lref (k reply) am' Send rref msg k -> - let - p = Promise pc - in - actorMapTurn' (pc + 1) (SendAction lref msg rref p : acc) lref (k p) am + actorMapTurn' (p + 1) (SendAction lref msg rref p : acc) lref (k p) am AsyncIO io k -> - let - p = Promise pc - in - actorMapTurn' (pc + 1) (AsyncIOAction io p : acc) lref (k p) am - On p c k -> - actorMapTurn' pc (OnAction p c lref : acc) lref (k ()) am + actorMapTurn' (p + 1) (AsyncIOAction io p : acc) lref (k p) am + On q c k -> + actorMapTurn' p (OnAction q c lref : acc) lref (k ()) am Get k -> - actorMapTurn' pc acc lref (k (snd (actorMapUnsafeLookup lref am))) am + actorMapTurn' p acc lref (k (snd (actorMapUnsafeLookup lref am))) am Put s' k -> case am of ActorMap m -> - actorMapTurn' pc acc lref (k ()) (ActorMap (Map.adjust (\(a, _s) -> (a, s')) lref m)) + actorMapTurn' p acc lref (k ()) (ActorMap (Map.adjust (\(a, _s) -> (a, s')) lref m)) actorMapPeek :: LocalRef -> Message -> ActorMap -> (Message, ActorMap) actorMapPeek lref msg am = let - ((reply, _am', _as), _am) = actorMapTurn lref msg am + ((reply, _p, _am', _as), _am) = actorMapTurn lref msg am in (reply, am) actorMapPoke :: LocalRef -> Message -> ActorMap -> (Message, ActorMap) actorMapPoke lref msg am = let - ((reply, am', _as), _am) = actorMapTurn lref msg am + ((reply, _p, am', _as), _am) = actorMapTurn lref msg am in (reply, am') ------------------------------------------------------------------------ -type ActorMapTVar = TVar ActorMap - -makeActorMapIO :: IO ActorMapTVar +makeActorMapIO :: IO (TVar ActorMap) makeActorMapIO = newTVarIO emptyActorMap -actorMapSpawnIO :: (Message -> Actor) -> State -> ActorMapTVar -> IO LocalRef +actorMapSpawnIO :: (Message -> Actor) -> State -> TVar ActorMap -> IO LocalRef actorMapSpawnIO a s am = atomically (stateTVar am (actorMapSpawn a s)) -actorMapTurnIO :: LocalRef -> Message -> ActorMapTVar -> IO (Message, ActorMap, [Action]) +actorMapTurnIO :: LocalRef -> Message -> TVar ActorMap + -> IO (Message, Promise, ActorMap, [Action]) actorMapTurnIO lref msg am = atomically (stateTVar am (actorMapTurn lref msg)) -actorMapPeekIO :: LocalRef -> Message -> ActorMapTVar -> IO Message +actorMapPeekIO :: LocalRef -> Message -> TVar ActorMap -> IO Message actorMapPeekIO lref msg am = atomically (stateTVar am (actorMapPeek lref msg)) -actorMapPokeIO :: LocalRef -> Message -> ActorMapTVar -> IO Message +actorMapPokeIO :: LocalRef -> Message -> TVar ActorMap -> IO Message actorMapPokeIO lref msg am = atomically (stateTVar am (actorMapPoke lref msg)) ------------------------------------------------------------------------ -devSend :: {- EventLoopRef-} RemoteRef -> Message -> IO (Async Message) -devSend = undefined - -- p <- createPromise - -- v <- newEmptyTMVar - -- insertDeveloperSend p v - -- async (atomically (takeTMVar v)) +send :: EventLoop -> RemoteRef -> Message -> IO (Async Message) +send ls rref msg = do + p <- atomically (stateTVar (lsNextPromise ls) (\p -> (p, p + 1))) + returnVar <- newEmptyTMVarIO + atomically (writeTBQueue (lsQueue ls) (Admin (AdminSend rref msg p returnVar))) + async (atomically (takeTMVar returnVar)) + +spawn :: EventLoop -> (Message -> Actor) -> State -> IO LocalRef +spawn ls a s = do + returnVar <- newEmptyTMVarIO + atomically (writeTBQueue (lsQueue ls) (Admin (Spawn a s returnVar))) + atomically (takeTMVar returnVar) ------------------------------------------------------------------------ data AsyncState = AsyncState - { asyncStateAsyncIO :: Map (Async IOResult) Promise - , asyncStateContinuations :: Map Promise (Either IOResult Message -> Free ActorF (), - LocalRef) - -- , asyncStateDeveloperSend :: Map Promise (TMVar Message) + { asyncStateAsyncIO :: Map (Async IOResult) Promise + , asyncStateContinuations :: Map Promise (Either IOResult Message -> Free ActorF (), + LocalRef) + , asyncStateAdminSend :: Map Promise (TMVar Message) } emptyAsyncState :: AsyncState -emptyAsyncState = AsyncState Map.empty Map.empty +emptyAsyncState = AsyncState Map.empty Map.empty Map.empty madePromises :: [Action] -> Set Int madePromises = foldMap go @@ -196,17 +199,27 @@ data Reaction = Response Promise Message | AsyncIOFinished (Async IOResult) IOResult -react :: Reaction -> AsyncState -> (Maybe (Free ActorF (), LocalRef), AsyncState) +data ReactTask + = NothingToDo + | ResumeContinuation (Free ActorF ()) LocalRef + | AdminSendResponse (TMVar Message) Message + +react :: Reaction -> AsyncState -> (ReactTask, AsyncState) react (Response p msg) s = case Map.lookup p (asyncStateContinuations s) of - Just (k, lref) -> (Just (k (Right msg), lref), + Just (k, lref) -> (ResumeContinuation (k (Right msg)) lref, s { asyncStateContinuations = Map.delete p (asyncStateContinuations s) }) Nothing -> - -- XXX: Map.lookup p (developerSend s)? + case Map.lookup p (asyncStateAdminSend s) of + Nothing -> + -- We got a response for something we are not (longer) waiting for. + (NothingToDo, s) + Just returnVar -> + (AdminSendResponse returnVar msg, + s { asyncStateAdminSend = + Map.delete p (asyncStateAdminSend s) }) - -- We got a response for something we are not (longer) waiting for. - (Nothing, s) react (AsyncIOFinished a result) s = case Map.lookup a (asyncStateAsyncIO s) of Nothing -> error "react: impossible, unknown async finished." @@ -214,13 +227,13 @@ react (AsyncIOFinished a result) s = Nothing -> -- No continuation was registered for this async. -- XXX: the async handler should take care for this map deletion... - (Nothing, s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s) }) - Just (k, lref) -> (Just (k (Left result), lref), + (NothingToDo, s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s) }) + Just (k, lref) -> (ResumeContinuation (k (Left result)) lref, s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s) , asyncStateContinuations = Map.delete p (asyncStateContinuations s) }) -reactIO :: Reaction -> TVar AsyncState -> IO (Maybe (Free ActorF (), LocalRef)) +reactIO :: Reaction -> TVar AsyncState -> IO ReactTask reactIO r v = atomically (stateTVar v (react r)) ------------------------------------------------------------------------ @@ -228,14 +241,21 @@ reactIO r v = atomically (stateTVar v (react r)) data Event = Action Action | Reaction Reaction + | Admin Command + +data Command + = Spawn (Message -> Actor) State (TMVar LocalRef) + | AdminInvoke LocalRef Message (TMVar Message) + | AdminSend RemoteRef Message Promise (TMVar Message) data EventLoop = EventLoop - { lsName :: EventLoopName - , lsActorMap :: TVar ActorMap - , lsAsyncState :: TVar AsyncState - , lsQueue :: TBQueue Event - , lsTransport :: Transport IO - , lsPids :: TVar [Async ()] + { lsName :: EventLoopName + , lsActorMap :: TVar ActorMap + , lsAsyncState :: TVar AsyncState + , lsQueue :: TBQueue Event + , lsTransport :: Transport IO + , lsPids :: TVar [Async ()] + , lsNextPromise :: TVar Promise } initLoopState :: EventLoopName -> Transport IO -> IO EventLoop @@ -247,6 +267,7 @@ initLoopState name t = <*> newTBQueueIO 128 <*> pure t <*> newTVarIO [] + <*> newTVarIO (Promise 0) makeEventLoop :: TransportKind -> EventLoopName -> IO EventLoop makeEventLoop tk name = do @@ -301,14 +322,14 @@ handleEvent (Action a) ls = do handleEvent (Reaction r) ls = do m <- reactIO r (lsAsyncState ls) case m of - Nothing -> return () - Just (a, lref) -> do + NothingToDo -> return () + ResumeContinuation a lref -> do as <- atomically $ do am <- readTVar (lsActorMap ls) - let ((), am', as) = actorMapTurn' 0 [] lref a am -- XXX: promise counter - -- should not always be - -- 0... + p <- readTVar (lsNextPromise ls) + let ((), p', am', as) = actorMapTurn' p [] lref a am writeTVar (lsActorMap ls) am' + writeTVar (lsNextPromise ls) p' return as -- XXX: -- XXX: Non-atomic update of `lsAsyncState`, should be fixed... @@ -316,3 +337,21 @@ handleEvent (Reaction r) ls = do s <- readTVarIO (lsAsyncState ls) s' <- act (lsName ls) as s (lsTransport ls) atomically (writeTVar (lsAsyncState ls) s') + AdminSendResponse returnVar msg -> + atomically (putTMVar returnVar msg) +handleEvent (Admin cmd) ls = case cmd of + Spawn a s returnVar -> do + lref <- actorMapSpawnIO a s (lsActorMap ls) + atomically (putTMVar returnVar lref) + AdminInvoke lref msg returnVar -> do + reply <- actorMapPokeIO lref msg (lsActorMap ls) + atomically (putTMVar returnVar reply) + AdminSend rref msg p returnVar -> do + -- XXX: is the `from` field in `Envelope` ever used? If it can be removed + -- then this `dummyAdminRef` hack can be removed too... + let dummyAdminRef = localToRemoteRef (lsName ls) (LocalRef (-1)) + transportSend (lsTransport ls) + (Envelope RequestKind dummyAdminRef msg rref (CorrelationId (unPromise p))) + atomically (modifyTVar' (lsAsyncState ls) + (\as -> as { asyncStateAdminSend = + Map.insert p returnVar (asyncStateAdminSend as) }))