Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: detect chain reorgs #411

Merged
merged 14 commits into from
Jan 30, 2025
Prev Previous commit
Next Next commit
fix test
fbac committed Jan 27, 2025
commit 2fc4f4b50b0faf42aec06dc753ccc3783257e044
4 changes: 4 additions & 0 deletions pkg/blockchain/rpcLogStreamer.go
Original file line number Diff line number Diff line change
@@ -228,6 +228,10 @@ func (r *RpcLogStreamer) getNextPage(
return logs, &nextBlockNumber, nil
}

func (r *RpcLogStreamer) Client() ChainClient {
return r.client
}

func buildFilterQuery(
contractConfig contractConfig,
fromBlock int64,
2 changes: 1 addition & 1 deletion pkg/blockchain/rpcLogStreamer_test.go
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ func TestBuilder(t *testing.T) {
require.NoError(t, err)
builder := NewRpcLogStreamBuilder(context.Background(), testclient, testutils.NewLog(t))

listenerChannel := builder.ListenForContractEvent(
listenerChannel, _ := builder.ListenForContractEvent(
1,
testutils.RandomAddress(),
[]common.Hash{testutils.RandomLogTopic()}, 5*time.Minute,
4 changes: 2 additions & 2 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
@@ -89,7 +89,7 @@ func (i *Indexer) StartIndexer(

indexLogs(
ctx,
client,
streamer.streamer.Client(),
streamer.messagesChannel,
streamer.messagesReorgChannel,
cfg.SafeBlockDistance,
@@ -113,7 +113,7 @@ func (i *Indexer) StartIndexer(
With(zap.String("contractAddress", cfg.IdentityUpdatesContractAddress))
indexLogs(
ctx,
client,
streamer.streamer.Client(),
streamer.identityUpdatesChannel,
streamer.identityUpdatesReorgChannel,
cfg.SafeBlockDistance,
91 changes: 75 additions & 16 deletions pkg/indexer/indexer_test.go
Original file line number Diff line number Diff line change
@@ -10,59 +10,118 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/mock"
"github.com/xmtp/xmtpd/pkg/indexer/storer"
blockchainMocks "github.com/xmtp/xmtpd/pkg/mocks/blockchain"
indexerMocks "github.com/xmtp/xmtpd/pkg/mocks/indexer"
storerMocks "github.com/xmtp/xmtpd/pkg/mocks/storer"
"github.com/xmtp/xmtpd/pkg/testutils"
)

const testSafeBlockDistance = uint64(10)

func TestIndexLogsSuccess(t *testing.T) {
channel := make(chan types.Log, 10)
defer close(channel)
reorgChannel := make(chan uint64, 1)
ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
close(channel)
close(reorgChannel)
}()
fbac marked this conversation as resolved.
Show resolved Hide resolved

newBlockNumber := uint64(10)
newBlockHash := common.HexToHash(
"0x0000000000000000000000000000000000000000000000000000000000000000",
)

logStorer := storerMocks.NewMockLogStorer(t)
blockTracker := indexerMocks.NewMockIBlockTracker(t)
blockTracker.EXPECT().
UpdateLatestBlock(mock.Anything, newBlockNumber, newBlockHash.Bytes()).
Return(nil)

event := types.Log{
Address: common.HexToAddress("0x123"),
BlockNumber: newBlockNumber,
BlockHash: newBlockHash,
}
logStorer.EXPECT().StoreLog(mock.Anything, event).Times(1).Return(nil)

channel <- event

go indexLogs(context.Background(), channel, testutils.NewLog(t), logStorer, blockTracker)
mockClient := blockchainMocks.NewMockChainClient(t)

blockTracker := indexerMocks.NewMockIBlockTracker(t)
blockTracker.EXPECT().
UpdateLatestBlock(mock.Anything, newBlockNumber, newBlockHash.Bytes()).
Return(nil)
blockTracker.EXPECT().
GetLatestBlock().
Return(newBlockNumber, newBlockHash.Bytes())

logStorer := storerMocks.NewMockLogStorer(t)
logStorer.EXPECT().
StoreLog(mock.Anything, event, false).
Return(nil)

go indexLogs(
ctx,
mockClient,
channel,
reorgChannel,
testSafeBlockDistance,
testutils.NewLog(t),
logStorer,
blockTracker,
)

time.Sleep(100 * time.Millisecond)
}

func TestIndexLogsRetryableError(t *testing.T) {
channel := make(chan types.Log, 10)
defer close(channel)
reorgChannel := make(chan uint64, 1)
ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
close(channel)
close(reorgChannel)
}()

logStorer := storerMocks.NewMockLogStorer(t)
blockTracker := indexerMocks.NewMockIBlockTracker(t)
newBlockNumber := uint64(10)
newBlockHash := common.HexToHash(
"0x0000000000000000000000000000000000000000000000000000000000000000",
)

event := types.Log{
Address: common.HexToAddress("0x123"),
Address: common.HexToAddress("0x123"),
BlockNumber: newBlockNumber,
BlockHash: newBlockHash,
}

mockClient := blockchainMocks.NewMockChainClient(t)
logStorer := storerMocks.NewMockLogStorer(t)

blockTracker := indexerMocks.NewMockIBlockTracker(t)
blockTracker.EXPECT().
GetLatestBlock().
Return(newBlockNumber, newBlockHash.Bytes())

// Will fail for the first call with a retryable error and a non-retryable error on the second call
attemptNumber := 0

logStorer.EXPECT().
StoreLog(mock.Anything, event).
RunAndReturn(func(ctx context.Context, log types.Log) storer.LogStorageError {
StoreLog(mock.Anything, event, false).
RunAndReturn(func(ctx context.Context, log types.Log, isCanonical bool) storer.LogStorageError {
attemptNumber++
return storer.NewLogStorageError(errors.New("retryable error"), attemptNumber < 2)
})

channel <- event

go indexLogs(context.Background(), channel, testutils.NewLog(t), logStorer, blockTracker)
go indexLogs(
ctx,
mockClient,
channel,
reorgChannel,
testSafeBlockDistance,
testutils.NewLog(t),
logStorer,
blockTracker,
)

time.Sleep(200 * time.Millisecond)

logStorer.AssertNumberOfCalls(t, "StoreLog", 2)
5 changes: 4 additions & 1 deletion pkg/indexer/storer/groupMessage_test.go
Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@ func TestStoreGroupMessages(t *testing.T) {
err = storer.StoreLog(
ctx,
logMessage,
false,
)
require.NoError(t, err)

@@ -99,12 +100,14 @@ func TestStoreGroupMessageDuplicate(t *testing.T) {
err := storer.StoreLog(
ctx,
logMessage,
false,
)
require.NoError(t, err)
// Store the log a second time
err = storer.StoreLog(
ctx,
logMessage,
false,
)
require.NoError(t, err)

@@ -133,7 +136,7 @@ func TestStoreGroupMessageMalformed(t *testing.T) {
Data: []byte("foo"),
}

storageErr := storer.StoreLog(ctx, logMessage)
storageErr := storer.StoreLog(ctx, logMessage, false)
require.Error(t, storageErr)
require.False(t, storageErr.ShouldRetry())
}
1 change: 1 addition & 0 deletions pkg/indexer/storer/identityUpdate_test.go
Original file line number Diff line number Diff line change
@@ -79,6 +79,7 @@ func TestStoreIdentityUpdate(t *testing.T) {
err := storer.StoreLog(
ctx,
logMessage,
false,
)
require.NoError(t, err)

8 changes: 4 additions & 4 deletions pkg/interceptors/server/auth_test.go
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ func TestUnaryInterceptor(t *testing.T) {
return metadata.NewIncomingContext(context.Background(), md)
},
setupVerifier: func() {
mockVerifier.EXPECT().Verify("valid_token").Return(100, nil)
mockVerifier.EXPECT().Verify("valid_token").Return(uint32(0), nil)
fbac marked this conversation as resolved.
Show resolved Hide resolved
},
wantError: nil,
wantVerifiedNode: true,
@@ -71,7 +71,7 @@ func TestUnaryInterceptor(t *testing.T) {
setupVerifier: func() {
mockVerifier.EXPECT().
Verify("invalid_token").
Return(0, errors.New("invalid signature"))
Return(uint32(0), errors.New("invalid signature"))
},
wantError: status.Error(
codes.Unauthenticated,
@@ -131,7 +131,7 @@ func TestStreamInterceptor(t *testing.T) {
return metadata.NewIncomingContext(context.Background(), md)
},
setupVerifier: func() {
mockVerifier.EXPECT().Verify("valid_token").Return(100, nil)
mockVerifier.EXPECT().Verify("valid_token").Return(uint32(0), nil)
},
wantError: nil,
wantVerifiedNode: true,
@@ -156,7 +156,7 @@ func TestStreamInterceptor(t *testing.T) {
setupVerifier: func() {
mockVerifier.EXPECT().
Verify("invalid_token").
Return(0, errors.New("invalid signature"))
Return(uint32(0), errors.New("invalid signature"))
},
wantError: status.Error(
codes.Unauthenticated,
414 changes: 414 additions & 0 deletions pkg/mocks/blockchain/mock_ChainClient.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/mocks/registry/mock_NodesContract.go
21 changes: 11 additions & 10 deletions pkg/mocks/storer/mock_LogStorer.go