Skip to content

Commit

Permalink
optimize routes and db
Browse files Browse the repository at this point in the history
  • Loading branch information
vrtnd committed Dec 1, 2024
1 parent 4d8c0c4 commit 96beeea
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 73 deletions.
44 changes: 40 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"@types/js-yaml": "^4.0.9",
"@types/node": "^18.19.67",
"@types/node-fetch": "^2.6.2",
"@types/object-hash": "^3.0.6",
"@types/retry": "^0.12.5",
"eslint-config-prettier": "^8.3.0",
"prettier": "^2.5.1",
Expand Down Expand Up @@ -49,7 +50,9 @@
"fastify": "^4.26.2",
"graphql": "^16.0.0",
"graphql-request": "^6.1.0",
"node-cache": "^5.1.2",
"node-fetch": "^2.6.7",
"object-hash": "^3.0.0",
"postgres": "^3.2.4",
"tron-format-address": "^0.1.11",
"tsx": "^4.7.1",
Expand Down
20 changes: 20 additions & 0 deletions sql/data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,23 @@ CREATE TABLE IF NOT EXISTS bridges.errors (
);

CREATE INDEX IF NOT EXISTS errors_ts ON bridges.errors (ts);

CREATE TABLE IF NOT EXISTS bridges.daily_volume (
id INT GENERATED ALWAYS AS IDENTITY,
bridge_id uuid NOT NULL,
ts TIMESTAMPTZ NOT NULL,
total_deposited_usd NUMERIC,
total_withdrawn_usd NUMERIC,
total_deposit_txs INTEGER,
total_withdrawal_txs INTEGER,
chain VARCHAR NOT NULL,
PRIMARY KEY(id),
UNIQUE (bridge_id, ts, chain),
CONSTRAINT fk_bridge_id
FOREIGN KEY(bridge_id)
REFERENCES bridges.config(id)
ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS daily_volume_ts ON bridges.daily_volume (ts);
CREATE INDEX IF NOT EXISTS daily_volume_chain ON bridges.daily_volume (chain);
10 changes: 7 additions & 3 deletions src/handlers/getBridges.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,16 @@ const getBridges = async () => {
};

const handler = async (event: AWSLambda.APIGatewayEvent): Promise<IResponse> => {
const bridges = await getBridges();
const includeChains = event.queryStringParameters?.includeChains === "true";
const promises = [getBridges()];
if (includeChains) {
promises.push(craftBridgeChainsResponse());
}
const [bridges, chainData] = await Promise.all(promises);
let response: any = {
bridges: bridges,
};
if (event.queryStringParameters?.includeChains === "true") {
const chainData = await craftBridgeChainsResponse();
if (includeChains) {
response.chains = chainData;
}
return successResponse(response, 10 * 60); // 10 mins cache
Expand Down
5 changes: 5 additions & 0 deletions src/server/cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { runAllAdapters } from "./jobs/runAllAdapters";
import { runAggregateAllAdapters } from "./jobs/runAggregateAllAdapter";
import { runAdaptersFromTo } from "./jobs/runAdaptersFromTo";
import { handler as runWormhole } from "../handlers/runWormhole";
import { aggregateDailyVolume } from "./jobs/aggregateDailyVolume";

const createTimeout = (minutes: number) =>
new Promise((_, reject) =>
Expand Down Expand Up @@ -39,6 +40,10 @@ const cron = () => {
new CronJob("0 * * * *", async () => {
await withTimeout(runWormhole(), 30);
}).start();

new CronJob("0 * * * *", async () => {
await withTimeout(aggregateDailyVolume(), 10);
}).start();
};

export default cron;
12 changes: 10 additions & 2 deletions src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import getTransactions from "../handlers/getTransactions";
import runAdapter from "../handlers/runAdapter";
import getBridgeStatsOnDay from "../handlers/getBridgeStatsOnDay";
import cron from "./cron";
import { generateApiCacheKey, cache } from "../utils/cache";

dotenv.config();

Expand Down Expand Up @@ -39,6 +40,12 @@ const lambdaToFastify = (handler: Function) => async (request: any, reply: any)
body: request.body,
};

const cacheKey = generateApiCacheKey(event);
const cachedData = cache.get(cacheKey);
if (cachedData) {
return reply.code(200).send(cachedData);
}

try {
const timeout = new Promise((_, reject) => {
setTimeout(() => {
Expand All @@ -47,8 +54,9 @@ const lambdaToFastify = (handler: Function) => async (request: any, reply: any)
});

const result = await Promise.race([handler(event), timeout]);

return reply.code(result.statusCode).send(JSON.parse(result.body));
const parsedBody = JSON.parse(result.body);
cache.set(cacheKey, parsedBody);
return reply.code(result.statusCode).send(parsedBody);
} catch (error: any) {
request.log.error(error);

Expand Down
49 changes: 49 additions & 0 deletions src/server/jobs/aggregateDailyVolume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { sql } from "../../utils/db";

async function aggregateDailyVolume() {
try {
await sql`
INSERT INTO bridges.daily_volume (
bridge_id,
ts,
total_deposited_usd,
total_withdrawn_usd,
total_deposit_txs,
total_withdrawal_txs,
chain
)
SELECT
ha.bridge_id,
date_trunc('day', ha.ts) as ts,
CAST(SUM(ha.total_deposited_usd) AS NUMERIC) as total_deposited_usd,
CAST(SUM(ha.total_withdrawn_usd) AS NUMERIC) as total_withdrawn_usd,
CAST(SUM(ha.total_deposit_txs) AS INTEGER) as total_deposit_txs,
CAST(SUM(ha.total_withdrawal_txs) AS INTEGER) as total_withdrawal_txs,
c.chain
FROM bridges.hourly_aggregated ha
JOIN bridges.config c ON ha.bridge_id = c.id
WHERE
ha.ts < DATE_TRUNC('day', NOW())
AND (ha.total_deposited_usd IS NOT NULL AND ha.total_deposited_usd::text ~ '^[0-9]+(\.[0-9]+)?$')
AND (ha.total_withdrawn_usd IS NOT NULL AND ha.total_withdrawn_usd::text ~ '^[0-9]+(\.[0-9]+)?$')
AND (ha.total_deposit_txs IS NOT NULL AND ha.total_deposit_txs::text ~ '^[0-9]+$')
AND (ha.total_withdrawal_txs IS NOT NULL AND ha.total_withdrawal_txs::text ~ '^[0-9]+$')
GROUP BY
ha.bridge_id,
date_trunc('day', ha.ts),
c.chain
ON CONFLICT (bridge_id, ts, chain) DO UPDATE SET
total_deposited_usd = EXCLUDED.total_deposited_usd,
total_withdrawn_usd = EXCLUDED.total_withdrawn_usd,
total_deposit_txs = EXCLUDED.total_deposit_txs,
total_withdrawal_txs = EXCLUDED.total_withdrawal_txs;
`;

console.log("Daily volume migration completed successfully");
} catch (error) {
console.error("Error during daily volume migration:", error);
throw error;
}
}

export { aggregateDailyVolume };
41 changes: 17 additions & 24 deletions src/utils/bridgeVolume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export const getDailyBridgeVolume = async (
) => {
let bridgeDbName = undefined as any;
if (bridgeNetworkId) {
const bridgeNetwork = importBridgeNetwork(undefined, bridgeNetworkId)
const bridgeNetwork = importBridgeNetwork(undefined, bridgeNetworkId);
if (!bridgeNetwork) {
throw new Error("Invalid bridgeNetworkId entered for getting daily bridge volume.");
}
Expand All @@ -67,24 +67,20 @@ export const getDailyBridgeVolume = async (
});
}
let sourceChainsHistoricalDailyData = [] as IAggregatedData[];
await Promise.all(
sourceChainConfigs.map(async (config) => {
const sourceChainHistoricalData = await queryAggregatedDailyTimestampRange(
dailyStartTimestamp,
dailyEndTimestamp,
config.chain,
config.bridge_name
);
sourceChainsHistoricalDailyData = [...sourceChainHistoricalData, ...sourceChainsHistoricalDailyData];
})
);

const historicalDailyData = await queryAggregatedDailyTimestampRange(
dailyStartTimestamp,
dailyEndTimestamp,
chain,
bridgeDbName
);
const [_, historicalDailyData] = await Promise.all([
Promise.all(
sourceChainConfigs.map(async (config) => {
const sourceChainHistoricalData = await queryAggregatedDailyTimestampRange(
dailyStartTimestamp,
dailyEndTimestamp,
config.chain,
config.bridge_name
);
return sourceChainHistoricalData;
})
),
queryAggregatedDailyTimestampRange(dailyStartTimestamp, dailyEndTimestamp, chain, bridgeDbName),
]);

// this 'currentDay' idea doesn't work great, can re-think and re-write
/*
Expand Down Expand Up @@ -127,8 +123,6 @@ export const getDailyBridgeVolume = async (
historicalDailySums[timestamp].depositTxs = (historicalDailySums[timestamp].depositTxs ?? 0) + total_deposit_txs;
historicalDailySums[timestamp].withdrawTxs =
(historicalDailySums[timestamp].withdrawTxs ?? 0) + total_withdrawal_txs;


});
// the deposits and withdrawals are swapped here
sourceChainsHistoricalDailyData.map((dailyData) => {
Expand Down Expand Up @@ -226,7 +220,7 @@ export const getHourlyBridgeVolume = async (
) => {
let bridgeDbName = undefined as any;
if (bridgeNetworkId) {
const bridgeNetwork = importBridgeNetwork(undefined, bridgeNetworkId)
const bridgeNetwork = importBridgeNetwork(undefined, bridgeNetworkId);
if (!bridgeNetwork) {
throw new Error("Invalid bridgeNetworkId entered for getting daily bridge volume.");
}
Expand Down Expand Up @@ -264,7 +258,7 @@ export const getHourlyBridgeVolume = async (
chain,
bridgeDbName
);

let historicalHourlySums = {} as { [timestamp: string]: any };
historicalHourlyData.map((hourlyData) => {
const { bridge_id, ts, total_deposited_usd, total_withdrawn_usd, total_deposit_txs, total_withdrawal_txs } =
Expand All @@ -280,7 +274,6 @@ export const getHourlyBridgeVolume = async (
(historicalHourlySums[timestamp].withdrawTxs ?? 0) + total_withdrawal_txs;

// doubling volume for chains with a destination chain (those that only have 1 aggregated entry for entire bridgeNetwork)

});
// the deposits and withdrawals are swapped here
sourceChainsHourlyData.map((hourlyData) => {
Expand Down
35 changes: 35 additions & 0 deletions src/utils/cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import NodeCache from "node-cache";
import hash from "object-hash";

export const cache = new NodeCache({
stdTTL: 600,
checkperiod: 120,
maxKeys: 10000,
useClones: false,
deleteOnExpire: true,
});

interface APIEvent {
pathParameters?: Record<string, any>;
queryStringParameters?: Record<string, any>;
body?: any;
}

export const generateApiCacheKey = (event: APIEvent): string => {
const eventToNormalize = {
path: event.pathParameters || {},
query: event.queryStringParameters || {},
body: event.body || {},
};

return hash(eventToNormalize, {
algorithm: "sha256",
encoding: "hex",
unorderedArrays: true,
unorderedObjects: true,
}).substring(0, 16);
};

export const getCacheKey = (...parts: (string | undefined)[]) => parts.filter(Boolean).join(":");

export const DEFAULT_TTL = 600;
6 changes: 5 additions & 1 deletion src/utils/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ const connectionString =
process.env.DB_URL ??
`postgresql://${process.env.PSQL_USERNAME}:${process.env.PSQL_PW}@${process.env.PSQL_URL}:9004/cle37p03g00dhd6lff4ch90qw`;

const sql = postgres(connectionString, { idle_timeout: 200, max_lifetime: 60 * 5, max: 25 });
const sql = postgres(connectionString, {
idle_timeout: 20,
max_lifetime: 60 * 30,
max: 10,
});

export { sql };
Loading

0 comments on commit 96beeea

Please sign in to comment.