Skip to content

Commit

Permalink
Merge pull request #2762 from synapsecns/feat/listener-finality
Browse files Browse the repository at this point in the history
feat(listener): listener safe, finalized, latest block numbers
  • Loading branch information
golangisfun123 authored Jun 21, 2024
2 parents 37af1a8 + 888792f commit f25026c
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 11 deletions.
22 changes: 21 additions & 1 deletion ethergo/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ipfs/go-log"
"github.com/jpillora/backoff"
"github.com/synapsecns/sanguine/core/metrics"
Expand Down Expand Up @@ -48,6 +49,8 @@ type chainListener struct {
pollInterval, pollIntervalSetting time.Duration
// newBlockHandler is an optional handler that is called when a new block is detected.
newBlockHandler NewBlockHandler
finalityMode rpc.BlockNumber
blockWait uint64
}

var (
Expand All @@ -66,6 +69,8 @@ func NewChainListener(omnirpcClient client.EVM, store listenerDB.ChainListenerDB
client: omnirpcClient,
backoff: newBackoffConfig(),
pollIntervalSetting: time.Millisecond * 50,
finalityMode: rpc.LatestBlockNumber,
blockWait: 0,
}

for _, option := range options {
Expand Down Expand Up @@ -132,7 +137,7 @@ func (c *chainListener) doPoll(parentCtx context.Context, handler HandleLog) (er
}()

oldLatestBlock := c.latestBlock
c.latestBlock, err = c.client.BlockNumber(ctx)
c.latestBlock, err = c.getBlockNumber(ctx)
if err != nil {
return fmt.Errorf("could not get block number: %w", err)
}
Expand Down Expand Up @@ -183,6 +188,21 @@ func (c *chainListener) doPoll(parentCtx context.Context, handler HandleLog) (er
return nil
}

func (c chainListener) getBlockNumber(ctx context.Context) (uint64, error) {
block, err := c.client.BlockByNumber(ctx, big.NewInt(c.finalityMode.Int64()))
if err != nil {
return 0, fmt.Errorf("could not get block by number: %w", err)
}

blockNumber := block.Number()

if c.blockWait > 0 {
blockNumber.Sub(blockNumber, big.NewInt(int64(c.blockWait)))
}

return blockNumber.Uint64(), nil
}

func (c chainListener) getMetadata(parentCtx context.Context) (startBlock, chainID uint64, err error) {
var lastIndexed uint64
ctx, span := c.handler.Tracer().Start(parentCtx, "getMetadata")
Expand Down
78 changes: 68 additions & 10 deletions ethergo/listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,22 @@ package listener_test
import (
"context"
"fmt"
"sync"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/synapsecns/sanguine/ethergo/listener"
"sync"
)

func (l *ListenerTestSuite) TestListenForEvents() {
_, handle := l.manager.GetCounter(l.GetTestContext(), l.backend)
var wg sync.WaitGroup
const iterations = 10
for i := 0; i < iterations; i++ {
i := i
wg.Add(1)
go func(_ int) {
go func() {
defer wg.Done()

auth := l.backend.GetTxContext(l.GetTestContext(), nil)

//nolint:typecheck
bridgeRequestTX, err := handle.IncrementCounter(auth.TransactOpts)
l.NoError(err)
Expand All @@ -32,18 +30,55 @@ func (l *ListenerTestSuite) TestListenForEvents() {
l.NoError(err)
l.NotNil(bridgeResponseTX)
l.backend.WaitForConfirmation(l.GetTestContext(), bridgeResponseTX)
}(i)
}()
}

wg.Wait()

startBlock, err := handle.DeployBlock(&bind.CallOpts{Context: l.GetTestContext()})
l.NoError(err)

cl, err := listener.NewChainListener(l.backend, l.store, handle.Address(), uint64(startBlock.Int64()), l.metrics, listener.WithNewBlockHandler(func(ctx context.Context, block uint64) error {
fmt.Println(block)
return nil
}))
cl, err := listener.NewChainListener(
l.backend,
l.store,
handle.Address(),
uint64(startBlock.Int64()),
l.metrics,
listener.WithNewBlockHandler(func(ctx context.Context, block uint64) error {
fmt.Println(block)
return nil
}),
)
l.NoError(err)

clSafe, err := listener.NewChainListener(
l.backend,
l.store,
handle.Address(),
uint64(startBlock.Int64()),
l.metrics,
listener.WithNewBlockHandler(func(ctx context.Context, block uint64) error {
fmt.Println(block)
return nil
}),
listener.WithFinalityMode("safe"),
listener.WithBlockWait(10),
)
l.NoError(err)

clFinalized, err := listener.NewChainListener(
l.backend,
l.store,
handle.Address(),
uint64(startBlock.Int64()),
l.metrics,
listener.WithNewBlockHandler(func(ctx context.Context, block uint64) error {
fmt.Println(block)
return nil
}),
listener.WithFinalityMode("finalized"),
listener.WithBlockWait(10),
)
l.NoError(err)

eventCount := 0
Expand All @@ -59,4 +94,27 @@ func (l *ListenerTestSuite) TestListenForEvents() {

return nil
})

_ = clSafe.Listen(listenCtx, func(ctx context.Context, log types.Log) error {
eventCount++

if eventCount == iterations*2 {
cancel()
}

return nil
})

_ = clFinalized.Listen(listenCtx, func(ctx context.Context, log types.Log) error {
eventCount++

if eventCount == iterations*2 {
cancel()
}

return nil
})

l.NotEqual(cl.LatestBlock(), clFinalized.LatestBlock(), clSafe.LatestBlock())

}
23 changes: 23 additions & 0 deletions ethergo/listener/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package listener
import (
"context"
"time"

"github.com/ethereum/go-ethereum/rpc"
)

// Option is a functional option for chainListener.
Expand All @@ -24,3 +26,24 @@ func WithPollInterval(interval time.Duration) Option {
c.pollIntervalSetting = interval
}
}

// WithFinalityMode sets the finality mode.
func WithFinalityMode(mode string) Option {
return func(c *chainListener) {
switch mode {
case "latest":
c.finalityMode = rpc.LatestBlockNumber
case "safe":
c.finalityMode = rpc.SafeBlockNumber
case "finalized":
c.finalityMode = rpc.FinalizedBlockNumber
}
}
}

// WithBlockWait sets the block wait.
func WithBlockWait(wait uint64) Option {
return func(c *chainListener) {
c.blockWait = wait
}
}

0 comments on commit f25026c

Please sign in to comment.