From 653da5bcd0737d83d929dcb3e7eb92712fe9194b Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 5 Jan 2018 22:58:17 +0200 Subject: [PATCH] Result of tx processing returned as QueuedTxResult Currently it is quite easy to introduce concurrency issues while working with transaction object. For example, race issue will exist every time while transaction is processed in a separate goroutine and caller will try to check for an error before event to Done channel is sent. This change removes all the data that is updated on transaction and leaves it with ID, Args and Context (which is not used at the moment). Signed-off-by: Dmitry Shulyak --- e2e/transactions/transactions_test.go | 8 +-- geth/api/api.go | 2 +- geth/api/backend.go | 19 +++--- geth/common/types.go | 8 +-- geth/common/utils.go | 3 +- geth/transactions/errors.go | 10 +++ geth/transactions/fake/mock.go | 46 +++++++------- geth/transactions/fake/txservice.go | 4 +- geth/transactions/notifications.go | 27 ++++----- geth/transactions/queue/queue.go | 31 +++++----- geth/transactions/queue/queue_test.go | 34 +++++------ geth/transactions/txqueue_manager.go | 74 ++++++++++++----------- geth/transactions/txqueue_manager_test.go | 64 ++++++++++++-------- lib/utils.go | 8 +-- 14 files changed, 176 insertions(+), 162 deletions(-) create mode 100644 geth/transactions/errors.go diff --git a/e2e/transactions/transactions_test.go b/e2e/transactions/transactions_test.go index 94912b85870..180bd193972 100644 --- a/e2e/transactions/transactions_test.go +++ b/e2e/transactions/transactions_test.go @@ -495,7 +495,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { log.Info("transaction return event received", "id", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() s.Equal(receivedErrMessage, expectedErrMessage) receivedErrCode := event["error_code"].(string) @@ -511,7 +511,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - s.EqualError(err, queue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded") + s.EqualError(err, transactions.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded") select { case <-completeQueuedTransaction: @@ -659,7 +659,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { log.Info("transaction return event received", "id", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() s.Equal(receivedErrMessage, expectedErrMessage) receivedErrCode := event["error_code"].(string) @@ -681,7 +681,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - require.EqualError(err, queue.ErrQueuedTxDiscarded.Error()) + require.EqualError(err, transactions.ErrQueuedTxDiscarded.Error()) require.Equal(gethcommon.Hash{}, txHashCheck, "transaction returned hash, while it shouldn't") } diff --git a/geth/api/api.go b/geth/api/api.go index f3c4848e22b..4234ea4c97d 100644 --- a/geth/api/api.go +++ b/geth/api/api.go @@ -172,7 +172,7 @@ func (api *StatusAPI) CompleteTransaction(id common.QueuedTxID, password string) } // CompleteTransactions instructs backend to complete sending of multiple transactions -func (api *StatusAPI) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult { +func (api *StatusAPI) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult { return api.b.txQueueManager.CompleteTransactions(ids, password) } diff --git a/geth/api/backend.go b/geth/api/backend.go index 962bcbcb75b..ad528693608 100644 --- a/geth/api/backend.go +++ b/geth/api/backend.go @@ -200,22 +200,19 @@ func (m *StatusBackend) CallRPC(inputJSON string) string { } // SendTransaction creates a new transaction and waits until it's complete. -func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (gethcommon.Hash, error) { +func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) { if ctx == nil { ctx = context.Background() } - tx := common.CreateTransaction(ctx, args) - - if err := m.txQueueManager.QueueTransaction(tx); err != nil { - return gethcommon.Hash{}, err + if err = m.txQueueManager.QueueTransaction(tx); err != nil { + return hash, err } - - if err := m.txQueueManager.WaitForTransaction(tx); err != nil { - return gethcommon.Hash{}, err + rst := m.txQueueManager.WaitForTransaction(tx) + if rst.Error != nil { + return hash, rst.Error } - - return tx.Hash, nil + return rst.Hash, nil } // CompleteTransaction instructs backend to complete sending of a given transaction @@ -224,7 +221,7 @@ func (m *StatusBackend) CompleteTransaction(id common.QueuedTxID, password strin } // CompleteTransactions instructs backend to complete sending of multiple transactions -func (m *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult { +func (m *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult { return m.txQueueManager.CompleteTransactions(ids, password) } diff --git a/geth/common/types.go b/geth/common/types.go index e35cfe09a00..bea97177dba 100644 --- a/geth/common/types.go +++ b/geth/common/types.go @@ -141,8 +141,8 @@ type AccountManager interface { AddressToDecryptedAccount(address, password string) (accounts.Account, *keystore.Key, error) } -// RawCompleteTransactionResult is a JSON returned from transaction complete function (used internally) -type RawCompleteTransactionResult struct { +// TransactionResult is a JSON returned from transaction complete function (used internally) +type TransactionResult struct { Hash common.Hash Error error } @@ -158,11 +158,9 @@ type QueuedTxID string // QueuedTx holds enough information to complete the queued transaction. type QueuedTx struct { ID QueuedTxID - Hash common.Hash Context context.Context Args SendTxArgs - Done chan struct{} - Err error + Result chan TransactionResult } // SendTxArgs represents the arguments to submit a new transaction into the transaction pool. diff --git a/geth/common/utils.go b/geth/common/utils.go index c4ad1d152b9..b81bd3455fb 100644 --- a/geth/common/utils.go +++ b/geth/common/utils.go @@ -157,9 +157,8 @@ func Fatalf(reason interface{}, args ...interface{}) { func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { return &QueuedTx{ ID: QueuedTxID(uuid.New()), - Hash: common.Hash{}, Context: ctx, Args: args, - Done: make(chan struct{}), + Result: make(chan TransactionResult, 1), } } diff --git a/geth/transactions/errors.go b/geth/transactions/errors.go new file mode 100644 index 00000000000..7eb38081ee8 --- /dev/null +++ b/geth/transactions/errors.go @@ -0,0 +1,10 @@ +package transactions + +import "errors" + +var ( + //ErrQueuedTxTimedOut - error transaction sending timed out + ErrQueuedTxTimedOut = errors.New("transaction sending timed out") + //ErrQueuedTxDiscarded - error transaction discarded + ErrQueuedTxDiscarded = errors.New("transaction has been discarded") +) diff --git a/geth/transactions/fake/mock.go b/geth/transactions/fake/mock.go index 6b4218978d1..2b05dbbb681 100644 --- a/geth/transactions/fake/mock.go +++ b/geth/transactions/fake/mock.go @@ -14,31 +14,31 @@ import ( reflect "reflect" ) -// MockFakePublicTransactionPoolAPI is a mock of FakePublicTransactionPoolAPI interface -type MockFakePublicTransactionPoolAPI struct { +// MockPublicTransactionPoolAPI is a mock of PublicTransactionPoolAPI interface +type MockPublicTransactionPoolAPI struct { ctrl *gomock.Controller - recorder *MockFakePublicTransactionPoolAPIMockRecorder + recorder *MockPublicTransactionPoolAPIMockRecorder } -// MockFakePublicTransactionPoolAPIMockRecorder is the mock recorder for MockFakePublicTransactionPoolAPI -type MockFakePublicTransactionPoolAPIMockRecorder struct { - mock *MockFakePublicTransactionPoolAPI +// MockPublicTransactionPoolAPIMockRecorder is the mock recorder for MockPublicTransactionPoolAPI +type MockPublicTransactionPoolAPIMockRecorder struct { + mock *MockPublicTransactionPoolAPI } -// NewMockFakePublicTransactionPoolAPI creates a new mock instance -func NewMockFakePublicTransactionPoolAPI(ctrl *gomock.Controller) *MockFakePublicTransactionPoolAPI { - mock := &MockFakePublicTransactionPoolAPI{ctrl: ctrl} - mock.recorder = &MockFakePublicTransactionPoolAPIMockRecorder{mock} +// NewMockPublicTransactionPoolAPI creates a new mock instance +func NewMockPublicTransactionPoolAPI(ctrl *gomock.Controller) *MockPublicTransactionPoolAPI { + mock := &MockPublicTransactionPoolAPI{ctrl: ctrl} + mock.recorder = &MockPublicTransactionPoolAPIMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use -func (m *MockFakePublicTransactionPoolAPI) EXPECT() *MockFakePublicTransactionPoolAPIMockRecorder { +func (m *MockPublicTransactionPoolAPI) EXPECT() *MockPublicTransactionPoolAPIMockRecorder { return m.recorder } // GasPrice mocks base method -func (m *MockFakePublicTransactionPoolAPI) GasPrice(ctx context.Context) (*big.Int, error) { +func (m *MockPublicTransactionPoolAPI) GasPrice(ctx context.Context) (*big.Int, error) { ret := m.ctrl.Call(m, "GasPrice", ctx) ret0, _ := ret[0].(*big.Int) ret1, _ := ret[1].(error) @@ -46,12 +46,12 @@ func (m *MockFakePublicTransactionPoolAPI) GasPrice(ctx context.Context) (*big.I } // GasPrice indicates an expected call of GasPrice -func (mr *MockFakePublicTransactionPoolAPIMockRecorder) GasPrice(ctx interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasPrice", reflect.TypeOf((*MockFakePublicTransactionPoolAPI)(nil).GasPrice), ctx) +func (mr *MockPublicTransactionPoolAPIMockRecorder) GasPrice(ctx interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasPrice", reflect.TypeOf((*MockPublicTransactionPoolAPI)(nil).GasPrice), ctx) } // EstimateGas mocks base method -func (m *MockFakePublicTransactionPoolAPI) EstimateGas(ctx context.Context, args CallArgs) (*hexutil.Big, error) { +func (m *MockPublicTransactionPoolAPI) EstimateGas(ctx context.Context, args CallArgs) (*hexutil.Big, error) { ret := m.ctrl.Call(m, "EstimateGas", ctx, args) ret0, _ := ret[0].(*hexutil.Big) ret1, _ := ret[1].(error) @@ -59,12 +59,12 @@ func (m *MockFakePublicTransactionPoolAPI) EstimateGas(ctx context.Context, args } // EstimateGas indicates an expected call of EstimateGas -func (mr *MockFakePublicTransactionPoolAPIMockRecorder) EstimateGas(ctx, args interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EstimateGas", reflect.TypeOf((*MockFakePublicTransactionPoolAPI)(nil).EstimateGas), ctx, args) +func (mr *MockPublicTransactionPoolAPIMockRecorder) EstimateGas(ctx, args interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EstimateGas", reflect.TypeOf((*MockPublicTransactionPoolAPI)(nil).EstimateGas), ctx, args) } // GetTransactionCount mocks base method -func (m *MockFakePublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (*hexutil.Uint64, error) { +func (m *MockPublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (*hexutil.Uint64, error) { ret := m.ctrl.Call(m, "GetTransactionCount", ctx, address, blockNr) ret0, _ := ret[0].(*hexutil.Uint64) ret1, _ := ret[1].(error) @@ -72,12 +72,12 @@ func (m *MockFakePublicTransactionPoolAPI) GetTransactionCount(ctx context.Conte } // GetTransactionCount indicates an expected call of GetTransactionCount -func (mr *MockFakePublicTransactionPoolAPIMockRecorder) GetTransactionCount(ctx, address, blockNr interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransactionCount", reflect.TypeOf((*MockFakePublicTransactionPoolAPI)(nil).GetTransactionCount), ctx, address, blockNr) +func (mr *MockPublicTransactionPoolAPIMockRecorder) GetTransactionCount(ctx, address, blockNr interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransactionCount", reflect.TypeOf((*MockPublicTransactionPoolAPI)(nil).GetTransactionCount), ctx, address, blockNr) } // SendRawTransaction mocks base method -func (m *MockFakePublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, encodedTx hexutil.Bytes) (common.Hash, error) { +func (m *MockPublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, encodedTx hexutil.Bytes) (common.Hash, error) { ret := m.ctrl.Call(m, "SendRawTransaction", ctx, encodedTx) ret0, _ := ret[0].(common.Hash) ret1, _ := ret[1].(error) @@ -85,6 +85,6 @@ func (m *MockFakePublicTransactionPoolAPI) SendRawTransaction(ctx context.Contex } // SendRawTransaction indicates an expected call of SendRawTransaction -func (mr *MockFakePublicTransactionPoolAPIMockRecorder) SendRawTransaction(ctx, encodedTx interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendRawTransaction", reflect.TypeOf((*MockFakePublicTransactionPoolAPI)(nil).SendRawTransaction), ctx, encodedTx) +func (mr *MockPublicTransactionPoolAPIMockRecorder) SendRawTransaction(ctx, encodedTx interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendRawTransaction", reflect.TypeOf((*MockPublicTransactionPoolAPI)(nil).SendRawTransaction), ctx, encodedTx) } diff --git a/geth/transactions/fake/txservice.go b/geth/transactions/fake/txservice.go index bcf3ac19267..bff63e6b5cb 100644 --- a/geth/transactions/fake/txservice.go +++ b/geth/transactions/fake/txservice.go @@ -11,9 +11,9 @@ import ( ) // NewTestServer returns a mocked test server -func NewTestServer(ctrl *gomock.Controller) (*rpc.Server, *MockFakePublicTransactionPoolAPI) { +func NewTestServer(ctrl *gomock.Controller) (*rpc.Server, *MockPublicTransactionPoolAPI) { srv := rpc.NewServer() - svc := NewMockFakePublicTransactionPoolAPI(ctrl) + svc := NewMockPublicTransactionPoolAPI(ctrl) if err := srv.RegisterName("eth", svc); err != nil { panic(err) } diff --git a/geth/transactions/notifications.go b/geth/transactions/notifications.go index d307951ffeb..b82f7b69c09 100644 --- a/geth/transactions/notifications.go +++ b/geth/transactions/notifications.go @@ -1,12 +1,9 @@ package transactions import ( - "strconv" - "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/signal" - "github.com/status-im/status-go/geth/transactions/queue" ) const ( @@ -30,10 +27,10 @@ const ( ) var txReturnCodes = map[error]int{ - nil: SendTransactionNoErrorCode, - keystore.ErrDecrypt: SendTransactionPasswordErrorCode, - queue.ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode, - queue.ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode, + nil: SendTransactionNoErrorCode, + keystore.ErrDecrypt: SendTransactionPasswordErrorCode, + ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode, + ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode, } // SendTransactionEvent is a signal sent on a send transaction request @@ -61,17 +58,17 @@ type ReturnSendTransactionEvent struct { Args common.SendTxArgs `json:"args"` MessageID string `json:"message_id"` ErrorMessage string `json:"error_message"` - ErrorCode string `json:"error_code"` + ErrorCode int `json:"error_code,string"` } // NotifyOnReturn returns handler that processes responses from internal tx manager -func NotifyOnReturn(queuedTx *common.QueuedTx) { - // discard notifications with empty tx - if queuedTx == nil { +func NotifyOnReturn(queuedTx *common.QueuedTx, err error) { + // we don't want to notify a user if tx was sent successfully + if err == nil { return } - // we don't want to notify a user if tx sent successfully - if queuedTx.Err == nil { + // discard notifications with empty tx + if queuedTx == nil { return } signal.Send(signal.Envelope{ @@ -80,8 +77,8 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) { ID: string(queuedTx.ID), Args: queuedTx.Args, MessageID: common.MessageIDFromContext(queuedTx.Context), - ErrorMessage: queuedTx.Err.Error(), - ErrorCode: strconv.Itoa(sendTransactionErrorCode(queuedTx.Err)), + ErrorMessage: err.Error(), + ErrorCode: sendTransactionErrorCode(err), }, }) } diff --git a/geth/transactions/queue/queue.go b/geth/transactions/queue/queue.go index eea69544263..ed6602f730f 100644 --- a/geth/transactions/queue/queue.go +++ b/geth/transactions/queue/queue.go @@ -19,16 +19,12 @@ const ( ) var ( + // ErrQueuedTxExist - transaction was already enqueued + ErrQueuedTxExist = errors.New("transaction already exist in queue") //ErrQueuedTxIDNotFound - error transaction hash not found ErrQueuedTxIDNotFound = errors.New("transaction hash not found") - //ErrQueuedTxTimedOut - error transaction sending timed out - ErrQueuedTxTimedOut = errors.New("transaction sending timed out") - //ErrQueuedTxDiscarded - error transaction discarded - ErrQueuedTxDiscarded = errors.New("transaction has been discarded") - //ErrQueuedTxInProgress - error transaction in progress + //ErrQueuedTxInProgress - error transaction is in progress ErrQueuedTxInProgress = errors.New("transaction is in progress") - //ErrQueuedTxAlreadyProcessed - error transaction has already processed - ErrQueuedTxAlreadyProcessed = errors.New("transaction has been already processed") //ErrInvalidCompleteTxSender - error transaction with invalid sender ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it") ) @@ -133,15 +129,18 @@ func (q *TxQueue) Reset() { // Enqueue enqueues incoming transaction func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID)) - if (tx.Hash != gethcommon.Hash{} || tx.Err != nil) { - return ErrQueuedTxAlreadyProcessed + q.mu.RLock() + if _, ok := q.transactions[tx.ID]; ok { + q.mu.RUnlock() + return ErrQueuedTxExist } + q.mu.RUnlock() - log.Info("before enqueueTicker") + // we can't hold a lock in this part + log.Debug("notifying eviction loop") q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item - log.Info("before evictableIDs") - q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap - log.Info("after evictableIDs") + q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap + log.Debug("notified eviction loop") q.mu.Lock() q.transactions[tx.ID] = tx @@ -204,17 +203,15 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) { delete(q.inprogress, tx.ID) - tx.Err = err // hash is updated only if err is nil, but transaction is not removed from a queue if err == nil { + q.transactions[tx.ID].Result <- common.TransactionResult{Hash: hash, Error: err} q.remove(tx.ID) - tx.Hash = hash - close(tx.Done) return } if _, transient := transientErrs[err.Error()]; !transient { + q.transactions[tx.ID].Result <- common.TransactionResult{Error: err} q.remove(tx.ID) - close(tx.Done) } } diff --git a/geth/transactions/queue/queue_test.go b/geth/transactions/queue/queue_test.go index e156e6b9129..47a32ff5b1d 100644 --- a/geth/transactions/queue/queue_test.go +++ b/geth/transactions/queue/queue_test.go @@ -52,15 +52,13 @@ func (s *QueueTestSuite) TestGetTransaction() { } } -func (s *QueueTestSuite) TestEnqueueProcessedTransaction() { - // enqueue will fail if transaction with hash will be enqueued +func (s *QueueTestSuite) TestAlreadyEnqueued() { tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) - tx.Hash = gethcommon.Hash{1} - s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) - + s.NoError(s.queue.Enqueue(tx)) + s.Equal(ErrQueuedTxExist, s.queue.Enqueue(tx)) + // try to enqueue another tx to double check locking tx = common.CreateTransaction(context.Background(), common.SendTxArgs{}) - tx.Err = errors.New("error") - s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) + s.NoError(s.queue.Enqueue(tx)) } func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx { @@ -73,12 +71,12 @@ func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.Queue func (s *QueueTestSuite) TestDoneSuccess() { hash := gethcommon.Hash{1} tx := s.testDone(hash, nil) - s.NoError(tx.Err) - s.Equal(hash, tx.Hash) - s.False(s.queue.Has(tx.ID)) // event is sent only if transaction was removed from a queue select { - case <-tx.Done: + case rst := <-tx.Result: + s.NoError(rst.Error) + s.Equal(hash, rst.Hash) + s.False(s.queue.Has(tx.ID)) default: s.Fail("No event was sent to Done channel") } @@ -88,22 +86,22 @@ func (s *QueueTestSuite) TestDoneTransientError() { hash := gethcommon.Hash{1} err := keystore.ErrDecrypt tx := s.testDone(hash, err) - s.Equal(keystore.ErrDecrypt, tx.Err) - s.Equal(gethcommon.Hash{}, tx.Hash) s.True(s.queue.Has(tx.ID)) + _, inp := s.queue.inprogress[tx.ID] + s.False(inp) } func (s *QueueTestSuite) TestDoneError() { hash := gethcommon.Hash{1} err := errors.New("test") tx := s.testDone(hash, err) - s.Equal(err, tx.Err) - s.NotEqual(hash, tx.Hash) - s.Equal(gethcommon.Hash{}, tx.Hash) - s.False(s.queue.Has(tx.ID)) // event is sent only if transaction was removed from a queue select { - case <-tx.Done: + case rst := <-tx.Result: + s.Equal(err, rst.Error) + s.NotEqual(hash, rst.Hash) + s.Equal(gethcommon.Hash{}, rst.Hash) + s.False(s.queue.Has(tx.ID)) default: s.Fail("No event was sent to Done channel") } diff --git a/geth/transactions/txqueue_manager.go b/geth/transactions/txqueue_manager.go index 984e066fd39..502a6444c92 100644 --- a/geth/transactions/txqueue_manager.go +++ b/geth/transactions/txqueue_manager.go @@ -17,7 +17,7 @@ const ( // SendTxDefaultErrorCode is sent by default, when error is not nil, but type is unknown/unexpected. SendTxDefaultErrorCode = SendTransactionDefaultErrorCode // DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction(). - DefaultTxSendCompletionTimeout = 300 + DefaultTxSendCompletionTimeout = 300 * time.Second defaultGas = 90000 defaultTimeout = time.Minute @@ -25,22 +25,24 @@ const ( // Manager provides means to manage internal Status Backend (injected into LES) type Manager struct { - nodeManager common.NodeManager - accountManager common.AccountManager - txQueue *queue.TxQueue - ethTxClient EthTransactor - addrLock *AddrLocker - notify bool + nodeManager common.NodeManager + accountManager common.AccountManager + txQueue *queue.TxQueue + ethTxClient EthTransactor + addrLock *AddrLocker + notify bool + completionTimeout time.Duration } // NewManager returns a new Manager. func NewManager(nodeManager common.NodeManager, accountManager common.AccountManager) *Manager { return &Manager{ - nodeManager: nodeManager, - accountManager: accountManager, - txQueue: queue.New(), - addrLock: &AddrLocker{}, - notify: true, + nodeManager: nodeManager, + accountManager: accountManager, + txQueue: queue.New(), + addrLock: &AddrLocker{}, + notify: true, + completionTimeout: DefaultTxSendCompletionTimeout, } } @@ -75,34 +77,41 @@ func (m *Manager) QueueTransaction(tx *common.QueuedTx) error { to = tx.Args.To.Hex() } log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) - err := m.txQueue.Enqueue(tx) + if err := m.txQueue.Enqueue(tx); err != nil { + return err + } if m.notify { NotifyOnEnqueue(tx) } - return err + return nil } func (m *Manager) txDone(tx *common.QueuedTx, hash gethcommon.Hash, err error) { - m.txQueue.Done(tx.ID, hash, err) //nolint: errcheck + if err := m.txQueue.Done(tx.ID, hash, err); err == queue.ErrQueuedTxIDNotFound { + log.Warn("transaction is already removed from a queue", tx.ID) + return + } if m.notify { - NotifyOnReturn(tx) + NotifyOnReturn(tx, err) } } // WaitForTransaction adds a transaction to the queue and blocks // until it's completed, discarded or times out. -func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error { +func (m *Manager) WaitForTransaction(tx *common.QueuedTx) common.TransactionResult { log.Info("wait for transaction", "id", tx.ID) // now wait up until transaction is: // - completed (via CompleteQueuedTransaction), // - discarded (via DiscardQueuedTransaction) // - or times out - select { - case <-tx.Done: - case <-time.After(DefaultTxSendCompletionTimeout * time.Second): - m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut) + for { + select { + case rst := <-tx.Result: + return rst + case <-time.After(m.completionTimeout): + m.txDone(tx, gethcommon.Hash{}, ErrQueuedTxTimedOut) + } } - return tx.Err } // CompleteTransaction instructs backend to complete sending of a given transaction. @@ -224,11 +233,11 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount } // CompleteTransactions instructs backend to complete sending of multiple transactions -func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult { - results := make(map[common.QueuedTxID]common.RawCompleteTransactionResult) +func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult { + results := make(map[common.QueuedTxID]common.TransactionResult) for _, txID := range ids { txHash, txErr := m.CompleteTransaction(txID, password) - results[txID] = common.RawCompleteTransactionResult{ + results[txID] = common.TransactionResult{ Hash: txHash, Error: txErr, } @@ -242,9 +251,9 @@ func (m *Manager) DiscardTransaction(id common.QueuedTxID) error { if err != nil { return err } - err = m.txQueue.Done(id, gethcommon.Hash{}, queue.ErrQueuedTxDiscarded) + err = m.txQueue.Done(id, gethcommon.Hash{}, ErrQueuedTxDiscarded) if m.notify { - NotifyOnReturn(tx) + NotifyOnReturn(tx, ErrQueuedTxDiscarded) } return err } @@ -269,19 +278,16 @@ func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.Queued // It accepts one param which is a slice with a map of transaction params. func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { log.Info("SendTransactionRPCHandler called") - // TODO(adam): it's a hack to parse arguments as common.RPCCall can do that. // We should refactor parsing these params to a separate struct. rpcCall := common.RPCCall{Params: args} tx := common.CreateTransaction(ctx, rpcCall.ToSendTxArgs()) - if err := m.QueueTransaction(tx); err != nil { return nil, err } - - if err := m.WaitForTransaction(tx); err != nil { - return nil, err + rst := m.WaitForTransaction(tx) + if rst.Error != nil { + return nil, rst.Error } - - return tx.Hash.Hex(), nil + return rst.Hash.Hex(), nil } diff --git a/geth/transactions/txqueue_manager_test.go b/geth/transactions/txqueue_manager_test.go index 3fa4af31430..28e175748c5 100644 --- a/geth/transactions/txqueue_manager_test.go +++ b/geth/transactions/txqueue_manager_test.go @@ -36,7 +36,7 @@ type TxQueueTestSuite struct { server *gethrpc.Server client *gethrpc.Client txServiceMockCtrl *gomock.Controller - txServiceMock *fake.MockFakePublicTransactionPoolAPI + txServiceMock *fake.MockPublicTransactionPoolAPI } func (s *TxQueueTestSuite) SetupTest() { @@ -98,24 +98,26 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + s.NoError(txQueueManager.QueueTransaction(tx)) w := make(chan struct{}) + var ( + hash gethcommon.Hash + err error + ) go func() { - hash, err := txQueueManager.CompleteTransaction(tx.ID, password) + hash, err = txQueueManager.CompleteTransaction(tx.ID, password) s.NoError(err) - s.Equal(tx.Hash, hash) close(w) }() - err = txQueueManager.WaitForTransaction(tx) - s.NoError(err) + rst := txQueueManager.WaitForTransaction(tx) // Check that error is assigned to the transaction. - s.NoError(tx.Err) + s.NoError(rst.Error) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) s.NoError(WaitClosed(w, time.Second)) + s.Equal(hash, rst.Hash) } func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { @@ -141,8 +143,7 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + s.NoError(txQueueManager.QueueTransaction(tx)) var ( wg sync.WaitGroup @@ -168,10 +169,9 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { }() } - err = txQueueManager.WaitForTransaction(tx) - s.NoError(err) + rst := txQueueManager.WaitForTransaction(tx) // Check that error is assigned to the transaction. - s.NoError(tx.Err) + s.NoError(rst.Error) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) @@ -197,10 +197,9 @@ func (s *TxQueueTestSuite) TestAccountMismatch() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + s.NoError(txQueueManager.QueueTransaction(tx)) - _, err = txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) + _, err := txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) s.Equal(err, queue.ErrInvalidCompleteTxSender) // Transaction should stay in the queue as mismatched accounts @@ -227,10 +226,9 @@ func (s *TxQueueTestSuite) TestInvalidPassword() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + s.NoError(txQueueManager.QueueTransaction(tx)) - _, err = txQueueManager.CompleteTransaction(tx.ID, password) + _, err := txQueueManager.CompleteTransaction(tx.ID, password) s.Equal(err.Error(), keystore.ErrDecrypt.Error()) // Transaction should stay in the queue as mismatched accounts @@ -250,20 +248,34 @@ func (s *TxQueueTestSuite) TestDiscardTransaction() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) - + s.NoError(txQueueManager.QueueTransaction(tx)) w := make(chan struct{}) go func() { s.NoError(txQueueManager.DiscardTransaction(tx.ID)) close(w) }() - err = txQueueManager.WaitForTransaction(tx) - s.Equal(queue.ErrQueuedTxDiscarded, err) - // Check that error is assigned to the transaction. - s.Equal(queue.ErrQueuedTxDiscarded, tx.Err) + rst := txQueueManager.WaitForTransaction(tx) + s.Equal(ErrQueuedTxDiscarded, rst.Error) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) s.NoError(WaitClosed(w, time.Second)) } + +func (s *TxQueueTestSuite) TestCompletionTimedOut() { + txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) + txQueueManager.DisableNotificactions() + txQueueManager.completionTimeout = time.Nanosecond + + txQueueManager.Start() + defer txQueueManager.Stop() + + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ + From: common.FromAddress(TestConfig.Account1.Address), + To: common.ToAddress(TestConfig.Account2.Address), + }) + + s.NoError(txQueueManager.QueueTransaction(tx)) + rst := txQueueManager.WaitForTransaction(tx) + s.Equal(ErrQueuedTxTimedOut, rst.Error) +} diff --git a/lib/utils.go b/lib/utils.go index 216b5b0d584..8335f85ab1a 100644 --- a/lib/utils.go +++ b/lib/utils.go @@ -1049,7 +1049,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() if receivedErrMessage != expectedErrMessage { t.Errorf("unexpected error message received: got %v", receivedErrMessage) return @@ -1071,7 +1071,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - if err != queue.ErrQueuedTxDiscarded { + if err != transactions.ErrQueuedTxDiscarded { t.Errorf("expected error not thrown: %v", err) return false } @@ -1136,7 +1136,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() if receivedErrMessage != expectedErrMessage { t.Errorf("unexpected error message received: got %v", receivedErrMessage) return @@ -1162,7 +1162,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - if err != queue.ErrQueuedTxDiscarded { + if err != transactions.ErrQueuedTxDiscarded { t.Errorf("expected error not thrown: %v", err) return }