Skip to content

Commit

Permalink
consensus: remove Mempool.getTxSize
Browse files Browse the repository at this point in the history
The 'LedgerSupportsMempool.txInBlockSize' method suffices, and is already in
scope within the mempool itself.

Adding sizes to the `MempoolSnapshot` interface replaces the other use of
`getTxSize`, in the `NodeKernel`.
  • Loading branch information
nfrisby committed Aug 28, 2024
1 parent 483e931 commit 2cd4b1d
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,6 @@ reproMempoolForge numBlks env = do
-- one megabyte should generously accomodate two blocks' worth of txs
(Mempool.MempoolCapacityBytesOverride $ Mempool.MempoolCapacityBytes $ 2^(20 :: Int))
nullTracer
LedgerSupportsMempool.txInBlockSize

void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks ref mempool)
pure Nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ tests =
mempool <- Mocked.openMockedMempool
capcityBytesOverride
tracer
LedgerSupportsMempool.txInBlockSize
mempoolParams

mempool `should_process` [ _137 ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
(configLedger cfg)
mempoolCapacityOverride
(mempoolTracer tracers)
txInBlockSize

fetchClientRegistry <- newFetchClientRegistry

Expand Down Expand Up @@ -732,8 +731,8 @@ getMempoolReader mempool = MempoolReader.TxSubmissionMempoolReader
snapshotHasTx } =
MempoolReader.MempoolSnapshot
{ mempoolTxIdsAfter = \idx ->
[ (txId (txForgetValidated tx), idx', getTxSize mempool (txForgetValidated tx))
| (tx, idx') <- snapshotTxsAfter idx
[ (txId (txForgetValidated tx), idx', sz)
| (tx, idx', sz) <- snapshotTxsAfter idx
]
, mempoolLookupTx = snapshotLookupTx
, mempoolHasTx = snapshotHasTx
Expand Down
1 change: 0 additions & 1 deletion ouroboros-consensus/bench/mempool-bench/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ openMempoolWithCapacity capacity =
(Mempool.unByteSize capacity)
)
Tracer.nullTracer
TestBlock.txSize
Mocked.MempoolAndModelParams {
Mocked.immpInitialState = TestBlock.initialLedgerState
, Mocked.immpLedgerConfig = TestBlock.sampleLedgerConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,6 @@ data Mempool m blk = Mempool {
-- capacity, i.e., we won't admit new transactions until some have been
-- removed because they have become invalid.
, getCapacity :: STM m Cap.MempoolCapacityBytes

-- | Return the post-serialisation size in bytes of a 'GenTx'.
, getTxSize :: GenTx blk -> TxSizeInBytes
}

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -335,7 +332,8 @@ data MempoolSnapshot blk = MempoolSnapshot {
-- | Get all transactions (oldest to newest) in the mempool snapshot,
-- along with their ticket number, which are associated with a ticket
-- number greater than the one provided.
, snapshotTxsAfter :: TicketNo -> [(Validated (GenTx blk), TicketNo)]
, snapshotTxsAfter ::
TicketNo -> [(Validated (GenTx blk), TicketNo, TxSizeInBytes)]

-- | Get a specific transaction from the mempool snapshot by its ticket
-- number, if it exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ data MempoolEnv m blk = MempoolEnv {
, mpEnvAddTxsRemoteFifo :: MVar m ()
, mpEnvAddTxsAllFifo :: MVar m ()
, mpEnvTracer :: Tracer m (TraceEventMempool blk)
, mpEnvTxSize :: GenTx blk -> TxSizeInBytes
, mpEnvCapacityOverride :: MempoolCapacityBytesOverride
}

Expand All @@ -200,9 +199,8 @@ initMempoolEnv :: ( IOLike m
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Tracer m (TraceEventMempool blk)
-> (GenTx blk -> TxSizeInBytes)
-> m (MempoolEnv m blk)
initMempoolEnv ledgerInterface cfg capacityOverride tracer txSize = do
initMempoolEnv ledgerInterface cfg capacityOverride tracer = do
st <- atomically $ getCurrentLedgerState ledgerInterface
let (slot, st') = tickLedgerState cfg (ForgeInUnknownSlot st)
isVar <- newTVarIO $ initInternalState capacityOverride TxSeq.zeroTicketNo slot st'
Expand All @@ -215,7 +213,6 @@ initMempoolEnv ledgerInterface cfg capacityOverride tracer txSize = do
, mpEnvAddTxsRemoteFifo = addTxRemoteFifo
, mpEnvAddTxsAllFifo = addTxAllFifo
, mpEnvTracer = tracer
, mpEnvTxSize = txSize
, mpEnvCapacityOverride = capacityOverride
}

Expand Down Expand Up @@ -323,14 +320,13 @@ extendVRPrevApplied cfg txTicket vr =
-- again.
extendVRNew :: (LedgerSupportsMempool blk, HasTxId (GenTx blk))
=> LedgerConfig blk
-> (GenTx blk -> TxSizeInBytes)
-> WhetherToIntervene
-> GenTx blk
-> ValidationResult (GenTx blk) blk
-> ( Either (ApplyTxErr blk) (Validated (GenTx blk))
, ValidationResult (GenTx blk) blk
)
extendVRNew cfg txSize wti tx vr = assert (isNothing vrNewValid) $
extendVRNew cfg wti tx vr = assert (isNothing vrNewValid) $
case runExcept (applyTx cfg wti vrSlotNo tx vrAfter) of
Left err ->
( Left err
Expand All @@ -339,7 +335,7 @@ extendVRNew cfg txSize wti tx vr = assert (isNothing vrNewValid) $
)
Right (st', vtx) ->
( Right vtx
, vr { vrValid = vrValid :> TxTicket vtx nextTicketNo (txSize tx)
, vr { vrValid = vrValid :> TxTicket vtx nextTicketNo sz
, vrValidTxIds = Set.insert (txId tx) vrValidTxIds
, vrNewValid = Just vtx
, vrAfter = st'
Expand All @@ -359,6 +355,8 @@ extendVRNew cfg txSize wti tx vr = assert (isNothing vrNewValid) $

nextTicketNo = succ vrLastTicketNo

sz = txInBlockSize tx

{-------------------------------------------------------------------------------
Conversions
-------------------------------------------------------------------------------}
Expand Down Expand Up @@ -428,11 +426,13 @@ snapshotFromIS is = MempoolSnapshot {
where
implSnapshotGetTxs :: InternalState blk
-> [(Validated (GenTx blk), TicketNo)]
implSnapshotGetTxs = flip implSnapshotGetTxsAfter TxSeq.zeroTicketNo
implSnapshotGetTxs is' =
map (\(a, b, _c) -> (a, b))
$ implSnapshotGetTxsAfter is' TxSeq.zeroTicketNo

implSnapshotGetTxsAfter :: InternalState blk
-> TicketNo
-> [(Validated (GenTx blk), TicketNo)]
-> [(Validated (GenTx blk), TicketNo, TxSizeInBytes)]
implSnapshotGetTxsAfter IS{isTxs} =
TxSeq.toTuples . snd . TxSeq.splitAfterTicketNo isTxs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@ openMempool ::
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Tracer m (TraceEventMempool blk)
-> (GenTx blk -> TxSizeInBytes)
-> m (Mempool m blk)
openMempool registry ledger cfg capacityOverride tracer txSize = do
env <- initMempoolEnv ledger cfg capacityOverride tracer txSize
openMempool registry ledger cfg capacityOverride tracer = do
env <- initMempoolEnv ledger cfg capacityOverride tracer
forkSyncStateOnTipPointChange registry env
return $ mkMempool env

Expand Down Expand Up @@ -91,10 +90,9 @@ openMempoolWithoutSyncThread ::
-> LedgerConfig blk
-> MempoolCapacityBytesOverride
-> Tracer m (TraceEventMempool blk)
-> (GenTx blk -> TxSizeInBytes)
-> m (Mempool m blk)
openMempoolWithoutSyncThread ledger cfg capacityOverride tracer txSize =
mkMempool <$> initMempoolEnv ledger cfg capacityOverride tracer txSize
openMempoolWithoutSyncThread ledger cfg capacityOverride tracer =
mkMempool <$> initMempoolEnv ledger cfg capacityOverride tracer

mkMempool ::
( IOLike m
Expand All @@ -104,19 +102,17 @@ mkMempool ::
)
=> MempoolEnv m blk -> Mempool m blk
mkMempool mpEnv = Mempool
{ addTx = implAddTx istate remoteFifo allFifo cfg txSize trcr
{ addTx = implAddTx istate remoteFifo allFifo cfg trcr
, removeTxs = implRemoveTxs mpEnv
, syncWithLedger = implSyncWithLedger mpEnv
, getSnapshot = snapshotFromIS <$> readTVar istate
, getSnapshotFor = \fls -> pureGetSnapshotFor cfg fls co <$> readTVar istate
, getCapacity = isCapacity <$> readTVar istate
, getTxSize = txSize
}
where MempoolEnv { mpEnvStateVar = istate
, mpEnvAddTxsRemoteFifo = remoteFifo
, mpEnvAddTxsAllFifo = allFifo
, mpEnvLedgerCfg = cfg
, mpEnvTxSize = txSize
, mpEnvTracer = trcr
, mpEnvCapacityOverride = co
} = mpEnv
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,13 @@ toList (TxSeq ftree) = Foldable.toList ftree

-- | Convert a 'TxSeq' to a list of pairs of transactions and their
-- associated 'TicketNo's.
toTuples :: TxSeq tx -> [(tx, TicketNo)]
toTuples :: TxSeq tx -> [(tx, TicketNo, TxSizeInBytes)]
toTuples (TxSeq ftree) = fmap
(\ticket -> (txTicketTx ticket, txTicketNo ticket))
(\ticket ->
( txTicketTx ticket
, txTicketNo ticket
, txTicketTxSizeInBytes ticket)
)
(Foldable.toList ftree)

-- | \( O(1) \). Return the 'MempoolSize' of the given 'TxSeq'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,13 @@ implAddTx ::
-- ^ The FIFO for all remote peers and local clients
-> LedgerConfig blk
-- ^ The configuration of the ledger.
-> (GenTx blk -> TxSizeInBytes)
-- ^ The function to calculate the size of a
-- transaction.
-> Tracer m (TraceEventMempool blk)
-> AddTxOnBehalfOf
-- ^ Whether we're acting on behalf of a remote peer or a local client.
-> GenTx blk
-- ^ The transaction to add to the mempool.
-> m (MempoolAddTxResult blk)
implAddTx istate remoteFifo allFifo cfg txSize trcr onbehalf tx =
implAddTx istate remoteFifo allFifo cfg trcr onbehalf tx =
-- To ensure fair behaviour between threads that are trying to add
-- transactions, we make them all queue in a fifo. Only the one at the head
-- of the queue gets to actually wait for space to get freed up in the
Expand Down Expand Up @@ -87,7 +84,7 @@ implAddTx istate remoteFifo allFifo cfg txSize trcr onbehalf tx =
where
implAddTx' = do
(result, ev) <- atomically $ do
outcome <- implTryAddTx istate cfg txSize
outcome <- implTryAddTx istate cfg
(whetherToIntervene onbehalf)
tx
case outcome of
Expand Down Expand Up @@ -144,16 +141,13 @@ implTryAddTx ::
-- ^ The InternalState TVar.
-> LedgerConfig blk
-- ^ The configuration of the ledger.
-> (GenTx blk -> TxSizeInBytes)
-- ^ The function to calculate the size of a
-- transaction.
-> WhetherToIntervene
-> GenTx blk
-- ^ The transaction to add to the mempool.
-> STM m (TryAddTx blk)
implTryAddTx istate cfg txSize wti tx = do
implTryAddTx istate cfg wti tx = do
is <- readTVar istate
let outcome = pureTryAddTx cfg txSize wti tx is
let outcome = pureTryAddTx cfg wti tx is
case outcome of
TryAddTx (Just is') _ _ -> writeTVar istate is'
_ -> return ()
Expand All @@ -172,15 +166,13 @@ pureTryAddTx ::
)
=> LedgerCfg (LedgerState blk)
-- ^ The ledger configuration.
-> (GenTx blk -> TxSizeInBytes)
-- ^ The function to claculate the size of a transaction.
-> WhetherToIntervene
-> GenTx blk
-- ^ The transaction to add to the mempool.
-> InternalState blk
-- ^ The current internal state of the mempool.
-> TryAddTx blk
pureTryAddTx cfg txSize wti tx is
pureTryAddTx cfg wti tx is
-- We add the transaction if there is at least one byte free left in the
-- mempool.
| let curSize = msNumBytes $ isMempoolSize is
Expand Down Expand Up @@ -213,7 +205,7 @@ pureTryAddTx cfg txSize wti tx is
| otherwise
= NoSpaceLeft
where
(eVtx, vr) = extendVRNew cfg txSize wti tx $ validationResultFromIS is
(eVtx, vr) = extendVRNew cfg wti tx $ validationResultFromIS is
is' = internalStateFromVR vr

{-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ openMockedMempool ::
)
=> Mempool.MempoolCapacityBytesOverride
-> Tracer IO (Mempool.TraceEventMempool blk)
-> (Ledger.GenTx blk -> Mempool.TxSizeInBytes)
-> InitialMempoolAndModelParams blk
-> IO (MockedMempool IO blk)
openMockedMempool capacityOverride tracer txSizeImpl initialParams = do
openMockedMempool capacityOverride tracer initialParams = do
currentLedgerStateTVar <- newTVarIO (immpInitialState initialParams)
let ledgerItf = Mempool.LedgerInterface {
Mempool.getCurrentLedgerState = readTVar currentLedgerStateTVar
Expand All @@ -71,7 +70,6 @@ openMockedMempool capacityOverride tracer txSizeImpl initialParams = do
(immpLedgerConfig initialParams)
capacityOverride
tracer
txSizeImpl
pure MockedMempool {
getLedgerInterface = ledgerItf
, getLedgerStateTVar = currentLedgerStateTVar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ prop_Mempool_snapshotTxs_snapshotTxsAfter :: TestSetup -> Property
prop_Mempool_snapshotTxs_snapshotTxsAfter setup =
withTestMempool setup $ \TestMempool { mempool } -> do
let Mempool { getSnapshot } = mempool
prj (tx, tn, _sz) = (tx, tn)
MempoolSnapshot { snapshotTxs, snapshotTxsAfter} <- atomically getSnapshot
return $ snapshotTxs === snapshotTxsAfter zeroTicketNo
return $ snapshotTxs === map prj (snapshotTxsAfter zeroTicketNo)

-- | Test that all valid transactions added to a 'Mempool' can be retrieved
-- afterward.
Expand Down Expand Up @@ -733,7 +734,6 @@ withTestMempool setup@TestSetup {..} prop =
testLedgerConfig
testMempoolCapOverride
tracer
txSize
result <- addTxs mempool testInitialTxs
-- the invalid transactions are reported in the same order they were
-- added, so the first error is not the result of a cascade
Expand Down Expand Up @@ -843,7 +843,7 @@ prop_TxSeq_lookupByTicketNo_complete xs =
and [ case TxSeq.lookupByTicketNo txseq tn of
Just tx' -> tx == tx'
Nothing -> False
| (tx, tn) <- TxSeq.toTuples txseq ]
| (tx, tn, _sz) <- TxSeq.toTuples txseq ]
where
txseq :: TxSeq Int
txseq =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ testTxSizeFairness TestParams { mempoolMaxCapacity, smallTxSize, largeTxSize, nr
(testBlockLedgerConfigFrom eraParams)
(Mempool.mkCapacityBytesOverride mempoolMaxCapacity)
Tracer.nullTracer
genTxSize

----------------------------------------------------------------------------
-- Add and collect transactions
Expand Down

0 comments on commit 2cd4b1d

Please sign in to comment.