Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1362]🎨Refactor QueryAssignmentProcessor #1363

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
use crate::processor::end_transaction_processor::EndTransactionProcessor;
use crate::processor::pull_message_processor::PullMessageProcessor;
use crate::processor::pull_message_result_handler::PullMessageResultHandler;
use crate::processor::query_assignment_processor::QueryAssignmentProcessor;
use crate::processor::query_message_processor::QueryMessageProcessor;
use crate::processor::reply_message_processor::ReplyMessageProcessor;
use crate::processor::send_message_processor::SendMessageProcessor;
Expand Down Expand Up @@ -537,7 +538,9 @@
self.subscription_group_manager.clone(),
)),
consumer_manage_processor: ArcMut::new(consumer_manage_processor),
query_assignment_processor: Default::default(),
query_assignment_processor: ArcMut::new(QueryAssignmentProcessor::new(
self.message_store_config.clone(),
)),

Check warning on line 543 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L541-L543

Added lines #L541 - L543 were not covered by tests
Comment on lines +541 to +543
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Warning: QueryAssignmentProcessor implementation is incomplete

The verification revealed that while the processor is properly initialized and integrated into the broker's processor chain, its core functionality is not implemented:

  • The query_assignment method is marked as unimplemented!()
  • The set_message_request_mode method is marked as unimplemented!()

These unimplemented methods handle critical request codes:

  • RequestCode::QueryAssignment
  • RequestCode::SetMessageRequestMode
🔗 Analysis chain

Verify QueryAssignmentProcessor integration

The initialization follows the established pattern for processor initialization in the broker. However, let's verify the integration points.


Let me gather more information about the QueryAssignmentProcessor implementation and its usage.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify QueryAssignmentProcessor usage and integration
# Test: Check for any direct usages of QueryAssignmentProcessor methods
rg -A 5 "query_assignment_processor\." 

# Test: Check for any request handlers that might use this processor
ast-grep --pattern 'impl $_ for QueryAssignmentProcessor {
  $$$
}'

Length of output: 561


Script:

#!/bin/bash
# Check the implementation of QueryAssignmentProcessor
ast-grep --pattern 'struct QueryAssignmentProcessor {
  $$$
}'

# Look for any method implementations related to QueryAssignmentProcessor
ast-grep --pattern 'impl QueryAssignmentProcessor {
  $$$
}'

# Search for any request handling or processing related to assignments
rg -l "Assignment" 

# Check if there are any tests related to QueryAssignmentProcessor
rg -l "QueryAssignmentProcessor.*test"

Length of output: 6368

query_message_processor: ArcMut::new(query_message_processor),
end_transaction_processor: ArcMut::new(EndTransactionProcessor::new(
self.message_store_config.clone(),
Expand Down
32 changes: 15 additions & 17 deletions rocketmq-broker/src/load_balance/message_request_mode_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ use cheetah_string::CheetahString;
use rocketmq_common::common::config_manager::ConfigManager;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::SetMessageRequestModeRequestBody;
use rocketmq_rust::ArcMut;
use rocketmq_store::config::message_store_config::MessageStoreConfig;
use tracing::info;

use crate::broker_path_config_helper;

pub(crate) struct MessageRequestModeManager {
message_store_config: ArcMut<MessageStoreConfig>,
message_store_config: Arc<MessageStoreConfig>,
message_request_mode_map: Arc<
parking_lot::Mutex<
HashMap<
Expand All @@ -40,7 +39,7 @@ pub(crate) struct MessageRequestModeManager {
}

impl MessageRequestModeManager {
pub fn new(message_store_config: ArcMut<MessageStoreConfig>) -> Self {
pub fn new(message_store_config: Arc<MessageStoreConfig>) -> Self {
Self {
message_store_config,
message_request_mode_map: Arc::new(parking_lot::Mutex::new(HashMap::new())),
Expand Down Expand Up @@ -113,14 +112,13 @@ mod tests {
use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_enum::MessageRequestMode;
use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::SetMessageRequestModeRequestBody;
use rocketmq_rust::ArcMut;
use rocketmq_store::config::message_store_config::MessageStoreConfig;

use super::*;

#[test]
fn set_message_request_mode_adds_entry() {
let message_store_config = ArcMut::new(MessageStoreConfig::default());
let message_store_config = Arc::new(MessageStoreConfig::default());
let manager = MessageRequestModeManager::new(message_store_config);
let topic = CheetahString::from("test_topic");
let consumer_group = CheetahString::from("test_group");
Expand All @@ -138,7 +136,7 @@ mod tests {

#[test]
fn get_message_request_mode_returns_none_for_nonexistent_entry() {
let message_store_config = ArcMut::new(MessageStoreConfig::default());
let message_store_config = Arc::new(MessageStoreConfig::default());
let manager = MessageRequestModeManager::new(message_store_config);
let topic = CheetahString::from("nonexistent_topic");
let consumer_group = CheetahString::from("nonexistent_group");
Expand All @@ -150,7 +148,7 @@ mod tests {

#[test]
fn encode_pretty_returns_pretty_json() {
let message_store_config = ArcMut::new(MessageStoreConfig::default());
let message_store_config = Arc::new(MessageStoreConfig::default());
let manager = MessageRequestModeManager::new(message_store_config);
let topic = CheetahString::from("test_topic");
let consumer_group = CheetahString::from("test_group");
Expand All @@ -169,18 +167,18 @@ mod tests {

#[test]
fn decode_populates_message_request_mode_map() {
let message_store_config = ArcMut::new(MessageStoreConfig::default());
let message_store_config = Arc::new(MessageStoreConfig::default());
let manager = MessageRequestModeManager::new(message_store_config);
let json = r#"{
"test_topic": {
"test_group": {
"topic": "test_topic",
"consumerGroup": "test_group",
"mode": "PULL",
"popShareQueueNum": 0
}
}
}"#;
"test_topic": {
"test_group": {
"topic": "test_topic",
"consumerGroup": "test_group",
"mode": "PULL",
"popShareQueueNum": 0
}
}
}"#;

manager.decode(json);
let result = manager.get_message_request_mode(
Expand Down
80 changes: 72 additions & 8 deletions rocketmq-broker/src/processor/query_assignment_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,83 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::load_balance::message_request_mode_manager::MessageRequestModeManager;
use cheetah_string::CheetahString;
use rocketmq_client_rust::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
use rocketmq_client_rust::consumer::rebalance_strategy::allocate_message_queue_averagely::AllocateMessageQueueAveragely;
use rocketmq_client_rust::consumer::rebalance_strategy::allocate_message_queue_averagely_by_circle::AllocateMessageQueueAveragelyByCircle;
use rocketmq_common::common::config_manager::ConfigManager;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_store::config::message_store_config::MessageStoreConfig;
use std::collections::HashMap;
use std::sync::Arc;

use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
pub struct QueryAssignmentProcessor {
message_request_mode_manager: MessageRequestModeManager,
load_strategy: HashMap<CheetahString, Arc<dyn AllocateMessageQueueStrategy>>,
message_store_config: Arc<MessageStoreConfig>,
}

#[derive(Default)]
pub struct QueryAssignmentProcessor {}
impl QueryAssignmentProcessor {
pub fn new(message_store_config: Arc<MessageStoreConfig>) -> Self {
let allocate_message_queue_averagely: Arc<dyn AllocateMessageQueueStrategy> =
Arc::new(AllocateMessageQueueAveragely);
let allocate_message_queue_averagely_by_circle: Arc<dyn AllocateMessageQueueStrategy> =
Arc::new(AllocateMessageQueueAveragelyByCircle);
let mut load_strategy = HashMap::new();
load_strategy.insert(
CheetahString::from_static_str(allocate_message_queue_averagely.get_name()),
allocate_message_queue_averagely,
);
load_strategy.insert(
CheetahString::from_static_str(allocate_message_queue_averagely_by_circle.get_name()),
allocate_message_queue_averagely_by_circle,
);
let manager = MessageRequestModeManager::new(message_store_config.clone());
let _ = manager.load();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle the potential error from manager.load()

The call to manager.load() returns a Result, but the current implementation ignores it. Ignoring potential errors can lead to unexpected behavior if the loading process fails.

Consider handling the Result properly. You can propagate the error or handle it within the method. Here's how you might modify the code:

- let _ = manager.load();
+ manager.load().map_err(|e| {
+     // Handle the error, e.g., log it or return an error response
+     eprintln!("Failed to load MessageRequestModeManager: {:?}", e);
+     // You might choose to propagate the error further
+ })?;

Committable suggestion skipped: line range outside the PR's diff.

Self {
message_request_mode_manager: manager,
load_strategy,
message_store_config,
}
}

Check warning on line 59 in rocketmq-broker/src/processor/query_assignment_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_assignment_processor.rs#L38-L59

Added lines #L38 - L59 were not covered by tests
}

impl QueryAssignmentProcessor {
fn process_request(
&self,
pub async fn process_request(
&mut self,
channel: Channel,
ctx: ConnectionHandlerContext,
request_code: RequestCode,
request: RemotingCommand,
) -> Option<RemotingCommand> {
match request_code {
RequestCode::QueryAssignment => self.query_assignment(channel, ctx, request).await,

Check warning on line 71 in rocketmq-broker/src/processor/query_assignment_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_assignment_processor.rs#L63-L71

Added lines #L63 - L71 were not covered by tests
RequestCode::SetMessageRequestMode => {
self.set_message_request_mode(channel, ctx, request).await

Check warning on line 73 in rocketmq-broker/src/processor/query_assignment_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_assignment_processor.rs#L73

Added line #L73 was not covered by tests
}
_ => None,

Check warning on line 75 in rocketmq-broker/src/processor/query_assignment_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_assignment_processor.rs#L75

Added line #L75 was not covered by tests
}
}

Check warning on line 77 in rocketmq-broker/src/processor/query_assignment_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_assignment_processor.rs#L77

Added line #L77 was not covered by tests

async fn query_assignment(
&mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
_request: RemotingCommand,
) -> Option<RemotingCommand> {
unimplemented!()

Check warning on line 85 in rocketmq-broker/src/processor/query_assignment_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_assignment_processor.rs#L79-L85

Added lines #L79 - L85 were not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement the query_assignment method or handle unimplemented functionality

The query_assignment method currently contains unimplemented!(), which will cause the program to panic if this method is invoked. Panicking in production code can lead to application crashes.

Consider implementing the required functionality or returning an appropriate error response to the client. For example:

-     unimplemented!()
+     // TODO: Implement the query_assignment logic
+     let response = RemotingCommand::new_response(/* parameters */);
+     // Populate the response as needed
+     Some(response)

If you need assistance in implementing this method, I can help develop the logic or we can open a GitHub issue to track this task.

Committable suggestion skipped: line range outside the PR's diff.

}

async fn set_message_request_mode(
&mut self,
_channel: Channel,

Check warning on line 90 in rocketmq-broker/src/processor/query_assignment_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_assignment_processor.rs#L88-L90

Added lines #L88 - L90 were not covered by tests
_ctx: ConnectionHandlerContext,
_request: RemotingCommand,
) -> RemotingCommand {
todo!()
) -> Option<RemotingCommand> {
unimplemented!()

Check warning on line 94 in rocketmq-broker/src/processor/query_assignment_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/query_assignment_processor.rs#L93-L94

Added lines #L93 - L94 were not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement the set_message_request_mode method or handle unimplemented functionality

Similar to query_assignment, the set_message_request_mode method uses unimplemented!(), which will cause a panic if called. It's important to handle this to prevent potential runtime errors.

Please implement the necessary logic or return an appropriate error response. Here's a guideline:

-     unimplemented!()
+     // TODO: Implement the set_message_request_mode logic
+     let response = RemotingCommand::new_response(/* parameters */);
+     // Populate the response as needed
+     Some(response)

Let me know if you'd like assistance in implementing this method or if we should create a GitHub issue for it.

Committable suggestion skipped: line range outside the PR's diff.

}
}
2 changes: 1 addition & 1 deletion rocketmq-client/src/consumer/rebalance_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
pub mod allocate_message_queue_averagely;
mod allocate_message_queue_averagely_by_circle;
pub mod allocate_message_queue_averagely_by_circle;

use std::collections::HashSet;

Expand Down
Loading