From 06d0ec1dbd05d4131049ba008ef4d1934429badd Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 5 Jan 2018 22:58:17 +0200 Subject: [PATCH] Result of tx processing returned as QueuedTxResult Currently it is quite easy to introduce concurrency issues while working with transaction object. For example, race issue will exist every time while transaction is processed in a separate goroutine and caller will try to check for an error before event to Done channel is sent. This change removes all the data that is updated on transaction and leaves it with ID, Args and Context (which is not used at the moment). --- geth/api/backend.go | 15 +++--- geth/common/types.go | 6 +-- geth/common/types_mock.go | 14 +---- geth/common/utils.go | 3 +- geth/transactions/notifications.go | 8 ++- geth/transactions/queue/queue.go | 9 +--- geth/transactions/queue/queue_test.go | 33 ++++-------- geth/transactions/txqueue_manager.go | 60 ++++++++++++---------- geth/transactions/txqueue_manager_test.go | 62 ++++++++++++++--------- 9 files changed, 99 insertions(+), 111 deletions(-) diff --git a/geth/api/backend.go b/geth/api/backend.go index 6b27d72a381..ad563c33a15 100644 --- a/geth/api/backend.go +++ b/geth/api/backend.go @@ -200,22 +200,19 @@ func (m *StatusBackend) CallRPC(inputJSON string) string { } // SendTransaction creates a new transaction and waits until it's complete. -func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (gethcommon.Hash, error) { +func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) { if ctx == nil { ctx = context.Background() } - tx := common.CreateTransaction(ctx, args) - if err := m.txQueueManager.QueueTransaction(tx); err != nil { - return gethcommon.Hash{}, err + return hash, err } - - if err := m.txQueueManager.WaitForTransaction(tx); err != nil { - return gethcommon.Hash{}, err + rst := m.txQueueManager.WaitForTransaction(tx) + if rst.Error != nil { + return hash, rst.Error } - - return tx.Hash, nil + return rst.Hash, nil } // CompleteTransaction instructs backend to complete sending of a given transaction diff --git a/geth/common/types.go b/geth/common/types.go index 812fc66ba29..23a3962ca5d 100644 --- a/geth/common/types.go +++ b/geth/common/types.go @@ -158,11 +158,9 @@ type QueuedTxID string // QueuedTx holds enough information to complete the queued transaction. type QueuedTx struct { ID QueuedTxID - Hash common.Hash Context context.Context Args SendTxArgs - Done chan struct{} - Err error + Result chan RawCompleteTransactionResult } // SendTxArgs represents the arguments to submit a new transaction into the transaction pool. @@ -206,7 +204,7 @@ type TxQueueManager interface { QueueTransaction(tx *QueuedTx) error // WaitForTransactions blocks until transaction is completed, discarded or timed out. - WaitForTransaction(tx *QueuedTx) error + WaitForTransaction(tx *QueuedTx) RawCompleteTransactionResult SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) diff --git a/geth/common/types_mock.go b/geth/common/types_mock.go index 756bef6e2fa..e4fdf592af0 100644 --- a/geth/common/types_mock.go +++ b/geth/common/types_mock.go @@ -534,9 +534,9 @@ func (mr *MockTxQueueManagerMockRecorder) QueueTransaction(tx interface{}) *gomo } // WaitForTransaction mocks base method -func (m *MockTxQueueManager) WaitForTransaction(tx *QueuedTx) error { +func (m *MockTxQueueManager) WaitForTransaction(tx *QueuedTx) RawCompleteTransactionResult { ret := m.ctrl.Call(m, "WaitForTransaction", tx) - ret0, _ := ret[0].(error) + ret0, _ := ret[0].(RawCompleteTransactionResult) return ret0 } @@ -612,16 +612,6 @@ func (mr *MockTxQueueManagerMockRecorder) DiscardTransactions(ids interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiscardTransactions", reflect.TypeOf((*MockTxQueueManager)(nil).DiscardTransactions), ids) } -// DisableNotificactions mocks base method -func (m *MockTxQueueManager) DisableNotificactions() { - m.ctrl.Call(m, "DisableNotificactions") -} - -// DisableNotificactions indicates an expected call of DisableNotificactions -func (mr *MockTxQueueManagerMockRecorder) DisableNotificactions() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisableNotificactions", reflect.TypeOf((*MockTxQueueManager)(nil).DisableNotificactions)) -} - // MockJailCell is a mock of JailCell interface type MockJailCell struct { ctrl *gomock.Controller diff --git a/geth/common/utils.go b/geth/common/utils.go index c4ad1d152b9..3500fb36e74 100644 --- a/geth/common/utils.go +++ b/geth/common/utils.go @@ -157,9 +157,8 @@ func Fatalf(reason interface{}, args ...interface{}) { func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { return &QueuedTx{ ID: QueuedTxID(uuid.New()), - Hash: common.Hash{}, Context: ctx, Args: args, - Done: make(chan struct{}), + Result: make(chan RawCompleteTransactionResult, 1), } } diff --git a/geth/transactions/notifications.go b/geth/transactions/notifications.go index d307951ffeb..9cd3f080ce9 100644 --- a/geth/transactions/notifications.go +++ b/geth/transactions/notifications.go @@ -65,7 +65,11 @@ type ReturnSendTransactionEvent struct { } // NotifyOnReturn returns handler that processes responses from internal tx manager -func NotifyOnReturn(queuedTx *common.QueuedTx) { +func NotifyOnReturn(queuedTx *common.QueuedTx, err error) { + // we don't want to notify a user if tx wassent successfully + if err == nil { + return + } // discard notifications with empty tx if queuedTx == nil { return @@ -80,7 +84,7 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) { ID: string(queuedTx.ID), Args: queuedTx.Args, MessageID: common.MessageIDFromContext(queuedTx.Context), - ErrorMessage: queuedTx.Err.Error(), + ErrorMessage: err.Error(), ErrorCode: strconv.Itoa(sendTransactionErrorCode(queuedTx.Err)), }, }) diff --git a/geth/transactions/queue/queue.go b/geth/transactions/queue/queue.go index eea69544263..b9b44e34cb1 100644 --- a/geth/transactions/queue/queue.go +++ b/geth/transactions/queue/queue.go @@ -133,9 +133,6 @@ func (q *TxQueue) Reset() { // Enqueue enqueues incoming transaction func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID)) - if (tx.Hash != gethcommon.Hash{} || tx.Err != nil) { - return ErrQueuedTxAlreadyProcessed - } log.Info("before enqueueTicker") q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item @@ -204,17 +201,15 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) { delete(q.inprogress, tx.ID) - tx.Err = err // hash is updated only if err is nil, but transaction is not removed from a queue if err == nil { + q.transactions[tx.ID].Result <- common.RawCompleteTransactionResult{Hash: hash, Error: err} q.remove(tx.ID) - tx.Hash = hash - close(tx.Done) return } if _, transient := transientErrs[err.Error()]; !transient { + q.transactions[tx.ID].Result <- common.RawCompleteTransactionResult{Error: err} q.remove(tx.ID) - close(tx.Done) } } diff --git a/geth/transactions/queue/queue_test.go b/geth/transactions/queue/queue_test.go index e156e6b9129..e02b9e65041 100644 --- a/geth/transactions/queue/queue_test.go +++ b/geth/transactions/queue/queue_test.go @@ -52,17 +52,6 @@ func (s *QueueTestSuite) TestGetTransaction() { } } -func (s *QueueTestSuite) TestEnqueueProcessedTransaction() { - // enqueue will fail if transaction with hash will be enqueued - tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) - tx.Hash = gethcommon.Hash{1} - s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) - - tx = common.CreateTransaction(context.Background(), common.SendTxArgs{}) - tx.Err = errors.New("error") - s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) -} - func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx { tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) s.NoError(s.queue.Enqueue(tx)) @@ -73,12 +62,12 @@ func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.Queue func (s *QueueTestSuite) TestDoneSuccess() { hash := gethcommon.Hash{1} tx := s.testDone(hash, nil) - s.NoError(tx.Err) - s.Equal(hash, tx.Hash) - s.False(s.queue.Has(tx.ID)) // event is sent only if transaction was removed from a queue select { - case <-tx.Done: + case rst := <-tx.Result: + s.NoError(rst.Error) + s.Equal(hash, rst.Hash) + s.False(s.queue.Has(tx.ID)) default: s.Fail("No event was sent to Done channel") } @@ -88,22 +77,22 @@ func (s *QueueTestSuite) TestDoneTransientError() { hash := gethcommon.Hash{1} err := keystore.ErrDecrypt tx := s.testDone(hash, err) - s.Equal(keystore.ErrDecrypt, tx.Err) - s.Equal(gethcommon.Hash{}, tx.Hash) s.True(s.queue.Has(tx.ID)) + _, inp := s.queue.inprogress[tx.ID] + s.False(inp) } func (s *QueueTestSuite) TestDoneError() { hash := gethcommon.Hash{1} err := errors.New("test") tx := s.testDone(hash, err) - s.Equal(err, tx.Err) - s.NotEqual(hash, tx.Hash) - s.Equal(gethcommon.Hash{}, tx.Hash) - s.False(s.queue.Has(tx.ID)) // event is sent only if transaction was removed from a queue select { - case <-tx.Done: + case rst := <-tx.Result: + s.Equal(err, rst.Error) + s.NotEqual(hash, rst.Hash) + s.Equal(gethcommon.Hash{}, rst.Hash) + s.False(s.queue.Has(tx.ID)) default: s.Fail("No event was sent to Done channel") } diff --git a/geth/transactions/txqueue_manager.go b/geth/transactions/txqueue_manager.go index bb1ade9c450..fdd23d062a5 100644 --- a/geth/transactions/txqueue_manager.go +++ b/geth/transactions/txqueue_manager.go @@ -25,22 +25,24 @@ const ( // Manager provides means to manage internal Status Backend (injected into LES) type Manager struct { - nodeManager common.NodeManager - accountManager common.AccountManager - txQueue *queue.TxQueue - ethTxClient EthTransactor - addrLock *AddrLocker - notify bool + nodeManager common.NodeManager + accountManager common.AccountManager + txQueue *queue.TxQueue + ethTxClient EthTransactor + addrLock *AddrLocker + notify bool + sendCompletionTimeout time.Duration } // NewManager returns a new Manager. func NewManager(nodeManager common.NodeManager, accountManager common.AccountManager) *Manager { return &Manager{ - nodeManager: nodeManager, - accountManager: accountManager, - txQueue: queue.New(), - addrLock: &AddrLocker{}, - notify: true, + nodeManager: nodeManager, + accountManager: accountManager, + txQueue: queue.New(), + addrLock: &AddrLocker{}, + notify: true, + sendCompletionTimeout: DefaultTxSendCompletionTimeout * time.Second, } } @@ -75,34 +77,38 @@ func (m *Manager) QueueTransaction(tx *common.QueuedTx) error { to = tx.Args.To.Hex() } log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) - err := m.txQueue.Enqueue(tx) + if err := m.txQueue.Enqueue(tx); err != nil { + return err + } if m.notify { NotifyOnEnqueue(tx) } - return err + return nil } func (m *Manager) txDone(tx *common.QueuedTx, hash gethcommon.Hash, err error) { m.txQueue.Done(tx.ID, hash, err) //nolint: errcheck if m.notify { - NotifyOnReturn(tx) + NotifyOnReturn(tx, err) } } // WaitForTransaction adds a transaction to the queue and blocks // until it's completed, discarded or times out. -func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error { +func (m *Manager) WaitForTransaction(tx *common.QueuedTx) common.RawCompleteTransactionResult { log.Info("wait for transaction", "id", tx.ID) // now wait up until transaction is: // - completed (via CompleteQueuedTransaction), // - discarded (via DiscardQueuedTransaction) // - or times out - select { - case <-tx.Done: - case <-time.After(DefaultTxSendCompletionTimeout * time.Second): - m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut) + for { + select { + case rst := <-tx.Result: + return rst + case <-time.After(m.sendCompletionTimeout): + m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut) + } } - return tx.Err } // CompleteTransaction instructs backend to complete sending of a given transaction. @@ -244,7 +250,7 @@ func (m *Manager) DiscardTransaction(id common.QueuedTxID) error { } err = m.txQueue.Done(id, gethcommon.Hash{}, queue.ErrQueuedTxDiscarded) if m.notify { - NotifyOnReturn(tx) + NotifyOnReturn(tx, queue.ErrQueuedTxDiscarded) } return err } @@ -269,19 +275,17 @@ func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.Queued // It accepts one param which is a slice with a map of transaction params. func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { log.Info("SendTransactionRPCHandler called") - // TODO(adam): it's a hack to parse arguments as common.RPCCall can do that. // We should refactor parsing these params to a separate struct. rpcCall := common.RPCCall{Params: args} tx := common.CreateTransaction(ctx, rpcCall.ToSendTxArgs()) - if err := m.QueueTransaction(tx); err != nil { return nil, err } - - if err := m.WaitForTransaction(tx); err != nil { - return nil, err + rst := m.WaitForTransaction(tx) + if rst.Error != nil { + return nil, rst.Error } - - return tx.Hash.Hex(), nil + // handle empty hash + return rst.Hash.Hex(), nil } diff --git a/geth/transactions/txqueue_manager_test.go b/geth/transactions/txqueue_manager_test.go index 3fa4af31430..6002ab6eaa9 100644 --- a/geth/transactions/txqueue_manager_test.go +++ b/geth/transactions/txqueue_manager_test.go @@ -98,24 +98,26 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + s.NoError(txQueueManager.QueueTransaction(tx)) w := make(chan struct{}) + var ( + hash gethcommon.Hash + err error + ) go func() { - hash, err := txQueueManager.CompleteTransaction(tx.ID, password) + hash, err = txQueueManager.CompleteTransaction(tx.ID, password) s.NoError(err) - s.Equal(tx.Hash, hash) close(w) }() - err = txQueueManager.WaitForTransaction(tx) - s.NoError(err) + rst := txQueueManager.WaitForTransaction(tx) // Check that error is assigned to the transaction. - s.NoError(tx.Err) + s.NoError(rst.Error) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) s.NoError(WaitClosed(w, time.Second)) + s.Equal(hash, rst.Hash) } func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { @@ -141,8 +143,7 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + s.NoError(txQueueManager.QueueTransaction(tx)) var ( wg sync.WaitGroup @@ -168,10 +169,9 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { }() } - err = txQueueManager.WaitForTransaction(tx) - s.NoError(err) + rst := txQueueManager.WaitForTransaction(tx) // Check that error is assigned to the transaction. - s.NoError(tx.Err) + s.NoError(rst.Error) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) @@ -197,10 +197,9 @@ func (s *TxQueueTestSuite) TestAccountMismatch() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + txQueueManager.QueueTransaction(tx) - _, err = txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) + _, err := txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) s.Equal(err, queue.ErrInvalidCompleteTxSender) // Transaction should stay in the queue as mismatched accounts @@ -227,10 +226,9 @@ func (s *TxQueueTestSuite) TestInvalidPassword() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + txQueueManager.QueueTransaction(tx) - _, err = txQueueManager.CompleteTransaction(tx.ID, password) + _, err := txQueueManager.CompleteTransaction(tx.ID, password) s.Equal(err.Error(), keystore.ErrDecrypt.Error()) // Transaction should stay in the queue as mismatched accounts @@ -250,20 +248,34 @@ func (s *TxQueueTestSuite) TestDiscardTransaction() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) - + s.NoError(txQueueManager.QueueTransaction(tx)) w := make(chan struct{}) go func() { s.NoError(txQueueManager.DiscardTransaction(tx.ID)) close(w) }() - err = txQueueManager.WaitForTransaction(tx) - s.Equal(queue.ErrQueuedTxDiscarded, err) - // Check that error is assigned to the transaction. - s.Equal(queue.ErrQueuedTxDiscarded, tx.Err) + rst := txQueueManager.WaitForTransaction(tx) + s.Equal(queue.ErrQueuedTxDiscarded, rst.Error) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) s.NoError(WaitClosed(w, time.Second)) } + +func (s *TxQueueTestSuite) TestCompletionTimedOut() { + txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) + txQueueManager.DisableNotificactions() + txQueueManager.sendCompletionTimeout = time.Nanosecond + + txQueueManager.Start() + defer txQueueManager.Stop() + + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ + From: common.FromAddress(TestConfig.Account1.Address), + To: common.ToAddress(TestConfig.Account2.Address), + }) + + s.NoError(txQueueManager.QueueTransaction(tx)) + rst := txQueueManager.WaitForTransaction(tx) + s.Equal(queue.ErrQueuedTxTimedOut, rst.Error) +}