From f818eb79d122db945b132a8812c7ed96bb33587b Mon Sep 17 00:00:00 2001 From: Paul <108695806+pxrl@users.noreply.github.com> Date: Tue, 20 Aug 2024 13:05:56 +0200 Subject: [PATCH] refactor(relayer): Rearrange external listener loop (#1766) Rather than trying to recover a websocket error, just log it and try to keep going. Eventually it will bail; the parent can then be responsible for restarting it if necessary. --- src/clients/SpokePoolClient.ts | 21 ++++++++- src/libexec/RelayerSpokePoolIndexer.ts | 60 +++++++++++--------------- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/src/clients/SpokePoolClient.ts b/src/clients/SpokePoolClient.ts index 04048de2d7..f99981b82e 100644 --- a/src/clients/SpokePoolClient.ts +++ b/src/clients/SpokePoolClient.ts @@ -97,8 +97,25 @@ export class IndexedSpokePoolClient extends clients.SpokePoolClient { } stopWorker(): void { - this.worker.disconnect(); - this.worker.kill("SIGKILL"); + if (this.worker.connected) { + this.worker.disconnect(); + } else { + this.logger.warn({ + at: "SpokePoolClient#stopWorker", + message: `Skipped disconnecting on ${this.chain} SpokePool listener (already disconnected).`, + }); + } + + const { exitCode } = this.worker; + if (exitCode === null) { + this.worker.kill("SIGKILL"); + } else { + this.logger.warn({ + at: "SpokePoolClient#stopWorker", + message: `Skipped SIGKILL on ${this.chain} SpokePool listener (already exited).`, + exitCode, + }); + } } /** diff --git a/src/libexec/RelayerSpokePoolIndexer.ts b/src/libexec/RelayerSpokePoolIndexer.ts index 54d56eddad..70f03bf35b 100644 --- a/src/libexec/RelayerSpokePoolIndexer.ts +++ b/src/libexec/RelayerSpokePoolIndexer.ts @@ -288,44 +288,34 @@ async function run(argv: string[]): Promise { // Events to listen for. const events = ["V3FundsDeposited", "RequestedSpeedUpV3Deposit", "FilledV3Relay"]; const eventMgr = new EventManager(logger, chainId, quorum); - do { - let providers: WebSocketProvider[] = []; - try { - providers = getWSProviders(chainId, quorum); - assert(providers.length > 0, `Insufficient providers for ${chain} (required ${quorum} by quorum)`); - providers.forEach((provider) => { - provider._websocket.on("error", (err) => { - logger.debug({ - at: "RelayerSpokePoolIndexer::run", - message: `Caught ${chain} provider error.`, - provider: getOriginFromURL(provider.connection.url), - err, - }); - }); - - provider._websocket.on("close", () => { - logger.debug({ - at: "RelayerSpokePoolIndexer::run", - message: `${chain} provider connection closed.`, - provider: getOriginFromURL(provider.connection.url), - }); - }); - }); - - logger.debug({ at: "RelayerSpokePoolIndexer::run", message: `Starting ${chain} listener.`, events, opts }); - await listen(eventMgr, spokePool, events, providers, opts); - } catch (err) { - providers.forEach((provider) => provider.removeAllListeners()); + const providers = getWSProviders(chainId, quorum); + let nProviders = providers.length; + assert(providers.length > 0, `Insufficient providers for ${chain} (required ${quorum} by quorum)`); + providers.forEach((provider) => { + provider._websocket.on("error", (err) => { + const _provider = getOriginFromURL(provider.connection.url); + const at = "RelayerSpokePoolIndexer::run"; + let message = `Caught ${chain} provider error.`; + let log = logger.debug; + if (--nProviders < quorum) { + stop = true; + log = logger.warn; + message += " Insufficient providers to continue."; + } + log({ at, message, provider: _provider, quorum, nProviders, err }); + }); - // @todo: What to do if the websocket drops? Should be able to reconnect? - logger.warn({ + provider._websocket.on("close", () => { + logger.debug({ at: "RelayerSpokePoolIndexer::run", - message: "Caught runtime error.", - err, + message: `${chain} provider connection closed.`, + provider: getOriginFromURL(provider.connection.url), }); - throw err; - } - } while (!stop); + }); + }); + + logger.debug({ at: "RelayerSpokePoolIndexer::run", message: `Starting ${chain} listener.`, events, opts }); + await listen(eventMgr, spokePool, events, providers, opts); } if (require.main === module) {