Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time slice #20

Merged
merged 31 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
738dc07
Set up WIP gossip and supporting code
ThetaSinner Nov 21, 2024
c2e641a
Extend time handling logic and tests
ThetaSinner Nov 25, 2024
4dc5970
Integrate changes from main and fix tests
ThetaSinner Nov 25, 2024
c531932
More testing
ThetaSinner Nov 25, 2024
74f60d9
Write documentation
ThetaSinner Nov 25, 2024
e27dfbd
Strip out gossip to focus on time slices
ThetaSinner Nov 25, 2024
70e2e3d
Tidy
ThetaSinner Nov 25, 2024
d6e7271
Fix comment
ThetaSinner Nov 25, 2024
842a696
Lint
ThetaSinner Nov 25, 2024
d5f13d7
Update crates/dht/src/time.rs
ThetaSinner Nov 26, 2024
a835ddb
Update crates/dht/src/time.rs
ThetaSinner Nov 26, 2024
1d80690
Update crates/dht/src/time.rs
ThetaSinner Nov 26, 2024
c852e1a
Update crates/dht/src/time.rs
ThetaSinner Nov 26, 2024
ef2ed7a
Review changes
ThetaSinner Nov 26, 2024
9928919
Update crates/dht/src/time.rs
ThetaSinner Nov 27, 2024
c47d672
Update crates/dht/src/time.rs
ThetaSinner Nov 27, 2024
0f4a930
Update crates/dht/src/time.rs
ThetaSinner Nov 27, 2024
4bc2cbc
Return an error when the factor is invalid, rather than using a panic
ThetaSinner Nov 27, 2024
8e35c84
Rename from_store to try_from_store
ThetaSinner Nov 27, 2024
02ef349
Fix adding multiple new full slices
ThetaSinner Nov 27, 2024
5d4583e
Update crates/dht/src/time.rs
ThetaSinner Nov 27, 2024
47763b5
Add note about factor size limits and selection
ThetaSinner Nov 27, 2024
3534934
Improve code breakdown so the split between time layout and hashes is…
ThetaSinner Nov 27, 2024
4729b85
Remove op flags, not relevant to this PR
ThetaSinner Nov 27, 2024
b3e33b8
Extract helper in tests for recent and full slice time
ThetaSinner Nov 27, 2024
8c9511b
Update crates/dht/src/time.rs
ThetaSinner Nov 27, 2024
ff1f69e
Test improvements
ThetaSinner Nov 27, 2024
eb31dee
Improve docs
ThetaSinner Nov 27, 2024
abc6a3a
Add missing backtick
ThetaSinner Nov 27, 2024
2f60930
Update lock after rebase
ThetaSinner Nov 28, 2024
1b1953f
Format after rebase
ThetaSinner Nov 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@
members = [
"crates/api",
"crates/bootstrap_srv",
"crates/dht",
"crates/memory",
]
resolver = "2"

[workspace.dependencies]
# Self dependencies for workspace crates to depend on each other.
# For example, most crates will depend on the api crate.
kitsune2_api = { version = "0.0.1-alpha", path = "crates/api" }

# debugging is far easier when you can see short byte arrays
# as base64 instead of decimal u8s.
base64 = "0.22.1"
Expand All @@ -26,6 +32,8 @@ serde = { version = "1.0.215", features = ["derive"] }
# kitsune2 agent info is serialized as json to improve debugability of
# bootstrapping. So, we need a json library.
serde_json = "1.0.132"
# Simplify writing async code
futures = "0.3"
# bootstrap_srv uses tempfiles as virtual memory for storage instead of RAM.
tempfile = "3.14.0"
# kitsune2 internally uses a mix of std::io::Error and thiserror derivation.
Expand All @@ -39,3 +47,4 @@ tiny_http = "0.12.0"
# --- dev-dependencies ---
tokio = "1.41.1"
ureq = "2.10.1"
kitsune2_memory = { path = "crates/memory" }
1 change: 1 addition & 0 deletions crates/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ bytes = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
futures = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
2 changes: 2 additions & 0 deletions crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,5 @@ mod timestamp;
pub use timestamp::*;

pub mod agent;
pub mod op_store;
pub use op_store::*;
83 changes: 83 additions & 0 deletions crates/api/src/op_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//! Kitsune2 op store types.

use crate::{K2Result, OpId, Timestamp};
use futures::future::BoxFuture;
use std::cmp::Ordering;
use std::sync::Arc;

/// An op with metadata.
///
/// This is the basic unit of data in the kitsune2 system.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct MetaOp {
/// The id of the op.
pub op_id: OpId,

/// The creation timestamp of the op.
///
/// This must be the same for everyone who sees this op.
///
/// The host must reject the op if the timestamp does not agree with any timestamps inside the
/// op data.
pub timestamp: Timestamp,

/// The actual op data.
pub op_data: Vec<u8>,
}

impl Ord for MetaOp {
fn cmp(&self, other: &Self) -> Ordering {
(&self.timestamp, &self.op_id).cmp(&(&other.timestamp, &other.op_id))
}
}

impl PartialOrd for MetaOp {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

/// The API that a kitsune2 host must implement to provide data persistence for kitsune2.
pub trait OpStore: 'static + Send + Sync + std::fmt::Debug {
/// Process incoming ops.
///
/// Pass the incoming ops to the host for processing. The host is expected to store the ops
/// if it is able to process them.
fn process_incoming_ops(
&self,
op_list: Vec<MetaOp>,
) -> BoxFuture<'_, K2Result<()>>;

/// Retrieve a batch of ops from the host by time range.
///
/// This must be the timestamp of the op, not the time that we saw the op or chose to store it.
/// The returned ops must be ordered by timestamp, ascending.
fn retrieve_op_hashes_in_time_slice(
&self,
start: Timestamp,
end: Timestamp,
) -> BoxFuture<'_, K2Result<Vec<OpId>>>;

/// Store the combined hash of a time slice.
fn store_slice_hash(
&self,
slice_id: u64,
slice_hash: Vec<u8>,
) -> BoxFuture<'_, K2Result<()>>;

/// Count the number of stored time slices.
fn slice_hash_count(&self) -> BoxFuture<'_, K2Result<u64>>;

/// Retrieve the combined hash of a time slice.
///
/// If the slice is not found, return None.
/// If the time slice is present, then it must be identical to what was stored by Kitsune2
/// using [Self::store_slice_hash].
fn retrieve_slice_hash(
&self,
slice_id: u64,
) -> BoxFuture<'_, K2Result<Option<Vec<u8>>>>;
}

/// Trait-object version of kitsune2 op store.
pub type DynOpStore = Arc<dyn OpStore>;
30 changes: 30 additions & 0 deletions crates/api/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,36 @@ impl std::ops::Add<std::time::Duration> for Timestamp {
}
}

impl std::ops::AddAssign<std::time::Duration> for Timestamp {
fn add_assign(&mut self, rhs: std::time::Duration) {
self.0 += rhs.as_micros() as i64;
}
}

impl std::ops::Sub<std::time::Duration> for Timestamp {
type Output = Result<Timestamp, ()>;

fn sub(self, rhs: std::time::Duration) -> Self::Output {
if self.0 < rhs.as_micros() as i64 {
Err(())
} else {
Ok(Timestamp(self.0 - rhs.as_micros() as i64))
}
}
}

impl std::ops::Sub for Timestamp {
type Output = Result<std::time::Duration, ()>;

fn sub(self, rhs: Self) -> Self::Output {
if self.0 < rhs.0 {
Err(())
} else {
Ok(std::time::Duration::from_micros((self.0 - rhs.0) as u64))
}
}
}

impl From<std::time::SystemTime> for Timestamp {
fn from(t: std::time::SystemTime) -> Self {
Self(
Expand Down
20 changes: 20 additions & 0 deletions crates/dht/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "kitsune2_dht"
version = "0.0.1-alpha"
description = "The DHT model for kitsune2"
license = "Apache-2.0"
homepage = "https://github.com/holochain/kitsune2"
documentation = "https://docs.rs/kitsune2_api"
authors = ["Holochain Core Dev Team <[email protected]>"]
keywords = ["holochain", "holo", "p2p", "dht", "networking"]
categories = ["network-programming"]
edition = "2021"

[dependencies]
kitsune2_api = { workspace = true }
bytes = { workspace = true }

[dev-dependencies]
kitsune2_memory = { workspace = true }

tokio = { workspace = true, features = ["macros", "rt"] }
3 changes: 3 additions & 0 deletions crates/dht/src/constant.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use std::time::Duration;

pub const UNIT_TIME: Duration = Duration::from_secs(15 * 60); // 15 minutes
5 changes: 5 additions & 0 deletions crates/dht/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod constant;
pub use constant::*;

pub mod time;
pub use time::*;
Loading