Skip to content

Commit

Permalink
fix(runtime): fix the multi producer test and expand it to five produ…
Browse files Browse the repository at this point in the history
…cers
  • Loading branch information
symbiont-stevan-andjelkovic committed Oct 15, 2021
1 parent ed16243 commit 60a3329
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 20 deletions.
6 changes: 1 addition & 5 deletions src/runtime-prototype/src/Disruptor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ newEventProducer rb p backPressure s0 = do
Just snr -> do
(e, s') <- p s
set rb snr e
-- XXX: removing the following line causes a loop?
putStrLn ("wrote to srn: " ++ show (getSequenceNumber snr))
publish rb snr
halt <- isItTimeToShutdown shutdownVar
Expand Down Expand Up @@ -373,14 +374,11 @@ newEventConsumer rb handler s0 barriers (Sleep n) = do
-- couple of tries go into a takeMTVar sleep waiting for a producer to
-- wake us up.
threadDelay n
putStrLn ("nothing to do, mySrn = " ++ show (getSequenceNumber mySnr))
halt <- isItTimeToShutdown shutdownVar
if halt
then return s
else go s
Just bSnr -> do
putStrLn ("something to do, mySrn = " ++ show (getSequenceNumber mySnr) ++
", bSnr = " ++ show (getSequenceNumber bSnr))
-- XXX: what if handler throws exception? https://youtu.be/eTeWxZvlCZ8?t=2271
s' <- foldM (\ih snr -> unsafeGet rb snr >>= \e ->
handler ih e snr (snr == bSnr)) s [mySnr + 1..bSnr]
Expand All @@ -397,8 +395,6 @@ waitFor snr rb [] = waitFor snr rb [RingBufferBarrier rb]
waitFor snr rb bs = do
let snrs = map getSequenceNumberRef bs
minSnr <- minimum <$> mapM readIORef snrs
putStrLn ("waitFor: snr = " ++ show (getSequenceNumber snr) ++
", minSrn = " ++ show (getSequenceNumber minSnr))
if snr < minSnr
then return (Just minSnr)
else return Nothing
Expand Down
43 changes: 28 additions & 15 deletions src/runtime-prototype/test/DisruptorTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import Data.Set (Set)
import qualified Data.Set as Set
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import Data.IORef
import System.IO
import System.IO.Error
import Test.HUnit
import Test.QuickCheck
import Test.QuickCheck.Monadic

import Disruptor

Expand Down Expand Up @@ -71,14 +69,19 @@ unit_ringBufferMulti = do
publish rb j
get rb j @?-> Just 'b'

unit_ringBufferTwoProducersOneConsumer :: Assertion
unit_ringBufferTwoProducersOneConsumer = do
rb <- newRingBuffer SingleProducer 128
unit_ringBufferFiveProducersOneConsumer :: Assertion
unit_ringBufferFiveProducersOneConsumer = do
rb <- newRingBuffer MultiProducer 128
counter <- newIORef 0 :: IO (IORef Int)

let production () = atomicModifyIORef' counter (\n -> (n + 1, (n, ())))
backPressure () = return ()
ep1 <- newEventProducer rb production backPressure ()
ep2 <- newEventProducer rb production backPressure ()
ep3 <- newEventProducer rb production backPressure ()
ep4 <- newEventProducer rb production backPressure ()
ep5 <- newEventProducer rb production backPressure ()

let handler seen n _snr endOfBatch
| n `Set.member` seen = error (show n ++ " appears twice")
| otherwise = return (Set.insert n seen)
Expand All @@ -88,30 +91,40 @@ unit_ringBufferTwoProducersOneConsumer = do

areWeDoneProducing = do
n <- readIORef counter
if n >= 10
if n >= atLeastThisManyEvents
then return ()
else do
threadDelay 10000
areWeDoneProducing

areWeDoneConsuming = do
snr <- readIORef (ecSequenceNumber ec)
if snr >= fromIntegral 10
if snr >= fromIntegral atLeastThisManyEvents
then return ()
else do
threadDelay 10000
areWeDoneConsuming

withEventProducer ep1 $ \aep1 ->
withEventProducer ep2 $ \aep2 ->
withEventConsumer ec $ \aec ->
withAsync areWeThereYet $ \a -> do
wait a
mapM_ shutdownProducer [ep1, ep2]
shutdownConsumer ec
mapM_ wait [aep1, aep2]
seen <- wait aec
print seen
withEventProducer ep3 $ \aep3 ->
withEventProducer ep4 $ \aep4 ->
withEventProducer ep5 $ \aep5 ->
withEventConsumer ec $ \aec ->
withAsync areWeThereYet $ \a -> do
wait a
mapM_ shutdownProducer [ep1, ep2, ep3, ep4, ep5]
shutdownConsumer ec
mapM_ wait [aep1, aep2, aep3, aep4, aep5]
seen <- wait aec
assert (increasingByOneFrom 0 (Set.toList seen))
where
atLeastThisManyEvents = 10000

increasingByOneFrom n [] = n >= atLeastThisManyEvents && n < atLeastThisManyEvents + 500
increasingByOneFrom n (i : is) | n == i = increasingByOneFrom (n + 1) is
| otherwise = False


{-
main :: IO ()
Expand Down

0 comments on commit 60a3329

Please sign in to comment.