diff --git a/ethergo/listener/listener.go b/ethergo/listener/listener.go index 8e8befd582..e55a997b09 100644 --- a/ethergo/listener/listener.go +++ b/ethergo/listener/listener.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" "github.com/ipfs/go-log" "github.com/jpillora/backoff" "github.com/synapsecns/sanguine/core/metrics" @@ -48,6 +49,8 @@ type chainListener struct { pollInterval, pollIntervalSetting time.Duration // newBlockHandler is an optional handler that is called when a new block is detected. newBlockHandler NewBlockHandler + finalityMode rpc.BlockNumber + blockWait uint64 } var ( @@ -66,6 +69,8 @@ func NewChainListener(omnirpcClient client.EVM, store listenerDB.ChainListenerDB client: omnirpcClient, backoff: newBackoffConfig(), pollIntervalSetting: time.Millisecond * 50, + finalityMode: rpc.LatestBlockNumber, + blockWait: 0, } for _, option := range options { @@ -132,7 +137,7 @@ func (c *chainListener) doPoll(parentCtx context.Context, handler HandleLog) (er }() oldLatestBlock := c.latestBlock - c.latestBlock, err = c.client.BlockNumber(ctx) + c.latestBlock, err = c.getBlockNumber(ctx) if err != nil { return fmt.Errorf("could not get block number: %w", err) } @@ -183,6 +188,21 @@ func (c *chainListener) doPoll(parentCtx context.Context, handler HandleLog) (er return nil } +func (c chainListener) getBlockNumber(ctx context.Context) (uint64, error) { + block, err := c.client.BlockByNumber(ctx, big.NewInt(c.finalityMode.Int64())) + if err != nil { + return 0, fmt.Errorf("could not get block by number: %w", err) + } + + blockNumber := block.Number() + + if c.blockWait > 0 { + blockNumber.Sub(blockNumber, big.NewInt(int64(c.blockWait))) + } + + return blockNumber.Uint64(), nil +} + func (c chainListener) getMetadata(parentCtx context.Context) (startBlock, chainID uint64, err error) { var lastIndexed uint64 ctx, span := c.handler.Tracer().Start(parentCtx, "getMetadata") diff --git a/ethergo/listener/listener_test.go b/ethergo/listener/listener_test.go index 633a5cfcef..7feae7f5d4 100644 --- a/ethergo/listener/listener_test.go +++ b/ethergo/listener/listener_test.go @@ -3,10 +3,11 @@ package listener_test import ( "context" "fmt" + "sync" + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/core/types" "github.com/synapsecns/sanguine/ethergo/listener" - "sync" ) func (l *ListenerTestSuite) TestListenForEvents() { @@ -14,13 +15,10 @@ func (l *ListenerTestSuite) TestListenForEvents() { var wg sync.WaitGroup const iterations = 10 for i := 0; i < iterations; i++ { - i := i wg.Add(1) - go func(_ int) { + go func() { defer wg.Done() - auth := l.backend.GetTxContext(l.GetTestContext(), nil) - //nolint:typecheck bridgeRequestTX, err := handle.IncrementCounter(auth.TransactOpts) l.NoError(err) @@ -32,7 +30,7 @@ func (l *ListenerTestSuite) TestListenForEvents() { l.NoError(err) l.NotNil(bridgeResponseTX) l.backend.WaitForConfirmation(l.GetTestContext(), bridgeResponseTX) - }(i) + }() } wg.Wait() @@ -40,10 +38,47 @@ func (l *ListenerTestSuite) TestListenForEvents() { startBlock, err := handle.DeployBlock(&bind.CallOpts{Context: l.GetTestContext()}) l.NoError(err) - cl, err := listener.NewChainListener(l.backend, l.store, handle.Address(), uint64(startBlock.Int64()), l.metrics, listener.WithNewBlockHandler(func(ctx context.Context, block uint64) error { - fmt.Println(block) - return nil - })) + cl, err := listener.NewChainListener( + l.backend, + l.store, + handle.Address(), + uint64(startBlock.Int64()), + l.metrics, + listener.WithNewBlockHandler(func(ctx context.Context, block uint64) error { + fmt.Println(block) + return nil + }), + ) + l.NoError(err) + + clSafe, err := listener.NewChainListener( + l.backend, + l.store, + handle.Address(), + uint64(startBlock.Int64()), + l.metrics, + listener.WithNewBlockHandler(func(ctx context.Context, block uint64) error { + fmt.Println(block) + return nil + }), + listener.WithFinalityMode("safe"), + listener.WithBlockWait(10), + ) + l.NoError(err) + + clFinalized, err := listener.NewChainListener( + l.backend, + l.store, + handle.Address(), + uint64(startBlock.Int64()), + l.metrics, + listener.WithNewBlockHandler(func(ctx context.Context, block uint64) error { + fmt.Println(block) + return nil + }), + listener.WithFinalityMode("finalized"), + listener.WithBlockWait(10), + ) l.NoError(err) eventCount := 0 @@ -59,4 +94,27 @@ func (l *ListenerTestSuite) TestListenForEvents() { return nil }) + + _ = clSafe.Listen(listenCtx, func(ctx context.Context, log types.Log) error { + eventCount++ + + if eventCount == iterations*2 { + cancel() + } + + return nil + }) + + _ = clFinalized.Listen(listenCtx, func(ctx context.Context, log types.Log) error { + eventCount++ + + if eventCount == iterations*2 { + cancel() + } + + return nil + }) + + l.NotEqual(cl.LatestBlock(), clFinalized.LatestBlock(), clSafe.LatestBlock()) + } diff --git a/ethergo/listener/options.go b/ethergo/listener/options.go index 89c45c759d..9413ab315e 100644 --- a/ethergo/listener/options.go +++ b/ethergo/listener/options.go @@ -3,6 +3,8 @@ package listener import ( "context" "time" + + "github.com/ethereum/go-ethereum/rpc" ) // Option is a functional option for chainListener. @@ -24,3 +26,24 @@ func WithPollInterval(interval time.Duration) Option { c.pollIntervalSetting = interval } } + +// WithFinalityMode sets the finality mode. +func WithFinalityMode(mode string) Option { + return func(c *chainListener) { + switch mode { + case "latest": + c.finalityMode = rpc.LatestBlockNumber + case "safe": + c.finalityMode = rpc.SafeBlockNumber + case "finalized": + c.finalityMode = rpc.FinalizedBlockNumber + } + } +} + +// WithBlockWait sets the block wait. +func WithBlockWait(wait uint64) Option { + return func(c *chainListener) { + c.blockWait = wait + } +}