-
Notifications
You must be signed in to change notification settings - Fork 248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactoring of TxQueue and Manager #530
Conversation
6e2128b
to
fe16b03
Compare
4ada6ca
to
775fb83
Compare
@adambabik this change is ready for review, please take a look when you will have time. it is based on previous pr, so all of the relevant changes are in a88f9eb |
@dshulyak thanks for PR. I'm going to review it today or tomorrow. The only thing that immediately brings attention is the size of the PR. Large PRs take more times to review. If there is a chance to split it into a few smaller PRs that would be great. If not, please wait until at least 2 approves. |
Tests failed with |
Added one more commit on top of previous that fixes remaining data races in transactions module 39d03d8 |
|
||
log.Info(fmt.Sprintf("Number of transactions queued: %d. Queue size (shouldn't be more than %d): %d", | ||
i, txqueue.DefaultTxQueueCap, txQueue.Count())) | ||
time.Sleep(2 * time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment is lost, restore
geth/common/types.go
Outdated
Context context.Context | ||
Args SendTxArgs | ||
Done chan struct{} | ||
Discard chan struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove discard
geth/common/types.go
Outdated
@@ -272,6 +247,9 @@ type TxQueueManager interface { | |||
|
|||
// DiscardTransactions discards given multiple transactions from transaction queue | |||
DiscardTransactions(ids []QueuedTxID) map[QueuedTxID]RawDiscardTransactionResult | |||
|
|||
// Disable notifictions to a device | |||
DisableNotificactions() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
safe to remove from interface, as it is used only in tests
geth/transactions/queue/queue.go
Outdated
transactions map[common.QueuedTxID]*common.QueuedTx | ||
inprogress map[common.QueuedTxID]empty | ||
|
||
// TODO don't use another goroutine for eviction |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add my id
@@ -112,6 +114,7 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { | |||
s.NoError(tx.Err) | |||
// Transaction should be already removed from the queue. | |||
s.False(txQueueManager.TransactionQueue().Has(tx.ID)) | |||
<-w |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
after rebase it will be changed to a wait with timeout
@@ -264,4 +268,5 @@ func (s *TxQueueTestSuite) TestDiscardTransaction() { | |||
s.Equal(queue.ErrQueuedTxDiscarded, tx.Err) | |||
// Transaction should be already removed from the queue. | |||
s.False(txQueueManager.TransactionQueue().Has(tx.ID)) | |||
<-w |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
geth/transactions/queue/queue.go
Outdated
@@ -193,6 +193,7 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er | |||
|
|||
func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) { | |||
delete(q.inprogress, tx.ID) | |||
tx.Err = err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove L201
|
||
func (m *Manager) validateAccount(tx *common.QueuedTx) (*common.SelectedExtKey, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move verifyAccountPassword password to validateAccount
geth/transactions/txqueue_manager.go
Outdated
log.Info("complete transaction", "id", queuedTx.ID) | ||
var emptyHash gethcommon.Hash | ||
log.Info("verifying account password for transaction", "id", queuedTx.ID) | ||
config, err := m.nodeManager.NodeConfig() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we to nofity on node config error? if not move it to CompleteTransaction
there are 2 potential follow-ups for this refactoring that can simplify txqueue even more.
However, consumers of tx are interested only in the hash and err. Atm there are 2 consumers:
Let me know if this makes sense to you |
Thanks @dshulyak for this huge effort! You rock! I will review it in details tomorrow. Regarding the follow-ups:
// wait for the transaction to finish
done := txqueue.Subscribe(tx.ID)
<-done
// get details, hash or error
hash, err := txqueue.TransactionHash(tx.ID)
// or only error
err := txqueue.TransactionErr(tx.ID) |
ci issue #539 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added some change requests. It looks very promising 👏 However, I must also say it's hard to review because of the number of changes and the previous PR included (and I know there is no better way :().
Also, I still need to figure out if we can remove eviction completely.
I will ping the rest of the team to review #527 so that we can merge it first and have this PR smaller.
geth/transactions/queue/queue.go
Outdated
@@ -36,33 +33,38 @@ var ( | |||
ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it") | |||
) | |||
|
|||
// remove from queue on any error (except for transient ones) and propagate | |||
var transientErrs = map[string]bool{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason why this map is not map[error]bool
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is for errors returned from an eth client, jsonError{Message: "error"}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok. Errors from EthClient
are dynamically created so even if an error has the same message, it will be a different object, thus relying on strings. Make sense 👍 Also, cool that error messages are derived from packages so everything should be fine.
I would suggest adding a comment explaining this structure as looking at its initialization it's tempting to change it to map[error]bool
. Or a unit test that will test this property.
account.ErrNoAccountSelected.Error(): true, // account not selected | ||
} | ||
|
||
type empty struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if this is needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it was a common practice in some opensource projects to use such type for such cases, but I can avoid using it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's leave it then. Maybe it's just me and everyone are ok with it.
geth/transactions/queue/queue.go
Outdated
if q.txEnqueueHandler == nil { //discard, until handler is provided | ||
log.Info("there is no txEnqueueHandler") | ||
return nil | ||
if (tx.Hash != gethcommon.Hash{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if a transaction errored? Before it would not have Hash
set but Error
.
It looks like this method is not thread-safe:
tx.Hash
access is not guarded,- It's possible to enqueue already enqueued transaction (it looks like this was an issue before as well).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i thought that it makes sense to enqueu transaction again, even if an error was not transient.
but to be fair, is it even required? i dont think that i've seen a code path that can actually try to enqueue same transaction again
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you thought about some particular use cases?
In our case, if a transaction reports an error, and it's not transient, then the transaction is removed and a signal is sent. If the error is transient, the transaction stays in the queue and a signal is sent. I don't recall any use cases that would allow or require putting the same transaction again to the queue, thus, I would throw an error in case of such an attempt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i will add error comparison for now, and will get back to this topic in #537
geth/transactions/queue/queue.go
Outdated
if _, inprogress := q.inprogress[id]; inprogress { | ||
return tx, ErrQueuedTxInProgress | ||
} | ||
q.inprogress[id] = empty{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we set the transaction to inprogress
map here in Get
method? Getters should not perform any side effects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed that in the current code there is only one place where Get is called (CompleteTransaction), so I wanted to get rid of the methods that are not used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or better to say redundant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, it doesn't make sense i will add separate method for Get and another for marking transaction as inprogress
@adambabik you can also review particular commit by selecting it from a github changelog. I can't post a link because github changes it to a wrong one. |
@divan @adambabik i rebased this pr and fixed comments after initial review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for rebasing, it's really easy to review it now.
|
||
// CreateTransaction returns a transaction object. | ||
func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { | ||
return &QueuedTx{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can also change a name of this struct to Transaction
? Or eventually TransactionQueued
if the first one is too verbose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
idk, name seems fine to me
// CreateTransaction returns a transaction object. | ||
func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { | ||
return &QueuedTx{ | ||
ID: QueuedTxID(uuid.New()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once we also discussed getting rid of QueuedTxID
and just using a string
type. What do you think about it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i tried and it somewhat increased the size of this pr (because of mocks). i would prefer to create a separate pr which would be trivial to review
geth/common/utils.go
Outdated
Hash: common.Hash{}, | ||
Context: ctx, | ||
Args: args, | ||
Done: make(chan struct{}, 1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can it be a non-buffered channel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think that original idea was not to block the producer (complete transaction method). i would keep it this way, any reason not to?
geth/transactions/notifications.go
Outdated
// EventTransactionFailed is triggered when send transaction request fails | ||
EventTransactionFailed = "transaction.failed" | ||
|
||
SendTransactionNoErrorCode = "0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't error codes be a number? Maybe something like:
const (
SendTransactionDefaultErrorCode int = iota + 1
SendTransactionPasswordErrorCode
...
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea, will try to change this
type ReturnSendTransactionEvent struct { | ||
ID string `json:"id"` | ||
Args common.SendTxArgs `json:"args"` | ||
MessageID string `json:"message_id"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is MessageID
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i see that it is derived from context, but context is actually never updated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, now I remember what it is. Let's leave it and we will create a follow-up issue to research with Clojure Team if we can get rid of this.
geth/transactions/notifications.go
Outdated
|
||
// NotifyOnReturn returns handler that processes responses from internal tx manager | ||
func NotifyOnReturn(queuedTx *common.QueuedTx) { | ||
if queuedTx.Err == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a comment why transactions without error are skipped (because we don't need to send any signals, but it may not be obvious).
geth/transactions/queue/queue.go
Outdated
} | ||
|
||
// NewTransactionQueue make new transaction queue | ||
func NewTransactionQueue() *TxQueue { | ||
func NewQueue() *TxQueue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can also be func New()
as queue
is the name of the package.
if q.txEnqueueHandler == nil { //discard, until handler is provided | ||
log.Info("there is no txEnqueueHandler") | ||
return nil | ||
if (tx.Hash != gethcommon.Hash{} || tx.Err != nil) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic can be encapsulated as common.QueuedTx
's method, e.g. tx.IsProcessed()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's get back to it in this PR #537
geth/transactions/queue/queue.go
Outdated
} | ||
|
||
// LockInprogress returns transcation and locks it as inprogress | ||
func (q *TxQueue) LockInprogress(id common.QueuedTxID) (*common.QueuedTx, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to return common.QueuedTx
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i wanted to avoid doing get and then lock, particularly i was thinking about SELECT FOR UPDATE sql statement when i added this method
maybe naming is unfortunate, though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't find where it is used except for a test file.
If we have LockInProgress()
and Get()
and call it in this order everything should be ok without race conditions, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe naming is unfortunate, though
Agree. I had to go to the function definition to understand what does it mean. Maybe GetAndLock
?
@adambabik it's used in (*Manager).CompleteTransaction()
:
// CompleteTransaction instructs backend to complete sending of a given transaction.
// TODO(adam): investigate a possible bug that calling this method multiple times with the same Transaction ID
// results in sending multiple transactions.
func (m *Manager) CompleteTransaction(id common.QueuedTxID, password string) (hash gethcommon.Hash, err error) {
log.Info("complete transaction", "id", id)
tx, err := m.txQueue.LockInprogress(id)
if err != nil {
log.Warn("error getting a queued transaction", "err", err)
return hash, err
}
BTW, is it solving bug mentioned in TODO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah, this comment is not relevant anymore. The problem was that we were not tracking which transaction is in progress. The fix was introduced before this refactoring and we are still good so let's remove it.
I would just go with LockInProgress() error
and use Get()
. This code is not a bottleneck so readability vs preoptimization.
geth/transactions/queue/queue.go
Outdated
if err == nil { | ||
q.remove(tx.ID) | ||
tx.Hash = hash | ||
tx.Done <- struct{}{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about just closing the channel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indeed looks like it is safe to close, i saw a reason for not closing it but i dont see it right now :)
@adambabik Sorry for the late reply. Nope, I have no idea. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks great! Amazing job! I added just a few minor request changes, mostly in tests.
geth/transactions/queue/queue.go
Outdated
} | ||
|
||
// LockInprogress returns transcation and locks it as inprogress | ||
func (q *TxQueue) LockInprogress(id common.QueuedTxID) (*common.QueuedTx, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't find where it is used except for a test file.
If we have LockInProgress()
and Get()
and call it in this order everything should be ok without race conditions, right?
geth/transactions/queue/queue.go
Outdated
return | ||
} | ||
_, transient := transientErrs[err.Error()] | ||
if !transient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can be written as if _, transient := transientErrs[err.Error()]; !transient {
.
// enqueue will fail if transaction with hash will be enqueued | ||
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) | ||
tx.Hash = gethcommon.Hash{1} | ||
s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Condition tx.Err != nil
also should be checked.
err := keystore.ErrDecrypt | ||
tx := s.testDone(hash, err) | ||
s.Equal(keystore.ErrDecrypt, tx.Err) | ||
s.NotEqual(hash, tx.Hash) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like not needed as below we have s.Equal(gethcommon.Hash{}, tx.Hash)
.
// event is sent only if transaction was removed from a queue | ||
select { | ||
case <-tx.Done: | ||
default: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice default
usage 👍
|
||
log.Info(fmt.Sprintf("Number of transactions sent: %d. Queue size (shouldn't be more than %d): %d", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would remove all log lines from tests.
Perhaps it can be replaced with a check for the right txCount
value here? (it's not used anywhere except this log line)
geth/transactions/notifications.go
Outdated
SendTransactionDiscardedErrorCode | ||
) | ||
|
||
var txReturnCodes = map[error]int{ // deliberately strings, in case more meaningful codes are to be returned |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
obsolete comment?
geth/transactions/notifications.go
Outdated
|
||
// NotifyOnEnqueue returns handler that processes incoming tx queue requests | ||
func NotifyOnEnqueue(queuedTx *common.QueuedTx) { | ||
log.Info("calling TransactionQueueHandler") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like copypasted and forgotten log output. Remove it?
geth/transactions/queue/queue.go
Outdated
return nil, ErrQueuedTxIDNotFound | ||
} | ||
|
||
// LockInprogress returns transcation and locks it as inprogress |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: transcation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor remarks, LGTM 👍
Changes to TxQueue: - factored out into a separate module to ensure that only exported methods will be used - deleted EnqueuAsync as it is not used, it is always possible to restore it from git, if it will be ever needed - notifications moved to a manager, with the goal to simplify txqueue and separate concerns between manager and txqueue - simplified API for processing transactions After analyzing code I removed all redundant methods and currently all the processing can be done with Get and Done methods. - added inprogress map to store transactions that are taken for processing and removed Inprogress variable from transaction, the goal is to cleanup tx structure - removed Discard channel from transaction, transaction.Err can be used to understand if transaction was discarded or failed due to another error - transient errors stored as global variable of queue module for simplicity, it is unlikely that they will become dynamic Change to Manager: - simplified code that manages notifications, all handlers were removed and notifiers refactored to be simple functions that are called in two places, when transaction is queued and when manager finished waiting for transaction - notifications can be turned of by calling DisableNotifications on manager - made a function out of CreateTransaction method, as it can be used as a simple utility
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's ready to be merged! 👏
log.Info("complete transaction", "id", queuedTx.ID) | ||
var emptyHash gethcommon.Hash | ||
log.Info("verifying account password for transaction", "id", queuedTx.ID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This log is unnecessary as there is one above already.
// QueueTransaction puts a transaction into the queue. | ||
func (m *Manager) QueueTransaction(tx *common.QueuedTx) error { | ||
to := "<nil>" | ||
if tx.Args.To != nil { | ||
to = tx.Args.To.Hex() | ||
} | ||
log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) | ||
err := m.txQueue.Enqueue(tx) | ||
if m.notify { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could be beneficial to create a method
func (m *Manager) notifyOnEnqueue(tx) {
if m.notify {
NotifyOnEnqueue(tx)
}
}
so that the whole logic when to send a notification is encapsulated in one place. Here, if there will be another condition, all NotifyOnEnqueue
call conditions would need to be changed.
@adambabik @themue please, consider merging it |
@dshulyak done, sorry didn't notice it was a PR from a fork. |
Based on PR: #527
Fixes: #338
Changes to TxQueue:
will be ever needed
between manager and txqueue
After analyzing code I removed all redundant methods and currently all the processing can be done with Get and Done methods.
and removed Inprogress variable from transaction, the goal is to cleanup tx structure
if transaction was discarded or failed due to another error
that they will become dynamic
Changes to Manager:
refactored to be simple functions that are called in several places