Skip to content

Commit

Permalink
Feature/selium log documentation (#195)
Browse files Browse the repository at this point in the history
  • Loading branch information
adenh93 authored May 1, 2024
1 parent 75fb1aa commit 9795380
Show file tree
Hide file tree
Showing 24 changed files with 554 additions and 98 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
strategy:
max-parallel: 1
matrix:
name: [standard, protocol, client, server, tools]
name: [standard, protocol, client, server, tools, log]
include:
- name: standard
display: Standard
Expand All @@ -28,7 +28,10 @@ jobs:
- name: tools
display: Tools
repo: selium-tools

- name: log
display: Log
repo: selium-log

runs-on: ubuntu-latest
steps:
- name: Checkout code
Expand Down
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions log/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
[package]
name = "selium-log"
version = "0.1.0"
description = """
selium-log is an open-source implementation of a log-based message queue.
"""
edition.workspace = true
authors.workspace = true
license.workspace = true
Expand Down Expand Up @@ -31,6 +34,7 @@ thiserror = "1.0"
tempfile = "3.10"
criterion = { version = "0.3", features = ["async_tokio"] }
fake = "2.9"
anyhow = "1.0"

[[bench]]
name = "read_benchmark"
Expand Down
8 changes: 3 additions & 5 deletions log/benches/read_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use bytes::Bytes;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use selium_log::{
config::{FlushPolicy, LogConfig},
message::{Headers, Message},
message::Message,
MessageLog,
};
use std::{path::Path, sync::Arc, time::Duration};
Expand All @@ -22,14 +22,12 @@ fn get_log_config(path: impl AsRef<Path>) -> LogConfig {

async fn write_records(path: impl AsRef<Path>) {
let config = get_log_config(path);
let mut log = MessageLog::open(Arc::new(config)).await.unwrap();
let log = MessageLog::open(Arc::new(config)).await.unwrap();

for i in 0..NUM_OF_MESSAGES {
let message = format!("Hello, world! {i}");
let batch = Bytes::from(message);
let headers = Headers::new(batch.len(), 1, 1);
let message = Message::new(headers, &batch);

let message = Message::single(&batch, 1);
log.write(message).await.unwrap();
}

Expand Down
8 changes: 3 additions & 5 deletions log/benches/write_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion};
use selium_log::{
config::{FlushPolicy, LogConfig},
message::{Headers, Message},
message::Message,
MessageLog,
};
use std::{sync::Arc, time::Duration};
Expand All @@ -24,13 +24,11 @@ fn get_log_config() -> LogConfig {

async fn log_task() {
let config = get_log_config();
let mut log = MessageLog::open(Arc::new(config)).await.unwrap();
let log = MessageLog::open(Arc::new(config)).await.unwrap();

for _ in 0..NUM_OF_MESSAGES {
let batch = Bytes::copy_from_slice(&vec![1; 32]);
let headers = Headers::new(batch.len(), 1, 1);
let message = Message::new(headers, &batch);

let message = Message::single(&batch, 1);
log.write(message).await.unwrap();
}

Expand Down
23 changes: 23 additions & 0 deletions log/examples/basic_usage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use anyhow::Result;
use selium_log::{config::LogConfig, message::Message, MessageLog};
use std::sync::Arc;

const MESSAGE_VERSION: u32 = 1;

// Minimal end-to-end example: open a log, write one message, flush,
// then read it back via a slice iterator.
#[tokio::main]
async fn main() -> Result<()> {
// Build a config with defaults, pointing at the directory that will hold segment files.
let config = LogConfig::from_path("path/to/segments/dir");
let log = MessageLog::open(Arc::new(config)).await?;
// Message::single wraps a single payload with the given message version.
let message = Message::single(b"Hello, world!", MESSAGE_VERSION);

log.write(message).await?;
// Flush explicitly so the write is durable before we read it back.
log.flush().await?;
// Read from offset 0 with no end bound (None) — yields everything written so far.
let slice = log.read_slice(0, None).await?;

// messages() returns None when the slice is empty; otherwise iterate and print.
if let Some(mut iter) = slice.messages() {
let next = iter.next().await?;
println!("{next:?}")
}

Ok(())
}
12 changes: 12 additions & 0 deletions log/src/config/flush_policy.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use std::time::Duration;

/// Defines the flushing policy for a message log.
/// Flushing is triggered by a defined interval, and optionally, a write threshold.
#[derive(Debug, Clone)]
pub struct FlushPolicy {
/// An optional write-count threshold. When the threshold is exceeded, a flush will be triggered.
pub(crate) number_of_writes: Option<u64>,
/// The flushing interval for the log. Triggers a flush when the interval elapses.
pub(crate) interval: Duration,
}

impl Default for FlushPolicy {
/// By default, no write-count threshold is defined, and is left as a user-provided optimization
/// depending on the throughput and durability requirements of the log.
fn default() -> Self {
Self {
number_of_writes: None,
Expand All @@ -16,20 +22,26 @@ impl Default for FlushPolicy {
}

impl FlushPolicy {
/// Creates a new FlushPolicy with suitable defaults.
pub fn new() -> Self {
Self::default()
}

/// Opts-in to flushing based on a write-count threshold, and specifies that the flush should be
/// triggered on every write.
pub fn every_write(mut self) -> Self {
self.number_of_writes = Some(1);
self
}

/// Opts-in to flushing based on a write-count threshold, and specifies that the flush should be
/// triggered after the provided number of writes.
pub fn number_of_writes(mut self, num: u64) -> Self {
self.number_of_writes = Some(num);
self
}

/// Overrides the default flushing interval.
pub fn interval(mut self, interval: Duration) -> Self {
self.interval = interval;
self
Expand Down
23 changes: 23 additions & 0 deletions log/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Configuration settings for an individual message log.
mod flush_policy;

pub use flush_policy::FlushPolicy;
Expand All @@ -7,22 +9,39 @@ use std::{
time::Duration,
};

/// The default maximum number of index entries for a log segment.
pub const MAX_INDEX_ENTRIES_DEFAULT: u32 = 100_000;

/// The default log retention period.
pub const RETENTION_PERIOD_DEFAULT: Duration = Duration::from_secs(60 * 60 * 24 * 7);

/// The default Cleaner task interval.
pub const CLEANER_INTERVAL_DEFAULT: Duration = Duration::from_secs(60 * 5);

pub type SharedLogConfig = Arc<LogConfig>;

/// The LogConfig struct groups preferences for the log's behaviour, such as settings related to
/// message retention, frequency of flushes, etc, and is shared across each component of the log.
#[derive(Debug, Clone)]
pub struct LogConfig {
/// Indicates the maximum amount of entries a segment index will retain before a new segment
/// is created.
pub max_index_entries: u32,
/// The path to the directory containing the segment index/data files.
pub segments_path: PathBuf,
/// The retention period for each individual segment. Determines when a segment is stale/expired,
/// and can be cleaned up by the cleaner task.
pub retention_period: Duration,
/// The desired interval to poll the cleaner task to discover stale/expired segments.
pub cleaner_interval: Duration,
/// The desired flush policy for the log. The flush policy dictates the frequency of flushing based
/// on the number of writes, and/or a defined interval.
pub flush_policy: FlushPolicy,
}

impl LogConfig {
/// Creates a new LogConfig builder with the provided segments path. All remaining fields are assigned
/// reasonable defaults until overridden.
pub fn from_path(path: impl AsRef<Path>) -> Self {
Self {
max_index_entries: MAX_INDEX_ENTRIES_DEFAULT,
Expand All @@ -33,21 +52,25 @@ impl LogConfig {
}
}

/// Overrides the default `max_index_entries` field.
pub fn max_index_entries(mut self, max_entries: u32) -> Self {
self.max_index_entries = max_entries;
self
}

/// Overrides the default `retention_period` field.
pub fn retention_period(mut self, period: Duration) -> Self {
self.retention_period = period;
self
}

/// Overrides the default `cleaner_interval` field.
pub fn cleaner_interval(mut self, interval: Duration) -> Self {
self.cleaner_interval = interval;
self
}

/// Overrides the default `flush_policy` field.
pub fn flush_policy(mut self, policy: FlushPolicy) -> Self {
self.flush_policy = policy;
self
Expand Down
24 changes: 18 additions & 6 deletions log/src/data/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ use tokio::{
io::{AsyncReadExt, BufReader},
};

/// An iterator over a [Data](crate::data::Data) file.
///
/// The LogIterator acts as an active reader over the log, pulling messages from the log
/// and decoding them, while maintaining a cursor to ensure that superfluous reads are not performed
/// when a read limit is provided via `end_position`.
#[derive(Debug)]
pub struct LogIterator {
reader: BufReader<File>,
Expand All @@ -15,6 +20,7 @@ pub struct LogIterator {
}

impl LogIterator {
/// Constructs a new LogIterator instance.
pub fn new(reader: BufReader<File>, cursor: u64, end_position: u64) -> Self {
Self {
reader,
Expand All @@ -23,30 +29,36 @@ impl LogIterator {
}
}

pub async fn next(&mut self) -> Option<Result<Message>> {
/// Attempts to decode and retrieve the next message from the `reader`.
/// Returns [Option::None] if there are no more messages to decode.
///
/// # Errors
/// Returns [std::io::ErrorKind::UnexpectedEof] if an unexpected end-of-file
/// is encountered due to a partially committed or corrupted message.
pub async fn next(&mut self) -> Result<Option<Message>> {
if self.cursor >= self.end_position {
return None;
return Ok(None);
}

let mut headers = vec![0; HEADERS_SIZE];
self.reader.read_exact(&mut headers).await.ok()?;
self.reader.read_exact(&mut headers).await?;

let headers = Headers::decode(&headers);
let remainder_len = headers.length() as usize - HEADERS_SIZE;
let combined_len = HEADERS_SIZE + remainder_len;
let records_len = remainder_len - CRC_SIZE;

let mut remainder = vec![0; remainder_len];
self.reader.read_exact(&mut remainder).await.ok()?;
self.reader.read_exact(&mut remainder).await?;

let records = &remainder[..records_len];
let mut crc = [0; CRC_SIZE];
crc.copy_from_slice(&remainder[records_len..]);
let crc = u32::from_be_bytes(crc);
let message = Message::decode(headers, records, crc);
let message = Message::new(headers, records, crc);

self.cursor += combined_len as u64;

Some(Ok(message))
Ok(Some(message))
}
}
Loading

0 comments on commit 9795380

Please sign in to comment.