diff --git a/ouroboros-network/ouroboros-network.cabal b/ouroboros-network/ouroboros-network.cabal index e765f9a60c..1bc1e0cacb 100644 --- a/ouroboros-network/ouroboros-network.cabal +++ b/ouroboros-network/ouroboros-network.cabal @@ -64,6 +64,7 @@ library Ouroboros.Network.PeerSelection.RootPeersDNS.PublicRootPeers Ouroboros.Network.PeerSharing Ouroboros.Network.TxSubmission.Inbound + Ouroboros.Network.TxSubmission.Inbound.Server Ouroboros.Network.TxSubmission.Inbound.Decision Ouroboros.Network.TxSubmission.Inbound.Policy Ouroboros.Network.TxSubmission.Inbound.State diff --git a/ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs b/ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs index 5537667352..f19c416210 100644 --- a/ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs +++ b/ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs @@ -8,6 +8,8 @@ {-# OPTIONS_GHC -Wno-partial-fields #-} +-- | Legacy `tx-submission` inbound peer. +-- module Ouroboros.Network.TxSubmission.Inbound ( txSubmissionInbound , TxSubmissionMempoolWriter (..) @@ -41,6 +43,7 @@ import Network.TypedProtocol.Pipelined (N, Nat (..), natToInt) import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion) import Ouroboros.Network.Protocol.TxSubmission2.Server import Ouroboros.Network.Protocol.TxSubmission2.Type +import Ouroboros.Network.TxSubmission.Inbound.Decision (TxDecision) import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..), TxSubmissionMempoolReader (..)) @@ -81,9 +84,17 @@ data TraceTxSubmissionInbound txid tx = -- | Just processed transaction pass/fail breakdown. | TraceTxSubmissionProcessed ProcessedTxCount -- | Server received 'MsgDone' - | TraceTxInboundTerminated | TraceTxInboundCanRequestMoreTxs Int | TraceTxInboundCannotRequestMoreTxs Int + + -- + -- messages emitted by the new implementation of the server in + -- "Ouroboros.Network.TxSubmission.Inbound.Server"; some of them are also + -- used in this module. + -- + + | TraceTxInboundTerminated + | TraceTxInboundDecision (TxDecision txid tx) deriving (Eq, Show) data TxSubmissionProtocolError = diff --git a/ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Server.hs b/ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Server.hs new file mode 100644 index 0000000000..1731245b06 --- /dev/null +++ b/ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Server.hs @@ -0,0 +1,185 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Ouroboros.Network.TxSubmission.Inbound.Server where + +import Data.List.NonEmpty qualified as NonEmpty +import Data.Map.Strict qualified as Map +import Data.Sequence.Strict qualified as StrictSeq +import Data.Set qualified as Set + +import Control.Concurrent.Class.MonadSTM.Strict +import Control.Exception (assert) +import Control.Monad.Class.MonadThrow +import Control.Tracer (Tracer, traceWith) + +import Network.TypedProtocol.Pipelined + +import Control.Monad (unless) +import Ouroboros.Network.Protocol.TxSubmission2.Server +import Ouroboros.Network.TxSubmission.Inbound (TraceTxSubmissionInbound (..), + TxSubmissionMempoolWriter (..), TxSubmissionProtocolError (..)) +import Ouroboros.Network.TxSubmission.Inbound.Decision (TxDecision (..)) +import Ouroboros.Network.TxSubmission.Inbound.Registry (PeerTxAPI (..)) + + +-- | A tx-submission outbound side (server, sic!). +-- +-- The server blocks on receiving `TxDecision` from the decision logic. If +-- there are tx's to download it pipelines two requests: first for tx's second +-- for txid's. If there are no tx's to download, it either sends a blocking or +-- non-blocking request for txid's. +-- +txSubmissionInboundV2 + :: forall txid tx idx m. + ( MonadSTM m + , MonadThrow m + , Ord txid + ) + => Tracer m (TraceTxSubmissionInbound txid tx) + -> TxSubmissionMempoolWriter txid tx idx m + -> PeerTxAPI m txid tx + -> TxSubmissionServerPipelined txid tx m () +txSubmissionInboundV2 + tracer + TxSubmissionMempoolWriter { + txId, + mempoolAddTxs + } + PeerTxAPI { + readTxDecision, + handleReceivedTxIds, + handleReceivedTxs + } + = + TxSubmissionServerPipelined serverIdle + where + serverIdle + :: m (ServerStIdle Z txid tx m ()) + serverIdle = do + -- Block on next decision. + txd@TxDecision { txdTxsToRequest = txsToReq, txdTxsToMempool = txs } + <- readTxDecision + traceWith tracer (TraceTxInboundDecision txd) + txidsAccepted <- mempoolAddTxs txs + let !collected = length txidsAccepted + traceWith tracer $ + TraceTxSubmissionCollected collected + -- TODO: + -- We can update the state so that other `tx-submission` servers will + -- not try to add these txs to the mempool. + if Set.null txsToReq + then serverReqTxIds Zero txd + else serverReqTxs txd + + + -- Pipelined request of txs + serverReqTxs :: TxDecision txid tx + -> m (ServerStIdle Z txid tx m ()) + serverReqTxs txd@TxDecision { txdTxsToRequest = txsToReq } = + pure $ SendMsgRequestTxsPipelined (Set.toList txsToReq) + (serverReqTxIds (Succ Zero) txd) + + + serverReqTxIds :: forall (n :: N). + Nat n + -> TxDecision txid tx + -> m (ServerStIdle n txid tx m ()) + serverReqTxIds + n TxDecision { txdTxIdsToAcknowledge = 0, + txdTxIdsToRequest = 0 } + = + case n of + Zero -> serverIdle + Succ _ -> handleReplies n + + serverReqTxIds + -- if there are no unacknowledged txids, the protocol requires sending + -- a blocking `MsgRequestTxIds` request. This is important, as otherwise + -- the client side wouldn't have a chance to terminate the + -- mini-protocol. + Zero TxDecision { txdTxIdsToAcknowledge = txIdsToAck, + txdPipelineTxIds = False, + txdTxIdsToRequest = txIdsToReq + } + = + pure $ SendMsgRequestTxIdsBlocking + txIdsToAck txIdsToReq + -- Our result if the client terminates the protocol + (traceWith tracer TraceTxInboundTerminated) + (\txids -> do + let txids' = NonEmpty.toList txids + txidsSeq = StrictSeq.fromList $ fst <$> txids' + txidsMap = Map.fromList txids' + unless (StrictSeq.length txidsSeq <= fromIntegral txIdsToReq) $ + throwIO ProtocolErrorTxIdsNotRequested + handleReceivedTxIds txIdsToReq txidsSeq txidsMap + serverIdle + ) + + serverReqTxIds + n@Zero TxDecision { txdTxIdsToAcknowledge = txIdsToAck, + txdPipelineTxIds = True, + txdTxIdsToRequest = txIdsToReq + } + = + pure $ SendMsgRequestTxIdsPipelined + txIdsToAck txIdsToReq + (handleReplies (Succ n)) + + serverReqTxIds + n@Succ{} TxDecision { txdTxIdsToAcknowledge = txIdsToAck, + txdPipelineTxIds, + txdTxIdsToRequest = txIdsToReq + } + = + -- it is impossible that we have had `tx`'s to request (Succ{} - is an + -- evidence for that), but no unacknowledged `txid`s. + assert txdPipelineTxIds $ + pure $ SendMsgRequestTxIdsPipelined + txIdsToAck txIdsToReq + (handleReplies (Succ n)) + + + handleReplies :: forall (n :: N). + Nat (S n) + -> m (ServerStIdle (S n) txid tx m ()) + handleReplies (Succ n'@Succ{}) = + pure $ CollectPipelined + Nothing + (handleReply (handleReplies n')) + + handleReplies (Succ Zero) = + pure $ CollectPipelined + Nothing + (handleReply serverIdle) + + handleReply :: forall (n :: N). + m (ServerStIdle n txid tx m ()) + -- continuation + -> Collect txid tx + -> m (ServerStIdle n txid tx m ()) + handleReply k = \case + CollectTxIds txIdsToReq txids -> do + let txidsSeq = StrictSeq.fromList $ fst <$> txids + txidsMap = Map.fromList txids + unless (StrictSeq.length txidsSeq <= fromIntegral txIdsToReq) $ + throwIO ProtocolErrorTxIdsNotRequested + handleReceivedTxIds txIdsToReq txidsSeq txidsMap + k + CollectTxs txids txs -> do + let requested = Set.fromList txids + received = Map.fromList [ (txId tx, tx) | tx <- txs ] + + unless (Map.keysSet received `Set.isSubsetOf` requested) $ + throwIO ProtocolErrorTxNotRequested + -- TODO: all sizes of txs which were announced earlier with + -- `MsgReplyTxIds` must be verified. + + handleReceivedTxs requested received + k