diff --git a/tensorboard/data/server/BUILD b/tensorboard/data/server/BUILD index c626daba0f..4e578e4009 100644 --- a/tensorboard/data/server/BUILD +++ b/tensorboard/data/server/BUILD @@ -33,6 +33,8 @@ rust_library( "disk_logdir.rs", "downsample.rs", "event_file.rs", + "gcs.rs", + "gcs/client.rs", "logdir.rs", "masked_crc.rs", "reservoir.rs", @@ -57,6 +59,7 @@ rust_library( "//third_party/rust:rand", "//third_party/rust:rand_chacha", "//third_party/rust:rayon", + "//third_party/rust:reqwest", "//third_party/rust:serde", "//third_party/rust:serde_json", "//third_party/rust:thiserror", @@ -88,6 +91,18 @@ rust_binary( ], ) +rust_binary( + name = "gsutil", + srcs = ["gcs/gsutil.rs"], + edition = "2018", + deps = [ + ":rustboard_core", + "//third_party/rust:clap", + "//third_party/rust:env_logger", + "//third_party/rust:log", + ], +) + rust_doc_test( name = "rustboard_core_doc_test", dep = ":rustboard_core", diff --git a/tensorboard/data/server/Cargo.toml b/tensorboard/data/server/Cargo.toml index 21fc6cfe38..25bd5e60e8 100644 --- a/tensorboard/data/server/Cargo.toml +++ b/tensorboard/data/server/Cargo.toml @@ -59,6 +59,10 @@ path = "main.rs" name = "bench" path = "bench.rs" +[[bin]] +name = "gsutil" +path = "gcs/gsutil.rs" + [lib] name = "rustboard_core" path = "lib.rs" diff --git a/tensorboard/data/server/cli.rs b/tensorboard/data/server/cli.rs index fd988d9cf3..58870cb09c 100644 --- a/tensorboard/data/server/cli.rs +++ b/tensorboard/data/server/cli.rs @@ -37,7 +37,7 @@ use crate::server::DataProviderHandler; use data::tensor_board_data_provider_server::TensorBoardDataProviderServer; #[derive(Clap, Debug)] -#[clap(name = "rustboard", version = "0.3.0-alpha.0")] +#[clap(name = "rustboard", version = crate::VERSION)] struct Opts { /// Log directory to load /// diff --git a/tensorboard/data/server/gcs.rs b/tensorboard/data/server/gcs.rs new file mode 100644 index 0000000000..bbf6727211 --- /dev/null +++ b/tensorboard/data/server/gcs.rs @@ -0,0 +1,20 @@ +/* Copyright 2021 
The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +//! Google Cloud Storage interop. + +mod client; + +pub use client::Client; diff --git a/tensorboard/data/server/gcs/client.rs b/tensorboard/data/server/gcs/client.rs new file mode 100644 index 0000000000..7c567df7fb --- /dev/null +++ b/tensorboard/data/server/gcs/client.rs @@ -0,0 +1,117 @@ +/* Copyright 2021 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +//! Client for listing and reading GCS files. + +use log::debug; +use reqwest::{blocking::Client as HttpClient, StatusCode, Url}; +use std::ops::RangeInclusive; + +/// Base URL for direct object reads. +const STORAGE_BASE: &str = "https://storage.googleapis.com"; +/// Base URL for JSON API access. 
+const API_BASE: &str = "https://www.googleapis.com/storage/v1"; + +/// GCS client. +pub struct Client { + http: HttpClient, +} + +impl Client { + /// Creates a new GCS client. + /// + /// May fail if constructing the underlying HTTP client fails. + pub fn new() -> reqwest::Result<Self> { + let http = HttpClient::builder() + .user_agent(format!("tensorboard-data-server/{}", crate::VERSION)) + .build()?; + Ok(Self { http }) + } +} + +/// Response from the `/b/<bucket>/o` object listing API. +#[derive(serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct ListResponse { + /// Continuation token; only present when there is more data. + next_page_token: Option<String>, + /// List of objects, sorted by name. + #[serde(default)] // `items` omitted entirely when there are no results + items: Vec<ListResponseItem>, +} +#[derive(serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct ListResponseItem { + /// Full GCS object name, possibly including slashes, but not including the bucket. + name: String, +} + +impl Client { + /// Lists all objects in a bucket matching the given prefix. + pub fn list(&self, bucket: &str, prefix: &str) -> reqwest::Result<Vec<String>> { + let mut base_url = Url::parse(API_BASE).unwrap(); + base_url + .path_segments_mut() + .unwrap() + .extend(&["b", bucket, "o"]); + base_url + .query_pairs_mut() + .append_pair("prefix", prefix) + .append_pair("prettyPrint", "false") + .append_pair("fields", "nextPageToken,items/name"); + let mut results = Vec::new(); + let mut page_token: Option<String> = None; + for page in 1.. 
{ + let mut url = base_url.clone(); + if let Some(t) = page_token { + url.query_pairs_mut().append_pair("pageToken", t.as_str()); + } + debug!( + "Listing page {} of bucket {:?} (prefix={:?})", + page, bucket, prefix + ); + let res: ListResponse = self.http.get(url).send()?.error_for_status()?.json()?; + results.extend(res.items.into_iter().map(|i| i.name)); + if res.next_page_token.is_none() { + break; + } + page_token = res.next_page_token; + } + Ok(results) + } + + /// Reads partial content of an object. (To read the whole thing, pass `0..=u64::MAX`.) + /// + /// If the `range` is partially past the end of the object, the result may be shorter than + /// expected. If it's entirely past the end, the result is an empty vector. + pub fn read( + &self, + bucket: &str, + object: &str, + range: RangeInclusive<u64>, + ) -> reqwest::Result<Vec<u8>> { + let mut url = Url::parse(STORAGE_BASE).unwrap(); + url.path_segments_mut().unwrap().extend(&[bucket, object]); + // With "Range: bytes=a-b", if `b >= 2**63` then GCS ignores the range entirely. + let max_max = (1 << 63) - 1; + let range = format!("bytes={}-{}", range.start(), range.end().min(&max_max)); + let res = self.http.get(url).header("Range", range).send()?; + if res.status() == StatusCode::RANGE_NOT_SATISFIABLE { + return Ok(Vec::new()); + } + let body = res.error_for_status()?.bytes()?; + Ok(body.to_vec()) + } +} diff --git a/tensorboard/data/server/gcs/gsutil.rs b/tensorboard/data/server/gcs/gsutil.rs new file mode 100644 index 0000000000..41bede5c4a --- /dev/null +++ b/tensorboard/data/server/gcs/gsutil.rs @@ -0,0 +1,88 @@ +/* Copyright 2021 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +//! CLI for testing GCS integration. + +use clap::Clap; +use std::io::Write; + +use rustboard_core::gcs; + +#[clap(name = "gsutil")] +#[derive(Clap, Debug)] +struct Opts { + #[clap(long, default_value = "info")] + log_level: String, + #[clap(subcommand)] + subcmd: Subcommand, +} + +#[derive(Clap, Debug)] +enum Subcommand { + /// List objects in a bucket. + Ls(LsOpts), + /// Print (partial) object contents. + Cat(CatOpts), +} + +#[derive(Clap, Debug)] +struct LsOpts { + bucket: String, + #[clap(long, default_value = "", setting(clap::ArgSettings::AllowEmptyValues))] + prefix: String, +} + +#[derive(Clap, Debug)] +struct CatOpts { + bucket: String, + object: String, + /// Initial byte offset, inclusive [default: start of object]. + #[clap(long)] + from: Option<u64>, + /// Final byte offset, inclusive [default: end of object]. 
+ #[clap(long)] + to: Option<u64>, +} + +fn main() { + let opts: Opts = Opts::parse(); + init_logging(&opts); + + let client = gcs::Client::new().unwrap(); + match opts.subcmd { + Subcommand::Ls(opts) => { + log::info!("ENTER gcs::Client::list"); + let objects = client.list(&opts.bucket, &opts.prefix); + log::info!("LEAVE gcs::Client::list"); + for name in objects.unwrap() { + println!("{}", name); + } + } + Subcommand::Cat(opts) => { + log::info!("ENTER gcs::Client::read"); + let range = (opts.from.unwrap_or(0))..=(opts.to.unwrap_or(u64::MAX)); + let buf = client.read(&opts.bucket, &opts.object, range); + log::info!("LEAVE gcs::Client::read"); + std::io::stdout().write_all(&buf.unwrap()).unwrap(); + } + } +} + +fn init_logging(opts: &Opts) { + use env_logger::{Builder, Env}; + Builder::from_env(Env::default().default_filter_or(&opts.log_level)) + .format_timestamp_micros() + .init(); +} diff --git a/tensorboard/data/server/lib.rs b/tensorboard/data/server/lib.rs index 5defae22e9..745c18feed 100644 --- a/tensorboard/data/server/lib.rs +++ b/tensorboard/data/server/lib.rs @@ -17,6 +17,10 @@ limitations under the License. #![allow(clippy::needless_update)] // https://github.com/rust-lang/rust-clippy/issues/6323 +/// Package version. Keep in sync with `Cargo.toml`. We don't use `env!("CARGO_PKG_VERSION")` +/// because of . +pub(crate) const VERSION: &str = "0.3.0-alpha.0"; + pub mod blob_key; pub mod cli; pub mod commit; @@ -24,6 +28,7 @@ pub mod data_compat; pub mod disk_logdir; pub mod downsample; pub mod event_file; +pub mod gcs; pub mod logdir; pub mod masked_crc; pub mod reservoir;