Skip to content

Commit

Permalink
improve(relayer): Support dynamic handover in production (#1781)
Browse files Browse the repository at this point in the history
The fast relayer can presently have multiple concurrent instances
for a short period of time because there is no method of coordinating
the "active" relayer. This can cause some collisions when both relayer
instances attempt to fill the same transaction(s), and requires that the
relayer be stopped via a SIGHUP, which is slightly awkward in the
serverless environment.

This change instead uses redis as a coordination mechanism between
relayer instances, such that a new relayer will update a specific redis
record when it is ready to take over from any previous instance.
Likewise, an existing instance will detect when the record is updated
and will gracefully exit thereafter.
  • Loading branch information
pxrl authored Sep 11, 2024
1 parent 8240eb6 commit 7508b76
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 22 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"dependencies": {
"@across-protocol/constants": "^3.1.14",
"@across-protocol/contracts": "^3.0.10",
"@across-protocol/sdk": "^3.1.33",
"@across-protocol/sdk": "^3.1.34",
"@arbitrum/sdk": "^3.1.3",
"@consensys/linea-sdk": "^0.2.1",
"@defi-wonderland/smock": "^2.3.5",
Expand Down
4 changes: 4 additions & 0 deletions src/clients/SpokePoolClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ export class IndexedSpokePoolClient extends clients.SpokePoolClient {
}

protected async _update(eventsToQuery: string[]): Promise<clients.SpokePoolUpdate> {
if (this.pendingBlockNumber === this.deploymentBlock) {
return { success: false, reason: clients.UpdateFailureReason.NotReady };
}

// If any events have been removed upstream, remove them first.
this.pendingEventsRemoved = this.pendingEventsRemoved.filter((event) => !this.removeEvent(event));

Expand Down
5 changes: 4 additions & 1 deletion src/relayer/Relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ export class Relayer {

/**
* @description Perform per-loop updates.
* @return True if all SpokePoolClients updated successfully, otherwise false.
*/
async update(): Promise<void> {
async update(): Promise<boolean> {
const {
acrossApiClient,
configStoreClient,
Expand Down Expand Up @@ -133,6 +134,8 @@ export class Relayer {
inventoryClient.update(this.inventoryChainIds),
tokenClient.update(),
]);

return Object.values(spokePoolClients).every((spokePoolClient) => spokePoolClient.isUpdated);
}

/**
Expand Down
74 changes: 58 additions & 16 deletions src/relayer/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
import { utils as sdkUtils } from "@across-protocol/sdk";
import { config, delay, disconnectRedisClients, getCurrentTime, getNetworkName, Signer, winston } from "../utils";
import {
config,
delay,
disconnectRedisClients,
getCurrentTime,
getNetworkName,
getRedisCache,
Signer,
winston,
} from "../utils";
import { Relayer } from "./Relayer";
import { RelayerConfig } from "./RelayerConfig";
import { constructRelayerClients } from "./RelayerClientHelper";
config();
let logger: winston.Logger;

const ACTIVE_RELAYER_EXPIRY = 600; // 10 minutes.
const { RUN_IDENTIFIER: runIdentifier, BOT_IDENTIFIER: botIdentifier } = process.env;
const randomNumber = () => Math.floor(Math.random() * 1_000_000);

export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): Promise<void> {
Expand All @@ -14,8 +25,9 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P

logger = _logger;
const config = new RelayerConfig(process.env);
const { externalIndexer, pollingDelay, sendingRelaysEnabled, sendingSlowRelaysEnabled } = config;

const loop = config.pollingDelay > 0;
const loop = pollingDelay > 0;
let stop = !loop;
process.on("SIGHUP", () => {
logger.debug({
Expand All @@ -25,46 +37,76 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P
stop = true;
});

const redis = await getRedisCache(logger);
let activeRelayerUpdated = false;

// Explicitly don't log ignoredAddresses because it can be huge and can overwhelm log transports.
const { ignoredAddresses: _ignoredConfig, ...loggedConfig } = config;
logger.debug({ at: "Relayer#run", message: "Relayer started 🏃‍♂️", loggedConfig, relayerRun });
const relayerClients = await constructRelayerClients(logger, config, baseSigner);
const relayer = new Relayer(await baseSigner.getAddress(), logger, relayerClients, config);
const simulate = !config.sendingRelaysEnabled;
const enableSlowFills = config.sendingSlowRelaysEnabled;
await relayer.init();

let txnReceipts: { [chainId: number]: Promise<string[]> };
let run = 1;
const { spokePoolClients } = relayerClients;
const simulate = !sendingRelaysEnabled;
let txnReceipts: { [chainId: number]: Promise<string[]> } = {};

try {
do {
for (let run = 1; !stop; ++run) {
if (loop) {
logger.debug({ at: "relayer#run", message: `Starting relayer execution loop ${run}.` });
}

const tLoopStart = performance.now();
const ready = await relayer.update();
const activeRelayer = await redis.get(botIdentifier);

await relayer.update();
txnReceipts = await relayer.checkForUnfilledDepositsAndFill(enableSlowFills, simulate);
await relayer.runMaintenance();
// If there is another active relayer, allow up to 10 update cycles for this instance to be ready,
// then proceed unconditionally to protect against any RPC outages blocking the relayer.
if (!ready && activeRelayer && run < 10) {
const runTime = Math.round((performance.now() - tLoopStart) / 1000);
const delta = pollingDelay - runTime;
logger.debug({ at: "Relayer#run", message: `Not ready to relay, waiting ${delta} seconds.` });
await delay(delta);
continue;
}

// Signal to any existing relayer that a handover is underway, or alternatively
// check for handover initiated by another (newer) relayer instance.
if (loop && botIdentifier && runIdentifier) {
if (activeRelayer !== runIdentifier) {
if (!activeRelayerUpdated) {
await redis.set(botIdentifier, runIdentifier, ACTIVE_RELAYER_EXPIRY);
activeRelayerUpdated = true;
} else {
logger.debug({ at: "Relayer#run", message: `Handing over to ${botIdentifier} instance ${activeRelayer}.` });
stop = true;
}
}
}

if (!stop) {
txnReceipts = await relayer.checkForUnfilledDepositsAndFill(sendingSlowRelaysEnabled, simulate);
await relayer.runMaintenance();
}

if (loop) {
const runTime = Math.round((performance.now() - tLoopStart) / 1000);
logger.debug({
at: "Relayer#run",
message: `Completed relayer execution loop ${run++} in ${runTime} seconds.`,
message: `Completed relayer execution loop ${run} in ${runTime} seconds.`,
});

if (!stop && runTime < config.pollingDelay) {
const delta = config.pollingDelay - runTime;
if (!stop && runTime < pollingDelay) {
const delta = pollingDelay - runTime;
logger.debug({
at: "relayer#run",
message: `Waiting ${delta} s before next loop.`,
});
await delay(delta);
}
}
} while (!stop);
}

// Before exiting, wait for transaction submission to complete.
for (const [chainId, submission] of Object.entries(txnReceipts)) {
Expand All @@ -80,8 +122,8 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P
} finally {
await disconnectRedisClients(logger);

if (config.externalIndexer) {
Object.values(relayerClients.spokePoolClients).map((spokePoolClient) => spokePoolClient.stopWorker());
if (externalIndexer) {
Object.values(spokePoolClients).map((spokePoolClient) => spokePoolClient.stopWorker());
}
}

Expand Down
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@
axios "^1.6.2"
zksync-web3 "^0.14.3"

"@across-protocol/sdk@^3.1.33":
version "3.1.33"
resolved "https://registry.yarnpkg.com/@across-protocol/sdk/-/sdk-3.1.33.tgz#39ccc6df69f8568d77f7dc913632c7e9f4db73ef"
integrity sha512-7qS4MZZ7MHtpFvorSSXOdatTzXMIBLDc8uiKLX0TYhPhtYq9z2tBK+UdstMJ+pzds7JUqOdUasjbWvb837CzmQ==
"@across-protocol/sdk@^3.1.34":
version "3.1.34"
resolved "https://registry.yarnpkg.com/@across-protocol/sdk/-/sdk-3.1.34.tgz#b4f641fea762e8c3d27b344b85ac6351a13e1f49"
integrity sha512-7ryd+gx4I4AZBT3BHbOGZqhdRSFgCtoaiDS8JytqHqyrsoNnE02jOElj1JF4UarhG+ABp/ywbQFXs3Tr64tPvA==
dependencies:
"@across-protocol/across-token" "^1.0.0"
"@across-protocol/constants" "^3.1.14"
Expand Down

0 comments on commit 7508b76

Please sign in to comment.