Skip to content

Commit

Permalink
Create a version table and update it when starting the daemon.
Browse files Browse the repository at this point in the history
This can be used in the future to prevent downgrades that go across data migrations.
  • Loading branch information
dav1do committed Jul 1, 2024
1 parent 6acf4a0 commit f23f243
Show file tree
Hide file tree
Showing 15 changed files with 205 additions and 10 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ serde_qs = "0.10.1"
serde_with = "2.1"
sha2 = { version = "0.10", default-features = false }
smallvec = "1.10"
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio"] }
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "chrono"] }
ssh-key = { version = "0.5.1", default-features = false }
ssi = { version = "0.7", features = ["ed25519"] }
swagger = { version = "6.1", features = [
Expand Down
2 changes: 2 additions & 0 deletions migrations/sqlite/20240611183747_version.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Down migration: remove the version-tracking table created by the matching
-- 20240611183747_version.up.sql migration.
DROP TABLE IF EXISTS "ceramic_one_version";
7 changes: 7 additions & 0 deletions migrations/sqlite/20240611183747_version.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Up migration: track which ceramic-one versions have run against this database.
-- The daemon upserts a row for its CARGO_PKG_VERSION at startup; in the future
-- the previous row may be checked to block downgrades across data migrations.
CREATE TABLE IF NOT EXISTS "ceramic_one_version" (
id INTEGER PRIMARY KEY AUTOINCREMENT,
"version" TEXT NOT NULL UNIQUE, -- semver string "major.minor.patch"
"installed_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- set once, when this version first inserts its row
"last_started_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -- bumped on every startup via the upsert
);
11 changes: 6 additions & 5 deletions one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use ceramic_core::{EventId, Interest};
use ceramic_kubo_rpc::Multiaddr;
use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle};
use ceramic_p2p::{load_identity, DiskStorage, Keychain, Libp2pConfig};
use ceramic_service::{CeramicEventService, CeramicInterestService};
use ceramic_service::{CeramicEventService, CeramicInterestService, CeramicService};
use clap::{Args, Parser, Subcommand, ValueEnum};
use futures::StreamExt;
use multibase::Base;
Expand Down Expand Up @@ -273,8 +273,9 @@ impl DBOpts {
async fn build_sqlite_dbs(path: &str) -> Result<Databases> {
let sql_pool =
ceramic_store::SqlitePool::connect(path, ceramic_store::Migrations::Apply).await?;
let interest_store = Arc::new(CeramicInterestService::new(sql_pool.clone()));
let event_store = Arc::new(CeramicEventService::new(sql_pool).await?);
let ceramic_service = CeramicService::try_new(sql_pool).await?;
let interest_store = ceramic_service.interest_service().to_owned();
let event_store = ceramic_service.event_service().to_owned();
println!("Connected to sqlite database: {}", path);

Ok(Databases::Sqlite(SqliteBackend {
Expand Down Expand Up @@ -303,7 +304,7 @@ impl Daemon {
// though they are currently all implemented by a single struct and we're just cloning Arcs.
match db {
Databases::Sqlite(db) => {
Daemon::run_int(
Daemon::run_internal(
opts,
db.interest_store.clone(),
db.interest_store,
Expand All @@ -316,7 +317,7 @@ impl Daemon {
}
}

async fn run_int<I1, I2, E1, E2, E3>(
async fn run_internal<I1, I2, E1, E2, E3>(
opts: DaemonOpts,
interest_api_store: Arc<I1>,
interest_recon_store: Arc<I2>,
Expand Down
1 change: 1 addition & 0 deletions service/src/interest/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use ceramic_store::SqlitePool;

/// A Service that understands how to process and store Ceramic Interests.
/// Implements the [`recon::Store`], [`iroh_bitswap::Store`], and [`ceramic_api::EventStore`] traits for [`ceramic_core::Interest`].
#[derive(Debug, Clone)]
pub struct CeramicInterestService {
    /// Sqlite connection pool used for all interest reads and writes.
    pub(crate) pool: SqlitePool,
}
Expand Down
33 changes: 33 additions & 0 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,44 @@
mod error;
mod event;
mod interest;

#[cfg(test)]
mod tests;

use std::sync::Arc;

use ceramic_store::{CeramicOneVersion, SqlitePool};
pub use error::Error;
pub use event::{Block, BoxedBlock, CeramicEventService};
pub use interest::CeramicInterestService;

pub(crate) type Result<T> = std::result::Result<T, Error>;

/// The ceramic service holds the logic needed by the other components (e.g. api, recon) to access the store and process events
/// in a way that makes sense to the ceramic protocol, and not just as raw bytes.
#[derive(Debug)]
pub struct CeramicService {
    /// Interest handling; behind an `Arc` so callers can hold their own handle.
    pub(crate) interest: Arc<CeramicInterestService>,
    /// Event handling; behind an `Arc` so callers can hold their own handle.
    pub(crate) event: Arc<CeramicEventService>,
}

impl CeramicService {
    /// Build the service from a sqlite pool, recording the running binary's
    /// version in the database before any sub-service is constructed.
    ///
    /// Returns an error if the version cannot be recorded or if the event
    /// service fails to initialize.
    pub async fn try_new(pool: SqlitePool) -> Result<Self> {
        // Record the current version first; in the future the previously
        // recorded version may be checked here to reject downgrades that
        // cross data migrations and risk data loss.
        CeramicOneVersion::insert_current(&pool).await?;
        let interest = CeramicInterestService::new(pool.clone());
        let event = CeramicEventService::new(pool).await?;
        Ok(Self {
            interest: Arc::new(interest),
            event: Arc::new(event),
        })
    }

    /// Shared handle to the interest service.
    pub fn interest_service(&self) -> &Arc<CeramicInterestService> {
        &self.interest
    }

    /// Shared handle to the event service.
    pub fn event_service(&self) -> &Arc<CeramicEventService> {
        &self.event
    }
}
1 change: 1 addition & 0 deletions store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ recon.workspace = true
sqlx.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true

[dev-dependencies]
ceramic-event.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ pub use error::Error;
pub use metrics::{Metrics, StoreMetricsMiddleware};
pub use sql::{
entities::{BlockHash, EventBlockRaw, EventInsertable, EventInsertableBody},
CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, InsertResult,
InsertedEvent, Migrations, SqlitePool, SqliteRootStore, SqliteTransaction,
CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneVersion,
InsertResult, InsertedEvent, Migrations, SqlitePool, SqliteRootStore, SqliteTransaction,
};

pub(crate) type Result<T> = std::result::Result<T, Error>;
2 changes: 2 additions & 0 deletions store/src/sql/access/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ mod block;
mod event;
mod event_block;
mod interest;
mod version;

pub use block::CeramicOneBlock;
pub use event::{CeramicOneEvent, InsertResult, InsertedEvent};
pub use event_block::CeramicOneEventBlock;
pub use interest::CeramicOneInterest;
pub use version::CeramicOneVersion;
98 changes: 98 additions & 0 deletions store/src/sql/access/version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::str::FromStr;

use anyhow::anyhow;

use crate::{
sql::{entities::VersionRow, SqlitePool},
Error, Result,
};

#[derive(Debug, Clone, PartialEq, Eq)]
/// It's kind of pointless to roundtrip CARGO_PKG_VERSION through this struct,
/// but it makes it clear how we expect to format our versions in the database:
/// always exactly `major.minor.patch` with all three numeric components.
struct SemVer {
    major: u64,
    minor: u64,
    patch: u64,
}

impl std::fmt::Display for SemVer {
    /// Render as the canonical `major.minor.patch` form stored in the database.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            major,
            minor,
            patch,
        } = self;
        write!(f, "{major}.{minor}.{patch}")
    }
}

impl std::str::FromStr for SemVer {
type Err = Error;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let parts: Vec<&str> = s.split('.').collect();
if parts.len() != 3 {
Err(Error::new_invalid_arg(anyhow!(
"Invalid version. Must have 3 parts: {}",
s.to_string()
)))
} else {
let major = parts[0].parse().map_err(|_| {
Error::new_invalid_arg(anyhow!(
"Invalid version. Major did not parse: {}",
s.to_string()
))
})?;
let minor = parts[1].parse().map_err(|_| {
Error::new_invalid_arg(anyhow!(
"Invalid version. Minor did not parse: {}",
s.to_string()
))
})?;
let patch = parts[2].parse().map_err(|_| {
Error::new_invalid_arg(anyhow!(
"Invalid version. Patch did not parse: {}",
s.to_string()
))
})?;
Ok(Self {
major,
minor,
patch,
})
}
}
}

#[derive(Debug, Clone)]
/// Access to ceramic version information.
/// Stateless accessor for the `ceramic_one_version` table; the version value
/// itself always comes from CARGO_PKG_VERSION, parsed/reformatted via `SemVer`.
pub struct CeramicOneVersion {}

impl CeramicOneVersion {
    /// Fetch the previous version from the database. May be None if no previous version exists.
    /// "Previous" means the most recently installed version other than the
    /// one this binary was built as.
    pub async fn fetch_previous(pool: &SqlitePool) -> Result<Option<VersionRow>> {
        let current = SemVer::from_str(env!("CARGO_PKG_VERSION"))?.to_string();
        VersionRow::_fetch_previous(pool, &current).await
    }

    /// Insert the current version into the database (an upsert: re-running the
    /// same version only refreshes its `last_started_at`).
    pub async fn insert_current(pool: &SqlitePool) -> Result<()> {
        let current = SemVer::from_str(env!("CARGO_PKG_VERSION"))?.to_string();
        VersionRow::insert_current(pool, &current).await
    }
}

#[cfg(test)]
mod test {
    use super::*;

    use crate::SqlitePool;

    // Smoke test: recording the current binary version against a fresh
    // in-memory database succeeds.
    #[tokio::test]
    async fn insert_version() {
        let mem = SqlitePool::connect_in_memory().await.unwrap();
        CeramicOneVersion::insert_current(&mem).await.unwrap();
    }

    // With no version ever inserted, fetching the previous version must still
    // succeed (a missing row is not an error).
    #[tokio::test]
    async fn prev_version() {
        let mem = SqlitePool::connect_in_memory().await.unwrap();
        CeramicOneVersion::fetch_previous(&mem).await.unwrap();
    }
}
2 changes: 2 additions & 0 deletions store/src/sql/entities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ mod event;
mod event_block;
mod hash;
mod utils;
mod version;

pub use block::{BlockBytes, BlockRow};
pub use event::{rebuild_car, EventInsertable, EventInsertableBody};
pub use event_block::{EventBlockRaw, ReconEventBlockRaw};
pub use hash::{BlockHash, ReconHash};
pub use version::VersionRow;

pub use utils::{CountRow, DeliveredEventRow, OrderKey};
39 changes: 39 additions & 0 deletions store/src/sql/entities/version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use sqlx::types::chrono;

use crate::{Result, SqlitePool};

#[derive(Debug, Clone, sqlx::FromRow)]
// We want to retrieve these fields for logging but we don't refer to them directly
#[allow(dead_code)]
/// One row of the `ceramic_one_version` table.
pub struct VersionRow {
    /// Surrogate primary key; kept private, only needed so `FromRow` can map the full row.
    id: i64,
    /// Semver string, e.g. "0.1.0" (formatted by the access layer's `SemVer`).
    pub version: String,
    /// When this version first inserted its row.
    pub installed_at: chrono::NaiveDateTime,
    /// Last daemon start with this version (refreshed by the upsert).
    pub last_started_at: chrono::NaiveDateTime,
}

impl VersionRow {
    /// Return the version installed before the current version, or `None` if
    /// no other version has ever been recorded.
    pub async fn _fetch_previous(pool: &SqlitePool, current_version: &str) -> Result<Option<Self>> {
        // Select every column the derived `FromRow` decodes; omitting
        // `last_started_at` would make row decoding fail (ColumnNotFound)
        // whenever a previous row exists.
        Ok(sqlx::query_as(
            "SELECT id, version, installed_at, last_started_at
            FROM ceramic_one_version
            WHERE version <> $1
            ORDER BY installed_at DESC LIMIT 1;",
        )
        .bind(current_version)
        .fetch_optional(pool.reader())
        .await?)
    }

    /// Add the current version to the database, updating the last_started_at field if the version already exists
    pub async fn insert_current(pool: &SqlitePool, current_version: &str) -> Result<()> {
        // Upsert: first start of a version inserts the row (installed_at
        // defaults to CURRENT_TIMESTAMP); later starts only bump last_started_at.
        sqlx::query(
            "INSERT INTO ceramic_one_version (version) VALUES ($1) ON CONFLICT (version) DO UPDATE set last_started_at = CURRENT_TIMESTAMP;",
        )
        .bind(current_version)
        .execute(pool.writer())
        .await?;
        Ok(())
    }
}
4 changes: 2 additions & 2 deletions store/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ mod sqlite;
mod test;

pub use access::{
CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, InsertResult,
InsertedEvent,
CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneVersion,
InsertResult, InsertedEvent,
};
pub use root::SqliteRootStore;
pub use sqlite::{SqlitePool, SqliteTransaction};
Expand Down
3 changes: 3 additions & 0 deletions store/src/sql/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ impl EventQuery {
}

/// Updates the delivered column in the event table so it can be set to the client.
/// Only touches rows not yet delivered (`delivered is NULL`), so re-marking an
/// already-delivered event is a no-op.
/// Requires 2 parameters:
///     $1 = delivered (i64)
///     $2 = cid (bytes)
pub fn mark_ready_to_deliver() -> &'static str {
    "UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2 and delivered is NULL;"
}
Expand Down

0 comments on commit f23f243

Please sign in to comment.