Skip to content

Commit

Permalink
associate sigs to retry ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
Farber98 committed Dec 2, 2024
1 parent f027aeb commit 8c18891
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 127 deletions.
110 changes: 57 additions & 53 deletions pkg/solana/txm/pendingtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ var (

type PendingTxContext interface {
// New adds a new tranasction in Broadcasted state to the storage
New(msg pendingTx, sig solana.Signature, cancel context.CancelFunc) error
// AddSignature adds a new signature for an existing transaction ID
AddSignature(id string, sig solana.Signature) error
New(msg pendingTx) error
// AddSignature adds a new signature to a broadcasted transaction in the pending transaction context.
// It associates the provided context and cancel function with the signature to manage retry and bumping cycles.
AddSignature(ctx context.Context, cancel context.CancelFunc, id string, sig solana.Signature) error
// Remove removes transaction and related signatures from storage if not in finalized or errored state
Remove(sig solana.Signature) (string, error)
// ListAll returns all of the signatures being tracked for all transactions not yet finalized or errored
Expand All @@ -50,7 +51,7 @@ type PendingTxContext interface {
// GetSignatureInfo returns the transaction ID and TxState for the provided signature
GetSignatureInfo(sig solana.Signature) (txInfo, error)
// OnReorg resets the transaction state to Broadcasted for the given signature and returns the pendingTx for retrying.
OnReorg(sig solana.Signature) (pendingTx, error)
OnReorg(sig solana.Signature) (pendingTx, retryCtx, error)
}

// finishedTx is used to store info required to track transactions to finality or error
Expand All @@ -74,11 +75,16 @@ type txInfo struct {
state TxState
}

type retryCtx struct {
ctx context.Context
cancel context.CancelFunc
}

var _ PendingTxContext = &pendingTxContext{}

type pendingTxContext struct {
cancelBy map[string]context.CancelFunc
sigToTxInfo map[solana.Signature]txInfo
sigToRetryCtx map[solana.Signature]retryCtx
sigToTxInfo map[solana.Signature]txInfo

broadcastedProcessedTxs map[string]pendingTx // broadcasted and processed transactions that may require retry and bumping
confirmedTxs map[string]pendingTx // transactions that require monitoring for re-org
Expand All @@ -89,21 +95,18 @@ type pendingTxContext struct {

func newPendingTxContext() *pendingTxContext {
return &pendingTxContext{
cancelBy: map[string]context.CancelFunc{},
sigToTxInfo: map[solana.Signature]txInfo{},
sigToRetryCtx: map[solana.Signature]retryCtx{},
sigToTxInfo: map[solana.Signature]txInfo{},

broadcastedProcessedTxs: map[string]pendingTx{},
confirmedTxs: map[string]pendingTx{},
finalizedErroredTxs: map[string]finishedTx{},
}
}

func (c *pendingTxContext) New(tx pendingTx, sig solana.Signature, cancel context.CancelFunc) error {
// New adds a new tranasction in Broadcasted state to the storage
func (c *pendingTxContext) New(tx pendingTx) error {
err := c.withReadLock(func() error {
// validate signature does not exist
if _, exists := c.sigToTxInfo[sig]; exists {
return ErrSigAlreadyExists
}
// validate id does not exist
if _, exists := c.broadcastedProcessedTxs[tx.id]; exists {
return ErrIDAlreadyExists
Expand All @@ -114,19 +117,12 @@ func (c *pendingTxContext) New(tx pendingTx, sig solana.Signature, cancel contex
return err
}

// upgrade to write lock if sig or id do not exist
// upgrade to write lock if id do not exist
_, err = c.withWriteLock(func() (string, error) {
if _, exists := c.sigToTxInfo[sig]; exists {
return "", ErrSigAlreadyExists
}
if _, exists := c.broadcastedProcessedTxs[tx.id]; exists {
return "", ErrIDAlreadyExists
}
// save cancel func
c.cancelBy[tx.id] = cancel
c.sigToTxInfo[sig] = txInfo{id: tx.id, state: Broadcasted}
// add signature to tx
tx.signatures = append(tx.signatures, sig)
tx.signatures = []solana.Signature{}
tx.createTs = time.Now()
// save to the broadcasted map since transaction was just broadcasted
c.broadcastedProcessedTxs[tx.id] = tx
Expand All @@ -135,7 +131,9 @@ func (c *pendingTxContext) New(tx pendingTx, sig solana.Signature, cancel contex
return err
}

func (c *pendingTxContext) AddSignature(id string, sig solana.Signature) error {
// AddSignature adds a new signature to a broadcasted transaction in the pending transaction context.
// Additionally, it associates the provided context and cancel function with the signature to manage retry and bumping cycles.
func (c *pendingTxContext) AddSignature(ctx context.Context, cancel context.CancelFunc, id string, sig solana.Signature) error {
err := c.withReadLock(func() error {
// signature already exists
if _, exists := c.sigToTxInfo[sig]; exists {
Expand Down Expand Up @@ -164,6 +162,8 @@ func (c *pendingTxContext) AddSignature(id string, sig solana.Signature) error {
tx := c.broadcastedProcessedTxs[id]
// save new signature
tx.signatures = append(tx.signatures, sig)
// save retryCtx to stop retry/bumping cycles
c.sigToRetryCtx[sig] = retryCtx{ctx: ctx, cancel: cancel}
// save updated tx to broadcasted map
c.broadcastedProcessedTxs[id] = tx
return "", nil
Expand Down Expand Up @@ -208,15 +208,13 @@ func (c *pendingTxContext) Remove(sig solana.Signature) (id string, err error) {
delete(c.confirmedTxs, info.id)
}

// call cancel func + remove from map
if cancel, exists := c.cancelBy[info.id]; exists {
cancel() // cancel context
delete(c.cancelBy, info.id)
}

// remove all signatures associated with transaction from sig map
// remove all signatures associated with transaction from sig map and cancel any associated contexts to stop retry/bumping cycles
for _, s := range tx.signatures {
delete(c.sigToTxInfo, s)
if rtryCtx, exists := c.sigToRetryCtx[s]; exists {
rtryCtx.cancel()
delete(c.sigToRetryCtx, s)
}
}
return info.id, nil
})
Expand Down Expand Up @@ -344,9 +342,9 @@ func (c *pendingTxContext) OnConfirmed(sig solana.Signature) (string, error) {
return info.id, ErrTransactionNotFound
}
// call cancel func + remove from map to stop the retry/bumping cycle for this transaction
if cancel, exists := c.cancelBy[info.id]; exists {
cancel() // cancel context
delete(c.cancelBy, info.id)
if rtryCtx, exists := c.sigToRetryCtx[sig]; exists {
rtryCtx.cancel()
delete(c.sigToRetryCtx, sig)
}
// update sig and tx state to Confirmed
info.state = Confirmed
Expand Down Expand Up @@ -394,19 +392,18 @@ func (c *pendingTxContext) OnFinalized(sig solana.Signature, retentionTimeout ti
if !broadcastedExists && !confirmedExists {
return info.id, ErrTransactionNotFound
}
// call cancel func + remove from map to stop the retry/bumping cycle for this transaction
// cancel is expected to be called and removed when tx is confirmed but checked here too in case state is skipped
if cancel, exists := c.cancelBy[info.id]; exists {
cancel() // cancel context
delete(c.cancelBy, info.id)
}
// delete from broadcasted map, if exists
delete(c.broadcastedProcessedTxs, info.id)
// delete from confirmed map, if exists
delete(c.confirmedTxs, info.id)
// remove all related signatures from the sigToTxInfo map to skip picking up this tx in the confirmation logic
// call cancel func + remove from map to stop the retry/bumping cycle for this transaction
for _, s := range tx.signatures {
delete(c.sigToTxInfo, s)
if rtryCtx, exists := c.sigToRetryCtx[s]; exists {
rtryCtx.cancel()
delete(c.sigToRetryCtx, s)
}
}
// if retention duration is set to 0, delete transaction from storage
// otherwise, move to finalized map
Expand Down Expand Up @@ -503,18 +500,18 @@ func (c *pendingTxContext) OnError(sig solana.Signature, retentionTimeout time.D
if !broadcastedExists && !confirmedExists {
return "", ErrTransactionNotFound
}
// call cancel func + remove from map
if cancel, exists := c.cancelBy[info.id]; exists {
cancel() // cancel context
delete(c.cancelBy, info.id)
}
// delete from broadcasted map, if exists
delete(c.broadcastedProcessedTxs, info.id)
// delete from confirmed map, if exists
delete(c.confirmedTxs, info.id)
// remove all related signatures from the sigToTxInfo map to skip picking up this tx in the confirmation logic
// call cancel func + remove from map to stop the retry/bumping cycle for this transaction
for _, s := range tx.signatures {
delete(c.sigToTxInfo, s)
if rtryCtx, exists := c.sigToRetryCtx[s]; exists {
rtryCtx.cancel() // cancel context
delete(c.sigToRetryCtx, s)
}
}
// if retention duration is set to 0, skip adding transaction to the errored map
if retentionTimeout == 0 {
Expand Down Expand Up @@ -621,7 +618,7 @@ func (c *pendingTxContext) GetSignatureInfo(sig solana.Signature) (txInfo, error
return info, nil
}

func (c *pendingTxContext) OnReorg(sig solana.Signature) (pendingTx, error) {
func (c *pendingTxContext) OnReorg(sig solana.Signature) (pendingTx, retryCtx, error) {
// Acquire a read lock to check if the signature exists and needs to be reset
err := c.withReadLock(func() error {
// Check if the signature is still being tracked
Expand All @@ -640,10 +637,11 @@ func (c *pendingTxContext) OnReorg(sig solana.Signature) (pendingTx, error) {
})
if err != nil {
// If transaction or sig are not found, return
return pendingTx{}, err
return pendingTx{}, retryCtx{}, err
}

var pTx pendingTx
var rtryCtx retryCtx
// Acquire a write lock to perform the state reset
_, err = c.withWriteLock(func() (string, error) {
// Retrieve sig and tx again inside the write lock
Expand All @@ -652,6 +650,12 @@ func (c *pendingTxContext) OnReorg(sig solana.Signature) (pendingTx, error) {
return "", ErrSigDoesNotExist
}

// Check if the retryCtx is still relevant
rtryCtx, exists = c.sigToRetryCtx[sig]
if !exists {
return "", fmt.Errorf("retry context not found for signature %s associated to tx id %s", sig.String(), info.id)
}

// Attempt to find the transaction in the broadcasted or confirmed maps
var tx pendingTx
var broadcastedExists, confirmedExists bool
Expand All @@ -673,11 +677,11 @@ func (c *pendingTxContext) OnReorg(sig solana.Signature) (pendingTx, error) {
})
if err != nil {
// If transaction or sig were not found, return
return pendingTx{}, err
return pendingTx{}, rtryCtx, err
}

// Return the transaction for retrying
return pTx, nil
return pTx, rtryCtx, nil
}

func (c *pendingTxContext) withReadLock(fn func() error) error {
Expand Down Expand Up @@ -717,12 +721,12 @@ func newPendingTxContextWithProm(id string) *pendingTxContextWithProm {
}
}

func (c *pendingTxContextWithProm) New(msg pendingTx, sig solana.Signature, cancel context.CancelFunc) error {
return c.pendingTx.New(msg, sig, cancel)
func (c *pendingTxContextWithProm) New(msg pendingTx) error {
return c.pendingTx.New(msg)
}

func (c *pendingTxContextWithProm) AddSignature(id string, sig solana.Signature) error {
return c.pendingTx.AddSignature(id, sig)
func (c *pendingTxContextWithProm) AddSignature(ctx context.Context, cancel context.CancelFunc, id string, sig solana.Signature) error {
return c.pendingTx.AddSignature(ctx, cancel, id, sig)
}

func (c *pendingTxContextWithProm) OnProcessed(sig solana.Signature) (string, error) {
Expand Down Expand Up @@ -811,6 +815,6 @@ func (c *pendingTxContextWithProm) GetSignatureInfo(sig solana.Signature) (txInf
return c.pendingTx.GetSignatureInfo(sig)
}

func (c *pendingTxContextWithProm) OnReorg(sig solana.Signature) (pendingTx, error) {
func (c *pendingTxContextWithProm) OnReorg(sig solana.Signature) (pendingTx, retryCtx, error) {
return c.pendingTx.OnReorg(sig)
}
Loading

0 comments on commit 8c18891

Please sign in to comment.