Skip to content

Commit

Permalink
Allow object downloads without authentication from certain remote obj…
Browse files Browse the repository at this point in the history
…ect stores (#14388)

## Description 

We want to allow object download without authentication from certain
remote stores with free egress i.e. cloudflare, wasabi, etc. This PR
adds a new downloader module which allows us to do that for S3, GCS, etc
Next step would be to integrate it in sui tool and let users benefit
from it.

## Test Plan 

Added unit test. Also tested with S3 and found it to be working
  • Loading branch information
sadhansood authored Oct 26, 2023
1 parent 552201f commit 8ef3818
Show file tree
Hide file tree
Showing 7 changed files with 345 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/sui-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ byteorder.workspace = true
anyhow.workspace = true
tempfile.workspace = true
tap.workspace = true
reqwest.workspace = true
percent-encoding = "2.2.0"
chrono.workspace = true
object_store.workspace = true
backoff.workspace = true
bytes.workspace = true
Expand Down
70 changes: 70 additions & 0 deletions crates/sui-storage/src/object_store/downloader/gcs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::object_store::downloader::{get, Downloader, DEFAULT_USER_AGENT};
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use object_store::path::Path;
use object_store::GetResult;
use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
use reqwest::Client;
use reqwest::ClientBuilder;
use std::sync::Arc;

#[derive(Debug)]
struct GoogleCloudStorageClient {
client: Client,
bucket_name_encoded: String,
}

impl GoogleCloudStorageClient {
pub fn new(bucket: &str) -> Result<Self> {
let mut builder = ClientBuilder::new();
builder = builder.user_agent(DEFAULT_USER_AGENT);
let client = builder.https_only(false).build()?;
let bucket_name_encoded = percent_encode(bucket.as_bytes(), NON_ALPHANUMERIC).to_string();

Ok(Self {
client,
bucket_name_encoded,
})
}

async fn get(&self, path: &Path) -> Result<GetResult> {
let url = self.object_url(path);
get(&url, "gcs", path, &self.client).await
}

fn object_url(&self, path: &Path) -> String {
let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
format!(
"https://storage.googleapis.com/{}/{}",
self.bucket_name_encoded, encoded
)
}
}

/// Interface for [Google Cloud Storage](https://cloud.google.com/storage/).
#[derive(Debug)]
pub struct GoogleCloudStorage {
client: Arc<GoogleCloudStorageClient>,
}

impl GoogleCloudStorage {
pub fn new(bucket: &str) -> Result<Self> {
let gcs_client = GoogleCloudStorageClient::new(bucket)?;
Ok(GoogleCloudStorage {
client: Arc::new(gcs_client),
})
}
}

#[async_trait]
impl Downloader for GoogleCloudStorage {
async fn get(&self, location: &Path) -> Result<Bytes> {
let result = self.client.get(location).await?;
let bytes = result.bytes().await?;
Ok(bytes)
}
}
44 changes: 44 additions & 0 deletions crates/sui-storage/src/object_store/downloader/local.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::object_store::downloader::Downloader;
use crate::object_store::util::path_to_filesystem;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use bytes::Bytes;
use object_store::path::Path;
use std::fs;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;

pub struct LocalStorage {
root: PathBuf,
}

impl LocalStorage {
pub fn new(directory: &std::path::Path) -> Result<Self> {
let path = fs::canonicalize(directory).context(anyhow!("Unable to canonicalize"))?;
fs::create_dir_all(&path).context(anyhow!(
"Failed to create local directory: {}",
path.display()
))?;
Ok(LocalStorage { root: path })
}
}

#[async_trait]
impl Downloader for LocalStorage {
async fn get(&self, location: &Path) -> Result<Bytes> {
let path_to_filesystem = path_to_filesystem(self.root.clone(), location)?;
let handle = tokio::task::spawn_blocking(move || {
let mut f = File::open(path_to_filesystem)
.map_err(|e| anyhow!("Failed to open file with error: {}", e.to_string()))?;
let mut buf = vec![];
f.read_to_end(&mut buf)
.context(anyhow!("Failed to read file"))?;
Ok(buf.into())
});
handle.await?
}
}
148 changes: 148 additions & 0 deletions crates/sui-storage/src/object_store/downloader/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

mod gcs;
mod local;
mod s3;

use async_trait::async_trait;
use std::sync::Arc;

use crate::object_store::downloader::gcs::GoogleCloudStorage;
use crate::object_store::downloader::local::LocalStorage;
use crate::object_store::downloader::s3::AmazonS3;
use crate::object_store::{ObjectStoreConfig, ObjectStoreType};
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::{Error, GetResult, GetResultPayload, ObjectMeta};
use reqwest::header::{HeaderMap, CONTENT_LENGTH, ETAG, LAST_MODIFIED};
use reqwest::{Client, Method};

// http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
//
// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ ).
pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC
.remove(b'-')
.remove(b'.')
.remove(b'_')
.remove(b'~');
const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');
static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);

#[async_trait]
pub trait Downloader {
async fn get(&self, location: &Path) -> Result<Bytes>;
}

pub trait DownloaderBuilder {
fn make_downloader(self, use_transfer_acceleration: bool) -> Result<Arc<dyn Downloader>>;
}

impl DownloaderBuilder for ObjectStoreConfig {
fn make_downloader(self, use_transfer_acceleration: bool) -> Result<Arc<dyn Downloader>> {
match self.object_store {
Some(ObjectStoreType::File) => {
Ok(LocalStorage::new(&self.directory.unwrap()).map(Arc::new)?)
}
Some(ObjectStoreType::S3) => Ok(AmazonS3::new(
&self.bucket.unwrap(),
&self.aws_region.unwrap(),
use_transfer_acceleration,
)
.map(Arc::new)?),
Some(ObjectStoreType::GCS) => {
Ok(GoogleCloudStorage::new(&self.bucket.unwrap()).map(Arc::new)?)
}
_ => Err(anyhow!("At least one storage backend should be provided")),
}
}
}

async fn get(
url: &str,
store: &'static str,
location: &Path,
client: &Client,
) -> Result<GetResult> {
let request = client.request(Method::GET, url);
let response = request.send().await.context("failed to get")?;
let meta = header_meta(location, response.headers()).context("Failed to get header")?;
let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store,
source: Box::new(source),
})
.boxed();
Ok(GetResult {
range: 0..meta.size,
payload: GetResultPayload::Stream(stream),
meta,
})
}

fn header_meta(location: &Path, headers: &HeaderMap) -> Result<ObjectMeta> {
let last_modified = headers
.get(LAST_MODIFIED)
.context("Missing last modified")?;

let content_length = headers
.get(CONTENT_LENGTH)
.context("Missing content length")?;

let last_modified = last_modified.to_str().context("bad header")?;
let last_modified = DateTime::parse_from_rfc2822(last_modified)
.context("invalid last modified")?
.with_timezone(&Utc);

let content_length = content_length.to_str().context("bad header")?;
let content_length = content_length.parse().context("invalid content length")?;

let e_tag = headers.get(ETAG).context("missing etag")?;
let e_tag = e_tag.to_str().context("bad header")?;

Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
e_tag: Some(e_tag.to_string()),
})
}

#[cfg(test)]
mod tests {
use crate::object_store::downloader::DownloaderBuilder;
use crate::object_store::{ObjectStoreConfig, ObjectStoreType};
use object_store::path::Path;
use std::fs;
use tempfile::TempDir;

#[tokio::test]
pub async fn test_local_download() -> anyhow::Result<()> {
let input = TempDir::new()?;
let input_path = input.path();
let child = input_path.join("child");
fs::create_dir(&child)?;
let file1 = child.join("file1");
fs::write(file1, b"Lorem ipsum")?;
let grandchild = child.join("grand_child");
fs::create_dir(&grandchild)?;
let file2 = grandchild.join("file2");
fs::write(file2, b"Lorem ipsum")?;

let input_store = ObjectStoreConfig {
object_store: Some(ObjectStoreType::File),
directory: Some(input_path.to_path_buf()),
..Default::default()
}
.make_downloader(true)?;

let downloaded = input_store.get(&Path::from("child/file1")).await?;
assert_eq!(downloaded.to_vec(), b"Lorem ipsum");
Ok(())
}
}
76 changes: 76 additions & 0 deletions crates/sui-storage/src/object_store/downloader/s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::object_store::downloader::{
get, Downloader, DEFAULT_USER_AGENT, STRICT_PATH_ENCODE_SET,
};
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use object_store::path::Path;
use object_store::GetResult;
use percent_encoding::{utf8_percent_encode, PercentEncode};
use reqwest::Client;
use reqwest::ClientBuilder;
use std::sync::Arc;

#[derive(Debug)]
pub(crate) struct S3Client {
endpoint: String,
client: Client,
}

impl S3Client {
pub fn new(bucket: &str, region: &str, transfer_accelerated: bool) -> Result<Self> {
let mut builder = ClientBuilder::new();
builder = builder.user_agent(DEFAULT_USER_AGENT);
let endpoint = if transfer_accelerated {
format!("https://{bucket}.s3-accelerate.amazonaws.com")
} else {
// only use virtual hosted style requests
format!("https://{bucket}.s3.{region}.amazonaws.com")
};
let client = builder.https_only(false).build()?;
Ok(Self { endpoint, client })
}
pub fn new_with_endpoint(endpoint: String) -> Result<Self> {
let mut builder = ClientBuilder::new();
builder = builder.user_agent(DEFAULT_USER_AGENT);
let client = builder.https_only(false).build()?;
Ok(Self { endpoint, client })
}
async fn get(&self, location: &Path) -> Result<GetResult> {
let url = self.path_url(location);
get(&url, "s3", location, &self.client).await
}
fn path_url(&self, path: &Path) -> String {
format!("{}/{}", self.endpoint, Self::encode_path(path))
}
fn encode_path(path: &Path) -> PercentEncode<'_> {
utf8_percent_encode(path.as_ref(), &STRICT_PATH_ENCODE_SET)
}
}

/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
#[derive(Debug)]
pub struct AmazonS3 {
client: Arc<S3Client>,
}

impl AmazonS3 {
pub fn new(bucket: &str, region: &str, transfer_accelerated: bool) -> Result<Self> {
let s3_client = S3Client::new(bucket, region, transfer_accelerated)?;
Ok(AmazonS3 {
client: Arc::new(s3_client),
})
}
}

#[async_trait]
impl Downloader for AmazonS3 {
async fn get(&self, location: &Path) -> Result<Bytes> {
let result = self.client.get(location).await?;
let bytes = result.bytes().await?;
Ok(bytes)
}
}
1 change: 1 addition & 0 deletions crates/sui-storage/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use tracing::info;

pub mod downloader;
pub mod util;

/// Object-store type.
Expand Down

1 comment on commit 8ef3818

@vercel
Copy link

@vercel vercel bot commented on 8ef3818 Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.