Skip to content

Commit

Permalink
feat(journal): add consumer side
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Nov 30, 2021
1 parent 5428ae6 commit e28865a
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 33 deletions.
24 changes: 17 additions & 7 deletions src/journal/app/Main.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Main where

import Control.Concurrent.Async (withAsync)
import Control.Concurrent (forkFinally)
import qualified Control.Exception as E
import Control.Monad (unless, forever, void)
Expand All @@ -10,20 +11,21 @@ import qualified Data.ByteString.Char8 as BSChar8
import Network.Socket
import Network.Socket.ByteString (sendAll, recv)
import Data.Word
import Data.Char (ord)
import Foreign.Ptr
import Data.Int (Int32)
import Data.Bits (shiftL, (.|.))

import Journal

------------------------------------------------------------------------

main :: IO ()
main = do
jour <- startJournal "/tmp/journal" defaultOptions
(jour, jc) <- startJournal "/tmp/journal" defaultOptions
putStrLn "Starting TCP server on port 3000"
runTCPServer Nothing "3000" (go jour)
withAsync (runProducer jour) $ \_a ->
runConsumer jc

runProducer :: Journal -> IO ()
runProducer jour = runTCPServer Nothing "3000" (go jour)
where
go :: Journal -> Socket -> IO ()
go jour sock = do
Expand All @@ -32,9 +34,17 @@ main = do
putStrLn ("Received: `" ++ BSChar8.unpack rxBs ++ "'")
if BS.null rxBs
then return ()
else do
else
sendAll sock (BSChar8.pack ("Appended " ++ show (BS.length rxBs) ++ " bytes\n"))
-- go jour sock

runConsumer :: JournalConsumer -> IO ()
runConsumer jc = go 10
where
go 0 = return ()
go n = do
bs <- readJournal jc
putStrLn ("Consumed: `" ++ BSChar8.unpack bs ++ "'")
go (n - BS.length bs)

------------------------------------------------------------------------

Expand Down
9 changes: 7 additions & 2 deletions src/journal/journal.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@ extra-source-files:
library
hs-source-dirs: src/

-- XXX: separate boot deps from other deps
-- GHC boot library dependencies:
-- (https://gitlab.haskell.org/ghc/ghc/-/blob/master/packages)
build-depends:
, async
, base ^>=4.14.1.0
, binary
, bytestring
, directory
, filepath
, stm

build-depends:
, async
, mmap
, network

Expand Down Expand Up @@ -75,6 +79,7 @@ executable journal
-- LANGUAGE extensions used by modules in this package.
-- other-extensions:
build-depends:
, async
, base ^>=4.14.1.0
, bytestring
, journal
Expand Down
40 changes: 26 additions & 14 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module Journal
, appendBS
, tee
, appendRecv
, readJournal
, truncateAfterSnapshot
, replay
, replay_
Expand All @@ -19,11 +20,12 @@ import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Ptr (plusPtr)
import Network.Socket (Socket, recvBuf)
import System.Directory
(createDirectoryIfMissing, doesDirectoryExist, doesFileExist)
import System.FilePath ((</>))
import System.IO.MMap (Mode(ReadWriteEx), mmapFilePtr, munmapFilePtr)

import Journal.Types
import Journal.Internal
import Journal.Types

------------------------------------------------------------------------

Expand All @@ -32,7 +34,7 @@ import Journal.Internal
defaultOptions :: Options
defaultOptions = Options 1024

startJournal :: FilePath -> Options -> IO Journal
startJournal :: FilePath -> Options -> IO (Journal, JournalConsumer)
startJournal dir (Options maxByteSize) = do
dirExists <- doesDirectoryExist dir
unless dirExists (createDirectoryIfMissing True dir)
Expand All @@ -46,54 +48,64 @@ startJournal dir (Options maxByteSize) = do
return (maxByteSize - nuls)
else return 0

putStrLn ("Offset: " ++ show offset)

(ptr, _rawSize, _offset, _size) <-
mmapFilePtr (dir </> "active") ReadWriteEx (Just (fromIntegral offset, maxByteSize))
-- XXX: assert max size
bytesWrittenCounter <- newCounter offset
bytesProducedCounter <- newCounter offset
ptrRef <- newJournalPtrRef (ptr `plusPtr` offset)
jcPtrRef <- newJournalConsumerPtrRef ptr
fileCounter <- newCounter 0
return (Journal ptrRef bytesWrittenCounter maxByteSize fileCounter)
bytesConsumedCounter <- newCounter 0
return (Journal ptrRef bytesProducedCounter maxByteSize fileCounter,
JournalConsumer jcPtrRef bytesProducedCounter bytesConsumedCounter)

------------------------------------------------------------------------

-- * Production

-- NOTE: pre-condition: `BS.length bs > 0`
appendBS :: Journal -> ByteString -> IO ()
appendBS = undefined
appendBS jour bs = undefined

-- NOTE: pre-condition: `len` > 0
tee :: Journal -> Socket -> Int -> IO ByteString
tee jour sock len = do
offset <- claim jour len
putStrLn ("tee: offset = " ++ show offset)
buf <- getJournalPtr jour
receivedBytes <- recvBuf sock (buf `plusPtr` (offset + hEADER_SIZE)) len
-- XXX: write header
writeHeader (buf `plusPtr` offset) len
fptr <- newForeignPtr_ buf
return (BS.copy (fromForeignPtr fptr offset len))
return (BS.copy (fromForeignPtr fptr (offset + hEADER_SIZE) len))

-- NOTE: pre-condition: `len` > 0
appendRecv :: Journal -> Socket -> Int -> IO Int
appendRecv jour sock len = do
offset <- claim jour len
buf <- getJournalPtr jour
receivedBytes <- recvBuf sock (buf `plusPtr` (offset + hEADER_SIZE)) len
-- XXX: write header
writeHeader (buf `plusPtr` offset) len
return receivedBytes

------------------------------------------------------------------------

-- * Consumption

readJournal :: JournalConsumer -> IO ByteString
readJournal = undefined
readJournal jc = do
ptr <- getJournalConsumerPtr jc
offset <- incrCounter hEADER_SIZE (jcBytesConsumed jc)
len <- waitForHeader ptr offset
fptr <- newForeignPtr_ ptr
let bs = BS.copy (fromForeignPtr fptr (offset + hEADER_SIZE) len)
incrCounter_ len (jcBytesConsumed jc)
return bs

------------------------------------------------------------------------

-- * Snapshots and replay

truncateAfterSnapshot :: Journal -> BytesRead -> IO ()
truncateAfterSnapshot = undefined
truncateAfterSnapshot :: Journal -> Int -> IO ()
truncateAfterSnapshot jour bytesRead = undefined

replay :: Journal -> (ByteString -> IO a) -> IO [a]
replay = undefined
Expand Down
46 changes: 42 additions & 4 deletions src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
module Journal.Internal where

import Control.Concurrent.Async
import Data.Binary (encode, decode)
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as LBS
import Data.Word (Word8, Word32)
import Foreign.Ptr (Ptr, plusPtr)
import Foreign.Storable (pokeByteOff, peekByteOff)

import Journal.Types

------------------------------------------------------------------------

-- | The size of the journal entry header in bytes.
hEADER_SIZE :: Int
hEADER_SIZE = 0 -- XXX
hEADER_SIZE = 4 -- sizeOf (0 :: Word32)
-- XXX: version?
-- XXX: CRC?

claim :: Journal -> Int -> IO Int
claim jour bytes = incrCounter (bytes + hEADER_SIZE) (jOffset jour)
Expand All @@ -25,9 +33,39 @@ claim jour bytes = incrCounter (bytes + hEADER_SIZE) (jOffset jour)
-- continuing on the above example say we are trying to write 100 bytes we get offset 1200
-- if bytes + offset > jMaxS

writeHeader :: Journal -> Int -> IO ()
writeHeader = undefined
-- version
-- XXX: Use Data.Primitive.ByteArray.copyMutableByteArrayToPtr instead?
writeLBSToPtr :: ByteString -> Ptr Word8 -> IO ()
writeLBSToPtr bs ptr | LBS.null bs = return ()
| otherwise = go (fromIntegral (LBS.length bs - 1))
where
go :: Int -> IO ()
go 0 = pokeByteOff ptr 0 (LBS.index bs 0)
go n = do
pokeByteOff ptr n (LBS.index bs (fromIntegral n))
go (n - 1)

writeHeader :: Ptr Word8 -> Int -> IO ()
writeHeader ptr len = do
let header = encode (fromIntegral len :: Word32)
writeLBSToPtr header ptr

readHeader :: Ptr Word8 -> IO Int
readHeader ptr = do
b0 <- peekByteOff ptr 0
b1 <- peekByteOff ptr 1
b2 <- peekByteOff ptr 2
b3 <- peekByteOff ptr 3
return (fromIntegral (decode (LBS.pack [b0, b1, b2, b3]) :: Word32))

waitForHeader :: Ptr Word8 -> Int -> IO Int
waitForHeader ptr offset = go
where
go = do
len <- readHeader (ptr `plusPtr` offset)
-- TODO: This will break if we write a bytestring of length 0
if len == 0
then go
else return len

-- | "active" file becomes "dirty", and the "clean" file becomes the new
-- "active" file.
Expand Down
18 changes: 12 additions & 6 deletions src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ import Foreign.Ptr (Ptr, plusPtr)

------------------------------------------------------------------------

newtype Bytes = Bytes Int

newtype BytesRead = BytesRead Int
newtype Offset = Offset Int

newtype AtomicCounter = AtomicCounter (IORef Int)

newCounter :: Int -> IO AtomicCounter
Expand All @@ -25,6 +20,9 @@ incrCounter i (AtomicCounter ref) = atomicModifyIORef' ref (\j -> (i + j, j))
incrCounter_ :: Int -> AtomicCounter -> IO ()
incrCounter_ i (AtomicCounter ref) = atomicModifyIORef' ref (\j -> (i + j, ()))

readCounter :: AtomicCounter -> IO Int
readCounter (AtomicCounter ref) = readIORef ref

data Journal = Journal
{ jPtr :: !(TVar (Ptr Word8))
, jOffset :: {-# UNPACK #-} !AtomicCounter
Expand Down Expand Up @@ -59,5 +57,13 @@ data Options = Options
-- max disk space in total? multiple of maxSize?

data JournalConsumer = JournalConsumer
{ jcBytesRead :: IORef BytesRead
{ jcPtr :: {-# UNPACK #-} !(IORef (Ptr Word8))
, jcBytesProduced :: {-# UNPACK #-} !AtomicCounter
, jcBytesConsumed :: {-# UNPACK #-} !AtomicCounter
}

newJournalConsumerPtrRef :: Ptr Word8 -> IO (IORef (Ptr Word8))
newJournalConsumerPtrRef = newIORef

getJournalConsumerPtr :: JournalConsumer -> IO (Ptr Word8)
getJournalConsumerPtr = readIORef . jcPtr

0 comments on commit e28865a

Please sign in to comment.