Skip to content

Commit

Permalink
[ISSUE #2075]💫Implement PopReviveService#getReviveMessage🚀 (#2078)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jan 4, 2025
1 parent f93a956 commit e1d64b7
Showing 1 changed file with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,40 @@ impl<MS: MessageStore> PopReviveService<MS> {
}
}

pub(crate) async fn get_revive_message(
&self,
offset: i64,
queue_id: i32,
) -> Option<Vec<ArcMut<MessageExt>>> {
let pull_result = self
.get_message(
&CheetahString::from_static_str(PopAckConstants::REVIVE_GROUP),
&self.revive_topic,
queue_id,
offset,
32,
true,
)
.await?;
if reach_tail(&pull_result, offset) {
//nothing to do
} else if pull_result.pull_status == PullStatus::OffsetIllegal
|| pull_result.pull_status == PullStatus::NoMatchedMsg
{
if !self.should_run_pop_revive {
return None;
}
self.consumer_offset_manager.commit_offset(
CheetahString::from_static_str(PopAckConstants::LOCAL_HOST),
&CheetahString::from_static_str(PopAckConstants::REVIVE_GROUP),
&self.revive_topic,
queue_id,
pull_result.next_begin_offset as i64 - 1,
);
}
pull_result.msg_found_list
}

pub async fn get_message(
&self,
group: &CheetahString,
Expand Down Expand Up @@ -251,6 +285,12 @@ impl<MS: MessageStore> PopReviveService<MS> {
}
}

fn reach_tail(pull_result: &PullResult, offset: i64) -> bool {
*pull_result.pull_status() == PullStatus::NoNewMsg
|| *pull_result.pull_status() == PullStatus::OffsetIllegal
&& offset == pull_result.max_offset as i64
}

fn decode_msg_list(
get_message_result: GetMessageResult,
de_compress_body: bool,
Expand Down

0 comments on commit e1d64b7

Please sign in to comment.