Skip to content

Commit

Permalink
[WIP] improve(relayer): More robust fast relayer event ingestion (#1715)
Browse files Browse the repository at this point in the history
The fast relayer currently imposes a finality overlay onto each indexer
process, such that each process will wait for blocks, and will then
forward events that have met quorum, or discard them. The actual number
of blocks that is used for is the lowest MDC tier. For most chains this
is 1 or 2 blocks, and for Polygon this is ~32.

This is fragile on chains with very short block times and results in
some events being dropped due to delayed delivery of events by some RPC
providers. Additionally, on chains with a high number of MDCs, it
imposes an a long delay on relaying the events. This can be meaningful
in the case of FilledV3Relay events, which are now used by the relayer
to track its origin chain commitments.

With this change, retain events for a longer period and forward them if
and when they reach the minimum configured quorum. This is much more
robust and should result in a greater number of events making it through
to the relayer. I thought about an ejection policy for events that are
retained for a long period of time but didn't settle on anything yet.
Perhaps some simple number like 64 blocks would be fine for all chains.

A specific consideration with this change was the potential for events
to be ingested by the SpokePoolClient "out of order". The impact of this
seems to be very low or non-existent, with the observation that the
SpokePoolClient tends to store deposit and fill events in mappings and
sorts them on demand.
  • Loading branch information
pxrl authored Aug 2, 2024
1 parent 499c760 commit 10e0ab6
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 86 deletions.
6 changes: 1 addition & 5 deletions src/clients/SpokePoolClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { EventsAddedMessage, EventRemovedMessage } from "../utils/SuperstructUti
export type SpokePoolClient = clients.SpokePoolClient;

export type IndexerOpts = {
finality: number;
path?: string;
};

Expand Down Expand Up @@ -37,7 +36,6 @@ export function isSpokePoolEventRemoved(message: unknown): message is SpokePoolE

export class IndexedSpokePoolClient extends clients.SpokePoolClient {
public readonly chain: string;
public readonly finality: number;
public readonly indexerPath: string;

private worker: ChildProcess;
Expand All @@ -63,7 +61,6 @@ export class IndexedSpokePoolClient extends clients.SpokePoolClient {
super(logger, spokePool, hubPoolClient, chainId, deploymentBlock, eventSearchConfig);

this.chain = getNetworkName(chainId);
this.finality = opts.finality;
this.indexerPath = opts.path ?? RELAYER_DEFAULT_SPOKEPOOL_INDEXER;

this.pendingBlockNumber = deploymentBlock;
Expand All @@ -80,10 +77,9 @@ export class IndexedSpokePoolClient extends clients.SpokePoolClient {
*/
protected startWorker(): void {
const {
finality,
eventSearchConfig: { fromBlock, maxBlockLookBack: blockRange },
} = this;
const opts = { finality, blockRange, lookback: `@${fromBlock}` };
const opts = { blockRange, lookback: `@${fromBlock}` };

const args = Object.entries(opts)
.map(([k, v]) => [`--${k}`, `${v}`])
Expand Down
7 changes: 2 additions & 5 deletions src/libexec/RelayerSpokePoolIndexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type WebSocketProvider = ethersProviders.WebSocketProvider;
type EventSearchConfig = sdkUtils.EventSearchConfig;
type ScraperOpts = {
lookback?: number; // Event lookback (in seconds).
finality?: number; // Event finality (in blocks).
deploymentBlock: number; // SpokePool deployment block
maxBlockRange?: number; // Maximum block range for paginated getLogs queries.
filterArgs?: { [event: string]: string[] }; // Event-specific filter criteria to apply.
Expand Down Expand Up @@ -213,9 +212,8 @@ async function run(argv: string[]): Promise<void> {
};
const args = minimist(argv, minimistOpts);

const { chainId, finality = 32, lookback, relayer = null, maxBlockRange = 10_000 } = args;
const { chainId, lookback, relayer = null, maxBlockRange = 10_000 } = args;
assert(Number.isInteger(chainId), "chainId must be numeric ");
assert(Number.isInteger(finality), "finality must be numeric ");
assert(Number.isInteger(maxBlockRange), "maxBlockRange must be numeric");
assert(!isDefined(relayer) || ethersUtils.isAddress(relayer), `relayer address is invalid (${relayer})`);

Expand Down Expand Up @@ -246,7 +244,6 @@ async function run(argv: string[]): Promise<void> {
}

const opts = {
finality,
quorum,
deploymentBlock,
lookback: latestBlock.number - startBlock,
Expand Down Expand Up @@ -290,7 +287,7 @@ async function run(argv: string[]): Promise<void> {

// Events to listen for.
const events = ["V3FundsDeposited", "RequestedSpeedUpV3Deposit", "FilledV3Relay"];
const eventMgr = new EventManager(logger, chainId, finality, quorum);
const eventMgr = new EventManager(logger, chainId, quorum);
do {
let providers: WebSocketProvider[] = [];
try {
Expand Down
2 changes: 0 additions & 2 deletions src/relayer/RelayerClientHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ export async function constructRelayerClients(
if (config.externalIndexer) {
spokePoolClients = Object.fromEntries(
await sdkUtils.mapAsync(enabledChains ?? configStoreClient.getEnabledChains(), async (chainId) => {
const finality = config.minDepositConfirmations[chainId].at(0)?.minConfirmations ?? 1024;
const opts = {
finality,
lookback: config.maxRelayerLookBack,
blockRange: config.maxBlockLookBack[chainId],
};
Expand Down
51 changes: 22 additions & 29 deletions src/utils/EventUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,26 @@ export function getUniqueLogIndex(events: { transactionHash: string }[]): number
return logIndexesForMessage;
}

type QuorumEvent = Event & { providers: string[] };

/**
* EventManager can be used to obtain basic quorum validation of events emitted by multiple providers.
* This can be useful with WebSockets, where events are emitted asynchronously.
* This feature should eventually evolve into a wrapper for the Ethers WebSocketProvider type.
*/
export class EventManager {
public readonly chain: string;
public readonly events: { [blockNumber: number]: (Event & { providers: string[] })[] } = {};
public readonly finality: number;
public readonly events: { [blockNumber: number]: QuorumEvent[] } = {};

private blockNumber: number;

constructor(
private readonly logger: winston.Logger,
public readonly chainId: number,
finality: number,
public readonly quorum: number
) {
this.chain = getNetworkName(chainId);
this.blockNumber = 0;
this.finality = Math.max(finality, 1);
}

/**
Expand All @@ -83,7 +82,7 @@ export class EventManager {
* @param event Event to search for.
* @returns The matching event, or undefined.
*/
findEvent(event: Event): (Event & { providers: string[] }) | undefined {
findEvent(event: Event): QuorumEvent | undefined {
return this.events[event.blockNumber]?.find(
(storedEvent) =>
storedEvent.logIndex === event.logIndex &&
Expand Down Expand Up @@ -163,39 +162,33 @@ export class EventManager {
}

/**
* Record a new block. This function triggers the existing queue of pending events to be evaluated for basic finality.
* Events meeting finality criteria are submitted to the parent process (if defined). Events submitted are
* Record a new block. This function triggers the existing queue of pending events to be evaluated for quorum.
* Events meeting quorum criteria are submitted to the parent process (if defined). Events submitted are
* subsequently flushed from this class.
* @param blockNumber Number of the latest block.
* @returns void
*/
tick(blockNumber: number): Event[] {
this.blockNumber = blockNumber > this.blockNumber ? blockNumber : this.blockNumber;

// After `finality` blocks behind head, events for a block are considered finalised.
// This is configurable and will almost always be less than chain finality guarantees.
const finalised = blockNumber - this.finality;

// Collect the events that met quorum, stripping out the provider information; drop any that didn't.
// This can be brittle when finality is low (i.e. 1). @todo: Support querying back over multiple blocks
// to account for RPC notification delays.
const events = (this.events[finalised] ?? [])
.filter((event) => {
const eventQuorum = this.getEventQuorum(event);
if (this.quorum > eventQuorum) {
this.logger.debug({
at: "EventManager::tick",
message: `Dropped ${this.chain} ${event.event} event due to insufficient quorum.`,
});
const blockNumbers = Object.keys(this.events)
.map(Number)
.sort((x, y) => x - y);
const quorumEvents: QuorumEvent[] = [];

blockNumbers.forEach((blockNumber) => {
// Filter out events that have reached quorum for propagation.
this.events[blockNumber] = this.events[blockNumber].filter((event) => {
if (this.quorum > this.getEventQuorum(event)) {
return true; // No quorum; retain for next time.
}
return eventQuorum >= this.quorum;
})
.map(({ providers, ...event }) => event);

// Flush the events that were just submitted.
delete this.events[finalised];
quorumEvents.push(event);
return false;
});
});

return events;
// Strip out the quorum information before returning.
return quorumEvents.map(({ providers, ...event }) => event);
}

/**
Expand Down
70 changes: 25 additions & 45 deletions test/EventManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,12 @@ describe("EventManager: Event Handling ", async function () {

let logger: winston.Logger;
let eventMgr: EventManager;
let finality: number, quorum: number;
let quorum: number;

beforeEach(async function () {
({ spyLogger: logger } = createSpyLogger());
quorum = 1;
finality = 5;
eventMgr = new EventManager(logger, chainId, finality, quorum);
quorum = 2;
eventMgr = new EventManager(logger, chainId, quorum);
});

it("Correctly applies quorum on added events", async function () {
Expand All @@ -76,74 +75,50 @@ describe("EventManager: Event Handling ", async function () {
});
});

it("Waits for finality before confirming events", async function () {
const [provider] = providers;
expect(quorum).to.equal(1);

expect(finality).to.be.greaterThan(1);
const finalisedBlock = eventTemplate.blockNumber + finality;
it("Waits for quorum before relaying events", async function () {
const [provider1, provider2] = providers;
expect(quorum).to.equal(2);

eventMgr.add(eventTemplate, provider);
eventMgr.add(eventTemplate, provider1);

// The added event should not be returned when the blockNumber is less than `finalisedBlock`.
for (let blockNumber = 0; blockNumber < finalisedBlock; ++blockNumber) {
// The added event should not be returned despite the blockNumber increasing.
let blockNumber: number;
for (blockNumber = 0; blockNumber < 10; ++blockNumber) {
const events = eventMgr.tick(blockNumber);
expect(events.length).to.equal(0);
}

// At `finalisedBlock` the event should be returned.
let events = eventMgr.tick(finalisedBlock);
eventMgr.add(eventTemplate, provider2);
let events = eventMgr.tick(blockNumber);
expect(events.length).to.equal(1);
expect(events[0]).to.deep.equal(eventTemplate);

// After `finalisedBlock`, no further events are available.
events = eventMgr.tick(finalisedBlock + 1);
// No further events are available.
events = eventMgr.tick(++blockNumber);
expect(events.length).to.equal(0);
});

it("Only emits finalised events that met quorum", async function () {
quorum = 2;
eventMgr = new EventManager(logger, chainId, finality, quorum);

expect(finality).to.be.greaterThan(1);
const finalisedBlock = eventTemplate.blockNumber + finality;

// Add an event from the first provider.
eventMgr.add(eventTemplate, providers[0]);
const eventQuorum = eventMgr.getEventQuorum(eventTemplate);
expect(eventQuorum).to.equal(1);

// Simulate finality on the event with insufficient quorum. It should be suppressed.
let events = eventMgr.tick(finalisedBlock);
expect(events.length).to.equal(0);

// Add the same event from the 2nd provider. It shouldn't ever be
// confirmed because the block height is now ahead of the event.
eventMgr.add(eventTemplate, providers[1]);
events = eventMgr.tick(finalisedBlock);
expect(events.length).to.equal(0);
});

it("Drops removed events before finality", async function () {
it("Drops removed events before quorum", async function () {
const removed = true;
expect(quorum).to.equal(1);
expect(quorum).to.equal(2);

const [provider] = providers;
const [provider1, provider2] = providers;

// Add the event once (not finalised).
eventMgr.add(eventTemplate, provider);
eventMgr.add(eventTemplate, provider1);
let events = eventMgr.tick(eventTemplate.blockNumber + 1);
expect(events.length).to.equal(0);
let eventQuorum = eventMgr.getEventQuorum(eventTemplate);
expect(eventQuorum).to.equal(1);

// Remove the event after notification by the same provider.
eventMgr.remove({ ...eventTemplate, removed }, provider);
eventMgr.remove({ ...eventTemplate, removed }, provider1);
eventQuorum = eventMgr.getEventQuorum(eventTemplate);
expect(eventQuorum).to.equal(0);

// Re-add the same event.
eventMgr.add(eventTemplate, provider);
eventMgr.add(eventTemplate, provider1);
events = eventMgr.tick(eventTemplate.blockNumber + 1);
expect(events.length).to.equal(0);
eventQuorum = eventMgr.getEventQuorum(eventTemplate);
Expand All @@ -153,5 +128,10 @@ describe("EventManager: Event Handling ", async function () {
eventMgr.remove({ ...eventTemplate, removed }, "randomProvider");
eventQuorum = eventMgr.getEventQuorum(eventTemplate);
expect(eventQuorum).to.equal(0);

// Add the same event from provider2. There should be no quorum.
eventMgr.add(eventTemplate, provider2);
events = eventMgr.tick(eventTemplate.blockNumber + 1);
expect(events.length).to.equal(0);
});
});

0 comments on commit 10e0ab6

Please sign in to comment.