Skip to content

Commit

Permalink
Result of tx processing returned as QueuedTxResult
Browse files Browse the repository at this point in the history
Currently it is quite easy to introduce concurrency issues while working
with transaction object. For example, race issue will exist every time
while transaction is processed in a separate goroutine and caller will
try to check for an error before event to Done channel is sent.

Current change removes all the data that is updated on transaction and leaves
it with ID, Args and Context (which is not used at the moment).
Result of transaction will be sent to a channel that is returned when transaction
is enqueued. QueuedTxResult has a Hash and Err fields.
  • Loading branch information
dshulyak committed Jan 6, 2018
1 parent 39d03d8 commit 2c91b13
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 119 deletions.
12 changes: 5 additions & 7 deletions geth/api/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,14 @@ func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxA
}

tx := common.CreateTransaction(ctx, args)
c := m.txQueueManager.QueueTransaction(tx)

if err := m.txQueueManager.QueueTransaction(tx); err != nil {
return gethcommon.Hash{}, err
rst := m.txQueueManager.WaitForTransaction(tx, c)
if rst.Err != nil {
return gethcommon.Hash{}, rst.Err
}

if err := m.txQueueManager.WaitForTransaction(tx); err != nil {
return gethcommon.Hash{}, err
}

return tx.Hash, nil
return rst.Hash, nil
}

// CompleteTransaction instructs backend to complete sending of a given transaction
Expand Down
12 changes: 7 additions & 5 deletions geth/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,14 @@ type QueuedTxID string
// QueuedTx holds enough information to complete the queued transaction.
type QueuedTx struct {
ID QueuedTxID
Hash common.Hash
Context context.Context
Args SendTxArgs
Done chan struct{}
Discard chan struct{}
Err error
}

type QueuedTxResult struct {
Hash common.Hash
Err error
Tx QueuedTx
}

// SendTxArgs represents the arguments to submit a new transaction into the transaction pool.
Expand Down Expand Up @@ -232,7 +234,7 @@ type TxQueueManager interface {
QueueTransaction(tx *QueuedTx) error

// WaitForTransactions blocks until transaction is completed, discarded or timed out.
WaitForTransaction(tx *QueuedTx) error
WaitForTransaction(tx *QueuedTx, c <-chan QueuedTxResult) QueuedTxResult

SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error)

Expand Down
10 changes: 5 additions & 5 deletions geth/common/types_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions geth/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,7 @@ func Fatalf(reason interface{}, args ...interface{}) {
func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx {
return &QueuedTx{
ID: QueuedTxID(uuid.New()),
Hash: common.Hash{},
Context: ctx,
Args: args,
Done: make(chan struct{}, 1),
}
}
8 changes: 4 additions & 4 deletions geth/transactions/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type ReturnSendTransactionEvent struct {
}

// NotifyOnReturn returns handler that processes responses from internal tx manager
func NotifyOnReturn(queuedTx *common.QueuedTx) {
if queuedTx.Err == nil {
func NotifyOnReturn(queuedTx *common.QueuedTx, err error) {
if err == nil {
return
}

Expand All @@ -75,8 +75,8 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) {
ID: string(queuedTx.ID),
Args: queuedTx.Args,
MessageID: common.MessageIDFromContext(queuedTx.Context),
ErrorMessage: queuedTx.Err.Error(),
ErrorCode: sendTransactionErrorCode(queuedTx.Err),
ErrorMessage: err.Error(),
ErrorCode: sendTransactionErrorCode(err),
},
})
}
Expand Down
25 changes: 12 additions & 13 deletions geth/transactions/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ type empty struct{}

// TxQueue is capped container that holds pending transactions
type TxQueue struct {
mu sync.RWMutex // to guard transactions map
transactions map[common.QueuedTxID]*common.QueuedTx
inprogress map[common.QueuedTxID]empty
mu sync.RWMutex // to guard transactions map
transactions map[common.QueuedTxID]*common.QueuedTx
inprogress map[common.QueuedTxID]empty
subscriptions map[common.QueuedTxID]chan common.QueuedTxResult

// TODO don't use another goroutine for eviction
evictableIDs chan common.QueuedTxID
Expand All @@ -63,6 +64,7 @@ func NewQueue() *TxQueue {
return &TxQueue{
transactions: make(map[common.QueuedTxID]*common.QueuedTx),
inprogress: make(map[common.QueuedTxID]empty),
subscriptions: make(map[common.QueuedTxID]chan common.QueuedTxResult),
evictableIDs: make(chan common.QueuedTxID, DefaultTxQueueCap), // will be used to evict in FIFO
enqueueTicker: make(chan struct{}),
}
Expand Down Expand Up @@ -130,11 +132,8 @@ func (q *TxQueue) Reset() {
}

// Enqueue enqueues incoming transaction
func (q *TxQueue) Enqueue(tx *common.QueuedTx) error {
func (q *TxQueue) Enqueue(tx *common.QueuedTx) <-chan common.QueuedTxResult {
log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID))
if (tx.Hash != gethcommon.Hash{}) {
return ErrQueuedTxAlreadyProcessed
}

log.Info("before enqueueTicker")
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
Expand All @@ -144,11 +143,13 @@ func (q *TxQueue) Enqueue(tx *common.QueuedTx) error {

q.mu.Lock()
q.transactions[tx.ID] = tx
c := make(chan common.QueuedTxResult, 1)
q.subscriptions[tx.ID] = c
q.mu.Unlock()

// notify handler
log.Info("calling txEnqueueHandler")
return nil
return c
}

// Get returns transaction by transaction identifier
Expand Down Expand Up @@ -176,6 +177,7 @@ func (q *TxQueue) Remove(id common.QueuedTxID) {
func (q *TxQueue) remove(id common.QueuedTxID) {
delete(q.transactions, id)
delete(q.inprogress, id)
delete(q.subscriptions, id)
}

// Done removes transaction from queue if no error or error is not transient
Expand All @@ -193,19 +195,16 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er

func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
delete(q.inprogress, tx.ID)
tx.Err = err
// hash is updated only if err is nil
if err == nil {
q.subscriptions[tx.ID] <- common.QueuedTxResult{Hash: hash, Err: err}
q.remove(tx.ID)
tx.Hash = hash
tx.Err = err
tx.Done <- struct{}{}
return
}
_, transient := transientErrs[err.Error()]
if !transient {
q.subscriptions[tx.ID] <- common.QueuedTxResult{Err: err}
q.remove(tx.ID)
tx.Done <- struct{}{}
}
}

Expand Down
50 changes: 21 additions & 29 deletions geth/transactions/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *QueueTestSuite) TearDownTest() {

func (s *QueueTestSuite) TestGetTransaction() {
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
s.queue.Enqueue(tx)
enquedTx, err := s.queue.Get(tx.ID)
s.NoError(err)
s.Equal(tx, enquedTx)
Expand All @@ -42,29 +42,22 @@ func (s *QueueTestSuite) TestGetTransaction() {
s.Equal(ErrQueuedTxInProgress, err)
}

func (s *QueueTestSuite) TestGetProcessedTransaction() {
// enqueue will fail if transaction with hash will be enqueued
func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) (*common.QueuedTx, <-chan common.QueuedTxResult) {
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
tx.Hash = gethcommon.Hash{1}
s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx))
}

func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx {
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
c := s.queue.Enqueue(tx)
s.NoError(s.queue.Done(tx.ID, hash, err))
return tx
return tx, c
}

func (s *QueueTestSuite) TestDoneSuccess() {
hash := gethcommon.Hash{1}
tx := s.testDone(hash, nil)
s.NoError(tx.Err)
s.Equal(hash, tx.Hash)
s.False(s.queue.Has(tx.ID))
tx, c := s.testDone(hash, nil)
// event is sent only if transaction was removed from a queue
select {
case <-tx.Done:
case rst := <-c:
s.NoError(rst.Err)
s.Equal(hash, rst.Hash)
s.False(s.queue.Has(tx.ID))
default:
s.Fail("No event was sent to Done channel")
}
Expand All @@ -73,24 +66,23 @@ func (s *QueueTestSuite) TestDoneSuccess() {
func (s *QueueTestSuite) TestDoneTransientError() {
hash := gethcommon.Hash{1}
err := keystore.ErrDecrypt
tx := s.testDone(hash, err)
s.Equal(keystore.ErrDecrypt, tx.Err)
s.NotEqual(hash, tx.Hash)
s.Equal(gethcommon.Hash{}, tx.Hash)
tx, _ := s.testDone(hash, err)
s.True(s.queue.Has(tx.ID))
_, inp := s.queue.inprogress[tx.ID]
s.False(inp)
}

func (s *QueueTestSuite) TestDoneError() {
hash := gethcommon.Hash{1}
err := errors.New("test")
tx := s.testDone(hash, err)
s.Equal(err, tx.Err)
s.NotEqual(hash, tx.Hash)
s.Equal(gethcommon.Hash{}, tx.Hash)
s.False(s.queue.Has(tx.ID))
tx, c := s.testDone(hash, err)
// event is sent only if transaction was removed from a queue
select {
case <-tx.Done:
case rst := <-c:
s.Equal(err, rst.Err)
s.NotEqual(hash, rst.Hash)
s.Equal(gethcommon.Hash{}, rst.Hash)
s.False(s.queue.Has(tx.ID))
default:
s.Fail("No event was sent to Done channel")
}
Expand All @@ -99,7 +91,7 @@ func (s *QueueTestSuite) TestDoneError() {
func (s QueueTestSuite) TestMultipleDone() {
hash := gethcommon.Hash{1}
err := keystore.ErrDecrypt
tx := s.testDone(hash, err)
tx, _ := s.testDone(hash, err)
s.NoError(s.queue.Done(tx.ID, hash, nil))
s.Equal(ErrQueuedTxIDNotFound, s.queue.Done(tx.ID, hash, errors.New("timeout")))
}
Expand All @@ -111,11 +103,11 @@ func (s *QueueTestSuite) TestEviction() {
if first == nil {
first = tx
}
s.NoError(s.queue.Enqueue(tx))
s.queue.Enqueue(tx)
}
s.Equal(DefaultTxQueueCap, s.queue.Count())
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
s.queue.Enqueue(tx)
s.Equal(DefaultTxQueueCap, s.queue.Count())
s.False(s.queue.Has(first.ID))
}
Loading

0 comments on commit 2c91b13

Please sign in to comment.