diff --git a/go.mod b/go.mod index 23dd8c722..e77306653 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/decred/dcrd/chaincfg/v3 v3.2.1 github.com/decred/dcrd/connmgr/v3 v3.1.2 github.com/decred/dcrd/container/apbf v1.0.1 + github.com/decred/dcrd/container/lru v1.0.0 github.com/decred/dcrd/crypto/blake256 v1.0.1 github.com/decred/dcrd/crypto/ripemd160 v1.0.2 github.com/decred/dcrd/database/v3 v3.0.2 @@ -49,7 +50,6 @@ require ( github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a // indirect github.com/dchest/siphash v1.2.3 // indirect - github.com/decred/dcrd/container/lru v1.0.0 // indirect github.com/decred/dcrd/crypto/rand v1.0.0 // indirect github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 // indirect github.com/decred/dcrd/hdkeychain/v3 v3.1.2 // indirect diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 879123cdf..69c376992 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -1046,17 +1046,10 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { m.rejectedTxns.Reset() // Remove expired pair requests and completed mixes from - // mixpool. The transactions from the previous block are used - // to remove spent PRs to avoid a logic race where a mined - // block immediately removes messages still propagating the - // network. - prevBlock, err := chain.BlockByHash(&header.PrevBlock) - if err == nil { - prev := prevBlock.MsgBlock() - m.cfg.MixPool.RemoveSpentPRs(prev.Transactions) - m.cfg.MixPool.RemoveSpentPRs(prev.STransactions) - m.cfg.MixPool.ExpireMessagesInBackground(prev.Header.Height) - } + // mixpool. + m.cfg.MixPool.RemoveSpentPRs(msgBlock.Transactions) + m.cfg.MixPool.RemoveSpentPRs(msgBlock.STransactions) + m.cfg.MixPool.ExpireMessagesInBackground(header.Height) } // Update the latest block height for the peer to avoid stale heights when diff --git a/server.go b/server.go index 389083eae..ee371cf29 100644 --- a/server.go +++ b/server.go @@ -33,6 +33,7 @@ import ( "github.com/decred/dcrd/chaincfg/v3" "github.com/decred/dcrd/connmgr/v3" "github.com/decred/dcrd/container/apbf" + "github.com/decred/dcrd/container/lru" "github.com/decred/dcrd/database/v3" "github.com/decred/dcrd/dcrutil/v4" "github.com/decred/dcrd/internal/blockchain" @@ -149,6 +150,20 @@ const ( // These values result in about 183 KiB memory usage including overhead. maxRecentlyConfirmedTxns = 23000 recentlyConfirmedTxnsFPRate = 0.000001 + + // These fields are used when caching recently advertised transactions. + // + // maxRecentlyAdvertisedTxns specifies the maximum number to cache and is + // set to target tracking the maximum number transactions of the minimum + // realistic size (~206 bytes) in approximately two blocks on the main + // network plus an additional 20%. + // + // recentlyAdvertisedTxnsTTL is the time to keep recently advertised + // transactions in the cache before they are expired. + // + // These values result in about 640 KiB memory usage including overhead. + maxRecentlyAdvertisedTxns = 4500 + recentlyAdvertisedTxnsTTL = 45 * time.Second ) var ( @@ -580,6 +595,45 @@ type server struct { // recentlyConfirmedTxns tracks transactions that have been confirmed in the // most recent blocks. recentlyConfirmedTxns *apbf.Filter + + // recentlyAdvertisedTxns caches transactions that have recently been + // advertised to other peers. The cache handles automatic expiration and + // maximum entry limiting. + // + // It is considered misbehavior to advertise a transaction and then claim it + // is not found when the corresponding request arrives. Further, since the + // mempool only contains the unconfirmed transactions as of the current best + // chain tip, a transaction might be advertised when it is first added to + // the mempool and then removed from the mempool prior to it being requested + // in the case new blocks are connected in between the advertisement and + // request. + // + // Thus, maintaining a separate cache of advertised transactions increases + // the probability they are available to serve regardless of whether or not + // they are still in the mempool when a request for the advertisement + // arrives. + // + // Note that it might be tempting to keep track of the number of times a tx + // has been advertised and requested so it can be removed from the cache as + // soon as there are no longer any potential outstanding requests, however, + // that is intentionally not done because it is exceedingly rare for + // advertisements to result in a request from all peers, so the extra + // overhead is not warranted. + recentlyAdvertisedTxns *lru.Map[chainhash.Hash, *dcrutil.Tx] + + // The following fields are used to periodically log the total number + // evicted recently advertised transactions. They are only accessed from + // a single long-running goroutine, so they are not protected for concurrent + // access. + // + // totalAdvertisedTxnsEvicted is the total number of advertised transactions + // that have been evicted from the cache since the previous report. + // + // lastAdvertisedTxnsEvictedLogged is the last time the total number of + // advertised transactions that have been evicted from the cache was + // reported. + totalAdvertisedTxnsEvicted uint64 + lastAdvertisedTxnsEvictedLogged time.Time } // serverPeer extends the peer to maintain state shared by the server. @@ -678,19 +732,29 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect, var dataMsg wire.Message switch iv.Type { case wire.InvTypeTx: - // Attempt to fetch the requested transaction from the pool. A call - // could be made to check for existence first, but simply trying to - // fetch a missing transaction results in the same behavior. Do not - // allow peers to request transactions already in a block but are - // unconfirmed, as they may be expensive. Restrict that to the - // authenticated RPC only. + // Attempt to fetch the requested transaction. Try the pool of + // recently advertised transactions first and then fall back to the + // mempool. + // + // Note that this does not allow peers to request transactions + // already in a block over p2p unless they still happen to be in the + // pool of advertised transactions, as that would require all nodes + // to maintain a full transaction index which can be expensive. + // That ability is restricted to authenticated RPC only and requires + // the aforementioned full transaction index. txHash := &iv.Hash - tx, err := sp.server.txMemPool.FetchTransaction(txHash) - if err != nil { - peerLog.Debugf("Unable to fetch tx %v from the "+ - "transaction pool for %v: %v", txHash, - sp, err) - break + tx, ok := sp.server.recentlyAdvertisedTxns.Get(*txHash) + if !ok { + // Note that a call could be made to check for existence first, + // but simply trying to fetch a missing transaction results in + // the same behavior. + var err error + tx, err = sp.server.txMemPool.FetchTransaction(txHash) + if err != nil { + peerLog.Debugf("Unable to fetch tx %v from transaction "+ + "pool for peer %s: %v", txHash, sp, err) + break + } } dataMsg = tx.MsgTx() @@ -698,8 +762,8 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect, blockHash := &iv.Hash block, err := sp.server.chain.BlockByHash(blockHash) if err != nil { - peerLog.Debugf("Unable to fetch block hash %v "+ - "for %v: %v", blockHash, sp, err) + peerLog.Debugf("Unable to fetch block hash %v for peer %s: %v", + blockHash, sp, err) break } dataMsg = block.MsgBlock() @@ -1014,24 +1078,33 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) { return } - // Maintain at least one outbound peer capable of supporting p2p mixing. + // Maintain a minimum desired number of outbound peers capable of supporting + // p2p mixing. if !isInbound && msg.ProtocolVersion < int32(wire.MixVersion) { - var hasMixCapableOutbound bool - var numOutbound uint32 + var numOutbound, numMixCapableOutbound uint32 peerState := &sp.server.peerState peerState.Lock() peerState.forAllOutboundPeers(func(sp *serverPeer) { if sp.ProtocolVersion() >= wire.MixVersion { - hasMixCapableOutbound = true + numMixCapableOutbound++ } numOutbound++ }) peerState.Unlock() - if !hasMixCapableOutbound && numOutbound+1 == sp.server.targetOutbound { + const defaultWantMixCapableOutbound uint32 = 3 + wantMixCapableOutbound := defaultWantMixCapableOutbound + if sp.server.targetOutbound < wantMixCapableOutbound { + wantMixCapableOutbound = sp.server.targetOutbound + } + hasMinMixCapableOuts := numMixCapableOutbound >= wantMixCapableOutbound + needsMoreMixCapable := !hasMinMixCapableOuts && + numOutbound+wantMixCapableOutbound >= sp.server.targetOutbound + if needsMoreMixCapable { srvrLog.Debugf("Rejecting outbound peer %s with protocol version "+ - "%d in favor of a peer with minimum version %d", sp, - msg.ProtocolVersion, wire.MixVersion) + "%d in favor of a peer with minimum version %d (have: %d, "+ + "target: %d)", sp, msg.ProtocolVersion, wire.MixVersion, + numMixCapableOutbound, wantMixCapableOutbound) sp.Disconnect() } } @@ -1982,6 +2055,33 @@ func (s *server) TransactionConfirmed(tx *dcrutil.Tx) { } } +// maybeLogRecentlyAdvertisedNumEvicted periodically logs the total number of +// evicted advertised transactions. +// +// It is NOT safe for concurrent access. +func (s *server) maybeLogRecentlyAdvertisedNumEvicted() { + // Do not log anything when there haven't been any evicted transactions. + totalEvicted := s.totalAdvertisedTxnsEvicted + if totalEvicted == 0 { + return + } + + // Only log a maximum of once per day. + const logInterval = 24 * time.Hour + sinceLastLogged := time.Since(s.lastAdvertisedTxnsEvictedLogged) + if sinceLastLogged < logInterval { + return + } + + srvrLog.Debugf("Evicted %d advertised %s in the last %v (%d remaining, "+ + "%.2f%% hit ratio)", totalEvicted, pickNoun(totalEvicted, "tx", "txns"), + sinceLastLogged.Truncate(time.Second), s.recentlyAdvertisedTxns.Len(), + s.recentlyAdvertisedTxns.HitRatio()) + + s.totalAdvertisedTxnsEvicted = 0 + s.lastAdvertisedTxnsEvictedLogged = time.Now() +} + // handleRelayInvMsg deals with relaying inventory to peers that are not already // known to have it. It is invoked from the peerHandler goroutine. func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) { @@ -2031,6 +2131,28 @@ func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) { if sp.disableRelayTx.Load() { return } + + // Track advertised transactions for a period of time in order to + // increase the probability they are available to serve regardless + // of whether or not they are still in the mempool when a request + // for the advertisement arrives. Note that it is still possible + // for the advertised transaction to not be available in the case it + // is later removed from the advertised transactions pool due to + // expiration and/or exceeding the maximum limits prior to the + // request for it arriving. However, not only is that case + // extremely rare in practice, the transaction is also likely still + // in the mempool in that case and will be served from there given + // peers generally do not request transactions that have been + // recently confirmed. + tx, ok := msg.data.(*dcrutil.Tx) + if !ok { + peerLog.Warnf("Underlying data for inventory vector is not a " + + " transaction") + return + } + numEvicted := s.recentlyAdvertisedTxns.Put(iv.Hash, tx) + s.totalAdvertisedTxnsEvicted += uint64(numEvicted) + s.maybeLogRecentlyAdvertisedNumEvicted() } if iv.Type == wire.InvTypeMix { @@ -3730,6 +3852,9 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, recentlyConfirmedTxnsFPRate), indexSubscriber: indexers.NewIndexSubscriber(ctx), quit: make(chan struct{}), + recentlyAdvertisedTxns: lru.NewMapWithDefaultTTL[chainhash.Hash, + *dcrutil.Tx](maxRecentlyAdvertisedTxns, recentlyAdvertisedTxnsTTL), + lastAdvertisedTxnsEvictedLogged: time.Now(), } // Convert the minimum known work to a uint256 when it exists. Ideally, the