Skip to content

Commit

Permalink
ensure only offsets are pulled after a filter is esstablished
Browse files Browse the repository at this point in the history
Signed-off-by: Chengxuan Xing <[email protected]>
  • Loading branch information
Chengxuan committed Dec 10, 2024
1 parent b561f24 commit 53356e1
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions internal/ethereum/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func (es *eventStream) leadGroupSteadyState() bool {
lastUpdate := -1
failCount := 0
filterResetRequired := false
filterRPCMethodToUse := ""
for {
if es.c.doFailureDelay(es.ctx, failCount) {
log.L(es.ctx).Debugf("Stream loop exiting")
Expand Down Expand Up @@ -329,6 +330,7 @@ func (es *eventStream) leadGroupSteadyState() bool {
es.uninstallFilter(&filter)
}
filterResetRequired = false
filterRPCMethodToUse = "eth_getFilterLogs" // first JSON/RPC for a new filter ID fetches all the historical logs to ensure no gaps
// Determine the earliest block we need to poll from
fromBlock := int64(-1)
for _, l := range ag.listeners {
Expand Down Expand Up @@ -362,17 +364,18 @@ func (es *eventStream) leadGroupSteadyState() bool {
}
// Get the next batch of logs
var ethLogs []*logJSONRPC
rpcErr := es.c.backend.CallRPC(es.ctx, &ethLogs, "eth_getFilterLogs", filter)
rpcErr := es.c.backend.CallRPC(es.ctx, &ethLogs, filterRPCMethodToUse, filter)
// If we fail to query we just retry - setting filter to nil if not found
if rpcErr != nil {
if mapError(filterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound {
log.L(es.ctx).Infof("Filter '%v' reset: %s", filter, rpcErr.Message)
filter = ""
}
log.L(es.ctx).Errorf("Failed to query filter (eth_getFilterLogs): %s", rpcErr.Message)
log.L(es.ctx).Errorf("Failed to query filter (%s): %s", filterRPCMethodToUse, rpcErr.Message)
failCount++
continue
}
filterRPCMethodToUse = "eth_getFilterChanges" // subsequent JSON/RPC calls after the initial fetch, this fetches only the new logs
// Enrich the events
events, enrichErr := es.filterEnrichSort(es.ctx, ag, ethLogs)
if enrichErr != nil {
Expand Down

0 comments on commit 53356e1

Please sign in to comment.