From 3247b8d2b266086eb8fb32f09270f787238851a3 Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Fri, 30 Apr 2021 16:00:46 +0200 Subject: [PATCH] feat(runtime): more debug info --- .../src/StuntDouble/EventLoop.hs | 48 ++++++++++++++----- .../StuntDouble/EventLoop/InboundHandler.hs | 8 +++- .../src/StuntDouble/EventLoop/State.hs | 28 +++++++++++ 3 files changed, 69 insertions(+), 15 deletions(-) diff --git a/src/runtime-prototype/src/StuntDouble/EventLoop.hs b/src/runtime-prototype/src/StuntDouble/EventLoop.hs index 061bf299..49e9c0f2 100644 --- a/src/runtime-prototype/src/StuntDouble/EventLoop.hs +++ b/src/runtime-prototype/src/StuntDouble/EventLoop.hs @@ -84,7 +84,6 @@ handleCommand (Invoke lref msg respVar) ls = do Now reply <- runActor ls (actor msg) atomically (putTMVar respVar reply) handleCommand (Send rref msg respVar) ls = do - say ls ("Send: " ++ show rref ++ " " ++ show msg) (corrId, respTMVar) <- atomically $ do corrId <- readTVar (loopStateNextCorrelationId ls) modifyTVar' (loopStateNextCorrelationId ls) succ @@ -98,6 +97,8 @@ handleCommand (Send rref msg respVar) ls = do resp <- takeTMVar respTMVar -- XXX: timeout? modifyTVar' (loopStateResponses ls) (Map.delete corrId) return resp + say ls ("correlating send with `" ++ show corrId ++ "'") + correlateAsync corrId a ls atomically (putTMVar respVar a) say ls "Done with send" handleCommand Quit ls = do @@ -107,19 +108,27 @@ handleCommand Quit ls = do handleResponse :: Response -> LoopState -> IO () handleResponse (Reply respTMVar e) ls - | envelopeReceiver e == dummyDeveloperRef ls = + | envelopeReceiver e == dummyDeveloperRef ls = do + say ls ("handleResponse: Reply: " ++ show e) atomically (putTMVar respTMVar (envelopeMessage e)) | otherwise = do - say ls (show e) + say ls ("handleResponse: Reply: otherwise: " ++ show e) atomically (putTMVar respTMVar (envelopeMessage e)) handleResponse (AsyncReply respTMVar a e) ls = do + say ls ("handleResponse: AsyncReply: " ++ show e) say ls "recalling continuation" k <- recallContinuation a ls cont <- runActor ls (k (envelopeMessage e)) case cont of Now replyMsg -> do say ls ("Now: " ++ show replyMsg) - atomically (putTMVar respTMVar replyMsg) + Just corrId <- reverseCorrelateAsync a ls + say ls ("Now: CorrId: " ++ show corrId) + resps <- readTVarIO (loopStateResponses ls) + let respVar = resps Map.! corrId + say ls ("Now: loopStateResponses.keys = " ++ show (Map.keys resps)) + -- atomically (putTMVar respTMVar replyMsg) + atomically (putTMVar respVar replyMsg) say ls "Done!" Later {} -> do say ls "Later" @@ -142,7 +151,9 @@ handleReceive (Request e) ls = do Just actor -> do cont <- runActor ls (actor (envelopeMessage e)) case cont of - Now replyMsg -> transportSend (loopStateTransport ls) (reply e replyMsg) + Now replyMsg -> do + say ls "no continuation" + transportSend (loopStateTransport ls) (reply e replyMsg) Later async k -> do -- The actor has to talk to other remote actors before being able to reply. say ls "installing continuation" @@ -174,7 +185,7 @@ runActor ls = iterM go return return resp -- Associate the correlation id with the `Async` `a`, so that we can later -- install continuations for it. - say ls ("correlating `" ++ show corrId ++ "'") + say ls ("correlating remote call `" ++ show corrId ++ "'") correlateAsync corrId a ls k a go (AsyncIO m k) = do @@ -231,10 +242,8 @@ test1 logs = do quit el2 displayLogs (loopRefLoopState el1) -test2 :: TVar [String] -> IO () -test2 logs = do - el1 <- makeEventLoop "/tmp" "a" logs - el2 <- makeEventLoop "/tmp" "b" logs +test2 :: EventLoopRef -> EventLoopRef -> TVar [String] -> IO () +test2 el1 el2 logs = do lref1 <- spawn el1 testActor lref2 <- spawn el2 (testActor2 (localToRemoteRef "a" lref1)) a <- send el2 (localToRemoteRef "b" lref2) (Message "init") @@ -244,10 +253,23 @@ test2 logs = do threadDelay 10000 quit el1 quit el2 + dumpState (loopRefLoopState el1) + dumpState (loopRefLoopState el2) displayLogs (loopRefLoopState el1) -test3 :: IO () -test3 = do +t1 :: IO () +t1 = do logs <- newTVarIO [] - test2 logs + test1 logs `catch` (\(e :: SomeException) -> displayLogs' logs) + +t2 :: IO () +t2 = do + logs <- newTVarIO [] + el1 <- makeEventLoop "/tmp" "a" logs + el2 <- makeEventLoop "/tmp" "b" logs + test2 el1 el2 logs + `catch` (\(e :: SomeException) -> do + dumpState (loopRefLoopState el1) + dumpState (loopRefLoopState el2) + displayLogs' logs) diff --git a/src/runtime-prototype/src/StuntDouble/EventLoop/InboundHandler.hs b/src/runtime-prototype/src/StuntDouble/EventLoop/InboundHandler.hs index 44e094d0..e778e013 100644 --- a/src/runtime-prototype/src/StuntDouble/EventLoop/InboundHandler.hs +++ b/src/runtime-prototype/src/StuntDouble/EventLoop/InboundHandler.hs @@ -27,5 +27,9 @@ handleInbound ls = forever go Just respTMVar -> do waitingAsyncs <- readTVar (loopStateWaitingAsyncs ls) case Map.lookup corrId waitingAsyncs of - Nothing -> writeTBQueue (loopStateQueue ls) (Response (Reply respTMVar e)) - Just a -> writeTBQueue (loopStateQueue ls) (Response (AsyncReply respTMVar a e)) + Nothing -> do + -- writeTVar (loopStateWaitingAsyncs ls) (Map.delete corrId waitingAsyncs) + writeTBQueue (loopStateQueue ls) (Response (Reply respTMVar e)) + Just a -> do + -- writeTVar (loopStateWaitingAsyncs ls) (Map.delete corrId waitingAsyncs) + writeTBQueue (loopStateQueue ls) (Response (AsyncReply respTMVar a e)) diff --git a/src/runtime-prototype/src/StuntDouble/EventLoop/State.hs b/src/runtime-prototype/src/StuntDouble/EventLoop/State.hs index c3e9eb1a..c01bbfc1 100644 --- a/src/runtime-prototype/src/StuntDouble/EventLoop/State.hs +++ b/src/runtime-prototype/src/StuntDouble/EventLoop/State.hs @@ -54,6 +54,14 @@ correlateAsync :: CorrelationId -> Async Message -> LoopState -> IO () correlateAsync cid a ls = atomically $ modifyTVar' (loopStateWaitingAsyncs ls) (Map.insert cid a) +reverseCorrelateAsync :: Async Message -> LoopState -> IO (Maybe CorrelationId) +reverseCorrelateAsync a ls = atomically $ do + m <- readTVar (loopStateWaitingAsyncs ls) + let m' = Map.fromList (map swap (Map.toList m)) + return (Map.lookup a m') + where + swap (x, y) = (y, x) + say' :: TVar [String] -> String -> IO () say' logs s = atomically (modifyTVar' logs (s :)) @@ -71,3 +79,23 @@ displayLogs' :: TVar [String] -> IO () displayLogs' logsVar = do logs <- readTVarIO logsVar mapM_ putStrLn (reverse logs) + +dumpState :: LoopState -> IO () +dumpState ls = do + putStrLn "" + putStrLn "=== LOOPSTATE DUMP ===" + putStr "loopStateName = " + putStrLn (getEventLoopName (loopStateName ls)) + corrId <- readTVarIO (loopStateNextCorrelationId ls) + putStr "loopStateNextCorrelationId = " + print corrId + putStr "loopStateResponses.keys = " + responses <- readTVarIO (loopStateResponses ls) + print (Map.keys responses) + putStr "loopStateWaitingAsyncs.keys = " + asyncs <- readTVarIO (loopStateWaitingAsyncs ls) + print (Map.keys asyncs) + putStr "loopStateContinuations.keys.length = " + conts <- readTVarIO (loopStateContinuations ls) + print (length (Map.keys conts)) + putStrLn "=== END OF LOOPSTATE DUMP ==="