diff --git a/.mockery.yaml b/.mockery.yaml index ef28dc75..908e50a4 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -20,6 +20,7 @@ packages: github.com/xmtp/xmtpd/pkg/indexer: interfaces: IBlockTracker: + ChainReorgHandler: github.com/xmtp/xmtpd/pkg/blockchain: interfaces: ChainClient: diff --git a/pkg/blockchain/interface.go b/pkg/blockchain/interface.go index 904e4f41..fc7e049e 100644 --- a/pkg/blockchain/interface.go +++ b/pkg/blockchain/interface.go @@ -30,6 +30,7 @@ type ChainClient interface { ethereum.BlockNumberReader ethereum.LogFilterer ethereum.ChainIDReader + ethereum.ChainReader } type TransactionSigner interface { diff --git a/pkg/blockchain/rpcLogStreamer.go b/pkg/blockchain/rpcLogStreamer.go index 2b58edb1..86dd4582 100644 --- a/pkg/blockchain/rpcLogStreamer.go +++ b/pkg/blockchain/rpcLogStreamer.go @@ -48,13 +48,21 @@ func (c *RpcLogStreamBuilder) ListenForContractEvent( contractAddress common.Address, topics []common.Hash, maxDisconnectTime time.Duration, -) <-chan types.Log { +) (<-chan types.Log, chan<- uint64) { eventChannel := make(chan types.Log, 100) + reorgChannel := make(chan uint64, 1) c.contractConfigs = append( c.contractConfigs, - contractConfig{fromBlock, contractAddress, topics, eventChannel, maxDisconnectTime}, + contractConfig{ + fromBlock, + contractAddress, + topics, + eventChannel, + reorgChannel, + maxDisconnectTime, + }, ) - return eventChannel + return eventChannel, reorgChannel } func (c *RpcLogStreamBuilder) Build() (*RpcLogStreamer, error) { @@ -66,7 +74,8 @@ type contractConfig struct { fromBlock uint64 contractAddress common.Address topics []common.Hash - channel chan<- types.Log + eventChannel chan<- types.Log + reorgChannel chan uint64 maxDisconnectTime time.Duration } @@ -119,12 +128,19 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) { fromBlock := watcher.fromBlock logger := r.logger.With(zap.String("contractAddress", watcher.contractAddress.Hex())) startTime := time.Now() - defer close(watcher.channel) + defer close(watcher.eventChannel) + for { select { case <-r.ctx.Done(): logger.Debug("Stopping watcher") return + case reorgBlock := <-watcher.reorgChannel: + fromBlock = reorgBlock + logger.Info( + "Blockchain reorg detected, resuming from block", + zap.Uint64("fromBlock", fromBlock), + ) default: logs, nextBlock, err := r.getNextPage(watcher, fromBlock) if err != nil { @@ -157,7 +173,7 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) { time.Sleep(NO_LOGS_SLEEP_TIME) } for _, log := range logs { - watcher.channel <- log + watcher.eventChannel <- log } if nextBlock != nil { fromBlock = *nextBlock @@ -182,7 +198,7 @@ func (r *RpcLogStreamer) getNextPage( r.logger.Debug("Chain is up to date. Skipping update") return []types.Log{}, nil, nil } - numOfBlocksToProcess := highestBlockCanProcess - fromBlock + 1 + numOfBlocksToProcess := (highestBlockCanProcess - fromBlock) + 1 var to uint64 // Make sure we stay within a reasonable page size @@ -211,6 +227,10 @@ func (r *RpcLogStreamer) getNextPage( return logs, &nextBlockNumber, nil } +func (r *RpcLogStreamer) Client() ChainClient { + return r.client +} + func buildFilterQuery( contractConfig contractConfig, fromBlock int64, diff --git a/pkg/blockchain/rpcLogStreamer_test.go b/pkg/blockchain/rpcLogStreamer_test.go index 3990cc5b..2489d2f0 100644 --- a/pkg/blockchain/rpcLogStreamer_test.go +++ b/pkg/blockchain/rpcLogStreamer_test.go @@ -31,7 +31,7 @@ func buildStreamer( fromBlock: fromBlock, contractAddress: address, topics: []common.Hash{topic}, - channel: channel, + eventChannel: channel, } return NewRpcLogStreamer(context.Background(), client, log, []contractConfig{cfg}), channel } @@ -41,7 +41,7 @@ func TestBuilder(t *testing.T) { require.NoError(t, err) builder := NewRpcLogStreamBuilder(context.Background(), testclient, testutils.NewLog(t)) - listenerChannel := builder.ListenForContractEvent( + listenerChannel, _ := builder.ListenForContractEvent( 1, testutils.RandomAddress(), []common.Hash{testutils.RandomLogTopic()}, 5*time.Minute, @@ -79,7 +79,7 @@ func TestRpcLogStreamer(t *testing.T) { fromBlock: fromBlock, contractAddress: address, topics: []common.Hash{topic}, - channel: make(chan types.Log), + eventChannel: make(chan types.Log), } logs, nextPage, err := streamer.getNextPage(cfg, fromBlock) diff --git a/pkg/db/queries.sql b/pkg/db/queries.sql index e199ba3c..71113788 100644 --- a/pkg/db/queries.sql +++ b/pkg/db/queries.sql @@ -129,4 +129,37 @@ SELECT FROM gateway_envelopes GROUP BY - originator_node_id; \ No newline at end of file + originator_node_id; + +-- name: InsertBlockchainMessage :exec +INSERT INTO blockchain_messages(block_number, block_hash, originator_node_id, originator_sequence_id, is_canonical) + VALUES (@block_number, @block_hash, @originator_node_id, @originator_sequence_id, @is_canonical) +ON CONFLICT + DO NOTHING; + +-- name: GetBlocksInRange :many +-- Returns blocks in ascending order (oldest to newest) +-- StartBlock should be the lower bound (older block) +-- EndBlock should be the upper bound (newer block) +-- Example: GetBlocksInRange(1000, 2000), returns 1000, 1001, 1002, ..., 2000 +SELECT DISTINCT ON (block_number) + block_number, + block_hash +FROM + blockchain_messages +WHERE + block_number BETWEEN @start_block AND @end_block + AND block_hash IS NOT NULL + AND is_canonical = TRUE +ORDER BY + block_number ASC, + block_hash; + +-- name: UpdateBlocksCanonicalityInRange :exec +UPDATE + blockchain_messages +SET + is_canonical = FALSE +WHERE + block_number >= @reorg_block_number; + diff --git a/pkg/db/queries/db.go b/pkg/db/queries/db.go index 1cbab906..51c445c9 100644 --- a/pkg/db/queries/db.go +++ b/pkg/db/queries/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.27.0 +// sqlc v1.28.0 package queries diff --git a/pkg/db/queries/models.go b/pkg/db/queries/models.go index 6579fe5a..ae4f9436 100644 --- a/pkg/db/queries/models.go +++ b/pkg/db/queries/models.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.27.0 +// sqlc v1.28.0 package queries @@ -16,6 +16,14 @@ type AddressLog struct { RevocationSequenceID sql.NullInt64 } +type BlockchainMessage struct { + BlockNumber uint64 + BlockHash []byte + OriginatorNodeID int32 + OriginatorSequenceID int64 + IsCanonical bool +} + type GatewayEnvelope struct { GatewayTime time.Time OriginatorNodeID int32 diff --git a/pkg/db/queries/queries.sql.go b/pkg/db/queries/queries.sql.go index f607340e..abe292e4 100644 --- a/pkg/db/queries/queries.sql.go +++ b/pkg/db/queries/queries.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.27.0 +// sqlc v1.28.0 // source: queries.sql package queries @@ -75,6 +75,58 @@ func (q *Queries) GetAddressLogs(ctx context.Context, addresses []string) ([]Get return items, nil } +const getBlocksInRange = `-- name: GetBlocksInRange :many +SELECT DISTINCT ON (block_number) + block_number, + block_hash +FROM + blockchain_messages +WHERE + block_number BETWEEN $1 AND $2 + AND block_hash IS NOT NULL + AND is_canonical = TRUE +ORDER BY + block_number ASC, + block_hash +` + +type GetBlocksInRangeParams struct { + StartBlock uint64 + EndBlock uint64 +} + +type GetBlocksInRangeRow struct { + BlockNumber uint64 + BlockHash []byte +} + +// Returns blocks in descending order (newest to oldest) +// StartBlock should be the lower bound (older block) +// EndBlock should be the upper bound (newer block) +// Example: GetBlocksInRange(1000, 2000), returns 1000, 1001, 1002, ..., 2000 +func (q *Queries) GetBlocksInRange(ctx context.Context, arg GetBlocksInRangeParams) ([]GetBlocksInRangeRow, error) { + rows, err := q.db.QueryContext(ctx, getBlocksInRange, arg.StartBlock, arg.EndBlock) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetBlocksInRangeRow + for rows.Next() { + var i GetBlocksInRangeRow + if err := rows.Scan(&i.BlockNumber, &i.BlockHash); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getLatestBlock = `-- name: GetLatestBlock :one SELECT block_number, @@ -176,6 +228,32 @@ func (q *Queries) InsertAddressLog(ctx context.Context, arg InsertAddressLogPara return result.RowsAffected() } +const insertBlockchainMessage = `-- name: InsertBlockchainMessage :exec +INSERT INTO blockchain_messages(block_number, block_hash, originator_node_id, originator_sequence_id, is_canonical) + VALUES ($1, $2, $3, $4, $5) +ON CONFLICT + DO NOTHING +` + +type InsertBlockchainMessageParams struct { + BlockNumber uint64 + BlockHash []byte + OriginatorNodeID int32 + OriginatorSequenceID int64 + IsCanonical bool +} + +func (q *Queries) InsertBlockchainMessage(ctx context.Context, arg InsertBlockchainMessageParams) error { + _, err := q.db.ExecContext(ctx, insertBlockchainMessage, + arg.BlockNumber, + arg.BlockHash, + arg.OriginatorNodeID, + arg.OriginatorSequenceID, + arg.IsCanonical, + ) + return err +} + const insertGatewayEnvelope = `-- name: InsertGatewayEnvelope :execrows INSERT INTO gateway_envelopes(originator_node_id, originator_sequence_id, topic, originator_envelope) VALUES ($1, $2, $3, $4) @@ -444,3 +522,17 @@ func (q *Queries) SetLatestBlock(ctx context.Context, arg SetLatestBlockParams) _, err := q.db.ExecContext(ctx, setLatestBlock, arg.ContractAddress, arg.BlockNumber, arg.BlockHash) return err } + +const updateBlocksCanonicalityInRange = `-- name: UpdateBlocksCanonicalityInRange :exec +UPDATE + blockchain_messages +SET + is_canonical = FALSE +WHERE + block_number >= $1 +` + +func (q *Queries) UpdateBlocksCanonicalityInRange(ctx context.Context, reorgBlockNumber uint64) error { + _, err := q.db.ExecContext(ctx, updateBlocksCanonicalityInRange, reorgBlockNumber) + return err +} diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 4f12a9cd..df63a37b 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -1,8 +1,12 @@ package indexer import ( + "bytes" "context" "database/sql" + "encoding/hex" + "errors" + "math/big" "sync" "time" @@ -43,14 +47,20 @@ func NewIndexer( } } -func (s *Indexer) Close() { - s.log.Debug("Closing") - if s.streamer != nil { - s.streamer.streamer.Stop() +func (i *Indexer) Close() { + i.log.Debug("Closing") + if i.streamer != nil { + if i.streamer.messagesReorgChannel != nil { + close(i.streamer.messagesReorgChannel) + } + if i.streamer.identityUpdatesReorgChannel != nil { + close(i.streamer.identityUpdatesReorgChannel) + } + i.streamer.streamer.Stop() } - s.cancel() - s.wg.Wait() - s.log.Debug("Closed") + i.cancel() + i.wg.Wait() + i.log.Debug("Closed") } func (i *Indexer) StartIndexer( @@ -86,10 +96,13 @@ func (i *Indexer) StartIndexer( indexLogs( ctx, + streamer.streamer.Client(), streamer.messagesChannel, + streamer.messagesReorgChannel, indexingLogger, storer.NewGroupMessageStorer(querier, indexingLogger, messagesContract), streamer.messagesBlockTracker, + streamer.reorgHandler, ) }) @@ -107,7 +120,10 @@ func (i *Indexer) StartIndexer( With(zap.String("contractAddress", cfg.IdentityUpdatesContractAddress)) indexLogs( ctx, - streamer.identityUpdatesChannel, indexingLogger, + streamer.streamer.Client(), + streamer.identityUpdatesChannel, + streamer.identityUpdatesReorgChannel, + indexingLogger, storer.NewIdentityUpdateStorer( db, indexingLogger, @@ -115,6 +131,7 @@ func (i *Indexer) StartIndexer( validationService, ), streamer.identityUpdatesBlockTracker, + streamer.reorgHandler, ) }) @@ -124,8 +141,11 @@ func (i *Indexer) StartIndexer( type builtStreamer struct { streamer *blockchain.RpcLogStreamer + reorgHandler ChainReorgHandler messagesChannel <-chan types.Log + messagesReorgChannel chan<- uint64 identityUpdatesChannel <-chan types.Log + identityUpdatesReorgChannel chan<- uint64 identityUpdatesBlockTracker *BlockTracker messagesBlockTracker *BlockTracker } @@ -147,7 +167,7 @@ func configureLogStream( } latestBlockNumber, _ := messagesTracker.GetLatestBlock() - messagesChannel := builder.ListenForContractEvent( + messagesChannel, messagesReorgChannel := builder.ListenForContractEvent( latestBlockNumber, common.HexToAddress(cfg.MessagesContractAddress), []common.Hash{messagesTopic}, @@ -165,7 +185,7 @@ func configureLogStream( } latestBlockNumber, _ = identityUpdatesTracker.GetLatestBlock() - identityUpdatesChannel := builder.ListenForContractEvent( + identityUpdatesChannel, identityUpdatesReorgChannel := builder.ListenForContractEvent( latestBlockNumber, common.HexToAddress(cfg.IdentityUpdatesContractAddress), []common.Hash{identityUpdatesTopic}, @@ -177,10 +197,15 @@ func configureLogStream( return nil, err } + reorgHandler := NewChainReorgHandler(ctx, streamer.Client(), querier) + return &builtStreamer{ streamer: streamer, + reorgHandler: reorgHandler, messagesChannel: messagesChannel, + messagesReorgChannel: messagesReorgChannel, identityUpdatesChannel: identityUpdatesChannel, + identityUpdatesReorgChannel: identityUpdatesReorgChannel, identityUpdatesBlockTracker: identityUpdatesTracker, messagesBlockTracker: messagesTracker, }, nil @@ -195,20 +220,120 @@ The only non-retriable errors should be things like malformed events or failed v */ func indexLogs( ctx context.Context, + client blockchain.ChainClient, eventChannel <-chan types.Log, + reorgChannel chan<- uint64, logger *zap.Logger, logStorer storer.LogStorer, blockTracker IBlockTracker, + reorgHandler ChainReorgHandler, ) { - var err storer.LogStorageError + var ( + errStorage storer.LogStorageError + storedBlockNumber uint64 + storedBlockHash []byte + lastBlockSeen uint64 + reorgCheckInterval uint64 = 10 // TODO: Adapt based on blocks per batch + reorgCheckAt uint64 + reorgDetectedAt uint64 + reorgBeginsAt uint64 + reorgFinishesAt uint64 + reorgInProgress bool + ) + // We don't need to listen for the ctx.Done() here, since the eventChannel will be closed when the parent context is canceled for event := range eventChannel { + // 1.1 Handle active reorg state first + if reorgDetectedAt > 0 { + // Under a reorg, future events are no-op + if event.BlockNumber >= reorgDetectedAt { + logger.Debug("discarding future event due to reorg", + zap.Uint64("eventBlockNumber", event.BlockNumber), + zap.Uint64("reorgBlockNumber", reorgBeginsAt)) + continue + } + logger.Info("starting processing reorg", + zap.Uint64("eventBlockNumber", event.BlockNumber), + zap.Uint64("reorgBlockNumber", reorgBeginsAt)) + + // When all future events have been discarded, it means we've reached the reorg point + storedBlockNumber, storedBlockHash = blockTracker.GetLatestBlock() + lastBlockSeen = event.BlockNumber + reorgDetectedAt = 0 + reorgInProgress = true + } + + // 1.2 Handle deactivation of reorg state + if reorgInProgress && event.BlockNumber > reorgFinishesAt { + logger.Info("finished processing reorg", + zap.Uint64("eventBlockNumber", event.BlockNumber), + zap.Uint64("reorgFinishesAt", reorgFinishesAt)) + reorgInProgress = false + } + + // 2. Get the latest block from tracker once per block + if lastBlockSeen > 0 && lastBlockSeen != event.BlockNumber { + storedBlockNumber, storedBlockHash = blockTracker.GetLatestBlock() + } + lastBlockSeen = event.BlockNumber + + // 3. Check for reorgs, when: + // - There are no reorgs in progress + // - There's a stored block + // - The event block number is greater than the stored block number + // - The check interval has passed + if !reorgInProgress && + storedBlockNumber > 0 && + event.BlockNumber > storedBlockNumber && + event.BlockNumber >= reorgCheckAt+reorgCheckInterval { + onchainBlock, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber))) + if err != nil { + logger.Error("error querying block from the blockchain", + zap.Uint64("blockNumber", storedBlockNumber), + zap.Error(err), + ) + continue + } + + reorgCheckAt = event.BlockNumber + logger.Debug("blockchain reorg periodic check", + zap.Uint64("blockNumber", reorgCheckAt), + ) + + if !bytes.Equal(storedBlockHash, onchainBlock.Hash().Bytes()) { + logger.Warn("blockchain reorg detected", + zap.Uint64("storedBlockNumber", storedBlockNumber), + zap.String("storedBlockHash", hex.EncodeToString(storedBlockHash)), + zap.String("onchainBlockHash", onchainBlock.Hash().String()), + ) + + reorgBlockNumber, reorgBlockHash, err := reorgHandler.FindReorgPoint( + storedBlockNumber, + ) + if err != nil && !errors.Is(err, ErrNoBlocksFound) { + logger.Error("reorg point not found", zap.Error(err)) + continue + } + + reorgDetectedAt = storedBlockNumber + reorgBeginsAt = reorgBlockNumber + reorgFinishesAt = storedBlockNumber + + if trackerErr := blockTracker.UpdateLatestBlock(ctx, reorgBlockNumber, reorgBlockHash); trackerErr != nil { + logger.Error("error updating block tracker", zap.Error(trackerErr)) + } + + reorgChannel <- reorgBlockNumber + continue + } + } + Retry: for { - err = logStorer.StoreLog(ctx, event) - if err != nil { - logger.Error("error storing log", zap.Error(err)) - if err.ShouldRetry() { + errStorage = logStorer.StoreLog(ctx, event) + if errStorage != nil { + logger.Error("error storing log", zap.Error(errStorage)) + if errStorage.ShouldRetry() { time.Sleep(100 * time.Millisecond) continue Retry } @@ -219,7 +344,6 @@ func indexLogs( } } break Retry - } } logger.Debug("finished") diff --git a/pkg/indexer/indexer_test.go b/pkg/indexer/indexer_test.go index 753862c8..792bf2dd 100644 --- a/pkg/indexer/indexer_test.go +++ b/pkg/indexer/indexer_test.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/mock" "github.com/xmtp/xmtpd/pkg/indexer/storer" + blockchainMocks "github.com/xmtp/xmtpd/pkg/mocks/blockchain" indexerMocks "github.com/xmtp/xmtpd/pkg/mocks/indexer" storerMocks "github.com/xmtp/xmtpd/pkg/mocks/storer" "github.com/xmtp/xmtpd/pkg/testutils" @@ -17,40 +18,81 @@ import ( func TestIndexLogsSuccess(t *testing.T) { channel := make(chan types.Log, 10) - defer close(channel) + reorgChannel := make(chan uint64, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + close(channel) + close(reorgChannel) + }() + newBlockNumber := uint64(10) newBlockHash := common.HexToHash( "0x0000000000000000000000000000000000000000000000000000000000000000", ) - logStorer := storerMocks.NewMockLogStorer(t) - blockTracker := indexerMocks.NewMockIBlockTracker(t) - blockTracker.EXPECT(). - UpdateLatestBlock(mock.Anything, newBlockNumber, newBlockHash.Bytes()). - Return(nil) - event := types.Log{ Address: common.HexToAddress("0x123"), BlockNumber: newBlockNumber, + BlockHash: newBlockHash, } - logStorer.EXPECT().StoreLog(mock.Anything, event).Times(1).Return(nil) + channel <- event - go indexLogs(context.Background(), channel, testutils.NewLog(t), logStorer, blockTracker) + mockClient := blockchainMocks.NewMockChainClient(t) + + blockTracker := indexerMocks.NewMockIBlockTracker(t) + blockTracker.EXPECT(). + UpdateLatestBlock(mock.Anything, newBlockNumber, newBlockHash.Bytes()). + Return(nil) + + reorgHandler := indexerMocks.NewMockChainReorgHandler(t) + + logStorer := storerMocks.NewMockLogStorer(t) + logStorer.EXPECT(). + StoreLog(mock.Anything, event). + Return(nil) + + go indexLogs( + ctx, + mockClient, + channel, + reorgChannel, + testutils.NewLog(t), + logStorer, + blockTracker, + reorgHandler, + ) + time.Sleep(100 * time.Millisecond) } func TestIndexLogsRetryableError(t *testing.T) { channel := make(chan types.Log, 10) - defer close(channel) + reorgChannel := make(chan uint64, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + close(channel) + close(reorgChannel) + }() - logStorer := storerMocks.NewMockLogStorer(t) - blockTracker := indexerMocks.NewMockIBlockTracker(t) + newBlockNumber := uint64(10) + newBlockHash := common.HexToHash( + "0x0000000000000000000000000000000000000000000000000000000000000000", + ) event := types.Log{ - Address: common.HexToAddress("0x123"), + Address: common.HexToAddress("0x123"), + BlockNumber: newBlockNumber, + BlockHash: newBlockHash, } + mockClient := blockchainMocks.NewMockChainClient(t) + logStorer := storerMocks.NewMockLogStorer(t) + blockTracker := indexerMocks.NewMockIBlockTracker(t) + reorgHandler := indexerMocks.NewMockChainReorgHandler(t) + // Will fail for the first call with a retryable error and a non-retryable error on the second call attemptNumber := 0 @@ -60,9 +102,20 @@ func TestIndexLogsRetryableError(t *testing.T) { attemptNumber++ return storer.NewLogStorageError(errors.New("retryable error"), attemptNumber < 2) }) + channel <- event - go indexLogs(context.Background(), channel, testutils.NewLog(t), logStorer, blockTracker) + go indexLogs( + ctx, + mockClient, + channel, + reorgChannel, + testutils.NewLog(t), + logStorer, + blockTracker, + reorgHandler, + ) + time.Sleep(200 * time.Millisecond) logStorer.AssertNumberOfCalls(t, "StoreLog", 2) diff --git a/pkg/indexer/reorgHandler.go b/pkg/indexer/reorgHandler.go new file mode 100644 index 00000000..2af9fe54 --- /dev/null +++ b/pkg/indexer/reorgHandler.go @@ -0,0 +1,152 @@ +package indexer + +import ( + "bytes" + "context" + "database/sql" + "errors" + "fmt" + "math/big" + + "github.com/xmtp/xmtpd/pkg/blockchain" + "github.com/xmtp/xmtpd/pkg/db/queries" +) + +type ChainReorgHandler interface { + FindReorgPoint(detectedAt uint64) (uint64, []byte, error) +} + +type ReorgHandler struct { + ctx context.Context + client blockchain.ChainClient + queries *queries.Queries +} + +var ( + ErrNoBlocksFound = errors.New("no blocks found") + ErrGetBlock = errors.New("failed to get block") +) + +// TODO: Make this configurable? +const BLOCK_RANGE_SIZE uint64 = 1000 + +func NewChainReorgHandler( + ctx context.Context, + client blockchain.ChainClient, + queries *queries.Queries, +) *ReorgHandler { + return &ReorgHandler{ + ctx: ctx, + client: client, + queries: queries, + } +} + +// TODO: When reorg range has been calculated, alert clients (TBD) +func (r *ReorgHandler) FindReorgPoint(detectedAt uint64) (uint64, []byte, error) { + startBlock, endBlock := blockRange(detectedAt) + + for { + storedBlocks, err := r.queries.GetBlocksInRange( + r.ctx, + queries.GetBlocksInRangeParams{ + StartBlock: startBlock, + EndBlock: endBlock, + }, + ) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return 0, nil, fmt.Errorf("failed to get stored blocks: %w", err) + } + + if len(storedBlocks) == 0 || errors.Is(err, sql.ErrNoRows) { + if startBlock == 0 { + return 0, nil, ErrNoBlocksFound + } + + startBlock, endBlock = blockRange(startBlock) + continue + } + + oldestBlock := storedBlocks[0] + chainBlock, err := r.client.BlockByNumber(r.ctx, big.NewInt(int64(oldestBlock.BlockNumber))) + if err != nil { + return 0, nil, fmt.Errorf("%w %d: %v", ErrGetBlock, oldestBlock.BlockNumber, err) + } + + // Oldest block doesn't match, reorg happened earlier in the chain + if !bytes.Equal(oldestBlock.BlockHash, chainBlock.Hash().Bytes()) { + if startBlock == 0 { + return 0, nil, ErrNoBlocksFound + } + + startBlock, endBlock = blockRange(startBlock) + continue + } + + // Oldest block matches, reorg happened in this range + blockNumber, blockHash, err := r.searchInRange(storedBlocks) + if err != nil { + return 0, nil, fmt.Errorf("failed to search reorg block in range: %w", err) + } + + if err := r.queries.UpdateBlocksCanonicalityInRange(r.ctx, blockNumber); err != nil { + return 0, nil, fmt.Errorf("failed to update block range canonicality: %w", err) + } + + return blockNumber, blockHash, nil + } +} + +func (r *ReorgHandler) searchInRange(blocks []queries.GetBlocksInRangeRow) (uint64, []byte, error) { + left, right := 0, len(blocks)-1 + for left <= right { + mid := (left + right) / 2 + storedBlock := blocks[mid] + + chainBlock, err := r.client.BlockByNumber( + r.ctx, + big.NewInt(int64(storedBlock.BlockNumber)), + ) + if err != nil { + return 0, nil, fmt.Errorf("%w %d: %v", ErrGetBlock, storedBlock.BlockNumber, err) + } + + if bytes.Equal(storedBlock.BlockHash, chainBlock.Hash().Bytes()) { + // Found a matching block, check if next block differs to confirm reorg point + if mid < len(blocks)-1 { + nextBlock := blocks[mid+1] + nextChainBlock, err := r.client.BlockByNumber( + r.ctx, + big.NewInt(int64(nextBlock.BlockNumber)), + ) + if err != nil { + return 0, nil, fmt.Errorf("%w %d: %v", ErrGetBlock, nextBlock.BlockNumber, err) + } + + if !bytes.Equal(nextBlock.BlockHash, nextChainBlock.Hash().Bytes()) { + return storedBlock.BlockNumber, chainBlock.Hash().Bytes(), nil + } + } + + // If next block doesn't differ, search upper half + left = mid + 1 + } else { + // If chainBlock differs, search lower half + right = mid - 1 + } + } + + // This should never happen. If it happens, return the first block in the range. + block := blocks[0] + return block.BlockNumber, block.BlockHash, nil +} + +func blockRange(from uint64) (startBlock uint64, endBlock uint64) { + endBlock = from + + if endBlock >= BLOCK_RANGE_SIZE { + startBlock = endBlock - BLOCK_RANGE_SIZE + } + + return startBlock, endBlock +} diff --git a/pkg/indexer/reorgHandler_test.go b/pkg/indexer/reorgHandler_test.go new file mode 100644 index 00000000..b2d17314 --- /dev/null +++ b/pkg/indexer/reorgHandler_test.go @@ -0,0 +1,43 @@ +package indexer + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_blockRange(t *testing.T) { + tests := []struct { + name string + from uint64 + wantStartBlock uint64 + wantEndBlock uint64 + }{ + { + name: "block range with subtraction", + from: 1001, + wantStartBlock: 1, + wantEndBlock: 1001, + }, + { + name: "block range without subtraction", + from: 500, + wantStartBlock: 0, + wantEndBlock: 500, + }, + { + name: "block range zero", + from: 0, + wantStartBlock: 0, + wantEndBlock: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + startBlock, endBlock := blockRange(tt.from) + assert.Equal(t, tt.wantStartBlock, startBlock) + assert.Equal(t, tt.wantEndBlock, endBlock) + }) + } +} diff --git a/pkg/indexer/storer/groupMessage.go b/pkg/indexer/storer/groupMessage.go index 9c65a63b..5903fbb0 100644 --- a/pkg/indexer/storer/groupMessage.go +++ b/pkg/indexer/storer/groupMessage.go @@ -13,6 +13,11 @@ import ( "google.golang.org/protobuf/proto" ) +const ( + // We may not want to hardcode this to 0 and have an originator ID for each smart contract? + GROUP_MESSAGE_ORIGINATOR_ID = 0 +) + type GroupMessageStorer struct { contract *groupmessages.GroupMessages queries *queries.Queries @@ -32,7 +37,10 @@ func NewGroupMessageStorer( } // Validate and store a group message log event -func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError { +func (s *GroupMessageStorer) StoreLog( + ctx context.Context, + event types.Log, +) LogStorageError { msgSent, err := s.contract.ParseMessageSent(event) if err != nil { return NewLogStorageError(err, false) @@ -78,8 +86,7 @@ func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogS s.logger.Debug("Inserting message from contract", zap.String("topic", topicStruct.String())) if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{ - // We may not want to hardcode this to 0 and have an originator ID for each smart contract? - OriginatorNodeID: 0, + OriginatorNodeID: GROUP_MESSAGE_ORIGINATOR_ID, OriginatorSequenceID: int64(msgSent.SequenceId), Topic: topicStruct.Bytes(), OriginatorEnvelope: originatorEnvelopeBytes, @@ -88,5 +95,16 @@ func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogS return NewLogStorageError(err, true) } + if err = s.queries.InsertBlockchainMessage(ctx, queries.InsertBlockchainMessageParams{ + BlockNumber: event.BlockNumber, + BlockHash: event.BlockHash.Bytes(), + OriginatorNodeID: GROUP_MESSAGE_ORIGINATOR_ID, + OriginatorSequenceID: int64(msgSent.SequenceId), + IsCanonical: true, // New messages are always canonical + }); err != nil { + s.logger.Error("Error inserting blockchain message", zap.Error(err)) + return NewLogStorageError(err, true) + } + return nil } diff --git a/pkg/indexer/storer/identityUpdate.go b/pkg/indexer/storer/identityUpdate.go index 737923fc..9abeb86e 100644 --- a/pkg/indexer/storer/identityUpdate.go +++ b/pkg/indexer/storer/identityUpdate.go @@ -23,6 +23,7 @@ import ( ) const ( + // We may not want to hardcode this to 1 and have an originator ID for each smart contract? IDENTITY_UPDATE_ORIGINATOR_ID = 1 ) @@ -48,7 +49,10 @@ func NewIdentityUpdateStorer( } // Validate and store an identity update log event -func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError { +func (s *IdentityUpdateStorer) StoreLog( + ctx context.Context, + event types.Log, +) LogStorageError { msgSent, err := s.contract.ParseIdentityUpdateCreated(event) if err != nil { return NewLogStorageError(err, false) @@ -167,7 +171,6 @@ func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) Lo } if _, err = querier.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{ - // We may not want to hardcode this to 1 and have an originator ID for each smart contract? OriginatorNodeID: IDENTITY_UPDATE_ORIGINATOR_ID, OriginatorSequenceID: int64(msgSent.SequenceId), Topic: messageTopic.Bytes(), @@ -177,6 +180,17 @@ func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) Lo return NewLogStorageError(err, true) } + if err = querier.InsertBlockchainMessage(ctx, queries.InsertBlockchainMessageParams{ + BlockNumber: event.BlockNumber, + BlockHash: event.BlockHash.Bytes(), + OriginatorNodeID: IDENTITY_UPDATE_ORIGINATOR_ID, + OriginatorSequenceID: int64(msgSent.SequenceId), + IsCanonical: true, // New messages are always canonical + }); err != nil { + s.logger.Error("Error inserting blockchain message", zap.Error(err)) + return NewLogStorageError(err, true) + } + return nil }, ) diff --git a/pkg/interceptors/server/auth_test.go b/pkg/interceptors/server/auth_test.go index a02085c3..01d890f4 100644 --- a/pkg/interceptors/server/auth_test.go +++ b/pkg/interceptors/server/auth_test.go @@ -36,7 +36,7 @@ func TestUnaryInterceptor(t *testing.T) { return metadata.NewIncomingContext(context.Background(), md) }, setupVerifier: func() { - mockVerifier.EXPECT().Verify("valid_token").Return(100, nil) + mockVerifier.EXPECT().Verify("valid_token").Return(uint32(0), nil) }, wantError: nil, wantVerifiedNode: true, @@ -71,7 +71,7 @@ func TestUnaryInterceptor(t *testing.T) { setupVerifier: func() { mockVerifier.EXPECT(). Verify("invalid_token"). - Return(0, errors.New("invalid signature")) + Return(uint32(0), errors.New("invalid signature")) }, wantError: status.Error( codes.Unauthenticated, @@ -131,7 +131,7 @@ func TestStreamInterceptor(t *testing.T) { return metadata.NewIncomingContext(context.Background(), md) }, setupVerifier: func() { - mockVerifier.EXPECT().Verify("valid_token").Return(100, nil) + mockVerifier.EXPECT().Verify("valid_token").Return(uint32(0), nil) }, wantError: nil, wantVerifiedNode: true, @@ -156,7 +156,7 @@ func TestStreamInterceptor(t *testing.T) { setupVerifier: func() { mockVerifier.EXPECT(). Verify("invalid_token"). - Return(0, errors.New("invalid signature")) + Return(uint32(0), errors.New("invalid signature")) }, wantError: status.Error( codes.Unauthenticated, diff --git a/pkg/migrations/00004_add_blockchain_columns.down.sql b/pkg/migrations/00004_add_blockchain_columns.down.sql new file mode 100644 index 00000000..8473d618 --- /dev/null +++ b/pkg/migrations/00004_add_blockchain_columns.down.sql @@ -0,0 +1,6 @@ +-- Drop index first +DROP INDEX IF EXISTS idx_blockchain_messages_canonical; + +-- Then drop the table +DROP TABLE IF EXISTS blockchain_messages; + diff --git a/pkg/migrations/00004_add_blockchain_columns.up.sql b/pkg/migrations/00004_add_blockchain_columns.up.sql new file mode 100644 index 00000000..77272d6e --- /dev/null +++ b/pkg/migrations/00004_add_blockchain_columns.up.sql @@ -0,0 +1,12 @@ +CREATE TABLE blockchain_messages( + block_number BIGINT NOT NULL, + block_hash BYTEA NOT NULL, + originator_node_id INT NOT NULL, + originator_sequence_id BIGINT NOT NULL, + is_canonical BOOLEAN NOT NULL DEFAULT TRUE, + PRIMARY KEY (block_number, block_hash, originator_node_id, originator_sequence_id), + FOREIGN KEY (originator_node_id, originator_sequence_id) REFERENCES gateway_envelopes(originator_node_id, originator_sequence_id) +); + +CREATE INDEX idx_blockchain_messages_block_canonical ON blockchain_messages(block_number, is_canonical); + diff --git a/pkg/mocks/authn/mock_JWTVerifier.go b/pkg/mocks/authn/mock_JWTVerifier.go index 0f335df3..0a44816a 100644 --- a/pkg/mocks/authn/mock_JWTVerifier.go +++ b/pkg/mocks/authn/mock_JWTVerifier.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.51.1. DO NOT EDIT. package authn diff --git a/pkg/mocks/blockchain/mock_ChainClient.go b/pkg/mocks/blockchain/mock_ChainClient.go index 18e44ecc..b3c3287e 100644 --- a/pkg/mocks/blockchain/mock_ChainClient.go +++ b/pkg/mocks/blockchain/mock_ChainClient.go @@ -1,10 +1,12 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.51.1. DO NOT EDIT. package blockchain import ( big "math/big" + common "github.com/ethereum/go-ethereum/common" + context "context" ethereum "github.com/ethereum/go-ethereum" @@ -27,6 +29,124 @@ func (_m *MockChainClient) EXPECT() *MockChainClient_Expecter { return &MockChainClient_Expecter{mock: &_m.Mock} } +// BlockByHash provides a mock function with given fields: ctx, hash +func (_m *MockChainClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { + ret := _m.Called(ctx, hash) + + if len(ret) == 0 { + panic("no return value specified for BlockByHash") + } + + var r0 *types.Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.Block, error)); ok { + return rf(ctx, hash) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *types.Block); ok { + r0 = rf(ctx, hash) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { + r1 = rf(ctx, hash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_BlockByHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BlockByHash' +type MockChainClient_BlockByHash_Call struct { + *mock.Call +} + +// BlockByHash is a helper method to define mock.On call +// - ctx context.Context +// - hash common.Hash +func (_e *MockChainClient_Expecter) BlockByHash(ctx interface{}, hash interface{}) *MockChainClient_BlockByHash_Call { + return &MockChainClient_BlockByHash_Call{Call: _e.mock.On("BlockByHash", ctx, hash)} +} + +func (_c *MockChainClient_BlockByHash_Call) Run(run func(ctx context.Context, hash common.Hash)) *MockChainClient_BlockByHash_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Hash)) + }) + return _c +} + +func (_c *MockChainClient_BlockByHash_Call) Return(_a0 *types.Block, _a1 error) *MockChainClient_BlockByHash_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_BlockByHash_Call) RunAndReturn(run func(context.Context, common.Hash) (*types.Block, error)) *MockChainClient_BlockByHash_Call { + _c.Call.Return(run) + return _c +} + +// BlockByNumber provides a mock function with given fields: ctx, number +func (_m *MockChainClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + ret := _m.Called(ctx, number) + + if len(ret) == 0 { + panic("no return value specified for BlockByNumber") + } + + var r0 *types.Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Block, error)); ok { + return rf(ctx, number) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *types.Block); ok { + r0 = rf(ctx, number) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, number) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_BlockByNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BlockByNumber' +type MockChainClient_BlockByNumber_Call struct { + *mock.Call +} + +// BlockByNumber is a helper method to define mock.On call +// - ctx context.Context +// - number *big.Int +func (_e *MockChainClient_Expecter) BlockByNumber(ctx interface{}, number interface{}) *MockChainClient_BlockByNumber_Call { + return &MockChainClient_BlockByNumber_Call{Call: _e.mock.On("BlockByNumber", ctx, number)} +} + +func (_c *MockChainClient_BlockByNumber_Call) Run(run func(ctx context.Context, number *big.Int)) *MockChainClient_BlockByNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*big.Int)) + }) + return _c +} + +func (_c *MockChainClient_BlockByNumber_Call) Return(_a0 *types.Block, _a1 error) *MockChainClient_BlockByNumber_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_BlockByNumber_Call) RunAndReturn(run func(context.Context, *big.Int) (*types.Block, error)) *MockChainClient_BlockByNumber_Call { + _c.Call.Return(run) + return _c +} + // BlockNumber provides a mock function with given fields: ctx func (_m *MockChainClient) BlockNumber(ctx context.Context) (uint64, error) { ret := _m.Called(ctx) @@ -200,6 +320,124 @@ func (_c *MockChainClient_FilterLogs_Call) RunAndReturn(run func(context.Context return _c } +// HeaderByHash provides a mock function with given fields: ctx, hash +func (_m *MockChainClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + ret := _m.Called(ctx, hash) + + if len(ret) == 0 { + panic("no return value specified for HeaderByHash") + } + + var r0 *types.Header + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.Header, error)); ok { + return rf(ctx, hash) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *types.Header); ok { + r0 = rf(ctx, hash) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Header) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { + r1 = rf(ctx, hash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_HeaderByHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HeaderByHash' +type MockChainClient_HeaderByHash_Call struct { + *mock.Call +} + +// HeaderByHash is a helper method to define mock.On call +// - ctx context.Context +// - hash common.Hash +func (_e *MockChainClient_Expecter) HeaderByHash(ctx interface{}, hash interface{}) *MockChainClient_HeaderByHash_Call { + return &MockChainClient_HeaderByHash_Call{Call: _e.mock.On("HeaderByHash", ctx, hash)} +} + +func (_c *MockChainClient_HeaderByHash_Call) Run(run func(ctx context.Context, hash common.Hash)) *MockChainClient_HeaderByHash_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Hash)) + }) + return _c +} + +func (_c *MockChainClient_HeaderByHash_Call) Return(_a0 *types.Header, _a1 error) *MockChainClient_HeaderByHash_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_HeaderByHash_Call) RunAndReturn(run func(context.Context, common.Hash) (*types.Header, error)) *MockChainClient_HeaderByHash_Call { + _c.Call.Return(run) + return _c +} + +// HeaderByNumber provides a mock function with given fields: ctx, number +func (_m *MockChainClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + ret := _m.Called(ctx, number) + + if len(ret) == 0 { + panic("no return value specified for HeaderByNumber") + } + + var r0 *types.Header + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Header, error)); ok { + return rf(ctx, number) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *types.Header); ok { + r0 = rf(ctx, number) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Header) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, number) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_HeaderByNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HeaderByNumber' +type MockChainClient_HeaderByNumber_Call struct { + *mock.Call +} + +// HeaderByNumber is a helper method to define mock.On call +// - ctx context.Context +// - number *big.Int +func (_e *MockChainClient_Expecter) HeaderByNumber(ctx interface{}, number interface{}) *MockChainClient_HeaderByNumber_Call { + return &MockChainClient_HeaderByNumber_Call{Call: _e.mock.On("HeaderByNumber", ctx, number)} +} + +func (_c *MockChainClient_HeaderByNumber_Call) Run(run func(ctx context.Context, number *big.Int)) *MockChainClient_HeaderByNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*big.Int)) + }) + return _c +} + +func (_c *MockChainClient_HeaderByNumber_Call) Return(_a0 *types.Header, _a1 error) *MockChainClient_HeaderByNumber_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_HeaderByNumber_Call) RunAndReturn(run func(context.Context, *big.Int) (*types.Header, error)) *MockChainClient_HeaderByNumber_Call { + _c.Call.Return(run) + return _c +} + // SubscribeFilterLogs provides a mock function with given fields: ctx, q, ch func (_m *MockChainClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { ret := _m.Called(ctx, q, ch) @@ -260,6 +498,182 @@ func (_c *MockChainClient_SubscribeFilterLogs_Call) RunAndReturn(run func(contex return _c } +// SubscribeNewHead provides a mock function with given fields: ctx, ch +func (_m *MockChainClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { + ret := _m.Called(ctx, ch) + + if len(ret) == 0 { + panic("no return value specified for SubscribeNewHead") + } + + var r0 ethereum.Subscription + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, chan<- *types.Header) (ethereum.Subscription, error)); ok { + return rf(ctx, ch) + } + if rf, ok := ret.Get(0).(func(context.Context, chan<- *types.Header) ethereum.Subscription); ok { + r0 = rf(ctx, ch) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(ethereum.Subscription) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, chan<- *types.Header) error); ok { + r1 = rf(ctx, ch) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_SubscribeNewHead_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SubscribeNewHead' +type MockChainClient_SubscribeNewHead_Call struct { + *mock.Call +} + +// SubscribeNewHead is a helper method to define mock.On call +// - ctx context.Context +// - ch chan<- *types.Header +func (_e *MockChainClient_Expecter) SubscribeNewHead(ctx interface{}, ch interface{}) *MockChainClient_SubscribeNewHead_Call { + return &MockChainClient_SubscribeNewHead_Call{Call: _e.mock.On("SubscribeNewHead", ctx, ch)} +} + +func (_c *MockChainClient_SubscribeNewHead_Call) Run(run func(ctx context.Context, ch chan<- *types.Header)) *MockChainClient_SubscribeNewHead_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(chan<- *types.Header)) + }) + return _c +} + +func (_c *MockChainClient_SubscribeNewHead_Call) Return(_a0 ethereum.Subscription, _a1 error) *MockChainClient_SubscribeNewHead_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_SubscribeNewHead_Call) RunAndReturn(run func(context.Context, chan<- *types.Header) (ethereum.Subscription, error)) *MockChainClient_SubscribeNewHead_Call { + _c.Call.Return(run) + return _c +} + +// TransactionCount provides a mock function with given fields: ctx, blockHash +func (_m *MockChainClient) TransactionCount(ctx context.Context, blockHash common.Hash) (uint, error) { + ret := _m.Called(ctx, blockHash) + + if len(ret) == 0 { + panic("no return value specified for TransactionCount") + } + + var r0 uint + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (uint, error)); ok { + return rf(ctx, blockHash) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) uint); ok { + r0 = rf(ctx, blockHash) + } else { + r0 = ret.Get(0).(uint) + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { + r1 = rf(ctx, blockHash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_TransactionCount_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TransactionCount' +type MockChainClient_TransactionCount_Call struct { + *mock.Call +} + +// TransactionCount is a helper method to define mock.On call +// - ctx context.Context +// - blockHash common.Hash +func (_e *MockChainClient_Expecter) TransactionCount(ctx interface{}, blockHash interface{}) *MockChainClient_TransactionCount_Call { + return &MockChainClient_TransactionCount_Call{Call: _e.mock.On("TransactionCount", ctx, blockHash)} +} + +func (_c *MockChainClient_TransactionCount_Call) Run(run func(ctx context.Context, blockHash common.Hash)) *MockChainClient_TransactionCount_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Hash)) + }) + return _c +} + +func (_c *MockChainClient_TransactionCount_Call) Return(_a0 uint, _a1 error) *MockChainClient_TransactionCount_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_TransactionCount_Call) RunAndReturn(run func(context.Context, common.Hash) (uint, error)) *MockChainClient_TransactionCount_Call { + _c.Call.Return(run) + return _c +} + +// TransactionInBlock provides a mock function with given fields: ctx, blockHash, index +func (_m *MockChainClient) TransactionInBlock(ctx context.Context, blockHash common.Hash, index uint) (*types.Transaction, error) { + ret := _m.Called(ctx, blockHash, index) + + if len(ret) == 0 { + panic("no return value specified for TransactionInBlock") + } + + var r0 *types.Transaction + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Hash, uint) (*types.Transaction, error)); ok { + return rf(ctx, blockHash, index) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Hash, uint) *types.Transaction); ok { + r0 = rf(ctx, blockHash, index) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Transaction) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Hash, uint) error); ok { + r1 = rf(ctx, blockHash, index) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_TransactionInBlock_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TransactionInBlock' +type MockChainClient_TransactionInBlock_Call struct { + *mock.Call +} + +// TransactionInBlock is a helper method to define mock.On call +// - ctx context.Context +// - blockHash common.Hash +// - index uint +func (_e *MockChainClient_Expecter) TransactionInBlock(ctx interface{}, blockHash interface{}, index interface{}) *MockChainClient_TransactionInBlock_Call { + return &MockChainClient_TransactionInBlock_Call{Call: _e.mock.On("TransactionInBlock", ctx, blockHash, index)} +} + +func (_c *MockChainClient_TransactionInBlock_Call) Run(run func(ctx context.Context, blockHash common.Hash, index uint)) *MockChainClient_TransactionInBlock_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Hash), args[2].(uint)) + }) + return _c +} + +func (_c *MockChainClient_TransactionInBlock_Call) Return(_a0 *types.Transaction, _a1 error) *MockChainClient_TransactionInBlock_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_TransactionInBlock_Call) RunAndReturn(run func(context.Context, common.Hash, uint) (*types.Transaction, error)) *MockChainClient_TransactionInBlock_Call { + _c.Call.Return(run) + return _c +} + // NewMockChainClient creates a new instance of MockChainClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockChainClient(t interface { diff --git a/pkg/mocks/blockchain/mock_IBlockchainPublisher.go b/pkg/mocks/blockchain/mock_IBlockchainPublisher.go index ec5ee88d..47cfea20 100644 --- a/pkg/mocks/blockchain/mock_IBlockchainPublisher.go +++ b/pkg/mocks/blockchain/mock_IBlockchainPublisher.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.51.1. DO NOT EDIT. package blockchain diff --git a/pkg/mocks/indexer/mock_ChainReorgHandler.go b/pkg/mocks/indexer/mock_ChainReorgHandler.go new file mode 100644 index 00000000..3a306786 --- /dev/null +++ b/pkg/mocks/indexer/mock_ChainReorgHandler.go @@ -0,0 +1,97 @@ +// Code generated by mockery v2.51.1. DO NOT EDIT. + +package indexer + +import mock "github.com/stretchr/testify/mock" + +// MockChainReorgHandler is an autogenerated mock type for the ChainReorgHandler type +type MockChainReorgHandler struct { + mock.Mock +} + +type MockChainReorgHandler_Expecter struct { + mock *mock.Mock +} + +func (_m *MockChainReorgHandler) EXPECT() *MockChainReorgHandler_Expecter { + return &MockChainReorgHandler_Expecter{mock: &_m.Mock} +} + +// FindReorgPoint provides a mock function with given fields: detectedAt +func (_m *MockChainReorgHandler) FindReorgPoint(detectedAt uint64) (uint64, []byte, error) { + ret := _m.Called(detectedAt) + + if len(ret) == 0 { + panic("no return value specified for FindReorgPoint") + } + + var r0 uint64 + var r1 []byte + var r2 error + if rf, ok := ret.Get(0).(func(uint64) (uint64, []byte, error)); ok { + return rf(detectedAt) + } + if rf, ok := ret.Get(0).(func(uint64) uint64); ok { + r0 = rf(detectedAt) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(uint64) []byte); ok { + r1 = rf(detectedAt) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]byte) + } + } + + if rf, ok := ret.Get(2).(func(uint64) error); ok { + r2 = rf(detectedAt) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockChainReorgHandler_FindReorgPoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FindReorgPoint' +type MockChainReorgHandler_FindReorgPoint_Call struct { + *mock.Call +} + +// FindReorgPoint is a helper method to define mock.On call +// - detectedAt uint64 +func (_e *MockChainReorgHandler_Expecter) FindReorgPoint(detectedAt interface{}) *MockChainReorgHandler_FindReorgPoint_Call { + return &MockChainReorgHandler_FindReorgPoint_Call{Call: _e.mock.On("FindReorgPoint", detectedAt)} +} + +func (_c *MockChainReorgHandler_FindReorgPoint_Call) Run(run func(detectedAt uint64)) *MockChainReorgHandler_FindReorgPoint_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(uint64)) + }) + return _c +} + +func (_c *MockChainReorgHandler_FindReorgPoint_Call) Return(_a0 uint64, _a1 []byte, _a2 error) *MockChainReorgHandler_FindReorgPoint_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockChainReorgHandler_FindReorgPoint_Call) RunAndReturn(run func(uint64) (uint64, []byte, error)) *MockChainReorgHandler_FindReorgPoint_Call { + _c.Call.Return(run) + return _c +} + +// NewMockChainReorgHandler creates a new instance of MockChainReorgHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockChainReorgHandler(t interface { + mock.TestingT + Cleanup(func()) +}) *MockChainReorgHandler { + mock := &MockChainReorgHandler{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/mocks/indexer/mock_IBlockTracker.go b/pkg/mocks/indexer/mock_IBlockTracker.go index 2c434df8..2f1bdec8 100644 --- a/pkg/mocks/indexer/mock_IBlockTracker.go +++ b/pkg/mocks/indexer/mock_IBlockTracker.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.51.1. DO NOT EDIT. package indexer diff --git a/pkg/mocks/mls_validationv1/mock_ValidationApiClient.go b/pkg/mocks/mls_validationv1/mock_ValidationApiClient.go index 6b94b6e2..01ddccf7 100644 --- a/pkg/mocks/mls_validationv1/mock_ValidationApiClient.go +++ b/pkg/mocks/mls_validationv1/mock_ValidationApiClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.51.1. DO NOT EDIT. package mls_validationv1 diff --git a/pkg/mocks/mlsvalidate/mock_MLSValidationService.go b/pkg/mocks/mlsvalidate/mock_MLSValidationService.go index 18e1f389..4710312d 100644 --- a/pkg/mocks/mlsvalidate/mock_MLSValidationService.go +++ b/pkg/mocks/mlsvalidate/mock_MLSValidationService.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.51.1. DO NOT EDIT. package mlsvalidate diff --git a/pkg/mocks/registry/mock_NodeRegistry.go b/pkg/mocks/registry/mock_NodeRegistry.go index 859fa6ea..e60efd2c 100644 --- a/pkg/mocks/registry/mock_NodeRegistry.go +++ b/pkg/mocks/registry/mock_NodeRegistry.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.51.1. DO NOT EDIT. package registry diff --git a/pkg/mocks/registry/mock_NodesContract.go b/pkg/mocks/registry/mock_NodesContract.go index 8fbc70ff..457d6618 100644 --- a/pkg/mocks/registry/mock_NodesContract.go +++ b/pkg/mocks/registry/mock_NodesContract.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.51.1. DO NOT EDIT. package registry diff --git a/pkg/mocks/storer/mock_LogStorer.go b/pkg/mocks/storer/mock_LogStorer.go index cc9f8112..3f991bb5 100644 --- a/pkg/mocks/storer/mock_LogStorer.go +++ b/pkg/mocks/storer/mock_LogStorer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.51.1. DO NOT EDIT. package storer diff --git a/sqlc.yaml b/sqlc.yaml index 910d79ef..82838f62 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -7,3 +7,6 @@ sql: go: package: "queries" out: "pkg/db/queries" + overrides: + - column: "blockchain_messages.block_number" + go_type: "uint64"