Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-v2.0] multi: Main module backports. #3392

Merged
merged 3 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/decred/dcrd/chaincfg/v3 v3.2.1
github.com/decred/dcrd/connmgr/v3 v3.1.2
github.com/decred/dcrd/container/apbf v1.0.1
github.com/decred/dcrd/container/lru v1.0.0
github.com/decred/dcrd/crypto/blake256 v1.0.1
github.com/decred/dcrd/crypto/ripemd160 v1.0.2
github.com/decred/dcrd/database/v3 v3.0.2
Expand Down Expand Up @@ -49,7 +50,6 @@ require (
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect
github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a // indirect
github.com/dchest/siphash v1.2.3 // indirect
github.com/decred/dcrd/container/lru v1.0.0 // indirect
github.com/decred/dcrd/crypto/rand v1.0.0 // indirect
github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 // indirect
github.com/decred/dcrd/hdkeychain/v3 v3.1.2 // indirect
Expand Down
15 changes: 4 additions & 11 deletions internal/netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,17 +1046,10 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) {
m.rejectedTxns.Reset()

// Remove expired pair requests and completed mixes from
// mixpool. The transactions from the previous block are used
// to remove spent PRs to avoid a logic race where a mined
// block immediately removes messages still propagating the
// network.
prevBlock, err := chain.BlockByHash(&header.PrevBlock)
if err == nil {
prev := prevBlock.MsgBlock()
m.cfg.MixPool.RemoveSpentPRs(prev.Transactions)
m.cfg.MixPool.RemoveSpentPRs(prev.STransactions)
m.cfg.MixPool.ExpireMessagesInBackground(prev.Header.Height)
}
// mixpool.
m.cfg.MixPool.RemoveSpentPRs(msgBlock.Transactions)
m.cfg.MixPool.RemoveSpentPRs(msgBlock.STransactions)
m.cfg.MixPool.ExpireMessagesInBackground(header.Height)
}

// Update the latest block height for the peer to avoid stale heights when
Expand Down
167 changes: 146 additions & 21 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/decred/dcrd/chaincfg/v3"
"github.com/decred/dcrd/connmgr/v3"
"github.com/decred/dcrd/container/apbf"
"github.com/decred/dcrd/container/lru"
"github.com/decred/dcrd/database/v3"
"github.com/decred/dcrd/dcrutil/v4"
"github.com/decred/dcrd/internal/blockchain"
Expand Down Expand Up @@ -149,6 +150,20 @@ const (
// These values result in about 183 KiB memory usage including overhead.
maxRecentlyConfirmedTxns = 23000
recentlyConfirmedTxnsFPRate = 0.000001

// These fields are used when caching recently advertised transactions.
//
// maxRecentlyAdvertisedTxns specifies the maximum number to cache and is
// set to target tracking the maximum number transactions of the minimum
// realistic size (~206 bytes) in approximately two blocks on the main
// network plus an additional 20%.
//
// recentlyAdvertisedTxnsTTL is the time to keep recently advertised
// transactions in the cache before they are expired.
//
// These values result in about 640 KiB memory usage including overhead.
maxRecentlyAdvertisedTxns = 4500
recentlyAdvertisedTxnsTTL = 45 * time.Second
)

var (
Expand Down Expand Up @@ -580,6 +595,45 @@ type server struct {
// recentlyConfirmedTxns tracks transactions that have been confirmed in the
// most recent blocks.
recentlyConfirmedTxns *apbf.Filter

// recentlyAdvertisedTxns caches transactions that have recently been
// advertised to other peers. The cache handles automatic expiration and
// maximum entry limiting.
//
// It is considered misbehavior to advertise a transaction and then claim it
// is not found when the corresponding request arrives. Further, since the
// mempool only contains the unconfirmed transactions as of the current best
// chain tip, a transaction might be advertised when it is first added to
// the mempool and then removed from the mempool prior to it being requested
// in the case new blocks are connected in between the advertisement and
// request.
//
// Thus, maintaining a separate cache of advertised transactions increases
// the probability they are available to serve regardless of whether or not
// they are still in the mempool when a request for the advertisement
// arrives.
//
// Note that it might be tempting to keep track of the number of times a tx
// has been advertised and requested so it can be removed from the cache as
// soon as there are no longer any potential outstanding requests, however,
// that is intentionally not done because it is exceedingly rare for
// advertisements to result in a request from all peers, so the extra
// overhead is not warranted.
recentlyAdvertisedTxns *lru.Map[chainhash.Hash, *dcrutil.Tx]

// The following fields are used to periodically log the total number
// evicted recently advertised transactions. They are only accessed from
// a single long-running goroutine, so they are not protected for concurrent
// access.
//
// totalAdvertisedTxnsEvicted is the total number of advertised transactions
// that have been evicted from the cache since the previous report.
//
// lastAdvertisedTxnsEvictedLogged is the last time the total number of
// advertised transactions that have been evicted from the cache was
// reported.
totalAdvertisedTxnsEvicted uint64
lastAdvertisedTxnsEvictedLogged time.Time
}

// serverPeer extends the peer to maintain state shared by the server.
Expand Down Expand Up @@ -678,28 +732,38 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect,
var dataMsg wire.Message
switch iv.Type {
case wire.InvTypeTx:
// Attempt to fetch the requested transaction from the pool. A call
// could be made to check for existence first, but simply trying to
// fetch a missing transaction results in the same behavior. Do not
// allow peers to request transactions already in a block but are
// unconfirmed, as they may be expensive. Restrict that to the
// authenticated RPC only.
// Attempt to fetch the requested transaction. Try the pool of
// recently advertised transactions first and then fall back to the
// mempool.
//
// Note that this does not allow peers to request transactions
// already in a block over p2p unless they still happen to be in the
// pool of advertised transactions, as that would require all nodes
// to maintain a full transaction index which can be expensive.
// That ability is restricted to authenticated RPC only and requires
// the aforementioned full transaction index.
txHash := &iv.Hash
tx, err := sp.server.txMemPool.FetchTransaction(txHash)
if err != nil {
peerLog.Debugf("Unable to fetch tx %v from the "+
"transaction pool for %v: %v", txHash,
sp, err)
break
tx, ok := sp.server.recentlyAdvertisedTxns.Get(*txHash)
if !ok {
// Note that a call could be made to check for existence first,
// but simply trying to fetch a missing transaction results in
// the same behavior.
var err error
tx, err = sp.server.txMemPool.FetchTransaction(txHash)
if err != nil {
peerLog.Debugf("Unable to fetch tx %v from transaction "+
"pool for peer %s: %v", txHash, sp, err)
break
}
}
dataMsg = tx.MsgTx()

case wire.InvTypeBlock:
blockHash := &iv.Hash
block, err := sp.server.chain.BlockByHash(blockHash)
if err != nil {
peerLog.Debugf("Unable to fetch block hash %v "+
"for %v: %v", blockHash, sp, err)
peerLog.Debugf("Unable to fetch block hash %v for peer %s: %v",
blockHash, sp, err)
break
}
dataMsg = block.MsgBlock()
Expand Down Expand Up @@ -1014,24 +1078,33 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
return
}

// Maintain at least one outbound peer capable of supporting p2p mixing.
// Maintain a minimum desired number of outbound peers capable of supporting
// p2p mixing.
if !isInbound && msg.ProtocolVersion < int32(wire.MixVersion) {
var hasMixCapableOutbound bool
var numOutbound uint32
var numOutbound, numMixCapableOutbound uint32
peerState := &sp.server.peerState
peerState.Lock()
peerState.forAllOutboundPeers(func(sp *serverPeer) {
if sp.ProtocolVersion() >= wire.MixVersion {
hasMixCapableOutbound = true
numMixCapableOutbound++
}
numOutbound++
})
peerState.Unlock()

if !hasMixCapableOutbound && numOutbound+1 == sp.server.targetOutbound {
const defaultWantMixCapableOutbound uint32 = 3
wantMixCapableOutbound := defaultWantMixCapableOutbound
if sp.server.targetOutbound < wantMixCapableOutbound {
wantMixCapableOutbound = sp.server.targetOutbound
}
hasMinMixCapableOuts := numMixCapableOutbound >= wantMixCapableOutbound
needsMoreMixCapable := !hasMinMixCapableOuts &&
numOutbound+wantMixCapableOutbound >= sp.server.targetOutbound
if needsMoreMixCapable {
srvrLog.Debugf("Rejecting outbound peer %s with protocol version "+
"%d in favor of a peer with minimum version %d", sp,
msg.ProtocolVersion, wire.MixVersion)
"%d in favor of a peer with minimum version %d (have: %d, "+
"target: %d)", sp, msg.ProtocolVersion, wire.MixVersion,
numMixCapableOutbound, wantMixCapableOutbound)
sp.Disconnect()
}
}
Expand Down Expand Up @@ -1982,6 +2055,33 @@ func (s *server) TransactionConfirmed(tx *dcrutil.Tx) {
}
}

// maybeLogRecentlyAdvertisedNumEvicted periodically logs the total number of
// evicted advertised transactions.
//
// It is NOT safe for concurrent access.
func (s *server) maybeLogRecentlyAdvertisedNumEvicted() {
// Do not log anything when there haven't been any evicted transactions.
totalEvicted := s.totalAdvertisedTxnsEvicted
if totalEvicted == 0 {
return
}

// Only log a maximum of once per day.
const logInterval = 24 * time.Hour
sinceLastLogged := time.Since(s.lastAdvertisedTxnsEvictedLogged)
if sinceLastLogged < logInterval {
return
}

srvrLog.Debugf("Evicted %d advertised %s in the last %v (%d remaining, "+
"%.2f%% hit ratio)", totalEvicted, pickNoun(totalEvicted, "tx", "txns"),
sinceLastLogged.Truncate(time.Second), s.recentlyAdvertisedTxns.Len(),
s.recentlyAdvertisedTxns.HitRatio())

s.totalAdvertisedTxnsEvicted = 0
s.lastAdvertisedTxnsEvictedLogged = time.Now()
}

// handleRelayInvMsg deals with relaying inventory to peers that are not already
// known to have it. It is invoked from the peerHandler goroutine.
func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) {
Expand Down Expand Up @@ -2031,6 +2131,28 @@ func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) {
if sp.disableRelayTx.Load() {
return
}

// Track advertised transactions for a period of time in order to
// increase the probability they are available to serve regardless
// of whether or not they are still in the mempool when a request
// for the advertisement arrives. Note that it is still possible
// for the advertised transaction to not be available in the case it
// is later removed from the advertised transactions pool due to
// expiration and/or exceeding the maximum limits prior to the
// request for it arriving. However, not only is that case
// extremely rare in practice, the transaction is also likely still
// in the mempool in that case and will be served from there given
// peers generally do not request transactions that have been
// recently confirmed.
tx, ok := msg.data.(*dcrutil.Tx)
if !ok {
peerLog.Warnf("Underlying data for inventory vector is not a " +
" transaction")
return
}
numEvicted := s.recentlyAdvertisedTxns.Put(iv.Hash, tx)
s.totalAdvertisedTxnsEvicted += uint64(numEvicted)
s.maybeLogRecentlyAdvertisedNumEvicted()
}

if iv.Type == wire.InvTypeMix {
Expand Down Expand Up @@ -3730,6 +3852,9 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB,
recentlyConfirmedTxnsFPRate),
indexSubscriber: indexers.NewIndexSubscriber(ctx),
quit: make(chan struct{}),
recentlyAdvertisedTxns: lru.NewMapWithDefaultTTL[chainhash.Hash,
*dcrutil.Tx](maxRecentlyAdvertisedTxns, recentlyAdvertisedTxnsTTL),
lastAdvertisedTxnsEvictedLogged: time.Now(),
}

// Convert the minimum known work to a uint256 when it exists. Ideally, the
Expand Down
Loading