Skip to content

Commit

Permalink
feat(runtime): more debug info
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Apr 30, 2021
1 parent 38c56c8 commit 3247b8d
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 15 deletions.
48 changes: 35 additions & 13 deletions src/runtime-prototype/src/StuntDouble/EventLoop.hs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ handleCommand (Invoke lref msg respVar) ls = do
Now reply <- runActor ls (actor msg)
atomically (putTMVar respVar reply)
handleCommand (Send rref msg respVar) ls = do
say ls ("Send: " ++ show rref ++ " " ++ show msg)
(corrId, respTMVar) <- atomically $ do
corrId <- readTVar (loopStateNextCorrelationId ls)
modifyTVar' (loopStateNextCorrelationId ls) succ
Expand All @@ -98,6 +97,8 @@ handleCommand (Send rref msg respVar) ls = do
resp <- takeTMVar respTMVar -- XXX: timeout?
modifyTVar' (loopStateResponses ls) (Map.delete corrId)
return resp
say ls ("correlating send with `" ++ show corrId ++ "'")
correlateAsync corrId a ls
atomically (putTMVar respVar a)
say ls "Done with send"
handleCommand Quit ls = do
Expand All @@ -107,19 +108,27 @@ handleCommand Quit ls = do

handleResponse :: Response -> LoopState -> IO ()
handleResponse (Reply respTMVar e) ls
| envelopeReceiver e == dummyDeveloperRef ls =
| envelopeReceiver e == dummyDeveloperRef ls = do
say ls ("handleResponse: Reply: " ++ show e)
atomically (putTMVar respTMVar (envelopeMessage e))
| otherwise = do
say ls (show e)
say ls ("handleResponse: Reply: otherwise: " ++ show e)
atomically (putTMVar respTMVar (envelopeMessage e))
handleResponse (AsyncReply respTMVar a e) ls = do
say ls ("handleResponse: AsyncReply: " ++ show e)
say ls "recalling continuation"
k <- recallContinuation a ls
cont <- runActor ls (k (envelopeMessage e))
case cont of
Now replyMsg -> do
say ls ("Now: " ++ show replyMsg)
atomically (putTMVar respTMVar replyMsg)
Just corrId <- reverseCorrelateAsync a ls
say ls ("Now: CorrId: " ++ show corrId)
resps <- readTVarIO (loopStateResponses ls)
let respVar = resps Map.! corrId
say ls ("Now: loopStateResponses.keys = " ++ show (Map.keys resps))
-- atomically (putTMVar respTMVar replyMsg)
atomically (putTMVar respVar replyMsg)
say ls "Done!"
Later {} -> do
say ls "Later"
Expand All @@ -142,7 +151,9 @@ handleReceive (Request e) ls = do
Just actor -> do
cont <- runActor ls (actor (envelopeMessage e))
case cont of
Now replyMsg -> transportSend (loopStateTransport ls) (reply e replyMsg)
Now replyMsg -> do
say ls "no continuation"
transportSend (loopStateTransport ls) (reply e replyMsg)
Later async k -> do
-- The actor has to talk to other remote actors before being able to reply.
say ls "installing continuation"
Expand Down Expand Up @@ -174,7 +185,7 @@ runActor ls = iterM go return
return resp
-- Associate the correlation id with the `Async` `a`, so that we can later
-- install continuations for it.
say ls ("correlating `" ++ show corrId ++ "'")
say ls ("correlating remote call `" ++ show corrId ++ "'")
correlateAsync corrId a ls
k a
go (AsyncIO m k) = do
Expand Down Expand Up @@ -231,10 +242,8 @@ test1 logs = do
quit el2
displayLogs (loopRefLoopState el1)

test2 :: TVar [String] -> IO ()
test2 logs = do
el1 <- makeEventLoop "/tmp" "a" logs
el2 <- makeEventLoop "/tmp" "b" logs
test2 :: EventLoopRef -> EventLoopRef -> TVar [String] -> IO ()
test2 el1 el2 logs = do
lref1 <- spawn el1 testActor
lref2 <- spawn el2 (testActor2 (localToRemoteRef "a" lref1))
a <- send el2 (localToRemoteRef "b" lref2) (Message "init")
Expand All @@ -244,10 +253,23 @@ test2 logs = do
threadDelay 10000
quit el1
quit el2
dumpState (loopRefLoopState el1)
dumpState (loopRefLoopState el2)
displayLogs (loopRefLoopState el1)

test3 :: IO ()
test3 = do
t1 :: IO ()
t1 = do
logs <- newTVarIO []
test2 logs
test1 logs
`catch` (\(e :: SomeException) -> displayLogs' logs)

t2 :: IO ()
t2 = do
logs <- newTVarIO []
el1 <- makeEventLoop "/tmp" "a" logs
el2 <- makeEventLoop "/tmp" "b" logs
test2 el1 el2 logs
`catch` (\(e :: SomeException) -> do
dumpState (loopRefLoopState el1)
dumpState (loopRefLoopState el2)
displayLogs' logs)
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,9 @@ handleInbound ls = forever go
Just respTMVar -> do
waitingAsyncs <- readTVar (loopStateWaitingAsyncs ls)
case Map.lookup corrId waitingAsyncs of
Nothing -> writeTBQueue (loopStateQueue ls) (Response (Reply respTMVar e))
Just a -> writeTBQueue (loopStateQueue ls) (Response (AsyncReply respTMVar a e))
Nothing -> do
-- writeTVar (loopStateWaitingAsyncs ls) (Map.delete corrId waitingAsyncs)
writeTBQueue (loopStateQueue ls) (Response (Reply respTMVar e))
Just a -> do
-- writeTVar (loopStateWaitingAsyncs ls) (Map.delete corrId waitingAsyncs)
writeTBQueue (loopStateQueue ls) (Response (AsyncReply respTMVar a e))
28 changes: 28 additions & 0 deletions src/runtime-prototype/src/StuntDouble/EventLoop/State.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ correlateAsync :: CorrelationId -> Async Message -> LoopState -> IO ()
correlateAsync cid a ls = atomically $
modifyTVar' (loopStateWaitingAsyncs ls) (Map.insert cid a)

reverseCorrelateAsync :: Async Message -> LoopState -> IO (Maybe CorrelationId)
reverseCorrelateAsync a ls = atomically $ do
m <- readTVar (loopStateWaitingAsyncs ls)
let m' = Map.fromList (map swap (Map.toList m))
return (Map.lookup a m')
where
swap (x, y) = (y, x)

say' :: TVar [String] -> String -> IO ()
say' logs s = atomically (modifyTVar' logs (s :))

Expand All @@ -71,3 +79,23 @@ displayLogs' :: TVar [String] -> IO ()
displayLogs' logsVar = do
logs <- readTVarIO logsVar
mapM_ putStrLn (reverse logs)

dumpState :: LoopState -> IO ()
dumpState ls = do
putStrLn ""
putStrLn "=== LOOPSTATE DUMP ==="
putStr "loopStateName = "
putStrLn (getEventLoopName (loopStateName ls))
corrId <- readTVarIO (loopStateNextCorrelationId ls)
putStr "loopStateNextCorrelationId = "
print corrId
putStr "loopStateResponses.keys = "
responses <- readTVarIO (loopStateResponses ls)
print (Map.keys responses)
putStr "loopStateWaitingAsyncs.keys = "
asyncs <- readTVarIO (loopStateWaitingAsyncs ls)
print (Map.keys asyncs)
putStr "loopStateContinuations.keys.length = "
conts <- readTVarIO (loopStateContinuations ls)
print (length (Map.keys conts))
putStrLn "=== END OF LOOPSTATE DUMP ==="

0 comments on commit 3247b8d

Please sign in to comment.