Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(relayer): Factor out common listener utils #1841

Merged
merged 10 commits into from
Sep 30, 2024
139 changes: 22 additions & 117 deletions src/libexec/RelayerSpokePoolIndexer.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import assert from "assert";
import minimist from "minimist";
import { Contract, EventFilter, providers as ethersProviders, utils as ethersUtils } from "ethers";
import { Contract, providers as ethersProviders, utils as ethersUtils } from "ethers";
import { utils as sdkUtils } from "@across-protocol/sdk";
import * as utils from "../../scripts/utils";
import { Log } from "../interfaces";
import { SpokePoolClientMessage } from "../clients";
import {
disconnectRedisClients,
EventManager,
Expand All @@ -19,20 +17,13 @@ import {
getRedisCache,
getWSProviders,
Logger,
paginatedEventQuery,
sortEventsAscending,
winston,
} from "../utils";
import { postEvents, removeEvent } from "./util/ipc";
import { ScraperOpts } from "./types";
import { getEventFilter, getEventFilterArgs, scrapeEvents as _scrapeEvents } from "./util/evm";

type WebSocketProvider = ethersProviders.WebSocketProvider;
type EventSearchConfig = sdkUtils.EventSearchConfig;
type ScraperOpts = {
lookback?: number; // Event lookback (in seconds).
deploymentBlock: number; // SpokePool deployment block
maxBlockRange?: number; // Maximum block range for paginated getLogs queries.
filterArgs?: { [event: string]: string[] }; // Event-specific filter criteria to apply.
};

const { NODE_SUCCESS, NODE_APP_ERR } = utils;

const INDEXER_POLLING_PERIOD = 2_000; // ms; time to sleep between checking for exit request via SIGHUP.
Expand All @@ -45,108 +36,21 @@ let stop = false;
let oldestTime = 0;

/**
* Given an event name and contract, return the corresponding Ethers EventFilter object.
* @param contract Ethers Constract instance.
* @param eventName The name of the event to be filtered.
* @param filterArgs Optional filter arguments to be applied.
* @returns An Ethers EventFilter instance.
*/
function getEventFilter(contract: Contract, eventName: string, filterArgs?: string[]): EventFilter {
const filter = contract.filters[eventName];
if (!isDefined(filter)) {
throw new Error(`Event ${eventName} not defined for contract`);
}

return isDefined(filterArgs) ? filter(...filterArgs) : filter();
}

function getEventFilterArgs(relayer?: string): { [event: string]: string[] } {
const FilledV3Relay = !isDefined(relayer)
? undefined
: [null, null, null, null, null, null, null, null, null, null, relayer];

return { FilledV3Relay };
}

/**
* Given the inputs for a SpokePoolClient update, consolidate the inputs into a message and submit it to the parent
* process (if defined).
* @param blockNumber Block number up to which the update applies.
* @param currentTime The SpokePool timestamp at blockNumber.
* @param events An array of Log objects to be submitted.
* @returns void
*/
function postEvents(blockNumber: number, currentTime: number, events: Log[]): void {
if (!isDefined(process.send) || stop) {
return;
}

// Drop the array component of event.args and retain the named k/v pairs,
// otherwise stringification tends to retain only the array.
events = sortEventsAscending(events);

const message: SpokePoolClientMessage = {
blockNumber,
currentTime,
oldestTime,
nEvents: events.length,
data: JSON.stringify(events, sdkUtils.jsonReplacerWithBigNumbers),
};
process.send(JSON.stringify(message));
}

/**
* Given an event removal notification, post the message to the parent process.
* @param event Log instance.
* @returns void
*/
function removeEvent(event: Log): void {
if (!isDefined(process.send) || stop) {
return;
}

const message: SpokePoolClientMessage = {
event: JSON.stringify(event, sdkUtils.jsonReplacerWithBigNumbers),
};
process.send(JSON.stringify(message));
}

/**
* Given a SpokePool contract instance and an event name, scrape all corresponding events and submit them to the
* parent process (if defined).
* Aggregate utils/scrapeEvents for a series of event names.
* @param spokePool Ethers Constract instance.
* @param eventName The name of the event to be filtered.
* @param eventNames The array of events to be queried.
* @param opts Options to configure event scraping behaviour.
* @returns void
*/
async function scrapeEvents(spokePool: Contract, eventName: string, opts: ScraperOpts): Promise<void> {
const { lookback, deploymentBlock, filterArgs, maxBlockRange } = opts;
const { provider } = spokePool;
const { chainId } = await provider.getNetwork();
const chain = getNetworkName(chainId);

let tStart: number, tStop: number;

const pollEvents = async (filter: EventFilter, searchConfig: EventSearchConfig): Promise<Log[]> => {
tStart = performance.now();
const events = await paginatedEventQuery(spokePool, filter, searchConfig);
tStop = performance.now();
logger.debug({
at: "SpokePoolIndexer::listen",
message: `Indexed ${events.length} ${chain} events in ${Math.round((tStop - tStart) / 1000)} seconds`,
searchConfig,
});
return events;
};

const { number: toBlock, timestamp: currentTime } = await provider.getBlock("latest");
const fromBlock = Math.max(toBlock - (lookback ?? deploymentBlock), deploymentBlock);
assert(toBlock > fromBlock, `${toBlock} > ${fromBlock}`);
const searchConfig = { fromBlock, toBlock, maxBlockLookBack: maxBlockRange };

const filter = getEventFilter(spokePool, eventName, filterArgs[eventName]);
const events = await pollEvents(filter, searchConfig);
postEvents(toBlock, currentTime, events);
export async function scrapeEvents(spokePool: Contract, eventNames: string[], opts: ScraperOpts): Promise<void> {
const { number: toBlock, timestamp: currentTime } = await spokePool.provider.getBlock("latest");
const events = await Promise.all(
eventNames.map((eventName) => _scrapeEvents(spokePool, eventName, { ...opts, toBlock }, logger))
);

if (!stop) {
postEvents(toBlock, oldestTime, currentTime, events.flat());
}
}

/**
Expand Down Expand Up @@ -178,7 +82,9 @@ async function listen(

// Post an update to the parent. Do this irrespective of whether there were new events or not, since there's
// information in blockNumber and currentTime alone.
postEvents(blockNumber, currentTime, events);
if (!stop) {
postEvents(blockNumber, oldestTime, currentTime, events);
}
});

// Add a handler for each new instance of a subscribed event.
Expand All @@ -191,7 +97,9 @@ async function listen(
if (event.removed) {
eventMgr.remove(event, host);
// Notify the parent immediately in case the event was already submitted.
removeEvent(event);
if (!stop) {
removeEvent(event);
}
} else {
eventMgr.add(event, host);
}
Expand Down Expand Up @@ -277,10 +185,7 @@ async function run(argv: string[]): Promise<void> {
if (latestBlock.number > startBlock) {
const events = ["V3FundsDeposited", "FilledV3Relay", "RelayedRootBundle", "ExecutedRelayerRefundRoot"];
const _spokePool = spokePool.connect(quorumProvider);
await Promise.all([
resolveOldestTime(_spokePool, startBlock),
...events.map((event) => scrapeEvents(_spokePool, event, opts)),
]);
await Promise.all([resolveOldestTime(_spokePool, startBlock), scrapeEvents(_spokePool, events, opts)]);
}

// If no lookback was specified then default to the timestamp of the latest block.
Expand Down
9 changes: 9 additions & 0 deletions src/libexec/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export { Log } from "../interfaces";
export { SpokePoolClientMessage } from "../clients";

export type ScraperOpts = {
lookback?: number; // Event lookback (in seconds).
deploymentBlock: number; // SpokePool deployment block
pxrl marked this conversation as resolved.
Show resolved Hide resolved
maxBlockRange?: number; // Maximum block range for paginated getLogs queries.
filterArgs?: { [event: string]: string[] }; // Event-specific filter criteria to apply.
};
1 change: 1 addition & 0 deletions src/libexec/util/evm/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./util";
69 changes: 69 additions & 0 deletions src/libexec/util/evm/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import assert from "assert";
import { Contract, EventFilter } from "ethers";
import { getNetworkName, isDefined, paginatedEventQuery, winston } from "../../../utils";
import { Log, ScraperOpts } from "../../types";

/**
* Given an event name and contract, return the corresponding Ethers EventFilter object.
* @param contract Ethers Constract instance.
* @param eventName The name of the event to be filtered.
* @param filterArgs Optional filter arguments to be applied.
* @returns An Ethers EventFilter instance.
*/
export function getEventFilter(contract: Contract, eventName: string, filterArgs?: string[]): EventFilter {
const filter = contract.filters[eventName];
if (!isDefined(filter)) {
throw new Error(`Event ${eventName} not defined for contract`);
}

return isDefined(filterArgs) ? filter(...filterArgs) : filter();
}

/**
* Get a general event filter mapping to be used for filtering SpokePool contract events.
* This is currently only useful for filtering the relayer address on FilledV3Relay events.
* @param relayer Optional relayer address to filter on.
* @returns An argument array for input to an Ethers EventFilter.
*/
export function getEventFilterArgs(relayer?: string): { [event: string]: (null | string)[] } {
const FilledV3Relay = !isDefined(relayer)
? undefined
: [null, null, null, null, null, null, null, null, null, null, relayer];

return { FilledV3Relay };
}

/**
* Given a SpokePool contract instance and an event name, scrape all corresponding events and submit them to the
* parent process (if defined).
* @param spokePool Ethers Constract instance.
* @param eventName The name of the event to be filtered.
* @param opts Options to configure event scraping behaviour.
* @returns void
*/
export async function scrapeEvents(
spokePool: Contract,
eventName: string,
opts: ScraperOpts & { toBlock: number },
logger: winston.Logger
): Promise<Log[]> {
const { lookback, deploymentBlock, filterArgs, maxBlockRange, toBlock } = opts;
const { chainId } = await spokePool.provider.getNetwork();
const chain = getNetworkName(chainId);

const fromBlock = Math.max(toBlock - (lookback ?? deploymentBlock), deploymentBlock);
assert(toBlock > fromBlock, `${toBlock} > ${fromBlock}`);
const searchConfig = { fromBlock, toBlock, maxBlockLookBack: maxBlockRange };

const tStart = performance.now();
const filter = getEventFilter(spokePool, eventName, filterArgs[eventName]);
const events = await paginatedEventQuery(spokePool, filter, searchConfig);
const tStop = performance.now();
logger.debug({
at: "scrapeEvents",
message: `Scraped ${events.length} ${chain} ${eventName} events in ${Math.round((tStop - tStart) / 1000)} seconds`,
searchConfig,
});

return events;
}
43 changes: 43 additions & 0 deletions src/libexec/util/ipc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { utils as sdkUtils } from "@across-protocol/sdk";
import { isDefined, sortEventsAscending } from "../../utils";
import { Log, SpokePoolClientMessage } from "./../types";

/**
* Given the inputs for a SpokePoolClient update, consolidate the inputs into a message and submit it to the parent
* process (if defined).
* @param blockNumber Block number up to which the update applies.
* @param currentTime The SpokePool timestamp at blockNumber.
* @param events An array of Log objects to be submitted.
* @returns void
*/
export function postEvents(blockNumber: number, oldestTime: number, currentTime: number, events: Log[]): void {
if (!isDefined(process.send)) {
return;
}

events = sortEventsAscending(events);
const message: SpokePoolClientMessage = {
blockNumber,
currentTime,
oldestTime,
nEvents: events.length,
data: JSON.stringify(events, sdkUtils.jsonReplacerWithBigNumbers),
};
process.send(JSON.stringify(message));
}

/**
* Given an event removal notification, post the message to the parent process.
* @param event Log instance.
* @returns void
*/
export function removeEvent(event: Log): void {
if (!isDefined(process.send)) {
return;
}

const message: SpokePoolClientMessage = {
event: JSON.stringify(event, sdkUtils.jsonReplacerWithBigNumbers),
};
process.send(JSON.stringify(message));
}