Skip to content

Commit

Permalink
Archive tiered storage (#1053)
Browse files Browse the repository at this point in the history
Implement optional and configurable disk archiver for closed segments
(messages) and state log (storage). This is related to #290 and is the first iteration of working data
archiver e.g. for the data backups purposes or just storing the old data in general.
  • Loading branch information
spetz authored Jul 12, 2024
1 parent 0a1b1a7 commit d6e5a90
Show file tree
Hide file tree
Showing 38 changed files with 1,496 additions and 370 deletions.
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 31 additions & 6 deletions configs/server.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,29 @@
{
"data_maintenance": {
"archiver": {
"enabled": false,
"kind": "disk",
"disk": {
"path": "local_data/archive"
},
"s3": {
"key_id": "123",
"access_key": "secret",
"bucket": "iggy",
"region": "eu-west-1"
}
},
"messages": {
"archiver_enabled": false,
"cleaner_enabled": false,
"interval": "1 m"
},
"state": {
"archiver_enabled": false,
"overwrite": true,
"interval": "1 m"
}
},
"http": {
"enabled": true,
"address": "0.0.0.0:3000",
Expand Down Expand Up @@ -113,10 +138,6 @@
"enabled": true,
"size": "4 GB"
},
"retention_policy": {
"message_expiry": "none",
"max_topic_size": "10 GB"
},
"encryption": {
"enabled": false,
"key": ""
Expand All @@ -129,7 +150,9 @@
"path": "streams"
},
"topic": {
"path": "topics"
"path": "topics",
"max_size": "10 GB",
"delete_oldest_segments": false
},
"partition": {
"path": "partitions",
Expand All @@ -140,7 +163,9 @@
"segment": {
"size": "1 GB",
"cache_indexes": true,
"cache_time_indexes": true
"cache_time_indexes": true,
"message_expiry": "none",
"archive_expired": false
},
"message_deduplication": {
"enabled": false,
Expand Down
80 changes: 62 additions & 18 deletions configs/server.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,44 @@
[data_maintenance.archiver]
# Enables or disables the archiver process.
enabled = false

# Kind of archiver to use. Available options: "disk".
kind = "disk"

[data_maintenance.archiver.disk]
# Path for storing the archived data on disk.
path = "local_data/archive"

[data_maintenance.archiver.s3]
# Access key ID for the S3 bucket.
key_id = "123"
# Secret access key for the S3 bucket
access_key = "secret"
# Region of the S3 bucket.
region = "eu-west-1"
# Endpoint of the S3 bucket.
bucket = "iggy"

[data_maintenance.messages]
# Enables or disables the archiver process for closed segments containing messages.
archiver_enabled = false

# Enables or disables the expired message cleaner process.
cleaner_enabled = false

# Interval for running the message archiver and cleaner.
interval = "1 m"

[data_maintenance.state]
# Enables or disables the archiver process for state log.
archiver_enabled = false

# Sets whether the state archiver should overwrite existing log archive or always create a new one.
overwrite = true

# Interval for running the state archiver
interval = "1 m"

# HTTP server configuration
[http]
# Determines if the HTTP server is active.
Expand Down Expand Up @@ -269,24 +310,6 @@ enabled = true
# Maximum size of the cache, e.g. "4GB".
size = "4 GB"

# Data retention policy configuration.
[system.retention_policy]
# Configures the message time-based expiry setting.
# "none" means messages are kept indefinitely.
# A time value in human-readable format determines the lifespan of messages.
# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will expire after that duration.
message_expiry = "none"

# Configures the topic size-based expiry setting.
# "unlimited" or "0" means topics are kept indefinitely.
# A size value in human-readable format determines the maximum size of a topic.
# When a topic reaches this size, the oldest messages are deleted to make room for new ones.
# Messages are removed in full segments, so if segment size is 1 GB and the topic size is 10 GB,
# the oldest segment will be deleted upon reaching 10 GB.
# Example: `max_topic_size = "10 GB"` means oldest messages in topics will be deleted when they reach 10 GB.
# Note: this setting can be overwritten with CreateTopic and UpdateTopic requests.
max_topic_size = "10 GB"

# Encryption configuration
[system.encryption]
# Determines whether server-side data encryption is enabled (boolean).
Expand Down Expand Up @@ -322,6 +345,19 @@ path = "streams"
# Specifies the directory where topic data is stored, relative to `stream.path`.
path = "topics"

# Configures the topic size-based expiry setting.
# "unlimited" or "0" means topics are kept indefinitely.
# A size value in human-readable format determines the maximum size of a topic.
# When a topic reaches this size, the oldest messages are deleted to make room for new ones.
# Messages are removed in full segments, so if segment size is 1 GB and the topic size is 10 GB,
# the oldest segment will be deleted upon reaching 10 GB.
# Example: `max_topic_size = "10 GB"` means oldest messages in topics will be deleted when they reach 10 GB.
# Note: this setting can be overwritten with CreateTopic and UpdateTopic requests.
max_size = "10 GB"

# Configures whether the oldest segments are deleted when a topic reaches its maximum size (boolean).
delete_oldest_segments = false

# Partition configuration
[system.partition]
# Path for storing partition-related data (string).
Expand Down Expand Up @@ -349,6 +385,14 @@ messages_required_to_save = 10_000
# When a segment reaches this size, a new segment is created for subsequent data.
# Example: if `size` is set "1GB", the actual segment size may be 1GB + the size of remaining messages in received batch.
size = "1 GB"
# Configures the message time-based expiry setting.
# "none" means messages are kept indefinitely.
# A time value in human-readable format determines the lifespan of messages.
# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will expire after that duration.
message_expiry = "none"

# Configures whether expired segments are archived (boolean) or just deleted without archiving.
archive_expired = false

# Controls whether to cache indexes for segment access (boolean).
# `true` keeps indexes in memory, speeding up data retrieval.
Expand Down
98 changes: 98 additions & 0 deletions integration/tests/archiver/disk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use crate::archiver::ArchiverSetup;
use server::archiver::Archiver;
use server::streaming::utils::file;
use std::path::Path;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::test]
async fn should_init_base_archiver_directory() {
let setup = ArchiverSetup::init().await;
let archiver = setup.archiver();
let result = archiver.init().await;
assert!(result.is_ok());
let path = Path::new(&setup.archive_path);
assert!(path.exists());
}

#[tokio::test]
async fn should_archive_file_on_disk_by_making_a_copy_of_original_file() {
let setup = ArchiverSetup::init().await;
let archiver = setup.archiver();
let content = "hello world";
let file_to_archive_path = format!("{}/file_to_archive", setup.base_path);
create_file(&file_to_archive_path, content).await;
let files_to_archive = vec![file_to_archive_path.as_ref()];

let result = archiver.archive(&files_to_archive, None).await;
assert!(result.is_ok());
let archived_file_path = format!("{}/{}", setup.archive_path, file_to_archive_path);
assert_archived_file(&file_to_archive_path, &archived_file_path, content).await;
}

#[tokio::test]
async fn should_archive_file_on_disk_within_additional_base_directory() {
let setup = ArchiverSetup::init().await;
let archiver = setup.archiver();
let base_directory = "base";
let content = "hello world";
let file_to_archive_path = format!("{}/file_to_archive", setup.base_path);
create_file(&file_to_archive_path, content).await;
let files_to_archive = vec![file_to_archive_path.as_ref()];

let result = archiver
.archive(&files_to_archive, Some(base_directory.to_string()))
.await;
assert!(result.is_ok());
let archived_file_path = format!(
"{}/{base_directory}/{}",
setup.archive_path, file_to_archive_path
);
assert_archived_file(&file_to_archive_path, &archived_file_path, content).await;
}

#[tokio::test]
async fn should_return_true_when_file_is_archived() {
let setup = ArchiverSetup::init().await;
let archiver = setup.archiver();
let content = "hello world";
let file_to_archive_path = format!("{}/file_to_archive", setup.base_path);
create_file(&file_to_archive_path, content).await;
let files_to_archive = vec![file_to_archive_path.as_ref()];
archiver.archive(&files_to_archive, None).await.unwrap();

let is_archived = archiver.is_archived(&file_to_archive_path, None).await;
assert!(is_archived.is_ok());
assert!(is_archived.unwrap());
}

#[tokio::test]
async fn should_return_false_when_file_is_not_archived() {
let setup = ArchiverSetup::init().await;
let archiver = setup.archiver();
let content = "hello world";
let file_to_archive_path = format!("{}/file_to_archive", setup.base_path);
create_file(&file_to_archive_path, content).await;

let is_archived = archiver.is_archived(&file_to_archive_path, None).await;
assert!(is_archived.is_ok());
assert!(!is_archived.unwrap());
}

async fn create_file(path: &str, content: &str) {
let mut file = file::overwrite(path).await.unwrap();
file.write_all(content.as_bytes()).await.unwrap();
}

async fn assert_archived_file(file_to_archive_path: &str, archived_file_path: &str, content: &str) {
assert!(Path::new(&file_to_archive_path).exists());
assert!(Path::new(&archived_file_path).exists());
let archived_file = file::open(archived_file_path).await;
assert!(archived_file.is_ok());
let mut archived_file = archived_file.unwrap();
let mut archived_file_content = String::new();
archived_file
.read_to_string(&mut archived_file_content)
.await
.unwrap();
assert_eq!(content, archived_file_content);
}
40 changes: 40 additions & 0 deletions integration/tests/archiver/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use server::archiver::disk::DiskArchiver;
use server::configs::server::DiskArchiverConfig;
use tokio::fs::create_dir;
use uuid::Uuid;

mod disk;

pub struct ArchiverSetup {
base_path: String,
archive_path: String,
archiver: DiskArchiver,
}

impl ArchiverSetup {
pub async fn init() -> ArchiverSetup {
let base_path = format!("test_local_data_{}", Uuid::new_v4().to_u128_le());
let archive_path = format!("{}/archive", base_path);
let config = DiskArchiverConfig {
path: archive_path.clone(),
};
let archiver = DiskArchiver::new(config);
create_dir(&base_path).await.unwrap();

Self {
base_path,
archive_path,
archiver,
}
}

pub fn archiver(&self) -> &DiskArchiver {
&self.archiver
}
}

impl Drop for ArchiverSetup {
fn drop(&mut self) {
std::fs::remove_dir_all(&self.base_path).unwrap();
}
}
4 changes: 2 additions & 2 deletions integration/tests/config_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn validate_custom_env_provider() {
"IGGY_MESSAGE_SAVER_ENABLED",
expected_message_saver_enabled.to_string(),
);
env::set_var("IGGY_SYSTEM_RETENTION_POLICY_MESSAGE_EXPIRY", "10s");
env::set_var("IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY", "10s");

let config_path = get_root_path().join("../configs/server.toml");
let file_config_provider = FileConfigProvider::new(config_path.as_path().display().to_string());
Expand All @@ -77,7 +77,7 @@ async fn validate_custom_env_provider() {
assert_eq!(config.tcp.enabled.to_string(), expected_tcp_enabled);
assert_eq!(config.message_saver.enabled, expected_message_saver_enabled);
assert_eq!(
config.system.retention_policy.message_expiry.to_string(),
config.system.segment.message_expiry.to_string(),
expected_message_expiry
);

Expand Down
1 change: 1 addition & 0 deletions integration/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod archiver;
mod bench;
mod cli;
mod config_provider;
Expand Down
1 change: 1 addition & 0 deletions integration/tests/streaming/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ async fn given_all_expired_messages_segment_should_be_expired() {

segment.persist_messages().await.unwrap();

segment.is_closed = true;
let is_expired = segment.is_expired(now).await;
assert!(is_expired);
}
Expand Down
Loading

0 comments on commit d6e5a90

Please sign in to comment.