-
Notifications
You must be signed in to change notification settings - Fork 122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create consumer using rack id (#2352) #2393
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,10 +10,10 @@ import { TO_ENDER_TOPIC } from '../src'; | |
describe.skip('consumer', () => { | ||
beforeAll(async () => { | ||
await Promise.all([ | ||
consumer.connect(), | ||
consumer!.connect(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Potential Risk with Non-Null Assertions on The
🔗 Analysis chainVerify the nullability of the The addition of non-null assertions ( Please verify the following:
Run the following script to verify the consumer initialization: Also applies to: 16-16 🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Description: Check consumer initialization in the source file
# Test: Search for consumer initialization
rg --type typescript -A 5 'consumer\s*=|let\s+consumer' src/consumer.ts
# Test: Check if consumer is marked as nullable
rg --type typescript 'consumer\s*:\s*\w+\s*\|?\s*null' src/consumer.ts
Length of output: 216 Script: #!/bin/bash
# Description: Check consumer initialization in the source file
# Test: Search for consumer initialization
rg -g "*.ts" -A 5 'consumer\s*=|let\s+consumer' src/consumer.ts
# Test: Check if consumer is marked as nullable
rg -g "*.ts" 'consumer\s*:\s*\w+\s*\|?\s*null' src/consumer.ts
Length of output: 242 Script: #!/bin/bash
# Description: Locate all instances of consumer.ts in the repository
fd 'consumer\.ts$' --type f
Length of output: 68 Script: #!/bin/bash
# Description: Check consumer initialization in the correct source file
# Test: Search for consumer initialization
rg -g "*.ts" -A 5 'consumer\s*=|let\s+consumer' indexer/packages/kafka/src/consumer.ts
# Test: Check if consumer is marked as nullable
rg -g "*.ts" 'consumer\s*:\s*\w+\s*\|?\s*null' indexer/packages/kafka/src/consumer.ts
Length of output: 668 |
||
producer.connect(), | ||
]); | ||
await consumer.subscribe({ topic: TO_ENDER_TOPIC }); | ||
await consumer!.subscribe({ topic: TO_ENDER_TOPIC }); | ||
await startConsumer(); | ||
}); | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,4 +1,5 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
getAvailabilityZoneId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} from '@dydxprotocol-indexer/base'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -13,15 +14,10 @@ const groupIdPrefix: string = config.SERVICE_NAME; | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const groupIdSuffix: string = config.KAFKA_ENABLE_UNIQUE_CONSUMER_GROUP_IDS ? `_${uuidv4()}` : ''; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const groupId: string = `${groupIdPrefix}${groupIdSuffix}`; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
export const consumer: Consumer = kafka.consumer({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
readUncommitted: false, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxBytes: 4194304, // 4MB | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// As a hack, we made this mutable since CommonJS doesn't support top level await. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Top level await would needed to fetch the az id (used as rack id). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// eslint-disable-next-line import/no-mutable-exports | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
export let consumer: Consumer | undefined; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+17
to
+20
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Consider refactoring to avoid mutable exports Mutating exported variables can lead to unexpected behaviors and is generally discouraged. To maintain code integrity and improve maintainability, consider refactoring the code to encapsulate the |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// List of functions to run per message consumed. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let onMessageFunction: (topic: string, message: KafkaMessage) => Promise<void>; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -51,38 +47,51 @@ export function updateOnBatchFunction( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Whether the consumer is stopped. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let stopped: boolean = false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
consumer.on('consumer.disconnect', async () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
export async function stopConsumer(): Promise<void> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.info({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
at: 'consumers#disconnect', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
message: 'Kafka consumer disconnected', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
at: 'kafka-consumer#stop', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
message: 'Stopping kafka consumer', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!stopped) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await consumer.connect(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.info({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
at: 'kafka-consumer#disconnect', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
message: 'Kafka consumer reconnected', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
stopped = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await consumer!.disconnect(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle potential undefined If Apply this diff to handle the potential undefined stopped = true;
- await consumer!.disconnect();
+ if (consumer) {
+ await consumer.disconnect();
+ } 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
export async function initConsumer(): Promise<void> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
consumer = kafka.consumer({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
readUncommitted: false, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxBytes: 4194304, // 4MB | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
rackId: await getAvailabilityZoneId(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
consumer!.on('consumer.disconnect', async () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.info({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
at: 'kafka-consumer#disconnect', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
message: 'Not reconnecting since task is shutting down', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
at: 'consumers#disconnect', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
message: 'Kafka consumer disconnected', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
export async function stopConsumer(): Promise<void> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.info({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
at: 'kafka-consumer#stop', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
message: 'Stopping kafka consumer', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!stopped) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await consumer!.connect(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.info({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
at: 'kafka-consumer#disconnect', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
message: 'Kafka consumer reconnected', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+80
to
+87
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling during consumer reconnection When reconnecting the consumer after a disconnect, errors might occur during Apply this diff to add error handling: if (!stopped) {
+ try {
await consumer!.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
+ } catch (error) {
+ logger.error({
+ at: 'kafka-consumer#disconnect',
+ message: 'Error reconnecting Kafka consumer',
+ error,
+ groupId,
+ });
+ }
} else {
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Not reconnecting since task is shutting down',
groupId,
});
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.info({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
at: 'kafka-consumer#disconnect', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
message: 'Not reconnecting since task is shutting down', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
stopped = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await consumer.disconnect(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
export async function startConsumer(batchProcessing: boolean = false): Promise<void> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -104,7 +113,7 @@ export async function startConsumer(batchProcessing: boolean = false): Promise<v | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await consumer.run(consumerRunConfig); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await consumer!.run(consumerRunConfig); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure The Apply this diff to check for + if (!consumer) {
+ throw new Error('Consumer not initialized. Please call initConsumer() before startConsumer().');
+ }
await consumer!.run(consumerRunConfig); 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.info({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
at: 'consumers#connect', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,18 @@ | ||
import { logger } from '@dydxprotocol-indexer/base'; | ||
import { | ||
consumer, producer, TO_ENDER_TOPIC, updateOnMessageFunction, | ||
consumer, initConsumer, producer, TO_ENDER_TOPIC, updateOnMessageFunction, | ||
} from '@dydxprotocol-indexer/kafka'; | ||
import { KafkaMessage } from 'kafkajs'; | ||
|
||
import { onMessage } from '../../lib/on-message'; | ||
|
||
export async function connect(): Promise<void> { | ||
await Promise.all([ | ||
consumer.connect(), | ||
initConsumer(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Approve
Consider adding error handling to account for potential initialization failures: await initConsumer();
if (!consumer) {
throw new Error('Failed to initialize Kafka consumer');
}
await consumer.subscribe({
// ... existing subscription options
}); This approach ensures that the code fails fast and explicitly if the consumer isn't properly initialized, rather than potentially causing issues later in the execution. Also applies to: 15-15 |
||
producer.connect(), | ||
]); | ||
|
||
await consumer.subscribe({ | ||
await consumer!.subscribe({ | ||
topic: TO_ENDER_TOPIC, | ||
// https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning | ||
// Need to set fromBeginning to true, so when ender restarts, it will consume all messages | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Address skipped test suite
The entire test suite is currently skipped due to potential flakiness caused by timeouts. This is a significant issue as it means these tests are not being run, which could lead to undetected bugs.
Consider the following actions:
Please don't leave tests skipped indefinitely, as this defeats the purpose of having tests in the first place.