Skip to content

Commit

Permalink
fix: fixed included cached assets on wick reg push
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoverson committed Jun 27, 2023
1 parent f3904cf commit 4577461
Show file tree
Hide file tree
Showing 16 changed files with 206 additions and 143 deletions.
5 changes: 5 additions & 0 deletions crates/bins/wick/src/commands/registry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use clap::Subcommand;

pub(crate) mod login;
pub(crate) mod manifest;
pub(crate) mod pull;
pub(crate) mod push;

Expand All @@ -17,4 +18,8 @@ pub(crate) enum SubCommands {
/// Save the credentials for a registry.
#[clap(name = "login")]
Login(login::Options),

/// Retrieve the manifest for a package.
#[clap(name = "manifest")]
Manifest(manifest::Options),
}
123 changes: 123 additions & 0 deletions crates/bins/wick/src/commands/registry/manifest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::collections::HashMap;

use anyhow::Result;
use clap::Args;
use serde_json::json;
use structured_output::StructuredOutput;
use wick_oci_utils::OciDescriptor;

use crate::options::get_auth_for_scope;
#[derive(Debug, Clone, Args)]
#[clap(rename_all = "kebab-case")]
#[group(skip)]
pub(crate) struct Options {
/// OCI reference to pull.
#[clap(action)]
pub(crate) reference: String,

/// Registry to use (overriding configured registry)
#[clap(long = "registry", action)]
pub(crate) registry: Option<String>,

#[clap(flatten)]
pub(crate) oci_opts: crate::oci::Options,
}

#[allow(clippy::unused_async)]
pub(crate) async fn handle(
opts: Options,
settings: wick_settings::Settings,
span: tracing::Span,
) -> Result<StructuredOutput> {
let _enter = span.enter();
let configured_creds = settings
.credentials
.iter()
.find(|c| opts.reference.starts_with(&c.scope));

let (username, password) = get_auth_for_scope(
configured_creds,
opts.oci_opts.username.as_deref(),
opts.oci_opts.password.as_deref(),
);

let mut oci_opts = wick_oci_utils::OciOptions::default();
oci_opts
.set_allow_insecure(opts.oci_opts.insecure_registries)
.set_allow_latest(true)
.set_username(username)
.set_password(password);

span.in_scope(|| debug!(options=?oci_opts, reference= opts.reference, "pulling reference"));

let (manifest, digest) = wick_oci_utils::fetch_image_manifest(&opts.reference, &oci_opts).await?;

let manifest_layers = manifest
.layers
.iter()
.enumerate()
.map(|(i, desc)| format!(" {}:\n{}", i, print_oci_descriptor(desc, 4)))
.collect::<Vec<_>>()
.join("\n");

let text = format!(
r#"# {}:
Digest: {}
Version: {}
Media Type: {}
Config:
{}
Annotations:
{}
Layers:
{}
"#,
opts.reference,
digest,
manifest.schema_version,
manifest.media_type.as_deref().unwrap_or_default(),
print_oci_descriptor(&manifest.config, 2),
print_annotations(&manifest.annotations, 2),
manifest_layers,
);

span.in_scope(|| debug!(%manifest, reference= opts.reference, "pulled manifest"));
let json = json!({"manifest":&manifest, "digest":digest});

Ok(StructuredOutput::new(text, json))
}

fn print_annotations(annotations: &Option<HashMap<String, String>>, indent: u8) -> String {
if let Some(annotations) = annotations {
return annotations
.iter()
.map(|(key, val)| format!("{}{}: {}", " ".repeat(indent as usize), key, val))
.collect::<Vec<_>>()
.join("\n");
}
"".to_owned()
}

fn print_oci_descriptor(descriptor: &OciDescriptor, indent: u8) -> String {
let mut text = vec![format!(
"{}Media Type: {}",
" ".repeat(indent as usize),
descriptor.media_type
)];
text.push(format!("{}Digest: {}", " ".repeat(indent as usize), descriptor.digest));
text.push(format!("{}Size: {}", " ".repeat(indent as usize), descriptor.size));
if let Some(urls) = &descriptor.urls {
if !urls.is_empty() {
text.push(format!("{}URLs: {}", " ".repeat(indent as usize), urls.join(", ")));
}
}
if descriptor.annotations.is_some() {
text.push(format!(
"{}Annotations:\n{}",
" ".repeat(indent as usize),
print_annotations(&descriptor.annotations, indent + 2)
));
}
text.join("\n")
}
1 change: 1 addition & 0 deletions crates/bins/wick/src/commands/registry/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) struct Options {
#[clap(flatten)]
pub(crate) oci_opts: crate::oci::Options,

/// Registry to use (overriding configured registry)
#[clap(long = "registry", action)]
pub(crate) registry: Option<String>,

Expand Down
1 change: 1 addition & 0 deletions crates/bins/wick/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ async fn async_main(cli: Cli, settings: wick_settings::Settings) -> Result<Struc
commands::registry::SubCommands::Push(cmd) => commands::registry::push::handle(cmd, settings, span).await,
commands::registry::SubCommands::Pull(cmd) => commands::registry::pull::handle(cmd, settings, span).await,
commands::registry::SubCommands::Login(cmd) => commands::registry::login::handle(cmd, settings, span).await,
commands::registry::SubCommands::Manifest(cmd) => commands::registry::manifest::handle(cmd, settings, span).await,
},
CliCommand::Rpc(cmd) => match cmd {
commands::rpc::SubCommands::Invoke(cmd) => commands::rpc::invoke::handle(cmd, settings, span).await,
Expand Down
2 changes: 1 addition & 1 deletion crates/interfaces/wick-interface-http/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
name: http
kind: wick/types@v1
metadata:
version: 0.2.0
version: 0.3.0
package:
registry:
host: registry.candle.dev
Expand Down
126 changes: 15 additions & 111 deletions crates/wick/wick-asset-reference/src/asset_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;

use asset_container::{self as assets, Asset, AssetManager, Progress, Status};
use bytes::{Bytes, BytesMut};
use asset_container::{self as assets, Asset, AssetManager, Progress};
use bytes::Bytes;
use normpath::PathExt;
use parking_lot::RwLock;
use tokio::io::AsyncReadExt;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::Stream;
use tracing::debug;
use wick_oci_utils::OciOptions;
Expand Down Expand Up @@ -82,11 +81,18 @@ impl AssetReference {
self.baseurl.read().clone()
}

#[allow(clippy::option_if_let_else)]
pub fn path(&self) -> Result<PathBuf, Error> {
self.resolve_path(true)
}

#[allow(clippy::option_if_let_else)]
fn resolve_path(&self, use_cache: bool) -> Result<PathBuf, Error> {
if let Some(cache_loc) = self.cache_location.read().as_ref() {
Ok(cache_loc.clone())
} else if let Ok(url) = normalize_path(self.location.as_ref(), self.baseurl()) {
if use_cache {
return Ok(cache_loc.clone());
}
}
if let Ok(url) = normalize_path(self.location.as_ref(), self.baseurl()) {
Ok(url)
} else if wick_oci_utils::parse_reference(self.location.as_str()).is_ok() {
Ok(PathBuf::from(&self.location))
Expand All @@ -112,96 +118,12 @@ impl AssetReference {
}
}

fn retrieve_as_file(&self) -> std::pin::Pin<Box<dyn Stream<Item = Progress> + Send + '_>> {
let mut buffer = [0u8; 1024];

let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let location = self.location.clone();
tokio::spawn(async move {
let location = location.as_str();
let mut bm = BytesMut::new();
let mut file = match tokio::fs::File::open(location).await {
Ok(file) => file,
Err(err) => {
let _ = tx.send(Progress::new(location, Status::Error(err.to_string())));
return;
}
};

let file_size = match file.metadata().await {
Ok(metadata) => metadata.len(),
Err(err) => {
let _ = tx.send(Progress::new(location, Status::Error(err.to_string())));
return;
}
};

let _ = tx.send(Progress::new(
location,
Status::Progress {
num: 0,
total: file_size as _,
},
));
loop {
let bytes = match file.read(&mut buffer).await {
Ok(0) => break,
Ok(bytes) => bytes,
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => {
let _ = tx.send(Progress::new(location, Status::Error(e.to_string())));
return;
}
};
let _ = tx.send(Progress::new(
location,
Status::Progress {
num: bytes,
total: file_size as _,
},
));
bm.extend_from_slice(&buffer[..bytes]);
}

let _ = tx.send(Progress::new(location, Status::AssetComplete(bm.to_vec())));
});
Box::pin(UnboundedReceiverStream::new(rx))
}

/// Check if the asset exists on disk.
#[must_use]
pub fn exists_locally(&self) -> bool {
let path = self.path();
let path = self.resolve_path(false);
path.is_ok() && path.unwrap().exists()
}

fn retrieve_as_oci_with_progress(
&self,
options: OciOptions,
) -> std::pin::Pin<Box<dyn Stream<Item = Progress> + Send + '_>> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let location = self.location.clone();
tokio::spawn(async move {
let location = location.as_str();
let _ = tx.send(Progress::new(location, Status::Progress { num: 0, total: 0 }));
match wick_oci_utils::fetch_oci_bytes(location, &options).await {
Ok(bytes) => {
let _ = tx.send(Progress::new(
location,
Status::Progress {
num: bytes.len(),
total: bytes.len(),
},
));
let _ = tx.send(Progress::new(location, Status::AssetComplete(bytes)));
}
Err(e) => {
let _ = tx.send(Progress::new(location, Status::Error(e.to_string())));
}
}
});
Box::pin(UnboundedReceiverStream::new(rx))
}
}

impl Asset for AssetReference {
Expand All @@ -221,26 +143,8 @@ impl Asset for AssetReference {
*self.baseurl.write() = Some(baseurl);
}

fn fetch_with_progress(&self, options: OciOptions) -> std::pin::Pin<Box<dyn Stream<Item = Progress> + Send + '_>> {
let path = self.path();

debug!(path = ?path, "fetching asset with progress");
match path {
Ok(path) => {
if path.exists() {
debug!(path = %path.display(), "load as file");
self.retrieve_as_file()
} else {
debug!(url = %path.display(), "load as oci");
self.retrieve_as_oci_with_progress(options)
}
}

Err(e) => Box::pin(tokio_stream::once(Progress::new(
self.location(),
Status::Error(e.to_string()),
))),
}
fn fetch_with_progress(&self, _options: OciOptions) -> std::pin::Pin<Box<dyn Stream<Item = Progress> + Send + '_>> {
unimplemented!()
}

fn fetch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"$anchor": "#v1/{{name.value}}",
"enum" : [
{{#join values "," ~}}
"{{name.value}}"
"{{ pascalCase name.value}}"
{{~/join}}
]
},
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub(crate) enum {{name.value}} {
{{#withType .}}
{{#withAnnotation "tagged"}}
#[serde(rename = "{{arguments.0.value.value}}")]
{{else}}
#[serde(rename = "{{pascalCase name.value}}")]
{{/withAnnotation}}
{{/withType}}
{{#ifCond name.value '===' "ComponentOperationExpression"}}
Expand Down
Loading

0 comments on commit 4577461

Please sign in to comment.