From 92a74f0db9186a36a3f6d6d49885ff78c56ff649 Mon Sep 17 00:00:00 2001
From: simlecode <69969590+simlecode@users.noreply.github.com>
Date: Tue, 23 May 2023 16:22:34 +0800
Subject: [PATCH 1/3] fix: cap the message gas limit at the block gas limit

---
 pkg/messagepool/gas.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/pkg/messagepool/gas.go b/pkg/messagepool/gas.go
index 67714fb668..6560fd29f2 100644
--- a/pkg/messagepool/gas.go
+++ b/pkg/messagepool/gas.go
@@ -372,6 +372,10 @@ func (mp *MessagePool) GasEstimateMessageGas(ctx context.Context, estimateMessag
         gasLimitOverestimation = estimateMessage.Spec.GasOverEstimation
     }
     estimateMessage.Msg.GasLimit = int64(float64(gasLimit) * gasLimitOverestimation)
+    // Gas overestimation can cause us to exceed the block gas limit, cap it.
+    if estimateMessage.Msg.GasLimit > constants.BlockGasLimit {
+        estimateMessage.Msg.GasLimit = constants.BlockGasLimit
+    }
 }

 if estimateMessage.Msg.GasPremium == types.EmptyInt || types.BigCmp(estimateMessage.Msg.GasPremium, types.NewInt(0)) == 0 {
@@ -441,6 +445,10 @@ func (mp *MessagePool) GasBatchEstimateMessageGas(ctx context.Context, estimateM
         continue
     }
     estimateMsg.GasLimit = int64(float64(gasUsed) * estimateMessage.Spec.GasOverEstimation)
+    // Gas overestimation can cause us to exceed the block gas limit, cap it.
+    if estimateMsg.GasLimit > constants.BlockGasLimit {
+        estimateMsg.GasLimit = constants.BlockGasLimit
+    }
 }

 if estimateMsg.GasPremium == types.EmptyInt || types.BigCmp(estimateMsg.GasPremium, types.NewInt(0)) == 0 {

From e6bfc13994b38e2b39c6754d2fa91f3d81bc5d9e Mon Sep 17 00:00:00 2001
From: simlecode <69969590+simlecode@users.noreply.github.com>
Date: Tue, 23 May 2023 17:01:14 +0800
Subject: [PATCH 2/3] perf: Address performance of EthGetTransactionCount

---
 pkg/messagepool/messagepool.go      | 91 ++++++++++++++++++++++-------
 pkg/messagepool/messagepool_test.go | 16 +++++
 pkg/messagepool/provider.go         | 50 ++++++++++++----
 3 files changed, 125 insertions(+), 32 deletions(-)

diff --git a/pkg/messagepool/messagepool.go b/pkg/messagepool/messagepool.go
index be9ad55fce..364fe59e57 100644
--- a/pkg/messagepool/messagepool.go
+++ b/pkg/messagepool/messagepool.go
@@ -172,6 +172,8 @@ type MessagePool struct {

     sigValCache *lru.TwoQueueCache[string, struct{}]

+    stateNonceCache *lru.Cache[stateNonceCacheKey, uint64]
+
     evtTypes [3]journal.EventType
     journal  journal.Journal

@@ -182,6 +184,11 @@ type MessagePool struct {
     PriceCache *GasPriceCache
 }

+type stateNonceCacheKey struct {
+    tsk  types.TipSetKey
+    addr address.Address
+}
+
 func newDefaultMaxFeeFunc(maxFee types.FIL) DefaultMaxFeeFunc {
     return func() (out abi.TokenAmount, err error) {
         out = abi.TokenAmount{Int: maxFee.Int}
@@ -391,6 +398,7 @@ func New(ctx context.Context,
     cache, _ := lru.New2Q[cid.Cid, crypto.Signature](constants.BlsSignatureCacheSize)
     verifcache, _ := lru.New2Q[string, struct{}](constants.VerifSigCacheSize)
     keycache, _ := lru.New[address.Address, address.Address](1_000_000)
+    stateNonceCache, _ := lru.New[stateNonceCacheKey, uint64](32768) // 32k * ~200 bytes = 6MB

     cfg, err := loadConfig(ctx, ds)
     if err != nil {
@@ -404,25 +412,26 @@ func New(ctx context.Context,
     setRepublishInterval(networkParams.PropagationDelaySecs)

     mp := &MessagePool{
-        ds:            ds,
-        addSema:       make(chan struct{}, 1),
-        closer:        make(chan struct{}),
-        repubTk:       constants.Clock.Ticker(RepublishInterval),
-        repubTrigger:  make(chan struct{}, 1),
-        localAddrs:    make(map[address.Address]struct{}),
-        pending:       make(map[address.Address]*msgSet),
-        keyCache:      keycache,
-        minGasPrice:   big.NewInt(0),
-        pruneTrigger:  make(chan struct{}, 1),
-        pruneCooldown: make(chan struct{}, 1),
-        blsSigCache:   cache,
-        sigValCache:   verifcache,
-        changes:       lps.New(50),
-        localMsgs:     namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
-        api:           api,
-        sm:            sm,
-        netName:       netName,
-        cfg:           cfg,
+        ds:              ds,
+        addSema:         make(chan struct{}, 1),
+        closer:          make(chan struct{}),
+        repubTk:         constants.Clock.Ticker(RepublishInterval),
+        repubTrigger:    make(chan struct{}, 1),
+        localAddrs:      make(map[address.Address]struct{}),
+        pending:         make(map[address.Address]*msgSet),
+        keyCache:        keycache,
+        minGasPrice:     big.NewInt(0),
+        pruneTrigger:    make(chan struct{}, 1),
+        pruneCooldown:   make(chan struct{}, 1),
+        blsSigCache:     cache,
+        sigValCache:     verifcache,
+        stateNonceCache: stateNonceCache,
+        changes:         lps.New(50),
+        localMsgs:       namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
+        api:             api,
+        sm:              sm,
+        netName:         netName,
+        cfg:             cfg,
         evtTypes: [...]journal.EventType{
             evtTypeMpoolAdd:    j.RegisterEventType("mpool", "add"),
             evtTypeMpoolRemove: j.RegisterEventType("mpool", "remove"),
@@ -1123,12 +1132,52 @@ func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address,
 }

 func (mp *MessagePool) getStateNonce(ctx context.Context, addr address.Address, curTS *types.TipSet) (uint64, error) {
-    act, err := mp.api.GetActorAfter(ctx, addr, curTS)
+    nk := stateNonceCacheKey{
+        tsk:  curTS.Key(),
+        addr: addr,
+    }
+
+    n, ok := mp.stateNonceCache.Get(nk)
+    if ok {
+        return n, nil
+    }
+
+    // get the nonce from the actor before ts
+    actor, err := mp.api.GetActorBefore(addr, curTS)
+    if err != nil {
+        return 0, err
+    }
+    nextNonce := actor.Nonce
+
+    raddr, err := mp.resolveToKey(ctx, addr)
     if err != nil {
         return 0, err
     }

-    return act.Nonce, nil
+    // loop over all messages sent by 'addr' and find the highest nonce
+    messages, err := mp.api.MessagesForTipset(ctx, curTS)
+    if err != nil {
+        return 0, err
+    }
+    for _, message := range messages {
+        msg := message.VMMessage()
+
+        maddr, err := mp.resolveToKey(ctx, msg.From)
+        if err != nil {
+            log.Warnf("failed to resolve message from address: %s", err)
+            continue
+        }
+
+        if maddr == raddr {
+            if n := msg.Nonce + 1; n > nextNonce {
+                nextNonce = n
+            }
+        }
+    }
+
+    mp.stateNonceCache.Add(nk, nextNonce)
+
+    return nextNonce, nil
 }

 func (mp *MessagePool) getStateBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (big.Int, error) {
diff --git a/pkg/messagepool/messagepool_test.go b/pkg/messagepool/messagepool_test.go
index 09a354e7b5..769600a85b 100644
--- a/pkg/messagepool/messagepool_test.go
+++ b/pkg/messagepool/messagepool_test.go
@@ -217,6 +217,22 @@ func (tma *testMpoolAPI) PubSubPublish(context.Context, string, []byte) error {
     return nil
 }

+func (tma *testMpoolAPI) GetActorBefore(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
+    balance, ok := tma.balance[addr]
+    if !ok {
+        balance = types.NewInt(1000e6)
+        tma.balance[addr] = balance
+    }
+
+    nonce := tma.statenonce[addr]
+
+    return &types.Actor{
+        Code:    builtin2.AccountActorCodeID,
+        Nonce:   nonce,
+        Balance: balance,
+    }, nil
+}
+
 func (tma *testMpoolAPI) GetActorAfter(ctx context.Context, addr address.Address, ts *types.TipSet) (*types.Actor, error) {
     // regression check for load bug
     if ts == nil {
diff --git a/pkg/messagepool/provider.go b/pkg/messagepool/provider.go
index 9519696984..3353c78449 100644
--- a/pkg/messagepool/provider.go
+++ b/pkg/messagepool/provider.go
@@ -31,6 +31,7 @@ type Provider interface {
     SubscribeHeadChanges(context.Context, func(rev, app []*types.TipSet) error) *types.TipSet
     PutMessage(context.Context, types.ChainMsg) (cid.Cid, error)
     PubSubPublish(context.Context, string, []byte) error
+    GetActorBefore(address.Address, *types.TipSet) (*types.Actor, error)
     GetActorAfter(context.Context, address.Address, *types.TipSet) (*types.Actor, error)
     StateAccountKeyAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error)
     StateNetworkVersion(context.Context, abi.ChainEpoch) network.Version
@@ -72,6 +73,22 @@ func (mpp *mpoolProvider) IsLite() bool {
     return mpp.lite != nil
 }

+func (mpp *mpoolProvider) getActorLite(ctx context.Context, addr address.Address, ts *types.TipSet) (*types.Actor, error) {
+    if !mpp.IsLite() {
+        return nil, errors.New("should not use getActorLite on non lite Provider")
+    }
+    n, err := mpp.lite.GetNonce(ctx, addr, ts.Key())
+    if err != nil {
+        return nil, fmt.Errorf("getting nonce over lite: %w", err)
+    }
+    a, err := mpp.lite.GetActor(ctx, addr, ts.Key())
+    if err != nil {
+        return nil, fmt.Errorf("getting actor over lite: %w", err)
+    }
+    a.Nonce = n
+    return a, nil
+}
+
 func (mpp *mpoolProvider) SubscribeHeadChanges(ctx context.Context, cb func(rev, app []*types.TipSet) error) *types.TipSet {
     mpp.sm.SubscribeHeadChanges(
         chain.WrapHeadChangeCoalescer(
@@ -99,18 +116,29 @@ func (mpp *mpoolProvider) PubSubPublish(ctx context.Context, k string, v []byte)
     return mpp.ps.Publish(k, v) // nolint
 }

+func (mpp *mpoolProvider) GetActorBefore(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
+    ctx := context.TODO()
+
+    if mpp.IsLite() {
+        return mpp.getActorLite(ctx, addr, ts)
+    }
+
+    _, st, err := mpp.stmgr.ParentState(ctx, ts)
+    if err != nil {
+        return nil, fmt.Errorf("computing tipset state for GetActor: %v", err)
+    }
+
+    act, found, err := st.GetActor(ctx, addr)
+    if !found {
+        err = types.ErrActorNotFound
+    }
+
+    return act, err
+}
+
 func (mpp *mpoolProvider) GetActorAfter(ctx context.Context, addr address.Address, ts *types.TipSet) (*types.Actor, error) {
     if mpp.IsLite() {
-        n, err := mpp.lite.GetNonce(ctx, addr, ts.Key())
-        if err != nil {
-            return nil, fmt.Errorf("getting nonce over lite: %w", err)
-        }
-        a, err := mpp.lite.GetActor(ctx, addr, ts.Key())
-        if err != nil {
-            return nil, fmt.Errorf("getting actor over lite: %w", err)
-        }
-        a.Nonce = n
-        return a, nil
+        return mpp.getActorLite(context.TODO(), addr, ts)
     }

     st, err := mpp.stmgr.TipsetState(ctx, ts)
@@ -120,7 +148,7 @@ func (mpp *mpoolProvider) GetActorAfter(ctx context.Context, addr address.Addres

     act, found, err := st.GetActor(ctx, addr)
     if !found {
-        err = errors.New("actor not found")
+        err = types.ErrActorNotFound
     }

     return act, err

From 336826b163571a9ed0ccb4d36021fd7721092dd4 Mon Sep 17 00:00:00 2001
From: simlecode <69969590+simlecode@users.noreply.github.com>
Date: Tue, 23 May 2023 17:36:22 +0800
Subject: [PATCH 3/3] perf: mempool: lower priority optimizations

---
 pkg/messagepool/check.go       | 19 ++++++++++++++++---
 pkg/messagepool/messagepool.go |  9 ++++++++-
 pkg/messagepool/repub.go       | 12 +++++++-----
 pkg/messagepool/selection.go   |  8 ++++----
 4 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/pkg/messagepool/check.go b/pkg/messagepool/check.go
index 084b7c2542..b662479cce 100644
--- a/pkg/messagepool/check.go
+++ b/pkg/messagepool/check.go
@@ -30,8 +30,13 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*types.Messag
 func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]types.MessageCheckStatus, error) {
     var msgs []*types.Message
     mp.lk.RLock()
-    mset, ok := mp.pending[from]
+    mset, ok, err := mp.getPendingMset(ctx, from)
+    if err != nil {
+        mp.lk.RUnlock()
+        return nil, fmt.Errorf("errored while getting pending mset: %w", err)
+    }
     if ok {
+        msgs = make([]*types.Message, 0, len(mset.msgs))
         for _, sm := range mset.msgs {
             msgs = append(msgs, &sm.Message)
         }
@@ -61,7 +66,11 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
         if !ok {
             mmap = make(map[uint64]*types.Message)
             msgMap[m.From] = mmap
-            mset, ok := mp.pending[m.From]
+            mset, ok, err := mp.getPendingMset(ctx, m.From)
+            if err != nil {
+                mp.lk.RUnlock()
+                return nil, fmt.Errorf("errored while getting pending mset: %w", err)
+            }
             if ok {
                 count += len(mset.msgs)
                 for _, sm := range mset.msgs {
@@ -141,7 +150,11 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
         st, ok := state[m.From]
         if !ok {
             mp.lk.RLock()
-            mset, ok := mp.pending[m.From]
+            mset, ok, err := mp.getPendingMset(ctx, m.From)
+            if err != nil {
+                mp.lk.RUnlock()
+                return nil, fmt.Errorf("errored while getting pending mset: %w", err)
+            }
             if ok && !interned {
                 st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds}
                 for _, m := range mset.msgs {
diff --git a/pkg/messagepool/messagepool.go b/pkg/messagepool/messagepool.go
index 364fe59e57..17709925dd 100644
--- a/pkg/messagepool/messagepool.go
+++ b/pkg/messagepool/messagepool.go
@@ -481,6 +481,14 @@ func New(ctx context.Context,
 }

 func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
+    // if addr is not an ID addr, then it is already resolved to a key
+    if addr.Protocol() != address.ID {
+        return addr, nil
+    }
+    return mp.resolveToKeyFromID(ctx, addr)
+}
+
+func (mp *MessagePool) resolveToKeyFromID(ctx context.Context, addr address.Address) (address.Address, error) {
     // check the cache
     a, ok := mp.keyCache.Get(addr)
     if ok {
@@ -495,7 +503,6 @@ func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (

     // place both entries in the cache (may both be key addresses, which is fine)
     mp.keyCache.Add(addr, ka)
-    mp.keyCache.Add(ka, ka)

     return ka, nil
 }
diff --git a/pkg/messagepool/repub.go b/pkg/messagepool/repub.go
index 148184eb30..f5ed419fde 100644
--- a/pkg/messagepool/repub.go
+++ b/pkg/messagepool/repub.go
@@ -22,18 +22,21 @@ var RepublishBatchDelay = 100 * time.Millisecond

 func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
     mp.curTSLk.RLock()
     ts := mp.curTS
+    mp.curTSLk.RUnlock()

     baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
-    mp.curTSLk.RUnlock()
     if err != nil {
         return fmt.Errorf("computing basefee: %v", err)
     }
     baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)

     pending := make(map[address.Address]map[uint64]*types.SignedMessage)
-    mp.curTSLk.Lock()
+    mp.lk.Lock()
     mp.republished = nil // clear this to avoid races triggering an early republish
+    mp.lk.Unlock()
+
+    mp.lk.RLock()
     for actor := range mp.localAddrs {
         mset, ok := mp.pending[actor]
         if !ok {
@@ -49,8 +52,7 @@ func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
         }
         pending[actor] = pend
     }
-    mp.lk.Unlock()
-    mp.curTSLk.Unlock()
+    mp.lk.RUnlock()
     if len(pending) == 0 {
         return nil
     }
@@ -173,8 +175,8 @@ LOOP:
         republished[m.Cid()] = struct{}{}
     }

-    mp.lk.Lock()
     // update the republished set so that we can trigger early republish from head changes
+    mp.lk.Lock()
     mp.republished = republished
     mp.lk.Unlock()

diff --git a/pkg/messagepool/selection.go b/pkg/messagepool/selection.go
index 46847c5f35..8b19098af3 100644
--- a/pkg/messagepool/selection.go
+++ b/pkg/messagepool/selection.go
@@ -39,12 +39,12 @@ type msgChain struct {
 }

 func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
-    mp.curTSLk.Lock()
-    defer mp.curTSLk.Unlock()
+    mp.curTSLk.RLock()
+    defer mp.curTSLk.RUnlock()

     //TODO confirm if we can switch to RLock here for performance
-    mp.lk.Lock()
-    defer mp.lk.Unlock()
+    mp.lk.RLock()
+    defer mp.lk.RUnlock()

     // See if we need to prune before selection; excessive buildup can lead to slow selection,
     // so prune if we have too many messages (ignoring the cooldown).
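
Reviewer note (not part of the patches above): the standalone Go sketch below is a minimal illustration of the nonce-caching idea from PATCH 2/3 — getStateNonce keys a small cache by (tipset, sender), seeds the value from the sender's nonce in the parent state, and bumps it past any messages the sender already has in that tipset, so repeated EthGetTransactionCount calls against the same head avoid recomputing. The cache here is a plain map rather than an lru.Cache, and names such as cacheKey, nextNonce, and the sample addresses are hypothetical stand-ins, not venus APIs.

// nonce_cache_sketch.go — illustrative only; hypothetical types, not the venus API.
package main

import "fmt"

// cacheKey mirrors stateNonceCacheKey: a (tipset, sender) pair.
type cacheKey struct {
	tipset string
	sender string
}

// msg is a minimal stand-in for a message already included in the tipset.
type msg struct {
	from  string
	nonce uint64
}

// nextNonce returns the next usable nonce for sender at the given tipset:
// cache hit first, otherwise the parent-state nonce bumped past any of the
// sender's messages already in the tipset, then the result is cached.
func nextNonce(cache map[cacheKey]uint64, parentStateNonce map[string]uint64, tipsetMsgs []msg, tipset, sender string) uint64 {
	k := cacheKey{tipset: tipset, sender: sender}
	if n, ok := cache[k]; ok {
		return n // cache hit: skip the state lookup and message scan
	}

	next := parentStateNonce[sender] // nonce as of the parent state
	for _, m := range tipsetMsgs {   // bump past messages already included in the tipset
		if m.from == sender && m.nonce+1 > next {
			next = m.nonce + 1
		}
	}

	cache[k] = next
	return next
}

func main() {
	cache := map[cacheKey]uint64{}
	state := map[string]uint64{"t1alice": 5}
	msgs := []msg{{from: "t1alice", nonce: 5}, {from: "t1alice", nonce: 6}, {from: "t1bob", nonce: 0}}

	fmt.Println(nextNonce(cache, state, msgs, "ts-1", "t1alice")) // 7: state nonce 5, two messages in the tipset
	fmt.Println(nextNonce(cache, state, msgs, "ts-1", "t1alice")) // 7 again, served from the cache
}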