Skip to content

Commit

Permalink
Merge branch 'main' into vincentc/ind-402-send-cached-stateful-order-…
Browse files Browse the repository at this point in the history
…updates
  • Loading branch information
vincentwschau committed Oct 23, 2023
2 parents fbe8b0b + aee5223 commit 2352bd2
Show file tree
Hide file tree
Showing 19 changed files with 563 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { TRADES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-ind
import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres';
import { TradeMessage } from '@dydxprotocol-indexer/v4-protos';

import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';
import { AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';

export function contentToTradeMessage(
tradeContent: TradeContent,
Expand Down Expand Up @@ -40,3 +40,12 @@ export function createConsolidatedKafkaEventFromTrade(
message: trade,
};
}

export function createConsolidatedKafkaEventFromSubaccount(
subaccount: AnnotatedSubaccountMessage,
): ConsolidatedKafkaEvent {
return {
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
message: subaccount,
};
}
278 changes: 268 additions & 10 deletions indexer/services/ender/__tests__/lib/kafka-publisher.test.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,51 @@
import {
producer,
TRADES_WEBSOCKET_MESSAGE_VERSION,
KafkaTopics,
ProducerMessage,
KafkaTopics, producer, ProducerMessage, TRADES_WEBSOCKET_MESSAGE_VERSION,
} from '@dydxprotocol-indexer/kafka';
import { testConstants, TradeContent, TradeMessageContents } from '@dydxprotocol-indexer/postgres';
import { SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos';
import {
FillFromDatabase,
FillTable,
FillType,
Liquidity,
OrderFromDatabase,
OrderSide,
OrderStatus,
SubaccountMessageContents,
SubaccountTable,
testConstants,
TradeContent,
TradeMessageContents,
TransferFromDatabase,
} from '@dydxprotocol-indexer/postgres';
import { IndexerSubaccountId, SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import _ from 'lodash';
import { ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';
import {
AnnotatedSubaccountMessage,
ConsolidatedKafkaEvent,
SingleTradeMessage,
} from '../../src/lib/types';

import { KafkaPublisher } from '../../src/lib/kafka-publisher';
import {
defaultSubaccountMessage, defaultTradeContent, defaultTradeMessage, defaultTradeKafkaEvent,
defaultSubaccountMessage,
defaultTradeContent,
defaultTradeKafkaEvent,
defaultTradeMessage,
defaultWalletAddress,
} from '../helpers/constants';
import { contentToSingleTradeMessage, contentToTradeMessage, createConsolidatedKafkaEventFromTrade } from '../helpers/kafka-publisher-helpers';
import {
contentToSingleTradeMessage,
contentToTradeMessage,
createConsolidatedKafkaEventFromSubaccount,
createConsolidatedKafkaEventFromTrade,
} from '../helpers/kafka-publisher-helpers';
import {
generateFillSubaccountMessage,
generateOrderSubaccountMessage,
generateTransferContents,
} from '../../src/helpers/kafka-helper';
import { DateTime } from 'luxon';
import { convertToSubaccountMessage } from '../../src/lib/helper';

describe('kafka-publisher', () => {
let producerSendMock: jest.SpyInstance;
Expand Down Expand Up @@ -139,11 +170,238 @@ describe('kafka-publisher', () => {
consolidatedBeforeTrade,
]);

publisher.sortTradeEvents();
publisher.sortEvents(publisher.tradeMessages);
expect(publisher.tradeMessages).toEqual([beforeTrade, trade, afterTrade]);
});
});

describe('sortSubaccountEvents', () => {
const subaccount: AnnotatedSubaccountMessage = defaultSubaccountMessage;
const consolidatedSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(subaccount);
it.each([
[
'blockHeight',
{
...subaccount,
blockHeight: Big(subaccount.blockHeight).minus(1).toString(),
},
{
...subaccount,
blockHeight: Big(subaccount.blockHeight).plus(1).toString(),
},
],
[
'transactionIndex',
{
...subaccount,
transactionIndex: subaccount.transactionIndex - 1,
},
{
...subaccount,
transactionIndex: subaccount.transactionIndex + 1,
},
],
[
'eventIndex',
{
...subaccount,
eventIndex: subaccount.eventIndex - 1,
},
{
...subaccount,
eventIndex: subaccount.eventIndex + 1,
},
],
])('successfully subaccounts events by %s', (
_field: string,
beforeSubaccount: AnnotatedSubaccountMessage,
afterSubaccount: AnnotatedSubaccountMessage,
) => {
const publisher: KafkaPublisher = new KafkaPublisher();
const consolidatedBeforeSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(
beforeSubaccount,
);
const consolidatedAfterSubaccount:
ConsolidatedKafkaEvent = createConsolidatedKafkaEventFromSubaccount(
afterSubaccount,
);

publisher.addEvents([
consolidatedAfterSubaccount,
consolidatedSubaccount,
consolidatedBeforeSubaccount,
]);

publisher.sortEvents(publisher.subaccountMessages);
expect(publisher.subaccountMessages).toEqual([beforeSubaccount, subaccount, afterSubaccount]);
});
});

describe('aggregateFillEventsForSubaccountMessages', () => {
const fill: FillFromDatabase = {
id: FillTable.uuid(testConstants.defaultTendermintEventId, Liquidity.TAKER),
subaccountId: testConstants.defaultSubaccountId,
side: OrderSide.BUY,
liquidity: Liquidity.TAKER,
type: FillType.MARKET,
clobPairId: '1',
orderId: testConstants.defaultOrderId,
size: '10',
price: '20000',
quoteAmount: '200000',
eventId: testConstants.defaultTendermintEventId,
transactionHash: '', // TODO: Add a real transaction Hash
createdAt: testConstants.createdDateTime.toISO(),
createdAtHeight: testConstants.createdHeight,
clientMetadata: '0',
fee: '1.1',
};
const order: OrderFromDatabase = {
...testConstants.defaultOrderGoodTilBlockTime,
id: testConstants.defaultOrderId,
};

const recipientSubaccountId: IndexerSubaccountId = IndexerSubaccountId.fromPartial({
owner: 'recipient',
number: 1,
});
const deposit: TransferFromDatabase = {
id: '',
senderWalletAddress: defaultWalletAddress,
recipientSubaccountId: SubaccountTable.uuid(
recipientSubaccountId.owner,
recipientSubaccountId.number,
),
assetId: testConstants.defaultAsset.id,
size: '10',
eventId: testConstants.defaultTendermintEventId,
transactionHash: 'hash',
createdAt: DateTime.utc().toISO(),
createdAtHeight: '1',
};
it('successfully aggregates all fill events per order id and sorts messages', async () => {
const publisher: KafkaPublisher = new KafkaPublisher();

// merged with message 3.
const msg1Contents: SubaccountMessageContents = {
fills: [
generateFillSubaccountMessage(fill, 'BTC-USD'),
],
orders: [
generateOrderSubaccountMessage(order, 'BTC-USD'),
],
};
const message1: AnnotatedSubaccountMessage = {
blockHeight: '1',
transactionIndex: 1,
eventIndex: 1,
contents: JSON.stringify(msg1Contents),
subaccountId: {
owner: 'owner1',
number: 0,
},
version: '1',
orderId: 'order1',
isFill: true,
subaccountMessageContents: msg1Contents,
};

const msg2Contents: SubaccountMessageContents = {
fills: [
generateFillSubaccountMessage(fill, 'ETH-USD'),
],
};
const message2: AnnotatedSubaccountMessage = {
...message1,
transactionIndex: 2,
contents: JSON.stringify(msg2Contents),
orderId: 'order2',
subaccountMessageContents: msg2Contents,
};

const msg3Contents: SubaccountMessageContents = {
fills: [
generateFillSubaccountMessage({
...fill,
size: '100',
}, 'BTC-USD'),
],
orders: [
generateOrderSubaccountMessage({
...order,
status: OrderStatus.FILLED,
}, 'BTC-USD'),
],
};
const message3: AnnotatedSubaccountMessage = {
...message1,
transactionIndex: 3,
contents: JSON.stringify(msg3Contents),
subaccountMessageContents: msg3Contents,
};

// non-fill subaccount message.
const msg4Contents: SubaccountMessageContents = generateTransferContents(
deposit,
testConstants.defaultAsset,
recipientSubaccountId,
undefined,
recipientSubaccountId,
);
const message4: AnnotatedSubaccountMessage = {
...message1,
eventIndex: 4,
orderId: undefined,
isFill: undefined,
contents: JSON.stringify(msg4Contents),
};

const expectedMergedContents: SubaccountMessageContents = {
fills: [
msg1Contents.fills![0],
msg3Contents.fills![0],
],
orders: [
msg3Contents.orders![0],
],
};
const mergedMessage3: AnnotatedSubaccountMessage = {
...message3,
contents: JSON.stringify(expectedMergedContents),
subaccountMessageContents: expectedMergedContents,
};

publisher.addEvents([
createConsolidatedKafkaEventFromSubaccount(message1),
createConsolidatedKafkaEventFromSubaccount(message2),
createConsolidatedKafkaEventFromSubaccount(message3),
createConsolidatedKafkaEventFromSubaccount(message4),
]);

publisher.aggregateFillEventsForSubaccountMessages();
const expectedMsgs: SubaccountMessage[] = [
convertToSubaccountMessage(message4),
convertToSubaccountMessage(message2),
convertToSubaccountMessage(mergedMessage3),
];
expect(publisher.subaccountMessages).toEqual(expectedMsgs);

await publisher.publish();

expect(producerSendMock).toHaveBeenCalledTimes(1);
expect(producerSendMock).toHaveBeenCalledWith({
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
messages: _.map(expectedMsgs, (message: SubaccountMessage) => {
return {
value: Buffer.from(Uint8Array.from(SubaccountMessage.encode(message).finish())),
};
}),
});
});
});

describe('groupKafkaTradesByClobPairId', () => {
it('successfully groups kafka trade messages', () => {
const kafkaPublisher: KafkaPublisher = new KafkaPublisher();
Expand Down
14 changes: 11 additions & 3 deletions indexer/services/ender/src/handlers/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@ import {
TRADES_WEBSOCKET_MESSAGE_VERSION,
KafkaTopics,
} from '@dydxprotocol-indexer/kafka';
import { SubaccountMessageContents } from '@dydxprotocol-indexer/postgres';
import {
IndexerTendermintBlock,
IndexerTendermintEvent,
MarketMessage,
OffChainUpdateV1,
SubaccountId,
SubaccountMessage,
} from '@dydxprotocol-indexer/v4-protos';
import { DateTime } from 'luxon';

import config from '../config';
import { indexerTendermintEventToTransactionIndex } from '../lib/helper';
import { ConsolidatedKafkaEvent, EventMessage, SingleTradeMessage } from '../lib/types';
import {
AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, EventMessage, SingleTradeMessage,
} from '../lib/types';

export type HandlerInitializer = new (
block: IndexerTendermintBlock,
Expand Down Expand Up @@ -103,9 +105,12 @@ export abstract class Handler<T> {
protected generateConsolidatedSubaccountKafkaEvent(
contents: string,
subaccountId: SubaccountId,
orderId?: string,
isFill?: boolean,
subaccountMessageContents?: SubaccountMessageContents,
): ConsolidatedKafkaEvent {
stats.increment(`${config.SERVICE_NAME}.create_subaccount_kafka_event`, 1);
const subaccountMessage: SubaccountMessage = {
const subaccountMessage: AnnotatedSubaccountMessage = {
blockHeight: this.block.height.toString(),
transactionIndex: indexerTendermintEventToTransactionIndex(
this.indexerTendermintEvent,
Expand All @@ -114,6 +119,9 @@ export abstract class Handler<T> {
contents,
subaccountId,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
orderId,
isFill,
subaccountMessageContents,
};

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ export abstract class AbstractOrderFillHandler<T> extends Handler<T> {
return this.generateConsolidatedSubaccountKafkaEvent(
JSON.stringify(message),
subaccountIdProto,
order?.id,
true,
message,
);
}

Expand Down
Loading

0 comments on commit 2352bd2

Please sign in to comment.