Skip to content

Commit

Permalink
feat(journal): Add support for a secondary subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-daniel-gustafsson committed Mar 9, 2022
1 parent 4adbdc9 commit e620308
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 34 deletions.
21 changes: 12 additions & 9 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ import Journal.Types.AtomicCounter
-- * Initialisation and shutdown

defaultOptions :: Options
defaultOptions = Options (64 * 1024) ioLogger
defaultOptions = Options (64 * 1024) ioLogger Sub1

allocateJournal :: FilePath -> Options -> IO ()
allocateJournal fp (Options termBufferLen logger) = do
allocateJournal fp (Options termBufferLen logger maxSub) = do
unless (popCount termBufferLen == 1) $
-- XXX: check bounds
error "allocateJournal: oTermBufferLength must be a power of 2"
Expand All @@ -83,6 +83,9 @@ allocateJournal fp (Options termBufferLen logger) = do
(initTermId + TermId (int2Int32 (i - if i == 0 then 0 else pARTITION_COUNT)))
pageSize <- sysconfPageSize
writePageSize (Metadata meta) (int2Int32 pageSize)
-- Set tombstones for all subscribers *above* `maxSub`
forM_ (drop 1 [maxSub..]) $ \sub ->
writeBytesConsumed (Metadata meta) sub tombStone
else do
logg logger ("allocateJournal, journal exists: " ++ fp)
let logLength = termBufferLen * pARTITION_COUNT + lOG_META_DATA_LENGTH
Expand All @@ -99,7 +102,7 @@ allocateJournal fp (Options termBufferLen logger) = do
error "allocateJournal: pageSize doesn't match the metadata"

startJournal :: FilePath -> Options -> IO Journal
startJournal fp (Options termLength logger) = do
startJournal fp (Options termLength logger _maxSub) = do
logLength <- fromIntegral <$> getFileSize fp
bb <- mmapped fp logLength
meta <- wrapPart bb (logLength - lOG_META_DATA_LENGTH) lOG_META_DATA_LENGTH
Expand Down Expand Up @@ -161,9 +164,9 @@ recvBytes bc sock len = withPtr bc $ \ptr -> recvBuf sock ptr len

-- * Consumption

readJournal :: Journal -> IO (Maybe ByteString)
readJournal jour = do
offset <- readBytesConsumed (jMetadata jour)
readJournal :: Journal -> Subscriber -> IO (Maybe ByteString)
readJournal jour sub = do
offset <- readBytesConsumed (jMetadata jour) sub
let jLog = logg (jLogger jour)
jLog ("readJournal, offset: " ++ show offset)

Expand Down Expand Up @@ -205,17 +208,17 @@ readJournal jour = do
if tag == Padding
then do
assertM (len >= 0)
incrBytesConsumed_ (jMetadata jour) (align (int322Int len) fRAME_ALIGNMENT)
incrBytesConsumed_ (jMetadata jour) sub (align (int322Int len) fRAME_ALIGNMENT)
jLog "readJournal, skipping padding..."
readJournal jour
readJournal jour sub
else do
assertM (len > 0)
jLog ("readJournal, termCount: " ++ show (unTermCount termCount))
bs <- getByteStringAt termBuffer
(int322Int relativeOffset + hEADER_LENGTH)
(int322Int len - hEADER_LENGTH)
assertM (BS.length bs == int322Int len - hEADER_LENGTH)
incrBytesConsumed_ (jMetadata jour) (align (int322Int len) fRAME_ALIGNMENT)
incrBytesConsumed_ (jMetadata jour) sub (align (int322Int len) fRAME_ALIGNMENT)
return (Just bs)

------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ tryClaim jour len = do
-- separate process?
calculatePositionLimit :: Journal -> IO Int64
calculatePositionLimit jour = do
minSubscriberPos <- readBytesConsumed (jMetadata jour) -- XXX: only one subscriber so far.
minSubscriberPos <- readMinBytesConsumed (jMetadata jour)
maxSubscriberPos <- readCleanPosition (jMetadata jour)
termWindowLen <- termWindowLength (jMetadata jour)
let _consumerPos = maxSubscriberPos
Expand Down
20 changes: 10 additions & 10 deletions src/journal/src/Journal/MP.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ appendBS jour bs = do
recvBytes :: BufferClaim -> Socket -> Int -> IO Int
recvBytes bc sock len = withPtr bc $ \ptr -> recvBuf sock (ptr `plusPtr` hEADER_LENGTH) len

readJournal :: Journal -> IO (Maybe ByteString)
readJournal jour = do
offset <- readBytesConsumed (jMetadata jour)
readJournal :: Journal -> Subscriber -> IO (Maybe ByteString)
readJournal jour sub = do
offset <- readBytesConsumed (jMetadata jour) sub
let jLog = logg (jLogger jour)
jLog ("readJournal, offset: " ++ show offset)

Expand Down Expand Up @@ -88,15 +88,15 @@ readJournal jour = do
then do
if len >= 0
then do
_success <- casBytesConsumed (jMetadata jour) offset (offset + int322Int len)
_success <- casBytesConsumed (jMetadata jour) sub offset (offset + int322Int len)
jLog "readJournal, skipping padding..."
-- If the CAS fails, it just means that some other process incremented the
-- counter already.
readJournal jour
else readJournal jour -- If len is negative then the writer hasn't
-- finished writing the padding yet.
readJournal jour sub
else readJournal jour sub -- If len is negative then the writer hasn't
-- finished writing the padding yet.
else if len <= 0 || tag == Empty
then readJournal jour
then readJournal jour sub
else do
assertMMsg (show len) (len > 0)
jLog ("readJournal, termCount: " ++ show (unTermCount termCount))
Expand All @@ -110,7 +110,7 @@ readJournal jour = do
(int322Int relativeOffset + hEADER_LENGTH)
(int322Int len - hEADER_LENGTH)
assertM (BS.length bs == int322Int len - hEADER_LENGTH)
success <- casBytesConsumed (jMetadata jour) offset
success <- casBytesConsumed (jMetadata jour) sub offset
(offset + (align (int322Int len) fRAME_ALIGNMENT))
if success
then do
Expand All @@ -120,7 +120,7 @@ readJournal jour = do
else
-- If the CAS failed it means that another process read what we were
-- about to read, so we retry reading the next item instead.
readJournal jour
readJournal jour sub

------------------------------------------------------------------------

Expand Down
45 changes: 33 additions & 12 deletions src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ data JMetadata = JMetadata
, mdTermLength :: Int32
, mdPageSize :: Int32
, mdCleanPosition :: Int64
, mdBytesConsumed :: Int64
, mdBytesConsumed1 :: Int64
, mdBytesConsumed2 :: Int64
-- padding
-- , mdDefaultFrameHeader :: Bytestring???
}
Expand All @@ -103,11 +104,24 @@ lOG_PAGE_SIZE_OFFSET = lOG_TERM_LENGTH_OFFSET +
lOG_CLEAN_POSITION_OFFSET :: Int
lOG_CLEAN_POSITION_OFFSET = lOG_PAGE_SIZE_OFFSET + sizeOf (4 :: Int32)

lOG_BYTES_CONSUMED_OFFSET :: Int
lOG_BYTES_CONSUMED_OFFSET = lOG_CLEAN_POSITION_OFFSET + sizeOf (8 :: Int)
lOG_BYTES_CONSUMED1_OFFSET :: Int
lOG_BYTES_CONSUMED1_OFFSET = lOG_CLEAN_POSITION_OFFSET + sizeOf (8 :: Int)

lOG_BYTES_CONSUMED2_OFFSET :: Int
lOG_BYTES_CONSUMED2_OFFSET = lOG_BYTES_CONSUMED1_OFFSET + sizeOf (8 :: Int)

lOG_META_DATA_LENGTH :: Int
lOG_META_DATA_LENGTH = lOG_BYTES_CONSUMED_OFFSET + sizeOf (8 :: Int) -- is this correct?
lOG_META_DATA_LENGTH = lOG_BYTES_CONSUMED2_OFFSET + sizeOf (8 :: Int) -- is this correct?

data Subscriber = Sub1 | Sub2
deriving (Bounded, Enum, Show)

subscriberOffset :: Subscriber -> Int
subscriberOffset Sub1 = lOG_BYTES_CONSUMED1_OFFSET
subscriberOffset Sub2 = lOG_BYTES_CONSUMED2_OFFSET

tombStone :: Int
tombStone = maxBound

------------------------------------------------------------------------

Expand Down Expand Up @@ -197,17 +211,23 @@ readPageSize (Metadata meta) = readInt32OffAddr meta lOG_PAGE_SIZE_OFFSET
writePageSize :: Metadata -> Int32 -> IO ()
writePageSize (Metadata meta) = writeInt32OffAddr meta lOG_PAGE_SIZE_OFFSET

readBytesConsumed :: Metadata -> IO Int
readBytesConsumed (Metadata meta) = readIntOffArrayIx meta lOG_BYTES_CONSUMED_OFFSET
readBytesConsumed :: Metadata -> Subscriber -> IO Int
readBytesConsumed (Metadata meta) sub = readIntOffArrayIx meta (subscriberOffset sub)

writeBytesConsumed :: Metadata -> Subscriber -> Int -> IO ()
writeBytesConsumed (Metadata meta) sub = writeIntOffAddr meta (subscriberOffset sub)

writeBytesConsumed :: Metadata -> Int -> IO ()
writeBytesConsumed (Metadata meta) = writeIntOffAddr meta lOG_BYTES_CONSUMED_OFFSET
incrBytesConsumed_ :: Metadata -> Subscriber -> Int -> IO ()
incrBytesConsumed_ (Metadata meta) sub = fetchAddIntArray_ meta (subscriberOffset sub)

incrBytesConsumed_ :: Metadata -> Int -> IO ()
incrBytesConsumed_ (Metadata meta) = fetchAddIntArray_ meta lOG_BYTES_CONSUMED_OFFSET
casBytesConsumed :: Metadata -> Subscriber -> Int -> Int -> IO Bool
casBytesConsumed (Metadata meta) sub = casIntAddr meta (subscriberOffset sub)

casBytesConsumed :: Metadata -> Int -> Int -> IO Bool
casBytesConsumed (Metadata meta) = casIntAddr meta lOG_BYTES_CONSUMED_OFFSET
readMinBytesConsumed :: Metadata -> IO Int
readMinBytesConsumed meta = do
minSub1 <- readBytesConsumed meta Sub1
minSub2 <- readBytesConsumed meta Sub2
pure (min minSub1 minSub2)

readCleanPosition :: Metadata -> IO Int
readCleanPosition (Metadata meta) = readIntOffArrayIx meta lOG_CLEAN_POSITION_OFFSET
Expand Down Expand Up @@ -387,6 +407,7 @@ emptyMetrics = Metrics 0 0
data Options = Options
{ oTermBufferLength :: !Int
, oLogger :: !Logger
, oMaxSubscriber :: Subscriber
}
-- archive
-- buffer and fsync every ms?
Expand Down
4 changes: 2 additions & 2 deletions src/journal/test/JournalTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ step DumpJournal m = (m, Result (Right ()))

exec :: Command -> Journal -> IO Response
exec (AppendBS rle) j = Result <$> appendBS j (decodeRunLength rle)
exec ReadJournal j = ByteString . fmap encodeRunLength <$> readJournal j
exec ReadJournal j = ByteString . fmap encodeRunLength <$> readJournal j Sub1
exec DumpJournal j = Result . Right <$> dumpJournal j

genRunLenEncoding :: Gen [(Int, Char)]
Expand Down Expand Up @@ -848,7 +848,7 @@ concExec queue jour cmd = do

execMP :: Command -> Journal -> IO Response
execMP (AppendBS rle) j = Result <$> MP.appendBS j (decodeRunLength rle)
execMP ReadJournal j = ByteString . fmap encodeRunLength <$> MP.readJournal j
execMP ReadJournal j = ByteString . fmap encodeRunLength <$> MP.readJournal j Sub1
execMP DumpJournal j = Result . Right <$> dumpJournal j

-- Generate all possible single-threaded executions from the concurrent history.
Expand Down

0 comments on commit e620308

Please sign in to comment.