From dc07f9454251b42388c2a7cae8e3d65264d7130b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 30 Mar 2023 18:08:04 +0100 Subject: [PATCH] Add ObjectStore::list_with_offset (#3970) (#3973) * Stub out ObjectStore::list_with_offset (#3970) * Add tests and add AWS implementation * Update localstack * Add further implementations --- .github/workflows/object_store.yml | 2 +- object_store/src/aws/client.rs | 27 +++++++-- object_store/src/aws/mod.rs | 19 ++++++- object_store/src/chunked.rs | 8 +++ object_store/src/lib.rs | 91 +++++++++++++++++++++++++++++- object_store/src/limit.rs | 10 ++++ object_store/src/throttle.rs | 61 +++++++++++--------- 7 files changed, 181 insertions(+), 37 deletions(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index f182d21eef13..8e97c4440567 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -106,7 +106,7 @@ jobs: AWS_SECRET_ACCESS_KEY: test AWS_ENDPOINT: http://localhost:4566 run: | - docker run -d -p 4566:4566 localstack/localstack:0.14.4 + docker run -d -p 4566:4566 localstack/localstack:2.0 docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2 aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index bd58d09676aa..7ac4b705b36c 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -382,6 +382,7 @@ impl S3Client { prefix: Option<&str>, delimiter: bool, token: Option<&str>, + offset: Option<&str>, ) -> Result<(ListResult, Option)> { let credential = self.get_credential().await?; let url = self.config.bucket_endpoint.clone(); @@ -403,6 +404,10 @@ impl S3Client { query.push(("prefix", prefix)) } + if let Some(offset) = offset { + query.push(("start-after", offset)) + } + let response = self .client .request(Method::GET, &url) @@ -433,14 +438,24 @@ impl S3Client { &self, prefix: Option<&Path>, delimiter: bool, + offset: Option<&Path>, ) -> BoxStream<'_, Result> { + let offset = offset.map(|x| x.to_string()); let prefix = format_prefix(prefix); - stream_paginated(prefix, move |prefix, token| async move { - let (r, next_token) = self - .list_request(prefix.as_deref(), delimiter, token.as_deref()) - .await?; - Ok((r, prefix, next_token)) - }) + stream_paginated( + (prefix, offset), + move |(prefix, offset), token| async move { + let (r, next_token) = self + .list_request( + prefix.as_deref(), + delimiter, + token.as_deref(), + offset.as_deref(), + ) + .await?; + Ok((r, (prefix, offset), next_token)) + }, + ) .boxed() } diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 752fb2e7df9d..1e302e688978 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -273,7 +273,22 @@ impl ObjectStore for AmazonS3 { ) -> Result>> { let stream = self .client - .list_paginated(prefix, false) + .list_paginated(prefix, false, None) + .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok))) + .try_flatten() + .boxed(); + + Ok(stream) + } + + async fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> Result>> { + let stream = self + .client + .list_paginated(prefix, false, Some(offset)) .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok))) .try_flatten() .boxed(); @@ -282,7 +297,7 @@ impl ObjectStore for AmazonS3 { } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { - let mut stream = self.client.list_paginated(prefix, true); + let mut stream = self.client.list_paginated(prefix, true, None); let mut common_prefixes = BTreeSet::new(); let mut objects = Vec::new(); diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index 76865ef96701..aebefec61559 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -174,6 +174,14 @@ impl ObjectStore for ChunkedStore { self.inner.list(prefix).await } + async fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> Result>> { + self.inner.list_with_offset(prefix, offset).await + } + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { self.inner.list_with_delimiter(prefix).await } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 706cc076672c..5737071286c8 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -258,7 +258,7 @@ use crate::util::{coalesce_ranges, collect_bytes, OBJECT_STORE_COALESCE_DEFAULT} use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Utc}; -use futures::{stream::BoxStream, StreamExt}; +use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use snafu::Snafu; use std::fmt::{Debug, Formatter}; #[cfg(not(target_arch = "wasm32"))] @@ -371,11 +371,33 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of /// `foo/bar_baz/x`. + /// + /// Note: the order of returned [`ObjectMeta`] is not guaranteed async fn list( &self, prefix: Option<&Path>, ) -> Result>>; + /// List all the objects with the given prefix and a location greater than `offset` + /// + /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce + /// the number of network requests required + /// + /// Note: the order of returned [`ObjectMeta`] is not guaranteed + async fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> Result>> { + let offset = offset.clone(); + let stream = self + .list(prefix) + .await? + .try_filter(move |f| futures::future::ready(f.location > offset)) + .boxed(); + Ok(stream) + } + /// List objects with the given prefix and an implementation specific /// delimiter. Returns common prefixes (directories) in addition to object /// metadata. @@ -477,6 +499,14 @@ impl ObjectStore for Box { self.as_ref().list(prefix).await } + async fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> Result>> { + self.as_ref().list_with_offset(prefix, offset).await + } + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { self.as_ref().list_with_delimiter(prefix).await } @@ -926,6 +956,65 @@ mod tests { let files = flatten_list_stream(storage, None).await.unwrap(); assert!(files.is_empty(), "{files:?}"); + + // Test list order + let files = vec![ + Path::from("a a/b.file"), + Path::parse("a%2Fa.file").unwrap(), + Path::from("a/😀.file"), + Path::from("a/a file"), + Path::parse("a/a%2F.file").unwrap(), + Path::from("a/a.file"), + Path::from("a/a/b.file"), + Path::from("a/b.file"), + Path::from("aa/a.file"), + Path::from("ab/a.file"), + ]; + + for file in &files { + storage.put(file, "foo".into()).await.unwrap(); + } + + let cases = [ + (None, Path::from("a")), + (None, Path::from("a/a file")), + (None, Path::from("a/a/b.file")), + (None, Path::from("ab/a.file")), + (None, Path::from("a%2Fa.file")), + (None, Path::from("a/😀.file")), + (Some(Path::from("a")), Path::from("")), + (Some(Path::from("a")), Path::from("a")), + (Some(Path::from("a")), Path::from("a/😀")), + (Some(Path::from("a")), Path::from("a/😀.file")), + (Some(Path::from("a")), Path::from("a/b")), + (Some(Path::from("a")), Path::from("a/a/b.file")), + ]; + + for (prefix, offset) in cases { + let s = storage + .list_with_offset(prefix.as_ref(), &offset) + .await + .unwrap(); + + let mut actual: Vec<_> = + s.map_ok(|x| x.location).try_collect().await.unwrap(); + + actual.sort_unstable(); + + let expected: Vec<_> = files + .iter() + .cloned() + .filter(|x| { + let prefix_match = + prefix.as_ref().map(|p| x.prefix_matches(p)).unwrap_or(true); + prefix_match && x > &offset + }) + .collect(); + + assert_eq!(actual, expected, "{prefix:?} - {offset:?}"); + } + + delete_fixtures(storage).await; } fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec { diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index b3e55a918b9a..d0d9f73c5c59 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -147,6 +147,16 @@ impl ObjectStore for LimitStore { Ok(PermitWrapper::new(s, permit).boxed()) } + async fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> Result>> { + let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); + let s = self.inner.list_with_offset(prefix, offset).await?; + Ok(PermitWrapper::new(s, permit).boxed()) + } + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.list_with_delimiter(prefix).await diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index 6dff64aab69c..e51303114788 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -24,7 +24,7 @@ use crate::MultipartId; use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result}; use async_trait::async_trait; use bytes::Bytes; -use futures::{stream::BoxStream, StreamExt}; +use futures::{stream::BoxStream, FutureExt, StreamExt}; use std::time::Duration; use tokio::io::AsyncWrite; @@ -185,19 +185,10 @@ impl ObjectStore for ThrottledStore { GetResult::File(_, _) => unimplemented!(), }; - GetResult::Stream( - s.then(move |bytes_result| async move { - match bytes_result { - Ok(bytes) => { - let bytes_len: u32 = usize_to_u32_saturate(bytes.len()); - sleep(wait_get_per_byte * bytes_len).await; - Ok(bytes) - } - Err(err) => Err(err), - } - }) - .boxed(), - ) + GetResult::Stream(throttle_stream(s, move |bytes| { + let bytes_len: u32 = usize_to_u32_saturate(bytes.len()); + wait_get_per_byte * bytes_len + })) }) } @@ -247,20 +238,21 @@ impl ObjectStore for ThrottledStore { // need to copy to avoid moving / referencing `self` let wait_list_per_entry = self.config().wait_list_per_entry; + let stream = self.inner.list(prefix).await?; + Ok(throttle_stream(stream, move |_| wait_list_per_entry)) + } - self.inner.list(prefix).await.map(|stream| { - stream - .then(move |result| async move { - match result { - Ok(entry) => { - sleep(wait_list_per_entry).await; - Ok(entry) - } - Err(err) => Err(err), - } - }) - .boxed() - }) + async fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> Result>> { + sleep(self.config().wait_list_per_call).await; + + // need to copy to avoid moving / referencing `self` + let wait_list_per_entry = self.config().wait_list_per_entry; + let stream = self.inner.list_with_offset(prefix, offset).await?; + Ok(throttle_stream(stream, move |_| wait_list_per_entry)) } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { @@ -307,6 +299,21 @@ fn usize_to_u32_saturate(x: usize) -> u32 { x.try_into().unwrap_or(u32::MAX) } +fn throttle_stream( + stream: BoxStream<'_, Result>, + delay: F, +) -> BoxStream<'_, Result> +where + F: Fn(&T) -> Duration + Send + Sync + 'static, +{ + stream + .then(move |result| { + let delay = result.as_ref().ok().map(&delay).unwrap_or_default(); + sleep(delay).then(|_| futures::future::ready(result)) + }) + .boxed() +} + #[cfg(test)] mod tests { use super::*;