Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #699]🚀Support pull message result handle #701

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
use crate::hook::batch_check_before_put_message::BatchCheckBeforePutMessageHook;
use crate::hook::check_before_put_message::CheckBeforePutMessageHook;
use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager;
use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager;
use crate::offset::manager::consumer_order_info_manager::ConsumerOrderInfoManager;
use crate::out_api::broker_outer_api::BrokerOuterAPI;
Expand Down Expand Up @@ -372,6 +373,7 @@
self.consumer_manager.clone(),
self.consumer_filter_manager.clone(),
Arc::new(self.consumer_offset_manager.clone()),
Arc::new(BroadcastOffsetManager::default()),

Check warning on line 376 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L376

Added line #L376 was not covered by tests
self.message_store.as_ref().unwrap().clone(),
);

Expand Down
15 changes: 15 additions & 0 deletions rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,18 @@
*/
#[derive(Debug, Default)]
pub struct BroadcastOffsetManager {}

#[allow(unused_variables)]
impl BroadcastOffsetManager {
pub fn query_init_offset(

Check warning on line 22 in rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs#L22

Added line #L22 was not covered by tests
&self,
topic: &str,
group_id: &str,
queue_id: i32,
client_id: &str,
request_offset: i64,
from_proxy: bool,
) -> i64 {
unimplemented!()

Check warning on line 31 in rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs#L31

Added line #L31 was not covered by tests
}
}
Comment on lines +20 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement query_init_offset method in BroadcastOffsetManager.

The new method query_init_offset is a critical addition for handling initial offset queries. It currently contains a placeholder implementation (unimplemented!()). Ensure that the actual implementation is completed before merging this PR, and consider adding unit tests to cover this new functionality.

// Suggested test for query_init_offset
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_query_init_offset() {
        let manager = BroadcastOffsetManager::default();
        assert_eq!(manager.query_init_offset("topic", "group", 1, "client", 0, false), 0);
    }
}

Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,23 @@
&mut response,
client_address.as_str(),
);
/* self.execute_consume_message_hook_before(
self.execute_consume_message_hook_before(

Check warning on line 93 in rocketmq-broker/src/processor/default_pull_message_result_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/default_pull_message_result_handler.rs#L93

Added line #L93 was not covered by tests
&request,
&request_header,
&get_message_result,
broker_allow_suspend,
From::from(response.code()),
);*/

);
/*let response_header = response.read_custom_header_mut::<PullMessageResponseHeader>();
let rewrite_result = PullMessageProcessor::rewrite_response_for_static_topic(
&request_header,
response_header.unwrap(),
&mut mapping_context,
ResponseCode::from(response.code()),
);
if rewrite_result.is_some() {
return rewrite_result;
}*/
None
}
}
Expand Down
Loading
Loading