diff --git a/indexer/packages/postgres/src/constants.ts b/indexer/packages/postgres/src/constants.ts index 6aabf9f223..ac9a38e6cb 100644 --- a/indexer/packages/postgres/src/constants.ts +++ b/indexer/packages/postgres/src/constants.ts @@ -15,6 +15,7 @@ import PerpetualPositionModel from './models/perpetual-position-model'; import SubaccountModel from './models/subaccount-model'; import TradingRewardModel from './models/trading-reward-model'; import TransferModel from './models/transfer-model'; +import VaultModel from './models/vault-model'; import { APITimeInForce, CandleResolution, @@ -103,6 +104,7 @@ export const SQL_TO_JSON_DEFINED_MODELS = [ SubaccountModel, TransferModel, TradingRewardModel, + VaultModel, ]; export type SpecifiedClobPairStatus = diff --git a/indexer/packages/postgres/src/db/migrations/migration_files/20240912180829_create_vaults_table.ts b/indexer/packages/postgres/src/db/migrations/migration_files/20240912180829_create_vaults_table.ts index 7c869cecd9..a815302127 100644 --- a/indexer/packages/postgres/src/db/migrations/migration_files/20240912180829_create_vaults_table.ts +++ b/indexer/packages/postgres/src/db/migrations/migration_files/20240912180829_create_vaults_table.ts @@ -6,7 +6,7 @@ export async function up(knex: Knex): Promise { table.bigInteger('clobPairId').notNullable(); // clob pair id for vault table.enum('status', [ 'DEACTIVATED', - 'STANDBY', + 'STAND_BY', 'QUOTING', 'CLOSE_ONLY', ]).notNullable(); // quoting status of vault diff --git a/indexer/packages/postgres/src/index.ts b/indexer/packages/postgres/src/index.ts index 0081e3bca9..1afeb6523a 100644 --- a/indexer/packages/postgres/src/index.ts +++ b/indexer/packages/postgres/src/index.ts @@ -48,6 +48,7 @@ export * as SubaccountUsernamesTable from './stores/subaccount-usernames-table'; export * as PersistentCacheTable from './stores/persistent-cache-table'; export * as AffiliateReferredUsersTable from './stores/affiliate-referred-users-table'; export * as FirebaseNotificationTokenTable from './stores/firebase-notification-token-table'; +export * as VaultTable from './stores/vault-table'; export * as perpetualMarketRefresher from './loops/perpetual-market-refresher'; export * as assetRefresher from './loops/asset-refresher'; diff --git a/indexer/packages/postgres/src/models/vault-model.ts b/indexer/packages/postgres/src/models/vault-model.ts index fd7584ca85..e2f4225854 100644 --- a/indexer/packages/postgres/src/models/vault-model.ts +++ b/indexer/packages/postgres/src/models/vault-model.ts @@ -32,6 +32,22 @@ export default class VaultModel extends BaseModel { }; } + /** + * A mapping from column name to JSON conversion expected. + * See getSqlConversionForDydxModelTypes for valid conversions. + * + * TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match. + */ + static get sqlToJsonConversions() { + return { + address: 'string', + clobPairId: 'string', + status: 'string', + createdAt: 'date-time', + updatedAt: 'date-time', + }; + } + address!: string; clobPairId!: string; diff --git a/indexer/packages/v4-protos/src/index.ts b/indexer/packages/v4-protos/src/index.ts index 8b081abbf9..81c6df94f5 100644 --- a/indexer/packages/v4-protos/src/index.ts +++ b/indexer/packages/v4-protos/src/index.ts @@ -16,4 +16,5 @@ export * from './codegen/google/protobuf/timestamp'; export * from './codegen/dydxprotocol/indexer/protocol/v1/clob'; export * from './codegen/dydxprotocol/indexer/protocol/v1/subaccount'; export * from './codegen/dydxprotocol/indexer/shared/removal_reason'; +export * from './codegen/dydxprotocol/vault/vault'; export * from './utils'; diff --git a/indexer/services/ender/__tests__/handlers/upsert-vault-handler.test.ts b/indexer/services/ender/__tests__/handlers/upsert-vault-handler.test.ts new file mode 100644 index 0000000000..03efb3d6b0 --- /dev/null +++ b/indexer/services/ender/__tests__/handlers/upsert-vault-handler.test.ts @@ -0,0 +1,150 @@ +import { + IndexerTendermintBlock, + UpsertVaultEventV1, + VaultStatus, +} from '@dydxprotocol-indexer/v4-protos'; +import { + dbHelpers, + testMocks, + testConstants, + VaultFromDatabase, + VaultTable, + VaultStatus as IndexerVaultStatus, +} from '@dydxprotocol-indexer/postgres'; +import { KafkaMessage } from 'kafkajs'; +import { createKafkaMessage } from '@dydxprotocol-indexer/kafka'; +import { onMessage } from '../../src/lib/on-message'; +import { DydxIndexerSubtypes } from '../../src/lib/types'; +import { createIndexerTendermintBlock, createIndexerTendermintEvent } from '../helpers/indexer-proto-helpers'; +import { + defaultHeight, + defaultPreviousHeight, + defaultTime, + defaultTxHash, +} from '../helpers/constants'; +import { updateBlockCache } from '../../src/caches/block-cache'; +import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions'; + +describe('upsertVaultHandler', () => { + beforeAll(async () => { + await dbHelpers.migrate(); + await dbHelpers.clearData(); + await createPostgresFunctions(); + }); + + beforeEach(async () => { + await testMocks.seedData(); + updateBlockCache(defaultPreviousHeight); + }); + + afterEach(async () => { + await dbHelpers.clearData(); + jest.clearAllMocks(); + }); + + afterAll(async () => { + await dbHelpers.teardown(); + jest.resetAllMocks(); + }); + + it('should upsert new vaults in single block', async () => { + const events: UpsertVaultEventV1[] = [ + { + address: testConstants.defaultVaultAddress, + clobPairId: 0, + status: VaultStatus.VAULT_STATUS_QUOTING, + }, { + address: testConstants.defaultAddress, + clobPairId: 1, + status: VaultStatus.VAULT_STATUS_STAND_BY, + }, + ]; + const block: IndexerTendermintBlock = createBlockFromEvents( + defaultHeight, + ...events, + ); + const binaryBlock: Uint8Array = IndexerTendermintBlock.encode(block).finish(); + const kafkaMessage: KafkaMessage = createKafkaMessage(Buffer.from(binaryBlock)); + + await onMessage(kafkaMessage); + + const vaults: VaultFromDatabase[] = await VaultTable.findAll( + {}, + [], + {}, + ); + expect(vaults).toHaveLength(2); + expect(vaults[0]).toEqual({ + address: testConstants.defaultVaultAddress, + clobPairId: '0', + status: IndexerVaultStatus.QUOTING, + createdAt: block.time?.toISOString(), + updatedAt: block.time?.toISOString(), + }); + expect(vaults[1]).toEqual({ + address: testConstants.defaultAddress, + clobPairId: '1', + status: IndexerVaultStatus.STAND_BY, + createdAt: block.time?.toISOString(), + updatedAt: block.time?.toISOString(), + }); + }); + + it('should upsert an existing vault', async () => { + await VaultTable.create(testConstants.defaultVault); + + const events: UpsertVaultEventV1[] = [ + { + address: testConstants.defaultVaultAddress, + clobPairId: 0, + status: VaultStatus.VAULT_STATUS_CLOSE_ONLY, + }, + ]; + const block: IndexerTendermintBlock = createBlockFromEvents( + defaultHeight, + ...events, + ); + const binaryBlock: Uint8Array = IndexerTendermintBlock.encode(block).finish(); + const kafkaMessage: KafkaMessage = createKafkaMessage(Buffer.from(binaryBlock)); + + await onMessage(kafkaMessage); + + const vaults: VaultFromDatabase[] = await VaultTable.findAll( + {}, + [], + {}, + ); + expect(vaults).toHaveLength(1); + expect(vaults[0]).toEqual({ + address: testConstants.defaultVault.address, + clobPairId: testConstants.defaultVault.clobPairId, + status: IndexerVaultStatus.CLOSE_ONLY, + createdAt: testConstants.defaultVault.createdAt, + updatedAt: block.time?.toISOString(), + }); + }); +}); + +function createBlockFromEvents( + height: number, + ...events: UpsertVaultEventV1[] +): IndexerTendermintBlock { + const transactionIndex = 0; + let eventIndex = 0; + + return createIndexerTendermintBlock( + height, + defaultTime, + events.map((event) => { + const indexerEvent = createIndexerTendermintEvent( + DydxIndexerSubtypes.UPSERT_VAULT, + UpsertVaultEventV1.encode(event).finish(), + transactionIndex, + eventIndex, + ); + eventIndex += 1; + return indexerEvent; + }), + [defaultTxHash], + ); +} diff --git a/indexer/services/ender/__tests__/validators/upsert-vault-validator.test.ts b/indexer/services/ender/__tests__/validators/upsert-vault-validator.test.ts new file mode 100644 index 0000000000..dd9ecdbafd --- /dev/null +++ b/indexer/services/ender/__tests__/validators/upsert-vault-validator.test.ts @@ -0,0 +1,92 @@ +import { ParseMessageError, logger } from '@dydxprotocol-indexer/base'; +import { + IndexerTendermintBlock, + IndexerTendermintEvent, + UpsertVaultEventV1, + VaultStatus, +} from '@dydxprotocol-indexer/v4-protos'; +import { dbHelpers, testConstants, testMocks } from '@dydxprotocol-indexer/postgres'; +import { DydxIndexerSubtypes } from '../../src/lib/types'; +import { defaultHeight, defaultTime, defaultTxHash } from '../helpers/constants'; +import { + createIndexerTendermintBlock, + createIndexerTendermintEvent, +} from '../helpers/indexer-proto-helpers'; +import { expectDidntLogError } from '../helpers/validator-helpers'; +import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions'; +import { UpsertVaultValidator } from '../../src/validators/upsert-vault-validator'; + +describe('upsert-vault-validator', () => { + beforeAll(async () => { + await dbHelpers.migrate(); + await createPostgresFunctions(); + }); + + beforeEach(async () => { + await testMocks.seedData(); + jest.spyOn(logger, 'error'); + }); + + afterEach(async () => { + await dbHelpers.clearData(); + jest.clearAllMocks(); + }); + + afterAll(async () => { + await dbHelpers.teardown(); + jest.resetAllMocks(); + }); + + describe('validate', () => { + it('does not throw error on valid uspert vault event', () => { + const event: UpsertVaultEventV1 = { + address: testConstants.defaultVaultAddress, + clobPairId: 0, + status: VaultStatus.VAULT_STATUS_QUOTING, + }; + const validator: UpsertVaultValidator = new UpsertVaultValidator( + event, + createBlock(event), + 0, + ); + + validator.validate(); + expectDidntLogError(); + }); + + it('throws error if address in event is empty', () => { + const event: UpsertVaultEventV1 = { + address: '', + clobPairId: 0, + status: VaultStatus.VAULT_STATUS_QUOTING, + }; + const validator: UpsertVaultValidator = new UpsertVaultValidator( + event, + createBlock(event), + 0, + ); + + expect(() => validator.validate()).toThrow(new ParseMessageError( + 'UpsertVaultEvent address is not populated', + )); + }); + }); +}); + +function createBlock( + upsertVaultEvent: UpsertVaultEventV1, +): IndexerTendermintBlock { + const event: IndexerTendermintEvent = createIndexerTendermintEvent( + DydxIndexerSubtypes.UPSERT_VAULT, + UpsertVaultEventV1.encode(upsertVaultEvent).finish(), + 0, + 0, + ); + + return createIndexerTendermintBlock( + defaultHeight, + defaultTime, + [event], + [defaultTxHash], + ); +} diff --git a/indexer/services/ender/src/handlers/upsert-vault-handler.ts b/indexer/services/ender/src/handlers/upsert-vault-handler.ts new file mode 100644 index 0000000000..76b9227fb9 --- /dev/null +++ b/indexer/services/ender/src/handlers/upsert-vault-handler.ts @@ -0,0 +1,18 @@ +import { UpsertVaultEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import * as pg from 'pg'; + +import { ConsolidatedKafkaEvent } from '../lib/types'; +import { Handler } from './handler'; + +export class UpsertVaultHandler extends Handler { + eventType: string = 'UpsertVaultEventV1'; + + public getParallelizationIds(): string[] { + return []; + } + + // eslint-disable-next-line @typescript-eslint/require-await + public async internalHandle(_: pg.QueryResultRow): Promise { + return []; + } +} diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts index 1166ab275e..1a52926f37 100644 --- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts +++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts @@ -46,6 +46,7 @@ const HANDLER_SCRIPTS: string[] = [ 'dydx_transfer_handler.sql', 'dydx_update_clob_pair_handler.sql', 'dydx_update_perpetual_handler.sql', + 'dydx_vault_upsert_handler.sql', ]; const DB_SETUP_SCRIPTS: string[] = [ @@ -91,6 +92,7 @@ const HELPER_SCRIPTS: string[] = [ 'dydx_uuid_from_transaction_parts.sql', 'dydx_uuid_from_transfer_parts.sql', 'dydx_protocol_market_type_to_perpetual_market_type.sql', + 'dydx_protocol_vault_status_to_vault_status.sql', ]; const MAIN_SCRIPTS: string[] = [ diff --git a/indexer/services/ender/src/lib/block-processor.ts b/indexer/services/ender/src/lib/block-processor.ts index 6890ee5a7d..91efb5af7f 100644 --- a/indexer/services/ender/src/lib/block-processor.ts +++ b/indexer/services/ender/src/lib/block-processor.ts @@ -30,6 +30,7 @@ import { TradingRewardsValidator } from '../validators/trading-rewards-validator import { TransferValidator } from '../validators/transfer-validator'; import { UpdateClobPairValidator } from '../validators/update-clob-pair-validator'; import { UpdatePerpetualValidator } from '../validators/update-perpetual-validator'; +import { UpsertVaultValidator } from '../validators/upsert-vault-validator'; import { Validator, ValidatorInitializer } from '../validators/validator'; import { BatchedHandlers } from './batched-handlers'; import { indexerTendermintEventToEventProtoWithType, indexerTendermintEventToTransactionIndex } from './helper'; @@ -55,6 +56,7 @@ const TXN_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING: Record = { @@ -62,6 +64,7 @@ const BLOCK_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING: Record'status'); + + vault_record."address" = jsonb_extract_path_text(event_data, 'address'); + vault_record."clobPairId" = (event_data->'clobPairId')::bigint; + vault_record."status" = vault_status; + vault_record."createdAt" = block_time; + vault_record."updatedAt" = block_time; + + INSERT INTO vaults VALUES (vault_record.*) + ON CONFLICT ("address") DO + UPDATE + SET + "status" = vault_status, + "updatedAt" = block_time; + + RETURN jsonb_build_object( + 'vault', + dydx_to_jsonb(vault_record) + ); +END; +$$ LANGUAGE plpgsql; diff --git a/indexer/services/ender/src/scripts/helpers/dydx_protocol_vault_status_to_vault_status.sql b/indexer/services/ender/src/scripts/helpers/dydx_protocol_vault_status_to_vault_status.sql new file mode 100644 index 0000000000..80a1f956f5 --- /dev/null +++ b/indexer/services/ender/src/scripts/helpers/dydx_protocol_vault_status_to_vault_status.sql @@ -0,0 +1,13 @@ +CREATE OR REPLACE FUNCTION dydx_protocol_vault_status_to_vault_status(vaultStatus jsonb) + RETURNS text AS $$ + +BEGIN + CASE vaultStatus + WHEN '1'::jsonb THEN RETURN 'DEACTIVATED'; /** VAULT_STATUS_DEACTIVATED **/ + WHEN '2'::jsonb THEN RETURN 'STAND_BY'; /** VAULT_STATUS_STAND_BY **/ + WHEN '3'::jsonb THEN RETURN 'QUOTING'; /** VAULT_STATUS_QUOTING **/ + WHEN '4'::jsonb THEN RETURN 'CLOSE_ONLY'; /** VAULT_STATUS_CLOSE_ONLY **/ + ELSE RAISE EXCEPTION 'Invalid vault status: %', vaultStatus; + END CASE; +END; +$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; diff --git a/indexer/services/ender/src/validators/upsert-vault-validator.ts b/indexer/services/ender/src/validators/upsert-vault-validator.ts new file mode 100644 index 0000000000..c712a9d399 --- /dev/null +++ b/indexer/services/ender/src/validators/upsert-vault-validator.ts @@ -0,0 +1,31 @@ +import { IndexerTendermintEvent, UpsertVaultEventV1 } from '@dydxprotocol-indexer/v4-protos'; + +import { Handler } from '../handlers/handler'; +import { UpsertVaultHandler } from '../handlers/upsert-vault-handler'; +import { Validator } from './validator'; + +export class UpsertVaultValidator extends Validator { + public validate(): void { + if (this.event.address === '') { + return this.logAndThrowParseMessageError( + 'UpsertVaultEvent address is not populated', + ); + } + } + + public createHandlers( + indexerTendermintEvent: IndexerTendermintEvent, + txId: number, + _: string, + ): Handler[] { + return [ + new UpsertVaultHandler( + this.block, + this.blockEventIndex, + indexerTendermintEvent, + txId, + this.event, + ), + ]; + } +}