From 196dc84fbd24baf5fadf7ed5ec60f328d14da365 Mon Sep 17 00:00:00 2001 From: vincentwschau <99756290+vincentwschau@users.noreply.github.com> Date: Thu, 12 Dec 2024 15:24:46 -0500 Subject: [PATCH] Improve query to find candles map. (#2650) --- .../postgres/src/stores/candle-table.ts | 86 +++++++++++++------ .../__tests__/lib/candles-generator.test.ts | 36 ++++---- .../services/ender/src/caches/block-cache.ts | 5 ++ .../services/ender/src/caches/candle-cache.ts | 2 - 4 files changed, 83 insertions(+), 46 deletions(-) diff --git a/indexer/packages/postgres/src/stores/candle-table.ts b/indexer/packages/postgres/src/stores/candle-table.ts index 5eae4ee242..af6192e387 100644 --- a/indexer/packages/postgres/src/stores/candle-table.ts +++ b/indexer/packages/postgres/src/stores/candle-table.ts @@ -1,7 +1,7 @@ -import _ from 'lodash'; import { PartialModelObject, QueryBuilder } from 'objection'; import { BUFFER_ENCODING_UTF_8, DEFAULT_POSTGRES_OPTIONS } from '../constants'; +import { knexReadReplica } from '../helpers/knex'; import { setupBaseQuery, verifyAllRequiredFields } from '../helpers/stores-helpers'; import Transaction from '../helpers/transaction'; import { getUuid } from '../helpers/uuid'; @@ -174,36 +174,66 @@ export async function findLatest( export async function findCandlesMap( tickers: string[], - resolutions: CandleResolution[], - options: Options = DEFAULT_POSTGRES_OPTIONS, ): Promise { + if (tickers.length === 0) { + return {}; + } + const candlesMap: CandlesMap = {}; + for (const ticker of tickers) { + candlesMap[ticker] = {}; + } - await Promise.all( - _.map( - tickers, - async (ticker: string) => { - candlesMap[ticker] = {}; - const findLatestCandles: Promise[] = resolutions.map( - (resolution: CandleResolution) => findLatest( - ticker, - resolution, - options, - ), - ); - - // Map each resolution to its respective candle - const allLatestCandles: (CandleFromDatabase | undefined)[] = await Promise.all( - findLatestCandles, - ); - _.forEach(allLatestCandles, (candle: CandleFromDatabase | undefined) => { - if (candle !== undefined) { - candlesMap[ticker][candle.resolution] = candle; - } - }); - }, - ), - ); + const minuteCandlesResult: { + rows: CandleFromDatabase[], + } = await knexReadReplica.getConnection().raw( + ` + SELECT DISTINCT ON ( + ticker, + resolution + ) candles.* FROM + candles + WHERE + "ticker" IN (${tickers.map((ticker) => { return `'${ticker}'`; }).join(',')}) AND + "startedAt" > NOW() - INTERVAL '3 hours' AND + resolution IN ('1MIN', '5MINS', '15MINS', '30MINS', '1HOUR') + ORDER BY + ticker, + resolution, + "startedAt" DESC; + `, + ) as unknown as { + rows: CandleFromDatabase[], + }; + const hourDayCandlesResult: { + rows: CandleFromDatabase[], + } = await knexReadReplica.getConnection().raw( + ` + SELECT DISTINCT ON ( + ticker, + resolution + ) candles.* FROM + candles + WHERE + "ticker" IN (${tickers.map((ticker) => { return `'${ticker}'`; }).join(',')}) AND + "startedAt" > NOW() - INTERVAL '2 days' AND + resolution IN ('4HOURS', '1DAY') + ORDER BY + ticker, + resolution, + "startedAt" DESC; + `, + ) as unknown as { + rows: CandleFromDatabase[], + }; + const latestCandles: CandleFromDatabase[] = minuteCandlesResult.rows + .concat(hourDayCandlesResult.rows); + for (const candle of latestCandles) { + if (candlesMap[candle.ticker] === undefined) { + candlesMap[candle.ticker] = {}; + } + candlesMap[candle.ticker][candle.resolution] = candle; + } return candlesMap; } diff --git a/indexer/services/ender/__tests__/lib/candles-generator.test.ts b/indexer/services/ender/__tests__/lib/candles-generator.test.ts index 8d430ddf42..6004dc7f48 100644 --- a/indexer/services/ender/__tests__/lib/candles-generator.test.ts +++ b/indexer/services/ender/__tests__/lib/candles-generator.test.ts @@ -37,8 +37,14 @@ import { redis, } from '@dydxprotocol-indexer/redis'; import { ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX } from '@dydxprotocol-indexer/redis/build/src/caches/orderbook-mid-prices-cache'; +import { DateTime, Settings } from 'luxon'; describe('candleHelper', () => { + const startedAt: DateTime = helpers.calculateNormalizedCandleStartTime( + testConstants.createdDateTime, + CandleResolution.ONE_MINUTE, + ); + beforeAll(async () => { await dbHelpers.migrate(); await dbHelpers.clearData(); @@ -48,6 +54,7 @@ describe('candleHelper', () => { beforeEach(async () => { await testMocks.seedData(); await perpetualMarketRefresher.updatePerpetualMarkets(); + Settings.now = () => startedAt.plus({ minutes: 30 }).valueOf(); }); afterEach(async () => { @@ -55,6 +62,7 @@ describe('candleHelper', () => { clearCandlesMap(); jest.clearAllMocks(); await redis.deleteAllAsync(redisClient); + Settings.now = () => new Date().valueOf(); }); afterAll(async () => { @@ -87,10 +95,6 @@ describe('candleHelper', () => { orderbookMidPriceClose: undefined, orderbookMidPriceOpen: undefined, }; - const startedAt: IsoString = helpers.calculateNormalizedCandleStartTime( - testConstants.createdDateTime, - CandleResolution.ONE_MINUTE, - ).toISO(); const previousStartedAt: IsoString = helpers.calculateNormalizedCandleStartTime( testConstants.createdDateTime.minus({ minutes: 1 }), CandleResolution.ONE_MINUTE, @@ -304,8 +308,8 @@ describe('candleHelper', () => { '100', // open interest false, // block contains trades { // expected candle - id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE), - startedAt, + id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE), + startedAt: startedAt.toISO(), ticker: testConstants.defaultPerpetualMarket.ticker, resolution: CandleResolution.ONE_MINUTE, low: closePrice, @@ -343,8 +347,8 @@ describe('candleHelper', () => { true, // block contains trades { // expected candle ...defaultCandle, - id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE), - startedAt, + id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE), + startedAt: startedAt.toISO(), resolution: CandleResolution.ONE_MINUTE, startingOpenInterest: '100', orderbookMidPriceClose: '1000', @@ -356,7 +360,7 @@ describe('candleHelper', () => { [ 'updates empty candle', // description { // initial candle - startedAt, + startedAt: startedAt.toISO(), ticker: testConstants.defaultPerpetualMarket.ticker, resolution: CandleResolution.ONE_MINUTE, low: closePrice, @@ -374,8 +378,8 @@ describe('candleHelper', () => { true, // block contains trades { // expected candle ...defaultCandle, - id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE), - startedAt, + id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE), + startedAt: startedAt.toISO(), resolution: CandleResolution.ONE_MINUTE, startingOpenInterest: existingStartingOpenInterest, orderbookMidPriceClose: null, @@ -396,7 +400,7 @@ describe('candleHelper', () => { [ 'does not update candle when there are no trades and an existing candle', // description { // initial candle - startedAt, + startedAt: startedAt.toISO(), ticker: testConstants.defaultPerpetualMarket.ticker, resolution: CandleResolution.ONE_MINUTE, low: lowPrice, @@ -413,8 +417,8 @@ describe('candleHelper', () => { '100', // open interest false, // block contains trades { // expected candle - id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE), - startedAt, + id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE), + startedAt: startedAt.toISO(), ticker: testConstants.defaultPerpetualMarket.ticker, resolution: CandleResolution.ONE_MINUTE, low: lowPrice, @@ -463,7 +467,7 @@ describe('candleHelper', () => { if (expectedCandle === undefined) { // Verify no candles in postgres and no kafka messages - await verifyNoCandleInPostgres(CandleResolution.ONE_MINUTE, startedAt); + await verifyNoCandleInPostgres(CandleResolution.ONE_MINUTE, startedAt.toISO()); verifyNoCandlesKafkaMessages(publisher, CandleResolution.ONE_MINUTE); } else { const expectedCandles: CandleFromDatabase[] = [expectedCandle]; @@ -489,7 +493,7 @@ describe('candleHelper', () => { const startTime: IsoString = helpers.calculateNormalizedCandleStartTime( testConstants.createdDateTime.minus({ minutes: 100 }), CandleResolution.ONE_MINUTE, - ).toISO(); + ).toUTC().toISO(); await Promise.all( _.map(Object.values(CandleResolution), (resolution: CandleResolution) => { diff --git a/indexer/services/ender/src/caches/block-cache.ts b/indexer/services/ender/src/caches/block-cache.ts index 1b7af459f6..fd73321e47 100644 --- a/indexer/services/ender/src/caches/block-cache.ts +++ b/indexer/services/ender/src/caches/block-cache.ts @@ -107,6 +107,7 @@ function isNextBlock(blockHeight: string): boolean { * All caches must be initialized in a Transaction to ensure consistency */ export async function initializeAllCaches(): Promise { + const start: number = Date.now(); const txId: number = await Transaction.start(); await Transaction.setIsolationLevel(txId, IsolationLevel.READ_COMMITTED); @@ -120,6 +121,10 @@ export async function initializeAllCaches(): Promise { ]); await Transaction.rollback(txId); + stats.timing( + `${config.SERVICE_NAME}.initialize_caches`, + Date.now() - start, + ); } export function resetBlockCache(): void { diff --git a/indexer/services/ender/src/caches/candle-cache.ts b/indexer/services/ender/src/caches/candle-cache.ts index 69577ac4be..6fff23e502 100644 --- a/indexer/services/ender/src/caches/candle-cache.ts +++ b/indexer/services/ender/src/caches/candle-cache.ts @@ -23,8 +23,6 @@ export async function startCandleCache(txId?: number): Promise { candlesMap = await CandleTable.findCandlesMap( tickers, - Object.values(CandleResolution), - { txId }, ); }