diff --git a/package.json b/package.json index ab5248503..44d6eeda5 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,7 @@ "dependencies": { "@across-protocol/constants": "^3.1.14", "@across-protocol/contracts": "^3.0.10", - "@across-protocol/sdk": "^3.1.33", + "@across-protocol/sdk": "^3.1.34", "@arbitrum/sdk": "^3.1.3", "@consensys/linea-sdk": "^0.2.1", "@defi-wonderland/smock": "^2.3.5", diff --git a/src/clients/SpokePoolClient.ts b/src/clients/SpokePoolClient.ts index da7f0bdb9..e34ae24a0 100644 --- a/src/clients/SpokePoolClient.ts +++ b/src/clients/SpokePoolClient.ts @@ -260,6 +260,10 @@ export class IndexedSpokePoolClient extends clients.SpokePoolClient { } protected async _update(eventsToQuery: string[]): Promise { + if (this.pendingBlockNumber === this.deploymentBlock) { + return { success: false, reason: clients.UpdateFailureReason.NotReady }; + } + // If any events have been removed upstream, remove them first. this.pendingEventsRemoved = this.pendingEventsRemoved.filter((event) => !this.removeEvent(event)); diff --git a/src/relayer/Relayer.ts b/src/relayer/Relayer.ts index f2cff8077..ad02f045a 100644 --- a/src/relayer/Relayer.ts +++ b/src/relayer/Relayer.ts @@ -97,8 +97,9 @@ export class Relayer { /** * @description Perform per-loop updates. + * @return True if all SpokePoolClients updated successfully, otherwise false. */ - async update(): Promise { + async update(): Promise { const { acrossApiClient, configStoreClient, @@ -133,6 +134,8 @@ export class Relayer { inventoryClient.update(this.inventoryChainIds), tokenClient.update(), ]); + + return Object.values(spokePoolClients).every((spokePoolClient) => spokePoolClient.isUpdated); } /** diff --git a/src/relayer/index.ts b/src/relayer/index.ts index 014adc331..ab1dc1b10 100644 --- a/src/relayer/index.ts +++ b/src/relayer/index.ts @@ -1,11 +1,22 @@ import { utils as sdkUtils } from "@across-protocol/sdk"; -import { config, delay, disconnectRedisClients, getCurrentTime, getNetworkName, Signer, winston } from "../utils"; +import { + config, + delay, + disconnectRedisClients, + getCurrentTime, + getNetworkName, + getRedisCache, + Signer, + winston, +} from "../utils"; import { Relayer } from "./Relayer"; import { RelayerConfig } from "./RelayerConfig"; import { constructRelayerClients } from "./RelayerClientHelper"; config(); let logger: winston.Logger; +const ACTIVE_RELAYER_EXPIRY = 600; // 10 minutes. +const { RUN_IDENTIFIER: runIdentifier, BOT_IDENTIFIER: botIdentifier } = process.env; const randomNumber = () => Math.floor(Math.random() * 1_000_000); export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): Promise { @@ -14,8 +25,9 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P logger = _logger; const config = new RelayerConfig(process.env); + const { externalIndexer, pollingDelay, sendingRelaysEnabled, sendingSlowRelaysEnabled } = config; - const loop = config.pollingDelay > 0; + const loop = pollingDelay > 0; let stop = !loop; process.on("SIGHUP", () => { logger.debug({ @@ -25,38 +37,68 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P stop = true; }); + const redis = await getRedisCache(logger); + let activeRelayerUpdated = false; + // Explicitly don't log ignoredAddresses because it can be huge and can overwhelm log transports. const { ignoredAddresses: _ignoredConfig, ...loggedConfig } = config; logger.debug({ at: "Relayer#run", message: "Relayer started 🏃‍♂️", loggedConfig, relayerRun }); const relayerClients = await constructRelayerClients(logger, config, baseSigner); const relayer = new Relayer(await baseSigner.getAddress(), logger, relayerClients, config); - const simulate = !config.sendingRelaysEnabled; - const enableSlowFills = config.sendingSlowRelaysEnabled; + await relayer.init(); - let txnReceipts: { [chainId: number]: Promise }; - let run = 1; + const { spokePoolClients } = relayerClients; + const simulate = !sendingRelaysEnabled; + let txnReceipts: { [chainId: number]: Promise } = {}; try { - do { + for (let run = 1; !stop; ++run) { if (loop) { logger.debug({ at: "relayer#run", message: `Starting relayer execution loop ${run}.` }); } const tLoopStart = performance.now(); + const ready = await relayer.update(); + const activeRelayer = await redis.get(botIdentifier); - await relayer.update(); - txnReceipts = await relayer.checkForUnfilledDepositsAndFill(enableSlowFills, simulate); - await relayer.runMaintenance(); + // If there is another active relayer, allow up to 10 update cycles for this instance to be ready, + // then proceed unconditionally to protect against any RPC outages blocking the relayer. + if (!ready && activeRelayer && run < 10) { + const runTime = Math.round((performance.now() - tLoopStart) / 1000); + const delta = pollingDelay - runTime; + logger.debug({ at: "Relayer#run", message: `Not ready to relay, waiting ${delta} seconds.` }); + await delay(delta); + continue; + } + + // Signal to any existing relayer that a handover is underway, or alternatively + // check for handover initiated by another (newer) relayer instance. + if (loop && botIdentifier && runIdentifier) { + if (activeRelayer !== runIdentifier) { + if (!activeRelayerUpdated) { + await redis.set(botIdentifier, runIdentifier, ACTIVE_RELAYER_EXPIRY); + activeRelayerUpdated = true; + } else { + logger.debug({ at: "Relayer#run", message: `Handing over to ${botIdentifier} instance ${activeRelayer}.` }); + stop = true; + } + } + } + + if (!stop) { + txnReceipts = await relayer.checkForUnfilledDepositsAndFill(sendingSlowRelaysEnabled, simulate); + await relayer.runMaintenance(); + } if (loop) { const runTime = Math.round((performance.now() - tLoopStart) / 1000); logger.debug({ at: "Relayer#run", - message: `Completed relayer execution loop ${run++} in ${runTime} seconds.`, + message: `Completed relayer execution loop ${run} in ${runTime} seconds.`, }); - if (!stop && runTime < config.pollingDelay) { - const delta = config.pollingDelay - runTime; + if (!stop && runTime < pollingDelay) { + const delta = pollingDelay - runTime; logger.debug({ at: "relayer#run", message: `Waiting ${delta} s before next loop.`, @@ -64,7 +106,7 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P await delay(delta); } } - } while (!stop); + } // Before exiting, wait for transaction submission to complete. for (const [chainId, submission] of Object.entries(txnReceipts)) { @@ -80,8 +122,8 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P } finally { await disconnectRedisClients(logger); - if (config.externalIndexer) { - Object.values(relayerClients.spokePoolClients).map((spokePoolClient) => spokePoolClient.stopWorker()); + if (externalIndexer) { + Object.values(spokePoolClients).map((spokePoolClient) => spokePoolClient.stopWorker()); } } diff --git a/yarn.lock b/yarn.lock index 66d05b06c..0dd3b0d60 100644 --- a/yarn.lock +++ b/yarn.lock @@ -50,10 +50,10 @@ axios "^1.6.2" zksync-web3 "^0.14.3" -"@across-protocol/sdk@^3.1.33": - version "3.1.33" - resolved "https://registry.yarnpkg.com/@across-protocol/sdk/-/sdk-3.1.33.tgz#39ccc6df69f8568d77f7dc913632c7e9f4db73ef" - integrity sha512-7qS4MZZ7MHtpFvorSSXOdatTzXMIBLDc8uiKLX0TYhPhtYq9z2tBK+UdstMJ+pzds7JUqOdUasjbWvb837CzmQ== +"@across-protocol/sdk@^3.1.34": + version "3.1.34" + resolved "https://registry.yarnpkg.com/@across-protocol/sdk/-/sdk-3.1.34.tgz#b4f641fea762e8c3d27b344b85ac6351a13e1f49" + integrity sha512-7ryd+gx4I4AZBT3BHbOGZqhdRSFgCtoaiDS8JytqHqyrsoNnE02jOElj1JF4UarhG+ABp/ywbQFXs3Tr64tPvA== dependencies: "@across-protocol/across-token" "^1.0.0" "@across-protocol/constants" "^3.1.14"