Skip to content

Commit

Permalink
feat(runtime): make async i/o test pass
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed May 11, 2021
1 parent 89a1a97 commit 0c88812
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/runtime-prototype/src/StuntDouble/Actor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ data Cont a
-- | ...

data IOResult = Unit | String String
deriving Show
deriving (Eq, Ord, Show)

data ActorF x
= Call LocalRef Message (Message -> x)
Expand Down
14 changes: 12 additions & 2 deletions src/runtime-prototype/src/StuntDouble/EventLoop.hs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ handleReceive e ls = case envelopeKind e of
installContinuation async (envelopeReceiver e) (envelopeCorrelationId e) k ls
LaterIO async k -> do
say ls "installing i/o continuation"
-- installIOContinuation async ...
installIOContinuation async (envelopeReceiver e) (envelopeCorrelationId e) k ls

ResponseKind -> do
emit ls (LogReceive (envelopeSender e) (envelopeReceiver e) (envelopeMessage e)
Expand All @@ -158,7 +158,17 @@ handleReceive e ls = case envelopeKind e of
undefined

handleAsyncIODone :: Async IOResult -> IOResult -> LoopState -> IO ()
handleAsyncIODone a r ls = undefined
handleAsyncIODone a r ls = do
(self, corrId, k) <- recallIOContinuation a ls
emit ls (LogAsyncIOFinish corrId r)
cont <- runActor ls self (k r)
case cont of
Now replyMsg -> do
resps <- readTVarIO (loopStateResponses ls)
let respVar = resps Map.! corrId
atomically (putTMVar respVar replyMsg)
Later {} -> do
undefined

runActor :: LoopState -> RemoteRef -> Free ActorF a -> IO a
runActor ls self = iterM go return
Expand Down
1 change: 1 addition & 0 deletions src/runtime-prototype/src/StuntDouble/EventLoop/Event.hs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ data LogEntry
| LogRequestStart RemoteRef RemoteRef Message CorrelationId EventLoopName
| LogRequestFinish CorrelationId Message EventLoopName
| LogComment String EventLoopName
| LogAsyncIOFinish CorrelationId IOResult EventLoopName
deriving (Eq, Show)

isComment :: LogEntry -> Bool
Expand Down
17 changes: 16 additions & 1 deletion src/runtime-prototype/src/StuntDouble/EventLoop/State.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ data LoopState = LoopState
, loopStateQueue :: TBQueue Event
, loopStateActors :: TVar (Map LocalRef (Message -> Actor)) -- XXX: Only changed by main loop, so no need for STM?
, loopStateIOAsyncs :: TVar [Async IOResult]
, loopStateIOHandlers :: TVar (Map (Async IOResult) (LocalRef, IOResult -> Actor))
, loopStateIOContinuations :: TVar (Map (Async IOResult)
(RemoteRef, CorrelationId, IOResult -> Actor))
, loopStateTransport :: Transport IO -- Will not change once created, so doesn't need STM?
, loopStateNextCorrelationId :: TVar CorrelationId
, loopStateResponses :: TVar (Map CorrelationId (TMVar Message))
Expand Down Expand Up @@ -48,6 +49,20 @@ recallContinuation a ls = do
return ks
return (Map.lookup a ks)

installIOContinuation :: Async IOResult -> RemoteRef -> CorrelationId
-> (IOResult -> Actor) -> LoopState -> IO ()
installIOContinuation a self corrId k ls = atomically $
modifyTVar' (loopStateIOContinuations ls) (Map.insert a (self, corrId, k))

recallIOContinuation :: Async IOResult -> LoopState
-> IO (RemoteRef, CorrelationId, IOResult -> Actor)
recallIOContinuation a ls = do
ks <- atomically $ do
ks <- readTVar (loopStateIOContinuations ls)
writeTVar (loopStateIOContinuations ls) (Map.delete a ks)
return ks
return (ks Map.! a)

correlateAsync :: CorrelationId -> Async Message -> LoopState -> IO ()
correlateAsync cid a ls = atomically $
modifyTVar' (loopStateWaitingAsyncs ls) (Map.insert cid a)
Expand Down
12 changes: 9 additions & 3 deletions src/runtime-prototype/test/StuntDouble/EventLoopTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ testActor2 rref (Message "init") = do

testActor3 :: Message -> Actor
testActor3 (Message "init") = do
a <- asyncIO (threadDelay 1000000 >> return (String "result"))
a <- asyncIO (threadDelay 300000 >> return (String "result"))
return (LaterIO a (\(String result) -> return (Now (Message ("Got: " ++ result)))))

eventLoopA :: String -> EventLoopName
Expand Down Expand Up @@ -88,10 +88,9 @@ unit_sendLater = do
reply <- wait a
reply @?= Message "Got: bye!")
(\(e :: SomeException) -> dump el1 >> dump el2 >> eventLog el1 >>= mapM_ print)
-}

dump el1
dump el2
-}

quit el1
quit el2
Expand All @@ -101,8 +100,15 @@ unit_asyncIO = do
elog <- emptyEventLog
let ev = eventLoopA "asyncIO"
el <- makeEventLoop "/tmp" ev elog
lref <- spawn el testActor3
a <- send el (localToRemoteRef ev lref) (Message "init")
reply <- wait a
reply @?= Message "Got: result"
quit el
{-
catch (do lref <- spawn el testActor3
a <- send el (localToRemoteRef ev lref) (Message "init")
reply <- wait a
reply @?= Message "Got: result")
(\(e :: SomeException) -> dump el >> eventLog el >>= mapM_ print >> print e)
-}

0 comments on commit 0c88812

Please sign in to comment.