Skip to content

Commit

Permalink
more tidy up
Browse files Browse the repository at this point in the history
Signed-off-by: Chengxuan Xing <[email protected]>
  • Loading branch information
Chengxuan committed Sep 5, 2024
1 parent 837f0a8 commit ed72634
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 10 deletions.
2 changes: 1 addition & 1 deletion internal/ethereum/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,8 @@ func (bl *blockListener) addConsumer(ctx context.Context, c *blockUpdateConsumer
bl.checkAndStartListenerLoop()
bl.waitUntilStarted(ctx) // need to make sure the listener is started before adding any consumers
bl.mux.Lock()
defer bl.mux.Unlock()
bl.consumers[*c.id] = c
bl.mux.Unlock()
}

func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) {
Expand Down
4 changes: 0 additions & 4 deletions internal/ethereum/event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ func TestGetInitialBlockTimeout(t *testing.T) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(&rpcbackend.RPCError{Message: "pop"}).Run(func(args mock.Arguments) {
<-blockRPC // make it timeout
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Maybe()

_, err := l.getInitialBlock(ctx, "latest")
assert.Regexp(t, "FF23046", err)
Expand All @@ -156,8 +154,6 @@ func TestGetHWMNotInit(t *testing.T) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(&rpcbackend.RPCError{Message: "pop"}).Run(func(args mock.Arguments) {
<-blockRPC // make it timeout
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Maybe()

_, err := l.getInitialBlock(ctx, "latest")
assert.Regexp(t, "FF23046", err)
Expand Down
6 changes: 2 additions & 4 deletions internal/ethereum/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ func (es *eventStream) leadGroupSteadyState() bool {
var ag *aggregatedListener
lastUpdate := -1
failCount := 0
filterRPC := ""
filterResetRequired := false
for {
if es.c.doFailureDelay(es.ctx, failCount) {
Expand Down Expand Up @@ -330,7 +329,6 @@ func (es *eventStream) leadGroupSteadyState() bool {
es.uninstallFilter(&filter)
}
filterResetRequired = false
filterRPC = "eth_getFilterLogs" // first JSON/RPC after getting a new filter ID
// Determine the earliest block we need to poll from
fromBlock := int64(-1)
for _, l := range ag.listeners {
Expand Down Expand Up @@ -364,14 +362,14 @@ func (es *eventStream) leadGroupSteadyState() bool {
}
// Get the next batch of logs
var ethLogs []*logJSONRPC
rpcErr := es.c.backend.CallRPC(es.ctx, &ethLogs, filterRPC, filter)
rpcErr := es.c.backend.CallRPC(es.ctx, &ethLogs, "eth_getFilterLogs", filter)
// If we fail to query we just retry - setting filter to nil if not found
if rpcErr != nil {
if mapError(filterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound {
log.L(es.ctx).Infof("Filter '%v' reset: %s", filter, rpcErr.Message)
filter = ""
}
log.L(es.ctx).Errorf("Failed to query filter (%s): %s", filterRPC, rpcErr.Message)
log.L(es.ctx).Errorf("Failed to query filter (eth_getFilterLogs): %s", rpcErr.Message)
failCount++
continue
}
Expand Down
2 changes: 1 addition & 1 deletion internal/ethereum/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
Data: ethtypes.MustNewHexBytes0xPrefix("0x00000000000000000000000000000000000000000000000000000000000003e8"),
},
}
}).Maybe()
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c", false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(212122),
Expand Down

0 comments on commit ed72634

Please sign in to comment.