Skip to content

Commit

Permalink
refactor(relayer): Rearrange external listener loop (#1766)
Browse files Browse the repository at this point in the history
Rather than trying to recover a websocket error, just log it and try to
keep going. Eventually it will bail; the parent can then be responsible
for restarting it if necessary.
  • Loading branch information
pxrl authored Aug 20, 2024
1 parent ff6939c commit 74d8359
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 37 deletions.
21 changes: 19 additions & 2 deletions src/clients/SpokePoolClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,25 @@ export class IndexedSpokePoolClient extends clients.SpokePoolClient {
}

stopWorker(): void {
this.worker.disconnect();
this.worker.kill("SIGKILL");
if (this.worker.connected) {
this.worker.disconnect();
} else {
this.logger.warn({
at: "SpokePoolClient#stopWorker",
message: `Skipped disconnecting on ${this.chain} SpokePool listener (already disconnected).`,
});
}

const { exitCode } = this.worker;
if (exitCode === null) {
this.worker.kill("SIGKILL");
} else {
this.logger.warn({
at: "SpokePoolClient#stopWorker",
message: `Skipped SIGKILL on ${this.chain} SpokePool listener (already exited).`,
exitCode,
});
}
}

/**
Expand Down
60 changes: 25 additions & 35 deletions src/libexec/RelayerSpokePoolIndexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,44 +288,34 @@ async function run(argv: string[]): Promise<void> {
// Events to listen for.
const events = ["V3FundsDeposited", "RequestedSpeedUpV3Deposit", "FilledV3Relay"];
const eventMgr = new EventManager(logger, chainId, quorum);
do {
let providers: WebSocketProvider[] = [];
try {
providers = getWSProviders(chainId, quorum);
assert(providers.length > 0, `Insufficient providers for ${chain} (required ${quorum} by quorum)`);
providers.forEach((provider) => {
provider._websocket.on("error", (err) => {
logger.debug({
at: "RelayerSpokePoolIndexer::run",
message: `Caught ${chain} provider error.`,
provider: getOriginFromURL(provider.connection.url),
err,
});
});

provider._websocket.on("close", () => {
logger.debug({
at: "RelayerSpokePoolIndexer::run",
message: `${chain} provider connection closed.`,
provider: getOriginFromURL(provider.connection.url),
});
});
});

logger.debug({ at: "RelayerSpokePoolIndexer::run", message: `Starting ${chain} listener.`, events, opts });
await listen(eventMgr, spokePool, events, providers, opts);
} catch (err) {
providers.forEach((provider) => provider.removeAllListeners());
const providers = getWSProviders(chainId, quorum);
let nProviders = providers.length;
assert(providers.length > 0, `Insufficient providers for ${chain} (required ${quorum} by quorum)`);
providers.forEach((provider) => {
provider._websocket.on("error", (err) => {
const _provider = getOriginFromURL(provider.connection.url);
const at = "RelayerSpokePoolIndexer::run";
let message = `Caught ${chain} provider error.`;
let log = logger.debug;
if (--nProviders < quorum) {
stop = true;
log = logger.warn;
message += " Insufficient providers to continue.";
}
log({ at, message, provider: _provider, quorum, nProviders, err });
});

// @todo: What to do if the websocket drops? Should be able to reconnect?
logger.warn({
provider._websocket.on("close", () => {
logger.debug({
at: "RelayerSpokePoolIndexer::run",
message: "Caught runtime error.",
err,
message: `${chain} provider connection closed.`,
provider: getOriginFromURL(provider.connection.url),
});
throw err;
}
} while (!stop);
});
});

logger.debug({ at: "RelayerSpokePoolIndexer::run", message: `Starting ${chain} listener.`, events, opts });
await listen(eventMgr, spokePool, events, providers, opts);
}

if (require.main === module) {
Expand Down

0 comments on commit 74d8359

Please sign in to comment.