Skip to content

Commit

Permalink
introduce reorg handler
Browse files Browse the repository at this point in the history
  • Loading branch information
fbac committed Jan 21, 2025
1 parent f065992 commit f81d6cd
Show file tree
Hide file tree
Showing 16 changed files with 319 additions and 100 deletions.
1 change: 1 addition & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ packages:
github.com/xmtp/xmtpd/pkg/indexer:
interfaces:
IBlockTracker:
ChainReorgHandler:
github.com/xmtp/xmtpd/pkg/blockchain:
interfaces:
ChainClient:
Expand Down
2 changes: 1 addition & 1 deletion pkg/blockchain/rpcLogStreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (r *RpcLogStreamer) getNextPage(
r.logger.Debug("Chain is up to date. Skipping update")
return []types.Log{}, nil, nil
}
numOfBlocksToProcess := highestBlockCanProcess - fromBlock + 1
numOfBlocksToProcess := (highestBlockCanProcess - fromBlock) + 1

var to uint64
// Make sure we stay within a reasonable page size
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ INSERT INTO blockchain_messages(block_number, block_hash, originator_node_id, or
-- Returns blocks in descending order (newest to oldest)
-- StartBlock should be the lower bound (older block)
-- EndBlock should be the upper bound (newer block)
-- Example: GetBlocksInRange(1000, 2000) gets blocks 2000 down to 1000
-- Example: GetBlocksInRange(1000, 2000), returns 1000, 1001, 1002, ..., 2000
SELECT DISTINCT ON (block_number)
block_number,
block_hash
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/queries/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 9 additions & 5 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 9 additions & 5 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"database/sql"
"encoding/hex"
"errors"
"math/big"
"sync"
"time"
Expand Down Expand Up @@ -295,7 +296,7 @@ func indexLogs(
}

reorgCheckAt = event.BlockNumber
logger.Debug("periodic blockchain reorg check",
logger.Debug("blockchain reorg periodic check",
zap.Uint64("blockNumber", reorgCheckAt),
)

Expand All @@ -306,19 +307,22 @@ func indexLogs(
zap.String("onchainBlockHash", onchainBlock.Hash().String()),
)

reorgBlockNumber, reorgBlockHash, err := reorgHandler.FindCommonAncestor(
reorgBlockNumber, reorgBlockHash, err := reorgHandler.FindReorgPoint(
storedBlockNumber,
)
if err != nil {
logger.Error("error finding common ancestor", zap.Error(err))
if err != nil && !errors.Is(err, ErrNoBlocksFound) {
logger.Error("reorg point not found", zap.Error(err))
continue
}

reorgDetectedAt = storedBlockNumber
reorgBeginsAt = reorgBlockNumber
reorgFinishesAt = storedBlockNumber

blockTracker.UpdateLatestBlock(ctx, reorgBlockNumber, reorgBlockHash)
if trackerErr := blockTracker.UpdateLatestBlock(ctx, reorgBlockNumber, reorgBlockHash); trackerErr != nil {
logger.Error("error updating block tracker", zap.Error(trackerErr))
}

reorgChannel <- reorgBlockNumber
continue
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ func TestIndexLogsSuccess(t *testing.T) {
blockTracker.EXPECT().
UpdateLatestBlock(mock.Anything, newBlockNumber, newBlockHash.Bytes()).
Return(nil)
blockTracker.EXPECT().
GetLatestBlock().
Return(newBlockNumber, newBlockHash.Bytes())

reorgHandler := indexerMocks.NewMockChainReorgHandler(t)

logStorer := storerMocks.NewMockLogStorer(t)
logStorer.EXPECT().
Expand All @@ -62,6 +61,7 @@ func TestIndexLogsSuccess(t *testing.T) {
testutils.NewLog(t),
logStorer,
blockTracker,
reorgHandler,
)

time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -90,11 +90,8 @@ func TestIndexLogsRetryableError(t *testing.T) {

mockClient := blockchainMocks.NewMockChainClient(t)
logStorer := storerMocks.NewMockLogStorer(t)

blockTracker := indexerMocks.NewMockIBlockTracker(t)
blockTracker.EXPECT().
GetLatestBlock().
Return(newBlockNumber, newBlockHash.Bytes())
reorgHandler := indexerMocks.NewMockChainReorgHandler(t)

// Will fail for the first call with a retryable error and a non-retryable error on the second call
attemptNumber := 0
Expand All @@ -116,6 +113,7 @@ func TestIndexLogsRetryableError(t *testing.T) {
testutils.NewLog(t),
logStorer,
blockTracker,
reorgHandler,
)

time.Sleep(200 * time.Millisecond)
Expand Down
144 changes: 144 additions & 0 deletions pkg/indexer/reorgHandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package indexer

import (
"bytes"
"context"
"database/sql"
"errors"
"fmt"
"math/big"

"github.com/xmtp/xmtpd/pkg/blockchain"
"github.com/xmtp/xmtpd/pkg/db/queries"
)

type ChainReorgHandler interface {
FindReorgPoint(detectedAt uint64) (uint64, []byte, error)
}

type ReorgHandler struct {
ctx context.Context
client blockchain.ChainClient
queries *queries.Queries
}

var (
ErrNoBlocksFound = errors.New("no blocks found")
ErrGetBlock = errors.New("failed to get block")
)

// TODO: Make this configurable?
const BLOCK_RANGE_SIZE uint64 = 1000

func NewChainReorgHandler(
ctx context.Context,
client blockchain.ChainClient,
queries *queries.Queries,
) *ReorgHandler {
return &ReorgHandler{
ctx: ctx,
client: client,
queries: queries,
}
}

// TODO: When reorg range has been calculated, alert clients (TBD)
func (r *ReorgHandler) FindReorgPoint(detectedAt uint64) (uint64, []byte, error) {
startBlock, endBlock := blockRange(detectedAt)

for {
storedBlocks, err := r.queries.GetBlocksInRange(
r.ctx,
queries.GetBlocksInRangeParams{
StartBlock: startBlock,
EndBlock: endBlock,
},
)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return 0, nil, fmt.Errorf("failed to get stored blocks: %w", err)
}

if len(storedBlocks) == 0 || errors.Is(err, sql.ErrNoRows) {
if startBlock == 0 {
return 0, nil, ErrNoBlocksFound
}

startBlock, endBlock = blockRange(startBlock)
continue
}

oldestBlock := storedBlocks[0]
chainBlock, err := r.client.BlockByNumber(r.ctx, big.NewInt(int64(oldestBlock.BlockNumber)))
if err != nil {
return 0, nil, fmt.Errorf("%w %d: %v", ErrGetBlock, oldestBlock.BlockNumber, err)
}

// Oldest block doesn't match, reorg happened earlier in the chain
if !bytes.Equal(oldestBlock.BlockHash, chainBlock.Hash().Bytes()) {
if startBlock == 0 {
return 0, nil, ErrNoBlocksFound
}

startBlock, endBlock = blockRange(startBlock)
continue
}

// Oldest block matches, reorg happened in this range
return r.searchInRange(storedBlocks)
}
}

func (r *ReorgHandler) searchInRange(blocks []queries.GetBlocksInRangeRow) (uint64, []byte, error) {
left, right := 0, len(blocks)-1
for left <= right {
mid := (left + right) / 2
block := blocks[mid]

chainBlock, err := r.client.BlockByNumber(
r.ctx,
big.NewInt(int64(block.BlockNumber)),
)
if err != nil {
return 0, nil, fmt.Errorf("%w %d: %v", ErrGetBlock, block.BlockNumber, err)
}

if bytes.Equal(block.BlockHash, chainBlock.Hash().Bytes()) {
// Found a matching block, check if next block differs to confirm reorg point
if mid < len(blocks)-1 {
nextBlock := blocks[mid+1]
nextChainBlock, err := r.client.BlockByNumber(
r.ctx,
big.NewInt(int64(nextBlock.BlockNumber)),
)
if err != nil {
return 0, nil, fmt.Errorf("%w %d: %v", ErrGetBlock, nextBlock.BlockNumber, err)
}

if !bytes.Equal(nextBlock.BlockHash, nextChainBlock.Hash().Bytes()) {
return block.BlockNumber, chainBlock.Hash().Bytes(), nil
}
} else if mid == len(blocks)-1 {
return block.BlockNumber, chainBlock.Hash().Bytes(), nil
}

// If next block doesn't differ, search upper half
left = mid + 1
} else {
// If chainBlock differs, search lower half
right = mid - 1
}
}

// TODO: This should never happen, start from 0?
return 0, nil, fmt.Errorf("reorg point not found")
}

func blockRange(from uint64) (startBlock uint64, endBlock uint64) {
endBlock = from

if endBlock > BLOCK_RANGE_SIZE {
startBlock = endBlock - BLOCK_RANGE_SIZE
}

return startBlock, endBlock
}
43 changes: 43 additions & 0 deletions pkg/indexer/reorgHandler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package indexer

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_blockRange(t *testing.T) {
tests := []struct {
name string
from uint64
wantStartBlock uint64
wantEndBlock uint64
}{
{
name: "block range with subtraction",
from: 1001,
wantStartBlock: 1,
wantEndBlock: 1001,
},
{
name: "block range without subtraction",
from: 500,
wantStartBlock: 0,
wantEndBlock: 500,
},
{
name: "block range zero",
from: 0,
wantStartBlock: 0,
wantEndBlock: 0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
startBlock, endBlock := blockRange(tt.from)
assert.Equal(t, tt.wantStartBlock, startBlock)
assert.Equal(t, tt.wantEndBlock, endBlock)
})
}
}
26 changes: 1 addition & 25 deletions pkg/indexer/storer/groupMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package storer

import (
"context"
"database/sql"
"errors"

"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -85,36 +84,13 @@ func (s *GroupMessageStorer) StoreLog(
return NewLogStorageError(err, false)
}

version := sql.NullInt32{Int32: 1, Valid: true}

if appendLog {

Check failure on line 87 in pkg/indexer/storer/groupMessage.go

View workflow job for this annotation

GitHub Actions / Lint-Go

SA9003: empty branch (staticcheck)
version, err = GetVersionForAppend(
ctx,
s.queries,
s.logger,
GROUP_MESSAGE_ORIGINATOR_ID,
int64(msgSent.SequenceId),
)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return NewLogStorageError(err, true)
}
if errors.Is(err, sql.ErrNoRows) {
s.logger.Debug("No rows found for envelope, inserting new",
zap.Int("originator_node_id", GROUP_MESSAGE_ORIGINATOR_ID),
zap.Int64("originator_sequence_id", int64(msgSent.SequenceId)),
)
}
}
// placeholder
}

s.logger.Debug("Inserting message from contract", zap.String("topic", topicStruct.String()))

if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{
BlockNumber: sql.NullInt64{Int64: int64(event.BlockNumber), Valid: true},
BlockHash: event.BlockHash.Bytes(),
Version: version,
IsCanonical: sql.NullBool{Bool: true, Valid: true},
OriginatorNodeID: GROUP_MESSAGE_ORIGINATOR_ID,
OriginatorSequenceID: int64(msgSent.SequenceId),
Topic: topicStruct.Bytes(),
Expand Down
Loading

0 comments on commit f81d6cd

Please sign in to comment.