Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #2073]💫Implement PopReviveService#reviveRetry🚀 #2074

Merged
merged 1 commit into from
Jan 4, 2025

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Jan 4, 2025

Which Issue(s) This PR Fixes(Closes)

Fixes #2073

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Enhanced message retry handling capabilities
    • Improved configuration management for message processing
    • Added support for dynamic topic and consumer group configuration
  • Refactor

    • Updated PopReviveService struct with more comprehensive configuration management
    • Introduced new methods for handling message retries and topic initialization

Copy link
Contributor

coderabbitai bot commented Jan 4, 2025

Walkthrough

The pull request introduces significant enhancements to the PopReviveService in the RocketMQ broker's message processing system. The changes involve updating the service's struct to include new configuration and management components like broker_config, topic_config_manager, consumer_offset_manager, and escape_bridge. Two new methods, revive_retry and add_retry_topic_if_not_exist, are added to improve message retry handling and topic configuration management.

Changes

File Change Summary
rocketmq-broker/src/processor/processor_service/pop_revive_service.rs - Updated PopReviveService struct to be generic with new configuration fields
- Modified new() method to accept additional configuration parameters
- Added revive_retry() async method for message retry logic
- Added add_retry_topic_if_not_exist() method for topic configuration management
- Added init_pop_retry_offset() method for initializing retry offsets

Assessment against linked issues

Objective Addressed Explanation
Implement PopReviveService#reviveRetry [#2073]

Possibly related PRs

Suggested labels

feature, auto merge, ready to review, waiting-review, AI review first, rocketmq-broker crate, Difficulty level/Moderate

Suggested reviewers

  • TeslaRustor
  • SpaceXCN
  • rocketmq-rust-bot

Poem

🚀 In the realm of message retry's dance,
A service springs to life with enhanced stance
PopRevive leaps with config's might
Weaving retry logic, oh so bright!
Broker's messages find their way home
Through code that makes the system roam 🐰


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

@RocketmqRustBot RocketmqRustBot added this to the v0.4.0 milestone Jan 4, 2025
@RocketmqRustBot RocketmqRustBot added the feature🚀 Suggest an idea for this project. label Jan 4, 2025
Copy link

codecov bot commented Jan 4, 2025

Codecov Report

Attention: Patch coverage is 0% with 107 lines in your changes missing coverage. Please review.

Project coverage is 28.39%. Comparing base (abf753d) to head (dff81c1).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
.../processor/processor_service/pop_revive_service.rs 0.00% 107 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2074      +/-   ##
==========================================
- Coverage   28.43%   28.39%   -0.05%     
==========================================
  Files         489      489              
  Lines       69326    69431     +105     
==========================================
  Hits        19713    19713              
- Misses      49613    49718     +105     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
rocketmq-broker/src/processor/processor_service/pop_revive_service.rs (2)

43-54: Add documentation for newly introduced fields.
The broker_config, topic_config_manager, consumer_offset_manager, and escape_bridge fields significantly impact how the PopReviveService interacts with broker-level configurations and offset management. Consider adding doc comments to clarify their purpose.


83-145: Avoid overwriting or duplicating tags/properties.
Inside revive_retry, lines 107–110 set the tags from the original message if present; otherwise, they set an empty property map. Then line 118 calls MessageAccessor::set_properties again, cloning the entire property set from the original message_ext. This sequence may override the earlier assignment if the message had no tags or if other properties conflict.

--- a/pop_revive_service.rs
+++ b/pop_revive_service.rs
@@ -107,9 +107,6 @@
 if let Some(tags) = message_ext.get_tags() {
     msg_inner.set_tags(tags);
 } else {
-    MessageAccessor::set_properties(&mut msg_inner, HashMap::new());
 }
 // ...
 MessageAccessor::set_properties(&mut msg_inner, message_ext.properties().clone());
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between abf753d and dff81c1.

📒 Files selected for processing (1)
  • rocketmq-broker/src/processor/processor_service/pop_revive_service.rs (2 hunks)
🔇 Additional comments (3)
rocketmq-broker/src/processor/processor_service/pop_revive_service.rs (3)

147-164: Ensure new topics are consistently visible across the system.
When calling add_retry_topic_if_not_exist, a new topic configuration is inserted. Please verify that other broker components (e.g., routing and metadata) pick up newly added topics so messages aren't dropped.

✅ Verification successful

Let me gather more information about how topic configurations are handled in the broker runtime.


Topic registration is properly handled through the broker runtime system

The verification shows that topic configurations are properly propagated across the system:

  1. When a new topic is created via update_topic_config, the TopicConfigManager automatically calls register_broker_data for new topics
  2. The registration process:
    • Spawns an async task to handle the registration
    • Uses register_single_topic_all to propagate the topic to all name servers when enable_single_topic_register is enabled
    • Updates permissions and broker metadata as needed

The code follows a robust pattern for topic registration and visibility across the broker system, ensuring new topics are properly tracked and accessible.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if add_retry_topic_if_not_exist is followed by any step or method that notifies the broker's topic routing or metadata systems.
rg -A 5 'update_topic_config' rocketmq-broker

Length of output: 4172


Script:

#!/bin/bash
# Check broker runtime handling of topic configurations
ast-grep --pattern 'fn broker_runtime_inner'

# Check if there are any topic registration or notification mechanisms
rg -B 2 -A 5 'register.*topic|notify.*topic' rocketmq-broker

Length of output: 11800


22-42: Consider grouping and re-checking imports for potential duplication or unnecessary modules.
While the introduced imports (for example BrokerConfig, TopicConfig, and more) seem necessary for the new functionality, please verify that every imported item is used to avoid cluttering the namespace.

✅ Verification successful

Let me verify the usage of these imports in the actual code.


All imports are actively used in the code

Based on the search results, every imported module and type is being used within the file:

  • BrokerConfig is used in struct fields and parameters
  • TopicConfig is used for creating and configuring topics
  • KeyBuilder is used for building pop retry topics
  • message_decoder is used for converting message properties to strings
  • MessageExt and MessageExtBrokerInner are used for message handling
  • MessageConst is used for property constants
  • MessageTrait is imported as a trait requirement
  • mix_all is used for prefix checking
  • PopAckConstants is used for queue configuration
  • TopicFilterType is used for topic filter configuration
  • MessageAccessor is used for property manipulation
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search the file to confirm usage of each import within lines 22 to 42.
rg -A 5 'use rocketmq_common::common::(broker|config|key_builder|message|mix_all|pop_ack_constants|TopicFilterType|MessageAccessor)' rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

Length of output: 1098


Script:

#!/bin/bash
# Search for usage of each imported type/module in the file
echo "=== Searching for BrokerConfig usage ==="
rg "BrokerConfig" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

echo -e "\n=== Searching for TopicConfig usage ==="
rg "TopicConfig" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

echo -e "\n=== Searching for KeyBuilder usage ==="
rg "KeyBuilder" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

echo -e "\n=== Searching for message_decoder usage ==="
rg "message_decoder" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

echo -e "\n=== Searching for MessageExt usage ==="
rg "MessageExt" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

echo -e "\n=== Searching for MessageExtBrokerInner usage ==="
rg "MessageExtBrokerInner" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

echo -e "\n=== Searching for MessageConst usage ==="
rg "MessageConst" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

echo -e "\n=== Searching for MessageTrait usage ==="
rg "MessageTrait" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

echo -e "\n=== Searching for mix_all usage ==="
rg "mix_all" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

echo -e "\n=== Searching for PopAckConstants usage ==="
rg "PopAckConstants" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

echo -e "\n=== Searching for TopicFilterType usage ==="
rg "TopicFilterType" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

echo -e "\n=== Searching for MessageAccessor usage ==="
rg "MessageAccessor" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

Length of output: 4256


Line range hint 57-81: Initialization logic is consistent, but consider verifying concurrency assumptions.
The constructor properly initializes the new fields, but if multiple threads instantiate or mutate PopReviveService, ensure concurrency assumptions hold. In particular, confirm that the TopicConfigManager and ConsumerOffsetManager usage remains thread-safe or is only used within a single-thread context.

✅ Verification successful

Thread-safety is properly implemented through synchronized data structures

The code inspection reveals that PopReviveService is designed with thread-safety in mind:

  • The inflight_revive_request_map is wrapped in both Arc and parking_lot::Mutex, providing thread-safe access to the shared state
  • Thread-safe components are used for shared resources:
    • Arc<ConsumerOffsetManager> for consumer offset management
    • Arc<BrokerConfig> for broker configuration
    • ArcMut<EscapeBridge> for escape bridge access

The concurrency concerns in the review comment are already addressed through proper use of synchronization primitives. The implementation correctly handles concurrent access through atomic reference counting and mutex protection where needed.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Inspect references to PopReviveService::new usage across the codebase for concurrency or multi-thread usage patterns
rg -A 5 'PopReviveService::new' 

Length of output: 31


Script:

#!/bin/bash
# Let's try a broader search to understand the usage patterns and thread safety
rg -l "PopReviveService"

# Also search for potential concurrent access patterns
ast-grep --pattern 'struct PopReviveService'

Length of output: 141


Script:

#!/bin/bash
# Look for any async or thread-related code around PopReviveService
rg -B 3 -A 7 "PopReviveService" 

Length of output: 2478


Script:

#!/bin/bash
# Search for any mutex or synchronization mechanisms
rg "Arc::new|Mutex::new|RwLock::new" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

Length of output: 179


Script:

#!/bin/bash
# Let's check for any async methods or concurrent operations in the implementation
ast-grep --pattern 'impl$$$PopReviveService$$$async fn'

# Look for any methods that might modify the shared state
rg -B 2 -A 5 "inflight_revive_request_map" rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

Length of output: 953


Script:

#!/bin/bash
# Check if this service is used in any async/concurrent context
rg -B 3 -A 7 "async.*PopReviveService"

Length of output: 40

Comment on lines +166 to +177
fn init_pop_retry_offset(&mut self, topic: &CheetahString, consumer_group: &CheetahString) {
let offset = self
.consumer_offset_manager
.query_offset(consumer_group, topic, 0);
if offset < 0 {
self.consumer_offset_manager.commit_offset(
"initPopRetryOffset".into(),
consumer_group,
topic,
0,
0,
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Initialization with offset 0 might need a more detailed rationale.
If the offset is missing (“< 0”), you commit offset = 0. Consider whether resuming at offset 0 for existing topics might accidentally reprocess older messages or skip messages. A comment or doc note explaining this initialization decision would help maintainers.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
AI review first Ai review pr first approved PR has approved auto merge feature🚀 Suggest an idea for this project.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature🚀] Implement PopReviveService#reviveRetry
3 participants