diff --git a/indexer/packages/redis/src/caches/orderbook-levels-cache.ts b/indexer/packages/redis/src/caches/orderbook-levels-cache.ts index cb75fda5845..2e9ba9958ab 100644 --- a/indexer/packages/redis/src/caches/orderbook-levels-cache.ts +++ b/indexer/packages/redis/src/caches/orderbook-levels-cache.ts @@ -12,6 +12,8 @@ import { deleteStalePriceLevelScript, getOrderbookSideScript, incrementOrderbookLevelScript, + addMarketPriceScript, + getMarketMedianScript, } from './scripts'; // Cache of orderbook levels for each clob pair @@ -20,6 +22,7 @@ import { // TODO(CORE-512): add info/resources around caches. Doc: // https://www.notion.so/dydx/Indexer-Technical-Spec-a6b15644502048f994c98dee35b96e96#61d5f8ca5117476caab78b3f0691b1d0 export const ORDERS_CACHE_KEY_PREFIX: string = 'v4/orderbookLevels/'; +export const MARKET_PRICES_CACHE_KEY_PREFIX: string = 'v4/marketPrices/'; /** * Update the total size of orders at a price level for a specific ticker/side with a delta. The @@ -553,3 +556,65 @@ export async function getOrderBookMidPrice( } return bestBid.plus(bestAsk).div(2).toFixed(); } + +// Adds a price to the market prices cache +export async function addPriceToMarket( + client: RedisClient, + ticker: string, + price: number, +): Promise { + // Number of keys for the lua script. + const numKeys: number = 1; + + let evalAsync: ( + marketCacheKey: string, + ) => Promise = (marketCacheKey) => { + + return new Promise((resolve, reject) => { + const callback: Callback = ( + err: Error | null, + ) => { + if (err) { + return reject(err); + } + return resolve(); + }; + + const nowSeconds = Math.floor(Date.now() / 1000); // Current time in seconds + client.evalsha( + addMarketPriceScript.hash, + numKeys, + marketCacheKey, + price, + nowSeconds, + callback, + ); + + }); + }; + evalAsync = evalAsync.bind(client); + + return evalAsync( + getMarketPriceCacheKey(ticker), + ); +} + +function getMarketPriceCacheKey(ticker: string): string { + return `${ORDERS_CACHE_KEY_PREFIX}${ticker}`; +} + +export async function getMedianPrice(client: RedisClient, ticker: string): Promise { + // Call the Lua script using EVAL + const medianPrice = await client.evalsha( + getMarketMedianScript.hash, + 1, + getMarketPriceCacheKey(ticker), + ) as string | null | boolean; + + // Handle the cases where result might be boolean (false) + if (medianPrice === null || medianPrice === false) { + return null; + } + + return medianPrice as string; +} diff --git a/indexer/packages/redis/src/caches/scripts.ts b/indexer/packages/redis/src/caches/scripts.ts index 3e1032c6f23..0e4c405d2ca 100644 --- a/indexer/packages/redis/src/caches/scripts.ts +++ b/indexer/packages/redis/src/caches/scripts.ts @@ -63,6 +63,8 @@ export const removeOrderScript: LuaScript = newLuaScript('removeOrder', '../scri export const addCanceledOrderIdScript: LuaScript = newLuaScript('addCanceledOrderId', '../scripts/add_canceled_order_id.lua'); export const addStatefulOrderUpdateScript: LuaScript = newLuaScript('addStatefulOrderUpdate', '../scripts/add_stateful_order_update.lua'); export const removeStatefulOrderUpdateScript: LuaScript = newLuaScript('removeStatefulOrderUpdate', '../scripts/remove_stateful_order_update.lua'); +export const addMarketPriceScript: LuaScript = newLuaScript('addMarketPrice', '../scripts/add_market_price.lua'); +export const getMarketMedianScript: LuaScript = newLuaScript('getMarketMedianPrice', '../scripts/get_market_median_price.lua'); export const allLuaScripts: LuaScript[] = [ deleteZeroPriceLevelScript, diff --git a/indexer/packages/redis/src/scripts/add_market_price.lua b/indexer/packages/redis/src/scripts/add_market_price.lua new file mode 100644 index 00000000000..0e1467bb31f --- /dev/null +++ b/indexer/packages/redis/src/scripts/add_market_price.lua @@ -0,0 +1,17 @@ +-- Key for the ZSET storing price data +local priceCacheKey = KEYS[1] +-- Price to be added +local price = tonumber(ARGV[1]) +-- Current timestamp +local nowSeconds = tonumber(ARGV[2]) +-- Time window (5 seconds) +local fiveSeconds = 5 + +-- 1. Add the price to the sorted set (score is the current timestamp) +redis.call("zadd", priceCacheKey, nowSeconds, price) + +-- 2. Remove any entries older than 5 seconds +local cutoffTime = nowSeconds - fiveSeconds +redis.call("zremrangebyscore", priceCacheKey, "-inf", cutoffTime) + +return true \ No newline at end of file diff --git a/indexer/packages/redis/src/scripts/get_market_median_price.lua b/indexer/packages/redis/src/scripts/get_market_median_price.lua new file mode 100644 index 00000000000..281da9bed86 --- /dev/null +++ b/indexer/packages/redis/src/scripts/get_market_median_price.lua @@ -0,0 +1,23 @@ +-- Key for the sorted set storing price data +local priceCacheKey = KEYS[1] + +-- Get all the prices from the sorted set (ascending order) +local prices = redis.call('zrange', priceCacheKey, 0, -1) + +-- If no prices are found, return nil +if #prices == 0 then + return nil +end + +-- Calculate the middle index +local middle = math.floor(#prices / 2) + +-- Calculate median +if #prices % 2 == 0 then + -- If even, return the average of the two middle elements + local median = (tonumber(prices[middle]) + tonumber(prices[middle + 1])) / 2 + return tostring(median) +else + -- If odd, return the middle element + return prices[middle + 1] +end diff --git a/indexer/services/ender/src/lib/candles-generator.ts b/indexer/services/ender/src/lib/candles-generator.ts index d7dd7bba34c..78a3632dcfd 100644 --- a/indexer/services/ender/src/lib/candles-generator.ts +++ b/indexer/services/ender/src/lib/candles-generator.ts @@ -1,4 +1,4 @@ -import { stats } from '@dydxprotocol-indexer/base'; +import { logger, stats } from '@dydxprotocol-indexer/base'; import { CANDLES_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-indexer/kafka'; import { CANDLE_RESOLUTION_TO_PROTO, @@ -88,6 +88,25 @@ export class CandlesGenerator { // 4. Add Candle kafka messages to KafkaPublisher this.addCandleKafkaMessages(candles); + // WIP testing: + candles.forEach((candle) => { + OrderbookLevelsCache.addPriceToMarket(redisClient, candle.ticker, start); + + logger.info({ + at: 'candles_generator#update_candles', + message: `Adding price to market ${candle.ticker}`, + }); + }); + + candles.forEach((candle) => { + const price = OrderbookLevelsCache.getMedianPrice(redisClient, candle.ticker); + + logger.info({ + at: 'candles_generator#get_median_price', + message: `Median price for market ${candle.ticker} is ${price}`, + }); + }); + stats.timing( `${config.SERVICE_NAME}.update_candles.timing`, Date.now() - start,