Skip to content

Commit

Permalink
Return remaining messages in batch when auto commit is disabled (#1375)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Dec 4, 2024
1 parent a135c10 commit 733037c
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.50"
version = "0.6.51"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
19 changes: 12 additions & 7 deletions sdk/src/clients/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ impl IggyConsumer {
)
.await;

if let Ok(polled_messages) = polled_messages {
if let Ok(mut polled_messages) = polled_messages {
if polled_messages.messages.is_empty() {
return Ok(polled_messages);
}
Expand All @@ -522,12 +522,17 @@ impl IggyConsumer {
last_consumed_offset.insert(partition_id, AtomicU64::new(0));
}

if has_consumed_offset && consumed_offset >= polled_messages.messages[0].offset {
return Ok(PolledMessages {
messages: EMPTY_MESSAGES,
current_offset: polled_messages.current_offset,
partition_id,
});
if has_consumed_offset {
polled_messages
.messages
.retain(|message| message.offset > consumed_offset);
if polled_messages.messages.is_empty() {
return Ok(PolledMessages {
messages: EMPTY_MESSAGES,
current_offset: polled_messages.current_offset,
partition_id,
});
}
}

let stored_offset;
Expand Down

0 comments on commit 733037c

Please sign in to comment.