From 7beb8223236dc05759e4fe73df9bee674141cba9 Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Tue, 25 May 2021 14:45:31 +0200 Subject: [PATCH] refactor(runtime): simplify async io slightly --- .../src/StuntDouble/ActorMap.hs | 52 +++++++++---------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/src/runtime-prototype/src/StuntDouble/ActorMap.hs b/src/runtime-prototype/src/StuntDouble/ActorMap.hs index fb8a777b..1ae2050a 100644 --- a/src/runtime-prototype/src/StuntDouble/ActorMap.hs +++ b/src/runtime-prototype/src/StuntDouble/ActorMap.hs @@ -158,14 +158,14 @@ spawn ls a s = do ------------------------------------------------------------------------ data AsyncState = AsyncState - { asyncStateAsyncIO :: Map (Async IOResult) Promise + { asyncStateAsyncIO :: Set (Async (Promise, IOResult)) , asyncStateContinuations :: Map Promise (Either IOResult Message -> Free ActorF (), LocalRef) , asyncStateAdminSend :: Map Promise (TMVar Message) } emptyAsyncState :: AsyncState -emptyAsyncState = AsyncState Map.empty Map.empty Map.empty +emptyAsyncState = AsyncState Set.empty Map.empty Map.empty madePromises :: [Action] -> Set Int madePromises = foldMap go @@ -174,8 +174,8 @@ madePromises = foldMap go go (AsyncIOAction _io (Promise i)) = Set.singleton i go OnAction {} = Set.empty -act :: EventLoopName -> [Action] -> AsyncState -> Transport IO -> IO AsyncState -act name as s0 t = foldM go s0 as +act :: EventLoopName -> [Action] -> Transport IO -> AsyncState -> IO AsyncState +act name as t s0 = foldM go s0 as where is :: Set Int is = madePromises as @@ -184,12 +184,14 @@ act name as s0 t = foldM go s0 as go s (SendAction from msg to (Promise i)) = do transportSend t (Envelope RequestKind (localToRemoteRef name from) msg to (CorrelationId i)) - return s -- XXX: make a note of when we sent so we can timeout. + -- XXX: make a note of when we sent so we can timeout. + return s go s (AsyncIOAction io p) = do - a <- async io -- XXX: Use `asyncOn` a different capability than main loop. - return (s { asyncStateAsyncIO = Map.insert a p (asyncStateAsyncIO s) }) + -- XXX: Use `asyncOn` a different capability than main loop. + a <- fmap (fmap (\x -> (p, x))) (async io) + return (s { asyncStateAsyncIO = Set.insert a (asyncStateAsyncIO s) }) go s (OnAction p@(Promise i) k lref) - | i `Set.member` is = do + | i `Set.member` is = return (s { asyncStateContinuations = Map.insert p (k, lref) (asyncStateContinuations s) }) | otherwise = @@ -197,7 +199,7 @@ act name as s0 t = foldM go s0 as data Reaction = Response Promise Message - | AsyncIOFinished (Async IOResult) IOResult + | AsyncIOFinished Promise IOResult data ReactTask = NothingToDo @@ -220,18 +222,14 @@ react (Response p msg) s = s { asyncStateAdminSend = Map.delete p (asyncStateAdminSend s) }) -react (AsyncIOFinished a result) s = - case Map.lookup a (asyncStateAsyncIO s) of - Nothing -> error "react: impossible, unknown async finished." - Just p -> case Map.lookup p (asyncStateContinuations s) of - Nothing -> - -- No continuation was registered for this async. - -- XXX: the async handler should take care for this map deletion... - (NothingToDo, s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s) }) - Just (k, lref) -> (ResumeContinuation (k (Left result)) lref, - s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s) - , asyncStateContinuations = Map.delete p (asyncStateContinuations s) - }) +react (AsyncIOFinished p result) s = + case Map.lookup p (asyncStateContinuations s) of + Nothing -> + -- No continuation was registered for this async. + (NothingToDo, s) + Just (k, lref) -> (ResumeContinuation (k (Left result)) lref, + s { asyncStateContinuations = + Map.delete p (asyncStateContinuations s) }) reactIO :: Reaction -> TVar AsyncState -> IO ReactTask reactIO r v = atomically (stateTVar v (react r)) @@ -296,11 +294,11 @@ handleAsyncIO ls = forever go go = atomically $ do -- XXX: Use waitAnyCatchSTM and handle exceptions appropriately here, e.g. -- by extending `AsyncIOFinished` with `Fail` and `Info`. - as <- readTVar (lsAsyncState ls) - (a, ioResult) <- waitAnySTM (Map.keys (asyncStateAsyncIO as)) - writeTBQueue (lsQueue ls) (Reaction (AsyncIOFinished a ioResult)) + s <- readTVar (lsAsyncState ls) + (a, (p, ioResult)) <- waitAnySTM (Set.toList (asyncStateAsyncIO s)) + writeTBQueue (lsQueue ls) (Reaction (AsyncIOFinished p ioResult)) writeTVar (lsAsyncState ls) - (as { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO as) }) + (s { asyncStateAsyncIO = Set.delete a (asyncStateAsyncIO s) }) handleEvents :: EventLoop -> IO () handleEvents ls = forever go @@ -317,7 +315,7 @@ handleEvent (Action a) ls = do -- XXX: Non-atomic update of `lsAsyncState`, should be fixed... -- XXX s <- readTVarIO (lsAsyncState ls) - s' <- act (lsName ls) [a] s (lsTransport ls) + s' <- act (lsName ls) [a] (lsTransport ls) s atomically (writeTVar (lsAsyncState ls) s') handleEvent (Reaction r) ls = do m <- reactIO r (lsAsyncState ls) @@ -335,7 +333,7 @@ handleEvent (Reaction r) ls = do -- XXX: Non-atomic update of `lsAsyncState`, should be fixed... -- XXX s <- readTVarIO (lsAsyncState ls) - s' <- act (lsName ls) as s (lsTransport ls) + s' <- act (lsName ls) as (lsTransport ls) s atomically (writeTVar (lsAsyncState ls) s') AdminSendResponse returnVar msg -> atomically (putTMVar returnVar msg)