Skip to content

Commit

Permalink
feat: track version history in database (#409)
Browse files Browse the repository at this point in the history
* refactor: create CeramicService and make processing undelivered events a fn call

* feat: create a version table and update it when starting the daemon

This can be used in the future to prevent downgrades that go across incompatible schema/data migrations
  • Loading branch information
dav1do authored Jul 26, 2024
1 parent ed45718 commit f490f0a
Show file tree
Hide file tree
Showing 19 changed files with 224 additions and 24 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ serde_qs = "0.10.1"
serde_with = "2.1"
sha2 = { version = "0.10", default-features = false }
smallvec = "1.10"
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio"] }
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "chrono"] }
ssh-key = { version = "0.5.1", default-features = false }
ssi = { version = "0.7", features = ["ed25519"] }
swagger = { version = "6.1", features = [
Expand Down
2 changes: 2 additions & 0 deletions migrations/sqlite/20240611183747_version.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS "ceramic_one_version";
7 changes: 7 additions & 0 deletions migrations/sqlite/20240611183747_version.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS "ceramic_one_version" (
id INTEGER PRIMARY KEY AUTOINCREMENT,
"version" TEXT NOT NULL UNIQUE,
"installed_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
"last_started_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
14 changes: 9 additions & 5 deletions one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use ceramic_core::{EventId, Interest};
use ceramic_kubo_rpc::Multiaddr;
use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle};
use ceramic_p2p::{load_identity, DiskStorage, Keychain, Libp2pConfig};
use ceramic_service::{CeramicEventService, CeramicInterestService};
use ceramic_service::{CeramicEventService, CeramicInterestService, CeramicService};
use clap::{Args, Parser, Subcommand, ValueEnum};
use futures::StreamExt;
use multibase::Base;
Expand Down Expand Up @@ -295,8 +295,12 @@ impl DBOpts {
async fn build_sqlite_dbs(path: &str, process_undelivered: bool) -> Result<Databases> {
let sql_pool =
ceramic_store::SqlitePool::connect(path, ceramic_store::Migrations::Apply).await?;
let interest_store = Arc::new(CeramicInterestService::new(sql_pool.clone()));
let event_store = Arc::new(CeramicEventService::new(sql_pool, process_undelivered).await?);
let ceramic_service = CeramicService::try_new(sql_pool).await?;
let interest_store = ceramic_service.interest_service().to_owned();
let event_store = ceramic_service.event_service().to_owned();
if process_undelivered {
event_store.process_all_undelivered_events().await?;
}
info!(path, "connected to sqlite db");

Ok(Databases::Sqlite(SqliteBackend {
Expand Down Expand Up @@ -356,7 +360,7 @@ impl Daemon {
// though they are currently all implemented by a single struct and we're just cloning Arcs.
match db {
Databases::Sqlite(db) => {
Daemon::run_int(
Daemon::run_internal(
opts,
db.interest_store.clone(),
db.interest_store,
Expand All @@ -370,7 +374,7 @@ impl Daemon {
}
}

async fn run_int<I1, I2, E1, E2, E3>(
async fn run_internal<I1, I2, E1, E2, E3>(
opts: DaemonOpts,
interest_api_store: Arc<I1>,
interest_recon_store: Arc<I2>,
Expand Down
4 changes: 3 additions & 1 deletion p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1384,12 +1384,14 @@ mod tests {
let sql_pool = SqlitePool::connect_in_memory().await.unwrap();

let metrics = Metrics::register(&mut prometheus_client::registry::Registry::default());
let store = Arc::new(CeramicEventService::new(sql_pool).await?);
store.process_all_undelivered_events().await?;
let mut p2p = Node::new(
network_config,
rpc_server_addr,
keypair.into(),
None::<(DummyRecon<Interest>, DummyRecon<EventId>)>,
Arc::new(CeramicEventService::new(sql_pool, true).await?),
store,
metrics,
)
.await?;
Expand Down
21 changes: 11 additions & 10 deletions service/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,9 @@ pub enum DeliverableRequirement {

impl CeramicEventService {
/// Create a new CeramicEventStore
pub async fn new(pool: SqlitePool, process_undelivered: bool) -> Result<Self> {
pub async fn new(pool: SqlitePool) -> Result<Self> {
CeramicOneEvent::init_delivered_order(&pool).await?;

if process_undelivered {
let _updated = OrderingTask::process_all_undelivered_events(
&pool,
MAX_ITERATIONS,
DELIVERABLE_EVENTS_BATCH_SIZE,
)
.await?;
}

let delivery_task = OrderingTask::run(pool.clone(), PENDING_EVENTS_CHANNEL_DEPTH).await;

Ok(Self {
Expand All @@ -84,6 +75,16 @@ impl CeramicEventService {
})
}

/// Returns the number of undelivered events that were updated
pub async fn process_all_undelivered_events(&self) -> Result<usize> {
OrderingTask::process_all_undelivered_events(
&self.pool,
MAX_ITERATIONS,
DELIVERABLE_EVENTS_BATCH_SIZE,
)
.await
}

pub async fn migrate_from_ipfs(&self, network: Network, blocks: impl BlockStore) -> Result<()> {
let migrator = Migrator::new(self, network, blocks)
.await
Expand Down
1 change: 1 addition & 0 deletions service/src/interest/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use ceramic_store::SqlitePool;

/// A Service that understands how to process and store Ceramic Interests.
/// Implements the [`recon::Store`], [`iroh_bitswap::Store`], and [`ceramic_api::EventStore`] traits for [`ceramic_core::Interest`].
#[derive(Debug)]
pub struct CeramicInterestService {
pub(crate) pool: SqlitePool,
}
Expand Down
33 changes: 33 additions & 0 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,44 @@
mod error;
mod event;
mod interest;

#[cfg(test)]
mod tests;

use std::sync::Arc;

use ceramic_store::{CeramicOneVersion, SqlitePool};
pub use error::Error;
pub use event::{BlockStore, CeramicEventService};
pub use interest::CeramicInterestService;

pub(crate) type Result<T> = std::result::Result<T, Error>;

/// The ceramic service holds the logic needed by the other components (e.g. api, recon) to access the store and process events
/// in a way that makes sense to the ceramic protocol, and not just as raw bytes.
#[derive(Debug)]
pub struct CeramicService {
pub(crate) interest: Arc<CeramicInterestService>,
pub(crate) event: Arc<CeramicEventService>,
}

impl CeramicService {
/// Create a new CeramicService and process undelivered events if requested
pub async fn try_new(pool: SqlitePool) -> Result<Self> {
// In the future, we may need to check the previous version to make sure we're not downgrading and risking data loss
CeramicOneVersion::insert_current(&pool).await?;
let interest = Arc::new(CeramicInterestService::new(pool.clone()));
let event = Arc::new(CeramicEventService::new(pool).await?);
Ok(Self { interest, event })
}

/// Get the interest service
pub fn interest_service(&self) -> &Arc<CeramicInterestService> {
&self.interest
}

/// Get the event service
pub fn event_service(&self) -> &Arc<CeramicEventService> {
&self.event
}
}
3 changes: 2 additions & 1 deletion service/src/tests/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ macro_rules! test_with_sqlite {
async fn [<$test_name _sqlite>]() {

let conn = ceramic_store::SqlitePool::connect_in_memory().await.unwrap();
let store = $crate::CeramicEventService::new(conn, true).await.unwrap();
let store = $crate::CeramicEventService::new(conn).await.unwrap();
store.process_all_undelivered_events().await.unwrap();
$(
for stmt in $sql_stmts {
store.pool.run_statement(stmt).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion service/src/tests/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn test_migration(cars: Vec<Vec<u8>>) {
let conn = ceramic_store::SqlitePool::connect_in_memory()
.await
.unwrap();
let service = CeramicEventService::new(conn, false).await.unwrap();
let service = CeramicEventService::new(conn).await.unwrap();
service
.migrate_from_ipfs(Network::Local(42), blocks)
.await
Expand Down
2 changes: 1 addition & 1 deletion service/src/tests/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn setup_service() -> CeramicEventService {
.await
.unwrap();

CeramicEventService::new(conn, false).await.unwrap()
CeramicEventService::new(conn).await.unwrap()
}

async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) {
Expand Down
4 changes: 2 additions & 2 deletions store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ pub use error::Error;
pub use metrics::{Metrics, StoreMetricsMiddleware};
pub use sql::{
entities::{BlockHash, EventBlockRaw, EventInsertable, EventInsertableBody},
CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, InsertResult,
InsertedEvent, Migrations, SqlitePool, SqliteRootStore, SqliteTransaction,
CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneVersion,
InsertResult, InsertedEvent, Migrations, SqlitePool, SqliteRootStore, SqliteTransaction,
};

pub(crate) type Result<T> = std::result::Result<T, Error>;
2 changes: 2 additions & 0 deletions store/src/sql/access/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ mod block;
mod event;
mod event_block;
mod interest;
mod version;

pub use block::CeramicOneBlock;
pub use event::{CeramicOneEvent, InsertResult, InsertedEvent};
pub use event_block::CeramicOneEventBlock;
pub use interest::CeramicOneInterest;
pub use version::CeramicOneVersion;
98 changes: 98 additions & 0 deletions store/src/sql/access/version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::str::FromStr;

use anyhow::anyhow;

use crate::{
sql::{entities::VersionRow, SqlitePool},
Error, Result,
};

#[derive(Debug, Clone, PartialEq, Eq)]
/// It's kind of pointless to roundtrip CARGO_PKG_VERSION through this struct,
/// but it makes it clear how we expect to format our versions in the database.
struct SemVer {
major: u64,
minor: u64,
patch: u64,
}

impl std::fmt::Display for SemVer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
}
}

impl std::str::FromStr for SemVer {
type Err = Error;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let parts: Vec<&str> = s.split('.').collect();
if parts.len() != 3 {
Err(Error::new_invalid_arg(anyhow!(
"Invalid version. Must have 3 parts: {}",
s.to_string()
)))
} else {
let major = parts[0].parse().map_err(|_| {
Error::new_invalid_arg(anyhow!(
"Invalid version. Major did not parse: {}",
s.to_string()
))
})?;
let minor = parts[1].parse().map_err(|_| {
Error::new_invalid_arg(anyhow!(
"Invalid version. Minor did not parse: {}",
s.to_string()
))
})?;
let patch = parts[2].parse().map_err(|_| {
Error::new_invalid_arg(anyhow!(
"Invalid version. Patch did not parse: {}",
s.to_string()
))
})?;
Ok(Self {
major,
minor,
patch,
})
}
}
}

#[derive(Debug, Clone)]
/// Access to ceramic version information
pub struct CeramicOneVersion {}

impl CeramicOneVersion {
/// Fetch the previous version from the database. May be None if no previous version exists.
pub async fn fetch_previous(pool: &SqlitePool) -> Result<Option<VersionRow>> {
let current = SemVer::from_str(env!("CARGO_PKG_VERSION"))?;
VersionRow::_fetch_previous(pool, &current.to_string()).await
}

/// Insert the current version into the database
pub async fn insert_current(pool: &SqlitePool) -> Result<()> {
let current = SemVer::from_str(env!("CARGO_PKG_VERSION"))?;
VersionRow::insert_current(pool, &current.to_string()).await
}
}

#[cfg(test)]
mod test {
use super::*;

use crate::SqlitePool;

#[tokio::test]
async fn insert_version() {
let mem = SqlitePool::connect_in_memory().await.unwrap();
CeramicOneVersion::insert_current(&mem).await.unwrap();
}

#[tokio::test]
async fn prev_version() {
let mem = SqlitePool::connect_in_memory().await.unwrap();
CeramicOneVersion::fetch_previous(&mem).await.unwrap();
}
}
2 changes: 2 additions & 0 deletions store/src/sql/entities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ mod event;
mod event_block;
mod hash;
mod utils;
mod version;

pub use block::{BlockBytes, BlockRow};
pub use event::{rebuild_car, EventInsertable, EventInsertableBody};
pub use event_block::{EventBlockRaw, ReconEventBlockRaw};
pub use hash::{BlockHash, ReconHash};
pub use version::VersionRow;

pub use utils::{CountRow, DeliveredEventRow, OrderKey};
Loading

0 comments on commit f490f0a

Please sign in to comment.