From 2fa9e691679c83d1c36f47eb7a28bf629093991b Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 28 Nov 2023 12:37:29 +0000 Subject: [PATCH 01/14] Switch keepers to use the default contract transmitter Pass the gas limit into the transmitter constructor so we can specify the automation gas limit Update tests --- core/services/ocr2/delegate.go | 4 +- .../plugins/ocr2keeper/integration_21_test.go | 50 +---------------- .../plugins/ocr2keeper/integration_test.go | 19 ------- core/services/ocr2/plugins/ocr2keeper/util.go | 11 ++-- core/services/relay/evm/evm.go | 56 +------------------ core/services/relay/evm/ocr2keeper.go | 27 ++++----- core/services/relay/evm/ocr2vrf.go | 4 +- 7 files changed, 27 insertions(+), 144 deletions(-) diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 16f02282afb..0cd2b784df1 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -1084,7 +1084,7 @@ func (d *Delegate) newServicesOCR2Keepers21( return nil, fmt.Errorf("keeper2 services: failed to get chain %s: %w", rid.ChainID, err2) } - keeperProvider, services, err2 := ocr2keeper.EVMDependencies21(jb, d.db, lggr, chain, d.pipelineRunner, mc, kb, d.cfg.Database()) + keeperProvider, services, err2 := ocr2keeper.EVMDependencies21(jb, d.db, lggr, chain, d.pipelineRunner, mc, kb, d.cfg.Database(), d.ethKs) if err2 != nil { return nil, errors.Wrap(err2, "could not build dependencies for ocr2 keepers") } @@ -1201,7 +1201,7 @@ func (d *Delegate) newServicesOCR2Keepers20( return nil, fmt.Errorf("keepers2.0 services: failed to get chain (%s): %w", rid.ChainID, err2) } - keeperProvider, rgstry, encoder, logProvider, err2 := ocr2keeper.EVMDependencies20(jb, d.db, lggr, chain, d.pipelineRunner) + keeperProvider, rgstry, encoder, logProvider, err2 := ocr2keeper.EVMDependencies20(jb, d.db, lggr, chain, d.pipelineRunner, d.ethKs) if err2 != nil { return nil, errors.Wrap(err2, "could not build dependencies for ocr2 keepers") } diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go index c2b6612f664..f2bb88591c7 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go @@ -53,8 +53,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams" - "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" ) @@ -117,7 +115,7 @@ func TestIntegration_KeeperPluginConditionalUpkeep(t *testing.T) { require.NoError(t, err) registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr) - nodes, _ := setupNodes(t, nodeKeys, registry, backend, steve) + setupNodes(t, nodeKeys, registry, backend, steve) <-time.After(time.Second * 5) @@ -160,8 +158,6 @@ func TestIntegration_KeeperPluginConditionalUpkeep(t *testing.T) { } g.Eventually(receivedBytes, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.Equal(payload1)) - checkPipelineRuns(t, nodes, 1) - // change payload _, err = upkeepContract.SetBytesToSend(carrol, payload2) require.NoError(t, err) @@ -204,7 +200,7 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { require.NoError(t, err) registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr) - nodes, _ := setupNodes(t, nodeKeys, registry, backend, steve) + setupNodes(t, nodeKeys, registry, backend, steve) upkeeps := 1 _, err = linkToken.Transfer(sergey, carrol.From, big.NewInt(0).Mul(oneHunEth, big.NewInt(int64(upkeeps+1)))) @@ -228,8 +224,6 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue()) done() - runs := checkPipelineRuns(t, nodes, 1) - t.Run("recover logs", func(t *testing.T) { addr, contract := addrs[0], contracts[0] @@ -252,11 +246,6 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { backend.Commit() time.Sleep(time.Millisecond * 10) } - t.Logf("Mined %d blocks, waiting for logs to be recovered", dummyBlocks) - - expectedPostRecover := runs + emits - waitPipelineRuns(t, nodes, expectedPostRecover, testutils.WaitTimeout(t), cltest.DBPollingInterval) - }) } @@ -296,7 +285,7 @@ func TestIntegration_KeeperPluginLogUpkeep_Retry(t *testing.T) { registry := deployKeeper21Registry(t, registryOwner, backend, linkAddr, linkFeedAddr, gasFeedAddr) - nodes, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner) + _, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner) const upkeepCount = 10 const mercuryFailCount = upkeepCount * 3 * 2 @@ -374,39 +363,6 @@ func TestIntegration_KeeperPluginLogUpkeep_Retry(t *testing.T) { g.Eventually(listener, testutils.WaitTimeout(t)-(5*time.Second), cltest.DBPollingInterval).Should(gomega.BeTrue()) done() - - _ = checkPipelineRuns(t, nodes, 1*len(nodes)) // TODO: TBD -} - -func waitPipelineRuns(t *testing.T, nodes []Node, n int, timeout, interval time.Duration) { - ctx, cancel := context.WithTimeout(testutils.Context(t), timeout) - defer cancel() - var allRuns []pipeline.Run - for len(allRuns) < n && ctx.Err() == nil { - allRuns = []pipeline.Run{} - for _, node := range nodes { - runs, err := node.App.PipelineORM().GetAllRuns() - require.NoError(t, err) - allRuns = append(allRuns, runs...) - } - time.Sleep(interval) - } - runs := len(allRuns) - t.Logf("found %d pipeline runs", runs) - require.GreaterOrEqual(t, runs, n) -} - -func checkPipelineRuns(t *testing.T, nodes []Node, n int) int { - var allRuns []pipeline.Run - for _, node := range nodes { - runs, err2 := node.App.PipelineORM().GetAllRuns() - require.NoError(t, err2) - allRuns = append(allRuns, runs...) - } - runs := len(allRuns) - t.Logf("found %d pipeline runs", runs) - require.GreaterOrEqual(t, runs, n) - return runs } func emitEvents(ctx context.Context, t *testing.T, n int, contracts []*log_upkeep_counter_wrapper.LogUpkeepCounter, carrol *bind.TransactOpts, afterEmit func()) { diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_test.go index 58c1e38e017..d48ca984f6b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_test.go @@ -57,7 +57,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate" "github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap" - "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" "github.com/smartcontractkit/chainlink/v2/core/store/models" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -411,15 +410,6 @@ func TestIntegration_KeeperPluginBasic(t *testing.T) { } g.Eventually(receivedBytes, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.Equal(payload1)) - // check pipeline runs - var allRuns []pipeline.Run - for _, node := range nodes { - runs, err2 := node.App.PipelineORM().GetAllRuns() - require.NoError(t, err2) - allRuns = append(allRuns, runs...) - } - require.GreaterOrEqual(t, len(allRuns), 1) - // change payload _, err = upkeepContract.SetBytesToSend(carrol, payload2) require.NoError(t, err) @@ -683,15 +673,6 @@ func TestIntegration_KeeperPluginForwarderEnabled(t *testing.T) { } g.Eventually(receivedBytes, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.Equal(payload1)) - // check pipeline runs - var allRuns []pipeline.Run - for _, node := range nodes { - runs, err2 := node.App.PipelineORM().GetAllRuns() - require.NoError(t, err2) - allRuns = append(allRuns, runs...) - } - require.GreaterOrEqual(t, len(allRuns), 1) - // change payload _, err = upkeepContract.SetBytesToSend(carrol, payload2) require.NoError(t, err) diff --git a/core/services/ocr2/plugins/ocr2keeper/util.go b/core/services/ocr2/plugins/ocr2keeper/util.go index 76e5bb6e00e..8bbb9fbc534 100644 --- a/core/services/ocr2/plugins/ocr2keeper/util.go +++ b/core/services/ocr2/plugins/ocr2keeper/util.go @@ -17,6 +17,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models" evmregistry20 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v20" @@ -43,9 +44,9 @@ var ( ErrNoChainFromSpec = fmt.Errorf("could not create chain from spec") ) -func EVMProvider(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, spec job.Job, pr pipeline.Runner) (evmrelay.OCR2KeeperProvider, error) { +func EVMProvider(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, spec job.Job, pr pipeline.Runner, ethKeystore keystore.Eth) (evmrelay.OCR2KeeperProvider, error) { oSpec := spec.OCR2OracleSpec - ocr2keeperRelayer := evmrelay.NewOCR2KeeperRelayer(db, chain, pr, spec, lggr.Named("OCR2KeeperRelayer")) + ocr2keeperRelayer := evmrelay.NewOCR2KeeperRelayer(db, chain, lggr.Named("OCR2KeeperRelayer"), ethKeystore) keeperProvider, err := ocr2keeperRelayer.NewOCR2KeeperProvider( types.RelayArgs{ @@ -72,6 +73,7 @@ func EVMDependencies20( lggr logger.Logger, chain legacyevm.Chain, pr pipeline.Runner, + ethKeystore keystore.Eth, ) (evmrelay.OCR2KeeperProvider, *evmregistry20.EvmRegistry, Encoder20, *evmregistry20.LogProvider, error) { var err error @@ -79,7 +81,7 @@ func EVMDependencies20( var registry *evmregistry20.EvmRegistry // the provider will be returned as a dependency - if keeperProvider, err = EVMProvider(db, chain, lggr, spec, pr); err != nil { + if keeperProvider, err = EVMProvider(db, chain, lggr, spec, pr, ethKeystore); err != nil { return nil, nil, nil, nil, err } @@ -116,13 +118,14 @@ func EVMDependencies21( mc *models.MercuryCredentials, keyring ocrtypes.OnchainKeyring, dbCfg pg.QConfig, + ethKeystore keystore.Eth, ) (evmrelay.OCR2KeeperProvider, evmregistry21.AutomationServices, error) { var err error var keeperProvider evmrelay.OCR2KeeperProvider oSpec := spec.OCR2OracleSpec // the provider will be returned as a dependency - if keeperProvider, err = EVMProvider(db, chain, lggr, spec, pr); err != nil { + if keeperProvider, err = EVMProvider(db, chain, lggr, spec, pr, ethKeystore); err != nil { return nil, nil, err } diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index aea704adacf..ad6eadeabac 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -27,13 +27,11 @@ import ( txm "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" mercuryconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury/config" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/pg" - "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/functions" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury" mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" @@ -373,7 +371,7 @@ func newConfigProvider(lggr logger.Logger, chain legacyevm.Chain, opts *types.Re return newConfigWatcher(lggr, aggregatorAddress, contractABI, offchainConfigDigester, cp, chain, relayConfig.FromBlock, opts.New), nil } -func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, configWatcher *configWatcher, ethKeystore keystore.Eth) (*contractTransmitter, error) { +func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, configWatcher *configWatcher, ethKeystore keystore.Eth, gasLimit uint32) (*contractTransmitter, error) { var relayConfig types.RelayConfig if err := json.Unmarshal(rargs.RelayConfig, &relayConfig); err != nil { return nil, err @@ -410,7 +408,6 @@ func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, tra checker.CheckerType = txm.TransmitCheckerTypeSimulate } - gasLimit := configWatcher.chain.Config().EVM().GasEstimator().LimitDefault() ocr2Limit := configWatcher.chain.Config().EVM().GasEstimator().LimitJobType().OCR2() if ocr2Limit != nil { gasLimit = *ocr2Limit @@ -442,55 +439,6 @@ func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, tra ) } -func newPipelineContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, pluginGasLimit *uint32, configWatcher *configWatcher, spec job.Job, pr pipeline.Runner) (*contractTransmitter, error) { - var relayConfig types.RelayConfig - if err := json.Unmarshal(rargs.RelayConfig, &relayConfig); err != nil { - return nil, err - } - - if !relayConfig.EffectiveTransmitterID.Valid { - return nil, pkgerrors.New("EffectiveTransmitterID must be specified") - } - effectiveTransmitterAddress := common.HexToAddress(relayConfig.EffectiveTransmitterID.String) - transmitterAddress := common.HexToAddress(transmitterID) - scoped := configWatcher.chain.Config() - strategy := txmgrcommon.NewQueueingTxStrategy(rargs.ExternalJobID, scoped.OCR2().DefaultTransactionQueueDepth(), scoped.Database().DefaultQueryTimeout()) - - var checker txm.TransmitCheckerSpec - if configWatcher.chain.Config().OCR2().SimulateTransactions() { - checker.CheckerType = txm.TransmitCheckerTypeSimulate - } - - gasLimit := configWatcher.chain.Config().EVM().GasEstimator().LimitDefault() - ocr2Limit := configWatcher.chain.Config().EVM().GasEstimator().LimitJobType().OCR2() - if ocr2Limit != nil { - gasLimit = *ocr2Limit - } - if pluginGasLimit != nil { - gasLimit = *pluginGasLimit - } - - return NewOCRContractTransmitter( - configWatcher.contractAddress, - configWatcher.chain.Client(), - configWatcher.contractABI, - ocrcommon.NewPipelineTransmitter( - lggr, - transmitterAddress, - gasLimit, - effectiveTransmitterAddress, - strategy, - checker, - pr, - spec, - configWatcher.chain.ID().String(), - ), - configWatcher.chain.LogPoller(), - lggr, - nil, - ) -} - func (r *Relayer) NewMedianProvider(rargs commontypes.RelayArgs, pargs commontypes.PluginArgs) (commontypes.MedianProvider, error) { lggr := r.lggr.Named("MedianProvider").Named(rargs.ExternalJobID.String()) relayOpts := types.NewRelayOpts(rargs) @@ -513,7 +461,7 @@ func (r *Relayer) NewMedianProvider(rargs commontypes.RelayArgs, pargs commontyp } reportCodec := evmreportcodec.ReportCodec{} - contractTransmitter, err := newContractTransmitter(lggr, rargs, pargs.TransmitterID, configWatcher, r.ks.Eth()) + contractTransmitter, err := newContractTransmitter(lggr, rargs, pargs.TransmitterID, configWatcher, r.ks.Eth(), configWatcher.chain.Config().EVM().GasEstimator().LimitDefault()) if err != nil { return nil, err } diff --git a/core/services/relay/evm/ocr2keeper.go b/core/services/relay/evm/ocr2keeper.go index abc03c7abb1..2e6c0e2bd40 100644 --- a/core/services/relay/evm/ocr2keeper.go +++ b/core/services/relay/evm/ocr2keeper.go @@ -22,8 +22,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/job" - "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" ) @@ -51,21 +50,19 @@ type OCR2KeeperRelayer interface { // ocr2keeperRelayer is the relayer with added DKG and OCR2Keeper provider functions. type ocr2keeperRelayer struct { - db *sqlx.DB - chain legacyevm.Chain - pr pipeline.Runner - spec job.Job - lggr logger.Logger + db *sqlx.DB + chain legacyevm.Chain + lggr logger.Logger + ethKeystore keystore.Eth } // NewOCR2KeeperRelayer is the constructor of ocr2keeperRelayer -func NewOCR2KeeperRelayer(db *sqlx.DB, chain legacyevm.Chain, pr pipeline.Runner, spec job.Job, lggr logger.Logger) OCR2KeeperRelayer { +func NewOCR2KeeperRelayer(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, ethKeystore keystore.Eth) OCR2KeeperRelayer { return &ocr2keeperRelayer{ - db: db, - chain: chain, - pr: pr, - spec: spec, - lggr: lggr, + db: db, + chain: chain, + lggr: lggr, + ethKeystore: ethKeystore, } } @@ -74,9 +71,7 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p if err != nil { return nil, err } - - gasLimit := cfgWatcher.chain.Config().EVM().OCR2().Automation().GasLimit() - contractTransmitter, err := newPipelineContractTransmitter(r.lggr, rargs, pargs.TransmitterID, &gasLimit, cfgWatcher, r.spec, r.pr) + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, cfgWatcher, r.ethKeystore, cfgWatcher.chain.Config().EVM().OCR2().Automation().GasLimit()) if err != nil { return nil, err } diff --git a/core/services/relay/evm/ocr2vrf.go b/core/services/relay/evm/ocr2vrf.go index 39d0503b8b1..00a2d1a4360 100644 --- a/core/services/relay/evm/ocr2vrf.go +++ b/core/services/relay/evm/ocr2vrf.go @@ -67,7 +67,7 @@ func (r *ocr2vrfRelayer) NewDKGProvider(rargs commontypes.RelayArgs, pargs commo if err != nil { return nil, err } - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, configWatcher, r.ethKeystore) + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, configWatcher, r.ethKeystore, configWatcher.chain.Config().EVM().GasEstimator().LimitDefault()) if err != nil { return nil, err } @@ -90,7 +90,7 @@ func (r *ocr2vrfRelayer) NewOCR2VRFProvider(rargs commontypes.RelayArgs, pargs c if err != nil { return nil, err } - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, configWatcher, r.ethKeystore) + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, configWatcher, r.ethKeystore, configWatcher.chain.Config().EVM().GasEstimator().LimitDefault()) if err != nil { return nil, err } From d6ced2f4acd91756d69066664df3a36a809798da Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 1 Dec 2023 14:15:10 +0000 Subject: [PATCH 02/14] Fix streams import --- core/services/ocr2/plugins/ocr2keeper/integration_21_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go index f2bb88591c7..2050d1a2539 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go @@ -53,6 +53,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" ) From 28b6cbc533c62ea970d90309ebcf3ed98ea75b3f Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 1 Dec 2023 14:28:03 +0000 Subject: [PATCH 03/14] Clean up function calls --- core/services/relay/evm/evm.go | 3 ++- core/services/relay/evm/ocr2keeper.go | 4 +++- core/services/relay/evm/ocr2vrf.go | 6 ++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index ad6eadeabac..e7972332720 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -461,7 +461,8 @@ func (r *Relayer) NewMedianProvider(rargs commontypes.RelayArgs, pargs commontyp } reportCodec := evmreportcodec.ReportCodec{} - contractTransmitter, err := newContractTransmitter(lggr, rargs, pargs.TransmitterID, configWatcher, r.ks.Eth(), configWatcher.chain.Config().EVM().GasEstimator().LimitDefault()) + gasLimit := configWatcher.chain.Config().EVM().GasEstimator().LimitDefault() + contractTransmitter, err := newContractTransmitter(lggr, rargs, pargs.TransmitterID, configWatcher, r.ks.Eth(), gasLimit) if err != nil { return nil, err } diff --git a/core/services/relay/evm/ocr2keeper.go b/core/services/relay/evm/ocr2keeper.go index 2e6c0e2bd40..1f3839d7807 100644 --- a/core/services/relay/evm/ocr2keeper.go +++ b/core/services/relay/evm/ocr2keeper.go @@ -71,7 +71,9 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p if err != nil { return nil, err } - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, cfgWatcher, r.ethKeystore, cfgWatcher.chain.Config().EVM().OCR2().Automation().GasLimit()) + + gasLimit := cfgWatcher.chain.Config().EVM().OCR2().Automation().GasLimit() + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, cfgWatcher, r.ethKeystore, gasLimit) if err != nil { return nil, err } diff --git a/core/services/relay/evm/ocr2vrf.go b/core/services/relay/evm/ocr2vrf.go index 00a2d1a4360..22e25e913ae 100644 --- a/core/services/relay/evm/ocr2vrf.go +++ b/core/services/relay/evm/ocr2vrf.go @@ -67,7 +67,8 @@ func (r *ocr2vrfRelayer) NewDKGProvider(rargs commontypes.RelayArgs, pargs commo if err != nil { return nil, err } - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, configWatcher, r.ethKeystore, configWatcher.chain.Config().EVM().GasEstimator().LimitDefault()) + gasLimit := configWatcher.chain.Config().EVM().GasEstimator().LimitDefault() + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, configWatcher, r.ethKeystore, gasLimit) if err != nil { return nil, err } @@ -90,7 +91,8 @@ func (r *ocr2vrfRelayer) NewOCR2VRFProvider(rargs commontypes.RelayArgs, pargs c if err != nil { return nil, err } - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, configWatcher, r.ethKeystore, configWatcher.chain.Config().EVM().GasEstimator().LimitDefault()) + gasLimit := configWatcher.chain.Config().EVM().GasEstimator().LimitDefault() + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, configWatcher, r.ethKeystore, gasLimit) if err != nil { return nil, err } From ba160f68bb7c0ca83a574cdfd2b9c938c590d1d5 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 5 Dec 2023 14:41:17 +0000 Subject: [PATCH 04/14] Remove pipeline runner dependency for automation --- core/services/ocr2/delegate.go | 4 ++-- core/services/ocr2/plugins/ocr2keeper/util.go | 9 +++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 0cd2b784df1..326ce724cb5 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -1084,7 +1084,7 @@ func (d *Delegate) newServicesOCR2Keepers21( return nil, fmt.Errorf("keeper2 services: failed to get chain %s: %w", rid.ChainID, err2) } - keeperProvider, services, err2 := ocr2keeper.EVMDependencies21(jb, d.db, lggr, chain, d.pipelineRunner, mc, kb, d.cfg.Database(), d.ethKs) + keeperProvider, services, err2 := ocr2keeper.EVMDependencies21(jb, d.db, lggr, chain, mc, kb, d.cfg.Database(), d.ethKs) if err2 != nil { return nil, errors.Wrap(err2, "could not build dependencies for ocr2 keepers") } @@ -1201,7 +1201,7 @@ func (d *Delegate) newServicesOCR2Keepers20( return nil, fmt.Errorf("keepers2.0 services: failed to get chain (%s): %w", rid.ChainID, err2) } - keeperProvider, rgstry, encoder, logProvider, err2 := ocr2keeper.EVMDependencies20(jb, d.db, lggr, chain, d.pipelineRunner, d.ethKs) + keeperProvider, rgstry, encoder, logProvider, err2 := ocr2keeper.EVMDependencies20(jb, d.db, lggr, chain, d.ethKs) if err2 != nil { return nil, errors.Wrap(err2, "could not build dependencies for ocr2 keepers") } diff --git a/core/services/ocr2/plugins/ocr2keeper/util.go b/core/services/ocr2/plugins/ocr2keeper/util.go index 8bbb9fbc534..c3c60ad58b1 100644 --- a/core/services/ocr2/plugins/ocr2keeper/util.go +++ b/core/services/ocr2/plugins/ocr2keeper/util.go @@ -24,7 +24,6 @@ import ( evmregistry21 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21" evmregistry21transmit "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/transmit" "github.com/smartcontractkit/chainlink/v2/core/services/pg" - "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" ) @@ -44,7 +43,7 @@ var ( ErrNoChainFromSpec = fmt.Errorf("could not create chain from spec") ) -func EVMProvider(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, spec job.Job, pr pipeline.Runner, ethKeystore keystore.Eth) (evmrelay.OCR2KeeperProvider, error) { +func EVMProvider(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, spec job.Job, ethKeystore keystore.Eth) (evmrelay.OCR2KeeperProvider, error) { oSpec := spec.OCR2OracleSpec ocr2keeperRelayer := evmrelay.NewOCR2KeeperRelayer(db, chain, lggr.Named("OCR2KeeperRelayer"), ethKeystore) @@ -72,7 +71,6 @@ func EVMDependencies20( db *sqlx.DB, lggr logger.Logger, chain legacyevm.Chain, - pr pipeline.Runner, ethKeystore keystore.Eth, ) (evmrelay.OCR2KeeperProvider, *evmregistry20.EvmRegistry, Encoder20, *evmregistry20.LogProvider, error) { var err error @@ -81,7 +79,7 @@ func EVMDependencies20( var registry *evmregistry20.EvmRegistry // the provider will be returned as a dependency - if keeperProvider, err = EVMProvider(db, chain, lggr, spec, pr, ethKeystore); err != nil { + if keeperProvider, err = EVMProvider(db, chain, lggr, spec, ethKeystore); err != nil { return nil, nil, nil, nil, err } @@ -114,7 +112,6 @@ func EVMDependencies21( db *sqlx.DB, lggr logger.Logger, chain legacyevm.Chain, - pr pipeline.Runner, mc *models.MercuryCredentials, keyring ocrtypes.OnchainKeyring, dbCfg pg.QConfig, @@ -125,7 +122,7 @@ func EVMDependencies21( oSpec := spec.OCR2OracleSpec // the provider will be returned as a dependency - if keeperProvider, err = EVMProvider(db, chain, lggr, spec, pr, ethKeystore); err != nil { + if keeperProvider, err = EVMProvider(db, chain, lggr, spec, ethKeystore); err != nil { return nil, nil, err } From df408bf048b5aea9c01b78e5d34cf856c81fd29e Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 5 Dec 2023 15:57:37 +0000 Subject: [PATCH 05/14] Use contractTransmitterOpts to specify a pluginGasLimit --- core/services/relay/evm/evm.go | 17 ++++++++++++++--- core/services/relay/evm/ocr2keeper.go | 2 +- core/services/relay/evm/ocr2vrf.go | 6 ++---- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index e7972332720..908ff9885dc 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -371,7 +371,13 @@ func newConfigProvider(lggr logger.Logger, chain legacyevm.Chain, opts *types.Re return newConfigWatcher(lggr, aggregatorAddress, contractABI, offchainConfigDigester, cp, chain, relayConfig.FromBlock, opts.New), nil } -func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, configWatcher *configWatcher, ethKeystore keystore.Eth, gasLimit uint32) (*contractTransmitter, error) { +type configTransmitterOpts struct { + configWatcher *configWatcher + // override the gas limit default provided in the config watcher + pluginGasLimit uint32 +} + +func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, ethKeystore keystore.Eth, opts configTransmitterOpts) (*contractTransmitter, error) { var relayConfig types.RelayConfig if err := json.Unmarshal(rargs.RelayConfig, &relayConfig); err != nil { return nil, err @@ -388,6 +394,8 @@ func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, tra return nil, pkgerrors.New("no sending keys provided") } + configWatcher := opts.configWatcher + // If we are using multiple sending keys, then a forwarder is needed to rotate transmissions. // Ensure that this forwarder is not set to a local sending key, and ensure our sending keys are enabled. for _, s := range sendingKeys { @@ -408,10 +416,14 @@ func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, tra checker.CheckerType = txm.TransmitCheckerTypeSimulate } + gasLimit := configWatcher.chain.Config().EVM().GasEstimator().LimitDefault() ocr2Limit := configWatcher.chain.Config().EVM().GasEstimator().LimitJobType().OCR2() if ocr2Limit != nil { gasLimit = *ocr2Limit } + if opts.pluginGasLimit != 0 { + gasLimit = opts.pluginGasLimit + } transmitter, err := ocrcommon.NewTransmitter( configWatcher.chain.TxManager(), @@ -461,8 +473,7 @@ func (r *Relayer) NewMedianProvider(rargs commontypes.RelayArgs, pargs commontyp } reportCodec := evmreportcodec.ReportCodec{} - gasLimit := configWatcher.chain.Config().EVM().GasEstimator().LimitDefault() - contractTransmitter, err := newContractTransmitter(lggr, rargs, pargs.TransmitterID, configWatcher, r.ks.Eth(), gasLimit) + contractTransmitter, err := newContractTransmitter(lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configTransmitterOpts{configWatcher: configWatcher}) if err != nil { return nil, err } diff --git a/core/services/relay/evm/ocr2keeper.go b/core/services/relay/evm/ocr2keeper.go index 1f3839d7807..87e37b0ac62 100644 --- a/core/services/relay/evm/ocr2keeper.go +++ b/core/services/relay/evm/ocr2keeper.go @@ -73,7 +73,7 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p } gasLimit := cfgWatcher.chain.Config().EVM().OCR2().Automation().GasLimit() - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, cfgWatcher, r.ethKeystore, gasLimit) + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configTransmitterOpts{configWatcher: cfgWatcher, pluginGasLimit: gasLimit}) if err != nil { return nil, err } diff --git a/core/services/relay/evm/ocr2vrf.go b/core/services/relay/evm/ocr2vrf.go index 22e25e913ae..59fa9016c5b 100644 --- a/core/services/relay/evm/ocr2vrf.go +++ b/core/services/relay/evm/ocr2vrf.go @@ -67,8 +67,7 @@ func (r *ocr2vrfRelayer) NewDKGProvider(rargs commontypes.RelayArgs, pargs commo if err != nil { return nil, err } - gasLimit := configWatcher.chain.Config().EVM().GasEstimator().LimitDefault() - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, configWatcher, r.ethKeystore, gasLimit) + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configTransmitterOpts{configWatcher: configWatcher}) if err != nil { return nil, err } @@ -91,8 +90,7 @@ func (r *ocr2vrfRelayer) NewOCR2VRFProvider(rargs commontypes.RelayArgs, pargs c if err != nil { return nil, err } - gasLimit := configWatcher.chain.Config().EVM().GasEstimator().LimitDefault() - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, configWatcher, r.ethKeystore, gasLimit) + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configTransmitterOpts{configWatcher: configWatcher}) if err != nil { return nil, err } From ba9ac83e5693c581e7ab4ee5e3fd3665e01fa69b Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 5 Dec 2023 16:02:00 +0000 Subject: [PATCH 06/14] Make the pluginGasLimit a pointer --- core/services/relay/evm/evm.go | 6 +++--- core/services/relay/evm/ocr2keeper.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 908ff9885dc..e4820613b48 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -374,7 +374,7 @@ func newConfigProvider(lggr logger.Logger, chain legacyevm.Chain, opts *types.Re type configTransmitterOpts struct { configWatcher *configWatcher // override the gas limit default provided in the config watcher - pluginGasLimit uint32 + pluginGasLimit *uint32 } func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, ethKeystore keystore.Eth, opts configTransmitterOpts) (*contractTransmitter, error) { @@ -421,8 +421,8 @@ func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, tra if ocr2Limit != nil { gasLimit = *ocr2Limit } - if opts.pluginGasLimit != 0 { - gasLimit = opts.pluginGasLimit + if opts.pluginGasLimit != nil { + gasLimit = *opts.pluginGasLimit } transmitter, err := ocrcommon.NewTransmitter( diff --git a/core/services/relay/evm/ocr2keeper.go b/core/services/relay/evm/ocr2keeper.go index 87e37b0ac62..74d6ee6c780 100644 --- a/core/services/relay/evm/ocr2keeper.go +++ b/core/services/relay/evm/ocr2keeper.go @@ -73,7 +73,7 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p } gasLimit := cfgWatcher.chain.Config().EVM().OCR2().Automation().GasLimit() - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configTransmitterOpts{configWatcher: cfgWatcher, pluginGasLimit: gasLimit}) + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configTransmitterOpts{configWatcher: cfgWatcher, pluginGasLimit: &gasLimit}) if err != nil { return nil, err } From e0405d0380b808b20ecc6796c18d58508378b592 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 5 Dec 2023 21:24:09 +0000 Subject: [PATCH 07/14] Clean up the pipeline transmitter --- .../ocrcommon/transmitter_pipeline.go | 97 ------------------- .../ocrcommon/transmitter_pipeline_test.go | 77 --------------- 2 files changed, 174 deletions(-) delete mode 100644 core/services/ocrcommon/transmitter_pipeline.go delete mode 100644 core/services/ocrcommon/transmitter_pipeline_test.go diff --git a/core/services/ocrcommon/transmitter_pipeline.go b/core/services/ocrcommon/transmitter_pipeline.go deleted file mode 100644 index e62f745a941..00000000000 --- a/core/services/ocrcommon/transmitter_pipeline.go +++ /dev/null @@ -1,97 +0,0 @@ -package ocrcommon - -import ( - "context" - "fmt" - - "github.com/ethereum/go-ethereum/common" - "github.com/pkg/errors" - - "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/job" - "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" -) - -const txObservationSource = ` - transmit_tx [type=ethtx - minConfirmations=0 - to="$(jobSpec.contractAddress)" - from="[$(jobSpec.fromAddress)]" - evmChainID="$(jobSpec.evmChainID)" - data="$(jobSpec.data)" - gasLimit="$(jobSpec.gasLimit)" - forwardingAllowed="$(jobSpec.forwardingAllowed)" - transmitChecker="$(jobSpec.transmitChecker)"] - transmit_tx -` - -type pipelineTransmitter struct { - lgr logger.Logger - fromAddress common.Address - gasLimit uint32 - effectiveTransmitterAddress common.Address - strategy types.TxStrategy - checker txmgr.TransmitCheckerSpec - pr pipeline.Runner - spec job.Job - chainID string -} - -// NewPipelineTransmitter creates a new eth transmitter using the job pipeline mechanism -func NewPipelineTransmitter( - lgr logger.Logger, - fromAddress common.Address, - gasLimit uint32, - effectiveTransmitterAddress common.Address, - strategy types.TxStrategy, - checker txmgr.TransmitCheckerSpec, - pr pipeline.Runner, - spec job.Job, - chainID string, -) Transmitter { - return &pipelineTransmitter{ - lgr: lgr, - fromAddress: fromAddress, - gasLimit: gasLimit, - effectiveTransmitterAddress: effectiveTransmitterAddress, - strategy: strategy, - checker: checker, - pr: pr, - spec: spec, - chainID: chainID, - } -} - -func (t *pipelineTransmitter) CreateEthTransaction(ctx context.Context, toAddress common.Address, payload []byte, _ *txmgr.TxMeta) error { - // t.strategy is ignored currently as pipeline does not support passing this (sc-55115) - vars := pipeline.NewVarsFrom(map[string]interface{}{ - "jobSpec": map[string]interface{}{ - "contractAddress": toAddress.String(), - "fromAddress": t.fromAddress.String(), - "gasLimit": t.gasLimit, - "evmChainID": t.chainID, - "forwardingAllowed": t.spec.ForwardingAllowed, - "data": payload, - "transmitChecker": t.checker, - }, - }) - - t.spec.PipelineSpec.DotDagSource = txObservationSource - run := pipeline.NewRun(*t.spec.PipelineSpec, vars) - - if _, err := t.pr.Run(ctx, run, t.lgr, true, nil); err != nil { - return errors.Wrap(err, "Skipped OCR transmission") - } - - if run.State != pipeline.RunStatusCompleted { - return fmt.Errorf("unexpected pipeline run state: %s with fatal errors %w", run.State, run.FatalErrors.ToError()) - } - - return nil -} - -func (t *pipelineTransmitter) FromAddress() common.Address { - return t.effectiveTransmitterAddress -} diff --git a/core/services/ocrcommon/transmitter_pipeline_test.go b/core/services/ocrcommon/transmitter_pipeline_test.go deleted file mode 100644 index e0114d0aa0d..00000000000 --- a/core/services/ocrcommon/transmitter_pipeline_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package ocrcommon_test - -import ( - "testing" - - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" - "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/job" - "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" - "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" - pipelinemocks "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/mocks" -) - -func Test_PipelineTransmitter_CreateEthTransaction(t *testing.T) { - t.Parallel() - - lggr := logger.TestLogger(t) - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewTestGeneralConfig(t) - ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() - - _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) - - chainID := "12345" - gasLimit := uint32(1000) - effectiveTransmitterAddress := fromAddress - toAddress := testutils.NewAddress() - payload := []byte{1, 2, 3} - strategy := newMockTxStrategy(t) - checker := txmgr.TransmitCheckerSpec{CheckerType: txmgr.TransmitCheckerTypeSimulate} - runner := pipelinemocks.NewRunner(t) - - transmitter := ocrcommon.NewPipelineTransmitter( - lggr, - fromAddress, - gasLimit, - effectiveTransmitterAddress, - strategy, - checker, - runner, - job.Job{ - PipelineSpec: &pipeline.Spec{}, - }, - chainID, - ) - - runner.On("Run", mock.Anything, mock.AnythingOfType("*pipeline.Run"), mock.Anything, mock.Anything, mock.Anything). - Return(false, nil). - Run(func(args mock.Arguments) { - run := args.Get(1).(*pipeline.Run) - require.Equal(t, map[string]interface{}{ - "jobSpec": map[string]interface{}{ - "contractAddress": toAddress.String(), - "fromAddress": fromAddress.String(), - "gasLimit": gasLimit, - "evmChainID": chainID, - "forwardingAllowed": false, - "data": payload, - "transmitChecker": checker, - }, - }, run.Inputs.Val) - - save := args.Get(3).(bool) - require.True(t, save) - - run.State = pipeline.RunStatusCompleted - }).Once() - - require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload, nil)) -} From 027f680479fb87554edf2490239ca69dc6e1740c Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 5 Dec 2023 23:11:32 +0000 Subject: [PATCH 08/14] Attempt to listen for transmits Clean up linter Extract function Intentionally fail test Rework transmit listen Try filtering for transmits Update test --- .../plugins/ocr2keeper/integration_21_test.go | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go index 2050d1a2539..f29d2d705cd 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go @@ -225,6 +225,10 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue()) done() + transmitsListener, done := listenForTransmits(t, backend, registry, int64(1), 1) + g.Eventually(transmitsListener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue()) + done() + t.Run("recover logs", func(t *testing.T) { addr, contract := addrs[0], contracts[0] @@ -250,6 +254,43 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { }) } +func listenForTransmits(t *testing.T, backend *backends.SimulatedBackend, registry *iregistry21.IKeeperRegistryMaster, startBlock int64, transmits int) (func() bool, func()) { + cache := &sync.Map{} + ctx, cancel := context.WithCancel(testutils.Context(t)) + start := startBlock + + go func() { + for ctx.Err() == nil { + bl := backend.Blockchain().CurrentBlock().Number.Uint64() + + iter, err := registry.FilterTransmitted(&bind.FilterOpts{ + Start: uint64(start), + End: &bl, + Context: ctx, + }) + + if ctx.Err() != nil { + return + } + + require.NoError(t, err) + + for iter.Next() { + if iter.Event != nil { + t.Logf("[automation-ocr3 | EvmRegistry] transmit event emitted for id %s", iter.Event.Topic().String()) + cache.Store(iter.Event.Topic().String(), true) + } + } + + require.NoError(t, iter.Close()) + + time.Sleep(time.Second) + } + }() + + return mapListener(cache, transmits), cancel +} + func TestIntegration_KeeperPluginLogUpkeep_Retry(t *testing.T) { g := gomega.NewWithT(t) From b2f5d6a514cb4859956c212f49cae6553737fa47 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Wed, 6 Dec 2023 23:55:55 +0000 Subject: [PATCH 09/14] Revert "Attempt to listen for transmits" This reverts commit 198e6669a6f64768c84acb82e01b2773c89426ce. --- .../plugins/ocr2keeper/integration_21_test.go | 41 ------------------- 1 file changed, 41 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go index f29d2d705cd..2050d1a2539 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go @@ -225,10 +225,6 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue()) done() - transmitsListener, done := listenForTransmits(t, backend, registry, int64(1), 1) - g.Eventually(transmitsListener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue()) - done() - t.Run("recover logs", func(t *testing.T) { addr, contract := addrs[0], contracts[0] @@ -254,43 +250,6 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { }) } -func listenForTransmits(t *testing.T, backend *backends.SimulatedBackend, registry *iregistry21.IKeeperRegistryMaster, startBlock int64, transmits int) (func() bool, func()) { - cache := &sync.Map{} - ctx, cancel := context.WithCancel(testutils.Context(t)) - start := startBlock - - go func() { - for ctx.Err() == nil { - bl := backend.Blockchain().CurrentBlock().Number.Uint64() - - iter, err := registry.FilterTransmitted(&bind.FilterOpts{ - Start: uint64(start), - End: &bl, - Context: ctx, - }) - - if ctx.Err() != nil { - return - } - - require.NoError(t, err) - - for iter.Next() { - if iter.Event != nil { - t.Logf("[automation-ocr3 | EvmRegistry] transmit event emitted for id %s", iter.Event.Topic().String()) - cache.Store(iter.Event.Topic().String(), true) - } - } - - require.NoError(t, iter.Close()) - - time.Sleep(time.Second) - } - }() - - return mapListener(cache, transmits), cancel -} - func TestIntegration_KeeperPluginLogUpkeep_Retry(t *testing.T) { g := gomega.NewWithT(t) From 93e9973ddaf20a98fe7ef250fe4b29acd2836b6d Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Thu, 7 Dec 2023 15:07:47 +0000 Subject: [PATCH 10/14] Listen for performed events --- .../ocr2/plugins/ocr2keeper/integration_21_test.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go index 2050d1a2539..9dbafd6111c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go @@ -241,12 +241,21 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { time.Sleep(time.Millisecond * 250) // otherwise we get "invalid transaction nonce" errors } }) - // Mine enough blocks to ensre these logs don't fall into log provider range + + beforeDummyBlocks := backend.Blockchain().CurrentBlock().Number.Uint64() + + // Mine enough blocks to ensure these logs don't fall into log provider range dummyBlocks := 500 for i := 0; i < dummyBlocks; i++ { backend.Commit() time.Sleep(time.Millisecond * 10) } + + t.Logf("Mined %d blocks, waiting for logs to be recovered", dummyBlocks) + + listener, done := listenPerformed(t, backend, registry, ids, int64(beforeDummyBlocks)) + g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue()) + done() }) } From c31b3e42ea5c99d0a23b665cbf92012a3f42d96b Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Thu, 7 Dec 2023 15:26:30 +0000 Subject: [PATCH 11/14] Goimports --- core/services/ocr2/plugins/ocr2keeper/integration_21_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go index 9dbafd6111c..9bbe3ece372 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go @@ -243,7 +243,7 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { }) beforeDummyBlocks := backend.Blockchain().CurrentBlock().Number.Uint64() - + // Mine enough blocks to ensure these logs don't fall into log provider range dummyBlocks := 500 for i := 0; i < dummyBlocks; i++ { From 0311896635bb5693312352e89f953a3f63fe27f6 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Thu, 7 Dec 2023 16:22:21 +0000 Subject: [PATCH 12/14] Add wrapper function that lets us specify a count of performed --- .../ocr2/plugins/ocr2keeper/integration_21_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go index 9bbe3ece372..d94c939aab8 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go @@ -253,7 +253,7 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { t.Logf("Mined %d blocks, waiting for logs to be recovered", dummyBlocks) - listener, done := listenPerformed(t, backend, registry, ids, int64(beforeDummyBlocks)) + listener, done := listenPerformedN(t, backend, registry, ids, int64(beforeDummyBlocks), len(ids)) g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue()) done() }) @@ -397,7 +397,7 @@ func mapListener(m *sync.Map, n int) func() bool { } } -func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry *iregistry21.IKeeperRegistryMaster, ids []*big.Int, startBlock int64) (func() bool, func()) { +func listenPerformedN(t *testing.T, backend *backends.SimulatedBackend, registry *iregistry21.IKeeperRegistryMaster, ids []*big.Int, startBlock int64, count int) (func() bool, func()) { cache := &sync.Map{} ctx, cancel := context.WithCancel(testutils.Context(t)) start := startBlock @@ -436,7 +436,11 @@ func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry } }() - return mapListener(cache, 0), cancel + return mapListener(cache, count), cancel +} + +func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry *iregistry21.IKeeperRegistryMaster, ids []*big.Int, startBlock int64) (func() bool, func()) { + return listenPerformedN(t, backend, registry, ids, startBlock, 0) } func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IKeeperRegistryMaster, backend *backends.SimulatedBackend, usr *bind.TransactOpts) ([]Node, *SimulatedMercuryServer) { From c12cab2a58832612c76e6fcd23169f732aa39348 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 8 Dec 2023 16:58:06 +0000 Subject: [PATCH 13/14] Update integration test --- .../plugins/ocr2keeper/integration_21_test.go | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go index d94c939aab8..81a35a5ced2 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go @@ -226,17 +226,16 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { done() t.Run("recover logs", func(t *testing.T) { - addr, contract := addrs[0], contracts[0] upkeepID := registerUpkeep(t, registry, addr, carrol, steve, backend) backend.Commit() t.Logf("Registered new upkeep %s for address %s", upkeepID.String(), addr.String()) // Emit 100 logs in a burst - emits := 100 + recoverEmits := 100 i := 0 emitEvents(testutils.Context(t), t, 100, []*log_upkeep_counter_wrapper.LogUpkeepCounter{contract}, carrol, func() { i++ - if i%(emits/4) == 0 { + if i%(recoverEmits/4) == 0 { backend.Commit() time.Sleep(time.Millisecond * 250) // otherwise we get "invalid transaction nonce" errors } @@ -253,7 +252,7 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { t.Logf("Mined %d blocks, waiting for logs to be recovered", dummyBlocks) - listener, done := listenPerformedN(t, backend, registry, ids, int64(beforeDummyBlocks), len(ids)) + listener, done := listenPerformedN(t, backend, registry, ids, int64(beforeDummyBlocks), recoverEmits) g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue()) done() }) @@ -390,7 +389,7 @@ func mapListener(m *sync.Map, n int) func() bool { return func() bool { count := 0 m.Range(func(key, value interface{}) bool { - count++ + count += value.(int) return true }) return count > n @@ -404,18 +403,18 @@ func listenPerformedN(t *testing.T, backend *backends.SimulatedBackend, registry go func() { for ctx.Err() == nil { - bl := backend.Blockchain().CurrentBlock().Number.Uint64() + currentBlock := backend.Blockchain().CurrentBlock().Number.Uint64() - sc := make([]bool, len(ids)) - for i := range sc { - sc[i] = true + success := make([]bool, len(ids)) + for i := range success { + success[i] = true } iter, err := registry.FilterUpkeepPerformed(&bind.FilterOpts{ Start: uint64(start), - End: &bl, + End: ¤tBlock, Context: ctx, - }, ids, sc) + }, ids, success) if ctx.Err() != nil { return @@ -426,7 +425,15 @@ func listenPerformedN(t *testing.T, backend *backends.SimulatedBackend, registry for iter.Next() { if iter.Event != nil { t.Logf("[automation-ocr3 | EvmRegistry] upkeep performed event emitted for id %s", iter.Event.Id.String()) - cache.Store(iter.Event.Id.String(), true) + + //cache.Store(iter.Event.Id.String(), true) + count, ok := cache.Load(iter.Event.Id.String()) + if !ok { + cache.Store(iter.Event.Id.String(), 1) + continue + } + countI := count.(int) + cache.Store(iter.Event.Id.String(), countI+1) } } From 785faa67e3905942161f286fd1af829ba9069e5a Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Mon, 11 Dec 2023 21:45:55 +0000 Subject: [PATCH 14/14] Pass the configWatcher as a parameter to indicate that its required --- core/services/relay/evm/evm.go | 7 ++----- core/services/relay/evm/ocr2keeper.go | 2 +- core/services/relay/evm/ocr2vrf.go | 4 ++-- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index e4820613b48..83540e22bb7 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -372,12 +372,11 @@ func newConfigProvider(lggr logger.Logger, chain legacyevm.Chain, opts *types.Re } type configTransmitterOpts struct { - configWatcher *configWatcher // override the gas limit default provided in the config watcher pluginGasLimit *uint32 } -func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, ethKeystore keystore.Eth, opts configTransmitterOpts) (*contractTransmitter, error) { +func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, ethKeystore keystore.Eth, configWatcher *configWatcher, opts configTransmitterOpts) (*contractTransmitter, error) { var relayConfig types.RelayConfig if err := json.Unmarshal(rargs.RelayConfig, &relayConfig); err != nil { return nil, err @@ -394,8 +393,6 @@ func newContractTransmitter(lggr logger.Logger, rargs commontypes.RelayArgs, tra return nil, pkgerrors.New("no sending keys provided") } - configWatcher := opts.configWatcher - // If we are using multiple sending keys, then a forwarder is needed to rotate transmissions. // Ensure that this forwarder is not set to a local sending key, and ensure our sending keys are enabled. for _, s := range sendingKeys { @@ -473,7 +470,7 @@ func (r *Relayer) NewMedianProvider(rargs commontypes.RelayArgs, pargs commontyp } reportCodec := evmreportcodec.ReportCodec{} - contractTransmitter, err := newContractTransmitter(lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configTransmitterOpts{configWatcher: configWatcher}) + contractTransmitter, err := newContractTransmitter(lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}) if err != nil { return nil, err } diff --git a/core/services/relay/evm/ocr2keeper.go b/core/services/relay/evm/ocr2keeper.go index 74d6ee6c780..55c4d78e7b4 100644 --- a/core/services/relay/evm/ocr2keeper.go +++ b/core/services/relay/evm/ocr2keeper.go @@ -73,7 +73,7 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p } gasLimit := cfgWatcher.chain.Config().EVM().OCR2().Automation().GasLimit() - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configTransmitterOpts{configWatcher: cfgWatcher, pluginGasLimit: &gasLimit}) + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, cfgWatcher, configTransmitterOpts{pluginGasLimit: &gasLimit}) if err != nil { return nil, err } diff --git a/core/services/relay/evm/ocr2vrf.go b/core/services/relay/evm/ocr2vrf.go index 59fa9016c5b..1e05f89d9de 100644 --- a/core/services/relay/evm/ocr2vrf.go +++ b/core/services/relay/evm/ocr2vrf.go @@ -67,7 +67,7 @@ func (r *ocr2vrfRelayer) NewDKGProvider(rargs commontypes.RelayArgs, pargs commo if err != nil { return nil, err } - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configTransmitterOpts{configWatcher: configWatcher}) + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}) if err != nil { return nil, err } @@ -90,7 +90,7 @@ func (r *ocr2vrfRelayer) NewOCR2VRFProvider(rargs commontypes.RelayArgs, pargs c if err != nil { return nil, err } - contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configTransmitterOpts{configWatcher: configWatcher}) + contractTransmitter, err := newContractTransmitter(r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}) if err != nil { return nil, err }