diff --git a/src/journal/app/Main.hs b/src/journal/app/Main.hs index eb3af049..e074bde3 100644 --- a/src/journal/app/Main.hs +++ b/src/journal/app/Main.hs @@ -1,5 +1,6 @@ module Main where +import Control.Concurrent.Async (withAsync) import Control.Concurrent (forkFinally) import qualified Control.Exception as E import Control.Monad (unless, forever, void) @@ -10,10 +11,7 @@ import qualified Data.ByteString.Char8 as BSChar8 import Network.Socket import Network.Socket.ByteString (sendAll, recv) import Data.Word -import Data.Char (ord) import Foreign.Ptr -import Data.Int (Int32) -import Data.Bits (shiftL, (.|.)) import Journal @@ -21,9 +19,13 @@ import Journal main :: IO () main = do - jour <- startJournal "/tmp/journal" defaultOptions + (jour, jc) <- startJournal "/tmp/journal" defaultOptions putStrLn "Starting TCP server on port 3000" - runTCPServer Nothing "3000" (go jour) + withAsync (runProducer jour) $ \_a -> + runConsumer jc + +runProducer :: Journal -> IO () +runProducer jour = runTCPServer Nothing "3000" (go jour) where go :: Journal -> Socket -> IO () go jour sock = do @@ -32,9 +34,17 @@ main = do putStrLn ("Received: `" ++ BSChar8.unpack rxBs ++ "'") if BS.null rxBs then return () - else do + else sendAll sock (BSChar8.pack ("Appended " ++ show (BS.length rxBs) ++ " bytes\n")) - -- go jour sock + +runConsumer :: JournalConsumer -> IO () +runConsumer jc = go 10 + where + go 0 = return () + go n = do + bs <- readJournal jc + putStrLn ("Consumed: `" ++ BSChar8.unpack bs ++ "'") + go (n - BS.length bs) ------------------------------------------------------------------------ diff --git a/src/journal/journal.cabal b/src/journal/journal.cabal index f585d079..51d9f2fe 100644 --- a/src/journal/journal.cabal +++ b/src/journal/journal.cabal @@ -26,14 +26,18 @@ extra-source-files: library hs-source-dirs: src/ - -- XXX: separate boot deps from other deps + -- GHC boot library dependencies: + -- (https://gitlab.haskell.org/ghc/ghc/-/blob/master/packages) build-depends: - , async , base ^>=4.14.1.0 + , binary , bytestring , directory , filepath , stm + + build-depends: + , async , mmap , network @@ -75,6 +79,7 @@ executable journal -- LANGUAGE extensions used by modules in this package. -- other-extensions: build-depends: + , async , base ^>=4.14.1.0 , bytestring , journal diff --git a/src/journal/src/Journal.hs b/src/journal/src/Journal.hs index de4f6eee..c31ac9f2 100644 --- a/src/journal/src/Journal.hs +++ b/src/journal/src/Journal.hs @@ -5,6 +5,7 @@ module Journal , appendBS , tee , appendRecv + , readJournal , truncateAfterSnapshot , replay , replay_ @@ -19,11 +20,12 @@ import Foreign.ForeignPtr (newForeignPtr_) import Foreign.Ptr (plusPtr) import Network.Socket (Socket, recvBuf) import System.Directory + (createDirectoryIfMissing, doesDirectoryExist, doesFileExist) import System.FilePath (()) import System.IO.MMap (Mode(ReadWriteEx), mmapFilePtr, munmapFilePtr) -import Journal.Types import Journal.Internal +import Journal.Types ------------------------------------------------------------------------ @@ -32,7 +34,7 @@ import Journal.Internal defaultOptions :: Options defaultOptions = Options 1024 -startJournal :: FilePath -> Options -> IO Journal +startJournal :: FilePath -> Options -> IO (Journal, JournalConsumer) startJournal dir (Options maxByteSize) = do dirExists <- doesDirectoryExist dir unless dirExists (createDirectoryIfMissing True dir) @@ -46,39 +48,42 @@ startJournal dir (Options maxByteSize) = do return (maxByteSize - nuls) else return 0 - putStrLn ("Offset: " ++ show offset) - (ptr, _rawSize, _offset, _size) <- mmapFilePtr (dir "active") ReadWriteEx (Just (fromIntegral offset, maxByteSize)) -- XXX: assert max size - bytesWrittenCounter <- newCounter offset + bytesProducedCounter <- newCounter offset ptrRef <- newJournalPtrRef (ptr `plusPtr` offset) + jcPtrRef <- newJournalConsumerPtrRef ptr fileCounter <- newCounter 0 - return (Journal ptrRef bytesWrittenCounter maxByteSize fileCounter) + bytesConsumedCounter <- newCounter 0 + return (Journal ptrRef bytesProducedCounter maxByteSize fileCounter, + JournalConsumer jcPtrRef bytesProducedCounter bytesConsumedCounter) ------------------------------------------------------------------------ -- * Production +-- NOTE: pre-condition: `BS.length bs > 0` appendBS :: Journal -> ByteString -> IO () -appendBS = undefined +appendBS jour bs = undefined +-- NOTE: pre-condition: `len` > 0 tee :: Journal -> Socket -> Int -> IO ByteString tee jour sock len = do offset <- claim jour len - putStrLn ("tee: offset = " ++ show offset) buf <- getJournalPtr jour receivedBytes <- recvBuf sock (buf `plusPtr` (offset + hEADER_SIZE)) len - -- XXX: write header + writeHeader (buf `plusPtr` offset) len fptr <- newForeignPtr_ buf - return (BS.copy (fromForeignPtr fptr offset len)) + return (BS.copy (fromForeignPtr fptr (offset + hEADER_SIZE) len)) +-- NOTE: pre-condition: `len` > 0 appendRecv :: Journal -> Socket -> Int -> IO Int appendRecv jour sock len = do offset <- claim jour len buf <- getJournalPtr jour receivedBytes <- recvBuf sock (buf `plusPtr` (offset + hEADER_SIZE)) len - -- XXX: write header + writeHeader (buf `plusPtr` offset) len return receivedBytes ------------------------------------------------------------------------ @@ -86,14 +91,21 @@ appendRecv jour sock len = do -- * Consumption readJournal :: JournalConsumer -> IO ByteString -readJournal = undefined +readJournal jc = do + ptr <- getJournalConsumerPtr jc + offset <- incrCounter hEADER_SIZE (jcBytesConsumed jc) + len <- waitForHeader ptr offset + fptr <- newForeignPtr_ ptr + let bs = BS.copy (fromForeignPtr fptr (offset + hEADER_SIZE) len) + incrCounter_ len (jcBytesConsumed jc) + return bs ------------------------------------------------------------------------ -- * Snapshots and replay -truncateAfterSnapshot :: Journal -> BytesRead -> IO () -truncateAfterSnapshot = undefined +truncateAfterSnapshot :: Journal -> Int -> IO () +truncateAfterSnapshot jour bytesRead = undefined replay :: Journal -> (ByteString -> IO a) -> IO [a] replay = undefined diff --git a/src/journal/src/Journal/Internal.hs b/src/journal/src/Journal/Internal.hs index cf31fc19..261c26b5 100644 --- a/src/journal/src/Journal/Internal.hs +++ b/src/journal/src/Journal/Internal.hs @@ -1,6 +1,12 @@ module Journal.Internal where import Control.Concurrent.Async +import Data.Binary (encode, decode) +import Data.ByteString.Lazy (ByteString) +import qualified Data.ByteString.Lazy as LBS +import Data.Word (Word8, Word32) +import Foreign.Ptr (Ptr, plusPtr) +import Foreign.Storable (pokeByteOff, peekByteOff) import Journal.Types @@ -8,7 +14,9 @@ import Journal.Types -- | The size of the journal entry header in bytes. hEADER_SIZE :: Int -hEADER_SIZE = 0 -- XXX +hEADER_SIZE = 4 -- sizeOf (0 :: Word32) + -- XXX: version? + -- XXX: CRC? claim :: Journal -> Int -> IO Int claim jour bytes = incrCounter (bytes + hEADER_SIZE) (jOffset jour) @@ -25,9 +33,39 @@ claim jour bytes = incrCounter (bytes + hEADER_SIZE) (jOffset jour) -- continuing on the above example say we are trying to write 100 bytes we get offset 1200 -- if bytes + offset > jMaxS -writeHeader :: Journal -> Int -> IO () -writeHeader = undefined - -- version +-- XXX: Use Data.Primitive.ByteArray.copyMutableByteArrayToPtr instead? +writeLBSToPtr :: ByteString -> Ptr Word8 -> IO () +writeLBSToPtr bs ptr | LBS.null bs = return () + | otherwise = go (fromIntegral (LBS.length bs - 1)) + where + go :: Int -> IO () + go 0 = pokeByteOff ptr 0 (LBS.index bs 0) + go n = do + pokeByteOff ptr n (LBS.index bs (fromIntegral n)) + go (n - 1) + +writeHeader :: Ptr Word8 -> Int -> IO () +writeHeader ptr len = do + let header = encode (fromIntegral len :: Word32) + writeLBSToPtr header ptr + +readHeader :: Ptr Word8 -> IO Int +readHeader ptr = do + b0 <- peekByteOff ptr 0 + b1 <- peekByteOff ptr 1 + b2 <- peekByteOff ptr 2 + b3 <- peekByteOff ptr 3 + return (fromIntegral (decode (LBS.pack [b0, b1, b2, b3]) :: Word32)) + +waitForHeader :: Ptr Word8 -> Int -> IO Int +waitForHeader ptr offset = go + where + go = do + len <- readHeader (ptr `plusPtr` offset) + -- TODO: This will break if we write a bytestring of length 0 + if len == 0 + then go + else return len -- | "active" file becomes "dirty", and the "clean" file becomes the new -- "active" file. diff --git a/src/journal/src/Journal/Types.hs b/src/journal/src/Journal/Types.hs index 4374d13d..edfd3841 100644 --- a/src/journal/src/Journal/Types.hs +++ b/src/journal/src/Journal/Types.hs @@ -9,11 +9,6 @@ import Foreign.Ptr (Ptr, plusPtr) ------------------------------------------------------------------------ -newtype Bytes = Bytes Int - -newtype BytesRead = BytesRead Int -newtype Offset = Offset Int - newtype AtomicCounter = AtomicCounter (IORef Int) newCounter :: Int -> IO AtomicCounter @@ -25,6 +20,9 @@ incrCounter i (AtomicCounter ref) = atomicModifyIORef' ref (\j -> (i + j, j)) incrCounter_ :: Int -> AtomicCounter -> IO () incrCounter_ i (AtomicCounter ref) = atomicModifyIORef' ref (\j -> (i + j, ())) +readCounter :: AtomicCounter -> IO Int +readCounter (AtomicCounter ref) = readIORef ref + data Journal = Journal { jPtr :: !(TVar (Ptr Word8)) , jOffset :: {-# UNPACK #-} !AtomicCounter @@ -59,5 +57,13 @@ data Options = Options -- max disk space in total? multiple of maxSize? data JournalConsumer = JournalConsumer - { jcBytesRead :: IORef BytesRead + { jcPtr :: {-# UNPACK #-} !(IORef (Ptr Word8)) + , jcBytesProduced :: {-# UNPACK #-} !AtomicCounter + , jcBytesConsumed :: {-# UNPACK #-} !AtomicCounter } + +newJournalConsumerPtrRef :: Ptr Word8 -> IO (IORef (Ptr Word8)) +newJournalConsumerPtrRef = newIORef + +getJournalConsumerPtr :: JournalConsumer -> IO (Ptr Word8) +getJournalConsumerPtr = readIORef . jcPtr