Skip to content

Commit

Permalink
[ISSUE #1502]♻️Refactor PullMessageRequestHeader🍻 (#1504)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Dec 2, 2024
1 parent 3ef462f commit 7c97393
Show file tree
Hide file tree
Showing 4 changed files with 438 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
self.update_broadcast_pulled_offset(
request_header.topic.as_ref(),
request_header.consumer_group.as_ref(),
request_header.queue_id.unwrap(),
request_header.queue_id,
&request_header,
&channel,
Some(&mut response),
Expand Down Expand Up @@ -199,7 +199,7 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
&get_message_result,
request_header.consumer_group.as_str(),
request_header.topic.as_str(),
request_header.queue_id.unwrap(),
request_header.queue_id,
);
if let Some(body) = body {
response.set_body_mut_ref(body);
Expand All @@ -223,7 +223,7 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
polling_time_mills = self.broker_config.short_polling_time_mills;
}
let topic = request_header.topic.as_str();
let queue_id = request_header.queue_id.unwrap();
let queue_id = request_header.queue_id;
let offset = request_header.queue_offset;

let pull_request = PullRequest::new(
Expand Down Expand Up @@ -255,7 +255,7 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
let mut mq = MessageQueue::new();
mq.set_topic(request_header.topic.clone());
mq.set_broker_name(self.broker_config.broker_name.clone());
mq.set_queue_id(request_header.queue_id.unwrap());
mq.set_queue_id(request_header.queue_id);

let offset_moved_event = OffsetMovedEvent {
consumer_group: request_header.consumer_group.to_string(),
Expand Down Expand Up @@ -349,7 +349,7 @@ impl DefaultPullMessageResultHandler {
.consumer_group
.clone_from(&request_header.consumer_group);
context.topic.clone_from(&request_header.topic);
context.queue_id = request_header.queue_id;
context.queue_id = Some(request_header.queue_id);
context.account_auth_type = auth_type;
context.account_owner_parent = owner_parent;
context.account_owner_self = owner_self;
Expand Down Expand Up @@ -443,7 +443,7 @@ impl DefaultPullMessageResultHandler {
request_header.queue_offset,
get_message_result.next_begin_offset(),
request_header.topic,
request_header.queue_id.unwrap_or(0),
request_header.queue_id,
request_header.consumer_group
);
} else {
Expand Down Expand Up @@ -504,7 +504,7 @@ impl DefaultPullMessageResultHandler {
"slave redirect pullRequest to master, topic: {}, queueId: {}, consumer group: \
{}, next: {}, min: {}, max: {}",
request_header.topic,
request_header.queue_id.unwrap_or(0),
request_header.queue_id,
request_header.consumer_group,
response_header.next_begin_offset.unwrap(),
response_header.min_offset.unwrap(),
Expand All @@ -529,7 +529,7 @@ impl DefaultPullMessageResultHandler {
client_address,
request_header.consumer_group.as_ref(),
request_header.topic.as_ref(),
request_header.queue_id.unwrap(),
request_header.queue_id,
next_offset,
);

Expand All @@ -542,7 +542,7 @@ impl DefaultPullMessageResultHandler {
client_address,
request_header.consumer_group.as_ref(),
request_header.topic.as_ref(),
request_header.queue_id.unwrap(),
request_header.queue_id,
request_header.commit_offset,
);
}
Expand Down
12 changes: 5 additions & 7 deletions rocketmq-broker/src/processor/pull_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl<MS> PullMessageProcessor<MS> {
let bname = &mapping_item.bname;
let phy_queue_id = mapping_item.queue_id;
let phy_queue_offset = mapping_item.compute_physical_queue_offset(global_offset);
request_header.queue_id = Some(phy_queue_id);
request_header.queue_id = phy_queue_id;
request_header.queue_offset = phy_queue_offset;
if mapping_item.check_if_end_offset_decided()
/* && request_header.max_msg_nums.is_some() */
Expand Down Expand Up @@ -471,18 +471,16 @@ where
{
return Some(resp);
}
if request_header.queue_id.is_none()
|| request_header.queue_id.unwrap() < 0
|| request_header.queue_id.unwrap()
>= topic_config.as_ref().unwrap().read_queue_nums as i32
if request_header.queue_id < 0
|| request_header.queue_id >= topic_config.as_ref().unwrap().read_queue_nums as i32
{
return Some(
response
.set_code(RemotingSysResponseCode::SystemError)
.set_remark(format!(
"queueId[{}] is illegal, topic:[{}] topicConfig.readQueueNums:[{}] \
consumer:[{}]",
request_header.queue_id.unwrap(),
request_header.queue_id,
request_header.topic,
topic_config.as_ref().unwrap().read_queue_nums,
channel.remote_address()
Expand Down Expand Up @@ -711,7 +709,7 @@ where
let use_reset_offset_feature = self.broker_config.use_server_side_reset_offset;
let topic = request_header.topic.as_ref();
let group = request_header.consumer_group.as_ref();
let queue_id = request_header.queue_id.unwrap();
let queue_id = request_header.queue_id;
let reset_offset = self
.consumer_offset_manager
.query_then_erase_reset_offset(topic, group, queue_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ impl PullAPIWrapper {
let request_header = PullMessageRequestHeader {
consumer_group: self.consumer_group.clone(),
topic: CheetahString::from_string(mq.get_topic().to_string()),
queue_id: Some(mq.get_queue_id()),
queue_id: mq.get_queue_id(),
queue_offset: offset,
max_msg_nums: max_nums,
sys_flag: sys_flag_inner,
Expand Down
Loading

0 comments on commit 7c97393

Please sign in to comment.