Consolidate Index and TimeIndex into single Index
hubcio committed Oct 27, 2024
1 parent 4396aef commit 871229c
Showing 44 changed files with 248 additions and 1,288 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion configs/server.json
@@ -184,7 +184,6 @@
"segment": {
"size": "1 GB",
"cache_indexes": true,
"cache_time_indexes": true,
"message_expiry": "none",
"archive_expired": false
},
7 changes: 1 addition & 6 deletions configs/server.toml
@@ -437,16 +437,11 @@ message_expiry = "none"
# Configures whether expired segments are archived (boolean) or just deleted without archiving.
archive_expired = false

# Controls whether to cache indexes for segment access (boolean).
# Controls whether to cache indexes (time and positional) for segment access (boolean).
# `true` keeps indexes in memory, speeding up data retrieval.
# `false` reads indexes from disk, which can conserve memory at the cost of access speed.
cache_indexes = true

# Determines whether to cache time-based indexes for segments (boolean).
# `true` allows faster timestamp-based data retrieval by keeping indexes in memory.
# `false` conserves memory by reading time indexes from disk, which may slow down access.
cache_time_indexes = true

# Message deduplication configuration
[system.message_deduplication]
# Controls whether message deduplication is enabled (boolean).
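With the two files merged, a single `cache_indexes` flag is enough because every cached entry now carries both the byte position and the timestamp. A minimal sketch of that consolidated entry, with struct and field names assumed for illustration (the converter added later in this commit writes exactly a u32 offset, u32 position, and u64 timestamp per entry):

// Hypothetical shape of one consolidated index entry; names are illustrative.
// On disk it is three little-endian fields totaling 16 bytes.
struct Index {
    relative_offset: u32, // message offset relative to the segment start
    position: u32,        // byte position of the message in the .log file
    timestamp: u64,       // message timestamp, enabling time-based lookups
}

const INDEX_ENTRY_SIZE: u64 = 4 + 4 + 8; // presumably what segments::storage::INDEX_SIZE encodes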
4 changes: 1 addition & 3 deletions integration/tests/streaming/partition.rs
@@ -5,7 +5,7 @@ use iggy::utils::timestamp::IggyTimestamp;
use server::state::system::PartitionState;
use server::streaming::batching::appendable_batch_info::AppendableBatchInfo;
use server::streaming::partitions::partition::Partition;
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION, TIME_INDEX_EXTENSION};
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION};
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::Arc;
use tokio::fs;
@@ -202,10 +202,8 @@ async fn assert_persisted_partition(partition_path: &str, with_segment: bool) {
let segment_path = format!("{}/{:0>20}", partition_path, start_offset);
let log_path = format!("{}.{}", segment_path, LOG_EXTENSION);
let index_path = format!("{}.{}", segment_path, INDEX_EXTENSION);
let time_index_path = format!("{}.{}", segment_path, TIME_INDEX_EXTENSION);
assert!(fs::metadata(&log_path).await.is_ok());
assert!(fs::metadata(&index_path).await.is_ok());
assert!(fs::metadata(&time_index_path).await.is_ok());
}
}

5 changes: 1 addition & 4 deletions integration/tests/streaming/segment.rs
@@ -6,7 +6,7 @@ use iggy::utils::expiry::IggyExpiry;
use iggy::utils::{checksum, timestamp::IggyTimestamp};
use server::streaming::models::messages::RetainedMessage;
use server::streaming::segments::segment;
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION, TIME_INDEX_EXTENSION};
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION};
use server::streaming::sizeable::Sizeable;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
@@ -111,7 +111,6 @@ async fn should_load_existing_segment_from_disk() {
assert_eq!(loaded_segment.is_closed, segment.is_closed);
assert_eq!(loaded_segment.log_path, segment.log_path);
assert_eq!(loaded_segment.index_path, segment.index_path);
assert_eq!(loaded_segment.time_index_path, segment.time_index_path);
assert!(loaded_messages.is_empty());
}
}
@@ -353,10 +352,8 @@ async fn assert_persisted_segment(partition_path: &str, start_offset: u64) {
let segment_path = format!("{}/{:0>20}", partition_path, start_offset);
let log_path = format!("{}.{}", segment_path, LOG_EXTENSION);
let index_path = format!("{}.{}", segment_path, INDEX_EXTENSION);
let time_index_path = format!("{}.{}", segment_path, TIME_INDEX_EXTENSION);
assert!(fs::metadata(&log_path).await.is_ok());
assert!(fs::metadata(&index_path).await.is_ok());
assert!(fs::metadata(&time_index_path).await.is_ok());
}

fn create_message(offset: u64, payload: &str, timestamp: IggyTimestamp) -> PolledMessage {
6 changes: 6 additions & 0 deletions sdk/src/utils/duration.rs
@@ -27,6 +27,12 @@ impl IggyDuration {
IggyDuration { duration }
}

pub fn new_from_secs(secs: u64) -> IggyDuration {
IggyDuration {
duration: Duration::from_secs(secs),
}
}

pub fn as_human_time_string(&self) -> String {
format!("{}", format_duration(self.duration))
}
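A quick usage sketch for the new `new_from_secs` constructor; the module path is inferred from the `iggy::utils` imports seen in the test files above, and the rendered string assumes the existing humantime-based formatting:

use iggy::utils::duration::IggyDuration;

let retention = IggyDuration::new_from_secs(90);
// humantime renders 90 seconds as "1m 30s"
assert_eq!(retention.as_human_time_string(), "1m 30s");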
30 changes: 23 additions & 7 deletions server/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.60"
version = "0.4.61"
edition = "2021"
build = "src/build.rs"

@@ -12,7 +12,6 @@ tokio-console = ["dep:console-subscriber", "tokio/tracing"]
[dependencies]
ahash = { version = "0.8.11" }
anyhow = "1.0.86"
async-stream = "0.3.5"
async-trait = "0.1.82"
atone = "0.3.7"
axum = "0.7.5"
@@ -36,13 +35,29 @@ moka = { version = "0.12.5", features = ["future"] }
openssl = { version = "0.10.66", features = ["vendored"] }
opentelemetry = { version = "0.26.0", features = ["trace", "logs"] }
opentelemetry-appender-tracing = { version = "0.26.0", features = ["log"] }
opentelemetry-otlp = { version = "0.26.0", features = ["logs", "trace", "grpc-tonic", "http", "http-proto", "reqwest-client", "tokio"] }
opentelemetry-otlp = { version = "0.26.0", features = [
"logs",
"trace",
"grpc-tonic",
"http",
"http-proto",
"reqwest-client",
"tokio",
] }
opentelemetry-semantic-conventions = { version = "0.26.0" }
opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio", "logs", "trace", "tokio"] }
opentelemetry_sdk = { version = "0.26.0", features = [
"rt-tokio",
"logs",
"trace",
"tokio",
] }
prometheus-client = "0.22.2"
quinn = { version = "0.11.5" }
rcgen = "0.13.1"
reqwest = { version = "0.12.4", features = ["rustls-tls", "rustls-tls-no-provider"] }
reqwest = { version = "0.12.4", features = [
"rustls-tls",
"rustls-tls-no-provider",
] }
ring = "0.17.8"
rmp-serde = "1.3.0"
rust-s3 = { version = "0.34.0", features = ["default"] }
@@ -79,10 +94,11 @@ tikv-jemallocator = { version = "0.6", optional = true }
[build-dependencies]
figment = { version = "0.10.18", features = ["json", "toml", "env"] }
serde_json = "1.0.127"
vergen-git2 = { version = "1.0.0", features = ["build",
vergen-git2 = { version = "1.0.0", features = [
"build",
"cargo",
"rustc",
"si"
"si",
] }

[[bin]]
6 changes: 1 addition & 5 deletions server/src/channels/commands/maintain_messages.rs
@@ -439,11 +439,7 @@ async fn archive_segments(
}

let segment = segment.unwrap();
let files = [
segment.index_path.as_ref(),
segment.time_index_path.as_ref(),
segment.log_path.as_ref(),
];
let files = [segment.index_path.as_ref(), segment.log_path.as_ref()];
if let Err(error) = archiver.archive(&files, None).await {
error!(
"Failed to archive segment with start offset: {} for stream ID: {}, topic ID: {}, partition ID: {}. Error: {}",
2 changes: 1 addition & 1 deletion server/src/channels/commands/print_sysinfo.rs
@@ -60,7 +60,7 @@ impl ServerCommand<SysInfoPrintCommand> for SysInfoPrintExecutor {
/ stats.total_memory.as_bytes_u64() as f64)
* 100f64;

info!("CPU: {:.2}% / {:.2}% (IggyUsage/Total), Mem: {:.2}% / {} / {} / {} (Free/IggyUsage/TotalUsed/Total), Clients: {}, Messages processed: {}, Read: {}, Written: {}, Run Time: {} s",
info!("CPU: {:.2}% / {:.2}% (IggyUsage/Total), Mem: {:.2}% / {} / {} / {} (Free/IggyUsage/TotalUsed/Total), Clients: {}, Messages processed: {}, Read: {}, Written: {}, Uptime: {}",
stats.cpu_usage,
stats.total_cpu_usage,
free_memory_percent,
94 changes: 94 additions & 0 deletions server/src/compat/index_conversion/index_converter.rs
@@ -0,0 +1,94 @@
use crate::streaming::utils::file;
use crate::{server_error::ServerError, streaming::segments::storage::INDEX_SIZE};
use bytes::{BufMut, BytesMut};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use tracing::trace;

pub struct IndexConverter {
pub index_path: String,
pub time_index_path: String,
}

impl IndexConverter {
pub fn new(index_path: String, time_index_path: String) -> Self {
Self {
index_path,
time_index_path,
}
}

pub async fn migrate(&self) -> Result<(), ServerError> {
let indexes = self.convert_indexes().await?;
self.replace_with_converted(indexes).await?;

Ok(())
}

pub async fn needs_migration(&self) -> Result<bool, ServerError> {
Ok(file::exists(&self.time_index_path).await?)
}

async fn convert_indexes(&self) -> Result<Vec<u8>, ServerError> {
let index_file = file::open(&self.index_path).await?;
let time_index_file = file::open(&self.time_index_path).await?;

let mut index_reader = BufReader::new(index_file);
let mut time_index_reader = BufReader::new(time_index_file);

let new_index = Vec::new();
let mut new_index_writer = BufWriter::new(new_index);

loop {
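// Each legacy index entry is 8 bytes (u32 offset + u32 position) and each legacy
// time index entry is 12 bytes (u32 offset + u64 timestamp); read both in lockstep.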
let relative_offset_result = index_reader.read_u32_le().await;
let position_result = index_reader.read_u32_le().await;

let time_relative_offset_result = time_index_reader.read_u32_le().await;
let timestamp_result = time_index_reader.read_u64_le().await;

if relative_offset_result.is_err()
|| position_result.is_err()
|| time_relative_offset_result.is_err()
|| timestamp_result.is_err()
{
trace!("Reached EOF for index file: {}", &self.index_path);
break;
}

let relative_offset = relative_offset_result?;
let position = position_result?;
let time_relative_offset = time_relative_offset_result?;
let timestamp = timestamp_result?;

if relative_offset != time_relative_offset {
return Err(ServerError::IndexMigrationError(format!(
"Mismatched relative offsets in normal index: {relative_offset} vs time index: {time_relative_offset}"
)));
}

let mut new_index_entry = BytesMut::with_capacity(INDEX_SIZE as usize); // u32 + u32 + u64
new_index_entry.put_u32_le(relative_offset);
new_index_entry.put_u32_le(position);
new_index_entry.put_u64_le(timestamp);

new_index_writer.write_all(&new_index_entry).await?;
}
new_index_writer.flush().await?;

Ok(new_index_writer.into_inner())
}

async fn replace_with_converted(&self, indexes: Vec<u8>) -> Result<(), ServerError> {
let _ = file::remove(&self.index_path).await;
let _ = file::remove(&self.time_index_path).await;

let new_index_file = file::overwrite(&self.index_path).await?;

let mut new_index_writer = BufWriter::new(new_index_file);
new_index_writer.write_all(&indexes).await?;
new_index_writer.flush().await?;

trace!("Replaced old index with new index at {}", &self.index_path);

Ok(())
}
}
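A sketch of how the converter might be wired into segment loading; the call site, helper name, and file extensions are assumptions for illustration rather than the exact integration in this commit:

// Assumed imports, following the module layout added in this commit.
use crate::compat::index_conversion::index_converter::IndexConverter;
use crate::server_error::ServerError;

// Hypothetical helper: run the one-off migration before a segment's index is loaded.
async fn maybe_migrate_segment_indexes(segment_path: &str) -> Result<(), ServerError> {
    // Extensions assumed here; the real constants live in segments::segment.
    let index_path = format!("{segment_path}.index");
    let time_index_path = format!("{segment_path}.timeindex");

    let converter = IndexConverter::new(index_path, time_index_path);
    // Only segments that still have a legacy time index need the rewrite;
    // migrate() merges both files into a single index and removes the time index.
    if converter.needs_migration().await? {
        converter.migrate().await?;
    }
    Ok(())
}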
1 change: 1 addition & 0 deletions server/src/compat/index_conversion/mod.rs
@@ -0,0 +1 @@
pub mod index_converter;
5 changes: 0 additions & 5 deletions server/src/compat/message_conversion/binary_schema.rs

This file was deleted.

18 changes: 0 additions & 18 deletions server/src/compat/message_conversion/chunks_error.rs

This file was deleted.

