diff --git a/geth/api/backend.go b/geth/api/backend.go index 6b27d72a381..f402e1830be 100644 --- a/geth/api/backend.go +++ b/geth/api/backend.go @@ -206,16 +206,14 @@ func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxA } tx := common.CreateTransaction(ctx, args) + c := m.txQueueManager.QueueTransaction(tx) - if err := m.txQueueManager.QueueTransaction(tx); err != nil { - return gethcommon.Hash{}, err + rst := m.txQueueManager.WaitForTransaction(tx, c) + if rst.Err != nil { + return gethcommon.Hash{}, rst.Err } - if err := m.txQueueManager.WaitForTransaction(tx); err != nil { - return gethcommon.Hash{}, err - } - - return tx.Hash, nil + return rst.Hash, nil } // CompleteTransaction instructs backend to complete sending of a given transaction diff --git a/geth/common/types.go b/geth/common/types.go index 0ab5e87fe1f..3158c0a733e 100644 --- a/geth/common/types.go +++ b/geth/common/types.go @@ -183,12 +183,14 @@ type QueuedTxID string // QueuedTx holds enough information to complete the queued transaction. type QueuedTx struct { ID QueuedTxID - Hash common.Hash Context context.Context Args SendTxArgs - Done chan struct{} - Discard chan struct{} - Err error +} + +type QueuedTxResult struct { + Hash common.Hash + Err error + Tx QueuedTx } // SendTxArgs represents the arguments to submit a new transaction into the transaction pool. @@ -232,7 +234,7 @@ type TxQueueManager interface { QueueTransaction(tx *QueuedTx) error // WaitForTransactions blocks until transaction is completed, discarded or timed out. - WaitForTransaction(tx *QueuedTx) error + WaitForTransaction(tx *QueuedTx, c <-chan QueuedTxResult) QueuedTxResult SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) diff --git a/geth/common/types_mock.go b/geth/common/types_mock.go index 6bc4dd9c814..27b90a43b6b 100644 --- a/geth/common/types_mock.go +++ b/geth/common/types_mock.go @@ -522,15 +522,15 @@ func (mr *MockTxQueueManagerMockRecorder) QueueTransaction(tx interface{}) *gomo } // WaitForTransaction mocks base method -func (m *MockTxQueueManager) WaitForTransaction(tx *QueuedTx) error { - ret := m.ctrl.Call(m, "WaitForTransaction", tx) - ret0, _ := ret[0].(error) +func (m *MockTxQueueManager) WaitForTransaction(tx *QueuedTx, c <-chan QueuedTxResult) QueuedTxResult { + ret := m.ctrl.Call(m, "WaitForTransaction", tx, c) + ret0, _ := ret[0].(QueuedTxResult) return ret0 } // WaitForTransaction indicates an expected call of WaitForTransaction -func (mr *MockTxQueueManagerMockRecorder) WaitForTransaction(tx interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForTransaction", reflect.TypeOf((*MockTxQueueManager)(nil).WaitForTransaction), tx) +func (mr *MockTxQueueManagerMockRecorder) WaitForTransaction(tx, c interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForTransaction", reflect.TypeOf((*MockTxQueueManager)(nil).WaitForTransaction), tx, c) } // SendTransactionRPCHandler mocks base method diff --git a/geth/common/utils.go b/geth/common/utils.go index 9d01681810d..d7e98719197 100644 --- a/geth/common/utils.go +++ b/geth/common/utils.go @@ -157,9 +157,7 @@ func Fatalf(reason interface{}, args ...interface{}) { func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { return &QueuedTx{ ID: QueuedTxID(uuid.New()), - Hash: common.Hash{}, Context: ctx, Args: args, - Done: make(chan struct{}, 1), } } diff --git a/geth/transactions/notifications.go b/geth/transactions/notifications.go index d6c58a126c4..a7cf4bb30a6 100644 --- a/geth/transactions/notifications.go +++ b/geth/transactions/notifications.go @@ -58,8 +58,8 @@ type ReturnSendTransactionEvent struct { } // NotifyOnReturn returns handler that processes responses from internal tx manager -func NotifyOnReturn(queuedTx *common.QueuedTx) { - if queuedTx.Err == nil { +func NotifyOnReturn(queuedTx *common.QueuedTx, err error) { + if err == nil { return } @@ -75,8 +75,8 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) { ID: string(queuedTx.ID), Args: queuedTx.Args, MessageID: common.MessageIDFromContext(queuedTx.Context), - ErrorMessage: queuedTx.Err.Error(), - ErrorCode: sendTransactionErrorCode(queuedTx.Err), + ErrorMessage: err.Error(), + ErrorCode: sendTransactionErrorCode(err), }, }) } diff --git a/geth/transactions/queue/queue.go b/geth/transactions/queue/queue.go index 2cc09193530..d02dbb09cd6 100644 --- a/geth/transactions/queue/queue.go +++ b/geth/transactions/queue/queue.go @@ -44,9 +44,10 @@ type empty struct{} // TxQueue is capped container that holds pending transactions type TxQueue struct { - mu sync.RWMutex // to guard transactions map - transactions map[common.QueuedTxID]*common.QueuedTx - inprogress map[common.QueuedTxID]empty + mu sync.RWMutex // to guard transactions map + transactions map[common.QueuedTxID]*common.QueuedTx + inprogress map[common.QueuedTxID]empty + subscriptions map[common.QueuedTxID]chan common.QueuedTxResult // TODO don't use another goroutine for eviction evictableIDs chan common.QueuedTxID @@ -63,6 +64,7 @@ func NewQueue() *TxQueue { return &TxQueue{ transactions: make(map[common.QueuedTxID]*common.QueuedTx), inprogress: make(map[common.QueuedTxID]empty), + subscriptions: make(map[common.QueuedTxID]chan common.QueuedTxResult), evictableIDs: make(chan common.QueuedTxID, DefaultTxQueueCap), // will be used to evict in FIFO enqueueTicker: make(chan struct{}), } @@ -130,11 +132,8 @@ func (q *TxQueue) Reset() { } // Enqueue enqueues incoming transaction -func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { +func (q *TxQueue) Enqueue(tx *common.QueuedTx) <-chan common.QueuedTxResult { log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID)) - if (tx.Hash != gethcommon.Hash{}) { - return ErrQueuedTxAlreadyProcessed - } log.Info("before enqueueTicker") q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item @@ -144,11 +143,13 @@ func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { q.mu.Lock() q.transactions[tx.ID] = tx + c := make(chan common.QueuedTxResult, 1) + q.subscriptions[tx.ID] = c q.mu.Unlock() // notify handler log.Info("calling txEnqueueHandler") - return nil + return c } // Get returns transaction by transaction identifier @@ -176,6 +177,7 @@ func (q *TxQueue) Remove(id common.QueuedTxID) { func (q *TxQueue) remove(id common.QueuedTxID) { delete(q.transactions, id) delete(q.inprogress, id) + delete(q.subscriptions, id) } // Done removes transaction from queue if no error or error is not transient @@ -193,19 +195,16 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) { delete(q.inprogress, tx.ID) - tx.Err = err // hash is updated only if err is nil if err == nil { + q.subscriptions[tx.ID] <- common.QueuedTxResult{Hash: hash, Err: err} q.remove(tx.ID) - tx.Hash = hash - tx.Err = err - tx.Done <- struct{}{} return } _, transient := transientErrs[err.Error()] if !transient { + q.subscriptions[tx.ID] <- common.QueuedTxResult{Err: err} q.remove(tx.ID) - tx.Done <- struct{}{} } } diff --git a/geth/transactions/queue/queue_test.go b/geth/transactions/queue/queue_test.go index 5f0f917e740..214c23aeea0 100644 --- a/geth/transactions/queue/queue_test.go +++ b/geth/transactions/queue/queue_test.go @@ -32,7 +32,7 @@ func (s *QueueTestSuite) TearDownTest() { func (s *QueueTestSuite) TestGetTransaction() { tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) + s.queue.Enqueue(tx) enquedTx, err := s.queue.Get(tx.ID) s.NoError(err) s.Equal(tx, enquedTx) @@ -42,29 +42,22 @@ func (s *QueueTestSuite) TestGetTransaction() { s.Equal(ErrQueuedTxInProgress, err) } -func (s *QueueTestSuite) TestGetProcessedTransaction() { - // enqueue will fail if transaction with hash will be enqueued +func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) (*common.QueuedTx, <-chan common.QueuedTxResult) { tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) - tx.Hash = gethcommon.Hash{1} - s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) -} - -func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx { - tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) + c := s.queue.Enqueue(tx) s.NoError(s.queue.Done(tx.ID, hash, err)) - return tx + return tx, c } func (s *QueueTestSuite) TestDoneSuccess() { hash := gethcommon.Hash{1} - tx := s.testDone(hash, nil) - s.NoError(tx.Err) - s.Equal(hash, tx.Hash) - s.False(s.queue.Has(tx.ID)) + tx, c := s.testDone(hash, nil) // event is sent only if transaction was removed from a queue select { - case <-tx.Done: + case rst := <-c: + s.NoError(rst.Err) + s.Equal(hash, rst.Hash) + s.False(s.queue.Has(tx.ID)) default: s.Fail("No event was sent to Done channel") } @@ -73,24 +66,23 @@ func (s *QueueTestSuite) TestDoneSuccess() { func (s *QueueTestSuite) TestDoneTransientError() { hash := gethcommon.Hash{1} err := keystore.ErrDecrypt - tx := s.testDone(hash, err) - s.Equal(keystore.ErrDecrypt, tx.Err) - s.NotEqual(hash, tx.Hash) - s.Equal(gethcommon.Hash{}, tx.Hash) + tx, _ := s.testDone(hash, err) s.True(s.queue.Has(tx.ID)) + _, inp := s.queue.inprogress[tx.ID] + s.False(inp) } func (s *QueueTestSuite) TestDoneError() { hash := gethcommon.Hash{1} err := errors.New("test") - tx := s.testDone(hash, err) - s.Equal(err, tx.Err) - s.NotEqual(hash, tx.Hash) - s.Equal(gethcommon.Hash{}, tx.Hash) - s.False(s.queue.Has(tx.ID)) + tx, c := s.testDone(hash, err) // event is sent only if transaction was removed from a queue select { - case <-tx.Done: + case rst := <-c: + s.Equal(err, rst.Err) + s.NotEqual(hash, rst.Hash) + s.Equal(gethcommon.Hash{}, rst.Hash) + s.False(s.queue.Has(tx.ID)) default: s.Fail("No event was sent to Done channel") } @@ -99,7 +91,7 @@ func (s *QueueTestSuite) TestDoneError() { func (s QueueTestSuite) TestMultipleDone() { hash := gethcommon.Hash{1} err := keystore.ErrDecrypt - tx := s.testDone(hash, err) + tx, _ := s.testDone(hash, err) s.NoError(s.queue.Done(tx.ID, hash, nil)) s.Equal(ErrQueuedTxIDNotFound, s.queue.Done(tx.ID, hash, errors.New("timeout"))) } @@ -111,11 +103,11 @@ func (s *QueueTestSuite) TestEviction() { if first == nil { first = tx } - s.NoError(s.queue.Enqueue(tx)) + s.queue.Enqueue(tx) } s.Equal(DefaultTxQueueCap, s.queue.Count()) tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) + s.queue.Enqueue(tx) s.Equal(DefaultTxQueueCap, s.queue.Count()) s.False(s.queue.Has(first.ID)) } diff --git a/geth/transactions/txqueue_manager.go b/geth/transactions/txqueue_manager.go index c973abdda02..f93a614481b 100644 --- a/geth/transactions/txqueue_manager.go +++ b/geth/transactions/txqueue_manager.go @@ -25,22 +25,24 @@ const ( // Manager provides means to manage internal Status Backend (injected into LES) type Manager struct { - nodeManager common.NodeManager - accountManager common.AccountManager - txQueue *queue.TxQueue - ethTxClient EthTransactor - addrLock *AddrLocker - notify bool + nodeManager common.NodeManager + accountManager common.AccountManager + txQueue *queue.TxQueue + ethTxClient EthTransactor + addrLock *AddrLocker + notify bool + sendCompletionTimeout time.Duration } // NewManager returns a new Manager. func NewManager(nodeManager common.NodeManager, accountManager common.AccountManager) *Manager { return &Manager{ - nodeManager: nodeManager, - accountManager: accountManager, - txQueue: queue.NewQueue(), - addrLock: &AddrLocker{}, - notify: true, + nodeManager: nodeManager, + accountManager: accountManager, + txQueue: queue.NewQueue(), + addrLock: &AddrLocker{}, + notify: true, + sendCompletionTimeout: DefaultTxSendCompletionTimeout * time.Second, } } @@ -69,40 +71,42 @@ func (m *Manager) TransactionQueue() common.TxQueue { } // QueueTransaction puts a transaction into the queue. -func (m *Manager) QueueTransaction(tx *common.QueuedTx) error { +func (m *Manager) QueueTransaction(tx *common.QueuedTx) <-chan common.QueuedTxResult { to := "" if tx.Args.To != nil { to = tx.Args.To.Hex() } log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) - err := m.txQueue.Enqueue(tx) + c := m.txQueue.Enqueue(tx) if m.notify { NotifyOnEnqueue(tx) } - return err + return c } func (m *Manager) txDone(tx *common.QueuedTx, hash gethcommon.Hash, err error) { m.txQueue.Done(tx.ID, hash, err) //nolint: errcheck if m.notify { - NotifyOnReturn(tx) + NotifyOnReturn(tx, err) } } // WaitForTransaction adds a transaction to the queue and blocks // until it's completed, discarded or times out. -func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error { +func (m *Manager) WaitForTransaction(tx *common.QueuedTx, c <-chan common.QueuedTxResult) common.QueuedTxResult { log.Info("wait for transaction", "id", tx.ID) // now wait up until transaction is: // - completed (via CompleteQueuedTransaction), // - discarded (via DiscardQueuedTransaction) // - or times out - select { - case <-tx.Done: - case <-time.After(DefaultTxSendCompletionTimeout * time.Second): - m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut) + for { + select { + case rst := <-c: + return rst + case <-time.After(m.sendCompletionTimeout): + m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut) + } } - return tx.Err } // CompleteTransaction instructs backend to complete sending of a given transaction. @@ -240,7 +244,7 @@ func (m *Manager) DiscardTransaction(id common.QueuedTxID) error { } err = m.txQueue.Done(id, gethcommon.Hash{}, queue.ErrQueuedTxDiscarded) if m.notify { - NotifyOnReturn(tx) + NotifyOnReturn(tx, queue.ErrQueuedTxDiscarded) } return err } @@ -265,19 +269,15 @@ func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.Queued // It accepts one param which is a slice with a map of transaction params. func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { log.Info("SendTransactionRPCHandler called") - // TODO(adam): it's a hack to parse arguments as common.RPCCall can do that. // We should refactor parsing these params to a separate struct. rpcCall := common.RPCCall{Params: args} tx := common.CreateTransaction(ctx, rpcCall.ToSendTxArgs()) - - if err := m.QueueTransaction(tx); err != nil { - return nil, err + c := m.QueueTransaction(tx) + rst := m.WaitForTransaction(tx, c) + if rst.Err != nil { + return nil, rst.Err } - - if err := m.WaitForTransaction(tx); err != nil { - return nil, err - } - - return tx.Hash.Hex(), nil + // handle empty hash + return rst.Hash.Hex(), nil } diff --git a/geth/transactions/txqueue_manager_test.go b/geth/transactions/txqueue_manager_test.go index 3e0af0efdaa..5c781671ae1 100644 --- a/geth/transactions/txqueue_manager_test.go +++ b/geth/transactions/txqueue_manager_test.go @@ -5,6 +5,7 @@ import ( "math/big" "sync" "testing" + "time" "github.com/ethereum/go-ethereum/accounts/keystore" gethcommon "github.com/ethereum/go-ethereum/common" @@ -98,8 +99,7 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + c := txQueueManager.QueueTransaction(tx) w := make(chan struct{}) go func() { @@ -108,10 +108,9 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { close(w) }() - err = txQueueManager.WaitForTransaction(tx) - s.NoError(err) + rst := txQueueManager.WaitForTransaction(tx, c) // Check that error is assigned to the transaction. - s.NoError(tx.Err) + s.NoError(rst.Err) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) <-w @@ -141,8 +140,7 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + c := txQueueManager.QueueTransaction(tx) var wg sync.WaitGroup var mu sync.Mutex @@ -166,10 +164,9 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { }() } - err = txQueueManager.WaitForTransaction(tx) - s.NoError(err) + rst := txQueueManager.WaitForTransaction(tx, c) // Check that error is assigned to the transaction. - s.NoError(tx.Err) + s.NoError(rst.Err) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) @@ -195,10 +192,9 @@ func (s *TxQueueTestSuite) TestAccountMismatch() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + txQueueManager.QueueTransaction(tx) - _, err = txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) + _, err := txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) s.Equal(err, queue.ErrInvalidCompleteTxSender) // Transaction should stay in the queue as mismatched accounts @@ -230,10 +226,9 @@ func (s *TxQueueTestSuite) TestInvalidPassword() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + txQueueManager.QueueTransaction(tx) - _, err = txQueueManager.CompleteTransaction(tx.ID, password) + _, err := txQueueManager.CompleteTransaction(tx.ID, password) s.Equal(err.Error(), keystore.ErrDecrypt.Error()) // Transaction should stay in the queue as mismatched accounts @@ -253,8 +248,7 @@ func (s *TxQueueTestSuite) TestDiscardTransaction() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + c := txQueueManager.QueueTransaction(tx) w := make(chan struct{}) go func() { discardErr := txQueueManager.DiscardTransaction(tx.ID) @@ -262,11 +256,27 @@ func (s *TxQueueTestSuite) TestDiscardTransaction() { close(w) }() - err = txQueueManager.WaitForTransaction(tx) - s.Equal(queue.ErrQueuedTxDiscarded, err) - // Check that error is assigned to the transaction. - s.Equal(queue.ErrQueuedTxDiscarded, tx.Err) + rst := txQueueManager.WaitForTransaction(tx, c) + s.Equal(queue.ErrQueuedTxDiscarded, rst.Err) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) <-w } + +func (s *TxQueueTestSuite) TestCompletionTimedOut() { + txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) + txQueueManager.DisableNotificactions() + txQueueManager.sendCompletionTimeout = time.Nanosecond + + txQueueManager.Start() + defer txQueueManager.Stop() + + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ + From: common.FromAddress(TestConfig.Account1.Address), + To: common.ToAddress(TestConfig.Account2.Address), + }) + + c := txQueueManager.QueueTransaction(tx) + rst := txQueueManager.WaitForTransaction(tx, c) + s.Equal(queue.ErrQueuedTxTimedOut, rst.Err) +}