Skip to content

Commit

Permalink
[IND-402] Cache and send order updates for stateful orders. (#683)
Browse files Browse the repository at this point in the history
  • Loading branch information
vincentwschau authored and Crystal Lemire committed Oct 26, 2023
1 parent 5d6d407 commit 5878a43
Show file tree
Hide file tree
Showing 8 changed files with 368 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
IndexerOrder,
OrderPlaceV1_OrderPlacementStatus,
StatefulOrderEventV1,
OrderUpdateV1,
} from '@dydxprotocol-indexer/v4-protos';
import { KafkaMessage } from 'kafkajs';
import { onMessage } from '../../../src/lib/on-message';
Expand All @@ -45,6 +46,10 @@ import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants';
import { producer } from '@dydxprotocol-indexer/kafka';
import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import {
redis, redisTestConstants, StatefulOrderUpdateInfo, StatefulOrderUpdatesCache,
} from '@dydxprotocol-indexer/redis';
import { redisClient } from '../../../src/helpers/redis/redis-controller';

describe('statefulOrderPlacementHandler', () => {
beforeAll(async () => {
Expand All @@ -64,6 +69,7 @@ describe('statefulOrderPlacementHandler', () => {

afterEach(async () => {
await dbHelpers.clearData();
await redis.deleteAllAsync(redisClient);
jest.clearAllMocks();
});

Expand Down Expand Up @@ -138,15 +144,48 @@ describe('statefulOrderPlacementHandler', () => {

it.each([
// TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
['stateful order placement', defaultStatefulOrderEvent],
['stateful long term order placement', defaultStatefulOrderLongTermEvent],
[
'stateful order placement and no cached update',
defaultStatefulOrderEvent,
undefined,
],
[
'stateful long term order placement and no cached update',
defaultStatefulOrderLongTermEvent,
undefined,
],
[
'stateful order placement and cached update',
defaultStatefulOrderEvent,
{
...redisTestConstants.orderUpdate.orderUpdate,
orderId: defaultOrder.orderId,
},
],
[
'stateful long term order placement and cached update',
defaultStatefulOrderLongTermEvent,
{
...redisTestConstants.orderUpdate.orderUpdate,
orderId: defaultOrder.orderId,
},
],
])('successfully places order with %s', async (
_name: string,
statefulOrderEvent: StatefulOrderEventV1,
cachedOrderUpdate: OrderUpdateV1 | undefined,
) => {
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
statefulOrderEvent,
);
if (cachedOrderUpdate !== undefined) {
await StatefulOrderUpdatesCache.addStatefulOrderUpdate(
orderId,
cachedOrderUpdate,
Date.now(),
redisClient,
);
}

await onMessage(kafkaMessage);
const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId);
Expand Down Expand Up @@ -185,6 +224,25 @@ describe('statefulOrderPlacementHandler', () => {
orderId: defaultOrder.orderId!,
offchainUpdate: expectedOffchainUpdate,
});

// If there was a cached order update, expect the cache to be empty and a corresponding
// off-chain update to have been sent to the Kafka producer
if (cachedOrderUpdate !== undefined) {
const orderUpdates: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache
.getOldOrderUpdates(
Date.now(),
redisClient,
);
expect(orderUpdates).toHaveLength(0);

expectVulcanKafkaMessage({
producerSendMock,
orderId: defaultOrder.orderId!,
offchainUpdate: {
orderUpdate: cachedOrderUpdate,
},
});
}
});

it.each([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ import {
perpetualMarketRefresher,
OrderStatus,
} from '@dydxprotocol-indexer/postgres';
import { StatefulOrderUpdatesCache } from '@dydxprotocol-indexer/redis';
import { getOrderIdHash } from '@dydxprotocol-indexer/v4-proto-parser';
import {
OrderPlaceV1_OrderPlacementStatus,
OffChainUpdateV1,
IndexerOrder,
StatefulOrderEventV1,
OrderUpdateV1,
} from '@dydxprotocol-indexer/v4-protos';

import { redisClient } from '../../helpers/redis/redis-controller';
import { ConsolidatedKafkaEvent } from '../../lib/types';
import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler';

Expand Down Expand Up @@ -60,18 +63,34 @@ export class StatefulOrderPlacementHandler extends
this.generateTimingStatsOptions('upsert_order'),
);

const kafakEvents: ConsolidatedKafkaEvent[] = [];

const offChainUpdate: OffChainUpdateV1 = OffChainUpdateV1.fromPartial({
orderPlace: {
order,
placementStatus: OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED,
},
});
kafakEvents.push(this.generateConsolidatedVulcanKafkaEvent(
getOrderIdHash(order.orderId!),
offChainUpdate,
));

return [
this.generateConsolidatedVulcanKafkaEvent(
const pendingOrderUpdate: OrderUpdateV1 | undefined = await StatefulOrderUpdatesCache
.removeStatefulOrderUpdate(
OrderTable.orderIdToUuid(order.orderId!),
Date.now(),
redisClient,
);
if (pendingOrderUpdate !== undefined) {
kafakEvents.push(this.generateConsolidatedVulcanKafkaEvent(
getOrderIdHash(order.orderId!),
offChainUpdate,
),
];
OffChainUpdateV1.fromPartial({
orderUpdate: pendingOrderUpdate,
}),
));
}

return kafakEvents;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { stats } from '@dydxprotocol-indexer/base';
import {
StatefulOrderUpdateInfo,
StatefulOrderUpdatesCache,
redis,
redisTestConstants,
} from '@dydxprotocol-indexer/redis';
import config from '../../src/config';
import removeOldOrderUpdatesTask from '../../src/tasks/remove-old-order-updates';
import { redisClient } from '../../src/helpers/redis';

describe('remove-old-order-updates', () => {
const fakeTime: Date = new Date(2023, 9, 25, 0, 0, 0, 0);

beforeAll(() => {
jest.useFakeTimers().setSystemTime(fakeTime);
});

afterAll(() => {
jest.resetAllMocks();
jest.useRealTimers();
});

beforeEach(() => {
jest.spyOn(stats, 'gauge');
jest.clearAllMocks();
});

afterEach(async () => {
await redis.deleteAllAsync(redisClient);
jest.clearAllMocks();
});

it('succeeds with no cached order updates', async () => {
await removeOldOrderUpdatesTask();
expect(stats.gauge).toHaveBeenCalledWith(
`${config.SERVICE_NAME}.remove_old_order_updates.num_removed`,
0,
);
});

it('succeeds with no old cached order updates', async () => {
await StatefulOrderUpdatesCache.addStatefulOrderUpdate(
redisTestConstants.defaultOrderUuidGoodTilBlockTime,
redisTestConstants.orderUpdate.orderUpdate,
fakeTime.getTime() - 1,
redisClient,
);
const existingUpdates: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache
.getOldOrderUpdates(
fakeTime.getTime(),
redisClient,
);
expect(existingUpdates).toHaveLength(1);

await removeOldOrderUpdatesTask();
expect(stats.gauge).toHaveBeenCalledWith(
`${config.SERVICE_NAME}.remove_old_order_updates.num_removed`,
0,
);

const updatesAfterTask: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache
.getOldOrderUpdates(
fakeTime.getTime(),
redisClient,
);
expect(updatesAfterTask).toEqual(existingUpdates);
});

it('succeeds with no old cached order updates', async () => {
await StatefulOrderUpdatesCache.addStatefulOrderUpdate(
redisTestConstants.defaultOrderUuidGoodTilBlockTime,
redisTestConstants.orderUpdate.orderUpdate,
fakeTime.getTime() - 1,
redisClient,
);
const existingUpdates: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache
.getOldOrderUpdates(
fakeTime.getTime(),
redisClient,
);
expect(existingUpdates).toHaveLength(1);

await removeOldOrderUpdatesTask();
expect(stats.gauge).toHaveBeenCalledWith(
`${config.SERVICE_NAME}.remove_old_order_updates.num_removed`,
0,
);

const updatesAfterTask: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache
.getOldOrderUpdates(
fakeTime.getTime(),
redisClient,
);
expect(updatesAfterTask).toEqual(existingUpdates);
});

it('succeeds removing old cached order update', async () => {
await StatefulOrderUpdatesCache.addStatefulOrderUpdate(
redisTestConstants.defaultOrderUuidGoodTilBlockTime,
redisTestConstants.orderUpdate.orderUpdate,
fakeTime.getTime() - config.OLD_CACHED_ORDER_UPDATES_WINDOW_MS,
redisClient,
);
const existingUpdates: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache
.getOldOrderUpdates(
fakeTime.getTime(),
redisClient,
);
expect(existingUpdates).toHaveLength(1);

await removeOldOrderUpdatesTask();
expect(stats.gauge).toHaveBeenCalledWith(
`${config.SERVICE_NAME}.remove_old_order_updates.num_removed`,
1,
);

const updatesAfterTask: StatefulOrderUpdateInfo[] = await StatefulOrderUpdatesCache
.getOldOrderUpdates(
fakeTime.getTime(),
redisClient,
);
expect(updatesAfterTask).toHaveLength(0);
});
});
7 changes: 7 additions & 0 deletions indexer/services/roundtable/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export const configSchema = {
LOOPS_CANCEL_STALE_ORDERS: parseBoolean({ default: true }),
LOOPS_ENABLED_UPDATE_RESEARCH_ENVIRONMENT: parseBoolean({ default: true }),
LOOPS_ENABLED_TRACK_LAG: parseBoolean({ default: false }),
LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES: parseBoolean({ default: true }),

// Loop Timing
LOOPS_INTERVAL_MS_MARKET_UPDATER: parseInteger({
Expand Down Expand Up @@ -70,6 +71,9 @@ export const configSchema = {
LOOPS_INTERVAL_MS_TRACK_LAG: parseInteger({
default: TEN_SECONDS_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_REMOVE_OLD_ORDER_UPDATES: parseInteger({
default: THIRTY_SECONDS_IN_MILLISECONDS,
}),

// Start delay
START_DELAY_ENABLED: parseBoolean({ default: true }),
Expand Down Expand Up @@ -119,6 +123,9 @@ export const configSchema = {
MAX_COMPLIANCE_DATA_QUERY_PER_LOOP: parseInteger({ default: 100 }),
COMPLIANCE_PROVIDER_QUERY_BATCH_SIZE: parseInteger({ default: 100 }),
COMPLIANCE_PROVIDER_QUERY_DELAY_MS: parseInteger({ default: ONE_SECOND_IN_MILLISECONDS }),

// Remove old cached order updates
OLD_CACHED_ORDER_UPDATES_WINDOW_MS: parseInteger({ default: 30 * ONE_SECOND_IN_MILLISECONDS }),
};

export default parseSchema(configSchema);
9 changes: 9 additions & 0 deletions indexer/services/roundtable/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import deleteZeroPriceLevelsTask from './tasks/delete-zero-price-levels';
import marketUpdaterTask from './tasks/market-updater';
import orderbookInstrumentationTask from './tasks/orderbook-instrumentation';
import removeExpiredOrdersTask from './tasks/remove-expired-orders';
import removeOldOrderUpdatesTask from './tasks/remove-old-order-updates';
import trackLag from './tasks/track-lag';
import updateComplianceDataTask from './tasks/update-compliance-data';
import updateResearchEnvironmentTask from './tasks/update-research-environment';
Expand Down Expand Up @@ -112,6 +113,14 @@ async function start(): Promise<void> {
);
}

if (config.LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES) {
startLoop(
removeOldOrderUpdatesTask,
'remove_old_order_updates',
config.LOOPS_INTERVAL_MS_REMOVE_OLD_ORDER_UPDATES,
);
}

logger.info({
at: 'index',
message: 'Successfully started',
Expand Down
Loading

0 comments on commit 5878a43

Please sign in to comment.