Skip to content

Commit

Permalink
General refactoring of txqueue and manager
Browse files Browse the repository at this point in the history
Changes to TxQueue:
- factored out into a separate module to ensure that only exported methods will be used
- deleted EnqueuAsync as it is not used, it is always possible to restore it from git, if it
  will be ever needed
- notifications moved to a manager, with the goal to simplify txqueue and separate concerns
  between manager and txqueue
- simplified API for processing transactions
  After analyzing code I removed all redundant methods and currently
  all the processing can be done with Get and Done methods.
- added inprogress map to store transactions that are taken for processing
  and removed Inprogress variable from transaction, the goal is to cleanup tx structure
- removed Discard channel from transaction, transaction.Err can be used to understand
  if transaction was discarded or failed due to another error
- transient errors stored as global variable of queue module for simplicity, it is unlikely
  that they will become dynamic

Change to Manager:
- simplified code that manages notifications, all handlers were removed and notifiers
  refactored to be simple functions that are called in two places, when transaction
  is queued and when manager finished waiting for transaction
- notifications can be turned of by calling DisableNotifications on manager
- made a function out of CreateTransaction method, as it can be used as a simple utility
  • Loading branch information
dshulyak committed Jan 4, 2018
1 parent 6567159 commit a88f9eb
Show file tree
Hide file tree
Showing 17 changed files with 431 additions and 533 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ mock: ##@other Regenerate mocks
mockgen -source=geth/mailservice/mailservice.go -destination=geth/mailservice/mailservice_mock.go -package=mailservice
mockgen -source=geth/common/notification.go -destination=geth/common/notification_mock.go -package=common -imports fcm=github.com/NaySoftware/go-fcm
mockgen -source=geth/notification/fcm/client.go -destination=geth/notification/fcm/client_mock.go -package=fcm -imports fcm=github.com/NaySoftware/go-fcm
mockgen -source=geth/txqueue/fake/txservice.go -destination=geth/txqueue/fake/mock.go -package=fake
mockgen -source=geth/transactions/fake/txservice.go -destination=geth/transactions/fake/mock.go -package=fake

test: test-unit-coverage ##@tests Run basic, short tests during development

Expand Down
6 changes: 3 additions & 3 deletions e2e/jail/jail_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/txqueue"
"github.com/status-im/status-go/geth/transactions"
. "github.com/status-im/status-go/testing"
"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *JailRPCTestSuite) TestContractDeployment() {
unmarshalErr := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(unmarshalErr, "cannot unmarshal JSON: %s", jsonEvent)

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
s.T().Logf("transaction queued and will be completed shortly, id: %v", event["id"])

Expand Down Expand Up @@ -282,7 +282,7 @@ func (s *JailRPCTestSuite) TestJailVMPersistence() {
s.T().Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return
}
if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
s.T().Logf("Transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string))

Expand Down
80 changes: 45 additions & 35 deletions e2e/transactions/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"reflect"
"sync"
"testing"
"time"

Expand All @@ -18,7 +19,8 @@ import (
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/txqueue"
"github.com/status-im/status-go/geth/transactions"
"github.com/status-im/status-go/geth/transactions/queue"
. "github.com/status-im/status-go/testing"
"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -48,7 +50,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransaction() {
err := json.Unmarshal([]byte(rawSignal), &sg)
s.NoError(err)

if sg.Type == txqueue.EventTransactionQueued {
if sg.Type == transactions.EventTransactionQueued {
event := sg.Event.(map[string]interface{})
txID := event["id"].(string)
txHash, err = s.Backend.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password)
Expand Down Expand Up @@ -100,7 +102,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransactionUpstream() {
err := json.Unmarshal([]byte(rawSignal), &signalEnvelope)
s.NoError(err)

if signalEnvelope.Type == txqueue.EventTransactionQueued {
if signalEnvelope.Type == transactions.EventTransactionQueued {
event := signalEnvelope.Event.(map[string]interface{})
txID := event["id"].(string)

Expand Down Expand Up @@ -156,7 +158,7 @@ func (s *TransactionsTestSuite) TestSendContractTx() {
err = json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))

Expand All @@ -182,7 +184,7 @@ func (s *TransactionsTestSuite) TestSendContractTx() {
)
s.EqualError(
err,
txqueue.ErrInvalidCompleteTxSender.Error(),
queue.ErrInvalidCompleteTxSender.Error(),
fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]),
)

Expand Down Expand Up @@ -247,7 +249,7 @@ func (s *TransactionsTestSuite) TestSendEther() {
err = json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))

Expand All @@ -271,7 +273,7 @@ func (s *TransactionsTestSuite) TestSendEther() {
common.QueuedTxID(event["id"].(string)), TestConfig.Account1.Password)
s.EqualError(
err,
txqueue.ErrInvalidCompleteTxSender.Error(),
queue.ErrInvalidCompleteTxSender.Error(),
fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]),
)

Expand Down Expand Up @@ -330,7 +332,7 @@ func (s *TransactionsTestSuite) TestSendEtherTxUpstream() {
err = json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, "cannot unmarshal JSON: %s", jsonEvent)

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))

Expand Down Expand Up @@ -387,7 +389,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID := common.QueuedTxID(event["id"].(string))
log.Info("transaction queued (will be failed and completed on the second call)", "id", txID)
Expand All @@ -407,7 +409,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
close(completeQueuedTransaction)
}

if envelope.Type == txqueue.EventTransactionFailed {
if envelope.Type == transactions.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
log.Info("transaction return event received", "id", event["id"].(string))

Expand Down Expand Up @@ -466,7 +468,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID := common.QueuedTxID(event["id"].(string))
log.Info("transaction queued (will be discarded soon)", "id", txID)
Expand All @@ -488,12 +490,12 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
close(completeQueuedTransaction)
}

if envelope.Type == txqueue.EventTransactionFailed {
if envelope.Type == transactions.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
log.Info("transaction return event received", "id", event["id"].(string))

receivedErrMessage := event["error_message"].(string)
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error()
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
s.Equal(receivedErrMessage, expectedErrMessage)

receivedErrCode := event["error_code"].(string)
Expand All @@ -509,7 +511,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
s.EqualError(err, queue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")

select {
case <-completeQueuedTransaction:
Expand Down Expand Up @@ -543,7 +545,7 @@ func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactions() {
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))

if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID := common.QueuedTxID(event["id"].(string))
log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID)
Expand Down Expand Up @@ -640,7 +642,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
var envelope signal.Envelope
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err)
if envelope.Type == txqueue.EventTransactionQueued {
if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID := common.QueuedTxID(event["id"].(string))
log.Info("transaction queued (will be discarded soon)", "id", txID)
Expand All @@ -650,12 +652,12 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
txIDs <- txID
}

if envelope.Type == txqueue.EventTransactionFailed {
if envelope.Type == transactions.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
log.Info("transaction return event received", "id", event["id"].(string))

receivedErrMessage := event["error_message"].(string)
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error()
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
s.Equal(receivedErrMessage, expectedErrMessage)

receivedErrCode := event["error_code"].(string)
Expand All @@ -675,7 +677,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error())
s.EqualError(err, queue.ErrQueuedTxDiscarded.Error())

s.True(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction returned hash, while it shouldn't")
}
Expand Down Expand Up @@ -747,7 +749,7 @@ func (s *TransactionsTestSuite) TestNonExistentQueuedTransactions() {
// try completing non-existing transaction
_, err := s.Backend.CompleteTransaction("some-bad-transaction-id", TestConfig.Account1.Password)
s.Error(err, "error expected and not received")
s.EqualError(err, txqueue.ErrQueuedTxIDNotFound.Error())
s.EqualError(err, queue.ErrQueuedTxIDNotFound.Error())
}

func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {
Expand All @@ -756,6 +758,24 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {

backend := s.LightEthereumService().StatusBackend
s.NotNil(backend)
var m sync.Mutex
txCount := 0
txIDs := [queue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{}

signal.SetDefaultNodeNotificationHandler(func(rawSignal string) {
var sg signal.Envelope
err := json.Unmarshal([]byte(rawSignal), &sg)
s.NoError(err)

if sg.Type == transactions.EventTransactionQueued {
event := sg.Event.(map[string]interface{})
txID := event["id"].(string)
m.Lock()
txIDs[txCount] = common.QueuedTxID(txID)
txCount++
m.Unlock()
}
})

// reset queue
s.Backend.TxQueueManager().TransactionQueue().Reset()
Expand All @@ -764,36 +784,26 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {
s.NoError(s.Backend.AccountManager().SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password))

txQueue := s.Backend.TxQueueManager().TransactionQueue()
var i = 0
txIDs := [txqueue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{}
s.Backend.TxQueueManager().SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) {
log.Info("tx enqueued", "i", i+1, "queue size", txQueue.Count(), "id", queuedTx.ID)
txIDs[i] = queuedTx.ID
i++
})

s.Zero(txQueue.Count(), "transaction count should be zero")

for j := 0; j < 10; j++ {
go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck
}
time.Sleep(2 * time.Second) // FIXME(tiabc): more reliable synchronization to ensure all transactions are enqueued

log.Info(fmt.Sprintf("Number of transactions queued: %d. Queue size (shouldn't be more than %d): %d",
i, txqueue.DefaultTxQueueCap, txQueue.Count()))
time.Sleep(2 * time.Second)

log.Info(fmt.Sprintf("Number of transactions sent: %d. Queue size (shouldn't be more than %d): %d",
txCount, queue.DefaultTxQueueCap, txQueue.Count()))
s.Equal(10, txQueue.Count(), "transaction count should be 10")

for i := 0; i < txqueue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines
for i := 0; i < queue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines
go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck
}
time.Sleep(5 * time.Second)

s.True(txQueue.Count() <= txqueue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", txqueue.DefaultTxQueueCap, txqueue.DefaultTxQueueCap-1, txQueue.Count())
s.True(txQueue.Count() <= queue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", queue.DefaultTxQueueCap, queue.DefaultTxQueueCap-1, txQueue.Count())

for _, txID := range txIDs {
txQueue.Remove(txID)
}

s.Zero(txQueue.Count(), "transaction count should be zero: %d", txQueue.Count())
}
12 changes: 3 additions & 9 deletions geth/api/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/status-im/status-go/geth/notification/fcm"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/txqueue"
"github.com/status-im/status-go/geth/transactions"
)

const (
Expand All @@ -38,7 +38,7 @@ func NewStatusBackend() *StatusBackend {

nodeManager := node.NewNodeManager()
accountManager := account.NewManager(nodeManager)
txQueueManager := txqueue.NewManager(nodeManager, accountManager)
txQueueManager := transactions.NewManager(nodeManager, accountManager)
jailManager := jail.New(nodeManager)
notificationManager := fcm.NewNotification(fcmServerKey)

Expand Down Expand Up @@ -205,7 +205,7 @@ func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxA
ctx = context.Background()
}

tx := m.txQueueManager.CreateTransaction(ctx, args)
tx := common.CreateTransaction(ctx, args)

if err := m.txQueueManager.QueueTransaction(tx); err != nil {
return gethcommon.Hash{}, err
Expand Down Expand Up @@ -247,11 +247,5 @@ func (m *StatusBackend) registerHandlers() error {

rpcClient.RegisterHandler("eth_accounts", m.accountManager.AccountsRPCHandler())
rpcClient.RegisterHandler("eth_sendTransaction", m.txQueueManager.SendTransactionRPCHandler)
m.txQueueManager.SetTransactionQueueHandler(m.txQueueManager.TransactionQueueHandler())
log.Info("Registered handler", "fn", "TransactionQueueHandler")

m.txQueueManager.SetTransactionReturnHandler(m.txQueueManager.TransactionReturnHandler())
log.Info("Registered handler", "fn", "TransactionReturnHandler")

return nil
}
Loading

0 comments on commit a88f9eb

Please sign in to comment.