From e620308e5edb95d660cbc46e4e0446a011c02a1d Mon Sep 17 00:00:00 2001 From: Daniel Gustafsson Date: Wed, 9 Mar 2022 14:01:33 +0100 Subject: [PATCH] feat(journal): Add support for a secondary subscriber --- src/journal/src/Journal.hs | 21 ++++++++------ src/journal/src/Journal/Internal.hs | 2 +- src/journal/src/Journal/MP.hs | 20 ++++++------- src/journal/src/Journal/Types.hs | 45 +++++++++++++++++++++-------- src/journal/test/JournalTest.hs | 4 +-- 5 files changed, 58 insertions(+), 34 deletions(-) diff --git a/src/journal/src/Journal.hs b/src/journal/src/Journal.hs index dc7e66ee..c476701e 100644 --- a/src/journal/src/Journal.hs +++ b/src/journal/src/Journal.hs @@ -53,10 +53,10 @@ import Journal.Types.AtomicCounter -- * Initialisation and shutdown defaultOptions :: Options -defaultOptions = Options (64 * 1024) ioLogger +defaultOptions = Options (64 * 1024) ioLogger Sub1 allocateJournal :: FilePath -> Options -> IO () -allocateJournal fp (Options termBufferLen logger) = do +allocateJournal fp (Options termBufferLen logger maxSub) = do unless (popCount termBufferLen == 1) $ -- XXX: check bounds error "allocateJournal: oTermBufferLength must be a power of 2" @@ -83,6 +83,9 @@ allocateJournal fp (Options termBufferLen logger) = do (initTermId + TermId (int2Int32 (i - if i == 0 then 0 else pARTITION_COUNT))) pageSize <- sysconfPageSize writePageSize (Metadata meta) (int2Int32 pageSize) + -- Set tombstones for all subscribers *above* `maxSub` + forM_ (drop 1 [maxSub..]) $ \sub -> + writeBytesConsumed (Metadata meta) sub tombStone else do logg logger ("allocateJournal, journal exists: " ++ fp) let logLength = termBufferLen * pARTITION_COUNT + lOG_META_DATA_LENGTH @@ -99,7 +102,7 @@ allocateJournal fp (Options termBufferLen logger) = do error "allocateJournal: pageSize doesn't match the metadata" startJournal :: FilePath -> Options -> IO Journal -startJournal fp (Options termLength logger) = do +startJournal fp (Options termLength logger _maxSub) = do logLength <- fromIntegral <$> getFileSize fp bb <- mmapped fp logLength meta <- wrapPart bb (logLength - lOG_META_DATA_LENGTH) lOG_META_DATA_LENGTH @@ -161,9 +164,9 @@ recvBytes bc sock len = withPtr bc $ \ptr -> recvBuf sock ptr len -- * Consumption -readJournal :: Journal -> IO (Maybe ByteString) -readJournal jour = do - offset <- readBytesConsumed (jMetadata jour) +readJournal :: Journal -> Subscriber -> IO (Maybe ByteString) +readJournal jour sub = do + offset <- readBytesConsumed (jMetadata jour) sub let jLog = logg (jLogger jour) jLog ("readJournal, offset: " ++ show offset) @@ -205,9 +208,9 @@ readJournal jour = do if tag == Padding then do assertM (len >= 0) - incrBytesConsumed_ (jMetadata jour) (align (int322Int len) fRAME_ALIGNMENT) + incrBytesConsumed_ (jMetadata jour) sub (align (int322Int len) fRAME_ALIGNMENT) jLog "readJournal, skipping padding..." - readJournal jour + readJournal jour sub else do assertM (len > 0) jLog ("readJournal, termCount: " ++ show (unTermCount termCount)) @@ -215,7 +218,7 @@ readJournal jour = do (int322Int relativeOffset + hEADER_LENGTH) (int322Int len - hEADER_LENGTH) assertM (BS.length bs == int322Int len - hEADER_LENGTH) - incrBytesConsumed_ (jMetadata jour) (align (int322Int len) fRAME_ALIGNMENT) + incrBytesConsumed_ (jMetadata jour) sub (align (int322Int len) fRAME_ALIGNMENT) return (Just bs) ------------------------------------------------------------------------ diff --git a/src/journal/src/Journal/Internal.hs b/src/journal/src/Journal/Internal.hs index 9e3f2c8a..bdf4f2d7 100644 --- a/src/journal/src/Journal/Internal.hs +++ b/src/journal/src/Journal/Internal.hs @@ -83,7 +83,7 @@ tryClaim jour len = do -- separate process? calculatePositionLimit :: Journal -> IO Int64 calculatePositionLimit jour = do - minSubscriberPos <- readBytesConsumed (jMetadata jour) -- XXX: only one subscriber so far. + minSubscriberPos <- readMinBytesConsumed (jMetadata jour) maxSubscriberPos <- readCleanPosition (jMetadata jour) termWindowLen <- termWindowLength (jMetadata jour) let _consumerPos = maxSubscriberPos diff --git a/src/journal/src/Journal/MP.hs b/src/journal/src/Journal/MP.hs index bfa99745..64221898 100644 --- a/src/journal/src/Journal/MP.hs +++ b/src/journal/src/Journal/MP.hs @@ -42,9 +42,9 @@ appendBS jour bs = do recvBytes :: BufferClaim -> Socket -> Int -> IO Int recvBytes bc sock len = withPtr bc $ \ptr -> recvBuf sock (ptr `plusPtr` hEADER_LENGTH) len -readJournal :: Journal -> IO (Maybe ByteString) -readJournal jour = do - offset <- readBytesConsumed (jMetadata jour) +readJournal :: Journal -> Subscriber -> IO (Maybe ByteString) +readJournal jour sub = do + offset <- readBytesConsumed (jMetadata jour) sub let jLog = logg (jLogger jour) jLog ("readJournal, offset: " ++ show offset) @@ -88,15 +88,15 @@ readJournal jour = do then do if len >= 0 then do - _success <- casBytesConsumed (jMetadata jour) offset (offset + int322Int len) + _success <- casBytesConsumed (jMetadata jour) sub offset (offset + int322Int len) jLog "readJournal, skipping padding..." -- If the CAS fails, it just means that some other process incremented the -- counter already. - readJournal jour - else readJournal jour -- If len is negative then the writer hasn't - -- finished writing the padding yet. + readJournal jour sub + else readJournal jour sub -- If len is negative then the writer hasn't + -- finished writing the padding yet. else if len <= 0 || tag == Empty - then readJournal jour + then readJournal jour sub else do assertMMsg (show len) (len > 0) jLog ("readJournal, termCount: " ++ show (unTermCount termCount)) @@ -110,7 +110,7 @@ readJournal jour = do (int322Int relativeOffset + hEADER_LENGTH) (int322Int len - hEADER_LENGTH) assertM (BS.length bs == int322Int len - hEADER_LENGTH) - success <- casBytesConsumed (jMetadata jour) offset + success <- casBytesConsumed (jMetadata jour) sub offset (offset + (align (int322Int len) fRAME_ALIGNMENT)) if success then do @@ -120,7 +120,7 @@ readJournal jour = do else -- If the CAS failed it means that another process read what we were -- about to read, so we retry reading the next item instead. - readJournal jour + readJournal jour sub ------------------------------------------------------------------------ diff --git a/src/journal/src/Journal/Types.hs b/src/journal/src/Journal/Types.hs index 7d28d30f..88244979 100644 --- a/src/journal/src/Journal/Types.hs +++ b/src/journal/src/Journal/Types.hs @@ -76,7 +76,8 @@ data JMetadata = JMetadata , mdTermLength :: Int32 , mdPageSize :: Int32 , mdCleanPosition :: Int64 - , mdBytesConsumed :: Int64 + , mdBytesConsumed1 :: Int64 + , mdBytesConsumed2 :: Int64 -- padding -- , mdDefaultFrameHeader :: Bytestring??? } @@ -103,11 +104,24 @@ lOG_PAGE_SIZE_OFFSET = lOG_TERM_LENGTH_OFFSET + lOG_CLEAN_POSITION_OFFSET :: Int lOG_CLEAN_POSITION_OFFSET = lOG_PAGE_SIZE_OFFSET + sizeOf (4 :: Int32) -lOG_BYTES_CONSUMED_OFFSET :: Int -lOG_BYTES_CONSUMED_OFFSET = lOG_CLEAN_POSITION_OFFSET + sizeOf (8 :: Int) +lOG_BYTES_CONSUMED1_OFFSET :: Int +lOG_BYTES_CONSUMED1_OFFSET = lOG_CLEAN_POSITION_OFFSET + sizeOf (8 :: Int) + +lOG_BYTES_CONSUMED2_OFFSET :: Int +lOG_BYTES_CONSUMED2_OFFSET = lOG_BYTES_CONSUMED1_OFFSET + sizeOf (8 :: Int) lOG_META_DATA_LENGTH :: Int -lOG_META_DATA_LENGTH = lOG_BYTES_CONSUMED_OFFSET + sizeOf (8 :: Int) -- is this correct? +lOG_META_DATA_LENGTH = lOG_BYTES_CONSUMED2_OFFSET + sizeOf (8 :: Int) -- is this correct? + +data Subscriber = Sub1 | Sub2 + deriving (Bounded, Enum, Show) + +subscriberOffset :: Subscriber -> Int +subscriberOffset Sub1 = lOG_BYTES_CONSUMED1_OFFSET +subscriberOffset Sub2 = lOG_BYTES_CONSUMED2_OFFSET + +tombStone :: Int +tombStone = maxBound ------------------------------------------------------------------------ @@ -197,17 +211,23 @@ readPageSize (Metadata meta) = readInt32OffAddr meta lOG_PAGE_SIZE_OFFSET writePageSize :: Metadata -> Int32 -> IO () writePageSize (Metadata meta) = writeInt32OffAddr meta lOG_PAGE_SIZE_OFFSET -readBytesConsumed :: Metadata -> IO Int -readBytesConsumed (Metadata meta) = readIntOffArrayIx meta lOG_BYTES_CONSUMED_OFFSET +readBytesConsumed :: Metadata -> Subscriber -> IO Int +readBytesConsumed (Metadata meta) sub = readIntOffArrayIx meta (subscriberOffset sub) + +writeBytesConsumed :: Metadata -> Subscriber -> Int -> IO () +writeBytesConsumed (Metadata meta) sub = writeIntOffAddr meta (subscriberOffset sub) -writeBytesConsumed :: Metadata -> Int -> IO () -writeBytesConsumed (Metadata meta) = writeIntOffAddr meta lOG_BYTES_CONSUMED_OFFSET +incrBytesConsumed_ :: Metadata -> Subscriber -> Int -> IO () +incrBytesConsumed_ (Metadata meta) sub = fetchAddIntArray_ meta (subscriberOffset sub) -incrBytesConsumed_ :: Metadata -> Int -> IO () -incrBytesConsumed_ (Metadata meta) = fetchAddIntArray_ meta lOG_BYTES_CONSUMED_OFFSET +casBytesConsumed :: Metadata -> Subscriber -> Int -> Int -> IO Bool +casBytesConsumed (Metadata meta) sub = casIntAddr meta (subscriberOffset sub) -casBytesConsumed :: Metadata -> Int -> Int -> IO Bool -casBytesConsumed (Metadata meta) = casIntAddr meta lOG_BYTES_CONSUMED_OFFSET +readMinBytesConsumed :: Metadata -> IO Int +readMinBytesConsumed meta = do + minSub1 <- readBytesConsumed meta Sub1 + minSub2 <- readBytesConsumed meta Sub2 + pure (min minSub1 minSub2) readCleanPosition :: Metadata -> IO Int readCleanPosition (Metadata meta) = readIntOffArrayIx meta lOG_CLEAN_POSITION_OFFSET @@ -387,6 +407,7 @@ emptyMetrics = Metrics 0 0 data Options = Options { oTermBufferLength :: !Int , oLogger :: !Logger + , oMaxSubscriber :: Subscriber } -- archive -- buffer and fsync every ms? diff --git a/src/journal/test/JournalTest.hs b/src/journal/test/JournalTest.hs index 7fe5b37f..45cf8782 100644 --- a/src/journal/test/JournalTest.hs +++ b/src/journal/test/JournalTest.hs @@ -296,7 +296,7 @@ step DumpJournal m = (m, Result (Right ())) exec :: Command -> Journal -> IO Response exec (AppendBS rle) j = Result <$> appendBS j (decodeRunLength rle) -exec ReadJournal j = ByteString . fmap encodeRunLength <$> readJournal j +exec ReadJournal j = ByteString . fmap encodeRunLength <$> readJournal j Sub1 exec DumpJournal j = Result . Right <$> dumpJournal j genRunLenEncoding :: Gen [(Int, Char)] @@ -848,7 +848,7 @@ concExec queue jour cmd = do execMP :: Command -> Journal -> IO Response execMP (AppendBS rle) j = Result <$> MP.appendBS j (decodeRunLength rle) -execMP ReadJournal j = ByteString . fmap encodeRunLength <$> MP.readJournal j +execMP ReadJournal j = ByteString . fmap encodeRunLength <$> MP.readJournal j Sub1 execMP DumpJournal j = Result . Right <$> dumpJournal j -- Generate all possible single-threaded executions from the concurrent history.