From 1a79d2179e12304f0e7d9e9b79ff15d44a6edccc Mon Sep 17 00:00:00 2001 From: Vincent Chau <99756290+vincentwschau@users.noreply.github.com> Date: Thu, 19 Oct 2023 23:02:10 -0400 Subject: [PATCH 1/4] [IND-402] Add cache to store order updates for stateful orders. --- .../stateful-order-updates-cache.test.ts | 164 ++++++++++++++++++ indexer/packages/redis/src/caches/scripts.ts | 4 + .../caches/stateful-order-updates-cache.ts | 127 ++++++++++++++ .../src/scripts/add_stateful_order_update.lua | 16 ++ .../scripts/remove_stateful_order_update.lua | 41 +++++ indexer/packages/redis/src/types.ts | 6 + 6 files changed, 358 insertions(+) create mode 100644 indexer/packages/redis/__tests__/caches/stateful-order-updates-cache.test.ts create mode 100644 indexer/packages/redis/src/caches/stateful-order-updates-cache.ts create mode 100644 indexer/packages/redis/src/scripts/add_stateful_order_update.lua create mode 100644 indexer/packages/redis/src/scripts/remove_stateful_order_update.lua diff --git a/indexer/packages/redis/__tests__/caches/stateful-order-updates-cache.test.ts b/indexer/packages/redis/__tests__/caches/stateful-order-updates-cache.test.ts new file mode 100644 index 0000000000..a9767efbf9 --- /dev/null +++ b/indexer/packages/redis/__tests__/caches/stateful-order-updates-cache.test.ts @@ -0,0 +1,164 @@ +import { deleteAllAsync } from '../../src/helpers/redis'; +import { redis as client } from '../helpers/utils'; +import { + addStatefulOrderUpdate, + removeStatefulOrderUpdate, + getOldOrderUpdates, +} from '../../src/caches/stateful-order-updates-cache'; +import { IndexerOrderId, OrderUpdateV1 } from '@dydxprotocol-indexer/v4-protos'; +import Long from 'long'; +import { orderId } from './constants'; +import { OrderTable } from '@dydxprotocol-indexer/postgres'; +import { StatefulOrderUpdateInfo } from 'packages/redis/src'; + +describe('statefulOrderUpdatesCache', () => { + const orderUpdate: OrderUpdateV1 = { + orderId, + totalFilledQuantums: Long.fromNumber(100, true), + }; + const orderUuid: string = OrderTable.orderIdToUuid(orderId); + const initialTimestamp: number = Date.now(); + const olderTimestamp: number = initialTimestamp - 10; + const newerTimestamp: number = initialTimestamp + 10; + + beforeEach(async () => { + await deleteAllAsync(client); + }); + + afterEach(async () => { + await deleteAllAsync(client); + }); + + describe('addStatefulOrderUpdate', () => { + it('adds stateful order update to cache', async () => { + await addStatefulOrderUpdate( + orderUuid, + orderUpdate, + initialTimestamp, + client, + ); + + const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( + newerTimestamp, client, + ); + const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate( + orderUuid, initialTimestamp, client, + ); + + expect(statefulOrderUpdateInfo).toHaveLength(1); + expect(statefulOrderUpdateInfo[0]).toEqual({ + orderId: orderUuid, + timestamp: initialTimestamp, + }); + expect(removedUpdate).toBeDefined(); + expect(removedUpdate).toEqual(orderUpdate); + }); + }); + + describe('removeStatefulorderUpdate', () => { + it('removes and returns existing stateful order update from cache', async () => { + await addStatefulOrderUpdate( + orderUuid, + orderUpdate, + initialTimestamp, + client, + ); + + const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate( + orderUuid, initialTimestamp, client, + ); + const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( + newerTimestamp, client, + ); + + expect(removedUpdate).toBeDefined(); + expect(removedUpdate).toEqual(orderUpdate); + expect(statefulOrderUpdateInfo).toHaveLength(0); + }); + + it('does not remove existing stateful order update if timestamp is lower', async () => { + await addStatefulOrderUpdate( + orderUuid, + orderUpdate, + initialTimestamp, + client, + ); + + const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate( + orderUuid, olderTimestamp, client, + ); + const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( + newerTimestamp, client, + ); + + expect(removedUpdate).toBeUndefined(); + expect(statefulOrderUpdateInfo).toHaveLength(1); + expect(statefulOrderUpdateInfo[0]).toEqual({ + orderId: orderUuid, + timestamp: initialTimestamp, + }); + }); + + it('removes non-existing order and returns undefined', async () => { + const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate( + orderUuid, initialTimestamp, client, + ); + + expect(removedUpdate).toBeUndefined(); + }); + }); + + describe('getOldOrderUpdates', () => { + const orderId2: IndexerOrderId = { + ...orderId, + clientId: 45, + }; + const orderUuid2: string = OrderTable.orderIdToUuid(orderId2); + const orderUpdate2: OrderUpdateV1 = { + ...orderUpdate, + orderId: orderId2, + }; + + beforeEach(async () => { + await Promise.all([ + addStatefulOrderUpdate( + orderUuid, + orderUpdate, + initialTimestamp, + client, + ), + addStatefulOrderUpdate( + orderUuid2, + orderUpdate2, + olderTimestamp, + client, + )], + ); + }); + + it('returns stateful order update info older than the threshold', async () => { + const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( + olderTimestamp, client, + ); + + expect(statefulOrderUpdateInfo).toEqual([{ + orderId: orderUuid2, + timestamp: olderTimestamp, + }]); + }); + + it('returns multiple stateful order update info older than the threshold', async () => { + const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( + initialTimestamp, client, + ); + + expect(statefulOrderUpdateInfo).toEqual([{ + orderId: orderUuid2, + timestamp: olderTimestamp, + }, { + orderId: orderUuid, + timestamp: initialTimestamp, + }]); + }); + }); +}); diff --git a/indexer/packages/redis/src/caches/scripts.ts b/indexer/packages/redis/src/caches/scripts.ts index cd933900e6..5bfa6a1382 100644 --- a/indexer/packages/redis/src/caches/scripts.ts +++ b/indexer/packages/redis/src/caches/scripts.ts @@ -60,6 +60,8 @@ export const updateOrderScript: LuaScript = newLuaScript('updateOrder', '../scri export const placeOrderScript: LuaScript = newLuaScript('placeOrder', '../scripts/place_order.lua'); export const removeOrderScript: LuaScript = newLuaScript('removeOrder', '../scripts/remove_order.lua'); export const addCanceledOrderIdScript: LuaScript = newLuaScript('addCanceledOrderId', '../scripts/add_canceled_order_id.lua'); +export const addStatefulOrderUpdateScript: LuaScript = newLuaScript('addStatefulOrderUpdate', '../scripts/add_stateful_order_update.lua'); +export const removeStatefulOrderUpdateScript: LuaScript = newLuaScript('removeStatefulOrderUpdate', '../scripts/remove_stateful_order_update.lua'); export const allLuaScripts: LuaScript[] = [ deleteZeroPriceLevelScript, @@ -69,4 +71,6 @@ export const allLuaScripts: LuaScript[] = [ placeOrderScript, removeOrderScript, addCanceledOrderIdScript, + addStatefulOrderUpdateScript, + removeStatefulOrderUpdateScript, ]; diff --git a/indexer/packages/redis/src/caches/stateful-order-updates-cache.ts b/indexer/packages/redis/src/caches/stateful-order-updates-cache.ts new file mode 100644 index 0000000000..92e1183dbf --- /dev/null +++ b/indexer/packages/redis/src/caches/stateful-order-updates-cache.ts @@ -0,0 +1,127 @@ +import { OrderUpdateV1 } from '@dydxprotocol-indexer/v4-protos'; +import _ from 'lodash'; +import { Callback, RedisClient } from 'redis'; + +import { zRangeByScoreAsync } from '../helpers/redis'; +import { StatefulOrderUpdateInfo } from '../types'; +import { addStatefulOrderUpdateScript, removeStatefulOrderUpdateScript } from './scripts'; + +// Cache of order ids of the stateful order updates and when the updates were added to teh cache +export const ORDER_UPDATE_IDS_CACHE_KEY: string = 'v4/stateful_order_update_ids'; +// Cache of order updates for stateful orders +export const ORDER_UPDATES_CACHE_KEY: string = 'v4/stateful_order_updates'; + +export async function addStatefulOrderUpdate( + statefulOrderId: string, + orderUpdate: OrderUpdateV1, + updateTimestamp: number, + client: RedisClient, +): Promise { + const numKeys: number = 2; + let evalAsync: ( + orderId: string, + encodedOrderUpdate: string, + timestamp: number, + ) => Promise = ( + orderId, + encodedOrderUpdate, + timestamp, + ) => { + return new Promise((resolve, reject) => { + const callback: Callback = ( + err: Error | null, + ) => { + if (err) { + return reject(err); + } + return resolve(); + }; + client.evalsha( + addStatefulOrderUpdateScript.hash, + numKeys, + ORDER_UPDATE_IDS_CACHE_KEY, + ORDER_UPDATES_CACHE_KEY, + orderId, + encodedOrderUpdate, + timestamp, + callback, + ); + }); + }; + + evalAsync = evalAsync.bind(client); + + return evalAsync( + statefulOrderId, + Buffer.from(Uint8Array.from(OrderUpdateV1.encode(orderUpdate).finish())).toString('binary'), + updateTimestamp, + ); +} + +export async function removeStatefulOrderUpdate( + statefulOrderId: string, + removeTimestamp: number, + client: RedisClient, +): Promise { + const numKeys: number = 2; + let evalAsync: ( + orderId: string, + timestamp: number, + ) => Promise = ( + orderId, + timestamp, + ) => { + return new Promise((resolve, reject) => { + const callback: Callback = ( + err: Error | null, + results: string, + ) => { + if (err) { + return reject(err); + } + if (results === '') { + return resolve(undefined); + } + + return resolve(OrderUpdateV1.decode(Buffer.from(results, 'binary'))); + }; + client.evalsha( + removeStatefulOrderUpdateScript.hash, + numKeys, + ORDER_UPDATE_IDS_CACHE_KEY, + ORDER_UPDATES_CACHE_KEY, + orderId, + timestamp, + callback, + ); + }); + }; + + evalAsync = evalAsync.bind(client); + + return evalAsync( + statefulOrderId, + removeTimestamp, + ); +} + +export async function getOldOrderUpdates( + latestTimestamp: number, + client: RedisClient, +): Promise { + const rawResults: string[] = await zRangeByScoreAsync({ + key: ORDER_UPDATE_IDS_CACHE_KEY, + start: -Infinity, + end: latestTimestamp, + endIsInclusive: true, + withScores: true, + }, client); + return _.chunk(rawResults, 2).map( + (keyValuePair: string[]) => { + return { + orderId: keyValuePair[0], + timestamp: Number(keyValuePair[1]), + }; + }, + ); +} diff --git a/indexer/packages/redis/src/scripts/add_stateful_order_update.lua b/indexer/packages/redis/src/scripts/add_stateful_order_update.lua new file mode 100644 index 0000000000..5b8a39fa8e --- /dev/null +++ b/indexer/packages/redis/src/scripts/add_stateful_order_update.lua @@ -0,0 +1,16 @@ +-- Hash of the ZSET tracking when a stateful order update was added to the cache +local statefulOrderUpdateIdHash = KEYS[1] +-- Hash of the HSET tracking the stateful order updates +local statefulOrderUpdateHash = KEYS[2] + +-- Order id of the stateful order update to add to the cache +local statefulOrderId = ARGV[1] +-- Encoded stateful order update protobuf +local statefulOrderUpdate = ARGV[2] +-- Timestamp of when the order update was added +local timestamp = ARGV[3] + +redis.call("ZADD", statefulOrderUpdateIdHash, timestamp, statefulOrderId) +redis.call("HSET", statefulOrderUpdateHash, statefulOrderId, statefulOrderUpdate) + +return 1 diff --git a/indexer/packages/redis/src/scripts/remove_stateful_order_update.lua b/indexer/packages/redis/src/scripts/remove_stateful_order_update.lua new file mode 100644 index 0000000000..5a07ed371b --- /dev/null +++ b/indexer/packages/redis/src/scripts/remove_stateful_order_update.lua @@ -0,0 +1,41 @@ +-- Hash of the ZSET tracking when a stateful order update was added to the cache +local statefulOrderUpdateIdHash = KEYS[1] +-- Hash of the HSET tracking the stateful order updates +local statefulOrderUpdateHash = KEYS[2] + +-- Order id of the stateful order update to add to the cache +local statefulOrderId = ARGV[1] +-- Maximum timestamp of the stateful order update to remove +-- Needed for removing old stateful order updates +local maxTimestamp = ARGV[2] + +-- This script attempts to remove a stateful order update associated with an order id +-- from the ZSET / HSET tracking stateful order updates +-- This script returns the either an empty string if no stateful order update was removed +-- or the encoded stateful order update protobuf that was removed + +local oldTimestamp = redis.call("ZSCORE", statefulOrderUpdateIdHash, statefulOrderId) +-- if there is no timestamp, return an empty string +if not oldTimestamp then + return "" +end + +-- If the timestamp of the stateful order update is less than the maximum tiemstamp +-- to remove, return an empty string and don't remove the stateful order update +if tonumber(oldTimestamp) > tonumber(maxTimestamp) then + return "" +end + +-- The timestamp exists and is less than the maximum timestamp, and so delete the order id +-- from the ZSET +redis.call("ZREM", statefulOrderUpdateIdHash, statefulOrderId) + +local oldStatefulOrderUpdate = redis.call("HGET", statefulOrderUpdateHash, statefulOrderId) +-- If there's no order update, return empty string +if not oldStatefulOrderUpdate then + return "" +end + +redis.call("HDEL", statefulOrderUpdateHash, statefulOrderId) + +return oldStatefulOrderUpdate diff --git a/indexer/packages/redis/src/types.ts b/indexer/packages/redis/src/types.ts index 60e23b7178..0f67c7788d 100644 --- a/indexer/packages/redis/src/types.ts +++ b/indexer/packages/redis/src/types.ts @@ -74,3 +74,9 @@ export type PnlTickForSubaccounts = { // the uuid. [subaccountId: string]: PnlTicksCreateObject }; + +/* -------- Stateful order update cache types -------- */ +export interface StatefulOrderUpdateInfo { + orderId: string, + timestamp: number, +} From ce25de4d8bb2e933a4602ced1f3fcaf3f25f9082 Mon Sep 17 00:00:00 2001 From: Vincent Chau <99756290+vincentwschau@users.noreply.github.com> Date: Fri, 20 Oct 2023 11:10:26 -0400 Subject: [PATCH 2/4] Expose new cache. --- indexer/packages/base/src/utils.ts | 14 ++++++++++++++ indexer/packages/redis/src/index.ts | 1 + 2 files changed, 15 insertions(+) create mode 100644 indexer/packages/base/src/utils.ts diff --git a/indexer/packages/base/src/utils.ts b/indexer/packages/base/src/utils.ts new file mode 100644 index 0000000000..e5797bfe02 --- /dev/null +++ b/indexer/packages/base/src/utils.ts @@ -0,0 +1,14 @@ +/** + * Checks if any object has any defined properties. + * @param object + * @returns True if there is at least 1 defined property, false otherwise. + */ +export function hasDefinedProperties(object: Object): boolean { + for (const entry of Object.entries(object)) { + if (entry[1] !== undefined) { + return true; + } + } + + return false; +} diff --git a/indexer/packages/redis/src/index.ts b/indexer/packages/redis/src/index.ts index ce93d10093..3c57153e4b 100644 --- a/indexer/packages/redis/src/index.ts +++ b/indexer/packages/redis/src/index.ts @@ -9,6 +9,7 @@ export * as NextFundingCache from './caches/next-funding-cache'; export * as OrderbookLevelsCache from './caches/orderbook-levels-cache'; export * as LatestAccountPnlTicksCache from './caches/latest-account-pnl-ticks-cache'; export * as CanceledOrdersCache from './caches/canceled-orders-cache'; +export * as StatefulORderUpdatesCache from './caches/stateful-order-updates-cache'; export { placeOrder } from './caches/place-order'; export { removeOrder } from './caches/remove-order'; export { updateOrder } from './caches/update-order'; From b9668026fa32b3a33885050adc9576037f5bf9cb Mon Sep 17 00:00:00 2001 From: Vincent Chau <99756290+vincentwschau@users.noreply.github.com> Date: Fri, 20 Oct 2023 11:13:09 -0400 Subject: [PATCH 3/4] Fix typo. --- indexer/packages/redis/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/packages/redis/src/index.ts b/indexer/packages/redis/src/index.ts index 3c57153e4b..c1b716f726 100644 --- a/indexer/packages/redis/src/index.ts +++ b/indexer/packages/redis/src/index.ts @@ -9,7 +9,7 @@ export * as NextFundingCache from './caches/next-funding-cache'; export * as OrderbookLevelsCache from './caches/orderbook-levels-cache'; export * as LatestAccountPnlTicksCache from './caches/latest-account-pnl-ticks-cache'; export * as CanceledOrdersCache from './caches/canceled-orders-cache'; -export * as StatefulORderUpdatesCache from './caches/stateful-order-updates-cache'; +export * as StatefulOrderUpdatesCache from './caches/stateful-order-updates-cache'; export { placeOrder } from './caches/place-order'; export { removeOrder } from './caches/remove-order'; export { updateOrder } from './caches/update-order'; From 414ce74056e010f0efa060708d93e6b5fed4b8cd Mon Sep 17 00:00:00 2001 From: Vincent Chau <99756290+vincentwschau@users.noreply.github.com> Date: Mon, 23 Oct 2023 04:11:58 -0400 Subject: [PATCH 4/4] Remove unneeded file. --- indexer/packages/base/src/utils.ts | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 indexer/packages/base/src/utils.ts diff --git a/indexer/packages/base/src/utils.ts b/indexer/packages/base/src/utils.ts deleted file mode 100644 index e5797bfe02..0000000000 --- a/indexer/packages/base/src/utils.ts +++ /dev/null @@ -1,14 +0,0 @@ -/** - * Checks if any object has any defined properties. - * @param object - * @returns True if there is at least 1 defined property, false otherwise. - */ -export function hasDefinedProperties(object: Object): boolean { - for (const entry of Object.entries(object)) { - if (entry[1] !== undefined) { - return true; - } - } - - return false; -}