Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1881]🚀Implement AckMessageProcessor#ackOrderly method logic #1882

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,9 @@
self.broker_config.clone(),
self.pop_inflight_message_counter.clone(),
self.store_host,
Arc::new(self.consumer_offset_manager.clone()),
pop_message_processor.clone(),
self.consumer_order_info_manager.clone(),

Check warning on line 574 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L572-L574

Added lines #L572 - L574 were not covered by tests
));
BrokerRequestProcessor {
send_message_processor: ArcMut::new(send_message_processor),
Expand Down
23 changes: 23 additions & 0 deletions rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#![allow(unused_variables)]

use std::collections::HashMap;
use std::collections::HashSet;
Expand Down Expand Up @@ -241,6 +242,28 @@
) {
unimplemented!("")
}

pub fn commit_and_next(
&self,
topic: &CheetahString,
group: &CheetahString,
queue_id: i32,
queue_offset: u64,
pop_time: u64,
) -> i64 {
unimplemented!()

Check warning on line 254 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L246-L254

Added lines #L246 - L254 were not covered by tests
}

pub fn check_block(
&self,
attempt_id: &CheetahString,
topic: &CheetahString,
group: &CheetahString,
queue_id: i32,
invisible_time: u64,
) -> bool {
unimplemented!()

Check warning on line 265 in rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs#L257-L265

Added lines #L257 - L265 were not covered by tests
}
}

#[inline]
Expand Down
109 changes: 105 additions & 4 deletions rocketmq-broker/src/processor/ack_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
#![allow(unused_variables)]

use std::cmp::Ordering;
use std::net::SocketAddr;
use std::sync::Arc;

Expand Down Expand Up @@ -52,6 +53,8 @@
use crate::broker_error::BrokerError::BrokerCommonError;
use crate::broker_error::BrokerError::BrokerRemotingError;
use crate::failover::escape_bridge::EscapeBridge;
use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager;
use crate::offset::manager::consumer_order_info_manager::ConsumerOrderInfoManager;
use crate::processor::pop_inflight_message_counter::PopInflightMessageCounter;
use crate::processor::pop_message_processor::PopMessageProcessor;
use crate::processor::processor_service::pop_buffer_merge_service::PopBufferMergeService;
Expand All @@ -64,6 +67,9 @@
escape_bridge: ArcMut<EscapeBridge<MS>>,
store_host: SocketAddr,
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
consumer_offset_manager: Arc<ConsumerOffsetManager>,
consumer_order_info_manager: Arc<ConsumerOrderInfoManager<MS>>,
pop_message_processor: ArcMut<PopMessageProcessor>,
}

impl<MS> AckMessageProcessor<MS>
Expand All @@ -77,6 +83,9 @@
broker_config: Arc<BrokerConfig>,
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
store_host: SocketAddr,
consumer_offset_manager: Arc<ConsumerOffsetManager>,
pop_message_processor: ArcMut<PopMessageProcessor>,
consumer_order_info_manager: Arc<ConsumerOrderInfoManager<MS>>,

Check warning on line 88 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L86-L88

Added lines #L86 - L88 were not covered by tests
) -> AckMessageProcessor<MS> {
AckMessageProcessor {
topic_config_manager,
Expand All @@ -86,6 +95,9 @@
escape_bridge,
store_host,
pop_inflight_message_counter,
consumer_offset_manager,
pop_message_processor,
consumer_order_info_manager,

Check warning on line 100 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L98-L100

Added lines #L98 - L100 were not covered by tests
}
}

Expand Down Expand Up @@ -260,7 +272,8 @@
invisible_time,
channel,
response,
);
)
.await;

Check warning on line 276 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L275-L276

Added lines #L275 - L276 were not covered by tests
return;
}
let ack = AckMsg::default();
Expand Down Expand Up @@ -324,7 +337,8 @@
invisible_time,
channel,
response,
);
)
.await;

Check warning on line 341 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L340-L341

Added lines #L340 - L341 were not covered by tests
} else {
batch_ack_msg.ack_offset_list.push(offset);
}
Expand Down Expand Up @@ -429,7 +443,7 @@
);
}

fn ack_orderly(
async fn ack_orderly(

Check warning on line 446 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L446

Added line #L446 was not covered by tests
&mut self,
topic: CheetahString,
consume_group: CheetahString,
Expand All @@ -440,6 +454,93 @@
channel: &Channel,
response: &mut RemotingCommand,
) {
unimplemented!("ack_orderly")
let lock_key = CheetahString::from_string(format!(
"{}{}{}{}{}",
&topic,
PopAckConstants::SPLIT,
&consume_group,
PopAckConstants::SPLIT,
q_id
));
let old_offset = self
.consumer_offset_manager
.query_offset(&consume_group, &topic, q_id);
if old_offset > ack_offset {
return;
}
while !self
.pop_message_processor
.queue_lock_manager()
.try_lock_with_key(lock_key.clone())
.await
{}
Comment on lines +472 to +476
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Attempt to acquire queue lock
The loop while !try_lock_with_key(...) can lead to busy-wait. Consider a small async delay or re-check to avoid hogging the runtime if locks are highly contested.

- while !self
-     .pop_message_processor
-     .queue_lock_manager()
-     .try_lock_with_key(lock_key.clone())
-     .await
- {}
+ loop {
+     if self
+        .pop_message_processor
+        .queue_lock_manager()
+        .try_lock_with_key(lock_key.clone())
+        .await {
+         break;
+     }
+     tokio::time::sleep(Duration::from_millis(5)).await;
+ }

Committable suggestion skipped: line range outside the PR's diff.

let old_offset = self
.consumer_offset_manager
.query_offset(&consume_group, &topic, q_id);
if old_offset > ack_offset {
return;
}
let next_offset = self.consumer_order_info_manager.commit_and_next(
&consume_group,
&topic,
q_id,
ack_offset as u64,
pop_time as u64,
);
Comment on lines +483 to +489
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

commit_and_next
The unwrapped function call to consumer_order_info_manager.commit_and_next is presumably fundamental to your ordering logic. Ensure robust error-handling, especially for edge cases like negative offsets or missing topic.

match next_offset.cmp(&-1) {
Ordering::Less => {}

Check warning on line 491 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L457-L491

Added lines #L457 - L491 were not covered by tests
Ordering::Equal => {
let error_info = format!(
"offset is illegal, key:{}, old:{}, commit:{}, next:{}, {}",
lock_key,
old_offset,
ack_offset,
next_offset,
channel.remote_address()
);
response.set_code_ref(ResponseCode::MessageIllegal);
response.set_remark_mut(error_info);
self.pop_message_processor
.queue_lock_manager()
.unlock_with_key(lock_key)
.await;
return;

Check warning on line 507 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L493-L507

Added lines #L493 - L507 were not covered by tests
}
Ordering::Greater => {
if !self.consumer_offset_manager.has_offset_reset(
consume_group.as_str(),
topic.as_str(),
q_id,
) {
self.consumer_offset_manager.commit_offset(
channel.remote_address(),
&consume_group,
&topic,
q_id,
next_offset,
);
}

Check warning on line 522 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L510-L522

Added lines #L510 - L522 were not covered by tests

if !self.consumer_order_info_manager.check_block(
&CheetahString::empty(),
&consume_group,
&topic,
q_id,
invisible_time as u64,
) {
self.pop_message_processor.notify_message_arriving(
&topic,
q_id,
&consume_group,
);
}

Check warning on line 536 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L524-L536

Added lines #L524 - L536 were not covered by tests
}
}
self.pop_message_processor
.queue_lock_manager()
.unlock_with_key(lock_key)
.await;
self.pop_inflight_message_counter
.decrement_in_flight_message_num(&topic, &consume_group, pop_time, q_id, 1);

Check warning on line 544 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L539-L544

Added lines #L539 - L544 were not covered by tests
}
}
13 changes: 13 additions & 0 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,19 @@
pub fn queue_lock_manager(&self) -> &QueueLockManager {
unimplemented!("PopMessageProcessor QueueLockManager")
}

pub fn notify_message_arriving(
&self,
topic: &CheetahString,
queue_id: i32,
cid: &CheetahString,
) {
info!(
"notifyMessageArriving topic={} queueId={} cid={}",

Check warning on line 61 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L54-L61

Added lines #L54 - L61 were not covered by tests
topic, queue_id, cid
);
unimplemented!("PopMessageProcessor notify_message_arriving")

Check warning on line 64 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L64

Added line #L64 was not covered by tests
}
}

impl PopMessageProcessor {
Expand Down
Loading