Skip to content

Commit

Permalink
store reorged messages
Browse files Browse the repository at this point in the history
  • Loading branch information
fbac committed Jan 27, 2025
1 parent 6e52c40 commit f3da803
Show file tree
Hide file tree
Showing 59 changed files with 347 additions and 328 deletions.
12 changes: 11 additions & 1 deletion pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ WHERE

-- name: InsertBlockchainMessage :exec
INSERT INTO blockchain_messages(block_number, block_hash, originator_node_id, originator_sequence_id, is_canonical)
VALUES (@block_number, @block_hash, @originator_node_id, @originator_sequence_id, @is_canonical);
VALUES (@block_number, @block_hash, @originator_node_id, @originator_sequence_id, @is_canonical)
ON CONFLICT
DO NOTHING;

-- name: GetBlocksInRange :many
-- Returns blocks in descending order (newest to oldest)
Expand All @@ -144,3 +146,11 @@ ORDER BY
block_number ASC,
block_hash;

-- name: UpdateBlocksCanonicalityInRange :exec
UPDATE
blockchain_messages
SET
is_canonical = FALSE
WHERE
block_number >= @reorg_block_number;

2 changes: 1 addition & 1 deletion pkg/db/queries/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/db/queries/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 17 additions & 1 deletion pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func indexLogs(

Retry:
for {
errStorage = logStorer.StoreLog(ctx, event, reorgInProgress)
errStorage = logStorer.StoreLog(ctx, event)
if errStorage != nil {
logger.Error("error storing log", zap.Error(errStorage))
if errStorage.ShouldRetry() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestIndexLogsSuccess(t *testing.T) {

logStorer := storerMocks.NewMockLogStorer(t)
logStorer.EXPECT().
StoreLog(mock.Anything, event, false).
StoreLog(mock.Anything, event).
Return(nil)

go indexLogs(
Expand Down Expand Up @@ -97,8 +97,8 @@ func TestIndexLogsRetryableError(t *testing.T) {
attemptNumber := 0

logStorer.EXPECT().
StoreLog(mock.Anything, event, false).
RunAndReturn(func(ctx context.Context, log types.Log, isCanonical bool) storer.LogStorageError {
StoreLog(mock.Anything, event).
RunAndReturn(func(ctx context.Context, log types.Log) storer.LogStorageError {
attemptNumber++
return storer.NewLogStorageError(errors.New("retryable error"), attemptNumber < 2)
})
Expand Down
32 changes: 22 additions & 10 deletions pkg/indexer/reorgHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,38 @@ func (r *ReorgHandler) FindReorgPoint(detectedAt uint64) (uint64, []byte, error)
}

// Oldest block matches, reorg happened in this range
return r.searchInRange(storedBlocks)
blockNumber, blockHash, err := r.searchInRange(storedBlocks)
if err != nil {
return 0, nil, fmt.Errorf("failed to search reorg block in range: %w", err)
}

if err := r.queries.UpdateBlocksCanonicalityInRange(r.ctx, blockNumber); err != nil {
return 0, nil, fmt.Errorf("failed to update block range canonicality: %w", err)
}

return blockNumber, blockHash, nil
}
}

func (r *ReorgHandler) searchInRange(blocks []queries.GetBlocksInRangeRow) (uint64, []byte, error) {
if len(blocks) == 0 {
return 0, nil, ErrNoBlocksFound
}

left, right := 0, len(blocks)-1
for left <= right {
mid := (left + right) / 2
block := blocks[mid]
storedBlock := blocks[mid]

chainBlock, err := r.client.BlockByNumber(
r.ctx,
big.NewInt(int64(block.BlockNumber)),
big.NewInt(int64(storedBlock.BlockNumber)),
)
if err != nil {
return 0, nil, fmt.Errorf("%w %d: %v", ErrGetBlock, block.BlockNumber, err)
return 0, nil, fmt.Errorf("%w %d: %v", ErrGetBlock, storedBlock.BlockNumber, err)
}

if bytes.Equal(block.BlockHash, chainBlock.Hash().Bytes()) {
if bytes.Equal(storedBlock.BlockHash, chainBlock.Hash().Bytes()) {
// Found a matching block, check if next block differs to confirm reorg point
if mid < len(blocks)-1 {
nextBlock := blocks[mid+1]
Expand All @@ -115,10 +128,8 @@ func (r *ReorgHandler) searchInRange(blocks []queries.GetBlocksInRangeRow) (uint
}

if !bytes.Equal(nextBlock.BlockHash, nextChainBlock.Hash().Bytes()) {
return block.BlockNumber, chainBlock.Hash().Bytes(), nil
return storedBlock.BlockNumber, chainBlock.Hash().Bytes(), nil
}
} else if mid == len(blocks)-1 {
return block.BlockNumber, chainBlock.Hash().Bytes(), nil
}

// If next block doesn't differ, search upper half
Expand All @@ -129,8 +140,9 @@ func (r *ReorgHandler) searchInRange(blocks []queries.GetBlocksInRangeRow) (uint
}
}

// TODO: This should never happen, start from 0?
return 0, nil, fmt.Errorf("reorg point not found")
// This should never happen. If it happens, return the first block in the range.
block := blocks[0]
return block.BlockNumber, block.BlockHash, nil
}

func blockRange(from uint64) (startBlock uint64, endBlock uint64) {
Expand Down
16 changes: 11 additions & 5 deletions pkg/indexer/storer/groupMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func NewGroupMessageStorer(
func (s *GroupMessageStorer) StoreLog(
ctx context.Context,
event types.Log,
appendLog bool,
) LogStorageError {
msgSent, err := s.contract.ParseMessageSent(event)
if err != nil {
Expand Down Expand Up @@ -84,10 +83,6 @@ func (s *GroupMessageStorer) StoreLog(
return NewLogStorageError(err, false)
}

if appendLog {
// placeholder
}

s.logger.Debug("Inserting message from contract", zap.String("topic", topicStruct.String()))

if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{
Expand All @@ -100,5 +95,16 @@ func (s *GroupMessageStorer) StoreLog(
return NewLogStorageError(err, true)
}

if err = s.queries.InsertBlockchainMessage(ctx, queries.InsertBlockchainMessageParams{
BlockNumber: event.BlockNumber,
BlockHash: event.BlockHash.Bytes(),
OriginatorNodeID: GROUP_MESSAGE_ORIGINATOR_ID,
OriginatorSequenceID: int64(msgSent.SequenceId),
IsCanonical: true, // New messages are always canonical
}); err != nil {
s.logger.Error("Error inserting blockchain message", zap.Error(err))
return NewLogStorageError(err, true)
}

return nil
}
5 changes: 1 addition & 4 deletions pkg/indexer/storer/groupMessage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func TestStoreGroupMessages(t *testing.T) {
err = storer.StoreLog(
ctx,
logMessage,
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -100,14 +99,12 @@ func TestStoreGroupMessageDuplicate(t *testing.T) {
err := storer.StoreLog(
ctx,
logMessage,
false,
)
require.NoError(t, err)
// Store the log a second time
err = storer.StoreLog(
ctx,
logMessage,
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -136,7 +133,7 @@ func TestStoreGroupMessageMalformed(t *testing.T) {
Data: []byte("foo"),
}

storageErr := storer.StoreLog(ctx, logMessage, false)
storageErr := storer.StoreLog(ctx, logMessage)
require.Error(t, storageErr)
require.False(t, storageErr.ShouldRetry())
}
16 changes: 11 additions & 5 deletions pkg/indexer/storer/identityUpdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func NewIdentityUpdateStorer(
func (s *IdentityUpdateStorer) StoreLog(
ctx context.Context,
event types.Log,
appendLog bool,
) LogStorageError {
msgSent, err := s.contract.ParseIdentityUpdateCreated(event)
if err != nil {
Expand Down Expand Up @@ -171,10 +170,6 @@ func (s *IdentityUpdateStorer) StoreLog(
return NewLogStorageError(err, true)
}

if appendLog {
// placeholder
}

if _, err = querier.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{
OriginatorNodeID: IDENTITY_UPDATE_ORIGINATOR_ID,
OriginatorSequenceID: int64(msgSent.SequenceId),
Expand All @@ -185,6 +180,17 @@ func (s *IdentityUpdateStorer) StoreLog(
return NewLogStorageError(err, true)
}

if err = querier.InsertBlockchainMessage(ctx, queries.InsertBlockchainMessageParams{
BlockNumber: event.BlockNumber,
BlockHash: event.BlockHash.Bytes(),
OriginatorNodeID: IDENTITY_UPDATE_ORIGINATOR_ID,
OriginatorSequenceID: int64(msgSent.SequenceId),
IsCanonical: true, // New messages are always canonical
}); err != nil {
s.logger.Error("Error inserting blockchain message", zap.Error(err))
return NewLogStorageError(err, true)
}

return nil
},
)
Expand Down
1 change: 0 additions & 1 deletion pkg/indexer/storer/identityUpdate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func TestStoreIdentityUpdate(t *testing.T) {
err := storer.StoreLog(
ctx,
logMessage,
false,
)
require.NoError(t, err)

Expand Down
3 changes: 1 addition & 2 deletions pkg/indexer/storer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
)

// Takes a log event and stores it, returning either an error that may be retriable, non-retriable, or nil
// appendLog should be true if the log is part of a reorg; invalidates the old lod and appends the new one
type LogStorer interface {
StoreLog(ctx context.Context, event types.Log, appendLog bool) LogStorageError
StoreLog(ctx context.Context, event types.Log) LogStorageError
}
21 changes: 0 additions & 21 deletions pkg/indexer/storer/logAppend.go

This file was deleted.

8 changes: 2 additions & 6 deletions pkg/migrations/00004_add_blockchain_columns.up.sql
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
-- Create table to track blockchain messages
CREATE TABLE blockchain_messages(
block_number BIGINT NOT NULL,
block_hash BYTEA NOT NULL,
originator_node_id INT NOT NULL,
originator_sequence_id BIGINT NOT NULL,
is_canonical BOOLEAN NOT NULL DEFAULT TRUE,
PRIMARY KEY (block_number, block_hash),
PRIMARY KEY (block_number, block_hash, originator_node_id, originator_sequence_id),
FOREIGN KEY (originator_node_id, originator_sequence_id) REFERENCES gateway_envelopes(originator_node_id, originator_sequence_id)
);

CREATE INDEX idx_blockchain_messages_canonical ON blockchain_messages(block_number DESC, block_hash)
WHERE
is_canonical = TRUE;
CREATE INDEX idx_blockchain_messages_block_canonical ON blockchain_messages(block_number, is_canonical);

-- TODO: Check for better indexes
2 changes: 1 addition & 1 deletion pkg/mocks/authn/mock_JWTVerifier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mocks/blockchain/mock_ChainClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mocks/blockchain/mock_IBlockchainPublisher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mocks/indexer/mock_ChainReorgHandler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mocks/indexer/mock_IBlockTracker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mocks/mls_validationv1/mock_ValidationApiClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mocks/mlsvalidate/mock_MLSValidationService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mocks/registry/mock_NodeRegistry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mocks/registry/mock_NodesContract.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f3da803

Please sign in to comment.