From 7f894a5b9b142d6db8c40782e4362f3c15cc41a1 Mon Sep 17 00:00:00 2001 From: Vincent Chau <99756290+vincentwschau@users.noreply.github.com> Date: Fri, 22 Nov 2024 13:44:01 -0500 Subject: [PATCH] Use materialized view for vault pnl data. --- .../stores/vault-pnl-ticks-view.test.ts | 209 ++++++++++++++++++ ...20241119162238_create_vault_hourly_view.ts | 47 ++++ .../20241119163402_create_vault_daily_view.ts | 47 ++++ indexer/packages/postgres/src/index.ts | 1 + .../src/stores/vault-pnl-ticks-view.ts | 65 ++++++ .../api/v4/vault-controller.test.ts | 5 + .../controllers/api/v4/vault-controller.ts | 7 +- indexer/services/roundtable/src/config.ts | 7 + .../roundtable/src/tasks/refresh-vault-pnl.ts | 45 ++++ 9 files changed, 429 insertions(+), 4 deletions(-) create mode 100644 indexer/packages/postgres/__tests__/stores/vault-pnl-ticks-view.test.ts create mode 100644 indexer/packages/postgres/src/db/migrations/migration_files/20241119162238_create_vault_hourly_view.ts create mode 100644 indexer/packages/postgres/src/db/migrations/migration_files/20241119163402_create_vault_daily_view.ts create mode 100644 indexer/packages/postgres/src/stores/vault-pnl-ticks-view.ts create mode 100644 indexer/services/roundtable/src/tasks/refresh-vault-pnl.ts diff --git a/indexer/packages/postgres/__tests__/stores/vault-pnl-ticks-view.test.ts b/indexer/packages/postgres/__tests__/stores/vault-pnl-ticks-view.test.ts new file mode 100644 index 0000000000..926c10d917 --- /dev/null +++ b/indexer/packages/postgres/__tests__/stores/vault-pnl-ticks-view.test.ts @@ -0,0 +1,209 @@ +import { + PnlTickInterval, + PnlTicksFromDatabase, +} from '../../src/types'; +import * as VaultPnlTicksView from '../../src/stores/vault-pnl-ticks-view'; +import * as PnlTicksTable from '../../src/stores/pnl-ticks-table'; +import * as BlockTable from '../../src/stores/block-table'; +import * as VaultTable from '../../src/stores/vault-table'; +import { clearData, migrate, teardown } from '../../src/helpers/db-helpers'; +import { seedData } from '../helpers/mock-generators'; +import * as WalletTable from '../../src/stores/wallet-table'; +import * as SubaccountTable from '../../src/stores/subaccount-table'; +import { + defaultSubaccountId, + defaultSubaccountIdWithAlternateAddress, + defaultSubaccountWithAlternateAddress, + defaultWallet2, + defaultVault, + defaultSubaccount, +} from '../helpers/constants'; +import { DateTime } from 'luxon'; + +describe('PnlTicks store', () => { + beforeEach(async () => { + await seedData(); + await WalletTable.create(defaultWallet2); + await SubaccountTable.create(defaultSubaccountWithAlternateAddress); + await Promise.all([ + VaultTable.create({ + ...defaultVault, + address: defaultSubaccount.address, + }), + VaultTable.create({ + ...defaultVault, + address: defaultSubaccountWithAlternateAddress.address, + }), + ]); + }); + + beforeAll(async () => { + await migrate(); + }); + + afterEach(async () => { + await clearData(); + }); + + afterAll(async () => { + await teardown(); + }); + + it.each([ + { + description: 'Get hourly pnl ticks', + interval: PnlTickInterval.hour, + }, + { + description: 'Get daily pnl ticks', + interval: PnlTickInterval.day, + }, + ])('$description', async ({ + interval, + }: { + interval: PnlTickInterval, + }) => { + const createdTicks: PnlTicksFromDatabase[] = await setupIntervalPnlTicks(); + await VaultPnlTicksView.refreshDailyView(); + await VaultPnlTicksView.refreshHourlyView(); + const pnlTicks: PnlTicksFromDatabase[] = await VaultPnlTicksView.getVaultsPnl( + interval, + 7 * 24 * 60 * 60, // 1 week + DateTime.fromISO(createdTicks[8].blockTime).plus({ seconds: 1 }), + ); + // See setup function for created ticks. + // Should exclude tick that is within the same hour except the first. + const expectedHourlyTicks: PnlTicksFromDatabase[] = [ + createdTicks[7], + createdTicks[5], + createdTicks[2], + createdTicks[0], + ]; + // Should exclude ticks that is within the same day except for the first. + const expectedDailyTicks: PnlTicksFromDatabase[] = [ + createdTicks[7], + createdTicks[2], + ]; + + if (interval === PnlTickInterval.day) { + expect(pnlTicks).toEqual(expectedDailyTicks); + } else if (interval === PnlTickInterval.hour) { + expect(pnlTicks).toEqual(expectedHourlyTicks); + } + }); + + async function setupIntervalPnlTicks(): Promise { + const currentTime: DateTime = DateTime.utc().startOf('day'); + const tenMinAgo: string = currentTime.minus({ minute: 10 }).toISO(); + const almostTenMinAgo: string = currentTime.minus({ second: 603 }).toISO(); + const twoHoursAgo: string = currentTime.minus({ hour: 2 }).toISO(); + const twoDaysAgo: string = currentTime.minus({ day: 2 }).toISO(); + const monthAgo: string = currentTime.minus({ day: 30 }).toISO(); + await Promise.all([ + BlockTable.create({ + blockHeight: '3', + time: monthAgo, + }), + BlockTable.create({ + blockHeight: '4', + time: twoDaysAgo, + }), + BlockTable.create({ + blockHeight: '6', + time: twoHoursAgo, + }), + BlockTable.create({ + blockHeight: '8', + time: almostTenMinAgo, + }), + BlockTable.create({ + blockHeight: '10', + time: tenMinAgo, + }), + ]); + const createdTicks: PnlTicksFromDatabase[] = await PnlTicksTable.createMany([ + { + subaccountId: defaultSubaccountId, + equity: '1100', + createdAt: almostTenMinAgo, + totalPnl: '1200', + netTransfers: '50', + blockHeight: '10', + blockTime: almostTenMinAgo, + }, + { + subaccountId: defaultSubaccountId, + equity: '1090', + createdAt: tenMinAgo, + totalPnl: '1190', + netTransfers: '50', + blockHeight: '8', + blockTime: tenMinAgo, + }, + { + subaccountId: defaultSubaccountId, + equity: '1080', + createdAt: twoHoursAgo, + totalPnl: '1180', + netTransfers: '50', + blockHeight: '6', + blockTime: twoHoursAgo, + }, + { + subaccountId: defaultSubaccountId, + equity: '1070', + createdAt: twoDaysAgo, + totalPnl: '1170', + netTransfers: '50', + blockHeight: '4', + blockTime: twoDaysAgo, + }, + { + subaccountId: defaultSubaccountId, + equity: '1200', + createdAt: monthAgo, + totalPnl: '1170', + netTransfers: '50', + blockHeight: '3', + blockTime: monthAgo, + }, + { + subaccountId: defaultSubaccountIdWithAlternateAddress, + equity: '200', + createdAt: almostTenMinAgo, + totalPnl: '300', + netTransfers: '50', + blockHeight: '10', + blockTime: almostTenMinAgo, + }, + { + subaccountId: defaultSubaccountIdWithAlternateAddress, + equity: '210', + createdAt: tenMinAgo, + totalPnl: '310', + netTransfers: '50', + blockHeight: '8', + blockTime: tenMinAgo, + }, + { + subaccountId: defaultSubaccountIdWithAlternateAddress, + equity: '220', + createdAt: twoHoursAgo, + totalPnl: '320', + netTransfers: '50', + blockHeight: '6', + blockTime: twoHoursAgo, + }, + { + subaccountId: defaultSubaccountIdWithAlternateAddress, + equity: '230', + createdAt: twoDaysAgo, + totalPnl: '330', + netTransfers: '50', + blockHeight: '4', + blockTime: twoDaysAgo, + }, + ]); + return createdTicks; + } +}); \ No newline at end of file diff --git a/indexer/packages/postgres/src/db/migrations/migration_files/20241119162238_create_vault_hourly_view.ts b/indexer/packages/postgres/src/db/migrations/migration_files/20241119162238_create_vault_hourly_view.ts new file mode 100644 index 0000000000..aff1144bc1 --- /dev/null +++ b/indexer/packages/postgres/src/db/migrations/migration_files/20241119162238_create_vault_hourly_view.ts @@ -0,0 +1,47 @@ +import * as Knex from "knex"; + +const RAW_VAULTS_PNL_HOURLY_QUERY: string = ` +CREATE MATERIALIZED VIEW IF NOT EXISTS vaults_hourly_pnl AS WITH vault_subaccounts AS +( + SELECT subaccounts.id + FROM vaults, + subaccounts + WHERE vaults.address = subaccounts.address + AND subaccounts."subaccountNumber" = 0), pnl_subaccounts AS +( + SELECT * + FROM vault_subaccounts + UNION + SELECT id + FROM subaccounts + WHERE address = 'dydx18tkxrnrkqc2t0lr3zxr5g6a4hdvqksylxqje4r' + AND "subaccountNumber" = 0) +SELECT "id", + "subaccountId", + "equity", + "totalPnl", + "netTransfers", + "createdAt", + "blockHeight", + "blockTime" +FROM ( + SELECT pnl_ticks.*, + ROW_NUMBER() OVER ( partition BY "subaccountId", DATE_TRUNC( 'hour', "blockTime" ) ORDER BY "blockTime" ) AS r + FROM pnl_ticks + WHERE "subaccountId" IN + ( + SELECT * + FROM pnl_subaccounts) + AND "blockTime" >= NOW() - interval '604800 second' ) AS pnl_intervals +WHERE r = 1 +ORDER BY "subaccountId"; +` + +export async function up(knex: Knex): Promise { + await knex.raw(RAW_VAULTS_PNL_HOURLY_QUERY); + await knex.raw(`CREATE UNIQUE INDEX ON vaults_hourly_pnl (id);`) +} + +export async function down(knex: Knex): Promise { + await knex.raw('DROP MATERIALIZED VIEW IF EXISTS vaults_hourly_pnl;') +} diff --git a/indexer/packages/postgres/src/db/migrations/migration_files/20241119163402_create_vault_daily_view.ts b/indexer/packages/postgres/src/db/migrations/migration_files/20241119163402_create_vault_daily_view.ts new file mode 100644 index 0000000000..ad18d018f1 --- /dev/null +++ b/indexer/packages/postgres/src/db/migrations/migration_files/20241119163402_create_vault_daily_view.ts @@ -0,0 +1,47 @@ +import * as Knex from "knex"; + +const RAW_VAULTS_PNL_DAILY_QUERY: string = ` +CREATE MATERIALIZED VIEW IF NOT EXISTS vaults_daily_pnl AS WITH vault_subaccounts AS +( + SELECT subaccounts.id + FROM vaults, + subaccounts + WHERE vaults.address = subaccounts.address + AND subaccounts."subaccountNumber" = 0), pnl_subaccounts AS +( + SELECT * + FROM vault_subaccounts + UNION + SELECT id + FROM subaccounts + WHERE address = 'dydx18tkxrnrkqc2t0lr3zxr5g6a4hdvqksylxqje4r' + AND "subaccountNumber" = 0) +SELECT "id", + "subaccountId", + "equity", + "totalPnl", + "netTransfers", + "createdAt", + "blockHeight", + "blockTime" +FROM ( + SELECT pnl_ticks.*, + ROW_NUMBER() OVER ( partition BY "subaccountId", DATE_TRUNC( 'day', "blockTime" ) ORDER BY "blockTime" ) AS r + FROM pnl_ticks + WHERE "subaccountId" IN + ( + SELECT * + FROM pnl_subaccounts) + AND "blockTime" >= NOW() - interval '7776000 second' ) AS pnl_intervals +WHERE r = 1 +ORDER BY "subaccountId"; +` + +export async function up(knex: Knex): Promise { + await knex.raw(RAW_VAULTS_PNL_DAILY_QUERY); + await knex.raw(`CREATE UNIQUE INDEX ON vaults_daily_pnl (id);`) +} + +export async function down(knex: Knex): Promise { + await knex.raw('DROP MATERIALIZED VIEW IF EXISTS vaults_daily_pnl;') +} diff --git a/indexer/packages/postgres/src/index.ts b/indexer/packages/postgres/src/index.ts index 70c0d719a5..52b328a1e7 100644 --- a/indexer/packages/postgres/src/index.ts +++ b/indexer/packages/postgres/src/index.ts @@ -51,6 +51,7 @@ export * as AffiliateReferredUsersTable from './stores/affiliate-referred-users- export * as FirebaseNotificationTokenTable from './stores/firebase-notification-token-table'; export * as AffiliateInfoTable from './stores/affiliate-info-table'; export * as VaultTable from './stores/vault-table'; +export * as VaultPnlTicksView from './stores/vault-pnl-ticks-view'; export * as perpetualMarketRefresher from './loops/perpetual-market-refresher'; export * as assetRefresher from './loops/asset-refresher'; diff --git a/indexer/packages/postgres/src/stores/vault-pnl-ticks-view.ts b/indexer/packages/postgres/src/stores/vault-pnl-ticks-view.ts new file mode 100644 index 0000000000..fb8b765827 --- /dev/null +++ b/indexer/packages/postgres/src/stores/vault-pnl-ticks-view.ts @@ -0,0 +1,65 @@ +import _ from 'lodash'; +import { DateTime } from 'luxon'; + +import { knexReadReplica } from '../helpers/knex'; +import { + PnlTickInterval, + PnlTicksFromDatabase, +} from '../types'; +import { rawQuery } from '../helpers/stores-helpers'; + +const VAULT_HOURLY_PNL_VIEW: string = 'vaults_hourly_pnl'; +const VAULT_DAILY_PNL_VIEW: string = 'vaults_daily_pnl'; + +export async function refreshHourlyView(): Promise { + return await rawQuery( + `REFRESH MATERIALIZED VIEW CONCURRENTLY ${VAULT_HOURLY_PNL_VIEW}`, + { + readReplica: false, + } + ); +} + +export async function refreshDailyView(): Promise { + return await rawQuery( + `REFRESH MATERIALIZED VIEW CONCURRENTLY ${VAULT_DAILY_PNL_VIEW}`, + { + readReplica: false, + } + ); +} + +export async function getVaultsPnl( + interval: PnlTickInterval, + timeWindowSeconds: number, + earliestDate: DateTime, +): Promise { + let viewName: string = VAULT_DAILY_PNL_VIEW; + if (interval == PnlTickInterval.hour) { + viewName = VAULT_HOURLY_PNL_VIEW; + } + const result: { + rows: PnlTicksFromDatabase[], + } = await knexReadReplica.getConnection().raw( + ` + SELECT + "id", + "subaccountId", + "equity", + "totalPnl", + "netTransfers", + "createdAt", + "blockHeight", + "blockTime" + FROM ${viewName} + WHERE + "blockTime" >= '${earliestDate.toUTC().toISO()}'::timestamp AND + "blockTime" > NOW() - INTERVAL '${timeWindowSeconds} second' + ORDER BY "subaccountId", "blockTime"; + `, + ) as unknown as { + rows: PnlTicksFromDatabase[], + }; + + return result.rows; +} diff --git a/indexer/services/comlink/__tests__/controllers/api/v4/vault-controller.test.ts b/indexer/services/comlink/__tests__/controllers/api/v4/vault-controller.test.ts index 26304550fd..f5fe40239f 100644 --- a/indexer/services/comlink/__tests__/controllers/api/v4/vault-controller.test.ts +++ b/indexer/services/comlink/__tests__/controllers/api/v4/vault-controller.test.ts @@ -16,6 +16,7 @@ import { MEGAVAULT_MODULE_ADDRESS, MEGAVAULT_SUBACCOUNT_ID, TransferTable, + VaultPnlTicksView, } from '@dydxprotocol-indexer/postgres'; import { RequestMethod, VaultHistoricalPnl } from '../../../../src/types'; import request from 'supertest'; @@ -127,6 +128,8 @@ describe('vault-controller#V4', () => { afterEach(async () => { await dbHelpers.clearData(); + await VaultPnlTicksView.refreshDailyView(); + await VaultPnlTicksView.refreshHourlyView(); config.VAULT_PNL_HISTORY_HOURS = vaultPnlHistoryHoursPrev; config.VAULT_LATEST_PNL_TICK_WINDOW_HOURS = vaultPnlLastPnlWindowPrev; config.VAULT_PNL_START_DATE = vaultPnlStartDatePrev; @@ -652,6 +655,8 @@ describe('vault-controller#V4', () => { ]); createdTicks.push(...mainSubaccountTicks); } + await VaultPnlTicksView.refreshDailyView(); + await VaultPnlTicksView.refreshHourlyView(); return createdTicks; } diff --git a/indexer/services/comlink/src/controllers/api/v4/vault-controller.ts b/indexer/services/comlink/src/controllers/api/v4/vault-controller.ts index dbc4043ab8..bd2c489478 100644 --- a/indexer/services/comlink/src/controllers/api/v4/vault-controller.ts +++ b/indexer/services/comlink/src/controllers/api/v4/vault-controller.ts @@ -29,6 +29,7 @@ import { TransferTable, TransferColumns, Ordering, + VaultPnlTicksView, } from '@dydxprotocol-indexer/postgres'; import Big from 'big.js'; import bounds from 'binary-searching'; @@ -343,10 +344,9 @@ async function getVaultSubaccountPnlTicks( PnlTicksFromDatabase[], PnlTicksFromDatabase[], ] = await Promise.all([ - PnlTicksTable.getPnlTicksAtIntervals( + VaultPnlTicksView.getVaultsPnl( resolution, windowSeconds, - vaultSubaccountIds, getVautlPnlStartDate(), ), PnlTicksTable.getLatestPnlTick( @@ -559,10 +559,9 @@ export async function getLatestPnlTick( PnlTicksFromDatabase[], PnlTicksFromDatabase[], ] = await Promise.all([ - PnlTicksTable.getPnlTicksAtIntervals( + VaultPnlTicksView.getVaultsPnl( PnlTickInterval.hour, config.VAULT_LATEST_PNL_TICK_WINDOW_HOURS * 60 * 60, - vaultSubaccountIds, getVautlPnlStartDate(), ), PnlTicksTable.getLatestPnlTick( diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 6d7efbaca6..2b0a5bb965 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -60,6 +60,7 @@ export const configSchema = { LOOPS_ENABLED_UPDATE_WALLET_TOTAL_VOLUME: parseBoolean({ default: true }), LOOPS_ENABLED_UPDATE_AFFILIATE_INFO: parseBoolean({ default: true }), LOOPS_ENABLED_DELETE_OLD_FIREBASE_NOTIFICATION_TOKENS: parseBoolean({ default: true }), + LOOPS_ENABLED_REFRESH_VAULT_PNL: parseBoolean({ default: true }), // Loop Timing LOOPS_INTERVAL_MS_MARKET_UPDATER: parseInteger({ @@ -140,6 +141,9 @@ export const configSchema = { LOOPS_INTERVAL_MS_CACHE_ORDERBOOK_MID_PRICES: parseInteger({ default: ONE_SECOND_IN_MILLISECONDS, }), + LOOPS_INTERVAL_MS_REFRESH_VAULT_PNL: parseInteger({ + default: 5 * ONE_MINUTE_IN_MILLISECONDS, + }), // Start delay START_DELAY_ENABLED: parseBoolean({ default: true }), @@ -215,6 +219,9 @@ export const configSchema = { SUBACCOUNT_USERNAME_BATCH_SIZE: parseInteger({ default: 2000 }), // number of attempts to generate username for a subaccount ATTEMPT_PER_SUBACCOUNT: parseInteger({ default: 3 }), + + // Refresh vault pnl view + TIME_WINDOW_FOR_REFRESH_MS: parseInteger({ default: 15 * ONE_MINUTE_IN_MILLISECONDS }), }; export default parseSchema(configSchema); diff --git a/indexer/services/roundtable/src/tasks/refresh-vault-pnl.ts b/indexer/services/roundtable/src/tasks/refresh-vault-pnl.ts new file mode 100644 index 0000000000..8a63b93f69 --- /dev/null +++ b/indexer/services/roundtable/src/tasks/refresh-vault-pnl.ts @@ -0,0 +1,45 @@ +import { logger, stats } from '@dydxprotocol-indexer/base'; +import { + VaultPnlTicksView +} from '@dydxprotocol-indexer/postgres'; +import { DateTime } from 'luxon'; +import config from '../config'; + +/** + * Update the affiliate info for all affiliate addresses. + */ +export default async function runTask(): Promise { + const taskStart: number = Date.now(); + + const currentTime: DateTime = DateTime.utc(); + if (currentTime.diff( + currentTime.startOf('hour') + ).toMillis() < config.TIME_WINDOW_FOR_REFRESH_MS) { + logger.info({ + at: 'refresh-vault-pnl#runTask', + message: 'Refreshing vault hourly pnl view', + currentTime, + }); + await VaultPnlTicksView.refreshHourlyView(); + stats.timing( + `${config.SERVICE_NAME}.refresh-vault-pnl.hourly-view.timing`, + Date.now() - taskStart, + ) + } + + const refreshDailyStart: number = Date.now(); + if (currentTime.diff( + currentTime.startOf('day') + ).toMillis() < config.TIME_WINDOW_FOR_REFRESH_MS) { + logger.info({ + at: 'refresh-vault-pnl#runTask', + message: 'Refreshing vault daily pnl view', + currentTime, + }); + await VaultPnlTicksView.refreshDailyView(); + stats.timing( + `${config.SERVICE_NAME}.refresh-vault-pnl.daily-view.timing`, + Date.now() - refreshDailyStart, + ); + } +} \ No newline at end of file