Skip to content

Commit

Permalink
feat(journal): Notify reader on write
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-daniel-gustafsson committed Mar 22, 2022
1 parent 52c760d commit fa8e94d
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 9 deletions.
7 changes: 5 additions & 2 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,11 @@ startJournal fp (Options termLength logger _maxSub) = do

initTermId <- readInitialTermId (Metadata meta)

readerNotifier <- newReaderNotifier

return (Journal termBuffers (Metadata meta) logger
(int2Int32 termLength) (positionBitsToShift (int2Int32 termLength)) initTermId)
(int2Int32 termLength) (positionBitsToShift (int2Int32 termLength)) initTermId
readerNotifier)

------------------------------------------------------------------------

Expand All @@ -149,7 +152,7 @@ appendBS jour bs = do
Left err -> return (Left err)
Right (_offset, bufferClaim) -> do
putBS bufferClaim hEADER_LENGTH bs
Right <$> commit bufferClaim (jLogger jour)
Right <$> commit bufferClaim (jLogger jour) (jReadNotifier jour)

-- tee :: Journal -> Socket -> Int -> IO ByteString
-- tee jour sock len = do
Expand Down
5 changes: 3 additions & 2 deletions src/journal/src/Journal/Internal/BufferClaim.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ withPtr (BufferClaim bb) k = do
-- XXX: boundcheck?
withForeignPtr (bbData bb `plusForeignPtr` slice) k

commit :: BufferClaim -> Logger -> IO ()
commit (BufferClaim bb) logger = do
commit :: BufferClaim -> Logger -> WaitingStrategy -> IO ()
commit (BufferClaim bb) logger rn = do
let Capacity frameLen = getCapacity bb
logg logger ("commit, frameLen: " ++ show frameLen)
writeFrameType bb 0 Valid
writeFrameLength bb 0 (HeaderLength (int2Int32 frameLen))
notifyReader rn

abort :: BufferClaim -> IO ()
abort (BufferClaim bb) = do
Expand Down
6 changes: 3 additions & 3 deletions src/journal/src/Journal/MP.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ appendBS jour bs = do
Left err -> return (Left err)
Right (_offset, bufferClaim) -> do
putBS bufferClaim hEADER_LENGTH bs
Right <$> commit bufferClaim (jLogger jour)
Right <$> commit bufferClaim (jLogger jour) (jReadNotifier jour)

appendLBS :: Journal -> LBS.ByteString -> IO (Either AppendError ())
appendLBS jour bs = do
Expand All @@ -60,7 +60,7 @@ appendLBS jour bs = do
Left err -> return (Left err)
Right (_offset, bufferClaim) -> do
putLBS bufferClaim hEADER_LENGTH bs
Right <$> commit bufferClaim (jLogger jour)
Right <$> commit bufferClaim (jLogger jour) (jReadNotifier jour)

recvBytesOffset :: BufferClaim -> Socket -> Int -> Int -> IO Int
recvBytesOffset bc sock offset len = withPtr bc $ \ptr ->
Expand Down Expand Up @@ -171,7 +171,7 @@ readManyJournalSC jour sub state0 process = do
rawTail <- readRawTail meta activeTermIndex
let termOffset = rawTailTermOffset rawTail termLength
if termOffset == oldTermOffset
then threadDelay 1 >> go
then blockUntilNotification (jReadNotifier jour) >> go
else return ()

termLength :: Int32
Expand Down
26 changes: 26 additions & 0 deletions src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ module Journal.Types
-- )
where

import Control.Concurrent (threadDelay)
import Control.Concurrent.STM
(TVar, atomically, newTVarIO, readTVar, writeTVar)
import Control.Concurrent.MVar (MVar, newMVar, takeMVar, tryPutMVar)
import Data.Binary (Binary)
import Data.Bits
import Data.ByteString (ByteString)
Expand Down Expand Up @@ -67,6 +69,7 @@ data Journal = Journal
, jTermLength :: !Int32
, jPositionBitsToShift :: !Int32
, jInitialTermId :: {-# UNPACK #-} !TermId
, jReadNotifier :: !WaitingStrategy
}

data JMetadata = JMetadata
Expand Down Expand Up @@ -130,6 +133,29 @@ tombStone = maxBound

------------------------------------------------------------------------

data WaitingStrategy
= ReaderNotifier !(MVar ())
| SpinLoopWaitMicros !Int

newReaderNotifier :: IO WaitingStrategy
-- at the start a reader should check if there is something to read
newReaderNotifier = ReaderNotifier <$> newMVar ()

newSpinLoopWaitMicros :: Int -> WaitingStrategy
newSpinLoopWaitMicros = SpinLoopWaitMicros

notifyReader :: WaitingStrategy -> IO ()
notifyReader (SpinLoopWaitMicros _) = pure ()
notifyReader (ReaderNotifier rn) = do
tryPutMVar rn ()
pure ()

blockUntilNotification :: WaitingStrategy -> IO ()
blockUntilNotification (SpinLoopWaitMicros m) = threadDelay m
blockUntilNotification (ReaderNotifier rn) = takeMVar rn

------------------------------------------------------------------------

newtype RawTail = RawTail { unRawTail :: Int64 }
deriving newtype (Integral, Real, Num, Enum, Ord, Eq, Bits)

Expand Down
4 changes: 2 additions & 2 deletions src/sut/dumblog/src/Dumblog/ZeroCopy/HttpServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import Network.Socket

import Journal.Internal.BufferClaim
import Journal.MP
import Journal.Types (Journal, hEADER_LENGTH, jLogger)
import Journal.Types (Journal, hEADER_LENGTH, jLogger, jReadNotifier)

------------------------------------------------------------------------

Expand Down Expand Up @@ -61,7 +61,7 @@ client' _mgr jour conn offset bufferClaim _fdKey _event = do
CInt fd <- socketToFd conn
putInt32At bufferClaim hEADER_LENGTH fd
putInt64At bufferClaim (hEADER_LENGTH + sizeOf (4 :: Int32)) offset
commit bufferClaim (jLogger jour)
commit bufferClaim (jLogger jour) (jReadNotifier jour)
-- XXX: implement keep-alive...
-- close conn

Expand Down

0 comments on commit fa8e94d

Please sign in to comment.