Skip to content

Commit

Permalink
fix(journal): don't advance mmaped pointer upon startup
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Dec 2, 2021
1 parent feca5c1 commit ddf553d
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
5 changes: 3 additions & 2 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ startJournal dir (Options maxByteSize) = do
else return 0

(ptr, _rawSize, _offset, _size) <-
mmapFilePtr (dir </> activeFile) ReadWriteEx (Just (fromIntegral offset, maxByteSize))
mmapFilePtr (dir </> activeFile) ReadWriteEx (Just (0, maxByteSize))
-- XXX: assert max size
bytesProducedCounter <- newCounter offset
ptrRef <- newJournalPtrRef (ptr `plusPtr` offset)
ptrRef <- newJournalPtrRef ptr
bytesConsumedCounter <- newCounter 0
jc <- JournalConsumer <$> newJournalConsumerPtrRef ptr <*> pure bytesConsumedCounter
<*> pure dir
Expand All @@ -77,6 +77,7 @@ appendBS jour bs = assert (0 < BS.length bs && BS.length bs <= jMaxByteSize jour
tee :: Journal -> Socket -> Int -> IO ByteString
tee jour sock len = assert (0 < len && len <= jMaxByteSize jour) $ do
offset <- claim jour len
putStrLn ("tee: writing to offset: " ++ show offset)
buf <- getJournalPtr jour
receivedBytes <- recvBuf sock (buf `plusPtr` (offset + hEADER_SIZE)) len
writeHeader (buf `plusPtr` offset) (makeValidHeader len)
Expand Down
3 changes: 2 additions & 1 deletion src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ waitForHeader :: Ptr Word8 -> Int -> IO Int
waitForHeader ptr offset = go
where
go = do
putStrLn ("waitForHeader: looking for header at offset: " ++ show offset)
hdr <- readHeader (ptr `plusPtr` offset)
if jhTag hdr == Empty
then threadDelay 1000 >> go -- XXX: wait strategy via options?
then threadDelay 1000000 >> go -- XXX: wait strategy via options?
else return (fromIntegral (jhLength hdr))

mapHeadersUntil :: Word8 -> (JournalHeader -> JournalHeader) -> Ptr Word8 -> Int -> IO ()
Expand Down

0 comments on commit ddf553d

Please sign in to comment.