Skip to content

Commit

Permalink
Namespaces (#247)
Browse files Browse the repository at this point in the history
This PR adds namespace config to warg client for federation.
The client will check operator logs for imported namespaces and use them
as warg headers on requests.
The client will check for warg hint headers when fetching logs, and
prompt users to update local namespace config to point at a domain if
the server provides a hint.
If the client does not find a package namespace in operator logs, it
will check the local namespace config to see if there is a registry
domain that it should include as a warg header.

---------

Co-authored-by: Calvin Prewitt <[email protected]>
  • Loading branch information
macovedj and calvinrp authored Feb 20, 2024
1 parent 820f25b commit 22cfb3e
Show file tree
Hide file tree
Showing 27 changed files with 980 additions and 181 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@
.cargo
vendor/
publish
# Test generated files
bundled.wasm
locked.wasm
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ warg config --registry http://127.0.0.1:8090
```

This creates a [`$CONFIG_DIR/warg/config.json`][config_dir] configuration file;
the configuration file will specify the default registry URL to use so that the
the configuration file will specify the home registry URL to use so that the
`--registry` option does not need to be specified for every command.

Data downloaded by the client is stored in [`$CACHE_DIR/warg`][cache_dir] by
Expand Down Expand Up @@ -156,7 +156,7 @@ warg publish revoke --name example:hello sha256:abc...

### Resetting and clearing local data

To reset local data for the default registry:
To reset local data for the home registry:
```
warg reset
```
Expand Down
3 changes: 3 additions & 0 deletions crates/api/src/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ use serde::{Deserialize, Serialize};
/// subject of the request. This header is only expected to be used if referring to a different
/// registry than the host registry.
pub const REGISTRY_HEADER_NAME: &str = "warg-registry";
/// The HTTP response header name that specifies that the client should
/// try another registry
pub const REGISTRY_HINT_HEADER_NAME: &str = "warg-registry-hint";

/// Represents the supported kinds of content sources.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
Expand Down
164 changes: 145 additions & 19 deletions crates/client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures_util::{Stream, TryStreamExt};
use reqwest::{
header::{HeaderMap, HeaderValue},
Body, IntoUrl, Method, Response, StatusCode,
header::{HeaderMap, HeaderName, HeaderValue},
Body, IntoUrl, Method, RequestBuilder, Response, StatusCode,
};
use serde::de::DeserializeOwned;
use std::{borrow::Cow, collections::HashMap};
Expand All @@ -23,6 +23,7 @@ use warg_api::v1::{
proof::{
ConsistencyRequest, ConsistencyResponse, InclusionRequest, InclusionResponse, ProofError,
},
REGISTRY_HEADER_NAME, REGISTRY_HINT_HEADER_NAME,
};
use warg_crypto::hash::{AnyHash, HashError, Sha256};
use warg_protocol::{
Expand All @@ -34,8 +35,7 @@ use warg_transparency::{
map::MapProofBundle,
};

use crate::registry_url::RegistryUrl;

use crate::{registry_url::RegistryUrl, storage::RegistryDomain};
/// Represents an error that occurred while communicating with the registry.
#[derive(Debug, Error)]
pub enum ClientError {
Expand Down Expand Up @@ -102,6 +102,9 @@ pub enum ClientError {
/// Invalid upload HTTP method.
#[error("server returned an invalid HTTP header `{0}: {1}`")]
InvalidHttpHeader(String, String),
/// The provided log was not found with hint header.
#[error("log `{0}` was not found in this registry, but the registry provided the hint header: `{1:?}`")]
LogNotFoundWithHint(LogId, HeaderValue),
/// An other error occurred during the requested operation.
#[error(transparent)]
Other(#[from] anyhow::Error),
Expand Down Expand Up @@ -153,11 +156,28 @@ async fn into_result<T: DeserializeOwned, E: DeserializeOwned + Into<ClientError
}
}

trait WithWargHeader {
type Client;
fn warg_header(self, registry_header: &Option<RegistryDomain>) -> Result<RequestBuilder>;
}

impl WithWargHeader for RequestBuilder {
type Client = Client;
fn warg_header(self, registry_header: &Option<RegistryDomain>) -> Result<RequestBuilder> {
if let Some(reg) = registry_header {
Ok(self.header(REGISTRY_HEADER_NAME, HeaderValue::try_from(reg.clone())?))
} else {
Ok(self)
}
}
}

/// Represents a Warg API client for communicating with
/// a Warg registry server.
pub struct Client {
url: RegistryUrl,
client: reqwest::Client,
warg_registry_header: Option<RegistryDomain>,
}

impl Client {
Expand All @@ -167,6 +187,7 @@ impl Client {
Ok(Self {
url,
client: reqwest::Client::new(),
warg_registry_header: None,
})
}

Expand All @@ -181,7 +202,37 @@ impl Client {
) -> Result<SerdeEnvelope<TimestampedCheckpoint>, ClientError> {
let url = self.url.join(paths::fetch_checkpoint());
tracing::debug!("getting latest checkpoint at `{url}`");
into_result::<_, FetchError>(reqwest::get(url).await?).await
into_result::<_, FetchError>(
self.client
.get(url)
.warg_header(self.get_warg_registry())?
.send()
.await?,
)
.await
}

/// Gets the latest checkpoints from registries.
pub async fn latest_checkpoints(
&self,
registries: impl Iterator<Item = &String>,
) -> Result<HashMap<String, SerdeEnvelope<TimestampedCheckpoint>>> {
let mut timestamps = HashMap::new();
for reg in registries.into_iter() {
let url = self.url.join(paths::fetch_checkpoint());
let registry_header = HeaderName::try_from(REGISTRY_HEADER_NAME).unwrap();
let header_val = HeaderValue::try_from(reg).unwrap();
let res: SerdeEnvelope<TimestampedCheckpoint> = into_result::<_, FetchError>(
self.client
.get(url)
.header(registry_header, header_val)
.send()
.await?,
)
.await?;
timestamps.insert(reg.clone(), res);
}
Ok(timestamps)
}

/// Verify checkpoint of the registry.
Expand All @@ -192,7 +243,13 @@ impl Client {
let url = self.url.join(paths::verify_checkpoint());
tracing::debug!("verifying checkpoint at `{url}`");

let response = self.client.post(url).json(&request).send().await?;
let response = self
.client
.post(url)
.json(&request)
.warg_header(self.get_warg_registry())?
.send()
.await?;
into_result::<_, MonitorError>(response).await
}

Expand All @@ -203,9 +260,23 @@ impl Client {
) -> Result<FetchLogsResponse, ClientError> {
let url = self.url.join(paths::fetch_logs());
tracing::debug!("fetching logs at `{url}`");
let response = self
.client
.post(&url)
.json(&request)
.warg_header(self.get_warg_registry())?
.send()
.await?;

let response = self.client.post(url).json(&request).send().await?;
into_result::<_, FetchError>(response).await
let header = response.headers().get(REGISTRY_HINT_HEADER_NAME).cloned();
into_result::<_, FetchError>(response)
.await
.map_err(|err| match err {
ClientError::Fetch(FetchError::LogNotFound(log_id)) if header.is_some() => {
ClientError::LogNotFoundWithHint(log_id, header.unwrap())
}
_ => err,
})
}

/// Fetches package names from the registry.
Expand All @@ -216,7 +287,13 @@ impl Client {
let url = self.url.join(paths::fetch_package_names());
tracing::debug!("fetching package names at `{url}`");

let response = self.client.post(url).json(&request).send().await?;
let response = self
.client
.post(url)
.warg_header(self.get_warg_registry())?
.json(&request)
.send()
.await?;
into_result::<_, FetchError>(response).await
}

Expand All @@ -225,8 +302,14 @@ impl Client {
let url = self.url.join(paths::ledger_sources());
tracing::debug!("getting ledger sources at `{url}`");

let response = reqwest::get(url).await?;
into_result::<_, LedgerError>(response).await
into_result::<_, LedgerError>(
self.client
.get(url)
.warg_header(self.get_warg_registry())?
.send()
.await?,
)
.await
}

/// Publish a new record to a package log.
Expand All @@ -241,7 +324,13 @@ impl Client {
name = request.package_name
);

let response = self.client.post(url).json(&request).send().await?;
let response = self
.client
.post(url)
.json(&request)
.warg_header(self.get_warg_registry())?
.send()
.await?;
into_result::<_, PackageError>(response).await
}

Expand All @@ -254,8 +343,14 @@ impl Client {
let url = self.url.join(&paths::package_record(log_id, record_id));
tracing::debug!("getting record `{record_id}` for package `{log_id}` at `{url}`");

let response = reqwest::get(url).await?;
into_result::<_, PackageError>(response).await
into_result::<_, PackageError>(
self.client
.get(url)
.warg_header(self.get_warg_registry())?
.send()
.await?,
)
.await
}

/// Gets a content sources from the registry.
Expand All @@ -266,8 +361,14 @@ impl Client {
let url = self.url.join(&paths::content_sources(digest));
tracing::debug!("getting content sources for digest `{digest}` at `{url}`");

let response = reqwest::get(url).await?;
into_result::<_, ContentError>(response).await
into_result::<_, ContentError>(
self.client
.get(url)
.warg_header(self.get_warg_registry())?
.send()
.await?,
)
.await
}

/// Downloads the content associated with a given record.
Expand All @@ -288,7 +389,12 @@ impl Client {

tracing::debug!("downloading content `{digest}` from `{url}`");

let response = self.client.get(url).send().await?;
let response = self
.client
.get(url)
.warg_header(self.get_warg_registry())?
.send()
.await?;
if !response.status().is_success() {
tracing::debug!(
"failed to download content `{digest}` from `{url}`: {status}",
Expand All @@ -303,6 +409,16 @@ impl Client {
Err(ClientError::AllSourcesFailed(digest.clone()))
}

/// Set warg-registry header value
pub fn set_warg_registry(&mut self, registry: Option<RegistryDomain>) {
self.warg_registry_header = registry;
}

/// Get warg-registry header value
pub fn get_warg_registry(&self) -> &Option<RegistryDomain> {
&self.warg_registry_header
}

/// Proves the inclusion of the given package log heads in the registry.
pub async fn prove_inclusion(
&self,
Expand All @@ -314,7 +430,12 @@ impl Client {
tracing::debug!("proving checkpoint inclusion at `{url}`");

let response = into_result::<InclusionResponse, ProofError>(
self.client.post(url).json(&request).send().await?,
self.client
.post(url)
.json(&request)
.warg_header(self.get_warg_registry())?
.send()
.await?,
)
.await?;

Expand All @@ -330,7 +451,12 @@ impl Client {
) -> Result<(), ClientError> {
let url = self.url.join(paths::prove_consistency());
let response = into_result::<ConsistencyResponse, ProofError>(
self.client.post(url).json(&request).send().await?,
self.client
.post(url)
.json(&request)
.warg_header(self.get_warg_registry())?
.send()
.await?,
)
.await?;

Expand Down
Loading

0 comments on commit 22cfb3e

Please sign in to comment.