Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
adamfraser committed Sep 18, 2024
1 parent e0b5afa commit da74622
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 1 deletion.
65 changes: 65 additions & 0 deletions indexer/packages/redis/src/caches/orderbook-levels-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {
deleteStalePriceLevelScript,
getOrderbookSideScript,
incrementOrderbookLevelScript,
addMarketPriceScript,
getMarketMedianScript,
} from './scripts';

// Cache of orderbook levels for each clob pair
Expand All @@ -20,6 +22,7 @@ import {
// TODO(CORE-512): add info/resources around caches. Doc:
// https://www.notion.so/dydx/Indexer-Technical-Spec-a6b15644502048f994c98dee35b96e96#61d5f8ca5117476caab78b3f0691b1d0
export const ORDERS_CACHE_KEY_PREFIX: string = 'v4/orderbookLevels/';
export const MARKET_PRICES_CACHE_KEY_PREFIX: string = 'v4/marketPrices/';

/**
* Update the total size of orders at a price level for a specific ticker/side with a delta. The
Expand Down Expand Up @@ -553,3 +556,65 @@ export async function getOrderBookMidPrice(
}
return bestBid.plus(bestAsk).div(2).toFixed();
}

// Adds a price to the market prices cache
export async function addPriceToMarket(
client: RedisClient,
ticker: string,
price: number,
): Promise<void> {
// Number of keys for the lua script.
const numKeys: number = 1;

let evalAsync: (
marketCacheKey: string,
) => Promise<void> = (marketCacheKey) => {

return new Promise<void>((resolve, reject) => {
const callback: Callback<void> = (
err: Error | null,
) => {
if (err) {
return reject(err);
}
return resolve();
};

const nowSeconds = Math.floor(Date.now() / 1000); // Current time in seconds
client.evalsha(
addMarketPriceScript.hash,
numKeys,
marketCacheKey,
price,
nowSeconds,
callback,
);

});
};
evalAsync = evalAsync.bind(client);

return evalAsync(
getMarketPriceCacheKey(ticker),
);
}

function getMarketPriceCacheKey(ticker: string): string {
return `${ORDERS_CACHE_KEY_PREFIX}${ticker}`;
}

export async function getMedianPrice(client: RedisClient, ticker: string): Promise<string | null> {
// Call the Lua script using EVAL
const medianPrice = await client.evalsha(
getMarketMedianScript.hash,
1,
getMarketPriceCacheKey(ticker),
) as string | null | boolean;

// Handle the cases where result might be boolean (false)
if (medianPrice === null || medianPrice === false) {
return null;
}

return medianPrice as string;
}
2 changes: 2 additions & 0 deletions indexer/packages/redis/src/caches/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ export const removeOrderScript: LuaScript = newLuaScript('removeOrder', '../scri
export const addCanceledOrderIdScript: LuaScript = newLuaScript('addCanceledOrderId', '../scripts/add_canceled_order_id.lua');
export const addStatefulOrderUpdateScript: LuaScript = newLuaScript('addStatefulOrderUpdate', '../scripts/add_stateful_order_update.lua');
export const removeStatefulOrderUpdateScript: LuaScript = newLuaScript('removeStatefulOrderUpdate', '../scripts/remove_stateful_order_update.lua');
export const addMarketPriceScript: LuaScript = newLuaScript('addMarketPrice', '../scripts/add_market_price.lua');
export const getMarketMedianScript: LuaScript = newLuaScript('getMarketMedianPrice', '../scripts/get_market_median_price.lua');

export const allLuaScripts: LuaScript[] = [
deleteZeroPriceLevelScript,
Expand Down
17 changes: 17 additions & 0 deletions indexer/packages/redis/src/scripts/add_market_price.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Key for the ZSET storing price data
local priceCacheKey = KEYS[1]
-- Price to be added
local price = tonumber(ARGV[1])
-- Current timestamp
local nowSeconds = tonumber(ARGV[2])
-- Time window (5 seconds)
local fiveSeconds = 5

-- 1. Add the price to the sorted set (score is the current timestamp)
redis.call("zadd", priceCacheKey, nowSeconds, price)

-- 2. Remove any entries older than 5 seconds
local cutoffTime = nowSeconds - fiveSeconds
redis.call("zremrangebyscore", priceCacheKey, "-inf", cutoffTime)

return true
23 changes: 23 additions & 0 deletions indexer/packages/redis/src/scripts/get_market_median_price.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- Key for the sorted set storing price data
local priceCacheKey = KEYS[1]

-- Get all the prices from the sorted set (ascending order)
local prices = redis.call('zrange', priceCacheKey, 0, -1)

-- If no prices are found, return nil
if #prices == 0 then
return nil
end

-- Calculate the middle index
local middle = math.floor(#prices / 2)

-- Calculate median
if #prices % 2 == 0 then
-- If even, return the average of the two middle elements
local median = (tonumber(prices[middle]) + tonumber(prices[middle + 1])) / 2
return tostring(median)
else
-- If odd, return the middle element
return prices[middle + 1]
end
21 changes: 20 additions & 1 deletion indexer/services/ender/src/lib/candles-generator.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { stats } from '@dydxprotocol-indexer/base';
import { logger, stats } from '@dydxprotocol-indexer/base';
import { CANDLES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-indexer/kafka';
import {
CANDLE_RESOLUTION_TO_PROTO,
Expand Down Expand Up @@ -88,6 +88,25 @@ export class CandlesGenerator {
// 4. Add Candle kafka messages to KafkaPublisher
this.addCandleKafkaMessages(candles);

// WIP testing:
candles.forEach((candle) => {
OrderbookLevelsCache.addPriceToMarket(redisClient, candle.ticker, start);

logger.info({
at: 'candles_generator#update_candles',
message: `Adding price to market ${candle.ticker}`,
});
});

candles.forEach((candle) => {
const price = OrderbookLevelsCache.getMedianPrice(redisClient, candle.ticker);

logger.info({
at: 'candles_generator#get_median_price',
message: `Median price for market ${candle.ticker} is ${price}`,
});
});

stats.timing(
`${config.SERVICE_NAME}.update_candles.timing`,
Date.now() - start,
Expand Down

0 comments on commit da74622

Please sign in to comment.