Skip to content

Commit

Permalink
Add support for querying OCI graph
Browse files Browse the repository at this point in the history
When parsing the release index, check for the new `oci-images` key. If
present, also build up a separate graph with only nodes containing OCI
information. In that case, the node payload is the pullspec and the
scheme declared in the node metadata is `oci`.

When a client requests a graph, check if the `oci=` URL parameter was
set. If so, return back the OCI graph instead of the OSTree one.

Part of coreos/fedora-coreos-tracker#1823.
  • Loading branch information
jlebon committed Nov 7, 2024
1 parent 7c0228c commit e598442
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 38 deletions.
37 changes: 29 additions & 8 deletions commons/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,35 @@ impl Graph {
},
};
let mut has_basearch = false;
for commit in entry.commits {
if commit.architecture != scope.basearch || commit.checksum.is_empty() {
continue;
if scope.oci {
if let Some(oci_images) = entry.oci_images {
for oci_image in oci_images {
if oci_image.architecture != scope.basearch
|| oci_image.digest_ref.is_empty()
{
continue;
}
has_basearch = true;
current.payload = oci_image.digest_ref;
current
.metadata
.insert(metadata::SCHEME.to_string(), "oci".to_string());
}
} else {
// This release doesn't have OCI images, skip it.
return None;
}
} else {
for commit in entry.commits {
if commit.architecture != scope.basearch || commit.checksum.is_empty() {
continue;
}
has_basearch = true;
current.payload = commit.checksum;
current
.metadata
.insert(metadata::SCHEME.to_string(), "checksum".to_string());
}
has_basearch = true;
current.payload = commit.checksum;
current
.metadata
.insert(metadata::SCHEME.to_string(), "checksum".to_string());
}

// Not a valid release payload for this graph scope, skip it.
Expand Down Expand Up @@ -210,4 +230,5 @@ impl Graph {
pub struct GraphScope {
pub basearch: String,
pub stream: String,
pub oci: bool,
}
10 changes: 10 additions & 0 deletions commons/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub struct ReleasesJSON {
#[derive(Clone, Debug, Deserialize)]
pub struct Release {
pub commits: Vec<ReleaseCommit>,
#[serde(rename = "oci-images")]
pub oci_images: Option<Vec<ReleaseOciImage>>,
pub version: String,
pub metadata: String,
}
Expand All @@ -42,6 +44,14 @@ pub struct ReleaseCommit {
pub checksum: String,
}

#[derive(Clone, Debug, Deserialize)]
pub struct ReleaseOciImage {
pub architecture: String,
pub image: String,
#[serde(rename = "digest-ref")]
pub digest_ref: String,
}

/// Fedora CoreOS updates metadata
#[derive(Clone, Debug, Deserialize)]
pub struct UpdatesJSON {
Expand Down
27 changes: 19 additions & 8 deletions commons/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub fn build_cors_middleware(origin_allowlist: &Option<Vec<String>>) -> CorsFact
pub fn validate_scope(
basearch: Option<String>,
stream: Option<String>,
oci: Option<bool>,
scope_allowlist: &Option<HashSet<GraphScope>>,
) -> Result<GraphScope, failure::Error> {
let basearch = basearch.ok_or_else(|| err_msg("missing basearch"))?;
Expand All @@ -34,15 +35,22 @@ pub fn validate_scope(
let stream = stream.ok_or_else(|| err_msg("missing stream"))?;
ensure!(!stream.is_empty(), "empty stream");

let scope = GraphScope { basearch, stream };
let oci = oci.unwrap_or_default();

let scope = GraphScope {
basearch,
stream,
oci,
};

// Optionally filter out scope according to given allowlist, if any.
if let Some(allowlist) = scope_allowlist {
if !allowlist.contains(&scope) {
bail!(
"scope not allowed: basearch='{}', stream='{}'",
"scope not allowed: basearch='{}', stream='{}', oci='{}'",
scope.basearch,
scope.stream
scope.stream,
scope.oci,
);
}
}
Expand All @@ -57,26 +65,28 @@ mod tests {
#[test]
fn test_validate_scope() {
{
let r = validate_scope(None, None, &None);
let r = validate_scope(None, None, None, &None);
assert!(r.is_err());
}
{
let basearch = Some("test_empty".to_string());
let stream = Some("".to_string());
let r = validate_scope(basearch, stream, &None);
let oci = None;
let r = validate_scope(basearch, stream, oci, &None);
assert!(r.is_err());
}
{
let basearch = Some("x86_64".to_string());
let stream = Some("stable".to_string());
let r = validate_scope(basearch, stream, &None);
let oci = Some(false);
let r = validate_scope(basearch, stream, oci, &None);
assert!(r.is_ok());
}
{
let basearch = Some("x86_64".to_string());
let stream = Some("stable".to_string());
let filter_none_allowed = Some(HashSet::new());
let r = validate_scope(basearch, stream, &filter_none_allowed);
let r = validate_scope(basearch, stream, None, &filter_none_allowed);
assert!(r.is_err());
}
{
Expand All @@ -85,9 +95,10 @@ mod tests {
let allowed_scope = GraphScope {
basearch: "x86_64".to_string(),
stream: "stable".to_string(),
oci: false,
};
let filter = Some(maplit::hashset! {allowed_scope});
let r = validate_scope(basearch, stream, &filter);
let r = validate_scope(basearch, stream, None, &filter);
assert!(r.is_ok());
}
}
Expand Down
22 changes: 14 additions & 8 deletions fcos-graph-builder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@ lazy_static::lazy_static! {
static ref CACHED_GRAPH_REQUESTS: IntCounterVec = register_int_counter_vec!(
"fcos_cincinnati_gb_cache_graph_requests_total",
"Total number of requests for a cached graph",
&["basearch", "stream"]
&["basearch", "stream", "type"]
).unwrap();
static ref GRAPH_FINAL_EDGES: IntGaugeVec = register_int_gauge_vec!(
"fcos_cincinnati_gb_scraper_graph_final_edges",
"Number of edges in the cached graph, after processing",
&["basearch", "stream"]
&["basearch", "stream", "type"]
).unwrap();
static ref GRAPH_FINAL_RELEASES: IntGaugeVec = register_int_gauge_vec!(
"fcos_cincinnati_gb_scraper_graph_final_releases",
"Number of releases in the cached graph, after processing",
&["basearch", "stream"]
&["basearch", "stream", "type"]
).unwrap();
static ref LAST_REFRESH: IntGaugeVec = register_int_gauge_vec!(
"fcos_cincinnati_gb_scraper_graph_last_refresh_timestamp",
"UTC timestamp of last graph refresh",
&["basearch", "stream"]
&["basearch", "stream", "type"]
).unwrap();
static ref UPSTREAM_SCRAPES: IntCounterVec = register_int_counter_vec!(
"fcos_cincinnati_gb_scraper_upstream_scrapes_total",
Expand Down Expand Up @@ -138,23 +138,29 @@ pub(crate) struct AppState {
struct GraphQuery {
basearch: Option<String>,
stream: Option<String>,
oci: Option<bool>,
}

pub(crate) async fn gb_serve_graph(
data: web::Data<AppState>,
web::Query(query): web::Query<GraphQuery>,
) -> Result<HttpResponse, failure::Error> {
let scope = match commons::web::validate_scope(query.basearch, query.stream, &data.scope_filter)
{
let scope = match commons::web::validate_scope(
query.basearch,
query.stream,
query.oci,
&data.scope_filter,
) {
Err(e) => {
log::error!("graph request with invalid scope: {}", e);
return Ok(HttpResponse::BadRequest().finish());
}
Ok(s) => {
log::trace!(
"serving request for valid scope: basearch='{}', stream='{}'",
"serving request for valid scope: basearch='{}', stream='{}', oci='{}'",
s.basearch,
s.stream
s.stream,
s.oci,
);
s
}
Expand Down
74 changes: 60 additions & 14 deletions fcos-graph-builder/src/scraper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub struct Scraper {
stream: String,
/// arch -> graph
graphs: HashMap<String, Bytes>,
/// arch -> graph
oci_graphs: HashMap<String, Bytes>,
hclient: reqwest::Client,
pause_secs: NonZeroU64,
release_index_url: reqwest::Url,
Expand All @@ -30,6 +32,10 @@ impl Scraper {
Bytes::from(data)
};
let graphs = arches
.iter()
.map(|arch| (arch.clone(), empty.clone()))
.collect();
let oci_graphs = arches
.into_iter()
.map(|arch| (arch, empty.clone()))
.collect();
Expand All @@ -46,6 +52,7 @@ impl Scraper {

let scraper = Self {
graphs,
oci_graphs,
hclient,
pause_secs: NonZeroU64::new(30).expect("non-zero pause"),
stream,
Expand Down Expand Up @@ -95,7 +102,9 @@ impl Scraper {
/// Combine release-index and updates metadata.
fn assemble_graphs(
&self,
) -> impl Future<Output = Result<HashMap<String, graph::Graph>, Error>> {
) -> impl Future<
Output = Result<(HashMap<String, graph::Graph>, HashMap<String, graph::Graph>), Error>,
> {
let stream_releases = self.fetch_releases();
let stream_updates = self.fetch_updates();

Expand All @@ -104,10 +113,11 @@ impl Scraper {
let arches: Vec<String> = self.graphs.keys().cloned().collect();

async move {
let mut map = HashMap::with_capacity(arches.len());
let (graph, updates) =
futures::future::try_join(stream_releases, stream_updates).await?;
for arch in arches {
// first the legacy graphs
let mut map = HashMap::with_capacity(arches.len());
for arch in &arches {
map.insert(
arch.clone(),
graph::Graph::from_metadata(
Expand All @@ -116,38 +126,66 @@ impl Scraper {
graph::GraphScope {
basearch: arch.clone(),
stream: stream.clone(),
oci: false,
},
)?,
);
}
Ok(map)
// now the OCI graphs
let mut oci_map = HashMap::with_capacity(arches.len());
for arch in &arches {
oci_map.insert(
arch.clone(),
graph::Graph::from_metadata(
graph.clone(),
updates.clone(),
graph::GraphScope {
basearch: arch.clone(),
stream: stream.clone(),
oci: true,
},
)?,
);
}
Ok((map, oci_map))
}
}

/// Update cached graph.
fn update_cached_graph(&mut self, arch: String, graph: graph::Graph) -> Result<(), Error> {
fn update_cached_graph(
&mut self,
arch: String,
oci: bool,
graph: graph::Graph,
) -> Result<(), Error> {
let data = serde_json::to_vec_pretty(&graph).map_err(|e| failure::format_err!("{}", e))?;
let graph_type = if oci { "oci" } else { "checksum" };

let refresh_timestamp = chrono::Utc::now();
crate::LAST_REFRESH
.with_label_values(&[&arch, &self.stream])
.with_label_values(&[&arch, &self.stream, graph_type])
.set(refresh_timestamp.timestamp());
crate::GRAPH_FINAL_EDGES
.with_label_values(&[&arch, &self.stream])
.with_label_values(&[&arch, &self.stream, graph_type])
.set(graph.edges.len() as i64);
crate::GRAPH_FINAL_RELEASES
.with_label_values(&[&arch, &self.stream])
.with_label_values(&[&arch, &self.stream, graph_type])
.set(graph.nodes.len() as i64);

log::trace!(
"cached graph for {}/{}: releases={}, edges={}",
"cached graph for {}/{}/oci={}: releases={}, edges={}",
&arch,
self.stream,
oci,
graph.nodes.len(),
graph.edges.len()
);

self.graphs.insert(arch, Bytes::from(data));
if oci {
self.oci_graphs.insert(arch, Bytes::from(data));
} else {
self.graphs.insert(arch, Bytes::from(data));
}
Ok(())
}
}
Expand Down Expand Up @@ -178,9 +216,11 @@ impl Handler<RefreshTick> for Scraper {
let latest_graphs = self.assemble_graphs();
let update_graphs = actix::fut::wrap_future::<_, Self>(latest_graphs)
.map(|graphs, actor, _ctx| {
let res: Result<(), Error> = graphs.and_then(|g| {
let res: Result<(), Error> = graphs.and_then(|(g, oci_g)| {
g.into_iter()
.map(|(arch, graph)| actor.update_cached_graph(arch, graph))
.map(|(arch, graph)| (arch, false, graph))
.chain(oci_g.into_iter().map(|(arch, graph)| (arch, true, graph)))
.map(|(arch, oci, graph)| actor.update_cached_graph(arch, oci, graph))
.collect()
});
if let Err(e) = res {
Expand Down Expand Up @@ -210,16 +250,22 @@ impl Handler<GetCachedGraph> for Scraper {

fn handle(&mut self, msg: GetCachedGraph, _ctx: &mut Self::Context) -> Self::Result {
use failure::format_err;
let graph_type = if msg.scope.oci { "oci" } else { "checksum" };

if msg.scope.stream != self.stream {
return Box::new(actix::fut::err(format_err!(
"unexpected stream '{}'",
msg.scope.stream
)));
}
if let Some(graph) = self.graphs.get(&msg.scope.basearch) {
let target_graphmap = if msg.scope.oci {
&self.oci_graphs
} else {
&self.graphs
};
if let Some(graph) = target_graphmap.get(&msg.scope.basearch) {
crate::CACHED_GRAPH_REQUESTS
.with_label_values(&[&msg.scope.basearch, &msg.scope.stream])
.with_label_values(&[&msg.scope.basearch, &msg.scope.stream, &graph_type])
.inc();

Box::new(actix::fut::ok(graph.clone()))
Expand Down
Loading

0 comments on commit e598442

Please sign in to comment.