From f9684fc4c91bd3187460d1722c88b5c1de2e2fbf Mon Sep 17 00:00:00 2001 From: Roy Li Date: Wed, 25 Sep 2024 13:14:32 -0400 Subject: [PATCH] Create consumer using rack id --- .../indexer-build-and-push-testnet.yml | 1 + .../packages/kafka/__tests__/consumer.test.ts | 4 +- indexer/packages/kafka/src/consumer.ts | 75 +++++++++++-------- .../src/helpers/kafka/kafka-controller.ts | 6 +- indexer/services/scripts/src/print-block.ts | 6 +- .../src/helpers/kafka/kafka-controller.ts | 5 +- .../src/helpers/kafka/kafka-controller.ts | 6 +- 7 files changed, 57 insertions(+), 46 deletions(-) diff --git a/.github/workflows/indexer-build-and-push-testnet.yml b/.github/workflows/indexer-build-and-push-testnet.yml index 3cee1bf285e..a2ebdbff62d 100644 --- a/.github/workflows/indexer-build-and-push-testnet.yml +++ b/.github/workflows/indexer-build-and-push-testnet.yml @@ -3,6 +3,7 @@ name: Indexer Build & Push Images to AWS ECR for Testnet Branch on: # yamllint disable-line rule:truthy push: branches: + - roy/* - main - 'release/indexer/v[0-9]+.[0-9]+.x' # e.g. release/indexer/v0.1.x - 'release/indexer/v[0-9]+.x' # e.g. release/indexer/v1.x diff --git a/indexer/packages/kafka/__tests__/consumer.test.ts b/indexer/packages/kafka/__tests__/consumer.test.ts index de801b2dfe4..e05d67d5e3c 100644 --- a/indexer/packages/kafka/__tests__/consumer.test.ts +++ b/indexer/packages/kafka/__tests__/consumer.test.ts @@ -10,10 +10,10 @@ import { TO_ENDER_TOPIC } from '../src'; describe.skip('consumer', () => { beforeAll(async () => { await Promise.all([ - consumer.connect(), + consumer!.connect(), producer.connect(), ]); - await consumer.subscribe({ topic: TO_ENDER_TOPIC }); + await consumer!.subscribe({ topic: TO_ENDER_TOPIC }); await startConsumer(); }); diff --git a/indexer/packages/kafka/src/consumer.ts b/indexer/packages/kafka/src/consumer.ts index 93c41d12c54..82c26cf1e02 100644 --- a/indexer/packages/kafka/src/consumer.ts +++ b/indexer/packages/kafka/src/consumer.ts @@ -1,4 +1,5 @@ import { + getAvailabilityZoneId, logger, } from '@dydxprotocol-indexer/base'; import { @@ -13,15 +14,10 @@ const groupIdPrefix: string = config.SERVICE_NAME; const groupIdSuffix: string = config.KAFKA_ENABLE_UNIQUE_CONSUMER_GROUP_IDS ? `_${uuidv4()}` : ''; const groupId: string = `${groupIdPrefix}${groupIdSuffix}`; -export const consumer: Consumer = kafka.consumer({ - groupId, - sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS, - rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS, - heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS, - maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS, - readUncommitted: false, - maxBytes: 4194304, // 4MB -}); +// As a hack, we made this mutable since CommonJS doesn't support top level await. +// Top level await would needed to fetch the az id (used as rack id). +// eslint-disable-next-line import/no-mutable-exports +export let consumer: Consumer | undefined; // List of functions to run per message consumed. let onMessageFunction: (topic: string, message: KafkaMessage) => Promise; @@ -51,38 +47,51 @@ export function updateOnBatchFunction( // Whether the consumer is stopped. let stopped: boolean = false; -consumer.on('consumer.disconnect', async () => { +export async function stopConsumer(): Promise { logger.info({ - at: 'consumers#disconnect', - message: 'Kafka consumer disconnected', + at: 'kafka-consumer#stop', + message: 'Stopping kafka consumer', groupId, }); - if (!stopped) { - await consumer.connect(); - logger.info({ - at: 'kafka-consumer#disconnect', - message: 'Kafka consumer reconnected', - groupId, - }); - } else { + stopped = true; + await consumer!.disconnect(); +} + +export async function initConsumer(): Promise { + consumer = kafka.consumer({ + groupId, + sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS, + rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS, + heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS, + maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS, + readUncommitted: false, + maxBytes: 4194304, // 4MB + rackId: await getAvailabilityZoneId(), + }); + + consumer!.on('consumer.disconnect', async () => { logger.info({ - at: 'kafka-consumer#disconnect', - message: 'Not reconnecting since task is shutting down', + at: 'consumers#disconnect', + message: 'Kafka consumer disconnected', groupId, }); - } -}); -export async function stopConsumer(): Promise { - logger.info({ - at: 'kafka-consumer#stop', - message: 'Stopping kafka consumer', - groupId, + if (!stopped) { + await consumer!.connect(); + logger.info({ + at: 'kafka-consumer#disconnect', + message: 'Kafka consumer reconnected', + groupId, + }); + } else { + logger.info({ + at: 'kafka-consumer#disconnect', + message: 'Not reconnecting since task is shutting down', + groupId, + }); + } }); - - stopped = true; - await consumer.disconnect(); } export async function startConsumer(batchProcessing: boolean = false): Promise { @@ -104,7 +113,7 @@ export async function startConsumer(batchProcessing: boolean = false): Promise { await Promise.all([ - consumer.connect(), + initConsumer(), producer.connect(), ]); - await consumer.subscribe({ + await consumer!.subscribe({ topic: TO_ENDER_TOPIC, // https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning // Need to set fromBeginning to true, so when ender restarts, it will consume all messages diff --git a/indexer/services/scripts/src/print-block.ts b/indexer/services/scripts/src/print-block.ts index 05855229ca2..34d32a77280 100644 --- a/indexer/services/scripts/src/print-block.ts +++ b/indexer/services/scripts/src/print-block.ts @@ -42,7 +42,7 @@ export function seek(offset: bigint): void { offset: offset.toString(), }); - consumer.seek({ + consumer!.seek({ topic: TO_ENDER_TOPIC, partition: 0, offset: offset.toString(), @@ -57,11 +57,11 @@ export function seek(offset: bigint): void { export async function connect(height: number): Promise { await Promise.all([ - consumer.connect(), + consumer!.connect(), producer.connect(), ]); - await consumer.subscribe({ + await consumer!.subscribe({ topic: TO_ENDER_TOPIC, fromBeginning: true, }); diff --git a/indexer/services/socks/src/helpers/kafka/kafka-controller.ts b/indexer/services/socks/src/helpers/kafka/kafka-controller.ts index 03409f28494..247819f8bf2 100644 --- a/indexer/services/socks/src/helpers/kafka/kafka-controller.ts +++ b/indexer/services/socks/src/helpers/kafka/kafka-controller.ts @@ -2,18 +2,19 @@ import { logger } from '@dydxprotocol-indexer/base'; import { WebsocketTopics, consumer, + initConsumer, stopConsumer, } from '@dydxprotocol-indexer/kafka'; export async function connect(): Promise { - await consumer.connect(); + await initConsumer(); logger.info({ at: 'kafka-controller#connect', message: 'Connected to Kafka', }); - await consumer.subscribe({ topics: Object.values(WebsocketTopics) }); + await consumer!.subscribe({ topics: Object.values(WebsocketTopics) }); } export async function disconnect(): Promise { diff --git a/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts b/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts index ae2038f0f33..528eb4b8515 100644 --- a/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts +++ b/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts @@ -1,6 +1,6 @@ import { logger } from '@dydxprotocol-indexer/base'; import { - consumer, producer, KafkaTopics, updateOnMessageFunction, updateOnBatchFunction, + consumer, initConsumer, producer, KafkaTopics, updateOnMessageFunction, updateOnBatchFunction, } from '@dydxprotocol-indexer/kafka'; import { KafkaMessage } from 'kafkajs'; @@ -10,11 +10,11 @@ import { onMessage } from '../../lib/on-message'; export async function connect(): Promise { await Promise.all([ - consumer.connect(), + initConsumer(), producer.connect(), ]); - await consumer.subscribe({ + await consumer!.subscribe({ topic: KafkaTopics.TO_VULCAN, // https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning // fromBeginning is by default set to false, so vulcan will only consume messages produced