diff --git a/commons/src/graph.rs b/commons/src/graph.rs index 358bd29..363a602 100644 --- a/commons/src/graph.rs +++ b/commons/src/graph.rs @@ -37,15 +37,35 @@ impl Graph { }, }; let mut has_basearch = false; - for commit in entry.commits { - if commit.architecture != scope.basearch || commit.checksum.is_empty() { - continue; + if scope.oci { + if let Some(oci_images) = entry.oci_images { + for oci_image in oci_images { + if oci_image.architecture != scope.basearch + || oci_image.digest_ref.is_empty() + { + continue; + } + has_basearch = true; + current.payload = oci_image.digest_ref; + current + .metadata + .insert(metadata::SCHEME.to_string(), "oci".to_string()); + } + } else { + // This release doesn't have OCI images, skip it. + return None; + } + } else { + for commit in entry.commits { + if commit.architecture != scope.basearch || commit.checksum.is_empty() { + continue; + } + has_basearch = true; + current.payload = commit.checksum; + current + .metadata + .insert(metadata::SCHEME.to_string(), "checksum".to_string()); } - has_basearch = true; - current.payload = commit.checksum; - current - .metadata - .insert(metadata::SCHEME.to_string(), "checksum".to_string()); } // Not a valid release payload for this graph scope, skip it. @@ -210,4 +230,5 @@ impl Graph { pub struct GraphScope { pub basearch: String, pub stream: String, + pub oci: bool, } diff --git a/commons/src/metadata.rs b/commons/src/metadata.rs index ecd7ec0..d5f436e 100644 --- a/commons/src/metadata.rs +++ b/commons/src/metadata.rs @@ -32,6 +32,8 @@ pub struct ReleasesJSON { #[derive(Clone, Debug, Deserialize)] pub struct Release { pub commits: Vec, + #[serde(rename = "oci-images")] + pub oci_images: Option>, pub version: String, pub metadata: String, } @@ -42,6 +44,14 @@ pub struct ReleaseCommit { pub checksum: String, } +#[derive(Clone, Debug, Deserialize)] +pub struct ReleaseOciImage { + pub architecture: String, + pub image: String, + #[serde(rename = "digest-ref")] + pub digest_ref: String, +} + /// Fedora CoreOS updates metadata #[derive(Clone, Debug, Deserialize)] pub struct UpdatesJSON { diff --git a/commons/src/web.rs b/commons/src/web.rs index e3f0cf3..6661d4f 100644 --- a/commons/src/web.rs +++ b/commons/src/web.rs @@ -26,6 +26,7 @@ pub fn build_cors_middleware(origin_allowlist: &Option>) -> CorsFact pub fn validate_scope( basearch: Option, stream: Option, + oci: Option, scope_allowlist: &Option>, ) -> Result { let basearch = basearch.ok_or_else(|| err_msg("missing basearch"))?; @@ -34,15 +35,22 @@ pub fn validate_scope( let stream = stream.ok_or_else(|| err_msg("missing stream"))?; ensure!(!stream.is_empty(), "empty stream"); - let scope = GraphScope { basearch, stream }; + let oci = oci.unwrap_or_default(); + + let scope = GraphScope { + basearch, + stream, + oci, + }; // Optionally filter out scope according to given allowlist, if any. if let Some(allowlist) = scope_allowlist { if !allowlist.contains(&scope) { bail!( - "scope not allowed: basearch='{}', stream='{}'", + "scope not allowed: basearch='{}', stream='{}', oci='{}'", scope.basearch, - scope.stream + scope.stream, + scope.oci, ); } } @@ -57,26 +65,28 @@ mod tests { #[test] fn test_validate_scope() { { - let r = validate_scope(None, None, &None); + let r = validate_scope(None, None, None, &None); assert!(r.is_err()); } { let basearch = Some("test_empty".to_string()); let stream = Some("".to_string()); - let r = validate_scope(basearch, stream, &None); + let oci = None; + let r = validate_scope(basearch, stream, oci, &None); assert!(r.is_err()); } { let basearch = Some("x86_64".to_string()); let stream = Some("stable".to_string()); - let r = validate_scope(basearch, stream, &None); + let oci = Some(false); + let r = validate_scope(basearch, stream, oci, &None); assert!(r.is_ok()); } { let basearch = Some("x86_64".to_string()); let stream = Some("stable".to_string()); let filter_none_allowed = Some(HashSet::new()); - let r = validate_scope(basearch, stream, &filter_none_allowed); + let r = validate_scope(basearch, stream, None, &filter_none_allowed); assert!(r.is_err()); } { @@ -85,9 +95,10 @@ mod tests { let allowed_scope = GraphScope { basearch: "x86_64".to_string(), stream: "stable".to_string(), + oci: false, }; let filter = Some(maplit::hashset! {allowed_scope}); - let r = validate_scope(basearch, stream, &filter); + let r = validate_scope(basearch, stream, None, &filter); assert!(r.is_ok()); } } diff --git a/fcos-graph-builder/src/main.rs b/fcos-graph-builder/src/main.rs index 8979c3f..c916d75 100644 --- a/fcos-graph-builder/src/main.rs +++ b/fcos-graph-builder/src/main.rs @@ -24,22 +24,22 @@ lazy_static::lazy_static! { static ref CACHED_GRAPH_REQUESTS: IntCounterVec = register_int_counter_vec!( "fcos_cincinnati_gb_cache_graph_requests_total", "Total number of requests for a cached graph", - &["basearch", "stream"] + &["basearch", "stream", "type"] ).unwrap(); static ref GRAPH_FINAL_EDGES: IntGaugeVec = register_int_gauge_vec!( "fcos_cincinnati_gb_scraper_graph_final_edges", "Number of edges in the cached graph, after processing", - &["basearch", "stream"] + &["basearch", "stream", "type"] ).unwrap(); static ref GRAPH_FINAL_RELEASES: IntGaugeVec = register_int_gauge_vec!( "fcos_cincinnati_gb_scraper_graph_final_releases", "Number of releases in the cached graph, after processing", - &["basearch", "stream"] + &["basearch", "stream", "type"] ).unwrap(); static ref LAST_REFRESH: IntGaugeVec = register_int_gauge_vec!( "fcos_cincinnati_gb_scraper_graph_last_refresh_timestamp", "UTC timestamp of last graph refresh", - &["basearch", "stream"] + &["basearch", "stream", "type"] ).unwrap(); static ref UPSTREAM_SCRAPES: IntCounterVec = register_int_counter_vec!( "fcos_cincinnati_gb_scraper_upstream_scrapes_total", @@ -138,23 +138,29 @@ pub(crate) struct AppState { struct GraphQuery { basearch: Option, stream: Option, + oci: Option, } pub(crate) async fn gb_serve_graph( data: web::Data, web::Query(query): web::Query, ) -> Result { - let scope = match commons::web::validate_scope(query.basearch, query.stream, &data.scope_filter) - { + let scope = match commons::web::validate_scope( + query.basearch, + query.stream, + query.oci, + &data.scope_filter, + ) { Err(e) => { log::error!("graph request with invalid scope: {}", e); return Ok(HttpResponse::BadRequest().finish()); } Ok(s) => { log::trace!( - "serving request for valid scope: basearch='{}', stream='{}'", + "serving request for valid scope: basearch='{}', stream='{}', oci='{}'", s.basearch, - s.stream + s.stream, + s.oci, ); s } diff --git a/fcos-graph-builder/src/scraper.rs b/fcos-graph-builder/src/scraper.rs index e223494..2fb7f22 100644 --- a/fcos-graph-builder/src/scraper.rs +++ b/fcos-graph-builder/src/scraper.rs @@ -16,6 +16,8 @@ pub struct Scraper { stream: String, /// arch -> graph graphs: HashMap, + /// arch -> graph + oci_graphs: HashMap, hclient: reqwest::Client, pause_secs: NonZeroU64, release_index_url: reqwest::Url, @@ -30,6 +32,10 @@ impl Scraper { Bytes::from(data) }; let graphs = arches + .iter() + .map(|arch| (arch.clone(), empty.clone())) + .collect(); + let oci_graphs = arches .into_iter() .map(|arch| (arch, empty.clone())) .collect(); @@ -46,6 +52,7 @@ impl Scraper { let scraper = Self { graphs, + oci_graphs, hclient, pause_secs: NonZeroU64::new(30).expect("non-zero pause"), stream, @@ -95,7 +102,9 @@ impl Scraper { /// Combine release-index and updates metadata. fn assemble_graphs( &self, - ) -> impl Future, Error>> { + ) -> impl Future< + Output = Result<(HashMap, HashMap), Error>, + > { let stream_releases = self.fetch_releases(); let stream_updates = self.fetch_updates(); @@ -104,10 +113,11 @@ impl Scraper { let arches: Vec = self.graphs.keys().cloned().collect(); async move { - let mut map = HashMap::with_capacity(arches.len()); let (graph, updates) = futures::future::try_join(stream_releases, stream_updates).await?; - for arch in arches { + // first the legacy graphs + let mut map = HashMap::with_capacity(arches.len()); + for arch in &arches { map.insert( arch.clone(), graph::Graph::from_metadata( @@ -116,38 +126,66 @@ impl Scraper { graph::GraphScope { basearch: arch.clone(), stream: stream.clone(), + oci: false, }, )?, ); } - Ok(map) + // now the OCI graphs + let mut oci_map = HashMap::with_capacity(arches.len()); + for arch in &arches { + oci_map.insert( + arch.clone(), + graph::Graph::from_metadata( + graph.clone(), + updates.clone(), + graph::GraphScope { + basearch: arch.clone(), + stream: stream.clone(), + oci: true, + }, + )?, + ); + } + Ok((map, oci_map)) } } /// Update cached graph. - fn update_cached_graph(&mut self, arch: String, graph: graph::Graph) -> Result<(), Error> { + fn update_cached_graph( + &mut self, + arch: String, + oci: bool, + graph: graph::Graph, + ) -> Result<(), Error> { let data = serde_json::to_vec_pretty(&graph).map_err(|e| failure::format_err!("{}", e))?; + let graph_type = if oci { "oci" } else { "checksum" }; let refresh_timestamp = chrono::Utc::now(); crate::LAST_REFRESH - .with_label_values(&[&arch, &self.stream]) + .with_label_values(&[&arch, &self.stream, graph_type]) .set(refresh_timestamp.timestamp()); crate::GRAPH_FINAL_EDGES - .with_label_values(&[&arch, &self.stream]) + .with_label_values(&[&arch, &self.stream, graph_type]) .set(graph.edges.len() as i64); crate::GRAPH_FINAL_RELEASES - .with_label_values(&[&arch, &self.stream]) + .with_label_values(&[&arch, &self.stream, graph_type]) .set(graph.nodes.len() as i64); log::trace!( - "cached graph for {}/{}: releases={}, edges={}", + "cached graph for {}/{}/oci={}: releases={}, edges={}", &arch, self.stream, + oci, graph.nodes.len(), graph.edges.len() ); - self.graphs.insert(arch, Bytes::from(data)); + if oci { + self.oci_graphs.insert(arch, Bytes::from(data)); + } else { + self.graphs.insert(arch, Bytes::from(data)); + } Ok(()) } } @@ -178,9 +216,11 @@ impl Handler for Scraper { let latest_graphs = self.assemble_graphs(); let update_graphs = actix::fut::wrap_future::<_, Self>(latest_graphs) .map(|graphs, actor, _ctx| { - let res: Result<(), Error> = graphs.and_then(|g| { + let res: Result<(), Error> = graphs.and_then(|(g, oci_g)| { g.into_iter() - .map(|(arch, graph)| actor.update_cached_graph(arch, graph)) + .map(|(arch, graph)| (arch, false, graph)) + .chain(oci_g.into_iter().map(|(arch, graph)| (arch, true, graph))) + .map(|(arch, oci, graph)| actor.update_cached_graph(arch, oci, graph)) .collect() }); if let Err(e) = res { @@ -210,6 +250,7 @@ impl Handler for Scraper { fn handle(&mut self, msg: GetCachedGraph, _ctx: &mut Self::Context) -> Self::Result { use failure::format_err; + let graph_type = if msg.scope.oci { "oci" } else { "checksum" }; if msg.scope.stream != self.stream { return Box::new(actix::fut::err(format_err!( @@ -217,9 +258,14 @@ impl Handler for Scraper { msg.scope.stream ))); } - if let Some(graph) = self.graphs.get(&msg.scope.basearch) { + let target_graphmap = if msg.scope.oci { + &self.oci_graphs + } else { + &self.graphs + }; + if let Some(graph) = target_graphmap.get(&msg.scope.basearch) { crate::CACHED_GRAPH_REQUESTS - .with_label_values(&[&msg.scope.basearch, &msg.scope.stream]) + .with_label_values(&[&msg.scope.basearch, &msg.scope.stream, &graph_type]) .inc(); Box::new(actix::fut::ok(graph.clone())) diff --git a/fcos-policy-engine/src/main.rs b/fcos-policy-engine/src/main.rs index 7861675..5d6a55b 100644 --- a/fcos-policy-engine/src/main.rs +++ b/fcos-policy-engine/src/main.rs @@ -130,6 +130,7 @@ pub struct GraphQuery { stream: Option, rollout_wariness: Option, node_uuid: Option, + oci: Option, } pub(crate) async fn pe_serve_graph( @@ -141,6 +142,7 @@ pub(crate) async fn pe_serve_graph( let scope = match commons::web::validate_scope( query.basearch.clone(), query.stream.clone(), + query.oci, &data.scope_filter, ) { Err(e) => { @@ -160,6 +162,7 @@ pub(crate) async fn pe_serve_graph( data.upstream_endpoint.clone(), scope.stream, scope.basearch, + scope.oci, data.upstream_req_timeout, ) .await?; diff --git a/fcos-policy-engine/src/utils.rs b/fcos-policy-engine/src/utils.rs index b459cbe..bd68760 100644 --- a/fcos-policy-engine/src/utils.rs +++ b/fcos-policy-engine/src/utils.rs @@ -19,6 +19,7 @@ pub(crate) async fn fetch_graph_from_gb( upstream_base: reqwest::Url, stream: String, basearch: String, + oci: bool, req_timeout: Duration, ) -> Result { if stream.trim().is_empty() { @@ -32,6 +33,7 @@ pub(crate) async fn fetch_graph_from_gb( basearch: Some(basearch), rollout_wariness: None, node_uuid: None, + oci: Some(oci), }; // Cannot use `?` directly here otherwise will produce the error: // the trait `std::marker::Sync` is not implemented for `(dyn std::error::Error + std::marker::Send + 'static)`