-
Notifications
You must be signed in to change notification settings - Fork 115
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[IND-402] Add cache for order updates for stateful orders. (#682)
- Loading branch information
1 parent
e510168
commit 5d6d407
Showing
7 changed files
with
359 additions
and
0 deletions.
There are no files selected for viewing
164 changes: 164 additions & 0 deletions
164
indexer/packages/redis/__tests__/caches/stateful-order-updates-cache.test.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
import { deleteAllAsync } from '../../src/helpers/redis'; | ||
import { redis as client } from '../helpers/utils'; | ||
import { | ||
addStatefulOrderUpdate, | ||
removeStatefulOrderUpdate, | ||
getOldOrderUpdates, | ||
} from '../../src/caches/stateful-order-updates-cache'; | ||
import { IndexerOrderId, OrderUpdateV1 } from '@dydxprotocol-indexer/v4-protos'; | ||
import Long from 'long'; | ||
import { orderId } from './constants'; | ||
import { OrderTable } from '@dydxprotocol-indexer/postgres'; | ||
import { StatefulOrderUpdateInfo } from 'packages/redis/src'; | ||
|
||
describe('statefulOrderUpdatesCache', () => { | ||
const orderUpdate: OrderUpdateV1 = { | ||
orderId, | ||
totalFilledQuantums: Long.fromNumber(100, true), | ||
}; | ||
const orderUuid: string = OrderTable.orderIdToUuid(orderId); | ||
const initialTimestamp: number = Date.now(); | ||
const olderTimestamp: number = initialTimestamp - 10; | ||
const newerTimestamp: number = initialTimestamp + 10; | ||
|
||
beforeEach(async () => { | ||
await deleteAllAsync(client); | ||
}); | ||
|
||
afterEach(async () => { | ||
await deleteAllAsync(client); | ||
}); | ||
|
||
describe('addStatefulOrderUpdate', () => { | ||
it('adds stateful order update to cache', async () => { | ||
await addStatefulOrderUpdate( | ||
orderUuid, | ||
orderUpdate, | ||
initialTimestamp, | ||
client, | ||
); | ||
|
||
const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( | ||
newerTimestamp, client, | ||
); | ||
const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate( | ||
orderUuid, initialTimestamp, client, | ||
); | ||
|
||
expect(statefulOrderUpdateInfo).toHaveLength(1); | ||
expect(statefulOrderUpdateInfo[0]).toEqual({ | ||
orderId: orderUuid, | ||
timestamp: initialTimestamp, | ||
}); | ||
expect(removedUpdate).toBeDefined(); | ||
expect(removedUpdate).toEqual(orderUpdate); | ||
}); | ||
}); | ||
|
||
describe('removeStatefulorderUpdate', () => { | ||
it('removes and returns existing stateful order update from cache', async () => { | ||
await addStatefulOrderUpdate( | ||
orderUuid, | ||
orderUpdate, | ||
initialTimestamp, | ||
client, | ||
); | ||
|
||
const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate( | ||
orderUuid, initialTimestamp, client, | ||
); | ||
const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( | ||
newerTimestamp, client, | ||
); | ||
|
||
expect(removedUpdate).toBeDefined(); | ||
expect(removedUpdate).toEqual(orderUpdate); | ||
expect(statefulOrderUpdateInfo).toHaveLength(0); | ||
}); | ||
|
||
it('does not remove existing stateful order update if timestamp is lower', async () => { | ||
await addStatefulOrderUpdate( | ||
orderUuid, | ||
orderUpdate, | ||
initialTimestamp, | ||
client, | ||
); | ||
|
||
const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate( | ||
orderUuid, olderTimestamp, client, | ||
); | ||
const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( | ||
newerTimestamp, client, | ||
); | ||
|
||
expect(removedUpdate).toBeUndefined(); | ||
expect(statefulOrderUpdateInfo).toHaveLength(1); | ||
expect(statefulOrderUpdateInfo[0]).toEqual({ | ||
orderId: orderUuid, | ||
timestamp: initialTimestamp, | ||
}); | ||
}); | ||
|
||
it('removes non-existing order and returns undefined', async () => { | ||
const removedUpdate: OrderUpdateV1 | undefined = await removeStatefulOrderUpdate( | ||
orderUuid, initialTimestamp, client, | ||
); | ||
|
||
expect(removedUpdate).toBeUndefined(); | ||
}); | ||
}); | ||
|
||
describe('getOldOrderUpdates', () => { | ||
const orderId2: IndexerOrderId = { | ||
...orderId, | ||
clientId: 45, | ||
}; | ||
const orderUuid2: string = OrderTable.orderIdToUuid(orderId2); | ||
const orderUpdate2: OrderUpdateV1 = { | ||
...orderUpdate, | ||
orderId: orderId2, | ||
}; | ||
|
||
beforeEach(async () => { | ||
await Promise.all([ | ||
addStatefulOrderUpdate( | ||
orderUuid, | ||
orderUpdate, | ||
initialTimestamp, | ||
client, | ||
), | ||
addStatefulOrderUpdate( | ||
orderUuid2, | ||
orderUpdate2, | ||
olderTimestamp, | ||
client, | ||
)], | ||
); | ||
}); | ||
|
||
it('returns stateful order update info older than the threshold', async () => { | ||
const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( | ||
olderTimestamp, client, | ||
); | ||
|
||
expect(statefulOrderUpdateInfo).toEqual([{ | ||
orderId: orderUuid2, | ||
timestamp: olderTimestamp, | ||
}]); | ||
}); | ||
|
||
it('returns multiple stateful order update info older than the threshold', async () => { | ||
const statefulOrderUpdateInfo: StatefulOrderUpdateInfo[] = await getOldOrderUpdates( | ||
initialTimestamp, client, | ||
); | ||
|
||
expect(statefulOrderUpdateInfo).toEqual([{ | ||
orderId: orderUuid2, | ||
timestamp: olderTimestamp, | ||
}, { | ||
orderId: orderUuid, | ||
timestamp: initialTimestamp, | ||
}]); | ||
}); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
127 changes: 127 additions & 0 deletions
127
indexer/packages/redis/src/caches/stateful-order-updates-cache.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
import { OrderUpdateV1 } from '@dydxprotocol-indexer/v4-protos'; | ||
import _ from 'lodash'; | ||
import { Callback, RedisClient } from 'redis'; | ||
|
||
import { zRangeByScoreAsync } from '../helpers/redis'; | ||
import { StatefulOrderUpdateInfo } from '../types'; | ||
import { addStatefulOrderUpdateScript, removeStatefulOrderUpdateScript } from './scripts'; | ||
|
||
// Cache of order ids of the stateful order updates and when the updates were added to teh cache | ||
export const ORDER_UPDATE_IDS_CACHE_KEY: string = 'v4/stateful_order_update_ids'; | ||
// Cache of order updates for stateful orders | ||
export const ORDER_UPDATES_CACHE_KEY: string = 'v4/stateful_order_updates'; | ||
|
||
export async function addStatefulOrderUpdate( | ||
statefulOrderId: string, | ||
orderUpdate: OrderUpdateV1, | ||
updateTimestamp: number, | ||
client: RedisClient, | ||
): Promise<void> { | ||
const numKeys: number = 2; | ||
let evalAsync: ( | ||
orderId: string, | ||
encodedOrderUpdate: string, | ||
timestamp: number, | ||
) => Promise<void> = ( | ||
orderId, | ||
encodedOrderUpdate, | ||
timestamp, | ||
) => { | ||
return new Promise((resolve, reject) => { | ||
const callback: Callback<void> = ( | ||
err: Error | null, | ||
) => { | ||
if (err) { | ||
return reject(err); | ||
} | ||
return resolve(); | ||
}; | ||
client.evalsha( | ||
addStatefulOrderUpdateScript.hash, | ||
numKeys, | ||
ORDER_UPDATE_IDS_CACHE_KEY, | ||
ORDER_UPDATES_CACHE_KEY, | ||
orderId, | ||
encodedOrderUpdate, | ||
timestamp, | ||
callback, | ||
); | ||
}); | ||
}; | ||
|
||
evalAsync = evalAsync.bind(client); | ||
|
||
return evalAsync( | ||
statefulOrderId, | ||
Buffer.from(Uint8Array.from(OrderUpdateV1.encode(orderUpdate).finish())).toString('binary'), | ||
updateTimestamp, | ||
); | ||
} | ||
|
||
export async function removeStatefulOrderUpdate( | ||
statefulOrderId: string, | ||
removeTimestamp: number, | ||
client: RedisClient, | ||
): Promise<OrderUpdateV1 | undefined> { | ||
const numKeys: number = 2; | ||
let evalAsync: ( | ||
orderId: string, | ||
timestamp: number, | ||
) => Promise<OrderUpdateV1 | undefined> = ( | ||
orderId, | ||
timestamp, | ||
) => { | ||
return new Promise((resolve, reject) => { | ||
const callback: Callback<string> = ( | ||
err: Error | null, | ||
results: string, | ||
) => { | ||
if (err) { | ||
return reject(err); | ||
} | ||
if (results === '') { | ||
return resolve(undefined); | ||
} | ||
|
||
return resolve(OrderUpdateV1.decode(Buffer.from(results, 'binary'))); | ||
}; | ||
client.evalsha( | ||
removeStatefulOrderUpdateScript.hash, | ||
numKeys, | ||
ORDER_UPDATE_IDS_CACHE_KEY, | ||
ORDER_UPDATES_CACHE_KEY, | ||
orderId, | ||
timestamp, | ||
callback, | ||
); | ||
}); | ||
}; | ||
|
||
evalAsync = evalAsync.bind(client); | ||
|
||
return evalAsync( | ||
statefulOrderId, | ||
removeTimestamp, | ||
); | ||
} | ||
|
||
export async function getOldOrderUpdates( | ||
latestTimestamp: number, | ||
client: RedisClient, | ||
): Promise<StatefulOrderUpdateInfo[]> { | ||
const rawResults: string[] = await zRangeByScoreAsync({ | ||
key: ORDER_UPDATE_IDS_CACHE_KEY, | ||
start: -Infinity, | ||
end: latestTimestamp, | ||
endIsInclusive: true, | ||
withScores: true, | ||
}, client); | ||
return _.chunk(rawResults, 2).map( | ||
(keyValuePair: string[]) => { | ||
return { | ||
orderId: keyValuePair[0], | ||
timestamp: Number(keyValuePair[1]), | ||
}; | ||
}, | ||
); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
16 changes: 16 additions & 0 deletions
16
indexer/packages/redis/src/scripts/add_stateful_order_update.lua
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
-- Hash of the ZSET tracking when a stateful order update was added to the cache | ||
local statefulOrderUpdateIdHash = KEYS[1] | ||
-- Hash of the HSET tracking the stateful order updates | ||
local statefulOrderUpdateHash = KEYS[2] | ||
|
||
-- Order id of the stateful order update to add to the cache | ||
local statefulOrderId = ARGV[1] | ||
-- Encoded stateful order update protobuf | ||
local statefulOrderUpdate = ARGV[2] | ||
-- Timestamp of when the order update was added | ||
local timestamp = ARGV[3] | ||
|
||
redis.call("ZADD", statefulOrderUpdateIdHash, timestamp, statefulOrderId) | ||
redis.call("HSET", statefulOrderUpdateHash, statefulOrderId, statefulOrderUpdate) | ||
|
||
return 1 |
41 changes: 41 additions & 0 deletions
41
indexer/packages/redis/src/scripts/remove_stateful_order_update.lua
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
-- Hash of the ZSET tracking when a stateful order update was added to the cache | ||
local statefulOrderUpdateIdHash = KEYS[1] | ||
-- Hash of the HSET tracking the stateful order updates | ||
local statefulOrderUpdateHash = KEYS[2] | ||
|
||
-- Order id of the stateful order update to add to the cache | ||
local statefulOrderId = ARGV[1] | ||
-- Maximum timestamp of the stateful order update to remove | ||
-- Needed for removing old stateful order updates | ||
local maxTimestamp = ARGV[2] | ||
|
||
-- This script attempts to remove a stateful order update associated with an order id | ||
-- from the ZSET / HSET tracking stateful order updates | ||
-- This script returns the either an empty string if no stateful order update was removed | ||
-- or the encoded stateful order update protobuf that was removed | ||
|
||
local oldTimestamp = redis.call("ZSCORE", statefulOrderUpdateIdHash, statefulOrderId) | ||
-- if there is no timestamp, return an empty string | ||
if not oldTimestamp then | ||
return "" | ||
end | ||
|
||
-- If the timestamp of the stateful order update is less than the maximum tiemstamp | ||
-- to remove, return an empty string and don't remove the stateful order update | ||
if tonumber(oldTimestamp) > tonumber(maxTimestamp) then | ||
return "" | ||
end | ||
|
||
-- The timestamp exists and is less than the maximum timestamp, and so delete the order id | ||
-- from the ZSET | ||
redis.call("ZREM", statefulOrderUpdateIdHash, statefulOrderId) | ||
|
||
local oldStatefulOrderUpdate = redis.call("HGET", statefulOrderUpdateHash, statefulOrderId) | ||
-- If there's no order update, return empty string | ||
if not oldStatefulOrderUpdate then | ||
return "" | ||
end | ||
|
||
redis.call("HDEL", statefulOrderUpdateHash, statefulOrderId) | ||
|
||
return oldStatefulOrderUpdate |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters