Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NONEVM-706][SOAK] - Soak Testing TxExpirationRebroadcast feature #962

Draft
wants to merge 62 commits into
base: develop
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
2d1a82d
refactor so txm owns blockhash assignment
Farber98 Nov 15, 2024
50dfef0
lastValidBlockHeight shouldn't be exported
Farber98 Nov 15, 2024
4e545e2
better comment
Farber98 Nov 15, 2024
4ded53c
refactor sendWithRetry to make it clearer
Farber98 Nov 15, 2024
9e1be6d
confirm loop refactor
Farber98 Nov 18, 2024
7dd2028
fix infinite loop
Farber98 Nov 18, 2024
6c675f2
move accountID inside msg
Farber98 Nov 19, 2024
b0d9426
lint fix
Farber98 Nov 19, 2024
1b38665
base58 does not contain lower l
Farber98 Nov 19, 2024
6923ddf
fix hash errors
Farber98 Nov 19, 2024
462844b
fix generate random hash
Farber98 Nov 19, 2024
fd785d0
remove blockhash as we only need block height
Farber98 Nov 19, 2024
cf958a4
expired tx changes without tests
Farber98 Nov 19, 2024
c5e957b
add maybe to mocks
Farber98 Nov 19, 2024
a505993
expiration tests
Farber98 Nov 19, 2024
adc8b1c
send txes through queue
Farber98 Nov 19, 2024
7d77f99
revert pendingtx leakage of information. overwrite blockhash
Farber98 Nov 20, 2024
92a280b
fix order of confirm loop and not found signature check
Farber98 Nov 20, 2024
2598e19
fix mocks
Farber98 Nov 20, 2024
42b3da1
prevent confirmation loop to mark tx as errored when it needs to be r…
Farber98 Nov 20, 2024
89af1f3
fix test
Farber98 Nov 20, 2024
5e8a0da
fix pointer
Farber98 Nov 20, 2024
75c1dcd
add comments
Farber98 Nov 21, 2024
4ff2d23
reduce rpc calls + refactors
Farber98 Nov 21, 2024
84e423e
tests + check to save rpc calls
Farber98 Nov 21, 2024
7d8319e
address feedback + remove redundant impl
Farber98 Nov 22, 2024
68f3a3e
iface comment
Farber98 Nov 22, 2024
780179f
address feedback on compute unit limit and lastValidBlockHeight assig…
Farber98 Nov 25, 2024
98f0246
blockhash assignment inside txm.sendWithRetry
Farber98 Nov 25, 2024
cbf55f6
address feedback
Farber98 Nov 26, 2024
90daf33
Merge branch 'develop' into nonevm-706-support-custom-bumping-strateg…
Farber98 Nov 26, 2024
77b28cf
refactors after merge
Farber98 Nov 26, 2024
0c4a7d8
fix interactive rebase
Farber98 Nov 26, 2024
849ac48
fix whitespace diffs
Farber98 Nov 26, 2024
20a1548
fix import
Farber98 Nov 26, 2024
a4d4770
fix mocks
Farber98 Nov 26, 2024
56a64da
add on prebroadcaste error
Farber98 Nov 26, 2024
9148d7d
remove rebroadcast count and fix package
Farber98 Nov 27, 2024
caf2cbf
improve docs
Farber98 Nov 27, 2024
1fbd63f
Merge branch 'develop' into backup-branch-fee-bumping
Farber98 Nov 28, 2024
5c22af2
Merge branch 'develop' into backup-branch-fee-bumping
Farber98 Dec 5, 2024
02ffd1a
fix comparison against blockHeight instead of slotHeight
Farber98 Dec 5, 2024
c00494c
address feedback
Farber98 Dec 5, 2024
6ac30b0
fix lint
Farber98 Dec 5, 2024
36ee4ec
Merge branch 'develop' into backup-branch-fee-bumping
Farber98 Dec 5, 2024
0e38174
fix log
Farber98 Dec 5, 2024
b04653c
config for soaks
Farber98 Dec 5, 2024
d240021
address feedback
Farber98 Dec 6, 2024
4389e13
remove useless slot height
Farber98 Dec 6, 2024
6bcdd8f
Merge branch 'develop' into backup-branch-fee-bumping
Farber98 Dec 9, 2024
50dd10f
address feedback
Farber98 Dec 10, 2024
10b0453
Merge branch 'develop' into backup-branch-fee-bumping
Farber98 Dec 11, 2024
2ea0c50
validate that tx doesn't exist in any of maps when adding new tx
Farber98 Dec 11, 2024
b5a6927
Merge branch 'develop' into backup-branch-fee-bumping
Farber98 Dec 11, 2024
b4b4fd5
Merge branch 'develop' into backup-branch-fee-bumping
Farber98 Dec 13, 2024
e7d7680
Merge branch 'develop' into backup-branch-fee-bumping
Farber98 Dec 16, 2024
409fd1c
callers set lastValidBlockheight + get blockhash on expiration + inte…
Farber98 Dec 17, 2024
b900abe
Merge branch 'backup-branch-fee-bumping' into soak-expiration-rebroad…
Farber98 Dec 17, 2024
31a65a0
Merge branch 'develop' into backup-branch-fee-bumping
Farber98 Dec 17, 2024
1be704a
add enq iface comm to help callers
Farber98 Dec 17, 2024
9ee8977
address feedback
Farber98 Dec 17, 2024
105e890
Merge branch 'backup-branch-fee-bumping' into soak-expiration-rebroad…
Farber98 Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
revert pendingtx leakage of information. overwrite blockhash
Farber98 committed Nov 26, 2024

Verified

This commit was signed with the committer’s verified signature.
Farber98 Juan Farber
commit 7d77f9930f1c2ef5e50049f38929a16bd55e482a
43 changes: 32 additions & 11 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@ import (
"sync"
"time"

"github.com/gagliardetto/solana-go"
solanago "github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/programs/system"
"github.com/gagliardetto/solana-go/rpc"
@@ -528,6 +527,11 @@ func (c *chain) HealthReport() map[string]error {
}

func (c *chain) sendTx(ctx context.Context, from, to string, amount *big.Int, balanceCheck bool) error {
reader, err := c.Reader()
if err != nil {
return fmt.Errorf("chain unreachable: %w", err)
}

fromKey, err := solanago.PublicKeyFromBase58(from)
if err != nil {
return fmt.Errorf("failed to parse from key: %w", err)
@@ -541,6 +545,10 @@ func (c *chain) sendTx(ctx context.Context, from, to string, amount *big.Int, ba
}
amountI := amount.Uint64()

blockhash, err := reader.LatestBlockhash(ctx)
if err != nil {
return fmt.Errorf("failed to get latest block hash: %w", err)
}
tx, err := solanago.NewTransaction(
[]solanago.Instruction{
system.NewTransferInstruction(
@@ -549,25 +557,21 @@ func (c *chain) sendTx(ctx context.Context, from, to string, amount *big.Int, ba
toKey,
).Build(),
},
solana.Hash{}, // Will be set within sendWithRetry txm function.
blockhash.Value.Blockhash, // Will be override if needed within sendWithRetry txm function.
solanago.TransactionPayer(fromKey),
)
if err != nil {
return fmt.Errorf("failed to create tx: %w", err)
}

msg := &txm.PendingTx{
Tx: *tx,
AccountID: "",
// To perform balanceCheck we need a blockhash.
// Storing values to perform balanceCheck within sendWithRetry txm function before sending tx.
BalanceCheck: balanceCheck,
From: fromKey,
Amount: amountI,
if balanceCheck {
if err = solanaValidateBalance(ctx, reader, fromKey, amountI, tx.Message.ToBase64()); err != nil {
return fmt.Errorf("failed to validate balance: %w", err)
}
}

chainTxm := c.TxManager()
err = chainTxm.Enqueue(ctx, msg,
err = chainTxm.Enqueue(ctx, "", tx, nil,
txm.SetComputeUnitLimit(500), // reduce from default 200K limit - should only take 450 compute units
// no fee bumping and no additional fee - makes validating balance accurate
txm.SetComputeUnitPriceMax(0),
@@ -580,3 +584,20 @@ func (c *chain) sendTx(ctx context.Context, from, to string, amount *big.Int, ba
}
return nil
}

func solanaValidateBalance(ctx context.Context, reader client.Reader, from solanago.PublicKey, amount uint64, msg string) error {
balance, err := reader.Balance(ctx, from)
if err != nil {
return err
}

fee, err := reader.GetFeeForMessage(ctx, msg)
if err != nil {
return err
}

if balance < (amount + fee) {
return fmt.Errorf("balance %d is too low for this transaction to be executed: amount %d + fee %d", balance, amount, fee)
}
return nil
}
18 changes: 9 additions & 9 deletions pkg/solana/chain_test.go
Original file line number Diff line number Diff line change
@@ -30,7 +30,6 @@ import (
mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
solcfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/fees"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/mocks"
)

@@ -536,8 +535,11 @@ func TestSolanaChain_MultiNode_Txm(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, uint64(0), receiverBal)

createMsgWithTx := func(accountID string, signer solana.PublicKey, sender solana.PublicKey, receiver solana.PublicKey, amt uint64) *txm.PendingTx {
createTx := func(signer solana.PublicKey, sender solana.PublicKey, receiver solana.PublicKey, amt uint64) *solana.Transaction {
selectedClient, err = testChain.getClient()
assert.NoError(t, err)
hash, hashErr := selectedClient.LatestBlockhash(tests.Context(t))
assert.NoError(t, hashErr)
tx, txErr := solana.NewTransaction(
[]solana.Instruction{
system.NewTransferInstruction(
@@ -546,16 +548,15 @@ func TestSolanaChain_MultiNode_Txm(t *testing.T) {
receiver,
).Build(),
},
solana.Hash{},
hash.Value.Blockhash,
solana.TransactionPayer(signer),
)
require.NoError(t, txErr)
return &txm.PendingTx{Tx: *tx, AccountID: accountID}
return tx
}

// Send funds twice, along with an invalid transaction
require.NoError(t, testChain.txm.Enqueue(tests.Context(t), createMsgWithTx("test_success", pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL)))

require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil))
// Wait for new block hash
currentBh, err := selectedClient.LatestBlockhash(tests.Context(t))
require.NoError(t, err)
@@ -575,9 +576,8 @@ NewBlockHash:
}
}

require.NoError(t, testChain.txm.Enqueue(tests.Context(t), createMsgWithTx("test_success_2", pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL)))
require.Error(t, testChain.txm.Enqueue(tests.Context(t), createMsgWithTx("test_invalidSigner", pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL))) // cannot sign tx before enqueuing

require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success_2", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil))
require.Error(t, testChain.txm.Enqueue(tests.Context(t), "test_invalidSigner", createTx(pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil)) // cannot sign tx before enqueuing
// wait for all txes to finish
ctx, cancel := context.WithCancel(tests.Context(t))
t.Cleanup(cancel)
2 changes: 1 addition & 1 deletion pkg/solana/relay.go
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ import (
var _ TxManager = (*txm.Txm)(nil)

type TxManager interface {
Enqueue(ctx context.Context, msg *txm.PendingTx, txCfgs ...txm.SetTxConfig) error
Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, txCfgs ...txm.SetTxConfig) error
}

var _ relaytypes.Relayer = &Relayer{} //nolint:staticcheck
25 changes: 16 additions & 9 deletions pkg/solana/transmitter.go
Original file line number Diff line number Diff line change
@@ -3,15 +3,14 @@ package solana
import (
"bytes"
"context"
"errors"
"fmt"

"github.com/gagliardetto/solana-go"
"github.com/smartcontractkit/libocr/offchainreporting2/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm"
)

var _ types.ContractTransmitter = (*Transmitter)(nil)
@@ -31,6 +30,19 @@ func (c *Transmitter) Transmit(
report types.Report,
sigs []types.AttributedOnchainSignature,
) error {
reader, err := c.getReader()
if err != nil {
return fmt.Errorf("error on Transmit.Reader: %w", err)
}

blockhash, err := reader.LatestBlockhash(ctx)
if err != nil {
return fmt.Errorf("error on Transmit.GetRecentBlockhash: %w", err)
}
if blockhash == nil || blockhash.Value == nil {
return errors.New("nil pointer returned from Transmit.GetRecentBlockhash")
}

// Determine store authority
seeds := [][]byte{[]byte("store"), c.stateID.Bytes()}
storeAuthority, storeNonce, err := solana.FindProgramAddress(seeds, c.programID)
@@ -66,21 +78,16 @@ func (c *Transmitter) Transmit(
[]solana.Instruction{
solana.NewInstruction(c.programID, accounts, data.Bytes()),
},
solana.Hash{}, // Will be set within sendWithRetry txm function.
blockhash.Value.Blockhash, // Will be override if needed within sendWithRetry txm function.
solana.TransactionPayer(c.transmissionSigner),
)
if err != nil {
return fmt.Errorf("error on Transmit.NewTransaction: %w", err)
}

msg := &txm.PendingTx{
Tx: *tx,
AccountID: c.stateID.String(),
}

// pass transmit payload to tx manager queue
c.lggr.Debugf("Queuing transmit tx: state (%s) + transmissions (%s)", c.stateID.String(), c.transmissionsID.String())
if err = c.txManager.Enqueue(ctx, msg); err != nil {
if err = c.txManager.Enqueue(ctx, c.stateID.String(), tx, nil); err != nil {
return fmt.Errorf("error on Transmit.txManager.Enqueue: %w", err)
}
return nil
9 changes: 6 additions & 3 deletions pkg/solana/transmitter_test.go
Original file line number Diff line number Diff line change
@@ -5,8 +5,10 @@ import (
"testing"

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"github.com/smartcontractkit/libocr/offchainreporting2/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
@@ -25,9 +27,8 @@ type verifyTxSize struct {
s *solana.PrivateKey
}

func (txm verifyTxSize) Enqueue(_ context.Context, msg *txm.PendingTx, _ ...txm.SetTxConfig) error {
func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, txID *string, _ ...txm.SetTxConfig) error {
// additional components that transaction manager adds to the transaction
tx := &msg.Tx
require.NoError(txm.t, fees.SetComputeUnitPrice(tx, 0))
require.NoError(txm.t, fees.SetComputeUnitLimit(tx, 0))

@@ -58,7 +59,9 @@ func TestTransmitter_TxSize(t *testing.T) {
}

rw := clientmocks.NewReaderWriter(t)

rw.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{},
}, nil)
transmitter := Transmitter{
stateID: mustNewRandomPublicKey(),
programID: mustNewRandomPublicKey(),
53 changes: 24 additions & 29 deletions pkg/solana/txm/pendingtx.go
Original file line number Diff line number Diff line change
@@ -21,15 +21,15 @@ var (

type PendingTxContext interface {
// New adds a new tranasction in Broadcasted state to the storage
New(msg PendingTx, sig solana.Signature, cancel context.CancelFunc) error
New(msg pendingTx, sig solana.Signature, cancel context.CancelFunc) error
// AddSignature adds a new signature for an existing transaction ID
AddSignature(id string, sig solana.Signature) error
// Remove removes transaction and related signatures from storage if not in finalized or errored state
Remove(sig solana.Signature) (string, error)
// ListAll returns all of the signatures being tracked for all transactions not yet finalized or errored
ListAll() []solana.Signature
// ListAllExpiredBroadcastedTxs returns all the expired broadcasted that are in broadcasted state and have expired for given slot height.
ListAllExpiredBroadcastedTxs(currHeight uint64) []PendingTx
ListAllExpiredBroadcastedTxs(currHeight uint64) []pendingTx
// Expired returns whether or not confirmation timeout amount of time has passed since creation
Expired(sig solana.Signature, confirmationTimeout time.Duration) bool
// OnProcessed marks transactions as Processed
@@ -48,21 +48,16 @@ type PendingTxContext interface {
TrimFinalizedErroredTxs() int
}

type PendingTx struct {
Tx solana.Transaction
AccountID string
type pendingTx struct {
tx solana.Transaction
cfg TxConfig
signatures []solana.Signature
UUID string
IDSetByCaller bool
id string
rebroadcastCount int
createTs time.Time
retentionTs time.Time
state TxState
lastValidBlockHeight uint64 // to track expiration
BalanceCheck bool
From solana.PublicKey // to perform balanceCheck
Amount uint64 // to perform balanceCheck
}

var _ PendingTxContext = &pendingTxContext{}
@@ -71,9 +66,9 @@ type pendingTxContext struct {
cancelBy map[string]context.CancelFunc
sigToID map[solana.Signature]string

broadcastedProcessedTxs map[string]PendingTx // broadcasted and processed transactions that may require retry and bumping
confirmedTxs map[string]PendingTx // transactions that require monitoring for re-org
finalizedErroredTxs map[string]PendingTx // finalized and errored transactions held onto for status
broadcastedProcessedTxs map[string]pendingTx // broadcasted and processed transactions that may require retry and bumping
confirmedTxs map[string]pendingTx // transactions that require monitoring for re-org
finalizedErroredTxs map[string]pendingTx // finalized and errored transactions held onto for status

lock sync.RWMutex
}
@@ -83,20 +78,20 @@ func newPendingTxContext() *pendingTxContext {
cancelBy: map[string]context.CancelFunc{},
sigToID: map[solana.Signature]string{},

broadcastedProcessedTxs: map[string]PendingTx{},
confirmedTxs: map[string]PendingTx{},
finalizedErroredTxs: map[string]PendingTx{},
broadcastedProcessedTxs: map[string]pendingTx{},
confirmedTxs: map[string]pendingTx{},
finalizedErroredTxs: map[string]pendingTx{},
}
}

func (c *pendingTxContext) New(tx PendingTx, sig solana.Signature, cancel context.CancelFunc) error {
func (c *pendingTxContext) New(tx pendingTx, sig solana.Signature, cancel context.CancelFunc) error {
err := c.withReadLock(func() error {
// validate signature does not exist
if _, exists := c.sigToID[sig]; exists {
return ErrSigAlreadyExists
}
// validate id does not exist
if _, exists := c.broadcastedProcessedTxs[tx.UUID]; exists {
if _, exists := c.broadcastedProcessedTxs[tx.id]; exists {
return ErrIDAlreadyExists
}
return nil
@@ -110,18 +105,18 @@ func (c *pendingTxContext) New(tx PendingTx, sig solana.Signature, cancel contex
if _, exists := c.sigToID[sig]; exists {
return "", ErrSigAlreadyExists
}
if _, exists := c.broadcastedProcessedTxs[tx.UUID]; exists {
if _, exists := c.broadcastedProcessedTxs[tx.id]; exists {
return "", ErrIDAlreadyExists
}
// save cancel func
c.cancelBy[tx.UUID] = cancel
c.sigToID[sig] = tx.UUID
c.cancelBy[tx.id] = cancel
c.sigToID[sig] = tx.id
// add signature to tx
tx.signatures = append(tx.signatures, sig)
tx.createTs = time.Now()
tx.state = Broadcasted
// save to the broadcasted map since transaction was just broadcasted
c.broadcastedProcessedTxs[tx.UUID] = tx
c.broadcastedProcessedTxs[tx.id] = tx
return "", nil
})
return err
@@ -190,7 +185,7 @@ func (c *pendingTxContext) Remove(sig solana.Signature) (id string, err error) {
if !sigExists {
return id, ErrSigDoesNotExist
}
var tx PendingTx
var tx pendingTx
if tempTx, exists := c.broadcastedProcessedTxs[id]; exists {
tx = tempTx
delete(c.broadcastedProcessedTxs, id)
@@ -221,10 +216,10 @@ func (c *pendingTxContext) ListAll() []solana.Signature {
}

// ListAllExpiredBroadcastedTxs returns all the expired broadcasted that are in broadcasted state and have expired for given slot height.
func (c *pendingTxContext) ListAllExpiredBroadcastedTxs(currHeight uint64) []PendingTx {
func (c *pendingTxContext) ListAllExpiredBroadcastedTxs(currHeight uint64) []pendingTx {
c.lock.RLock()
defer c.lock.RUnlock()
broadcastedTxes := make([]PendingTx, 0, len(c.broadcastedProcessedTxs)) // worst case, all of them
broadcastedTxes := make([]pendingTx, 0, len(c.broadcastedProcessedTxs)) // worst case, all of them
for _, tx := range c.broadcastedProcessedTxs {
if tx.state == Broadcasted && tx.lastValidBlockHeight < currHeight {
broadcastedTxes = append(broadcastedTxes, tx)
@@ -365,7 +360,7 @@ func (c *pendingTxContext) OnFinalized(sig solana.Signature, retentionTimeout ti
if !exists {
return id, ErrSigDoesNotExist
}
var tx, tempTx PendingTx
var tx, tempTx pendingTx
var broadcastedExists, confirmedExists bool
if tempTx, broadcastedExists = c.broadcastedProcessedTxs[id]; broadcastedExists {
tx = tempTx
@@ -472,7 +467,7 @@ func (c *pendingTxContext) OnError(sig solana.Signature, retentionTimeout time.D
if !exists {
return "", ErrSigDoesNotExist
}
var tx, tempTx PendingTx
var tx, tempTx pendingTx
var broadcastedExists, confirmedExists bool
if tempTx, broadcastedExists = c.broadcastedProcessedTxs[id]; broadcastedExists {
tx = tempTx
@@ -591,7 +586,7 @@ func newPendingTxContextWithProm(id string) *pendingTxContextWithProm {
}
}

func (c *pendingTxContextWithProm) New(msg PendingTx, sig solana.Signature, cancel context.CancelFunc) error {
func (c *pendingTxContextWithProm) New(msg pendingTx, sig solana.Signature, cancel context.CancelFunc) error {
return c.pendingTx.New(msg, sig, cancel)
}

@@ -621,7 +616,7 @@ func (c *pendingTxContextWithProm) ListAll() []solana.Signature {
return sigs
}

func (c *pendingTxContextWithProm) ListAllExpiredBroadcastedTxs(currHeight uint64) []PendingTx {
func (c *pendingTxContextWithProm) ListAllExpiredBroadcastedTxs(currHeight uint64) []pendingTx {
return c.pendingTx.ListAllExpiredBroadcastedTxs(currHeight)
}

290 changes: 145 additions & 145 deletions pkg/solana/txm/pendingtx_test.go

Large diffs are not rendered by default.

159 changes: 64 additions & 95 deletions pkg/solana/txm/txm.go

Large diffs are not rendered by default.

43 changes: 22 additions & 21 deletions pkg/solana/txm/txm_internal_test.go
Original file line number Diff line number Diff line change
@@ -209,7 +209,7 @@ func TestTxm(t *testing.T) {

// send tx
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait()

// no transactions stored inflight txs list
@@ -245,7 +245,7 @@ func TestTxm(t *testing.T) {

// tx should be able to queue
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait() // wait to be picked up and processed

// no transactions stored inflight txs list
@@ -276,7 +276,7 @@ func TestTxm(t *testing.T) {

// tx should be able to queue
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait() // wait to be picked up and processed
waitFor(t, waitDuration, txm, prom, empty) // txs cleared quickly

@@ -312,7 +312,7 @@ func TestTxm(t *testing.T) {

// tx should be able to queue
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait() // wait to be picked up and processed
waitFor(t, waitDuration, txm, prom, empty) // txs cleared after timeout

@@ -352,7 +352,7 @@ func TestTxm(t *testing.T) {

// tx should be able to queue
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait() // wait to be picked up and processed
waitFor(t, waitDuration, txm, prom, empty) // txs cleared after timeout

@@ -402,7 +402,7 @@ func TestTxm(t *testing.T) {

// tx should be able to queue
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait() // wait to be picked up and processed
waitFor(t, waitDuration, txm, prom, empty) // txs cleared after timeout

@@ -444,7 +444,7 @@ func TestTxm(t *testing.T) {

// tx should be able to queue
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait() // wait to be picked up and processed
waitFor(t, waitDuration, txm, prom, empty) // txs cleared after timeout

@@ -489,7 +489,7 @@ func TestTxm(t *testing.T) {

// tx should be able to queue
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait() // wait to be picked up and processed
waitFor(t, waitDuration, txm, prom, empty) // inflight txs cleared after timeout

@@ -541,7 +541,7 @@ func TestTxm(t *testing.T) {

// tx should be able to queue
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait() // wait to be picked up and processed
waitFor(t, waitDuration, txm, prom, empty) // inflight txs cleared after timeout

@@ -579,7 +579,7 @@ func TestTxm(t *testing.T) {

// tx should be able to queue
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait() // wait to be picked up and processed
waitFor(t, waitDuration, txm, prom, empty) // inflight txs cleared after timeout

@@ -625,7 +625,7 @@ func TestTxm(t *testing.T) {

// send tx
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait()

// no transactions stored inflight txs list
@@ -679,7 +679,7 @@ func TestTxm(t *testing.T) {

// send tx - with disabled fee bumping
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}, SetFeeBumpPeriod(0)))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID, SetFeeBumpPeriod(0)))
wg.Wait()

// no transactions stored inflight txs list
@@ -731,7 +731,7 @@ func TestTxm(t *testing.T) {

// send tx - with disabled fee bumping and disabled compute unit limit
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}, SetFeeBumpPeriod(0), SetComputeUnitLimit(0)))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID, SetFeeBumpPeriod(0), SetComputeUnitLimit(0)))
wg.Wait()

// no transactions stored inflight txs list
@@ -846,7 +846,7 @@ func TestTxm_disabled_confirm_timeout_with_retention(t *testing.T) {

// tx should be able to queue
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait() // wait to be picked up and processed
waitFor(t, 5*time.Second, txm, prom, empty) // inflight txs cleared after timeout

@@ -1057,7 +1057,7 @@ func TestTxm_compute_unit_limit_estimation(t *testing.T) {

// send tx
testTxID := uuid.New().String()
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: testTxID, AccountID: t.Name()}))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID))
wg.Wait()

// no transactions stored inflight txs list
@@ -1086,7 +1086,7 @@ func TestTxm_compute_unit_limit_estimation(t *testing.T) {
mc.On("SimulateTx", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("simulation failed")).Once()

// tx should NOT be able to queue
assert.Error(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, AccountID: t.Name()}))
assert.Error(t, txm.Enqueue(ctx, t.Name(), tx, nil))
})

t.Run("simulation_returns_error", func(t *testing.T) {
@@ -1102,7 +1102,7 @@ func TestTxm_compute_unit_limit_estimation(t *testing.T) {

txID := uuid.NewString()
// tx should NOT be able to queue
assert.Error(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, AccountID: t.Name()}))
assert.Error(t, txm.Enqueue(ctx, t.Name(), tx, nil))
})
}

@@ -1165,7 +1165,7 @@ func TestTxm_Enqueue(t *testing.T) {
loader := utils.NewLazyLoad(func() (client.ReaderWriter, error) { return mc, nil })
txm := NewTxm("enqueue_test", loader, nil, cfg, mkey, lggr)

require.ErrorContains(t, txm.Enqueue(ctx, &PendingTx{AccountID: "txmUnstarted"}), "not started")
require.ErrorContains(t, txm.Enqueue(ctx, "txmUnstarted", &solana.Transaction{}, nil), "not started")
require.NoError(t, txm.Start(ctx))
t.Cleanup(func() { require.NoError(t, txm.Close()) })

@@ -1176,16 +1176,17 @@ func TestTxm_Enqueue(t *testing.T) {
}{
{"success", tx, false},
{"invalid_key", invalidTx, true},
{"nil_pointer", nil, true},
{"empty_tx", &solana.Transaction{}, true},
}

for _, run := range txs {
t.Run(run.name, func(t *testing.T) {
if !run.fail {
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *run.tx, AccountID: run.name}))
assert.NoError(t, txm.Enqueue(ctx, run.name, run.tx, nil))
return
}
assert.Error(t, txm.Enqueue(ctx, &PendingTx{Tx: *run.tx, AccountID: run.name}))
assert.Error(t, txm.Enqueue(ctx, run.name, run.tx, nil))
})
}
}
@@ -1301,7 +1302,7 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
// Enqueue the transaction
tx, _ := getTx(t, 0, mkey)
expiredTxID := "test"
assert.NoError(t, txm.Enqueue(ctx, &PendingTx{Tx: *tx, UUID: expiredTxID, AccountID: t.Name()}, SetTimeout(10*time.Second)))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &expiredTxID), SetTimeout(10*time.Second))
wg.Wait() // Wait for the transaction to be finalized

// Check that transaction for expiredTxID is not stored in memory
22 changes: 12 additions & 10 deletions pkg/solana/txm/txm_load_test.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ import (

solanaClient "github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm"
keyMocks "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/mocks"

relayconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
@@ -71,7 +72,7 @@ func TestTxm_Integration(t *testing.T) {
client, err := solanaClient.NewClient(url, cfg, 2*time.Second, lggr)
require.NoError(t, err)
loader := utils.NewLazyLoad(func() (solanaClient.ReaderWriter, error) { return client, nil })
txm := NewTxm("localnet", loader, nil, cfg, mkey, lggr)
txm := txm.NewTxm("localnet", loader, nil, cfg, mkey, lggr)

// track initial balance
initBal, err := client.Balance(ctx, pubKey)
@@ -83,8 +84,9 @@ func TestTxm_Integration(t *testing.T) {
// already started
assert.Error(t, txm.Start(ctx))

createMsgWithTx := func(accountID string, signer solana.PublicKey, sender solana.PublicKey, receiver solana.PublicKey, amt uint64) *PendingTx {
createTx := func(signer solana.PublicKey, sender solana.PublicKey, receiver solana.PublicKey, amt uint64) *solana.Transaction {
// create transfer tx
hash, err := client.LatestBlockhash(ctx)
assert.NoError(t, err)
tx, txErr := solana.NewTransaction(
[]solana.Instruction{
@@ -94,24 +96,24 @@ func TestTxm_Integration(t *testing.T) {
receiver,
).Build(),
},
solana.Hash{},
hash.Value.Blockhash,
solana.TransactionPayer(signer),
)
require.NoError(t, txErr)
return &PendingTx{Tx: *tx, AccountID: accountID}
return tx
}

// enqueue txs (must pass to move on to load test)
require.NoError(t, txm.Enqueue(ctx, createMsgWithTx("test_success_0", pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL)))
require.Error(t, txm.Enqueue(ctx, createMsgWithTx("test_invalidSigner", pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL))) // cannot sign tx before enqueuing
require.NoError(t, txm.Enqueue(ctx, createMsgWithTx("test_invalidReceiver", pubKey, pubKey, solana.PublicKey{}, solana.LAMPORTS_PER_SOL)))
require.NoError(t, txm.Enqueue(ctx, "test_success_0", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil))
require.Error(t, txm.Enqueue(ctx, "test_invalidSigner", createTx(pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil)) // cannot sign tx before enqueuing
require.NoError(t, txm.Enqueue(ctx, "test_invalidReceiver", createTx(pubKey, pubKey, solana.PublicKey{}, solana.LAMPORTS_PER_SOL), nil))
time.Sleep(500 * time.Millisecond) // pause 0.5s for new blockhash
require.NoError(t, txm.Enqueue(ctx, createMsgWithTx("test_success_1", pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL)))
require.NoError(t, txm.Enqueue(ctx, createMsgWithTx("test_txFail", pubKey, pubKey, pubKeyReceiver, 1000*solana.LAMPORTS_PER_SOL)))
require.NoError(t, txm.Enqueue(ctx, "test_success_1", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil))
require.NoError(t, txm.Enqueue(ctx, "test_txFail", createTx(pubKey, pubKey, pubKeyReceiver, 1000*solana.LAMPORTS_PER_SOL), nil))

// load test: try to overload txs, confirm, or simulation
for i := 0; i < 1000; i++ {
assert.NoError(t, txm.Enqueue(ctx, createMsgWithTx(fmt.Sprintf("load_%d", i), loadTestKey.PublicKey(), loadTestKey.PublicKey(), loadTestKey.PublicKey(), uint64(i))))
assert.NoError(t, txm.Enqueue(ctx, fmt.Sprintf("load_%d", i), createTx(loadTestKey.PublicKey(), loadTestKey.PublicKey(), loadTestKey.PublicKey(), uint64(i)), nil))
time.Sleep(10 * time.Millisecond) // ~100 txs per second (note: have run 5ms delays for ~200tx/s successfully)
}

40 changes: 20 additions & 20 deletions pkg/solana/txm/txm_race_test.go
Original file line number Diff line number Diff line change
@@ -28,10 +28,10 @@ import (
"github.com/stretchr/testify/require"
)

func NewTestMsg() (msg PendingTx) {
func NewTestMsg() (msg pendingTx) {
tx := solanaGo.Transaction{}
tx.Message.AccountKeys = append(tx.Message.AccountKeys, solanaGo.PublicKey{})
msg.Tx = tx
msg.tx = tx
return msg
}

@@ -222,33 +222,33 @@ func TestTxm_SendWithRetry_Race(t *testing.T) {
client := clientmocks.NewReaderWriter(t)
// client mock - first tx is always successful
msg0 := NewTestMsg()
require.NoError(t, fees.SetComputeUnitPrice(&msg0.Tx, 0))
require.NoError(t, fees.SetComputeUnitLimit(&msg0.Tx, 200_000))
msg0.Tx.Signatures = make([]solanaGo.Signature, 1)
client.On("SendTx", mock.Anything, &msg0.Tx).Return(solanaGo.Signature{1}, nil)
require.NoError(t, fees.SetComputeUnitPrice(&msg0.tx, 0))
require.NoError(t, fees.SetComputeUnitLimit(&msg0.tx, 200_000))
msg0.tx.Signatures = make([]solanaGo.Signature, 1)
client.On("SendTx", mock.Anything, &msg0.tx).Return(solanaGo.Signature{1}, nil)

// init bump tx fails, rebroadcast is successful
msg1 := NewTestMsg()
require.NoError(t, fees.SetComputeUnitPrice(&msg1.Tx, 1))
require.NoError(t, fees.SetComputeUnitLimit(&msg1.Tx, 200_000))
msg1.Tx.Signatures = make([]solanaGo.Signature, 1)
client.On("SendTx", mock.Anything, &msg1.Tx).Return(solanaGo.Signature{}, fmt.Errorf("BUMP FAILED")).Once()
client.On("SendTx", mock.Anything, &msg1.Tx).Return(solanaGo.Signature{2}, nil)
require.NoError(t, fees.SetComputeUnitPrice(&msg1.tx, 1))
require.NoError(t, fees.SetComputeUnitLimit(&msg1.tx, 200_000))
msg1.tx.Signatures = make([]solanaGo.Signature, 1)
client.On("SendTx", mock.Anything, &msg1.tx).Return(solanaGo.Signature{}, fmt.Errorf("BUMP FAILED")).Once()
client.On("SendTx", mock.Anything, &msg1.tx).Return(solanaGo.Signature{2}, nil)

// init bump tx success, rebroadcast fails
msg2 := NewTestMsg()
require.NoError(t, fees.SetComputeUnitPrice(&msg2.Tx, 2))
require.NoError(t, fees.SetComputeUnitLimit(&msg2.Tx, 200_000))
msg2.Tx.Signatures = make([]solanaGo.Signature, 1)
client.On("SendTx", mock.Anything, &msg2.Tx).Return(solanaGo.Signature{3}, nil).Once()
client.On("SendTx", mock.Anything, &msg2.Tx).Return(solanaGo.Signature{}, fmt.Errorf("REBROADCAST FAILED"))
require.NoError(t, fees.SetComputeUnitPrice(&msg2.tx, 2))
require.NoError(t, fees.SetComputeUnitLimit(&msg2.tx, 200_000))
msg2.tx.Signatures = make([]solanaGo.Signature, 1)
client.On("SendTx", mock.Anything, &msg2.tx).Return(solanaGo.Signature{3}, nil).Once()
client.On("SendTx", mock.Anything, &msg2.tx).Return(solanaGo.Signature{}, fmt.Errorf("REBROADCAST FAILED"))

// always successful
msg3 := NewTestMsg()
require.NoError(t, fees.SetComputeUnitPrice(&msg3.Tx, 4))
require.NoError(t, fees.SetComputeUnitLimit(&msg3.Tx, 200_000))
msg3.Tx.Signatures = make([]solanaGo.Signature, 1)
client.On("SendTx", mock.Anything, &msg3.Tx).Return(solanaGo.Signature{4}, nil)
require.NoError(t, fees.SetComputeUnitPrice(&msg3.tx, 4))
require.NoError(t, fees.SetComputeUnitLimit(&msg3.tx, 200_000))
msg3.tx.Signatures = make([]solanaGo.Signature, 1)
client.On("SendTx", mock.Anything, &msg3.tx).Return(solanaGo.Signature{4}, nil)
client.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: uint64(2000),