diff --git a/Cargo.lock b/Cargo.lock index c4fbb03a7..b99ac85f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2095,7 +2095,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.6.33" +version = "0.6.34" dependencies = [ "aes-gcm", "ahash 0.8.11", @@ -4098,7 +4098,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.71" +version = "0.4.72" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/bench/src/benchmark_result.rs b/bench/src/benchmark_result.rs index 1c1d65314..cd890aabb 100644 --- a/bench/src/benchmark_result.rs +++ b/bench/src/benchmark_result.rs @@ -1,5 +1,6 @@ use crate::args::simple::BenchmarkKind; use colored::Colorize; +use iggy::utils::byte_size::IggyByteSize; use std::collections::HashSet; use std::{ fmt::{Display, Formatter}, @@ -14,7 +15,7 @@ pub struct BenchmarkResult { pub end_timestamp: Instant, pub average_latency: Duration, pub latency_percentiles: LatencyPercentiles, - pub total_size_bytes: u64, + pub total_size_bytes: IggyByteSize, pub total_messages: u64, } @@ -80,7 +81,7 @@ impl BenchmarkResults { .iter() .filter(&mut predicate) .map(|r| r.total_size_bytes) - .sum::(); + .sum::(); let total_duration = (self .results .iter() @@ -149,9 +150,11 @@ impl BenchmarkResults { / self.results.len() as u32) .as_secs_f64() * 1000.0; - let average_throughput = - total_size_bytes as f64 / total_duration / 1e6 / self.results.len() as f64; - let total_throughput = total_size_bytes as f64 / total_duration / 1e6; + let average_throughput = total_size_bytes.as_bytes_u64() as f64 + / total_duration + / 1e6 + / self.results.len() as f64; + let total_throughput = total_size_bytes.as_bytes_u64() as f64 / total_duration / 1e6; let messages_per_second = total_messages as f64 / total_duration; BenchmarkStatistics { diff --git a/bench/src/consumer.rs b/bench/src/consumer.rs index 6e80dcbc5..7983506e2 100644 --- a/bench/src/consumer.rs +++ b/bench/src/consumer.rs @@ -5,7 +5,9 @@ use iggy::clients::client::IggyClient; use iggy::consumer::Consumer as IggyConsumer; use iggy::error::IggyError; use iggy::messages::poll_messages::PollingStrategy; +use iggy::utils::byte_size::IggyByteSize; use iggy::utils::duration::IggyDuration; +use iggy::utils::sizeable::Sizeable; use integration::test_server::{login_root, ClientFactory}; use std::sync::Arc; use std::time::Duration; @@ -73,7 +75,7 @@ impl Consumer { }; let mut latencies: Vec = Vec::with_capacity(self.message_batches as usize); - let mut total_size_bytes = 0; + let mut total_size_bytes = IggyByteSize::default(); let mut current_iteration: u64 = 0; let mut received_messages = 0; let mut topic_not_found_counter = 0; @@ -187,7 +189,7 @@ impl Consumer { latencies.push(latency_end); received_messages += polled_messages.messages.len() as u64; for message in polled_messages.messages { - total_size_bytes += message.get_size_bytes() as u64; + total_size_bytes += message.get_size_bytes(); } current_iteration += 1; } @@ -210,24 +212,25 @@ impl Consumer { let duration = end_timestamp - start_timestamp; let average_latency: Duration = latencies.iter().sum::() / latencies.len() as u32; - let average_throughput = total_size_bytes as f64 / duration.as_secs_f64() / 1e6; + let average_throughput = + total_size_bytes.as_bytes_u64() as f64 / duration.as_secs_f64() / 1e6; info!( - "Consumer #{} → polled {} messages {} batches of {} messages in {:.2} s, total size: {} bytes, average throughput: {:.2} MB/s, p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, average latency: {:.2} ms", - self.consumer_id, - total_messages, - self.message_batches, - self.messages_per_batch, - duration.as_secs_f64(), - total_size_bytes, - average_throughput, - p50.as_secs_f64() * 1000.0, - p90.as_secs_f64() * 1000.0, - p95.as_secs_f64() * 1000.0, - p99.as_secs_f64() * 1000.0, - p999.as_secs_f64() * 1000.0, - average_latency.as_secs_f64() * 1000.0 - ); + "Consumer #{} → polled {} messages {} batches of {} messages in {:.2} s, total size: {}, average throughput: {:.2} MB/s, p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, average latency: {:.2} ms", + self.consumer_id, + total_messages, + self.message_batches, + self.messages_per_batch, + duration.as_secs_f64(), + total_size_bytes.as_human_string(), + average_throughput, + p50.as_secs_f64() * 1000.0, + p90.as_secs_f64() * 1000.0, + p95.as_secs_f64() * 1000.0, + p99.as_secs_f64() * 1000.0, + p999.as_secs_f64() * 1000.0, + average_latency.as_secs_f64() * 1000.0 + ); Ok(BenchmarkResult { kind: BenchmarkKind::Poll, diff --git a/bench/src/producer.rs b/bench/src/producer.rs index 65fa01c03..9d0f44a87 100644 --- a/bench/src/producer.rs +++ b/bench/src/producer.rs @@ -4,6 +4,7 @@ use iggy::client::MessageClient; use iggy::clients::client::IggyClient; use iggy::error::IggyError; use iggy::messages::send_messages::{Message, Partitioning}; +use iggy::utils::byte_size::IggyByteSize; use iggy::utils::duration::IggyDuration; use integration::test_server::{login_root, ClientFactory}; use std::str::FromStr; @@ -117,25 +118,26 @@ impl Producer { let duration = end_timestamp - start_timestamp; let average_latency: Duration = latencies.iter().sum::() / latencies.len() as u32; - let total_size_bytes = total_messages * self.message_size as u64; - let average_throughput = total_size_bytes as f64 / duration.as_secs_f64() / 1e6; + let total_size_bytes = IggyByteSize::from(total_messages * self.message_size as u64); + let average_throughput = + total_size_bytes.as_bytes_u64() as f64 / duration.as_secs_f64() / 1e6; info!( - "Producer #{} → sent {} messages in {} batches of {} messages in {:.2} s, total size: {} bytes, average throughput: {:.2} MB/s, p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, average latency: {:.2} ms", - self.producer_id, - total_messages, - self.message_batches, - self.messages_per_batch, - duration.as_secs_f64(), - total_size_bytes, - average_throughput, - p50.as_secs_f64() * 1000.0, - p90.as_secs_f64() * 1000.0, - p95.as_secs_f64() * 1000.0, - p99.as_secs_f64() * 1000.0, - p999.as_secs_f64() * 1000.0, - average_latency.as_secs_f64() * 1000.0 - ); + "Producer #{} → sent {} messages in {} batches of {} messages in {:.2} s, total size: {}, average throughput: {:.2} MB/s, p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, average latency: {:.2} ms", + self.producer_id, + total_messages, + self.message_batches, + self.messages_per_batch, + duration.as_secs_f64(), + total_size_bytes.as_human_string(), + average_throughput, + p50.as_secs_f64() * 1000.0, + p90.as_secs_f64() * 1000.0, + p95.as_secs_f64() * 1000.0, + p99.as_secs_f64() * 1000.0, + p999.as_secs_f64() * 1000.0, + average_latency.as_secs_f64() * 1000.0 + ); Ok(BenchmarkResult { kind: BenchmarkKind::Send, diff --git a/integration/tests/cli/message/test_message_poll_to_file_command.rs b/integration/tests/cli/message/test_message_poll_to_file_command.rs index fdc72728f..88635fb07 100644 --- a/integration/tests/cli/message/test_message_poll_to_file_command.rs +++ b/integration/tests/cli/message/test_message_poll_to_file_command.rs @@ -145,7 +145,7 @@ impl IggyCmdTestCase for TestMessagePollToFileCmd<'_> { self.topic_name, self.stream_name, self.topic_name, self.stream_name); let message_file = format!("Storing messages to {} binary file", self.output_file); let message_count = format!( - "Stored {} of total size [0-9]+ B to {} binary file", + "Stored {} of total size [0-9.]+ K?B to {} binary file", match self.message_count { 1 => "1 message".into(), _ => format!("{} messages", self.message_count), diff --git a/integration/tests/streaming/messages.rs b/integration/tests/streaming/messages.rs index 1e198cf3f..7c78fcc18 100644 --- a/integration/tests/streaming/messages.rs +++ b/integration/tests/streaming/messages.rs @@ -3,7 +3,9 @@ use bytes::Bytes; use iggy::bytes_serializable::BytesSerializable; use iggy::messages::send_messages::Message; use iggy::models::header::{HeaderKey, HeaderValue}; +use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; +use iggy::utils::sizeable::Sizeable; use iggy::utils::timestamp::IggyTimestamp; use server::configs::system::{PartitionConfig, SystemConfig}; use server::state::system::PartitionState; @@ -103,14 +105,17 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() { setup.create_partitions_directory(stream_id, topic_id).await; partition.persist().await.unwrap(); let appendable_batch_info = AppendableBatchInfo::new( - messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(), + messages + .iter() + .map(|msg| msg.get_size_bytes()) + .sum::(), partition.partition_id, ); let appendable_batch_info_two = AppendableBatchInfo::new( messages_two .iter() - .map(|msg| msg.get_size_bytes() as u64) - .sum(), + .map(|msg| msg.get_size_bytes()) + .sum::(), partition.partition_id, ); partition @@ -218,7 +223,10 @@ async fn should_persist_messages_and_then_load_them_from_disk() { setup.create_partitions_directory(stream_id, topic_id).await; partition.persist().await.unwrap(); let appendable_batch_info = AppendableBatchInfo::new( - messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(), + messages + .iter() + .map(|msg| msg.get_size_bytes()) + .sum::(), partition.partition_id, ); partition diff --git a/integration/tests/streaming/partition.rs b/integration/tests/streaming/partition.rs index 59dbe6e49..48a68d1fe 100644 --- a/integration/tests/streaming/partition.rs +++ b/integration/tests/streaming/partition.rs @@ -1,6 +1,8 @@ use crate::streaming::common::test_setup::TestSetup; use crate::streaming::create_messages; +use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; +use iggy::utils::sizeable::Sizeable; use iggy::utils::timestamp::IggyTimestamp; use server::state::system::PartitionState; use server::streaming::batching::appendable_batch_info::AppendableBatchInfo; @@ -176,7 +178,10 @@ async fn should_purge_existing_partition_on_disk() { let messages = create_messages(); let messages_count = messages.len(); let appendable_batch_info = AppendableBatchInfo::new( - messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(), + messages + .iter() + .map(|msg| msg.get_size_bytes()) + .sum::(), partition.partition_id, ); partition diff --git a/integration/tests/streaming/segment.rs b/integration/tests/streaming/segment.rs index 70faa72e8..1ea614f41 100644 --- a/integration/tests/streaming/segment.rs +++ b/integration/tests/streaming/segment.rs @@ -2,12 +2,13 @@ use crate::streaming::common::test_setup::TestSetup; use bytes::Bytes; use iggy::bytes_serializable::BytesSerializable; use iggy::models::messages::{MessageState, PolledMessage}; +use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; use iggy::utils::{checksum, timestamp::IggyTimestamp}; +use server::streaming::local_sizeable::LocalSizeable; use server::streaming::models::messages::RetainedMessage; use server::streaming::segments::segment; use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION}; -use server::streaming::sizeable::Sizeable; use std::sync::atomic::AtomicU64; use std::sync::Arc; use tokio::fs; @@ -151,7 +152,7 @@ async fn should_persist_and_load_segment_with_messages() { .await; let messages_count = 10; let mut messages = Vec::new(); - let mut batch_size = 0u64; + let mut batch_size = IggyByteSize::default(); for i in 0..messages_count { let message = create_message(i, "test", IggyTimestamp::now()); @@ -164,7 +165,7 @@ async fn should_persist_and_load_segment_with_messages() { headers: message.headers.map(|headers| headers.to_bytes()), payload: message.payload.clone(), }); - batch_size += retained_message.get_size_bytes() as u64; + batch_size += retained_message.get_size_bytes(); messages.push(retained_message); } @@ -235,7 +236,7 @@ async fn given_all_expired_messages_segment_should_be_expired() { let messages_count = 10; let now = IggyTimestamp::now(); let mut expired_timestamp = (now.as_micros() - 2 * message_expiry_ms).into(); - let mut batch_size = 0u64; + let mut batch_size = IggyByteSize::default(); let mut messages = Vec::new(); for i in 0..messages_count { let message = create_message(i, "test", expired_timestamp); @@ -250,7 +251,7 @@ async fn given_all_expired_messages_segment_should_be_expired() { headers: message.headers.map(|headers| headers.to_bytes()), payload: message.payload.clone(), }); - batch_size += retained_message.get_size_bytes() as u64; + batch_size += retained_message.get_size_bytes(); messages.push(retained_message); } segment @@ -316,7 +317,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired() payload: expired_message.payload.clone(), }); let mut expired_messages = Vec::new(); - let expired_message_size = expired_retained_message.get_size_bytes() as u64; + let expired_message_size = expired_retained_message.get_size_bytes(); expired_messages.push(expired_retained_message); let mut not_expired_messages = Vec::new(); @@ -331,7 +332,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired() .map(|headers| headers.to_bytes()), payload: not_expired_message.payload.clone(), }); - let not_expired_message_size = not_expired_retained_message.get_size_bytes() as u64; + let not_expired_message_size = not_expired_retained_message.get_size_bytes(); not_expired_messages.push(not_expired_retained_message); segment diff --git a/integration/tests/streaming/stream.rs b/integration/tests/streaming/stream.rs index 8e2742991..309972d98 100644 --- a/integration/tests/streaming/stream.rs +++ b/integration/tests/streaming/stream.rs @@ -3,7 +3,9 @@ use crate::streaming::create_messages; use iggy::identifier::Identifier; use iggy::messages::poll_messages::PollingStrategy; use iggy::messages::send_messages::Partitioning; +use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; +use iggy::utils::sizeable::Sizeable; use iggy::utils::timestamp::IggyTimestamp; use iggy::utils::topic_size::MaxTopicSize; use server::state::system::StreamState; @@ -127,7 +129,10 @@ async fn should_purge_existing_stream_on_disk() { let topic = stream .get_topic(&Identifier::numeric(topic_id).unwrap()) .unwrap(); - let batch_size = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(); + let batch_size = messages + .iter() + .map(|msg| msg.get_size_bytes()) + .sum::(); topic .append_messages(batch_size, Partitioning::partition_id(1), messages) .await diff --git a/integration/tests/streaming/topic.rs b/integration/tests/streaming/topic.rs index 2d24a7636..6e2ee0f80 100644 --- a/integration/tests/streaming/topic.rs +++ b/integration/tests/streaming/topic.rs @@ -8,7 +8,9 @@ use crate::streaming::create_messages; use iggy::compression::compression_algorithm::CompressionAlgorithm; use iggy::messages::poll_messages::PollingStrategy; use iggy::messages::send_messages::Partitioning; +use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; +use iggy::utils::sizeable::Sizeable; use iggy::utils::timestamp::IggyTimestamp; use iggy::utils::topic_size::MaxTopicSize; use server::state::system::{PartitionState, TopicState}; @@ -203,7 +205,10 @@ async fn should_purge_existing_topic_on_disk() { let messages = create_messages(); let messages_count = messages.len(); - let batch_size = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(); + let batch_size = messages + .iter() + .map(|msg| msg.get_size_bytes()) + .sum::(); topic .append_messages(batch_size, Partitioning::partition_id(1), messages) .await diff --git a/integration/tests/streaming/topic_messages.rs b/integration/tests/streaming/topic_messages.rs index 678626c1d..50e7a26c7 100644 --- a/integration/tests/streaming/topic_messages.rs +++ b/integration/tests/streaming/topic_messages.rs @@ -5,6 +5,7 @@ use iggy::messages::poll_messages::PollingStrategy; use iggy::messages::send_messages::{Message, Partitioning}; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; +use iggy::utils::sizeable::Sizeable; use iggy::utils::topic_size::MaxTopicSize; use server::configs::resource_quota::MemoryResourceQuota; use server::configs::system::{CacheConfig, SystemConfig}; @@ -78,7 +79,10 @@ async fn assert_polling_messages(cache: CacheConfig, expect_enabled_cache: bool) from_utf8(&message.payload).unwrap(), )) } - let batch_size = messages.iter().map(|m| m.get_size_bytes() as u64).sum(); + let batch_size = messages + .iter() + .map(|m| m.get_size_bytes()) + .sum::(); topic .append_messages(batch_size, partitioning, messages) .await @@ -119,7 +123,7 @@ async fn given_key_none_messages_should_be_appended_to_the_next_partition_using_ let partitioning = Partitioning::balanced(); for i in 1..=partitions_count * messages_per_partition_count { let payload = get_payload(i); - let batch_size = 16 + 4 + payload.len() as u64; + let batch_size = IggyByteSize::from(16 + 4 + payload.len() as u64); topic .append_messages( batch_size, @@ -144,7 +148,7 @@ async fn given_key_partition_id_messages_should_be_appended_to_the_chosen_partit let partitioning = Partitioning::partition_id(partition_id); for i in 1..=partitions_count * messages_per_partition_count { let payload = get_payload(i); - let batch_size = 16 + 4 + payload.len() as u64; + let batch_size = IggyByteSize::from(16 + 4 + payload.len() as u64); topic .append_messages( batch_size, @@ -173,7 +177,7 @@ async fn given_key_messages_key_messages_should_be_appended_to_the_calculated_pa for entity_id in 1..=partitions_count * messages_count { let payload = get_payload(entity_id); let partitioning = Partitioning::messages_key_u32(entity_id); - let batch_size = 16 + 4 + payload.len() as u64; + let batch_size = IggyByteSize::from(16 + 4 + payload.len() as u64); topic .append_messages( batch_size, diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 0cb2ecf21..a9451b5f0 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.6.33" +version = "0.6.34" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "MIT" diff --git a/sdk/src/binary/mapper.rs b/sdk/src/binary/mapper.rs index dbc20cca0..c1636a8c7 100644 --- a/sdk/src/binary/mapper.rs +++ b/sdk/src/binary/mapper.rs @@ -276,7 +276,7 @@ pub fn map_polled_messages(payload: Bytes) -> Result checksum, id, headers, - length: message_length, + length: IggyByteSize::from(message_length as u64), payload: Bytes::from(payload), }); diff --git a/sdk/src/cli/message/poll_messages.rs b/sdk/src/cli/message/poll_messages.rs index 510b82ea3..4b978d6d8 100644 --- a/sdk/src/cli/message/poll_messages.rs +++ b/sdk/src/cli/message/poll_messages.rs @@ -7,6 +7,7 @@ use crate::messages::poll_messages::{PollMessages, PollingStrategy}; use crate::messages::send_messages::Message; use crate::models::header::{HeaderKey, HeaderKind}; use crate::models::messages::PolledMessages; +use crate::utils::sizeable::Sizeable; use crate::utils::timestamp::IggyTimestamp; use crate::utils::{byte_size::IggyByteSize, duration::IggyDuration}; use anyhow::Context; @@ -192,7 +193,7 @@ impl CliCommand for PollMessagesCmd { if let Some(output_file) = &self.output_file { event!(target: PRINT_TARGET, Level::INFO, "Storing messages to {output_file} binary file"); - let mut saved_size = 0; + let mut saved_size = IggyByteSize::default(); let mut file = tokio::fs::OpenOptions::new() .append(true) .create(true) @@ -213,7 +214,8 @@ impl CliCommand for PollMessagesCmd { .with_context(|| format!("Problem writing message to file: {output_file}"))?; } - event!(target: PRINT_TARGET, Level::INFO, "Stored {message_count_message} of total size {saved_size} B to {output_file} binary file"); + let saved_size_str = saved_size.as_human_string(); + event!(target: PRINT_TARGET, Level::INFO, "Stored {message_count_message} of total size {saved_size_str} to {output_file} binary file"); } else { let message_header_keys = self.create_message_header_keys(&messages); diff --git a/sdk/src/cli/message/send_messages.rs b/sdk/src/cli/message/send_messages.rs index 588c78dfe..68e4d1921 100644 --- a/sdk/src/cli/message/send_messages.rs +++ b/sdk/src/cli/message/send_messages.rs @@ -4,6 +4,7 @@ use crate::client::Client; use crate::identifier::Identifier; use crate::messages::send_messages::{Message, Partitioning}; use crate::models::header::{HeaderKey, HeaderValue}; +use crate::utils::sizeable::Sizeable; use anyhow::Context; use async_trait::async_trait; use bytes::Bytes; @@ -103,7 +104,7 @@ impl CliCommand for SendMessagesCmd { let message = Message::from_bytes(message_bytes); match message { Ok(message) => { - let message_size = message.get_size_bytes() as usize; + let message_size = message.get_size_bytes().as_bytes_usize(); messages.push(message); bytes_read += message_size; } diff --git a/sdk/src/clients/client.rs b/sdk/src/clients/client.rs index 8fae3b866..06481a7a0 100644 --- a/sdk/src/clients/client.rs +++ b/sdk/src/clients/client.rs @@ -30,6 +30,7 @@ use crate::models::user_status::UserStatus; use crate::partitioner::Partitioner; use crate::snapshot::{SnapshotCompression, SystemSnapshotType}; use crate::tcp::client::TcpClient; +use crate::utils::byte_size::IggyByteSize; use crate::utils::crypto::Encryptor; use crate::utils::duration::IggyDuration; use crate::utils::expiry::IggyExpiry; @@ -566,7 +567,7 @@ impl MessageClient for IggyClient { for message in &mut polled_messages.messages { let payload = encryptor.decrypt(&message.payload)?; message.payload = Bytes::from(payload); - message.length = message.payload.len() as u32; + message.length = IggyByteSize::from(message.payload.len() as u64); } } diff --git a/sdk/src/clients/consumer.rs b/sdk/src/clients/consumer.rs index cf99cf407..f01308215 100644 --- a/sdk/src/clients/consumer.rs +++ b/sdk/src/clients/consumer.rs @@ -6,6 +6,7 @@ use crate::identifier::{IdKind, Identifier}; use crate::locking::{IggySharedMut, IggySharedMutFn}; use crate::messages::poll_messages::{PollingKind, PollingStrategy}; use crate::models::messages::{PolledMessage, PolledMessages}; +use crate::utils::byte_size::IggyByteSize; use crate::utils::crypto::Encryptor; use crate::utils::duration::IggyDuration; use crate::utils::timestamp::IggyTimestamp; @@ -761,7 +762,7 @@ impl Stream for IggyConsumer { let payload = payload.unwrap(); message.payload = Bytes::from(payload); - message.length = message.payload.len() as u32; + message.length = IggyByteSize::from(message.payload.len() as u64); } } diff --git a/sdk/src/consumer_groups/create_consumer_group.rs b/sdk/src/consumer_groups/create_consumer_group.rs index e6abd22ae..a449e5c1e 100644 --- a/sdk/src/consumer_groups/create_consumer_group.rs +++ b/sdk/src/consumer_groups/create_consumer_group.rs @@ -3,6 +3,7 @@ use crate::command::{Command, CREATE_CONSUMER_GROUP_CODE}; use crate::consumer_groups::MAX_NAME_LENGTH; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; use crate::utils::text; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; @@ -90,9 +91,9 @@ impl BytesSerializable for CreateConsumerGroup { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let group_id = u32::from_le_bytes(bytes[position..position + 4].try_into()?); let group_id = if group_id == 0 { None } else { Some(group_id) }; let name_length = bytes[position + 4]; @@ -137,9 +138,9 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let group_id = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()); let name_length = bytes[position + 4]; diff --git a/sdk/src/consumer_groups/delete_consumer_group.rs b/sdk/src/consumer_groups/delete_consumer_group.rs index 1d261ceca..e5f4f5298 100644 --- a/sdk/src/consumer_groups/delete_consumer_group.rs +++ b/sdk/src/consumer_groups/delete_consumer_group.rs @@ -2,6 +2,7 @@ use crate::bytes_serializable::BytesSerializable; use crate::command::{Command, DELETE_CONSUMER_GROUP_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -58,9 +59,9 @@ impl BytesSerializable for DeleteConsumerGroup { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let group_id = Identifier::from_bytes(bytes.slice(position..))?; let command = DeleteConsumerGroup { stream_id, @@ -92,9 +93,9 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let group_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); assert!(!bytes.is_empty()); diff --git a/sdk/src/consumer_groups/get_consumer_group.rs b/sdk/src/consumer_groups/get_consumer_group.rs index 6780a0f43..a2eac45f1 100644 --- a/sdk/src/consumer_groups/get_consumer_group.rs +++ b/sdk/src/consumer_groups/get_consumer_group.rs @@ -2,6 +2,7 @@ use crate::bytes_serializable::BytesSerializable; use crate::command::{Command, GET_CONSUMER_GROUP_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -58,9 +59,9 @@ impl BytesSerializable for GetConsumerGroup { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let group_id = Identifier::from_bytes(bytes.slice(position..))?; let command = GetConsumerGroup { stream_id, @@ -92,9 +93,9 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let group_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); assert!(!bytes.is_empty()); diff --git a/sdk/src/consumer_groups/get_consumer_groups.rs b/sdk/src/consumer_groups/get_consumer_groups.rs index 52cf5ed26..82ddd24b1 100644 --- a/sdk/src/consumer_groups/get_consumer_groups.rs +++ b/sdk/src/consumer_groups/get_consumer_groups.rs @@ -2,6 +2,7 @@ use crate::bytes_serializable::BytesSerializable; use crate::command::{Command, GET_CONSUMER_GROUPS_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -50,7 +51,7 @@ impl BytesSerializable for GetConsumerGroups { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; let command = GetConsumerGroups { stream_id, @@ -82,7 +83,7 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); assert!(!bytes.is_empty()); diff --git a/sdk/src/consumer_groups/join_consumer_group.rs b/sdk/src/consumer_groups/join_consumer_group.rs index c507baebe..7d2df1aa9 100644 --- a/sdk/src/consumer_groups/join_consumer_group.rs +++ b/sdk/src/consumer_groups/join_consumer_group.rs @@ -2,6 +2,7 @@ use crate::bytes_serializable::BytesSerializable; use crate::command::{Command, JOIN_CONSUMER_GROUP_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -58,9 +59,9 @@ impl BytesSerializable for JoinConsumerGroup { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let group_id = Identifier::from_bytes(bytes.slice(position..))?; let command = JoinConsumerGroup { stream_id, @@ -92,9 +93,9 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let group_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); assert!(!bytes.is_empty()); diff --git a/sdk/src/consumer_groups/leave_consumer_group.rs b/sdk/src/consumer_groups/leave_consumer_group.rs index 9c2aa8438..40c0215df 100644 --- a/sdk/src/consumer_groups/leave_consumer_group.rs +++ b/sdk/src/consumer_groups/leave_consumer_group.rs @@ -2,6 +2,7 @@ use crate::bytes_serializable::BytesSerializable; use crate::command::{Command, LEAVE_CONSUMER_GROUP_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -58,9 +59,9 @@ impl BytesSerializable for LeaveConsumerGroup { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let group_id = Identifier::from_bytes(bytes.slice(position..))?; let command = LeaveConsumerGroup { stream_id, @@ -92,9 +93,9 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let group_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); assert!(!bytes.is_empty()); diff --git a/sdk/src/consumer_offsets/get_consumer_offset.rs b/sdk/src/consumer_offsets/get_consumer_offset.rs index 4d1bc5f43..c9b773e02 100644 --- a/sdk/src/consumer_offsets/get_consumer_offset.rs +++ b/sdk/src/consumer_offsets/get_consumer_offset.rs @@ -3,6 +3,7 @@ use crate::command::{Command, GET_CONSUMER_OFFSET_CODE}; use crate::consumer::{Consumer, ConsumerKind}; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -84,15 +85,15 @@ impl BytesSerializable for GetConsumerOffset { let mut position = 0; let consumer_kind = ConsumerKind::from_code(bytes[0])?; let consumer_id = Identifier::from_bytes(bytes.slice(1..))?; - position += 1 + consumer_id.get_size_bytes() as usize; + position += 1 + consumer_id.get_size_bytes().as_bytes_usize(); let consumer = Consumer { kind: consumer_kind, id: consumer_id, }; let stream_id = Identifier::from_bytes(bytes.slice(position..))?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let partition_id = u32::from_le_bytes(bytes[position..position + 4].try_into()?); let partition_id = if partition_id == 0 { None @@ -139,15 +140,15 @@ mod tests { let mut position = 0; let consumer_kind = ConsumerKind::from_code(bytes[0]).unwrap(); let consumer_id = Identifier::from_bytes(bytes.slice(1..)).unwrap(); - position += 1 + consumer_id.get_size_bytes() as usize; + position += 1 + consumer_id.get_size_bytes().as_bytes_usize(); let consumer = Consumer { kind: consumer_kind, id: consumer_id, }; let stream_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let partition_id = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()); assert!(!bytes.is_empty()); diff --git a/sdk/src/consumer_offsets/store_consumer_offset.rs b/sdk/src/consumer_offsets/store_consumer_offset.rs index 2e6bb6e79..a76dfb539 100644 --- a/sdk/src/consumer_offsets/store_consumer_offset.rs +++ b/sdk/src/consumer_offsets/store_consumer_offset.rs @@ -3,6 +3,7 @@ use crate::command::{Command, STORE_CONSUMER_OFFSET_CODE}; use crate::consumer::{Consumer, ConsumerKind}; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -84,15 +85,15 @@ impl BytesSerializable for StoreConsumerOffset { let mut position = 0; let consumer_kind = ConsumerKind::from_code(bytes[0])?; let consumer_id = Identifier::from_bytes(bytes.slice(1..))?; - position += 1 + consumer_id.get_size_bytes() as usize; + position += 1 + consumer_id.get_size_bytes().as_bytes_usize(); let consumer = Consumer { kind: consumer_kind, id: consumer_id, }; let stream_id = Identifier::from_bytes(bytes.slice(position..))?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let partition_id = u32::from_le_bytes(bytes[position..position + 4].try_into()?); let partition_id = if partition_id == 0 { None @@ -143,15 +144,15 @@ mod tests { let mut position = 0; let consumer_kind = ConsumerKind::from_code(bytes[0]).unwrap(); let consumer_id = Identifier::from_bytes(bytes.slice(1..)).unwrap(); - position += 1 + consumer_id.get_size_bytes() as usize; + position += 1 + consumer_id.get_size_bytes().as_bytes_usize(); let consumer = Consumer { kind: consumer_kind, id: consumer_id, }; let stream_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let partition_id = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()); let offset = u64::from_le_bytes(bytes[position + 4..position + 12].try_into().unwrap()); diff --git a/sdk/src/identifier.rs b/sdk/src/identifier.rs index a41662083..138b66f50 100644 --- a/sdk/src/identifier.rs +++ b/sdk/src/identifier.rs @@ -1,5 +1,7 @@ use crate::bytes_serializable::BytesSerializable; use crate::error::IggyError; +use crate::utils::byte_size::IggyByteSize; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -113,11 +115,6 @@ impl Identifier { } } - /// Returns the size of the identifier in bytes. - pub fn get_size_bytes(&self) -> u32 { - 2 + u32::from(self.length) - } - /// Creates a new identifier from the given identifier. pub fn from_identifier(identifier: &Identifier) -> Self { Self { @@ -169,6 +166,12 @@ impl Identifier { } } +impl Sizeable for Identifier { + fn get_size_bytes(&self) -> IggyByteSize { + IggyByteSize::from(u64::from(self.length) + 2) + } +} + impl BytesSerializable for Identifier { fn to_bytes(&self) -> Bytes { let mut bytes = BytesMut::with_capacity(2 + self.length as usize); diff --git a/sdk/src/messages/poll_messages.rs b/sdk/src/messages/poll_messages.rs index 2acab5dcd..58cd31b23 100644 --- a/sdk/src/messages/poll_messages.rs +++ b/sdk/src/messages/poll_messages.rs @@ -3,6 +3,7 @@ use crate::command::{Command, POLL_MESSAGES_CODE}; use crate::consumer::{Consumer, ConsumerKind}; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; use crate::utils::timestamp::IggyTimestamp; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; @@ -258,15 +259,15 @@ impl BytesSerializable for PollMessages { let mut position = 0; let consumer_kind = ConsumerKind::from_code(bytes[0])?; let consumer_id = Identifier::from_bytes(bytes.slice(1..))?; - position += 1 + consumer_id.get_size_bytes() as usize; + position += 1 + consumer_id.get_size_bytes().as_bytes_usize(); let consumer = Consumer { kind: consumer_kind, id: consumer_id, }; let stream_id = Identifier::from_bytes(bytes.slice(position..))?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let partition_id = u32::from_le_bytes(bytes[position..position + 4].try_into()?); let partition_id = match partition_id { 0 => None, @@ -404,15 +405,15 @@ mod tests { let mut position = 0; let consumer_kind = ConsumerKind::from_code(bytes[0]).unwrap(); let consumer_id = Identifier::from_bytes(bytes.slice(1..)).unwrap(); - position += 1 + consumer_id.get_size_bytes() as usize; + position += 1 + consumer_id.get_size_bytes().as_bytes_usize(); let consumer = Consumer { kind: consumer_kind, id: consumer_id, }; let stream_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let partition_id = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()); let polling_kind = PollingKind::from_code(bytes[position + 4]).unwrap(); position += 5; diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs index 084902033..01d954bc4 100644 --- a/sdk/src/messages/send_messages.rs +++ b/sdk/src/messages/send_messages.rs @@ -5,6 +5,8 @@ use crate::identifier::Identifier; use crate::messages::{MAX_HEADERS_SIZE, MAX_PAYLOAD_SIZE}; use crate::models::header; use crate::models::header::{HeaderKey, HeaderValue}; +use crate::utils::byte_size::IggyByteSize; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -199,10 +201,11 @@ impl Partitioning { value: partitioning.value.clone(), } } +} - /// Get the size of the partitioning in bytes. - pub fn get_size_bytes(&self) -> u32 { - 2 + u32::from(self.length) +impl Sizeable for Partitioning { + fn get_size_bytes(&self) -> IggyByteSize { + IggyByteSize::from(u64::from(self.length) + 2) } } @@ -286,11 +289,12 @@ impl Message { headers, } } +} - /// Get the size of the message in bytes. - pub fn get_size_bytes(&self) -> u32 { +impl Sizeable for Message { + fn get_size_bytes(&self) -> IggyByteSize { // ID + Length + Payload + Headers - 16 + 4 + self.payload.len() as u32 + header::get_headers_size_bytes(&self.headers) + header::get_headers_size_bytes(&self.headers) + (16 + 4 + self.payload.len() as u64).into() } } @@ -358,7 +362,7 @@ impl BytesSerializable for Partitioning { impl BytesSerializable for Message { fn to_bytes(&self) -> Bytes { - let mut bytes = BytesMut::with_capacity(self.get_size_bytes() as usize); + let mut bytes = BytesMut::with_capacity(self.get_size_bytes().as_bytes_usize()); bytes.put_u128_le(self.id); if let Some(headers) = &self.headers { let headers_bytes = headers.to_bytes(); @@ -420,12 +424,18 @@ pub(crate) fn as_bytes( partitioning: &Partitioning, messages: &[Message], ) -> Bytes { - let messages_size = messages.iter().map(Message::get_size_bytes).sum::(); + let messages_size = messages + .iter() + .map(Message::get_size_bytes) + .sum::(); let key_bytes = partitioning.to_bytes(); let stream_id_bytes = stream_id.to_bytes(); let topic_id_bytes = topic_id.to_bytes(); let mut bytes = BytesMut::with_capacity( - stream_id_bytes.len() + topic_id_bytes.len() + key_bytes.len() + messages_size as usize, + stream_id_bytes.len() + + topic_id_bytes.len() + + key_bytes.len() + + messages_size.as_bytes_usize(), ); bytes.put_slice(&stream_id_bytes); bytes.put_slice(&topic_id_bytes); @@ -473,17 +483,17 @@ impl BytesSerializable for SendMessages { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let key = Partitioning::from_bytes(bytes.slice(position..))?; - position += key.get_size_bytes() as usize; + position += key.get_size_bytes().as_bytes_usize(); let messages_payloads = bytes.slice(position..); position = 0; let mut messages = Vec::new(); while position < messages_payloads.len() { let message = Message::from_bytes(messages_payloads.slice(position..))?; - position += message.get_size_bytes() as usize; + position += message.get_size_bytes().as_bytes_usize(); messages.push(message); } @@ -562,11 +572,11 @@ mod tests { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let key = Partitioning::from_bytes(bytes.slice(position..)).unwrap(); - position += key.get_size_bytes() as usize; + position += key.get_size_bytes().as_bytes_usize(); let messages = bytes.slice(position..); let command_messages = command .messages @@ -618,7 +628,7 @@ mod tests { let mut messages = Vec::new(); while position < messages_payloads.len() { let message = Message::from_bytes(messages_payloads.slice(position..)).unwrap(); - position += message.get_size_bytes() as usize; + position += message.get_size_bytes().as_bytes_usize(); messages.push(message); } diff --git a/sdk/src/models/header.rs b/sdk/src/models/header.rs index 1ae687ae3..bf17c06f0 100644 --- a/sdk/src/models/header.rs +++ b/sdk/src/models/header.rs @@ -1,5 +1,6 @@ use crate::bytes_serializable::BytesSerializable; use crate::error::IggyError; +use crate::utils::byte_size::IggyByteSize; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; use serde_with::base64::Base64; @@ -623,7 +624,7 @@ impl BytesSerializable for HashMap { } /// Returns the size in bytes of the specified headers. -pub fn get_headers_size_bytes(headers: &Option>) -> u32 { +pub fn get_headers_size_bytes(headers: &Option>) -> IggyByteSize { // Headers length field let mut size = 4; if let Some(headers) = headers { @@ -632,7 +633,7 @@ pub fn get_headers_size_bytes(headers: &Option>) size += 4 + key.as_str().len() as u32 + 1 + 4 + value.value.len() as u32; } } - size + (size as u64).into() } #[cfg(test)] diff --git a/sdk/src/models/messages.rs b/sdk/src/models/messages.rs index e16a46a7e..b4018531b 100644 --- a/sdk/src/models/messages.rs +++ b/sdk/src/models/messages.rs @@ -2,6 +2,8 @@ use crate::bytes_serializable::BytesSerializable; use crate::error::IggyError; use crate::models::header; use crate::models::header::{HeaderKey, HeaderValue}; +use crate::utils::byte_size::IggyByteSize; +use crate::utils::sizeable::Sizeable; use crate::utils::timestamp::IggyTimestamp; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -55,7 +57,7 @@ pub struct PolledMessage { pub headers: Option>, /// The length of the payload. #[serde(skip)] - pub length: u32, + pub length: IggyByteSize, /// The binary payload of the message. #[serde_as(as = "Base64")] pub payload: Bytes, @@ -139,8 +141,7 @@ impl PolledMessage { timestamp: timestamp.as_micros(), id, checksum, - #[allow(clippy::cast_possible_truncation)] - length: payload.len() as u32, + length: IggyByteSize::from(payload.len() as u64), payload, headers, } @@ -151,12 +152,6 @@ impl PolledMessage { self.timestamp.into() } - /// Returns the size of the message in bytes. - pub fn get_size_bytes(&self) -> u32 { - // Offset + State + Timestamp + ID + Checksum + Length + Payload + Headers - 8 + 1 + 8 + 16 + 4 + 4 + self.length + header::get_headers_size_bytes(&self.headers) - } - /// Extends the provided bytes with the message. pub fn extend(&self, bytes: &mut BytesMut) { bytes.put_u64_le(self.offset); @@ -172,7 +167,16 @@ impl PolledMessage { } else { bytes.put_u32_le(0u32); } - bytes.put_u32_le(self.length); + bytes.put_u32_le(self.length.as_bytes_u64() as u32); bytes.put_slice(&self.payload); } } + +impl Sizeable for PolledMessage { + fn get_size_bytes(&self) -> IggyByteSize { + // Offset + State + Timestamp + ID + Checksum + Length + Payload + Headers + header::get_headers_size_bytes(&self.headers) + + self.length + + IggyByteSize::from(8 + 1 + 8 + 16 + 4 + 4) + } +} diff --git a/sdk/src/partitions/create_partitions.rs b/sdk/src/partitions/create_partitions.rs index 3df331afb..3206b0a8d 100644 --- a/sdk/src/partitions/create_partitions.rs +++ b/sdk/src/partitions/create_partitions.rs @@ -3,6 +3,7 @@ use crate::command::{Command, CREATE_PARTITIONS_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; use crate::partitions::MAX_PARTITIONS_COUNT; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -69,9 +70,9 @@ impl BytesSerializable for CreatePartitions { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let partitions_count = u32::from_le_bytes(bytes[position..position + 4].try_into()?); let command = CreatePartitions { stream_id, @@ -108,9 +109,9 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let partitions_count = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()); diff --git a/sdk/src/partitions/delete_partitions.rs b/sdk/src/partitions/delete_partitions.rs index a46f4981b..9d5e1f64a 100644 --- a/sdk/src/partitions/delete_partitions.rs +++ b/sdk/src/partitions/delete_partitions.rs @@ -3,6 +3,7 @@ use crate::command::{Command, DELETE_PARTITIONS_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; use crate::partitions::MAX_PARTITIONS_COUNT; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -69,9 +70,9 @@ impl BytesSerializable for DeletePartitions { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let partitions_count = u32::from_le_bytes(bytes[position..position + 4].try_into()?); let command = DeletePartitions { stream_id, @@ -108,9 +109,9 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let partitions_count = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()); diff --git a/sdk/src/streams/update_stream.rs b/sdk/src/streams/update_stream.rs index 47ac02339..33d6bd240 100644 --- a/sdk/src/streams/update_stream.rs +++ b/sdk/src/streams/update_stream.rs @@ -3,6 +3,7 @@ use crate::command::{Command, UPDATE_STREAM_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; use crate::streams::MAX_NAME_LENGTH; +use crate::utils::sizeable::Sizeable; use crate::utils::text; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; @@ -70,7 +71,7 @@ impl BytesSerializable for UpdateStream { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let name_length = bytes[position]; let name = from_utf8(&bytes[position + 1..position + 1 + name_length as usize])?.to_string(); @@ -103,7 +104,7 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let name_length = bytes[position]; let name = from_utf8(&bytes[position + 1..position + 1 + name_length as usize]) .unwrap() diff --git a/sdk/src/topics/create_topic.rs b/sdk/src/topics/create_topic.rs index 31c41f744..746861c1c 100644 --- a/sdk/src/topics/create_topic.rs +++ b/sdk/src/topics/create_topic.rs @@ -5,6 +5,7 @@ use crate::error::IggyError; use crate::identifier::Identifier; use crate::topics::{MAX_NAME_LENGTH, MAX_PARTITIONS_COUNT}; use crate::utils::expiry::IggyExpiry; +use crate::utils::sizeable::Sizeable; use crate::utils::text; use crate::utils::topic_size::MaxTopicSize; use crate::validatable::Validatable; @@ -121,7 +122,7 @@ impl BytesSerializable for CreateTopic { } let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = u32::from_le_bytes(bytes[position..position + 4].try_into()?); let topic_id = if topic_id == 0 { None } else { Some(topic_id) }; let partitions_count = u32::from_le_bytes(bytes[position + 4..position + 8].try_into()?); @@ -190,7 +191,7 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()); let partitions_count = u32::from_le_bytes(bytes[position + 4..position + 8].try_into().unwrap()); diff --git a/sdk/src/topics/delete_topic.rs b/sdk/src/topics/delete_topic.rs index 4ca7e91b1..7e7604fd6 100644 --- a/sdk/src/topics/delete_topic.rs +++ b/sdk/src/topics/delete_topic.rs @@ -2,6 +2,7 @@ use crate::bytes_serializable::BytesSerializable; use crate::command::{Command, DELETE_TOPIC_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -50,7 +51,7 @@ impl BytesSerializable for DeleteTopic { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; let command = DeleteTopic { stream_id, @@ -82,7 +83,7 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); assert!(!bytes.is_empty()); diff --git a/sdk/src/topics/get_topic.rs b/sdk/src/topics/get_topic.rs index b74e8c991..18878d407 100644 --- a/sdk/src/topics/get_topic.rs +++ b/sdk/src/topics/get_topic.rs @@ -2,6 +2,7 @@ use crate::bytes_serializable::BytesSerializable; use crate::command::{Command, GET_TOPIC_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -50,7 +51,7 @@ impl BytesSerializable for GetTopic { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; let command = GetTopic { stream_id, @@ -82,7 +83,7 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); assert!(!bytes.is_empty()); diff --git a/sdk/src/topics/purge_topic.rs b/sdk/src/topics/purge_topic.rs index 0313132b5..63102352e 100644 --- a/sdk/src/topics/purge_topic.rs +++ b/sdk/src/topics/purge_topic.rs @@ -2,6 +2,7 @@ use crate::bytes_serializable::BytesSerializable; use crate::command::{Command, PURGE_TOPIC_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -50,7 +51,7 @@ impl BytesSerializable for PurgeTopic { let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; let command = PurgeTopic { stream_id, @@ -80,7 +81,7 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); assert!(!bytes.is_empty()); diff --git a/sdk/src/topics/update_topic.rs b/sdk/src/topics/update_topic.rs index 98257cf47..1b935c4b1 100644 --- a/sdk/src/topics/update_topic.rs +++ b/sdk/src/topics/update_topic.rs @@ -5,6 +5,7 @@ use crate::error::IggyError; use crate::identifier::Identifier; use crate::topics::MAX_NAME_LENGTH; use crate::utils::expiry::IggyExpiry; +use crate::utils::sizeable::Sizeable; use crate::utils::text; use crate::utils::topic_size::MaxTopicSize; use crate::validatable::Validatable; @@ -111,9 +112,9 @@ impl BytesSerializable for UpdateTopic { } let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let compression_algorithm = CompressionAlgorithm::from_code(bytes[position])?; position += 1; let message_expiry = u64::from_le_bytes(bytes[position..position + 8].try_into()?); @@ -179,9 +180,9 @@ mod tests { let bytes = command.to_bytes(); let mut position = 0; let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes() as usize; + position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes() as usize; + position += topic_id.get_size_bytes().as_bytes_usize(); let compression_algorithm = CompressionAlgorithm::from_code(bytes[position]).unwrap(); position += 1; let message_expiry = u64::from_le_bytes(bytes[position..position + 8].try_into().unwrap()); diff --git a/sdk/src/users/change_password.rs b/sdk/src/users/change_password.rs index af4200f8a..4e67d2034 100644 --- a/sdk/src/users/change_password.rs +++ b/sdk/src/users/change_password.rs @@ -3,6 +3,7 @@ use crate::command::{Command, CHANGE_PASSWORD_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; use crate::users::defaults::*; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -81,7 +82,7 @@ impl BytesSerializable for ChangePassword { } let user_id = Identifier::from_bytes(bytes.clone())?; - let mut position = user_id.get_size_bytes() as usize; + let mut position = user_id.get_size_bytes().as_bytes_usize(); let current_password_length = bytes[position]; position += 1; let current_password = @@ -121,7 +122,7 @@ mod tests { let bytes = command.to_bytes(); let user_id = Identifier::from_bytes(bytes.clone()).unwrap(); - let mut position = user_id.get_size_bytes() as usize; + let mut position = user_id.get_size_bytes().as_bytes_usize(); let current_password_length = bytes[position]; position += 1; let current_password = diff --git a/sdk/src/users/update_permissions.rs b/sdk/src/users/update_permissions.rs index 760fa0dc3..fa8644466 100644 --- a/sdk/src/users/update_permissions.rs +++ b/sdk/src/users/update_permissions.rs @@ -3,6 +3,7 @@ use crate::command::{Command, UPDATE_PERMISSIONS_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; use crate::models::permissions::Permissions; +use crate::utils::sizeable::Sizeable; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -55,7 +56,7 @@ impl BytesSerializable for UpdatePermissions { } let user_id = Identifier::from_bytes(bytes.clone())?; - let mut position = user_id.get_size_bytes() as usize; + let mut position = user_id.get_size_bytes().as_bytes_usize(); let has_permissions = bytes[position]; if has_permissions > 1 { return Err(IggyError::InvalidCommand); @@ -106,7 +107,7 @@ mod tests { }; let bytes = command.to_bytes(); let user_id = Identifier::from_bytes(bytes.clone()).unwrap(); - let mut position = user_id.get_size_bytes() as usize; + let mut position = user_id.get_size_bytes().as_bytes_usize(); let has_permissions = bytes[position]; position += 1; let permissions_length = diff --git a/sdk/src/users/update_user.rs b/sdk/src/users/update_user.rs index 3bd705f03..03bd3acda 100644 --- a/sdk/src/users/update_user.rs +++ b/sdk/src/users/update_user.rs @@ -4,6 +4,7 @@ use crate::error::IggyError; use crate::identifier::Identifier; use crate::models::user_status::UserStatus; use crate::users::defaults::*; +use crate::utils::sizeable::Sizeable; use crate::utils::text; use crate::validatable::Validatable; use bytes::{BufMut, Bytes, BytesMut}; @@ -81,7 +82,7 @@ impl BytesSerializable for UpdateUser { } let user_id = Identifier::from_bytes(bytes.clone())?; - let mut position = user_id.get_size_bytes() as usize; + let mut position = user_id.get_size_bytes().as_bytes_usize(); let has_username = bytes[position]; if has_username > 1 { return Err(IggyError::InvalidCommand); @@ -146,7 +147,7 @@ mod tests { let bytes = command.to_bytes(); let user_id = Identifier::from_bytes(bytes.clone()).unwrap(); - let mut position = user_id.get_size_bytes() as usize; + let mut position = user_id.get_size_bytes().as_bytes_usize(); let has_username = bytes[position]; position += 1; let username_length = bytes[position]; diff --git a/sdk/src/utils/byte_size.rs b/sdk/src/utils/byte_size.rs index 8d15514c3..e2a783f9a 100644 --- a/sdk/src/utils/byte_size.rs +++ b/sdk/src/utils/byte_size.rs @@ -4,7 +4,8 @@ use byte_unit::{Byte, UnitType}; use core::fmt; use serde::{Deserialize, Serialize}; use std::{ - ops::{Add, AddAssign, Sub}, + iter::Sum, + ops::{Add, AddAssign, Sub, SubAssign}, str::FromStr, }; @@ -49,6 +50,11 @@ impl IggyByteSize { self.0.as_u64() } + /// Returns the byte size as a `usize`. + pub fn as_bytes_usize(&self) -> usize { + self.as_bytes_u64() as usize + } + /// Returns a human-readable string representation of the byte size using decimal units. pub fn as_human_string(&self) -> String { format!("{:.2}", self.0.get_appropriate_unit(UnitType::Decimal)) @@ -115,6 +121,12 @@ impl PartialOrd for IggyByteSize { } } +impl PartialOrd for IggyByteSize { + fn partial_cmp(&self, other: &IggyByteSize) -> Option { + self.as_bytes_u64().partial_cmp(&other.as_bytes_u64()) + } +} + impl fmt::Display for IggyByteSize { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.as_human_string()) @@ -143,6 +155,18 @@ impl Sub for IggyByteSize { } } +impl SubAssign for IggyByteSize { + fn sub_assign(&mut self, rhs: Self) { + self.0 = Byte::from_u64(self.as_bytes_u64() - rhs.as_bytes_u64()); + } +} + +impl Sum for IggyByteSize { + fn sum>(iter: I) -> Self { + iter.fold(IggyByteSize::default(), |acc, ibs| acc + ibs) + } +} + #[cfg(test)] mod tests { use super::*; @@ -256,4 +280,20 @@ mod tests { "18.45 EB/s" ); } + + #[test] + fn test_order() { + assert!(IggyByteSize::from(u64::MAX) > IggyByteSize::from(u64::MIN)) + } + + #[test] + fn test_sum() { + let r = 1..10; + assert_eq!( + r.clone().sum::(), + r.map(IggyByteSize::from) + .sum::() + .as_bytes_u64() + ); + } } diff --git a/sdk/src/utils/mod.rs b/sdk/src/utils/mod.rs index 74ea2ff3e..7516f4055 100644 --- a/sdk/src/utils/mod.rs +++ b/sdk/src/utils/mod.rs @@ -4,6 +4,7 @@ pub mod crypto; pub mod duration; pub mod expiry; pub mod personal_access_token_expiry; +pub mod sizeable; pub mod text; pub mod timestamp; pub mod topic_size; diff --git a/sdk/src/utils/sizeable.rs b/sdk/src/utils/sizeable.rs new file mode 100644 index 000000000..dd093fbf0 --- /dev/null +++ b/sdk/src/utils/sizeable.rs @@ -0,0 +1,6 @@ +use super::byte_size::IggyByteSize; + +/// Trait for types that return their size in bytes. +pub trait Sizeable { + fn get_size_bytes(&self) -> IggyByteSize; +} diff --git a/server/Cargo.toml b/server/Cargo.toml index ceb2fcd91..d912bc39c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.71" +version = "0.4.72" edition = "2021" build = "src/build.rs" diff --git a/server/src/binary/mapper.rs b/server/src/binary/mapper.rs index 005d623b6..efec21017 100644 --- a/server/src/binary/mapper.rs +++ b/server/src/binary/mapper.rs @@ -12,6 +12,8 @@ use iggy::models::consumer_offset_info::ConsumerOffsetInfo; use iggy::models::messages::PolledMessages; use iggy::models::stats::Stats; use iggy::models::user_info::UserId; +use iggy::utils::byte_size::IggyByteSize; +use iggy::utils::sizeable::Sizeable; use tokio::sync::RwLock; pub fn map_stats(stats: &Stats) -> Bytes { @@ -123,9 +125,9 @@ pub fn map_polled_messages(polled_messages: &PolledMessages) -> Bytes { .messages .iter() .map(|message| message.get_size_bytes()) - .sum::(); + .sum::(); - let mut bytes = BytesMut::with_capacity(20 + messages_size as usize); + let mut bytes = BytesMut::with_capacity(20 + messages_size.as_bytes_usize()); bytes.put_u32_le(polled_messages.partition_id); bytes.put_u64_le(polled_messages.current_offset); bytes.put_u32_le(messages_count); @@ -214,7 +216,7 @@ fn extend_topic(topic: &Topic, bytes: &mut BytesMut) { bytes.put_u8(topic.compression_algorithm.as_code()); bytes.put_u64_le(topic.max_topic_size.into()); bytes.put_u8(topic.replication_factor); - bytes.put_u64_le(topic.get_size().as_bytes_u64()); + bytes.put_u64_le(topic.get_size_bytes().as_bytes_u64()); bytes.put_u64_le(topic.get_messages_count()); bytes.put_u8(topic.name.len() as u8); bytes.put_slice(topic.name.as_bytes()); @@ -225,7 +227,7 @@ fn extend_partition(partition: &Partition, bytes: &mut BytesMut) { bytes.put_u64_le(partition.created_at.into()); bytes.put_u32_le(partition.get_segments().len() as u32); bytes.put_u64_le(partition.current_offset); - bytes.put_u64_le(partition.get_size_bytes()); + bytes.put_u64_le(partition.get_size_bytes().as_bytes_u64()); bytes.put_u64_le(partition.get_messages_count()); } diff --git a/server/src/compat/storage_conversion/mod.rs b/server/src/compat/storage_conversion/mod.rs index 2a4e8d9dc..7bcdf623e 100644 --- a/server/src/compat/storage_conversion/mod.rs +++ b/server/src/compat/storage_conversion/mod.rs @@ -19,6 +19,7 @@ use crate::streaming::topics::topic::Topic; use async_trait::async_trait; use iggy::consumer::ConsumerKind; use iggy::error::IggyError; +use iggy::utils::byte_size::IggyByteSize; use std::path::Path; use std::sync::Arc; use tokio::fs::{read_dir, rename}; @@ -234,8 +235,8 @@ impl SegmentStorage for NoopSegmentStorage { &self, _segment: &Segment, _batch: RetainedMessageBatch, - ) -> Result { - Ok(0) + ) -> Result { + Ok(IggyByteSize::default()) } async fn load_message_ids(&self, _segment: &Segment) -> Result, IggyError> { diff --git a/server/src/configs/resource_quota.rs b/server/src/configs/resource_quota.rs index 6eae9fcdf..97e00f4b0 100644 --- a/server/src/configs/resource_quota.rs +++ b/server/src/configs/resource_quota.rs @@ -14,15 +14,15 @@ pub enum MemoryResourceQuota { impl MemoryResourceQuota { /// Converts the resource quota into bytes. /// NOTE: This is a blocking operation and it's slow. Don't use it in the hot path. - pub fn into(self) -> u64 { + pub fn into(self) -> IggyByteSize { match self { - MemoryResourceQuota::Bytes(byte) => byte.as_bytes_u64(), + MemoryResourceQuota::Bytes(byte) => byte, MemoryResourceQuota::Percentage(percentage) => { let mut sys = System::new_all(); sys.refresh_all(); let total_memory = sys.total_memory(); - (total_memory as f64 * (percentage as f64 / 100.0)) as u64 + IggyByteSize::from((total_memory as f64 * (percentage as f64 / 100.0)) as u64) } } } @@ -134,7 +134,9 @@ mod tests { #[test] fn test_serialize() { - let quota: u64 = MemoryResourceQuota::Bytes(IggyByteSize::from_str("4GB").unwrap()).into(); + let quota: u64 = MemoryResourceQuota::Bytes(IggyByteSize::from_str("4GB").unwrap()) + .into() + .as_bytes_u64(); let serialized = serde_json::to_string("a).unwrap(); assert_eq!(serialized, json!(4000000000u32).to_string()); diff --git a/server/src/configs/validators.rs b/server/src/configs/validators.rs index 3f0a1d19f..c2aab9d30 100644 --- a/server/src/configs/validators.rs +++ b/server/src/configs/validators.rs @@ -115,9 +115,9 @@ impl Validatable for CacheConfig { ); let total_memory = sys.total_memory(); let free_memory = sys.free_memory(); - let cache_percentage = (limit_bytes as f64 / total_memory as f64) * 100.0; + let cache_percentage = (limit_bytes.as_bytes_u64() as f64 / total_memory as f64) * 100.0; - let pretty_cache_limit = IggyByteSize::from(limit_bytes).as_human_string(); + let pretty_cache_limit = limit_bytes.as_human_string(); let pretty_total_memory = IggyByteSize::from(total_memory).as_human_string(); let pretty_free_memory = IggyByteSize::from(free_memory).as_human_string(); @@ -150,7 +150,7 @@ impl Validatable for CacheConfig { impl Validatable for SegmentConfig { fn validate(&self) -> Result<(), ServerError> { - if self.size.as_bytes_u64() as u32 > segment::MAX_SIZE_BYTES { + if self.size > segment::MAX_SIZE_BYTES { return Err(ServerError::InvalidConfiguration(format!( "Segment size cannot be greater than: {} bytes.", segment::MAX_SIZE_BYTES diff --git a/server/src/http/http_server.rs b/server/src/http/http_server.rs index 775f11c6d..23a741a7d 100644 --- a/server/src/http/http_server.rs +++ b/server/src/http/http_server.rs @@ -125,6 +125,7 @@ async fn build_app_state(config: &HttpConfig, system: SharedSystem) -> Arc CorsLayer { let allowed_origins = match config.allowed_origins { origins if origins.is_empty() => AllowOrigin::default(), diff --git a/server/src/http/mapper.rs b/server/src/http/mapper.rs index 842f7e75a..62d2d847a 100644 --- a/server/src/http/mapper.rs +++ b/server/src/http/mapper.rs @@ -14,6 +14,7 @@ use iggy::models::personal_access_token::PersonalAccessTokenInfo; use iggy::models::stream::StreamDetails; use iggy::models::topic::TopicDetails; use iggy::models::user_info::{UserInfo, UserInfoDetails}; +use iggy::utils::sizeable::Sizeable; use tokio::sync::RwLock; pub fn map_stream(stream: &Stream) -> StreamDetails { @@ -56,7 +57,7 @@ pub fn map_topics(topics: &[&Topic]) -> Vec { id: topic.topic_id, created_at: topic.created_at, name: topic.name.clone(), - size: topic.get_size(), + size: topic.get_size_bytes(), partitions_count: topic.get_partitions().len() as u32, messages_count: topic.get_messages_count(), message_expiry: topic.message_expiry, @@ -75,7 +76,7 @@ pub async fn map_topic(topic: &Topic) -> TopicDetails { id: topic.topic_id, created_at: topic.created_at, name: topic.name.clone(), - size: topic.get_size(), + size: topic.get_size_bytes(), messages_count: topic.get_messages_count(), partitions_count: topic.get_partitions().len() as u32, partitions: Vec::new(), @@ -93,7 +94,7 @@ pub async fn map_topic(topic: &Topic) -> TopicDetails { created_at: partition.created_at, segments_count: partition.get_segments().len() as u32, current_offset: partition.current_offset, - size: partition.get_size_bytes().into(), + size: partition.get_size_bytes(), messages_count: partition.get_messages_count(), }); } diff --git a/server/src/streaming/batching/appendable_batch_info.rs b/server/src/streaming/batching/appendable_batch_info.rs index 0bbcc9563..3f3161e3e 100644 --- a/server/src/streaming/batching/appendable_batch_info.rs +++ b/server/src/streaming/batching/appendable_batch_info.rs @@ -1,11 +1,13 @@ +use iggy::utils::byte_size::IggyByteSize; + #[derive(Debug)] pub struct AppendableBatchInfo { - pub batch_size: u64, + pub batch_size: IggyByteSize, pub partition_id: u32, } impl AppendableBatchInfo { - pub fn new(batch_size: u64, partition_id: u32) -> Self { + pub fn new(batch_size: IggyByteSize, partition_id: u32) -> Self { Self { batch_size, partition_id, diff --git a/server/src/streaming/batching/batch_accumulator.rs b/server/src/streaming/batching/batch_accumulator.rs index 86382927c..65f6f8471 100644 --- a/server/src/streaming/batching/batch_accumulator.rs +++ b/server/src/streaming/batching/batch_accumulator.rs @@ -1,12 +1,15 @@ use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; -use crate::streaming::{models::messages::RetainedMessage, sizeable::Sizeable}; +use crate::streaming::local_sizeable::LocalSizeable; +use crate::streaming::models::messages::RetainedMessage; use bytes::BytesMut; +use iggy::utils::byte_size::IggyByteSize; +use iggy::utils::sizeable::Sizeable; use std::sync::Arc; #[derive(Debug)] pub struct BatchAccumulator { base_offset: u64, - current_size: u64, + current_size: IggyByteSize, current_offset: u64, current_timestamp: u64, capacity: u64, @@ -17,7 +20,7 @@ impl BatchAccumulator { pub fn new(base_offset: u64, capacity: usize) -> Self { Self { base_offset, - current_size: 0, + current_size: IggyByteSize::from(0), current_offset: 0, current_timestamp: 0, capacity: capacity as u64, @@ -25,7 +28,7 @@ impl BatchAccumulator { } } - pub fn append(&mut self, batch_size: u64, items: &[Arc]) { + pub fn append(&mut self, batch_size: IggyByteSize, items: &[Arc]) { assert!(!items.is_empty()); self.current_size += batch_size; self.current_offset = items.last().unwrap().offset; @@ -65,17 +68,13 @@ impl BatchAccumulator { self.base_offset } - pub fn get_size_bytes(&self) -> u64 { - self.current_size + RETAINED_BATCH_OVERHEAD as u64 - } - pub fn materialize_batch_and_maybe_update_state(&mut self) -> (bool, RetainedMessageBatch) { let batch_base_offset = self.base_offset; let batch_last_offset_delta = (self.current_offset - self.base_offset) as u32; let batch_max_timestamp = self.messages.last().unwrap().timestamp; let split_point = std::cmp::min(self.capacity as usize, self.messages.len()); let (batch, remainder) = self.messages.as_slice().split_at(split_point); - let mut bytes = BytesMut::with_capacity(self.current_size as usize); + let mut bytes = BytesMut::with_capacity(self.current_size.as_bytes_u64() as usize); for message in batch { message.extend(&mut bytes); } @@ -86,8 +85,8 @@ impl BatchAccumulator { self.base_offset = remainder.first().unwrap().offset; self.current_size = remainder .iter() - .map(|msg| msg.get_size_bytes() as u64) - .sum(); + .map(|msg| msg.get_size_bytes()) + .sum::(); self.current_offset = remainder.last().unwrap().offset; self.current_timestamp = remainder.last().unwrap().timestamp; for message in remainder { @@ -96,7 +95,7 @@ impl BatchAccumulator { self.messages = remaining_messages; } let batch_payload = bytes.freeze(); - let batch_payload_len = batch_payload.len() as u32; + let batch_payload_len = IggyByteSize::from(batch_payload.len() as u64); let batch = RetainedMessageBatch::new( batch_base_offset, batch_last_offset_delta, @@ -107,3 +106,9 @@ impl BatchAccumulator { (has_remainder, batch) } } + +impl Sizeable for BatchAccumulator { + fn get_size_bytes(&self) -> IggyByteSize { + self.current_size + RETAINED_BATCH_OVERHEAD.into() + } +} diff --git a/server/src/streaming/batching/iterator.rs b/server/src/streaming/batching/iterator.rs index 63d57bde0..2176d4f36 100644 --- a/server/src/streaming/batching/iterator.rs +++ b/server/src/streaming/batching/iterator.rs @@ -10,7 +10,7 @@ pub trait IntoMessagesIterator { pub struct RetainedMessageBatchIterator<'a> { batch: &'a RetainedMessageBatch, - current_position: u32, + current_position: u64, } impl<'a> RetainedMessageBatchIterator<'a> { @@ -27,7 +27,7 @@ impl<'a> RetainedMessageBatchIterator<'a> { impl<'a> Iterator for RetainedMessageBatchIterator<'a> { type Item = RetainedMessage; fn next(&mut self) -> Option { - if self.current_position < self.batch.length { + if self.current_position < self.batch.length.as_bytes_u64() { let start_position = self.current_position as usize; let length = u32::from_le_bytes( self.batch.bytes[start_position..start_position + 4] @@ -38,7 +38,7 @@ impl<'a> Iterator for RetainedMessageBatchIterator<'a> { .batch .bytes .slice(start_position + 4..start_position + 4 + length as usize); - self.current_position += 4 + length; + self.current_position += 4 + length as u64; RetainedMessage::try_from_bytes(message).ok() } else { None diff --git a/server/src/streaming/batching/message_batch.rs b/server/src/streaming/batching/message_batch.rs index 5acacc0a7..581d87a41 100644 --- a/server/src/streaming/batching/message_batch.rs +++ b/server/src/streaming/batching/message_batch.rs @@ -2,16 +2,16 @@ use crate::streaming::batching::batch_filter::BatchItemizer; use crate::streaming::batching::iterator::IntoMessagesIterator; use crate::streaming::models::messages::RetainedMessage; use bytes::{BufMut, Bytes, BytesMut}; +use iggy::utils::{byte_size::IggyByteSize, sizeable::Sizeable}; -pub const RETAINED_BATCH_OVERHEAD: u32 = 8 + 8 + 4 + 4; +pub const RETAINED_BATCH_OVERHEAD: u64 = 8 + 8 + 4 + 4; -use crate::streaming::sizeable::Sizeable; #[derive(Debug, Clone)] pub struct RetainedMessageBatch { pub base_offset: u64, pub last_offset_delta: u32, pub max_timestamp: u64, - pub length: u32, + pub length: IggyByteSize, pub bytes: Bytes, } @@ -20,7 +20,7 @@ impl RetainedMessageBatch { base_offset: u64, last_offset_delta: u32, max_timestamp: u64, - length: u32, + length: IggyByteSize, bytes: Bytes, ) -> Self { RetainedMessageBatch { @@ -48,7 +48,7 @@ impl RetainedMessageBatch { pub fn extend(&self, bytes: &mut BytesMut) { bytes.put_u64_le(self.base_offset); - bytes.put_u32_le(self.length); + bytes.put_u32_le(self.length.as_bytes_u64() as u32); bytes.put_u32_le(self.last_offset_delta); bytes.put_u64_le(self.max_timestamp); bytes.put_slice(&self.bytes); @@ -77,7 +77,7 @@ where } impl Sizeable for RetainedMessageBatch { - fn get_size_bytes(&self) -> u32 { - RETAINED_BATCH_OVERHEAD + self.length + fn get_size_bytes(&self) -> IggyByteSize { + self.length + RETAINED_BATCH_OVERHEAD.into() } } diff --git a/server/src/streaming/cache/buffer.rs b/server/src/streaming/cache/buffer.rs index da48ddde5..593700bcd 100644 --- a/server/src/streaming/cache/buffer.rs +++ b/server/src/streaming/cache/buffer.rs @@ -1,23 +1,25 @@ +use crate::streaming::local_sizeable::LocalSizeable; + use super::memory_tracker::CacheMemoryTracker; -use crate::streaming::sizeable::Sizeable; use atone::Vc; +use iggy::utils::byte_size::IggyByteSize; use std::fmt::Debug; use std::ops::Index; use std::sync::Arc; #[derive(Debug)] -pub struct SmartCache { - current_size: u64, +pub struct SmartCache { + current_size: IggyByteSize, buffer: Vc, memory_tracker: Arc, } impl SmartCache where - T: Sizeable + Clone + Debug, + T: LocalSizeable + Clone + Debug, { pub fn new() -> Self { - let current_size = 0; + let current_size = IggyByteSize::default(); let buffer = Vc::new(); let memory_tracker = CacheMemoryTracker::get_instance().unwrap(); @@ -40,31 +42,34 @@ where /// removes the oldest elements until there's enough space for the new element. /// It's preferred to use `extend` instead of this method. pub fn push_safe(&mut self, element: T) { - let element_size = element.get_size_bytes() as u64; + let element_size = element.get_size_bytes(); while !self.memory_tracker.will_fit_into_cache(element_size) { if let Some(oldest_element) = self.buffer.pop_front() { - let oldest_size = oldest_element.get_size_bytes() as u64; - self.memory_tracker.decrement_used_memory(oldest_size); + let oldest_size = oldest_element.get_size_bytes(); + self.memory_tracker + .decrement_used_memory(oldest_size.as_bytes_u64()); self.current_size -= oldest_size; } } - self.memory_tracker.increment_used_memory(element_size); + self.memory_tracker + .increment_used_memory(element_size.as_bytes_u64()); self.current_size += element_size; self.buffer.push_back(element); } /// Removes the oldest elements until there's enough space for the new element. pub fn evict_by_size(&mut self, size_to_remove: u64) { - let mut removed_size = 0; + let mut removed_size = IggyByteSize::default(); while let Some(element) = self.buffer.pop_front() { if removed_size >= size_to_remove { break; } - let elem_size = element.get_size_bytes() as u64; - self.memory_tracker.decrement_used_memory(elem_size); + let elem_size = element.get_size_bytes(); + self.memory_tracker + .decrement_used_memory(elem_size.as_bytes_u64()); self.current_size -= elem_size; removed_size += elem_size; } @@ -72,15 +77,16 @@ where pub fn purge(&mut self) { self.buffer.clear(); - self.memory_tracker.decrement_used_memory(self.current_size); - self.current_size = 0; + self.memory_tracker + .decrement_used_memory(self.current_size.as_bytes_u64()); + self.current_size = IggyByteSize::default(); } pub fn is_empty(&self) -> bool { self.buffer.is_empty() } - pub fn current_size(&self) -> u64 { + pub fn current_size(&self) -> IggyByteSize { self.current_size } @@ -88,8 +94,9 @@ where /// even if it exceeds the memory limit. pub fn extend(&mut self, elements: impl IntoIterator) { let elements = elements.into_iter().inspect(|element| { - let element_size = element.get_size_bytes() as u64; - self.memory_tracker.increment_used_memory(element_size); + let element_size = element.get_size_bytes(); + self.memory_tracker + .increment_used_memory(element_size.as_bytes_u64()); self.current_size += element_size; }); self.buffer.extend(elements); @@ -97,8 +104,9 @@ where /// Always appends the element into the buffer, even if it exceeds the memory limit. pub fn append(&mut self, element: T) { - let element_size = element.get_size_bytes() as u64; - self.memory_tracker.increment_used_memory(element_size); + let element_size = element.get_size_bytes(); + self.memory_tracker + .increment_used_memory(element_size.as_bytes_u64()); self.current_size += element_size; self.buffer.push(element); } @@ -114,7 +122,7 @@ where impl Index for SmartCache where - T: Sizeable + Clone + Debug, + T: LocalSizeable + Clone + Debug, { type Output = T; @@ -123,7 +131,7 @@ where } } -impl Default for SmartCache { +impl Default for SmartCache { fn default() -> Self { Self::new() } diff --git a/server/src/streaming/cache/memory_tracker.rs b/server/src/streaming/cache/memory_tracker.rs index 0563a27bf..082a999e0 100644 --- a/server/src/streaming/cache/memory_tracker.rs +++ b/server/src/streaming/cache/memory_tracker.rs @@ -14,7 +14,7 @@ static mut INSTANCE: Option> = None; #[derive(Debug)] pub struct CacheMemoryTracker { used_memory_bytes: AtomicU64, - limit_bytes: u64, + limit_bytes: IggyByteSize, } type MessageSize = u64; @@ -48,16 +48,16 @@ impl CacheMemoryTracker { let free_memory_percentage = free_memory.as_bytes_u64() as f64 / total_memory_bytes.as_bytes_u64() as f64 * 100.0; let used_memory_bytes = AtomicU64::new(0); - let limit_bytes = IggyByteSize::from(limit.into()); + let limit_bytes = limit.into(); info!( "Cache memory tracker started, cache: {}, total memory: {}, free memory: {}, free memory percentage: {:.2}%", - limit_bytes, total_memory_bytes, free_memory, free_memory_percentage + limit_bytes.as_human_string(), total_memory_bytes.as_human_string(), free_memory, free_memory_percentage ); CacheMemoryTracker { used_memory_bytes, - limit_bytes: limit_bytes.as_bytes_u64(), + limit_bytes, } } @@ -93,11 +93,12 @@ impl CacheMemoryTracker { } } - pub fn usage_bytes(&self) -> u64 { - self.used_memory_bytes.load(Ordering::SeqCst) + pub fn usage_bytes(&self) -> IggyByteSize { + IggyByteSize::from(self.used_memory_bytes.load(Ordering::SeqCst)) } - pub fn will_fit_into_cache(&self, requested_size: u64) -> bool { - self.used_memory_bytes.load(Ordering::SeqCst) + requested_size <= self.limit_bytes + pub fn will_fit_into_cache(&self, requested_size: IggyByteSize) -> bool { + IggyByteSize::from(self.used_memory_bytes.load(Ordering::SeqCst)) + requested_size + <= self.limit_bytes } } diff --git a/server/src/streaming/local_sizeable.rs b/server/src/streaming/local_sizeable.rs new file mode 100644 index 000000000..9d121cb39 --- /dev/null +++ b/server/src/streaming/local_sizeable.rs @@ -0,0 +1,7 @@ +use iggy::utils::byte_size::IggyByteSize; + +/// Trait for types that return their size in bytes. +/// repeated from the sdk because of the coherence rule, see messages.rs +pub trait LocalSizeable { + fn get_size_bytes(&self) -> IggyByteSize; +} diff --git a/server/src/streaming/mod.rs b/server/src/streaming/mod.rs index eb8709a44..e90f354c3 100644 --- a/server/src/streaming/mod.rs +++ b/server/src/streaming/mod.rs @@ -3,6 +3,7 @@ pub mod cache; pub mod clients; mod deduplication; pub mod diagnostics; +pub mod local_sizeable; pub mod models; pub mod partitions; pub mod persistence; @@ -10,7 +11,6 @@ pub mod personal_access_tokens; pub mod polling_consumer; pub mod segments; pub mod session; -pub mod sizeable; pub mod storage; pub mod streams; pub mod systems; diff --git a/server/src/streaming/models/messages.rs b/server/src/streaming/models/messages.rs index 105a0ee49..2d85151ab 100644 --- a/server/src/streaming/models/messages.rs +++ b/server/src/streaming/models/messages.rs @@ -1,15 +1,18 @@ -use crate::streaming::sizeable::Sizeable; use bytes::{BufMut, Bytes, BytesMut}; use iggy::bytes_serializable::BytesSerializable; use iggy::error::IggyError; use iggy::models::messages::PolledMessage; +use iggy::utils::byte_size::IggyByteSize; use iggy::utils::checksum; +use iggy::utils::sizeable::Sizeable; use iggy::{messages::send_messages::Message, models::messages::MessageState}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; +use crate::streaming::local_sizeable::LocalSizeable; + // It's the same as PolledMessages from Iggy models, but with the Arc instead of Message. #[derive(Debug, Serialize, Deserialize)] pub struct PolledMessages { @@ -39,7 +42,7 @@ impl RetainedMessage { id: self.id, checksum: self.checksum, headers, - length: self.payload.len() as u32, + length: IggyByteSize::from(self.payload.len() as u64), payload: self.payload.clone(), }; Ok(message) @@ -69,7 +72,7 @@ impl RetainedMessage { let message_state = self.message_state; let headers = &self.headers; - bytes.put_u32_le(length); + bytes.put_u32_le(length.as_bytes_u64() as u32); bytes.put_u64_le(offset); bytes.put_u8(message_state.as_code()); bytes.put_u64_le(timestamp); @@ -113,26 +116,20 @@ impl RetainedMessage { } impl Sizeable for RetainedMessage { - fn get_size_bytes(&self) -> u32 { - let headers_len = self - .headers - .as_ref() - .map(|h| 4 + h.len() as u32) - .unwrap_or(4); - 16 + 8 + 8 + 4 + 1 + headers_len + self.payload.len() as u32 + fn get_size_bytes(&self) -> IggyByteSize { + let headers_len = self.headers.as_ref().map(|h| 4 + h.len()).unwrap_or(4); + let size = 16 + 8 + 8 + 4 + 1 + headers_len + self.payload.len(); + IggyByteSize::from(size as u64) } } -impl Sizeable for T +impl LocalSizeable for T where T: Deref, { - fn get_size_bytes(&self) -> u32 { - let headers_len = self - .headers - .as_ref() - .map(|h| 4 + h.len() as u32) - .unwrap_or(4); - 16 + 8 + 8 + 4 + 1 + headers_len + self.payload.len() as u32 + fn get_size_bytes(&self) -> IggyByteSize { + let headers_len = self.headers.as_ref().map(|h| 4 + h.len()).unwrap_or(4); + let size = 16 + 8 + 8 + 4 + 1 + headers_len + self.payload.len(); + IggyByteSize::from(size as u64) } } diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index 42c5e011d..389e8c107 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -294,7 +294,7 @@ impl Partition { let mut remaining_size = size_bytes; let mut batches = Vec::new(); for segment in self.segments.iter().rev() { - let segment_size_bytes = segment.size_bytes as u64; + let segment_size_bytes = segment.size_bytes.as_bytes_u64(); if segment_size_bytes == 0 { break; } @@ -392,7 +392,7 @@ impl Partition { } let batch_size = appendable_batch_info.batch_size - + (POLLED_MESSAGE_METADATA * messages.len() as u32) as u64; + + ((POLLED_MESSAGE_METADATA * messages.len() as u32) as u64).into(); let base_offset = if !self.should_increment_offset { 0 } else { @@ -534,7 +534,9 @@ impl Partition { #[cfg(test)] mod tests { + use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; + use iggy::utils::sizeable::Sizeable; use std::sync::atomic::{AtomicU32, AtomicU64}; use super::*; @@ -548,7 +550,10 @@ mod tests { let messages = create_messages(); let messages_count = messages.len() as u32; let appendable_batch_info = AppendableBatchInfo { - batch_size: messages.iter().map(|m| m.get_size_bytes() as u64).sum(), + batch_size: messages + .iter() + .map(|m| m.get_size_bytes()) + .sum::(), partition_id: partition.partition_id, }; partition @@ -570,7 +575,10 @@ mod tests { let messages_count = messages.len() as u32; let unique_messages_count = 3; let appendable_batch_info = AppendableBatchInfo { - batch_size: messages.iter().map(|m| m.get_size_bytes() as u64).sum(), + batch_size: messages + .iter() + .map(|m| m.get_size_bytes()) + .sum::(), partition_id: partition.partition_id, }; partition diff --git a/server/src/streaming/partitions/partition.rs b/server/src/streaming/partitions/partition.rs index 32626356c..e9fabd764 100644 --- a/server/src/streaming/partitions/partition.rs +++ b/server/src/streaming/partitions/partition.rs @@ -7,8 +7,10 @@ use crate::streaming::segments::segment::Segment; use crate::streaming::storage::SystemStorage; use dashmap::DashMap; use iggy::consumer::ConsumerKind; +use iggy::utils::byte_size::IggyByteSize; use iggy::utils::duration::IggyDuration; use iggy::utils::expiry::IggyExpiry; +use iggy::utils::sizeable::Sizeable; use iggy::utils::timestamp::IggyTimestamp; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; @@ -166,9 +168,11 @@ impl Partition { partition } +} - pub fn get_size_bytes(&self) -> u64 { - self.size_bytes.load(Ordering::SeqCst) +impl Sizeable for Partition { + fn get_size_bytes(&self) -> IggyByteSize { + IggyByteSize::from(self.size_bytes.load(Ordering::SeqCst)) } } diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index e519b77c8..7f586f863 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -4,8 +4,9 @@ use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_B use crate::streaming::models::messages::RetainedMessage; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; -use crate::streaming::sizeable::Sizeable; use iggy::error::IggyError; +use iggy::utils::byte_size::IggyByteSize; +use iggy::utils::sizeable::Sizeable; use std::sync::atomic::Ordering; use std::sync::Arc; use tracing::{info, trace, warn}; @@ -184,7 +185,7 @@ impl Segment { pub async fn append_batch( &mut self, - batch_size: u64, + batch_size: IggyByteSize, messages_count: u32, batch: &[Arc], ) -> Result<(), IggyError> { @@ -203,7 +204,8 @@ impl Segment { let curr_offset = batch_accumulator.batch_max_offset(); self.current_offset = curr_offset; - self.size_bytes += batch_size as u32; + self.size_bytes += batch_size; + let batch_size = batch_size.as_bytes_u64(); self.size_of_parent_stream .fetch_add(batch_size, Ordering::AcqRel); self.size_of_parent_topic @@ -266,14 +268,14 @@ impl Segment { } let saved_bytes = storage.save_batches(self, batch).await?; storage.save_index(&self.index_path, index).await?; - self.last_index_position += batch_size; - self.size_bytes += RETAINED_BATCH_OVERHEAD; + self.last_index_position += batch_size.as_bytes_u64() as u32; + self.size_bytes += IggyByteSize::from(RETAINED_BATCH_OVERHEAD); self.size_of_parent_stream - .fetch_add(RETAINED_BATCH_OVERHEAD as u64, Ordering::AcqRel); + .fetch_add(RETAINED_BATCH_OVERHEAD, Ordering::AcqRel); self.size_of_parent_topic - .fetch_add(RETAINED_BATCH_OVERHEAD as u64, Ordering::AcqRel); + .fetch_add(RETAINED_BATCH_OVERHEAD, Ordering::AcqRel); self.size_of_parent_partition - .fetch_add(RETAINED_BATCH_OVERHEAD as u64, Ordering::AcqRel); + .fetch_add(RETAINED_BATCH_OVERHEAD, Ordering::AcqRel); trace!( "Saved {} messages on disk in segment with start offset: {} for partition with ID: {}, total bytes written: {}.", diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index bcdddd456..0cf5cb16c 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -2,6 +2,7 @@ use crate::configs::system::SystemConfig; use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::segments::index::Index; use crate::streaming::storage::SystemStorage; +use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; use iggy::utils::timestamp::IggyTimestamp; use std::sync::atomic::AtomicU64; @@ -9,7 +10,7 @@ use std::sync::Arc; pub const LOG_EXTENSION: &str = "log"; pub const INDEX_EXTENSION: &str = "index"; -pub const MAX_SIZE_BYTES: u32 = 1000 * 1000 * 1000; +pub const MAX_SIZE_BYTES: u64 = 1000 * 1000 * 1000; #[derive(Debug)] pub struct Segment { @@ -21,9 +22,9 @@ pub struct Segment { pub current_offset: u64, pub index_path: String, pub log_path: String, - pub size_bytes: u32, + pub size_bytes: IggyByteSize, pub last_index_position: u32, - pub max_size_bytes: u32, + pub max_size_bytes: IggyByteSize, pub size_of_parent_stream: Arc, pub size_of_parent_topic: Arc, pub size_of_parent_partition: Arc, @@ -66,9 +67,9 @@ impl Segment { current_offset: start_offset, log_path: Self::get_log_path(&path), index_path: Self::get_index_path(&path), - size_bytes: 0, + size_bytes: IggyByteSize::from(0), last_index_position: 0, - max_size_bytes: config.segment.size.as_bytes_u64() as u32, + max_size_bytes: config.segment.size, message_expiry: match message_expiry { IggyExpiry::ServerDefault => config.segment.message_expiry, _ => message_expiry, diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs index 3cfd44e1b..e0180c8ae 100644 --- a/server/src/streaming/segments/storage.rs +++ b/server/src/streaming/segments/storage.rs @@ -4,7 +4,6 @@ use crate::streaming::models::messages::RetainedMessage; use crate::streaming::persistence::persister::Persister; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; -use crate::streaming::sizeable::Sizeable; use crate::streaming::storage::SegmentStorage; use crate::streaming::utils::file; use crate::streaming::utils::head_tail_buf::HeadTailBuffer; @@ -14,6 +13,7 @@ use bytes::{BufMut, BytesMut}; use iggy::error::IggyError; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::checksum; +use iggy::utils::sizeable::Sizeable; use std::io::SeekFrom; use std::path::Path; use std::sync::atomic::Ordering; @@ -48,7 +48,7 @@ impl SegmentStorage for FileSegmentStorage { ); let log_file = file::open(&segment.log_path).await?; let file_size = log_file.metadata().await.unwrap().len() as u64; - segment.size_bytes = file_size as _; + segment.size_bytes = IggyByteSize::from(file_size); segment.last_index_position = file_size as _; if segment.config.segment.cache_indexes { @@ -145,15 +145,16 @@ impl SegmentStorage for FileSegmentStorage { ); self.persister.delete(&segment.log_path).await?; self.persister.delete(&segment.index_path).await?; + let segment_size_bytes = segment.size_bytes.as_bytes_u64(); segment .size_of_parent_stream - .fetch_sub(segment.size_bytes as u64, Ordering::SeqCst); + .fetch_sub(segment_size_bytes, Ordering::SeqCst); segment .size_of_parent_topic - .fetch_sub(segment.size_bytes as u64, Ordering::SeqCst); + .fetch_sub(segment_size_bytes, Ordering::SeqCst); segment .size_of_parent_partition - .fetch_sub(segment.size_bytes as u64, Ordering::SeqCst); + .fetch_sub(segment_size_bytes, Ordering::SeqCst); segment .messages_count_of_parent_stream .fetch_sub(segment_count_of_messages, Ordering::SeqCst); @@ -191,18 +192,18 @@ impl SegmentStorage for FileSegmentStorage { size_bytes: u64, ) -> Result, IggyError> { let mut batches = Vec::new(); - let mut total_size_bytes = 0; + let mut total_size_bytes = IggyByteSize::default(); load_messages_by_size(segment, size_bytes, |batch| { - total_size_bytes += batch.get_size_bytes() as u64; + total_size_bytes += batch.get_size_bytes(); batches.push(batch); Ok(()) }) .await?; let messages_count = batches.len(); trace!( - "Loaded {} newest messages batches of total size {} bytes from disk.", + "Loaded {} newest messages batches of total size {} from disk.", messages_count, - total_size_bytes + total_size_bytes.as_human_string(), ); Ok(batches) } @@ -211,9 +212,9 @@ impl SegmentStorage for FileSegmentStorage { &self, segment: &Segment, batch: RetainedMessageBatch, - ) -> Result { + ) -> Result { let batch_size = batch.get_size_bytes(); - let mut bytes = BytesMut::with_capacity(batch_size as usize); + let mut bytes = BytesMut::with_capacity(batch_size.as_bytes_usize()); batch.extend(&mut bytes); if let Err(err) = self @@ -511,7 +512,7 @@ async fn load_batches_by_range( batch_base_offset, last_offset_delta, max_timestamp, - batch_length, + IggyByteSize::from(batch_length as u64), payload.freeze(), ); on_batch(batch)?; @@ -531,7 +532,7 @@ async fn load_messages_by_size( } let threshold = file_size.saturating_sub(size_bytes); - let mut accumulated_size: u64 = 0; + let mut accumulated_size = IggyByteSize::default(); let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file); loop { @@ -564,10 +565,10 @@ async fn load_messages_by_size( batch_base_offset, last_offset_delta, max_timestamp, - batch_length, + IggyByteSize::from(batch_length as u64), payload.freeze(), ); - let message_size = batch.get_size_bytes() as u64; + let message_size = batch.get_size_bytes(); if accumulated_size >= threshold { on_batch(batch)?; } diff --git a/server/src/streaming/sizeable.rs b/server/src/streaming/sizeable.rs deleted file mode 100644 index 43114d737..000000000 --- a/server/src/streaming/sizeable.rs +++ /dev/null @@ -1,4 +0,0 @@ -/// Trait for types that return their size in bytes. -pub trait Sizeable { - fn get_size_bytes(&self) -> u32; -} diff --git a/server/src/streaming/storage.rs b/server/src/streaming/storage.rs index b8a39330e..32a7c4d36 100644 --- a/server/src/streaming/storage.rs +++ b/server/src/streaming/storage.rs @@ -16,6 +16,7 @@ use crate::streaming::topics::topic::Topic; use async_trait::async_trait; use iggy::consumer::ConsumerKind; use iggy::error::IggyError; +use iggy::utils::byte_size::IggyByteSize; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -74,7 +75,7 @@ pub trait SegmentStorage: Send + Sync { &self, segment: &Segment, batch: RetainedMessageBatch, - ) -> Result; + ) -> Result; async fn load_message_ids(&self, segment: &Segment) -> Result, IggyError>; async fn load_checksums(&self, segment: &Segment) -> Result<(), IggyError>; async fn load_all_indexes(&self, segment: &Segment) -> Result, IggyError>; @@ -295,8 +296,8 @@ pub(crate) mod tests { &self, _segment: &Segment, _batch: RetainedMessageBatch, - ) -> Result { - Ok(0) + ) -> Result { + Ok(IggyByteSize::default()) } async fn load_message_ids(&self, _segment: &Segment) -> Result, IggyError> { diff --git a/server/src/streaming/systems/messages.rs b/server/src/streaming/systems/messages.rs index 33bf2b930..f95ba3575 100644 --- a/server/src/streaming/systems/messages.rs +++ b/server/src/streaming/systems/messages.rs @@ -7,6 +7,8 @@ use iggy::messages::poll_messages::PollingStrategy; use iggy::messages::send_messages::Message; use iggy::messages::send_messages::Partitioning; use iggy::models::messages::{PolledMessage, PolledMessages}; +use iggy::utils::byte_size::IggyByteSize; +use iggy::utils::sizeable::Sizeable; use iggy::{error::IggyError, identifier::Identifier}; use tracing::{error, trace}; @@ -69,7 +71,7 @@ impl System { offset: message.offset, timestamp: message.timestamp, checksum: message.checksum, - length: payload.len() as u32, + length: IggyByteSize::from(payload.len() as u64), payload: Bytes::from(payload), headers: message.headers.clone(), }); @@ -100,7 +102,7 @@ impl System { topic.topic_id, )?; - let mut batch_size_bytes = 0; + let mut batch_size_bytes = IggyByteSize::default(); let mut messages = messages; if let Some(encryptor) = &self.encryptor { for message in messages.iter_mut() { @@ -109,7 +111,7 @@ impl System { Ok(payload) => { message.payload = Bytes::from(payload); message.length = message.payload.len() as u32; - batch_size_bytes += message.get_size_bytes() as u64; + batch_size_bytes += message.get_size_bytes(); } Err(error) => { error!("Cannot encrypt the message. Error: {}", error); @@ -118,7 +120,10 @@ impl System { } } } else { - batch_size_bytes = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(); + batch_size_bytes = messages + .iter() + .map(|msg| msg.get_size_bytes()) + .sum::(); } if let Some(memory_tracker) = CacheMemoryTracker::get_instance() { diff --git a/server/src/streaming/systems/system.rs b/server/src/streaming/systems/system.rs index 72df3f4c9..5f13e76f7 100644 --- a/server/src/streaming/systems/system.rs +++ b/server/src/streaming/systems/system.rs @@ -9,6 +9,7 @@ use crate::streaming::storage::SystemStorage; use crate::streaming::streams::stream::Stream; use crate::streaming::users::permissioner::Permissioner; use iggy::error::IggyError; +use iggy::utils::byte_size::IggyByteSize; use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor}; use std::collections::HashMap; use std::path::Path; @@ -260,7 +261,7 @@ impl System { } } - pub async fn clean_cache(&self, size_to_clean: u64) { + pub async fn clean_cache(&self, size_to_clean: IggyByteSize) { for stream in self.streams.values() { for topic in stream.get_topics() { for partition in topic.get_partitions().into_iter() { @@ -268,9 +269,9 @@ impl System { let memory_tracker = CacheMemoryTracker::get_instance().unwrap(); let mut partition_guard = partition.write().await; let cache = &mut partition_guard.cache.as_mut().unwrap(); - let size_to_remove = (cache.current_size() as f64 - / memory_tracker.usage_bytes() as f64 - * size_to_clean as f64) + let size_to_remove = (cache.current_size().as_bytes_u64() as f64 + / memory_tracker.usage_bytes().as_bytes_u64() as f64 + * size_to_clean.as_bytes_u64() as f64) .ceil() as u64; cache.evict_by_size(size_to_remove * CACHE_OVER_EVICTION_FACTOR); }); diff --git a/server/src/streaming/topics/messages.rs b/server/src/streaming/topics/messages.rs index 7187d4cd1..8036135c8 100644 --- a/server/src/streaming/topics/messages.rs +++ b/server/src/streaming/topics/messages.rs @@ -1,7 +1,6 @@ use crate::streaming::batching::appendable_batch_info::AppendableBatchInfo; use crate::streaming::models::messages::RetainedMessage; use crate::streaming::polling_consumer::PollingConsumer; -use crate::streaming::sizeable::Sizeable; use crate::streaming::topics::topic::Topic; use crate::streaming::utils::file::folder_size; use crate::streaming::utils::hash; @@ -10,7 +9,9 @@ use iggy::locking::IggySharedMutFn; use iggy::messages::poll_messages::{PollingKind, PollingStrategy}; use iggy::messages::send_messages::{Message, Partitioning, PartitioningKind}; use iggy::models::messages::PolledMessages; +use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; +use iggy::utils::sizeable::Sizeable; use iggy::utils::timestamp::IggyTimestamp; use std::collections::HashMap; use std::sync::atomic::Ordering; @@ -70,7 +71,7 @@ impl Topic { pub async fn append_messages( &self, - batch_size: u64, + batch_size: IggyByteSize, partitioning: Partitioning, messages: Vec, ) -> Result<(), IggyError> { @@ -204,14 +205,18 @@ impl Topic { // Fetch data from disk proportional to the partition size // eg. 12 partitions, each has 300 MB, cache limit is 500 MB, so there is total 3600 MB of data on SSD. // 500 MB * (300 / 3600 MB) ~= 41.6 MB to load from cache (assuming all partitions have the same size on disk) - let size_to_fetch_from_disk = (cache_limit_bytes as f64 - * (partition_size_bytes as f64 / total_size_on_disk_bytes as f64)) + let size_to_fetch_from_disk = (cache_limit_bytes.as_bytes_u64() as f64 + * (partition_size_bytes.as_bytes_u64() as f64 + / total_size_on_disk_bytes.as_bytes_u64() as f64)) as u64; let messages = partition .get_newest_messages_by_size(size_to_fetch_from_disk as u64) .await?; - let sum: u64 = messages.iter().map(|m| m.get_size_bytes() as u64).sum(); + let sum = messages + .iter() + .map(|m| m.get_size_bytes()) + .sum::(); if !Self::cache_integrity_check(&messages) { warn!( "Cache integrity check failed for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}. Emptying cache...", @@ -306,7 +311,10 @@ mod tests { for entity_id in 1..=messages_count { let messages = vec![Message::new(Some(entity_id as u128), Bytes::new(), None)]; - let batch_size = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(); + let batch_size = messages + .iter() + .map(|msg| msg.get_size_bytes()) + .sum::(); topic .append_messages(batch_size, partitioning.clone(), messages) .await @@ -335,7 +343,10 @@ mod tests { for entity_id in 1..=messages_count { let partitioning = Partitioning::messages_key_u32(entity_id); let messages = vec![Message::new(Some(entity_id as u128), Bytes::new(), None)]; - let batch_size = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(); + let batch_size = messages + .iter() + .map(|msg| msg.get_size_bytes()) + .sum::(); topic .append_messages(batch_size, partitioning, messages) .await diff --git a/server/src/streaming/topics/topic.rs b/server/src/streaming/topics/topic.rs index bec156748..4f758a8b4 100644 --- a/server/src/streaming/topics/topic.rs +++ b/server/src/streaming/topics/topic.rs @@ -10,6 +10,7 @@ use iggy::error::IggyError; use iggy::locking::IggySharedMut; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; +use iggy::utils::sizeable::Sizeable; use iggy::utils::timestamp::IggyTimestamp; use iggy::utils::topic_size::MaxTopicSize; use std::collections::HashMap; @@ -153,10 +154,6 @@ impl Topic { matches!(self.max_topic_size, MaxTopicSize::Unlimited) } - pub fn get_size(&self) -> IggyByteSize { - IggyByteSize::from(self.size_bytes.load(Ordering::SeqCst)) - } - pub fn get_partitions(&self) -> Vec> { self.partitions.values().cloned().collect() } @@ -236,6 +233,12 @@ impl Topic { } } +impl Sizeable for Topic { + fn get_size_bytes(&self) -> IggyByteSize { + IggyByteSize::from(self.size_bytes.load(Ordering::SeqCst)) + } +} + impl fmt::Display for Topic { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "ID: {}, ", self.topic_id)?; diff --git a/server/src/streaming/utils/file.rs b/server/src/streaming/utils/file.rs index 2bc4286fd..9c015fbf7 100644 --- a/server/src/streaming/utils/file.rs +++ b/server/src/streaming/utils/file.rs @@ -1,4 +1,5 @@ use atone::Vc; +use iggy::utils::byte_size::IggyByteSize; use std::path::{Path, PathBuf}; use tokio::fs::{read_dir, remove_file, File, OpenOptions}; @@ -30,7 +31,7 @@ pub async fn exists(path: &str) -> Result { tokio::fs::try_exists(path).await } -pub async fn folder_size

(path: P) -> std::io::Result +pub async fn folder_size

(path: P) -> std::io::Result where P: Into + AsRef, { @@ -51,5 +52,5 @@ where } } } - Ok(total_size) + Ok(total_size.into()) }