From aa000df137e411fd14a24b2b7c9a0829fbe178ca Mon Sep 17 00:00:00 2001 From: vincentwschau <99756290+vincentwschau@users.noreply.github.com> Date: Thu, 12 Dec 2024 15:24:46 -0500 Subject: [PATCH 1/4] Improve query to find candles map. (#2650) (cherry picked from commit 196dc84fbd24baf5fadf7ed5ec60f328d14da365) # Conflicts: # indexer/services/ender/__tests__/lib/candles-generator.test.ts --- .../postgres/src/stores/candle-table.ts | 86 +++++++++++++------ .../__tests__/lib/candles-generator.test.ts | 43 ++++++---- .../services/ender/src/caches/block-cache.ts | 5 ++ .../services/ender/src/caches/candle-cache.ts | 2 - 4 files changed, 91 insertions(+), 45 deletions(-) diff --git a/indexer/packages/postgres/src/stores/candle-table.ts b/indexer/packages/postgres/src/stores/candle-table.ts index 5eae4ee242..af6192e387 100644 --- a/indexer/packages/postgres/src/stores/candle-table.ts +++ b/indexer/packages/postgres/src/stores/candle-table.ts @@ -1,7 +1,7 @@ -import _ from 'lodash'; import { PartialModelObject, QueryBuilder } from 'objection'; import { BUFFER_ENCODING_UTF_8, DEFAULT_POSTGRES_OPTIONS } from '../constants'; +import { knexReadReplica } from '../helpers/knex'; import { setupBaseQuery, verifyAllRequiredFields } from '../helpers/stores-helpers'; import Transaction from '../helpers/transaction'; import { getUuid } from '../helpers/uuid'; @@ -174,36 +174,66 @@ export async function findLatest( export async function findCandlesMap( tickers: string[], - resolutions: CandleResolution[], - options: Options = DEFAULT_POSTGRES_OPTIONS, ): Promise { + if (tickers.length === 0) { + return {}; + } + const candlesMap: CandlesMap = {}; + for (const ticker of tickers) { + candlesMap[ticker] = {}; + } - await Promise.all( - _.map( - tickers, - async (ticker: string) => { - candlesMap[ticker] = {}; - const findLatestCandles: Promise[] = resolutions.map( - (resolution: CandleResolution) => findLatest( - ticker, - resolution, - options, - ), - ); - - // Map each resolution to its respective candle - const allLatestCandles: (CandleFromDatabase | undefined)[] = await Promise.all( - findLatestCandles, - ); - _.forEach(allLatestCandles, (candle: CandleFromDatabase | undefined) => { - if (candle !== undefined) { - candlesMap[ticker][candle.resolution] = candle; - } - }); - }, - ), - ); + const minuteCandlesResult: { + rows: CandleFromDatabase[], + } = await knexReadReplica.getConnection().raw( + ` + SELECT DISTINCT ON ( + ticker, + resolution + ) candles.* FROM + candles + WHERE + "ticker" IN (${tickers.map((ticker) => { return `'${ticker}'`; }).join(',')}) AND + "startedAt" > NOW() - INTERVAL '3 hours' AND + resolution IN ('1MIN', '5MINS', '15MINS', '30MINS', '1HOUR') + ORDER BY + ticker, + resolution, + "startedAt" DESC; + `, + ) as unknown as { + rows: CandleFromDatabase[], + }; + const hourDayCandlesResult: { + rows: CandleFromDatabase[], + } = await knexReadReplica.getConnection().raw( + ` + SELECT DISTINCT ON ( + ticker, + resolution + ) candles.* FROM + candles + WHERE + "ticker" IN (${tickers.map((ticker) => { return `'${ticker}'`; }).join(',')}) AND + "startedAt" > NOW() - INTERVAL '2 days' AND + resolution IN ('4HOURS', '1DAY') + ORDER BY + ticker, + resolution, + "startedAt" DESC; + `, + ) as unknown as { + rows: CandleFromDatabase[], + }; + const latestCandles: CandleFromDatabase[] = minuteCandlesResult.rows + .concat(hourDayCandlesResult.rows); + for (const candle of latestCandles) { + if (candlesMap[candle.ticker] === undefined) { + candlesMap[candle.ticker] = {}; + } + candlesMap[candle.ticker][candle.resolution] = candle; + } return candlesMap; } diff --git a/indexer/services/ender/__tests__/lib/candles-generator.test.ts b/indexer/services/ender/__tests__/lib/candles-generator.test.ts index f20b68354c..5921681f0a 100644 --- a/indexer/services/ender/__tests__/lib/candles-generator.test.ts +++ b/indexer/services/ender/__tests__/lib/candles-generator.test.ts @@ -37,8 +37,14 @@ import { redis, } from '@dydxprotocol-indexer/redis'; import { ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX } from '@dydxprotocol-indexer/redis/build/src/caches/orderbook-mid-prices-cache'; +import { DateTime, Settings } from 'luxon'; describe('candleHelper', () => { + const startedAt: DateTime = helpers.calculateNormalizedCandleStartTime( + testConstants.createdDateTime, + CandleResolution.ONE_MINUTE, + ); + beforeAll(async () => { await dbHelpers.migrate(); await dbHelpers.clearData(); @@ -48,6 +54,7 @@ describe('candleHelper', () => { beforeEach(async () => { await testMocks.seedData(); await perpetualMarketRefresher.updatePerpetualMarkets(); + Settings.now = () => startedAt.plus({ minutes: 30 }).valueOf(); }); afterEach(async () => { @@ -55,6 +62,7 @@ describe('candleHelper', () => { clearCandlesMap(); jest.clearAllMocks(); await redis.deleteAllAsync(redisClient); + Settings.now = () => new Date().valueOf(); }); afterAll(async () => { @@ -87,10 +95,6 @@ describe('candleHelper', () => { orderbookMidPriceClose: undefined, orderbookMidPriceOpen: undefined, }; - const startedAt: IsoString = helpers.calculateNormalizedCandleStartTime( - testConstants.createdDateTime, - CandleResolution.ONE_MINUTE, - ).toISO(); const previousStartedAt: IsoString = helpers.calculateNormalizedCandleStartTime( testConstants.createdDateTime.minus({ minutes: 1 }), CandleResolution.ONE_MINUTE, @@ -304,8 +308,8 @@ describe('candleHelper', () => { '100', // open interest false, // block contains trades { // expected candle - id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE), - startedAt, + id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE), + startedAt: startedAt.toISO(), ticker: testConstants.defaultPerpetualMarket.ticker, resolution: CandleResolution.ONE_MINUTE, low: closePrice, @@ -343,8 +347,8 @@ describe('candleHelper', () => { true, // block contains trades { // expected candle ...defaultCandle, - id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE), - startedAt, + id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE), + startedAt: startedAt.toISO(), resolution: CandleResolution.ONE_MINUTE, startingOpenInterest: '100', orderbookMidPriceClose: '1000', @@ -356,7 +360,7 @@ describe('candleHelper', () => { [ 'updates empty candle', // description { // initial candle - startedAt, + startedAt: startedAt.toISO(), ticker: testConstants.defaultPerpetualMarket.ticker, resolution: CandleResolution.ONE_MINUTE, low: closePrice, @@ -374,8 +378,8 @@ describe('candleHelper', () => { true, // block contains trades { // expected candle ...defaultCandle, - id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE), - startedAt, + id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE), + startedAt: startedAt.toISO(), resolution: CandleResolution.ONE_MINUTE, startingOpenInterest: existingStartingOpenInterest, orderbookMidPriceClose: null, @@ -396,7 +400,7 @@ describe('candleHelper', () => { [ 'does not update candle when there are no trades and an existing candle', // description { // initial candle - startedAt, + startedAt: startedAt.toISO(), ticker: testConstants.defaultPerpetualMarket.ticker, resolution: CandleResolution.ONE_MINUTE, low: lowPrice, @@ -413,8 +417,8 @@ describe('candleHelper', () => { '100', // open interest false, // block contains trades { // expected candle - id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE), - startedAt, + id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE), + startedAt: startedAt.toISO(), ticker: testConstants.defaultPerpetualMarket.ticker, resolution: CandleResolution.ONE_MINUTE, low: lowPrice, @@ -463,7 +467,7 @@ describe('candleHelper', () => { if (expectedCandle === undefined) { // Verify no candles in postgres and no kafka messages - await verifyNoCandleInPostgres(CandleResolution.ONE_MINUTE, startedAt); + await verifyNoCandleInPostgres(CandleResolution.ONE_MINUTE, startedAt.toISO()); verifyNoCandlesKafkaMessages(publisher, CandleResolution.ONE_MINUTE); } else { const expectedCandles: CandleFromDatabase[] = [expectedCandle]; @@ -485,6 +489,15 @@ describe('candleHelper', () => { const usdVolume: string = Big(existingPrice).times(baseTokenVolume).toString(); const orderbookMidPriceClose = '7500'; const orderbookMidPriceOpen = '8000'; +<<<<<<< HEAD +======= + // Set candle start time to be far in the past to ensure all candles are new + const startTime: IsoString = helpers.calculateNormalizedCandleStartTime( + testConstants.createdDateTime.minus({ minutes: 100 }), + CandleResolution.ONE_MINUTE, + ).toUTC().toISO(); + +>>>>>>> 196dc84f (Improve query to find candles map. (#2650)) await Promise.all( _.map(Object.values(CandleResolution), (resolution: CandleResolution) => { return CandleTable.create({ diff --git a/indexer/services/ender/src/caches/block-cache.ts b/indexer/services/ender/src/caches/block-cache.ts index 1b7af459f6..fd73321e47 100644 --- a/indexer/services/ender/src/caches/block-cache.ts +++ b/indexer/services/ender/src/caches/block-cache.ts @@ -107,6 +107,7 @@ function isNextBlock(blockHeight: string): boolean { * All caches must be initialized in a Transaction to ensure consistency */ export async function initializeAllCaches(): Promise { + const start: number = Date.now(); const txId: number = await Transaction.start(); await Transaction.setIsolationLevel(txId, IsolationLevel.READ_COMMITTED); @@ -120,6 +121,10 @@ export async function initializeAllCaches(): Promise { ]); await Transaction.rollback(txId); + stats.timing( + `${config.SERVICE_NAME}.initialize_caches`, + Date.now() - start, + ); } export function resetBlockCache(): void { diff --git a/indexer/services/ender/src/caches/candle-cache.ts b/indexer/services/ender/src/caches/candle-cache.ts index 69577ac4be..6fff23e502 100644 --- a/indexer/services/ender/src/caches/candle-cache.ts +++ b/indexer/services/ender/src/caches/candle-cache.ts @@ -23,8 +23,6 @@ export async function startCandleCache(txId?: number): Promise { candlesMap = await CandleTable.findCandlesMap( tickers, - Object.values(CandleResolution), - { txId }, ); } From 06c7e3cb157ce5f8ab14f4fdd62fdfed2ba530b5 Mon Sep 17 00:00:00 2001 From: Vincent Chau <99756290+vincentwschau@users.noreply.github.com> Date: Mon, 16 Dec 2024 10:51:39 -0500 Subject: [PATCH 2/4] Fix conflict. --- indexer/services/ender/__tests__/lib/candles-generator.test.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/indexer/services/ender/__tests__/lib/candles-generator.test.ts b/indexer/services/ender/__tests__/lib/candles-generator.test.ts index 5921681f0a..9b85e9abf2 100644 --- a/indexer/services/ender/__tests__/lib/candles-generator.test.ts +++ b/indexer/services/ender/__tests__/lib/candles-generator.test.ts @@ -489,15 +489,12 @@ describe('candleHelper', () => { const usdVolume: string = Big(existingPrice).times(baseTokenVolume).toString(); const orderbookMidPriceClose = '7500'; const orderbookMidPriceOpen = '8000'; -<<<<<<< HEAD -======= // Set candle start time to be far in the past to ensure all candles are new const startTime: IsoString = helpers.calculateNormalizedCandleStartTime( testConstants.createdDateTime.minus({ minutes: 100 }), CandleResolution.ONE_MINUTE, ).toUTC().toISO(); ->>>>>>> 196dc84f (Improve query to find candles map. (#2650)) await Promise.all( _.map(Object.values(CandleResolution), (resolution: CandleResolution) => { return CandleTable.create({ From 55e6a921acfa2c9ed9db3484abec22bc6c54113d Mon Sep 17 00:00:00 2001 From: Vincent Chau <99756290+vincentwschau@users.noreply.github.com> Date: Mon, 16 Dec 2024 10:58:20 -0500 Subject: [PATCH 3/4] Fix lint. --- indexer/services/ender/__tests__/lib/candles-generator.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/services/ender/__tests__/lib/candles-generator.test.ts b/indexer/services/ender/__tests__/lib/candles-generator.test.ts index 9b85e9abf2..fecaeaedd7 100644 --- a/indexer/services/ender/__tests__/lib/candles-generator.test.ts +++ b/indexer/services/ender/__tests__/lib/candles-generator.test.ts @@ -498,7 +498,7 @@ describe('candleHelper', () => { await Promise.all( _.map(Object.values(CandleResolution), (resolution: CandleResolution) => { return CandleTable.create({ - startedAt: previousStartedAt, + startedAt: startTime, ticker: testConstants.defaultPerpetualMarket.ticker, resolution, low: existingPrice, From a87c8540de565761d679e0797afae5730a975fe3 Mon Sep 17 00:00:00 2001 From: Vincent Chau <99756290+vincentwschau@users.noreply.github.com> Date: Mon, 16 Dec 2024 11:47:36 -0500 Subject: [PATCH 4/4] Fix test. --- .../__tests__/lib/candles-generator.test.ts | 64 ++++++++++--------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/indexer/services/ender/__tests__/lib/candles-generator.test.ts b/indexer/services/ender/__tests__/lib/candles-generator.test.ts index fecaeaedd7..6004dc7f48 100644 --- a/indexer/services/ender/__tests__/lib/candles-generator.test.ts +++ b/indexer/services/ender/__tests__/lib/candles-generator.test.ts @@ -518,7 +518,7 @@ describe('candleHelper', () => { setCachePrice('BTC-USD', '10005'); await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); - + // Add two trades for BTC-USD market const publisher: KafkaPublisher = new KafkaPublisher(); publisher.addEvents([ defaultTradeKafkaEvent, @@ -526,32 +526,31 @@ describe('candleHelper', () => { ]); // Create new candles, with trades - await runUpdateCandles(publisher).then(async () => { - - // Verify previous candles have orderbookMidPriceClose updated - const previousExpectedCandles: CandleFromDatabase[] = _.map( - Object.values(CandleResolution), - (resolution: CandleResolution) => { - return { - id: CandleTable.uuid(previousStartedAt, defaultCandle.ticker, resolution), - startedAt: previousStartedAt, - ticker: defaultCandle.ticker, - resolution, - low: existingPrice, - high: existingPrice, - open: existingPrice, - close: existingPrice, - baseTokenVolume, - usdVolume, - trades: existingTrades, - startingOpenInterest, - orderbookMidPriceClose: '10005', - orderbookMidPriceOpen, - }; - }, - ); - await verifyCandlesInPostgres(previousExpectedCandles); - }); + await runUpdateCandles(publisher); + + // Verify previous candles have orderbookMidPriceClose updated + const previousExpectedCandles: CandleFromDatabase[] = _.map( + Object.values(CandleResolution), + (resolution: CandleResolution) => { + return { + id: CandleTable.uuid(startTime, defaultCandle.ticker, resolution), + startedAt: startTime, + ticker: defaultCandle.ticker, + resolution, + low: existingPrice, + high: existingPrice, + open: existingPrice, + close: existingPrice, + baseTokenVolume, + usdVolume, + trades: existingTrades, + startingOpenInterest, + orderbookMidPriceClose: '10005', + orderbookMidPriceOpen, + }; + }, + ); + await verifyCandlesInPostgres(previousExpectedCandles); // Verify new candles were created const expectedCandles: CandleFromDatabase[] = _.map( @@ -593,11 +592,16 @@ describe('candleHelper', () => { const usdVolume: string = Big(existingPrice).times(baseTokenVolume).toString(); const orderbookMidPriceClose = '7500'; const orderbookMidPriceOpen = '8000'; + // Set candle start time to be far in the past to ensure all candles are new + const startTime: IsoString = helpers.calculateNormalizedCandleStartTime( + testConstants.createdDateTime.minus({ minutes: 100 }), + CandleResolution.ONE_MINUTE, + ).toISO(); await Promise.all( _.map(Object.values(CandleResolution), (resolution: CandleResolution) => { return CandleTable.create({ - startedAt: previousStartedAt, + startedAt: startTime, ticker: testConstants.defaultPerpetualMarket.ticker, resolution, low: existingPrice, @@ -629,8 +633,8 @@ describe('candleHelper', () => { Object.values(CandleResolution), (resolution: CandleResolution) => { return { - id: CandleTable.uuid(previousStartedAt, defaultCandle.ticker, resolution), - startedAt: previousStartedAt, + id: CandleTable.uuid(startTime, defaultCandle.ticker, resolution), + startedAt: startTime, ticker: defaultCandle.ticker, resolution, low: existingPrice,