Skip to content

Commit

Permalink
make better use of namespace storage
Browse files Browse the repository at this point in the history
  • Loading branch information
macovedj committed Apr 15, 2024
1 parent 8087c66 commit 78a5571
Show file tree
Hide file tree
Showing 26 changed files with 529 additions and 348 deletions.
47 changes: 28 additions & 19 deletions crates/client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ async fn into_result<T: DeserializeOwned, E: DeserializeOwned + Into<ClientError
}

trait WithWargHeader {
fn warg_header(self, registry_header: &Option<RegistryDomain>) -> Result<RequestBuilder>;
fn warg_header(self, registry_header: Option<&RegistryDomain>) -> Result<RequestBuilder>;
}

impl WithWargHeader for RequestBuilder {
fn warg_header(self, registry_header: &Option<RegistryDomain>) -> Result<RequestBuilder> {
fn warg_header(self, registry_header: Option<&RegistryDomain>) -> Result<RequestBuilder> {
if let Some(reg) = registry_header {
Ok(self.header(REGISTRY_HEADER_NAME, HeaderValue::try_from(reg.clone())?))
} else {
Expand Down Expand Up @@ -219,13 +219,14 @@ impl Client {
/// Gets the latest checkpoint from the registry.
pub async fn latest_checkpoint(
&self,
reg_domain: Option<&RegistryDomain>,
) -> Result<SerdeEnvelope<TimestampedCheckpoint>, ClientError> {
let url = self.url.join(paths::fetch_checkpoint());
tracing::debug!("getting latest checkpoint at `{url}`");
into_result::<_, FetchError>(
self.client
.get(url)
.warg_header(self.get_warg_registry())?
.warg_header(reg_domain)?
.auth(self.auth_token())
.send()
.await?,
Expand Down Expand Up @@ -260,6 +261,7 @@ impl Client {
/// Verify checkpoint of the registry.
pub async fn verify_checkpoint(
&self,
reg_domain: Option<&RegistryDomain>,
request: SerdeEnvelope<TimestampedCheckpoint>,
) -> Result<CheckpointVerificationResponse, ClientError> {
let url = self.url.join(paths::verify_checkpoint());
Expand All @@ -269,7 +271,7 @@ impl Client {
.client
.post(url)
.json(&request)
.warg_header(self.get_warg_registry())?
.warg_header(reg_domain)?
.auth(self.auth_token())
.send()
.await?;
Expand All @@ -279,6 +281,7 @@ impl Client {
/// Fetches package log entries from the registry.
pub async fn fetch_logs(
&self,
reg_domain: Option<&RegistryDomain>,
request: FetchLogsRequest<'_>,
) -> Result<FetchLogsResponse, ClientError> {
let url = self.url.join(paths::fetch_logs());
Expand All @@ -287,7 +290,7 @@ impl Client {
.client
.post(&url)
.json(&request)
.warg_header(self.get_warg_registry())?
.warg_header(reg_domain)?
.auth(self.auth_token())
.send()
.await?;
Expand All @@ -306,6 +309,7 @@ impl Client {
/// Fetches package names from the registry.
pub async fn fetch_package_names(
&self,
reg_domain: Option<&RegistryDomain>,
request: FetchPackageNamesRequest<'_>,
) -> Result<FetchPackageNamesResponse, ClientError> {
let url = self.url.join(paths::fetch_package_names());
Expand All @@ -314,7 +318,7 @@ impl Client {
let response = self
.client
.post(url)
.warg_header(self.get_warg_registry())?
.warg_header(reg_domain)?
.auth(self.auth_token())
.json(&request)
.send()
Expand All @@ -323,14 +327,17 @@ impl Client {
}

/// Gets ledger sources from the registry.
pub async fn ledger_sources(&self) -> Result<LedgerSourcesResponse, ClientError> {
pub async fn ledger_sources(
&self,
reg_domain: Option<&RegistryDomain>,
) -> Result<LedgerSourcesResponse, ClientError> {
let url = self.url.join(paths::ledger_sources());
tracing::debug!("getting ledger sources at `{url}`");

into_result::<_, LedgerError>(
self.client
.get(url)
.warg_header(self.get_warg_registry())?
.warg_header(reg_domain)?
.auth(self.auth_token())
.send()
.await?,
Expand All @@ -341,6 +348,7 @@ impl Client {
/// Publish a new record to a package log.
pub async fn publish_package_record(
&self,
reg_domain: Option<&RegistryDomain>,
log_id: &LogId,
request: PublishRecordRequest<'_>,
) -> Result<PackageRecord, ClientError> {
Expand All @@ -354,7 +362,7 @@ impl Client {
.client
.post(url)
.json(&request)
.warg_header(self.get_warg_registry())?
.warg_header(reg_domain)?
.auth(self.auth_token())
.send()
.await?;
Expand All @@ -364,6 +372,7 @@ impl Client {
/// Gets a package record from the registry.
pub async fn get_package_record(
&self,
reg_domain: Option<&RegistryDomain>,
log_id: &LogId,
record_id: &RecordId,
) -> Result<PackageRecord, ClientError> {
Expand All @@ -373,7 +382,7 @@ impl Client {
into_result::<_, PackageError>(
self.client
.get(url)
.warg_header(self.get_warg_registry())?
.warg_header(reg_domain)?
.auth(self.auth_token())
.send()
.await?,
Expand All @@ -384,6 +393,7 @@ impl Client {
/// Gets a content sources from the registry.
pub async fn content_sources(
&self,
reg_domain: Option<&RegistryDomain>,
digest: &AnyHash,
) -> Result<ContentSourcesResponse, ClientError> {
let url = self.url.join(&paths::content_sources(digest));
Expand All @@ -392,7 +402,7 @@ impl Client {
into_result::<_, ContentError>(
self.client
.get(url)
.warg_header(self.get_warg_registry())?
.warg_header(reg_domain)?
.auth(self.auth_token())
.send()
.await?,
Expand All @@ -403,11 +413,13 @@ impl Client {
/// Downloads the content associated with a given record.
pub async fn download_content(
&self,
registry_domain: Option<&RegistryDomain>,
digest: &AnyHash,
) -> Result<impl Stream<Item = Result<Bytes>>, ClientError> {
tracing::debug!("requesting content download for digest `{digest}`");

let ContentSourcesResponse { content_sources } = self.content_sources(digest).await?;
let ContentSourcesResponse { content_sources } =
self.content_sources(registry_domain, digest).await?;

let sources = content_sources
.get(digest)
Expand Down Expand Up @@ -438,14 +450,10 @@ impl Client {
self.warg_registry_header = registry;
}

/// Get warg-registry header value
pub fn get_warg_registry(&self) -> &Option<RegistryDomain> {
&self.warg_registry_header
}

/// Proves the inclusion of the given package log heads in the registry.
pub async fn prove_inclusion(
&self,
reg_domain: Option<&RegistryDomain>,
request: InclusionRequest,
checkpoint: &Checkpoint,
leafs: &[LogLeaf],
Expand All @@ -457,7 +465,7 @@ impl Client {
self.client
.post(url)
.json(&request)
.warg_header(self.get_warg_registry())?
.warg_header(reg_domain)?
.auth(self.auth_token())
.send()
.await?,
Expand All @@ -470,6 +478,7 @@ impl Client {
/// Proves consistency between two log roots.
pub async fn prove_log_consistency(
&self,
reg_domain: Option<&RegistryDomain>,
request: ConsistencyRequest,
from_log_root: Cow<'_, AnyHash>,
to_log_root: Cow<'_, AnyHash>,
Expand All @@ -479,7 +488,7 @@ impl Client {
self.client
.post(url)
.json(&request)
.warg_header(self.get_warg_registry())?
.warg_header(reg_domain)?
.auth(self.auth_token())
.send()
.await?,
Expand Down
20 changes: 16 additions & 4 deletions crates/client/src/depsolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ impl LockListBuilder {
match import.kind {
ImportKind::Locked(_) | ImportKind::Unlocked => {
let id = PackageName::new(import.name.clone())?;
let registry_domain = client.get_warg_registry(id.namespace()).await?;
if let Some(info) = client
.registry()
.load_package(client.api.get_warg_registry(), &id)
.load_package(registry_domain.as_ref(), &id)
.await?
{
let release = info.state.releases().last();
Expand All @@ -114,10 +115,15 @@ impl LockListBuilder {
}
self.lock_list.insert(import);
} else {
client.download(&id, &VersionReq::STAR).await?;
client
.download(registry_domain.as_ref(), &id, &VersionReq::STAR)
.await?;
if let Some(info) = client
.registry()
.load_package(client.api.get_warg_registry(), &id)
.load_package(
client.get_warg_registry(id.namespace()).await?.as_ref(),
&id,
)
.await?
{
let release = info.state.releases().last();
Expand Down Expand Up @@ -213,7 +219,13 @@ where
if let Some(info) = self
.client
.registry()
.load_package(self.client.api.get_warg_registry(), &pkg_id)
.load_package(
self.client
.get_warg_registry(pkg_id.namespace())
.await?
.as_ref(),
&pkg_id,
)
.await?
{
let release = if parsed_imp.req != VersionReq::STAR {
Expand Down
Loading

0 comments on commit 78a5571

Please sign in to comment.