From 5a7bd639a30314e39e8b2ee08f09cc216f27a086 Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Fri, 24 Jan 2025 10:21:07 +0100 Subject: [PATCH] Cabal flag to control auto-labelling of threads --- Control/Concurrent/Async/Internal.hs | 208 +++++++++++++++++++++------ async.cabal | 12 ++ 2 files changed, 180 insertions(+), 40 deletions(-) diff --git a/Control/Concurrent/Async/Internal.hs b/Control/Concurrent/Async/Internal.hs index 1603f5d..4752434 100644 --- a/Control/Concurrent/Async/Internal.hs +++ b/Control/Concurrent/Async/Internal.hs @@ -57,6 +57,10 @@ import GHC.Exts import GHC.IO hiding (finally, onException) import GHC.Conc (ThreadId(..), labelThread) +#ifdef DEBUG_AUTO_LABEL +import GHC.Stack +#endif + -- ----------------------------------------------------------------------------- -- STM Async API @@ -95,40 +99,65 @@ compareAsyncs (Async t1 _) (Async t2 _) = compare t1 t2 -- (see module-level documentation for details). -- -- __Use 'withAsync' style functions wherever you can instead!__ -async :: IO a -> IO (Async a) +async :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO a -> IO (Async a) async = inline asyncUsing rawForkIO -- | Like 'async' but using 'forkOS' internally. -asyncBound :: IO a -> IO (Async a) +asyncBound :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO a -> IO (Async a) asyncBound = asyncUsing forkOS -- | Like 'async' but using 'forkOn' internally. -asyncOn :: Int -> IO a -> IO (Async a) +asyncOn :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + Int -> IO a -> IO (Async a) asyncOn = asyncUsing . rawForkOn -- | Like 'async' but using 'forkIOWithUnmask' internally. The child -- thread is passed a function that can be used to unmask asynchronous -- exceptions. -asyncWithUnmask :: ((forall b . IO b -> IO b) -> IO a) -> IO (Async a) +asyncWithUnmask :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + ((forall b . IO b -> IO b) -> IO a) -> IO (Async a) asyncWithUnmask actionWith = asyncUsing rawForkIO (actionWith unsafeUnmask) -- | Like 'asyncOn' but using 'forkOnWithUnmask' internally. The -- child thread is passed a function that can be used to unmask -- asynchronous exceptions. -asyncOnWithUnmask :: Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a) +asyncOnWithUnmask :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a) asyncOnWithUnmask cpu actionWith = asyncUsing (rawForkOn cpu) (actionWith unsafeUnmask) -asyncUsing :: (IO () -> IO ThreadId) - -> IO a -> IO (Async a) +asyncUsing :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + (IO () -> IO ThreadId) -> IO a -> IO (Async a) asyncUsing doFork = \action -> do var <- newEmptyTMVarIO + let action_plus = debugLabelMe >> action -- t <- forkFinally action (\r -> atomically $ putTMVar var r) -- slightly faster: t <- mask $ \restore -> - doFork $ try (restore action) >>= atomically . putTMVar var + doFork $ try (restore action_plus) >>= atomically . putTMVar var return (Async t (readTMVar var)) + -- | Spawn an asynchronous action in a separate thread, and pass its -- @Async@ handle to the supplied function. When the function returns -- or throws an exception, 'uninterruptibleCancel' is called on the @Async@. @@ -144,41 +173,63 @@ asyncUsing doFork = \action -> do -- to `withAsync` returns, so nesting many `withAsync` calls requires -- linear memory. -- -withAsync :: IO a -> (Async a -> IO b) -> IO b +withAsync :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO a -> (Async a -> IO b) -> IO b withAsync = inline withAsyncUsing rawForkIO -- | Like 'withAsync' but uses 'forkOS' internally. -withAsyncBound :: IO a -> (Async a -> IO b) -> IO b +withAsyncBound :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO a -> (Async a -> IO b) -> IO b withAsyncBound = withAsyncUsing forkOS -- | Like 'withAsync' but uses 'forkOn' internally. -withAsyncOn :: Int -> IO a -> (Async a -> IO b) -> IO b +withAsyncOn :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + Int -> IO a -> (Async a -> IO b) -> IO b withAsyncOn = withAsyncUsing . rawForkOn -- | Like 'withAsync' but uses 'forkIOWithUnmask' internally. The -- child thread is passed a function that can be used to unmask -- asynchronous exceptions. -withAsyncWithUnmask - :: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b +withAsyncWithUnmask :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b withAsyncWithUnmask actionWith = withAsyncUsing rawForkIO (actionWith unsafeUnmask) -- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally. The -- child thread is passed a function that can be used to unmask -- asynchronous exceptions -withAsyncOnWithUnmask - :: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b +withAsyncOnWithUnmask :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b withAsyncOnWithUnmask cpu actionWith = withAsyncUsing (rawForkOn cpu) (actionWith unsafeUnmask) -withAsyncUsing :: (IO () -> IO ThreadId) - -> IO a -> (Async a -> IO b) -> IO b +withAsyncUsing :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b -- The bracket version works, but is slow. We can do better by -- hand-coding it: withAsyncUsing doFork = \action inner -> do var <- newEmptyTMVarIO mask $ \restore -> do - t <- doFork $ try (restore action) >>= atomically . putTMVar var + let action_plus = debugLabelMe >> action + t <- doFork $ try (restore action_plus) >>= atomically . putTMVar var let a = Async t (readTMVar var) r <- restore (inner a) `catchAll` \e -> do uninterruptibleCancel a @@ -555,11 +606,19 @@ isCancel e -- > withAsync right $ \b -> -- > waitEither a b -- -race :: IO a -> IO b -> IO (Either a b) +race :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO a -> IO b -> IO (Either a b) -- | Like 'race', but the result is ignored. -- -race_ :: IO a -> IO b -> IO () +race_ :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO a -> IO b -> IO () -- | Run two @IO@ actions concurrently, and return both results. If @@ -571,19 +630,31 @@ race_ :: IO a -> IO b -> IO () -- > withAsync left $ \a -> -- > withAsync right $ \b -> -- > waitBoth a b -concurrently :: IO a -> IO b -> IO (a,b) +concurrently :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO a -> IO b -> IO (a,b) -- | Run two @IO@ actions concurrently. If both of them end with @Right@, -- return both results. If one of then ends with @Left@, interrupt the other -- action and return the @Left@. -- -concurrentlyE :: IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b)) +concurrentlyE :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b)) -- | 'concurrently', but ignore the result values -- -- @since 2.1.1 -concurrently_ :: IO a -> IO b -> IO () +concurrently_ :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO a -> IO b -> IO () #define USE_ASYNC_VERSIONS 0 @@ -644,9 +715,13 @@ concurrentlyE left right = concurrently' left right (collect []) Left ex -> throwIO ex Right r -> collect (r:xs) m -concurrently' :: IO a -> IO b - -> (IO (Either SomeException (Either a b)) -> IO r) - -> IO r +concurrently' :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO a -> IO b + -> (IO (Either SomeException (Either a b)) -> IO r) + -> IO r concurrently' left right collect = do done <- newEmptyMVar mask $ \restore -> do @@ -723,7 +798,11 @@ concurrently_ left right = concurrently' left right (collect 0) -- for each element of the @Traversable@, so running this on large -- inputs without care may lead to resource exhaustion (of memory, -- file descriptors, or other limited resources). -mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b) +mapConcurrently :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + Traversable t => (a -> IO b) -> t a -> IO (t b) mapConcurrently f = runConcurrently . traverse (Concurrently . f) -- | `forConcurrently` is `mapConcurrently` with its arguments flipped @@ -731,29 +810,49 @@ mapConcurrently f = runConcurrently . traverse (Concurrently . f) -- > pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url -- -- @since 2.1.0 -forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b) +forConcurrently :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + Traversable t => t a -> (a -> IO b) -> IO (t b) forConcurrently = flip mapConcurrently -- | `mapConcurrently_` is `mapConcurrently` with the return value discarded; -- a concurrent equivalent of 'mapM_'. -mapConcurrently_ :: F.Foldable f => (a -> IO b) -> f a -> IO () +mapConcurrently_ :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + F.Foldable f => (a -> IO b) -> f a -> IO () mapConcurrently_ f = runConcurrently . F.foldMap (Concurrently . void . f) -- | `forConcurrently_` is `forConcurrently` with the return value discarded; -- a concurrent equivalent of 'forM_'. -forConcurrently_ :: F.Foldable f => f a -> (a -> IO b) -> IO () +forConcurrently_ :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + F.Foldable f => f a -> (a -> IO b) -> IO () forConcurrently_ = flip mapConcurrently_ -- | Perform the action in the given number of threads. -- -- @since 2.1.1 -replicateConcurrently :: Int -> IO a -> IO [a] +replicateConcurrently :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + Int -> IO a -> IO [a] replicateConcurrently cnt = runConcurrently . sequenceA . replicate cnt . Concurrently -- | Same as 'replicateConcurrently', but ignore the results. -- -- @since 2.1.1 -replicateConcurrently_ :: Int -> IO a -> IO () +replicateConcurrently_ :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + Int -> IO a -> IO () replicateConcurrently_ cnt = runConcurrently . F.fold . replicate cnt . Concurrently . void -- ----------------------------------------------------------------------------- @@ -847,14 +946,18 @@ instance (Semigroup a, Monoid a) => Monoid (ConcurrentlyE e a) where -- | Fork a thread that runs the supplied action, and if it raises an -- exception, re-runs the action. The thread terminates only when the -- action runs to completion without raising an exception. -forkRepeat :: IO a -> IO ThreadId +forkRepeat :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO a -> IO ThreadId forkRepeat action = mask $ \restore -> let go = do r <- tryAll (restore action) case r of Left _ -> go _ -> return () - in forkIO go + in forkIO (debugLabelMe >> go) catchAll :: IO a -> (SomeException -> IO a) -> IO a catchAll = catch @@ -866,11 +969,36 @@ tryAll = try -- handler: saves a bit of time when we will be installing our own -- exception handler. {-# INLINE rawForkIO #-} -rawForkIO :: IO () -> IO ThreadId -rawForkIO (IO action) = IO $ \ s -> - case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #) +rawForkIO :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO () -> IO ThreadId +rawForkIO action = IO $ \ s -> + case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) + where + (IO action_plus) = debugLabelMe >> action {-# INLINE rawForkOn #-} -rawForkOn :: Int -> IO () -> IO ThreadId -rawForkOn (I# cpu) (IO action) = IO $ \ s -> - case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #) +rawForkOn :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + Int -> IO () -> IO ThreadId +rawForkOn (I# cpu) action = IO $ \ s -> + case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #) + where + (IO action_plus) = debugLabelMe >> action + + +debugLabelMe :: +#ifdef DEBUG_AUTO_LABEL + HasCallStack => +#endif + IO () +debugLabelMe = +#ifdef DEBUG_AUTO_LABEL + myThreadId >>= flip labelThread (prettyCallStack callStack) +#else + pure () +#endif diff --git a/async.cabal b/async.cabal index 68286ba..9d28abb 100644 --- a/async.cabal +++ b/async.cabal @@ -63,6 +63,16 @@ source-repository head type: git location: https://github.com/simonmar/async.git +flag debug-auto-label + description: + Strictly for debugging as it might have a non-negligible overhead. + + Enabling this flag will auto-label the threads spawned by @async@. Use it to + find where are unlabelled threads spawned in your program (be it your code or + dependency code). + default: False + manual: True + library default-language: Haskell2010 other-extensions: CPP, MagicHash, RankNTypes, UnboxedTuples @@ -73,6 +83,8 @@ library build-depends: base >= 4.3 && < 4.22, hashable >= 1.1.2.0 && < 1.6, stm >= 2.2 && < 2.6 + if flag(debug-auto-label) + cpp-options: -DDEBUG_AUTO_LABEL test-suite test-async default-language: Haskell2010