Skip to content

Commit

Permalink
[TRA-572] Add handler for upsert vault events to ender. (#2274)
Browse files Browse the repository at this point in the history
  • Loading branch information
vincentwschau authored Sep 17, 2024
1 parent 7faee36 commit f2fb2b9
Show file tree
Hide file tree
Showing 16 changed files with 387 additions and 1 deletion.
2 changes: 2 additions & 0 deletions indexer/packages/postgres/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import PerpetualPositionModel from './models/perpetual-position-model';
import SubaccountModel from './models/subaccount-model';
import TradingRewardModel from './models/trading-reward-model';
import TransferModel from './models/transfer-model';
import VaultModel from './models/vault-model';
import {
APITimeInForce,
CandleResolution,
Expand Down Expand Up @@ -103,6 +104,7 @@ export const SQL_TO_JSON_DEFINED_MODELS = [
SubaccountModel,
TransferModel,
TradingRewardModel,
VaultModel,
];

export type SpecifiedClobPairStatus =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export async function up(knex: Knex): Promise<void> {
table.bigInteger('clobPairId').notNullable(); // clob pair id for vault
table.enum('status', [
'DEACTIVATED',
'STANDBY',
'STAND_BY',
'QUOTING',
'CLOSE_ONLY',
]).notNullable(); // quoting status of vault
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export * as SubaccountUsernamesTable from './stores/subaccount-usernames-table';
export * as PersistentCacheTable from './stores/persistent-cache-table';
export * as AffiliateReferredUsersTable from './stores/affiliate-referred-users-table';
export * as FirebaseNotificationTokenTable from './stores/firebase-notification-token-table';
export * as VaultTable from './stores/vault-table';

export * as perpetualMarketRefresher from './loops/perpetual-market-refresher';
export * as assetRefresher from './loops/asset-refresher';
Expand Down
16 changes: 16 additions & 0 deletions indexer/packages/postgres/src/models/vault-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ export default class VaultModel extends BaseModel {
};
}

/**
* A mapping from column name to JSON conversion expected.
* See getSqlConversionForDydxModelTypes for valid conversions.
*
* TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match.
*/
static get sqlToJsonConversions() {
return {
address: 'string',
clobPairId: 'string',
status: 'string',
createdAt: 'date-time',
updatedAt: 'date-time',
};
}

address!: string;

clobPairId!: string;
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/v4-protos/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ export * from './codegen/google/protobuf/timestamp';
export * from './codegen/dydxprotocol/indexer/protocol/v1/clob';
export * from './codegen/dydxprotocol/indexer/protocol/v1/subaccount';
export * from './codegen/dydxprotocol/indexer/shared/removal_reason';
export * from './codegen/dydxprotocol/vault/vault';
export * from './utils';
150 changes: 150 additions & 0 deletions indexer/services/ender/__tests__/handlers/upsert-vault-handler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import {
IndexerTendermintBlock,
UpsertVaultEventV1,
VaultStatus,
} from '@dydxprotocol-indexer/v4-protos';
import {
dbHelpers,
testMocks,
testConstants,
VaultFromDatabase,
VaultTable,
VaultStatus as IndexerVaultStatus,
} from '@dydxprotocol-indexer/postgres';
import { KafkaMessage } from 'kafkajs';
import { createKafkaMessage } from '@dydxprotocol-indexer/kafka';
import { onMessage } from '../../src/lib/on-message';
import { DydxIndexerSubtypes } from '../../src/lib/types';
import { createIndexerTendermintBlock, createIndexerTendermintEvent } from '../helpers/indexer-proto-helpers';
import {
defaultHeight,
defaultPreviousHeight,
defaultTime,
defaultTxHash,
} from '../helpers/constants';
import { updateBlockCache } from '../../src/caches/block-cache';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';

describe('upsertVaultHandler', () => {
beforeAll(async () => {
await dbHelpers.migrate();
await dbHelpers.clearData();
await createPostgresFunctions();
});

beforeEach(async () => {
await testMocks.seedData();
updateBlockCache(defaultPreviousHeight);
});

afterEach(async () => {
await dbHelpers.clearData();
jest.clearAllMocks();
});

afterAll(async () => {
await dbHelpers.teardown();
jest.resetAllMocks();
});

it('should upsert new vaults in single block', async () => {
const events: UpsertVaultEventV1[] = [
{
address: testConstants.defaultVaultAddress,
clobPairId: 0,
status: VaultStatus.VAULT_STATUS_QUOTING,
}, {
address: testConstants.defaultAddress,
clobPairId: 1,
status: VaultStatus.VAULT_STATUS_STAND_BY,
},
];
const block: IndexerTendermintBlock = createBlockFromEvents(
defaultHeight,
...events,
);
const binaryBlock: Uint8Array = IndexerTendermintBlock.encode(block).finish();
const kafkaMessage: KafkaMessage = createKafkaMessage(Buffer.from(binaryBlock));

await onMessage(kafkaMessage);

const vaults: VaultFromDatabase[] = await VaultTable.findAll(
{},
[],
{},
);
expect(vaults).toHaveLength(2);
expect(vaults[0]).toEqual({
address: testConstants.defaultVaultAddress,
clobPairId: '0',
status: IndexerVaultStatus.QUOTING,
createdAt: block.time?.toISOString(),
updatedAt: block.time?.toISOString(),
});
expect(vaults[1]).toEqual({
address: testConstants.defaultAddress,
clobPairId: '1',
status: IndexerVaultStatus.STAND_BY,
createdAt: block.time?.toISOString(),
updatedAt: block.time?.toISOString(),
});
});

it('should upsert an existing vault', async () => {
await VaultTable.create(testConstants.defaultVault);

const events: UpsertVaultEventV1[] = [
{
address: testConstants.defaultVaultAddress,
clobPairId: 0,
status: VaultStatus.VAULT_STATUS_CLOSE_ONLY,
},
];
const block: IndexerTendermintBlock = createBlockFromEvents(
defaultHeight,
...events,
);
const binaryBlock: Uint8Array = IndexerTendermintBlock.encode(block).finish();
const kafkaMessage: KafkaMessage = createKafkaMessage(Buffer.from(binaryBlock));

await onMessage(kafkaMessage);

const vaults: VaultFromDatabase[] = await VaultTable.findAll(
{},
[],
{},
);
expect(vaults).toHaveLength(1);
expect(vaults[0]).toEqual({
address: testConstants.defaultVault.address,
clobPairId: testConstants.defaultVault.clobPairId,
status: IndexerVaultStatus.CLOSE_ONLY,
createdAt: testConstants.defaultVault.createdAt,
updatedAt: block.time?.toISOString(),
});
});
});

function createBlockFromEvents(
height: number,
...events: UpsertVaultEventV1[]
): IndexerTendermintBlock {
const transactionIndex = 0;
let eventIndex = 0;

return createIndexerTendermintBlock(
height,
defaultTime,
events.map((event) => {
const indexerEvent = createIndexerTendermintEvent(
DydxIndexerSubtypes.UPSERT_VAULT,
UpsertVaultEventV1.encode(event).finish(),
transactionIndex,
eventIndex,
);
eventIndex += 1;
return indexerEvent;
}),
[defaultTxHash],
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { ParseMessageError, logger } from '@dydxprotocol-indexer/base';
import {
IndexerTendermintBlock,
IndexerTendermintEvent,
UpsertVaultEventV1,
VaultStatus,
} from '@dydxprotocol-indexer/v4-protos';
import { dbHelpers, testConstants, testMocks } from '@dydxprotocol-indexer/postgres';
import { DydxIndexerSubtypes } from '../../src/lib/types';
import { defaultHeight, defaultTime, defaultTxHash } from '../helpers/constants';
import {
createIndexerTendermintBlock,
createIndexerTendermintEvent,
} from '../helpers/indexer-proto-helpers';
import { expectDidntLogError } from '../helpers/validator-helpers';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import { UpsertVaultValidator } from '../../src/validators/upsert-vault-validator';

describe('upsert-vault-validator', () => {
beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
});

beforeEach(async () => {
await testMocks.seedData();
jest.spyOn(logger, 'error');
});

afterEach(async () => {
await dbHelpers.clearData();
jest.clearAllMocks();
});

afterAll(async () => {
await dbHelpers.teardown();
jest.resetAllMocks();
});

describe('validate', () => {
it('does not throw error on valid uspert vault event', () => {
const event: UpsertVaultEventV1 = {
address: testConstants.defaultVaultAddress,
clobPairId: 0,
status: VaultStatus.VAULT_STATUS_QUOTING,
};
const validator: UpsertVaultValidator = new UpsertVaultValidator(
event,
createBlock(event),
0,
);

validator.validate();
expectDidntLogError();
});

it('throws error if address in event is empty', () => {
const event: UpsertVaultEventV1 = {
address: '',
clobPairId: 0,
status: VaultStatus.VAULT_STATUS_QUOTING,
};
const validator: UpsertVaultValidator = new UpsertVaultValidator(
event,
createBlock(event),
0,
);

expect(() => validator.validate()).toThrow(new ParseMessageError(
'UpsertVaultEvent address is not populated',
));
});
});
});

function createBlock(
upsertVaultEvent: UpsertVaultEventV1,
): IndexerTendermintBlock {
const event: IndexerTendermintEvent = createIndexerTendermintEvent(
DydxIndexerSubtypes.UPSERT_VAULT,
UpsertVaultEventV1.encode(upsertVaultEvent).finish(),
0,
0,
);

return createIndexerTendermintBlock(
defaultHeight,
defaultTime,
[event],
[defaultTxHash],
);
}
18 changes: 18 additions & 0 deletions indexer/services/ender/src/handlers/upsert-vault-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { UpsertVaultEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import { ConsolidatedKafkaEvent } from '../lib/types';
import { Handler } from './handler';

export class UpsertVaultHandler extends Handler<UpsertVaultEventV1> {
eventType: string = 'UpsertVaultEventV1';

public getParallelizationIds(): string[] {
return [];
}

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(_: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
return [];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const HANDLER_SCRIPTS: string[] = [
'dydx_transfer_handler.sql',
'dydx_update_clob_pair_handler.sql',
'dydx_update_perpetual_handler.sql',
'dydx_vault_upsert_handler.sql',
];

const DB_SETUP_SCRIPTS: string[] = [
Expand Down Expand Up @@ -91,6 +92,7 @@ const HELPER_SCRIPTS: string[] = [
'dydx_uuid_from_transaction_parts.sql',
'dydx_uuid_from_transfer_parts.sql',
'dydx_protocol_market_type_to_perpetual_market_type.sql',
'dydx_protocol_vault_status_to_vault_status.sql',
];

const MAIN_SCRIPTS: string[] = [
Expand Down
3 changes: 3 additions & 0 deletions indexer/services/ender/src/lib/block-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { TradingRewardsValidator } from '../validators/trading-rewards-validator
import { TransferValidator } from '../validators/transfer-validator';
import { UpdateClobPairValidator } from '../validators/update-clob-pair-validator';
import { UpdatePerpetualValidator } from '../validators/update-perpetual-validator';
import { UpsertVaultValidator } from '../validators/upsert-vault-validator';
import { Validator, ValidatorInitializer } from '../validators/validator';
import { BatchedHandlers } from './batched-handlers';
import { indexerTendermintEventToEventProtoWithType, indexerTendermintEventToTransactionIndex } from './helper';
Expand All @@ -55,13 +56,15 @@ const TXN_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING: Record<string, ValidatorIn
[serializeSubtypeAndVersion(DydxIndexerSubtypes.DELEVERAGING.toString(), 1)]: DeleveragingValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.LIQUIDITY_TIER.toString(), 2)]: LiquidityTierValidatorV2,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.REGISTER_AFFILIATE.toString(), 1)]: RegisterAffiliateValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.UPSERT_VAULT.toString(), 1)]: UpsertVaultValidator,
};

const BLOCK_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING: Record<string, ValidatorInitializer> = {
[serializeSubtypeAndVersion(DydxIndexerSubtypes.FUNDING.toString(), 1)]: FundingValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.TRADING_REWARD.toString(), 1)]: TradingRewardsValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.STATEFUL_ORDER.toString(), 1)]: StatefulOrderValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.OPEN_INTEREST_UPDATE.toString(), 1)]: OpenInterestUpdateValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.UPSERT_VAULT.toString(), 1)]: UpsertVaultValidator,
};

function serializeSubtypeAndVersion(
Expand Down
10 changes: 10 additions & 0 deletions indexer/services/ender/src/lib/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
OpenInterestUpdateEventV1,
TradingRewardsEventV1,
RegisterAffiliateEventV1,
UpsertVaultEventV1,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import _ from 'lodash';
Expand Down Expand Up @@ -251,6 +252,15 @@ export function indexerTendermintEventToEventProtoWithType(
blockEventIndex,
};
}
case (DydxIndexerSubtypes.UPSERT_VAULT.toString()): {
return {
type: DydxIndexerSubtypes.UPSERT_VAULT,
eventProto: UpsertVaultEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
blockEventIndex,
};
}
default: {
const message: string = `Unable to parse event subtype: ${event.subtype}`;
logger.error({
Expand Down
Loading

0 comments on commit f2fb2b9

Please sign in to comment.