Skip to content

Commit

Permalink
Cabal flag to control auto-labelling of threads
Browse files Browse the repository at this point in the history
  • Loading branch information
jasagredo committed Jan 24, 2025
1 parent e729ddd commit 5a7bd63
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 40 deletions.
208 changes: 168 additions & 40 deletions Control/Concurrent/Async/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ import GHC.Exts
import GHC.IO hiding (finally, onException)
import GHC.Conc (ThreadId(..), labelThread)

#ifdef DEBUG_AUTO_LABEL
import GHC.Stack
#endif

-- -----------------------------------------------------------------------------
-- STM Async API

Expand Down Expand Up @@ -95,40 +99,65 @@ compareAsyncs (Async t1 _) (Async t2 _) = compare t1 t2
-- (see module-level documentation for details).
--
-- __Use 'withAsync' style functions wherever you can instead!__
async :: IO a -> IO (Async a)
async ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO (Async a)
async = inline asyncUsing rawForkIO

-- | Like 'async' but using 'forkOS' internally.
asyncBound :: IO a -> IO (Async a)
asyncBound ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO (Async a)
asyncBound = asyncUsing forkOS

-- | Like 'async' but using 'forkOn' internally.
asyncOn :: Int -> IO a -> IO (Async a)
asyncOn ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> IO a -> IO (Async a)
asyncOn = asyncUsing . rawForkOn

-- | Like 'async' but using 'forkIOWithUnmask' internally. The child
-- thread is passed a function that can be used to unmask asynchronous
-- exceptions.
asyncWithUnmask :: ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask actionWith = asyncUsing rawForkIO (actionWith unsafeUnmask)

-- | Like 'asyncOn' but using 'forkOnWithUnmask' internally. The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions.
asyncOnWithUnmask :: Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask cpu actionWith =
asyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)

asyncUsing :: (IO () -> IO ThreadId)
-> IO a -> IO (Async a)
asyncUsing ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
(IO () -> IO ThreadId) -> IO a -> IO (Async a)
asyncUsing doFork = \action -> do
var <- newEmptyTMVarIO
let action_plus = debugLabelMe >> action
-- t <- forkFinally action (\r -> atomically $ putTMVar var r)
-- slightly faster:
t <- mask $ \restore ->
doFork $ try (restore action) >>= atomically . putTMVar var
doFork $ try (restore action_plus) >>= atomically . putTMVar var
return (Async t (readTMVar var))


-- | Spawn an asynchronous action in a separate thread, and pass its
-- @Async@ handle to the supplied function. When the function returns
-- or throws an exception, 'uninterruptibleCancel' is called on the @Async@.
Expand All @@ -144,41 +173,63 @@ asyncUsing doFork = \action -> do
-- to `withAsync` returns, so nesting many `withAsync` calls requires
-- linear memory.
--
withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> (Async a -> IO b) -> IO b
withAsync = inline withAsyncUsing rawForkIO

-- | Like 'withAsync' but uses 'forkOS' internally.
withAsyncBound :: IO a -> (Async a -> IO b) -> IO b
withAsyncBound ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> (Async a -> IO b) -> IO b
withAsyncBound = withAsyncUsing forkOS

-- | Like 'withAsync' but uses 'forkOn' internally.
withAsyncOn :: Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn = withAsyncUsing . rawForkOn

-- | Like 'withAsync' but uses 'forkIOWithUnmask' internally. The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions.
withAsyncWithUnmask
:: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask actionWith =
withAsyncUsing rawForkIO (actionWith unsafeUnmask)

-- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally. The
-- child thread is passed a function that can be used to unmask
-- asynchronous exceptions
withAsyncOnWithUnmask
:: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncOnWithUnmask ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncOnWithUnmask cpu actionWith =
withAsyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)

withAsyncUsing :: (IO () -> IO ThreadId)
-> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
(IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
-- The bracket version works, but is slow. We can do better by
-- hand-coding it:
withAsyncUsing doFork = \action inner -> do
var <- newEmptyTMVarIO
mask $ \restore -> do
t <- doFork $ try (restore action) >>= atomically . putTMVar var
let action_plus = debugLabelMe >> action
t <- doFork $ try (restore action_plus) >>= atomically . putTMVar var
let a = Async t (readTMVar var)
r <- restore (inner a) `catchAll` \e -> do
uninterruptibleCancel a
Expand Down Expand Up @@ -555,11 +606,19 @@ isCancel e
-- > withAsync right $ \b ->
-- > waitEither a b
--
race :: IO a -> IO b -> IO (Either a b)
race ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO b -> IO (Either a b)

-- | Like 'race', but the result is ignored.
--
race_ :: IO a -> IO b -> IO ()
race_ ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO b -> IO ()


-- | Run two @IO@ actions concurrently, and return both results. If
Expand All @@ -571,19 +630,31 @@ race_ :: IO a -> IO b -> IO ()
-- > withAsync left $ \a ->
-- > withAsync right $ \b ->
-- > waitBoth a b
concurrently :: IO a -> IO b -> IO (a,b)
concurrently ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO b -> IO (a,b)


-- | Run two @IO@ actions concurrently. If both of them end with @Right@,
-- return both results. If one of then ends with @Left@, interrupt the other
-- action and return the @Left@.
--
concurrentlyE :: IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))
concurrentlyE ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))

-- | 'concurrently', but ignore the result values
--
-- @since 2.1.1
concurrently_ :: IO a -> IO b -> IO ()
concurrently_ ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO b -> IO ()

#define USE_ASYNC_VERSIONS 0

Expand Down Expand Up @@ -644,9 +715,13 @@ concurrentlyE left right = concurrently' left right (collect [])
Left ex -> throwIO ex
Right r -> collect (r:xs) m

concurrently' :: IO a -> IO b
-> (IO (Either SomeException (Either a b)) -> IO r)
-> IO r
concurrently' ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO b
-> (IO (Either SomeException (Either a b)) -> IO r)
-> IO r
concurrently' left right collect = do
done <- newEmptyMVar
mask $ \restore -> do
Expand Down Expand Up @@ -723,37 +798,61 @@ concurrently_ left right = concurrently' left right (collect 0)
-- for each element of the @Traversable@, so running this on large
-- inputs without care may lead to resource exhaustion (of memory,
-- file descriptors, or other limited resources).
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
mapConcurrently ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Traversable t => (a -> IO b) -> t a -> IO (t b)
mapConcurrently f = runConcurrently . traverse (Concurrently . f)

-- | `forConcurrently` is `mapConcurrently` with its arguments flipped
--
-- > pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url
--
-- @since 2.1.0
forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b)
forConcurrently ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Traversable t => t a -> (a -> IO b) -> IO (t b)
forConcurrently = flip mapConcurrently

-- | `mapConcurrently_` is `mapConcurrently` with the return value discarded;
-- a concurrent equivalent of 'mapM_'.
mapConcurrently_ :: F.Foldable f => (a -> IO b) -> f a -> IO ()
mapConcurrently_ ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
F.Foldable f => (a -> IO b) -> f a -> IO ()
mapConcurrently_ f = runConcurrently . F.foldMap (Concurrently . void . f)

-- | `forConcurrently_` is `forConcurrently` with the return value discarded;
-- a concurrent equivalent of 'forM_'.
forConcurrently_ :: F.Foldable f => f a -> (a -> IO b) -> IO ()
forConcurrently_ ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
F.Foldable f => f a -> (a -> IO b) -> IO ()
forConcurrently_ = flip mapConcurrently_

-- | Perform the action in the given number of threads.
--
-- @since 2.1.1
replicateConcurrently :: Int -> IO a -> IO [a]
replicateConcurrently ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> IO a -> IO [a]
replicateConcurrently cnt = runConcurrently . sequenceA . replicate cnt . Concurrently

-- | Same as 'replicateConcurrently', but ignore the results.
--
-- @since 2.1.1
replicateConcurrently_ :: Int -> IO a -> IO ()
replicateConcurrently_ ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> IO a -> IO ()
replicateConcurrently_ cnt = runConcurrently . F.fold . replicate cnt . Concurrently . void

-- -----------------------------------------------------------------------------
Expand Down Expand Up @@ -847,14 +946,18 @@ instance (Semigroup a, Monoid a) => Monoid (ConcurrentlyE e a) where
-- | Fork a thread that runs the supplied action, and if it raises an
-- exception, re-runs the action. The thread terminates only when the
-- action runs to completion without raising an exception.
forkRepeat :: IO a -> IO ThreadId
forkRepeat ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO a -> IO ThreadId
forkRepeat action =
mask $ \restore ->
let go = do r <- tryAll (restore action)
case r of
Left _ -> go
_ -> return ()
in forkIO go
in forkIO (debugLabelMe >> go)

catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll = catch
Expand All @@ -866,11 +969,36 @@ tryAll = try
-- handler: saves a bit of time when we will be installing our own
-- exception handler.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO (IO action) = IO $ \ s ->
case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
rawForkIO ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
where
(IO action_plus) = debugLabelMe >> action

{-# INLINE rawForkOn #-}
rawForkOn :: Int -> IO () -> IO ThreadId
rawForkOn (I# cpu) (IO action) = IO $ \ s ->
case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
rawForkOn ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
Int -> IO () -> IO ThreadId
rawForkOn (I# cpu) action = IO $ \ s ->
case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
where
(IO action_plus) = debugLabelMe >> action


debugLabelMe ::
#ifdef DEBUG_AUTO_LABEL
HasCallStack =>
#endif
IO ()
debugLabelMe =
#ifdef DEBUG_AUTO_LABEL
myThreadId >>= flip labelThread (prettyCallStack callStack)
#else
pure ()
#endif
12 changes: 12 additions & 0 deletions async.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ source-repository head
type: git
location: https://github.com/simonmar/async.git

flag debug-auto-label
description:
Strictly for debugging as it might have a non-negligible overhead.

Enabling this flag will auto-label the threads spawned by @async@. Use it to
find where are unlabelled threads spawned in your program (be it your code or
dependency code).
default: False
manual: True

library
default-language: Haskell2010
other-extensions: CPP, MagicHash, RankNTypes, UnboxedTuples
Expand All @@ -73,6 +83,8 @@ library
build-depends: base >= 4.3 && < 4.22,
hashable >= 1.1.2.0 && < 1.6,
stm >= 2.2 && < 2.6
if flag(debug-auto-label)
cpp-options: -DDEBUG_AUTO_LABEL

test-suite test-async
default-language: Haskell2010
Expand Down

0 comments on commit 5a7bd63

Please sign in to comment.