From b67e104e146825c78991517930c86bd609d78283 Mon Sep 17 00:00:00 2001 From: Christopher-Li Date: Wed, 21 Aug 2024 15:26:50 -0400 Subject: [PATCH] Remove unnecessary logs and add Ender metrics (#2133) (cherry picked from commit 3c13930ac9c4c3e5fd79c6d652d469e676df0794) --- indexer/packages/kafka/src/batch-kafka-producer.ts | 7 ++++++- indexer/services/ender/src/lib/batched-handlers.ts | 10 +--------- indexer/services/ender/src/lib/candles-generator.ts | 2 ++ indexer/services/ender/src/lib/on-message.ts | 8 ++++++++ indexer/services/ender/src/lib/sync-handlers.ts | 6 ------ 5 files changed, 17 insertions(+), 16 deletions(-) diff --git a/indexer/packages/kafka/src/batch-kafka-producer.ts b/indexer/packages/kafka/src/batch-kafka-producer.ts index 0ae478680e..5bc63f7a41 100644 --- a/indexer/packages/kafka/src/batch-kafka-producer.ts +++ b/indexer/packages/kafka/src/batch-kafka-producer.ts @@ -1,7 +1,8 @@ -import { logger } from '@dydxprotocol-indexer/base'; +import { logger, stats } from '@dydxprotocol-indexer/base'; import { IHeaders, Producer, RecordMetadata } from 'kafkajs'; import _ from 'lodash'; +import config from './config'; import { KafkaTopics } from './types'; /** @@ -65,6 +66,7 @@ export class BatchKafkaProducer { } private sendBatch(): void { + const startTime: number = Date.now(); if (!_.isEmpty(this.producerMessages)) { this.producerPromises.push( this.producer.send({ topic: this.topic, messages: this.producerMessages }), @@ -80,7 +82,10 @@ export class BatchKafkaProducer { 0, ), topic: this.topic, + sendTime: Date.now() - startTime, }); + stats.gauge(`${config.SERVICE_NAME}.kafka_batch_size`, this.currentSize); + stats.timing(`${config.SERVICE_NAME}.kafka_batch_send_time`, Date.now() - startTime); this.producerMessages = []; this.currentSize = 0; } diff --git a/indexer/services/ender/src/lib/batched-handlers.ts b/indexer/services/ender/src/lib/batched-handlers.ts index a99637f7a8..9f656a9d96 100644 --- a/indexer/services/ender/src/lib/batched-handlers.ts +++ b/indexer/services/ender/src/lib/batched-handlers.ts @@ -1,4 +1,4 @@ -import { logger, stats } from '@dydxprotocol-indexer/base'; +import { stats } from '@dydxprotocol-indexer/base'; import _ from 'lodash'; import * as pg from 'pg'; @@ -100,14 +100,6 @@ export class BatchedHandlers { _.forEach(consolidatedKafkaEventGroup, (events: ConsolidatedKafkaEvent[]) => { kafkaPublisher.addEvents(events); }); - logger.info({ - at: 'BatchedHandlers#process', - message: 'Finished processing batch of handlers', - batchIndex, - handlerCountMapping, - batchProcessTime: Date.now() - start, - batchSize: this.batchedHandlers[batchIndex].length, - }); stats.timing(`${config.SERVICE_NAME}.batch_process_time`, Date.now() - start); stats.histogram(`${config.SERVICE_NAME}.batch_size`, this.batchedHandlers[batchIndex].length); } diff --git a/indexer/services/ender/src/lib/candles-generator.ts b/indexer/services/ender/src/lib/candles-generator.ts index 00fd1ab598..59d5ad0109 100644 --- a/indexer/services/ender/src/lib/candles-generator.ts +++ b/indexer/services/ender/src/lib/candles-generator.ts @@ -534,6 +534,7 @@ export class CandlesGenerator { * Get the cached orderbook mid price for a given ticker */ export async function getOrderbookMidPriceMap(): Promise<{ [ticker: string]: OrderbookMidPrice; }> { + const start: number = Date.now(); const perpetualMarkets = Object.values(perpetualMarketRefresher.getPerpetualMarketsMap()); const promises = perpetualMarkets.map(async (perpetualMarket: PerpetualMarketFromDatabase) => { @@ -550,5 +551,6 @@ export async function getOrderbookMidPriceMap(): Promise<{ [ticker: string]: Ord Object.assign(priceMap, price); }); + stats.timing(`${config.SERVICE_NAME}.get_orderbook_mid_price_map.timing`, Date.now() - start); return priceMap; } diff --git a/indexer/services/ender/src/lib/on-message.ts b/indexer/services/ender/src/lib/on-message.ts index 99eec93054..79fa23f524 100644 --- a/indexer/services/ender/src/lib/on-message.ts +++ b/indexer/services/ender/src/lib/on-message.ts @@ -73,6 +73,14 @@ export async function onMessage(message: KafkaMessage): Promise { topic: KafkaTopics.TO_ENDER, }, ); + logger.info({ + at: 'onMessage#onMessage', + message: 'Processing message', + offset, + blockHeight, + messageTimeInQueue: start - Number(message.timestamp), + numEvents: indexerTendermintBlock.events.length, + }); let success: boolean = false; const txId: number = await Transaction.start(); diff --git a/indexer/services/ender/src/lib/sync-handlers.ts b/indexer/services/ender/src/lib/sync-handlers.ts index badcafba09..cbd77b47f7 100644 --- a/indexer/services/ender/src/lib/sync-handlers.ts +++ b/indexer/services/ender/src/lib/sync-handlers.ts @@ -83,12 +83,6 @@ export class SyncHandlers { _.forEach(consolidatedKafkaEventGroup, (events: ConsolidatedKafkaEvent[]) => { kafkaPublisher.addEvents(events); }); - logger.info({ - at: 'SyncHandlers#process', - message: 'Finished processing synchronous handlers', - handlerCountMapping, - batchProcessTime: Date.now() - start, - }); stats.timing(`${config.SERVICE_NAME}.synchronous_events_process_time`, Date.now() - start); } }