From 532d5fb403bf7ad4ab8292a54b6f7210737cfa94 Mon Sep 17 00:00:00 2001 From: sadhansood Date: Mon, 16 Oct 2023 15:37:54 -0600 Subject: [PATCH] Download objects from remote store without authentication --- Cargo.lock | 1 + crates/sui-storage/Cargo.toml | 1 + .../src/object_store/downloader.rs | 147 ----------------- .../src/object_store/downloader/gcs.rs | 70 +++++++++ .../src/object_store/downloader/local.rs | 44 ++++++ .../src/object_store/downloader/mod.rs | 148 ++++++++++++++++++ .../src/object_store/downloader/s3.rs | 76 +++++++++ crates/sui-storage/src/object_store/mod.rs | 2 +- 8 files changed, 341 insertions(+), 148 deletions(-) delete mode 100644 crates/sui-storage/src/object_store/downloader.rs create mode 100644 crates/sui-storage/src/object_store/downloader/gcs.rs create mode 100644 crates/sui-storage/src/object_store/downloader/local.rs create mode 100644 crates/sui-storage/src/object_store/downloader/mod.rs create mode 100644 crates/sui-storage/src/object_store/downloader/s3.rs diff --git a/Cargo.lock b/Cargo.lock index bf436f311a796e..938de32ed9006c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11793,6 +11793,7 @@ dependencies = [ "bcs", "byteorder", "bytes", + "chrono", "clap", "criterion", "eyre", diff --git a/crates/sui-storage/Cargo.toml b/crates/sui-storage/Cargo.toml index 6ee0f1f2ccadf1..6b6f3814addbd6 100644 --- a/crates/sui-storage/Cargo.toml +++ b/crates/sui-storage/Cargo.toml @@ -21,6 +21,7 @@ tempfile.workspace = true tap.workspace = true reqwest = "0.11.20" percent-encoding = "2.3.0" +chrono.workspace = true object_store.workspace = true backoff.workspace = true bytes.workspace = true diff --git a/crates/sui-storage/src/object_store/downloader.rs b/crates/sui-storage/src/object_store/downloader.rs deleted file mode 100644 index fcb41dc0d09213..00000000000000 --- a/crates/sui-storage/src/object_store/downloader.rs +++ /dev/null @@ -1,147 +0,0 @@ -use std::sync::Arc; -use bytes::Bytes; -use object_store::path::Path; -use anyhow::{anyhow, Context, Result}; -use futures::{FutureExt, TryStreamExt}; -use futures::stream::BoxStream; -use hyper::Method; -use object_store::{Error, GetResult, GetResultPayload, ObjectMeta, RetryConfig}; -use reqwest::Client; -use percent_encoding::{NON_ALPHANUMERIC, PercentEncode, utf8_percent_encode}; -use reqwest::ClientBuilder; -use reqwest::header::{CONTENT_LENGTH, ETAG, HeaderMap, LAST_MODIFIED}; - -// http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html -// -// Do not URI-encode any of the unreserved characters that RFC 3986 defines: -// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ ). -pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = - percent_encoding::NON_ALPHANUMERIC - .remove(b'-') - .remove(b'.') - .remove(b'_') - .remove(b'~'); -const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/'); -static DEFAULT_USER_AGENT: &str = - concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); - -pub trait ObjectDownloader { - async fn get(&self, location: &Path) -> Result; -} - -#[derive(Debug)] -pub(crate) struct S3Client { - bucket: String, - transfer_accelerated: bool, - client: Client, -} - -impl S3Client { - pub fn new(bucket: &str, transfer_accelerated: bool) -> Result { - let mut builder = ClientBuilder::new(); - builder = builder.user_agent(DEFAULT_USER_AGENT); - let client = builder.https_only(false).build()?; - Ok(Self { - bucket: bucket.to_string(), - transfer_accelerated, - client - }) - } - async fn get(&self, location: &Path) -> Result { - let url = self.path_url(location); - let builder = self.client.request(method, url); - let response = builder.send_retry(&RetryConfig::default()).await - .context("Failed to get file at path:")?; - let meta = - Self::header_meta(location, response.headers()).map_err(|e| Error::Generic { - store: "S3", - source: Box::new(e), - })?; - let stream = response - .bytes_stream() - .map_err(|source| anyhow!("Error getting file: {}", source.to_string())) - .boxed(); - Ok(GetResult { - range: 0..meta.size, - payload: GetResultPayload::Stream(stream), - meta, - }) - } - fn path_url(&self, path: &Path) -> String { - format!("{}/{}", self.bucket, encode_path(path)) - } - fn encode_path(path: &Path) -> PercentEncode<'_> { - utf8_percent_encode(path.as_ref(), &STRICT_PATH_ENCODE_SET) - } - pub fn header_meta(location: &Path, headers: &HeaderMap) -> Result { - let last_modified = headers - .get(LAST_MODIFIED) - .context(MissingLastModifiedSnafu)?; - - let content_length = headers - .get(CONTENT_LENGTH) - .context(MissingContentLengthSnafu)?; - - let last_modified = last_modified.to_str().context("bad header")?; - let last_modified = DateTime::parse_from_rfc2822(last_modified) - .context("invalid last modified")? - .with_timezone(&Utc); - - let content_length = content_length.to_str().context(BadHeaderSnafu)?; - let content_length = content_length - .parse() - .context(InvalidContentLengthSnafu { content_length })?; - - let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?; - let e_tag = e_tag.to_str().context(BadHeaderSnafu)?; - - Ok(ObjectMeta { - location: location.clone(), - last_modified, - size: content_length, - e_tag: Some(e_tag.to_string()), - }) - } -} - -/// Interface for [Amazon S3](https://aws.amazon.com/s3/). -#[derive(Debug)] -pub struct AmazonS3 { - client: Arc, -} - -#[derive(Debug)] -struct GoogleCloudStorageClient { - client: Client, - base_url: String, - bucket_name: String, - bucket_name_encoded: String, -} - -/// Interface for [Google Cloud Storage](https://cloud.google.com/storage/). -#[derive(Debug)] -pub struct GoogleCloudStorage { - client: Arc, -} - -impl GoogleCloudStorageClient { - async fn get(&self, location: &Path) -> Result { - let url = self.object_url(path); - let mut request = self.client.request(Method::GET, url); - - let response = - request - .send_retry(&self.retry_config) - .await - .context(GetRequestSnafu { - path: path.as_ref(), - })?; - - Ok(response) - - } - - fn object_url(&self, path: &Path) -> String { - let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC); - format!("{}/{}/{}", self.base_url, self.bucket_name_encoded, encoded) - } \ No newline at end of file diff --git a/crates/sui-storage/src/object_store/downloader/gcs.rs b/crates/sui-storage/src/object_store/downloader/gcs.rs new file mode 100644 index 00000000000000..9890e41570bb0b --- /dev/null +++ b/crates/sui-storage/src/object_store/downloader/gcs.rs @@ -0,0 +1,70 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::object_store::downloader::{get, Downloader, DEFAULT_USER_AGENT}; +use anyhow::Result; +use async_trait::async_trait; +use bytes::Bytes; +use object_store::path::Path; +use object_store::GetResult; +use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC}; +use reqwest::Client; +use reqwest::ClientBuilder; +use std::sync::Arc; + +#[derive(Debug)] +struct GoogleCloudStorageClient { + client: Client, + bucket_name_encoded: String, +} + +impl GoogleCloudStorageClient { + pub fn new(bucket: &str) -> Result { + let mut builder = ClientBuilder::new(); + builder = builder.user_agent(DEFAULT_USER_AGENT); + let client = builder.https_only(false).build()?; + let bucket_name_encoded = percent_encode(bucket.as_bytes(), NON_ALPHANUMERIC).to_string(); + + Ok(Self { + client, + bucket_name_encoded, + }) + } + + async fn get(&self, path: &Path) -> Result { + let url = self.object_url(path); + get(&url, "gcs", path, &self.client).await + } + + fn object_url(&self, path: &Path) -> String { + let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC); + format!( + "https://storage.googleapis.com/{}/{}", + self.bucket_name_encoded, encoded + ) + } +} + +/// Interface for [Google Cloud Storage](https://cloud.google.com/storage/). +#[derive(Debug)] +pub struct GoogleCloudStorage { + client: Arc, +} + +impl GoogleCloudStorage { + pub fn new(bucket: &str) -> Result { + let gcs_client = GoogleCloudStorageClient::new(bucket)?; + Ok(GoogleCloudStorage { + client: Arc::new(gcs_client), + }) + } +} + +#[async_trait] +impl Downloader for GoogleCloudStorage { + async fn get(&self, location: &Path) -> Result { + let result = self.client.get(location).await?; + let bytes = result.bytes().await?; + Ok(bytes) + } +} diff --git a/crates/sui-storage/src/object_store/downloader/local.rs b/crates/sui-storage/src/object_store/downloader/local.rs new file mode 100644 index 00000000000000..ae3fc173d7be18 --- /dev/null +++ b/crates/sui-storage/src/object_store/downloader/local.rs @@ -0,0 +1,44 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::object_store::downloader::Downloader; +use crate::object_store::util::path_to_filesystem; +use anyhow::{anyhow, Context, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use object_store::path::Path; +use std::fs; +use std::fs::File; +use std::io::Read; +use std::path::PathBuf; + +pub struct LocalStorage { + root: PathBuf, +} + +impl LocalStorage { + pub fn new(directory: &std::path::Path) -> Result { + let path = fs::canonicalize(directory).context(anyhow!("Unable to canonicalize"))?; + fs::create_dir_all(&path).context(anyhow!( + "Failed to create local directory: {}", + path.display() + ))?; + Ok(LocalStorage { root: path }) + } +} + +#[async_trait] +impl Downloader for LocalStorage { + async fn get(&self, location: &Path) -> Result { + let path_to_filesystem = path_to_filesystem(self.root.clone(), location)?; + let handle = tokio::task::spawn_blocking(move || { + let mut f = File::open(path_to_filesystem) + .map_err(|e| anyhow!("Failed to open file with error: {}", e.to_string()))?; + let mut buf = vec![]; + f.read_to_end(&mut buf) + .context(anyhow!("Failed to read file"))?; + Ok(buf.into()) + }); + handle.await? + } +} diff --git a/crates/sui-storage/src/object_store/downloader/mod.rs b/crates/sui-storage/src/object_store/downloader/mod.rs new file mode 100644 index 00000000000000..3f63b8f3b2d427 --- /dev/null +++ b/crates/sui-storage/src/object_store/downloader/mod.rs @@ -0,0 +1,148 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +mod gcs; +mod local; +mod s3; + +use async_trait::async_trait; +use std::sync::Arc; + +use crate::object_store::downloader::gcs::GoogleCloudStorage; +use crate::object_store::downloader::local::LocalStorage; +use crate::object_store::downloader::s3::AmazonS3; +use crate::object_store::{ObjectStoreConfig, ObjectStoreType}; +use anyhow::{anyhow, Context, Result}; +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use futures::{StreamExt, TryStreamExt}; +use object_store::path::Path; +use object_store::{Error, GetResult, GetResultPayload, ObjectMeta}; +use reqwest::header::{HeaderMap, CONTENT_LENGTH, ETAG, LAST_MODIFIED}; +use reqwest::{Client, Method}; + +// http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html +// +// Do not URI-encode any of the unreserved characters that RFC 3986 defines: +// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ ). +pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC + .remove(b'-') + .remove(b'.') + .remove(b'_') + .remove(b'~'); +const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/'); +static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); + +#[async_trait] +pub trait Downloader { + async fn get(&self, location: &Path) -> Result; +} + +pub trait DownloaderBuilder { + fn make_downloader(self, use_transfer_acceleration: bool) -> Result>; +} + +impl DownloaderBuilder for ObjectStoreConfig { + fn make_downloader(self, use_transfer_acceleration: bool) -> Result> { + match self.object_store { + Some(ObjectStoreType::File) => { + Ok(LocalStorage::new(&self.directory.unwrap()).map(Arc::new)?) + } + Some(ObjectStoreType::S3) => Ok(AmazonS3::new( + &self.bucket.unwrap(), + &self.aws_region.unwrap(), + use_transfer_acceleration, + ) + .map(Arc::new)?), + Some(ObjectStoreType::GCS) => { + Ok(GoogleCloudStorage::new(&self.bucket.unwrap()).map(Arc::new)?) + } + _ => Err(anyhow!("At least one storage backend should be provided")), + } + } +} + +async fn get( + url: &str, + store: &'static str, + location: &Path, + client: &Client, +) -> Result { + let request = client.request(Method::GET, url); + let response = request.send().await.context("failed to get")?; + let meta = header_meta(location, response.headers()).context("Failed to get header")?; + let stream = response + .bytes_stream() + .map_err(|source| Error::Generic { + store, + source: Box::new(source), + }) + .boxed(); + Ok(GetResult { + range: 0..meta.size, + payload: GetResultPayload::Stream(stream), + meta, + }) +} + +fn header_meta(location: &Path, headers: &HeaderMap) -> Result { + let last_modified = headers + .get(LAST_MODIFIED) + .context("Missing last modified")?; + + let content_length = headers + .get(CONTENT_LENGTH) + .context("Missing content length")?; + + let last_modified = last_modified.to_str().context("bad header")?; + let last_modified = DateTime::parse_from_rfc2822(last_modified) + .context("invalid last modified")? + .with_timezone(&Utc); + + let content_length = content_length.to_str().context("bad header")?; + let content_length = content_length.parse().context("invalid content length")?; + + let e_tag = headers.get(ETAG).context("missing etag")?; + let e_tag = e_tag.to_str().context("bad header")?; + + Ok(ObjectMeta { + location: location.clone(), + last_modified, + size: content_length, + e_tag: Some(e_tag.to_string()), + }) +} + +#[cfg(test)] +mod tests { + use crate::object_store::downloader::DownloaderBuilder; + use crate::object_store::{ObjectStoreConfig, ObjectStoreType}; + use object_store::path::Path; + use std::fs; + use tempfile::TempDir; + + #[tokio::test] + pub async fn test_local_download() -> anyhow::Result<()> { + let input = TempDir::new()?; + let input_path = input.path(); + let child = input_path.join("child"); + fs::create_dir(&child)?; + let file1 = child.join("file1"); + fs::write(file1, b"Lorem ipsum")?; + let grandchild = child.join("grand_child"); + fs::create_dir(&grandchild)?; + let file2 = grandchild.join("file2"); + fs::write(file2, b"Lorem ipsum")?; + + let input_store = ObjectStoreConfig { + object_store: Some(ObjectStoreType::File), + directory: Some(input_path.to_path_buf()), + ..Default::default() + } + .make_downloader(true)?; + + let downloaded = input_store.get(&Path::from("child/file1")).await?; + assert_eq!(downloaded.to_vec(), b"Lorem ipsum"); + Ok(()) + } +} diff --git a/crates/sui-storage/src/object_store/downloader/s3.rs b/crates/sui-storage/src/object_store/downloader/s3.rs new file mode 100644 index 00000000000000..e0c9eeab3cc66a --- /dev/null +++ b/crates/sui-storage/src/object_store/downloader/s3.rs @@ -0,0 +1,76 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::object_store::downloader::{ + get, Downloader, DEFAULT_USER_AGENT, STRICT_PATH_ENCODE_SET, +}; +use anyhow::Result; +use async_trait::async_trait; +use bytes::Bytes; +use object_store::path::Path; +use object_store::GetResult; +use percent_encoding::{utf8_percent_encode, PercentEncode}; +use reqwest::Client; +use reqwest::ClientBuilder; +use std::sync::Arc; + +#[derive(Debug)] +pub(crate) struct S3Client { + endpoint: String, + client: Client, +} + +impl S3Client { + pub fn new(bucket: &str, region: &str, transfer_accelerated: bool) -> Result { + let mut builder = ClientBuilder::new(); + builder = builder.user_agent(DEFAULT_USER_AGENT); + let endpoint = if transfer_accelerated { + format!("https://{bucket}.s3-accelerate.amazonaws.com") + } else { + // only use virtual hosted style requests + format!("https://{bucket}.s3.{region}.amazonaws.com") + }; + let client = builder.https_only(false).build()?; + Ok(Self { endpoint, client }) + } + pub fn new_with_endpoint(endpoint: String) -> Result { + let mut builder = ClientBuilder::new(); + builder = builder.user_agent(DEFAULT_USER_AGENT); + let client = builder.https_only(false).build()?; + Ok(Self { endpoint, client }) + } + async fn get(&self, location: &Path) -> Result { + let url = self.path_url(location); + get(&url, "s3", location, &self.client).await + } + fn path_url(&self, path: &Path) -> String { + format!("{}/{}", self.endpoint, Self::encode_path(path)) + } + fn encode_path(path: &Path) -> PercentEncode<'_> { + utf8_percent_encode(path.as_ref(), &STRICT_PATH_ENCODE_SET) + } +} + +/// Interface for [Amazon S3](https://aws.amazon.com/s3/). +#[derive(Debug)] +pub struct AmazonS3 { + client: Arc, +} + +impl AmazonS3 { + pub fn new(bucket: &str, region: &str, transfer_accelerated: bool) -> Result { + let s3_client = S3Client::new(bucket, region, transfer_accelerated)?; + Ok(AmazonS3 { + client: Arc::new(s3_client), + }) + } +} + +#[async_trait] +impl Downloader for AmazonS3 { + async fn get(&self, location: &Path) -> Result { + let result = self.client.get(location).await?; + let bytes = result.bytes().await?; + Ok(bytes) + } +} diff --git a/crates/sui-storage/src/object_store/mod.rs b/crates/sui-storage/src/object_store/mod.rs index 9ae80f568fa818..b032eab27f18f6 100644 --- a/crates/sui-storage/src/object_store/mod.rs +++ b/crates/sui-storage/src/object_store/mod.rs @@ -11,8 +11,8 @@ use std::path::PathBuf; use std::sync::Arc; use tracing::info; -pub mod util; pub mod downloader; +pub mod util; /// Object-store type. #[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, ValueEnum)]