diff --git a/multinode/transaction_sender.go b/multinode/transaction_sender.go index 8ccc51e..a2cd571 100644 --- a/multinode/transaction_sender.go +++ b/multinode/transaction_sender.go @@ -23,25 +23,26 @@ var ( }, []string{"network", "chainId", "invariant"}) ) -type SendTxResult interface { - Code() SendTxReturnCode - Error() error +type sendTxResult[RESULT any] struct { + res RESULT + code SendTxReturnCode + error error } const sendTxQuorum = 0.7 // SendTxRPCClient - defines interface of an RPC used by TransactionSender to broadcast transaction -type SendTxRPCClient[TX any, RESULT SendTxResult] interface { +type SendTxRPCClient[TX any, RESULT any] interface { // SendTransaction errors returned should include name or other unique identifier of the RPC - SendTransaction(ctx context.Context, tx TX) RESULT + SendTransaction(ctx context.Context, tx TX) (RESULT, SendTxReturnCode, error) } -func NewTransactionSender[TX any, RESULT SendTxResult, CHAIN_ID ID, RPC SendTxRPCClient[TX, RESULT]]( +func NewTransactionSender[TX any, RESULT any, CHAIN_ID ID, RPC SendTxRPCClient[TX, RESULT]]( lggr logger.Logger, chainID CHAIN_ID, chainFamily string, multiNode *MultiNode[CHAIN_ID, RPC], - newResult func(err error) RESULT, + classifyErr func(err error) SendTxReturnCode, sendTxSoftTimeout time.Duration, ) *TransactionSender[TX, RESULT, CHAIN_ID, RPC] { if sendTxSoftTimeout == 0 { @@ -52,19 +53,19 @@ func NewTransactionSender[TX any, RESULT SendTxResult, CHAIN_ID ID, RPC SendTxRP chainFamily: chainFamily, lggr: logger.Sugared(lggr).Named("TransactionSender").With("chainID", chainID.String()), multiNode: multiNode, - newResult: newResult, + classifyErr: classifyErr, sendTxSoftTimeout: sendTxSoftTimeout, chStop: make(services.StopChan), } } -type TransactionSender[TX any, RESULT SendTxResult, CHAIN_ID ID, RPC SendTxRPCClient[TX, RESULT]] struct { +type TransactionSender[TX any, RESULT any, CHAIN_ID ID, RPC SendTxRPCClient[TX, RESULT]] struct { services.StateMachine chainID CHAIN_ID chainFamily string lggr logger.SugaredLogger multiNode *MultiNode[CHAIN_ID, RPC] - newResult func(err error) RESULT + classifyErr func(err error) SendTxReturnCode sendTxSoftTimeout time.Duration // defines max waiting time from first response til responses evaluation wg sync.WaitGroup // waits for all reporting goroutines to finish @@ -93,17 +94,16 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Name() string { // * If there is at least one terminal error - returns terminal error // * If there is both success and terminal error - returns success and reports invariant violation // * Otherwise, returns any (effectively random) of the errors. -func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) RESULT { - var result RESULT +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) (result RESULT, code SendTxReturnCode, err error) { ctx, cancel := txSender.chStop.Ctx(ctx) defer cancel() if !txSender.IfStarted(func() { - txResults := make(chan RESULT) - txResultsToReport := make(chan RESULT) + txResults := make(chan sendTxResult[RESULT]) + txResultsToReport := make(chan sendTxResult[RESULT]) primaryNodeWg := sync.WaitGroup{} healthyNodesNum := 0 - err := txSender.multiNode.DoAll(ctx, func(ctx context.Context, rpc RPC, isSendOnly bool) { + err = txSender.multiNode.DoAll(ctx, func(ctx context.Context, rpc RPC, isSendOnly bool) { if isSendOnly { txSender.wg.Add(1) go func(ctx context.Context) { @@ -151,45 +151,44 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct close(txResults) }() - if err != nil { - result = txSender.newResult(err) - return + if err == nil && healthyNodesNum == 0 { + err = ErrNodeError } - - if healthyNodesNum == 0 { - result = txSender.newResult(ErrNodeError) + if err != nil { + code = txSender.classifyErr(err) return } txSender.wg.Add(1) go txSender.reportSendTxAnomalies(tx, txResultsToReport) - result = txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults) + result, code, err = txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults) }) { - result = txSender.newResult(errors.New("TransactionSender not started")) + err = errors.New("TransactionSender not started") + code = txSender.classifyErr(err) } - return result + return } -func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) RESULT { +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) sendTxResult[RESULT] { // broadcast is a background job, so always detach from caller's cancellation ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx)) defer cancel() - result := rpc.SendTransaction(ctx, tx) - txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", result.Error()) - if !slices.Contains(sendTxSuccessfulCodes, result.Code()) && ctx.Err() == nil { - txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", result.Error()) + res, code, err := rpc.SendTransaction(ctx, tx) + txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", err) + if !slices.Contains(sendTxSuccessfulCodes, code) && ctx.Err() == nil { + txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", err) } - return result + return sendTxResult[RESULT]{res: res, code: code, error: err} } -func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan RESULT) { +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult[RESULT]) { defer txSender.wg.Done() resultsByCode := sendTxResults[RESULT]{} // txResults eventually will be closed for txResult := range txResults { - resultsByCode[txResult.Code()] = append(resultsByCode[txResult.Code()], txResult) + resultsByCode[txResult.code] = append(resultsByCode[txResult.code], txResult) } select { @@ -201,16 +200,16 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomal default: } - _, criticalErr := aggregateTxResults[RESULT](resultsByCode) + _, criticalErr := aggregateTxResults(resultsByCode) if criticalErr != nil { txSender.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr) PromMultiNodeInvariantViolations.WithLabelValues(txSender.chainFamily, txSender.chainID.String(), criticalErr.Error()).Inc() } } -type sendTxResults[RESULT any] map[SendTxReturnCode][]RESULT +type sendTxResults[RESULT any] map[SendTxReturnCode][]sendTxResult[RESULT] -func aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result RESULT, criticalErr error) { +func aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result sendTxResult[RESULT], criticalErr error) { severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors) successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes) if hasSuccess { @@ -239,7 +238,8 @@ func aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result return result, criticalErr } -func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan RESULT) RESULT { +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan sendTxResult[RESULT]) (RESULT, SendTxReturnCode, error) { + var emptyResult RESULT requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum)) errorsByCode := sendTxResults[RESULT]{} var softTimeoutChan <-chan time.Time @@ -249,11 +249,12 @@ loop: select { case <-ctx.Done(): txSender.lggr.Debugw("Failed to collect of the results before context was done", "tx", tx, "errorsByCode", errorsByCode) - return txSender.newResult(ctx.Err()) + err := ctx.Err() + return emptyResult, txSender.classifyErr(err), err case r := <-txResults: - errorsByCode[r.Code()] = append(errorsByCode[r.Code()], r) + errorsByCode[r.code] = append(errorsByCode[r.code], r) resultsCount++ - if slices.Contains(sendTxSuccessfulCodes, r.Code()) || resultsCount >= requiredResults { + if slices.Contains(sendTxSuccessfulCodes, r.code) || resultsCount >= requiredResults { break loop } case <-softTimeoutChan: @@ -273,7 +274,7 @@ loop: // ignore critical error as it's reported in reportSendTxAnomalies result, _ := aggregateTxResults(errorsByCode) txSender.lggr.Debugw("Collected results", "errorsByCode", errorsByCode, "result", result) - return result + return result.res, result.code, result.error } func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Start(ctx context.Context) error { diff --git a/multinode/transaction_sender_test.go b/multinode/transaction_sender_test.go index f44b67a..bc51f6f 100644 --- a/multinode/transaction_sender_test.go +++ b/multinode/transaction_sender_test.go @@ -16,7 +16,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" ) -type TestSendTxRPCClient SendTxRPCClient[any, *sendTxResult] +type TestSendTxRPCClient SendTxRPCClient[any, any] type sendTxMultiNode struct { *MultiNode[ID, TestSendTxRPCClient] @@ -27,39 +27,17 @@ type sendTxRPC struct { sendTxErr error } -type sendTxResult struct { - err error - code SendTxReturnCode -} - -var _ SendTxResult = (*sendTxResult)(nil) - -func NewSendTxResult(err error) *sendTxResult { - result := &sendTxResult{ - err: err, - } - return result -} - -func (r *sendTxResult) Error() error { - return r.err -} - -func (r *sendTxResult) Code() SendTxReturnCode { - return r.code -} - var _ TestSendTxRPCClient = (*sendTxRPC)(nil) func newSendTxRPC(sendTxErr error, sendTxRun func(args mock.Arguments)) *sendTxRPC { return &sendTxRPC{sendTxErr: sendTxErr, sendTxRun: sendTxRun} } -func (rpc *sendTxRPC) SendTransaction(ctx context.Context, _ any) *sendTxResult { +func (rpc *sendTxRPC) SendTransaction(ctx context.Context, _ any) (any, SendTxReturnCode, error) { if rpc.sendTxRun != nil { rpc.sendTxRun(mock.Arguments{ctx}) } - return &sendTxResult{err: rpc.sendTxErr, code: classifySendTxError(nil, rpc.sendTxErr)} + return nil, classifySendTxError(nil, rpc.sendTxErr), rpc.sendTxErr } // newTestTransactionSender returns a sendTxMultiNode and TransactionSender. @@ -67,11 +45,11 @@ func (rpc *sendTxRPC) SendTransaction(ctx context.Context, _ any) *sendTxResult func newTestTransactionSender(t *testing.T, chainID ID, lggr logger.Logger, nodes []Node[ID, TestSendTxRPCClient], sendOnlyNodes []SendOnlyNode[ID, TestSendTxRPCClient], -) (*sendTxMultiNode, *TransactionSender[any, *sendTxResult, ID, TestSendTxRPCClient]) { +) (*sendTxMultiNode, *TransactionSender[any, any, ID, TestSendTxRPCClient]) { mn := sendTxMultiNode{NewMultiNode[ID, TestSendTxRPCClient]( lggr, NodeSelectionModeRoundRobin, 0, nodes, sendOnlyNodes, chainID, "chainFamily", 0)} - txSender := NewTransactionSender[any, *sendTxResult, ID, TestSendTxRPCClient](lggr, chainID, mn.chainFamily, mn.MultiNode, NewSendTxResult, tests.TestInterval) + txSender := NewTransactionSender[any, any, ID, TestSendTxRPCClient](lggr, chainID, mn.chainFamily, mn.MultiNode, func(err error) SendTxReturnCode { return 0 }, tests.TestInterval) servicetest.Run(t, txSender) return &mn, txSender } @@ -105,8 +83,8 @@ func TestTransactionSender_SendTransaction(t *testing.T) { t.Run("Fails if there is no nodes available", func(t *testing.T) { lggr := logger.Test(t) _, txSender := newTestTransactionSender(t, RandomID(), lggr, nil, nil) - result := txSender.SendTransaction(tests.Context(t), nil) - assert.EqualError(t, result.Error(), ErrNodeError.Error()) + _, _, err := txSender.SendTransaction(tests.Context(t), nil) + assert.EqualError(t, err, ErrNodeError.Error()) }) t.Run("Transaction failure happy path", func(t *testing.T) { @@ -118,9 +96,9 @@ func TestTransactionSender_SendTransaction(t *testing.T) { []Node[ID, TestSendTxRPCClient]{mainNode}, []SendOnlyNode[ID, TestSendTxRPCClient]{newNode(t, errors.New("unexpected error"), nil)}) - result := txSender.SendTransaction(tests.Context(t), nil) - require.ErrorIs(t, result.Error(), expectedError) - require.Equal(t, Fatal, result.Code()) + _, code, err := txSender.SendTransaction(tests.Context(t), nil) + require.ErrorIs(t, err, expectedError) + require.Equal(t, Fatal, code) tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 2) tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 2) }) @@ -133,9 +111,9 @@ func TestTransactionSender_SendTransaction(t *testing.T) { []Node[ID, TestSendTxRPCClient]{mainNode}, []SendOnlyNode[ID, TestSendTxRPCClient]{newNode(t, errors.New("unexpected error"), nil)}) - result := txSender.SendTransaction(tests.Context(t), nil) - require.NoError(t, result.Error()) - require.Equal(t, Successful, result.Code()) + _, code, err := txSender.SendTransaction(tests.Context(t), nil) + require.NoError(t, err) + require.Equal(t, Successful, code) tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 2) tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 1) }) @@ -156,8 +134,8 @@ func TestTransactionSender_SendTransaction(t *testing.T) { requestContext, cancel := context.WithCancel(tests.Context(t)) cancel() - result := txSender.SendTransaction(requestContext, nil) - require.EqualError(t, result.Error(), "context canceled") + _, _, err := txSender.SendTransaction(requestContext, nil) + require.EqualError(t, err, "context canceled") }) t.Run("Soft timeout stops results collection", func(t *testing.T) { @@ -176,8 +154,8 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr := logger.Test(t) _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[ID, TestSendTxRPCClient]{fastNode, slowNode}, nil) - result := txSender.SendTransaction(tests.Context(t), nil) - require.EqualError(t, result.Error(), expectedError.Error()) + _, _, err := txSender.SendTransaction(tests.Context(t), nil) + require.EqualError(t, err, expectedError.Error()) }) t.Run("Returns success without waiting for the rest of the nodes", func(t *testing.T) { chainID := RandomID() @@ -198,9 +176,9 @@ func TestTransactionSender_SendTransaction(t *testing.T) { []Node[ID, TestSendTxRPCClient]{fastNode, slowNode}, []SendOnlyNode[ID, TestSendTxRPCClient]{slowSendOnly}) - result := txSender.SendTransaction(tests.Context(t), nil) - require.NoError(t, result.Error()) - require.Equal(t, Successful, result.Code()) + _, code, err := txSender.SendTransaction(tests.Context(t), nil) + require.NoError(t, err) + require.Equal(t, Successful, code) }) t.Run("Fails when multinode is closed", func(t *testing.T) { chainID := RandomID() @@ -228,8 +206,8 @@ func TestTransactionSender_SendTransaction(t *testing.T) { require.NoError(t, mn.Start(tests.Context(t))) require.NoError(t, mn.Close()) - result := txSender.SendTransaction(tests.Context(t), nil) - require.EqualError(t, result.Error(), "service is stopped") + _, _, err := txSender.SendTransaction(tests.Context(t), nil) + require.EqualError(t, err, "service is stopped") }) t.Run("Fails when closed", func(t *testing.T) { chainID := RandomID() @@ -246,11 +224,11 @@ func TestTransactionSender_SendTransaction(t *testing.T) { <-testContext.Done() }) - var txSender *TransactionSender[any, *sendTxResult, ID, TestSendTxRPCClient] + var txSender *TransactionSender[any, any, ID, TestSendTxRPCClient] t.Cleanup(func() { // after txSender.Close() - result := txSender.SendTransaction(tests.Context(t), nil) - assert.EqualError(t, result.err, "TransactionSender not started") + _, _, err := txSender.SendTransaction(tests.Context(t), nil) + assert.EqualError(t, err, "TransactionSender not started") }) _, txSender = newTestTransactionSender(t, chainID, logger.Test(t), @@ -268,8 +246,8 @@ func TestTransactionSender_SendTransaction(t *testing.T) { []Node[ID, TestSendTxRPCClient]{primary}, []SendOnlyNode[ID, TestSendTxRPCClient]{sendOnly}) - result := txSender.SendTransaction(tests.Context(t), nil) - assert.EqualError(t, result.Error(), ErrNodeError.Error()) + _, _, err := txSender.SendTransaction(tests.Context(t), nil) + assert.EqualError(t, err, ErrNodeError.Error()) }) t.Run("Transaction success even if one of the nodes is unhealthy", func(t *testing.T) { @@ -287,9 +265,9 @@ func TestTransactionSender_SendTransaction(t *testing.T) { []Node[ID, TestSendTxRPCClient]{mainNode, unhealthyNode}, []SendOnlyNode[ID, TestSendTxRPCClient]{unhealthySendOnlyNode}) - result := txSender.SendTransaction(tests.Context(t), nil) - require.NoError(t, result.Error()) - require.Equal(t, Successful, result.Code()) + _, code, err := txSender.SendTransaction(tests.Context(t), nil) + require.NoError(t, err) + require.Equal(t, Successful, code) }) t.Run("All background jobs stop even if RPC returns result after soft timeout", func(t *testing.T) { chainID := RandomID() @@ -305,9 +283,9 @@ func TestTransactionSender_SendTransaction(t *testing.T) { lggr := logger.Test(t) _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[ID, TestSendTxRPCClient]{fastNode, slowNode}, nil) - result := txSender.SendTransaction(sendTxContext, nil) + _, _, err := txSender.SendTransaction(sendTxContext, nil) sendTxCancel() - require.EqualError(t, result.Error(), expectedError.Error()) + require.EqualError(t, err, expectedError.Error()) // TxSender should stop all background go routines after SendTransaction is done and before test is done. // Otherwise, it signals that we have a goroutine leak. txSender.wg.Wait() @@ -326,62 +304,62 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { Name string ExpectedTxResult string ExpectedCriticalErr string - ResultsByCode sendTxResults[*sendTxResult] + ResultsByCode sendTxResults[any] }{ { Name: "Returns success and logs critical error on success and Fatal", ExpectedTxResult: "success", ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error", - ResultsByCode: sendTxResults[*sendTxResult]{ - Successful: {NewSendTxResult(errors.New("success"))}, - Fatal: {NewSendTxResult(errors.New("fatal"))}, + ResultsByCode: sendTxResults[any]{ + Successful: {newSendTxResult(errors.New("success"))}, + Fatal: {newSendTxResult(errors.New("fatal"))}, }, }, { Name: "Returns TransactionAlreadyKnown and logs critical error on TransactionAlreadyKnown and Fatal", ExpectedTxResult: "tx_already_known", ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error", - ResultsByCode: sendTxResults[*sendTxResult]{ - TransactionAlreadyKnown: {NewSendTxResult(errors.New("tx_already_known"))}, - Unsupported: {NewSendTxResult(errors.New("unsupported"))}, + ResultsByCode: sendTxResults[any]{ + TransactionAlreadyKnown: {newSendTxResult(errors.New("tx_already_known"))}, + Unsupported: {newSendTxResult(errors.New("unsupported"))}, }, }, { Name: "Prefers sever error to temporary", ExpectedTxResult: "underpriced", ExpectedCriticalErr: "", - ResultsByCode: sendTxResults[*sendTxResult]{ - Retryable: {NewSendTxResult(errors.New("retryable"))}, - Underpriced: {NewSendTxResult(errors.New("underpriced"))}, + ResultsByCode: sendTxResults[any]{ + Retryable: {newSendTxResult(errors.New("retryable"))}, + Underpriced: {newSendTxResult(errors.New("underpriced"))}, }, }, { Name: "Returns temporary error", ExpectedTxResult: "retryable", ExpectedCriticalErr: "", - ResultsByCode: sendTxResults[*sendTxResult]{ - Retryable: {NewSendTxResult(errors.New("retryable"))}, + ResultsByCode: sendTxResults[any]{ + Retryable: {newSendTxResult(errors.New("retryable"))}, }, }, { Name: "Insufficient funds is treated as error", ExpectedTxResult: "insufficientFunds", ExpectedCriticalErr: "", - ResultsByCode: sendTxResults[*sendTxResult]{ - InsufficientFunds: {NewSendTxResult(errors.New("insufficientFunds"))}, + ResultsByCode: sendTxResults[any]{ + InsufficientFunds: {newSendTxResult(errors.New("insufficientFunds"))}, }, }, { Name: "Logs critical error on empty ResultsByCode", ExpectedCriticalErr: "expected at least one response on SendTransaction", - ResultsByCode: sendTxResults[*sendTxResult]{}, + ResultsByCode: sendTxResults[any]{}, }, { Name: "Zk terminally stuck", ExpectedTxResult: "not enough keccak counters to continue the execution", ExpectedCriticalErr: "", - ResultsByCode: sendTxResults[*sendTxResult]{ - TerminallyStuck: {NewSendTxResult(errors.New("not enough keccak counters to continue the execution"))}, + ResultsByCode: sendTxResults[any]{ + TerminallyStuck: {newSendTxResult(errors.New("not enough keccak counters to continue the execution"))}, }, }, } @@ -394,7 +372,7 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { t.Run(testCase.Name, func(t *testing.T) { txResult, err := aggregateTxResults(testCase.ResultsByCode) if testCase.ExpectedTxResult != "" { - require.EqualError(t, txResult.Error(), testCase.ExpectedTxResult) + require.EqualError(t, txResult.error, testCase.ExpectedTxResult) } logger.Sugared(logger.Test(t)).Info("Map: " + fmt.Sprint(testCase.ResultsByCode)) @@ -415,3 +393,7 @@ func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) { } assert.Empty(t, codesToCover, "all of the SendTxReturnCode must be covered by this test") } + +func newSendTxResult(err error) sendTxResult[any] { + return sendTxResult[any]{error: err} +}