Skip to content

Commit

Permalink
rust: add GCS listing and reading
Browse files Browse the repository at this point in the history
Summary:
This patch implements the extent of the Google Cloud Storage protocol
that TensorBoard needs: list objects in a bucket with a given prefix,
and read partial contents of an object. It turns out to be really easy.

For comparison, [TensorFlow also rolls its own GCS client][tf]. Theirs
is more complex because it needs to handle writable files and support
general-purpose caching patterns. By contrast, we have a simple one-pass
read pattern and already assume that files are append-only, so we avoid
both the complexity and pathological interactions like #1225.

For now, this only serves public buckets and objects. Authentication is
also easy (and doesn’t require crypto or anything complicated), but, for
ease of review, we defer it to a future patch.

[tf]: https://github.com/tensorflow/tensorflow/tree/r2.4/tensorflow/core/platform/cloud

Test Plan:
Included a simple client that supports `gsutil ls` and `gsutil cat`. Run
with `RUST_LOG=debug cargo run --release --bin gsutil` and more args:

  - `ls tensorboard-bench-logs` to list all 33K objects in the bucket,
    across 34 pages of list operations (3.3s on my machine);
  - `ls tensorboard-bench-logs --prefix mnist/` to list just a single
    logdir, which should be much faster (0.1 seconds on my machine,
    which includes setting up the keep-alive connection);
  - `cat tensorboard-bench-logs mnist/README --to=11` to print the first
    12 bytes (`Range: bytes=0-11` inclusive) of an object;
  - `cat tensorboard-bench-logs mnist/README --from=9999` to print
    nothing, since the object is shorter than 9999 bytes.

wchargin-branch: rust-gcs-client
wchargin-source: d9e404df57ecf5ee80089b810835a241084ffbc8
  • Loading branch information
wchargin committed Feb 3, 2021
1 parent a890789 commit a65fa71
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 1 deletion.
15 changes: 15 additions & 0 deletions tensorboard/data/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ rust_library(
"data_compat.rs",
"downsample.rs",
"event_file.rs",
"gcs.rs",
"gcs/client.rs",
"logdir.rs",
"masked_crc.rs",
"reservoir.rs",
Expand All @@ -56,6 +58,7 @@ rust_library(
"//third_party/rust:rand",
"//third_party/rust:rand_chacha",
"//third_party/rust:rayon",
"//third_party/rust:reqwest",
"//third_party/rust:serde",
"//third_party/rust:serde_json",
"//third_party/rust:thiserror",
Expand Down Expand Up @@ -87,6 +90,18 @@ rust_binary(
],
)

rust_binary(
name = "gsutil",
srcs = ["gcs/gsutil.rs"],
edition = "2018",
deps = [
":rustboard_core",
"//third_party/rust:clap",
"//third_party/rust:env_logger",
"//third_party/rust:log",
],
)

rust_doc_test(
name = "rustboard_core_doc_test",
dep = ":rustboard_core",
Expand Down
4 changes: 4 additions & 0 deletions tensorboard/data/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ path = "main.rs"
name = "bench"
path = "bench.rs"

[[bin]]
name = "gsutil"
path = "gcs/gsutil.rs"

[lib]
name = "rustboard_core"
path = "lib.rs"
Expand Down
2 changes: 1 addition & 1 deletion tensorboard/data/server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::server::DataProviderHandler;
use data::tensor_board_data_provider_server::TensorBoardDataProviderServer;

#[derive(Clap, Debug)]
#[clap(name = "rustboard", version = "0.3.0-alpha.0")]
#[clap(name = "rustboard", version = crate::VERSION)]
struct Opts {
/// Log directory to load
///
Expand Down
20 changes: 20 additions & 0 deletions tensorboard/data/server/gcs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/* Copyright 2021 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

//! Google Cloud Storage interop.
mod client;

pub use client::Client;
116 changes: 116 additions & 0 deletions tensorboard/data/server/gcs/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/* Copyright 2021 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

//! Client for listing and reading GCS files.
use log::debug;
use reqwest::{blocking::Client as HttpClient, StatusCode, Url};
use std::ops::RangeInclusive;

/// Base URL for direct object reads.
const STORAGE_BASE: &str = "https://storage.googleapis.com";
/// Base URL for JSON API access.
const API_BASE: &str = "https://www.googleapis.com/storage/v1";

/// GCS client.
pub struct Client {
http: HttpClient,
}

impl Client {
/// Creates a new GCS client.
///
/// May fail if constructing the underlying HTTP client fails.
pub fn new() -> reqwest::Result<Self> {
let http = HttpClient::builder()
.user_agent(format!("tensorboard-data-server/{}", crate::VERSION))
.build()?;
Ok(Self { http })
}
}

/// Response from the `/b/<bucket>/o` object listing API.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct ListResponse {
/// Continuation token; only present when there is more data.
next_page_token: Option<String>,
/// List of objects, sorted by name.
items: Vec<ListResponseItem>,
}
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct ListResponseItem {
/// Full GCS object name, possibly including slashes, but not including the bucket.
name: String,
}

impl Client {
/// Lists all objects in a bucket matching the given prefix.
pub fn list(&self, bucket: &str, prefix: &str) -> reqwest::Result<Vec<String>> {
let mut base_url = Url::parse(API_BASE).unwrap();
base_url
.path_segments_mut()
.unwrap()
.extend(&["b", bucket, "o"]);
base_url
.query_pairs_mut()
.append_pair("prefix", prefix)
.append_pair("prettyPrint", "false")
.append_pair("fields", "nextPageToken,items/name");
let mut results = Vec::new();
let mut page_token: Option<String> = None;
for page in 1.. {
let mut url = base_url.clone();
if let Some(t) = page_token {
url.query_pairs_mut().append_pair("pageToken", t.as_str());
}
debug!(
"Listing page {} of bucket {:?} (prefix={:?})",
page, bucket, prefix
);
let res: ListResponse = self.http.get(url).send()?.error_for_status()?.json()?;
results.extend(res.items.into_iter().map(|i| i.name));
if res.next_page_token.is_none() {
break;
}
page_token = res.next_page_token;
}
Ok(results)
}

/// Reads partial content of an object. (To read the whole thing, pass `0..=u64::MAX`.)
///
/// If the `range` is partially past the end of the object, the result may be shorter than
/// expected. If it's entirely past the end, the result is an empty vector.
pub fn read(
&self,
bucket: &str,
object: &str,
range: RangeInclusive<u64>,
) -> reqwest::Result<Vec<u8>> {
let mut url = Url::parse(STORAGE_BASE).unwrap();
url.path_segments_mut().unwrap().extend(&[bucket, object]);
// With "Range: bytes=a-b", if `b >= 2**63` then GCS ignores the range entirely.
let max_max = (1 << 63) - 1;
let range = format!("bytes={}-{}", range.start(), range.end().min(&max_max));
let res = self.http.get(url).header("Range", range).send()?;
if res.status() == StatusCode::RANGE_NOT_SATISFIABLE {
return Ok(Vec::new());
}
let body = res.error_for_status()?.bytes()?;
Ok(body.to_vec())
}
}
88 changes: 88 additions & 0 deletions tensorboard/data/server/gcs/gsutil.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/* Copyright 2021 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

//! CLI for testing GCS integration.
use clap::Clap;
use std::io::Write;

use rustboard_core::gcs;

#[clap(name = "gsutil")]
#[derive(Clap, Debug)]
struct Opts {
#[clap(long, default_value = "info")]
log_level: String,
#[clap(subcommand)]
subcmd: Subcommand,
}

#[derive(Clap, Debug)]
enum Subcommand {
/// List objects in a bucket.
Ls(LsOpts),
/// Print (partial) object contents.
Cat(CatOpts),
}

#[derive(Clap, Debug)]
struct LsOpts {
bucket: String,
#[clap(long, default_value = "", setting(clap::ArgSettings::AllowEmptyValues))]
prefix: String,
}

#[derive(Clap, Debug)]
struct CatOpts {
bucket: String,
object: String,
/// Initial byte offset, inclusive [default: start of object].
#[clap(long)]
from: Option<u64>,
/// Final byte offset, inclusive [default: end of object].
#[clap(long)]
to: Option<u64>,
}

fn main() {
let opts: Opts = Opts::parse();
init_logging(&opts);

let client = gcs::Client::new().unwrap();
match opts.subcmd {
Subcommand::Ls(opts) => {
log::info!("ENTER gcs::Client::list");
let objects = client.list(&opts.bucket, &opts.prefix);
log::info!("LEAVE gcs::Client::list");
for name in objects.unwrap() {
println!("{}", name);
}
}
Subcommand::Cat(opts) => {
log::info!("ENTER gcs::Client::read");
let range = (opts.from.unwrap_or(0))..=(opts.to.unwrap_or(u64::MAX));
let buf = client.read(&opts.bucket, &opts.object, range);
log::info!("LEAVE gcs::Client::read");
std::io::stdout().write_all(&buf.unwrap()).unwrap();
}
}
}

fn init_logging(opts: &Opts) {
use env_logger::{Builder, Env};
Builder::from_env(Env::default().default_filter_or(&opts.log_level))
.format_timestamp_micros()
.init();
}
5 changes: 5 additions & 0 deletions tensorboard/data/server/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ limitations under the License.
#![allow(clippy::needless_update)] // https://github.com/rust-lang/rust-clippy/issues/6323

/// Package version. Keep in sync with `Cargo.toml`. We don't use `env!("CARGO_PKG_VERSION")`
/// because of <https://github.com/bazelbuild/rules_rust/issues/573>.
pub(crate) const VERSION: &str = "0.3.0-alpha.0";

pub mod blob_key;
pub mod cli;
pub mod commit;
pub mod data_compat;
pub mod downsample;
pub mod event_file;
pub mod gcs;
pub mod logdir;
pub mod masked_crc;
pub mod reservoir;
Expand Down

0 comments on commit a65fa71

Please sign in to comment.