Skip to content

Commit

Permalink
Add DownloadRequestBuilder for more complex download support
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 11, 2022
1 parent 0b09ecc commit f8e9ec6
Showing 1 changed file with 92 additions and 34 deletions.
126 changes: 92 additions & 34 deletions src/client/object.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use futures_util::{stream, Stream, TryStream};
use reqwest::StatusCode;

use crate::{
error::GoogleResponse,
Expand Down Expand Up @@ -276,24 +276,8 @@ impl<'a> ObjectClient<'a> {
/// # }
/// ```
pub async fn download(&self, bucket: &str, file_name: &str) -> crate::Result<Vec<u8>> {
let url = format!(
"{}/b/{}/o/{}?alt=media",
crate::BASE_URL,
percent_encode(bucket),
percent_encode(file_name),
);
let resp = self
.0
.client
.get(&url)
.headers(self.0.get_headers().await?)
.send()
.await?;
if resp.status() == StatusCode::NOT_FOUND {
Err(crate::Error::Other(resp.text().await?))
} else {
Ok(resp.error_for_status()?.bytes().await?.to_vec())
}
let resp = self.download_request(bucket, file_name);
Ok(resp.bytes().await?.to_vec())
}

/// Download the content of the object with the specified name in the specified bucket, without
Expand Down Expand Up @@ -322,22 +306,9 @@ impl<'a> ObjectClient<'a> {
&self,
bucket: &str,
file_name: &str,
) -> crate::Result<impl Stream<Item = crate::Result<u8>> + Unpin> {
) -> crate::Result<impl Stream<Item=crate::Result<u8>> + Unpin> {
use futures_util::{StreamExt, TryStreamExt};
let url = format!(
"{}/b/{}/o/{}?alt=media",
crate::BASE_URL,
percent_encode(bucket),
percent_encode(file_name),
);
let response = self
.0
.client
.get(&url)
.headers(self.0.get_headers().await?)
.send()
.await?
.error_for_status()?;
let response = self.download_request(bucket, file_name).send().await?;
let size = response.content_length();
let bytes = response
.bytes_stream()
Expand All @@ -346,6 +317,33 @@ impl<'a> ObjectClient<'a> {
Ok(SizedByteStream::new(bytes, size))
}

/// Returns a [`DownloadRequestBuilder`] which can be used to download the content of the
/// object with the specified name in the specified bucket, with additional options
///
/// ### Example
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use cloud_storage::Client;
/// use cloud_storage::Object;
/// use futures_util::stream::StreamExt;
/// use tokio::fs::File;
/// use tokio::io::{AsyncWriteExt, BufWriter};
///
/// let client = Client::default();
/// let mut stream = client.object().download_request("my_bucket", "path/to/my/file.png").with_range(0..100).bytes_stream().await?;
/// let mut file = BufWriter::new(File::create("file.png").await.unwrap());
/// while let Some(bytes) = stream.next().await {
/// file.write_all(&bytes.unwrap()).await.unwrap();
/// }
/// file.flush().await?;
/// # Ok(())
/// # }
/// ```
pub fn download_request(&self, bucket: &str, file_name: &str) -> DownloadRequestBuilder<'_> {
DownloadRequestBuilder::new(self.0, bucket, file_name)
}

/// Updates a single object with the specified name in the specified bucket with the new
/// information in `object`.
///
Expand Down Expand Up @@ -588,3 +586,63 @@ impl<'a> ObjectClient<'a> {
// }
}
}

/// A builder used to construct a download request
pub struct DownloadRequestBuilder<'a> {
url: String,
range: Option<String>,
gcs_client: &'a super::Client,
}

impl<'a> DownloadRequestBuilder<'a> {
/// Create a new request builder
fn new(gcs_client: &'a super::Client, bucket: &str, file_name: &str) -> Self {
let url = format!(
"{}/b/{}/o/{}?alt=media",
crate::BASE_URL,
percent_encode(bucket),
percent_encode(file_name),
);

Self {
url,
gcs_client,
range: None,
}
}

/// Specify a range of bytes to download
///
/// See https://cloud.google.com/storage/docs/json_api/v1/parameters#range
pub fn with_range(self, range: impl Into<String>) -> Self {
Self {
range: Some(range.into()),
..self
}
}

/// Dispatch the request and return the response
async fn send(self) -> crate::Result<reqwest::Response> {
let headers = self.gcs_client.get_headers().await?;
let mut builder = self.gcs_client.client.get(self.url).headers(headers);

if let Some(range) = self.range {
builder = builder.header(reqwest::header::RANGE, range)
}

let resp = builder.send().await?.error_for_status()?;

Ok(resp)
}

/// Get the full response body as [`Bytes`]
pub async fn bytes(self) -> crate::Result<Bytes> {
Ok(self.send().await?.bytes().await?)
}

/// Return a stream of the downloaded bytes
pub async fn bytes_stream(self) -> crate::Result<impl Stream<Item=crate::Result<Bytes>>> {
use futures_util::stream::TryStreamExt;
Ok(self.send().await?.bytes_stream().map_err(Into::into))
}
}

0 comments on commit f8e9ec6

Please sign in to comment.