Skip to content

Commit

Permalink
optimize wormhole job
Browse files Browse the repository at this point in the history
  • Loading branch information
vrtnd committed Feb 5, 2025
1 parent 06e0815 commit 836b496
Showing 1 changed file with 69 additions and 41 deletions.
110 changes: 69 additions & 41 deletions src/handlers/runWormhole.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,67 @@ import { insertTransactionRow } from "../utils/wrappa/postgres/write";
import { getBridgeID } from "../utils/wrappa/postgres/query";
import dayjs from "dayjs";
import { insertConfigEntriesForAdapter } from "../utils/adapter";
import { cache, getCacheKey } from "../utils/cache";

const WORMHOLE_CACHE_PREFIX = "wormhole";
const LAST_PROCESSED_TS_KEY = `${WORMHOLE_CACHE_PREFIX}:last_processed_ts`;

export const handler = async () => {
try {
await insertConfigEntriesForAdapter(adapter, "wormhole");
const startTs = dayjs().subtract(12, "hour").unix();

const lastProcessedTs = cache.get(LAST_PROCESSED_TS_KEY) || dayjs().subtract(1, "day").unix();
const endTs = dayjs().unix();
const bridgeIds = Object.fromEntries(
await Promise.all(
Object.keys(adapter).map(async (chain) => {
chain = chain.toLowerCase();
const bridgeId = await getBridgeID("wormhole", chain);
return [chain, bridgeId?.id];
})
)
);
console.log(
`Running Wormhole adapter for ${startTs} (${dayjs
.unix(startTs)
.format("YYYY-MM-DD HH:mm:ss")}) to ${endTs} (${dayjs.unix(endTs).format("YYYY-MM-DD HH:mm:ss")})`
);
const events = await fetchWormholeEvents(startTs, endTs);
const BATCH_SIZE = 500;

const processBatch = async (sql: any, batch: any[]) => {

const MAX_TIME_CHUNK = 60 * 60 * 24;
const timeGap = endTs - lastProcessedTs;

if (timeGap > MAX_TIME_CHUNK) {
for (let chunkStart = lastProcessedTs; chunkStart < endTs; chunkStart += MAX_TIME_CHUNK) {
const chunkEnd = Math.min(chunkStart + MAX_TIME_CHUNK, endTs);
await processTimeRange(chunkStart, chunkEnd);
}
} else {
await processTimeRange(lastProcessedTs, endTs);
}

cache.set(LAST_PROCESSED_TS_KEY, endTs);
} catch (error) {
console.error("Error processing Wormhole events:", error);
throw error;
}
};

const processTimeRange = async (startTs: number, endTs: number) => {
const bridgeIds = Object.fromEntries(
await Promise.all(
Object.keys(adapter).map(async (chain) => {
chain = chain.toLowerCase();
const bridgeId = await getBridgeID("wormhole", chain);
return [chain, bridgeId?.id];
})
)
);

console.log(
`Running Wormhole adapter for ${startTs} (${dayjs.unix(startTs).format("YYYY-MM-DD HH:mm:ss")}) to ${endTs} (${dayjs
.unix(endTs)
.format("YYYY-MM-DD HH:mm:ss")})`
);

const cacheKey = getCacheKey(WORMHOLE_CACHE_PREFIX, startTs.toString(), endTs.toString());
let events = cache.get(cacheKey);

if (!events) {
events = await fetchWormholeEvents(startTs, endTs);
cache.set(cacheKey, events, { ttl: 60 * 60 * 1000 });
}

const BATCH_SIZE = 500;

await sql.begin(async (sql) => {
for (let i = 0; i < events.length; i += BATCH_SIZE) {
const batch = events.slice(i, i + BATCH_SIZE);
const insertPromises: Promise<void>[] = [];

for (const event of batch) {
Expand All @@ -42,6 +80,11 @@ export const handler = async () => {
destination_chain,
} = event;

const txCacheKey = getCacheKey(WORMHOLE_CACHE_PREFIX, "tx", transaction_hash);
if (cache.has(txCacheKey)) {
continue;
}

const sourceChain = normalizeChainName(source_chain);
const destinationChain = normalizeChainName(destination_chain);

Expand Down Expand Up @@ -102,31 +145,16 @@ export const handler = async () => {
console.error(`Error inserting Wormhole event: ${error}`, event);
}
}
}

await Promise.all(insertPromises);
console.log(`Inserted ${insertPromises.length} of ${events.length} Wormhole events`);
};

let start = 0;
let end = events.length;

while (start < end) {
await sql.begin(async (sql) => {
await processBatch(sql, events.slice(start, start + BATCH_SIZE));
});

await sql.begin(async (sql) => {
await processBatch(sql, events.slice(end - BATCH_SIZE, end));
});
cache.set(txCacheKey, true, { ttl: 24 * 60 * 60 * 1000 });
}

start += BATCH_SIZE;
end -= BATCH_SIZE;
if (insertPromises.length > 0) {
await Promise.all(insertPromises);
console.log(`Inserted ${insertPromises.length} Wormhole events (batch ${i / BATCH_SIZE + 1})`);
}
}
} catch (error) {
console.error("Error processing Wormhole events:", error);
throw error;
}
});
};

export default wrapScheduledLambda(handler);

0 comments on commit 836b496

Please sign in to comment.