Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change keepers to use the default contract transmitter #11308

Merged
merged 14 commits into from
Dec 12, 2023
Merged
4 changes: 2 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ func (d *Delegate) newServicesOCR2Keepers21(
return nil, fmt.Errorf("keeper2 services: failed to get chain %s: %w", rid.ChainID, err2)
}

keeperProvider, services, err2 := ocr2keeper.EVMDependencies21(jb, d.db, lggr, chain, d.pipelineRunner, mc, kb, d.cfg.Database())
keeperProvider, services, err2 := ocr2keeper.EVMDependencies21(jb, d.db, lggr, chain, mc, kb, d.cfg.Database(), d.ethKs)
if err2 != nil {
return nil, errors.Wrap(err2, "could not build dependencies for ocr2 keepers")
}
Expand Down Expand Up @@ -1201,7 +1201,7 @@ func (d *Delegate) newServicesOCR2Keepers20(
return nil, fmt.Errorf("keepers2.0 services: failed to get chain (%s): %w", rid.ChainID, err2)
}

keeperProvider, rgstry, encoder, logProvider, err2 := ocr2keeper.EVMDependencies20(jb, d.db, lggr, chain, d.pipelineRunner)
keeperProvider, rgstry, encoder, logProvider, err2 := ocr2keeper.EVMDependencies20(jb, d.db, lggr, chain, d.ethKs)
if err2 != nil {
return nil, errors.Wrap(err2, "could not build dependencies for ocr2 keepers")
}
Expand Down
93 changes: 35 additions & 58 deletions core/services/ocr2/plugins/ocr2keeper/integration_21_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
)

Expand Down Expand Up @@ -117,7 +116,7 @@ func TestIntegration_KeeperPluginConditionalUpkeep(t *testing.T) {
require.NoError(t, err)
registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr)

nodes, _ := setupNodes(t, nodeKeys, registry, backend, steve)
setupNodes(t, nodeKeys, registry, backend, steve)

<-time.After(time.Second * 5)

Expand Down Expand Up @@ -160,8 +159,6 @@ func TestIntegration_KeeperPluginConditionalUpkeep(t *testing.T) {
}
g.Eventually(receivedBytes, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.Equal(payload1))

checkPipelineRuns(t, nodes, 1)

// change payload
_, err = upkeepContract.SetBytesToSend(carrol, payload2)
require.NoError(t, err)
Expand Down Expand Up @@ -204,7 +201,7 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) {
require.NoError(t, err)

registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr)
nodes, _ := setupNodes(t, nodeKeys, registry, backend, steve)
setupNodes(t, nodeKeys, registry, backend, steve)
upkeeps := 1

_, err = linkToken.Transfer(sergey, carrol.From, big.NewInt(0).Mul(oneHunEth, big.NewInt(int64(upkeeps+1))))
Expand All @@ -228,35 +225,36 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) {
g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue())
done()

runs := checkPipelineRuns(t, nodes, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we explored any alternate assertions for non pipeline transmitter? Or do we feel they are covered by existing assertions

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't able to see how the other services assert their transmits, so I just assumed the existing assertions would be enough, but I can look into this further

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verifying the upkeeps are actually performed i think should give similar coverage but I'm not sure that's done everywhere (e.g. the other place where this is removed)


t.Run("recover logs", func(t *testing.T) {

addr, contract := addrs[0], contracts[0]
upkeepID := registerUpkeep(t, registry, addr, carrol, steve, backend)
backend.Commit()
t.Logf("Registered new upkeep %s for address %s", upkeepID.String(), addr.String())
// Emit 100 logs in a burst
emits := 100
recoverEmits := 100
i := 0
emitEvents(testutils.Context(t), t, 100, []*log_upkeep_counter_wrapper.LogUpkeepCounter{contract}, carrol, func() {
i++
if i%(emits/4) == 0 {
if i%(recoverEmits/4) == 0 {
backend.Commit()
time.Sleep(time.Millisecond * 250) // otherwise we get "invalid transaction nonce" errors
}
})
// Mine enough blocks to ensre these logs don't fall into log provider range

beforeDummyBlocks := backend.Blockchain().CurrentBlock().Number.Uint64()

// Mine enough blocks to ensure these logs don't fall into log provider range
dummyBlocks := 500
for i := 0; i < dummyBlocks; i++ {
backend.Commit()
time.Sleep(time.Millisecond * 10)
}
t.Logf("Mined %d blocks, waiting for logs to be recovered", dummyBlocks)

expectedPostRecover := runs + emits
waitPipelineRuns(t, nodes, expectedPostRecover, testutils.WaitTimeout(t), cltest.DBPollingInterval)
Comment on lines -257 to -258
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should find some other way to assert wanted expectations. Removing this is weakening this test

t.Logf("Mined %d blocks, waiting for logs to be recovered", dummyBlocks)

listener, done := listenPerformedN(t, backend, registry, ids, int64(beforeDummyBlocks), recoverEmits)
g.Eventually(listener, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.BeTrue())
done()
})
}

Expand Down Expand Up @@ -296,7 +294,7 @@ func TestIntegration_KeeperPluginLogUpkeep_Retry(t *testing.T) {

registry := deployKeeper21Registry(t, registryOwner, backend, linkAddr, linkFeedAddr, gasFeedAddr)

nodes, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner)
_, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner)

const upkeepCount = 10
const mercuryFailCount = upkeepCount * 3 * 2
Expand Down Expand Up @@ -374,39 +372,6 @@ func TestIntegration_KeeperPluginLogUpkeep_Retry(t *testing.T) {
g.Eventually(listener, testutils.WaitTimeout(t)-(5*time.Second), cltest.DBPollingInterval).Should(gomega.BeTrue())

done()

_ = checkPipelineRuns(t, nodes, 1*len(nodes)) // TODO: TBD
}

func waitPipelineRuns(t *testing.T, nodes []Node, n int, timeout, interval time.Duration) {
ctx, cancel := context.WithTimeout(testutils.Context(t), timeout)
defer cancel()
var allRuns []pipeline.Run
for len(allRuns) < n && ctx.Err() == nil {
allRuns = []pipeline.Run{}
for _, node := range nodes {
runs, err := node.App.PipelineORM().GetAllRuns()
require.NoError(t, err)
allRuns = append(allRuns, runs...)
}
time.Sleep(interval)
}
runs := len(allRuns)
t.Logf("found %d pipeline runs", runs)
require.GreaterOrEqual(t, runs, n)
}

func checkPipelineRuns(t *testing.T, nodes []Node, n int) int {
var allRuns []pipeline.Run
for _, node := range nodes {
runs, err2 := node.App.PipelineORM().GetAllRuns()
require.NoError(t, err2)
allRuns = append(allRuns, runs...)
}
runs := len(allRuns)
t.Logf("found %d pipeline runs", runs)
require.GreaterOrEqual(t, runs, n)
return runs
}

func emitEvents(ctx context.Context, t *testing.T, n int, contracts []*log_upkeep_counter_wrapper.LogUpkeepCounter, carrol *bind.TransactOpts, afterEmit func()) {
Expand All @@ -424,32 +389,32 @@ func mapListener(m *sync.Map, n int) func() bool {
return func() bool {
count := 0
m.Range(func(key, value interface{}) bool {
count++
count += value.(int)
return true
})
return count > n
}
}

func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry *iregistry21.IKeeperRegistryMaster, ids []*big.Int, startBlock int64) (func() bool, func()) {
func listenPerformedN(t *testing.T, backend *backends.SimulatedBackend, registry *iregistry21.IKeeperRegistryMaster, ids []*big.Int, startBlock int64, count int) (func() bool, func()) {
cache := &sync.Map{}
ctx, cancel := context.WithCancel(testutils.Context(t))
start := startBlock

go func() {
for ctx.Err() == nil {
bl := backend.Blockchain().CurrentBlock().Number.Uint64()
currentBlock := backend.Blockchain().CurrentBlock().Number.Uint64()

sc := make([]bool, len(ids))
for i := range sc {
sc[i] = true
success := make([]bool, len(ids))
for i := range success {
success[i] = true
}

iter, err := registry.FilterUpkeepPerformed(&bind.FilterOpts{
Start: uint64(start),
End: &bl,
End: &currentBlock,
Context: ctx,
}, ids, sc)
}, ids, success)

if ctx.Err() != nil {
return
Expand All @@ -460,7 +425,15 @@ func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry
for iter.Next() {
if iter.Event != nil {
t.Logf("[automation-ocr3 | EvmRegistry] upkeep performed event emitted for id %s", iter.Event.Id.String())
cache.Store(iter.Event.Id.String(), true)

//cache.Store(iter.Event.Id.String(), true)
count, ok := cache.Load(iter.Event.Id.String())
if !ok {
cache.Store(iter.Event.Id.String(), 1)
continue
}
countI := count.(int)
cache.Store(iter.Event.Id.String(), countI+1)
}
}

Expand All @@ -470,7 +443,11 @@ func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry
}
}()

return mapListener(cache, 0), cancel
return mapListener(cache, count), cancel
}

func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry *iregistry21.IKeeperRegistryMaster, ids []*big.Int, startBlock int64) (func() bool, func()) {
return listenPerformedN(t, backend, registry, ids, startBlock, 0)
}

func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IKeeperRegistryMaster, backend *backends.SimulatedBackend, usr *bind.TransactOpts) ([]Node, *SimulatedMercuryServer) {
Expand Down
19 changes: 0 additions & 19 deletions core/services/ocr2/plugins/ocr2keeper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand Down Expand Up @@ -411,15 +410,6 @@ func TestIntegration_KeeperPluginBasic(t *testing.T) {
}
g.Eventually(receivedBytes, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.Equal(payload1))

// check pipeline runs
var allRuns []pipeline.Run
for _, node := range nodes {
runs, err2 := node.App.PipelineORM().GetAllRuns()
require.NoError(t, err2)
allRuns = append(allRuns, runs...)
}
require.GreaterOrEqual(t, len(allRuns), 1)

// change payload
_, err = upkeepContract.SetBytesToSend(carrol, payload2)
require.NoError(t, err)
Expand Down Expand Up @@ -683,15 +673,6 @@ func TestIntegration_KeeperPluginForwarderEnabled(t *testing.T) {
}
g.Eventually(receivedBytes, testutils.WaitTimeout(t), cltest.DBPollingInterval).Should(gomega.Equal(payload1))

// check pipeline runs
var allRuns []pipeline.Run
for _, node := range nodes {
runs, err2 := node.App.PipelineORM().GetAllRuns()
require.NoError(t, err2)
allRuns = append(allRuns, runs...)
}
require.GreaterOrEqual(t, len(allRuns), 1)

// change payload
_, err = upkeepContract.SetBytesToSend(carrol, payload2)
require.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions core/services/ocr2/plugins/ocr2keeper/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models"
evmregistry20 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v20"
evmregistry21 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21"
evmregistry21transmit "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/transmit"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
)

Expand All @@ -43,9 +43,9 @@ var (
ErrNoChainFromSpec = fmt.Errorf("could not create chain from spec")
)

func EVMProvider(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, spec job.Job, pr pipeline.Runner) (evmrelay.OCR2KeeperProvider, error) {
func EVMProvider(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, spec job.Job, ethKeystore keystore.Eth) (evmrelay.OCR2KeeperProvider, error) {
oSpec := spec.OCR2OracleSpec
ocr2keeperRelayer := evmrelay.NewOCR2KeeperRelayer(db, chain, pr, spec, lggr.Named("OCR2KeeperRelayer"))
ocr2keeperRelayer := evmrelay.NewOCR2KeeperRelayer(db, chain, lggr.Named("OCR2KeeperRelayer"), ethKeystore)

keeperProvider, err := ocr2keeperRelayer.NewOCR2KeeperProvider(
types.RelayArgs{
Expand All @@ -71,15 +71,15 @@ func EVMDependencies20(
db *sqlx.DB,
lggr logger.Logger,
chain legacyevm.Chain,
pr pipeline.Runner,
ethKeystore keystore.Eth,
) (evmrelay.OCR2KeeperProvider, *evmregistry20.EvmRegistry, Encoder20, *evmregistry20.LogProvider, error) {
var err error

var keeperProvider evmrelay.OCR2KeeperProvider
var registry *evmregistry20.EvmRegistry

// the provider will be returned as a dependency
if keeperProvider, err = EVMProvider(db, chain, lggr, spec, pr); err != nil {
if keeperProvider, err = EVMProvider(db, chain, lggr, spec, ethKeystore); err != nil {
return nil, nil, nil, nil, err
}

Expand Down Expand Up @@ -112,17 +112,17 @@ func EVMDependencies21(
db *sqlx.DB,
lggr logger.Logger,
chain legacyevm.Chain,
pr pipeline.Runner,
mc *models.MercuryCredentials,
keyring ocrtypes.OnchainKeyring,
dbCfg pg.QConfig,
ethKeystore keystore.Eth,
) (evmrelay.OCR2KeeperProvider, evmregistry21.AutomationServices, error) {
var err error
var keeperProvider evmrelay.OCR2KeeperProvider

oSpec := spec.OCR2OracleSpec
// the provider will be returned as a dependency
if keeperProvider, err = EVMProvider(db, chain, lggr, spec, pr); err != nil {
if keeperProvider, err = EVMProvider(db, chain, lggr, spec, ethKeystore); err != nil {
return nil, nil, err
}

Expand Down
Loading
Loading