diff --git a/geth/api/backend.go b/geth/api/backend.go index 6b27d72a381..ad563c33a15 100644 --- a/geth/api/backend.go +++ b/geth/api/backend.go @@ -200,22 +200,19 @@ func (m *StatusBackend) CallRPC(inputJSON string) string { } // SendTransaction creates a new transaction and waits until it's complete. -func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (gethcommon.Hash, error) { +func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) { if ctx == nil { ctx = context.Background() } - tx := common.CreateTransaction(ctx, args) - if err := m.txQueueManager.QueueTransaction(tx); err != nil { - return gethcommon.Hash{}, err + return hash, err } - - if err := m.txQueueManager.WaitForTransaction(tx); err != nil { - return gethcommon.Hash{}, err + rst := m.txQueueManager.WaitForTransaction(tx) + if rst.Error != nil { + return hash, rst.Error } - - return tx.Hash, nil + return rst.Hash, nil } // CompleteTransaction instructs backend to complete sending of a given transaction diff --git a/geth/common/types.go b/geth/common/types.go index 812fc66ba29..23a3962ca5d 100644 --- a/geth/common/types.go +++ b/geth/common/types.go @@ -158,11 +158,9 @@ type QueuedTxID string // QueuedTx holds enough information to complete the queued transaction. type QueuedTx struct { ID QueuedTxID - Hash common.Hash Context context.Context Args SendTxArgs - Done chan struct{} - Err error + Result chan RawCompleteTransactionResult } // SendTxArgs represents the arguments to submit a new transaction into the transaction pool. @@ -206,7 +204,7 @@ type TxQueueManager interface { QueueTransaction(tx *QueuedTx) error // WaitForTransactions blocks until transaction is completed, discarded or timed out. - WaitForTransaction(tx *QueuedTx) error + WaitForTransaction(tx *QueuedTx) RawCompleteTransactionResult SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) diff --git a/geth/common/types_mock.go b/geth/common/types_mock.go index 756bef6e2fa..e4fdf592af0 100644 --- a/geth/common/types_mock.go +++ b/geth/common/types_mock.go @@ -534,9 +534,9 @@ func (mr *MockTxQueueManagerMockRecorder) QueueTransaction(tx interface{}) *gomo } // WaitForTransaction mocks base method -func (m *MockTxQueueManager) WaitForTransaction(tx *QueuedTx) error { +func (m *MockTxQueueManager) WaitForTransaction(tx *QueuedTx) RawCompleteTransactionResult { ret := m.ctrl.Call(m, "WaitForTransaction", tx) - ret0, _ := ret[0].(error) + ret0, _ := ret[0].(RawCompleteTransactionResult) return ret0 } @@ -612,16 +612,6 @@ func (mr *MockTxQueueManagerMockRecorder) DiscardTransactions(ids interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiscardTransactions", reflect.TypeOf((*MockTxQueueManager)(nil).DiscardTransactions), ids) } -// DisableNotificactions mocks base method -func (m *MockTxQueueManager) DisableNotificactions() { - m.ctrl.Call(m, "DisableNotificactions") -} - -// DisableNotificactions indicates an expected call of DisableNotificactions -func (mr *MockTxQueueManagerMockRecorder) DisableNotificactions() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisableNotificactions", reflect.TypeOf((*MockTxQueueManager)(nil).DisableNotificactions)) -} - // MockJailCell is a mock of JailCell interface type MockJailCell struct { ctrl *gomock.Controller diff --git a/geth/common/utils.go b/geth/common/utils.go index c4ad1d152b9..3500fb36e74 100644 --- a/geth/common/utils.go +++ b/geth/common/utils.go @@ -157,9 +157,8 @@ func Fatalf(reason interface{}, args ...interface{}) { func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { return &QueuedTx{ ID: QueuedTxID(uuid.New()), - Hash: common.Hash{}, Context: ctx, Args: args, - Done: make(chan struct{}), + Result: make(chan RawCompleteTransactionResult, 1), } } diff --git a/geth/transactions/notifications.go b/geth/transactions/notifications.go index d307951ffeb..be327ef68d3 100644 --- a/geth/transactions/notifications.go +++ b/geth/transactions/notifications.go @@ -65,13 +65,13 @@ type ReturnSendTransactionEvent struct { } // NotifyOnReturn returns handler that processes responses from internal tx manager -func NotifyOnReturn(queuedTx *common.QueuedTx) { - // discard notifications with empty tx - if queuedTx == nil { +func NotifyOnReturn(queuedTx *common.QueuedTx, err error) { + // we don't want to notify a user if tx wassent successfully + if err == nil { return } - // we don't want to notify a user if tx sent successfully - if queuedTx.Err == nil { + // discard notifications with empty tx + if queuedTx == nil { return } signal.Send(signal.Envelope{ @@ -80,8 +80,8 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) { ID: string(queuedTx.ID), Args: queuedTx.Args, MessageID: common.MessageIDFromContext(queuedTx.Context), - ErrorMessage: queuedTx.Err.Error(), - ErrorCode: strconv.Itoa(sendTransactionErrorCode(queuedTx.Err)), + ErrorMessage: err.Error(), + ErrorCode: strconv.Itoa(sendTransactionErrorCode(err)), }, }) } diff --git a/geth/transactions/queue/queue.go b/geth/transactions/queue/queue.go index eea69544263..b9b44e34cb1 100644 --- a/geth/transactions/queue/queue.go +++ b/geth/transactions/queue/queue.go @@ -133,9 +133,6 @@ func (q *TxQueue) Reset() { // Enqueue enqueues incoming transaction func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID)) - if (tx.Hash != gethcommon.Hash{} || tx.Err != nil) { - return ErrQueuedTxAlreadyProcessed - } log.Info("before enqueueTicker") q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item @@ -204,17 +201,15 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) { delete(q.inprogress, tx.ID) - tx.Err = err // hash is updated only if err is nil, but transaction is not removed from a queue if err == nil { + q.transactions[tx.ID].Result <- common.RawCompleteTransactionResult{Hash: hash, Error: err} q.remove(tx.ID) - tx.Hash = hash - close(tx.Done) return } if _, transient := transientErrs[err.Error()]; !transient { + q.transactions[tx.ID].Result <- common.RawCompleteTransactionResult{Error: err} q.remove(tx.ID) - close(tx.Done) } } diff --git a/geth/transactions/queue/queue_test.go b/geth/transactions/queue/queue_test.go index e156e6b9129..e02b9e65041 100644 --- a/geth/transactions/queue/queue_test.go +++ b/geth/transactions/queue/queue_test.go @@ -52,17 +52,6 @@ func (s *QueueTestSuite) TestGetTransaction() { } } -func (s *QueueTestSuite) TestEnqueueProcessedTransaction() { - // enqueue will fail if transaction with hash will be enqueued - tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) - tx.Hash = gethcommon.Hash{1} - s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) - - tx = common.CreateTransaction(context.Background(), common.SendTxArgs{}) - tx.Err = errors.New("error") - s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) -} - func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx { tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) s.NoError(s.queue.Enqueue(tx)) @@ -73,12 +62,12 @@ func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.Queue func (s *QueueTestSuite) TestDoneSuccess() { hash := gethcommon.Hash{1} tx := s.testDone(hash, nil) - s.NoError(tx.Err) - s.Equal(hash, tx.Hash) - s.False(s.queue.Has(tx.ID)) // event is sent only if transaction was removed from a queue select { - case <-tx.Done: + case rst := <-tx.Result: + s.NoError(rst.Error) + s.Equal(hash, rst.Hash) + s.False(s.queue.Has(tx.ID)) default: s.Fail("No event was sent to Done channel") } @@ -88,22 +77,22 @@ func (s *QueueTestSuite) TestDoneTransientError() { hash := gethcommon.Hash{1} err := keystore.ErrDecrypt tx := s.testDone(hash, err) - s.Equal(keystore.ErrDecrypt, tx.Err) - s.Equal(gethcommon.Hash{}, tx.Hash) s.True(s.queue.Has(tx.ID)) + _, inp := s.queue.inprogress[tx.ID] + s.False(inp) } func (s *QueueTestSuite) TestDoneError() { hash := gethcommon.Hash{1} err := errors.New("test") tx := s.testDone(hash, err) - s.Equal(err, tx.Err) - s.NotEqual(hash, tx.Hash) - s.Equal(gethcommon.Hash{}, tx.Hash) - s.False(s.queue.Has(tx.ID)) // event is sent only if transaction was removed from a queue select { - case <-tx.Done: + case rst := <-tx.Result: + s.Equal(err, rst.Error) + s.NotEqual(hash, rst.Hash) + s.Equal(gethcommon.Hash{}, rst.Hash) + s.False(s.queue.Has(tx.ID)) default: s.Fail("No event was sent to Done channel") } diff --git a/geth/transactions/txqueue_manager.go b/geth/transactions/txqueue_manager.go index bb1ade9c450..fdd23d062a5 100644 --- a/geth/transactions/txqueue_manager.go +++ b/geth/transactions/txqueue_manager.go @@ -25,22 +25,24 @@ const ( // Manager provides means to manage internal Status Backend (injected into LES) type Manager struct { - nodeManager common.NodeManager - accountManager common.AccountManager - txQueue *queue.TxQueue - ethTxClient EthTransactor - addrLock *AddrLocker - notify bool + nodeManager common.NodeManager + accountManager common.AccountManager + txQueue *queue.TxQueue + ethTxClient EthTransactor + addrLock *AddrLocker + notify bool + sendCompletionTimeout time.Duration } // NewManager returns a new Manager. func NewManager(nodeManager common.NodeManager, accountManager common.AccountManager) *Manager { return &Manager{ - nodeManager: nodeManager, - accountManager: accountManager, - txQueue: queue.New(), - addrLock: &AddrLocker{}, - notify: true, + nodeManager: nodeManager, + accountManager: accountManager, + txQueue: queue.New(), + addrLock: &AddrLocker{}, + notify: true, + sendCompletionTimeout: DefaultTxSendCompletionTimeout * time.Second, } } @@ -75,34 +77,38 @@ func (m *Manager) QueueTransaction(tx *common.QueuedTx) error { to = tx.Args.To.Hex() } log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) - err := m.txQueue.Enqueue(tx) + if err := m.txQueue.Enqueue(tx); err != nil { + return err + } if m.notify { NotifyOnEnqueue(tx) } - return err + return nil } func (m *Manager) txDone(tx *common.QueuedTx, hash gethcommon.Hash, err error) { m.txQueue.Done(tx.ID, hash, err) //nolint: errcheck if m.notify { - NotifyOnReturn(tx) + NotifyOnReturn(tx, err) } } // WaitForTransaction adds a transaction to the queue and blocks // until it's completed, discarded or times out. -func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error { +func (m *Manager) WaitForTransaction(tx *common.QueuedTx) common.RawCompleteTransactionResult { log.Info("wait for transaction", "id", tx.ID) // now wait up until transaction is: // - completed (via CompleteQueuedTransaction), // - discarded (via DiscardQueuedTransaction) // - or times out - select { - case <-tx.Done: - case <-time.After(DefaultTxSendCompletionTimeout * time.Second): - m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut) + for { + select { + case rst := <-tx.Result: + return rst + case <-time.After(m.sendCompletionTimeout): + m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut) + } } - return tx.Err } // CompleteTransaction instructs backend to complete sending of a given transaction. @@ -244,7 +250,7 @@ func (m *Manager) DiscardTransaction(id common.QueuedTxID) error { } err = m.txQueue.Done(id, gethcommon.Hash{}, queue.ErrQueuedTxDiscarded) if m.notify { - NotifyOnReturn(tx) + NotifyOnReturn(tx, queue.ErrQueuedTxDiscarded) } return err } @@ -269,19 +275,17 @@ func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.Queued // It accepts one param which is a slice with a map of transaction params. func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { log.Info("SendTransactionRPCHandler called") - // TODO(adam): it's a hack to parse arguments as common.RPCCall can do that. // We should refactor parsing these params to a separate struct. rpcCall := common.RPCCall{Params: args} tx := common.CreateTransaction(ctx, rpcCall.ToSendTxArgs()) - if err := m.QueueTransaction(tx); err != nil { return nil, err } - - if err := m.WaitForTransaction(tx); err != nil { - return nil, err + rst := m.WaitForTransaction(tx) + if rst.Error != nil { + return nil, rst.Error } - - return tx.Hash.Hex(), nil + // handle empty hash + return rst.Hash.Hex(), nil } diff --git a/geth/transactions/txqueue_manager_test.go b/geth/transactions/txqueue_manager_test.go index 3fa4af31430..6002ab6eaa9 100644 --- a/geth/transactions/txqueue_manager_test.go +++ b/geth/transactions/txqueue_manager_test.go @@ -98,24 +98,26 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + s.NoError(txQueueManager.QueueTransaction(tx)) w := make(chan struct{}) + var ( + hash gethcommon.Hash + err error + ) go func() { - hash, err := txQueueManager.CompleteTransaction(tx.ID, password) + hash, err = txQueueManager.CompleteTransaction(tx.ID, password) s.NoError(err) - s.Equal(tx.Hash, hash) close(w) }() - err = txQueueManager.WaitForTransaction(tx) - s.NoError(err) + rst := txQueueManager.WaitForTransaction(tx) // Check that error is assigned to the transaction. - s.NoError(tx.Err) + s.NoError(rst.Error) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) s.NoError(WaitClosed(w, time.Second)) + s.Equal(hash, rst.Hash) } func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { @@ -141,8 +143,7 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + s.NoError(txQueueManager.QueueTransaction(tx)) var ( wg sync.WaitGroup @@ -168,10 +169,9 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { }() } - err = txQueueManager.WaitForTransaction(tx) - s.NoError(err) + rst := txQueueManager.WaitForTransaction(tx) // Check that error is assigned to the transaction. - s.NoError(tx.Err) + s.NoError(rst.Error) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) @@ -197,10 +197,9 @@ func (s *TxQueueTestSuite) TestAccountMismatch() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + txQueueManager.QueueTransaction(tx) - _, err = txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) + _, err := txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) s.Equal(err, queue.ErrInvalidCompleteTxSender) // Transaction should stay in the queue as mismatched accounts @@ -227,10 +226,9 @@ func (s *TxQueueTestSuite) TestInvalidPassword() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + txQueueManager.QueueTransaction(tx) - _, err = txQueueManager.CompleteTransaction(tx.ID, password) + _, err := txQueueManager.CompleteTransaction(tx.ID, password) s.Equal(err.Error(), keystore.ErrDecrypt.Error()) // Transaction should stay in the queue as mismatched accounts @@ -250,20 +248,34 @@ func (s *TxQueueTestSuite) TestDiscardTransaction() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) - + s.NoError(txQueueManager.QueueTransaction(tx)) w := make(chan struct{}) go func() { s.NoError(txQueueManager.DiscardTransaction(tx.ID)) close(w) }() - err = txQueueManager.WaitForTransaction(tx) - s.Equal(queue.ErrQueuedTxDiscarded, err) - // Check that error is assigned to the transaction. - s.Equal(queue.ErrQueuedTxDiscarded, tx.Err) + rst := txQueueManager.WaitForTransaction(tx) + s.Equal(queue.ErrQueuedTxDiscarded, rst.Error) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) s.NoError(WaitClosed(w, time.Second)) } + +func (s *TxQueueTestSuite) TestCompletionTimedOut() { + txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) + txQueueManager.DisableNotificactions() + txQueueManager.sendCompletionTimeout = time.Nanosecond + + txQueueManager.Start() + defer txQueueManager.Stop() + + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ + From: common.FromAddress(TestConfig.Account1.Address), + To: common.ToAddress(TestConfig.Account2.Address), + }) + + s.NoError(txQueueManager.QueueTransaction(tx)) + rst := txQueueManager.WaitForTransaction(tx) + s.Equal(queue.ErrQueuedTxTimedOut, rst.Error) +}