Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #698]🚀Support UpdateConsumerOffset(15) request⚡️ #700

Merged
merged 4 commits into from
Jun 27, 2024

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Jun 27, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #698

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced methods for updating consumer offsets, managing subscription groups, and checking topic existence in RocketMQ.
    • Added support for JSON serialization of consumer offsets.
    • Enhanced topic configuration and subscription group management.
  • Bug Fixes

    • Improved consumer offset management by updating ConsumerOffsetManager initialization and methods.
  • Documentation

    • Updated documentation to reflect new methods and changes in consumer offset and topic management.

Copy link
Contributor

coderabbitai bot commented Jun 27, 2024

Walkthrough

The code changes focus on improving RocketMQ's handling of consumer offsets. Key modifications include the direct storage of ConsumerOffsetManager in BrokerRuntime, initialization updates, and added methods for managing consumer offsets. The ConsumerManageProcessor now includes new fields to accommodate broker configurations and logic updates. Additionally, new methods check for subscription groups and topics. UpdateConsumerOffsetRequestHeader and UpdateConsumerOffsetResponseHeader structures have been introduced with serialization and deserialization methods to support consumer offset updates.

Changes

Files/Modules Change Summary
rocketmq-broker/src/broker_runtime.rs Modified consumer_offset_manager field and initialization in BrokerRuntime struct.
rocketmq-broker/src/offset/manager/consumer_offset_manager.rs Added and updated methods for managing consumer offsets and adjustments to the ConsumerOffsetManager struct.
rocketmq-broker/src/processor/consumer_manage_processor.rs Added new fields and logic for consumer offset updates in ConsumerManageProcessor struct.
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs Added contains_subscription_group method to check for subscription groups.
rocketmq-broker/src/topic/manager/topic_config_manager.rs Added contains_topic method to check for topics in TopicConfigManager struct.
rocketmq-common/src/common/broker/broker_config.rs Added consumer_offset_update_version_step field to BrokerConfig struct.
rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs Renamed variable from topic_config to _topic_config within topic_config_and_mapping_serialize_wrapper_getters
rocketmq-remoting/src/protocol/header.rs Added update_consumer_offset_header module to the protocol package.
rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs Introduced structures and implementations for updating consumer offsets, including serialization/deserialization.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant BrokerRuntime
    participant ConsumerOffsetManager
    participant ConsumerManageProcessor

    Client->>BrokerRuntime: Update Consumer Offset Request
    BrokerRuntime->>ConsumerOffsetManager: Handle Offset (Init)
    ConsumerOffsetManager-->>BrokerRuntime: Offset Managed
    BrokerRuntime->>ConsumerManageProcessor: Process New Offset Information
    ConsumerManageProcessor->>ConsumerOffsetManager: Update Offset for Client
    ConsumerOffsetManager-->>ConsumerManageProcessor: Offset Updated
    ConsumerManageProcessor-->>Client: Offset Update Confirmation
Loading

Assessment against linked issues

Objective Addressed Explanation
Support UpdateConsumerOffset(15) request (#698)
Add logic for updating consumer offset based on conditions

Poem

Amidst the clouds where data streams fly,
A broker whispers, letting offsets lie.
New methods bloom like flowers in the spring,
Enhancing RocketMQ, a joyful ring.
With offsets managed in a structured dance,
The brokers and clients in harmony prance.
Code flows smooth, like rivers to the sea,
A Rabbit's delight, in this symphony. 🌼🐇✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@github-actions github-actions bot requested a review from SpaceXCN June 27, 2024 02:55
@mxsm
Copy link
Owner Author

mxsm commented Jun 27, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between daada10 and 1dbe411.

Files selected for processing (9)
  • rocketmq-broker/src/broker_runtime.rs (4 hunks)
  • rocketmq-broker/src/offset/manager/consumer_offset_manager.rs (4 hunks)
  • rocketmq-broker/src/processor/consumer_manage_processor.rs (4 hunks)
  • rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (1 hunks)
  • rocketmq-broker/src/topic/manager/topic_config_manager.rs (1 hunks)
  • rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
  • rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs (1 hunks)
Files not reviewed due to errors (1)
  • rocketmq-broker/src/topic/manager/topic_config_manager.rs (no review received)
Additional comments not posted (8)
rocketmq-remoting/src/protocol/header.rs (1)

28-28: New module addition for consumer offset updates.

Ensure that the new update_consumer_offset_header module is properly integrated and utilized where necessary.

Verification successful

New module addition for consumer offset updates.
The new update_consumer_offset_header module is properly integrated and utilized in the codebase.

  • rocketmq-remoting/src/protocol/header.rs: Module definition.
  • rocketmq-broker/src/processor/consumer_manage_processor.rs: Module usage.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify that the new module is being used in the codebase.

# Test: Search for the module usage. Expect: At least one occurrence.
rg --type rust $'update_consumer_offset_header'

Length of output: 292

rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs (1)

101-101: Good practice to mark unused variables.

Changing topic_config to _topic_config is a good practice in Rust to indicate that the variable is not used, which avoids compiler warnings.

rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (1)

104-112: New method for checking subscription group existence.

The addition of contains_subscription_group is crucial for dynamic subscription group management. Ensure this method is integrated and used correctly in the context of subscription group checks.

Verification successful

Method contains_subscription_group integrated correctly.

The method contains_subscription_group is used in rocketmq-broker/src/processor/consumer_manage_processor.rs to check for the existence of a subscription group and handle cases where the group does not exist. This confirms its proper integration and usage in the codebase.

  • rocketmq-broker/src/processor/consumer_manage_processor.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the new method is used in the codebase.

# Test: Search for the method usage. Expect: At least one occurrence.
rg --type rust $'contains_subscription_group'

Length of output: 291



Script:

#!/bin/bash
# Description: Retrieve the context of the method usage in the specified file.

# Extract 10 lines before and after the usage of `contains_subscription_group` in the file.
rg --type rust -A 10 -B 10 'contains_subscription_group' rocketmq-broker/src/processor/consumer_manage_processor.rs

Length of output: 922

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs (1)

38-65: Enhancements in consumer offset management.

The new methods and modifications in ConsumerOffsetManager enhance the management of consumer offsets. It's important to ensure that these changes are thoroughly tested, especially the new commit_offset and has_offset_reset methods.

Also applies to: 67-122, 157-162, 189-195

Verification successful

The script output shows multiple occurrences of commit_offset and has_offset_reset in the codebase. However, we need to verify that these occurrences are related to the ConsumerOffsetManager methods. Let's check the context of these occurrences to ensure they are correctly implemented and used.


Enhancements in consumer offset management.

The commit_offset and has_offset_reset methods in ConsumerOffsetManager are correctly implemented and utilized in the codebase. Specifically, their usage in rocketmq-broker/src/processor/consumer_manage_processor.rs confirms their integration.

  • rocketmq-broker/src/processor/consumer_manage_processor.rs
  • rocketmq-broker/src/offset/manager/consumer_offset_manager.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the new methods are used in the codebase.

# Test: Search for the method usage. Expect: At least one occurrence for each method.
rg --type rust $'commit_offset|has_offset_reset'

Length of output: 4130



Script:

#!/bin/bash
# Verify the context of commit_offset and has_offset_reset methods to ensure they are used in relation to ConsumerOffsetManager

# Search for the commit_offset method definition in ConsumerOffsetManager
rg --type rust 'pub fn commit_offset' -A 10 rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

# Search for the has_offset_reset method definition in ConsumerOffsetManager
rg --type rust 'pub fn has_offset_reset' -A 10 rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

# Check where commit_offset is used in the codebase to ensure it's from ConsumerOffsetManager
rg 'commit_offset' -A 3

# Check where has_offset_reset is used in the codebase to ensure it's from ConsumerOffsetManager
rg 'has_offset_reset' -A 3

Length of output: 13514

rocketmq-common/src/common/broker/broker_config.rs (1)

Line range hint 155-219: New configuration field for consumer offset updates.

The addition of consumer_offset_update_version_step in BrokerConfig allows for more controlled updates of consumer offsets. Verify that this new field is being used appropriately in the system configuration and during runtime operations.

Verification successful

New configuration field for consumer offset updates has been verified.

The field consumer_offset_update_version_step is used appropriately in the consumer_offset_manager.rs file to control the update intervals of the data version.

  • rocketmq-broker/src/offset/manager/consumer_offset_manager.rs: The field is used in a conditional statement to control the frequency of data version updates.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the new configuration field is being used in the codebase.

# Test: Search for the field usage. Expect: At least one occurrence.
rg --type rust $'consumer_offset_update_version_step'

Length of output: 389



Script:

#!/bin/bash
# Description: Display the context around the usage of consumer_offset_update_version_step in consumer_offset_manager.rs

# Display the relevant lines of code around its usage in consumer_offset_manager.rs
rg --context 10 'consumer_offset_update_version_step' rocketmq-broker/src/offset/manager/consumer_offset_manager.rs

Length of output: 989

rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs (3)

28-29: Approval of structural definitions for response header.

The UpdateConsumerOffsetResponseHeader struct is well-defined with appropriate traits for serialization, which are crucial for network communication protocols.


31-40: Check for the use of optional fields in request headers.

The queue_id and commit_offset are marked as optional. Ensure that the business logic properly handles cases where these values might be None, especially since queue ID and commit offset are critical for the operation.


71-92: Review of the from method implementation.

The method implementation for deserialization from a HashMap seems correct. However, ensure that the unwrap_or_default usage aligns with expected behavior, particularly for critical fields like queue_id and commit_offset.

Verification successful

Verify the deserialization logic for critical fields.

To ensure the from method handles default values correctly, especially for queue_id and commit_offset, we need to check the usage and tests for UpdateConsumerOffsetRequestHeader.


Verify the deserialization logic in consumer_manage_processor.rs.

To ensure the from method handles default values correctly, especially for queue_id and commit_offset, we need to check the usage in rocketmq-broker/src/processor/consumer_manage_processor.rs.


Verify the usage of UpdateConsumerOffsetRequestHeader in consumer_manage_processor.rs.

To ensure the from method handles default values correctly, especially for queue_id and commit_offset, we need to check the usage of UpdateConsumerOffsetRequestHeader in rocketmq-broker/src/processor/consumer_manage_processor.rs.


Verify the interaction of decode_command_custom_header with UpdateConsumerOffsetRequestHeader.

To ensure the from method handles default values correctly, especially for queue_id and commit_offset, we need to check the implementation of decode_command_custom_header and its interaction with UpdateConsumerOffsetRequestHeader.


Review the implementation of decode_command_custom_header.

To ensure the from method handles default values correctly, especially for queue_id and commit_offset, we need to check the implementation of decode_command_custom_header in rocketmq-remoting/src/protocol/remoting_command.rs.


Verification successful: Deserialization logic handles default values correctly.

The from method in UpdateConsumerOffsetRequestHeader correctly uses unwrap_or_default to handle default values for fields, including critical ones like queue_id and commit_offset. The decode_command_custom_header method relies on this logic, ensuring proper deserialization.

  • rocketmq-remoting/src/protocol/remoting_command.rs: Implementation of decode_command_custom_header.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the deserialization logic's handling of default values.

# Test: Review the deserialization logic in the context of actual usage.
rg --type rust 'from'

Length of output: 87894



Script:

#!/bin/bash
# Search for tests or usage examples of UpdateConsumerOffsetRequestHeader to verify deserialization logic.
rg 'UpdateConsumerOffsetRequestHeader' --type rust

Length of output: 3359



Script:

#!/bin/bash
# Search for the usage of the from method in consumer_manage_processor.rs to verify deserialization logic.
rg 'from' rocketmq-broker/src/processor/consumer_manage_processor.rs

Length of output: 203



Script:

#!/bin/bash
# Search for the usage of UpdateConsumerOffsetRequestHeader in consumer_manage_processor.rs to verify deserialization logic.
rg 'UpdateConsumerOffsetRequestHeader' rocketmq-broker/src/processor/consumer_manage_processor.rs

Length of output: 349



Script:

#!/bin/bash
# Search for the implementation of decode_command_custom_header to verify its interaction with UpdateConsumerOffsetRequestHeader.
rg 'decode_command_custom_header' --type rust

Length of output: 3660



Script:

#!/bin/bash
# Display the implementation of decode_command_custom_header in remoting_command.rs to verify its interaction with UpdateConsumerOffsetRequestHeader.
rg 'decode_command_custom_header' -A 20 rocketmq-remoting/src/protocol/remoting_command.rs

Length of output: 947

@@ -75,7 +75,7 @@ pub(crate) struct BrokerRuntime {
server_config: Arc<ServerConfig>,
topic_config_manager: TopicConfigManager,
topic_queue_mapping_manager: Arc<TopicQueueMappingManager>,
consumer_offset_manager: Arc<ConsumerOffsetManager>,
consumer_offset_manager: ConsumerOffsetManager,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor Suggestion: Consider using Arc for ConsumerOffsetManager.

Changing ConsumerOffsetManager from Arc<ConsumerOffsetManager> to a direct instance might lead to unnecessary cloning and potential performance degradation when the instance is shared across multiple threads. Consider maintaining the Arc wrapping to ensure efficient memory management and thread safety.

consumer_offset_manager: Arc<ConsumerOffsetManager>,

Comment on lines +289 to +290
self.consumer_offset_manager
.set_message_store(Some(Arc::new(message_store.clone())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure proper error handling when setting the message store.

The method set_message_store is called without checking the result or handling potential errors. It's crucial to handle errors during such operations to avoid runtime failures. Consider adding error handling or ensuring that the method can never fail under normal conditions.

if let Err(e) = self.consumer_offset_manager.set_message_store(Some(Arc::new(message_store.clone()))) {
    error!("Failed to set message store: {:?}", e);
}

@@ -369,14 +371,17 @@
self.topic_queue_mapping_manager.clone(),
self.consumer_manager.clone(),
self.consumer_filter_manager.clone(),
self.consumer_offset_manager.clone(),
Arc::new(self.consumer_offset_manager.clone()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor suggestion: Avoid redundant cloning and simplify the code.

There are multiple instances where objects are cloned unnecessarily, especially when they are already wrapped in Arc. Simplifying this can improve performance and reduce memory usage.

let consumer_offset_manager = Arc::new(self.consumer_offset_manager.clone());
let topic_config_manager = Arc::new(self.topic_config_manager.clone());

let consumer_manage_processor = ConsumerManageProcessor::new(
    self.broker_config.clone(),
    self.consumer_manager.clone(),
    self.topic_queue_mapping_manager.clone(),
    self.subscription_group_manager.clone(),
    consumer_offset_manager,
    topic_config_manager,
    self.message_store.clone().unwrap(),
);

Also applies to: 379-384

@@ -180,7 +180,7 @@
server_config,
topic_config_manager,
topic_queue_mapping_manager,
consumer_offset_manager: Arc::new(Default::default()),
consumer_offset_manager: ConsumerOffsetManager::new(broker_config.clone(), None),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Initialization of ConsumerOffsetManager needs to be wrapped with Arc.

The consumer_offset_manager is used in contexts that require thread safety, as evidenced by its wrapping in Arc in various processors. To maintain consistency and ensure thread-safe access, it should be initialized with Arc in the BrokerRuntime constructor.

  • File: rocketmq-broker/src/broker_runtime.rs
  • Line: 183
Analysis chain

Initialization of ConsumerOffsetManager needs clarification.

The initialization of ConsumerOffsetManager directly within the BrokerRuntime constructor without an Arc might cause issues if this instance is expected to be shared across threads or asynchronously accessed. Verify if this change aligns with the intended usage patterns and consider wrapping it with Arc if necessary.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify all usages of `ConsumerOffsetManager` to ensure it's properly managed for thread safety.

# Test: Search for usages of `ConsumerOffsetManager`. Expect: Proper handling or wrapping with Arc where shared across threads.
rg --type rust $'ConsumerOffsetManager'

Length of output: 1687



Script:

#!/bin/bash
# Description: Examine the usage of `consumer_offset_manager` within `BrokerRuntime` to determine if it requires `Arc`.

# Find all methods and usages of `consumer_offset_manager` in `BrokerRuntime`.
rg --type rust --context 10 'consumer_offset_manager' rocketmq-broker/src/broker_runtime.rs

Length of output: 9288

Comment on lines +48 to +67
impl CommandCustomHeader for UpdateConsumerOffsetRequestHeader {
fn to_map(&self) -> Option<HashMap<String, String>> {
let mut map = HashMap::new();
map.insert(
Self::CONSUMER_GROUP.to_string(),
self.consumer_group.clone(),
);
map.insert(Self::TOPIC.to_string(), self.topic.clone());
if let Some(queue_id) = self.queue_id {
map.insert(Self::QUEUE_ID.to_string(), queue_id.to_string());
}
if let Some(commit_offset) = self.commit_offset {
map.insert(Self::COMMIT_OFFSET.to_string(), commit_offset.to_string());
}
if let Some(ref value) = self.topic_request_header {
if let Some(val) = value.to_map() {
map.extend(val);
}
}
Some(map)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review of the to_map method implementation.

The method correctly serializes the header fields into a HashMap. However, consider using unwrap_or_else with a default function instead of unwrap_or_default to handle potential None values more explicitly, which could enhance readability and error handling.

-    .unwrap_or_default(),
+    .unwrap_or_else(|| "".to_string()),

Committable suggestion was skipped due to low confidence.

Comment on lines +95 to +192
impl TopicRequestHeaderTrait for UpdateConsumerOffsetRequestHeader {
fn with_lo(&mut self, lo: Option<bool>) {
self.topic_request_header.as_mut().unwrap().lo = lo;
}

fn lo(&self) -> Option<bool> {
self.topic_request_header.as_ref().unwrap().lo
}

fn with_topic(&mut self, topic: String) {
self.topic = topic;
}

fn topic(&self) -> String {
self.topic.clone()
}

fn broker_name(&self) -> Option<String> {
self.topic_request_header
.as_ref()
.unwrap()
.rpc
.as_ref()
.unwrap()
.broker_name
.clone()
}

fn with_broker_name(&mut self, broker_name: String) {
self.topic_request_header
.as_mut()
.unwrap()
.rpc
.as_mut()
.unwrap()
.broker_name = Some(broker_name);
}

fn namespace(&self) -> Option<String> {
self.topic_request_header
.as_ref()
.unwrap()
.rpc
.as_ref()
.unwrap()
.namespace
.clone()
}

fn with_namespace(&mut self, namespace: String) {
self.topic_request_header
.as_mut()
.unwrap()
.rpc
.as_mut()
.unwrap()
.namespace = Some(namespace);
}

fn namespaced(&self) -> Option<bool> {
self.topic_request_header
.as_ref()
.unwrap()
.rpc
.as_ref()
.unwrap()
.namespaced
}

fn with_namespaced(&mut self, namespaced: bool) {
self.topic_request_header
.as_mut()
.unwrap()
.rpc
.as_mut()
.unwrap()
.namespaced = Some(namespaced);
}

fn oneway(&self) -> Option<bool> {
self.topic_request_header
.as_ref()
.unwrap()
.rpc
.as_ref()
.unwrap()
.oneway
}

fn with_oneway(&mut self, oneway: bool) {
self.topic_request_header
.as_mut()
.unwrap()
.rpc
.as_mut()
.unwrap()
.oneway = Some(oneway);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Ensure safe usage of unwrap() in UpdateConsumerOffsetRequestHeader trait implementation.

The unwrap() calls in methods like with_lo(), broker_name(), and others assume that topic_request_header and rpc are always Some. Without explicit checks, these assumptions can lead to runtime panics.

  • File: rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs
  • Lines: 95-192

Consider adding checks or using safer alternatives like expect() with meaningful error messages or handling the None case gracefully.

Analysis chain

Review of trait implementations for request header.

The trait implementations provide necessary functionalities like setting and getting fields. Ensure that the use of unwrap() in methods like with_lo() and broker_name() is safe and won't lead to runtime panics.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify safety of unwrap usage in trait implementations.

# Test: Search for error handling related to these trait implementations.
rg --type rust 'unwrap()'

Length of output: 74318



Script:

#!/bin/bash
# Extract context around unwrap() calls in the specified file to verify their safety.
rg --context 5 'unwrap()' rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs

Length of output: 4289

Comment on lines +160 to +234
let mut request_header = request
.decode_command_custom_header::<UpdateConsumerOffsetRequestHeader>()
.unwrap();
let mut mapping_context = self
.topic_queue_mapping_manager
.build_topic_queue_mapping_context(&request_header, false);

let rewrite_result = self.rewrite_request_for_static_topic_for_consume_offset(
&mut request_header,
&mut mapping_context,
);
if let Some(result) = rewrite_result {
return Some(result);
}
let topic = request_header.topic.as_str();
let group = request_header.consumer_group.as_str();
let queue_id = request_header.queue_id;
let offset = request_header.commit_offset;
let response = RemotingCommand::create_response_command();
if !self
.subscription_group_manager
.contains_subscription_group(group)
{
return Some(
response
.set_code(ResponseCode::SubscriptionGroupNotExist)
.set_remark(Some(format!("subscription group not exist, {}", group))),
);
}

if !self.topic_config_manager.contains_topic(topic) {
return Some(
response
.set_code(ResponseCode::TopicNotExist)
.set_remark(Some(format!("topic not exist, {}", topic))),
);
}

if queue_id.is_none() {
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some(format!("QueueId is null, topic is {}", topic))),
);
}
if offset.is_none() {
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some(format!("Offset is null, topic is {}", topic))),
);
}
if self.broker_config.use_server_side_reset_offset
&& self
.consumer_offset_manager
.has_offset_reset(topic, group, queue_id.unwrap())
{
info!(
"Update consumer offset is rejected because of previous offset-reset. \
Group={},Topic={}, QueueId={}, Offset={}",
topic,
group,
queue_id.unwrap(),
offset.unwrap()
);
return Some(response.set_remark(Some("Offset has been previously reset".to_string())));
}
self.consumer_offset_manager.commit_offset(
ctx.remoting_address(),
group,
topic,
queue_id.unwrap(),
offset.unwrap(),
);
Some(response)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Tests missing for new conditions in update_consumer_offset method.

The new conditions added to the update_consumer_offset method, such as checking subscription group existence and topic existence, do not appear to be covered by existing test cases. It is crucial to add tests to verify these new conditions.

  • Add unit tests for the update_consumer_offset method to cover the new conditions.
  • Ensure that the tests verify the existence of subscription groups and topics as implemented in the new logic.
Analysis chain

Review of the update_consumer_offset method.

This method has been significantly updated to handle new logic for updating consumer offsets. Ensure that all new conditions, such as checking subscription group existence and topic existence, are correctly implemented and tested.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the new conditions in the `update_consumer_offset` method.

# Test: Search for unit tests covering these new conditions.
rg --type rust 'test' | grep 'update_consumer_offset'

Length of output: 467



Script:

#!/bin/bash
# Display the contents of the test cases in the specified file to verify coverage of new conditions.
cat rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs

Length of output: 8522



Script:

#!/bin/bash
# Search for test cases or modules that might cover the `update_consumer_offset` method in the entire codebase.
rg 'update_consumer_offset' --type rust

Length of output: 943



Script:

#!/bin/bash
# Search for integration tests or higher-level test cases that might cover the `update_consumer_offset` method in the entire codebase.
rg 'test' --type rust | grep -i 'integration\|consumer_manage_processor'

Length of output: 72

@mxsm
Copy link
Owner Author

mxsm commented Jun 27, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 1dbe411 and 8039dc2.

Files selected for processing (1)
  • rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs

Copy link

codecov bot commented Jun 27, 2024

Codecov Report

Attention: Patch coverage is 30.07812% with 179 lines in your changes missing coverage. Please review.

Project coverage is 27.32%. Comparing base (daada10) to head (8039dc2).

Files Patch % Lines
...c/protocol/header/update_consumer_offset_header.rs 56.29% 59 Missing ⚠️
...oker/src/offset/manager/consumer_offset_manager.rs 0.00% 56 Missing ⚠️
...-broker/src/processor/consumer_manage_processor.rs 0.00% 47 Missing ⚠️
rocketmq-broker/src/broker_runtime.rs 0.00% 8 Missing ⚠️
...subscription/manager/subscription_group_manager.rs 0.00% 5 Missing ⚠️
...q-broker/src/topic/manager/topic_config_manager.rs 0.00% 3 Missing ⚠️
rocketmq-common/src/common/broker/broker_config.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #700      +/-   ##
==========================================
+ Coverage   27.26%   27.32%   +0.05%     
==========================================
  Files         255      256       +1     
  Lines       19868    20113     +245     
==========================================
+ Hits         5417     5495      +78     
- Misses      14451    14618     +167     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@TeslaRustor TeslaRustor added approved PR has approved and removed ready to review labels Jun 27, 2024
@mxsm mxsm merged commit f60b39c into main Jun 27, 2024
17 checks passed
@mxsm mxsm deleted the feature-698 branch June 27, 2024 03:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved PR has approved auto merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature] Support UpdateConsumerOffset(15) request
2 participants