Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: checkpoint logic for EVM, ICON, and WASM chains #329

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion relayer/chains/evm/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (p *Provider) Listener(ctx context.Context, lastProcessedTx relayertypes.La
}
}
}
startHeight = p.GetLastSavedBlockHeight()
startHeight = p.GetCheckpoint()
subscribeStart.Reset(time.Second * 1)
case <-subscribeStart.C:
subscribeStart.Stop()
Expand Down Expand Up @@ -132,6 +132,7 @@ func (p *Provider) Listener(ctx context.Context, lastProcessedTx relayertypes.La
Messages: []*relayertypes.Message{message},
}
}
p.SetLastProcessedHeight(br.end)
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions relayer/chains/evm/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Provider struct {
NonceTracker types.NonceTrackerI
LastSavedHeightFunc func() uint64
routerMutex *sync.Mutex
LastProcessedHeight uint64
}

func (p *Config) NewProvider(ctx context.Context, log *zap.Logger, homepath string, debug bool, chainName string) (provider.ChainProvider, error) {
Expand Down Expand Up @@ -479,3 +480,8 @@ func (p *Provider) SetLastSavedHeightFunc(f func() uint64) {
func (p *Provider) GetLastSavedBlockHeight() uint64 {
return p.LastSavedHeightFunc()
}

// SetLastProcessedHeight sets the last processed height
func (p *Provider) SetLastProcessedHeight(height uint64) {
p.LastProcessedHeight = height
}
11 changes: 11 additions & 0 deletions relayer/chains/evm/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,14 @@ func (p *Provider) QueryTransactionReceipt(ctx context.Context, txHash string) (

return finalizedReceipt, nil
}

// GetCheckpoint returns the last processed height
// Get the last processed height if polling is processing
// get latest height if polling is not processing
func (p *Provider) GetCheckpoint() uint64 {
lastSavedHeight := p.GetLastSavedBlockHeight()
if p.LastProcessedHeight > lastSavedHeight {
return p.LastProcessedHeight
}
return lastSavedHeight
}
8 changes: 3 additions & 5 deletions relayer/chains/icon/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,13 @@ func (p *Provider) Listener(ctx context.Context, lastProcessedTx providerTypes.L
if !errors.Is(ctx.Err(), context.Canceled) {
p.log.Debug("event notification received", zap.Any("event", v))
if v.Progress != "" {
height, err := v.Progress.Value()
height, err := v.Progress.Uint64()
if err != nil {
p.log.Error("failed to get progress height", zap.Error(err))
return err
}
if height > 0 {
eventReq.Height = types.NewHexInt(height)
}
if height < processedheight {
p.log.Info("Synced", zap.Int64("height", height), zap.Int64("delta", processedheight-height))
p.SetLastProcessedHeight(height)
}
return nil
}
Expand Down Expand Up @@ -106,6 +103,7 @@ func (p *Provider) Listener(ctx context.Context, lastProcessedTx providerTypes.L
if errors.Is(err, context.Canceled) {
return
}
eventReq.Height = types.NewHexInt(int64(p.GetCheckpoint()))
time.Sleep(time.Second * 3)
reconnect()
p.log.Warn("error occured during monitor event", zap.Error(err))
Expand Down
15 changes: 15 additions & 0 deletions relayer/chains/icon/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type Provider struct {
contracts map[string]providerTypes.EventMap
networkID types.HexInt
LastSavedHeightFunc func() uint64
LastProcessedHeight uint64
}

func (p *Provider) NID() string {
Expand Down Expand Up @@ -276,3 +277,17 @@ func (p *Provider) SetLastSavedHeightFunc(f func() uint64) {
func (p *Provider) GetLastSavedBlockHeight() uint64 {
return p.LastSavedHeightFunc()
}

// SetLastProcessedHeight sets the last processed height
func (p *Provider) SetLastProcessedHeight(height uint64) {
p.LastProcessedHeight = height
}

// GetCheckpoint returns the last processed height
func (p *Provider) GetCheckpoint() uint64 {
lastSavedHeight := p.GetLastSavedBlockHeight()
if p.LastProcessedHeight > lastSavedHeight {
return p.LastProcessedHeight
}
return lastSavedHeight
}
6 changes: 6 additions & 0 deletions relayer/chains/icon/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ func (i HexInt) BigInt() (*big.Int, error) {
}
}

func (i HexInt) Uint64() (uint64, error) {
s := strings.TrimPrefix(string(i), "0x")
v, err := strconv.ParseUint(s, 16, 64)
return v, err
}

func decodeHexNumber(s string) (bool, []byte, error) {
negative := false
if len(s) > 0 && s[0] == '-' {
Expand Down
27 changes: 22 additions & 5 deletions relayer/chains/wasm/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Provider struct {
eventList []sdkTypes.Event
LastSavedHeightFunc func() uint64
routerMutex *sync.Mutex
LastProcessedHeight uint64
}

func (p *Provider) QueryLatestHeight(ctx context.Context) (uint64, error) {
Expand Down Expand Up @@ -146,15 +147,15 @@ func (p *Provider) Listener(ctx context.Context, lastProcessedTx relayTypes.Last
}
case <-pollHeightTicker.C:
pollHeightTicker.Stop()
startHeight = p.GetLastSavedHeight()
if startHeight == 0 {
startHeight = latestHeight
}
startHeight = p.GetCheckpoint()
latestHeight, err = p.QueryLatestHeight(ctx)
if err != nil {
if err != nil {
p.logger.Error("failed to get latest block height", zap.Error(err))
pollHeightTicker.Reset(time.Second * 3)
}
if startHeight == 0 {
startHeight = latestHeight
}
}
}
}
Expand Down Expand Up @@ -844,3 +845,19 @@ func (p *Provider) SetLastSavedHeightFunc(f func() uint64) {
func (p *Provider) GetLastSavedHeight() uint64 {
return p.LastSavedHeightFunc()
}

// SetLastProcessedHeight sets the last processed height
func (p *Provider) SetLastProcessedHeight(height uint64) {
p.LastProcessedHeight = height
}

// GetCheckpoint returns the checkpoint height
// It returns the last processed height if it is greater than the last saved height
// Otherwise, it returns the last saved height
func (p *Provider) GetCheckpoint() uint64 {
lastSavedHeight := p.GetLastSavedHeight()
if p.LastProcessedHeight > lastSavedHeight {
return p.LastProcessedHeight
}
return lastSavedHeight
}
1 change: 1 addition & 0 deletions relayer/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config interface {
}

type ChainQuery interface {
GetCheckpoint() uint64
QueryLatestHeight(ctx context.Context) (uint64, error)
QueryTransactionReceipt(ctx context.Context, txHash string) (*types.Receipt, error)
}
Expand Down
12 changes: 8 additions & 4 deletions relayer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,14 @@ func (r *Relayer) SaveChainsBlockHeight(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
for nid, chain := range r.chains {
height, err := chain.Provider.QueryLatestHeight(ctx)
if err != nil {
r.log.Error("error occured when querying latest height", zap.String("nid", nid), zap.Error(err))
continue
height := chain.Provider.GetCheckpoint()
if chain.LastSavedHeight == height {
latestHeight, err := chain.Provider.QueryLatestHeight(ctx)
if err != nil {
r.log.Error("error occured when fetching latest height", zap.String("nid", nid), zap.Error(err))
continue
}
height = latestHeight
}
if err := r.SaveBlockHeight(ctx, chain, height); err != nil {
r.log.Error("error occured when saving block height", zap.String("nid", nid), zap.Error(err))
Expand Down
Loading