Skip to content

Commit

Permalink
feat(journal): sketch more of the api
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Nov 30, 2021
1 parent 932f9ed commit a8e3e29
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 74 deletions.
62 changes: 61 additions & 1 deletion src/journal/app/Main.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,64 @@
module Main where

import Control.Concurrent (forkFinally)
import qualified Control.Exception as E
import Control.Monad (unless, forever, void)
import qualified Data.ByteString.Char8 as BS
import Network.Socket
import Network.Socket.ByteString (sendAll)
import System.IO.MMap
import Data.Word
import Foreign.Ptr

------------------------------------------------------------------------

main :: IO ()
main = putStrLn "Hello, Haskell!"
main = do
mmapWithFilePtr "/tmp/mmap.txt" ReadWrite Nothing $ \(ptr, len) -> do
putStrLn ("Memory mapped file length: " ++ show len)
-- XXX: advance ptr past all written data using headers (len above is merely
-- the size of the file not the contents).
putStrLn "Listening on localhost:3000"
runTCPServer Nothing "3000" (go (castPtr ptr))
where
go :: Ptr Word8 -> Socket -> IO ()
go buf sock = do
putStrLn "Waiting for client..."
rx <- recvBuf sock buf 1024
if rx == 0
then putStrLn "Done"
else do
sendAll sock (BS.pack ("Appended " ++ show rx ++ " bytes\n"))
go (buf `plusPtr` rx) sock

------------------------------------------------------------------------

-- Taken from the network package's documentation.
runTCPServer :: Maybe HostName -> ServiceName -> (Socket -> IO a) -> IO a
runTCPServer mhost port server = withSocketsDo $ do
addr <- resolve
E.bracket (open addr) close loop
where
resolve = do
let hints = defaultHints {
addrFlags = [AI_PASSIVE]
, addrSocketType = Stream
}
head <$> getAddrInfo (Just hints) mhost (Just port)
open addr = E.bracketOnError (openSocket addr) close $ \sock -> do
setSocketOption sock ReuseAddr 1
withFdSocket sock setCloseOnExecIfNeeded
bind sock $ addrAddress addr
listen sock 1024
return sock

openSocket :: AddrInfo -> IO Socket
openSocket addr = socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)

loop sock = forever $ E.bracketOnError (accept sock) (close . fst)
$ \(conn, _peer) -> void $
-- 'forkFinally' alone is unlikely to fail thus leaking @conn@,
-- but 'E.bracketOnError' above will be necessary if some
-- non-atomic setups (e.g. spawning a subprocess to handle
-- @conn@) before proper cleanup of @conn@ is your case
forkFinally (server conn) (const $ gracefulClose conn 5000)
9 changes: 7 additions & 2 deletions src/journal/journal.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ extra-source-files:

library
hs-source-dirs: src/

-- XXX: separate boot deps from other deps
build-depends:
, async
, base ^>=4.14.1.0
, bytestring
, stm
, mmap
, network

exposed-modules:
Expand Down Expand Up @@ -71,6 +71,11 @@ executable journal

-- LANGUAGE extensions used by modules in this package.
-- other-extensions:
build-depends: base ^>=4.14.1.0
build-depends:
, base ^>=4.14.1.0
, bytestring
, mmap
, network

hs-source-dirs: app
default-language: Haskell2010
98 changes: 29 additions & 69 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,22 @@ module Journal
, startNewJournal
, restartOldJournal
, journal
, journalMany
, journalSocket
, journalSocket_
, truncateAfterSnapshot
, replay
, replay_
) where

import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS

-- XXX:
import Control.Concurrent (forkFinally)
import qualified Control.Exception as E
import Control.Monad (unless, forever, void)
import qualified Data.ByteString as S
import Network.Socket
import Network.Socket.ByteString (recv, sendAll)
import System.IO.MMap
import Data.Word
import Foreign.Ptr
import Network.Socket (Socket)

import Journal.Types

------------------------------------------------------------------------

-- * Initialisation

defaultOptions :: Options
defaultOptions = Options

Expand All @@ -39,15 +31,34 @@ startNewJournal dir opts = undefined
restartOldJournal :: FilePath -> IO Journal
restartOldJournal dir = undefined

journal :: Journal -> ByteString -> IO Position
journal = undefined
------------------------------------------------------------------------

-- * Production

journalMany :: Foldable t => Journal -> t ByteString -> IO Position
journalMany = undefined
journal :: Journal -> ByteString -> IO ()
journal = undefined

journalSocket :: Journal -> Socket -> Int -> IO (Position, ByteString)
journalSocket :: Journal -> Socket -> Int -> IO ByteString
journalSocket jour sock len = undefined

journalSocket_ :: Journal -> Socket -> Int -> IO Int
journalSocket_ jour sock len = do
buf <- undefined -- getPtrToActive jour
undefined
-- rxBytes <- recvBuf sock buf len
-- return rxBytes

------------------------------------------------------------------------

-- * Consumption

readJournal :: Journal -> Int -> IO ByteString
readJournal = undefined

------------------------------------------------------------------------

-- * Snapshots and replay

truncateAfterSnapshot :: Journal -> Position -> IO ()
truncateAfterSnapshot = undefined

Expand All @@ -56,54 +67,3 @@ replay = undefined

replay_ :: Journal -> Position -> (ByteString -> IO ()) -> IO ()
replay_ = undefined

------------------------------------------------------------------------
-- * Prototype application integration

runTCPServer :: Maybe HostName -> ServiceName -> (Socket -> IO a) -> IO a
runTCPServer mhost port server = withSocketsDo $ do
addr <- resolve
E.bracket (open addr) close loop
where
resolve = do
let hints = defaultHints {
addrFlags = [AI_PASSIVE]
, addrSocketType = Stream
}
head <$> getAddrInfo (Just hints) mhost (Just port)
open addr = E.bracketOnError (openSocket addr) close $ \sock -> do
setSocketOption sock ReuseAddr 1
withFdSocket sock setCloseOnExecIfNeeded
bind sock $ addrAddress addr
listen sock 1024
return sock

openSocket :: AddrInfo -> IO Socket
openSocket addr = socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)

loop sock = forever $ E.bracketOnError (accept sock) (close . fst)
$ \(conn, _peer) -> void $
-- 'forkFinally' alone is unlikely to fail thus leaking @conn@,
-- but 'E.bracketOnError' above will be necessary if some
-- non-atomic setups (e.g. spawning a subprocess to handle
-- @conn@) before proper cleanup of @conn@ is your case
forkFinally (server conn) (const $ gracefulClose conn 5000)

main :: IO ()
main = do
mmapWithFilePtr "/tmp/mmap.txt" ReadWrite Nothing $ \(ptr, len) -> do
putStrLn ("Memory mapped file length: " ++ show len)
-- XXX: advance ptr past all written data using headers (len above is merely
-- the size of the file not the contents).
putStrLn "Listening on localhost:3000"
runTCPServer Nothing "3000" (go (castPtr ptr))
where
go :: Ptr Word8 -> Socket -> IO ()
go buf sock = do
putStrLn "Waiting for client..."
rx <- recvBuf sock buf 1024
if rx == 0
then return ()
else do
sendAll sock (BS.pack ("Appended " ++ show rx ++ " bytes\n"))
go (buf `plusPtr` rx) sock
4 changes: 3 additions & 1 deletion src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ cleanDirtyFile = undefined
spawnCleaningThread :: IO (Async ())
spawnCleaningThread = undefined

data Inconsistency = Inconsistency
data Inconsistency
= PartialReceived
| PartialRotation

checkForInconsistencies :: Journal -> IO [Inconsistency]
checkForInconsistencies = undefined
Expand Down
17 changes: 16 additions & 1 deletion src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,28 @@ import Data.Word (Word32, Word64)

------------------------------------------------------------------------

newtype Bytes = Bytes Int

newtype BytesCounter = BytesCounter Word64

data Journal = Journal
{ jFile :: {-# UNPACK #-} !(TVar FilePath)
, jOffset :: {-# UNPACK #-} !(TVar Word64)
, jOffset :: {-# UNPACK #-} !(TVar Word64) -- Tail.
, jMaxSize :: !Word64 -- XXX: unit? bytes? mb?
, jSequence :: {-# UNPACK #-} !(TVar Word64)
-- jPointerToActiveFile
-- jGatingBytes :: IORef Word64
, jMetrics :: Metrics
}

data Metrics = Metrics
{ mAbortedConnections :: Word32
, mReplaySize :: Int -- XXX: Histogram
}

emptyMetrics :: Metrics
emptyMetrics = Metrics 0 0

data Options = Options
-- buffer and fsync every ms?
-- max disk space in total? multiple of maxSize?
Expand Down

0 comments on commit a8e3e29

Please sign in to comment.