diff --git a/crates/api/src/v1/fetch.rs b/crates/api/src/v1/fetch.rs index 69eff281..081e9e8e 100644 --- a/crates/api/src/v1/fetch.rs +++ b/crates/api/src/v1/fetch.rs @@ -6,7 +6,7 @@ use std::{borrow::Cow, collections::HashMap}; use thiserror::Error; use warg_crypto::hash::AnyHash; use warg_protocol::{ - registry::{LogId, RegistryLen}, + registry::{LogId, PackageId, RegistryLen}, PublishedProtoEnvelopeBody, }; @@ -53,6 +53,24 @@ pub struct FetchLogsResponse { pub packages: HashMap>, } +/// Represents a fetch package names request. +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct FetchPackageNamesRequest<'a> { + /// List of package log IDs to request the package name. + pub packages: Cow<'a, Vec>, +} + +/// Represents a fetch package names response. If the requested number of packages exceeds the limit +/// that the server can fulfill on a single request, the client should retry with the log IDs that +/// are absent in the response body. +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct FetchPackageNamesResponse { + /// The log ID hash mapping to a package ID. If `None`, the package name cannot be provided. + pub packages: HashMap>, +} + /// Represents a fetch API error. #[non_exhaustive] #[derive(Debug, Error)] diff --git a/crates/api/src/v1/ledger.rs b/crates/api/src/v1/ledger.rs new file mode 100644 index 00000000..959a2502 --- /dev/null +++ b/crates/api/src/v1/ledger.rs @@ -0,0 +1,113 @@ +//! Types relating to the ledger API. + +use serde::{Deserialize, Serialize, Serializer}; +use std::borrow::Cow; +use thiserror::Error; +use warg_crypto::hash::HashAlgorithm; +use warg_protocol::registry::RegistryIndex; + +/// Represents response a get ledger sources request. +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LedgerSourcesResponse { + /// The hash algorithm used by the ledger. + pub hash_algorithm: HashAlgorithm, + /// The list of ledger sources. + pub sources: Vec, +} + +/// Ledger source for a specified registry index range. Expected to be sorted in ascending order. +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LedgerSource { + /// First registry index that is included in this ledger source. + pub first_registry_index: RegistryIndex, + /// Last registry index that is included in this ledger source. + pub last_registry_index: RegistryIndex, + /// The HTTP GET URL location for the ledger source. + pub url: String, + /// Content type for the ledger source. + pub content_type: LedgerSourceContentType, + /// Optional, server accepts for HTTP Range header. + #[serde(default, skip_serializing_if = "is_false")] + pub accept_ranges: bool, +} + +fn is_false(b: &bool) -> bool { + !b +} + +/// Content type for the ledger source. +#[derive(Default, PartialEq, Serialize, Deserialize, Debug)] +pub enum LedgerSourceContentType { + /// The content type is binary representation of the LogId and RecordId hashes without padding. + /// In the case of `sha256` hash algorithm, this is a repeating sequence of 64 bytes (32 bytes + /// for each the LogId and RecordId) without padding. + #[default] + #[serde(rename = "application/vnd.warg.ledger.packed")] + Packed, +} + +impl LedgerSourceContentType { + /// Returns the content type represented as a string. + pub fn as_str(&self) -> &'static str { + match self { + Self::Packed => "application/vnd.warg.ledger.packed", + } + } +} + +/// Represents a ledger API error. +#[non_exhaustive] +#[derive(Debug, Error)] +pub enum LedgerError { + /// An error with a message occurred. + #[error("{message}")] + Message { + /// The HTTP status code. + status: u16, + /// The error message + message: String, + }, +} + +impl LedgerError { + /// Returns the HTTP status code of the error. + pub fn status(&self) -> u16 { + match self { + Self::Message { status, .. } => *status, + } + } +} + +#[derive(Serialize, Deserialize)] +#[serde(untagged, rename_all = "camelCase")] +enum RawError<'a> { + Message { status: u16, message: Cow<'a, str> }, +} + +impl Serialize for LedgerError { + fn serialize(&self, serializer: S) -> Result { + match self { + Self::Message { status, message } => RawError::Message { + status: *status, + message: Cow::Borrowed(message), + } + .serialize(serializer), + } + } +} + +impl<'de> Deserialize<'de> for LedgerError { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + match RawError::deserialize(deserializer)? { + RawError::Message { status, message } => Ok(Self::Message { + status, + message: message.into_owned(), + }), + } + } +} diff --git a/crates/api/src/v1/mod.rs b/crates/api/src/v1/mod.rs index 878035f9..71af9127 100644 --- a/crates/api/src/v1/mod.rs +++ b/crates/api/src/v1/mod.rs @@ -2,12 +2,19 @@ pub mod content; pub mod fetch; +pub mod ledger; +pub mod monitor; pub mod package; pub mod paths; pub mod proof; use serde::{Deserialize, Serialize}; +/// The HTTP request and response header name that specifies the registry domain whose data is the +/// subject of the request. This header is only expected to be used if referring to a different +/// registry than the host registry. +pub const REGISTRY_HEADER_NAME: &str = "warg-registry"; + /// Represents the supported kinds of content sources. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "camelCase")] diff --git a/crates/api/src/v1/monitor.rs b/crates/api/src/v1/monitor.rs new file mode 100644 index 00000000..f1e7aad0 --- /dev/null +++ b/crates/api/src/v1/monitor.rs @@ -0,0 +1,88 @@ +//! Types relating to the monitor API. + +use serde::{Deserialize, Serialize, Serializer}; +use std::borrow::Cow; +use thiserror::Error; + +/// Represents checkpoint verification response. +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointVerificationResponse { + /// The checkpoint verification state. + pub checkpoint: VerificationState, + /// The checkpoint signature verification state. + pub signature: VerificationState, + /// Optional, retry after specified number of seconds. + #[serde(skip_serializing_if = "Option::is_none")] + pub retry_after: Option, +} + +/// Represents checkpoint verification state. +#[derive(Eq, PartialEq, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum VerificationState { + /// The checkpoint is unverified and could be valid or invalid. + #[serde(rename_all = "camelCase")] + Unverified, + /// The checkpoint is verified. + #[serde(rename_all = "camelCase")] + Verified, + /// The checkpoint is invalid. + #[serde(rename_all = "camelCase")] + Invalid, +} + +/// Represents a monitor API error. +#[non_exhaustive] +#[derive(Debug, Error)] +pub enum MonitorError { + /// An error with a message occurred. + #[error("{message}")] + Message { + /// The HTTP status code. + status: u16, + /// The error message + message: String, + }, +} + +impl MonitorError { + /// Returns the HTTP status code of the error. + pub fn status(&self) -> u16 { + match self { + Self::Message { status, .. } => *status, + } + } +} + +#[derive(Serialize, Deserialize)] +#[serde(untagged, rename_all = "camelCase")] +enum RawError<'a> { + Message { status: u16, message: Cow<'a, str> }, +} + +impl Serialize for MonitorError { + fn serialize(&self, serializer: S) -> Result { + match self { + Self::Message { status, message } => RawError::Message { + status: *status, + message: Cow::Borrowed(message), + } + .serialize(serializer), + } + } +} + +impl<'de> Deserialize<'de> for MonitorError { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + match RawError::deserialize(deserializer)? { + RawError::Message { status, message } => Ok(Self::Message { + status, + message: message.into_owned(), + }), + } + } +} diff --git a/crates/api/src/v1/package.rs b/crates/api/src/v1/package.rs index cf209147..a7eb482e 100644 --- a/crates/api/src/v1/package.rs +++ b/crates/api/src/v1/package.rs @@ -41,7 +41,8 @@ pub struct MissingContent { #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct PublishRecordRequest<'a> { - /// The id of the package being published. + /// The package name being published. + #[serde(alias = "packageName")] pub package_id: Cow<'a, PackageId>, /// The publish record to add to the package log. pub record: Cow<'a, ProtoEnvelopeBody>, @@ -120,6 +121,18 @@ pub enum PackageError { /// The record is not currently sourcing content. #[error("the record is not currently sourcing content")] RecordNotSourcing, + /// The provided package's namespace was not found in the operator log. + #[error("namespace `{0}` is not defined on the registry")] + NamespaceNotDefined(String), + /// The provided package's namespace is imported from another registry. + #[error("namespace `{0}` is an imported namespace from another registry")] + NamespaceImported(String), + /// The provided package's namespace conflicts with an existing namespace where the name only differs by case. + #[error("namespace conflicts with existing namespace `{0}`; package namespaces must be unique in a case insensitive way")] + NamespaceConflict(String), + /// The provided package name conflicts with an existing package where the name only differs by case. + #[error("the package conflicts with existing package name `{0}`; package names must be unique in a case insensitive way")] + PackageNameConflict(PackageId), /// The operation was not authorized by the registry. #[error("unauthorized operation: {0}")] Unauthorized(String), @@ -146,7 +159,10 @@ impl PackageError { // Note: this is 403 and not a 401 as the registry does not use // HTTP authentication. Self::Unauthorized { .. } => 403, - Self::LogNotFound(_) | Self::RecordNotFound(_) => 404, + Self::LogNotFound(_) | Self::RecordNotFound(_) | Self::NamespaceNotDefined(_) => 404, + Self::NamespaceImported(_) + | Self::NamespaceConflict(_) + | Self::PackageNameConflict(_) => 409, Self::RecordNotSourcing => 405, Self::Rejection(_) => 422, Self::NotSupported(_) => 501, @@ -160,6 +176,9 @@ impl PackageError { enum EntityType { Log, Record, + Namespace, + NamespaceImport, + Name, } #[derive(Serialize, Deserialize)] @@ -179,6 +198,12 @@ where ty: EntityType, id: Cow<'a, T>, }, + Conflict { + status: Status<409>, + #[serde(rename = "type")] + ty: EntityType, + id: Cow<'a, T>, + }, RecordNotSourcing { status: Status<405>, }, @@ -216,6 +241,30 @@ impl Serialize for PackageError { id: Cow::Borrowed(record_id), } .serialize(serializer), + Self::NamespaceNotDefined(namespace) => RawError::NotFound { + status: Status::<404>, + ty: EntityType::Namespace, + id: Cow::Borrowed(namespace), + } + .serialize(serializer), + Self::NamespaceImported(namespace) => RawError::Conflict { + status: Status::<409>, + ty: EntityType::NamespaceImport, + id: Cow::Borrowed(namespace), + } + .serialize(serializer), + Self::NamespaceConflict(existing) => RawError::Conflict { + status: Status::<409>, + ty: EntityType::Namespace, + id: Cow::Borrowed(existing), + } + .serialize(serializer), + Self::PackageNameConflict(existing) => RawError::Conflict { + status: Status::<409>, + ty: EntityType::Name, + id: Cow::Borrowed(existing), + } + .serialize(serializer), Self::RecordNotSourcing => RawError::RecordNotSourcing::<()> { status: Status::<405>, } @@ -266,6 +315,22 @@ impl<'de> Deserialize<'de> for PackageError { })? .into(), )), + EntityType::Namespace => Ok(Self::NamespaceNotDefined(id.into_owned())), + _ => Err(serde::de::Error::invalid_value( + Unexpected::Enum, + &"a valid entity type", + )), + }, + RawError::Conflict { status: _, ty, id } => match ty { + EntityType::Namespace => Ok(Self::NamespaceConflict(id.into_owned())), + EntityType::NamespaceImport => Ok(Self::NamespaceImported(id.into_owned())), + EntityType::Name => Ok(Self::PackageNameConflict( + PackageId::new(id.into_owned()).unwrap(), + )), + _ => Err(serde::de::Error::invalid_value( + Unexpected::Enum, + &"a valid entity type", + )), }, RawError::RecordNotSourcing { status: _ } => Ok(Self::RecordNotSourcing), RawError::Rejection { status: _, message } => Ok(Self::Rejection(message.into_owned())), diff --git a/crates/api/src/v1/paths.rs b/crates/api/src/v1/paths.rs index 9ae08f3b..c348332c 100644 --- a/crates/api/src/v1/paths.rs +++ b/crates/api/src/v1/paths.rs @@ -13,6 +13,16 @@ pub fn fetch_checkpoint() -> &'static str { "v1/fetch/checkpoint" } +/// The path of the "fetch package names" API. +pub fn fetch_package_names() -> &'static str { + "v1/fetch/names" +} + +/// The path of the get ledger sources. +pub fn ledger_sources() -> &'static str { + "v1/ledger" +} + /// The path of the "publish package record" API. pub fn publish_package_record(log_id: &LogId) -> String { format!("v1/package/{log_id}/record") @@ -37,3 +47,8 @@ pub fn prove_consistency() -> &'static str { pub fn prove_inclusion() -> &'static str { "v1/proof/inclusion" } + +/// The path for verifying a checkpoint. +pub fn verify_checkpoint() -> &'static str { + "v1/verify/checkpoint" +} diff --git a/crates/client/src/api.rs b/crates/client/src/api.rs index 45236047..9e54c0dd 100644 --- a/crates/client/src/api.rs +++ b/crates/client/src/api.rs @@ -12,7 +12,12 @@ use std::{borrow::Cow, collections::HashMap}; use thiserror::Error; use warg_api::v1::{ content::{ContentError, ContentSourcesResponse}, - fetch::{FetchError, FetchLogsRequest, FetchLogsResponse}, + fetch::{ + FetchError, FetchLogsRequest, FetchLogsResponse, FetchPackageNamesRequest, + FetchPackageNamesResponse, + }, + ledger::{LedgerError, LedgerSourcesResponse}, + monitor::{CheckpointVerificationResponse, MonitorError}, package::{ContentSource, PackageError, PackageRecord, PublishRecordRequest}, paths, proof::{ @@ -46,6 +51,12 @@ pub enum ClientError { /// An error was returned from the proof API. #[error(transparent)] Proof(#[from] ProofError), + /// An error was returned from the monitor API. + #[error(transparent)] + Monitor(#[from] MonitorError), + /// An error was returned from the ledger API. + #[error(transparent)] + Ledger(#[from] LedgerError), /// An error occurred while communicating with the registry. #[error("failed to send request to registry server: {0}")] Communication(#[from] reqwest::Error), @@ -173,6 +184,18 @@ impl Client { into_result::<_, FetchError>(reqwest::get(url).await?).await } + /// Verify checkpoint of the registry. + pub async fn verify_checkpoint( + &self, + request: SerdeEnvelope, + ) -> Result { + let url = self.url.join(paths::verify_checkpoint()); + tracing::debug!("verifying checkpoint at `{url}`"); + + let response = self.client.post(url).json(&request).send().await?; + into_result::<_, MonitorError>(response).await + } + /// Fetches package log entries from the registry. pub async fn fetch_logs( &self, @@ -185,6 +208,27 @@ impl Client { into_result::<_, FetchError>(response).await } + /// Fetches package names from the registry. + pub async fn fetch_package_names( + &self, + request: FetchPackageNamesRequest<'_>, + ) -> Result { + let url = self.url.join(paths::fetch_package_names()); + tracing::debug!("fetching package names at `{url}`"); + + let response = self.client.post(url).json(&request).send().await?; + into_result::<_, FetchError>(response).await + } + + /// Gets ledger sources from the registry. + pub async fn ledger_sources(&self) -> Result { + let url = self.url.join(paths::ledger_sources()); + tracing::debug!("getting ledger sources at `{url}`"); + + let response = reqwest::get(url).await?; + into_result::<_, LedgerError>(response).await + } + /// Publish a new record to a package log. pub async fn publish_package_record( &self, diff --git a/crates/protocol/src/operator/mod.rs b/crates/protocol/src/operator/mod.rs index 58790397..88edd28d 100644 --- a/crates/protocol/src/operator/mod.rs +++ b/crates/protocol/src/operator/mod.rs @@ -10,7 +10,7 @@ mod model; mod state; pub use model::{OperatorEntry, OperatorRecord}; -pub use state::{LogState, ValidationError}; +pub use state::{LogState, NamespaceState, ValidationError}; /// The currently supported operator protocol version. pub const OPERATOR_RECORD_VERSION: u32 = 0; @@ -83,6 +83,13 @@ impl TryFrom for model::OperatorEntry { .map(TryInto::try_into) .collect::>()?, }, + Contents::DefineNamespace(define_namespace) => model::OperatorEntry::DefineNamespace { + namespace: define_namespace.namespace, + }, + Contents::ImportNamespace(import_namespace) => model::OperatorEntry::ImportNamespace { + namespace: import_namespace.namespace, + registry: import_namespace.registry, + }, }; Ok(output) } @@ -103,6 +110,8 @@ impl TryFrom for model::Permission { Err(Error::new(PermissionParseError { value: permission })) } protobuf::OperatorPermission::Commit => Ok(model::Permission::Commit), + protobuf::OperatorPermission::DefineNamespace => Ok(model::Permission::DefineNamespace), + protobuf::OperatorPermission::ImportNamespace => Ok(model::Permission::ImportNamespace), } } } @@ -161,6 +170,18 @@ impl<'a> From<&'a model::OperatorEntry> for protobuf::OperatorEntry { key_id: key_id.to_string(), permissions: permissions.iter().map(Into::into).collect(), }), + model::OperatorEntry::DefineNamespace { namespace } => { + Contents::DefineNamespace(protobuf::OperatorDefineNamespace { + namespace: namespace.clone(), + }) + } + model::OperatorEntry::ImportNamespace { + namespace, + registry, + } => Contents::ImportNamespace(protobuf::OperatorImportNamespace { + namespace: namespace.clone(), + registry: registry.clone(), + }), }; let contents = Some(contents); protobuf::OperatorEntry { contents } @@ -171,6 +192,8 @@ impl<'a> From<&'a model::Permission> for i32 { fn from(permission: &'a model::Permission) -> Self { let proto_perm = match permission { model::Permission::Commit => protobuf::OperatorPermission::Commit, + model::Permission::DefineNamespace => protobuf::OperatorPermission::DefineNamespace, + model::Permission::ImportNamespace => protobuf::OperatorPermission::ImportNamespace, }; proto_perm.into() } diff --git a/crates/protocol/src/operator/model.rs b/crates/protocol/src/operator/model.rs index eae448f6..fd76cf6a 100644 --- a/crates/protocol/src/operator/model.rs +++ b/crates/protocol/src/operator/model.rs @@ -30,13 +30,22 @@ impl crate::Record for OperatorRecord { #[serde(rename_all = "camelCase")] #[non_exhaustive] pub enum Permission { + /// Permission to sign checkpoints. Commit, + /// Permission to define namespace in operator log. + DefineNamespace, + /// Permission to import namespace from another registry and add to the operator log. + ImportNamespace, } impl Permission { /// Gets an array of all permissions. - pub const fn all() -> [Permission; 1] { - [Permission::Commit] + pub const fn all() -> [Permission; 3] { + [ + Permission::Commit, + Permission::DefineNamespace, + Permission::ImportNamespace, + ] } } @@ -44,6 +53,8 @@ impl fmt::Display for Permission { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Permission::Commit => write!(f, "commit"), + Permission::DefineNamespace => write!(f, "defineNamespace"), + Permission::ImportNamespace => write!(f, "importNamespace"), } } } @@ -54,6 +65,8 @@ impl FromStr for Permission { fn from_str(s: &str) -> Result { match s { "commit" => Ok(Permission::Commit), + "defineNamespace" => Ok(Permission::DefineNamespace), + "importNamespace" => Ok(Permission::ImportNamespace), _ => Err(()), } } @@ -82,6 +95,10 @@ pub enum OperatorEntry { key_id: signing::KeyID, permissions: Vec, }, + /// The registry defines a namespace to be used in its own package logs. + DefineNamespace { namespace: String }, + /// The registry defines a namespace as imported from another registry. + ImportNamespace { namespace: String, registry: String }, } impl OperatorEntry { @@ -90,6 +107,8 @@ impl OperatorEntry { match self { Self::Init { .. } => None, Self::GrantFlat { .. } | Self::RevokeFlat { .. } => Some(Permission::Commit), + Self::DefineNamespace { .. } => Some(Permission::DefineNamespace), + Self::ImportNamespace { .. } => Some(Permission::ImportNamespace), } } } diff --git a/crates/protocol/src/operator/state.rs b/crates/protocol/src/operator/state.rs index 9ff1f2a5..1c1598b8 100644 --- a/crates/protocol/src/operator/state.rs +++ b/crates/protocol/src/operator/state.rs @@ -1,4 +1,5 @@ use super::{model, OPERATOR_RECORD_VERSION}; +use crate::registry::PackageId; use crate::registry::RecordId; use crate::ProtoEnvelope; use indexmap::{IndexMap, IndexSet}; @@ -57,6 +58,39 @@ pub enum ValidationError { #[error("record has lower timestamp than previous")] TimestampLowerThanPrevious, + + #[error("the namespace `{namespace}` is invalid; namespace must be a kebab case string")] + InvalidNamespace { namespace: String }, + + #[error("the namespace `{namespace}` conflicts with the existing namespace `{existing}`; namespace must be unique in a case insensitive way")] + NamespaceConflict { namespace: String, existing: String }, + + #[error("the namespace `{namespace}` is already defined and cannot be redefined")] + NamespaceAlreadyDefined { namespace: String }, +} + +/// The namespace definition. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +struct NamespaceDefinition { + /// Case sensitive namespace name. + namespace: String, + /// Namespace state. + state: NamespaceState, +} + +/// The namespace state for defining or importing from other registries. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub enum NamespaceState { + /// The namespace is defined for the registry to use for its own package logs. + Defined, + /// The namespace is imported from another registry. + #[serde(rename_all = "camelCase")] + Imported { + /// The imported registry. + registry: String, + }, } /// Information about the current head of the operator log. @@ -89,6 +123,9 @@ pub struct LogState { /// The keys known to the validator. #[serde(skip_serializing_if = "IndexMap::is_empty")] keys: IndexMap, + /// The namespaces known to the validator. The key is the lowercased namespace. + #[serde(skip_serializing_if = "IndexMap::is_empty")] + namespaces: IndexMap, } impl LogState { @@ -132,6 +169,29 @@ impl LogState { self.keys.get(key_id) } + /// Gets the namespace state. + pub fn namespace_state(&self, namespace: &str) -> Result, &str> { + if let Some(def) = self.namespaces.get(&namespace.to_ascii_lowercase()) { + if def.namespace == namespace { + // namespace exact match, return namespace state + Ok(Some(&def.state)) + } else { + // namespace matches a namespace but differ in a case sensitive way, + // so return error with existing namespace + Err(&def.namespace) + } + } else { + // namespace is not defined + Ok(None) + } + } + + /// Checks the key has permission to sign checkpoints. + pub fn key_has_permission_to_sign_checkpoints(&self, key_id: &signing::KeyID) -> bool { + self.check_key_permissions(key_id, &[model::Permission::Commit]) + .is_ok() + } + fn initialized(&self) -> bool { // The package log is initialized if the hash algorithm is set self.algorithm.is_some() @@ -261,6 +321,18 @@ impl LogState { key_id, permissions, } => self.validate_revoke_entry(signer_key_id, key_id, permissions)?, + model::OperatorEntry::DefineNamespace { namespace } => { + self.validate_namespace(namespace, NamespaceState::Defined)? + } + model::OperatorEntry::ImportNamespace { + namespace, + registry, + } => self.validate_namespace( + namespace, + NamespaceState::Imported { + registry: registry.to_string(), + }, + )?, } } @@ -334,6 +406,46 @@ impl LogState { Ok(()) } + fn validate_namespace( + &mut self, + namespace: &str, + state: NamespaceState, + ) -> Result<(), ValidationError> { + if !PackageId::is_valid_namespace(namespace) { + return Err(ValidationError::InvalidNamespace { + namespace: namespace.to_string(), + }); + } + + let namespace_lowercase = namespace.to_ascii_lowercase(); + + if let Some(def) = self.namespaces.get(&namespace_lowercase) { + if namespace == def.namespace { + // namespace matches exactly + Err(ValidationError::NamespaceAlreadyDefined { + namespace: namespace.to_string(), + }) + } else { + // namespace matches an existing namespace but differs in a case sensitive way + Err(ValidationError::NamespaceConflict { + namespace: namespace.to_string(), + existing: def.namespace.to_string(), + }) + } + } else { + // namespace is not defined + self.namespaces.insert( + namespace_lowercase, + NamespaceDefinition { + namespace: namespace.to_string(), + state, + }, + ); + + Ok(()) + } + } + fn check_key_permissions( &self, key_id: &signing::KeyID, @@ -361,6 +473,7 @@ impl LogState { head, permissions, keys, + namespaces, } = self; Snapshot { @@ -368,6 +481,7 @@ impl LogState { head: head.clone(), permissions: permissions.len(), keys: keys.len(), + namespaces: namespaces.len(), } } @@ -377,12 +491,14 @@ impl LogState { head, permissions, keys, + namespaces, } = snapshot; self.algorithm = algorithm; self.head = head; self.permissions.truncate(permissions); self.keys.truncate(keys); + self.namespaces.truncate(namespaces); } } @@ -400,6 +516,7 @@ struct Snapshot { head: Option, permissions: usize, keys: usize, + namespaces: usize, } #[cfg(test)] @@ -443,9 +560,14 @@ mod tests { algorithm: Some(HashAlgorithm::Sha256), permissions: IndexMap::from([( alice_id.clone(), - IndexSet::from([model::Permission::Commit]), + IndexSet::from([ + model::Permission::Commit, + model::Permission::DefineNamespace, + model::Permission::ImportNamespace + ]), )]), keys: IndexMap::from([(alice_id, alice_pub)]), + namespaces: IndexMap::new(), } ); } @@ -480,9 +602,14 @@ mod tests { algorithm: Some(HashAlgorithm::Sha256), permissions: IndexMap::from([( alice_id.clone(), - IndexSet::from([model::Permission::Commit]), + IndexSet::from([ + model::Permission::Commit, + model::Permission::DefineNamespace, + model::Permission::ImportNamespace, + ]), )]), keys: IndexMap::from([(alice_id, alice_pub)]), + namespaces: IndexMap::new(), }; assert_eq!(validator, expected); @@ -502,6 +629,10 @@ mod tests { key_id: "not-valid".to_string().into(), permissions: vec![model::Permission::Commit], }, + // This entry is valid but should be rolled back since there is an invalid entry + model::OperatorEntry::DefineNamespace { + namespace: "example-namespace".to_string(), + }, ], }; @@ -517,4 +648,134 @@ mod tests { // The validator should not have changed assert_eq!(validator, expected); } + + #[test] + fn test_namespaces() { + let (alice_pub, alice_priv) = generate_p256_pair(); + let alice_id = alice_pub.fingerprint(); + + let timestamp = SystemTime::now(); + let record = model::OperatorRecord { + prev: None, + version: 0, + timestamp, + entries: vec![ + model::OperatorEntry::Init { + hash_algorithm: HashAlgorithm::Sha256, + key: alice_pub.clone(), + }, + model::OperatorEntry::DefineNamespace { + namespace: "my-namespace".to_string(), + }, + model::OperatorEntry::ImportNamespace { + namespace: "imported-namespace".to_string(), + registry: "registry.example.com".to_string(), + }, + ], + }; + + let envelope = + ProtoEnvelope::signed_contents(&alice_priv, record).expect("failed to sign envelope"); + let mut validator = LogState::default(); + validator.validate(&envelope).unwrap(); + + let expected = LogState { + head: Some(Head { + digest: RecordId::operator_record::(&envelope), + timestamp, + }), + algorithm: Some(HashAlgorithm::Sha256), + permissions: IndexMap::from([( + alice_id.clone(), + IndexSet::from([ + model::Permission::Commit, + model::Permission::DefineNamespace, + model::Permission::ImportNamespace, + ]), + )]), + keys: IndexMap::from([(alice_id, alice_pub)]), + namespaces: IndexMap::from([ + ( + "my-namespace".to_string(), + NamespaceDefinition { + namespace: "my-namespace".to_string(), + state: NamespaceState::Defined, + }, + ), + ( + "imported-namespace".to_string(), + NamespaceDefinition { + namespace: "imported-namespace".to_string(), + state: NamespaceState::Imported { + registry: "registry.example.com".to_string(), + }, + }, + ), + ]), + }; + + assert_eq!(validator, expected); + + { + let record = model::OperatorRecord { + prev: Some(RecordId::operator_record::(&envelope)), + version: 0, + timestamp: SystemTime::now(), + entries: vec![ + // This entry is valid + model::OperatorEntry::DefineNamespace { + namespace: "other-namespace".to_string(), + }, + // This entry is not valid + model::OperatorEntry::ImportNamespace { + namespace: "my-namespace".to_string(), + registry: "registry.alternative.com".to_string(), + }, + ], + }; + + let envelope = ProtoEnvelope::signed_contents(&alice_priv, record) + .expect("failed to sign envelope"); + + // This validation should fail and the validator state should remain unchanged + match validator.validate(&envelope).unwrap_err() { + ValidationError::NamespaceAlreadyDefined { .. } => {} + _ => panic!("expected a different error"), + } + + // The validator should not have changed + assert_eq!(validator, expected); + } + + { + let record = model::OperatorRecord { + prev: Some(RecordId::operator_record::(&envelope)), + version: 0, + timestamp: SystemTime::now(), + entries: vec![ + // This entry is valid + model::OperatorEntry::DefineNamespace { + namespace: "other-namespace".to_string(), + }, + // This entry is not valid + model::OperatorEntry::ImportNamespace { + namespace: "my-NAMESPACE".to_string(), + registry: "registry.alternative.com".to_string(), + }, + ], + }; + + let envelope = ProtoEnvelope::signed_contents(&alice_priv, record) + .expect("failed to sign envelope"); + + // This validation should fail and the validator state should remain unchanged + match validator.validate(&envelope).unwrap_err() { + ValidationError::NamespaceConflict { .. } => {} + _ => panic!("expected a different error"), + } + + // The validator should not have changed + assert_eq!(validator, expected); + } + } } diff --git a/crates/protocol/tests/operator-logs/longer.json b/crates/protocol/tests/operator-logs/longer.json index 7867354a..4993fd2b 100644 --- a/crates/protocol/tests/operator-logs/longer.json +++ b/crates/protocol/tests/operator-logs/longer.json @@ -37,4 +37,4 @@ ] } } -] \ No newline at end of file +] diff --git a/crates/protocol/tests/operator-logs/output/longer.json b/crates/protocol/tests/operator-logs/output/longer.json index 5794b4e0..e7403c80 100644 --- a/crates/protocol/tests/operator-logs/output/longer.json +++ b/crates/protocol/tests/operator-logs/output/longer.json @@ -7,7 +7,9 @@ }, "permissions": { "sha256:d6d9b4cd077a829c0275233bf3843c8294e250dfcc82b8ea15745e92982a820d": [ - "commit" + "commit", + "defineNamespace", + "importNamespace" ], "sha256:8ed824821ce75c381458f8097996ab77780550ba7fb9c240e4799bb781941abb": [] }, @@ -16,4 +18,4 @@ "sha256:8ed824821ce75c381458f8097996ab77780550ba7fb9c240e4799bb781941abb": "ecdsa-p256:A5qc6uBi070EBb4GihGzpx6Cm5+oZnv4dWpBhhuZVagu" } } -} \ No newline at end of file +} diff --git a/crates/protocol/tests/operator-logs/output/minimal.json b/crates/protocol/tests/operator-logs/output/minimal.json index 694ba7fa..4cfda048 100644 --- a/crates/protocol/tests/operator-logs/output/minimal.json +++ b/crates/protocol/tests/operator-logs/output/minimal.json @@ -7,7 +7,9 @@ }, "permissions": { "sha256:d6d9b4cd077a829c0275233bf3843c8294e250dfcc82b8ea15745e92982a820d": [ - "commit" + "commit", + "defineNamespace", + "importNamespace" ], "sha256:8225e770ee82a8a974c7732b9ca246d70b1f03dc9dbd25f5801c5cb455dee508": [ "commit" @@ -18,4 +20,4 @@ "sha256:8225e770ee82a8a974c7732b9ca246d70b1f03dc9dbd25f5801c5cb455dee508": "ecdsa-p256:A4yBQt9Im8xnO9Sr9PT7OrOUQP8Olijcq1dPwtdTpigm" } } -} \ No newline at end of file +} diff --git a/crates/server/README.md b/crates/server/README.md index a81a18ab..f08414db 100644 --- a/crates/server/README.md +++ b/crates/server/README.md @@ -14,8 +14,10 @@ will be lost when the server is stopped. To start the server, provide the `WARG_OPERATOR_KEY` environment variable, which is used to sign the entries in the server's operator log: +Also, provide the `WARG_NAMESPACE` environment variable to define the initial namespace for package publishing. + ```console -$ WARG_OPERATOR_KEY="ecdsa-p256:I+UlDo0HxyBBFeelhPPWmD+LnklOpqZDkrFP5VduASk=" cargo run -- --content-dir content +$ WARG_NAMESPACE=example WARG_OPERATOR_KEY="ecdsa-p256:I+UlDo0HxyBBFeelhPPWmD+LnklOpqZDkrFP5VduASk=" cargo run -- --content-dir content 2023-04-18T23:48:52.149746Z INFO warg_server::services::core: initializing core service 2023-04-18T23:48:52.170199Z INFO warg_server::services::core: core service is running 2023-04-18T23:48:52.170233Z INFO warg_server: listening on 127.0.0.1:8090 @@ -55,7 +57,7 @@ To start the registry server, provide both the `WARG_OPERATOR_KEY` and `WARG_DATABASE_URL` environment variables: ```console -WARG_DATABASE_URL=postgres://postgres:password@localhost/registry WARG_OPERATOR_KEY="ecdsa-p256:I+UlDo0HxyBBFeelhPPWmD+LnklOpqZDkrFP5VduASk=" cargo run -p warg-server --features postgres -- --content-dir content --data-store postgres +WARG_NAMESPACE=example WARG_DATABASE_URL=postgres://postgres:password@localhost/registry WARG_OPERATOR_KEY="ecdsa-p256:I+UlDo0HxyBBFeelhPPWmD+LnklOpqZDkrFP5VduASk=" cargo run -p warg-server --features postgres -- --content-dir content --data-store postgres ``` The `--data-store postgres` flag starts the server with PostgreSQL data storage. diff --git a/crates/server/openapi.yaml b/crates/server/openapi.yaml index 9e521e85..6620fb6f 100644 --- a/crates/server/openapi.yaml +++ b/crates/server/openapi.yaml @@ -20,19 +20,89 @@ x-42c-skipIssues: tags: - name: fetch - description: API for fetching checkpoints and logs from the registry. + description: API for fetching checkpoints, logs and package names from the registry. - name: package description: API for managing package logs in the registry. - name: content description: API for content sources in the registry. - name: proof description: API for proving the integrity of the registry. + - name: monitor + description: API for verifying registry checkpoints. + - name: ledger + description: API for fetching the ledger. servers: - url: http://localhost:8090/v1 description: Local development server paths: + /fetch/names: + post: + summary: Fetch package names + operationId: fetchNames + security: [] + tags: + - fetch + description: | + Fetch the package names for registry log IDs. + parameters: + - name: Warg-Registry + in: header + $ref: "#/components/headers/WargRegistryHeader" + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/FetchPackageNamesRequest" + responses: + "200": + description: The package names were successfully fetched. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" + content: + application/json: + schema: + $ref: "#/components/schemas/FetchPackageNamesResponse" + "404": + description: A requested entity was not found. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" + content: + application/json: + schema: + type: object + additionalProperties: false + required: + - status + - type + - id + properties: + status: + type: integer + description: The HTTP status code for the error. + example: 404 + type: + type: string + description: The type of entity that was not found. + enum: [log] + example: log + id: + type: string + description: The identifier of the entity that was not found. + example: sha256:b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c + default: + description: An error occurred when processing the request. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" + content: + application/json: + schema: + $ref: "#/components/schemas/Error" /fetch/logs: post: summary: Fetch registry logs @@ -42,6 +112,10 @@ paths: - fetch description: | Fetch the operator and packages logs from the registry. + parameters: + - name: Warg-Registry + in: header + $ref: "#/components/headers/WargRegistryHeader" requestBody: required: true content: @@ -51,12 +125,18 @@ paths: responses: "200": description: The logs were successfully fetched. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: $ref: "#/components/schemas/FetchLogsResponse" "404": description: A requested entity was not found. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -65,6 +145,9 @@ paths: - "$ref": "#/components/schemas/FetchLogsLogLengthNotFoundError" default: description: An error occurred when processing the request. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -77,15 +160,25 @@ paths: tags: - fetch description: Fetch the latest checkpoint from the registry. + parameters: + - name: Warg-Registry + in: header + $ref: "#/components/headers/WargRegistryHeader" responses: "200": description: The checkpoint was successfully fetched. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: $ref: "#/components/schemas/SignedCheckpoint" default: description: An error occurred when processing the request. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -110,6 +203,9 @@ paths: required: true schema: "$ref": "#/components/schemas/AnyHash" + - name: Warg-Registry + in: header + $ref: "#/components/headers/WargRegistryHeader" requestBody: required: true content: @@ -119,6 +215,9 @@ paths: responses: "202": description: The package record was accepted. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -126,6 +225,75 @@ paths: "403": description: | The key used to sign the record was not authorized to publish a record to the log. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "404": + description: A requested entity was not found. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" + content: + application/json: + schema: + type: object + additionalProperties: false + required: + - status + - type + - id + properties: + status: + type: integer + description: The HTTP status code for the error. + example: 404 + type: + type: string + description: The type of entity that was not found. + enum: [namespace] + example: namespace + id: + type: string + description: | + The identifier of the entity that was not found. + "409": + description: The requested package publish conflicts. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" + content: + application/json: + schema: + type: object + additionalProperties: false + required: + - status + - type + - id + properties: + status: + type: integer + description: The HTTP status code for the error. + example: 409 + type: + type: string + description: The type of entity that was not found. + enum: [name, namespace, namespaceImport] + example: namespace + id: + type: string + description: | + The identifier of the entity that was not found. + "422": + description: | + The package was rejected by the registry. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -134,12 +302,18 @@ paths: description: | The server does not support publishing package records with explicitly specified content source locations. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: $ref: "#/components/schemas/Error" default: description: An error occurred when processing the request. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -172,15 +346,24 @@ paths: required: true schema: "$ref": "#/components/schemas/AnyHash" + - name: Warg-Registry + in: header + $ref: "#/components/headers/WargRegistryHeader" responses: "200": description: The package record. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: "$ref": "#/components/schemas/PackageRecord" "404": description: A requested entity was not found. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -206,6 +389,9 @@ paths: The identifier of the entity that was not found. default: description: An error occurred when processing the request. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -226,15 +412,24 @@ paths: required: true schema: "$ref": "#/components/schemas/AnyHash" + - name: Warg-Registry + in: header + $ref: "#/components/headers/WargRegistryHeader" responses: "200": description: The content digest sources. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: "$ref": "#/components/schemas/ContentSourcesResponse" "404": description: A requested entity was not found. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -260,6 +455,9 @@ paths: The identifier of the entity that was not found. default: description: An error occurred when processing the request. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -273,6 +471,10 @@ paths: - proof description: | Proves the consistency of the registry between two specified checkpoints. + parameters: + - name: Warg-Registry + in: header + $ref: "#/components/headers/WargRegistryHeader" requestBody: content: application/json: @@ -281,12 +483,18 @@ paths: responses: "200": description: The consistency proof was generated successfully. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: $ref: "#/components/schemas/ProveConsistencyResponse" "404": description: A requested entity was not found. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -311,6 +519,9 @@ paths: description: The identifier of the entity that was not found. "422": description: The proof bundle could not be generated. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -322,6 +533,9 @@ paths: failure: "#/components/schemas/BundleFailureError" default: description: An error occurred when processing the request. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -335,6 +549,10 @@ paths: - proof description: | Proves that the given log leafs are present in the given registry checkpoint. + parameters: + - name: Warg-Registry + in: header + $ref: "#/components/headers/WargRegistryHeader" requestBody: content: application/json: @@ -343,12 +561,18 @@ paths: responses: "200": description: The inclusion proof was generated successfully. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: $ref: "#/components/schemas/ProveInclusionResponse" "404": description: A requested entity was not found. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -373,6 +597,9 @@ paths: description: The identifier of the entity that was not found. "422": description: The proof bundle could not be generated. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: @@ -388,11 +615,89 @@ paths: failure: "#/components/schemas/BundleFailureError" default: description: An error occurred when processing the request. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" content: application/json: schema: $ref: "#/components/schemas/Error" + /verify/checkpoint: + post: + summary: Verify registry checkpoint + operationId: verifyCheckpoint + security: [] + tags: + - monitor + description: Verify checkpoint from the registry. The client must interpret the response body to determine the verification status. + parameters: + - name: Warg-Registry + in: header + $ref: "#/components/headers/WargRegistryHeader" + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/SignedCheckpoint" + responses: + "200": + description: The checkpoint verification request was processed. The client must interpret the response body to determine the verification status. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" + content: + application/json: + schema: + $ref: "#/components/schemas/CheckpointVerificationResponse" + default: + description: An error occurred when processing the request. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + /ledger: + get: + summary: Fetch ledger sources + operationId: getLedgerSources + security: [] + tags: + - ledger + description: Fetch the registry ledger download URL sources. + parameters: + - name: Warg-Registry + in: header + $ref: "#/components/headers/WargRegistryHeader" + responses: + "200": + description: The ledger sources was successfully fetched. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" + content: + application/json: + schema: + $ref: "#/components/schemas/LedgerSourcesResponse" + default: + description: An error occurred when processing the request. + headers: + Warg-Registry: + $ref: "#/components/headers/WargRegistryHeader" + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + components: + headers: + WargRegistryHeader: + description: If present and supported, this registry responds on behalf of the other registry specified in this header value. + required: false + schema: + type: string + example: registry.example.com schemas: Error: type: object @@ -415,6 +720,37 @@ components: description: Represents a supported hash. example: sha256:b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c pattern: ^[a-z0-9-]+:[a-f0-9]+$ + FetchPackageNamesRequest: + type: object + description: A request to fetch package names from the registry. + additionalProperties: false + required: + - packages + properties: + packages: + type: array + description: The log ID for each requested package. + example: ["sha256:7d865e959b2466918c9863afca942d0fb89d7c9ac0c99bafc3749504ded9773"] + minimum: 1 + items: + $ref: "#/components/schemas/AnyHash" + description: The log ID for each package. + FetchPackageNamesResponse: + type: object + description: A response containing the requested package names. + additionalProperties: false + properties: + packages: + type: object + description: The map of log ID to package name. + patternProperties: + "^[a-z0-9-]+:[a-f0-9]+$": + type: string + nullable: true + description: The package name for each package. If `null`, the package name is not able to be provided for the log ID. + example: + "sha256:7d865e959b2466918c9863afca942d0fb89d7c9ac0c99bafc3749504ded9773": "example-namespace:package-name" + "sha256:b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c": null FetchLogsRequest: type: object description: A request to fetch logs from the registry. @@ -1005,3 +1341,77 @@ components: type: integer description: The log length that was not found. example: 1001 + LedgerSourcesResponse: + type: object + description: A response containing the registry ledger sources. + additionalProperties: false + properties: + hashAlgorithm: + type: string + description: The type of hash algorithm used for log and record IDs. + enum: [sha256] + example: sha256 + sources: + type: array + description: The ledger sources. + items: + type: object + description: Ledger source HTTP get URL. + additionalProperties: false + required: + - firstRegistryIndex + - lastRegistryIndex + - url + - contentType + properties: + firstRegistryIndex: + type: integer + description: The first registry index included in the source. + example: 0 + lastRegistryIndex: + type: integer + description: The last registry index included in the source. + example: 999 + url: + type: string + description: The HTTP GET URL to fetch the source. + example: http://registry.example.com/ledger/0 + contentType: + type: string + description: The content type of source. + enum: ["application/vnd.warg.ledger.packed"] + example: "application/vnd.warg.ledger.packed" + acceptRanges: + type: boolean + description: Flag indicating if the server accepts byte ranges with `Range` header. + CheckpointVerificationResponse: + type: object + additionalProperties: false + required: + - checkpoint + - signature + properties: + checkpoint: + type: string + description: | + The verification of the checkpoint's `logLength`, `logRoot` and `mapRoot`: + + * `unverified`: checkpoint may be valid or invalid; if `retryAfter` is provided, the client should retry the request; + * `verified`: `logLength`, `logRoot` and `mapRoot` are verified; + * `invalid`: checkpoint with the specified `logLength` was either not produced or does not match the correct `logRoot` and `mapRoot`; + enum: [unverified, verified, invalid] + example: verified + signature: + type: string + description: | + The verification of the checkpoint's `keyId` and `signature`: + + * `unverified`: checkpoint's `signature` may be valid or invalid; if `retryAfter` is provided, the client should retry the request; + * `verified`: checkpoint's `signature` is verified; + * `invalid`: `keyId` is not known or does not have authorization (could have been revoke or never granted) to sign checkpoints or the `signature` itself is invalid; + enum: [unverified, verified, invalid] + example: unverified + retryAfter: + type: integer + description: If either `checkpoint` or `signature` is `unverified` status, then the server may instruct the client to retry the request after the specified number of seconds. + example: 60 diff --git a/crates/server/src/api/v1/content.rs b/crates/server/src/api/v1/content.rs index 607b1147..774b1ad4 100644 --- a/crates/server/src/api/v1/content.rs +++ b/crates/server/src/api/v1/content.rs @@ -1,4 +1,4 @@ -use super::{Json, Path}; +use super::{Json, Path, RegistryHeader}; use axum::{ debug_handler, extract::State, http::StatusCode, response::IntoResponse, routing::get, Router, }; @@ -61,6 +61,7 @@ impl IntoResponse for ContentApiError { async fn get_content( State(config): State, Path(digest): Path, + RegistryHeader(_registry_header): RegistryHeader, ) -> Result, ContentApiError> { if !config.content_present(&digest) { return Err(ContentApiError(ContentError::ContentDigestNotFound(digest))); diff --git a/crates/server/src/api/v1/fetch.rs b/crates/server/src/api/v1/fetch.rs index acaf563b..d961998e 100644 --- a/crates/server/src/api/v1/fetch.rs +++ b/crates/server/src/api/v1/fetch.rs @@ -1,4 +1,4 @@ -use super::Json; +use super::{Json, RegistryHeader}; use crate::datastore::DataStoreError; use crate::services::CoreService; use axum::http::StatusCode; @@ -10,7 +10,10 @@ use axum::{ Router, }; use std::collections::HashMap; -use warg_api::v1::fetch::{FetchError, FetchLogsRequest, FetchLogsResponse, PublishedRecord}; +use warg_api::v1::fetch::{ + FetchError, FetchLogsRequest, FetchLogsResponse, FetchPackageNamesRequest, + FetchPackageNamesResponse, PublishedRecord, +}; use warg_crypto::hash::{AnyHash, Sha256}; use warg_protocol::registry::{LogId, RecordId, TimestampedCheckpoint}; use warg_protocol::SerdeEnvelope; @@ -18,6 +21,8 @@ use warg_protocol::SerdeEnvelope; const DEFAULT_RECORDS_LIMIT: u16 = 100; const MAX_RECORDS_LIMIT: u16 = 1000; +const MAX_PACKAGE_NAMES_LIMIT: usize = 1000; + #[derive(Clone)] pub struct Config { core_service: CoreService, @@ -30,8 +35,9 @@ impl Config { pub fn into_router(self) -> Router { Router::new() - .route("/logs", post(fetch_logs)) .route("/checkpoint", get(fetch_checkpoint)) + .route("/logs", post(fetch_logs)) + .route("/names", post(fetch_package_names)) .with_state(self) } } @@ -78,6 +84,7 @@ impl IntoResponse for FetchApiError { #[debug_handler] async fn fetch_logs( State(config): State, + RegistryHeader(_registry_header): RegistryHeader, Json(body): Json>, ) -> Result, FetchApiError> { let limit = body.limit.unwrap_or(DEFAULT_RECORDS_LIMIT); @@ -159,8 +166,30 @@ async fn fetch_logs( #[debug_handler] async fn fetch_checkpoint( State(config): State, + RegistryHeader(_registry_header): RegistryHeader, ) -> Result>, FetchApiError> { Ok(Json( config.core_service.store().get_latest_checkpoint().await?, )) } + +#[debug_handler] +async fn fetch_package_names( + State(config): State, + RegistryHeader(_registry_header): RegistryHeader, + Json(body): Json>, +) -> Result, FetchApiError> { + let log_ids = if body.packages.len() > MAX_PACKAGE_NAMES_LIMIT { + body.packages.get(..MAX_PACKAGE_NAMES_LIMIT).unwrap() + } else { + &body.packages + }; + + let packages = config + .core_service + .store() + .get_package_names(log_ids) + .await?; + + Ok(Json(FetchPackageNamesResponse { packages })) +} diff --git a/crates/server/src/api/v1/ledger.rs b/crates/server/src/api/v1/ledger.rs new file mode 100644 index 00000000..a60e6d3a --- /dev/null +++ b/crates/server/src/api/v1/ledger.rs @@ -0,0 +1,119 @@ +use super::{Json, Path, RegistryHeader}; +use crate::datastore::DataStoreError; +use crate::services::CoreService; +use axum::http::StatusCode; +use axum::{ + debug_handler, extract::State, response::IntoResponse, response::Response, routing::get, Router, +}; +use warg_api::v1::ledger::{ + LedgerError, LedgerSource, LedgerSourceContentType, LedgerSourcesResponse, +}; +use warg_crypto::hash::HashAlgorithm; +use warg_protocol::registry::RegistryIndex; + +const MAX_LEDGER_RECORDS_LIMIT: usize = 1000; + +#[derive(Clone)] +pub struct Config { + core_service: CoreService, +} + +impl Config { + pub fn new(core_service: CoreService) -> Self { + Self { core_service } + } + + pub fn into_router(self) -> Router { + Router::new() + .route("/", get(get_ledger_sources)) + .route("/records/:start", get(get_ledger_records)) + .with_state(self) + } +} + +struct LedgerApiError(LedgerError); + +impl From for LedgerApiError { + fn from(e: DataStoreError) -> Self { + tracing::error!("unexpected data store error: {e}"); + + Self(LedgerError::Message { + status: StatusCode::INTERNAL_SERVER_ERROR.as_u16(), + message: "an error occurred while processing the request".into(), + }) + } +} + +impl IntoResponse for LedgerApiError { + fn into_response(self) -> axum::response::Response { + (StatusCode::from_u16(self.0.status()).unwrap(), Json(self.0)).into_response() + } +} + +#[debug_handler] +async fn get_ledger_sources( + State(config): State, + RegistryHeader(_registry_header): RegistryHeader, +) -> Result, LedgerApiError> { + let log_length = config + .core_service + .store() + .get_latest_checkpoint() + .await? + .into_contents() + .checkpoint + .log_length; + + let sources = (0..log_length) + .step_by(MAX_LEDGER_RECORDS_LIMIT) + .map(|start_index| { + let end_index = if start_index + MAX_LEDGER_RECORDS_LIMIT >= log_length { + log_length - 1 + } else { + start_index + MAX_LEDGER_RECORDS_LIMIT - 1 + }; + + LedgerSource { + first_registry_index: start_index, + last_registry_index: end_index, + url: format!("v1/ledger/records/{start_index}"), + accept_ranges: false, + content_type: LedgerSourceContentType::Packed, + } + }) + .collect::>(); + + Ok(Json(LedgerSourcesResponse { + hash_algorithm: HashAlgorithm::Sha256, + sources, + })) +} + +#[debug_handler] +async fn get_ledger_records( + State(config): State, + Path(start): Path, + RegistryHeader(_registry_header): RegistryHeader, +) -> Result { + let log_leafs = config + .core_service + .store() + .get_log_leafs_starting_with_registry_index(start, MAX_LEDGER_RECORDS_LIMIT) + .await?; + + let mut body: Vec = Vec::with_capacity(log_leafs.len() * 64); + + for (_, leaf) in log_leafs.into_iter() { + body.extend_from_slice(leaf.log_id.as_ref()); + body.extend_from_slice(leaf.record_id.as_ref()); + } + + Ok(Response::builder() + .status(200) + .header( + axum::http::header::CONTENT_TYPE, + LedgerSourceContentType::Packed.as_str(), + ) + .body(axum::body::boxed(axum::body::Full::from(body))) + .unwrap()) +} diff --git a/crates/server/src/api/v1/mod.rs b/crates/server/src/api/v1/mod.rs index 4ee593ed..28d5eb98 100644 --- a/crates/server/src/api/v1/mod.rs +++ b/crates/server/src/api/v1/mod.rs @@ -4,20 +4,24 @@ use crate::{ }; use anyhow::Result; use axum::{ + async_trait, extract::{ rejection::{JsonRejection, PathRejection}, FromRequest, FromRequestParts, }, - http::StatusCode, + http::{request::Parts, StatusCode}, response::IntoResponse, Router, }; use serde::{Serialize, Serializer}; -use std::{path::PathBuf, sync::Arc}; +use std::{path::PathBuf, str::FromStr, sync::Arc}; use url::Url; +use warg_api::v1::REGISTRY_HEADER_NAME; pub mod content; pub mod fetch; +pub mod ledger; +pub mod monitor; pub mod package; pub mod proof; @@ -90,6 +94,36 @@ pub async fn not_found() -> impl IntoResponse { } } +/// An extractor for the `Warg-Registry` header. Currently, this server implementation +/// does not support this header and returns a `501` error. +pub struct RegistryHeader(Option); + +#[async_trait] +impl FromRequestParts for RegistryHeader +where + S: Send + Sync, +{ + type Rejection = (StatusCode, &'static str); + + async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { + if parts.headers.contains_key(REGISTRY_HEADER_NAME) { + Err(( + StatusCode::NOT_IMPLEMENTED, + "`Warg-Registry` header is not supported", + )) + } else { + Ok(RegistryHeader(None)) + } + } +} + +impl FromStr for RegistryHeader { + type Err = std::convert::Infallible; + fn from_str(src: &str) -> Result { + Ok(RegistryHeader(Some(src.to_string()))) + } +} + pub fn create_router( content_base_url: Url, core: CoreService, @@ -106,13 +140,17 @@ pub fn create_router( content_policy, record_policy, ); - let fetch_config = fetch::Config::new(core); + let fetch_config = fetch::Config::new(core.clone()); let content_config = content::Config::new(content_base_url, files_dir); + let monitor_config = monitor::Config::new(core.clone()); + let ledger_config = ledger::Config::new(core); Router::new() - .nest("/package", package_config.into_router()) .nest("/content", content_config.into_router()) .nest("/fetch", fetch_config.into_router()) + .nest("/ledger", ledger_config.into_router()) + .nest("/package", package_config.into_router()) .nest("/proof", proof_config.into_router()) + .nest("/verify", monitor_config.into_router()) .fallback(not_found) } diff --git a/crates/server/src/api/v1/monitor.rs b/crates/server/src/api/v1/monitor.rs new file mode 100644 index 00000000..85ba201c --- /dev/null +++ b/crates/server/src/api/v1/monitor.rs @@ -0,0 +1,146 @@ +use super::{Json, RegistryHeader}; +use crate::datastore::DataStoreError; +use crate::services::CoreService; +use axum::http::StatusCode; +use axum::{debug_handler, extract::State, response::IntoResponse, routing::post, Router}; +use warg_api::v1::monitor::{CheckpointVerificationResponse, MonitorError, VerificationState}; +use warg_crypto::hash::Sha256; +use warg_protocol::registry::{LogId, TimestampedCheckpoint}; +use warg_protocol::SerdeEnvelope; + +#[derive(Clone)] +pub struct Config { + core_service: CoreService, +} + +impl Config { + pub fn new(core_service: CoreService) -> Self { + Self { core_service } + } + + pub fn into_router(self) -> Router { + Router::new() + .route("/checkpoint", post(verify_checkpoint)) + .with_state(self) + } +} + +struct MonitorApiError(MonitorError); + +impl From for MonitorApiError { + fn from(e: DataStoreError) -> Self { + tracing::error!("unexpected data store error: {e}"); + + Self(MonitorError::Message { + status: StatusCode::INTERNAL_SERVER_ERROR.as_u16(), + message: "an error occurred while processing the request".into(), + }) + } +} + +impl IntoResponse for MonitorApiError { + fn into_response(self) -> axum::response::Response { + (StatusCode::from_u16(self.0.status()).unwrap(), Json(self.0)).into_response() + } +} + +/// Verifies a checkpoint and its signature. +/// +/// Note: Other implementations may choose to perform validation differently +/// and to respond with `Unverified` in cases where the required information +/// is not available or would be too expensive to compute. +/// +/// Checkpoint verification is `Verified` when +/// a checkpoint with the provided `log_length` is found in the store +/// and both the log and map roots match the stored checkpoint. +/// +/// Checkpoint verification is `Invalid` otherwise. +/// +/// Signature verification is `Verified` when either +/// A) It matches the signature on the stored checkpoint, or +/// B) It is a valid and authorized signature by a key in the operator log. +/// +/// Signature verification is `Invalid` otherwise. +#[debug_handler] +async fn verify_checkpoint( + State(config): State, + RegistryHeader(_registry_header): RegistryHeader, + Json(body): Json>, +) -> Result, MonitorApiError> { + // Do a first pass checking the provided checkpoint against the data store + let (checkpoint_verification, signature_verification) = + try_verify_exact_match(&config.core_service, &body).await; + + // If the signature is `Unverified`, check signature against keys in operator log: + let signature_verification = if signature_verification == VerificationState::Unverified { + match config + .core_service + .store() + .verify_timestamped_checkpoint_signature(&LogId::operator_log::(), &body) + .await + { + Ok(_) => VerificationState::Verified, + Err(error) => match error { + DataStoreError::UnknownKey(_) + | DataStoreError::SignatureVerificationFailed(_) + | DataStoreError::KeyUnauthorized(_) => VerificationState::Invalid, + _ => return Err(MonitorApiError::from(error)), + }, + } + } else { + signature_verification + }; + + Ok(Json(CheckpointVerificationResponse { + checkpoint: checkpoint_verification, + signature: signature_verification, + retry_after: None, + })) +} + +/// Attempt to verify checkpoint by looking for an exact match in the store. +/// Returns (checkpoint: Invalid, signature: Unverified) if one isn't found. +async fn try_verify_exact_match( + core_service: &CoreService, + checkpoint_envelope: &SerdeEnvelope, +) -> (VerificationState, VerificationState) { + let checkpoint = &checkpoint_envelope.as_ref().checkpoint; + + // Look for a stored checkpoint with the same log_length as was specified + let found = core_service + .store() + .get_checkpoint(checkpoint.log_length) + .await; + + if let Ok(found_checkpoint_envelope) = found { + let found_checkpoint = &found_checkpoint_envelope.as_ref().checkpoint; + // Check log root and map root + let log_matches = found_checkpoint.log_root == checkpoint.log_root; + let map_matches = found_checkpoint.map_root == checkpoint.map_root; + + // A checkpoint is verified if the exact checkpoint was recorded in the store. + // Otherwise it is considered invalid by the reference implementation. + let checkpoint_verification = if log_matches && map_matches { + VerificationState::Verified + } else { + VerificationState::Invalid + }; + + // Check for exact match on signature and key ID + let signature_matches = + found_checkpoint_envelope.signature() == checkpoint_envelope.signature(); + let key_id_matches = found_checkpoint_envelope.key_id() == checkpoint_envelope.key_id(); + + // A checkpoint is verified if the signature and key_id match the found checkpoint. + // Otherwise it is consdered unverified by this function, but can be checked against known keys afterwards. + let signature_verification = if signature_matches && key_id_matches { + VerificationState::Verified + } else { + VerificationState::Unverified + }; + + (checkpoint_verification, signature_verification) + } else { + (VerificationState::Invalid, VerificationState::Unverified) + } +} diff --git a/crates/server/src/api/v1/package.rs b/crates/server/src/api/v1/package.rs index 996ca53b..b997de6e 100644 --- a/crates/server/src/api/v1/package.rs +++ b/crates/server/src/api/v1/package.rs @@ -1,4 +1,4 @@ -use super::{Json, Path}; +use super::{Json, Path, RegistryHeader}; use crate::{ datastore::{DataStoreError, RecordStatus}, policy::{ @@ -139,9 +139,17 @@ impl From for PackageApiError { } DataStoreError::LogNotFound(id) => PackageError::LogNotFound(id), DataStoreError::RecordNotFound(id) => PackageError::RecordNotFound(id), - DataStoreError::UnknownKey(_) | DataStoreError::SignatureVerificationFailed => { + DataStoreError::UnknownKey(_) | DataStoreError::SignatureVerificationFailed(_) => { PackageError::Unauthorized(e.to_string()) } + DataStoreError::PackageNamespaceNotDefined(id) => PackageError::NamespaceNotDefined(id), + DataStoreError::PackageNamespaceConflict { existing, .. } => { + PackageError::NamespaceConflict(existing) + } + DataStoreError::PackageNamespaceImported(id) => PackageError::NamespaceImported(id), + DataStoreError::PackageNameConflict { existing, .. } => { + PackageError::PackageNameConflict(existing) + } // Other errors are internal server errors e => { tracing::error!("unexpected data store error: {e}"); @@ -181,6 +189,7 @@ impl IntoResponse for PackageApiError { async fn publish_record( State(config): State, Path(log_id): Path, + RegistryHeader(_registry_header): RegistryHeader, Json(body): Json>, ) -> Result { let expected_log_id = LogId::package_log::(&body.package_id); @@ -204,6 +213,15 @@ async fn publish_record( )); } + // Verify the package name is unique in a case insensitive way and + // the namespace is defined in the operator log and not imported + // from another registry. + config + .core_service + .store() + .verify_can_publish_package(&LogId::operator_log::(), &body.package_id) + .await?; + // Preemptively perform the policy check on the record before storing it // This is performed here so that we never store an unauthorized record if let Some(policy) = &config.record_policy { @@ -257,6 +275,7 @@ async fn publish_record( async fn get_record( State(config): State, Path((log_id, record_id)): Path<(LogId, RecordId)>, + RegistryHeader(_registry_header): RegistryHeader, ) -> Result, PackageApiError> { let record = config .core_service @@ -296,6 +315,7 @@ async fn get_record( async fn upload_content( State(config): State, Path((log_id, record_id, digest)): Path<(LogId, RecordId, AnyHash)>, + RegistryHeader(_registry_header): RegistryHeader, stream: BodyStream, ) -> Result { match config diff --git a/crates/server/src/api/v1/proof.rs b/crates/server/src/api/v1/proof.rs index bc2086d2..8759e412 100644 --- a/crates/server/src/api/v1/proof.rs +++ b/crates/server/src/api/v1/proof.rs @@ -1,4 +1,4 @@ -use super::Json; +use super::{Json, RegistryHeader}; use crate::services::{CoreService, CoreServiceError}; use axum::{ debug_handler, extract::State, http::StatusCode, response::IntoResponse, routing::post, Router, @@ -60,6 +60,7 @@ impl IntoResponse for ProofApiError { #[debug_handler] async fn prove_consistency( State(config): State, + RegistryHeader(_registry_header): RegistryHeader, Json(body): Json, ) -> Result, ProofApiError> { let bundle = config @@ -75,6 +76,7 @@ async fn prove_consistency( #[debug_handler] async fn prove_inclusion( State(config): State, + RegistryHeader(_registry_header): RegistryHeader, Json(body): Json, ) -> Result, ProofApiError> { let log_length = body.log_length as RegistryLen; diff --git a/crates/server/src/bin/warg-server.rs b/crates/server/src/bin/warg-server.rs index a073aea5..9b86a9b8 100644 --- a/crates/server/src/bin/warg-server.rs +++ b/crates/server/src/bin/warg-server.rs @@ -6,6 +6,7 @@ use tokio::signal; use tracing_subscriber::filter::LevelFilter; use url::Url; use warg_crypto::signing::PrivateKey; +use warg_protocol::operator; use warg_server::{args::get_opt_secret, policy::record::AuthorizedKeyPolicy, Config, Server}; #[derive(ValueEnum, Debug, Clone, Copy, PartialEq, Eq, Default)] @@ -69,6 +70,10 @@ struct Args { /// The path to the authorized keys record policy file. #[arg(long, env = "WARG_AUTHORIZED_KEYS_FILE")] authorized_keys_file: Option, + + /// The initial namespace defined for this registry. + #[arg(long, env = "WARG_NAMESPACE")] + namespace: Option, } impl Args { @@ -94,8 +99,12 @@ async fn main() -> Result<()> { get_opt_secret("operator-key", args.operator_key_file, args.operator_key)?; let operator_key = PrivateKey::decode(operator_key_str).context("failed to parse operator key")?; + let namespaces = args + .namespace + .as_ref() + .map(|namespace| vec![(namespace.to_lowercase(), operator::NamespaceState::Defined)]); - let mut config = Config::new(operator_key, args.content_dir) + let mut config = Config::new(operator_key, namespaces, args.content_dir) .with_addr(args.listen) .with_shutdown(shutdown_signal()); diff --git a/crates/server/src/datastore/memory.rs b/crates/server/src/datastore/memory.rs index 22aa8adc..c75afed8 100644 --- a/crates/server/src/datastore/memory.rs +++ b/crates/server/src/datastore/memory.rs @@ -2,12 +2,12 @@ use super::{DataStore, DataStoreError}; use futures::Stream; use indexmap::IndexMap; use std::{ - collections::{BTreeSet, HashMap, HashSet}, + collections::{HashMap, HashSet}, pin::Pin, sync::Arc, }; use tokio::sync::RwLock; -use warg_crypto::{hash::AnyHash, Signable}; +use warg_crypto::{hash::AnyHash, Encode, Signable}; use warg_protocol::{ operator, package::{self, PackageEntry}, @@ -77,7 +77,8 @@ enum RecordStatus { struct State { operators: HashMap>, packages: HashMap>, - package_ids: BTreeSet, + package_ids: HashMap>, + package_ids_lowercase: HashMap, checkpoints: IndexMap>, records: HashMap>, log_leafs: HashMap, @@ -121,6 +122,30 @@ impl DataStore for MemoryDataStore { Ok(Box::pin(futures::stream::empty())) } + async fn get_log_leafs_starting_with_registry_index( + &self, + starting_index: RegistryIndex, + limit: usize, + ) -> Result, DataStoreError> { + let state = self.0.read().await; + + let limit = if limit > state.log_leafs.len() - starting_index { + state.log_leafs.len() - starting_index + } else { + limit + }; + + let mut leafs = Vec::with_capacity(limit); + for entry in starting_index..starting_index + limit { + match state.log_leafs.get(&entry) { + Some(log_leaf) => leafs.push((entry, log_leaf.clone())), + None => break, + } + } + + Ok(leafs) + } + async fn get_log_leafs_with_registry_index( &self, entries: &[RegistryIndex], @@ -138,6 +163,24 @@ impl DataStore for MemoryDataStore { Ok(leafs) } + async fn get_package_names( + &self, + log_ids: &[LogId], + ) -> Result>, DataStoreError> { + let state = self.0.read().await; + + log_ids + .iter() + .map(|log_id| { + if let Some(opt_package_name) = state.package_ids.get(log_id) { + Ok((log_id.clone(), opt_package_name.clone())) + } else { + Err(DataStoreError::LogNotFound(log_id.clone())) + } + }) + .collect::>, _>>() + } + async fn store_operator_record( &self, log_id: &LogId, @@ -269,7 +312,12 @@ impl DataStore for MemoryDataStore { missing: missing.iter().map(|&d| d.clone()).collect(), }), ); - state.package_ids.insert(package_id.clone()); + state + .package_ids + .insert(log_id.clone(), Some(package_id.clone())); + state + .package_ids_lowercase + .insert(package_id.as_ref().to_ascii_lowercase(), package_id.clone()); assert!(prev.is_none()); Ok(()) @@ -449,6 +497,18 @@ impl DataStore for MemoryDataStore { Ok(checkpoint.clone()) } + async fn get_checkpoint( + &self, + log_length: RegistryLen, + ) -> Result, DataStoreError> { + let state = self.0.read().await; + let checkpoint = state + .checkpoints + .get(&log_length) + .ok_or_else(|| DataStoreError::CheckpointNotFound(log_length))?; + Ok(checkpoint.clone()) + } + async fn get_operator_records( &self, log_id: &LogId, @@ -655,12 +715,100 @@ impl DataStore for MemoryDataStore { .ok_or_else(|| DataStoreError::UnknownKey(record.key_id().clone()))?; package::PackageRecord::verify(key, record.content_bytes(), record.signature()) - .map_err(|_| DataStoreError::SignatureVerificationFailed) + .map_err(|_| DataStoreError::SignatureVerificationFailed(record.signature().clone())) + } + + async fn verify_can_publish_package( + &self, + operator_log_id: &LogId, + package_id: &PackageId, + ) -> Result<(), DataStoreError> { + let state = self.0.read().await; + + // verify namespace is defined and not imported + match state + .operators + .get(operator_log_id) + .ok_or_else(|| DataStoreError::LogNotFound(operator_log_id.clone()))? + .validator + .namespace_state(package_id.namespace()) + { + Ok(Some(state)) => match state { + operator::NamespaceState::Defined => {} + operator::NamespaceState::Imported { .. } => { + return Err(DataStoreError::PackageNamespaceImported( + package_id.namespace().to_string(), + )) + } + }, + Ok(None) => { + return Err(DataStoreError::PackageNamespaceNotDefined( + package_id.namespace().to_string(), + )) + } + Err(existing_namespace) => { + return Err(DataStoreError::PackageNamespaceConflict { + namespace: package_id.namespace().to_string(), + existing: existing_namespace.to_string(), + }) + } + } + + // verify package name is unique in a case insensitive way + match state + .package_ids_lowercase + .get(&package_id.as_ref().to_ascii_lowercase()) + { + Some(existing) if existing.as_ref() != package_id.as_ref() => { + Err(DataStoreError::PackageNameConflict { + name: package_id.clone(), + existing: existing.clone(), + }) + } + _ => Ok(()), + } + } + + async fn verify_timestamped_checkpoint_signature( + &self, + operator_log_id: &LogId, + ts_checkpoint: &SerdeEnvelope, + ) -> Result<(), DataStoreError> { + let state = self.0.read().await; + + let validator = &state + .operators + .get(operator_log_id) + .ok_or_else(|| DataStoreError::LogNotFound(operator_log_id.clone()))? + .validator; + + TimestampedCheckpoint::verify( + validator + .public_key(ts_checkpoint.key_id()) + .ok_or(DataStoreError::UnknownKey(ts_checkpoint.key_id().clone()))?, + &ts_checkpoint.as_ref().encode(), + ts_checkpoint.signature(), + ) + .or(Err(DataStoreError::SignatureVerificationFailed( + ts_checkpoint.signature().clone(), + )))?; + + if !validator.key_has_permission_to_sign_checkpoints(ts_checkpoint.key_id()) { + return Err(DataStoreError::KeyUnauthorized( + ts_checkpoint.key_id().clone(), + )); + } + + Ok(()) } #[cfg(feature = "debug")] async fn debug_list_package_ids(&self) -> anyhow::Result> { let state = self.0.read().await; - Ok(state.package_ids.iter().cloned().collect()) + Ok(state + .package_ids + .values() + .filter_map(|opt_package_id| opt_package_id.clone()) + .collect()) } } diff --git a/crates/server/src/datastore/mod.rs b/crates/server/src/datastore/mod.rs index 7b069e33..a5310d96 100644 --- a/crates/server/src/datastore/mod.rs +++ b/crates/server/src/datastore/mod.rs @@ -1,7 +1,13 @@ use futures::Stream; -use std::{collections::HashSet, pin::Pin}; +use std::{ + collections::{HashMap, HashSet}, + pin::Pin, +}; use thiserror::Error; -use warg_crypto::{hash::AnyHash, signing::KeyID}; +use warg_crypto::{ + hash::AnyHash, + signing::{KeyID, Signature}, +}; use warg_protocol::{ operator, package, registry::{ @@ -50,11 +56,31 @@ pub enum DataStoreError { #[error("the package record was invalid: {0}")] PackageValidationFailed(#[from] package::ValidationError), + #[error("the package `{name}` conflicts with package `{existing}`; package names must be unique in a case insensitive way")] + PackageNameConflict { + name: PackageId, + existing: PackageId, + }, + + #[error("the package namespace `{namespace}` conflicts with existing namespace `{existing}`; package namespaces must be unique in a case insensitive way")] + PackageNamespaceConflict { namespace: String, existing: String }, + + #[error("the package namespace `{0}` is not defined")] + PackageNamespaceNotDefined(String), + + #[error( + "the package namespace `{0}` is imported from another registry and cannot accept publishes" + )] + PackageNamespaceImported(String), + + #[error("key id `{0}` does not have permission")] + KeyUnauthorized(KeyID), + #[error("unknown key id `{0}`")] UnknownKey(KeyID), - #[error("record signature verification failed")] - SignatureVerificationFailed, + #[error("signature `{0}` verification failed")] + SignatureVerificationFailed(Signature), #[error("the record was rejected: {0}")] Rejection(String), @@ -226,6 +252,25 @@ pub trait DataStore: Send + Sync { &self, ) -> Result, DataStoreError>; + /// Get checkpoint by log length. + async fn get_checkpoint( + &self, + log_length: RegistryLen, + ) -> Result, DataStoreError>; + + /// Gets package names from log IDs. If package name is unavailable, a corresponding `None` is returned. + async fn get_package_names( + &self, + log_ids: &[LogId], + ) -> Result>, DataStoreError>; + + /// Gets a batch of log leafs starting with a registry log index. + async fn get_log_leafs_starting_with_registry_index( + &self, + starting_index: RegistryIndex, + limit: usize, + ) -> Result, DataStoreError>; + /// Gets the operator records for the given registry log length. async fn get_operator_records( &self, @@ -270,6 +315,22 @@ pub trait DataStore: Send + Sync { record: &ProtoEnvelope, ) -> Result<(), DataStoreError>; + /// Verifies the package name is unique in a case insensitive way and that the + /// package namespace is defined for this registry and is not imported + /// from another registry. + async fn verify_can_publish_package( + &self, + operator_log_id: &LogId, + package_id: &PackageId, + ) -> Result<(), DataStoreError>; + + /// Verifies the TimestampedCheckpoint signature. + async fn verify_timestamped_checkpoint_signature( + &self, + operator_log_id: &LogId, + ts_checkpoint: &SerdeEnvelope, + ) -> Result<(), DataStoreError>; + // Returns a list of package names, for debugging only. #[cfg(feature = "debug")] #[doc(hidden)] diff --git a/crates/server/src/datastore/postgres/migrations/2023-11-15-210355_package_name_lowercase/down.sql b/crates/server/src/datastore/postgres/migrations/2023-11-15-210355_package_name_lowercase/down.sql new file mode 100644 index 00000000..d9a93fe9 --- /dev/null +++ b/crates/server/src/datastore/postgres/migrations/2023-11-15-210355_package_name_lowercase/down.sql @@ -0,0 +1 @@ +-- This file should undo anything in `up.sql` diff --git a/crates/server/src/datastore/postgres/migrations/2023-11-15-210355_package_name_lowercase/up.sql b/crates/server/src/datastore/postgres/migrations/2023-11-15-210355_package_name_lowercase/up.sql new file mode 100644 index 00000000..d4305c36 --- /dev/null +++ b/crates/server/src/datastore/postgres/migrations/2023-11-15-210355_package_name_lowercase/up.sql @@ -0,0 +1,2 @@ +-- Your SQL goes here +CREATE UNIQUE INDEX logs_package_name_lowercase ON logs (LOWER(name)); diff --git a/crates/server/src/datastore/postgres/mod.rs b/crates/server/src/datastore/postgres/mod.rs index 075f1514..93808026 100644 --- a/crates/server/src/datastore/postgres/mod.rs +++ b/crates/server/src/datastore/postgres/mod.rs @@ -4,6 +4,7 @@ use self::models::{ }; use super::{DataStore, DataStoreError, Record}; use anyhow::{anyhow, Result}; +use diesel::sql_types::{Nullable, Text}; use diesel::{prelude::*, result::DatabaseErrorKind}; use diesel_async::{ pooled_connection::{deadpool::Pool, AsyncDieselConnectionManager}, @@ -20,7 +21,7 @@ use std::{ collections::{HashMap, HashSet}, pin::Pin, }; -use warg_crypto::{hash::AnyHash, Decode, Signable}; +use warg_crypto::{hash::AnyHash, Decode, Encode, Signable}; use warg_protocol::{ operator, package::{self, PackageEntry}, @@ -34,6 +35,8 @@ use warg_protocol::{ mod models; mod schema; +sql_function!(fn lower(x: Nullable) -> Nullable); + async fn get_records( conn: &mut AsyncPgConnection, log_id: i32, @@ -449,6 +452,38 @@ impl DataStore for PostgresDataStore { )) } + async fn get_log_leafs_starting_with_registry_index( + &self, + starting_index: RegistryIndex, + limit: usize, + ) -> Result, DataStoreError> { + let mut conn = self.pool.get().await?; + + Ok(schema::records::table + .inner_join(schema::logs::table) + .select(( + schema::records::registry_log_index, + schema::logs::log_id, + schema::records::record_id, + )) + .filter(schema::records::registry_log_index.ge(starting_index as i64)) + .order(schema::records::registry_log_index.asc()) + .limit(limit as i64) + .load::<(Option, ParsedText, ParsedText)>(&mut conn) + .await? + .into_iter() + .map(|(registry_index, log_id, record_id)| { + ( + registry_index.unwrap() as RegistryIndex, + LogLeaf { + log_id: log_id.0.into(), + record_id: record_id.0.into(), + }, + ) + }) + .collect::>()) + } + // Note: order of the entries is expected to match to the corresponding returned log leafs. async fn get_log_leafs_with_registry_index( &self, @@ -491,6 +526,39 @@ impl DataStore for PostgresDataStore { .collect::, _>>()?) } + async fn get_package_names( + &self, + log_ids: &[LogId], + ) -> Result>, DataStoreError> { + let mut conn = self.pool.get().await?; + + let map = schema::logs::table + .select((schema::logs::log_id, schema::logs::name)) + .filter( + schema::logs::log_id + .eq_any(log_ids.iter().map(TextRef).collect::>>()), + ) + .load::<(ParsedText, Option)>(&mut conn) + .await? + .into_iter() + .map(|(log_id, opt_package_name)| { + ( + log_id.0.into(), + opt_package_name.map(|name| PackageId::new(name).unwrap()), + ) + }) + .collect::>>(); + + // check if any log IDs were not found + for log_id in log_ids { + if !map.contains_key(log_id) { + return Err(DataStoreError::LogNotFound(log_id.clone())); + } + } + + Ok(map) + } + async fn store_operator_record( &self, log_id: &LogId, @@ -780,6 +848,33 @@ impl DataStore for PostgresDataStore { )) } + async fn get_checkpoint( + &self, + log_length: RegistryLen, + ) -> Result, DataStoreError> { + let mut conn = self.pool.get().await?; + + let checkpoint = schema::checkpoints::table + .filter(schema::checkpoints::log_length.eq(log_length as i64)) + .first::(&mut conn) + .await + .optional()? + .ok_or_else(|| DataStoreError::CheckpointNotFound(log_length))?; + + Ok(SerdeEnvelope::from_parts_unchecked( + TimestampedCheckpoint { + checkpoint: Checkpoint { + log_root: checkpoint.log_root.0, + log_length, + map_root: checkpoint.map_root.0, + }, + timestamp: checkpoint.timestamp.try_into().unwrap(), + }, + checkpoint.key_id.0, + checkpoint.signature.0, + )) + } + async fn get_operator_records( &self, log_id: &LogId, @@ -862,7 +957,100 @@ impl DataStore for PostgresDataStore { }; package::PackageRecord::verify(key, record.content_bytes(), record.signature()) - .map_err(|_| DataStoreError::SignatureVerificationFailed) + .map_err(|_| DataStoreError::SignatureVerificationFailed(record.signature().clone())) + } + + async fn verify_can_publish_package( + &self, + operator_log_id: &LogId, + package_id: &PackageId, + ) -> Result<(), DataStoreError> { + let mut conn = self.pool.get().await?; + + let validator = schema::logs::table + .select(schema::logs::validator) + .filter(schema::logs::log_id.eq(TextRef(operator_log_id))) + .first::>(&mut conn) + .await + .optional()? + .ok_or_else(|| DataStoreError::LogNotFound(operator_log_id.clone()))?; + + // verify namespace is defined and not imported + match validator.namespace_state(package_id.namespace()) { + Ok(Some(state)) => match state { + operator::NamespaceState::Defined => {} + operator::NamespaceState::Imported { .. } => { + return Err(DataStoreError::PackageNamespaceImported( + package_id.namespace().to_string(), + )) + } + }, + Ok(None) => { + return Err(DataStoreError::PackageNamespaceNotDefined( + package_id.namespace().to_string(), + )) + } + Err(existing_namespace) => { + return Err(DataStoreError::PackageNamespaceConflict { + namespace: package_id.namespace().to_string(), + existing: existing_namespace.to_string(), + }) + } + } + + // verify package name is unique in a case insensitive way + match schema::logs::table + .select(schema::logs::name) + .filter( + lower(schema::logs::name).eq(TextRef(&package_id.as_ref().to_ascii_lowercase())), + ) + .first::>(&mut conn) + .await + .optional()? + { + Some(Some(name)) if name != package_id.as_ref() => { + Err(DataStoreError::PackageNameConflict { + name: package_id.clone(), + existing: PackageId::new(name).unwrap(), + }) + } + _ => Ok(()), + } + } + + async fn verify_timestamped_checkpoint_signature( + &self, + operator_log_id: &LogId, + ts_checkpoint: &SerdeEnvelope, + ) -> Result<(), DataStoreError> { + let mut conn = self.pool.get().await?; + + let validator = schema::logs::table + .select(schema::logs::validator) + .filter(schema::logs::log_id.eq(TextRef(operator_log_id))) + .first::>(&mut conn) + .await + .optional()? + .ok_or_else(|| DataStoreError::LogNotFound(operator_log_id.clone()))?; + + TimestampedCheckpoint::verify( + validator + .public_key(ts_checkpoint.key_id()) + .ok_or(DataStoreError::UnknownKey(ts_checkpoint.key_id().clone()))?, + &ts_checkpoint.as_ref().encode(), + ts_checkpoint.signature(), + ) + .or(Err(DataStoreError::SignatureVerificationFailed( + ts_checkpoint.signature().clone(), + )))?; + + if !validator.key_has_permission_to_sign_checkpoints(ts_checkpoint.key_id()) { + return Err(DataStoreError::KeyUnauthorized( + ts_checkpoint.key_id().clone(), + )); + } + + Ok(()) } #[cfg(feature = "debug")] diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index d0ad90f0..eb39439f 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -16,6 +16,7 @@ use std::{ use tokio::task::JoinHandle; use url::Url; use warg_crypto::signing::PrivateKey; +use warg_protocol::operator; pub mod api; pub mod args; @@ -31,6 +32,7 @@ type ShutdownFut = Pin + Send + Sync>>; /// The server configuration. pub struct Config { operator_key: PrivateKey, + namespaces: Option>, addr: Option, data_store: Option>, content_dir: PathBuf, @@ -45,6 +47,7 @@ impl std::fmt::Debug for Config { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Config") .field("operator_key", &"") + .field("namespaces", &self.namespaces) .field("addr", &self.addr) .field( "data_store", @@ -67,9 +70,14 @@ impl std::fmt::Debug for Config { impl Config { /// Creates a new server configuration. - pub fn new(operator_key: PrivateKey, content_dir: PathBuf) -> Self { + pub fn new( + operator_key: PrivateKey, + namespaces: Option>, + content_dir: PathBuf, + ) -> Self { Self { operator_key, + namespaces, addr: None, data_store: None, content_dir, @@ -187,6 +195,7 @@ impl Server { .unwrap_or_else(|| Box::::default()); let (core, core_handle) = CoreService::start( self.config.operator_key, + self.config.namespaces, store, self.config .checkpoint_interval diff --git a/crates/server/src/services/core.rs b/crates/server/src/services/core.rs index aae6cd87..103e3b29 100644 --- a/crates/server/src/services/core.rs +++ b/crates/server/src/services/core.rs @@ -44,6 +44,7 @@ impl CoreService { /// copies of the service handle to allow for graceful shutdown. pub async fn start( operator_key: PrivateKey, + namespaces: Option>, store: Box, checkpoint_interval: Duration, ) -> Result<(Self, JoinHandle<()>), CoreServiceError> { @@ -53,7 +54,7 @@ impl CoreService { store, state: Default::default(), }; - inner.initialize().await?; + inner.initialize(namespaces).await?; // Spawn state update task let inner = Arc::new(inner); @@ -182,7 +183,10 @@ struct Inner { impl Inner { // Load state from DataStore or initialize empty state, returning any // entries that are not yet part of a checkpoint. - async fn initialize(&mut self) -> Result<(), CoreServiceError> { + async fn initialize( + &mut self, + namespaces: Option>, + ) -> Result<(), CoreServiceError> { tracing::debug!("Initializing CoreService"); let published = self.store.get_all_validated_records().await?.peekable(); @@ -191,7 +195,7 @@ impl Inner { // If there are no published records, initialize a new state if published.as_mut().peek().await.is_none() { tracing::debug!("No existing records; initializing new state"); - return self.initialize_new().await; + return self.initialize_new(namespaces).await; } // Reconstruct internal state from previously-stored data @@ -217,18 +221,43 @@ impl Inner { Ok(()) } - async fn initialize_new(&mut self) -> Result<(), CoreServiceError> { + async fn initialize_new( + &mut self, + namespaces: Option>, + ) -> Result<(), CoreServiceError> { let state = self.state.get_mut(); // Construct operator init record + let init = operator::OperatorEntry::Init { + hash_algorithm: Digest::ALGORITHM, + key: self.operator_key.public_key(), + }; + let entries = if let Some(namespaces) = namespaces { + let mut entries = Vec::with_capacity(1 + namespaces.len()); + entries.push(init); + for (namespace, state) in namespaces.into_iter() { + entries.push(match state { + operator::NamespaceState::Defined => { + operator::OperatorEntry::DefineNamespace { namespace } + } + operator::NamespaceState::Imported { registry } => { + operator::OperatorEntry::ImportNamespace { + namespace, + registry, + } + } + }); + } + entries + } else { + vec![init] + }; + let init_record = operator::OperatorRecord { prev: None, version: 0, timestamp: SystemTime::now(), - entries: vec![operator::OperatorEntry::Init { - hash_algorithm: Digest::ALGORITHM, - key: self.operator_key.public_key(), - }], + entries, }; let signed_init_record = ProtoEnvelope::signed_contents(&self.operator_key, init_record).unwrap(); diff --git a/proto/warg/protocol/warg.proto b/proto/warg/protocol/warg.proto index a92555cd..9d863f09 100644 --- a/proto/warg/protocol/warg.proto +++ b/proto/warg/protocol/warg.proto @@ -28,12 +28,16 @@ message OperatorEntry { OperatorInit init = 1; OperatorGrantFlat grant_flat = 2; OperatorRevokeFlat revoke_flat = 3; + OperatorDefineNamespace define_namespace = 4; + OperatorImportNamespace import_namespace = 5; } } enum OperatorPermission { OPERATOR_PERMISSION_UNSPECIFIED = 0; OPERATOR_PERMISSION_COMMIT = 1; + OPERATOR_PERMISSION_DEFINE_NAMESPACE = 2; + OPERATOR_PERMISSION_IMPORT_NAMESPACE = 3; } message OperatorInit { @@ -57,6 +61,18 @@ message OperatorRevokeFlat { repeated OperatorPermission permissions = 2; } +message OperatorDefineNamespace { + // The registry defined namespace to be used in its own package logs. + string namespace = 1; +} + +message OperatorImportNamespace { + // The registry defined namespace to be imported from another registry. + string namespace = 1; + // The registry that the namespace is imported from. + string registry = 2; +} + message PackageRecord { // The previous entry in the log. // First entry of a log has no previous entry. @@ -109,4 +125,4 @@ message PackageRelease { message PackageYank { string version = 1; -} \ No newline at end of file +} diff --git a/tests/memory/mod.rs b/tests/memory/mod.rs index 2e5a4dbc..bb79580c 100644 --- a/tests/memory/mod.rs +++ b/tests/memory/mod.rs @@ -24,6 +24,8 @@ async fn it_publishes_a_component() -> Result<()> { "expected two log entries (initial + component)" ); + test_fetch_package_names(&config).await?; + Ok(()) } @@ -89,6 +91,12 @@ async fn it_rejects_unknown_signing_key() -> Result<()> { test_unknown_signing_key(&config).await } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn it_rejects_package_name_conflict() -> Result<()> { + let (_server, config) = spawn_server(&root().await?, None, None, None).await?; + test_publishing_name_conflict(&config).await +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn it_rejects_invalid_signature() -> Result<()> { let (_server, config) = spawn_server(&root().await?, None, None, None).await?; @@ -106,3 +114,9 @@ async fn it_formats_custom_content_urls() -> Result<()> { .await?; test_custom_content_url(&config).await } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn it_get_ledger() -> Result<()> { + let (_server, config) = spawn_server(&root().await?, None, None, None).await?; + test_get_ledger(&config).await +} diff --git a/tests/postgres/mod.rs b/tests/postgres/mod.rs index 8ed5939e..9b69cf97 100644 --- a/tests/postgres/mod.rs +++ b/tests/postgres/mod.rs @@ -49,16 +49,20 @@ async fn it_works_with_postgres() -> TestResult { test_wit_publishing(&config).await?; test_wasm_content_policy(&config).await?; test_unauthorized_signing_key(&config).await?; + test_publishing_name_conflict(&config).await?; // This is tested below where a different server is used that // allows any signing key //test_unknown_signing_key(&config).await?; test_invalid_signature(&config).await?; + test_fetch_package_names(&config).await?; + test_get_ledger(&config).await?; let mut packages = vec![ PackageId::new("test:component")?, PackageId::new("test:yankee")?, PackageId::new("test:wit-package")?, PackageId::new("test:unauthorized-key")?, + PackageId::new("test:name")?, ]; // There should be two log entries in the registry diff --git a/tests/server.rs b/tests/server.rs index 047e4031..35f564df 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -10,6 +10,8 @@ use std::{ use url::Url; use warg_api::v1::{ content::{ContentSource, ContentSourcesResponse}, + fetch::{FetchPackageNamesRequest, FetchPackageNamesResponse}, + ledger::{LedgerSource, LedgerSourceContentType, LedgerSourcesResponse}, package::PublishRecordRequest, paths, }; @@ -18,7 +20,11 @@ use warg_client::{ storage::{PublishEntry, PublishInfo, RegistryStorage}, ClientError, Config, }; -use warg_crypto::{hash::Sha256, signing::PrivateKey, Encode, Signable}; +use warg_crypto::{ + hash::{HashAlgorithm, Sha256}, + signing::PrivateKey, + Encode, Signable, +}; use warg_protocol::{ package::{PackageEntry, PackageRecord, PACKAGE_RECORD_VERSION}, registry::{LogId, PackageId}, @@ -330,6 +336,35 @@ async fn test_unknown_signing_key(config: &Config) -> Result<()> { Ok(()) } +async fn test_publishing_name_conflict(config: &Config) -> Result<()> { + let client = create_client(config)?; + let signing_key = test_signing_key(); + + publish_component( + &client, + &PackageId::new("test:name")?, + "0.1.0", + "(component)", + true, + &signing_key, + ) + .await?; + + // should be rejected + publish_component( + &client, + &PackageId::new("test:NAME")?, + "0.1.1", + "(component)", + true, + &signing_key, + ) + .await + .expect_err("expected publish to fail"); + + Ok(()) +} + async fn test_invalid_signature(config: &Config) -> Result<()> { const PACKAGE_ID: &str = "test:invalid-signature"; @@ -379,7 +414,7 @@ async fn test_invalid_signature(config: &Config) -> Result<()> { "unexpected response from server: {status}\n{body}", ); assert!( - body.contains("record signature verification failed"), + body.contains("verification failed"), "unexpected response body: {body}" ); @@ -436,3 +471,121 @@ async fn test_custom_content_url(config: &Config) -> Result<()> { Ok(()) } + +async fn test_fetch_package_names(config: &Config) -> Result<()> { + let id_1 = PackageId::new("test:component")?; + let log_id_1 = LogId::package_log::(&id_1); + + let url = Url::parse(config.default_url.as_ref().unwrap())? + .join(paths::fetch_package_names()) + .unwrap(); + + let body = FetchPackageNamesRequest { + packages: Cow::Owned(vec![log_id_1.clone()]), + }; + + let client = reqwest::Client::new(); + let response = client + .post(url) + .json(&serde_json::to_value(&body).unwrap()) + .send() + .await?; + + let status = response.status(); + let names_resp = response.json::().await?; + + assert_eq!( + status, + StatusCode::OK, + "unexpected response from server: {status}", + ); + + let lookup_id_1 = names_resp.packages.get(&log_id_1); + assert_eq!( + lookup_id_1, + Some(&Some(id_1.clone())), + "fetch of package name {id_1} mismatched to {lookup_id_1:?}" + ); + + Ok(()) +} + +async fn test_get_ledger(config: &Config) -> Result<()> { + let client = api::Client::new(config.default_url.as_ref().unwrap())?; + + let ts_checkpoint = client.latest_checkpoint().await?; + let checkpoint = &ts_checkpoint.as_ref().checkpoint; + + let url = Url::parse(config.default_url.as_ref().unwrap())? + .join(paths::ledger_sources()) + .unwrap(); + + let client = reqwest::Client::new(); + let response = client.get(url).send().await?; + + let status = response.status(); + let ledger_sources = response.json::().await?; + + assert_eq!( + status, + StatusCode::OK, + "unexpected response from server: {status}", + ); + + let hash_algorithm = ledger_sources.hash_algorithm; + assert_eq!( + hash_algorithm, + HashAlgorithm::Sha256, + "unexpected hash_algorithm: {hash_algorithm}", + ); + + let sources_len = ledger_sources.sources.len(); + assert_eq!(sources_len, 1, "unexpected sources length: {sources_len}",); + + let LedgerSource { + first_registry_index, + last_registry_index, + url, + content_type, + .. + } = ledger_sources.sources.get(0).unwrap(); + + assert_eq!( + content_type, + &LedgerSourceContentType::Packed, + "unexpected ledger source content type", + ); + assert_eq!( + *first_registry_index, 0, + "unexpected ledger source first registry index: {first_registry_index}", + ); + assert_eq!( + *last_registry_index, + checkpoint.log_length - 1, + "unexpected ledger source last registry index: {last_registry_index}", + ); + + let url = Url::parse(config.default_url.as_ref().unwrap())? + .join(url) + .unwrap(); + + // get ledger source + let response = client.get(url).send().await?; + + let status = response.status(); + assert_eq!( + status, + StatusCode::OK, + "unexpected response from server: {status}", + ); + + let bytes = response.bytes().await?; + let bytes_len = bytes.len(); + assert_eq!( + bytes_len, + checkpoint.log_length * 64, + "unexpected response body length for ledger source from server: {bytes_len}", + ); + + Ok(()) +} diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 2f62368c..f261b34e 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -17,7 +17,7 @@ use warg_crypto::{ hash::AnyHash, signing::{KeyID, PrivateKey}, }; -use warg_protocol::registry::PackageId; +use warg_protocol::{operator, registry::PackageId}; use warg_server::{ datastore::DataStore, policy::{content::WasmContentPolicy, record::AuthorizedKeyPolicy}, @@ -25,6 +25,13 @@ use warg_server::{ }; use wit_parser::{Resolve, UnresolvedPackage}; +pub fn test_namespaces() -> Option> { + Some(vec![( + "test".to_string(), + operator::NamespaceState::Defined, + )]) +} + pub fn test_operator_key() -> PrivateKey { let key = "ecdsa-p256:I+UlDo0HxyBBFeelhPPWmD+LnklOpqZDkrFP5VduASk="; PrivateKey::decode(key.to_string()).unwrap() @@ -113,7 +120,7 @@ pub async fn spawn_server( let _subscriber_guard = thread_test_logging(); let shutdown = CancellationToken::new(); - let mut config = Config::new(test_operator_key(), root.join("server")) + let mut config = Config::new(test_operator_key(), test_namespaces(), root.join("server")) .with_addr(([127, 0, 0, 1], 0)) .with_shutdown(shutdown.clone().cancelled_owned()) .with_checkpoint_interval(Duration::from_millis(100))