Skip to content

Commit

Permalink
feat(runtime): add blocking verison of next
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Oct 14, 2021
1 parent c2e69d8 commit 41d6e94
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 7 deletions.
35 changes: 32 additions & 3 deletions src/runtime-prototype/src/Disruptor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import System.Posix.Files

newtype SequenceNumber = SequenceNumber { getSequenceNumber :: Int64 }
deriving (Num, Eq, Ord, Real, Enum, Integral, Show, Bounded)
-- ^ NOTE: `(maxBound :: Int64) == 9223372036854775807` so if we write 10M events
-- per second (`10_000_000*60*60*24*365 == 315360000000000) then it would take
-- us `9223372036854775807 / 315360000000000 == 29247.1208677536` years before
-- we overflow.

-- * Ring-buffer

Expand Down Expand Up @@ -148,10 +152,35 @@ next rb = nextBatch rb 1
nextBatch :: RingBuffer e -> Int -> IO SequenceNumber
nextBatch rb n
| n < 1 || fromIntegral n > ringBufferCapacity rb =
error "nextBatch: n < 1 || n > ringBufferCapacity"
error "nextBatch: n must be >= 1 and =< ringBufferCapacity"
| otherwise = do
undefined
-- XXX: ...
(current, nextSequence) <- atomicModifyIORef' (rbSequenceNumber rb) $ \current ->
let
nextSequence = current + fromIntegral n
in
(nextSequence, (current, nextSequence))

let wrapPoint :: SequenceNumber
wrapPoint = nextSequence - fromIntegral (ringBufferCapacity rb)

cachedGatingSequence <- getCachedGatingSequence rb

when (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) $
waitForConsumers wrapPoint

return nextSequence
where
waitForConsumers :: SequenceNumber -> IO ()
waitForConsumers wrapPoint = go
where
go :: IO ()
go = do
gatingSequence <- minimumSequence rb
if wrapPoint > gatingSequence
then do
threadDelay 1
go
else setCachedGatingSequence rb gatingSequence

-- Try to return the next sequence number to write to. If `Nothing` is returned,
-- then the last consumer has not yet processed the event we are about to
Expand Down
22 changes: 18 additions & 4 deletions src/runtime-prototype/test/DisruptorTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ mx @?=> y = do
x <- mx
x @?= y

unit_ringBufferSingle :: Assertion
unit_ringBufferSingle = do
unit_ringBufferSingleNonBlocking :: Assertion
unit_ringBufferSingleNonBlocking = do
rb <- newRingBuffer SingleProducer 8
Just i <- tryNext rb
set rb i 'a'
Expand All @@ -27,16 +27,30 @@ unit_ringBufferSingle = do
publish rb j
get rb j @?=> Just 'b'

unit_ringBufferSingleBlocking :: Assertion
unit_ringBufferSingleBlocking = do
rb <- newRingBuffer SingleProducer 8
i <- next rb
set rb i 'a'
publish rb i
get rb i @?=> Just 'a'
j <- next rb
set rb j 'b'
publish rb j
get rb j @?=> Just 'b'

unit_ringBufferRemainingCapacity :: Assertion
unit_ringBufferRemainingCapacity = do
rb <- newRingBuffer SingleProducer 1
snr <- newIORef (SequenceNumber (-1))
consumerSnrRef <- newIORef (SequenceNumber (-1))
let dummyAsync = error "never used."
setGatingSequences rb [EventConsumer snr dummyAsync]
setGatingSequences rb [EventConsumer consumerSnrRef dummyAsync]
remainingCapacity rb @?=> 1
publish rb (SequenceNumber 0)
remainingCapacity rb @?=> 0
tryNext rb @?=> Nothing
modifyIORef consumerSnrRef succ
remainingCapacity rb @?=> 1

unit_ringBufferMulti :: Assertion
unit_ringBufferMulti = do
Expand Down

0 comments on commit 41d6e94

Please sign in to comment.