Skip to content

Commit

Permalink
BCFR-934/remove-finality-depth-as-default-value-for-minConfirmation-a…
Browse files Browse the repository at this point in the history
…nd-fix-inconsistency (#14509)

* remove default finality depth, and allow 0 to be provided by user

* update changeset

* fix lint

* restore > 0 check

* wait for callback unless explicit zero

* support finality callbacks

* update changeset

* update changeset

* feedback

---------

Co-authored-by: Jordan Krage <[email protected]>
  • Loading branch information
huangzhen1997 and jmank88 authored Oct 9, 2024
1 parent 4c3e7ec commit dbd42db
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 63 deletions.
9 changes: 9 additions & 0 deletions .changeset/brave-ads-explode.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"chainlink": patch
---

Remove finality depth as the default value for minConfirmation for tx jobs.
Update the sql query for fetching pending callback transactions:
if minConfirmation is not null, we check difference if the current block - tx block > minConfirmation
else we check if the tx block is <= finalizedBlock
#updated
6 changes: 3 additions & 3 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro

if ec.resumeCallback != nil {
mark = time.Now()
if err := ec.ResumePendingTaskRuns(ctx, head); err != nil {
if err := ec.ResumePendingTaskRuns(ctx, head.BlockNumber(), latestFinalizedHead.BlockNumber()); err != nil {
return fmt.Errorf("ResumePendingTaskRuns failed: %w", err)
}

Expand Down Expand Up @@ -1259,8 +1259,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen
}

// ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID)
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, latest, finalized int64) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, latest, finalized, ec.chainID)

if err != nil {
return err
Expand Down
31 changes: 16 additions & 15 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type TxStore[
TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]

// Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled
FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
// Update tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error
SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3055,7 +3055,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
// It would only be in a state past suspended if the resume callback was called and callback_completed was set to TRUE
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE, callback_completed = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
require.NoError(t, err)
})

Expand All @@ -3073,7 +3073,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {

pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
require.NoError(t, err)
})

Expand Down Expand Up @@ -3101,7 +3101,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
t.Cleanup(func() { <-done })
go func() {
defer close(done)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
if !assert.NoError(t, err2) {
return
}
Expand Down Expand Up @@ -3155,7 +3155,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
t.Cleanup(func() { <-done })
go func() {
defer close(done)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
if !assert.NoError(t, err2) {
return
}
Expand Down Expand Up @@ -3192,7 +3192,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
require.Error(t, err)

// Retrieve Tx to check if callback completed flag was left unchanged
Expand Down
10 changes: 7 additions & 3 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e
}

// Find confirmed txes requiring callback but have not yet been signaled
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
var rs []dbReceiptPlus

var cancel context.CancelFunc
Expand All @@ -1066,8 +1066,12 @@ func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64
INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id
INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash
WHERE evm.txes.pipeline_task_run_id IS NOT NULL AND evm.txes.signal_callback = TRUE AND evm.txes.callback_completed = FALSE
AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) AND evm.txes.evm_chain_id = $2
`, blockNum, chainID.String())
AND (
(evm.txes.min_confirmations IS NOT NULL AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations))
OR (evm.txes.min_confirmations IS NULL AND evm.receipts.block_number <= $2)
)
AND evm.txes.evm_chain_id = $3
`, latest, finalized, chainID.String())
if err != nil {
return nil, fmt.Errorf("failed to retrieve transactions pending pipeline resume callback: %w", err)
}
Expand Down
24 changes: 20 additions & 4 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
etx1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress)
pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`)
attempt1 := etx1.TxAttempts[0]
mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash)
etxBlockNum := mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash).BlockNumber
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr1.ID, minConfirmations, etx1.ID)

// Callback to pipeline service completed. Should be ignored
Expand Down Expand Up @@ -685,10 +685,26 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = $1 WHERE id = $2`, minConfirmations, etx5.ID)

// Search evm.txes table for tx requiring callback
receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, ethClient.ConfiguredChainID())
receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID())
require.NoError(t, err)
assert.Len(t, receiptsPlus, 1)
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
if assert.Len(t, receiptsPlus, 1) {
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
}

// Clear min_confirmations
pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = NULL WHERE id = $1`, etx1.ID)

// Search evm.txes table for tx requiring callback
receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID())
require.NoError(t, err)
assert.Empty(t, receiptsPlus)

// Search evm.txes table for tx requiring callback, with block 1 finalized
receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, etxBlockNum, ethClient.ConfiguredChainID())
require.NoError(t, err)
if assert.Len(t, receiptsPlus, 1) {
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
}
}

func Test_FindTxWithIdempotencyKey(t *testing.T) {
Expand Down
31 changes: 16 additions & 15 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 15 additions & 17 deletions core/services/pipeline/task.eth_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func (t *ETHTxTask) getEvmChainID() string {
return t.EVMChainID
}

func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) {
func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (Result, RunInfo) {
var chainID StringParam
err := errors.Wrap(ResolveParam(&chainID, From(VarExpr(t.getEvmChainID(), vars), NonemptyString(t.getEvmChainID()), "")), "evmChainID")
if err != nil {
return Result{Error: err}, runInfo
return Result{Error: err}, RunInfo{}
}

chain, err := t.legacyChains.Get(string(chainID))
Expand All @@ -81,7 +81,7 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu
txManager := chain.TxManager()
_, err = CheckInputs(inputs, -1, -1, 0)
if err != nil {
return Result{Error: errors.Wrap(err, "task inputs")}, runInfo
return Result{Error: errors.Wrap(err, "task inputs")}, RunInfo{}
}

maximumGasLimit := SelectGasLimit(cfg.GasEstimator(), t.jobType, t.specGasLimit)
Expand All @@ -107,25 +107,20 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu
errors.Wrap(ResolveParam(&failOnRevert, From(NonemptyString(t.FailOnRevert), false)), "failOnRevert"),
)
if err != nil {
return Result{Error: err}, runInfo
}
var minOutgoingConfirmations uint64
if min, isSet := maybeMinConfirmations.Uint64(); isSet {
minOutgoingConfirmations = min
} else {
minOutgoingConfirmations = uint64(cfg.FinalityDepth())
return Result{Error: err}, RunInfo{}
}
minOutgoingConfirmations, isMinConfirmationSet := maybeMinConfirmations.Uint64()

txMeta, err := decodeMeta(txMetaMap)
if err != nil {
return Result{Error: err}, runInfo
return Result{Error: err}, RunInfo{}
}
txMeta.FailOnRevert = null.BoolFrom(bool(failOnRevert))
setJobIDOnMeta(lggr, vars, txMeta)

transmitChecker, err := decodeTransmitChecker(transmitCheckerMap)
if err != nil {
return Result{Error: err}, runInfo
return Result{Error: err}, RunInfo{}
}

fromAddr, err := t.keyStore.GetRoundRobinAddress(ctx, chain.ID(), fromAddrs...)
Expand Down Expand Up @@ -159,8 +154,11 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu
SignalCallback: true,
}

if minOutgoingConfirmations > 0 {
// Store the task run ID, so we can resume the pipeline when tx is confirmed
if !isMinConfirmationSet {
// Store the task run ID, so we can resume the pipeline when tx is finalized
txRequest.PipelineTaskRunID = &t.uuid
} else if minOutgoingConfirmations > 0 {
// Store the task run ID, so we can resume the pipeline after minOutgoingConfirmations
txRequest.PipelineTaskRunID = &t.uuid
txRequest.MinConfirmations = clnull.Uint32From(uint32(minOutgoingConfirmations))
}
Expand All @@ -170,11 +168,11 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu
return Result{Error: errors.Wrapf(ErrTaskRunFailed, "while creating transaction: %v", err)}, retryableRunInfo()
}

if minOutgoingConfirmations > 0 {
return Result{}, pendingRunInfo()
if txRequest.PipelineTaskRunID != nil {
return Result{}, RunInfo{IsPending: true}
}

return Result{Value: nil}, runInfo
return Result{}, RunInfo{}
}

func decodeMeta(metaMap MapParam) (*txmgr.TxMeta, error) {
Expand Down

0 comments on commit dbd42db

Please sign in to comment.