From 4cca0291441fe622f13db6724f8bc3efb1a31b5b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 19 Oct 2023 09:44:46 +0100 Subject: [PATCH] Return `PutResult` with an ETag from ObjectStore::put (#4934) (#4944) * Return ETag from ObjectStore::put (#4934) * Further tests * Clippy * Review feedback --- object_store/src/aws/client.rs | 12 +++- object_store/src/aws/mod.rs | 25 ++------ object_store/src/azure/mod.rs | 20 ++++--- object_store/src/chunked.rs | 3 +- object_store/src/client/header.rs | 17 +++--- object_store/src/gcp/mod.rs | 87 +++++++++++----------------- object_store/src/http/client.rs | 4 +- object_store/src/http/mod.rs | 13 ++++- object_store/src/lib.rs | 35 ++++++++++- object_store/src/limit.rs | 4 +- object_store/src/local.rs | 43 ++++++++++---- object_store/src/memory.rs | 14 +++-- object_store/src/prefix.rs | 5 +- object_store/src/throttle.rs | 5 +- object_store/tests/get_range_file.rs | 4 +- 15 files changed, 169 insertions(+), 122 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 8a45a9f3ac47..eb81e92fb932 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -21,6 +21,7 @@ use crate::aws::{ AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET, }; use crate::client::get::GetClient; +use crate::client::header::get_etag; use crate::client::list::ListClient; use crate::client::list_response::ListResponse; use crate::client::retry::RetryExt; @@ -122,6 +123,11 @@ pub(crate) enum Error { #[snafu(display("Got invalid multipart response: {}", source))] InvalidMultipartResponse { source: quick_xml::de::DeError }, + + #[snafu(display("Unable to extract metadata from headers: {}", source))] + Metadata { + source: crate::client::header::Error, + }, } impl From for crate::Error { @@ -243,12 +249,14 @@ impl S3Client { } /// Make an S3 PUT request + /// + /// Returns the ETag pub async fn put_request( &self, path: &Path, bytes: Bytes, query: &T, - ) -> Result { + ) -> Result { let credential = self.get_credential().await?; let url = self.config.path_url(path); let mut builder = self.client.request(Method::PUT, url); @@ -287,7 +295,7 @@ impl S3Client { path: path.as_ref(), })?; - Ok(response) + Ok(get_etag(response.headers()).context(MetadataSnafu)?) } /// Make an S3 Delete request diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index d3c50861c122..6d5aecea2d17 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -59,7 +59,7 @@ use crate::multipart::{PartId, PutPart, WriteMultiPart}; use crate::signer::Signer; use crate::{ ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, - ObjectStore, Path, Result, RetryConfig, + ObjectStore, Path, PutResult, Result, RetryConfig, }; mod checksum; @@ -109,12 +109,6 @@ enum Error { #[snafu(display("Missing SecretAccessKey"))] MissingSecretAccessKey, - #[snafu(display("ETag Header missing from response"))] - MissingEtag, - - #[snafu(display("Received header containing non-ASCII data"))] - BadHeader { source: reqwest::header::ToStrError }, - #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))] UnableToParseUrl { source: url::ParseError, @@ -273,9 +267,9 @@ impl Signer for AmazonS3 { #[async_trait] impl ObjectStore for AmazonS3 { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - self.client.put_request(location, bytes, &()).await?; - Ok(()) + async fn put(&self, location: &Path, bytes: Bytes) -> Result { + let e_tag = self.client.put_request(location, bytes, &()).await?; + Ok(PutResult { e_tag: Some(e_tag) }) } async fn put_multipart( @@ -365,10 +359,9 @@ struct S3MultiPartUpload { #[async_trait] impl PutPart for S3MultiPartUpload { async fn put_part(&self, buf: Vec, part_idx: usize) -> Result { - use reqwest::header::ETAG; let part = (part_idx + 1).to_string(); - let response = self + let content_id = self .client .put_request( &self.location, @@ -377,13 +370,7 @@ impl PutPart for S3MultiPartUpload { ) .await?; - let etag = response.headers().get(ETAG).context(MissingEtagSnafu)?; - - let etag = etag.to_str().context(BadHeaderSnafu)?; - - Ok(PartId { - content_id: etag.to_string(), - }) + Ok(PartId { content_id }) } async fn complete(&self, completed_parts: Vec) -> Result<()> { diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 2a08c6775807..0e638efc399f 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -31,7 +31,7 @@ use crate::{ multipart::{PartId, PutPart, WriteMultiPart}, path::Path, ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, - ObjectStore, Result, RetryConfig, + ObjectStore, PutResult, Result, RetryConfig, }; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; @@ -62,6 +62,7 @@ mod credential; /// [`CredentialProvider`] for [`MicrosoftAzure`] pub type AzureCredentialProvider = Arc>; +use crate::client::header::get_etag; pub use credential::AzureCredential; const STORE: &str = "MicrosoftAzure"; @@ -81,9 +82,6 @@ const MSI_ENDPOINT_ENV_KEY: &str = "IDENTITY_ENDPOINT"; #[derive(Debug, Snafu)] #[allow(missing_docs)] enum Error { - #[snafu(display("Received header containing non-ASCII data"))] - BadHeader { source: reqwest::header::ToStrError }, - #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))] UnableToParseUrl { source: url::ParseError, @@ -126,8 +124,10 @@ enum Error { #[snafu(display("Configuration key: '{}' is not known.", key))] UnknownConfigurationKey { key: String }, - #[snafu(display("ETag Header missing from response"))] - MissingEtag, + #[snafu(display("Unable to extract metadata from headers: {}", source))] + Metadata { + source: crate::client::header::Error, + }, } impl From for super::Error { @@ -170,11 +170,13 @@ impl std::fmt::Display for MicrosoftAzure { #[async_trait] impl ObjectStore for MicrosoftAzure { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - self.client + async fn put(&self, location: &Path, bytes: Bytes) -> Result { + let response = self + .client .put_request(location, Some(bytes), false, &()) .await?; - Ok(()) + let e_tag = Some(get_etag(response.headers()).context(MetadataSnafu)?); + Ok(PutResult { e_tag }) } async fn put_multipart( diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index d3e02b412725..5694c55d787f 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -30,6 +30,7 @@ use tokio::io::AsyncWrite; use crate::path::Path; use crate::{ GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, + PutResult, }; use crate::{MultipartId, Result}; @@ -62,7 +63,7 @@ impl Display for ChunkedStore { #[async_trait] impl ObjectStore for ChunkedStore { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { + async fn put(&self, location: &Path, bytes: Bytes) -> Result { self.inner.put(location, bytes).await } diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs index 6499eff5aebe..17f83a2ba8c8 100644 --- a/object_store/src/client/header.rs +++ b/object_store/src/client/header.rs @@ -64,6 +64,12 @@ pub enum Error { }, } +/// Extracts an etag from the provided [`HeaderMap`] +pub fn get_etag(headers: &HeaderMap) -> Result { + let e_tag = headers.get(ETAG).ok_or(Error::MissingEtag)?; + Ok(e_tag.to_str().context(BadHeaderSnafu)?.to_string()) +} + /// Extracts [`ObjectMeta`] from the provided [`HeaderMap`] pub fn header_meta( location: &Path, @@ -81,13 +87,10 @@ pub fn header_meta( None => Utc.timestamp_nanos(0), }; - let e_tag = match headers.get(ETAG) { - Some(e_tag) => { - let e_tag = e_tag.to_str().context(BadHeaderSnafu)?; - Some(e_tag.to_string()) - } - None if cfg.etag_required => return Err(Error::MissingEtag), - None => None, + let e_tag = match get_etag(headers) { + Ok(e_tag) => Some(e_tag), + Err(Error::MissingEtag) if !cfg.etag_required => None, + Err(e) => return Err(e), }; let content_length = headers diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 513e396cbae6..97755c07c671 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -54,7 +54,7 @@ use crate::{ multipart::{PartId, PutPart, WriteMultiPart}, path::{Path, DELIMITER}, ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, - ObjectStore, Result, RetryConfig, + ObjectStore, PutResult, Result, RetryConfig, }; use credential::{InstanceCredentialProvider, ServiceAccountCredentials}; @@ -65,6 +65,7 @@ const STORE: &str = "GCS"; /// [`CredentialProvider`] for [`GoogleCloudStorage`] pub type GcpCredentialProvider = Arc>; +use crate::client::header::get_etag; use crate::gcp::credential::{ApplicationDefaultCredentials, DEFAULT_GCS_BASE_URL}; pub use credential::GcpCredential; @@ -155,11 +156,10 @@ enum Error { #[snafu(display("Configuration key: '{}' is not known.", key))] UnknownConfigurationKey { key: String }, - #[snafu(display("ETag Header missing from response"))] - MissingEtag, - - #[snafu(display("Received header containing non-ASCII data"))] - BadHeader { source: header::ToStrError }, + #[snafu(display("Unable to extract metadata from headers: {}", source))] + Metadata { + source: crate::client::header::Error, + }, } impl From for super::Error { @@ -247,7 +247,14 @@ impl GoogleCloudStorageClient { } /// Perform a put request - async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> { + /// + /// Returns the new ETag + async fn put_request( + &self, + path: &Path, + payload: Bytes, + query: &T, + ) -> Result { let credential = self.get_credential().await?; let url = self.object_url(path); @@ -256,8 +263,10 @@ impl GoogleCloudStorageClient { .get_content_type(path) .unwrap_or("application/octet-stream"); - self.client + let response = self + .client .request(Method::PUT, url) + .query(query) .bearer_auth(&credential.bearer) .header(header::CONTENT_TYPE, content_type) .header(header::CONTENT_LENGTH, payload.len()) @@ -268,7 +277,7 @@ impl GoogleCloudStorageClient { path: path.as_ref(), })?; - Ok(()) + Ok(get_etag(response.headers()).context(MetadataSnafu)?) } /// Initiate a multi-part upload @@ -469,7 +478,7 @@ impl ListClient for GoogleCloudStorageClient { struct GCSMultipartUpload { client: Arc, - encoded_path: String, + path: Path, multipart_id: MultipartId, } @@ -478,38 +487,17 @@ impl PutPart for GCSMultipartUpload { /// Upload an object part async fn put_part(&self, buf: Vec, part_idx: usize) -> Result { let upload_id = self.multipart_id.clone(); - let url = format!( - "{}/{}/{}", - self.client.base_url, self.client.bucket_name_encoded, self.encoded_path - ); - - let credential = self.client.get_credential().await?; - - let response = self + let content_id = self .client - .client - .request(Method::PUT, &url) - .bearer_auth(&credential.bearer) - .query(&[ - ("partNumber", format!("{}", part_idx + 1)), - ("uploadId", upload_id), - ]) - .header(header::CONTENT_TYPE, "application/octet-stream") - .header(header::CONTENT_LENGTH, format!("{}", buf.len())) - .body(buf) - .send_retry(&self.client.retry_config) - .await - .context(PutRequestSnafu { - path: &self.encoded_path, - })?; - - let content_id = response - .headers() - .get("ETag") - .context(MissingEtagSnafu)? - .to_str() - .context(BadHeaderSnafu)? - .to_string(); + .put_request( + &self.path, + buf.into(), + &[ + ("partNumber", format!("{}", part_idx + 1)), + ("uploadId", upload_id), + ], + ) + .await?; Ok(PartId { content_id }) } @@ -517,10 +505,7 @@ impl PutPart for GCSMultipartUpload { /// Complete a multipart upload async fn complete(&self, completed_parts: Vec) -> Result<()> { let upload_id = self.multipart_id.clone(); - let url = format!( - "{}/{}/{}", - self.client.base_url, self.client.bucket_name_encoded, self.encoded_path - ); + let url = self.client.object_url(&self.path); let parts = completed_parts .into_iter() @@ -550,7 +535,7 @@ impl PutPart for GCSMultipartUpload { .send_retry(&self.client.retry_config) .await .context(PostRequestSnafu { - path: &self.encoded_path, + path: self.path.as_ref(), })?; Ok(()) @@ -559,8 +544,9 @@ impl PutPart for GCSMultipartUpload { #[async_trait] impl ObjectStore for GoogleCloudStorage { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - self.client.put_request(location, bytes).await + async fn put(&self, location: &Path, bytes: Bytes) -> Result { + let e_tag = self.client.put_request(location, bytes, &()).await?; + Ok(PutResult { e_tag: Some(e_tag) }) } async fn put_multipart( @@ -569,12 +555,9 @@ impl ObjectStore for GoogleCloudStorage { ) -> Result<(MultipartId, Box)> { let upload_id = self.client.multipart_initiate(location).await?; - let encoded_path = - percent_encode(location.to_string().as_bytes(), NON_ALPHANUMERIC).to_string(); - let inner = GCSMultipartUpload { client: Arc::clone(&self.client), - encoded_path, + path: location.clone(), multipart_id: upload_id.clone(), }; diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index b2a6ac0aa34a..4c2a7fcf8db3 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -160,7 +160,7 @@ impl Client { Ok(()) } - pub async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { + pub async fn put(&self, location: &Path, bytes: Bytes) -> Result { let mut retry = false; loop { let url = self.path_url(location); @@ -170,7 +170,7 @@ impl Client { } match builder.send_retry(&self.retry_config).await { - Ok(_) => return Ok(()), + Ok(response) => return Ok(response), Err(source) => match source.status() { // Some implementations return 404 instead of 409 Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => { diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index 2fd7850b6bbf..e41e4f990110 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -41,11 +41,12 @@ use tokio::io::AsyncWrite; use url::Url; use crate::client::get::GetClientExt; +use crate::client::header::get_etag; use crate::http::client::Client; use crate::path::Path; use crate::{ ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId, - ObjectMeta, ObjectStore, Result, RetryConfig, + ObjectMeta, ObjectStore, PutResult, Result, RetryConfig, }; mod client; @@ -95,8 +96,14 @@ impl std::fmt::Display for HttpStore { #[async_trait] impl ObjectStore for HttpStore { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - self.client.put(location, bytes).await + async fn put(&self, location: &Path, bytes: Bytes) -> Result { + let response = self.client.put(location, bytes).await?; + let e_tag = match get_etag(response.headers()) { + Ok(e_tag) => Some(e_tag), + Err(crate::client::header::Error::MissingEtag) => None, + Err(source) => return Err(Error::Metadata { source }.into()), + }; + Ok(PutResult { e_tag }) } async fn put_multipart( diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 9b396444fa0d..018f0f5e8dec 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -300,7 +300,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// The operation is guaranteed to be atomic, it will either successfully /// write the entirety of `bytes` to `location`, or fail. No clients /// should be able to observe a partially written object - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>; + async fn put(&self, location: &Path, bytes: Bytes) -> Result; /// Get a multi-part upload that allows writing data in chunks /// @@ -528,7 +528,7 @@ macro_rules! as_ref_impl { ($type:ty) => { #[async_trait] impl ObjectStore for $type { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { + async fn put(&self, location: &Path, bytes: Bytes) -> Result { self.as_ref().put(location, bytes).await } @@ -659,6 +659,8 @@ pub struct ObjectMeta { /// The size in bytes of the object pub size: usize, /// The unique identifier for the object + /// + /// pub e_tag: Option, } @@ -850,6 +852,15 @@ impl GetResult { } } +/// Result for a put request +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PutResult { + /// The unique identifier for the object + /// + /// + pub e_tag: Option, +} + /// A specialized `Result` for object store-related errors pub type Result = std::result::Result; @@ -1383,6 +1394,26 @@ mod tests { ..GetOptions::default() }; storage.get_opts(&path, options).await.unwrap(); + + let result = storage.put(&path, "test".into()).await.unwrap(); + let new_tag = result.e_tag.unwrap(); + assert_ne!(tag, new_tag); + + let meta = storage.head(&path).await.unwrap(); + assert_eq!(meta.e_tag.unwrap(), new_tag); + + let options = GetOptions { + if_match: Some(new_tag), + ..GetOptions::default() + }; + storage.get_opts(&path, options).await.unwrap(); + + let options = GetOptions { + if_match: Some(tag), + ..GetOptions::default() + }; + let err = storage.get_opts(&path, options).await.unwrap_err(); + assert!(matches!(err, Error::Precondition { .. }), "{err}"); } /// Returns a chunk of length `chunk_length` diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index 00cbce023c3d..8a453813c24e 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -19,7 +19,7 @@ use crate::{ BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, - ObjectMeta, ObjectStore, Path, Result, StreamExt, + ObjectMeta, ObjectStore, Path, PutResult, Result, StreamExt, }; use async_trait::async_trait; use bytes::Bytes; @@ -72,7 +72,7 @@ impl std::fmt::Display for LimitStore { #[async_trait] impl ObjectStore for LimitStore { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { + async fn put(&self, location: &Path, bytes: Bytes) -> Result { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.put(location, bytes).await } diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 38467c3a9e7c..4b7c96346e4d 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -20,7 +20,7 @@ use crate::{ maybe_spawn_blocking, path::{absolute_path_to_url, Path}, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, - ObjectStore, Result, + ObjectStore, PutResult, Result, }; use async_trait::async_trait; use bytes::Bytes; @@ -36,6 +36,7 @@ use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::task::Poll; +use std::time::SystemTime; use std::{collections::BTreeSet, convert::TryFrom, io}; use std::{collections::VecDeque, path::PathBuf}; use tokio::io::AsyncWrite; @@ -270,7 +271,7 @@ impl Config { #[async_trait] impl ObjectStore for LocalFileSystem { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { + async fn put(&self, location: &Path, bytes: Bytes) -> Result { let path = self.config.path_to_filesystem(location)?; maybe_spawn_blocking(move || { let (mut file, suffix) = new_staged_upload(&path)?; @@ -282,8 +283,17 @@ impl ObjectStore for LocalFileSystem { }) .map_err(|e| { let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup - e.into() - }) + e + })?; + + let metadata = file.metadata().map_err(|e| Error::Metadata { + source: e.into(), + path: path.to_string_lossy().to_string(), + })?; + + Ok(PutResult { + e_tag: Some(get_etag(&metadata)), + }) }) .await } @@ -959,24 +969,33 @@ fn last_modified(metadata: &Metadata) -> DateTime { .into() } +fn get_etag(metadata: &Metadata) -> String { + let inode = get_inode(metadata); + let size = metadata.len(); + let mtime = metadata + .modified() + .ok() + .and_then(|mtime| mtime.duration_since(SystemTime::UNIX_EPOCH).ok()) + .unwrap_or_default() + .as_micros(); + + // Use an ETag scheme based on that used by many popular HTTP servers + // + // + format!("{inode:x}-{mtime:x}-{size:x}") +} + fn convert_metadata(metadata: Metadata, location: Path) -> Result { let last_modified = last_modified(&metadata); let size = usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu { path: location.as_ref(), })?; - let inode = get_inode(&metadata); - let mtime = last_modified.timestamp_micros(); - - // Use an ETag scheme based on that used by many popular HTTP servers - // - // - let etag = format!("{inode:x}-{mtime:x}-{size:x}"); Ok(ObjectMeta { location, last_modified, size, - e_tag: Some(etag), + e_tag: Some(get_etag(&metadata)), }) } diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 00b330b5eb94..952b45739759 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -17,7 +17,8 @@ //! An in-memory object store implementation use crate::{ - path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, Result, + path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, + PutResult, Result, }; use crate::{GetOptions, MultipartId}; use async_trait::async_trait; @@ -106,11 +107,12 @@ struct Storage { type SharedStorage = Arc>; impl Storage { - fn insert(&mut self, location: &Path, bytes: Bytes) { + fn insert(&mut self, location: &Path, bytes: Bytes) -> usize { let etag = self.next_etag; self.next_etag += 1; let entry = Entry::new(bytes, Utc::now(), etag); self.map.insert(location.clone(), entry); + etag } } @@ -122,9 +124,11 @@ impl std::fmt::Display for InMemory { #[async_trait] impl ObjectStore for InMemory { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - self.storage.write().insert(location, bytes); - Ok(()) + async fn put(&self, location: &Path, bytes: Bytes) -> Result { + let etag = self.storage.write().insert(location, bytes); + Ok(PutResult { + e_tag: Some(etag.to_string()), + }) } async fn put_multipart( diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 3776dec2e872..21f6c1d99dc9 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -23,7 +23,8 @@ use tokio::io::AsyncWrite; use crate::path::Path; use crate::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, + Result, }; #[doc(hidden)] @@ -79,7 +80,7 @@ impl PrefixStore { #[async_trait::async_trait] impl ObjectStore for PrefixStore { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { + async fn put(&self, location: &Path, bytes: Bytes) -> Result { let full_path = self.full_path(location); self.inner.put(&full_path, bytes).await } diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index f716a11f8a05..d6f191baf82e 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -21,7 +21,8 @@ use std::ops::Range; use std::{convert::TryInto, sync::Arc}; use crate::{ - path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, Result, + path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, + PutResult, Result, }; use crate::{GetOptions, MultipartId}; use async_trait::async_trait; @@ -147,7 +148,7 @@ impl std::fmt::Display for ThrottledStore { #[async_trait] impl ObjectStore for ThrottledStore { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { + async fn put(&self, location: &Path, bytes: Bytes) -> Result { sleep(self.config().wait_put_per_call).await; self.inner.put(location, bytes).await diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs index 25c469260675..5703d7f24844 100644 --- a/object_store/tests/get_range_file.rs +++ b/object_store/tests/get_range_file.rs @@ -23,7 +23,7 @@ use futures::stream::BoxStream; use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, }; use std::fmt::Formatter; use tempfile::tempdir; @@ -40,7 +40,7 @@ impl std::fmt::Display for MyStore { #[async_trait] impl ObjectStore for MyStore { - async fn put(&self, path: &Path, data: Bytes) -> object_store::Result<()> { + async fn put(&self, path: &Path, data: Bytes) -> object_store::Result { self.0.put(path, data).await }