diff --git a/indexer/packages/redis/__tests__/caches/stateful-order-updates-cache.test.ts b/indexer/packages/redis/__tests__/caches/stateful-order-updates-cache.test.ts new file mode 100644 index 0000000000..a9767efbf9 --- /dev/null +++ b/indexer/packages/redis/__tests__/caches/stateful-order-updates-cache.test.ts @@ -0,0 +1,164 @@ +import { deleteAllAsync } from '../../src/helpers/redis'; +import { redis as client } from '../helpers/utils'; +import { + addStatefulOrderUpdate, + removeStatefulOrderUpdate, + getOldOrderUpdates, +} from '../../src/caches/stateful-order-updates-cache'; +import { IndexerOrderId, OrderUpdateV1 } from '@dydxprotocol-indexer/v4-protos'; +import Long from 'long'; +import { orderId } from './constants'; +import { OrderTable } from '@dydxprotocol-indexer/postgres'; +import { StatefulOrderUpdateInfo } from 'packages/redis/src'; + +describe('statefulOrderUpdatesCache', () => { + const orderUpdate: OrderUpdateV1 = { + orderId, + totalFilledQuantums: Long.fromNumber(100, true), + }; + const orderUuid: string = OrderTable.orderIdToUuid(orderId); + const initialTimestamp: number = Date.now(); + const olderTimestamp: number = initialTimestamp - 10; + const newerTimestamp: number = initialTimestamp + 10; + + beforeEach(async () => { + await deleteAllAsync(client); + }); + + afterEach(async () => { + await deleteAllAsync(client); + }); + + describe('addStatefulOrderUpdate', () => { + it('adds stateful order update to cache', async () => { + await addStatefulOrderUpdate( + orderUuid, + orderUpdate, + initialTimestamp, + client, + ); + + const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( + newerTimestamp, client, + ); + const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate( + orderUuid, initialTimestamp, client, + ); + + expect(statefulOrderUpdateInfo).toHaveLength(1); + expect(statefulOrderUpdateInfo[0]).toEqual({ + orderId: orderUuid, + timestamp: initialTimestamp, + }); + expect(removedUpdate).toBeDefined(); + expect(removedUpdate).toEqual(orderUpdate); + }); + }); + + describe('removeStatefulorderUpdate', () => { + it('removes and returns existing stateful order update from cache', async () => { + await addStatefulOrderUpdate( + orderUuid, + orderUpdate, + initialTimestamp, + client, + ); + + const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate( + orderUuid, initialTimestamp, client, + ); + const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( + newerTimestamp, client, + ); + + expect(removedUpdate).toBeDefined(); + expect(removedUpdate).toEqual(orderUpdate); + expect(statefulOrderUpdateInfo).toHaveLength(0); + }); + + it('does not remove existing stateful order update if timestamp is lower', async () => { + await addStatefulOrderUpdate( + orderUuid, + orderUpdate, + initialTimestamp, + client, + ); + + const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate( + orderUuid, olderTimestamp, client, + ); + const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( + newerTimestamp, client, + ); + + expect(removedUpdate).toBeUndefined(); + expect(statefulOrderUpdateInfo).toHaveLength(1); + expect(statefulOrderUpdateInfo[0]).toEqual({ + orderId: orderUuid, + timestamp: initialTimestamp, + }); + }); + + it('removes non-existing order and returns undefined', async () => { + const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate( + orderUuid, initialTimestamp, client, + ); + + expect(removedUpdate).toBeUndefined(); + }); + }); + + describe('getOldOrderUpdates', () => { + const orderId2: IndexerOrderId = { + ...orderId, + clientId: 45, + }; + const orderUuid2: string = OrderTable.orderIdToUuid(orderId2); + const orderUpdate2: OrderUpdateV1 = { + ...orderUpdate, + orderId: orderId2, + }; + + beforeEach(async () => { + await Promise.all([ + addStatefulOrderUpdate( + orderUuid, + orderUpdate, + initialTimestamp, + client, + ), + addStatefulOrderUpdate( + orderUuid2, + orderUpdate2, + olderTimestamp, + client, + )], + ); + }); + + it('returns stateful order update info older than the threshold', async () => { + const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( + olderTimestamp, client, + ); + + expect(statefulOrderUpdateInfo).toEqual([{ + orderId: orderUuid2, + timestamp: olderTimestamp, + }]); + }); + + it('returns multiple stateful order update info older than the threshold', async () => { + const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( + initialTimestamp, client, + ); + + expect(statefulOrderUpdateInfo).toEqual([{ + orderId: orderUuid2, + timestamp: olderTimestamp, + }, { + orderId: orderUuid, + timestamp: initialTimestamp, + }]); + }); + }); +}); diff --git a/indexer/packages/redis/src/caches/scripts.ts b/indexer/packages/redis/src/caches/scripts.ts index cd933900e6..5bfa6a1382 100644 --- a/indexer/packages/redis/src/caches/scripts.ts +++ b/indexer/packages/redis/src/caches/scripts.ts @@ -60,6 +60,8 @@ export const updateOrderScript: LuaScript = newLuaScript('updateOrder', '../scri export const placeOrderScript: LuaScript = newLuaScript('placeOrder', '../scripts/place_order.lua'); export const removeOrderScript: LuaScript = newLuaScript('removeOrder', '../scripts/remove_order.lua'); export const addCanceledOrderIdScript: LuaScript = newLuaScript('addCanceledOrderId', '../scripts/add_canceled_order_id.lua'); +export const addStatefulOrderUpdateScript: LuaScript = newLuaScript('addStatefulOrderUpdate', '../scripts/add_stateful_order_update.lua'); +export const removeStatefulOrderUpdateScript: LuaScript = newLuaScript('removeStatefulOrderUpdate', '../scripts/remove_stateful_order_update.lua'); export const allLuaScripts: LuaScript[] = [ deleteZeroPriceLevelScript, @@ -69,4 +71,6 @@ export const allLuaScripts: LuaScript[] = [ placeOrderScript, removeOrderScript, addCanceledOrderIdScript, + addStatefulOrderUpdateScript, + removeStatefulOrderUpdateScript, ]; diff --git a/indexer/packages/redis/src/caches/stateful-order-updates-cache.ts b/indexer/packages/redis/src/caches/stateful-order-updates-cache.ts new file mode 100644 index 0000000000..92e1183dbf --- /dev/null +++ b/indexer/packages/redis/src/caches/stateful-order-updates-cache.ts @@ -0,0 +1,127 @@ +import { OrderUpdateV1 } from '@dydxprotocol-indexer/v4-protos'; +import _ from 'lodash'; +import { Callback, RedisClient } from 'redis'; + +import { zRangeByScoreAsync } from '../helpers/redis'; +import { StatefulOrderUpdateInfo } from '../types'; +import { addStatefulOrderUpdateScript, removeStatefulOrderUpdateScript } from './scripts'; + +// Cache of order ids of the stateful order updates and when the updates were added to teh cache +export const ORDER_UPDATE_IDS_CACHE_KEY: string = 'v4/stateful_order_update_ids'; +// Cache of order updates for stateful orders +export const ORDER_UPDATES_CACHE_KEY: string = 'v4/stateful_order_updates'; + +export async function addStatefulOrderUpdate( + statefulOrderId: string, + orderUpdate: OrderUpdateV1, + updateTimestamp: number, + client: RedisClient, +): Promise { + const numKeys: number = 2; + let evalAsync: ( + orderId: string, + encodedOrderUpdate: string, + timestamp: number, + ) => Promise = ( + orderId, + encodedOrderUpdate, + timestamp, + ) => { + return new Promise((resolve, reject) => { + const callback: Callback = ( + err: Error | null, + ) => { + if (err) { + return reject(err); + } + return resolve(); + }; + client.evalsha( + addStatefulOrderUpdateScript.hash, + numKeys, + ORDER_UPDATE_IDS_CACHE_KEY, + ORDER_UPDATES_CACHE_KEY, + orderId, + encodedOrderUpdate, + timestamp, + callback, + ); + }); + }; + + evalAsync = evalAsync.bind(client); + + return evalAsync( + statefulOrderId, + Buffer.from(Uint8Array.from(OrderUpdateV1.encode(orderUpdate).finish())).toString('binary'), + updateTimestamp, + ); +} + +export async function removeStatefulOrderUpdate( + statefulOrderId: string, + removeTimestamp: number, + client: RedisClient, +): Promise { + const numKeys: number = 2; + let evalAsync: ( + orderId: string, + timestamp: number, + ) => Promise = ( + orderId, + timestamp, + ) => { + return new Promise((resolve, reject) => { + const callback: Callback = ( + err: Error | null, + results: string, + ) => { + if (err) { + return reject(err); + } + if (results === '') { + return resolve(undefined); + } + + return resolve(OrderUpdateV1.decode(Buffer.from(results, 'binary'))); + }; + client.evalsha( + removeStatefulOrderUpdateScript.hash, + numKeys, + ORDER_UPDATE_IDS_CACHE_KEY, + ORDER_UPDATES_CACHE_KEY, + orderId, + timestamp, + callback, + ); + }); + }; + + evalAsync = evalAsync.bind(client); + + return evalAsync( + statefulOrderId, + removeTimestamp, + ); +} + +export async function getOldOrderUpdates( + latestTimestamp: number, + client: RedisClient, +): Promise { + const rawResults: string[] = await zRangeByScoreAsync({ + key: ORDER_UPDATE_IDS_CACHE_KEY, + start: -Infinity, + end: latestTimestamp, + endIsInclusive: true, + withScores: true, + }, client); + return _.chunk(rawResults, 2).map( + (keyValuePair: string[]) => { + return { + orderId: keyValuePair[0], + timestamp: Number(keyValuePair[1]), + }; + }, + ); +} diff --git a/indexer/packages/redis/src/index.ts b/indexer/packages/redis/src/index.ts index ce93d10093..c1b716f726 100644 --- a/indexer/packages/redis/src/index.ts +++ b/indexer/packages/redis/src/index.ts @@ -9,6 +9,7 @@ export * as NextFundingCache from './caches/next-funding-cache'; export * as OrderbookLevelsCache from './caches/orderbook-levels-cache'; export * as LatestAccountPnlTicksCache from './caches/latest-account-pnl-ticks-cache'; export * as CanceledOrdersCache from './caches/canceled-orders-cache'; +export * as StatefulOrderUpdatesCache from './caches/stateful-order-updates-cache'; export { placeOrder } from './caches/place-order'; export { removeOrder } from './caches/remove-order'; export { updateOrder } from './caches/update-order'; diff --git a/indexer/packages/redis/src/scripts/add_stateful_order_update.lua b/indexer/packages/redis/src/scripts/add_stateful_order_update.lua new file mode 100644 index 0000000000..5b8a39fa8e --- /dev/null +++ b/indexer/packages/redis/src/scripts/add_stateful_order_update.lua @@ -0,0 +1,16 @@ +-- Hash of the ZSET tracking when a stateful order update was added to the cache +local statefulOrderUpdateIdHash = KEYS[1] +-- Hash of the HSET tracking the stateful order updates +local statefulOrderUpdateHash = KEYS[2] + +-- Order id of the stateful order update to add to the cache +local statefulOrderId = ARGV[1] +-- Encoded stateful order update protobuf +local statefulOrderUpdate = ARGV[2] +-- Timestamp of when the order update was added +local timestamp = ARGV[3] + +redis.call("ZADD", statefulOrderUpdateIdHash, timestamp, statefulOrderId) +redis.call("HSET", statefulOrderUpdateHash, statefulOrderId, statefulOrderUpdate) + +return 1 diff --git a/indexer/packages/redis/src/scripts/remove_stateful_order_update.lua b/indexer/packages/redis/src/scripts/remove_stateful_order_update.lua new file mode 100644 index 0000000000..5a07ed371b --- /dev/null +++ b/indexer/packages/redis/src/scripts/remove_stateful_order_update.lua @@ -0,0 +1,41 @@ +-- Hash of the ZSET tracking when a stateful order update was added to the cache +local statefulOrderUpdateIdHash = KEYS[1] +-- Hash of the HSET tracking the stateful order updates +local statefulOrderUpdateHash = KEYS[2] + +-- Order id of the stateful order update to add to the cache +local statefulOrderId = ARGV[1] +-- Maximum timestamp of the stateful order update to remove +-- Needed for removing old stateful order updates +local maxTimestamp = ARGV[2] + +-- This script attempts to remove a stateful order update associated with an order id +-- from the ZSET / HSET tracking stateful order updates +-- This script returns the either an empty string if no stateful order update was removed +-- or the encoded stateful order update protobuf that was removed + +local oldTimestamp = redis.call("ZSCORE", statefulOrderUpdateIdHash, statefulOrderId) +-- if there is no timestamp, return an empty string +if not oldTimestamp then + return "" +end + +-- If the timestamp of the stateful order update is less than the maximum tiemstamp +-- to remove, return an empty string and don't remove the stateful order update +if tonumber(oldTimestamp) > tonumber(maxTimestamp) then + return "" +end + +-- The timestamp exists and is less than the maximum timestamp, and so delete the order id +-- from the ZSET +redis.call("ZREM", statefulOrderUpdateIdHash, statefulOrderId) + +local oldStatefulOrderUpdate = redis.call("HGET", statefulOrderUpdateHash, statefulOrderId) +-- If there's no order update, return empty string +if not oldStatefulOrderUpdate then + return "" +end + +redis.call("HDEL", statefulOrderUpdateHash, statefulOrderId) + +return oldStatefulOrderUpdate diff --git a/indexer/packages/redis/src/types.ts b/indexer/packages/redis/src/types.ts index 60e23b7178..0f67c7788d 100644 --- a/indexer/packages/redis/src/types.ts +++ b/indexer/packages/redis/src/types.ts @@ -74,3 +74,9 @@ export type PnlTickForSubaccounts = { // the uuid. [subaccountId: string]: PnlTicksCreateObject }; + +/* -------- Stateful order update cache types -------- */ +export interface StatefulOrderUpdateInfo { + orderId: string, + timestamp: number, +}