forked from MystenLabs/sui
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Allow object downloads without authentication from certain remote object stores (MystenLabs#14388)
## Description
We want to allow object downloads without authentication from certain remote stores with free egress, e.g. Cloudflare, Wasabi, etc. This PR adds a new downloader module which allows us to do that for S3, GCS, etc. The next step would be to integrate it into the sui tool and let users benefit from it.
## Test Plan
Added a unit test. Also tested with S3 and found it to be working.
- Loading branch information
1 parent
32f6a25
commit b10fdbf
Showing
7 changed files
with
345 additions
and
0 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
// Copyright (c) Mysten Labs, Inc. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use crate::object_store::downloader::{get, Downloader, DEFAULT_USER_AGENT}; | ||
use anyhow::Result; | ||
use async_trait::async_trait; | ||
use bytes::Bytes; | ||
use object_store::path::Path; | ||
use object_store::GetResult; | ||
use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC}; | ||
use reqwest::Client; | ||
use reqwest::ClientBuilder; | ||
use std::sync::Arc; | ||
|
||
#[derive(Debug)] | ||
struct GoogleCloudStorageClient { | ||
client: Client, | ||
bucket_name_encoded: String, | ||
} | ||
|
||
impl GoogleCloudStorageClient { | ||
pub fn new(bucket: &str) -> Result<Self> { | ||
let mut builder = ClientBuilder::new(); | ||
builder = builder.user_agent(DEFAULT_USER_AGENT); | ||
let client = builder.https_only(false).build()?; | ||
let bucket_name_encoded = percent_encode(bucket.as_bytes(), NON_ALPHANUMERIC).to_string(); | ||
|
||
Ok(Self { | ||
client, | ||
bucket_name_encoded, | ||
}) | ||
} | ||
|
||
async fn get(&self, path: &Path) -> Result<GetResult> { | ||
let url = self.object_url(path); | ||
get(&url, "gcs", path, &self.client).await | ||
} | ||
|
||
fn object_url(&self, path: &Path) -> String { | ||
let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC); | ||
format!( | ||
"https://storage.googleapis.com/{}/{}", | ||
self.bucket_name_encoded, encoded | ||
) | ||
} | ||
} | ||
|
||
/// Interface for [Google Cloud Storage](https://cloud.google.com/storage/). | ||
#[derive(Debug)] | ||
pub struct GoogleCloudStorage { | ||
client: Arc<GoogleCloudStorageClient>, | ||
} | ||
|
||
impl GoogleCloudStorage { | ||
pub fn new(bucket: &str) -> Result<Self> { | ||
let gcs_client = GoogleCloudStorageClient::new(bucket)?; | ||
Ok(GoogleCloudStorage { | ||
client: Arc::new(gcs_client), | ||
}) | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl Downloader for GoogleCloudStorage { | ||
async fn get(&self, location: &Path) -> Result<Bytes> { | ||
let result = self.client.get(location).await?; | ||
let bytes = result.bytes().await?; | ||
Ok(bytes) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
// Copyright (c) Mysten Labs, Inc. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use crate::object_store::downloader::Downloader; | ||
use crate::object_store::util::path_to_filesystem; | ||
use anyhow::{anyhow, Context, Result}; | ||
use async_trait::async_trait; | ||
use bytes::Bytes; | ||
use object_store::path::Path; | ||
use std::fs; | ||
use std::fs::File; | ||
use std::io::Read; | ||
use std::path::PathBuf; | ||
|
||
/// Downloader backed by a directory on the local filesystem.
pub struct LocalStorage {
    /// Canonicalized directory that object paths are resolved against.
    root: PathBuf,
}
|
||
impl LocalStorage { | ||
pub fn new(directory: &std::path::Path) -> Result<Self> { | ||
let path = fs::canonicalize(directory).context(anyhow!("Unable to canonicalize"))?; | ||
fs::create_dir_all(&path).context(anyhow!( | ||
"Failed to create local directory: {}", | ||
path.display() | ||
))?; | ||
Ok(LocalStorage { root: path }) | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl Downloader for LocalStorage { | ||
async fn get(&self, location: &Path) -> Result<Bytes> { | ||
let path_to_filesystem = path_to_filesystem(self.root.clone(), location)?; | ||
let handle = tokio::task::spawn_blocking(move || { | ||
let mut f = File::open(path_to_filesystem) | ||
.map_err(|e| anyhow!("Failed to open file with error: {}", e.to_string()))?; | ||
let mut buf = vec![]; | ||
f.read_to_end(&mut buf) | ||
.context(anyhow!("Failed to read file"))?; | ||
Ok(buf.into()) | ||
}); | ||
handle.await? | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
// Copyright (c) Mysten Labs, Inc. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
mod gcs; | ||
mod local; | ||
mod s3; | ||
|
||
use async_trait::async_trait; | ||
use std::sync::Arc; | ||
|
||
use crate::object_store::downloader::gcs::GoogleCloudStorage; | ||
use crate::object_store::downloader::local::LocalStorage; | ||
use crate::object_store::downloader::s3::AmazonS3; | ||
use crate::object_store::{ObjectStoreConfig, ObjectStoreType}; | ||
use anyhow::{anyhow, Context, Result}; | ||
use bytes::Bytes; | ||
use chrono::{DateTime, Utc}; | ||
use futures::{StreamExt, TryStreamExt}; | ||
use object_store::path::Path; | ||
use object_store::{Error, GetResult, GetResultPayload, ObjectMeta}; | ||
use reqwest::header::{HeaderMap, CONTENT_LENGTH, ETAG, LAST_MODIFIED}; | ||
use reqwest::{Client, Method}; | ||
|
||
// http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html | ||
// | ||
// Do not URI-encode any of the unreserved characters that RFC 3986 defines: | ||
// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ ). | ||
pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC | ||
.remove(b'-') | ||
.remove(b'.') | ||
.remove(b'_') | ||
.remove(b'~'); | ||
const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/'); | ||
static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); | ||
|
||
#[async_trait] | ||
pub trait Downloader { | ||
async fn get(&self, location: &Path) -> Result<Bytes>; | ||
} | ||
|
||
pub trait DownloaderBuilder { | ||
fn make_downloader(self, use_transfer_acceleration: bool) -> Result<Arc<dyn Downloader>>; | ||
} | ||
|
||
impl DownloaderBuilder for ObjectStoreConfig { | ||
fn make_downloader(self, use_transfer_acceleration: bool) -> Result<Arc<dyn Downloader>> { | ||
match self.object_store { | ||
Some(ObjectStoreType::File) => { | ||
Ok(LocalStorage::new(&self.directory.unwrap()).map(Arc::new)?) | ||
} | ||
Some(ObjectStoreType::S3) => Ok(AmazonS3::new( | ||
&self.bucket.unwrap(), | ||
&self.aws_region.unwrap(), | ||
use_transfer_acceleration, | ||
) | ||
.map(Arc::new)?), | ||
Some(ObjectStoreType::GCS) => { | ||
Ok(GoogleCloudStorage::new(&self.bucket.unwrap()).map(Arc::new)?) | ||
} | ||
_ => Err(anyhow!("At least one storage backend should be provided")), | ||
} | ||
} | ||
} | ||
|
||
async fn get( | ||
url: &str, | ||
store: &'static str, | ||
location: &Path, | ||
client: &Client, | ||
) -> Result<GetResult> { | ||
let request = client.request(Method::GET, url); | ||
let response = request.send().await.context("failed to get")?; | ||
let meta = header_meta(location, response.headers()).context("Failed to get header")?; | ||
let stream = response | ||
.bytes_stream() | ||
.map_err(|source| Error::Generic { | ||
store, | ||
source: Box::new(source), | ||
}) | ||
.boxed(); | ||
Ok(GetResult { | ||
range: 0..meta.size, | ||
payload: GetResultPayload::Stream(stream), | ||
meta, | ||
}) | ||
} | ||
|
||
fn header_meta(location: &Path, headers: &HeaderMap) -> Result<ObjectMeta> { | ||
let last_modified = headers | ||
.get(LAST_MODIFIED) | ||
.context("Missing last modified")?; | ||
|
||
let content_length = headers | ||
.get(CONTENT_LENGTH) | ||
.context("Missing content length")?; | ||
|
||
let last_modified = last_modified.to_str().context("bad header")?; | ||
let last_modified = DateTime::parse_from_rfc2822(last_modified) | ||
.context("invalid last modified")? | ||
.with_timezone(&Utc); | ||
|
||
let content_length = content_length.to_str().context("bad header")?; | ||
let content_length = content_length.parse().context("invalid content length")?; | ||
|
||
let e_tag = headers.get(ETAG).context("missing etag")?; | ||
let e_tag = e_tag.to_str().context("bad header")?; | ||
|
||
Ok(ObjectMeta { | ||
location: location.clone(), | ||
last_modified, | ||
size: content_length, | ||
e_tag: Some(e_tag.to_string()), | ||
}) | ||
} | ||
|
||
#[cfg(test)]
mod tests {
    use crate::object_store::downloader::DownloaderBuilder;
    use crate::object_store::{ObjectStoreConfig, ObjectStoreType};
    use object_store::path::Path;
    use std::fs;
    use tempfile::TempDir;

    /// Builds a small directory tree in a temp dir and checks that a
    /// file-backed downloader returns the bytes of `child/file1`.
    #[tokio::test]
    pub async fn test_local_download() -> anyhow::Result<()> {
        let root = TempDir::new()?;
        let root_path = root.path();

        // Layout: child/file1 and child/grand_child/file2.
        let child_dir = root_path.join("child");
        fs::create_dir(&child_dir)?;
        fs::write(child_dir.join("file1"), b"Lorem ipsum")?;
        let grandchild_dir = child_dir.join("grand_child");
        fs::create_dir(&grandchild_dir)?;
        fs::write(grandchild_dir.join("file2"), b"Lorem ipsum")?;

        let config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(root_path.to_path_buf()),
            ..Default::default()
        };
        let downloader = config.make_downloader(true)?;

        let downloaded = downloader.get(&Path::from("child/file1")).await?;
        assert_eq!(downloaded.to_vec(), b"Lorem ipsum");
        Ok(())
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
// Copyright (c) Mysten Labs, Inc. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use crate::object_store::downloader::{ | ||
get, Downloader, DEFAULT_USER_AGENT, STRICT_PATH_ENCODE_SET, | ||
}; | ||
use anyhow::Result; | ||
use async_trait::async_trait; | ||
use bytes::Bytes; | ||
use object_store::path::Path; | ||
use object_store::GetResult; | ||
use percent_encoding::{utf8_percent_encode, PercentEncode}; | ||
use reqwest::Client; | ||
use reqwest::ClientBuilder; | ||
use std::sync::Arc; | ||
|
||
#[derive(Debug)] | ||
pub(crate) struct S3Client { | ||
endpoint: String, | ||
client: Client, | ||
} | ||
|
||
impl S3Client { | ||
pub fn new(bucket: &str, region: &str, transfer_accelerated: bool) -> Result<Self> { | ||
let mut builder = ClientBuilder::new(); | ||
builder = builder.user_agent(DEFAULT_USER_AGENT); | ||
let endpoint = if transfer_accelerated { | ||
format!("https://{bucket}.s3-accelerate.amazonaws.com") | ||
} else { | ||
// only use virtual hosted style requests | ||
format!("https://{bucket}.s3.{region}.amazonaws.com") | ||
}; | ||
let client = builder.https_only(false).build()?; | ||
Ok(Self { endpoint, client }) | ||
} | ||
pub fn new_with_endpoint(endpoint: String) -> Result<Self> { | ||
let mut builder = ClientBuilder::new(); | ||
builder = builder.user_agent(DEFAULT_USER_AGENT); | ||
let client = builder.https_only(false).build()?; | ||
Ok(Self { endpoint, client }) | ||
} | ||
async fn get(&self, location: &Path) -> Result<GetResult> { | ||
let url = self.path_url(location); | ||
get(&url, "s3", location, &self.client).await | ||
} | ||
fn path_url(&self, path: &Path) -> String { | ||
format!("{}/{}", self.endpoint, Self::encode_path(path)) | ||
} | ||
fn encode_path(path: &Path) -> PercentEncode<'_> { | ||
utf8_percent_encode(path.as_ref(), &STRICT_PATH_ENCODE_SET) | ||
} | ||
} | ||
|
||
/// Interface for [Amazon S3](https://aws.amazon.com/s3/). | ||
#[derive(Debug)] | ||
pub struct AmazonS3 { | ||
client: Arc<S3Client>, | ||
} | ||
|
||
impl AmazonS3 { | ||
pub fn new(bucket: &str, region: &str, transfer_accelerated: bool) -> Result<Self> { | ||
let s3_client = S3Client::new(bucket, region, transfer_accelerated)?; | ||
Ok(AmazonS3 { | ||
client: Arc::new(s3_client), | ||
}) | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl Downloader for AmazonS3 { | ||
async fn get(&self, location: &Path) -> Result<Bytes> { | ||
let result = self.client.get(location).await?; | ||
let bytes = result.bytes().await?; | ||
Ok(bytes) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters