Skip to content

Commit

Permalink
refactor(relayer): Factor out common listener utils (#1841)
Browse files Browse the repository at this point in the history
This is a pre-emptive change made to make it easier to reuse common
components in alternative listener implementations. Examples for a
viem-based listener, as well as an eventual Solana listener.

There are subsequent changes in the pipeline to factor out evm-specific
parts.
  • Loading branch information
pxrl authored Sep 30, 2024
1 parent 82e4042 commit 4a39db4
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 117 deletions.
139 changes: 22 additions & 117 deletions src/libexec/RelayerSpokePoolIndexer.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import assert from "assert";
import minimist from "minimist";
import { Contract, EventFilter, providers as ethersProviders, utils as ethersUtils } from "ethers";
import { Contract, providers as ethersProviders, utils as ethersUtils } from "ethers";
import { utils as sdkUtils } from "@across-protocol/sdk";
import * as utils from "../../scripts/utils";
import { Log } from "../interfaces";
import { SpokePoolClientMessage } from "../clients";
import {
disconnectRedisClients,
EventManager,
Expand All @@ -19,20 +17,13 @@ import {
getRedisCache,
getWSProviders,
Logger,
paginatedEventQuery,
sortEventsAscending,
winston,
} from "../utils";
import { postEvents, removeEvent } from "./util/ipc";
import { ScraperOpts } from "./types";
import { getEventFilter, getEventFilterArgs, scrapeEvents as _scrapeEvents } from "./util/evm";

type WebSocketProvider = ethersProviders.WebSocketProvider;
type EventSearchConfig = sdkUtils.EventSearchConfig;
type ScraperOpts = {
lookback?: number; // Event lookback (in seconds).
deploymentBlock: number; // SpokePool deployment block
maxBlockRange?: number; // Maximum block range for paginated getLogs queries.
filterArgs?: { [event: string]: string[] }; // Event-specific filter criteria to apply.
};

const { NODE_SUCCESS, NODE_APP_ERR } = utils;

const INDEXER_POLLING_PERIOD = 2_000; // ms; time to sleep between checking for exit request via SIGHUP.
Expand All @@ -45,108 +36,21 @@ let stop = false;
let oldestTime = 0;

/**
* Given an event name and contract, return the corresponding Ethers EventFilter object.
* @param contract Ethers Constract instance.
* @param eventName The name of the event to be filtered.
* @param filterArgs Optional filter arguments to be applied.
* @returns An Ethers EventFilter instance.
*/
function getEventFilter(contract: Contract, eventName: string, filterArgs?: string[]): EventFilter {
const filter = contract.filters[eventName];
if (!isDefined(filter)) {
throw new Error(`Event ${eventName} not defined for contract`);
}

return isDefined(filterArgs) ? filter(...filterArgs) : filter();
}

function getEventFilterArgs(relayer?: string): { [event: string]: string[] } {
const FilledV3Relay = !isDefined(relayer)
? undefined
: [null, null, null, null, null, null, null, null, null, null, relayer];

return { FilledV3Relay };
}

/**
* Given the inputs for a SpokePoolClient update, consolidate the inputs into a message and submit it to the parent
* process (if defined).
* @param blockNumber Block number up to which the update applies.
* @param currentTime The SpokePool timestamp at blockNumber.
* @param events An array of Log objects to be submitted.
* @returns void
*/
function postEvents(blockNumber: number, currentTime: number, events: Log[]): void {
if (!isDefined(process.send) || stop) {
return;
}

// Drop the array component of event.args and retain the named k/v pairs,
// otherwise stringification tends to retain only the array.
events = sortEventsAscending(events);

const message: SpokePoolClientMessage = {
blockNumber,
currentTime,
oldestTime,
nEvents: events.length,
data: JSON.stringify(events, sdkUtils.jsonReplacerWithBigNumbers),
};
process.send(JSON.stringify(message));
}

/**
* Given an event removal notification, post the message to the parent process.
* @param event Log instance.
* @returns void
*/
function removeEvent(event: Log): void {
if (!isDefined(process.send) || stop) {
return;
}

const message: SpokePoolClientMessage = {
event: JSON.stringify(event, sdkUtils.jsonReplacerWithBigNumbers),
};
process.send(JSON.stringify(message));
}

/**
* Given a SpokePool contract instance and an event name, scrape all corresponding events and submit them to the
* parent process (if defined).
* Aggregate utils/scrapeEvents for a series of event names.
* @param spokePool Ethers Constract instance.
* @param eventName The name of the event to be filtered.
* @param eventNames The array of events to be queried.
* @param opts Options to configure event scraping behaviour.
* @returns void
*/
async function scrapeEvents(spokePool: Contract, eventName: string, opts: ScraperOpts): Promise<void> {
const { lookback, deploymentBlock, filterArgs, maxBlockRange } = opts;
const { provider } = spokePool;
const { chainId } = await provider.getNetwork();
const chain = getNetworkName(chainId);

let tStart: number, tStop: number;

const pollEvents = async (filter: EventFilter, searchConfig: EventSearchConfig): Promise<Log[]> => {
tStart = performance.now();
const events = await paginatedEventQuery(spokePool, filter, searchConfig);
tStop = performance.now();
logger.debug({
at: "SpokePoolIndexer::listen",
message: `Indexed ${events.length} ${chain} events in ${Math.round((tStop - tStart) / 1000)} seconds`,
searchConfig,
});
return events;
};

const { number: toBlock, timestamp: currentTime } = await provider.getBlock("latest");
const fromBlock = Math.max(toBlock - (lookback ?? deploymentBlock), deploymentBlock);
assert(toBlock > fromBlock, `${toBlock} > ${fromBlock}`);
const searchConfig = { fromBlock, toBlock, maxBlockLookBack: maxBlockRange };

const filter = getEventFilter(spokePool, eventName, filterArgs[eventName]);
const events = await pollEvents(filter, searchConfig);
postEvents(toBlock, currentTime, events);
export async function scrapeEvents(spokePool: Contract, eventNames: string[], opts: ScraperOpts): Promise<void> {
const { number: toBlock, timestamp: currentTime } = await spokePool.provider.getBlock("latest");
const events = await Promise.all(
eventNames.map((eventName) => _scrapeEvents(spokePool, eventName, { ...opts, toBlock }, logger))
);

if (!stop) {
postEvents(toBlock, oldestTime, currentTime, events.flat());
}
}

/**
Expand Down Expand Up @@ -178,7 +82,9 @@ async function listen(

// Post an update to the parent. Do this irrespective of whether there were new events or not, since there's
// information in blockNumber and currentTime alone.
postEvents(blockNumber, currentTime, events);
if (!stop) {
postEvents(blockNumber, oldestTime, currentTime, events);
}
});

// Add a handler for each new instance of a subscribed event.
Expand All @@ -191,7 +97,9 @@ async function listen(
if (event.removed) {
eventMgr.remove(event, host);
// Notify the parent immediately in case the event was already submitted.
removeEvent(event);
if (!stop) {
removeEvent(event);
}
} else {
eventMgr.add(event, host);
}
Expand Down Expand Up @@ -277,10 +185,7 @@ async function run(argv: string[]): Promise<void> {
if (latestBlock.number > startBlock) {
const events = ["V3FundsDeposited", "FilledV3Relay", "RelayedRootBundle", "ExecutedRelayerRefundRoot"];
const _spokePool = spokePool.connect(quorumProvider);
await Promise.all([
resolveOldestTime(_spokePool, startBlock),
...events.map((event) => scrapeEvents(_spokePool, event, opts)),
]);
await Promise.all([resolveOldestTime(_spokePool, startBlock), scrapeEvents(_spokePool, events, opts)]);
}

// If no lookback was specified then default to the timestamp of the latest block.
Expand Down
9 changes: 9 additions & 0 deletions src/libexec/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export { Log } from "../interfaces";
export { SpokePoolClientMessage } from "../clients";

export type ScraperOpts = {
lookback?: number; // Event lookback (in seconds).
deploymentBlock: number; // SpokePool deployment block
maxBlockRange?: number; // Maximum block range for paginated getLogs queries.
filterArgs?: { [event: string]: string[] }; // Event-specific filter criteria to apply.
};
1 change: 1 addition & 0 deletions src/libexec/util/evm/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./util";
69 changes: 69 additions & 0 deletions src/libexec/util/evm/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import assert from "assert";
import { Contract, EventFilter } from "ethers";
import { getNetworkName, isDefined, paginatedEventQuery, winston } from "../../../utils";
import { Log, ScraperOpts } from "../../types";

/**
* Given an event name and contract, return the corresponding Ethers EventFilter object.
* @param contract Ethers Constract instance.
* @param eventName The name of the event to be filtered.
* @param filterArgs Optional filter arguments to be applied.
* @returns An Ethers EventFilter instance.
*/
export function getEventFilter(contract: Contract, eventName: string, filterArgs?: string[]): EventFilter {
const filter = contract.filters[eventName];
if (!isDefined(filter)) {
throw new Error(`Event ${eventName} not defined for contract`);
}

return isDefined(filterArgs) ? filter(...filterArgs) : filter();
}

/**
* Get a general event filter mapping to be used for filtering SpokePool contract events.
* This is currently only useful for filtering the relayer address on FilledV3Relay events.
* @param relayer Optional relayer address to filter on.
* @returns An argument array for input to an Ethers EventFilter.
*/
export function getEventFilterArgs(relayer?: string): { [event: string]: (null | string)[] } {
const FilledV3Relay = !isDefined(relayer)
? undefined
: [null, null, null, null, null, null, null, null, null, null, relayer];

return { FilledV3Relay };
}

/**
* Given a SpokePool contract instance and an event name, scrape all corresponding events and submit them to the
* parent process (if defined).
* @param spokePool Ethers Constract instance.
* @param eventName The name of the event to be filtered.
* @param opts Options to configure event scraping behaviour.
* @returns void
*/
export async function scrapeEvents(
spokePool: Contract,
eventName: string,
opts: ScraperOpts & { toBlock: number },
logger: winston.Logger
): Promise<Log[]> {
const { lookback, deploymentBlock, filterArgs, maxBlockRange, toBlock } = opts;
const { chainId } = await spokePool.provider.getNetwork();
const chain = getNetworkName(chainId);

const fromBlock = Math.max(toBlock - (lookback ?? deploymentBlock), deploymentBlock);
assert(toBlock > fromBlock, `${toBlock} > ${fromBlock}`);
const searchConfig = { fromBlock, toBlock, maxBlockLookBack: maxBlockRange };

const tStart = performance.now();
const filter = getEventFilter(spokePool, eventName, filterArgs[eventName]);
const events = await paginatedEventQuery(spokePool, filter, searchConfig);
const tStop = performance.now();
logger.debug({
at: "scrapeEvents",
message: `Scraped ${events.length} ${chain} ${eventName} events in ${Math.round((tStop - tStart) / 1000)} seconds`,
searchConfig,
});

return events;
}
43 changes: 43 additions & 0 deletions src/libexec/util/ipc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { utils as sdkUtils } from "@across-protocol/sdk";
import { isDefined, sortEventsAscending } from "../../utils";
import { Log, SpokePoolClientMessage } from "./../types";

/**
* Given the inputs for a SpokePoolClient update, consolidate the inputs into a message and submit it to the parent
* process (if defined).
* @param blockNumber Block number up to which the update applies.
* @param currentTime The SpokePool timestamp at blockNumber.
* @param events An array of Log objects to be submitted.
* @returns void
*/
export function postEvents(blockNumber: number, oldestTime: number, currentTime: number, events: Log[]): void {
if (!isDefined(process.send)) {
return;
}

events = sortEventsAscending(events);
const message: SpokePoolClientMessage = {
blockNumber,
currentTime,
oldestTime,
nEvents: events.length,
data: JSON.stringify(events, sdkUtils.jsonReplacerWithBigNumbers),
};
process.send(JSON.stringify(message));
}

/**
* Given an event removal notification, post the message to the parent process.
* @param event Log instance.
* @returns void
*/
export function removeEvent(event: Log): void {
if (!isDefined(process.send)) {
return;
}

const message: SpokePoolClientMessage = {
event: JSON.stringify(event, sdkUtils.jsonReplacerWithBigNumbers),
};
process.send(JSON.stringify(message));
}

0 comments on commit 4a39db4

Please sign in to comment.