From a8e3e29dd7f2566007f045bb5343cce2ea153bb9 Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Fri, 26 Nov 2021 09:12:43 +0100 Subject: [PATCH] feat(journal): sketch more of the api --- src/journal/app/Main.hs | 62 +++++++++++++++++- src/journal/journal.cabal | 9 ++- src/journal/src/Journal.hs | 98 +++++++++-------------------- src/journal/src/Journal/Internal.hs | 4 +- src/journal/src/Journal/Types.hs | 17 ++++- 5 files changed, 116 insertions(+), 74 deletions(-) diff --git a/src/journal/app/Main.hs b/src/journal/app/Main.hs index 65ae4a05..025577e9 100644 --- a/src/journal/app/Main.hs +++ b/src/journal/app/Main.hs @@ -1,4 +1,64 @@ module Main where +import Control.Concurrent (forkFinally) +import qualified Control.Exception as E +import Control.Monad (unless, forever, void) +import qualified Data.ByteString.Char8 as BS +import Network.Socket +import Network.Socket.ByteString (sendAll) +import System.IO.MMap +import Data.Word +import Foreign.Ptr + +------------------------------------------------------------------------ + main :: IO () -main = putStrLn "Hello, Haskell!" +main = do + mmapWithFilePtr "/tmp/mmap.txt" ReadWrite Nothing $ \(ptr, len) -> do + putStrLn ("Memory mapped file length: " ++ show len) + -- XXX: advance ptr past all written data using headers (len above is merely + -- the size of the file not the contents). + putStrLn "Listening on localhost:3000" + runTCPServer Nothing "3000" (go (castPtr ptr)) + where + go :: Ptr Word8 -> Socket -> IO () + go buf sock = do + putStrLn "Waiting for client..." + rx <- recvBuf sock buf 1024 + if rx == 0 + then putStrLn "Done" + else do + sendAll sock (BS.pack ("Appended " ++ show rx ++ " bytes\n")) + go (buf `plusPtr` rx) sock + +------------------------------------------------------------------------ + +-- Taken from the network package's documentation. +runTCPServer :: Maybe HostName -> ServiceName -> (Socket -> IO a) -> IO a +runTCPServer mhost port server = withSocketsDo $ do + addr <- resolve + E.bracket (open addr) close loop + where + resolve = do + let hints = defaultHints { + addrFlags = [AI_PASSIVE] + , addrSocketType = Stream + } + head <$> getAddrInfo (Just hints) mhost (Just port) + open addr = E.bracketOnError (openSocket addr) close $ \sock -> do + setSocketOption sock ReuseAddr 1 + withFdSocket sock setCloseOnExecIfNeeded + bind sock $ addrAddress addr + listen sock 1024 + return sock + + openSocket :: AddrInfo -> IO Socket + openSocket addr = socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) + + loop sock = forever $ E.bracketOnError (accept sock) (close . fst) + $ \(conn, _peer) -> void $ + -- 'forkFinally' alone is unlikely to fail thus leaking @conn@, + -- but 'E.bracketOnError' above will be necessary if some + -- non-atomic setups (e.g. spawning a subprocess to handle + -- @conn@) before proper cleanup of @conn@ is your case + forkFinally (server conn) (const $ gracefulClose conn 5000) diff --git a/src/journal/journal.cabal b/src/journal/journal.cabal index 6295b5f6..f83faae4 100644 --- a/src/journal/journal.cabal +++ b/src/journal/journal.cabal @@ -25,13 +25,13 @@ extra-source-files: library hs-source-dirs: src/ + -- XXX: separate boot deps from other deps build-depends: , async , base ^>=4.14.1.0 , bytestring , stm - , mmap , network exposed-modules: @@ -71,6 +71,11 @@ executable journal -- LANGUAGE extensions used by modules in this package. -- other-extensions: - build-depends: base ^>=4.14.1.0 + build-depends: + , base ^>=4.14.1.0 + , bytestring + , mmap + , network + hs-source-dirs: app default-language: Haskell2010 diff --git a/src/journal/src/Journal.hs b/src/journal/src/Journal.hs index 410dd546..aeba34f1 100644 --- a/src/journal/src/Journal.hs +++ b/src/journal/src/Journal.hs @@ -4,30 +4,22 @@ module Journal , startNewJournal , restartOldJournal , journal - , journalMany + , journalSocket + , journalSocket_ , truncateAfterSnapshot , replay , replay_ ) where import Data.ByteString (ByteString) -import qualified Data.ByteString.Char8 as BS - --- XXX: -import Control.Concurrent (forkFinally) -import qualified Control.Exception as E -import Control.Monad (unless, forever, void) -import qualified Data.ByteString as S -import Network.Socket -import Network.Socket.ByteString (recv, sendAll) -import System.IO.MMap -import Data.Word -import Foreign.Ptr +import Network.Socket (Socket) import Journal.Types ------------------------------------------------------------------------ +-- * Initialisation + defaultOptions :: Options defaultOptions = Options @@ -39,15 +31,34 @@ startNewJournal dir opts = undefined restartOldJournal :: FilePath -> IO Journal restartOldJournal dir = undefined -journal :: Journal -> ByteString -> IO Position -journal = undefined +------------------------------------------------------------------------ + +-- * Production -journalMany :: Foldable t => Journal -> t ByteString -> IO Position -journalMany = undefined +journal :: Journal -> ByteString -> IO () +journal = undefined -journalSocket :: Journal -> Socket -> Int -> IO (Position, ByteString) +journalSocket :: Journal -> Socket -> Int -> IO ByteString journalSocket jour sock len = undefined +journalSocket_ :: Journal -> Socket -> Int -> IO Int +journalSocket_ jour sock len = do + buf <- undefined -- getPtrToActive jour + undefined + -- rxBytes <- recvBuf sock buf len + -- return rxBytes + +------------------------------------------------------------------------ + +-- * Consumption + +readJournal :: Journal -> Int -> IO ByteString +readJournal = undefined + +------------------------------------------------------------------------ + +-- * Snapshots and replay + truncateAfterSnapshot :: Journal -> Position -> IO () truncateAfterSnapshot = undefined @@ -56,54 +67,3 @@ replay = undefined replay_ :: Journal -> Position -> (ByteString -> IO ()) -> IO () replay_ = undefined - ------------------------------------------------------------------------- --- * Prototype application integration - -runTCPServer :: Maybe HostName -> ServiceName -> (Socket -> IO a) -> IO a -runTCPServer mhost port server = withSocketsDo $ do - addr <- resolve - E.bracket (open addr) close loop - where - resolve = do - let hints = defaultHints { - addrFlags = [AI_PASSIVE] - , addrSocketType = Stream - } - head <$> getAddrInfo (Just hints) mhost (Just port) - open addr = E.bracketOnError (openSocket addr) close $ \sock -> do - setSocketOption sock ReuseAddr 1 - withFdSocket sock setCloseOnExecIfNeeded - bind sock $ addrAddress addr - listen sock 1024 - return sock - - openSocket :: AddrInfo -> IO Socket - openSocket addr = socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) - - loop sock = forever $ E.bracketOnError (accept sock) (close . fst) - $ \(conn, _peer) -> void $ - -- 'forkFinally' alone is unlikely to fail thus leaking @conn@, - -- but 'E.bracketOnError' above will be necessary if some - -- non-atomic setups (e.g. spawning a subprocess to handle - -- @conn@) before proper cleanup of @conn@ is your case - forkFinally (server conn) (const $ gracefulClose conn 5000) - -main :: IO () -main = do - mmapWithFilePtr "/tmp/mmap.txt" ReadWrite Nothing $ \(ptr, len) -> do - putStrLn ("Memory mapped file length: " ++ show len) - -- XXX: advance ptr past all written data using headers (len above is merely - -- the size of the file not the contents). - putStrLn "Listening on localhost:3000" - runTCPServer Nothing "3000" (go (castPtr ptr)) - where - go :: Ptr Word8 -> Socket -> IO () - go buf sock = do - putStrLn "Waiting for client..." - rx <- recvBuf sock buf 1024 - if rx == 0 - then return () - else do - sendAll sock (BS.pack ("Appended " ++ show rx ++ " bytes\n")) - go (buf `plusPtr` rx) sock diff --git a/src/journal/src/Journal/Internal.hs b/src/journal/src/Journal/Internal.hs index 1ed7e31c..79694161 100644 --- a/src/journal/src/Journal/Internal.hs +++ b/src/journal/src/Journal/Internal.hs @@ -32,7 +32,9 @@ cleanDirtyFile = undefined spawnCleaningThread :: IO (Async ()) spawnCleaningThread = undefined -data Inconsistency = Inconsistency +data Inconsistency + = PartialReceived + | PartialRotation checkForInconsistencies :: Journal -> IO [Inconsistency] checkForInconsistencies = undefined diff --git a/src/journal/src/Journal/Types.hs b/src/journal/src/Journal/Types.hs index dca80075..96c15abd 100644 --- a/src/journal/src/Journal/Types.hs +++ b/src/journal/src/Journal/Types.hs @@ -6,13 +6,28 @@ import Data.Word (Word32, Word64) ------------------------------------------------------------------------ +newtype Bytes = Bytes Int + +newtype BytesCounter = BytesCounter Word64 + data Journal = Journal { jFile :: {-# UNPACK #-} !(TVar FilePath) - , jOffset :: {-# UNPACK #-} !(TVar Word64) + , jOffset :: {-# UNPACK #-} !(TVar Word64) -- Tail. , jMaxSize :: !Word64 -- XXX: unit? bytes? mb? , jSequence :: {-# UNPACK #-} !(TVar Word64) + -- jPointerToActiveFile + -- jGatingBytes :: IORef Word64 + , jMetrics :: Metrics } +data Metrics = Metrics + { mAbortedConnections :: Word32 + , mReplaySize :: Int -- XXX: Histogram + } + +emptyMetrics :: Metrics +emptyMetrics = Metrics 0 0 + data Options = Options -- buffer and fsync every ms? -- max disk space in total? multiple of maxSize?