From 902ebe75851328ecfae8fd19c5cb4483013ec4d7 Mon Sep 17 00:00:00 2001 From: Haaai <55118568+Liuhaai@users.noreply.github.com> Date: Thu, 5 Jan 2023 14:56:03 -0800 Subject: [PATCH] [actpool] Improve actqueue efficiency (#3377) * opt actpool.Add() * improve actqueue efficiency --- actpool/actioniterator/actioniterator.go | 3 +- actpool/actpool.go | 462 +++++++++++------------ actpool/actpool_test.go | 235 +++++------- actpool/actqueue.go | 258 +++++++------ actpool/actqueue_test.go | 69 +--- actpool/options.go | 5 + actpool/queueworker.go | 278 ++++++++++++++ dispatcher/dispatcher.go | 40 +- 8 files changed, 776 insertions(+), 574 deletions(-) create mode 100644 actpool/queueworker.go diff --git a/actpool/actioniterator/actioniterator.go b/actpool/actioniterator/actioniterator.go index 9d654eb71e..8c932b8dbc 100644 --- a/actpool/actioniterator/actioniterator.go +++ b/actpool/actioniterator/actioniterator.go @@ -69,8 +69,7 @@ func NewActionIterator(accountActs map[string][]action.SealedEnvelope) ActionIte // LoadNext load next action of account of top action func (ai *actionIterator) loadNextActionForTopAccount() { - sender := ai.heads[0].SrcPubkey() - callerAddrStr := sender.Address().String() + callerAddrStr := ai.heads[0].SenderAddress().String() if actions, ok := ai.accountActs[callerAddrStr]; ok && len(actions) > 0 { ai.heads[0], ai.accountActs[callerAddrStr] = actions[0], actions[1:] heap.Fix(&ai.heads, 0) diff --git a/actpool/actpool.go b/actpool/actpool.go index f9ab32da8d..428fe79147 100644 --- a/actpool/actpool.go +++ b/actpool/actpool.go @@ -9,13 +9,14 @@ import ( "context" "encoding/hex" "sort" - "strings" "sync" + "sync/atomic" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "github.com/iotexproject/go-pkgs/cache/ttl" "github.com/iotexproject/go-pkgs/hash" "github.com/iotexproject/iotex-address/address" @@ -29,6 +30,11 @@ import ( "github.com/iotexproject/iotex-core/pkg/tracer" ) +const ( + // TODO: move to config + _numWorker = 16 +) + var ( _actpoolMtc = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "iotex_actpool_rejection_metrics", @@ -91,18 +97,18 @@ func EnableExperimentalActions() Option { // actPool implements ActPool interface type actPool struct { - mutex sync.RWMutex cfg Config g genesis.Genesis sf protocol.StateReader - accountActs map[string]ActQueue - accountDesActs map[string]map[hash.Hash256]action.SealedEnvelope - allActions map[hash.Hash256]action.SealedEnvelope + accountDesActs *destinationMap + allActions *ttl.Cache gasInPool uint64 actionEnvelopeValidators []action.SealedEnvelopeValidator timerFactory *prometheustimer.TimerFactory enableExperimentalActions bool senderBlackList map[string]bool + jobQueue []chan workerJob + worker []*queueWorker } // NewActPool constructs a new actpool @@ -116,14 +122,16 @@ func NewActPool(g genesis.Genesis, sf protocol.StateReader, cfg Config, opts ... senderBlackList[bannedSender] = true } + actsMap, _ := ttl.NewCache() ap := &actPool{ cfg: cfg, g: g, sf: sf, senderBlackList: senderBlackList, - accountActs: make(map[string]ActQueue), - accountDesActs: make(map[string]map[hash.Hash256]action.SealedEnvelope), - allActions: make(map[hash.Hash256]action.SealedEnvelope), + accountDesActs: &destinationMap{acts: make(map[string]map[hash.Hash256]action.SealedEnvelope)}, + allActions: actsMap, + jobQueue: make([]chan workerJob, _numWorker), + worker: make([]*queueWorker, _numWorker), } for _, opt := range opts { if err := opt(ap); err != nil { @@ -140,6 +148,14 @@ func NewActPool(g genesis.Genesis, sf protocol.StateReader, cfg Config, opts ... return nil, err } ap.timerFactory = timerFactory + + for i := 0; i < _numWorker; i++ { + ap.jobQueue[i] = make(chan workerJob, ap.cfg.MaxNumActsPerAcct) + ap.worker[i] = newQueueWorker(ap, ap.jobQueue[i]) + if err := ap.worker[i].Start(); err != nil { + return nil, err + } + } return ap, nil } @@ -156,100 +172,150 @@ func (ap *actPool) AddActionEnvelopeValidators(fs ...action.SealedEnvelopeValida // Then starting from the current confirmed nonce, iteratively update pending nonce if nonces are consecutive and pending // balance is sufficient, and remove all the subsequent actions once the pending balance becomes insufficient func (ap *actPool) Reset() { - ap.mutex.Lock() - defer ap.mutex.Unlock() - ap.reset() } -func (ap *actPool) ReceiveBlock(*block.Block) error { - ap.mutex.Lock() - defer ap.mutex.Unlock() +func (ap *actPool) reset() { + var ( + wg sync.WaitGroup + ctx = ap.context(context.Background()) + ) + for i := range ap.worker { + wg.Add(1) + go func(worker *queueWorker) { + defer wg.Done() + worker.Reset(ctx) + }(ap.worker[i]) + } + wg.Wait() +} +func (ap *actPool) ReceiveBlock(*block.Block) error { ap.reset() return nil } -// PendingActionIterator returns an action interator with all accepted actions +// PendingActionMap returns an action interator with all accepted actions func (ap *actPool) PendingActionMap() map[string][]action.SealedEnvelope { - ap.mutex.Lock() - defer ap.mutex.Unlock() - - // Remove the actions that are already timeout - ap.reset() - - ctx := ap.context(context.Background()) - actionMap := make(map[string][]action.SealedEnvelope) - for from, queue := range ap.accountActs { - actionMap[from] = append(actionMap[from], queue.PendingActs(ctx)...) + var ( + wg sync.WaitGroup + actsFromWorker = make([][]*pendingActions, _numWorker) + ctx = ap.context(context.Background()) + totalAccounts = uint64(0) + ) + for i := range ap.worker { + wg.Add(1) + go func(i int) { + defer wg.Done() + actsFromWorker[i] = ap.worker[i].PendingActions(ctx) + atomic.AddUint64(&totalAccounts, uint64(len(actsFromWorker[i]))) + }(i) + } + wg.Wait() + + ret := make(map[string][]action.SealedEnvelope, totalAccounts) + for _, v := range actsFromWorker { + for _, w := range v { + ret[w.sender] = w.acts + } } - return actionMap + return ret } func (ap *actPool) Add(ctx context.Context, act action.SealedEnvelope) error { - ap.mutex.Lock() - defer ap.mutex.Unlock() - ctx, span := tracer.NewSpan(ap.context(ctx), "actPool.Add") defer span.End() + ctx = ap.context(ctx) + + if err := checkSelpData(&act); err != nil { + return err + } + + if err := ap.checkSelpWithoutState(ctx, &act); err != nil { + return err + } // Reject action if pool space is full - if uint64(len(ap.allActions)) >= ap.cfg.MaxNumActsPerPool { + + if uint64(ap.allActions.Count()) >= ap.cfg.MaxNumActsPerPool { _actpoolMtc.WithLabelValues("overMaxNumActsPerPool").Inc() return action.ErrTxPoolOverflow } - span.AddEvent("act.IntrinsicGas") - intrinsicGas, err := act.IntrinsicGas() + + if intrinsicGas, _ := act.IntrinsicGas(); atomic.LoadUint64(&ap.gasInPool)+intrinsicGas > ap.cfg.MaxGasLimitPerPool { + _actpoolMtc.WithLabelValues("overMaxGasLimitPerPool").Inc() + return action.ErrGasLimit + } + + return ap.enqueue(ctx, act) +} + +func checkSelpData(act *action.SealedEnvelope) error { + _, err := act.IntrinsicGas() if err != nil { - _actpoolMtc.WithLabelValues("failedGetIntrinsicGas").Inc() return err } - if ap.gasInPool+intrinsicGas > ap.cfg.MaxGasLimitPerPool { - _actpoolMtc.WithLabelValues("overMaxGasLimitPerPool").Inc() - return action.ErrGasLimit + _, err = act.Hash() + if err != nil { + return err } - hash, err := act.Hash() + _, err = act.Cost() if err != nil { return err } + if act.SrcPubkey() == nil { + return action.ErrAddress + } + return nil +} + +func (ap *actPool) checkSelpWithoutState(ctx context.Context, selp *action.SealedEnvelope) error { + span := tracer.SpanFromContext(ctx) + span.AddEvent("actPool.checkSelpWithoutState") + defer span.End() + + hash, _ := selp.Hash() // Reject action if it already exists in pool - if _, exist := ap.allActions[hash]; exist { + if _, exist := ap.allActions.Get(hash); exist { _actpoolMtc.WithLabelValues("existedAction").Inc() return action.ErrExistedInPool } + // Reject action if the gas price is lower than the threshold - if act.GasPrice().Cmp(ap.cfg.MinGasPrice()) < 0 { + if selp.GasPrice().Cmp(ap.cfg.MinGasPrice()) < 0 { _actpoolMtc.WithLabelValues("gasPriceLower").Inc() + actHash, _ := selp.Hash() log.L().Info("action rejected due to low gas price", - zap.String("actionHash", hex.EncodeToString(hash[:])), - zap.String("gasPrice", act.GasPrice().String())) + zap.String("actionHash", hex.EncodeToString(actHash[:])), + zap.String("gasPrice", selp.GasPrice().String())) return action.ErrUnderpriced } - if err := ap.validate(ctx, act); err != nil { - return err + + if _, ok := ap.senderBlackList[selp.SenderAddress().String()]; ok { + _actpoolMtc.WithLabelValues("blacklisted").Inc() + return errors.Wrap(action.ErrAddress, "action source address is blacklisted") } - caller := act.SenderAddress() - if caller == nil { - return action.ErrAddress + for _, ev := range ap.actionEnvelopeValidators { + span.AddEvent("ev.Validate") + if err := ev.Validate(ctx, *selp); err != nil { + return err + } } - return ap.enqueueAction(ctx, caller, act, hash, act.Nonce()) + return nil } // GetPendingNonce returns pending nonce in pool or confirmed nonce given an account address -func (ap *actPool) GetPendingNonce(addr string) (uint64, error) { - addrStr, err := address.FromString(addr) +func (ap *actPool) GetPendingNonce(addrStr string) (uint64, error) { + addr, err := address.FromString(addrStr) if err != nil { return 0, err } - ap.mutex.RLock() - defer ap.mutex.RUnlock() - - if queue, ok := ap.accountActs[addr]; ok { + if queue := ap.worker[ap.allocatedWorker(addr)].GetQueue(addr); queue != nil { return queue.PendingNonce(), nil } ctx := ap.context(context.Background()) - confirmedState, err := accountutil.AccountState(ctx, ap.sf, addrStr) + confirmedState, err := accountutil.AccountState(ctx, ap.sf, addr) if err != nil { return 0, err } @@ -257,44 +323,32 @@ func (ap *actPool) GetPendingNonce(addr string) (uint64, error) { } // GetUnconfirmedActs returns unconfirmed actions in pool given an account address -func (ap *actPool) GetUnconfirmedActs(addr string) []action.SealedEnvelope { - ap.mutex.RLock() - defer ap.mutex.RUnlock() +func (ap *actPool) GetUnconfirmedActs(addrStr string) []action.SealedEnvelope { + addr, err := address.FromString(addrStr) + if err != nil { + return []action.SealedEnvelope{} + } + var ret []action.SealedEnvelope - if queue, ok := ap.accountActs[addr]; ok { - ret = queue.AllActs() - } - if desMap, ok := ap.accountDesActs[addr]; ok { - if desMap != nil { - sortActions := make(SortedActions, 0) - for _, v := range desMap { - sortActions = append(sortActions, v) - } - sort.Stable(sortActions) - ret = append(ret, sortActions...) - } + if queue := ap.worker[ap.allocatedWorker(addr)].GetQueue(addr); queue != nil { + ret = append(ret, queue.AllActs()...) } + ret = append(ret, ap.accountDesActs.actionsByDestination(addrStr)...) return ret } // GetActionByHash returns the pending action in pool given action's hash func (ap *actPool) GetActionByHash(hash hash.Hash256) (action.SealedEnvelope, error) { - ap.mutex.RLock() - defer ap.mutex.RUnlock() - - act, ok := ap.allActions[hash] + act, ok := ap.allActions.Get(hash) if !ok { return action.SealedEnvelope{}, errors.Wrapf(action.ErrNotFound, "action hash %x does not exist in pool", hash) } - return act, nil + return act.(action.SealedEnvelope), nil } // GetSize returns the act pool size func (ap *actPool) GetSize() uint64 { - ap.mutex.RLock() - defer ap.mutex.RUnlock() - - return uint64(len(ap.allActions)) + return uint64(ap.allActions.Count()) } // GetCapacity returns the act pool capacity @@ -304,10 +358,7 @@ func (ap *actPool) GetCapacity() uint64 { // GetGasSize returns the act pool gas size func (ap *actPool) GetGasSize() uint64 { - ap.mutex.RLock() - defer ap.mutex.RUnlock() - - return ap.gasInPool + return atomic.LoadUint64(&ap.gasInPool) } // GetGasCapacity returns the act pool gas capacity @@ -316,17 +367,14 @@ func (ap *actPool) GetGasCapacity() uint64 { } func (ap *actPool) Validate(ctx context.Context, selp action.SealedEnvelope) error { - ap.mutex.RLock() - defer ap.mutex.RUnlock() return ap.validate(ctx, selp) } func (ap *actPool) DeleteAction(caller address.Address) { - ap.mutex.RLock() - defer ap.mutex.RUnlock() - pendingActs := ap.accountActs[caller.String()].AllActs() - ap.removeInvalidActs(pendingActs) - delete(ap.accountActs, caller.String()) + worker := ap.worker[ap.allocatedWorker(caller)] + if pendingActs := worker.ResetAccount(caller); len(pendingActs) != 0 { + ap.removeInvalidActs(pendingActs) + } } func (ap *actPool) validate(ctx context.Context, selp action.SealedEnvelope) error { @@ -347,7 +395,7 @@ func (ap *actPool) validate(ctx context.Context, selp action.SealedEnvelope) err if err != nil { return err } - if _, ok := ap.allActions[selpHash]; ok { + if _, ok := ap.allActions.Get(selpHash); ok { return nil } for _, ev := range ap.actionEnvelopeValidators { @@ -360,114 +408,6 @@ func (ap *actPool) validate(ctx context.Context, selp action.SealedEnvelope) err return nil } -// ====================================== -// private functions -// ====================================== -func (ap *actPool) enqueueAction(ctx context.Context, addr address.Address, act action.SealedEnvelope, actHash hash.Hash256, actNonce uint64) error { - span := tracer.SpanFromContext(ctx) - defer span.End() - confirmedState, err := accountutil.AccountState(ctx, ap.sf, addr) - if err != nil { - _actpoolMtc.WithLabelValues("failedToGetNonce").Inc() - return errors.Wrapf(err, "failed to get sender's nonce for action %x", actHash) - } - pendingNonce := confirmedState.PendingNonce() - if actNonce < pendingNonce { - return action.ErrNonceTooLow - } - sender := addr.String() - queue := ap.accountActs[sender] - if queue == nil { - span.AddEvent("new queue") - queue = NewActQueue(ap, sender, WithTimeOut(ap.cfg.ActionExpiry)) - ap.accountActs[sender] = queue - // Initialize pending nonce and balance for new account - queue.SetPendingNonce(pendingNonce) - queue.SetPendingBalance(confirmedState.Balance) - } - - if actNonce-pendingNonce >= ap.cfg.MaxNumActsPerAcct { - // Nonce exceeds current range - log.L().Debug("Rejecting action because nonce is too large.", - log.Hex("hash", actHash[:]), - zap.Uint64("startNonce", pendingNonce), - zap.Uint64("actNonce", actNonce)) - _actpoolMtc.WithLabelValues("nonceTooLarge").Inc() - return action.ErrNonceTooHigh - } - - span.AddEvent("act cost") - cost, err := act.Cost() - if err != nil { - _actpoolMtc.WithLabelValues("failedToGetCost").Inc() - return errors.Wrapf(err, "failed to get cost of action %x", actHash) - } - if queue.PendingBalance().Cmp(cost) < 0 { - // Pending balance is insufficient - _actpoolMtc.WithLabelValues("insufficientBalance").Inc() - log.L().Info("insufficient balance for action", - zap.String("actionHash", hex.EncodeToString(actHash[:])), - zap.String("cost", cost.String()), - zap.String("pendingBalance", queue.PendingBalance().String()), - zap.String("sender", sender), - ) - return action.ErrInsufficientFunds - } - - span.AddEvent("queue put") - if err := queue.Put(act); err != nil { - _actpoolMtc.WithLabelValues("failedPutActQueue").Inc() - log.L().Info("failed put action into ActQueue", - zap.String("actionHash", hex.EncodeToString(actHash[:]))) - return err - } - ap.allActions[actHash] = act - - //add actions to destination map - desAddress, ok := act.Destination() - if ok && !strings.EqualFold(sender, desAddress) { - desQueue := ap.accountDesActs[desAddress] - if desQueue == nil { - ap.accountDesActs[desAddress] = make(map[hash.Hash256]action.SealedEnvelope) - } - ap.accountDesActs[desAddress][actHash] = act - } - - span.AddEvent("act.IntrinsicGas") - intrinsicGas, _ := act.IntrinsicGas() - ap.gasInPool += intrinsicGas - // If the pending nonce equals this nonce, update queue - span.AddEvent("queue.PendingNonce") - nonce := queue.PendingNonce() - if actNonce == nonce { - span.AddEvent("ap.updateAccount") - ap.updateAccount(sender) - } - return nil -} - -// removeConfirmedActs removes processed (committed to block) actions from pool -func (ap *actPool) removeConfirmedActs(ctx context.Context) { - for from, queue := range ap.accountActs { - addr, _ := address.FromString(from) - confirmedState, err := accountutil.AccountState(ctx, ap.sf, addr) - if err != nil { - log.L().Error("Error when removing confirmed actions", zap.Error(err)) - return - } - pendingNonce := confirmedState.PendingNonce() - // Remove all actions that are committed to new block - acts := queue.FilterNonce(pendingNonce) - ap.removeInvalidActs(acts) - //del actions in destination map - ap.deleteAccountDestinationActions(acts...) - // Delete the queue entry if it becomes empty - if queue.Empty() { - delete(ap.accountActs, from) - } - } -} - func (ap *actPool) removeInvalidActs(acts []action.SealedEnvelope) { for _, act := range acts { hash, err := act.Hash() @@ -476,76 +416,94 @@ func (ap *actPool) removeInvalidActs(acts []action.SealedEnvelope) { continue } log.L().Debug("Removed invalidated action.", log.Hex("hash", hash[:])) - delete(ap.allActions, hash) + ap.allActions.Delete(hash) intrinsicGas, _ := act.IntrinsicGas() - ap.subGasFromPool(intrinsicGas) - //del actions in destination map - ap.deleteAccountDestinationActions(act) + atomic.AddUint64(&ap.gasInPool, ^uint64(intrinsicGas-1)) + ap.accountDesActs.delete(act) } } -// deleteAccountDestinationActions just for destination map -func (ap *actPool) deleteAccountDestinationActions(acts ...action.SealedEnvelope) { - for _, act := range acts { - hash, err := act.Hash() - if err != nil { - log.L().Debug("Skipping action due to hash error", zap.Error(err)) - continue - } - desAddress, ok := act.Destination() - if ok { - dst := ap.accountDesActs[desAddress] - if dst != nil { - delete(dst, hash) - } - } - } +func (ap *actPool) context(ctx context.Context) context.Context { + return genesis.WithGenesisContext(ctx, ap.g) } -// updateAccount updates queue's status and remove invalidated actions from pool if necessary -func (ap *actPool) updateAccount(sender string) { - queue := ap.accountActs[sender] - acts := queue.UpdateQueue(queue.PendingNonce()) - if len(acts) > 0 { - ap.removeInvalidActs(acts) - } - // Delete the queue entry if it becomes empty - if queue.Empty() { - delete(ap.accountActs, sender) +func (ap *actPool) enqueue(ctx context.Context, act action.SealedEnvelope) error { + var errChan = make(chan error) // unused errChan will be garbage-collected + ap.jobQueue[ap.allocatedWorker(act.SenderAddress())] <- workerJob{ctx, act, errChan} + + for { + select { + case <-ctx.Done(): + log.L().Error("enqueue actpool fails", zap.Error(ctx.Err())) + return ctx.Err() + case ret := <-errChan: + return ret + } } } -func (ap *actPool) context(ctx context.Context) context.Context { - return genesis.WithGenesisContext(ctx, ap.g) +func (ap *actPool) allocatedWorker(senderAddr address.Address) int { + senderBytes := senderAddr.Bytes() + var lastByte uint8 = senderBytes[len(senderBytes)-1] + return int(lastByte) % _numWorker } -func (ap *actPool) reset() { - timer := ap.timerFactory.NewTimer("reset") - defer timer.End() +type destinationMap struct { + mu sync.Mutex + acts map[string]map[hash.Hash256]action.SealedEnvelope +} - ctx := ap.context(context.Background()) - // Remove confirmed actions in actpool - ap.removeConfirmedActs(ctx) - for from, queue := range ap.accountActs { - // Reset pending balance for each account - addr, _ := address.FromString(from) - state, err := accountutil.AccountState(ctx, ap.sf, addr) - if err != nil { - log.L().Error("Error when resetting actpool state.", zap.Error(err)) - return - } - queue.SetPendingBalance(state.Balance) +func (des *destinationMap) addAction(act action.SealedEnvelope) error { + des.mu.Lock() + defer des.mu.Unlock() + destn, ok := act.Destination() + if !ok { + return errors.New("the recipient is empty") + } + actHash, err := act.Hash() + if err != nil { + return err + } + if _, exist := des.acts[destn]; !exist { + des.acts[destn] = make(map[hash.Hash256]action.SealedEnvelope) + } + des.acts[destn][actHash] = act + return nil +} - // Reset pending nonce and remove invalid actions for each account - queue.SetPendingNonce(state.PendingNonce()) - ap.updateAccount(from) +func (des *destinationMap) actionsByDestination(addr string) []action.SealedEnvelope { + des.mu.Lock() + defer des.mu.Unlock() + desMap, ok := des.acts[addr] + if !ok { + return []action.SealedEnvelope{} } + sortActions := make(SortedActions, 0) + for _, v := range desMap { + sortActions = append(sortActions, v) + } + sort.Stable(sortActions) + return sortActions } -func (ap *actPool) subGasFromPool(gas uint64) { - if ap.gasInPool < gas { - ap.gasInPool = 0 +func (des *destinationMap) delete(act action.SealedEnvelope) { + des.mu.Lock() + defer des.mu.Unlock() + desAddress, ok := act.Destination() + if !ok { + return + } + hash, err := act.Hash() + if err != nil { + log.L().Debug("Skipping action due to hash error", zap.Error(err)) + return + } + dst, exist := des.acts[desAddress] + if !exist { return } - ap.gasInPool -= gas + delete(dst, hash) + if len(dst) == 0 { + delete(des.acts, desAddress) + } } diff --git a/actpool/actpool_test.go b/actpool/actpool_test.go index 35b532ae04..8800cc9354 100644 --- a/actpool/actpool_test.go +++ b/actpool/actpool_test.go @@ -12,25 +12,23 @@ import ( "testing" "time" - "github.com/iotexproject/iotex-core/actpool/actioniterator" - "github.com/iotexproject/iotex-core/blockchain/genesis" - "github.com/iotexproject/iotex-core/state" - "github.com/iotexproject/iotex-core/test/mock/mock_chainmanager" - "github.com/iotexproject/iotex-core/test/mock/mock_sealed_envelope_validator" - "github.com/golang/mock/gomock" + "github.com/iotexproject/iotex-address/address" "github.com/pkg/errors" "github.com/stretchr/testify/require" - "github.com/iotexproject/iotex-address/address" - "github.com/iotexproject/iotex-core/action" "github.com/iotexproject/iotex-core/action/protocol" "github.com/iotexproject/iotex-core/action/protocol/account" accountutil "github.com/iotexproject/iotex-core/action/protocol/account/util" "github.com/iotexproject/iotex-core/action/protocol/rewarding" + "github.com/iotexproject/iotex-core/actpool/actioniterator" "github.com/iotexproject/iotex-core/blockchain" + "github.com/iotexproject/iotex-core/blockchain/genesis" + "github.com/iotexproject/iotex-core/state" "github.com/iotexproject/iotex-core/test/identityset" + "github.com/iotexproject/iotex-core/test/mock/mock_chainmanager" + "github.com/iotexproject/iotex-core/test/mock/mock_sealed_envelope_validator" "github.com/iotexproject/iotex-core/testutil" ) @@ -42,7 +40,6 @@ const ( var ( _addr1 = identityset.Address(28).String() - _pubKey1 = identityset.PrivateKey(28).PublicKey() _priKey1 = identityset.PrivateKey(28) _addr2 = identityset.Address(29).String() _priKey2 = identityset.PrivateKey(29) @@ -174,22 +171,22 @@ func TestActPool_AddActs(t *testing.T) { require.NoError(ap.Add(ctx, tsf7)) require.NoError(ap.Add(ctx, tsf8)) - pBalance1, _ := ap.getPendingBalance(_addr1) + pBalance1, _ := getPendingBalance(ap, _addr1) require.Equal(uint64(10), pBalance1.Uint64()) - pNonce1, _ := ap.getPendingNonce(_addr1) + pNonce1, _ := ap.GetPendingNonce(_addr1) require.Equal(uint64(5), pNonce1) - pBalance2, _ := ap.getPendingBalance(_addr2) + pBalance2, _ := getPendingBalance(ap, _addr2) require.Equal(uint64(5), pBalance2.Uint64()) - pNonce2, _ := ap.getPendingNonce(_addr2) + pNonce2, _ := ap.GetPendingNonce(_addr2) require.Equal(uint64(2), pNonce2) tsf9, err := action.SignedTransfer(_addr2, _priKey2, uint64(2), big.NewInt(3), []byte{}, uint64(100000), big.NewInt(0)) require.NoError(err) require.NoError(ap.Add(ctx, tsf9)) - pBalance2, _ = ap.getPendingBalance(_addr2) + pBalance2, _ = getPendingBalance(ap, _addr2) require.Equal(uint64(1), pBalance2.Uint64()) - pNonce2, _ = ap.getPendingNonce(_addr2) + pNonce2, _ = ap.GetPendingNonce(_addr2) require.Equal(uint64(4), pNonce2) // Error Case Handling // Case I: Action source address is blacklisted @@ -210,7 +207,7 @@ func TestActPool_AddActs(t *testing.T) { require.NoError(err) nTsfHash, err := nTsf.Hash() require.NoError(err) - ap2.allActions[nTsfHash] = nTsf + ap2.allActions.Set(nTsfHash, nTsf) } err = ap2.Add(ctx, tsf1) require.Equal(action.ErrTxPoolOverflow, errors.Cause(err)) @@ -226,7 +223,7 @@ func TestActPool_AddActs(t *testing.T) { require.NoError(err) nTsfHash, err := nTsf.Hash() require.NoError(err) - ap3.allActions[nTsfHash] = nTsf + ap3.allActions.Set(nTsfHash, nTsf) intrinsicGas, err := nTsf.IntrinsicGas() require.NoError(err) ap3.gasInPool += intrinsicGas @@ -396,7 +393,7 @@ func TestActPool_removeConfirmedActs(t *testing.T) { require.NoError(acct.AddBalance(big.NewInt(100000000000000000))) return 0, nil - }).Times(8) + }).Times(5) sf.EXPECT().Height().Return(uint64(1), nil).AnyTimes() ctx := genesis.WithGenesisContext(context.Background(), genesis.Default) require.NoError(ap.Add(ctx, tsf1)) @@ -404,8 +401,10 @@ func TestActPool_removeConfirmedActs(t *testing.T) { require.NoError(ap.Add(ctx, tsf3)) require.NoError(ap.Add(ctx, tsf4)) - require.Equal(4, len(ap.allActions)) - require.NotNil(ap.accountActs[_addr1]) + require.Equal(4, ap.allActions.Count()) + addr, err := address.FromString(_addr1) + require.NoError(err) + require.NotNil(ap.worker[ap.allocatedWorker(addr)].GetQueue(addr)) sf.EXPECT().State(gomock.Any(), gomock.Any()).DoAndReturn(func(account interface{}, opts ...protocol.StateOption) (uint64, error) { acct, ok := account.(*state.Account) require.True(ok) @@ -416,9 +415,9 @@ func TestActPool_removeConfirmedActs(t *testing.T) { return 0, nil }).Times(1) - ap.removeConfirmedActs(ctx) - require.Equal(0, len(ap.allActions)) - require.Nil(ap.accountActs[_addr1]) + ap.Reset() + require.Equal(0, ap.allActions.Count()) + require.True(ap.worker[ap.allocatedWorker(addr)].GetQueue(addr).Empty()) } func TestActPool_Reset(t *testing.T) { @@ -542,35 +541,35 @@ func TestActPool_Reset(t *testing.T) { // Check confirmed nonce, pending nonce, and pending balance after adding Tsfs above for each account // ap1 // Addr1 - ap1PNonce1, _ := ap1.getPendingNonce(_addr1) + ap1PNonce1, _ := ap1.GetPendingNonce(_addr1) require.Equal(uint64(3), ap1PNonce1) - ap1PBalance1, _ := ap1.getPendingBalance(_addr1) + ap1PBalance1, _ := getPendingBalance(ap1, _addr1) require.Equal(big.NewInt(20).Uint64(), ap1PBalance1.Uint64()) // Addr2 - ap1PNonce2, _ := ap1.getPendingNonce(_addr2) + ap1PNonce2, _ := ap1.GetPendingNonce(_addr2) require.Equal(uint64(3), ap1PNonce2) - ap1PBalance2, _ := ap1.getPendingBalance(_addr2) + ap1PBalance2, _ := getPendingBalance(ap1, _addr2) require.Equal(big.NewInt(50).Uint64(), ap1PBalance2.Uint64()) // Addr3 - ap1PNonce3, _ := ap1.getPendingNonce(_addr3) + ap1PNonce3, _ := ap1.GetPendingNonce(_addr3) require.Equal(uint64(3), ap1PNonce3) - ap1PBalance3, _ := ap1.getPendingBalance(_addr3) + ap1PBalance3, _ := getPendingBalance(ap1, _addr3) require.Equal(big.NewInt(100).Uint64(), ap1PBalance3.Uint64()) // ap2 // Addr1 - ap2PNonce1, _ := ap2.getPendingNonce(_addr1) + ap2PNonce1, _ := ap2.GetPendingNonce(_addr1) require.Equal(uint64(4), ap2PNonce1) - ap2PBalance1, _ := ap2.getPendingBalance(_addr1) + ap2PBalance1, _ := getPendingBalance(ap2, _addr1) require.Equal(big.NewInt(0).Uint64(), ap2PBalance1.Uint64()) // Addr2 - ap2PNonce2, _ := ap2.getPendingNonce(_addr2) + ap2PNonce2, _ := ap2.GetPendingNonce(_addr2) require.Equal(uint64(3), ap2PNonce2) - ap2PBalance2, _ := ap2.getPendingBalance(_addr2) + ap2PBalance2, _ := getPendingBalance(ap2, _addr2) require.Equal(big.NewInt(30).Uint64(), ap2PBalance2.Uint64()) // Addr3 - ap2PNonce3, _ := ap2.getPendingNonce(_addr3) + ap2PNonce3, _ := ap2.GetPendingNonce(_addr3) require.Equal(uint64(3), ap2PNonce3) - ap2PBalance3, _ := ap2.getPendingBalance(_addr3) + ap2PBalance3, _ := getPendingBalance(ap2, _addr3) require.Equal(big.NewInt(50).Uint64(), ap2PBalance3.Uint64()) // Let ap1 be BP's actpool balances[0] = big.NewInt(220) @@ -585,35 +584,35 @@ func TestActPool_Reset(t *testing.T) { // Check confirmed nonce, pending nonce, and pending balance after resetting actpool for each account // ap1 // Addr1 - ap1PNonce1, _ = ap1.getPendingNonce(_addr1) + ap1PNonce1, _ = ap1.GetPendingNonce(_addr1) require.Equal(uint64(3), ap1PNonce1) - ap1PBalance1, _ = ap1.getPendingBalance(_addr1) + ap1PBalance1, _ = getPendingBalance(ap1, _addr1) require.Equal(big.NewInt(220).Uint64(), ap1PBalance1.Uint64()) // Addr2 - ap1PNonce2, _ = ap1.getPendingNonce(_addr2) + ap1PNonce2, _ = ap1.GetPendingNonce(_addr2) require.Equal(uint64(3), ap1PNonce2) - ap1PBalance2, _ = ap1.getPendingBalance(_addr2) + ap1PBalance2, _ = getPendingBalance(ap1, _addr2) require.Equal(big.NewInt(200).Uint64(), ap1PBalance2.Uint64()) // Addr3 - ap1PNonce3, _ = ap1.getPendingNonce(_addr3) + ap1PNonce3, _ = ap1.GetPendingNonce(_addr3) require.Equal(uint64(3), ap1PNonce3) - ap1PBalance3, _ = ap1.getPendingBalance(_addr3) + ap1PBalance3, _ = getPendingBalance(ap1, _addr3) require.Equal(big.NewInt(180).Uint64(), ap1PBalance3.Uint64()) // ap2 // Addr1 - ap2PNonce1, _ = ap2.getPendingNonce(_addr1) + ap2PNonce1, _ = ap2.GetPendingNonce(_addr1) require.Equal(uint64(4), ap2PNonce1) - ap2PBalance1, _ = ap2.getPendingBalance(_addr1) + ap2PBalance1, _ = getPendingBalance(ap2, _addr1) require.Equal(big.NewInt(200).Uint64(), ap2PBalance1.Uint64()) // Addr2 - ap2PNonce2, _ = ap2.getPendingNonce(_addr2) + ap2PNonce2, _ = ap2.GetPendingNonce(_addr2) require.Equal(uint64(3), ap2PNonce2) - ap2PBalance2, _ = ap2.getPendingBalance(_addr2) + ap2PBalance2, _ = getPendingBalance(ap2, _addr2) require.Equal(big.NewInt(200).Uint64(), ap2PBalance2.Uint64()) // Addr3 - ap2PNonce3, _ = ap2.getPendingNonce(_addr3) + ap2PNonce3, _ = ap2.GetPendingNonce(_addr3) require.Equal(uint64(3), ap2PNonce3) - ap2PBalance3, _ = ap2.getPendingBalance(_addr3) + ap2PBalance3, _ = getPendingBalance(ap2, _addr3) require.Equal(big.NewInt(180).Uint64(), ap2PBalance3.Uint64()) // Add more Tsfs after resetting // Tsfs To be added to ap1 only @@ -642,35 +641,35 @@ func TestActPool_Reset(t *testing.T) { // Check confirmed nonce, pending nonce, and pending balance after adding Tsfs above for each account // ap1 // Addr1 - ap1PNonce1, _ = ap1.getPendingNonce(_addr1) + ap1PNonce1, _ = ap1.GetPendingNonce(_addr1) require.Equal(uint64(3), ap1PNonce1) - ap1PBalance1, _ = ap1.getPendingBalance(_addr1) + ap1PBalance1, _ = getPendingBalance(ap1, _addr1) require.Equal(big.NewInt(220).Uint64(), ap1PBalance1.Uint64()) // Addr2 - ap1PNonce2, _ = ap1.getPendingNonce(_addr2) + ap1PNonce2, _ = ap1.GetPendingNonce(_addr2) require.Equal(uint64(3), ap1PNonce2) - ap1PBalance2, _ = ap1.getPendingBalance(_addr2) + ap1PBalance2, _ = getPendingBalance(ap1, _addr2) require.Equal(big.NewInt(200).Uint64(), ap1PBalance2.Uint64()) // Addr3 - ap1PNonce3, _ = ap1.getPendingNonce(_addr3) + ap1PNonce3, _ = ap1.GetPendingNonce(_addr3) require.Equal(uint64(5), ap1PNonce3) - ap1PBalance3, _ = ap1.getPendingBalance(_addr3) + ap1PBalance3, _ = getPendingBalance(ap1, _addr3) require.Equal(big.NewInt(0).Uint64(), ap1PBalance3.Uint64()) // ap2 // Addr1 - ap2PNonce1, _ = ap2.getPendingNonce(_addr1) + ap2PNonce1, _ = ap2.GetPendingNonce(_addr1) require.Equal(uint64(5), ap2PNonce1) - ap2PBalance1, _ = ap2.getPendingBalance(_addr1) - require.Equal(big.NewInt(50).Uint64(), ap2PBalance1.Uint64()) + ap2PBalance2, _ = getPendingBalance(ap2, _addr2) + require.Equal(big.NewInt(10).Uint64(), ap2PBalance2.Uint64()) // Addr2 - ap2PNonce2, _ = ap2.getPendingNonce(_addr2) + ap2PNonce2, _ = ap2.GetPendingNonce(_addr2) require.Equal(uint64(5), ap2PNonce2) - ap2PBalance2, _ = ap2.getPendingBalance(_addr2) + ap2PBalance2, _ = getPendingBalance(ap2, _addr2) require.Equal(big.NewInt(10).Uint64(), ap2PBalance2.Uint64()) // Addr3 - ap2PNonce3, _ = ap2.getPendingNonce(_addr3) + ap2PNonce3, _ = ap2.GetPendingNonce(_addr3) require.Equal(uint64(3), ap2PNonce3) - ap2PBalance3, _ = ap2.getPendingBalance(_addr3) + ap2PBalance3, _ = getPendingBalance(ap2, _addr3) require.Equal(big.NewInt(180).Uint64(), ap2PBalance3.Uint64()) // Let ap2 be BP's actpool balances[0] = big.NewInt(140) @@ -686,35 +685,35 @@ func TestActPool_Reset(t *testing.T) { // Check confirmed nonce, pending nonce, and pending balance after resetting actpool for each account // ap1 // Addr1 - ap1PNonce1, _ = ap1.getPendingNonce(_addr1) + ap1PNonce1, _ = ap1.GetPendingNonce(_addr1) require.Equal(uint64(5), ap1PNonce1) - ap1PBalance1, _ = ap1.getPendingBalance(_addr1) + ap1PBalance1, _ = getPendingBalance(ap1, _addr1) require.Equal(big.NewInt(140).Uint64(), ap1PBalance1.Uint64()) // Addr2 - ap1PNonce2, _ = ap1.getPendingNonce(_addr2) + ap1PNonce2, _ = ap1.GetPendingNonce(_addr2) require.Equal(uint64(5), ap1PNonce2) - ap1PBalance2, _ = ap1.getPendingBalance(_addr2) + ap1PBalance2, _ = getPendingBalance(ap1, _addr2) require.Equal(big.NewInt(180).Uint64(), ap1PBalance2.Uint64()) // Addr3 - ap1PNonce3, _ = ap1.getPendingNonce(_addr3) + ap1PNonce3, _ = ap1.GetPendingNonce(_addr3) require.Equal(uint64(5), ap1PNonce3) - ap1PBalance3, _ = ap1.getPendingBalance(_addr3) + ap1PBalance3, _ = getPendingBalance(ap1, _addr3) require.Equal(big.NewInt(100).Uint64(), ap1PBalance3.Uint64()) // ap2 // Addr1 - ap2PNonce1, _ = ap2.getPendingNonce(_addr1) + ap2PNonce1, _ = ap2.GetPendingNonce(_addr1) require.Equal(uint64(5), ap2PNonce1) - ap2PBalance1, _ = ap2.getPendingBalance(_addr1) + ap2PBalance1, _ = getPendingBalance(ap2, _addr1) require.Equal(big.NewInt(140).Uint64(), ap2PBalance1.Uint64()) // Addr2 - ap2PNonce2, _ = ap2.getPendingNonce(_addr2) + ap2PNonce2, _ = ap2.GetPendingNonce(_addr2) require.Equal(uint64(5), ap2PNonce2) - ap2PBalance2, _ = ap2.getPendingBalance(_addr2) + ap2PBalance2, _ = getPendingBalance(ap2, _addr2) require.Equal(big.NewInt(180).Uint64(), ap2PBalance2.Uint64()) // Addr3 - ap2PNonce3, _ = ap2.getPendingNonce(_addr3) + ap2PNonce3, _ = ap2.GetPendingNonce(_addr3) require.Equal(uint64(3), ap2PNonce3) - ap2PBalance3, _ = ap2.getPendingBalance(_addr3) + ap2PBalance3, _ = getPendingBalance(ap2, _addr3) require.Equal(big.NewInt(280).Uint64(), ap2PBalance3.Uint64()) // Add two more players @@ -755,14 +754,14 @@ func TestActPool_Reset(t *testing.T) { // Check confirmed nonce, pending nonce, and pending balance after adding actions above for account4 and account5 // ap1 // Addr4 - ap1PNonce4, _ := ap1.getPendingNonce(_addr4) + ap1PNonce4, _ := ap1.GetPendingNonce(_addr4) require.Equal(uint64(2), ap1PNonce4) - ap1PBalance4, _ := ap1.getPendingBalance(_addr4) + ap1PBalance4, _ := getPendingBalance(ap1, _addr4) require.Equal(big.NewInt(0).Uint64(), ap1PBalance4.Uint64()) // Addr5 - ap1PNonce5, _ := ap1.getPendingNonce(_addr5) + ap1PNonce5, _ := ap1.GetPendingNonce(_addr5) require.Equal(uint64(3), ap1PNonce5) - ap1PBalance5, _ := ap1.getPendingBalance(_addr5) + ap1PBalance5, _ := getPendingBalance(ap1, _addr5) require.Equal(big.NewInt(0).Uint64(), ap1PBalance5.Uint64()) // Let ap1 be BP's actpool balances[3] = big.NewInt(10) @@ -774,14 +773,14 @@ func TestActPool_Reset(t *testing.T) { // Check confirmed nonce, pending nonce, and pending balance after resetting actpool for each account // ap1 // Addr4 - ap1PNonce4, _ = ap1.getPendingNonce(_addr4) + ap1PNonce4, _ = ap1.GetPendingNonce(_addr4) require.Equal(uint64(2), ap1PNonce4) - ap1PBalance4, _ = ap1.getPendingBalance(_addr4) + ap1PBalance4, _ = getPendingBalance(ap1, _addr4) require.Equal(big.NewInt(10).Uint64(), ap1PBalance4.Uint64()) // Addr5 - ap1PNonce5, _ = ap1.getPendingNonce(_addr5) + ap1PNonce5, _ = ap1.GetPendingNonce(_addr5) require.Equal(uint64(3), ap1PNonce5) - ap1PBalance5, _ = ap1.getPendingBalance(_addr5) + ap1PBalance5, _ = getPendingBalance(ap1, _addr5) require.Equal(big.NewInt(20).Uint64(), ap1PBalance5.Uint64()) } @@ -824,11 +823,15 @@ func TestActPool_removeInvalidActs(t *testing.T) { hash2, err := tsf4.Hash() require.NoError(err) acts := []action.SealedEnvelope{tsf1, tsf4} - require.NotNil(ap.allActions[hash1]) - require.NotNil(ap.allActions[hash2]) + _, exist1 := ap.allActions.Get(hash1) + require.True(exist1) + _, exist2 := ap.allActions.Get(hash2) + require.True(exist2) ap.removeInvalidActs(acts) - require.Equal(action.SealedEnvelope{}, ap.allActions[hash1]) - require.Equal(action.SealedEnvelope{}, ap.allActions[hash2]) + _, exist1 = ap.allActions.Get(hash1) + require.False(exist1) + _, exist2 = ap.allActions.Get(hash2) + require.False(exist2) } func TestActPool_GetPendingNonce(t *testing.T) { @@ -950,7 +953,7 @@ func TestActPool_GetActionByHash(t *testing.T) { hash2, err := tsf2.Hash() require.NoError(err) - ap.allActions[hash1] = tsf1 + ap.allActions.Set(hash1, tsf1) act, err := ap.GetActionByHash(hash1) require.NoError(err) require.Equal(tsf1, act) @@ -958,7 +961,7 @@ func TestActPool_GetActionByHash(t *testing.T) { require.Equal(action.ErrNotFound, errors.Cause(err)) require.Equal(action.SealedEnvelope{}, act) - ap.allActions[hash2] = tsf2 + ap.allActions.Set(hash2, tsf2) act, err = ap.GetActionByHash(hash2) require.NoError(err) require.Equal(tsf2, act) @@ -1007,7 +1010,7 @@ func TestActPool_GetSize(t *testing.T) { require.NoError(acct.AddBalance(big.NewInt(100000000000000000))) return 0, nil - }).Times(8) + }).Times(5) sf.EXPECT().Height().Return(uint64(1), nil).AnyTimes() ctx := genesis.WithGenesisContext(context.Background(), genesis.Default) require.NoError(ap.Add(ctx, tsf1)) @@ -1026,7 +1029,7 @@ func TestActPool_GetSize(t *testing.T) { return 0, nil }).Times(1) - ap.removeConfirmedActs(ctx) + ap.Reset() require.Equal(uint64(0), ap.GetSize()) require.Equal(uint64(0), ap.GetGasSize()) } @@ -1091,20 +1094,20 @@ func TestActPool_SpeedUpAction(t *testing.T) { require.NoError(ap.Add(ctx, tsf2)) // check account and actpool status - pBalance1, _ := ap.getPendingBalance(_addr1) + pBalance1, _ := getPendingBalance(ap, _addr1) require.Equal(uint64(10000000-10), pBalance1.Uint64()) - pNonce1, _ := ap.getPendingNonce(_addr1) + pNonce1, _ := ap.GetPendingNonce(_addr1) require.Equal(uint64(2), pNonce1) - pBalance2, _ := ap.getPendingBalance(_addr2) + pBalance2, _ := getPendingBalance(ap, _addr2) require.Equal(uint64(10000000-5-10000), pBalance2.Uint64()) - pNonce2, _ := ap.getPendingNonce(_addr2) + pNonce2, _ := ap.GetPendingNonce(_addr2) require.Equal(uint64(2), pNonce2) // A send action tsf3 with nonce 1 and higher gas price require.NoError(ap.Add(ctx, tsf3)) // check account and actpool status again after new action is inserted - pNonce3, _ := ap.getPendingNonce(_addr1) + pNonce3, _ := ap.GetPendingNonce(_addr1) require.Equal(uint64(2), pNonce3) ai := actioniterator.NewActionIterator(ap.PendingActionMap()) @@ -1128,44 +1131,20 @@ func TestActPool_SpeedUpAction(t *testing.T) { } } -// Helper function to return the correct pending nonce just in case of empty queue -func (ap *actPool) getPendingNonce(addr string) (uint64, error) { - if queue, ok := ap.accountActs[addr]; ok { - return queue.PendingNonce(), nil - } - _addr1, err := address.FromString(addr) - if err != nil { - return 0, err - } - committedState, err := accountutil.AccountState( - genesis.WithGenesisContext(context.Background(), genesis.Default), - ap.sf, - _addr1, - ) - if err != nil { - return 0, err - } - return committedState.PendingNonce(), nil -} - // Helper function to return the correct pending balance just in case of empty queue -func (ap *actPool) getPendingBalance(addr string) (*big.Int, error) { - if queue, ok := ap.accountActs[addr]; ok { - return queue.PendingBalance(), nil - } - _addr1, err := address.FromString(addr) +func getPendingBalance(ap *actPool, addrStr string) (*big.Int, error) { + addr, err := address.FromString(addrStr) if err != nil { return nil, err } - state, err := accountutil.AccountState( - genesis.WithGenesisContext(context.Background(), genesis.Default), - ap.sf, - _addr1, - ) + if queue := ap.worker[ap.allocatedWorker(addr)].GetQueue(addr); queue != nil { + q := queue.(*actQueue) + return q.getPendingBalanceAtNonce(q.pendingNonce), nil + } + state, err := accountutil.AccountState(genesis.WithGenesisContext(context.Background(), genesis.Default), ap.sf, addr) if err != nil { return nil, err } - return state.Balance, nil } @@ -1179,14 +1158,6 @@ func getActPoolCfg() Config { } } -func actionMap2Slice(actMap map[string][]action.SealedEnvelope) []action.SealedEnvelope { - acts := make([]action.SealedEnvelope, 0) - for _, parts := range actMap { - acts = append(acts, parts...) - } - return acts -} - func lenPendingActionMap(acts map[string][]action.SealedEnvelope) int { l := 0 for _, part := range acts { diff --git a/actpool/actqueue.go b/actpool/actqueue.go index e870450843..ab117218ca 100644 --- a/actpool/actqueue.go +++ b/actpool/actqueue.go @@ -10,6 +10,7 @@ import ( "context" "math/big" "sort" + "sync" "time" "github.com/facebookgo/clock" @@ -60,16 +61,15 @@ func (h *noncePriorityQueue) Pop() interface{} { // ActQueue is the interface of actQueue type ActQueue interface { Put(action.SealedEnvelope) error - FilterNonce(uint64) []action.SealedEnvelope - UpdateQueue(uint64) []action.SealedEnvelope - SetPendingNonce(uint64) + UpdateQueue() []action.SealedEnvelope + UpdateAccountState(uint64, *big.Int) []action.SealedEnvelope + AccountState() (uint64, *big.Int) PendingNonce() uint64 - SetPendingBalance(*big.Int) - PendingBalance() *big.Int Len() int Empty() bool PendingActs(context.Context) []action.SealedEnvelope AllActs() []action.SealedEnvelope + Reset() } // actQueue is a queue of actions from an account @@ -82,26 +82,28 @@ type actQueue struct { index noncePriorityQueue // Current pending nonce tracking previous actions that can be committed to the next block for the account pendingNonce uint64 - // Current pending balance for the account - pendingBalance *big.Int + // Pending balance map + pendingBalance map[uint64]*big.Int + // Current account nonce + accountNonce uint64 + // Current account balance + accountBalance *big.Int clock clock.Clock ttl time.Duration -} - -// ActQueueOption is the option for actQueue. -type ActQueueOption interface { - SetActQueueOption(*actQueue) + mu sync.RWMutex } // NewActQueue create a new action queue -func NewActQueue(ap *actPool, address string, ops ...ActQueueOption) ActQueue { +func NewActQueue(ap *actPool, address string, pendingNonce uint64, balance *big.Int, ops ...ActQueueOption) ActQueue { aq := &actQueue{ ap: ap, address: address, items: make(map[uint64]action.SealedEnvelope), index: noncePriorityQueue{}, - pendingNonce: uint64(1), // Taking coinbase Action into account, pendingNonce should start with 1 - pendingBalance: big.NewInt(0), + pendingNonce: pendingNonce, + pendingBalance: make(map[uint64]*big.Int), + accountNonce: pendingNonce, + accountBalance: new(big.Int).Set(balance), clock: clock.New(), ttl: 0, } @@ -113,37 +115,80 @@ func NewActQueue(ap *actPool, address string, ops ...ActQueueOption) ActQueue { // Put inserts a new action into the map, also updating the queue's nonce index func (q *actQueue) Put(act action.SealedEnvelope) error { + q.mu.Lock() + defer q.mu.Unlock() nonce := act.Nonce() + + if cost, _ := act.Cost(); q.getPendingBalanceAtNonce(nonce).Cmp(cost) < 0 { + return action.ErrInsufficientFunds + } + if actInPool, exist := q.items[nonce]; exist { - // act of higher gas price cut in line - if act.GasPrice().Cmp(actInPool.GasPrice()) != 1 { + // act of higher gas price can cut in line + if nonce < q.pendingNonce && act.GasPrice().Cmp(actInPool.GasPrice()) != 1 { return action.ErrReplaceUnderpriced } // update action in q.items and q.index q.items[nonce] = act - for i, x := range q.index { - if x.nonce == nonce { + for i := range q.index { + if q.index[i].nonce == nonce { q.index[i].deadline = q.clock.Now().Add(q.ttl) break } } + q.updateFromNonce(nonce) return nil } heap.Push(&q.index, &nonceWithTTL{nonce: nonce, deadline: q.clock.Now().Add(q.ttl)}) q.items[nonce] = act + if nonce == q.pendingNonce { + q.updateFromNonce(q.pendingNonce) + } return nil } -// FilterNonce removes all actions from the map with a nonce lower than the given threshold -func (q *actQueue) FilterNonce(threshold uint64) []action.SealedEnvelope { - var removed []action.SealedEnvelope - // Pop off priority queue and delete corresponding entries from map until the threshold is reached - for q.index.Len() > 0 && (q.index)[0].nonce < threshold { - nonce := heap.Pop(&q.index).(*nonceWithTTL).nonce - removed = append(removed, q.items[nonce]) - delete(q.items, nonce) +func (q *actQueue) getPendingBalanceAtNonce(nonce uint64) *big.Int { + if nonce > q.pendingNonce { + return q.getPendingBalanceAtNonce(q.pendingNonce) } - return removed + if _, exist := q.pendingBalance[nonce]; !exist { + return new(big.Int).Set(q.accountBalance) + } + return new(big.Int).Set(q.pendingBalance[nonce]) +} + +func (q *actQueue) updateFromNonce(start uint64) { + if start > q.pendingNonce { + return + } + + for balance := q.getPendingBalanceAtNonce(start); ; start++ { + act, exist := q.items[start] + if !exist { + break + } + + cost, _ := act.Cost() + if balance.Cmp(cost) < 0 { + break + } + + balance = new(big.Int).Sub(balance, cost) + q.pendingBalance[start+1] = new(big.Int).Set(balance) + } + + q.pendingNonce = start +} + +// UpdateQueue updates the pending nonce and balance of the queue +func (q *actQueue) UpdateQueue() []action.SealedEnvelope { + q.mu.Lock() + defer q.mu.Unlock() + // First remove all timed out actions + removedFromQueue := q.cleanTimeout() + // Now, starting from the current pending nonce, incrementally find the next pending nonce + q.updateFromNonce(q.pendingNonce) + return removedFromQueue } func (q *actQueue) cleanTimeout() []action.SealedEnvelope { @@ -157,8 +202,13 @@ func (q *actQueue) cleanTimeout() []action.SealedEnvelope { ) for i := 0; i < size; { if timeNow.After(q.index[i].deadline) { - removedFromQueue = append(removedFromQueue, q.items[q.index[i].nonce]) - delete(q.items, q.index[i].nonce) + nonce := q.index[i].nonce + if nonce < q.pendingNonce { + q.pendingNonce = nonce + } + removedFromQueue = append(removedFromQueue, q.items[nonce]) + delete(q.items, nonce) + delete(q.pendingBalance, nonce) q.index[i] = q.index[size-1] size-- continue @@ -171,82 +221,62 @@ func (q *actQueue) cleanTimeout() []action.SealedEnvelope { return removedFromQueue } -// UpdateQueue updates the pending nonce and balance of the queue -func (q *actQueue) UpdateQueue(nonce uint64) []action.SealedEnvelope { - // First remove all timed out actions - removedFromQueue := q.cleanTimeout() - - // Now, starting from the current pending nonce, incrementally find the next pending nonce - // while updating pending balance if actions are payable - for ; ; nonce++ { - _, exist := q.items[nonce] - if !exist { - break - } - if !q.enoughBalance(q.items[nonce], true) { - break - } - } +// UpdateAccountState updates the account's nonce and balance and cleans confirmed actions +func (q *actQueue) UpdateAccountState(nonce uint64, balance *big.Int) []action.SealedEnvelope { + q.mu.Lock() + defer q.mu.Unlock() q.pendingNonce = nonce - - // Find the index of new pending nonce within the queue - sort.Sort(q.index) - i := 0 - for ; i < q.index.Len(); i++ { - if q.index[i].nonce >= nonce { - break - } - } - // Case I: An unpayable action has been found while updating pending nonce/balance - // Remove all the subsequent actions in the queue starting from the index of new pending nonce - if _, exist := q.items[nonce]; exist { - removedFromQueue = append(removedFromQueue, q.removeActs(i)...) - return removedFromQueue - } - - // Case II: All actions are payable while updating pending nonce/balance - // Check all the subsequent actions in the queue starting from the index of new pending nonce - // Find the nonce index of the first unpayable action - // Remove all the subsequent actions in the queue starting from that index - for ; i < q.index.Len(); i++ { - nonce = q.index[i].nonce - act := q.items[nonce] - if !q.enoughBalance(act, false) { - break - } + q.pendingBalance = make(map[uint64]*big.Int) + q.accountNonce = nonce + q.accountBalance.Set(balance) + var removed []action.SealedEnvelope + // Pop off priority queue and delete corresponding entries from map + for q.index.Len() > 0 && (q.index)[0].nonce < q.accountNonce { + nonce := heap.Pop(&q.index).(*nonceWithTTL).nonce + removed = append(removed, q.items[nonce]) + delete(q.items, nonce) } - removedFromQueue = append(removedFromQueue, q.removeActs(i)...) - return removedFromQueue + return removed } -// SetPendingNonce sets pending nonce for the queue -func (q *actQueue) SetPendingNonce(nonce uint64) { - q.pendingNonce = nonce +// AccountState returns the current account's nonce and balance +func (q *actQueue) AccountState() (uint64, *big.Int) { + q.mu.RLock() + defer q.mu.RUnlock() + return q.accountNonce, new(big.Int).Set(q.accountBalance) } // PendingNonce returns the current pending nonce of the queue func (q *actQueue) PendingNonce() uint64 { + q.mu.RLock() + defer q.mu.RUnlock() return q.pendingNonce } -// SetPendingBalance sets pending balance for the queue -func (q *actQueue) SetPendingBalance(balance *big.Int) { - q.pendingBalance = balance -} - -// PendingBalance returns the current pending balance of the queue -func (q *actQueue) PendingBalance() *big.Int { - return q.pendingBalance -} - // Len returns the length of the action map func (q *actQueue) Len() int { + q.mu.RLock() + defer q.mu.RUnlock() return len(q.items) } // Empty returns whether the queue of actions is empty or not func (q *actQueue) Empty() bool { - return q.Len() == 0 + q.mu.RLock() + defer q.mu.RUnlock() + return len(q.items) == 0 +} + +// Reset makes the queue into a dummy queue +func (q *actQueue) Reset() { + q.mu.Lock() + defer q.mu.Unlock() + q.items = make(map[uint64]action.SealedEnvelope) + q.index = noncePriorityQueue{} + q.pendingNonce = 0 + q.pendingBalance = make(map[uint64]*big.Int) + q.accountNonce = 0 + q.accountBalance = big.NewInt(0) } // PendingActs creates a consecutive nonce-sorted slice of actions @@ -254,7 +284,6 @@ func (q *actQueue) PendingActs(ctx context.Context) []action.SealedEnvelope { if q.Len() == 0 { return nil } - acts := make([]action.SealedEnvelope, 0, len(q.items)) addr, err := address.FromString(q.address) if err != nil { log.L().Error("Error when getting the address", zap.String("address", q.address), zap.Error(err)) @@ -265,20 +294,37 @@ func (q *actQueue) PendingActs(ctx context.Context) []action.SealedEnvelope { log.L().Error("Error when getting the nonce", zap.String("address", q.address), zap.Error(err)) return nil } - nonce := confirmedState.PendingNonce() + + var ( + nonce = confirmedState.PendingNonce() + balance = new(big.Int).Set(confirmedState.Balance) + acts = make([]action.SealedEnvelope, 0, len(q.items)) + ) + q.mu.RLock() + defer q.mu.RUnlock() for ; ; nonce++ { - if _, exist := q.items[nonce]; !exist { + act, exist := q.items[nonce] + if !exist { break } - acts = append(acts, q.items[nonce]) + + cost, _ := act.Cost() + if balance.Cmp(cost) < 0 { + break + } + + balance = new(big.Int).Sub(balance, cost) + acts = append(acts, act) } return acts } // AllActs returns all the actions currently in queue func (q *actQueue) AllActs() []action.SealedEnvelope { + q.mu.Lock() + defer q.mu.Unlock() acts := make([]action.SealedEnvelope, 0, len(q.items)) - if q.Len() == 0 { + if len(q.items) == 0 { return acts } sort.Sort(q.index) @@ -287,29 +333,3 @@ func (q *actQueue) AllActs() []action.SealedEnvelope { } return acts } - -// removeActs removes all the actions starting at idx from queue -func (q *actQueue) removeActs(idx int) []action.SealedEnvelope { - removedFromQueue := make([]action.SealedEnvelope, 0) - for i := idx; i < q.index.Len(); i++ { - removedFromQueue = append(removedFromQueue, q.items[q.index[i].nonce]) - delete(q.items, q.index[i].nonce) - } - q.index = q.index[:idx] - heap.Init(&q.index) - return removedFromQueue -} - -// enoughBalance helps check whether queue's pending balance is sufficient for the given action -func (q *actQueue) enoughBalance(act action.SealedEnvelope, updateBalance bool) bool { - cost, _ := act.Cost() - if q.pendingBalance.Cmp(cost) < 0 { - return false - } - - if updateBalance { - q.pendingBalance.Sub(q.pendingBalance, cost) - } - - return true -} diff --git a/actpool/actqueue_test.go b/actpool/actqueue_test.go index 5074931edf..ffc372b069 100644 --- a/actpool/actqueue_test.go +++ b/actpool/actqueue_test.go @@ -15,7 +15,6 @@ import ( "github.com/facebookgo/clock" "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/iotexproject/iotex-core/action" @@ -26,6 +25,10 @@ import ( "github.com/iotexproject/iotex-core/test/mock/mock_chainmanager" ) +const ( + maxBalance = 1e7 +) + func TestNoncePriorityQueue(t *testing.T) { require := require.New(t) pq := noncePriorityQueue{} @@ -58,7 +61,7 @@ func TestNoncePriorityQueue(t *testing.T) { func TestActQueuePut(t *testing.T) { require := require.New(t) - q := NewActQueue(nil, "").(*actQueue) + q := NewActQueue(nil, "", 1, big.NewInt(maxBalance)).(*actQueue) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(1)) require.NoError(err) require.NoError(q.Put(tsf1)) @@ -83,7 +86,7 @@ func TestActQueuePut(t *testing.T) { func TestActQueueFilterNonce(t *testing.T) { require := require.New(t) - q := NewActQueue(nil, "").(*actQueue) + q := NewActQueue(nil, "", 1, big.NewInt(maxBalance)).(*actQueue) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(1), nil, uint64(0), big.NewInt(0)) require.NoError(err) tsf2, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(1), nil, uint64(0), big.NewInt(0)) @@ -93,7 +96,7 @@ func TestActQueueFilterNonce(t *testing.T) { require.NoError(q.Put(tsf1)) require.NoError(q.Put(tsf2)) require.NoError(q.Put(tsf3)) - q.FilterNonce(uint64(3)) + q.UpdateAccountState(3, big.NewInt(maxBalance)) require.Equal(1, len(q.items)) require.Equal(uint64(3), q.index[0].nonce) require.Equal(tsf3, q.items[q.index[0].nonce]) @@ -101,26 +104,23 @@ func TestActQueueFilterNonce(t *testing.T) { func TestActQueueUpdateNonce(t *testing.T) { require := require.New(t) - q := NewActQueue(nil, "").(*actQueue) + q := NewActQueue(nil, "", 1, big.NewInt(1010)).(*actQueue) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(1), nil, uint64(0), big.NewInt(0)) require.NoError(err) tsf2, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) require.NoError(err) - tsf3, err := action.SignedTransfer(_addr2, _priKey1, 4, big.NewInt(10000), nil, uint64(0), big.NewInt(0)) + tsf3, err := action.SignedTransfer(_addr2, _priKey1, 4, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) require.NoError(err) - tsf4, err := action.SignedTransfer(_addr2, _priKey1, 6, big.NewInt(100000), nil, uint64(0), big.NewInt(0)) + tsf4, err := action.SignedTransfer(_addr2, _priKey1, 6, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) require.NoError(err) - tsf5, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100000), nil, uint64(0), big.NewInt(0)) + tsf5, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) require.NoError(err) require.NoError(q.Put(tsf1)) require.NoError(q.Put(tsf2)) require.NoError(q.Put(tsf3)) require.NoError(q.Put(tsf4)) - q.pendingBalance = big.NewInt(1000) require.NoError(q.Put(tsf5)) - removed := q.UpdateQueue(uint64(2)) - require.Equal(uint64(2), q.pendingNonce) - require.Equal([]action.SealedEnvelope{tsf5, tsf2, tsf3, tsf4}, removed) + require.Equal(uint64(3), q.pendingNonce) } func TestActQueuePendingActs(t *testing.T) { @@ -130,12 +130,13 @@ func TestActQueuePendingActs(t *testing.T) { sf := mock_chainmanager.NewMockStateReader(ctrl) sf.EXPECT().State(gomock.Any(), gomock.Any()).Do(func(accountState *state.Account, _ protocol.StateOption) { require.NoError(accountState.SetPendingNonce(accountState.PendingNonce() + 1)) + accountState.Balance = big.NewInt(maxBalance) }).Return(uint64(0), nil).Times(1) sf.EXPECT().Height().Return(uint64(1), nil).AnyTimes() ctx := genesis.WithGenesisContext(context.Background(), genesis.Default) ap, err := NewActPool(genesis.Default, sf, DefaultConfig, EnableExperimentalActions()) require.NoError(err) - q := NewActQueue(ap.(*actPool), identityset.Address(0).String()).(*actQueue) + q := NewActQueue(ap.(*actPool), identityset.Address(0).String(), 1, big.NewInt(maxBalance)).(*actQueue) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(0)) require.NoError(err) tsf2, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(100), nil, uint64(0), big.NewInt(0)) @@ -158,7 +159,7 @@ func TestActQueuePendingActs(t *testing.T) { func TestActQueueAllActs(t *testing.T) { require := require.New(t) - q := NewActQueue(nil, "").(*actQueue) + q := NewActQueue(nil, "", 1, big.NewInt(maxBalance)).(*actQueue) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) require.NoError(err) tsf3, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) @@ -169,41 +170,9 @@ func TestActQueueAllActs(t *testing.T) { require.Equal([]action.SealedEnvelope{tsf1, tsf3}, actions) } -func TestActQueueRemoveActs(t *testing.T) { - require := require.New(t) - q := NewActQueue(nil, "").(*actQueue) - tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(0)) - require.NoError(err) - tsf2, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(0)) - require.NoError(err) - tsf3, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(100), nil, uint64(0), big.NewInt(0)) - require.NoError(err) - require.NoError(q.Put(tsf1)) - require.NoError(q.Put(tsf2)) - require.NoError(q.Put(tsf3)) - removed := q.removeActs(0) - require.Equal(0, len(q.index)) - require.Equal(0, len(q.items)) - require.Equal([]action.SealedEnvelope{tsf1, tsf2, tsf3}, removed) - - tsf4, err := action.SignedTransfer(_addr2, _priKey1, 4, big.NewInt(10000), nil, uint64(0), big.NewInt(0)) - require.NoError(err) - tsf5, err := action.SignedTransfer(_addr2, _priKey1, 5, big.NewInt(100000), nil, uint64(0), big.NewInt(0)) - require.NoError(err) - tsf6, err := action.SignedTransfer(_addr2, _priKey1, 6, big.NewInt(100000), nil, uint64(0), big.NewInt(0)) - require.NoError(err) - require.NoError(q.Put(tsf4)) - require.NoError(q.Put(tsf5)) - require.NoError(q.Put(tsf6)) - removed = q.removeActs(1) - require.Equal(1, len(q.index)) - require.Equal(1, len(q.items)) - require.Equal([]action.SealedEnvelope{tsf5, tsf6}, removed) -} - func TestActQueueTimeOutAction(t *testing.T) { c := clock.NewMock() - q := NewActQueue(nil, "", WithClock(c), WithTimeOut(3*time.Minute)) + q := NewActQueue(nil, "", 1, big.NewInt(maxBalance), WithClock(c), WithTimeOut(3*time.Minute)) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(0)) require.NoError(t, err) tsf2, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(100), nil, uint64(0), big.NewInt(0)) @@ -214,15 +183,15 @@ func TestActQueueTimeOutAction(t *testing.T) { require.NoError(t, q.Put(tsf2)) q.(*actQueue).cleanTimeout() - assert.Equal(t, 2, q.Len()) + require.Equal(t, 2, q.Len()) c.Add(2 * time.Minute) q.(*actQueue).cleanTimeout() - assert.Equal(t, 1, q.Len()) + require.Equal(t, 1, q.Len()) } func TestActQueueCleanTimeout(t *testing.T) { require := require.New(t) - q := NewActQueue(nil, "").(*actQueue) + q := NewActQueue(nil, "", 1, big.NewInt(0)).(*actQueue) q.ttl = 1 invalidTime := time.Now() validTime := time.Now().Add(10 * time.Minute) diff --git a/actpool/options.go b/actpool/options.go index dafbd905ad..821f9523ab 100644 --- a/actpool/options.go +++ b/actpool/options.go @@ -11,6 +11,11 @@ import ( "github.com/facebookgo/clock" ) +// ActQueueOption is the option for actQueue. +type ActQueueOption interface { + SetActQueueOption(*actQueue) +} + type clockOption struct{ c clock.Clock } // WithClock returns an option to overwrite clock. diff --git a/actpool/queueworker.go b/actpool/queueworker.go new file mode 100644 index 0000000000..9cc923179b --- /dev/null +++ b/actpool/queueworker.go @@ -0,0 +1,278 @@ +package actpool + +import ( + "context" + "encoding/hex" + "errors" + "math/big" + "strings" + "sync" + "sync/atomic" + + "github.com/iotexproject/go-pkgs/cache/ttl" + "github.com/iotexproject/iotex-address/address" + "go.uber.org/zap" + + "github.com/iotexproject/iotex-core/action" + accountutil "github.com/iotexproject/iotex-core/action/protocol/account/util" + "github.com/iotexproject/iotex-core/pkg/log" + "github.com/iotexproject/iotex-core/pkg/tracer" +) + +type ( + queueWorker struct { + queue chan workerJob + ap *actPool + mu sync.RWMutex + accountActs map[string]ActQueue + emptyAccounts *ttl.Cache + } + + workerJob struct { + ctx context.Context + act action.SealedEnvelope + err chan error + } + + pendingActions struct { + sender string + acts []action.SealedEnvelope + } +) + +func newQueueWorker(ap *actPool, jobQueue chan workerJob) *queueWorker { + acc, _ := ttl.NewCache() + return &queueWorker{ + queue: jobQueue, + ap: ap, + accountActs: make(map[string]ActQueue), + emptyAccounts: acc, + } +} + +func (worker *queueWorker) Start() error { + if worker.queue == nil || worker.ap == nil { + return errors.New("worker is invalid") + } + go func() { + for { + job, more := <-worker.queue + if !more { // worker chan is closed + return + } + job.err <- worker.Handle(job) + } + }() + return nil +} + +func (worker *queueWorker) Stop() error { + close(worker.queue) + return nil +} + +// Hanlde is called sequentially by worker +func (worker *queueWorker) Handle(job workerJob) error { + ctx := job.ctx + // ctx is canceled or timeout + if ctx.Err() != nil { + return ctx.Err() + } + + var ( + span = tracer.SpanFromContext(ctx) + act = job.act + sender = act.SenderAddress().String() + actHash, _ = act.Hash() + intrinsicGas, _ = act.IntrinsicGas() + ) + defer span.End() + + nonce, balance, err := worker.getConfirmedState(ctx, act.SenderAddress()) + if err != nil { + return err + } + + if err := worker.checkSelpWithState(&act, nonce, balance); err != nil { + return err + } + + if err := worker.putAction(sender, act, nonce, balance); err != nil { + return err + } + + worker.ap.allActions.Set(actHash, act) + + if desAddress, ok := act.Destination(); ok && !strings.EqualFold(sender, desAddress) { + if err := worker.ap.accountDesActs.addAction(act); err != nil { + log.L().Debug("fail to add destionation map", zap.Error(err)) + } + } + + atomic.AddUint64(&worker.ap.gasInPool, intrinsicGas) + + worker.mu.Lock() + defer worker.mu.Unlock() + worker.removeEmptyAccounts() + + return nil +} + +func (worker *queueWorker) getConfirmedState(ctx context.Context, sender address.Address) (uint64, *big.Int, error) { + worker.mu.RLock() + queue := worker.accountActs[sender.String()] + worker.mu.RUnlock() + // account state isn't cached in the actpool + if queue == nil { + confirmedState, err := accountutil.AccountState(ctx, worker.ap.sf, sender) + if err != nil { + return 0, nil, err + } + return confirmedState.PendingNonce(), confirmedState.Balance, nil + } + nonce, balance := queue.AccountState() + return nonce, balance, nil +} + +func (worker *queueWorker) checkSelpWithState(act *action.SealedEnvelope, pendingNonce uint64, balance *big.Int) error { + if act.Nonce() < pendingNonce { + _actpoolMtc.WithLabelValues("nonceTooSmall").Inc() + return action.ErrNonceTooLow + } + + // Nonce exceeds current range + if act.Nonce()-pendingNonce >= worker.ap.cfg.MaxNumActsPerAcct { + hash, _ := act.Hash() + log.L().Debug("Rejecting action because nonce is too large.", + log.Hex("hash", hash[:]), + zap.Uint64("startNonce", pendingNonce), + zap.Uint64("actNonce", act.Nonce())) + _actpoolMtc.WithLabelValues("nonceTooLarge").Inc() + return action.ErrNonceTooHigh + } + + if cost, _ := act.Cost(); balance.Cmp(cost) < 0 { + _actpoolMtc.WithLabelValues("insufficientBalance").Inc() + sender := act.SenderAddress().String() + actHash, _ := act.Hash() + log.L().Info("insufficient balance for action", + zap.String("actionHash", hex.EncodeToString(actHash[:])), + zap.String("cost", cost.String()), + zap.String("balance", balance.String()), + zap.String("sender", sender), + ) + return action.ErrInsufficientFunds + } + return nil +} + +func (worker *queueWorker) putAction(sender string, act action.SealedEnvelope, pendingNonce uint64, confirmedBalance *big.Int) error { + worker.mu.RLock() + queue := worker.accountActs[sender] + worker.mu.RUnlock() + + if queue == nil { + queue = NewActQueue(worker.ap, + sender, + pendingNonce, + confirmedBalance, + WithTimeOut(worker.ap.cfg.ActionExpiry), + ) + worker.mu.Lock() + worker.accountActs[sender] = queue + worker.mu.Unlock() + } + + if err := queue.Put(act); err != nil { + actHash, _ := act.Hash() + _actpoolMtc.WithLabelValues("failedPutActQueue").Inc() + log.L().Info("failed put action into ActQueue", + zap.String("actionHash", hex.EncodeToString(actHash[:])), + zap.Error(err)) + return err + } + + return nil +} + +func (worker *queueWorker) removeEmptyAccounts() { + if worker.emptyAccounts.Count() == 0 { + return + } + + worker.emptyAccounts.Range(func(key, _ interface{}) error { + sender := key.(string) + if worker.accountActs[sender].Empty() { + delete(worker.accountActs, sender) + } + return nil + }) + + worker.emptyAccounts.Reset() +} + +func (worker *queueWorker) Reset(ctx context.Context) { + worker.mu.RLock() + defer worker.mu.RUnlock() + + for from, queue := range worker.accountActs { + addr, _ := address.FromString(from) + confirmedState, err := accountutil.AccountState(ctx, worker.ap.sf, addr) + if err != nil { + log.L().Error("Error when removing confirmed actions", zap.Error(err)) + queue.Reset() + worker.emptyAccounts.Set(from, struct{}{}) + continue + } + // Remove all actions that are committed to new block + acts := queue.UpdateAccountState(confirmedState.PendingNonce(), confirmedState.Balance) + acts2 := queue.UpdateQueue() + worker.ap.removeInvalidActs(append(acts, acts2...)) + // Delete the queue entry if it becomes empty + if queue.Empty() { + worker.emptyAccounts.Set(from, struct{}{}) + } + } +} + +// PendingActions returns all accepted actions +func (worker *queueWorker) PendingActions(ctx context.Context) []*pendingActions { + actionArr := make([]*pendingActions, 0) + + worker.mu.RLock() + defer worker.mu.RUnlock() + for from, queue := range worker.accountActs { + if queue.Empty() { + continue + } + // Remove the actions that are already timeout + acts := queue.UpdateQueue() + worker.ap.removeInvalidActs(acts) + actionArr = append(actionArr, &pendingActions{ + sender: from, + acts: queue.PendingActs(ctx), + }) + } + return actionArr +} + +// GetQueue returns the actQueue of sender +func (worker *queueWorker) GetQueue(sender address.Address) ActQueue { + worker.mu.RLock() + defer worker.mu.RUnlock() + return worker.accountActs[sender.String()] +} + +// ResetAccount resets account in the accountActs of worker +func (worker *queueWorker) ResetAccount(sender address.Address) []action.SealedEnvelope { + senderStr := sender.String() + worker.mu.RLock() + defer worker.mu.RUnlock() + if queue := worker.accountActs[senderStr]; queue != nil { + pendingActs := queue.AllActs() + queue.Reset() + worker.emptyAccounts.Set(senderStr, struct{}{}) + return pendingActs + } + return nil +} diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index 70c910e412..c7e89f7e95 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -9,11 +9,9 @@ import ( "context" "fmt" "sync" - "sync/atomic" "time" "github.com/libp2p/go-libp2p-core/peer" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -39,7 +37,7 @@ type ( var ( // DefaultConfig is the default config DefaultConfig = Config{ - ActionChanSize: 1000, + ActionChanSize: 5000, BlockChanSize: 1000, BlockSyncChanSize: 400, ProcessSyncRequestInterval: 0 * time.Second, @@ -118,8 +116,7 @@ func (m actionMsg) ChainID() uint32 { // IotxDispatcher is the request and event dispatcher for iotx node. type IotxDispatcher struct { - started int32 - shutdown int32 + lifecycle.Readiness actionChanLock sync.RWMutex blockChanLock sync.RWMutex syncChanLock sync.RWMutex @@ -163,23 +160,28 @@ func (d *IotxDispatcher) AddSubscriber( // Start starts the dispatcher. func (d *IotxDispatcher) Start(ctx context.Context) error { - if atomic.AddInt32(&d.started, 1) != 1 { - return errors.New("Dispatcher already started") - } log.L().Info("Starting dispatcher.") - d.wg.Add(3) - go d.actionHandler() + + // setup mutiple action consumers to enqueue actions into actpool + for i := 0; i < cap(d.actionChan)/5; i++ { + d.wg.Add(1) + go d.actionHandler() + } + + d.wg.Add(1) go d.blockHandler() + + d.wg.Add(1) go d.syncHandler() - return nil + return d.TurnOn() } // Stop gracefully shuts down the dispatcher by stopping all handlers and waiting for them to finish. func (d *IotxDispatcher) Stop(ctx context.Context) error { - if atomic.AddInt32(&d.shutdown, 1) != 1 { + if err := d.TurnOff(); err != nil { log.L().Warn("Dispatcher already in the process of shutting down.") - return nil + return err } log.L().Info("Dispatcher is shutting down.") close(d.quit) @@ -208,12 +210,12 @@ func (d *IotxDispatcher) EventAudit() map[iotexrpc.MessageType]int { } func (d *IotxDispatcher) actionHandler() { + defer d.wg.Done() for { select { case a := <-d.actionChan: d.handleActionMsg(a) case <-d.quit: - d.wg.Done() log.L().Info("action handler is terminated.") return } @@ -222,12 +224,12 @@ func (d *IotxDispatcher) actionHandler() { // blockHandler is the main handler for handling all news from peers. func (d *IotxDispatcher) blockHandler() { + defer d.wg.Done() for { select { case b := <-d.blockChan: d.handleBlockMsg(b) case <-d.quit: - d.wg.Done() log.L().Info("block handler is terminated.") return } @@ -236,12 +238,12 @@ func (d *IotxDispatcher) blockHandler() { // syncHandler handles incoming block sync requests func (d *IotxDispatcher) syncHandler() { + defer d.wg.Done() for { select { case m := <-d.syncChan: d.handleBlockSyncMsg(m) case <-d.quit: - d.wg.Done() log.L().Info("block sync handler done.") return } @@ -320,7 +322,7 @@ func (d *IotxDispatcher) handleBlockSyncMsg(m *blockSyncMsg) { // dispatchAction adds the passed action message to the news handling queue. func (d *IotxDispatcher) dispatchAction(ctx context.Context, chainID uint32, msg proto.Message) { - if atomic.LoadInt32(&d.shutdown) != 0 { + if !d.IsReady() { return } subscriber := d.subscriber(chainID) @@ -347,7 +349,7 @@ func (d *IotxDispatcher) dispatchAction(ctx context.Context, chainID uint32, msg // dispatchBlock adds the passed block message to the news handling queue. func (d *IotxDispatcher) dispatchBlock(ctx context.Context, chainID uint32, peer string, msg proto.Message) { - if atomic.LoadInt32(&d.shutdown) != 0 { + if !d.IsReady() { return } subscriber := d.subscriber(chainID) @@ -375,7 +377,7 @@ func (d *IotxDispatcher) dispatchBlock(ctx context.Context, chainID uint32, peer // dispatchBlockSyncReq adds the passed block sync request to the news handling queue. func (d *IotxDispatcher) dispatchBlockSyncReq(ctx context.Context, chainID uint32, peer peer.AddrInfo, msg proto.Message) { - if atomic.LoadInt32(&d.shutdown) != 0 { + if !d.IsReady() { return } subscriber := d.subscriber(chainID)