Fix fromBlock validation in lp replay (#7531)
* Fix fromBlock validation in lp replay

* Fix log

* Use mathutil
connorwstein authored Sep 28, 2022
1 parent 44c4858 commit 0283788
Showing 7 changed files with 234 additions and 194 deletions.
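The core of the fix: Replay previously polled from whatever fromBlock the caller supplied, which could begin past the poller's last saved block and silently skip the logs in between. Below is a minimal, standalone sketch of the clamping rule, not the committed code; the commit itself uses mathutil.Min and the ORM's SelectLatestBlock, as shown in log_poller.go further down.

package main

import "fmt"

// replayFromBlock mirrors the validation added in this commit: when the DB
// already has a last-processed block, clamp the requested replay start to it
// so the poller never starts past saved state and skips blocks.
func replayFromBlock(requested, lastProcessed int64, haveLastProcessed bool) int64 {
	if !haveLastProcessed {
		return requested // nothing in the DB yet: honor the request
	}
	if lastProcessed < requested {
		return lastProcessed
	}
	return requested
}

func main() {
	fmt.Println(replayFromBlock(20, 7, true)) // 7: clamped to last processed
	fmt.Println(replayFromBlock(4, 7, true))  // 4: requested block is safe
	fmt.Println(replayFromBlock(9, 0, false)) // 9: empty DB, use the request
}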
62 changes: 62 additions & 0 deletions core/chains/evm/logpoller/helper_test.go
@@ -2,10 +2,72 @@ package logpoller

import (
"context"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/smartcontractkit/sqlx"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/core/gethwrappers/generated/log_emitter"
"github.com/smartcontractkit/chainlink/core/internal/testutils"
"github.com/smartcontractkit/chainlink/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/utils"
)

type TestHarness struct {
Lggr logger.Logger
ChainID *big.Int
db *sqlx.DB
ORM *ORM
LogPoller *logPoller
Client *backends.SimulatedBackend
Owner *bind.TransactOpts
Emitter1, Emitter2 *log_emitter.LogEmitter
EmitterAddress1, EmitterAddress2 common.Address
}

func SetupTH(t *testing.T) TestHarness {
lggr := logger.TestLogger(t)
chainID := testutils.NewRandomEVMChainID()
db := pgtest.NewSqlxDB(t)
require.NoError(t, utils.JustError(db.Exec(`SET CONSTRAINTS log_poller_blocks_evm_chain_id_fkey DEFERRED`)))
require.NoError(t, utils.JustError(db.Exec(`SET CONSTRAINTS logs_evm_chain_id_fkey DEFERRED`)))
o := NewORM(chainID, db, lggr, pgtest.NewPGCfg(true))
owner := testutils.MustNewSimTransactor(t)
ec := backends.NewSimulatedBackend(map[common.Address]core.GenesisAccount{
owner.From: {
Balance: big.NewInt(0).Mul(big.NewInt(10), big.NewInt(1e18)),
},
}, 10e6)
lp := NewLogPoller(o, client.NewSimulatedBackendClient(t, ec, chainID), lggr, 15*time.Second, 2, 3, 2)
emitterAddress1, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec)
require.NoError(t, err)
emitterAddress2, _, emitter2, err := log_emitter.DeployLogEmitter(owner, ec)
require.NoError(t, err)
ec.Commit()
return TestHarness{
Lggr: lggr,
ChainID: chainID,
db: db,
ORM: o,
LogPoller: lp,
Client: ec,
Owner: owner,
Emitter1: emitter1,
Emitter2: emitter2,
EmitterAddress1: emitterAddress1,
EmitterAddress2: emitterAddress2,
}
}

func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 {
lp.pollAndSaveLogs(ctx, currentBlockNumber)
lastProcessed, _ := lp.orm.SelectLatestBlock()
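For orientation, a hedged sketch of how a test inside this package might consume the new harness; the field names come from TestHarness above, while the test name and body are illustrative (it assumes the same imports as the file above: math/big, require, testutils):

func TestHarnessUsage(t *testing.T) {
	th := SetupTH(t)

	// Emit a log from the first test contract and mine a block
	// on the simulated backend.
	_, err := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(1)})
	require.NoError(t, err)
	th.Client.Commit()

	// Drive the poller synchronously from block 1.
	th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1)
}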
55 changes: 16 additions & 39 deletions core/chains/evm/logpoller/integration_test.go
@@ -9,14 +9,11 @@ import (
"time"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/core/gethwrappers/generated/log_emitter"
"github.com/smartcontractkit/chainlink/core/internal/cltest/heavyweight"
@@ -106,62 +103,42 @@ func TestPopulateLoadedDB(t *testing.T) {
}

func TestLogPoller_Integration(t *testing.T) {
lggr := logger.TestLogger(t)
db := pgtest.NewSqlxDB(t)
chainID := testutils.NewRandomEVMChainID()
_, err := db.Exec(`INSERT INTO evm_chains (id, created_at, updated_at) VALUES ($1, NOW(), NOW())`, utils.NewBig(chainID))
require.NoError(t, err)

// Set up a test chain with a log emitting contract deployed.
owner := testutils.MustNewSimTransactor(t)
ec := backends.NewSimulatedBackend(map[common.Address]core.GenesisAccount{
owner.From: {
Balance: big.NewInt(0).Mul(big.NewInt(10), big.NewInt(1e18)),
},
}, 10e6)
t.Cleanup(func() { ec.Close() })
emitterAddress1, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec)
require.NoError(t, err)
ec.Commit()
ec.Commit() // Block 2. Ensure we have finality number of blocks
th := logpoller.SetupTH(t)
th.Client.Commit() // Block 2. Ensure we have finality number of blocks

// Set up a log poller listening for log emitter logs.
lp := logpoller.NewLogPoller(logpoller.NewORM(chainID, db, lggr, pgtest.NewPGCfg(true)),
client.NewSimulatedBackendClient(t, ec, chainID), lggr, 100*time.Millisecond, 2, 3, 2)
// Only filter for log1 events.
require.NoError(t, lp.MergeFilter([]common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{emitterAddress1}))
require.NoError(t, lp.Start(testutils.Context(t)))
require.NoError(t, th.LogPoller.MergeFilter([]common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{th.EmitterAddress1}))
require.NoError(t, th.LogPoller.Start(testutils.Context(t)))

// Emit some logs in blocks 3->7.
for i := 0; i < 5; i++ {
emitter1.EmitLog1(owner, []*big.Int{big.NewInt(int64(i))})
emitter1.EmitLog2(owner, []*big.Int{big.NewInt(int64(i))})
ec.Commit()
th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(i))})
th.Emitter1.EmitLog2(th.Owner, []*big.Int{big.NewInt(int64(i))})
th.Client.Commit()
}
// The poller starts on a new chain at latest-finality (5 in this case),
// replay to ensure we get all the logs.
require.NoError(t, lp.Replay(testutils.Context(t), 1))
require.NoError(t, th.LogPoller.Replay(testutils.Context(t), 1))

// We should immediately have all those Log1 logs.
logs, err := lp.Logs(2, 7, EmitterABI.Events["Log1"].ID, emitterAddress1)
logs, err := th.LogPoller.Logs(2, 7, EmitterABI.Events["Log1"].ID, th.EmitterAddress1)
require.NoError(t, err)
assert.Equal(t, 5, len(logs))
// Now let's update the filter and replay to get Log2 logs.
require.NoError(t, lp.MergeFilter([]common.Hash{EmitterABI.Events["Log2"].ID}, []common.Address{emitterAddress1}))
require.NoError(t, th.LogPoller.MergeFilter([]common.Hash{EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1}))
// Replay an invalid block should error
assert.Error(t, lp.Replay(testutils.Context(t), 0))
assert.Error(t, lp.Replay(testutils.Context(t), 20))
assert.Error(t, th.LogPoller.Replay(testutils.Context(t), 0))
assert.Error(t, th.LogPoller.Replay(testutils.Context(t), 20))
// Replay only from block 4, so we should see logs in block 4,5,6,7 (4 logs)
require.NoError(t, lp.Replay(testutils.Context(t), 4))
require.NoError(t, th.LogPoller.Replay(testutils.Context(t), 4))

// We should immediately see 4 logs2 logs.
logs, err = lp.Logs(2, 7, EmitterABI.Events["Log2"].ID, emitterAddress1)
logs, err = th.LogPoller.Logs(2, 7, EmitterABI.Events["Log2"].ID, th.EmitterAddress1)
require.NoError(t, err)
assert.Equal(t, 4, len(logs))

// Cancelling a replay should return an error synchronously.
ctx, cancel := context.WithCancel(context.Background())
cancel()
assert.True(t, errors.Is(lp.Replay(ctx, 4), logpoller.ErrReplayAbortedByClient))
require.NoError(t, lp.Close())
assert.True(t, errors.Is(th.LogPoller.Replay(ctx, 4), logpoller.ErrReplayAbortedByClient))
require.NoError(t, th.LogPoller.Close())
}
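Summarized informally, the validation semantics the test pins down (after the emit loop and replay, the chain head and the last processed block are both 7):

// Replay(ctx, 0)  -> error: fromBlock 0 is below the first valid block
// Replay(ctx, 20) -> error: fromBlock 20 is past the current head of 7
// Replay(ctx, 4)  -> ok: polls blocks 4..7 and saves their logs
// Replay(cancelledCtx, 4) -> ErrReplayAbortedByClient, returned synchronously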
38 changes: 31 additions & 7 deletions core/chains/evm/logpoller/log_poller.go
@@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/core/chains/evm/client"
evmtypes "github.com/smartcontractkit/chainlink/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/core/logger"
@@ -68,7 +69,7 @@ type logPoller struct {
eventSigs map[common.Hash]struct{}

replayStart chan ReplayRequest
replayComplete chan struct{}
replayComplete chan error
ctx context.Context
cancel context.CancelFunc
done chan struct{}
@@ -93,7 +94,7 @@ func NewLogPoller(orm *ORM, ec client.Client, lggr logger.Logger, pollPeriod tim
orm: orm,
lggr: lggr,
replayStart: make(chan ReplayRequest),
replayComplete: make(chan struct{}),
replayComplete: make(chan error),
done: make(chan struct{}),
pollPeriod: pollPeriod,
finalityDepth: finalityDepth,
@@ -184,12 +185,14 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) error {
}
// Block until replay complete or cancelled.
select {
case <-lp.replayComplete:
case err := <-lp.replayComplete:
return err
case <-lp.ctx.Done():
return ErrReplayAbortedOnShutdown
case <-ctx.Done():
return ErrReplayAbortedByClient
}
// Should never reach here.
return nil
}

@@ -211,6 +214,22 @@ func (lp *logPoller) Close() error {
})
}

func (lp *logPoller) getReplayFromBlock(ctx context.Context, requested int64) (int64, error) {
lastProcessed, err := lp.orm.SelectLatestBlock(pg.WithParentCtx(ctx))
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
// Real DB error
return 0, err
}
// Nothing in db, use requested
return requested, nil
}
// We have lastProcessed, take min(requested, lastProcessed).
// This is to avoid replaying from a block later than what we have in the DB
// and skipping blocks.
return mathutil.Min(requested, lastProcessed.BlockNumber), nil
}

func (lp *logPoller) run() {
defer close(lp.done)
tick := time.After(0)
@@ -219,17 +238,22 @@ func (lp *logPoller) run() {
case <-lp.ctx.Done():
return
case replayReq := <-lp.replayStart:
lp.lggr.Warnw("Executing replay", "fromBlock", replayReq.fromBlock)
// Serially process replay requests.
lp.pollAndSaveLogs(replayReq.ctx, replayReq.fromBlock)
fromBlock, err := lp.getReplayFromBlock(replayReq.ctx, replayReq.fromBlock)
if err == nil {
// Serially process replay requests.
lp.lggr.Warnw("Executing replay", "fromBlock", fromBlock, "requested", replayReq.fromBlock)
lp.pollAndSaveLogs(replayReq.ctx, fromBlock)
} else {
lp.lggr.Errorw("Error executing replay, could not get fromBlock", "err", err)
}
select {
case <-lp.ctx.Done():
// We're shutting down, lets return.
return
case <-replayReq.ctx.Done():
// Client gave up, lets continue.
continue
case lp.replayComplete <- struct{}{}:
case lp.replayComplete <- err:
}
case <-tick:
tick = time.After(utils.WithJitter(lp.pollPeriod))
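The switch from chan struct{} to chan error is what lets run() hand the replay outcome back to the blocked Replay call, so callers now see failures synchronously. A self-contained sketch of that request/response pattern with illustrative names (not the poller's own; the real code also watches the request's own context, which this sketch simplifies to a single ctx):

package main

import (
	"context"
	"errors"
	"fmt"
)

var errAbortedByClient = errors.New("aborted by client")

type poller struct {
	requests chan int64 // replay requests: the fromBlock to start from
	complete chan error // result handed back to the waiting caller
}

// run serially services replay requests and reports each outcome.
func (p *poller) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case from := <-p.requests:
			err := doReplay(from)
			select {
			case <-ctx.Done():
				return
			case p.complete <- err: // Replay is blocked on this receive
			}
		}
	}
}

// Replay blocks until the request has been processed or ctx is cancelled.
func (p *poller) Replay(ctx context.Context, from int64) error {
	select {
	case p.requests <- from:
	case <-ctx.Done():
		return errAbortedByClient
	}
	select {
	case err := <-p.complete:
		return err
	case <-ctx.Done():
		return errAbortedByClient
	}
}

func doReplay(from int64) error {
	if from < 1 {
		return fmt.Errorf("invalid fromBlock %d", from)
	}
	return nil
}

func main() {
	p := &poller{requests: make(chan int64), complete: make(chan error)}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go p.run(ctx)
	fmt.Println(p.Replay(context.Background(), 4)) // <nil>
	fmt.Println(p.Replay(context.Background(), 0)) // invalid fromBlock 0
}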
