Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1828]🚀Implement MQClientAPIImpl#ackMessageAsync #1837

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rocketmq-client/src/client_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub enum MQClientError {

#[error("{0}")]
IllegalArgumentError(String),

#[error("{0}")]
CommonError(#[from] rocketmq_common::error::Error),
}

#[derive(Error, Debug)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

use std::collections::HashSet;
use std::error::Error;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand Down Expand Up @@ -63,6 +64,7 @@
use crate::client_error::ClientErr;
use crate::client_error::MQClientError;
use crate::consumer::ack_callback::AckCallback;
use crate::consumer::ack_result::AckResult;
use crate::consumer::consumer_impl::consume_message_concurrently_service::ConsumeMessageConcurrentlyService;
use crate::consumer::consumer_impl::consume_message_orderly_service::ConsumeMessageOrderlyService;
use crate::consumer::consumer_impl::consume_message_pop_concurrently_service::ConsumeMessagePopConcurrentlyService;
Expand Down Expand Up @@ -1431,7 +1433,7 @@
error!("The broker[{}] not exist", des_broker_name);
return;
}

let find_broker_result = find_broker_result.unwrap();

Check warning on line 1436 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1436

Added line #L1436 was not covered by tests
let request_header = AckMessageRequestHeader {
consumer_group: consumer_group.clone(),
topic: CheetahString::from_string(
Expand All @@ -1449,8 +1451,29 @@
lo: None,
}),
};
//client_instance.mq_client_api_impl.as_mut().unwrap()
unimplemented!("ackAsync");
struct DefaultAckCallback;
impl AckCallback for DefaultAckCallback {
fn on_success(&self, _ack_result: AckResult) {}

Check warning on line 1456 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1456

Added line #L1456 was not covered by tests

fn on_exception(&self, _e: Box<dyn Error>) {}

Check warning on line 1458 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1458

Added line #L1458 was not covered by tests
}
match client_instance
.mq_client_api_impl
.as_mut()
.unwrap()
.ack_message_async(
&find_broker_result.broker_addr,
request_header,
ASYNC_TIMEOUT,
DefaultAckCallback,
)
.await

Check warning on line 1470 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1460-L1470

Added lines #L1460 - L1470 were not covered by tests
{
Ok(_) => {}
Err(e) => {
error!("ackAsync error: {}", e);

Check warning on line 1474 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1472-L1474

Added lines #L1472 - L1474 were not covered by tests
}
}
}

pub(crate) async fn change_pop_invisible_time_async(
Expand Down
63 changes: 63 additions & 0 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
use rocketmq_remoting::clients::RemotingClient;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::protocol::body::batch_ack_message_request_body::BatchAckMessageRequestBody;
use rocketmq_remoting::protocol::body::check_client_request_body::CheckClientRequestBody;
use rocketmq_remoting::protocol::body::get_consumer_listby_group_response_body::GetConsumerListByGroupResponseBody;
use rocketmq_remoting::protocol::body::query_assignment_request_body::QueryAssignmentRequestBody;
Expand All @@ -50,6 +51,7 @@
use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockBatchResponseBody;
use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::SetMessageRequestModeRequestBody;
use rocketmq_remoting::protocol::body::unlock_batch_request_body::UnlockBatchRequestBody;
use rocketmq_remoting::protocol::header::ack_message_request_header::AckMessageRequestHeader;
use rocketmq_remoting::protocol::header::change_invisible_time_request_header::ChangeInvisibleTimeRequestHeader;
use rocketmq_remoting::protocol::header::change_invisible_time_response_header::ChangeInvisibleTimeResponseHeader;
use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader;
Expand Down Expand Up @@ -1688,6 +1690,67 @@
}
Ok(pop_result)
}

pub async fn ack_message_async(
&self,
addr: &CheetahString,
request_header: AckMessageRequestHeader,
timeout_millis: u64,
ack_callback: impl AckCallback,
) -> Result<()> {
self.ack_message_async_inner(
addr,
Some(request_header),
None,
timeout_millis,
ack_callback,
)
.await
}

Check warning on line 1709 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1694-L1709

Added lines #L1694 - L1709 were not covered by tests

pub(self) async fn ack_message_async_inner(
&self,
addr: &CheetahString,
request_header: Option<AckMessageRequestHeader>,
request_body: Option<BatchAckMessageRequestBody>,
timeout_millis: u64,
ack_callback: impl AckCallback,
) -> Result<()> {
let request = if let Some(header) = request_header {
RemotingCommand::create_request_command(RequestCode::AckMessage, header)

Check warning on line 1720 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1711-L1720

Added lines #L1711 - L1720 were not covered by tests
} else {
let body = request_body.unwrap();
RemotingCommand::new_request(
RequestCode::BatchAckMessage,
body.encode().map_err(MQClientError::CommonError)?,

Check warning on line 1725 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1722-L1725

Added lines #L1722 - L1725 were not covered by tests
)
};
match self
.remoting_client
.invoke_async(Some(addr), request, timeout_millis)
.await

Check warning on line 1731 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1728-L1731

Added lines #L1728 - L1731 were not covered by tests
{
Ok(response) => {
let response_code = ResponseCode::from(response.code());
let ack_result = if response_code == ResponseCode::Success {
AckResult {
status: AckStatus::Ok,
..Default::default()
}

Check warning on line 1739 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1733-L1739

Added lines #L1733 - L1739 were not covered by tests
} else {
AckResult {
status: AckStatus::NotExist,
..Default::default()
}

Check warning on line 1744 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1741-L1744

Added lines #L1741 - L1744 were not covered by tests
};
ack_callback.on_success(ack_result);

Check warning on line 1746 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1746

Added line #L1746 was not covered by tests
}
Err(e) => {
ack_callback.on_exception(Box::new(e));
}

Check warning on line 1750 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1748-L1750

Added lines #L1748 - L1750 were not covered by tests
}
Ok(())
}

Check warning on line 1753 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1752-L1753

Added lines #L1752 - L1753 were not covered by tests
}

fn build_queue_offset_sorted_map(
Expand Down
Loading