From 5428ae6e03abf002db7404b2c9b8737b1fb47f68 Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Fri, 26 Nov 2021 15:18:29 +0100 Subject: [PATCH] feat(journal): use claim --- src/journal/app/Main.hs | 11 ++++++----- src/journal/src/Journal.hs | 29 +++++++++++++++++------------ src/journal/src/Journal/Internal.hs | 10 ++++++++-- src/journal/src/Journal/Types.hs | 24 ++++++++++++------------ 4 files changed, 43 insertions(+), 31 deletions(-) diff --git a/src/journal/app/Main.hs b/src/journal/app/Main.hs index e07946dd..eb3af049 100644 --- a/src/journal/app/Main.hs +++ b/src/journal/app/Main.hs @@ -28,12 +28,13 @@ main = do go :: Journal -> Socket -> IO () go jour sock = do putStrLn "A client connected..." - rxBytes <- appendRecv jour sock 4096 - if rxBytes == 4096 - then putStrLn "TODO: There's more to read" + rxBs <- tee jour sock 5 + putStrLn ("Received: `" ++ BSChar8.unpack rxBs ++ "'") + if BS.null rxBs + then return () else do - sendAll sock (BSChar8.pack ("Appended " ++ show rxBytes ++ " bytes\n")) - go jour sock + sendAll sock (BSChar8.pack ("Appended " ++ show (BS.length rxBs) ++ " bytes\n")) + -- go jour sock ------------------------------------------------------------------------ diff --git a/src/journal/src/Journal.hs b/src/journal/src/Journal.hs index a0067490..de4f6eee 100644 --- a/src/journal/src/Journal.hs +++ b/src/journal/src/Journal.hs @@ -23,6 +23,7 @@ import System.FilePath (()) import System.IO.MMap (Mode(ReadWriteEx), mmapFilePtr, munmapFilePtr) import Journal.Types +import Journal.Internal ------------------------------------------------------------------------ @@ -39,18 +40,21 @@ startJournal dir (Options maxByteSize) = do offset <- do activeExists <- doesFileExist (dir "active") if activeExists - then - -- XXX: What if the user writes a NUL? Safer to use takeWhileEnd and - -- subtract from maxByteSize? - BS.length . BS.takeWhile (/= (fromIntegral 0)) <$> BS.readFile (dir "active") + then do + nuls <- BS.length . BS.takeWhileEnd (== (fromIntegral 0)) <$> + BS.readFile (dir "active") + return (maxByteSize - nuls) else return 0 + putStrLn ("Offset: " ++ show offset) + (ptr, _rawSize, _offset, _size) <- mmapFilePtr (dir "active") ReadWriteEx (Just (fromIntegral offset, maxByteSize)) -- XXX: assert max size - bytesWritten <- newBytesCounter offset + bytesWrittenCounter <- newCounter offset ptrRef <- newJournalPtrRef (ptr `plusPtr` offset) - return (Journal ptrRef bytesWritten maxByteSize) + fileCounter <- newCounter 0 + return (Journal ptrRef bytesWrittenCounter maxByteSize fileCounter) ------------------------------------------------------------------------ @@ -61,19 +65,20 @@ appendBS = undefined tee :: Journal -> Socket -> Int -> IO ByteString tee jour sock len = do - offset <- return 0 -- XXX: use claim to get this + offset <- claim jour len + putStrLn ("tee: offset = " ++ show offset) buf <- getJournalPtr jour - receivedBytes <- recvBuf sock buf len - advanceJournalPtr jour receivedBytes + receivedBytes <- recvBuf sock (buf `plusPtr` (offset + hEADER_SIZE)) len + -- XXX: write header fptr <- newForeignPtr_ buf return (BS.copy (fromForeignPtr fptr offset len)) appendRecv :: Journal -> Socket -> Int -> IO Int appendRecv jour sock len = do - -- claim + offset <- claim jour len buf <- getJournalPtr jour - receivedBytes <- recvBuf sock buf len - advanceJournalPtr jour receivedBytes + receivedBytes <- recvBuf sock (buf `plusPtr` (offset + hEADER_SIZE)) len + -- XXX: write header return receivedBytes ------------------------------------------------------------------------ diff --git a/src/journal/src/Journal/Internal.hs b/src/journal/src/Journal/Internal.hs index e5f481bf..cf31fc19 100644 --- a/src/journal/src/Journal/Internal.hs +++ b/src/journal/src/Journal/Internal.hs @@ -6,8 +6,14 @@ import Journal.Types ------------------------------------------------------------------------ -claim :: Bytes -> Journal -> IO (Maybe Offset) -claim bytes = undefined +-- | The size of the journal entry header in bytes. +hEADER_SIZE :: Int +hEADER_SIZE = 0 -- XXX + +claim :: Journal -> Int -> IO Int +claim jour bytes = incrCounter (bytes + hEADER_SIZE) (jOffset jour) + + -- XXX: -- if bytes + offset <= jMaxSize then write to active file -- if bytes + offset > jMaxSize then -- if offset - jMaxSize == bytes then rotate files diff --git a/src/journal/src/Journal/Types.hs b/src/journal/src/Journal/Types.hs index eccf758e..4374d13d 100644 --- a/src/journal/src/Journal/Types.hs +++ b/src/journal/src/Journal/Types.hs @@ -14,17 +14,22 @@ newtype Bytes = Bytes Int newtype BytesRead = BytesRead Int newtype Offset = Offset Int -newtype BytesCounter = BytesCounter (IORef Int) +newtype AtomicCounter = AtomicCounter (IORef Int) -newBytesCounter :: Int -> IO BytesCounter -newBytesCounter offset = do - ref <- newIORef offset - return (BytesCounter ref) +newCounter :: Int -> IO AtomicCounter +newCounter i = AtomicCounter <$> newIORef i + +incrCounter :: Int -> AtomicCounter -> IO Int +incrCounter i (AtomicCounter ref) = atomicModifyIORef' ref (\j -> (i + j, j)) + +incrCounter_ :: Int -> AtomicCounter -> IO () +incrCounter_ i (AtomicCounter ref) = atomicModifyIORef' ref (\j -> (i + j, ())) data Journal = Journal { jPtr :: !(TVar (Ptr Word8)) - , jBytesWritten :: !BytesCounter - , jMaxByteSize :: !Int + , jOffset :: {-# UNPACK #-} !AtomicCounter + , jMaxByteSize :: {-# UNPACK #-} !Int + , jFileCount :: {-# UNPACK #-} !AtomicCounter -- , jFile :: {-# UNPACK #-} !(TVar FilePath) -- , jOffset :: {-# UNPACK #-} !(TVar Word64) -- Tail. -- , jSequence :: {-# UNPACK #-} !(TVar Word64) @@ -39,11 +44,6 @@ newJournalPtrRef = newTVarIO getJournalPtr :: Journal -> IO (Ptr Word8) getJournalPtr = atomically . readTVar . jPtr --- XXX: remove, can be (re)calculate dfrom jBytesWritten -advanceJournalPtr :: Journal -> Int -> IO () -advanceJournalPtr jour bytes = atomically $ - modifyTVar' (jPtr jour) (`plusPtr` bytes) - data Metrics = Metrics { mAbortedConnections :: Word32 , mReplaySize :: Int -- XXX: Histogram