Skip to content

Commit

Permalink
Result of tx processing returned as QueuedTxResult
Browse files Browse the repository at this point in the history
Currently it is quite easy to introduce concurrency issues while working
with transaction object. For example, race issue will exist every time
while transaction is processed in a separate goroutine and caller will
try to check for an error before event to Done channel is sent.

This change removes all the data that is updated on transaction and leaves
it with ID, Args and Context (which is not used at the moment).

Signed-off-by: Dmitry Shulyak <[email protected]>
  • Loading branch information
dshulyak committed Feb 2, 2018
1 parent 8b56060 commit 653da5b
Show file tree
Hide file tree
Showing 14 changed files with 176 additions and 162 deletions.
8 changes: 4 additions & 4 deletions e2e/transactions/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
log.Info("transaction return event received", "id", event["id"].(string))

receivedErrMessage := event["error_message"].(string)
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error()
s.Equal(receivedErrMessage, expectedErrMessage)

receivedErrCode := event["error_code"].(string)
Expand All @@ -511,7 +511,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
s.EqualError(err, queue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
s.EqualError(err, transactions.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")

select {
case <-completeQueuedTransaction:
Expand Down Expand Up @@ -659,7 +659,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
log.Info("transaction return event received", "id", event["id"].(string))

receivedErrMessage := event["error_message"].(string)
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error()
s.Equal(receivedErrMessage, expectedErrMessage)

receivedErrCode := event["error_code"].(string)
Expand All @@ -681,7 +681,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
require.EqualError(err, queue.ErrQueuedTxDiscarded.Error())
require.EqualError(err, transactions.ErrQueuedTxDiscarded.Error())
require.Equal(gethcommon.Hash{}, txHashCheck, "transaction returned hash, while it shouldn't")
}

Expand Down
2 changes: 1 addition & 1 deletion geth/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (api *StatusAPI) CompleteTransaction(id common.QueuedTxID, password string)
}

// CompleteTransactions instructs backend to complete sending of multiple transactions
func (api *StatusAPI) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult {
func (api *StatusAPI) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult {
return api.b.txQueueManager.CompleteTransactions(ids, password)
}

Expand Down
19 changes: 8 additions & 11 deletions geth/api/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,22 +200,19 @@ func (m *StatusBackend) CallRPC(inputJSON string) string {
}

// SendTransaction creates a new transaction and waits until it's complete.
func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (gethcommon.Hash, error) {
func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) {
if ctx == nil {
ctx = context.Background()
}

tx := common.CreateTransaction(ctx, args)

if err := m.txQueueManager.QueueTransaction(tx); err != nil {
return gethcommon.Hash{}, err
if err = m.txQueueManager.QueueTransaction(tx); err != nil {
return hash, err
}

if err := m.txQueueManager.WaitForTransaction(tx); err != nil {
return gethcommon.Hash{}, err
rst := m.txQueueManager.WaitForTransaction(tx)
if rst.Error != nil {
return hash, rst.Error
}

return tx.Hash, nil
return rst.Hash, nil
}

// CompleteTransaction instructs backend to complete sending of a given transaction
Expand All @@ -224,7 +221,7 @@ func (m *StatusBackend) CompleteTransaction(id common.QueuedTxID, password strin
}

// CompleteTransactions instructs backend to complete sending of multiple transactions
func (m *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult {
func (m *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult {
return m.txQueueManager.CompleteTransactions(ids, password)
}

Expand Down
8 changes: 3 additions & 5 deletions geth/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ type AccountManager interface {
AddressToDecryptedAccount(address, password string) (accounts.Account, *keystore.Key, error)
}

// RawCompleteTransactionResult is a JSON returned from transaction complete function (used internally)
type RawCompleteTransactionResult struct {
// TransactionResult is a JSON returned from transaction complete function (used internally)
type TransactionResult struct {
Hash common.Hash
Error error
}
Expand All @@ -158,11 +158,9 @@ type QueuedTxID string
// QueuedTx holds enough information to complete the queued transaction.
type QueuedTx struct {
ID QueuedTxID
Hash common.Hash
Context context.Context
Args SendTxArgs
Done chan struct{}
Err error
Result chan TransactionResult
}

// SendTxArgs represents the arguments to submit a new transaction into the transaction pool.
Expand Down
3 changes: 1 addition & 2 deletions geth/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,8 @@ func Fatalf(reason interface{}, args ...interface{}) {
func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx {
return &QueuedTx{
ID: QueuedTxID(uuid.New()),
Hash: common.Hash{},
Context: ctx,
Args: args,
Done: make(chan struct{}),
Result: make(chan TransactionResult, 1),
}
}
10 changes: 10 additions & 0 deletions geth/transactions/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package transactions

import "errors"

var (
//ErrQueuedTxTimedOut - error transaction sending timed out
ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
//ErrQueuedTxDiscarded - error transaction discarded
ErrQueuedTxDiscarded = errors.New("transaction has been discarded")
)
46 changes: 23 additions & 23 deletions geth/transactions/fake/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions geth/transactions/fake/txservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
)

// NewTestServer returns a mocked test server
func NewTestServer(ctrl *gomock.Controller) (*rpc.Server, *MockFakePublicTransactionPoolAPI) {
func NewTestServer(ctrl *gomock.Controller) (*rpc.Server, *MockPublicTransactionPoolAPI) {
srv := rpc.NewServer()
svc := NewMockFakePublicTransactionPoolAPI(ctrl)
svc := NewMockPublicTransactionPoolAPI(ctrl)
if err := srv.RegisterName("eth", svc); err != nil {
panic(err)
}
Expand Down
27 changes: 12 additions & 15 deletions geth/transactions/notifications.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package transactions

import (
"strconv"

"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/transactions/queue"
)

const (
Expand All @@ -30,10 +27,10 @@ const (
)

var txReturnCodes = map[error]int{
nil: SendTransactionNoErrorCode,
keystore.ErrDecrypt: SendTransactionPasswordErrorCode,
queue.ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode,
queue.ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode,
nil: SendTransactionNoErrorCode,
keystore.ErrDecrypt: SendTransactionPasswordErrorCode,
ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode,
ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode,
}

// SendTransactionEvent is a signal sent on a send transaction request
Expand Down Expand Up @@ -61,17 +58,17 @@ type ReturnSendTransactionEvent struct {
Args common.SendTxArgs `json:"args"`
MessageID string `json:"message_id"`
ErrorMessage string `json:"error_message"`
ErrorCode string `json:"error_code"`
ErrorCode int `json:"error_code,string"`
}

// NotifyOnReturn returns handler that processes responses from internal tx manager
func NotifyOnReturn(queuedTx *common.QueuedTx) {
// discard notifications with empty tx
if queuedTx == nil {
func NotifyOnReturn(queuedTx *common.QueuedTx, err error) {
// we don't want to notify a user if tx was sent successfully
if err == nil {
return
}
// we don't want to notify a user if tx sent successfully
if queuedTx.Err == nil {
// discard notifications with empty tx
if queuedTx == nil {
return
}
signal.Send(signal.Envelope{
Expand All @@ -80,8 +77,8 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) {
ID: string(queuedTx.ID),
Args: queuedTx.Args,
MessageID: common.MessageIDFromContext(queuedTx.Context),
ErrorMessage: queuedTx.Err.Error(),
ErrorCode: strconv.Itoa(sendTransactionErrorCode(queuedTx.Err)),
ErrorMessage: err.Error(),
ErrorCode: sendTransactionErrorCode(err),
},
})
}
Expand Down
31 changes: 14 additions & 17 deletions geth/transactions/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,12 @@ const (
)

var (
// ErrQueuedTxExist - transaction was already enqueued
ErrQueuedTxExist = errors.New("transaction already exist in queue")
//ErrQueuedTxIDNotFound - error transaction hash not found
ErrQueuedTxIDNotFound = errors.New("transaction hash not found")
//ErrQueuedTxTimedOut - error transaction sending timed out
ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
//ErrQueuedTxDiscarded - error transaction discarded
ErrQueuedTxDiscarded = errors.New("transaction has been discarded")
//ErrQueuedTxInProgress - error transaction in progress
//ErrQueuedTxInProgress - error transaction is in progress
ErrQueuedTxInProgress = errors.New("transaction is in progress")
//ErrQueuedTxAlreadyProcessed - error transaction has already processed
ErrQueuedTxAlreadyProcessed = errors.New("transaction has been already processed")
//ErrInvalidCompleteTxSender - error transaction with invalid sender
ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it")
)
Expand Down Expand Up @@ -133,15 +129,18 @@ func (q *TxQueue) Reset() {
// Enqueue enqueues incoming transaction
func (q *TxQueue) Enqueue(tx *common.QueuedTx) error {
log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID))
if (tx.Hash != gethcommon.Hash{} || tx.Err != nil) {
return ErrQueuedTxAlreadyProcessed
q.mu.RLock()
if _, ok := q.transactions[tx.ID]; ok {
q.mu.RUnlock()
return ErrQueuedTxExist
}
q.mu.RUnlock()

log.Info("before enqueueTicker")
// we can't hold a lock in this part
log.Debug("notifying eviction loop")
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
log.Info("before evictableIDs")
q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap
log.Info("after evictableIDs")
q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap
log.Debug("notified eviction loop")

q.mu.Lock()
q.transactions[tx.ID] = tx
Expand Down Expand Up @@ -204,17 +203,15 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er

func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
delete(q.inprogress, tx.ID)
tx.Err = err
// hash is updated only if err is nil, but transaction is not removed from a queue
if err == nil {
q.transactions[tx.ID].Result <- common.TransactionResult{Hash: hash, Error: err}
q.remove(tx.ID)
tx.Hash = hash
close(tx.Done)
return
}
if _, transient := transientErrs[err.Error()]; !transient {
q.transactions[tx.ID].Result <- common.TransactionResult{Error: err}
q.remove(tx.ID)
close(tx.Done)
}
}

Expand Down
Loading

0 comments on commit 653da5b

Please sign in to comment.