-
Notifications
You must be signed in to change notification settings - Fork 122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create consumer using rack id (#2352) #2393
Conversation
WalkthroughThe changes involve significant updates to the Kafka consumer's initialization and connection management across multiple files. The Changes
Sequence Diagram(s)sequenceDiagram
participant App
participant KafkaController
participant Consumer
App->>KafkaController: call connect()
KafkaController->>Consumer: call initConsumer()
Consumer-->>KafkaController: consumer initialized
KafkaController->>Consumer: call subscribe()
Consumer-->>KafkaController: subscription successful
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
🧹 Outside diff range and nitpick comments (5)
indexer/services/socks/src/helpers/kafka/kafka-controller.ts (1)
Line range hint
1-28
: Overall: Good refactoring, but consider addressing non-null assertion.The changes in this file are part of a larger refactoring effort to improve Kafka consumer management. The introduction of
initConsumer
and the changes to theconnect
function improve modularity and separation of concerns.However, the use of the non-null assertion in the
consumer.subscribe
call introduces a potential risk of runtime errors. Consider addressing this as suggested in the previous comment.As a follow-up action, it would be beneficial to review the
initConsumer
implementation and the overall consumer lifecycle management to ensure that the consumer is always properly initialized before use.indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts (3)
13-14
: LGTM: Consumer initialization refactored.The replacement of
consumer.connect()
withinitConsumer()
is a good refactoring choice. It provides a more modular approach to initializing the Kafka consumer, which can improve code organization and allow for more complex initialization logic if needed in the future.Consider adding a comment explaining the purpose of
initConsumer()
for better code documentation. For example:// Initialize the Kafka consumer with default configuration await initConsumer(),
Line range hint
17-25
: Consider a safer approach to handle potentially undefined consumer.The addition of the non-null assertion operator (
!
) toconsumer.subscribe()
suggests thatconsumer
might be undefined at this point. While this allows the code to compile, it doesn't provide runtime safety.Consider implementing a safer approach to handle the potentially undefined consumer. Here are two suggestions:
- Add a runtime check:
if (!consumer) { throw new Error('Consumer is not initialized'); } await consumer.subscribe({ // ... existing options });
- Use optional chaining with a fallback:
await consumer?.subscribe({ // ... existing options }) ?? Promise.reject(new Error('Consumer is not initialized'));Both approaches provide better runtime safety and clearer error messages if the consumer is not properly initialized.
Line range hint
1-50
: Summary of changes and recommendationsThe changes in this file improve the overall structure of the Kafka consumer initialization and connection management. The introduction of
initConsumer()
provides a more modular approach to setting up the consumer. However, there are a few points to consider:
The non-null assertion on
consumer.subscribe()
(line 17) introduces a potential runtime issue. Consider implementing a safer approach to handle the case where the consumer might be undefined, as suggested in the previous comment.Adding a brief comment explaining the purpose of
initConsumer()
would improve code documentation.The rest of the file remains unchanged and appears to be functioning as expected.
Overall, these changes are a step in the right direction for better code organization, but addressing the potential runtime issue with the consumer subscription is crucial for improving the robustness of the code.
indexer/services/scripts/src/print-block.ts (1)
Consider a safer approach to handle potential null
consumer
The use of the non-null assertion operator (
!
) onconsumer
assumes that it will always be initialized whenseek
is called. This could lead to runtime errors ifconsumer
is actuallynull
orundefined
.Consider using a null check or optional chaining for safer operation:
if (consumer) { consumer.seek({ topic: TO_ENDER_TOPIC, partition: 0, offset: offset.toString(), }); } else { logger.error({ at: 'consumer#seek', message: 'Cannot seek: consumer is not initialized', }); }This approach prevents potential runtime errors and provides better error handling.
🔗 Analysis chain
Line range hint
45-49
: Consider a safer approach to handle potential nullconsumer
The use of the non-null assertion operator (
!
) onconsumer
assumes that it will always be initialized whenseek
is called. This could lead to runtime errors ifconsumer
is actually null or undefined.Consider using a null check or optional chaining for safer operation:
if (consumer) { consumer.seek({ topic: TO_ENDER_TOPIC, partition: 0, offset: offset.toString(), }); } else { logger.error({ at: 'consumer#seek', message: 'Cannot seek: consumer is not initialized', }); }This approach prevents potential runtime errors and provides better error handling.
To ensure
consumer
is always initialized beforeseek
is called, we can check its usage:🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for direct calls to the seek function rg --type typescript "seek\(" -C 5Length of output: 155
Script:
#!/bin/bash # Search for direct calls to the seek function in TypeScript files rg "seek\(" -g "*.ts" -C 5Length of output: 2177
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (6)
- indexer/packages/kafka/tests/consumer.test.ts (1 hunks)
- indexer/packages/kafka/src/consumer.ts (4 hunks)
- indexer/services/ender/src/helpers/kafka/kafka-controller.ts (1 hunks)
- indexer/services/scripts/src/print-block.ts (2 hunks)
- indexer/services/socks/src/helpers/kafka/kafka-controller.ts (1 hunks)
- indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts (2 hunks)
🔇 Additional comments (8)
indexer/services/socks/src/helpers/kafka/kafka-controller.ts (3)
5-5
: LGTM: Import changes align with new consumer initialization approach.The addition of
initConsumer
import is consistent with the changes in theconnect
function. This refactoring likely improves the separation of concerns in the Kafka consumer initialization process.
10-10
: LGTM: Consumer initialization refactored.The replacement of
consumer.connect()
withinitConsumer()
is a good refactoring step. It likely encapsulates the connection logic, potentially making it easier to manage and test.
17-17
:⚠️ Potential issueConsider safer alternatives to non-null assertion.
The addition of the non-null assertion (
!
) toconsumer
suggests that it might be nullable. While this allows the code to compile, it could lead to runtime errors ifconsumer
is actually null when this line is executed.Consider the following alternatives:
Use optional chaining:
await consumer?.subscribe(...)
. This would safely handle the case whereconsumer
is null or undefined.Add a null check before the subscribe call:
if (consumer) { await consumer.subscribe({ topics: Object.values(WebsocketTopics) }); } else { throw new Error('Consumer not initialized'); }If
consumer
is guaranteed to be initialized by this point, consider updating its type to be non-nullable, removing the need for the assertion.To better understand the
consumer
initialization, let's check its usage:✅ Verification successful
Non-null assertion is safe, but consider alternative approaches for better safety.
The
initConsumer
function properly initializesconsumer
, ensuring it is not null whensubscribe
is called. However, for enhanced safety and maintainability, you might consider the following alternatives:
Optional Chaining:
await consumer?.subscribe({ topics: Object.values(WebsocketTopics) });This safely handles cases where
consumer
might be null or undefined.Explicit Null Check:
if (consumer) { await consumer.subscribe({ topics: Object.values(WebsocketTopics) }); } else { throw new Error('Consumer not initialized'); }Update Type to Non-Nullable:
Ifconsumer
is guaranteed to be initialized before use, update its type to be non-nullable, eliminating the need for the assertion.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for consumer initialization and usage rg --type typescript 'consumer\s*=' -A 5 rg --type typescript 'initConsumer' -A 5Length of output: 153
Script:
#!/bin/bash # Search for consumer initialization and usage in TypeScript files rg 'consumer\s*=' --glob '*.ts' --glob '*.tsx' -A 5 rg 'initConsumer' --glob '*.ts' --glob '*.tsx' -A 5Length of output: 4190
Script:
#!/bin/bash # Extract the implementation of initConsumer function in TypeScript files find . -type f \( -name "*.ts" -o -name "*.tsx" \) -exec grep -A 10 'function initConsumer' {} \;Length of output: 529
indexer/services/ender/src/helpers/kafka/kafka-controller.ts (1)
3-3
: LGTM: Import statement updated correctly.The import statement has been appropriately updated to include
initConsumer
, which is now used in theconnect
function.indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts (1)
3-4
: LGTM: Import statement updated correctly.The import statement has been properly updated to include
initConsumer
, which is consistent with the changes in theconnect
function. This change reflects the refactoring of the Kafka consumer initialization into a separate function.indexer/services/scripts/src/print-block.ts (2)
60-66
:⚠️ Potential issueImplement safer null checks in asynchronous
connect
functionThe use of non-null assertion operators (
!
) onconsumer
in theconnect
function assumes that it will always be initialized. This could lead to runtime errors ifconsumer
is null or undefined.Consider implementing null checks and proper error handling:
export async function connect(height: number): Promise<void> { if (!consumer) { throw new Error('Consumer is not initialized'); } try { await Promise.all([ consumer.connect(), producer.connect(), ]); await consumer.subscribe({ topic: TO_ENDER_TOPIC, fromBeginning: true, }); updateOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => { return printMessageAtHeight(message, height); }); logger.info({ at: 'consumers#connect', message: 'Connected to Kafka', }); } catch (error) { logger.error({ at: 'consumers#connect', message: 'Failed to connect to Kafka', error, }); throw error; } }This approach provides better error handling and prevents potential runtime errors.
To ensure
consumer
is always initialized beforeconnect
is called, we can check its initialization and usage:#!/bin/bash # Search for consumer initialization rg --type typescript "consumer\s*=\s*" -C 5 # Search for calls to the connect function rg --type typescript "connect\(" -C 5
Line range hint
1-180
: Request for context: Changes toconsumer
initialization and managementThe introduction of non-null assertions for the
consumer
variable in theseek
andconnect
functions suggests that there might have been changes to howconsumer
is initialized or managed elsewhere in the codebase.To better understand the context of these changes and ensure they're part of a coherent strategy, please provide more information about:
- Changes to
consumer
initialization- Any modifications to its lifecycle management
- The rationale behind making it nullable or lazily initialized (if applicable)
Additionally, I recommend a comprehensive review of
consumer
usage across the codebase to ensure consistency and prevent potential runtime errors. Here's a script to help with this review:This will help identify any inconsistencies or potential issues in how
consumer
is used throughout the project.indexer/packages/kafka/src/consumer.ts (1)
2-2
: Verify the availability zone ID retrievalEnsure that
getAvailabilityZoneId()
returns the correct availability zone ID and handles any potential errors during the asynchronous operation. Consider adding error handling or validation if necessary.Run the following script to verify the
getAvailabilityZoneId
function implementation and usage:
} from '@dydxprotocol-indexer/kafka'; | ||
import { KafkaMessage } from 'kafkajs'; | ||
|
||
import { onMessage } from '../../lib/on-message'; | ||
|
||
export async function connect(): Promise<void> { | ||
await Promise.all([ | ||
consumer.connect(), | ||
initConsumer(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approve initConsumer()
usage, but consider error handling for consumer initialization.
-
The use of
initConsumer()
is a good change that likely improves code organization and allows for more complex initialization if needed. -
However, the non-null assertion (
!
) onconsumer
assumes thatinitConsumer()
will always successfully initialize the consumer. This could potentially lead to runtime errors if initialization fails for any reason.
Consider adding error handling to account for potential initialization failures:
await initConsumer();
if (!consumer) {
throw new Error('Failed to initialize Kafka consumer');
}
await consumer.subscribe({
// ... existing subscription options
});
This approach ensures that the code fails fast and explicitly if the consumer isn't properly initialized, rather than potentially causing issues later in the execution.
Also applies to: 15-15
@@ -10,10 +10,10 @@ import { TO_ENDER_TOPIC } from '../src'; | |||
describe.skip('consumer', () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Address skipped test suite
The entire test suite is currently skipped due to potential flakiness caused by timeouts. This is a significant issue as it means these tests are not being run, which could lead to undetected bugs.
Consider the following actions:
- Investigate the root cause of the flakiness and timeouts.
- Refactor the tests to be more resilient to timing issues, possibly by using mock timers or adjusting wait times.
- If the flakiness persists, consider splitting the suite into stable and unstable tests, running the stable ones regularly and the unstable ones in a separate, more tolerant CI job.
Please don't leave tests skipped indefinitely, as this defeats the purpose of having tests in the first place.
@@ -10,10 +10,10 @@ import { TO_ENDER_TOPIC } from '../src'; | |||
describe.skip('consumer', () => { | |||
beforeAll(async () => { | |||
await Promise.all([ | |||
consumer.connect(), | |||
consumer!.connect(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Potential Risk with Non-Null Assertions on consumer
The consumer
object is typed as Consumer | undefined
and is being initialized properly. However, using non-null assertions (!
) in the test can mask potential issues if consumer
is undefined
in certain scenarios.
- Recommendations:
- Implement explicit null checks before invoking methods on
consumer
. - Ensure the initialization process guarantees
consumer
is defined before usage. - Address the skipped tests to ensure they reliably validate
consumer
behavior.
- Implement explicit null checks before invoking methods on
🔗 Analysis chain
Verify the nullability of the consumer
object
The addition of non-null assertions (!
) to consumer.connect()
and consumer.subscribe()
calls suggests that the consumer
object might now be nullable. This change could mask potential issues if consumer
is actually null or undefined at runtime.
Please verify the following:
- Confirm that
consumer
is properly initialized before these method calls. - Consider adding a null check before these calls instead of using non-null assertions.
- Update the test to reflect any changes in the consumer initialization process.
Run the following script to verify the consumer initialization:
Also applies to: 16-16
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check consumer initialization in the source file
# Test: Search for consumer initialization
rg --type typescript -A 5 'consumer\s*=|let\s+consumer' src/consumer.ts
# Test: Check if consumer is marked as nullable
rg --type typescript 'consumer\s*:\s*\w+\s*\|?\s*null' src/consumer.ts
Length of output: 216
Script:
#!/bin/bash
# Description: Check consumer initialization in the source file
# Test: Search for consumer initialization
rg -g "*.ts" -A 5 'consumer\s*=|let\s+consumer' src/consumer.ts
# Test: Check if consumer is marked as nullable
rg -g "*.ts" 'consumer\s*:\s*\w+\s*\|?\s*null' src/consumer.ts
Length of output: 242
Script:
#!/bin/bash
# Description: Locate all instances of consumer.ts in the repository
fd 'consumer\.ts$' --type f
Length of output: 68
Script:
#!/bin/bash
# Description: Check consumer initialization in the correct source file
# Test: Search for consumer initialization
rg -g "*.ts" -A 5 'consumer\s*=|let\s+consumer' indexer/packages/kafka/src/consumer.ts
# Test: Check if consumer is marked as nullable
rg -g "*.ts" 'consumer\s*:\s*\w+\s*\|?\s*null' indexer/packages/kafka/src/consumer.ts
Length of output: 668
// As a hack, we made this mutable since CommonJS doesn't support top level await. | ||
// Top level await would needed to fetch the az id (used as rack id). | ||
// eslint-disable-next-line import/no-mutable-exports | ||
export let consumer: Consumer | undefined; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider refactoring to avoid mutable exports
Mutating exported variables can lead to unexpected behaviors and is generally discouraged. To maintain code integrity and improve maintainability, consider refactoring the code to encapsulate the consumer
within a function or a class rather than exporting a mutable variable.
}); | ||
} else { | ||
stopped = true; | ||
await consumer!.disconnect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle potential undefined consumer
in stopConsumer
If consumer
is undefined
when stopConsumer()
is called, invoking await consumer!.disconnect();
will result in a runtime error. Consider adding a check to ensure consumer
is initialized before attempting to disconnect.
Apply this diff to handle the potential undefined consumer
:
stopped = true;
- await consumer!.disconnect();
+ if (consumer) {
+ await consumer.disconnect();
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
await consumer!.disconnect(); | |
stopped = true; | |
if (consumer) { | |
await consumer.disconnect(); | |
} |
@@ -104,7 +113,7 @@ export async function startConsumer(batchProcessing: boolean = false): Promise<v | |||
}; | |||
} | |||
|
|||
await consumer.run(consumerRunConfig); | |||
await consumer!.run(consumerRunConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure consumer
is initialized before running
The consumer
variable may be undefined
if initConsumer()
has not been called before startConsumer()
. To prevent runtime errors, consider adding a check to ensure consumer
is initialized before calling consumer!.run(...)
, or refactor the code to guarantee the initialization sequence.
Apply this diff to check for consumer
initialization:
+ if (!consumer) {
+ throw new Error('Consumer not initialized. Please call initConsumer() before startConsumer().');
+ }
await consumer!.run(consumerRunConfig);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
await consumer!.run(consumerRunConfig); | |
if (!consumer) { | |
throw new Error('Consumer not initialized. Please call initConsumer() before startConsumer().'); | |
} | |
await consumer!.run(consumerRunConfig); |
if (!stopped) { | ||
await consumer!.connect(); | ||
logger.info({ | ||
at: 'kafka-consumer#disconnect', | ||
message: 'Kafka consumer reconnected', | ||
groupId, | ||
}); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling during consumer reconnection
When reconnecting the consumer after a disconnect, errors might occur during await consumer!.connect();
. To prevent unhandled exceptions, wrap the reconnection logic in a try-catch block and handle any potential errors.
Apply this diff to add error handling:
if (!stopped) {
+ try {
await consumer!.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
+ } catch (error) {
+ logger.error({
+ at: 'kafka-consumer#disconnect',
+ message: 'Error reconnecting Kafka consumer',
+ error,
+ groupId,
+ });
+ }
} else {
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Not reconnecting since task is shutting down',
groupId,
});
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if (!stopped) { | |
await consumer!.connect(); | |
logger.info({ | |
at: 'kafka-consumer#disconnect', | |
message: 'Kafka consumer reconnected', | |
groupId, | |
}); | |
} else { | |
if (!stopped) { | |
try { | |
await consumer!.connect(); | |
logger.info({ | |
at: 'kafka-consumer#disconnect', | |
message: 'Kafka consumer reconnected', | |
groupId, | |
}); | |
} catch (error) { | |
logger.error({ | |
at: 'kafka-consumer#disconnect', | |
message: 'Error reconnecting Kafka consumer', | |
error, | |
groupId, | |
}); | |
} | |
} else { | |
logger.info({ | |
at: 'kafka-consumer#disconnect', | |
message: 'Not reconnecting since task is shutting down', | |
groupId, | |
}); | |
} |
@Mergifyio release/indexer/v5.x |
https://github.com/Mergifyio release/indexer/v7.x |
❌ Sorry but I didn't understand the command. Please consult the commands documentation 📚. |
❌ Sorry but I didn't understand the command. Please consult the commands documentation 📚. |
https://github.com/Mergifyio backport release/indexer/v7.x |
✅ Backports have been created
|
Co-authored-by: roy-dydx <[email protected]>
https://github.com/Mergifyio backport release/indexer/v5.x |
✅ Backports have been created
|
Co-authored-by: roy-dydx <[email protected]>
Changelist
[Describe or list the changes made in this PR]
Test Plan
[Describe how this PR was tested (if applicable)]
Author/Reviewer Checklist
state-breaking
label.indexer-postgres-breaking
label.PrepareProposal
orProcessProposal
, manually add the labelproposal-breaking
.feature:[feature-name]
.backport/[branch-name]
.refactor
,chore
,bug
.Summary by CodeRabbit