From 0fcf8838ccb15f2b98de390dd89c5b0982d5ffba Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 6 Sep 2020 13:36:10 +0300 Subject: [PATCH 1/5] improve republish logic to only republish messages that can be included in the next 20 blocks --- chain/messagepool/repub.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/chain/messagepool/repub.go b/chain/messagepool/repub.go index 806171f52ba..f57e8c4128b 100644 --- a/chain/messagepool/repub.go +++ b/chain/messagepool/repub.go @@ -16,7 +16,7 @@ import ( const repubMsgLimit = 30 -var RepublishBatchDelay = 200 * time.Millisecond +var RepublishBatchDelay = 100 * time.Millisecond func (mp *MessagePool) republishPendingMessages() error { mp.curTsLk.Lock() @@ -27,6 +27,7 @@ func (mp *MessagePool) republishPendingMessages() error { mp.curTsLk.Unlock() return xerrors.Errorf("computing basefee: %w", err) } + baseFeeLowerBound := types.BigDiv(baseFee, baseFeeLowerBoundFactor) pending := make(map[address.Address]map[uint64]*types.SignedMessage) mp.lk.Lock() @@ -70,6 +71,7 @@ func (mp *MessagePool) republishPendingMessages() error { gasLimit := int64(build.BlockGasLimit) minGas := int64(gasguess.MinGas) var msgs []*types.SignedMessage +loop: for i := 0; i < len(chains); { chain := chains[i] @@ -91,8 +93,18 @@ func (mp *MessagePool) republishPendingMessages() error { // does it fit in a block? if chain.gasLimit <= gasLimit { - gasLimit -= chain.gasLimit - msgs = append(msgs, chain.msgs...) + // check the baseFee lower bound -- only republish messages that can be included in the chain + // within the next 20 blocks. + for _, m := range chain.msgs { + if m.Message.GasFeeCap.LessThan(baseFeeLowerBound) { + chain.Invalidate() + continue loop + } + gasLimit -= m.Message.GasLimit + msgs = append(msgs, m) + } + + // we processed the whole chain, advance i++ continue } From 2e75d9c80ae5301745e59612f1d9a7bff9aa02cf Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 6 Sep 2020 13:36:25 +0300 Subject: [PATCH 2/5] be explicit about republish interval, check against timecache duration --- chain/messagepool/messagepool.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 0d62e5423f2..72d0a01ab21 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -45,7 +45,7 @@ var rbfDenomBig = types.NewInt(RbfDenom) const RbfDenom = 256 -var RepublishInterval = pubsub.TimeCacheDuration + time.Duration(5*build.BlockDelaySecs+build.PropagationDelaySecs)*time.Second +var RepublishInterval = time.Duration(10*build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee)) var baseFeeLowerBoundFactor = types.NewInt(10) @@ -81,6 +81,14 @@ const ( localUpdates = "update" ) +func init() { + // if the republish interval is too short compared to the pubsub timecache, adjust it + minInterval := pubsub.TimeCacheDuration + time.Duration(build.PropagationDelaySecs) + if RepublishInterval < minInterval { + RepublishInterval = minInterval + } +} + type MessagePool struct { lk sync.Mutex From 5659faf7f0732f3a3a8851fb9b7d2af7c1096956 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 6 Sep 2020 13:48:26 +0300 Subject: [PATCH 3/5] don't immediately publish messages that cannot be included in the next 20 blocks --- chain/messagepool/messagepool.go | 63 +++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 72d0a01ab21..b37f347fd1a 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -363,12 +363,12 @@ func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error { return nil } -func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.TipSet, local bool) error { +func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) { epoch := curTs.Height() minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength()) if err := m.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil { - return xerrors.Errorf("message will not be included in a block: %w", err) + return false, xerrors.Errorf("message will not be included in a block: %w", err) } // this checks if the GasFeeCap is suffisciently high for inclusion in the next 20 blocks @@ -376,18 +376,25 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T // on republish to push it through later, if the baseFee has fallen. // this is a defensive check that stops minimum baseFee spam attacks from overloading validation // queues. - // Note that we don't do that for local messages, so that they can be accepted and republished - // automatically - if !local && len(curTs.Blocks()) > 0 { + // Note that for local messages, we always add them so that they can be accepted and republished + // automatically. + publish := local + if len(curTs.Blocks()) > 0 { baseFee := curTs.Blocks()[0].ParentBaseFee baseFeeLowerBound := types.BigDiv(baseFee, baseFeeLowerBoundFactor) if m.Message.GasFeeCap.LessThan(baseFeeLowerBound) { - return xerrors.Errorf("GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s): %w", - m.Message.GasFeeCap, baseFeeLowerBound, ErrSoftValidationFailure) + if local { + log.Warnf("local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s)", + m.Message.GasFeeCap, baseFeeLowerBound) + publish = false + } else { + return false, xerrors.Errorf("GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s): %w", + m.Message.GasFeeCap, baseFeeLowerBound, ErrSoftValidationFailure) + } } } - return nil + return publish, nil } func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { @@ -408,7 +415,8 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { } mp.curTsLk.Lock() - if err := mp.addTs(m, mp.curTs, true); err != nil { + publish, err := mp.addTs(m, mp.curTs, true) + if err != nil { mp.curTsLk.Unlock() return cid.Undef, err } @@ -421,7 +429,11 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { } mp.lk.Unlock() - return m.Cid(), mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) + if publish { + err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) + } + + return m.Cid(), err } func (mp *MessagePool) checkMessage(m *types.SignedMessage) error { @@ -469,7 +481,9 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() - return mp.addTs(m, mp.curTs, false) + + _, err = mp.addTs(m, mp.curTs, false) + return err } func sigCacheKey(m *types.SignedMessage) (string, error) { @@ -536,28 +550,29 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet) return nil } -func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) error { +func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) { snonce, err := mp.getStateNonce(m.Message.From, curTs) if err != nil { - return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure) + return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure) } if snonce > m.Message.Nonce { - return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow) + return false, xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow) } mp.lk.Lock() defer mp.lk.Unlock() - if err := mp.verifyMsgBeforeAdd(m, curTs, local); err != nil { - return err + publish, err := mp.verifyMsgBeforeAdd(m, curTs, local) + if err != nil { + return false, err } if err := mp.checkBalance(m, curTs); err != nil { - return err + return false, err } - return mp.addLocked(m, true) + return publish, mp.addLocked(m, true) } func (mp *MessagePool) addLoaded(m *types.SignedMessage) error { @@ -583,7 +598,8 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error { mp.lk.Lock() defer mp.lk.Unlock() - if err := mp.verifyMsgBeforeAdd(m, curTs, true); err != nil { + _, err = mp.verifyMsgBeforeAdd(m, curTs, true) + if err != nil { return err } @@ -769,7 +785,8 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address, return nil, ErrTryAgain } - if err := mp.verifyMsgBeforeAdd(msg, curTs, true); err != nil { + publish, err := mp.verifyMsgBeforeAdd(msg, curTs, true) + if err != nil { return nil, err } @@ -784,7 +801,11 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address, log.Errorf("addLocal failed: %+v", err) } - return msg, mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) + if publish { + err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) + } + + return msg, err } func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) { From 41222792edfd6f5e1cf1253eef32922f936a0725 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 7 Sep 2020 20:26:51 +0300 Subject: [PATCH 4/5] add docstring for verifyMsgBeforeAdd --- chain/messagepool/messagepool.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index b37f347fd1a..621a67ae038 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -363,6 +363,16 @@ func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error { return nil } +// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusio +// and whether the message has enough funds to be included in the next 20 blocks. +// If the message is not valid for block inclusion, it returns an error. +// For local messages, if the message can be included in the next 20 blocks, it returns true to +// signal that it should be immediately published. If the message cannot be included in the next 20 +// blocks, it returns false so that the message doesn't immediately get published (and ignored by our +// peers); instead it will be published through the republish loop, once the base fee has fallen +// sufficiently. +// For non local messages, if the message cannot be included in the next 20 blocks it returns +// a (soft) validation error. func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) { epoch := curTs.Height() minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength()) From 2233751668d0aeaf0a930aa92c862620600eef4b Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 7 Sep 2020 21:53:30 +0300 Subject: [PATCH 5/5] use the baseFeeLowerBound for computing repub message chains --- chain/messagepool/repub.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/chain/messagepool/repub.go b/chain/messagepool/repub.go index f57e8c4128b..648466629f4 100644 --- a/chain/messagepool/repub.go +++ b/chain/messagepool/repub.go @@ -56,7 +56,11 @@ func (mp *MessagePool) republishPendingMessages() error { var chains []*msgChain for actor, mset := range pending { - next := mp.createMessageChains(actor, mset, baseFee, ts) + // We use the baseFee lower bound for createChange so that we optimistically include + // chains that might become profitable in the next 20 blocks. + // We still check the lowerBound condition for individual messages so that we don't send + // messages that will be rejected by the mpool spam protector, so this is safe to do. + next := mp.createMessageChains(actor, mset, baseFeeLowerBound, ts) chains = append(chains, next...) }