Skip to content

Commit

Permalink
Add subaccount id as kafka key for subaccount messages
Browse files Browse the repository at this point in the history
  • Loading branch information
roy-dydx committed Aug 23, 2024
1 parent 852995c commit f321bd3
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 0 deletions.
6 changes: 6 additions & 0 deletions indexer/services/ender/src/lib/kafka-publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { FillSubaccountMessageContents, TradeMessageContents } from '@dydxprotoc
import {
BlockHeightMessage,
CandleMessage,
IndexerSubaccountId,
MarketMessage,
OffChainUpdateV1,
SubaccountMessage,
Expand Down Expand Up @@ -220,10 +221,15 @@ export class KafkaPublisher {

if (this.subaccountMessages.length > 0) {
this.aggregateFillEventsForSubaccountMessages();

allTopicKafkaMessages.push({
topic: KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS,
messages: _.map(this.subaccountMessages, (message: SubaccountMessage) => {
return {
key: message.subaccountId !== undefined
? Buffer.from(Uint8Array.from(
IndexerSubaccountId.encode(message.subaccountId).finish(),
)) : undefined,
value: Buffer.from(Uint8Array.from(SubaccountMessage.encode(message).finish())),
};
}),
Expand Down
4 changes: 4 additions & 0 deletions indexer/services/vulcan/src/handlers/order-place-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
import { getOrderIdHash, isStatefulOrder, ORDER_FLAG_SHORT_TERM } from '@dydxprotocol-indexer/v4-proto-parser';
import {
IndexerOrder,
IndexerSubaccountId,
OffChainUpdateV1,
OrderPlaceV1,
OrderPlaceV1_OrderPlacementStatus,
Expand Down Expand Up @@ -116,6 +117,9 @@ export class OrderPlaceHandler extends Handler {
await this.sendCachedOrderUpdate(orderUuid, headers);
}
const subaccountMessage: Message = {
key: Buffer.from(Uint8Array.from(
IndexerSubaccountId.encode(redisOrder.order!.orderId!.subaccountId!).finish(),
)),
value: createSubaccountWebsocketMessage(
redisOrder,
dbOrder,
Expand Down
10 changes: 10 additions & 0 deletions indexer/services/vulcan/src/handlers/order-remove-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
import {
OffChainUpdateV1,
IndexerOrder,
IndexerSubaccountId,
OrderRemoveV1,
OrderRemovalReason,
OrderRemoveV1_OrderRemovalStatus,
Expand Down Expand Up @@ -220,6 +221,9 @@ export class OrderRemoveHandler extends Handler {
}

const subaccountMessage: Message = {
key: Buffer.from(Uint8Array.from(
IndexerSubaccountId.encode(orderRemove.removedOrderId!.subaccountId!).finish(),
)),
value: this.createSubaccountWebsocketMessageFromPostgresOrder(
order,
orderRemove,
Expand Down Expand Up @@ -282,6 +286,9 @@ export class OrderRemoveHandler extends Handler {
this.generateTimingStatsOptions('find_order'),
);
const subaccountMessage: Message = {
key: Buffer.from(Uint8Array.from(
IndexerSubaccountId.encode(orderRemove.removedOrderId!.subaccountId!).finish(),
)),
value: this.createSubaccountWebsocketMessageFromOrderRemoveMessage(
canceledOrder,
orderRemove,
Expand Down Expand Up @@ -321,6 +328,9 @@ export class OrderRemoveHandler extends Handler {
}

const subaccountMessage: Message = {
key: Buffer.from(Uint8Array.from(
IndexerSubaccountId.encode(orderRemove.removedOrderId!.subaccountId!).finish(),
)),
value: this.createSubaccountWebsocketMessageFromRemoveOrderResult(
removeOrderResult,
canceledOrder,
Expand Down

0 comments on commit f321bd3

Please sign in to comment.