Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create consumer using rack id (#2352) (backport #2393) #2414

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions indexer/packages/kafka/__tests__/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import { TO_ENDER_TOPIC } from '../src';
describe.skip('consumer', () => {
beforeAll(async () => {
await Promise.all([
consumer.connect(),
consumer!.connect(),
producer.connect(),
]);
await consumer.subscribe({ topic: TO_ENDER_TOPIC });
await consumer!.subscribe({ topic: TO_ENDER_TOPIC });
await startConsumer();
});

Expand Down
75 changes: 42 additions & 33 deletions indexer/packages/kafka/src/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
getAvailabilityZoneId,
logger,
} from '@dydxprotocol-indexer/base';
import {
Expand All @@ -13,15 +14,10 @@ const groupIdPrefix: string = config.SERVICE_NAME;
const groupIdSuffix: string = config.KAFKA_ENABLE_UNIQUE_CONSUMER_GROUP_IDS ? `_${uuidv4()}` : '';
const groupId: string = `${groupIdPrefix}${groupIdSuffix}`;

export const consumer: Consumer = kafka.consumer({
groupId,
sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS,
rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS,
heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS,
maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS,
readUncommitted: false,
maxBytes: 4194304, // 4MB
});
// As a hack, we made this mutable since CommonJS doesn't support top level await.
// Top level await would needed to fetch the az id (used as rack id).
// eslint-disable-next-line import/no-mutable-exports
export let consumer: Consumer | undefined;

// List of functions to run per message consumed.
let onMessageFunction: (topic: string, message: KafkaMessage) => Promise<void>;
Expand Down Expand Up @@ -51,38 +47,51 @@ export function updateOnBatchFunction(
// Whether the consumer is stopped.
let stopped: boolean = false;

consumer.on('consumer.disconnect', async () => {
export async function stopConsumer(): Promise<void> {
logger.info({
at: 'consumers#disconnect',
message: 'Kafka consumer disconnected',
at: 'kafka-consumer#stop',
message: 'Stopping kafka consumer',
groupId,
});

if (!stopped) {
await consumer.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
} else {
stopped = true;
await consumer!.disconnect();
}

export async function initConsumer(): Promise<void> {
consumer = kafka.consumer({
groupId,
sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS,
rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS,
heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS,
maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS,
readUncommitted: false,
maxBytes: 4194304, // 4MB
rackId: await getAvailabilityZoneId(),
});

consumer!.on('consumer.disconnect', async () => {
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Not reconnecting since task is shutting down',
at: 'consumers#disconnect',
message: 'Kafka consumer disconnected',
groupId,
});
}
});

export async function stopConsumer(): Promise<void> {
logger.info({
at: 'kafka-consumer#stop',
message: 'Stopping kafka consumer',
groupId,
if (!stopped) {
await consumer!.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
} else {
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Not reconnecting since task is shutting down',
groupId,
});
}
});

stopped = true;
await consumer.disconnect();
}

export async function startConsumer(batchProcessing: boolean = false): Promise<void> {
Expand All @@ -102,7 +111,7 @@ export async function startConsumer(batchProcessing: boolean = false): Promise<v
};
}

await consumer.run(consumerRunConfig);
await consumer!.run(consumerRunConfig);

logger.info({
at: 'consumers#connect',
Expand Down
6 changes: 3 additions & 3 deletions indexer/services/ender/src/helpers/kafka/kafka-controller.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
consumer, producer, TO_ENDER_TOPIC, updateOnMessageFunction,
consumer, initConsumer, producer, TO_ENDER_TOPIC, updateOnMessageFunction,
} from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';

import { onMessage } from '../../lib/on-message';

export async function connect(): Promise<void> {
await Promise.all([
consumer.connect(),
initConsumer(),
producer.connect(),
]);

await consumer.subscribe({
await consumer!.subscribe({
topic: TO_ENDER_TOPIC,
// https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning
// Need to set fromBeginning to true, so when ender restarts, it will consume all messages
Expand Down
6 changes: 3 additions & 3 deletions indexer/services/scripts/src/print-block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export function seek(offset: bigint): void {
offset: offset.toString(),
});

consumer.seek({
consumer!.seek({
topic: TO_ENDER_TOPIC,
partition: 0,
offset: offset.toString(),
Expand All @@ -57,11 +57,11 @@ export function seek(offset: bigint): void {

export async function connect(height: number): Promise<void> {
await Promise.all([
consumer.connect(),
consumer!.connect(),
producer.connect(),
]);

await consumer.subscribe({
await consumer!.subscribe({
topic: TO_ENDER_TOPIC,
fromBeginning: true,
});
Expand Down
5 changes: 3 additions & 2 deletions indexer/services/socks/src/helpers/kafka/kafka-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ import { logger } from '@dydxprotocol-indexer/base';
import {
WebsocketTopics,
consumer,
initConsumer,
stopConsumer,
} from '@dydxprotocol-indexer/kafka';

export async function connect(): Promise<void> {
await consumer.connect();
await initConsumer();

logger.info({
at: 'kafka-controller#connect',
message: 'Connected to Kafka',
});

await consumer.subscribe({ topics: Object.values(WebsocketTopics) });
await consumer!.subscribe({ topics: Object.values(WebsocketTopics) });
}

export async function disconnect(): Promise<void> {
Expand Down
6 changes: 3 additions & 3 deletions indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
consumer, producer, KafkaTopics, updateOnMessageFunction, updateOnBatchFunction,
consumer, initConsumer, producer, KafkaTopics, updateOnMessageFunction, updateOnBatchFunction,
} from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';

Expand All @@ -10,11 +10,11 @@ import { onMessage } from '../../lib/on-message';

export async function connect(): Promise<void> {
await Promise.all([
consumer.connect(),
initConsumer(),
producer.connect(),
]);

await consumer.subscribe({
await consumer!.subscribe({
topic: KafkaTopics.TO_VULCAN,
// https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning
// Need to set fromBeginning to true, so when vulcan restarts, it will consume all messages
Expand Down
Loading