Skip to content

Commit

Permalink
Result of tx processing returned as QueuedTxResult
Browse files Browse the repository at this point in the history
Currently it is quite easy to introduce concurrency issues while working
with transaction object. For example, race issue will exist every time
while transaction is processed in a separate goroutine and caller will
try to check for an error before event to Done channel is sent.

This change removes all the data that is updated on transaction and leaves
it with ID, Args and Context (which is not used at the moment).
  • Loading branch information
dshulyak committed Jan 10, 2018
1 parent 39d03d8 commit aa15a9f
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 99 deletions.
15 changes: 6 additions & 9 deletions geth/api/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,22 +200,19 @@ func (m *StatusBackend) CallRPC(inputJSON string) string {
}

// SendTransaction creates a new transaction and waits until it's complete.
func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (gethcommon.Hash, error) {
func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) {
if ctx == nil {
ctx = context.Background()
}

tx := common.CreateTransaction(ctx, args)

if err := m.txQueueManager.QueueTransaction(tx); err != nil {
return gethcommon.Hash{}, err
return hash, err
}

if err := m.txQueueManager.WaitForTransaction(tx); err != nil {
return gethcommon.Hash{}, err
rst := m.txQueueManager.WaitForTransaction(tx)
if rst.Err != nil {
return hash, rst.Err
}

return tx.Hash, nil
return rst.Hash, nil
}

// CompleteTransaction instructs backend to complete sending of a given transaction
Expand Down
12 changes: 7 additions & 5 deletions geth/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,14 @@ type QueuedTxID string
// QueuedTx holds enough information to complete the queued transaction.
type QueuedTx struct {
ID QueuedTxID
Hash common.Hash
Context context.Context
Args SendTxArgs
Done chan struct{}
Discard chan struct{}
Err error
Result chan QueuedTxResult
}

type QueuedTxResult struct {
Hash common.Hash
Err error
}

// SendTxArgs represents the arguments to submit a new transaction into the transaction pool.
Expand Down Expand Up @@ -232,7 +234,7 @@ type TxQueueManager interface {
QueueTransaction(tx *QueuedTx) error

// WaitForTransactions blocks until transaction is completed, discarded or timed out.
WaitForTransaction(tx *QueuedTx) error
WaitForTransaction(tx *QueuedTx) QueuedTxResult

SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error)

Expand Down
4 changes: 2 additions & 2 deletions geth/common/types_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions geth/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,8 @@ func Fatalf(reason interface{}, args ...interface{}) {
func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx {
return &QueuedTx{
ID: QueuedTxID(uuid.New()),
Hash: common.Hash{},
Context: ctx,
Args: args,
Done: make(chan struct{}, 1),
Result: make(chan QueuedTxResult, 1),
}
}
8 changes: 4 additions & 4 deletions geth/transactions/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type ReturnSendTransactionEvent struct {
}

// NotifyOnReturn returns handler that processes responses from internal tx manager
func NotifyOnReturn(queuedTx *common.QueuedTx) {
if queuedTx.Err == nil {
func NotifyOnReturn(queuedTx *common.QueuedTx, err error) {
if err == nil {
return
}

Expand All @@ -75,8 +75,8 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) {
ID: string(queuedTx.ID),
Args: queuedTx.Args,
MessageID: common.MessageIDFromContext(queuedTx.Context),
ErrorMessage: queuedTx.Err.Error(),
ErrorCode: sendTransactionErrorCode(queuedTx.Err),
ErrorMessage: err.Error(),
ErrorCode: sendTransactionErrorCode(err),
},
})
}
Expand Down
10 changes: 2 additions & 8 deletions geth/transactions/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,6 @@ func (q *TxQueue) Reset() {
// Enqueue enqueues incoming transaction
func (q *TxQueue) Enqueue(tx *common.QueuedTx) error {
log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID))
if (tx.Hash != gethcommon.Hash{}) {
return ErrQueuedTxAlreadyProcessed
}

log.Info("before enqueueTicker")
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
Expand Down Expand Up @@ -193,19 +190,16 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er

func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
delete(q.inprogress, tx.ID)
tx.Err = err
// hash is updated only if err is nil
if err == nil {
q.transactions[tx.ID].Result <- common.QueuedTxResult{Hash: hash, Err: err}
q.remove(tx.ID)
tx.Hash = hash
tx.Err = err
tx.Done <- struct{}{}
return
}
_, transient := transientErrs[err.Error()]
if !transient {
q.transactions[tx.ID].Result <- common.QueuedTxResult{Err: err}
q.remove(tx.ID)
tx.Done <- struct{}{}
}
}

Expand Down
30 changes: 11 additions & 19 deletions geth/transactions/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ func (s *QueueTestSuite) TestGetTransaction() {
s.Equal(ErrQueuedTxInProgress, err)
}

func (s *QueueTestSuite) TestGetProcessedTransaction() {
// enqueue will fail if transaction with hash will be enqueued
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
tx.Hash = gethcommon.Hash{1}
s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx))
}

func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx {
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
Expand All @@ -59,12 +52,12 @@ func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.Queue
func (s *QueueTestSuite) TestDoneSuccess() {
hash := gethcommon.Hash{1}
tx := s.testDone(hash, nil)
s.NoError(tx.Err)
s.Equal(hash, tx.Hash)
s.False(s.queue.Has(tx.ID))
// event is sent only if transaction was removed from a queue
select {
case <-tx.Done:
case rst := <-tx.Result:
s.NoError(rst.Err)
s.Equal(hash, rst.Hash)
s.False(s.queue.Has(tx.ID))
default:
s.Fail("No event was sent to Done channel")
}
Expand All @@ -74,23 +67,22 @@ func (s *QueueTestSuite) TestDoneTransientError() {
hash := gethcommon.Hash{1}
err := keystore.ErrDecrypt
tx := s.testDone(hash, err)
s.Equal(keystore.ErrDecrypt, tx.Err)
s.NotEqual(hash, tx.Hash)
s.Equal(gethcommon.Hash{}, tx.Hash)
s.True(s.queue.Has(tx.ID))
_, inp := s.queue.inprogress[tx.ID]
s.False(inp)
}

func (s *QueueTestSuite) TestDoneError() {
hash := gethcommon.Hash{1}
err := errors.New("test")
tx := s.testDone(hash, err)
s.Equal(err, tx.Err)
s.NotEqual(hash, tx.Hash)
s.Equal(gethcommon.Hash{}, tx.Hash)
s.False(s.queue.Has(tx.ID))
// event is sent only if transaction was removed from a queue
select {
case <-tx.Done:
case rst := <-tx.Result:
s.Equal(err, rst.Err)
s.NotEqual(hash, rst.Hash)
s.Equal(gethcommon.Hash{}, rst.Hash)
s.False(s.queue.Has(tx.ID))
default:
s.Fail("No event was sent to Done channel")
}
Expand Down
60 changes: 32 additions & 28 deletions geth/transactions/txqueue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,24 @@ const (

// Manager provides means to manage internal Status Backend (injected into LES)
type Manager struct {
nodeManager common.NodeManager
accountManager common.AccountManager
txQueue *queue.TxQueue
ethTxClient EthTransactor
addrLock *AddrLocker
notify bool
nodeManager common.NodeManager
accountManager common.AccountManager
txQueue *queue.TxQueue
ethTxClient EthTransactor
addrLock *AddrLocker
notify bool
sendCompletionTimeout time.Duration
}

// NewManager returns a new Manager.
func NewManager(nodeManager common.NodeManager, accountManager common.AccountManager) *Manager {
return &Manager{
nodeManager: nodeManager,
accountManager: accountManager,
txQueue: queue.NewQueue(),
addrLock: &AddrLocker{},
notify: true,
nodeManager: nodeManager,
accountManager: accountManager,
txQueue: queue.NewQueue(),
addrLock: &AddrLocker{},
notify: true,
sendCompletionTimeout: DefaultTxSendCompletionTimeout * time.Second,
}
}

Expand Down Expand Up @@ -75,34 +77,38 @@ func (m *Manager) QueueTransaction(tx *common.QueuedTx) error {
to = tx.Args.To.Hex()
}
log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to)
err := m.txQueue.Enqueue(tx)
if err := m.txQueue.Enqueue(tx); err != nil {
return err
}
if m.notify {
NotifyOnEnqueue(tx)
}
return err
return nil
}

func (m *Manager) txDone(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
m.txQueue.Done(tx.ID, hash, err) //nolint: errcheck
if m.notify {
NotifyOnReturn(tx)
NotifyOnReturn(tx, err)
}
}

// WaitForTransaction adds a transaction to the queue and blocks
// until it's completed, discarded or times out.
func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error {
func (m *Manager) WaitForTransaction(tx *common.QueuedTx) common.QueuedTxResult {
log.Info("wait for transaction", "id", tx.ID)
// now wait up until transaction is:
// - completed (via CompleteQueuedTransaction),
// - discarded (via DiscardQueuedTransaction)
// - or times out
select {
case <-tx.Done:
case <-time.After(DefaultTxSendCompletionTimeout * time.Second):
m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut)
for {
select {
case rst := <-tx.Result:
return rst
case <-time.After(m.sendCompletionTimeout):
m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut)
}
}
return tx.Err
}

// CompleteTransaction instructs backend to complete sending of a given transaction.
Expand Down Expand Up @@ -240,7 +246,7 @@ func (m *Manager) DiscardTransaction(id common.QueuedTxID) error {
}
err = m.txQueue.Done(id, gethcommon.Hash{}, queue.ErrQueuedTxDiscarded)
if m.notify {
NotifyOnReturn(tx)
NotifyOnReturn(tx, queue.ErrQueuedTxDiscarded)
}
return err
}
Expand All @@ -265,19 +271,17 @@ func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.Queued
// It accepts one param which is a slice with a map of transaction params.
func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) {
log.Info("SendTransactionRPCHandler called")

// TODO(adam): it's a hack to parse arguments as common.RPCCall can do that.
// We should refactor parsing these params to a separate struct.
rpcCall := common.RPCCall{Params: args}
tx := common.CreateTransaction(ctx, rpcCall.ToSendTxArgs())

if err := m.QueueTransaction(tx); err != nil {
return nil, err
}

if err := m.WaitForTransaction(tx); err != nil {
return nil, err
rst := m.WaitForTransaction(tx)
if rst.Err != nil {
return nil, rst.Err
}

return tx.Hash.Hex(), nil
// handle empty hash
return rst.Hash.Hex(), nil
}
Loading

0 comments on commit aa15a9f

Please sign in to comment.