From 78a5571c7b99c64d12fc3515725e8ee321c1bd48 Mon Sep 17 00:00:00 2001 From: Daniel Macovei Date: Thu, 11 Apr 2024 12:22:52 -0500 Subject: [PATCH 1/4] make better use of namespace storage --- crates/client/src/api.rs | 47 +++-- crates/client/src/depsolve.rs | 20 +- crates/client/src/lib.rs | 327 +++++++++++++++++++++----------- crates/client/src/storage.rs | 35 +--- crates/client/src/storage/fs.rs | 16 +- src/bin/warg.rs | 117 +++++++++--- src/commands.rs | 54 +----- src/commands/bundle.rs | 17 +- src/commands/clear.rs | 2 +- src/commands/config.rs | 2 +- src/commands/dependencies.rs | 27 ++- src/commands/download.rs | 7 +- src/commands/info.rs | 13 +- src/commands/key.rs | 12 +- src/commands/lock.rs | 20 +- src/commands/login.rs | 4 +- src/commands/logout.rs | 4 +- src/commands/publish.rs | 61 +++--- src/commands/reset.rs | 2 +- src/commands/update.rs | 2 +- tests/client.rs | 13 +- tests/depsolve.rs | 9 +- tests/memory/mod.rs | 6 +- tests/postgres/mod.rs | 8 +- tests/server.rs | 48 +++-- tests/support/mod.rs | 4 +- 26 files changed, 529 insertions(+), 348 deletions(-) diff --git a/crates/client/src/api.rs b/crates/client/src/api.rs index eea7edd1..6c7f7b5b 100644 --- a/crates/client/src/api.rs +++ b/crates/client/src/api.rs @@ -159,11 +159,11 @@ async fn into_result) -> Result; + fn warg_header(self, registry_header: Option<&RegistryDomain>) -> Result; } impl WithWargHeader for RequestBuilder { - fn warg_header(self, registry_header: &Option) -> Result { + fn warg_header(self, registry_header: Option<&RegistryDomain>) -> Result { if let Some(reg) = registry_header { Ok(self.header(REGISTRY_HEADER_NAME, HeaderValue::try_from(reg.clone())?)) } else { @@ -219,13 +219,14 @@ impl Client { /// Gets the latest checkpoint from the registry. pub async fn latest_checkpoint( &self, + reg_domain: Option<&RegistryDomain>, ) -> Result, ClientError> { let url = self.url.join(paths::fetch_checkpoint()); tracing::debug!("getting latest checkpoint at `{url}`"); into_result::<_, FetchError>( self.client .get(url) - .warg_header(self.get_warg_registry())? + .warg_header(reg_domain)? .auth(self.auth_token()) .send() .await?, @@ -260,6 +261,7 @@ impl Client { /// Verify checkpoint of the registry. pub async fn verify_checkpoint( &self, + reg_domain: Option<&RegistryDomain>, request: SerdeEnvelope, ) -> Result { let url = self.url.join(paths::verify_checkpoint()); @@ -269,7 +271,7 @@ impl Client { .client .post(url) .json(&request) - .warg_header(self.get_warg_registry())? + .warg_header(reg_domain)? .auth(self.auth_token()) .send() .await?; @@ -279,6 +281,7 @@ impl Client { /// Fetches package log entries from the registry. pub async fn fetch_logs( &self, + reg_domain: Option<&RegistryDomain>, request: FetchLogsRequest<'_>, ) -> Result { let url = self.url.join(paths::fetch_logs()); @@ -287,7 +290,7 @@ impl Client { .client .post(&url) .json(&request) - .warg_header(self.get_warg_registry())? + .warg_header(reg_domain)? .auth(self.auth_token()) .send() .await?; @@ -306,6 +309,7 @@ impl Client { /// Fetches package names from the registry. pub async fn fetch_package_names( &self, + reg_domain: Option<&RegistryDomain>, request: FetchPackageNamesRequest<'_>, ) -> Result { let url = self.url.join(paths::fetch_package_names()); @@ -314,7 +318,7 @@ impl Client { let response = self .client .post(url) - .warg_header(self.get_warg_registry())? + .warg_header(reg_domain)? .auth(self.auth_token()) .json(&request) .send() @@ -323,14 +327,17 @@ impl Client { } /// Gets ledger sources from the registry. - pub async fn ledger_sources(&self) -> Result { + pub async fn ledger_sources( + &self, + reg_domain: Option<&RegistryDomain>, + ) -> Result { let url = self.url.join(paths::ledger_sources()); tracing::debug!("getting ledger sources at `{url}`"); into_result::<_, LedgerError>( self.client .get(url) - .warg_header(self.get_warg_registry())? + .warg_header(reg_domain)? .auth(self.auth_token()) .send() .await?, @@ -341,6 +348,7 @@ impl Client { /// Publish a new record to a package log. pub async fn publish_package_record( &self, + reg_domain: Option<&RegistryDomain>, log_id: &LogId, request: PublishRecordRequest<'_>, ) -> Result { @@ -354,7 +362,7 @@ impl Client { .client .post(url) .json(&request) - .warg_header(self.get_warg_registry())? + .warg_header(reg_domain)? .auth(self.auth_token()) .send() .await?; @@ -364,6 +372,7 @@ impl Client { /// Gets a package record from the registry. pub async fn get_package_record( &self, + reg_domain: Option<&RegistryDomain>, log_id: &LogId, record_id: &RecordId, ) -> Result { @@ -373,7 +382,7 @@ impl Client { into_result::<_, PackageError>( self.client .get(url) - .warg_header(self.get_warg_registry())? + .warg_header(reg_domain)? .auth(self.auth_token()) .send() .await?, @@ -384,6 +393,7 @@ impl Client { /// Gets a content sources from the registry. pub async fn content_sources( &self, + reg_domain: Option<&RegistryDomain>, digest: &AnyHash, ) -> Result { let url = self.url.join(&paths::content_sources(digest)); @@ -392,7 +402,7 @@ impl Client { into_result::<_, ContentError>( self.client .get(url) - .warg_header(self.get_warg_registry())? + .warg_header(reg_domain)? .auth(self.auth_token()) .send() .await?, @@ -403,11 +413,13 @@ impl Client { /// Downloads the content associated with a given record. pub async fn download_content( &self, + registry_domain: Option<&RegistryDomain>, digest: &AnyHash, ) -> Result>, ClientError> { tracing::debug!("requesting content download for digest `{digest}`"); - let ContentSourcesResponse { content_sources } = self.content_sources(digest).await?; + let ContentSourcesResponse { content_sources } = + self.content_sources(registry_domain, digest).await?; let sources = content_sources .get(digest) @@ -438,14 +450,10 @@ impl Client { self.warg_registry_header = registry; } - /// Get warg-registry header value - pub fn get_warg_registry(&self) -> &Option { - &self.warg_registry_header - } - /// Proves the inclusion of the given package log heads in the registry. pub async fn prove_inclusion( &self, + reg_domain: Option<&RegistryDomain>, request: InclusionRequest, checkpoint: &Checkpoint, leafs: &[LogLeaf], @@ -457,7 +465,7 @@ impl Client { self.client .post(url) .json(&request) - .warg_header(self.get_warg_registry())? + .warg_header(reg_domain)? .auth(self.auth_token()) .send() .await?, @@ -470,6 +478,7 @@ impl Client { /// Proves consistency between two log roots. pub async fn prove_log_consistency( &self, + reg_domain: Option<&RegistryDomain>, request: ConsistencyRequest, from_log_root: Cow<'_, AnyHash>, to_log_root: Cow<'_, AnyHash>, @@ -479,7 +488,7 @@ impl Client { self.client .post(url) .json(&request) - .warg_header(self.get_warg_registry())? + .warg_header(reg_domain)? .auth(self.auth_token()) .send() .await?, diff --git a/crates/client/src/depsolve.rs b/crates/client/src/depsolve.rs index 5a9c96f5..5bd79d2c 100644 --- a/crates/client/src/depsolve.rs +++ b/crates/client/src/depsolve.rs @@ -101,9 +101,10 @@ impl LockListBuilder { match import.kind { ImportKind::Locked(_) | ImportKind::Unlocked => { let id = PackageName::new(import.name.clone())?; + let registry_domain = client.get_warg_registry(id.namespace()).await?; if let Some(info) = client .registry() - .load_package(client.api.get_warg_registry(), &id) + .load_package(registry_domain.as_ref(), &id) .await? { let release = info.state.releases().last(); @@ -114,10 +115,15 @@ impl LockListBuilder { } self.lock_list.insert(import); } else { - client.download(&id, &VersionReq::STAR).await?; + client + .download(registry_domain.as_ref(), &id, &VersionReq::STAR) + .await?; if let Some(info) = client .registry() - .load_package(client.api.get_warg_registry(), &id) + .load_package( + client.get_warg_registry(id.namespace()).await?.as_ref(), + &id, + ) .await? { let release = info.state.releases().last(); @@ -213,7 +219,13 @@ where if let Some(info) = self .client .registry() - .load_package(self.client.api.get_warg_registry(), &pkg_id) + .load_package( + self.client + .get_warg_registry(pkg_id.namespace()) + .await? + .as_ref(), + &pkg_id, + ) .await? { let release = if parsed_imp.req != VersionReq::STAR { diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 08c6e810..c91d482e 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -10,6 +10,7 @@ use secrecy::Secret; use semver::{Version, VersionReq}; use std::cmp::Ordering; use std::fs; +use std::future::Future; use std::str::FromStr; use std::{borrow::Cow, path::PathBuf, time::Duration}; use storage::{ @@ -65,7 +66,7 @@ where impl Client { /// Creates a new client for the given URL, registry storage, and /// content storage. - pub fn new( + pub async fn new( url: impl IntoUrl, registry: R, content: C, @@ -101,6 +102,36 @@ impl Client Result> { + self.update_checkpoint(None, &self.api.latest_checkpoint(None).await?, vec![]) + .await?; + let operator = self + .registry() + .load_operator(Some(&RegistryDomain::from_str(namespace)?)) + .await?; + if let Some(op) = operator { + let namespace_state = op.state.namespace_state(namespace); + if let Ok(Some(warg_protocol::operator::NamespaceState::Imported { registry })) = + namespace_state + { + return Ok(Some(RegistryDomain::from_str(registry)?)); + } else if let Ok(Some(warg_protocol::operator::NamespaceState::Defined)) = + namespace_state + { + return Ok(None); + }; + }; + let nm_map = self.namespace_map.load_namespace_map().await?; + Ok(if let Some(nm_map) = &nm_map { + nm_map + .get(namespace) + .map(|domain| RegistryDomain::from_str(domain).unwrap()) + } else { + None + }) + } + /// Stores namespace mapping in local storage pub async fn store_namespace( &self, @@ -137,45 +168,6 @@ impl Client ClientResult<()> { - self.update_checkpoint(&self.api.latest_checkpoint().await?, vec![]) - .await?; - let operator = self.registry().load_operator(&None).await?; - let operator_log_maps_namespace = if let Some(op) = operator { - let namespace_state = op.state.namespace_state(namespace); - if let Ok(Some(nm)) = namespace_state { - if let warg_protocol::operator::NamespaceState::Imported { registry } = nm { - self.api - .set_warg_registry(Some(RegistryDomain::from_str(registry)?)); - } - true - } else { - false - } - } else { - false - }; - if !operator_log_maps_namespace { - let map = self.namespace_map().load_namespace_map().await?; - if let Some(map) = map { - let namespace = map.get(namespace); - if let Some(nm) = namespace { - self.api - .set_warg_registry(Some(RegistryDomain::from_str(nm)?)); - } else { - self.api.set_warg_registry(None); - } - } - } - Ok(()) - } - - /// Get warg-registry header value - pub fn get_warg_registry(&self) -> &Option { - self.api.get_warg_registry() - } - /// Locks component pub async fn lock_component(&self, info: &PackageInfo) -> ClientResult> { let mut builder = LockListBuilder::default(); @@ -194,7 +186,7 @@ impl Client Client ClientResult { + let registry_domain = self.get_warg_registry(info.name.namespace()).await?; if info.entries.is_empty() { return Err(ClientError::NothingToPublish { name: info.name.clone(), @@ -333,15 +326,19 @@ impl Client Client Client ClientResult<()> { + let registry_domain = self.get_warg_registry(package.namespace()).await?; let log_id = LogId::package_log::(package); - let mut current = self.get_package_record(package, &log_id, record_id).await?; + let mut current = self + .get_package_record(registry_domain.as_ref(), package, &log_id, record_id) + .await?; loop { match current.state { @@ -449,7 +450,9 @@ impl Client { tokio::time::sleep(interval).await; - current = self.get_package_record(package, &log_id, record_id).await?; + current = self + .get_package_record(registry_domain.as_ref(), package, &log_id, record_id) + .await?; } } } @@ -468,9 +471,13 @@ impl Client Client> = IndexMap::new(); for package in packages { - updating.push( - self.registry - .load_package(self.api.get_warg_registry(), package) - .await? - .unwrap_or_else(|| PackageInfo::new(package.clone())), - ); + let namespace = package.namespace(); + let namespace_packages = namespaced.get_mut(namespace); + if let Some(nm_pkgs) = namespace_packages { + nm_pkgs.push(package); + } else { + namespaced.insert(namespace, vec![package]); + } } + let namespace_map = self.namespace_map.load_namespace_map().await?; + if let Some(nm_map) = namespace_map { + for (nm, pkg_names) in namespaced { + let mut updating = Vec::with_capacity(pkg_names.len()); + let reg_domain = nm_map.get(nm); + if reg_domain.is_some() { + for package in pkg_names { + updating.push( + self.registry + .load_package(self.get_warg_registry(nm).await?.as_ref(), package) + .await? + .unwrap_or_else(|| PackageInfo::new(package.clone())), + ); + } + } - self.update_checkpoint(&self.api.latest_checkpoint().await?, &mut updating) - .await?; + self.update_checkpoint( + reg_domain + .map(|url| RegistryDomain::from_str(url).unwrap()) + .as_ref(), + &self + .api + .latest_checkpoint(self.get_warg_registry(nm).await?.as_ref()) + .await?, + &mut updating, + ) + .await?; + } + } else { + for (_, pkg_names) in namespaced { + let mut updating = Vec::with_capacity(pkg_names.len()); + for package in pkg_names { + updating.push( + self.registry + .load_package(None, package) + .await? + .unwrap_or_else(|| PackageInfo::new(package.clone())), + ); + } + + self.update_checkpoint( + None, + &self.api.latest_checkpoint(None).await?, + &mut updating, + ) + .await?; + } + } Ok(()) } @@ -516,11 +569,12 @@ impl Client, name: &PackageName, requirement: &VersionReq, ) -> Result, ClientError> { tracing::info!("downloading package `{name}` with requirement `{requirement}`"); - let info = self.fetch_package(name).await?; + let info = self.fetch_package(registry_domain, name).await?; match info.state.find_latest_release(requirement) { Some(release) => { @@ -528,7 +582,7 @@ impl Client Client Result { + let registry_domain = self.get_warg_registry(package.namespace()).await?; tracing::info!("downloading version {version} of package `{package}`"); - let info = self.fetch_package(package).await?; + let info = self + .fetch_package(registry_domain.as_ref(), package) + .await?; let release = info.state @@ -574,13 +631,16 @@ impl Client( &self, + reg_domain: Option<&RegistryDomain>, ts_checkpoint: &SerdeEnvelope, packages: impl IntoIterator, ) -> Result<(), ClientError> { @@ -592,7 +652,7 @@ impl Client Client>(); loop { - // let response: FetchLogsResponse = match self let response: FetchLogsResponse = self .api - .fetch_logs(FetchLogsRequest { - log_length: checkpoint.log_length, - operator: operator - .head_fetch_token - .as_ref() - .map(|t| Cow::Borrowed(t.as_str())), - limit: None, - packages: Cow::Borrowed(&last_known), - }) + .fetch_logs( + reg_domain, + FetchLogsRequest { + log_length: checkpoint.log_length, + operator: operator + .head_fetch_token + .as_ref() + .map(|t| Cow::Borrowed(t.as_str())), + limit: None, + packages: Cow::Borrowed(&last_known), + }, + ) .await .map_err(|e| { ClientError::translate_log_not_found(e, |id| { @@ -741,6 +803,7 @@ impl Client Client Client { self.api .prove_log_consistency( + reg_domain, ConsistencyRequest { from: from_log_length, to: to_log_length, @@ -792,19 +852,15 @@ impl Client Client>, ) -> Result<(), ClientError> { for (name, ts_checkpoint) in ts_checkpoints { - if self.url().safe_label() != name { - self.api - .set_warg_registry(Some(RegistryDomain::from_str(&name)?)); - } else { - self.api.set_warg_registry(None) - } + let registry_domain = self.get_warg_registry(&name).await?; let mut packages = packages.get_mut(&name.clone()); if let Some(pkgs) = &mut packages { - self.update_checkpoint(&ts_checkpoint, pkgs.as_mut_slice()) - .await?; + self.update_checkpoint( + registry_domain.as_ref(), + &ts_checkpoint, + pkgs.as_mut_slice(), + ) + .await?; } } Ok(()) } - async fn fetch_package(&self, name: &PackageName) -> Result { - match self - .registry - .load_package(self.api.get_warg_registry(), name) - .await? - { + async fn fetch_package( + &self, + registry_domain: Option<&RegistryDomain>, + name: &PackageName, + ) -> Result { + match self.registry.load_package(registry_domain, name).await? { Some(info) => { tracing::info!("log for package `{name}` already exists in storage"); Ok(info) } None => { let mut info = PackageInfo::new(name.clone()); - self.update_checkpoint(&self.api.latest_checkpoint().await?, [&mut info]) - .await?; + self.update_checkpoint( + registry_domain, + &self.api.latest_checkpoint(registry_domain).await?, + [&mut info], + ) + .await?; Ok(info) } @@ -854,13 +913,14 @@ impl Client, package: &PackageName, log_id: &LogId, record_id: &RecordId, ) -> ClientResult { let record = self .api - .get_package_record(log_id, record_id) + .get_package_record(registry_domain, log_id, record_id) .await .map_err(|e| { ClientError::translate_log_not_found(e, |id| { @@ -878,7 +938,11 @@ impl Client Result { + pub async fn download_content( + &self, + registry_domain: Option<&RegistryDomain>, + digest: &AnyHash, + ) -> Result { match self.content.content_location(digest) { Some(path) => { tracing::info!("content for digest `{digest}` already exists in storage"); @@ -887,7 +951,7 @@ impl Client { self.content .store_content( - Box::pin(self.api.download_content(digest).await?), + Box::pin(self.api.download_content(registry_domain, digest).await?), Some(digest), ) .await?; @@ -923,7 +987,7 @@ impl FileSystemClient { /// If a lock cannot be acquired for a storage directory, then /// `NewClientResult::Blocked` is returned with the path to the /// directory that could not be locked. - pub fn try_new_with_config( + pub async fn try_new_with_config( url: Option<&str>, config: &Config, auth_token: Option>, @@ -945,13 +1009,9 @@ impl FileSystemClient { (_, None, _) => return Ok(StorageLockResult::NotAcquired(content_dir)), }; - Ok(StorageLockResult::Acquired(Self::new( - url.into_url(), - packages, - content, - namespace_map, - auth_token, - )?)) + Ok(StorageLockResult::Acquired( + Self::new(url.into_url(), packages, content, namespace_map, auth_token).await?, + )) } /// Creates a client for the given registry URL. @@ -960,7 +1020,7 @@ impl FileSystemClient { /// URL, an error is returned. /// /// This method blocks if storage locks cannot be acquired. - pub fn new_with_config( + pub async fn new_with_config( url: Option<&str>, config: &Config, auth_token: Option>, @@ -978,6 +1038,7 @@ impl FileSystemClient { FileSystemNamespaceMapStorage::new(namespace_map_path), auth_token, ) + .await } } @@ -1182,3 +1243,43 @@ impl ClientError { /// Represents the result of a client operation. pub type ClientResult = Result; + +/// Namespace mapping to store when retrying a command after receiving a hint header +#[derive(Clone)] +pub struct Retry { + namespace: String, + registry: String, +} + +impl Retry { + /// New Retry + pub fn new(namespace: String, registry: String) -> Self { + Self { + namespace, + registry, + } + } + + /// Map namespace using Retry information + pub async fn store_namespace( + &self, + client: &Client, + ) -> Result<()> { + client + .store_namespace( + self.namespace.clone(), + RegistryDomain::from_str(&self.registry)?, + ) + .await?; + Ok(()) + } +} + +/// Interactively retry when hint header received from warg server +pub async fn with_interactive_retry(f: impl Fn(Option) -> F) -> Result<()> +where + F: Future>, +{ + f(None).await?; + Ok(()) +} diff --git a/crates/client/src/storage.rs b/crates/client/src/storage.rs index 0dc09f51..92b6cb4a 100644 --- a/crates/client/src/storage.rs +++ b/crates/client/src/storage.rs @@ -23,15 +23,9 @@ mod fs; pub use fs::*; /// Registry domain used for warg header values -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct RegistryDomain(String); -// impl From for RegistryDomain { -// fn from(value: String) -> Self { -// RegistryDomain(value) -// } -// } - impl FromStr for RegistryDomain { type Err = Error; @@ -46,13 +40,6 @@ impl ToString for RegistryDomain { } } -// impl TryFrom for RegistryName ... - -// impl From for HeaderValue { -// fn from(value: RegistryDomain) -> Self { -// HeaderValue::to_str(&value.to_string()) -// } -// } impl TryFrom for HeaderValue { type Error = Error; @@ -61,14 +48,6 @@ impl TryFrom for HeaderValue { } } -// impl TryInto for HeaderValue { -// type Error = Error; - -// fn try_into(self) -> std::result::Result { -// // Ok(HeaderValue::from_str(&value.to_string())?) - -// } -// } /// Trait for registry storage implementations. /// /// Stores information such as package/operator logs and checkpoints @@ -86,13 +65,13 @@ pub trait RegistryStorage: Send + Sync { /// Loads most recent checkpoint async fn load_checkpoint( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, ) -> Result>>; /// Stores most recent checkpoint async fn store_checkpoint( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, ts_checkpoint: &SerdeEnvelope, ) -> Result<()>; @@ -101,13 +80,13 @@ pub trait RegistryStorage: Send + Sync { /// Returns `Ok(None)` if the information is not present. async fn load_operator( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, ) -> Result>; /// Stores the operator information in the storage. async fn store_operator( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, operator: OperatorInfo, ) -> Result<()>; @@ -122,14 +101,14 @@ pub trait RegistryStorage: Send + Sync { /// Returns `Ok(None)` if the information is not present. async fn load_package( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, package: &PackageName, ) -> Result>; /// Stores the package information in the storage. async fn store_package( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, info: &PackageInfo, ) -> Result<()>; diff --git a/crates/client/src/storage/fs.rs b/crates/client/src/storage/fs.rs index 79568319..c6014243 100644 --- a/crates/client/src/storage/fs.rs +++ b/crates/client/src/storage/fs.rs @@ -81,7 +81,7 @@ impl FileSystemRegistryStorage { }) } - fn operator_path(&self, namespace_registry: &Option) -> PathBuf { + fn operator_path(&self, namespace_registry: Option<&RegistryDomain>) -> PathBuf { if let Some(nm) = namespace_registry { return self .registries_dir @@ -93,7 +93,7 @@ impl FileSystemRegistryStorage { fn package_path( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, name: &PackageName, ) -> PathBuf { if let Some(nm) = namespace_registry { @@ -131,7 +131,7 @@ impl RegistryStorage for FileSystemRegistryStorage { async fn load_checkpoint( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, ) -> Result>> { if let Some(nm) = namespace_registry { return load(&self.registries_dir.join(nm.to_string()).join("checkpoint")).await; @@ -141,7 +141,7 @@ impl RegistryStorage for FileSystemRegistryStorage { async fn store_checkpoint( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, ts_checkpoint: &SerdeEnvelope, ) -> Result<()> { if let Some(nm) = namespace_registry { @@ -231,14 +231,14 @@ impl RegistryStorage for FileSystemRegistryStorage { async fn load_operator( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, ) -> Result> { Ok(load(&self.operator_path(namespace_registry)).await?) } async fn store_operator( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, info: OperatorInfo, ) -> Result<()> { store(&self.operator_path(namespace_registry), info).await @@ -246,7 +246,7 @@ impl RegistryStorage for FileSystemRegistryStorage { async fn load_package( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, package: &PackageName, ) -> Result> { Ok(load(&self.package_path(namespace_registry, package)).await?) @@ -254,7 +254,7 @@ impl RegistryStorage for FileSystemRegistryStorage { async fn store_package( &self, - namespace_registry: &Option, + namespace_registry: Option<&RegistryDomain>, info: &PackageInfo, ) -> Result<()> { store(&self.package_path(namespace_registry, &info.name), info).await diff --git a/src/bin/warg.rs b/src/bin/warg.rs index e088e25a..7cbb9cf5 100644 --- a/src/bin/warg.rs +++ b/src/bin/warg.rs @@ -5,10 +5,10 @@ use std::process::exit; use tracing_subscriber::EnvFilter; use warg_cli::commands::{ BundleCommand, ClearCommand, ConfigCommand, DependenciesCommand, DownloadCommand, InfoCommand, - KeyCommand, LockCommand, LoginCommand, LogoutCommand, PublishCommand, ResetCommand, Retry, + KeyCommand, LockCommand, LoginCommand, LogoutCommand, PublishCommand, ResetCommand, UpdateCommand, }; -use warg_client::ClientError; +use warg_client::{with_interactive_retry, ClientError, Retry}; fn version() -> &'static str { option_env!("CARGO_VERSION_INFO").unwrap_or(env!("CARGO_PKG_VERSION")) @@ -46,27 +46,98 @@ async fn main() -> Result<()> { .with_env_filter(EnvFilter::from_default_env()) .init(); - if let Err(e) = match WargCli::parse() { - WargCli::Config(cmd) => cmd.exec().await, - WargCli::Info(cmd) => cmd.exec().await, - WargCli::Key(cmd) => cmd.exec().await, - WargCli::Lock(cmd) => cmd.exec(None).await, - WargCli::Bundle(cmd) => cmd.exec(None).await, - WargCli::Dependencies(cmd) => cmd.exec(None).await, - WargCli::Download(cmd) => cmd.exec(None).await, - WargCli::Update(cmd) => cmd.exec(None).await, - WargCli::Publish(cmd) => cmd.exec(None).await, - WargCli::Reset(cmd) => cmd.exec().await, - WargCli::Clear(cmd) => cmd.exec().await, - WargCli::Login(cmd) => cmd.exec().await, - WargCli::Logout(cmd) => cmd.exec().await, - } { - if let Some(e) = e.downcast_ref::() { - describe_client_error_or_retry(e).await?; - } else { - eprintln!("error: {e:?}"); + match &WargCli::parse() { + WargCli::Config(cmd) => cmd.clone().exec().await?, + WargCli::Info(cmd) => cmd.clone().exec().await?, + WargCli::Key(cmd) => cmd.clone().exec().await?, + WargCli::Lock(cmd) => { + with_interactive_retry(|retry: Option| async { + if let Err(e) = cmd.clone().exec(retry).await { + if let Some(e) = e.downcast_ref::() { + describe_client_error_or_retry(e).await?; + } else { + eprintln!("error: {e:?}"); + } + exit(1); + } + Ok(()) + }) + .await? + } + WargCli::Bundle(cmd) => { + with_interactive_retry(|retry: Option| async { + if let Err(e) = cmd.clone().exec(retry).await { + if let Some(e) = e.downcast_ref::() { + describe_client_error_or_retry(e).await?; + } else { + eprintln!("error: {e:?}"); + } + exit(1); + } + Ok(()) + }) + .await? + } + WargCli::Dependencies(cmd) => { + with_interactive_retry(|retry: Option| async { + if let Err(e) = cmd.clone().exec(retry).await { + if let Some(e) = e.downcast_ref::() { + describe_client_error_or_retry(e).await?; + } else { + eprintln!("error: {e:?}"); + } + exit(1); + } + Ok(()) + }) + .await? + } + WargCli::Download(cmd) => { + with_interactive_retry(|retry: Option| async { + if let Err(e) = cmd.clone().exec(retry).await { + if let Some(e) = e.downcast_ref::() { + describe_client_error_or_retry(e).await?; + } else { + eprintln!("error: {e:?}"); + } + exit(1); + } + Ok(()) + }) + .await? + } + WargCli::Update(cmd) => { + with_interactive_retry(|retry: Option| async { + if let Err(e) = cmd.clone().exec(retry).await { + if let Some(e) = e.downcast_ref::() { + describe_client_error_or_retry(e).await?; + } else { + eprintln!("error: {e:?}"); + } + exit(1); + } + Ok(()) + }) + .await? + } + WargCli::Publish(cmd) => { + with_interactive_retry(|retry: Option| async { + if let Err(e) = cmd.clone().exec(retry).await { + if let Some(e) = e.downcast_ref::() { + describe_client_error_or_retry(e).await?; + } else { + eprintln!("error: {e:?}"); + } + exit(1); + } + Ok(()) + }) + .await? } - exit(1); + WargCli::Reset(cmd) => cmd.clone().exec().await?, + WargCli::Clear(cmd) => cmd.clone().exec().await?, + WargCli::Login(cmd) => cmd.clone().exec().await?, + WargCli::Logout(cmd) => cmd.clone().exec().await?, } Ok(()) @@ -167,7 +238,7 @@ async fn describe_client_error_or_retry(e: &ClientError) -> Result<()> { Ok(()) } -async fn describe_client_error(e: &ClientError) -> Result<()> { +pub async fn describe_client_error(e: &ClientError) -> Result<()> { match e { ClientError::NoHomeRegistryUrl => { eprintln!("error: {e}; use the `config` subcommand to set a default URL"); diff --git a/src/commands.rs b/src/commands.rs index 201e47ba..376efaab 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -4,13 +4,9 @@ use anyhow::Result; use clap::Args; use secrecy::Secret; use std::path::PathBuf; -use std::str::FromStr; -use warg_client::storage::ContentStorage; -use warg_client::storage::NamespaceMapStorage; use warg_client::storage::RegistryDomain; -use warg_client::storage::RegistryStorage; -use warg_client::Client; use warg_client::RegistryUrl; +use warg_client::Retry; use warg_client::{ClientError, Config, FileSystemClient, StorageLockResult}; use warg_credentials::keyring::{get_auth_token, get_signing_key}; use warg_crypto::signing::PrivateKey; @@ -44,7 +40,7 @@ pub use self::reset::*; pub use self::update::*; /// Common options for commands. -#[derive(Args)] +#[derive(Args, Clone)] pub struct CommonOptions { /// The URL of the registry to use. #[clap(long, value_name = "URL")] @@ -82,7 +78,9 @@ impl CommonOptions { self.registry.as_deref(), config, self.auth_token(config)?, - )? { + ) + .await? + { StorageLockResult::Acquired(client) => Ok(client), StorageLockResult::NotAcquired(path) => { println!( @@ -95,6 +93,7 @@ impl CommonOptions { config, self.auth_token(config)?.map(Secret::from), ) + .await } }?; if let Some(retry) = retry { @@ -104,18 +103,13 @@ impl CommonOptions { } /// Gets the signing key for the given registry URL. - pub fn signing_key( + pub async fn signing_key( &self, - client: &Client, + registry_domain: Option<&RegistryDomain>, ) -> Result { - let registry_url = if let Some(nm) = &client.get_warg_registry() { - Some(RegistryUrl::new(nm.to_string())?) - } else { - None - }; let config = self.read_config()?; get_signing_key( - registry_url.map(|reg| reg.safe_label()).as_deref(), + registry_domain.map(|domain| domain.to_string()).as_deref(), &config.keys, config.home_url.as_deref(), ) @@ -134,33 +128,3 @@ impl CommonOptions { Ok(None) } } - -/// Namespace mapping to store when retrying a command after receiving a hint header -pub struct Retry { - namespace: String, - registry: String, -} - -impl Retry { - /// New Retry - pub fn new(namespace: String, registry: String) -> Self { - Self { - namespace, - registry, - } - } - - /// Map namespace using Retry information - pub async fn store_namespace( - &self, - client: &Client, - ) -> Result<()> { - client - .store_namespace( - self.namespace.clone(), - RegistryDomain::from_str(&self.registry)?, - ) - .await?; - Ok(()) - } -} diff --git a/src/commands/bundle.rs b/src/commands/bundle.rs index 8256fea8..b0e4003a 100644 --- a/src/commands/bundle.rs +++ b/src/commands/bundle.rs @@ -1,11 +1,12 @@ -use super::{CommonOptions, Retry}; +use super::CommonOptions; use anyhow::{bail, Result}; use clap::Args; use semver::VersionReq; use warg_client::storage::RegistryStorage; +use warg_client::Retry; use warg_protocol::registry::PackageName; /// Bundle With Registry Dependencies -#[derive(Args)] +#[derive(Args, Clone)] pub struct BundleCommand { /// The common command options. #[clap(flatten)] @@ -20,20 +21,22 @@ impl BundleCommand { /// Executes the command. pub async fn exec(self, retry: Option) -> Result<()> { let config = self.common.read_config()?; - let mut client = self.common.create_client(&config, retry).await?; - client.refresh_namespace(self.package.namespace()).await?; + let client = self.common.create_client(&config, retry).await?; + let registry_domain = client.get_warg_registry(self.package.namespace()).await?; println!("registry: {url}", url = client.url()); if let Some(info) = client .registry() - .load_package(client.get_warg_registry(), &self.package) + .load_package(registry_domain.as_ref(), &self.package) .await? { client.bundle_component(&info).await?; } else { - client.download(&self.package, &VersionReq::STAR).await?; + client + .download(registry_domain.as_ref(), &self.package, &VersionReq::STAR) + .await?; if let Some(info) = client .registry() - .load_package(client.get_warg_registry(), &self.package) + .load_package(registry_domain.as_ref(), &self.package) .await? { client.bundle_component(&info).await?; diff --git a/src/commands/clear.rs b/src/commands/clear.rs index 0f9ccc6d..603d87e0 100644 --- a/src/commands/clear.rs +++ b/src/commands/clear.rs @@ -3,7 +3,7 @@ use anyhow::Result; use clap::Args; /// Deletes local content cache. -#[derive(Args)] +#[derive(Args, Clone)] pub struct ClearCommand { /// The common command options. #[clap(flatten)] diff --git a/src/commands/config.rs b/src/commands/config.rs index a151df19..628aa546 100644 --- a/src/commands/config.rs +++ b/src/commands/config.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use warg_client::{Config, RegistryUrl}; /// Creates a new warg configuration file. -#[derive(Args)] +#[derive(Args, Clone)] pub struct ConfigCommand { /// The common command options. #[clap(flatten)] diff --git a/src/commands/dependencies.rs b/src/commands/dependencies.rs index e51781af..4a54e6d9 100644 --- a/src/commands/dependencies.rs +++ b/src/commands/dependencies.rs @@ -15,7 +15,7 @@ use warg_protocol::{package::ReleaseState, registry::PackageName, VersionReq}; use wasmparser::{Chunk, ComponentImport, ComponentImportSectionReader, Parser, Payload}; /// Print Dependency Tree -#[derive(Args)] +#[derive(Args, Clone)] pub struct DependenciesCommand { /// The common command options. #[clap(flatten)] @@ -30,12 +30,17 @@ impl DependenciesCommand { /// Executes the command. pub async fn exec(self, retry: Option) -> Result<()> { let config = self.common.read_config()?; - let mut client = self.common.create_client(&config, retry).await?; - client.refresh_namespace(self.package.namespace()).await?; + let client = self.common.create_client(&config, retry).await?; if let Some(info) = client .registry() - .load_package(client.get_warg_registry(), &self.package) + .load_package( + client + .get_warg_registry(self.package.namespace()) + .await? + .as_ref(), + &self.package, + ) .await? { Self::print_package_info(&client, &info).await?; @@ -52,11 +57,14 @@ impl DependenciesCommand { node: &mut TreeBuilder, parser: &mut DepsParser, ) -> Result<()> { - client.download(id, &version).await?; + let registry_domain = client.get_warg_registry(id.namespace()).await?; + client + .download(registry_domain.as_ref(), id, &version) + .await?; let package = client .registry() - .load_package(client.get_warg_registry(), id) + .load_package(registry_domain.as_ref(), id) .await?; if let Some(pkg) = package { let latest = pkg.state.releases().last(); @@ -92,15 +100,18 @@ impl DependenciesCommand { } async fn print_package_info(client: &FileSystemClient, info: &PackageInfo) -> Result<()> { + let registry_domain = client.get_warg_registry(info.name.namespace()).await?; let mut parser = DepsParser::new(); let root_package = client .registry() - .load_package(client.get_warg_registry(), &info.name) + .load_package(registry_domain.as_ref(), &info.name) .await?; if let Some(rp) = root_package { let latest = rp.state.releases().last(); if let Some(l) = latest { - client.download(&info.name, &VersionReq::STAR).await?; + client + .download(registry_domain.as_ref(), &info.name, &VersionReq::STAR) + .await?; let mut tree = new_tree(info.name.namespace(), info.name.name(), &l.version); if let ReleaseState::Released { content } = &l.state { let path = client.content().content_location(content); diff --git a/src/commands/download.rs b/src/commands/download.rs index 703874f6..4030f631 100644 --- a/src/commands/download.rs +++ b/src/commands/download.rs @@ -4,7 +4,7 @@ use clap::Args; use warg_protocol::{registry::PackageName, VersionReq}; /// Download a warg registry package. -#[derive(Args)] +#[derive(Args, Clone)] #[clap(disable_version_flag = true)] pub struct DownloadCommand { /// The common command options. @@ -22,13 +22,14 @@ impl DownloadCommand { /// Executes the command. pub async fn exec(self, retry: Option) -> Result<()> { let config = self.common.read_config()?; - let mut client = self.common.create_client(&config, retry).await?; - client.refresh_namespace(self.name.namespace()).await?; + let client = self.common.create_client(&config, retry).await?; + let registry_domain = client.get_warg_registry(self.name.namespace()).await?; println!("downloading package `{name}`...", name = self.name); let res = client .download( + registry_domain.as_ref(), &self.name, self.version.as_ref().unwrap_or(&VersionReq::STAR), ) diff --git a/src/commands/info.rs b/src/commands/info.rs index 2a4921a9..21fa30ed 100644 --- a/src/commands/info.rs +++ b/src/commands/info.rs @@ -9,7 +9,7 @@ use warg_crypto::hash::AnyHash; use warg_protocol::{registry::PackageName, Version}; /// Display client storage information. -#[derive(Args)] +#[derive(Args, Clone)] pub struct InfoCommand { /// The common command options. #[clap(flatten)] @@ -28,16 +28,21 @@ impl InfoCommand { /// Executes the command. pub async fn exec(self) -> Result<()> { let config = self.common.read_config()?; - let mut client = self.common.create_client(&config, None).await?; + let client = self.common.create_client(&config, None).await?; println!("registry: {url}", url = client.url()); println!("\npackages in client storage:"); match self.package { Some(package) => { - client.refresh_namespace(package.namespace()).await?; if let Some(info) = client .registry() - .load_package(client.get_warg_registry(), &package) + .load_package( + client + .get_warg_registry(package.namespace()) + .await? + .as_ref(), + &package, + ) .await? { Self::print_package_info(&info); diff --git a/src/commands/key.rs b/src/commands/key.rs index 78115f7d..06aeb3a1 100644 --- a/src/commands/key.rs +++ b/src/commands/key.rs @@ -10,7 +10,7 @@ use warg_crypto::signing::PrivateKey; use super::CommonOptions; /// Manage signing keys for interacting with a registry. -#[derive(Args)] +#[derive(Args, Clone)] pub struct KeyCommand { /// The subcommand to execute. #[clap(subcommand)] @@ -30,7 +30,7 @@ impl KeyCommand { } /// The subcommand to execute. -#[derive(Subcommand)] +#[derive(Subcommand, Clone)] pub enum KeySubcommand { /// Creates a new signing key for a registry in the local keyring. New(KeyNewCommand), @@ -43,7 +43,7 @@ pub enum KeySubcommand { } /// Creates a new signing key for a registry in the local keyring. -#[derive(Args)] +#[derive(Args, Clone)] pub struct KeyNewCommand { /// The common command options. #[clap(flatten)] @@ -75,7 +75,7 @@ impl KeyNewCommand { } /// Shows information about the signing key for a registry in the local keyring. -#[derive(Args)] +#[derive(Args, Clone)] pub struct KeyInfoCommand { /// The common command options. #[clap(flatten)] @@ -99,7 +99,7 @@ impl KeyInfoCommand { } /// Sets the signing key for a registry in the local keyring. -#[derive(Args)] +#[derive(Args, Clone)] pub struct KeySetCommand { /// The common command options. #[clap(flatten)] @@ -132,7 +132,7 @@ impl KeySetCommand { } /// Deletes the signing key for a registry from the local keyring. -#[derive(Args)] +#[derive(Args, Clone)] pub struct KeyDeleteCommand { /// The common command options. #[clap(flatten)] diff --git a/src/commands/lock.rs b/src/commands/lock.rs index 0fa6b7e8..1dddf494 100644 --- a/src/commands/lock.rs +++ b/src/commands/lock.rs @@ -9,7 +9,7 @@ use warg_client::{ use warg_protocol::registry::PackageName; /// Print Dependency Tree -#[derive(Args)] +#[derive(Args, Clone)] pub struct LockCommand { /// The common command options. #[clap(flatten)] @@ -24,20 +24,28 @@ impl LockCommand { /// Executes the command. pub async fn exec(self, retry: Option) -> Result<()> { let config = self.common.read_config()?; - let mut client = self.common.create_client(&config, retry).await?; - client.refresh_namespace(self.package.namespace()).await?; + let client = self.common.create_client(&config, retry).await?; + let registry_domain = client.get_warg_registry(self.package.namespace()).await?; println!("registry: {url}", url = client.url()); if let Some(info) = client .registry() - .load_package(client.get_warg_registry(), &self.package) + .load_package(registry_domain.as_ref(), &self.package) .await? { Self::lock(client, &info).await?; } else { - client.download(&self.package, &VersionReq::STAR).await?; + client + .download(registry_domain.as_ref(), &self.package, &VersionReq::STAR) + .await?; if let Some(info) = client .registry() - .load_package(client.get_warg_registry(), &self.package) + .load_package( + client + .get_warg_registry(self.package.namespace()) + .await? + .as_ref(), + &self.package, + ) .await? { Self::lock(client, &info).await?; diff --git a/src/commands/login.rs b/src/commands/login.rs index 6a3ecfe1..f8d2d4d0 100644 --- a/src/commands/login.rs +++ b/src/commands/login.rs @@ -10,7 +10,7 @@ use warg_credentials::keyring::{set_auth_token, set_signing_key}; use super::CommonOptions; /// Manage auth tokens for interacting with a registry. -#[derive(Args)] +#[derive(Args, Clone)] pub struct LoginCommand { /// The common command options. #[clap(flatten)] @@ -21,7 +21,7 @@ pub struct LoginCommand { keyring_entry: KeyringEntryArgs, } -#[derive(Args)] +#[derive(Args, Clone)] struct KeyringEntryArgs { /// The URL of the registry to store an auth token for. #[clap(value_name = "URL")] diff --git a/src/commands/logout.rs b/src/commands/logout.rs index e6eeb959..031b3ba5 100644 --- a/src/commands/logout.rs +++ b/src/commands/logout.rs @@ -7,7 +7,7 @@ use warg_credentials::keyring::delete_auth_token; use super::CommonOptions; /// Manage auth tokens for interacting with a registry. -#[derive(Args)] +#[derive(Args, Clone)] pub struct LogoutCommand { /// The common command options. #[clap(flatten)] @@ -17,7 +17,7 @@ pub struct LogoutCommand { keyring_entry: KeyringEntryArgs, } -#[derive(Args)] +#[derive(Args, Clone)] struct KeyringEntryArgs { /// The URL of the registry to delete an auth token for. #[clap(value_name = "URL")] diff --git a/src/commands/publish.rs b/src/commands/publish.rs index 553adf15..8d1e7e4f 100644 --- a/src/commands/publish.rs +++ b/src/commands/publish.rs @@ -57,7 +57,7 @@ where } /// Publish a package to a warg registry. -#[derive(Subcommand)] +#[derive(Subcommand, Clone)] pub enum PublishCommand { /// Initialize a new package. Init(PublishInitCommand), @@ -100,7 +100,7 @@ impl PublishCommand { } /// Initialize a new package. -#[derive(Args)] +#[derive(Args, Clone)] #[clap(disable_version_flag = true)] pub struct PublishInitCommand { /// The common command options. @@ -118,11 +118,10 @@ impl PublishInitCommand { /// Executes the command. pub async fn exec(self, retry: Option) -> Result<()> { let config = self.common.read_config()?; - let mut client = self.common.create_client(&config, retry).await?; + let client = self.common.create_client(&config, retry).await?; + let registry_domain = client.get_warg_registry(self.name.namespace()).await?; - client.refresh_namespace(self.name.namespace()).await?; - - let signing_key = self.common.signing_key(&client)?; + let signing_key = self.common.signing_key(registry_domain.as_ref()).await?; match enqueue(&client, &self.name, |_| { std::future::ready(Ok(PublishEntry::Init)) }) @@ -166,7 +165,7 @@ impl PublishInitCommand { } /// Publish a package to a warg registry. -#[derive(Args)] +#[derive(Args, Clone)] #[clap(disable_version_flag = true)] pub struct PublishReleaseCommand { /// The common command options. @@ -190,9 +189,9 @@ impl PublishReleaseCommand { /// Executes the command. pub async fn exec(self, retry: Option) -> Result<()> { let config = self.common.read_config()?; - let mut client = self.common.create_client(&config, retry).await?; - client.refresh_namespace(self.name.namespace()).await?; - let signing_key = self.common.signing_key(&client)?; + let client = self.common.create_client(&config, retry).await?; + let registry_domain = client.get_warg_registry(self.name.namespace()).await?; + let signing_key = self.common.signing_key(registry_domain.as_ref()).await?; let path = self.path.clone(); let version = self.version.clone(); @@ -256,7 +255,7 @@ impl PublishReleaseCommand { } /// Yank a package release from a warg registry. -#[derive(Args)] +#[derive(Args, Clone)] #[clap(disable_version_flag = true)] pub struct PublishYankCommand { /// The common command options. @@ -277,9 +276,9 @@ impl PublishYankCommand { /// Executes the command. pub async fn exec(self, retry: Option) -> Result<()> { let config = self.common.read_config()?; - let mut client = self.common.create_client(&config, retry).await?; - client.refresh_namespace(self.name.namespace()).await?; - let signing_key = self.common.signing_key(&client)?; + let client = self.common.create_client(&config, retry).await?; + let registry_domain = client.get_warg_registry(self.name.namespace()).await?; + let signing_key = self.common.signing_key(registry_domain.as_ref()).await?; let version = self.version.clone(); match enqueue(&client, &self.name, move |_| async move { @@ -327,7 +326,7 @@ impl PublishYankCommand { } /// Publish a package to a warg registry. -#[derive(Args)] +#[derive(Args, Clone)] #[clap(disable_version_flag = true)] pub struct PublishGrantCommand { /// The common command options. @@ -355,9 +354,9 @@ impl PublishGrantCommand { /// Executes the command. pub async fn exec(self, retry: Option) -> Result<()> { let config = self.common.read_config()?; - let mut client = self.common.create_client(&config, retry).await?; - client.refresh_namespace(self.name.namespace()).await?; - let signing_key = self.common.signing_key(&client)?; + let client = self.common.create_client(&config, retry).await?; + let registry_domain = client.get_warg_registry(self.name.namespace()).await?; + let signing_key = self.common.signing_key(registry_domain.as_ref()).await?; match enqueue(&client, &self.name, |_| async { Ok(PublishEntry::Grant { @@ -409,7 +408,7 @@ impl PublishGrantCommand { } /// Publish a package to a warg registry. -#[derive(Args)] +#[derive(Args, Clone)] #[clap(disable_version_flag = true)] pub struct PublishRevokeCommand { /// The common command options. @@ -437,9 +436,9 @@ impl PublishRevokeCommand { /// Executes the command. pub async fn exec(self, retry: Option) -> Result<()> { let config = self.common.read_config()?; - let mut client = self.common.create_client(&config, retry).await?; - client.refresh_namespace(self.name.namespace()).await?; - let signing_key = self.common.signing_key(&client)?; + let client = self.common.create_client(&config, retry).await?; + let registry_domain = client.get_warg_registry(self.name.namespace()).await?; + let signing_key = self.common.signing_key(registry_domain.as_ref()).await?; match enqueue(&client, &self.name, |_| async { Ok(PublishEntry::Revoke { @@ -491,7 +490,7 @@ impl PublishRevokeCommand { } /// Start a new pending publish. -#[derive(Args)] +#[derive(Args, Clone)] #[clap(disable_version_flag = true)] pub struct PublishStartCommand { /// The common command options. @@ -506,8 +505,7 @@ impl PublishStartCommand { /// Executes the command. pub async fn exec(self) -> Result<()> { let config = self.common.read_config()?; - let mut client = self.common.create_client(&config, None).await?; - client.refresh_namespace(self.name.namespace()).await?; + let client = self.common.create_client(&config, None).await?; match client.registry().load_publish().await? { Some(info) => bail!("a publish is already in progress for package `{name}`; use `publish abort` to abort the current publish", name = info.name), @@ -530,7 +528,7 @@ impl PublishStartCommand { } /// List the records in a pending publish. -#[derive(Args)] +#[derive(Args, Clone)] pub struct PublishListCommand { /// The common command options. #[clap(flatten)] @@ -586,7 +584,7 @@ impl PublishListCommand { } /// Abort a pending publish. -#[derive(Args)] +#[derive(Args, Clone)] pub struct PublishAbortCommand { /// The common command options. #[clap(flatten)] @@ -615,7 +613,7 @@ impl PublishAbortCommand { } /// Submit a pending publish. -#[derive(Args)] +#[derive(Args, Clone)] pub struct PublishSubmitCommand { /// The common command options. #[clap(flatten)] @@ -638,7 +636,7 @@ impl PublishSubmitCommand { name = info.name ); - let signing_key = self.common.signing_key(&client)?; + let signing_key = self.common.signing_key(None).await?; let record_id = client.publish_with_info(&signing_key, info.clone()).await?; client.registry().store_publish(None).await?; @@ -688,7 +686,7 @@ impl PublishSubmitCommand { } /// Wait for a pending publish to complete. -#[derive(Args)] +#[derive(Args, Clone)] pub struct PublishWaitCommand { /// The common command options. #[clap(flatten)] @@ -707,8 +705,7 @@ impl PublishWaitCommand { /// Executes the command. pub async fn exec(self) -> Result<()> { let config = self.common.read_config()?; - let mut client = self.common.create_client(&config, None).await?; - client.refresh_namespace(self.name.namespace()).await?; + let client = self.common.create_client(&config, None).await?; let record_id = RecordId::from(self.record_id); println!( diff --git a/src/commands/reset.rs b/src/commands/reset.rs index c16c1ed9..24e4a186 100644 --- a/src/commands/reset.rs +++ b/src/commands/reset.rs @@ -3,7 +3,7 @@ use anyhow::Result; use clap::Args; /// Reset local data for registry. -#[derive(Args)] +#[derive(Args, Clone)] pub struct ResetCommand { /// The common command options. #[clap(flatten)] diff --git a/src/commands/update.rs b/src/commands/update.rs index 4c9b8376..1a8aade0 100644 --- a/src/commands/update.rs +++ b/src/commands/update.rs @@ -3,7 +3,7 @@ use anyhow::Result; use clap::{ArgAction, Args}; /// Update all local package logs for a registry. -#[derive(Args)] +#[derive(Args, Clone)] pub struct UpdateCommand { /// The common command options. #[clap(flatten)] diff --git a/tests/client.rs b/tests/client.rs index 20a26db9..7522e8a7 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -9,8 +9,8 @@ use warg_protocol::registry::PackageName; pub mod support; -fn create_client(config: &Config) -> Result { - match FileSystemClient::try_new_with_config(None, config, None)? { +async fn create_client(config: &Config) -> Result { + match FileSystemClient::try_new_with_config(None, config, None).await? { StorageLockResult::Acquired(client) => Ok(client), _ => bail!("failed to acquire storage lock"), } @@ -23,7 +23,7 @@ async fn client_incrementally_fetches() -> Result<()> { let (_server, config) = spawn_server(&root().await?, None, None, None).await?; - let client = create_client(&config)?; + let client = create_client(&config).await?; let signing_key = support::test_signing_key(); // Store a single component that will be used for every release @@ -78,7 +78,7 @@ async fn client_incrementally_fetches() -> Result<()> { .context("failed to remove registries directory")?; // Recreate the client with the same config - let client = create_client(&config)?; + let client = create_client(&config).await?; // Regression test: update on empty registry storage client.update().await?; @@ -93,7 +93,10 @@ async fn client_incrementally_fetches() -> Result<()> { // Ensure the package log exists and has releases with all with the same digest let package = client .registry() - .load_package(client.get_warg_registry(), &name) + .load_package( + client.get_warg_registry(name.namespace()).await?.as_ref(), + &name, + ) .await? .context("package does not exist in client storage")?; diff --git a/tests/depsolve.rs b/tests/depsolve.rs index 25ec0102..59f11fbe 100644 --- a/tests/depsolve.rs +++ b/tests/depsolve.rs @@ -17,7 +17,7 @@ pub mod support; async fn depsolve() -> Result<()> { let (_server, config) = spawn_server(&root().await?, None, None, None).await?; - let client = create_client(&config)?; + let client = create_client(&config).await?; let signing_key = support::test_signing_key(); let mut head = publish_package( @@ -89,7 +89,10 @@ async fn depsolve() -> Result<()> { let info = client .registry() - .load_package(client.get_warg_registry(), &PackageName::new("test:meet")?) + .load_package( + client.get_warg_registry("test").await?.as_ref(), + &PackageName::new("test:meet")?, + ) .await? .context("package does not exist in client storage")?; @@ -147,7 +150,7 @@ async fn publish_package( name: name.clone(), head: Some(head), entries: vec![PublishEntry::Release { - version: format!("1.0.0").parse().unwrap(), + version: "1.0.0".to_string().parse().unwrap(), content: add_digest.clone(), }], }, diff --git a/tests/memory/mod.rs b/tests/memory/mod.rs index daa15ab0..4dc34b40 100644 --- a/tests/memory/mod.rs +++ b/tests/memory/mod.rs @@ -17,7 +17,7 @@ async fn it_publishes_a_component() -> Result<()> { // There should be two log entries in the registry let client = api::Client::new(config.home_url.as_ref().unwrap(), None)?; - let ts_checkpoint = client.latest_checkpoint().await?; + let ts_checkpoint = client.latest_checkpoint(None).await?; assert_eq!( ts_checkpoint.as_ref().checkpoint.log_length, 2, @@ -36,7 +36,7 @@ async fn it_yanks_a_package() -> Result<()> { // There should be three entries in the registry let client = api::Client::new(config.home_url.as_ref().unwrap(), None)?; - let ts_checkpoint = client.latest_checkpoint().await?; + let ts_checkpoint = client.latest_checkpoint(None).await?; assert_eq!( ts_checkpoint.as_ref().checkpoint.log_length, 3, @@ -53,7 +53,7 @@ async fn it_publishes_a_wit_package() -> Result<()> { // There should be two log entries in the registry let client = api::Client::new(config.home_url.as_ref().unwrap(), None)?; - let ts_checkpoint = client.latest_checkpoint().await?; + let ts_checkpoint = client.latest_checkpoint(None).await?; assert_eq!( ts_checkpoint.as_ref().checkpoint.log_length, 2, diff --git a/tests/postgres/mod.rs b/tests/postgres/mod.rs index bd348440..c3d0096d 100644 --- a/tests/postgres/mod.rs +++ b/tests/postgres/mod.rs @@ -67,7 +67,7 @@ async fn it_works_with_postgres() -> TestResult { // There should be two log entries in the registry let client = api::Client::new(config.home_url.as_ref().unwrap(), None)?; - let ts_checkpoint = client.latest_checkpoint().await?; + let ts_checkpoint = client.latest_checkpoint(None).await?; assert_eq!( ts_checkpoint.as_ref().checkpoint.log_length, packages.len() as RegistryLen + 2, /* publishes + initial checkpoint + yank */ @@ -85,7 +85,7 @@ async fn it_works_with_postgres() -> TestResult { packages.push(PackageName::new("test:unknown-key")?); let client = api::Client::new(config.home_url.as_ref().unwrap(), None)?; - let ts_checkpoint = client.latest_checkpoint().await?; + let ts_checkpoint = client.latest_checkpoint(None).await?; assert_eq!( ts_checkpoint.as_ref().checkpoint.log_length, packages.len() as RegistryLen + 2, /* publishes + initial checkpoint + yank*/ @@ -97,7 +97,7 @@ async fn it_works_with_postgres() -> TestResult { fs::remove_dir_all(root.join("content"))?; fs::remove_dir_all(root.join("registries"))?; - let client = create_client(&config)?; + let client = create_client(&config).await?; client.upsert(packages.iter()).await?; // Finally, after a restart, ensure the packages can be downloaded @@ -106,7 +106,7 @@ async fn it_works_with_postgres() -> TestResult { continue; } client - .download(&package, &"0.1.0".parse()?) + .download(None, &package, &"0.1.0".parse()?) .await? .context("failed to resolve package")?; } diff --git a/tests/server.rs b/tests/server.rs index 672f3faf..e80d09b8 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -41,7 +41,7 @@ mod postgres; async fn test_initial_checkpoint(config: &Config) -> Result<()> { let client = api::Client::new(config.home_url.as_ref().unwrap(), None)?; - let ts_checkpoint = client.latest_checkpoint().await?; + let ts_checkpoint = client.latest_checkpoint(None).await?; let checkpoint = &ts_checkpoint.as_ref().checkpoint; // There should be only a single log entry (the initial operator log entry) @@ -71,7 +71,8 @@ async fn test_component_publishing(config: &Config) -> Result<()> { const PACKAGE_VERSION: &str = "0.1.0"; let name = PackageName::new(PACKAGE_NAME)?; - let client = create_client(config)?; + let client = create_client(config).await?; + let registry_domain = client.get_warg_registry(name.namespace()).await?; let signing_key = test_signing_key(); let digest = publish_component( &client, @@ -86,7 +87,7 @@ async fn test_component_publishing(config: &Config) -> Result<()> { // Assert that the package can be downloaded client.upsert([&name]).await?; let download = client - .download(&name, &PACKAGE_VERSION.parse()?) + .download(registry_domain.as_ref(), &name, &PACKAGE_VERSION.parse()?) .await? .context("failed to resolve package")?; @@ -109,7 +110,10 @@ async fn test_component_publishing(config: &Config) -> Result<()> { } // Assert that a different version can't be downloaded - assert!(client.download(&name, &"0.2.0".parse()?).await?.is_none()); + assert!(client + .download(registry_domain.as_ref(), &name, &"0.2.0".parse()?) + .await? + .is_none()); Ok(()) } @@ -120,7 +124,8 @@ async fn test_package_yanking(config: &Config) -> Result<()> { // Publish release let name = PackageName::new(PACKAGE_NAME)?; - let client = create_client(config)?; + let client = create_client(config).await?; + let registry_domain = client.get_warg_registry(name.namespace()).await?; let signing_key = test_signing_key(); publish( &client, @@ -151,7 +156,9 @@ async fn test_package_yanking(config: &Config) -> Result<()> { // Assert that the package is yanked client.upsert([&name]).await?; - let opt = client.download(&name, &PACKAGE_VERSION.parse()?).await?; + let opt = client + .download(registry_domain.as_ref(), &name, &PACKAGE_VERSION.parse()?) + .await?; assert!(opt.is_none(), "expected no download, got {opt:?}"); Ok(()) } @@ -161,7 +168,8 @@ async fn test_wit_publishing(config: &Config) -> Result<()> { const PACKAGE_VERSION: &str = "0.1.0"; let name = PackageName::new(PACKAGE_NAME)?; - let client = create_client(config)?; + let client = create_client(config).await?; + let registry_domain = client.get_warg_registry(name.namespace()).await?; let signing_key = test_signing_key(); let digest = publish_wit( &client, @@ -176,7 +184,7 @@ async fn test_wit_publishing(config: &Config) -> Result<()> { // Assert that the package can be downloaded client.upsert([&name]).await?; let download = client - .download(&name, &PACKAGE_VERSION.parse()?) + .download(registry_domain.as_ref(), &name, &PACKAGE_VERSION.parse()?) .await? .context("failed to resolve package")?; @@ -199,7 +207,10 @@ async fn test_wit_publishing(config: &Config) -> Result<()> { } // Assert that a different version can't be downloaded - assert!(client.download(&name, &"0.2.0".parse()?).await?.is_none()); + assert!(client + .download(registry_domain.as_ref(), &name, &"0.2.0".parse()?) + .await? + .is_none()); Ok(()) } @@ -211,7 +222,7 @@ async fn test_wasm_content_policy(config: &Config) -> Result<()> { // Publish empty content to the server // This should be rejected by policy because it is not valid WebAssembly let name = PackageName::new(PACKAGE_NAME)?; - let client = create_client(config)?; + let client = create_client(config).await?; let signing_key = test_signing_key(); match publish( &client, @@ -269,7 +280,7 @@ async fn test_unauthorized_signing_key(config: &Config) -> Result<()> { // Start by publishing a new component package let name = PackageName::new(PACKAGE_NAME)?; - let client = create_client(config)?; + let client = create_client(config).await?; let signing_key = test_signing_key(); publish_component( &client, @@ -305,7 +316,7 @@ async fn test_unknown_signing_key(config: &Config) -> Result<()> { // Start by publishing a new component package let name = PackageName::new(PACKAGE_NAME)?; - let client = create_client(config)?; + let client = create_client(config).await?; let signing_key = test_signing_key(); publish_component( &client, @@ -337,7 +348,7 @@ async fn test_unknown_signing_key(config: &Config) -> Result<()> { } async fn test_publishing_name_conflict(config: &Config) -> Result<()> { - let client = create_client(config)?; + let client = create_client(config).await?; let signing_key = test_signing_key(); publish_component( @@ -426,7 +437,7 @@ async fn test_custom_content_url(config: &Config) -> Result<()> { const PACKAGE_VERSION: &str = "0.1.0"; let name = PackageName::new(PACKAGE_NAME)?; - let client = create_client(config)?; + let client = create_client(config).await?; let signing_key = test_signing_key(); let digest = publish_component( &client, @@ -441,7 +452,10 @@ async fn test_custom_content_url(config: &Config) -> Result<()> { client.upsert([&name]).await?; let package = client .registry() - .load_package(client.get_warg_registry(), &name) + .load_package( + client.get_warg_registry(name.namespace()).await?.as_ref(), + &name, + ) .await? .expect("expected the package to exist"); package @@ -451,7 +465,7 @@ async fn test_custom_content_url(config: &Config) -> Result<()> { // Look up the content URL for the record let client = api::Client::new(config.home_url.as_ref().unwrap(), None)?; - let ContentSourcesResponse { content_sources } = client.content_sources(&digest).await?; + let ContentSourcesResponse { content_sources } = client.content_sources(None, &digest).await?; assert_eq!(content_sources.len(), 1); let sources = content_sources .get(&digest) @@ -513,7 +527,7 @@ async fn test_fetch_package_names(config: &Config) -> Result<()> { async fn test_get_ledger(config: &Config) -> Result<()> { let client = api::Client::new(config.home_url.as_ref().unwrap(), None)?; - let ts_checkpoint = client.latest_checkpoint().await?; + let ts_checkpoint = client.latest_checkpoint(None).await?; let checkpoint = &ts_checkpoint.as_ref().checkpoint; let url = Url::parse(config.home_url.as_ref().unwrap())? diff --git a/tests/support/mod.rs b/tests/support/mod.rs index b69b9fb2..28a38128 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -43,8 +43,8 @@ pub fn test_signing_key() -> PrivateKey { PrivateKey::decode(key.to_string()).unwrap() } -pub fn create_client(config: &warg_client::Config) -> Result { - match FileSystemClient::try_new_with_config(None, config, None)? { +pub async fn create_client(config: &warg_client::Config) -> Result { + match FileSystemClient::try_new_with_config(None, config, None).await? { StorageLockResult::Acquired(client) => Ok(client), _ => bail!("failed to acquire storage lock"), } From 00819c6d45730316432eacf2b3fa5df63a08584a Mon Sep 17 00:00:00 2001 From: Daniel Macovei Date: Tue, 16 Apr 2024 11:16:39 -0500 Subject: [PATCH 2/4] address comments --- crates/client/src/lib.rs | 52 ++++++++-------- src/bin/warg.rs | 124 +++++++++------------------------------ 2 files changed, 54 insertions(+), 122 deletions(-) diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index c91d482e..f17828d4 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -102,7 +102,7 @@ impl Client Result> { self.update_checkpoint(None, &self.api.latest_checkpoint(None).await?, vec![]) .await?; @@ -111,16 +111,19 @@ impl Client { + return Ok(Some(RegistryDomain::from_str(registry)?)); + } + Ok(Some(warg_protocol::operator::NamespaceState::Defined)) => { + return Ok(None); + } + Ok(None) => return Ok(None), + Err(e) => { + eprintln!("Namespace `{namespace}` not found in operator log but found namespace `{e}`, which has alternative casing."); + return Ok(None); + } + } }; let nm_map = self.namespace_map.load_namespace_map().await?; Ok(if let Some(nm_map) = &nm_map { @@ -494,28 +497,23 @@ impl Client> = IndexMap::new(); for package in packages { - let namespace = package.namespace(); - let namespace_packages = namespaced.get_mut(namespace); - if let Some(nm_pkgs) = namespace_packages { - nm_pkgs.push(package); - } else { - namespaced.insert(namespace, vec![package]); - } + namespaced + .entry(package.namespace()) + .or_default() + .push(package); } let namespace_map = self.namespace_map.load_namespace_map().await?; if let Some(nm_map) = namespace_map { for (nm, pkg_names) in namespaced { let mut updating = Vec::with_capacity(pkg_names.len()); let reg_domain = nm_map.get(nm); - if reg_domain.is_some() { - for package in pkg_names { - updating.push( - self.registry - .load_package(self.get_warg_registry(nm).await?.as_ref(), package) - .await? - .unwrap_or_else(|| PackageInfo::new(package.clone())), - ); - } + for package in pkg_names { + updating.push( + self.registry + .load_package(self.get_warg_registry(nm).await?.as_ref(), package) + .await? + .unwrap_or_else(|| PackageInfo::new(package.clone())), + ); } self.update_checkpoint( diff --git a/src/bin/warg.rs b/src/bin/warg.rs index 7cbb9cf5..f4110638 100644 --- a/src/bin/warg.rs +++ b/src/bin/warg.rs @@ -46,99 +46,33 @@ async fn main() -> Result<()> { .with_env_filter(EnvFilter::from_default_env()) .init(); - match &WargCli::parse() { - WargCli::Config(cmd) => cmd.clone().exec().await?, - WargCli::Info(cmd) => cmd.clone().exec().await?, - WargCli::Key(cmd) => cmd.clone().exec().await?, - WargCli::Lock(cmd) => { - with_interactive_retry(|retry: Option| async { - if let Err(e) = cmd.clone().exec(retry).await { - if let Some(e) = e.downcast_ref::() { - describe_client_error_or_retry(e).await?; - } else { - eprintln!("error: {e:?}"); - } - exit(1); - } - Ok(()) - }) - .await? - } - WargCli::Bundle(cmd) => { - with_interactive_retry(|retry: Option| async { - if let Err(e) = cmd.clone().exec(retry).await { - if let Some(e) = e.downcast_ref::() { - describe_client_error_or_retry(e).await?; - } else { - eprintln!("error: {e:?}"); - } - exit(1); - } - Ok(()) - }) - .await? - } - WargCli::Dependencies(cmd) => { - with_interactive_retry(|retry: Option| async { - if let Err(e) = cmd.clone().exec(retry).await { - if let Some(e) = e.downcast_ref::() { - describe_client_error_or_retry(e).await?; - } else { - eprintln!("error: {e:?}"); - } - exit(1); - } - Ok(()) - }) - .await? - } - WargCli::Download(cmd) => { - with_interactive_retry(|retry: Option| async { - if let Err(e) = cmd.clone().exec(retry).await { - if let Some(e) = e.downcast_ref::() { - describe_client_error_or_retry(e).await?; - } else { - eprintln!("error: {e:?}"); - } - exit(1); - } - Ok(()) - }) - .await? - } - WargCli::Update(cmd) => { - with_interactive_retry(|retry: Option| async { - if let Err(e) = cmd.clone().exec(retry).await { - if let Some(e) = e.downcast_ref::() { - describe_client_error_or_retry(e).await?; - } else { - eprintln!("error: {e:?}"); - } - exit(1); - } - Ok(()) - }) - .await? - } - WargCli::Publish(cmd) => { - with_interactive_retry(|retry: Option| async { - if let Err(e) = cmd.clone().exec(retry).await { - if let Some(e) = e.downcast_ref::() { - describe_client_error_or_retry(e).await?; - } else { - eprintln!("error: {e:?}"); - } - exit(1); - } - Ok(()) - }) - .await? + with_interactive_retry(|retry: Option| async { + if let Err(e) = match WargCli::parse() { + WargCli::Config(cmd) => cmd.exec().await, + WargCli::Info(cmd) => cmd.exec().await, + WargCli::Key(cmd) => cmd.exec().await, + WargCli::Lock(cmd) => cmd.exec(retry).await, + WargCli::Bundle(cmd) => cmd.exec(retry).await, + WargCli::Dependencies(cmd) => cmd.exec(retry).await, + WargCli::Download(cmd) => cmd.exec(retry).await, + WargCli::Update(cmd) => cmd.exec(retry).await, + WargCli::Publish(cmd) => cmd.exec(retry).await, + WargCli::Reset(cmd) => cmd.exec().await, + WargCli::Clear(cmd) => cmd.exec().await, + WargCli::Login(cmd) => cmd.exec().await, + WargCli::Logout(cmd) => cmd.exec().await, + } { + if let Some(e) = e.downcast_ref::() { + describe_client_error_or_retry(e).await?; + } else { + eprintln!("error: {e:?}"); + } + exit(1); } - WargCli::Reset(cmd) => cmd.clone().exec().await?, - WargCli::Clear(cmd) => cmd.clone().exec().await?, - WargCli::Login(cmd) => cmd.clone().exec().await?, - WargCli::Logout(cmd) => cmd.clone().exec().await?, - } + + Ok(()) + }) + .await?; Ok(()) } @@ -162,9 +96,9 @@ async fn describe_client_error_or_retry(e: &ClientError) -> Result<()> { let registry = terms.next(); if let (Some(namespace), Some(registry)) = (namespace, registry) { let prompt = format!( - "The package `{}`, does not exist in the registry you're using.\nHowever, the package namespace `{namespace}` does exist in the registry at {registry}.\nWould you like to configure your warg cli to use this registry for packages with this namespace in the future? y/N\n", - name.name() - ); + "The package `{}`, does not exist in the registry you're using.\nHowever, the package namespace `{namespace}` does exist in the registry at {registry}.\nWould you like to configure your warg cli to use this registry for packages with this namespace in the future? y/N\n", + name.name() + ); if Confirm::with_theme(&ColorfulTheme::default()) .with_prompt(prompt) .interact() From fd8c963660644f1b13cc8547731fbbcf749e1178 Mon Sep 17 00:00:00 2001 From: Daniel Macovei Date: Thu, 18 Apr 2024 17:30:09 -0500 Subject: [PATCH 3/4] add error for similar namespaces --- crates/client/src/lib.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index f17828d4..a0f9d243 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -103,7 +103,10 @@ impl Client Result> { + pub async fn get_warg_registry( + &self, + namespace: &str, + ) -> Result, ClientError> { self.update_checkpoint(None, &self.api.latest_checkpoint(None).await?, vec![]) .await?; let operator = self @@ -120,8 +123,10 @@ impl Client return Ok(None), Err(e) => { - eprintln!("Namespace `{namespace}` not found in operator log but found namespace `{e}`, which has alternative casing."); - return Ok(None); + return Err(ClientError::SimilarNamespace { + namespace: namespace.to_string(), + e: e.to_string(), + }); } } }; @@ -1054,6 +1059,15 @@ pub struct PackageDownload { /// Represents an error returned by Warg registry clients. #[derive(Debug, Error)] pub enum ClientError { + /// Similar Namespace + #[error("Namespace `{namespace}` not found in operator log but found namespace `{e}`, which has alternative casing.")] + SimilarNamespace { + /// Namespace + namespace: String, + /// Provided Error + e: String, + }, + /// No home registry registry server URL is configured. #[error("no home registry registry server URL is configured")] NoHomeRegistryUrl, From c3ac9130d0c13738a615033a7f6ae3ebbfff4530 Mon Sep 17 00:00:00 2001 From: Daniel Macovei Date: Wed, 24 Apr 2024 14:32:32 -0500 Subject: [PATCH 4/4] remove interactive retry abstraction --- crates/client/src/lib.rs | 18 +-------------- src/bin/warg.rs | 49 ++++++++++++++++++---------------------- 2 files changed, 23 insertions(+), 44 deletions(-) diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index a0f9d243..70aaa0ac 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -10,7 +10,6 @@ use secrecy::Secret; use semver::{Version, VersionReq}; use std::cmp::Ordering; use std::fs; -use std::future::Future; use std::str::FromStr; use std::{borrow::Cow, path::PathBuf, time::Duration}; use storage::{ @@ -121,13 +120,7 @@ impl Client { return Ok(None); } - Ok(None) => return Ok(None), - Err(e) => { - return Err(ClientError::SimilarNamespace { - namespace: namespace.to_string(), - e: e.to_string(), - }); - } + _ => (), } }; let nm_map = self.namespace_map.load_namespace_map().await?; @@ -1286,12 +1279,3 @@ impl Retry { Ok(()) } } - -/// Interactively retry when hint header received from warg server -pub async fn with_interactive_retry(f: impl Fn(Option) -> F) -> Result<()> -where - F: Future>, -{ - f(None).await?; - Ok(()) -} diff --git a/src/bin/warg.rs b/src/bin/warg.rs index f4110638..9de66634 100644 --- a/src/bin/warg.rs +++ b/src/bin/warg.rs @@ -8,7 +8,7 @@ use warg_cli::commands::{ KeyCommand, LockCommand, LoginCommand, LogoutCommand, PublishCommand, ResetCommand, UpdateCommand, }; -use warg_client::{with_interactive_retry, ClientError, Retry}; +use warg_client::{ClientError, Retry}; fn version() -> &'static str { option_env!("CARGO_VERSION_INFO").unwrap_or(env!("CARGO_PKG_VERSION")) @@ -46,33 +46,28 @@ async fn main() -> Result<()> { .with_env_filter(EnvFilter::from_default_env()) .init(); - with_interactive_retry(|retry: Option| async { - if let Err(e) = match WargCli::parse() { - WargCli::Config(cmd) => cmd.exec().await, - WargCli::Info(cmd) => cmd.exec().await, - WargCli::Key(cmd) => cmd.exec().await, - WargCli::Lock(cmd) => cmd.exec(retry).await, - WargCli::Bundle(cmd) => cmd.exec(retry).await, - WargCli::Dependencies(cmd) => cmd.exec(retry).await, - WargCli::Download(cmd) => cmd.exec(retry).await, - WargCli::Update(cmd) => cmd.exec(retry).await, - WargCli::Publish(cmd) => cmd.exec(retry).await, - WargCli::Reset(cmd) => cmd.exec().await, - WargCli::Clear(cmd) => cmd.exec().await, - WargCli::Login(cmd) => cmd.exec().await, - WargCli::Logout(cmd) => cmd.exec().await, - } { - if let Some(e) = e.downcast_ref::() { - describe_client_error_or_retry(e).await?; - } else { - eprintln!("error: {e:?}"); - } - exit(1); + if let Err(e) = match WargCli::parse() { + WargCli::Config(cmd) => cmd.exec().await, + WargCli::Info(cmd) => cmd.exec().await, + WargCli::Key(cmd) => cmd.exec().await, + WargCli::Lock(cmd) => cmd.exec(None).await, + WargCli::Bundle(cmd) => cmd.exec(None).await, + WargCli::Dependencies(cmd) => cmd.exec(None).await, + WargCli::Download(cmd) => cmd.exec(None).await, + WargCli::Update(cmd) => cmd.exec(None).await, + WargCli::Publish(cmd) => cmd.exec(None).await, + WargCli::Reset(cmd) => cmd.exec().await, + WargCli::Clear(cmd) => cmd.exec().await, + WargCli::Login(cmd) => cmd.exec().await, + WargCli::Logout(cmd) => cmd.exec().await, + } { + if let Some(e) = e.downcast_ref::() { + describe_client_error_or_retry(e).await?; + } else { + eprintln!("error: {e:?}"); } - - Ok(()) - }) - .await?; + exit(1); + } Ok(()) }