Skip to content

Commit

Permalink
feat(journal): use claim
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Nov 30, 2021
1 parent cf394c1 commit 5428ae6
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 31 deletions.
11 changes: 6 additions & 5 deletions src/journal/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ main = do
go :: Journal -> Socket -> IO ()
go jour sock = do
putStrLn "A client connected..."
rxBytes <- appendRecv jour sock 4096
if rxBytes == 4096
then putStrLn "TODO: There's more to read"
rxBs <- tee jour sock 5
putStrLn ("Received: `" ++ BSChar8.unpack rxBs ++ "'")
if BS.null rxBs
then return ()
else do
sendAll sock (BSChar8.pack ("Appended " ++ show rxBytes ++ " bytes\n"))
go jour sock
sendAll sock (BSChar8.pack ("Appended " ++ show (BS.length rxBs) ++ " bytes\n"))
-- go jour sock

------------------------------------------------------------------------

Expand Down
29 changes: 17 additions & 12 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import System.FilePath ((</>))
import System.IO.MMap (Mode(ReadWriteEx), mmapFilePtr, munmapFilePtr)

import Journal.Types
import Journal.Internal

------------------------------------------------------------------------

Expand All @@ -39,18 +40,21 @@ startJournal dir (Options maxByteSize) = do
offset <- do
activeExists <- doesFileExist (dir </> "active")
if activeExists
then
-- XXX: What if the user writes a NUL? Safer to use takeWhileEnd and
-- subtract from maxByteSize?
BS.length . BS.takeWhile (/= (fromIntegral 0)) <$> BS.readFile (dir </> "active")
then do
nuls <- BS.length . BS.takeWhileEnd (== (fromIntegral 0)) <$>
BS.readFile (dir </> "active")
return (maxByteSize - nuls)
else return 0

putStrLn ("Offset: " ++ show offset)

(ptr, _rawSize, _offset, _size) <-
mmapFilePtr (dir </> "active") ReadWriteEx (Just (fromIntegral offset, maxByteSize))
-- XXX: assert max size
bytesWritten <- newBytesCounter offset
bytesWrittenCounter <- newCounter offset
ptrRef <- newJournalPtrRef (ptr `plusPtr` offset)
return (Journal ptrRef bytesWritten maxByteSize)
fileCounter <- newCounter 0
return (Journal ptrRef bytesWrittenCounter maxByteSize fileCounter)

------------------------------------------------------------------------

Expand All @@ -61,19 +65,20 @@ appendBS = undefined

tee :: Journal -> Socket -> Int -> IO ByteString
tee jour sock len = do
offset <- return 0 -- XXX: use claim to get this
offset <- claim jour len
putStrLn ("tee: offset = " ++ show offset)
buf <- getJournalPtr jour
receivedBytes <- recvBuf sock buf len
advanceJournalPtr jour receivedBytes
receivedBytes <- recvBuf sock (buf `plusPtr` (offset + hEADER_SIZE)) len
-- XXX: write header
fptr <- newForeignPtr_ buf
return (BS.copy (fromForeignPtr fptr offset len))

appendRecv :: Journal -> Socket -> Int -> IO Int
appendRecv jour sock len = do
-- claim
offset <- claim jour len
buf <- getJournalPtr jour
receivedBytes <- recvBuf sock buf len
advanceJournalPtr jour receivedBytes
receivedBytes <- recvBuf sock (buf `plusPtr` (offset + hEADER_SIZE)) len
-- XXX: write header
return receivedBytes

------------------------------------------------------------------------
Expand Down
10 changes: 8 additions & 2 deletions src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@ import Journal.Types

------------------------------------------------------------------------

claim :: Bytes -> Journal -> IO (Maybe Offset)
claim bytes = undefined
-- | The size of the journal entry header in bytes.
hEADER_SIZE :: Int
hEADER_SIZE = 0 -- XXX

claim :: Journal -> Int -> IO Int
claim jour bytes = incrCounter (bytes + hEADER_SIZE) (jOffset jour)

-- XXX:
-- if bytes + offset <= jMaxSize then write to active file
-- if bytes + offset > jMaxSize then
-- if offset - jMaxSize == bytes then rotate files
Expand Down
24 changes: 12 additions & 12 deletions src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@ newtype Bytes = Bytes Int
newtype BytesRead = BytesRead Int
newtype Offset = Offset Int

newtype BytesCounter = BytesCounter (IORef Int)
newtype AtomicCounter = AtomicCounter (IORef Int)

newBytesCounter :: Int -> IO BytesCounter
newBytesCounter offset = do
ref <- newIORef offset
return (BytesCounter ref)
newCounter :: Int -> IO AtomicCounter
newCounter i = AtomicCounter <$> newIORef i

incrCounter :: Int -> AtomicCounter -> IO Int
incrCounter i (AtomicCounter ref) = atomicModifyIORef' ref (\j -> (i + j, j))

incrCounter_ :: Int -> AtomicCounter -> IO ()
incrCounter_ i (AtomicCounter ref) = atomicModifyIORef' ref (\j -> (i + j, ()))

data Journal = Journal
{ jPtr :: !(TVar (Ptr Word8))
, jBytesWritten :: !BytesCounter
, jMaxByteSize :: !Int
, jOffset :: {-# UNPACK #-} !AtomicCounter
, jMaxByteSize :: {-# UNPACK #-} !Int
, jFileCount :: {-# UNPACK #-} !AtomicCounter
-- , jFile :: {-# UNPACK #-} !(TVar FilePath)
-- , jOffset :: {-# UNPACK #-} !(TVar Word64) -- Tail.
-- , jSequence :: {-# UNPACK #-} !(TVar Word64)
Expand All @@ -39,11 +44,6 @@ newJournalPtrRef = newTVarIO
getJournalPtr :: Journal -> IO (Ptr Word8)
getJournalPtr = atomically . readTVar . jPtr

-- XXX: remove, can be (re)calculate dfrom jBytesWritten
advanceJournalPtr :: Journal -> Int -> IO ()
advanceJournalPtr jour bytes = atomically $
modifyTVar' (jPtr jour) (`plusPtr` bytes)

data Metrics = Metrics
{ mAbortedConnections :: Word32
, mReplaySize :: Int -- XXX: Histogram
Expand Down

0 comments on commit 5428ae6

Please sign in to comment.