Feat/performance tracking #1896

Merged
30 commits merged on Nov 26, 2024
Changes from 22 commits
30 commits
50d05f0
add profiler class
gsteenkamp89 Nov 6, 2024
b2fa955
refine profiler class, add comments
gsteenkamp89 Nov 7, 2024
a094501
ditch observer
gsteenkamp89 Nov 7, 2024
25f02b1
measure synchronous task
gsteenkamp89 Nov 7, 2024
cf64f8d
use singleton
gsteenkamp89 Nov 7, 2024
6b1a627
profile inventory client
gsteenkamp89 Nov 7, 2024
444445c
mimic node performance api, use marks and measure
gsteenkamp89 Nov 8, 2024
14b7e93
inventory client
gsteenkamp89 Nov 8, 2024
4b412f9
update data worker & finalizer
gsteenkamp89 Nov 8, 2024
5f254b0
extend logger with default metadata
gsteenkamp89 Nov 8, 2024
4c59dd8
evm utils
gsteenkamp89 Nov 8, 2024
ca935bf
update dataworker, realyer to use profiler
gsteenkamp89 Nov 8, 2024
b8de9cc
require logger
gsteenkamp89 Nov 8, 2024
c16459b
Merge branch 'master' into feat/performance-tracking
gsteenkamp89 Nov 8, 2024
42e41ee
move and refactor profiler
gsteenkamp89 Nov 14, 2024
20d3939
fixup
gsteenkamp89 Nov 14, 2024
d2a4480
update comments
gsteenkamp89 Nov 18, 2024
27938a6
Merge branch 'master' into feat/performance-tracking
gsteenkamp89 Nov 18, 2024
3c9755b
use profiler from sdk
gsteenkamp89 Nov 19, 2024
0beef74
Update package.json
gsteenkamp89 Nov 19, 2024
d0ebb83
import from sdkUtils.ts
gsteenkamp89 Nov 19, 2024
60152cf
use simplified api for simple measure
gsteenkamp89 Nov 19, 2024
669707a
refactor
gsteenkamp89 Nov 19, 2024
c4e406a
Merge remote-tracking branch 'origin/master' into feat/performance-tr…
pxrl Nov 22, 2024
69f430f
Bump sdk
pxrl Nov 26, 2024
7f031e2
Undo yarn.lock diffs
pxrl Nov 26, 2024
5a1bf55
Fix yarn.lock
pxrl Nov 26, 2024
2014aa1
Add Relayer & TokenClient
pxrl Nov 26, 2024
e4c9910
Update src/relayer/index.ts
pxrl Nov 26, 2024
cc1637c
lint
pxrl Nov 26, 2024
2 changes: 1 addition & 1 deletion package.json
@@ -12,7 +12,7 @@
"dependencies": {
"@across-protocol/constants": "^3.1.19",
"@across-protocol/contracts": "^3.0.16",
"@across-protocol/sdk": "^3.2.13",
"@across-protocol/sdk": "^3.2.15",
"@arbitrum/sdk": "^3.1.3",
"@consensys/linea-sdk": "^0.2.1",
"@defi-wonderland/smock": "^2.3.5",
41 changes: 24 additions & 17 deletions src/clients/InventoryClient.ts
@@ -25,6 +25,7 @@ import {
assert,
compareAddressesSimple,
getUsdcSymbol,
Profiler,
getNativeTokenSymbol,
} from "../utils";
import { HubPoolClient, TokenClient, BundleDataClient } from ".";
@@ -298,13 +299,17 @@ export class InventoryClient {
// Return the upcoming refunds (in pending and next bundles) on each chain.
async getBundleRefunds(l1Token: string): Promise<{ [chainId: string]: BigNumber }> {
let refundsToConsider: CombinedRefunds[] = [];

const taskProfiler = new Profiler({
logger: this.logger,
at: "InventoryClient::getBundleRefunds",
});
const A = taskProfiler.start("A", {
l1Token,
});
// Increase virtual balance by pending relayer refunds from the latest valid bundle and the
// upcoming bundle. We can assume that all refunds from the second latest valid bundle have already
// been executed.
let startTimer: number;
if (!isDefined(this.bundleRefundsPromise)) {
startTimer = performance.now();
// @dev Save this as a promise so that other parallel calls to this function don't make the same call.
this.bundleRefundsPromise = this.getAllBundleRefunds();
}
@@ -327,12 +332,11 @@
},
{}
);
if (startTimer) {
this.log(`Time taken to get bundle refunds: ${Math.round((performance.now() - startTimer) / 1000)}s`, {
l1Token,
totalRefundsPerChain,
});
}

A.stop({
message: "Time to calculate total refunds per chain",
l1Token,
});
return totalRefundsPerChain;
}

@@ -616,9 +620,14 @@
l1Token: string,
chainsToEvaluate: number[]
): Promise<{ [chainId: number]: BigNumber }> {
const taskProfiler = new Profiler({
logger: this.logger,
at: "InventoryClient::getLatestRunningBalances",
});
const { root: latestPoolRebalanceRoot, blockRanges } = await this.bundleDataClient.getLatestPoolRebalanceRoot();
const chainIds = this.hubPoolClient.configStoreClient.getChainIdIndicesForBlock();
const start = performance.now();

taskProfiler.mark("start");
const runningBalances = Object.fromEntries(
await sdkUtils.mapAsync(chainsToEvaluate, async (chainId) => {
const chainIdIndex = chainIds.indexOf(chainId);
@@ -674,13 +683,11 @@
];
})
);
this.log(
`Approximated latest (abs. val) running balance for ORU chains for token ${l1Token} in ${
Math.round(performance.now() - start) / 1000
}s`,
{ runningBalances }
);

taskProfiler.measure("getLatestRunningBalances", {
from: "start",
message: "Time to get running balances",
runningBalances,
});
return Object.fromEntries(Object.entries(runningBalances).map(([k, v]) => [k, v.absLatestRunningBalance]));
}

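For reference, the two Profiler usage patterns introduced in InventoryClient.ts (a start()/stop() handle around an async task, and a named mark measured with `from`) look roughly like the sketch below. The import path, logger wiring, and measurement names are illustrative assumptions based only on the API surface visible in this diff, not a confirmed SDK contract.

```ts
// Minimal sketch of the two Profiler patterns used in InventoryClient.ts.
// Assumptions: the Profiler re-exported via ../utils comes from @across-protocol/sdk,
// and it accepts a winston logger plus an `at` label in its constructor.
import winston from "winston";
import { Profiler } from "@across-protocol/sdk";

const logger = winston.createLogger({ transports: [new winston.transports.Console()] });

async function example(): Promise<void> {
  const profiler = new Profiler({ logger, at: "Example::example" });

  // Pattern 1: start() returns a handle; stop() logs the elapsed time together with
  // any metadata supplied at either end.
  const task = profiler.start("A", { label: "bundle-refunds" });
  await new Promise((resolve) => setTimeout(resolve, 250)); // stand-in for real work
  task.stop({ message: "Time to calculate total refunds per chain" });

  // Pattern 2: mark() records a named point in time; measure() logs the span from that
  // mark to "now", attaching extra fields as structured log metadata.
  profiler.mark("start");
  await new Promise((resolve) => setTimeout(resolve, 250));
  profiler.measure("getLatestRunningBalances", {
    from: "start",
    message: "Time to get running balances",
  });
}

void example();
```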
14 changes: 9 additions & 5 deletions src/dataworker/DataworkerUtils.ts
@@ -20,6 +20,7 @@ import {
getTimestampsForBundleEndBlocks,
isDefined,
MerkleTree,
Profiler,
TOKEN_SYMBOLS_MAP,
winston,
} from "../utils";
@@ -342,7 +343,11 @@ export async function persistDataToArweave(
logger: winston.Logger,
tag?: string
): Promise<void> {
const startTime = performance.now();
const taskProfiler = new Profiler({
logger,
at: "DataworkerUtils#persistDataToArweave",
});
taskProfiler.mark("A");
// Check if data already exists on Arweave with the given tag.
// If so, we don't need to persist it again.
const [matchingTxns, address, balance] = await Promise.all([
@@ -389,10 +394,9 @@
balance: formatWinston(balance),
notificationPath: "across-arweave",
});
const endTime = performance.now();
logger.debug({
at: "Dataworker#index",
message: `Time to persist data to Arweave: ${endTime - startTime}ms`,
taskProfiler.measure("Arweave Persist", {
from: "A",
message: "Time to persist to Arweave",
});
}
}
51 changes: 32 additions & 19 deletions src/dataworker/index.ts
@@ -6,6 +6,7 @@ import {
Signer,
disconnectRedisClients,
isDefined,
Profiler,
} from "../utils";
import { spokePoolClientsToProviders } from "../common";
import { Dataworker } from "./Dataworker";
@@ -52,21 +53,29 @@ export async function createDataworker(
dataworker,
};
}

export async function runDataworker(_logger: winston.Logger, baseSigner: Signer): Promise<void> {
logger = _logger;
let loopStart = performance.now();
const { clients, config, dataworker } = await createDataworker(logger, baseSigner);
logger.debug({
const taskProfiler = new Profiler({
at: "Dataworker#index",
message: `Time to update non-spoke clients: ${(performance.now() - loopStart) / 1000}s`,
logger: _logger,
});
loopStart = performance.now();
logger = _logger;

const { clients, config, dataworker } = await taskProfiler.measureAsync(
createDataworker(logger, baseSigner),
"createDataworker",
{
message: "Time to update non-spoke clients",
}
);

let proposedBundleData: BundleData | undefined = undefined;
let poolRebalanceLeafExecutionCount = 0;
try {
logger[startupLogLevel(config)]({ at: "Dataworker#index", message: "Dataworker started 👩‍🔬", config });

for (;;) {
taskProfiler.mark("loopStart");
// Determine the spoke client's lookback:
// 1. We initiate the spoke client event search windows based on a start bundle's bundle block end numbers and
// how many bundles we want to look back from the start bundle blocks.
@@ -108,7 +117,7 @@
fromBlocks,
toBlocks
);
const dataworkerFunctionLoopTimerStart = performance.now();
taskProfiler.mark("dataworkerFunctionLoopTimerStart");
// Validate and dispute pending proposal before proposing a new one
if (config.disputerEnabled) {
await dataworker.validatePendingRootBundle(
@@ -191,19 +200,23 @@
} else {
await clients.multiCallerClient.executeTxnQueues();
}

const dataworkerFunctionLoopTimerEnd = performance.now();
logger.debug({
at: "Dataworker#index",
message: `Time to update spoke pool clients and run dataworker function: ${Math.round(
(dataworkerFunctionLoopTimerEnd - loopStart) / 1000
)}s`,
timeToLoadSpokes: Math.round((dataworkerFunctionLoopTimerStart - loopStart) / 1000),
timeToRunDataworkerFunctions: Math.round(
(dataworkerFunctionLoopTimerEnd - dataworkerFunctionLoopTimerStart) / 1000
),
taskProfiler.mark("dataworkerFunctionLoopTimerEnd");
taskProfiler.measure("timeToLoadSpokes", {
message: "Time to load spokes in data worker loop",
from: "loopStart",
to: "dataworkerFunctionLoopTimerStart",
});
taskProfiler.measure("timeToRunDataworkerFunctions", {
message: "Time to run data worker functions in data worker loop",
from: "dataworkerFunctionLoopTimerStart",
to: "dataworkerFunctionLoopTimerEnd",
});
// do we need to add an additional log for the sum of the previous?
taskProfiler.measure("dataWorkerTotal", {
message: "Total time taken for dataworker loop",
from: "loopStart",
to: "dataworkerFunctionLoopTimerEnd",
});
loopStart = performance.now();

if (await processEndPollingLoop(logger, "Dataworker", config.pollingDelay)) {
break;
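The dataworker loop above relies on two further facets of the same API: `measureAsync()` to time a promise in a single call, and `measure()` with explicit `from`/`to` marks to report sub-spans of one iteration. A minimal sketch, assuming the API surface shown in this diff; the mark names, labels, and timings below are placeholders.

```ts
// Sketch of the loop instrumentation pattern from src/dataworker/index.ts.
// Assumptions: measureAsync(promise, label, meta) resolves to the wrapped promise's value,
// and measure(name, { from, to, ... }) logs the span between two previously recorded marks.
import winston from "winston";
import { Profiler } from "@across-protocol/sdk";

const logger = winston.createLogger({ transports: [new winston.transports.Console()] });
const profiler = new Profiler({ logger, at: "Dataworker#example" });

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function loopOnce(): Promise<void> {
  // Time a one-off async setup step; the resolved value passes through unchanged.
  await profiler.measureAsync(sleep(100), "clientUpdate", {
    message: "Time to update non-spoke clients",
  });

  profiler.mark("loopStart");
  await sleep(100); // stand-in for loading spoke pool clients
  profiler.mark("loopMid");
  await sleep(100); // stand-in for running the dataworker functions
  profiler.mark("loopEnd");

  // Report each sub-span plus the total for the iteration.
  profiler.measure("timeToLoadSpokes", { from: "loopStart", to: "loopMid" });
  profiler.measure("timeToRunDataworkerFunctions", { from: "loopMid", to: "loopEnd" });
  profiler.measure("dataWorkerTotal", {
    message: "Total time taken for dataworker loop",
    from: "loopStart",
    to: "loopEnd",
  });
}

void loopOnce();
```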
35 changes: 27 additions & 8 deletions src/finalizer/index.ts
@@ -29,6 +29,7 @@ import {
startupLogLevel,
winston,
CHAIN_IDs,
Profiler,
} from "../utils";
import { ChainFinalizer, CrossChainMessage } from "./types";
import {
@@ -471,17 +472,23 @@ export class FinalizerConfig extends DataworkerConfig {

export async function runFinalizer(_logger: winston.Logger, baseSigner: Signer): Promise<void> {
logger = _logger;

// Same config as Dataworker for now.
const config = new FinalizerConfig(process.env);
const taskProfiler = new Profiler({
logger,
at: "Finalizer#index",
config,
});

logger[startupLogLevel(config)]({ at: "Finalizer#index", message: "Finalizer started 🏋🏿‍♀️", config });
const { commonClients, spokePoolClients } = await constructFinalizerClients(logger, config, baseSigner);

try {
for (;;) {
const loopStart = performance.now();
taskProfiler.mark("loopStart");
await updateSpokePoolClients(spokePoolClients, ["TokensBridged"]);
const loopStartPostSpokePoolUpdates = performance.now();
taskProfiler.mark("loopStartPostSpokePoolUpdates");

if (config.finalizerEnabled) {
const availableChains = commonClients.configStoreClient
@@ -501,13 +508,25 @@
} else {
logger[startupLogLevel(config)]({ at: "Dataworker#index", message: "Finalizer disabled" });
}
const loopEndPostFinalizations = performance.now();

logger.debug({
at: "Finalizer#index",
message: `Time to loop: ${Math.round((loopEndPostFinalizations - loopStart) / 1000)}s`,
timeToUpdateSpokeClients: Math.round((loopStartPostSpokePoolUpdates - loopStart) / 1000),
timeToFinalize: Math.round((loopEndPostFinalizations - loopStartPostSpokePoolUpdates) / 1000),
taskProfiler.mark("loopEndPostFinalizations");

taskProfiler.measure("timeToUpdateSpokeClients", {
from: "loopStart",
to: "loopStartPostSpokePoolUpdates",
strategy: config.finalizationStrategy,
});

taskProfiler.measure("timeToFinalize", {
from: "loopStartPostSpokePoolUpdates",
to: "loopEndPostFinalizations",
strategy: config.finalizationStrategy,
});

taskProfiler.measure("loopTime", {
message: "Time to loop",
from: "loopStart",
to: "loopEndPostFinalizations",
strategy: config.finalizationStrategy,
});

20 changes: 13 additions & 7 deletions src/libexec/util/evm/util.ts
@@ -1,6 +1,6 @@
import assert from "assert";
import { Contract, EventFilter } from "ethers";
import { getNetworkName, isDefined, paginatedEventQuery, winston } from "../../../utils";
import { getNetworkName, isDefined, paginatedEventQuery, Profiler, winston } from "../../../utils";
import { Log, ScraperOpts } from "../../types";

/**
@@ -45,8 +45,12 @@ export async function scrapeEvents(
spokePool: Contract,
eventName: string,
opts: ScraperOpts & { toBlock: number },
logger: winston.Logger
logger?: winston.Logger
): Promise<Log[]> {
const taskProfiler = new Profiler({
logger,
at: "scrapeEvents",
});
const { lookback, deploymentBlock, filterArgs, maxBlockRange, toBlock } = opts;
const { chainId } = await spokePool.provider.getNetwork();
const chain = getNetworkName(chainId);
@@ -55,13 +59,15 @@
assert(toBlock > fromBlock, `${toBlock} > ${fromBlock}`);
const searchConfig = { fromBlock, toBlock, maxBlockLookBack: maxBlockRange };

const tStart = performance.now();
taskProfiler.mark("A");
const filter = getEventFilter(spokePool, eventName, filterArgs[eventName]);
const events = await paginatedEventQuery(spokePool, filter, searchConfig);
const tStop = performance.now();
logger.debug({
at: "scrapeEvents",
message: `Scraped ${events.length} ${chain} ${eventName} events in ${Math.round((tStop - tStart) / 1000)} seconds`,
taskProfiler.measure("paginatedEventQuery", {
from: "A",
message: `Scraped ${events.length} ${chain} ${eventName} events.`,
numEvents: events.length,
chain,
eventName,
searchConfig,
});

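Because `scrapeEvents()` now takes an optional logger, the Profiler in this file may be constructed with `logger === undefined`. A sketch of that call shape follows; the helper names are placeholders, and the graceful-degradation behaviour is an assumption not confirmed by this diff.

```ts
// Sketch of profiling with an optional logger, mirroring the new scrapeEvents() signature.
// Assumption: when no logger is supplied, measurements become no-ops (or are buffered)
// rather than throwing.
import winston from "winston";
import { Profiler } from "@across-protocol/sdk";

async function timedQuery(logger?: winston.Logger): Promise<number> {
  const profiler = new Profiler({ logger, at: "scrapeEvents" });

  profiler.mark("A");
  const events = await fakePaginatedEventQuery(); // stand-in for paginatedEventQuery()
  profiler.measure("paginatedEventQuery", {
    from: "A",
    message: `Scraped ${events.length} events.`,
    numEvents: events.length,
  });
  return events.length;
}

// Hypothetical placeholder for the real paginated query helper.
async function fakePaginatedEventQuery(): Promise<unknown[]> {
  await new Promise((resolve) => setTimeout(resolve, 50));
  return [];
}

void timedQuery(); // no logger passed: profiling should degrade gracefully
```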
20 changes: 14 additions & 6 deletions src/relayer/Relayer.ts
@@ -20,6 +20,7 @@ import {
fixedPointAdjustment,
TransactionResponse,
ZERO_ADDRESS,
Profiler,
} from "../utils";
import { RelayerClients } from "./RelayerClientHelper";
import { RelayerConfig } from "./RelayerConfig";
@@ -1028,13 +1029,17 @@
repaymentChainId?: number;
repaymentChainProfitability: RepaymentChainProfitability;
}> {
const taskProfiler = new Profiler({
at: "Relayer::resolveRepaymentChain",
logger: this.logger,
});
const { inventoryClient, profitClient } = this.clients;
const { depositId, originChainId, destinationChainId, inputAmount, outputAmount, transactionHash, fromLiteChain } =
deposit;
const originChain = getNetworkName(originChainId);
const destinationChain = getNetworkName(destinationChainId);

const start = performance.now();
taskProfiler.mark("A");
const preferredChainIds = await inventoryClient.determineRefundChainId(deposit, hubPoolToken.address);
if (preferredChainIds.length === 0) {
// @dev If the origin chain is a lite chain and there are no preferred repayment chains, then we can assume
@@ -1058,14 +1063,17 @@
};
}

this.logger.debug({
at: "Relayer::resolveRepaymentChain",
taskProfiler.measure("Resolve Repayment Chains", {
from: "A",
message: `Determined eligible repayment chains ${JSON.stringify(
preferredChainIds
)} for deposit ${depositId} from ${originChain} to ${destinationChain} in ${
Math.round(performance.now() - start) / 1000
}s.`,
)} for deposit ${depositId} from ${originChain} to ${destinationChain}.`,
preferredChainIds,
depositId,
originChain,
destinationChain,
});

const _repaymentFees = preferredChainIds.map((chainId) =>
repaymentFees.find(({ paymentChainId }) => paymentChainId === chainId)
);