From 0c888123fafef4226b16826462592db572719dd8 Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Tue, 11 May 2021 09:38:26 +0200 Subject: [PATCH] feat(runtime): make async i/o test pass --- src/runtime-prototype/src/StuntDouble/Actor.hs | 2 +- .../src/StuntDouble/EventLoop.hs | 14 ++++++++++++-- .../src/StuntDouble/EventLoop/Event.hs | 1 + .../src/StuntDouble/EventLoop/State.hs | 17 ++++++++++++++++- .../test/StuntDouble/EventLoopTest.hs | 12 +++++++++--- 5 files changed, 39 insertions(+), 7 deletions(-) diff --git a/src/runtime-prototype/src/StuntDouble/Actor.hs b/src/runtime-prototype/src/StuntDouble/Actor.hs index 0747dba2..7ebc9996 100644 --- a/src/runtime-prototype/src/StuntDouble/Actor.hs +++ b/src/runtime-prototype/src/StuntDouble/Actor.hs @@ -30,7 +30,7 @@ data Cont a -- | ... data IOResult = Unit | String String - deriving Show + deriving (Eq, Ord, Show) data ActorF x = Call LocalRef Message (Message -> x) diff --git a/src/runtime-prototype/src/StuntDouble/EventLoop.hs b/src/runtime-prototype/src/StuntDouble/EventLoop.hs index bc1375b7..9d1b68ac 100644 --- a/src/runtime-prototype/src/StuntDouble/EventLoop.hs +++ b/src/runtime-prototype/src/StuntDouble/EventLoop.hs @@ -132,7 +132,7 @@ handleReceive e ls = case envelopeKind e of installContinuation async (envelopeReceiver e) (envelopeCorrelationId e) k ls LaterIO async k -> do say ls "installing i/o continuation" - -- installIOContinuation async ... + installIOContinuation async (envelopeReceiver e) (envelopeCorrelationId e) k ls ResponseKind -> do emit ls (LogReceive (envelopeSender e) (envelopeReceiver e) (envelopeMessage e) @@ -158,7 +158,17 @@ handleReceive e ls = case envelopeKind e of undefined handleAsyncIODone :: Async IOResult -> IOResult -> LoopState -> IO () -handleAsyncIODone a r ls = undefined +handleAsyncIODone a r ls = do + (self, corrId, k) <- recallIOContinuation a ls + emit ls (LogAsyncIOFinish corrId r) + cont <- runActor ls self (k r) + case cont of + Now replyMsg -> do + resps <- readTVarIO (loopStateResponses ls) + let respVar = resps Map.! corrId + atomically (putTMVar respVar replyMsg) + Later {} -> do + undefined runActor :: LoopState -> RemoteRef -> Free ActorF a -> IO a runActor ls self = iterM go return diff --git a/src/runtime-prototype/src/StuntDouble/EventLoop/Event.hs b/src/runtime-prototype/src/StuntDouble/EventLoop/Event.hs index 1ee03805..70db3f3d 100644 --- a/src/runtime-prototype/src/StuntDouble/EventLoop/Event.hs +++ b/src/runtime-prototype/src/StuntDouble/EventLoop/Event.hs @@ -65,6 +65,7 @@ data LogEntry | LogRequestStart RemoteRef RemoteRef Message CorrelationId EventLoopName | LogRequestFinish CorrelationId Message EventLoopName | LogComment String EventLoopName + | LogAsyncIOFinish CorrelationId IOResult EventLoopName deriving (Eq, Show) isComment :: LogEntry -> Bool diff --git a/src/runtime-prototype/src/StuntDouble/EventLoop/State.hs b/src/runtime-prototype/src/StuntDouble/EventLoop/State.hs index 4e16b010..0f5ef920 100644 --- a/src/runtime-prototype/src/StuntDouble/EventLoop/State.hs +++ b/src/runtime-prototype/src/StuntDouble/EventLoop/State.hs @@ -20,7 +20,8 @@ data LoopState = LoopState , loopStateQueue :: TBQueue Event , loopStateActors :: TVar (Map LocalRef (Message -> Actor)) -- XXX: Only changed by main loop, so no need for STM? , loopStateIOAsyncs :: TVar [Async IOResult] - , loopStateIOHandlers :: TVar (Map (Async IOResult) (LocalRef, IOResult -> Actor)) + , loopStateIOContinuations :: TVar (Map (Async IOResult) + (RemoteRef, CorrelationId, IOResult -> Actor)) , loopStateTransport :: Transport IO -- Will not change once created, so doesn't need STM? , loopStateNextCorrelationId :: TVar CorrelationId , loopStateResponses :: TVar (Map CorrelationId (TMVar Message)) @@ -48,6 +49,20 @@ recallContinuation a ls = do return ks return (Map.lookup a ks) +installIOContinuation :: Async IOResult -> RemoteRef -> CorrelationId + -> (IOResult -> Actor) -> LoopState -> IO () +installIOContinuation a self corrId k ls = atomically $ + modifyTVar' (loopStateIOContinuations ls) (Map.insert a (self, corrId, k)) + +recallIOContinuation :: Async IOResult -> LoopState + -> IO (RemoteRef, CorrelationId, IOResult -> Actor) +recallIOContinuation a ls = do + ks <- atomically $ do + ks <- readTVar (loopStateIOContinuations ls) + writeTVar (loopStateIOContinuations ls) (Map.delete a ks) + return ks + return (ks Map.! a) + correlateAsync :: CorrelationId -> Async Message -> LoopState -> IO () correlateAsync cid a ls = atomically $ modifyTVar' (loopStateWaitingAsyncs ls) (Map.insert cid a) diff --git a/src/runtime-prototype/test/StuntDouble/EventLoopTest.hs b/src/runtime-prototype/test/StuntDouble/EventLoopTest.hs index 0056ffbb..635794c9 100644 --- a/src/runtime-prototype/test/StuntDouble/EventLoopTest.hs +++ b/src/runtime-prototype/test/StuntDouble/EventLoopTest.hs @@ -21,7 +21,7 @@ testActor2 rref (Message "init") = do testActor3 :: Message -> Actor testActor3 (Message "init") = do - a <- asyncIO (threadDelay 1000000 >> return (String "result")) + a <- asyncIO (threadDelay 300000 >> return (String "result")) return (LaterIO a (\(String result) -> return (Now (Message ("Got: " ++ result))))) eventLoopA :: String -> EventLoopName @@ -88,10 +88,9 @@ unit_sendLater = do reply <- wait a reply @?= Message "Got: bye!") (\(e :: SomeException) -> dump el1 >> dump el2 >> eventLog el1 >>= mapM_ print) - -} - dump el1 dump el2 + -} quit el1 quit el2 @@ -101,8 +100,15 @@ unit_asyncIO = do elog <- emptyEventLog let ev = eventLoopA "asyncIO" el <- makeEventLoop "/tmp" ev elog + lref <- spawn el testActor3 + a <- send el (localToRemoteRef ev lref) (Message "init") + reply <- wait a + reply @?= Message "Got: result" + quit el + {- catch (do lref <- spawn el testActor3 a <- send el (localToRemoteRef ev lref) (Message "init") reply <- wait a reply @?= Message "Got: result") (\(e :: SomeException) -> dump el >> eventLog el >>= mapM_ print >> print e) +-}