Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
hwray committed Nov 22, 2024
1 parent 395f448 commit 42d7537
Show file tree
Hide file tree
Showing 16 changed files with 275 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import {
defaultPreviousHeight,
defaultTime,
defaultTxHash,
defaultUpdatePerpetualEvent,
defaultUpdatePerpetualEventV1,
defaultUpdatePerpetualEventV2,
} from '../helpers/constants';
import {
IndexerTendermintBlock,
IndexerTendermintEvent,
Timestamp,
UpdatePerpetualEventV1,
UpdatePerpetualEventV2,
} from '@dydxprotocol-indexer/v4-protos';
import {
createIndexerTendermintBlock,
Expand Down Expand Up @@ -57,14 +59,29 @@ describe('update-perpetual-handler', () => {
jest.resetAllMocks();
});

describe('getParallelizationIds', () => {
describe.each([
[
'UpdatePerpetualEventV1',
UpdatePerpetualEventV1.encode(defaultUpdatePerpetualEventV1).finish(),
defaultUpdatePerpetualEventV1,
],
[
'PerpetualMarketCreateEventV2',
UpdatePerpetualEventV2.encode(defaultUpdatePerpetualEventV2).finish(),
defaultUpdatePerpetualEventV2,
],
])('%s', (
_name: string,
updatePerpetualEventBytes: Uint8Array,
event: UpdatePerpetualEventV1 | UpdatePerpetualEventV2,
) => {
it('returns the correct parallelization ids', () => {
const transactionIndex: number = 0;
const eventIndex: number = 0;

const indexerTendermintEvent: IndexerTendermintEvent = createIndexerTendermintEvent(
DydxIndexerSubtypes.UPDATE_PERPETUAL,
UpdatePerpetualEventV1.encode(defaultUpdatePerpetualEvent).finish(),
updatePerpetualEventBytes,
transactionIndex,
eventIndex,
);
Expand All @@ -80,40 +97,41 @@ describe('update-perpetual-handler', () => {
0,
indexerTendermintEvent,
0,
defaultUpdatePerpetualEvent,
event,
);

expect(handler.getParallelizationIds()).toEqual([]);
});
});

it('updates an existing perpetual market', async () => {
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdatePerpetualEvent({
updatePerpetualEvent: defaultUpdatePerpetualEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);
it('updates an existing perpetual market', async () => {
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdatePerpetualEvent({
updatePerpetualEvent: event,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const perpetualMarket:
PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById(
defaultUpdatePerpetualEvent.id.toString(),
);
expect(perpetualMarket).toEqual(expect.objectContaining({
id: defaultUpdatePerpetualEvent.id.toString(),
ticker: defaultUpdatePerpetualEvent.ticker,
marketId: defaultUpdatePerpetualEvent.marketId,
atomicResolution: defaultUpdatePerpetualEvent.atomicResolution,
liquidityTierId: defaultUpdatePerpetualEvent.liquidityTier,
}));
expect(perpetualMarket).toEqual(
perpetualMarketRefresher.getPerpetualMarketFromId(
defaultUpdatePerpetualEvent.id.toString()));
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
const perpetualMarket:
PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById(
event.id.toString(),
);
// TODO new fields
expect(perpetualMarket).toEqual(expect.objectContaining({
id: event.id.toString(),
ticker: event.ticker,
marketId: event.marketId,
atomicResolution: event.atomicResolution,
liquidityTierId: event.liquidityTier,
}));
expect(perpetualMarket).toEqual(
perpetualMarketRefresher.getPerpetualMarketFromId(
event.id.toString()));
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
});
});
});

Expand Down
14 changes: 12 additions & 2 deletions indexer/services/ender/__tests__/helpers/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
TransferEventV1,
UpdateClobPairEventV1,
UpdatePerpetualEventV1,
UpdatePerpetualEventV2,
OpenInterestUpdateEventV1,
OpenInterestUpdate,
} from '@dydxprotocol-indexer/v4-protos';
Expand Down Expand Up @@ -191,14 +192,23 @@ export const defaultOpenInterestUpdateEvent: OpenInterestUpdateEventV1 = {
openInterestUpdates: [defaultOpenInterestUpdate1, defaultOpenInterestUpdate2],
};

export const defaultUpdatePerpetualEvent: UpdatePerpetualEventV1 = {
export const defaultUpdatePerpetualEventV1: UpdatePerpetualEventV1 = {
id: 0,
ticker: 'BTC-USD2',
marketId: 1,
atomicResolution: -8,
liquidityTier: 1,
};

export const defaultUpdatePerpetualEventV2: UpdatePerpetualEventV2 = {
id: 0,
ticker: 'BTC-USD2',
marketId: 1,
atomicResolution: -8,
liquidityTier: 1,
marketType: PerpetualMarketType.PERPETUAL_MARKET_TYPE_CROSS,
};

export const defaultUpdateClobPairEvent: UpdateClobPairEventV1 = {
clobPairId: 1,
status: ClobPairStatus.CLOB_PAIR_STATUS_ACTIVE,
Expand Down Expand Up @@ -397,7 +407,7 @@ export const defaultTradeMessage: SingleTradeMessage = contentToSingleTradeMessa
testConstants.defaultPerpetualMarket.clobPairId,
);
export const defaultTradeKafkaEvent:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromTrade(defaultTradeMessage);
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromTrade(defaultTradeMessage);

export const defaultStatefulOrderPlacementEvent: StatefulOrderEventV1 = {
orderPlace: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ import {
IndexerTendermintBlock,
IndexerTendermintEvent,
UpdatePerpetualEventV1,
UpdatePerpetualEventV2,
} from '@dydxprotocol-indexer/v4-protos';
import {
dbHelpers, testMocks, perpetualMarketRefresher,
} from '@dydxprotocol-indexer/postgres';
import { DydxIndexerSubtypes } from '../../src/lib/types';
import {
defaultUpdatePerpetualEvent,
defaultUpdatePerpetualEventV1,
defaultUpdatePerpetualEventV2,
defaultHeight,
defaultTime,
defaultTxHash,
Expand Down Expand Up @@ -42,11 +44,26 @@ describe('update-perpetual-validator', () => {
await dbHelpers.teardown();
});

describe('validate', () => {
describe.each([
[
'UpdatePerpetualEventV1',
UpdatePerpetualEventV1.encode(defaultUpdatePerpetualEventV1).finish(),
defaultUpdatePerpetualEventV1,
],
[
'PerpetualMarketCreateEventV2',
UpdatePerpetualEventV2.encode(defaultUpdatePerpetualEventV2).finish(),
defaultUpdatePerpetualEventV2,
],
])('%s', (
_name: string,
updatePerpetualEventBytes: Uint8Array,
event: UpdatePerpetualEventV1 | UpdatePerpetualEventV2,
) => {
it('does not throw error on valid perpetual market create event', () => {
const validator: UpdatePerpetualValidator = new UpdatePerpetualValidator(
defaultUpdatePerpetualEvent,
createBlock(defaultUpdatePerpetualEvent),
event,
createBlock(updatePerpetualEventBytes),
0,
);

Expand All @@ -57,10 +74,10 @@ describe('update-perpetual-validator', () => {
it('throws error if id does not correspond to an existing perpetual market', () => {
const validator: UpdatePerpetualValidator = new UpdatePerpetualValidator(
{
...defaultUpdatePerpetualEvent,
...event,
id: 20,
},
createBlock(defaultUpdatePerpetualEvent),
createBlock(updatePerpetualEventBytes),
0,
);

Expand All @@ -72,11 +89,11 @@ describe('update-perpetual-validator', () => {
});

function createBlock(
updatePerpetualEvent: UpdatePerpetualEventV1,
updatePerpetualEventBytes: Uint8Array,
): IndexerTendermintBlock {
const event: IndexerTendermintEvent = createIndexerTendermintEvent(
DydxIndexerSubtypes.UPDATE_PERPETUAL,
UpdatePerpetualEventV1.encode(updatePerpetualEvent).finish(),
updatePerpetualEventBytes,
0,
0,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import {
perpetualMarketRefresher,
PerpetualMarketModel,
} from '@dydxprotocol-indexer/postgres';
import { UpdatePerpetualEventV1 } from '@dydxprotocol-indexer/v4-protos';
import { UpdatePerpetualEventV1, UpdatePerpetualEventV2 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../config';
import { generatePerpetualMarketMessage } from '../helpers/kafka-helper';
import { ConsolidatedKafkaEvent } from '../lib/types';
import { Handler } from './handler';

export class UpdatePerpetualHandler extends Handler<UpdatePerpetualEventV1> {
eventType: string = 'UpdatePerpetualEventV1';
export class UpdatePerpetualHandler extends Handler<
UpdatePerpetualEventV1 | UpdatePerpetualEventV2> {
eventType: string = 'UpdatePerpetualEvent';

public getParallelizationIds(): string[] {
return [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const HANDLER_SCRIPTS: string[] = [
'dydx_trading_rewards_handler.sql',
'dydx_transfer_handler.sql',
'dydx_update_clob_pair_handler.sql',
'dydx_update_perpetual_handler.sql',
'dydx_update_perpetual_v1_handler.sql',
'dydx_update_perpetual_v2_handler.sql',
'dydx_vault_upsert_handler.sql',
];

Expand Down
13 changes: 7 additions & 6 deletions indexer/services/ender/src/lib/block-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const TXN_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING: Record<string, ValidatorIn
[serializeSubtypeAndVersion(DydxIndexerSubtypes.PERPETUAL_MARKET.toString(), 2)]: PerpetualMarketValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.LIQUIDITY_TIER.toString(), 1)]: LiquidityTierValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.UPDATE_PERPETUAL.toString(), 1)]: UpdatePerpetualValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.UPDATE_PERPETUAL.toString(), 2)]: UpdatePerpetualValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.UPDATE_CLOB_PAIR.toString(), 1)]: UpdateClobPairValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.DELEVERAGING.toString(), 1)]: DeleveragingValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.LIQUIDITY_TIER.toString(), 2)]: LiquidityTierValidatorV2,
Expand Down Expand Up @@ -146,10 +147,10 @@ export class BlockProcessor {
const event: IndexerTendermintEvent = this.block.events[i];
const transactionIndex: number = indexerTendermintEventToTransactionIndex(event);
const eventProtoWithType:
EventProtoWithTypeAndVersion | undefined = indexerTendermintEventToEventProtoWithType(
i,
event,
);
EventProtoWithTypeAndVersion | undefined = indexerTendermintEventToEventProtoWithType(
i,
event,
);
if (eventProtoWithType === undefined) {
continue;
}
Expand Down Expand Up @@ -192,12 +193,12 @@ export class BlockProcessor {
validatorMap: Record<string, ValidatorInitializer>,
): void {
const Initializer:
ValidatorInitializer | undefined = validatorMap[
ValidatorInitializer | undefined = validatorMap[
serializeSubtypeAndVersion(
eventProto.type,
eventProto.version,
)
];
];

if (Initializer === undefined) {
const message: string = `cannot process subtype ${eventProto.type} and version ${eventProto.version}`;
Expand Down
32 changes: 25 additions & 7 deletions indexer/services/ender/src/lib/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
LiquidityTierUpsertEventV1,
LiquidityTierUpsertEventV2,
UpdatePerpetualEventV1,
UpdatePerpetualEventV2,
UpdateClobPairEventV1,
SubaccountMessage,
DeleveragingEventV1,
Expand Down Expand Up @@ -198,13 +199,30 @@ export function indexerTendermintEventToEventProtoWithType(
};
}
case (DydxIndexerSubtypes.UPDATE_PERPETUAL.toString()): {
return {
type: DydxIndexerSubtypes.UPDATE_PERPETUAL,
eventProto: UpdatePerpetualEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
blockEventIndex,
};
if (version === 1) {
return {
type: DydxIndexerSubtypes.UPDATE_PERPETUAL,
eventProto: UpdatePerpetualEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
blockEventIndex,
};
} else if (version === 2) {
return {
type: DydxIndexerSubtypes.UPDATE_PERPETUAL,
eventProto: UpdatePerpetualEventV2.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
blockEventIndex,
};
} else {
const message: string = `Invalid version for update perpetual event: ${version}`;
logger.error({
at: 'helpers#indexerTendermintEventToEventWithType',
message,
});
return undefined;
}
}
case (DydxIndexerSubtypes.UPDATE_CLOB_PAIR.toString()): {
return {
Expand Down
9 changes: 8 additions & 1 deletion indexer/services/ender/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
LiquidityTierUpsertEventV1,
LiquidityTierUpsertEventV2,
UpdatePerpetualEventV1,
UpdatePerpetualEventV2,
UpdateClobPairEventV1,
DeleveragingEventV1,
TradingRewardsEventV1,
Expand Down Expand Up @@ -145,6 +146,12 @@ export type EventProtoWithTypeAndVersion = {
indexerTendermintEvent: IndexerTendermintEvent,
version: number,
blockEventIndex: number,
} | {
type: DydxIndexerSubtypes.UPDATE_PERPETUAL,
eventProto: UpdatePerpetualEventV2,
indexerTendermintEvent: IndexerTendermintEvent,
version: number,
blockEventIndex: number,
} | {
type: DydxIndexerSubtypes.UPDATE_CLOB_PAIR,
eventProto: UpdateClobPairEventV1,
Expand Down Expand Up @@ -227,7 +234,7 @@ export type FundingEventMessage = {

export type SumFields = PerpetualPositionColumns.sumOpen | PerpetualPositionColumns.sumClose;
export type PriceFields = PerpetualPositionColumns.entryPrice |
PerpetualPositionColumns.exitPrice;
PerpetualPositionColumns.exitPrice;

export type OrderFillEventWithLiquidity = {
event: OrderFillEventV1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ BEGIN
WHEN '"liquidity_tier"'::jsonb THEN
rval[i] = dydx_liquidity_tier_handler(event_data);
WHEN '"update_perpetual"'::jsonb THEN
rval[i] = dydx_update_perpetual_handler(event_data);
CASE (event_->'version')::int
WHEN 1 THEN
rval[i] = dydx_update_perpetual_v1_handler(event_data);
WHEN 2 THEN
rval[i] = dydx_update_perpetual_v2_handler(event_data);
ELSE
NULL;
END CASE;
WHEN '"update_clob_pair"'::jsonb THEN
rval[i] = dydx_update_clob_pair_handler(event_data);
WHEN '"funding_values"'::jsonb THEN
Expand Down
Loading

0 comments on commit 42d7537

Please sign in to comment.