Skip to content

Commit

Permalink
refactor(runtime): inline a few local helper functions
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Jun 9, 2021
1 parent d58eb8b commit cbe1ec4
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 57 deletions.
3 changes: 2 additions & 1 deletion src/runtime-prototype/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ not.
#### How to run tests

cabal configure test \
--test-option='--timeout=1' \
--test-option='--timeout=10' \
--test-option='--color=always' \
--test-show-details=streaming \
--ghc-options='-threaded -rtsopts -with-rtsopts=-N -fno-ignore-asserts' \
# --test-option='--pattern=/$pattern/
cabal test
111 changes: 55 additions & 56 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -490,42 +490,37 @@ handleInbound :: EventLoop -> IO ()
handleInbound = forever . handleInbound1

handleInbound1 :: EventLoop -> IO ()
handleInbound1 ls = go
where
go = do
me <- transportReceive (lsTransport ls)
case me of
Nothing -> return ()
Just e -> do
let p = Promise (getCorrelationId (envelopeCorrelationId e))
atomically (writeTBQueue (lsQueue ls) (Reaction (Receive p e)))
handleInbound1 ls = do
me <- transportReceive (lsTransport ls)
case me of
Nothing -> return ()
Just e -> do
let p = Promise (getCorrelationId (envelopeCorrelationId e))
atomically (writeTBQueue (lsQueue ls) (Reaction (Receive p e)))

handleAsyncIO :: EventLoop -> IO ()
handleAsyncIO ls = forever (handleAsyncIO1 ls >> threadDelay 1000 {- 1 ms -})

handleAsyncIO1 :: EventLoop -> IO ()
handleAsyncIO1 ls = go
where
go :: IO ()
go = atomically $ do
s <- readTVar (lsAsyncState ls)
-- We want to be non-blocking here, otherwise we can get into a situation
-- where we schedule a slow I/O operation and block waiting for it while
-- other quicker I/O operation could get scheduled, but won't be polled
-- until after the slow one finishes.
mr <- pollAnySTM (Map.keys (asyncStateAsyncIO s))
case mr of
Nothing -> return ()
Just (a, Right ioResult) -> do
let p = asyncStateAsyncIO s Map.! a -- XXX: partial function
writeTBQueue (lsQueue ls) (Reaction (AsyncIOFinished p ioResult))
writeTVar (lsAsyncState ls)
(s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s) })
Just (a, Left exception) -> do
let p = asyncStateAsyncIO s Map.! a -- XXX: partial function
writeTBQueue (lsQueue ls) (Reaction (AsyncIOFailed p exception))
writeTVar (lsAsyncState ls)
(s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s) })
handleAsyncIO1 ls = atomically $ do
s <- readTVar (lsAsyncState ls)
-- We want to be non-blocking here, otherwise we can get into a situation
-- where we schedule a slow I/O operation and block waiting for it while
-- other quicker I/O operation could get scheduled, but won't be polled
-- until after the slow one finishes.
mr <- pollAnySTM (Map.keys (asyncStateAsyncIO s))
case mr of
Nothing -> return ()
Just (a, Right ioResult) -> do
let p = asyncStateAsyncIO s Map.! a -- XXX: partial function
writeTBQueue (lsQueue ls) (Reaction (AsyncIOFinished p ioResult))
writeTVar (lsAsyncState ls)
(s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s) })
Just (a, Left exception) -> do
let p = asyncStateAsyncIO s Map.! a -- XXX: partial function
writeTBQueue (lsQueue ls) (Reaction (AsyncIOFailed p exception))
writeTVar (lsAsyncState ls)
(s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s) })

-- | Check if any async finished in a non-blocking way.
pollAnySTM :: [Async a] -> STM (Maybe (Async a, Either SomeException a))
Expand All @@ -540,15 +535,12 @@ handleTimeouts :: EventLoop -> IO ()
handleTimeouts = forever . handleTimeouts1

handleTimeouts1 :: EventLoop -> IO ()
handleTimeouts1 ls = go
where
go :: IO ()
go = do
now <- getCurrentTime (lsTime ls)
als <- atomically (stateTVar (lsAsyncState ls) (findTimedout now))
mapM_ (\(a, lref) ->
atomically (writeTBQueue (lsQueue ls) (Reaction (SendTimeoutReaction a lref))))
als
handleTimeouts1 ls = do
now <- getCurrentTime (lsTime ls)
als <- atomically (stateTVar (lsAsyncState ls) (findTimedout now))
mapM_ (\(a, lref) ->
atomically (writeTBQueue (lsQueue ls) (Reaction (SendTimeoutReaction a lref))))
als

findTimedout :: UTCTime -> AsyncState
-> ([(Free ActorF (), LocalRef)], AsyncState)
Expand All @@ -568,23 +560,30 @@ findTimedout now s =
})

handleEvents :: EventLoop -> IO ()
handleEvents = forever . handleEvents1

handleEvents = forever . handleEvents1Blocking

handleEvents1Blocking :: EventLoop -> IO ()
handleEvents1Blocking ls = do
e <- atomically (readTBQueue (lsQueue ls))
handleEvent e ls
`catch` \(ex :: SomeException) -> do
-- XXX: Why are `AsyncCancelled` being caught here?
unless (fromException ex == Just AsyncCancelled) $
putStrLn ("handleEvents: exception: " ++ show ex)

-- XXX: Using this non-blocking version in `handleEvents` causes tests to be a
-- lot more flaky, not sure why.
handleEvents1 :: EventLoop -> IO ()
handleEvents1 ls = go
where
go = do
-- XXX: This appears to make the tests a lot more flaky, not sure why...
-- me <- atomically (tryReadTBQueue (lsQueue ls))
me <- atomically (Just <$> readTBQueue (lsQueue ls))
case me of
Nothing -> threadDelay 100 -- 0.1 ms
Just e -> do
handleEvent e ls
`catch` \(ex :: SomeException) -> do
-- XXX: Why are `AsyncCancelled` being caught here?
unless (fromException ex == Just AsyncCancelled) $
putStrLn ("handleEvents: exception: " ++ show ex)
handleEvents1 ls = do
me <- atomically (tryReadTBQueue (lsQueue ls))
case me of
Nothing -> threadDelay 100 -- 0.1 ms
Just e -> do
handleEvent e ls
`catch` \(ex :: SomeException) -> do
-- XXX: Why are `AsyncCancelled` being caught here?
unless (fromException ex == Just AsyncCancelled) $
putStrLn ("handleEvents: exception: " ++ show ex)

handleEvent :: Event -> EventLoop -> IO ()
handleEvent (Action a) ls = act' ls [a]
Expand Down

0 comments on commit cbe1ec4

Please sign in to comment.