export trace data to cloud providers
drmorr0 committed Jun 20, 2024
1 parent 1cb125d commit 73b5c90
Showing 13 changed files with 772 additions and 114 deletions.
587 changes: 535 additions & 52 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -43,6 +43,7 @@ either = "1.12.0"
 futures = "0.3.28"
 json-patch = "1.2.0"
 k8s-openapi = { version = "0.19.0", features = ["v1_27"] }
+object_store = { version = "0.10.1", features = ["aws", "gcp", "azure", "http"] }
 # remove this fork once https://github.com/uutils/parse_datetime/pull/80 is merged and a new version released
 parse_datetime = { git = "https://github.com/drmorr0/parse_datetime", rev = "748d15b" }
 paste = "1.0.14"
16 changes: 16 additions & 0 deletions api/v1/simkube.yml
@@ -28,6 +28,7 @@ paths:
           format: int64
         export_path:
           type: string
+          format: uri
         # TODO - eventually want positive filters too
         filters:
           type: object
@@ -50,3 +51,18 @@
       responses:
         '200':
           description: OK
+          content:
+            application/json:
+              schema:
+                type: object
+                title: export_response
+                properties:
+                  trace_data:
+                    type: string
+                    format: byte
+        '404':
+          description: Storage location not found
+        '500':
+          description: Tracer error
+        '502':
+          description: External storage provider error
54 changes: 28 additions & 26 deletions cli/export.rs
@@ -1,12 +1,11 @@
-use std::fs;
-
-use anyhow::anyhow;
+use anyhow::bail;
 use bytes::Bytes;
+use object_store::PutPayload;
 use reqwest::Url;
 use simkube::prelude::*;
-use simkube::store::storage::{
-    get_scheme,
-    Scheme,
+use simkube::store::external_storage::{
+    object_store_for_scheme,
+    ObjectStoreScheme,
 };
 use simkube::time::duration_to_ts;

@@ -55,12 +54,12 @@ pub struct Args {
         long_help = "location to save exported trace",
         default_value = "file:///tmp/kind-node-data"
     )]
-    pub output: Url,
+    pub output_path: String,
 }

 pub async fn cmd(args: &Args) -> EmptyResult {
     let filters = ExportFilters::new(args.excluded_namespaces.clone(), vec![], true);
-    let req = ExportRequest::new(args.start_time, args.end_time, "".into(), filters);
+    let req = ExportRequest::new(args.start_time, args.end_time, args.output_path.clone(), filters);
     let endpoint = format!("{}/export", args.tracer_address);

     println!("exporting trace data");
@@ -69,25 +68,28 @@ pub async fn cmd(args: &Args) -> EmptyResult {
     println!("making request to {}", endpoint);

     let client = reqwest::Client::new();
-    let res = client.post(endpoint).json(&req).send().await?;
-
-    write_output(&res.bytes().await?, &args.output)
-}
-
-fn write_output(data: &Bytes, output_url: &Url) -> EmptyResult {
-    match get_scheme(output_url)? {
-        Scheme::Local => {
-            let mut fp = output_url
-                .to_file_path()
-                .map_err(|_| anyhow!("could not compute export path: {}", output_url))?;
-            fs::create_dir_all(&fp)?;
-            fp.push("trace");
-            fs::write(&fp, data)?;
-            println!("trace successfully written to {}", fp.to_str().unwrap());
+    match client.post(endpoint).json(&req).send().await? {
+        res if res.status().is_success() => {
+            // If we got trace data back from the request, it means the tracer pod couldn't or
+            // didn't want to write it (e.g., we asked to write to a local file); in the future we
+            // might also try to write the data to the cloud provider storage as a fallback if it
+            // didn't work from the tracer pod, so this will handle that case as well.
+            let data = res.bytes().await?;
+            if !data.is_empty() {
+                write_output(data, &args.output_path).await?;
+            }
+            println!("Trace data exported to {}", args.output_path);
         },
-        Scheme::AmazonS3 => unimplemented!(),
-    }
-
+        res => bail!("Received {} response; could not export trace data:\n\n{}", res.status(), res.text().await?),
+    };
     Ok(())
 }
+
+async fn write_output(data: Bytes, output_path: &str) -> EmptyResult {
+    let url = Url::parse(output_path)?;
+    let (scheme, path) = ObjectStoreScheme::parse(&url)?;
+    let store = object_store_for_scheme(&scheme, path.clone())?;
+    let payload = PutPayload::from_bytes(data);
+    store.put(&path, payload).await?;
+    Ok(())
+}
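For reference, here is a minimal sketch (not part of the commit) of how the reworked write_output can be pointed at different backends: any URL whose scheme ObjectStoreScheme::parse recognizes works, so the same output_path argument covers local and cloud destinations. The bucket and object names below are made up, and the cloud writes assume credentials are already set in the environment.

// Hypothetical calls to write_output from within this module; bucket and
// path names are illustrative only.
async fn demo(data: Bytes) -> EmptyResult {
    // Local filesystem, matching the CLI's default output location:
    write_output(data.clone(), "file:///tmp/kind-node-data/trace").await?;
    // Amazon S3, with AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY in the environment:
    write_output(data.clone(), "s3://my-trace-bucket/traces/trace").await?;
    // Google Cloud Storage, with GOOGLE_SERVICE_ACCOUNT in the environment:
    write_output(data, "gs://my-trace-bucket/traces/trace").await?;
    Ok(())
}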
1 change: 1 addition & 0 deletions cli/main.rs
@@ -22,6 +22,7 @@ struct Options {
 }

 #[derive(Subcommand)]
+#[allow(clippy::large_enum_variant)]
 enum Commands {
     #[command(about = "print SimKube CRDs")]
     Crd,
9 changes: 5 additions & 4 deletions ctrl/objects.rs
@@ -19,7 +19,7 @@ use simkube::metrics::api::prometheus::{
 };
 use simkube::prelude::*;
 use simkube::sim::*;
-use simkube::store::storage;
+use simkube::store::external_storage::ObjectStoreScheme;

 use super::cert_manager::DRIVER_CERT_NAME;
 use super::trace::get_local_trace_volume;
@@ -181,9 +181,10 @@ pub(super) fn build_driver_job(
     ctrl_ns: &str,
 ) -> anyhow::Result<batchv1::Job> {
     let trace_url = Url::parse(&sim.spec.driver.trace_path)?;
-    let (trace_vm, trace_volume, trace_mount_path) = match storage::get_scheme(&trace_url)? {
-        storage::Scheme::AmazonS3 => todo!(),
-        storage::Scheme::Local => get_local_trace_volume(&trace_url)?,
+    let (trace_vm, trace_volume, trace_mount_path) = match ObjectStoreScheme::parse(&trace_url)? {
+        (ObjectStoreScheme::AmazonS3, _) => todo!(),
+        (ObjectStoreScheme::Local, _) => get_local_trace_volume(&trace_url)?,
+        _ => unimplemented!(),
     };
     let (cert_vm, cert_volume, cert_mount_path) = build_certificate_volumes(cert_secret_name);
1 change: 1 addition & 0 deletions images/Dockerfile.sk-driver
@@ -3,6 +3,7 @@ FROM debian:buster-slim
 ARG K8S_VERSION=v1.29.5

 RUN apt-get update && apt-get install -y \
+    ca-certificates \
     curl \
     dumb-init \
     jq \
1 change: 1 addition & 0 deletions images/Dockerfile.sk-tracer
@@ -1,6 +1,7 @@
 FROM debian:buster-slim

 RUN apt-get update && apt-get install -y \
+    ca-certificates \
     dumb-init \
     rust-gdb

100 changes: 100 additions & 0 deletions lib/store/external_storage.rs
@@ -0,0 +1,100 @@
use object_store::path::Path;
use object_store::DynObjectStore;
use reqwest::Url;

use crate::errors::*;

/// We use the [object_store](https://docs.rs/object_store/latest/object_store/index.html) crate to
/// enable reading/writing from the three major cloud providers (AWS, Azure, GCP), as well as
/// to/from a local filesystem or an in-memory store. Supposedly HTTP with WebDAV is supported as
/// well but that is completely untested.
///
/// The reader will load credentials from the environment to communicate with the cloud provider,
/// as follows (other auth mechanisms _may_ work as well but are currently untested):
///
/// ### AWS
///
/// Set the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables, and pass in a
/// URL like `s3://bucket/path/to/resource`.
///
/// ### Azure
///
/// Set the `AZURE_STORAGE_ACCOUNT_NAME` and `AZURE_STORAGE_ACCOUNT_KEY` environment variables, and
/// pass in a URL like `azure://container/path/to/resources` (do not include the storage account
/// name in the URL).
///
/// ### GCP
///
/// Set the `GOOGLE_SERVICE_ACCOUNT` environment variable to the path for your service account JSON
/// file (if you're running inside a container, you'll need that file injected as well). Pass in a
/// URL like `gs://bucket/path/to/resource`.
// This code is copy-pasta'ed from the object_store library because it is currently private
// in that library. This code can all be deleted if/once https://github.com/apache/arrow-rs/pull/5912
// is merged.

#[derive(Debug, Eq, PartialEq)]
pub enum ObjectStoreScheme {
    Local,
    Memory,
    AmazonS3,
    GoogleCloudStorage,
    MicrosoftAzure,
    Http,
}

impl ObjectStoreScheme {
    pub fn parse(url: &Url) -> anyhow::Result<(Self, Path)> {
        let strip_bucket = || Some(url.path().strip_prefix('/')?.split_once('/')?.1);

        let (scheme, path) = match (url.scheme(), url.host_str()) {
            ("file", None) => (Self::Local, url.path()),
            ("memory", None) => (Self::Memory, url.path()),
            ("s3" | "s3a", Some(_)) => (Self::AmazonS3, url.path()),
            ("gs", Some(_)) => (Self::GoogleCloudStorage, url.path()),
            ("az" | "adl" | "azure" | "abfs" | "abfss", Some(_)) => (Self::MicrosoftAzure, url.path()),
            ("http", Some(_)) => (Self::Http, url.path()),
            ("https", Some(host)) => {
                if host.ends_with("dfs.core.windows.net")
                    || host.ends_with("blob.core.windows.net")
                    || host.ends_with("dfs.fabric.microsoft.com")
                    || host.ends_with("blob.fabric.microsoft.com")
                {
                    (Self::MicrosoftAzure, url.path())
                } else if host.ends_with("amazonaws.com") {
                    match host.starts_with("s3") {
                        true => (Self::AmazonS3, strip_bucket().unwrap_or_default()),
                        false => (Self::AmazonS3, url.path()),
                    }
                } else if host.ends_with("r2.cloudflarestorage.com") {
                    (Self::AmazonS3, strip_bucket().unwrap_or_default())
                } else {
                    (Self::Http, url.path())
                }
            },
            _ => bail!("unrecognized url: {url}"),
        };

        Ok((scheme, Path::from_url_path(path)?))
    }
}

// End copy-pasta'ed code

pub fn object_store_for_scheme(scheme: &ObjectStoreScheme, path: Path) -> anyhow::Result<Box<DynObjectStore>> {
    let store: Box<DynObjectStore> = match &scheme {
        ObjectStoreScheme::Local => Box::new(object_store::local::LocalFileSystem::new()),
        ObjectStoreScheme::Memory => Box::new(object_store::memory::InMemory::new()),
        ObjectStoreScheme::AmazonS3 => Box::new(object_store::aws::AmazonS3Builder::from_env().with_url(path).build()?),
        ObjectStoreScheme::MicrosoftAzure => {
            Box::new(object_store::azure::MicrosoftAzureBuilder::from_env().with_url(path).build()?)
        },
        ObjectStoreScheme::GoogleCloudStorage => Box::new(
            object_store::gcp::GoogleCloudStorageBuilder::from_env()
                .with_url(path)
                .build()?,
        ),
        ObjectStoreScheme::Http => Box::new(object_store::http::HttpBuilder::new().with_url(path).build()?),
    };
    Ok(store)
}
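To make the mapping concrete, here is a hedged usage sketch (not part of the commit) that round-trips a payload through the in-memory backend; the same parse-then-build pattern applies to any of the schemes above once credentials are configured. The URL and payload here are made up for illustration.

use bytes::Bytes;
use object_store::PutPayload;
use reqwest::Url;
use simkube::store::external_storage::{
    object_store_for_scheme,
    ObjectStoreScheme,
};

// Round-trip a payload through the in-memory store; `memory:///...` URLs
// parse to ObjectStoreScheme::Memory, so no credentials are needed.
async fn demo() -> anyhow::Result<()> {
    let url = Url::parse("memory:///traces/demo")?;
    let (scheme, path) = ObjectStoreScheme::parse(&url)?;
    assert_eq!(scheme, ObjectStoreScheme::Memory);

    let store = object_store_for_scheme(&scheme, path.clone())?;
    store.put(&path, PutPayload::from_bytes(Bytes::from_static(b"trace bytes"))).await?;

    let roundtripped = store.get(&path).await?.bytes().await?;
    assert_eq!(roundtripped.as_ref(), b"trace bytes");
    Ok(())
}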
2 changes: 1 addition & 1 deletion lib/store/mod.rs
@@ -1,5 +1,5 @@
+pub mod external_storage;
 mod pod_owners_map;
-pub mod storage;
 mod trace_filter;
 mod trace_store;

24 changes: 0 additions & 24 deletions lib/store/storage.rs

This file was deleted.

48 changes: 48 additions & 0 deletions tracer/errors.rs
@@ -0,0 +1,48 @@
use std::sync::{
    MutexGuard,
    PoisonError,
};

use rocket::Responder;
use simkube::store::TraceStore;

#[derive(Responder)]
pub enum ExportResponseError {
    #[response(status = 404)]
    StorageNotFound(String),

    #[response(status = 500)]
    TracerError(String),

    #[response(status = 502)]
    StorageError(String),
}

impl From<anyhow::Error> for ExportResponseError {
    fn from(e: anyhow::Error) -> Self {
        Self::TracerError(format!("SimKube error: {e}"))
    }
}

impl From<PoisonError<MutexGuard<'_, TraceStore>>> for ExportResponseError {
    fn from(e: PoisonError<MutexGuard<'_, TraceStore>>) -> Self {
        Self::TracerError(format!("Mutex was poisoned: {e}"))
    }
}

impl From<url::ParseError> for ExportResponseError {
    fn from(e: url::ParseError) -> Self {
        Self::TracerError(format!("Could not parse URL: {e}"))
    }
}

impl From<object_store::Error> for ExportResponseError {
    fn from(e: object_store::Error) -> Self {
        match e {
            object_store::Error::NotFound { .. } => {
                Self::StorageNotFound(format!("Could not find external storage location: {e}"))
            },
            _ => Self::StorageError(format!("Could not write to object store: {e}")),
        }
    }
}
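These From impls exist so a Rocket handler can use `?` everywhere and have failures surface as the HTTP statuses declared in api/v1/simkube.yml. The commit doesn't show the tracer's handler itself, so the following is only a sketch: the route shape, the request field names, and the TraceStore export call are assumptions, not code from this commit.

use bytes::Bytes;
use object_store::PutPayload;
use rocket::serde::json::Json;
use rocket::State;
use simkube::prelude::*;
use simkube::store::external_storage::{
    object_store_for_scheme,
    ObjectStoreScheme,
};
use simkube::store::TraceStore;

// Hypothetical tracer endpoint: each `?` below relies on a From impl above to
// turn mutex, URL-parse, or object_store failures into 500/404/502 responses.
#[rocket::post("/export", data = "<req>")]
async fn export(
    req: Json<ExportRequest>,
    store: &State<std::sync::Mutex<TraceStore>>,
) -> Result<Vec<u8>, ExportResponseError> {
    // Assumed field names and export signature, for illustration only:
    let data = store.lock()?.export(req.start_ts, req.end_ts, &req.filters)?;
    let url = url::Url::parse(&req.export_path)?;
    let (scheme, path) = ObjectStoreScheme::parse(&url)?;
    let object_store = object_store_for_scheme(&scheme, path.clone())?;
    object_store.put(&path, PutPayload::from_bytes(Bytes::from(data))).await?;
    Ok(Vec::new()) // empty body: the tracer wrote the trace data itself
}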