From fa8e94d3e3d2ea90ec94a80e4a3a5affb276afaf Mon Sep 17 00:00:00 2001 From: Daniel Gustafsson Date: Mon, 21 Mar 2022 15:57:47 +0100 Subject: [PATCH] feat(journal): Notify reader on write --- src/journal/src/Journal.hs | 7 +++-- .../src/Journal/Internal/BufferClaim.hs | 5 ++-- src/journal/src/Journal/MP.hs | 6 ++--- src/journal/src/Journal/Types.hs | 26 +++++++++++++++++++ .../src/Dumblog/ZeroCopy/HttpServer.hs | 4 +-- 5 files changed, 39 insertions(+), 9 deletions(-) diff --git a/src/journal/src/Journal.hs b/src/journal/src/Journal.hs index b0e4d7f4..8a19eea3 100644 --- a/src/journal/src/Journal.hs +++ b/src/journal/src/Journal.hs @@ -130,8 +130,11 @@ startJournal fp (Options termLength logger _maxSub) = do initTermId <- readInitialTermId (Metadata meta) + readerNotifier <- newReaderNotifier + return (Journal termBuffers (Metadata meta) logger - (int2Int32 termLength) (positionBitsToShift (int2Int32 termLength)) initTermId) + (int2Int32 termLength) (positionBitsToShift (int2Int32 termLength)) initTermId + readerNotifier) ------------------------------------------------------------------------ @@ -149,7 +152,7 @@ appendBS jour bs = do Left err -> return (Left err) Right (_offset, bufferClaim) -> do putBS bufferClaim hEADER_LENGTH bs - Right <$> commit bufferClaim (jLogger jour) + Right <$> commit bufferClaim (jLogger jour) (jReadNotifier jour) -- tee :: Journal -> Socket -> Int -> IO ByteString -- tee jour sock len = do diff --git a/src/journal/src/Journal/Internal/BufferClaim.hs b/src/journal/src/Journal/Internal/BufferClaim.hs index dbe593d1..c97b4893 100644 --- a/src/journal/src/Journal/Internal/BufferClaim.hs +++ b/src/journal/src/Journal/Internal/BufferClaim.hs @@ -38,12 +38,13 @@ withPtr (BufferClaim bb) k = do -- XXX: boundcheck? withForeignPtr (bbData bb `plusForeignPtr` slice) k -commit :: BufferClaim -> Logger -> IO () -commit (BufferClaim bb) logger = do +commit :: BufferClaim -> Logger -> WaitingStrategy -> IO () +commit (BufferClaim bb) logger rn = do let Capacity frameLen = getCapacity bb logg logger ("commit, frameLen: " ++ show frameLen) writeFrameType bb 0 Valid writeFrameLength bb 0 (HeaderLength (int2Int32 frameLen)) + notifyReader rn abort :: BufferClaim -> IO () abort (BufferClaim bb) = do diff --git a/src/journal/src/Journal/MP.hs b/src/journal/src/Journal/MP.hs index 03c2388f..e029a4a1 100644 --- a/src/journal/src/Journal/MP.hs +++ b/src/journal/src/Journal/MP.hs @@ -46,7 +46,7 @@ appendBS jour bs = do Left err -> return (Left err) Right (_offset, bufferClaim) -> do putBS bufferClaim hEADER_LENGTH bs - Right <$> commit bufferClaim (jLogger jour) + Right <$> commit bufferClaim (jLogger jour) (jReadNotifier jour) appendLBS :: Journal -> LBS.ByteString -> IO (Either AppendError ()) appendLBS jour bs = do @@ -60,7 +60,7 @@ appendLBS jour bs = do Left err -> return (Left err) Right (_offset, bufferClaim) -> do putLBS bufferClaim hEADER_LENGTH bs - Right <$> commit bufferClaim (jLogger jour) + Right <$> commit bufferClaim (jLogger jour) (jReadNotifier jour) recvBytesOffset :: BufferClaim -> Socket -> Int -> Int -> IO Int recvBytesOffset bc sock offset len = withPtr bc $ \ptr -> @@ -171,7 +171,7 @@ readManyJournalSC jour sub state0 process = do rawTail <- readRawTail meta activeTermIndex let termOffset = rawTailTermOffset rawTail termLength if termOffset == oldTermOffset - then threadDelay 1 >> go + then blockUntilNotification (jReadNotifier jour) >> go else return () termLength :: Int32 diff --git a/src/journal/src/Journal/Types.hs b/src/journal/src/Journal/Types.hs index d66602a4..4d4153b0 100644 --- a/src/journal/src/Journal/Types.hs +++ b/src/journal/src/Journal/Types.hs @@ -35,8 +35,10 @@ module Journal.Types -- ) where +import Control.Concurrent (threadDelay) import Control.Concurrent.STM (TVar, atomically, newTVarIO, readTVar, writeTVar) +import Control.Concurrent.MVar (MVar, newMVar, takeMVar, tryPutMVar) import Data.Binary (Binary) import Data.Bits import Data.ByteString (ByteString) @@ -67,6 +69,7 @@ data Journal = Journal , jTermLength :: !Int32 , jPositionBitsToShift :: !Int32 , jInitialTermId :: {-# UNPACK #-} !TermId + , jReadNotifier :: !WaitingStrategy } data JMetadata = JMetadata @@ -130,6 +133,29 @@ tombStone = maxBound ------------------------------------------------------------------------ +data WaitingStrategy + = ReaderNotifier !(MVar ()) + | SpinLoopWaitMicros !Int + +newReaderNotifier :: IO WaitingStrategy +-- at the start a reader should check if there is something to read +newReaderNotifier = ReaderNotifier <$> newMVar () + +newSpinLoopWaitMicros :: Int -> WaitingStrategy +newSpinLoopWaitMicros = SpinLoopWaitMicros + +notifyReader :: WaitingStrategy -> IO () +notifyReader (SpinLoopWaitMicros _) = pure () +notifyReader (ReaderNotifier rn) = do + tryPutMVar rn () + pure () + +blockUntilNotification :: WaitingStrategy -> IO () +blockUntilNotification (SpinLoopWaitMicros m) = threadDelay m +blockUntilNotification (ReaderNotifier rn) = takeMVar rn + +------------------------------------------------------------------------ + newtype RawTail = RawTail { unRawTail :: Int64 } deriving newtype (Integral, Real, Num, Enum, Ord, Eq, Bits) diff --git a/src/sut/dumblog/src/Dumblog/ZeroCopy/HttpServer.hs b/src/sut/dumblog/src/Dumblog/ZeroCopy/HttpServer.hs index 5f1049b0..8d8b8404 100644 --- a/src/sut/dumblog/src/Dumblog/ZeroCopy/HttpServer.hs +++ b/src/sut/dumblog/src/Dumblog/ZeroCopy/HttpServer.hs @@ -13,7 +13,7 @@ import Network.Socket import Journal.Internal.BufferClaim import Journal.MP -import Journal.Types (Journal, hEADER_LENGTH, jLogger) +import Journal.Types (Journal, hEADER_LENGTH, jLogger, jReadNotifier) ------------------------------------------------------------------------ @@ -61,7 +61,7 @@ client' _mgr jour conn offset bufferClaim _fdKey _event = do CInt fd <- socketToFd conn putInt32At bufferClaim hEADER_LENGTH fd putInt64At bufferClaim (hEADER_LENGTH + sizeOf (4 :: Int32)) offset - commit bufferClaim (jLogger jour) + commit bufferClaim (jLogger jour) (jReadNotifier jour) -- XXX: implement keep-alive... -- close conn