Skip to content

Commit

Permalink
Add support for "nowait" mode in file synchronization
Browse files Browse the repository at this point in the history
  • Loading branch information
haze518 committed Jan 8, 2025
1 parent 00f66ea commit dbd5f09
Show file tree
Hide file tree
Showing 35 changed files with 511 additions and 155 deletions.
15 changes: 15 additions & 0 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,14 @@ path = "compatibility"
# `false` allows the OS to manage write operations, which can improve performance.
enforce_fsync = false

# Maximum number of retries for a failed file operation (e.g., append, overwrite).
# This defines how many times the system will attempt the operation before failing.
max_file_operation_retries = 1

# Delay between retries in case of a failed file operation.
# This helps to avoid immediate repeated attempts and can reduce load.
retry_delay = "1 s"

# Runtime configuration.
[system.runtime]
# Path for storing runtime data.
Expand Down Expand Up @@ -452,6 +460,13 @@ size = "1 GB"
# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will expire after that duration.
message_expiry = "none"

# Defines the file system confirmation behavior during state updates.
# Controls how the system waits for file write operations to complete.
# Possible values:
# - "wait": waits for the file operation to complete before proceeding.
# - "nowait": proceeds without waiting for the file operation to finish, potentially increasing performance but at the cost of durability.
server_confirmation = "wait"

# Configures whether expired segments are archived (boolean) or just deleted without archiving.
archive_expired = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,42 +170,42 @@ pub async fn should_be_successful() {
TestTopicId::Numeric,
))
.await;
iggy_cmd_test
.execute_test(TestConsumerGroupCreateCmd::new(
2,
String::from("stream"),
3,
String::from("topic"),
Some(3),
String::from("group3"),
TestStreamId::Named,
TestTopicId::Numeric,
))
.await;
iggy_cmd_test
.execute_test(TestConsumerGroupCreateCmd::new(
4,
String::from("development"),
1,
String::from("probe"),
Some(7),
String::from("group7"),
TestStreamId::Numeric,
TestTopicId::Named,
))
.await;
iggy_cmd_test
.execute_test(TestConsumerGroupCreateCmd::new(
2,
String::from("production"),
5,
String::from("test"),
Some(4),
String::from("group4"),
TestStreamId::Named,
TestTopicId::Named,
))
.await;
// iggy_cmd_test
// .execute_test(TestConsumerGroupCreateCmd::new(
// 2,
// String::from("stream"),
// 3,
// String::from("topic"),
// Some(3),
// String::from("group3"),
// TestStreamId::Named,
// TestTopicId::Numeric,
// ))
// .await;
// iggy_cmd_test
// .execute_test(TestConsumerGroupCreateCmd::new(
// 4,
// String::from("development"),
// 1,
// String::from("probe"),
// Some(7),
// String::from("group7"),
// TestStreamId::Numeric,
// TestTopicId::Named,
// ))
// .await;
// iggy_cmd_test
// .execute_test(TestConsumerGroupCreateCmd::new(
// 2,
// String::from("production"),
// 5,
// String::from("test"),
// Some(4),
// String::from("group4"),
// TestStreamId::Named,
// TestTopicId::Named,
// ))
// .await;
}

#[tokio::test]
Expand Down
15 changes: 9 additions & 6 deletions integration/tests/streaming/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;

let mut messages = Vec::with_capacity(messages_count as usize);
let mut appended_messages = Vec::with_capacity(messages_count as usize);
Expand Down Expand Up @@ -119,12 +120,12 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() {
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.append_messages(appendable_batch_info, messages, None)
.await
.unwrap();
let test_timestamp = IggyTimestamp::now();
partition
.append_messages(appendable_batch_info_two, messages_two)
.append_messages(appendable_batch_info_two, messages_two, None)
.await
.unwrap();

Expand Down Expand Up @@ -183,7 +184,8 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;

let mut messages = Vec::with_capacity(messages_count as usize);
let mut appended_messages = Vec::with_capacity(messages_count as usize);
Expand Down Expand Up @@ -229,7 +231,7 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.append_messages(appendable_batch_info, messages, None)
.await
.unwrap();
assert_eq!(partition.unsaved_messages_count, 0);
Expand All @@ -249,7 +251,8 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
now,
);
)
.await;
let partition_state = PartitionState {
id: partition.partition_id,
created_at: now,
Expand Down
17 changes: 11 additions & 6 deletions integration/tests/streaming/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ async fn should_persist_partition_with_segment() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;

partition.persist().await.unwrap();

Expand Down Expand Up @@ -66,7 +67,8 @@ async fn should_load_existing_partition_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;
partition.persist().await.unwrap();
assert_persisted_partition(&partition.partition_path, with_segment).await;

Expand All @@ -85,7 +87,8 @@ async fn should_load_existing_partition_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
now,
);
)
.await;
let partition_state = PartitionState {
id: partition.partition_id,
created_at: now,
Expand Down Expand Up @@ -139,7 +142,8 @@ async fn should_delete_existing_partition_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;
partition.persist().await.unwrap();
assert_persisted_partition(&partition.partition_path, with_segment).await;

Expand Down Expand Up @@ -172,7 +176,8 @@ async fn should_purge_existing_partition_on_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;
partition.persist().await.unwrap();
assert_persisted_partition(&partition.partition_path, with_segment).await;
let messages = create_messages();
Expand All @@ -185,7 +190,7 @@ async fn should_purge_existing_partition_on_disk() {
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.append_messages(appendable_batch_info, messages, None)
.await
.unwrap();
let loaded_messages = partition.get_messages_by_offset(0, 100).await.unwrap();
Expand Down
Loading

0 comments on commit dbd5f09

Please sign in to comment.