From 10e0ab66e65dff2d682c4b045aebd9bb42713833 Mon Sep 17 00:00:00 2001 From: Paul <108695806+pxrl@users.noreply.github.com> Date: Fri, 2 Aug 2024 11:38:35 +0200 Subject: [PATCH] [WIP] improve(relayer): More robust fast relayer event ingestion (#1715) The fast relayer currently imposes a finality overlay onto each indexer process, such that each process will wait for blocks, and will then forward events that have met quorum, or discard them. The actual number of blocks that is used for is the lowest MDC tier. For most chains this is 1 or 2 blocks, and for Polygon this is ~32. This is fragile on chains with very short block times and results in some events being dropped due to delayed delivery of events by some RPC providers. Additionally, on chains with a high number of MDCs, it imposes an a long delay on relaying the events. This can be meaningful in the case of FilledV3Relay events, which are now used by the relayer to track its origin chain commitments. With this change, retain events for a longer period and forward them if and when they reach the minimum configured quorum. This is much more robust and should result in a greater number of events making it through to the relayer. I thought about an ejection policy for events that are retained for a long period of time but didn't settle on anything yet. Perhaps some simple number like 64 blocks would be fine for all chains. A specific consideration with this change was the potential for events to be ingested by the SpokePoolClient "out of order". The impact of this seems to be very low or non-existent, with the observation that the SpokePoolClient tends to store deposit and fill events in mappings and sorts them on demand. --- src/clients/SpokePoolClient.ts | 6 +-- src/libexec/RelayerSpokePoolIndexer.ts | 7 +-- src/relayer/RelayerClientHelper.ts | 2 - src/utils/EventUtils.ts | 51 ++++++++----------- test/EventManager.ts | 70 +++++++++----------------- 5 files changed, 50 insertions(+), 86 deletions(-) diff --git a/src/clients/SpokePoolClient.ts b/src/clients/SpokePoolClient.ts index 7edf9b3332..0b9b3c2a4e 100644 --- a/src/clients/SpokePoolClient.ts +++ b/src/clients/SpokePoolClient.ts @@ -9,7 +9,6 @@ import { EventsAddedMessage, EventRemovedMessage } from "../utils/SuperstructUti export type SpokePoolClient = clients.SpokePoolClient; export type IndexerOpts = { - finality: number; path?: string; }; @@ -37,7 +36,6 @@ export function isSpokePoolEventRemoved(message: unknown): message is SpokePoolE export class IndexedSpokePoolClient extends clients.SpokePoolClient { public readonly chain: string; - public readonly finality: number; public readonly indexerPath: string; private worker: ChildProcess; @@ -63,7 +61,6 @@ export class IndexedSpokePoolClient extends clients.SpokePoolClient { super(logger, spokePool, hubPoolClient, chainId, deploymentBlock, eventSearchConfig); this.chain = getNetworkName(chainId); - this.finality = opts.finality; this.indexerPath = opts.path ?? RELAYER_DEFAULT_SPOKEPOOL_INDEXER; this.pendingBlockNumber = deploymentBlock; @@ -80,10 +77,9 @@ export class IndexedSpokePoolClient extends clients.SpokePoolClient { */ protected startWorker(): void { const { - finality, eventSearchConfig: { fromBlock, maxBlockLookBack: blockRange }, } = this; - const opts = { finality, blockRange, lookback: `@${fromBlock}` }; + const opts = { blockRange, lookback: `@${fromBlock}` }; const args = Object.entries(opts) .map(([k, v]) => [`--${k}`, `${v}`]) diff --git a/src/libexec/RelayerSpokePoolIndexer.ts b/src/libexec/RelayerSpokePoolIndexer.ts index c71606ff45..cddde92adb 100644 --- a/src/libexec/RelayerSpokePoolIndexer.ts +++ b/src/libexec/RelayerSpokePoolIndexer.ts @@ -29,7 +29,6 @@ type WebSocketProvider = ethersProviders.WebSocketProvider; type EventSearchConfig = sdkUtils.EventSearchConfig; type ScraperOpts = { lookback?: number; // Event lookback (in seconds). - finality?: number; // Event finality (in blocks). deploymentBlock: number; // SpokePool deployment block maxBlockRange?: number; // Maximum block range for paginated getLogs queries. filterArgs?: { [event: string]: string[] }; // Event-specific filter criteria to apply. @@ -213,9 +212,8 @@ async function run(argv: string[]): Promise { }; const args = minimist(argv, minimistOpts); - const { chainId, finality = 32, lookback, relayer = null, maxBlockRange = 10_000 } = args; + const { chainId, lookback, relayer = null, maxBlockRange = 10_000 } = args; assert(Number.isInteger(chainId), "chainId must be numeric "); - assert(Number.isInteger(finality), "finality must be numeric "); assert(Number.isInteger(maxBlockRange), "maxBlockRange must be numeric"); assert(!isDefined(relayer) || ethersUtils.isAddress(relayer), `relayer address is invalid (${relayer})`); @@ -246,7 +244,6 @@ async function run(argv: string[]): Promise { } const opts = { - finality, quorum, deploymentBlock, lookback: latestBlock.number - startBlock, @@ -290,7 +287,7 @@ async function run(argv: string[]): Promise { // Events to listen for. const events = ["V3FundsDeposited", "RequestedSpeedUpV3Deposit", "FilledV3Relay"]; - const eventMgr = new EventManager(logger, chainId, finality, quorum); + const eventMgr = new EventManager(logger, chainId, quorum); do { let providers: WebSocketProvider[] = []; try { diff --git a/src/relayer/RelayerClientHelper.ts b/src/relayer/RelayerClientHelper.ts index b7810ed45d..871d25b387 100644 --- a/src/relayer/RelayerClientHelper.ts +++ b/src/relayer/RelayerClientHelper.ts @@ -91,9 +91,7 @@ export async function constructRelayerClients( if (config.externalIndexer) { spokePoolClients = Object.fromEntries( await sdkUtils.mapAsync(enabledChains ?? configStoreClient.getEnabledChains(), async (chainId) => { - const finality = config.minDepositConfirmations[chainId].at(0)?.minConfirmations ?? 1024; const opts = { - finality, lookback: config.maxRelayerLookBack, blockRange: config.maxBlockLookBack[chainId], }; diff --git a/src/utils/EventUtils.ts b/src/utils/EventUtils.ts index d4ebb98e1c..561b5add80 100644 --- a/src/utils/EventUtils.ts +++ b/src/utils/EventUtils.ts @@ -52,6 +52,8 @@ export function getUniqueLogIndex(events: { transactionHash: string }[]): number return logIndexesForMessage; } +type QuorumEvent = Event & { providers: string[] }; + /** * EventManager can be used to obtain basic quorum validation of events emitted by multiple providers. * This can be useful with WebSockets, where events are emitted asynchronously. @@ -59,20 +61,17 @@ export function getUniqueLogIndex(events: { transactionHash: string }[]): number */ export class EventManager { public readonly chain: string; - public readonly events: { [blockNumber: number]: (Event & { providers: string[] })[] } = {}; - public readonly finality: number; + public readonly events: { [blockNumber: number]: QuorumEvent[] } = {}; private blockNumber: number; constructor( private readonly logger: winston.Logger, public readonly chainId: number, - finality: number, public readonly quorum: number ) { this.chain = getNetworkName(chainId); this.blockNumber = 0; - this.finality = Math.max(finality, 1); } /** @@ -83,7 +82,7 @@ export class EventManager { * @param event Event to search for. * @returns The matching event, or undefined. */ - findEvent(event: Event): (Event & { providers: string[] }) | undefined { + findEvent(event: Event): QuorumEvent | undefined { return this.events[event.blockNumber]?.find( (storedEvent) => storedEvent.logIndex === event.logIndex && @@ -163,39 +162,33 @@ export class EventManager { } /** - * Record a new block. This function triggers the existing queue of pending events to be evaluated for basic finality. - * Events meeting finality criteria are submitted to the parent process (if defined). Events submitted are + * Record a new block. This function triggers the existing queue of pending events to be evaluated for quorum. + * Events meeting quorum criteria are submitted to the parent process (if defined). Events submitted are * subsequently flushed from this class. * @param blockNumber Number of the latest block. * @returns void */ tick(blockNumber: number): Event[] { this.blockNumber = blockNumber > this.blockNumber ? blockNumber : this.blockNumber; - - // After `finality` blocks behind head, events for a block are considered finalised. - // This is configurable and will almost always be less than chain finality guarantees. - const finalised = blockNumber - this.finality; - - // Collect the events that met quorum, stripping out the provider information; drop any that didn't. - // This can be brittle when finality is low (i.e. 1). @todo: Support querying back over multiple blocks - // to account for RPC notification delays. - const events = (this.events[finalised] ?? []) - .filter((event) => { - const eventQuorum = this.getEventQuorum(event); - if (this.quorum > eventQuorum) { - this.logger.debug({ - at: "EventManager::tick", - message: `Dropped ${this.chain} ${event.event} event due to insufficient quorum.`, - }); + const blockNumbers = Object.keys(this.events) + .map(Number) + .sort((x, y) => x - y); + const quorumEvents: QuorumEvent[] = []; + + blockNumbers.forEach((blockNumber) => { + // Filter out events that have reached quorum for propagation. + this.events[blockNumber] = this.events[blockNumber].filter((event) => { + if (this.quorum > this.getEventQuorum(event)) { + return true; // No quorum; retain for next time. } - return eventQuorum >= this.quorum; - }) - .map(({ providers, ...event }) => event); - // Flush the events that were just submitted. - delete this.events[finalised]; + quorumEvents.push(event); + return false; + }); + }); - return events; + // Strip out the quorum information before returning. + return quorumEvents.map(({ providers, ...event }) => event); } /** diff --git a/test/EventManager.ts b/test/EventManager.ts index fc8a3a6782..e1e988452f 100644 --- a/test/EventManager.ts +++ b/test/EventManager.ts @@ -49,13 +49,12 @@ describe("EventManager: Event Handling ", async function () { let logger: winston.Logger; let eventMgr: EventManager; - let finality: number, quorum: number; + let quorum: number; beforeEach(async function () { ({ spyLogger: logger } = createSpyLogger()); - quorum = 1; - finality = 5; - eventMgr = new EventManager(logger, chainId, finality, quorum); + quorum = 2; + eventMgr = new EventManager(logger, chainId, quorum); }); it("Correctly applies quorum on added events", async function () { @@ -76,74 +75,50 @@ describe("EventManager: Event Handling ", async function () { }); }); - it("Waits for finality before confirming events", async function () { - const [provider] = providers; - expect(quorum).to.equal(1); - - expect(finality).to.be.greaterThan(1); - const finalisedBlock = eventTemplate.blockNumber + finality; + it("Waits for quorum before relaying events", async function () { + const [provider1, provider2] = providers; + expect(quorum).to.equal(2); - eventMgr.add(eventTemplate, provider); + eventMgr.add(eventTemplate, provider1); - // The added event should not be returned when the blockNumber is less than `finalisedBlock`. - for (let blockNumber = 0; blockNumber < finalisedBlock; ++blockNumber) { + // The added event should not be returned despite the blockNumber increasing. + let blockNumber: number; + for (blockNumber = 0; blockNumber < 10; ++blockNumber) { const events = eventMgr.tick(blockNumber); expect(events.length).to.equal(0); } // At `finalisedBlock` the event should be returned. - let events = eventMgr.tick(finalisedBlock); + eventMgr.add(eventTemplate, provider2); + let events = eventMgr.tick(blockNumber); expect(events.length).to.equal(1); expect(events[0]).to.deep.equal(eventTemplate); - // After `finalisedBlock`, no further events are available. - events = eventMgr.tick(finalisedBlock + 1); + // No further events are available. + events = eventMgr.tick(++blockNumber); expect(events.length).to.equal(0); }); - it("Only emits finalised events that met quorum", async function () { - quorum = 2; - eventMgr = new EventManager(logger, chainId, finality, quorum); - - expect(finality).to.be.greaterThan(1); - const finalisedBlock = eventTemplate.blockNumber + finality; - - // Add an event from the first provider. - eventMgr.add(eventTemplate, providers[0]); - const eventQuorum = eventMgr.getEventQuorum(eventTemplate); - expect(eventQuorum).to.equal(1); - - // Simulate finality on the event with insufficient quorum. It should be suppressed. - let events = eventMgr.tick(finalisedBlock); - expect(events.length).to.equal(0); - - // Add the same event from the 2nd provider. It shouldn't ever be - // confirmed because the block height is now ahead of the event. - eventMgr.add(eventTemplate, providers[1]); - events = eventMgr.tick(finalisedBlock); - expect(events.length).to.equal(0); - }); - - it("Drops removed events before finality", async function () { + it("Drops removed events before quorum", async function () { const removed = true; - expect(quorum).to.equal(1); + expect(quorum).to.equal(2); - const [provider] = providers; + const [provider1, provider2] = providers; // Add the event once (not finalised). - eventMgr.add(eventTemplate, provider); + eventMgr.add(eventTemplate, provider1); let events = eventMgr.tick(eventTemplate.blockNumber + 1); expect(events.length).to.equal(0); let eventQuorum = eventMgr.getEventQuorum(eventTemplate); expect(eventQuorum).to.equal(1); // Remove the event after notification by the same provider. - eventMgr.remove({ ...eventTemplate, removed }, provider); + eventMgr.remove({ ...eventTemplate, removed }, provider1); eventQuorum = eventMgr.getEventQuorum(eventTemplate); expect(eventQuorum).to.equal(0); // Re-add the same event. - eventMgr.add(eventTemplate, provider); + eventMgr.add(eventTemplate, provider1); events = eventMgr.tick(eventTemplate.blockNumber + 1); expect(events.length).to.equal(0); eventQuorum = eventMgr.getEventQuorum(eventTemplate); @@ -153,5 +128,10 @@ describe("EventManager: Event Handling ", async function () { eventMgr.remove({ ...eventTemplate, removed }, "randomProvider"); eventQuorum = eventMgr.getEventQuorum(eventTemplate); expect(eventQuorum).to.equal(0); + + // Add the same event from provider2. There should be no quorum. + eventMgr.add(eventTemplate, provider2); + events = eventMgr.tick(eventTemplate.blockNumber + 1); + expect(events.length).to.equal(0); }); });