Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IND-402] Add cache for order updates for stateful orders. #682

Merged
merged 4 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions indexer/packages/base/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Checks if any object has any defined properties.
* @param object
* @returns True if there is at least 1 defined property, false otherwise.
*/
export function hasDefinedProperties(object: Object): boolean {
for (const entry of Object.entries(object)) {
if (entry[1] !== undefined) {
return true;
}
}

return false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import { deleteAllAsync } from '../../src/helpers/redis';
import { redis as client } from '../helpers/utils';
import {
addStatefulOrderUpdate,
removeStatefulOrderUpdate,
getOldOrderUpdates,
} from '../../src/caches/stateful-order-updates-cache';
import { IndexerOrderId, OrderUpdateV1 } from '@dydxprotocol-indexer/v4-protos';
import Long from 'long';
import { orderId } from './constants';
import { OrderTable } from '@dydxprotocol-indexer/postgres';
import { StatefulOrderUpdateInfo } from 'packages/redis/src';

describe('statefulOrderUpdatesCache', () => {
const orderUpdate: OrderUpdateV1 = {
orderId,
totalFilledQuantums: Long.fromNumber(100, true),
};
const orderUuid: string = OrderTable.orderIdToUuid(orderId);
const initialTimestamp: number = Date.now();
const olderTimestamp: number = initialTimestamp - 10;
const newerTimestamp: number = initialTimestamp + 10;

beforeEach(async () => {
await deleteAllAsync(client);
});

afterEach(async () => {
await deleteAllAsync(client);
});

describe('addStatefulOrderUpdate', () => {
it('adds stateful order update to cache', async () => {
await addStatefulOrderUpdate(
orderUuid,
orderUpdate,
initialTimestamp,
client,
);

const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates(
newerTimestamp, client,
);
const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate(
orderUuid, initialTimestamp, client,
);

expect(statefulOrderUpdateInfo).toHaveLength(1);
expect(statefulOrderUpdateInfo[0]).toEqual({
orderId: orderUuid,
timestamp: initialTimestamp,
});
expect(removedUpdate).toBeDefined();
expect(removedUpdate).toEqual(orderUpdate);
});
});
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved

describe('removeStatefulorderUpdate', () => {
it('removes and returns existing stateful order update from cache', async () => {
await addStatefulOrderUpdate(
orderUuid,
orderUpdate,
initialTimestamp,
client,
);

const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate(
orderUuid, initialTimestamp, client,
);
const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates(
newerTimestamp, client,
);

expect(removedUpdate).toBeDefined();
expect(removedUpdate).toEqual(orderUpdate);
expect(statefulOrderUpdateInfo).toHaveLength(0);
});

it('does not remove existing stateful order update if timestamp is lower', async () => {
await addStatefulOrderUpdate(
orderUuid,
orderUpdate,
initialTimestamp,
client,
);

const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate(
orderUuid, olderTimestamp, client,
);
const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates(
newerTimestamp, client,
);

expect(removedUpdate).toBeUndefined();
expect(statefulOrderUpdateInfo).toHaveLength(1);
expect(statefulOrderUpdateInfo[0]).toEqual({
orderId: orderUuid,
timestamp: initialTimestamp,
});
});

it('removes non-existing order and returns undefined', async () => {
const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate(
orderUuid, initialTimestamp, client,
);

expect(removedUpdate).toBeUndefined();
});
});

describe('getOldOrderUpdates', () => {
const orderId2: IndexerOrderId = {
...orderId,
clientId: 45,
};
const orderUuid2: string = OrderTable.orderIdToUuid(orderId2);
const orderUpdate2: OrderUpdateV1 = {
...orderUpdate,
orderId: orderId2,
};

beforeEach(async () => {
await Promise.all([
addStatefulOrderUpdate(
orderUuid,
orderUpdate,
initialTimestamp,
client,
),
addStatefulOrderUpdate(
orderUuid2,
orderUpdate2,
olderTimestamp,
client,
)],
);
});

it('returns stateful order update info older than the threshold', async () => {
const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates(
olderTimestamp, client,
);

expect(statefulOrderUpdateInfo).toEqual([{
orderId: orderUuid2,
timestamp: olderTimestamp,
}]);
});

it('returns multiple stateful order update info older than the threshold', async () => {
const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates(
initialTimestamp, client,
);

expect(statefulOrderUpdateInfo).toEqual([{
orderId: orderUuid2,
timestamp: olderTimestamp,
}, {
orderId: orderUuid,
timestamp: initialTimestamp,
}]);
});
});
});
4 changes: 4 additions & 0 deletions indexer/packages/redis/src/caches/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ export const updateOrderScript: LuaScript = newLuaScript('updateOrder', '../scri
export const placeOrderScript: LuaScript = newLuaScript('placeOrder', '../scripts/place_order.lua');
export const removeOrderScript: LuaScript = newLuaScript('removeOrder', '../scripts/remove_order.lua');
export const addCanceledOrderIdScript: LuaScript = newLuaScript('addCanceledOrderId', '../scripts/add_canceled_order_id.lua');
export const addStatefulOrderUpdateScript: LuaScript = newLuaScript('addStatefulOrderUpdate', '../scripts/add_stateful_order_update.lua');
export const removeStatefulOrderUpdateScript: LuaScript = newLuaScript('removeStatefulOrderUpdate', '../scripts/remove_stateful_order_update.lua');

export const allLuaScripts: LuaScript[] = [
deleteZeroPriceLevelScript,
Expand All @@ -69,4 +71,6 @@ export const allLuaScripts: LuaScript[] = [
placeOrderScript,
removeOrderScript,
addCanceledOrderIdScript,
addStatefulOrderUpdateScript,
removeStatefulOrderUpdateScript,
];
127 changes: 127 additions & 0 deletions indexer/packages/redis/src/caches/stateful-order-updates-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import { OrderUpdateV1 } from '@dydxprotocol-indexer/v4-protos';
import _ from 'lodash';
import { Callback, RedisClient } from 'redis';

import { zRangeByScoreAsync } from '../helpers/redis';
import { StatefulOrderUpdateInfo } from '../types';
import { addStatefulOrderUpdateScript, removeStatefulOrderUpdateScript } from './scripts';

// Cache of order ids of the stateful order updates and when the updates were added to teh cache
export const ORDER_UPDATE_IDS_CACHE_KEY: string = 'v4/stateful_order_update_ids';
// Cache of order updates for stateful orders
export const ORDER_UPDATES_CACHE_KEY: string = 'v4/stateful_order_updates';

export async function addStatefulOrderUpdate(
statefulOrderId: string,
orderUpdate: OrderUpdateV1,
updateTimestamp: number,
client: RedisClient,
): Promise<void> {
const numKeys: number = 2;
let evalAsync: (
orderId: string,
encodedOrderUpdate: string,
timestamp: number,
) => Promise<void> = (
orderId,
encodedOrderUpdate,
timestamp,
) => {
return new Promise((resolve, reject) => {
const callback: Callback<void> = (
err: Error | null,
) => {
if (err) {
return reject(err);
}
return resolve();
};
client.evalsha(
addStatefulOrderUpdateScript.hash,
numKeys,
ORDER_UPDATE_IDS_CACHE_KEY,
ORDER_UPDATES_CACHE_KEY,
orderId,
encodedOrderUpdate,
timestamp,
callback,
);
});
};

evalAsync = evalAsync.bind(client);

return evalAsync(
statefulOrderId,
Buffer.from(Uint8Array.from(OrderUpdateV1.encode(orderUpdate).finish())).toString('binary'),
updateTimestamp,
);
}
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved

export async function removeStatefulOrderUpdate(
statefulOrderId: string,
removeTimestamp: number,
client: RedisClient,
): Promise<OrderUpdateV1 | undefined> {
const numKeys: number = 2;
let evalAsync: (
orderId: string,
timestamp: number,
) => Promise<OrderUpdateV1 | undefined> = (
orderId,
timestamp,
) => {
return new Promise((resolve, reject) => {
const callback: Callback<string> = (
err: Error | null,
results: string,
) => {
if (err) {
return reject(err);
}
if (results === '') {
return resolve(undefined);
}

return resolve(OrderUpdateV1.decode(Buffer.from(results, 'binary')));
};
client.evalsha(
removeStatefulOrderUpdateScript.hash,
numKeys,
ORDER_UPDATE_IDS_CACHE_KEY,
ORDER_UPDATES_CACHE_KEY,
orderId,
timestamp,
callback,
);
});
};

evalAsync = evalAsync.bind(client);

return evalAsync(
statefulOrderId,
removeTimestamp,
);
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
}

export async function getOldOrderUpdates(
latestTimestamp: number,
client: RedisClient,
): Promise<StatefulOrderUpdateInfo[]> {
const rawResults: string[] = await zRangeByScoreAsync({
key: ORDER_UPDATE_IDS_CACHE_KEY,
start: -Infinity,
end: latestTimestamp,
endIsInclusive: true,
withScores: true,
}, client);
return _.chunk(rawResults, 2).map(
(keyValuePair: string[]) => {
return {
orderId: keyValuePair[0],
timestamp: Number(keyValuePair[1]),
};
},
);
}
1 change: 1 addition & 0 deletions indexer/packages/redis/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export * as NextFundingCache from './caches/next-funding-cache';
export * as OrderbookLevelsCache from './caches/orderbook-levels-cache';
export * as LatestAccountPnlTicksCache from './caches/latest-account-pnl-ticks-cache';
export * as CanceledOrdersCache from './caches/canceled-orders-cache';
export * as StatefulOrderUpdatesCache from './caches/stateful-order-updates-cache';
export { placeOrder } from './caches/place-order';
export { removeOrder } from './caches/remove-order';
export { updateOrder } from './caches/update-order';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Hash of the ZSET tracking when a stateful order update was added to the cache
local statefulOrderUpdateIdHash = KEYS[1]
-- Hash of the HSET tracking the stateful order updates
local statefulOrderUpdateHash = KEYS[2]

-- Order id of the stateful order update to add to the cache
local statefulOrderId = ARGV[1]
-- Encoded stateful order update protobuf
local statefulOrderUpdate = ARGV[2]
-- Timestamp of when the order update was added
local timestamp = ARGV[3]

redis.call("ZADD", statefulOrderUpdateIdHash, timestamp, statefulOrderId)
redis.call("HSET", statefulOrderUpdateHash, statefulOrderId, statefulOrderUpdate)

return 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- Hash of the ZSET tracking when a stateful order update was added to the cache
local statefulOrderUpdateIdHash = KEYS[1]
-- Hash of the HSET tracking the stateful order updates
local statefulOrderUpdateHash = KEYS[2]

-- Order id of the stateful order update to add to the cache
local statefulOrderId = ARGV[1]
-- Maximum timestamp of the stateful order update to remove
-- Needed for removing old stateful order updates
local maxTimestamp = ARGV[2]

-- This script attempts to remove a stateful order update associated with an order id
-- from the ZSET / HSET tracking stateful order updates
-- This script returns the either an empty string if no stateful order update was removed
-- or the encoded stateful order update protobuf that was removed

local oldTimestamp = redis.call("ZSCORE", statefulOrderUpdateIdHash, statefulOrderId)
-- if there is no timestamp, return an empty string
if not oldTimestamp then
return ""
end
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved

-- If the timestamp of the stateful order update is less than the maximum tiemstamp
-- to remove, return an empty string and don't remove the stateful order update
if tonumber(oldTimestamp) > tonumber(maxTimestamp) then
return ""
end

-- The timestamp exists and is less than the maximum timestamp, and so delete the order id
-- from the ZSET
redis.call("ZREM", statefulOrderUpdateIdHash, statefulOrderId)

local oldStatefulOrderUpdate = redis.call("HGET", statefulOrderUpdateHash, statefulOrderId)
-- If there's no order update, return empty string
if not oldStatefulOrderUpdate then
return ""
end
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved

redis.call("HDEL", statefulOrderUpdateHash, statefulOrderId)

return oldStatefulOrderUpdate
Loading
Loading