Skip to content

Commit

Permalink
Add ObjectStore::list_with_offset (#3970) (#3973)
Browse files Browse the repository at this point in the history
* Stub out ObjectStore::list_with_offset (#3970)

* Add tests and add AWS implementation

* Update localstack

* Add further implementations
  • Loading branch information
tustvold authored Mar 30, 2023
1 parent 20522a8 commit dc07f94
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ jobs:
AWS_SECRET_ACCESS_KEY: test
AWS_ENDPOINT: http://localhost:4566
run: |
docker run -d -p 4566:4566 localstack/localstack:0.14.4
docker run -d -p 4566:4566 localstack/localstack:2.0
docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
Expand Down
27 changes: 21 additions & 6 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ impl S3Client {
prefix: Option<&str>,
delimiter: bool,
token: Option<&str>,
offset: Option<&str>,
) -> Result<(ListResult, Option<String>)> {
let credential = self.get_credential().await?;
let url = self.config.bucket_endpoint.clone();
Expand All @@ -403,6 +404,10 @@ impl S3Client {
query.push(("prefix", prefix))
}

if let Some(offset) = offset {
query.push(("start-after", offset))
}

let response = self
.client
.request(Method::GET, &url)
Expand Down Expand Up @@ -433,14 +438,24 @@ impl S3Client {
&self,
prefix: Option<&Path>,
delimiter: bool,
offset: Option<&Path>,
) -> BoxStream<'_, Result<ListResult>> {
let offset = offset.map(|x| x.to_string());
let prefix = format_prefix(prefix);
stream_paginated(prefix, move |prefix, token| async move {
let (r, next_token) = self
.list_request(prefix.as_deref(), delimiter, token.as_deref())
.await?;
Ok((r, prefix, next_token))
})
stream_paginated(
(prefix, offset),
move |(prefix, offset), token| async move {
let (r, next_token) = self
.list_request(
prefix.as_deref(),
delimiter,
token.as_deref(),
offset.as_deref(),
)
.await?;
Ok((r, (prefix, offset), next_token))
},
)
.boxed()
}

Expand Down
19 changes: 17 additions & 2 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,22 @@ impl ObjectStore for AmazonS3 {
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let stream = self
.client
.list_paginated(prefix, false)
.list_paginated(prefix, false, None)
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
.boxed();

Ok(stream)
}

async fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let stream = self
.client
.list_paginated(prefix, false, Some(offset))
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
.boxed();
Expand All @@ -282,7 +297,7 @@ impl ObjectStore for AmazonS3 {
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
let mut stream = self.client.list_paginated(prefix, true);
let mut stream = self.client.list_paginated(prefix, true, None);

let mut common_prefixes = BTreeSet::new();
let mut objects = Vec::new();
Expand Down
8 changes: 8 additions & 0 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ impl ObjectStore for ChunkedStore {
self.inner.list(prefix).await
}

/// List objects under `prefix` with a location greater than `offset`.
///
/// This is a pure pass-through: the arguments are forwarded unchanged to the
/// wrapped `inner` store and its stream is returned as-is.
async fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.inner.list_with_offset(prefix, offset).await
}

/// Forward the delimiter-based listing for `prefix` to the wrapped `inner`
/// store and return its result unchanged.
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.inner.list_with_delimiter(prefix).await
}
Expand Down
91 changes: 90 additions & 1 deletion object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ use crate::util::{coalesce_ranges, collect_bytes, OBJECT_STORE_COALESCE_DEFAULT}
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::Snafu;
use std::fmt::{Debug, Formatter};
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -371,11 +371,33 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
/// `foo/bar_baz/x`.
///
/// Note: the order of returned [`ObjectMeta`] is not guaranteed
async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>>;

/// Stream every object under `prefix` whose location is strictly greater than `offset`
///
/// This default implementation performs a regular `list` call and discards
/// non-matching entries on the client. Some stores, such as S3 and GCS, may be
/// able to push `offset` down to reduce the number of network requests required
///
/// Note: the order of returned [`ObjectMeta`] is not guaranteed
async fn list_with_offset(
    &self,
    prefix: Option<&Path>,
    offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
    // The returned stream may outlive the `offset` borrow, so the filter
    // closure needs its own copy of the path.
    let threshold = offset.clone();
    let unfiltered = self.list(prefix).await?;
    let filtered = unfiltered.try_filter(move |meta| {
        // Errors are passed through by `try_filter`; only `Ok` items are tested.
        let keep = meta.location > threshold;
        futures::future::ready(keep)
    });
    Ok(filtered.boxed())
}

/// List objects with the given prefix and an implementation specific
/// delimiter. Returns common prefixes (directories) in addition to object
/// metadata.
Expand Down Expand Up @@ -477,6 +499,14 @@ impl ObjectStore for Box<dyn ObjectStore> {
self.as_ref().list(prefix).await
}

/// List objects under `prefix` with a location greater than `offset`.
///
/// Forwards to the boxed store obtained via `as_ref()` rather than running
/// any logic of its own.
async fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.as_ref().list_with_offset(prefix, offset).await
}

/// Forward the delimiter-based listing for `prefix` to the boxed store
/// obtained via `as_ref()` and return its result unchanged.
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.as_ref().list_with_delimiter(prefix).await
}
Expand Down Expand Up @@ -926,6 +956,65 @@ mod tests {

let files = flatten_list_stream(storage, None).await.unwrap();
assert!(files.is_empty(), "{files:?}");

// Test list order
let files = vec![
Path::from("a a/b.file"),
Path::parse("a%2Fa.file").unwrap(),
Path::from("a/😀.file"),
Path::from("a/a file"),
Path::parse("a/a%2F.file").unwrap(),
Path::from("a/a.file"),
Path::from("a/a/b.file"),
Path::from("a/b.file"),
Path::from("aa/a.file"),
Path::from("ab/a.file"),
];

for file in &files {
storage.put(file, "foo".into()).await.unwrap();
}

let cases = [
(None, Path::from("a")),
(None, Path::from("a/a file")),
(None, Path::from("a/a/b.file")),
(None, Path::from("ab/a.file")),
(None, Path::from("a%2Fa.file")),
(None, Path::from("a/😀.file")),
(Some(Path::from("a")), Path::from("")),
(Some(Path::from("a")), Path::from("a")),
(Some(Path::from("a")), Path::from("a/😀")),
(Some(Path::from("a")), Path::from("a/😀.file")),
(Some(Path::from("a")), Path::from("a/b")),
(Some(Path::from("a")), Path::from("a/a/b.file")),
];

for (prefix, offset) in cases {
let s = storage
.list_with_offset(prefix.as_ref(), &offset)
.await
.unwrap();

let mut actual: Vec<_> =
s.map_ok(|x| x.location).try_collect().await.unwrap();

actual.sort_unstable();

let expected: Vec<_> = files
.iter()
.cloned()
.filter(|x| {
let prefix_match =
prefix.as_ref().map(|p| x.prefix_matches(p)).unwrap_or(true);
prefix_match && x > &offset
})
.collect();

assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
}

delete_fixtures(storage).await;
}

fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
Expand Down
10 changes: 10 additions & 0 deletions object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
Ok(PermitWrapper::new(s, permit).boxed())
}

async fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
let s = self.inner.list_with_offset(prefix, offset).await?;
Ok(PermitWrapper::new(s, permit).boxed())
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.list_with_delimiter(prefix).await
Expand Down
61 changes: 34 additions & 27 deletions object_store/src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::MultipartId;
use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt};
use futures::{stream::BoxStream, FutureExt, StreamExt};
use std::time::Duration;
use tokio::io::AsyncWrite;

Expand Down Expand Up @@ -185,19 +185,10 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
GetResult::File(_, _) => unimplemented!(),
};

GetResult::Stream(
s.then(move |bytes_result| async move {
match bytes_result {
Ok(bytes) => {
let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
sleep(wait_get_per_byte * bytes_len).await;
Ok(bytes)
}
Err(err) => Err(err),
}
})
.boxed(),
)
GetResult::Stream(throttle_stream(s, move |bytes| {
let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
wait_get_per_byte * bytes_len
}))
})
}

Expand Down Expand Up @@ -247,20 +238,21 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {

// need to copy to avoid moving / referencing `self`
let wait_list_per_entry = self.config().wait_list_per_entry;
let stream = self.inner.list(prefix).await?;
Ok(throttle_stream(stream, move |_| wait_list_per_entry))
}

self.inner.list(prefix).await.map(|stream| {
stream
.then(move |result| async move {
match result {
Ok(entry) => {
sleep(wait_list_per_entry).await;
Ok(entry)
}
Err(err) => Err(err),
}
})
.boxed()
})
/// List objects under `prefix` with a location greater than `offset`,
/// applying the configured list throttling.
///
/// The call first sleeps for `wait_list_per_call`, then asks the inner
/// store for the listing and wraps the resulting stream with
/// `throttle_stream` using a `wait_list_per_entry` delay per item.
async fn list_with_offset(
    &self,
    prefix: Option<&Path>,
    offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
    sleep(self.config().wait_list_per_call).await;

    // Read the per-entry delay into a local so the closure handed to
    // `throttle_stream` does not have to capture `self`.
    let per_entry_delay = self.config().wait_list_per_entry;
    let entries = self.inner.list_with_offset(prefix, offset).await?;
    let throttled = throttle_stream(entries, move |_| per_entry_delay);
    Ok(throttled)
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
Expand Down Expand Up @@ -307,6 +299,21 @@ fn usize_to_u32_saturate(x: usize) -> u32 {
x.try_into().unwrap_or(u32::MAX)
}

/// Wrap `stream` so that every item is held back by a computed delay.
///
/// For each `Ok` item the pause is `delay(&item)`; for an `Err` item the
/// pause is `Duration::default()`. After sleeping, the item is yielded
/// unchanged, so ordering and contents of the stream are preserved.
fn throttle_stream<T: Send + 'static, E: Send + 'static, F>(
    stream: BoxStream<'_, Result<T, E>>,
    delay: F,
) -> BoxStream<'_, Result<T, E>>
where
    F: Fn(&T) -> Duration + Send + Sync + 'static,
{
    stream
        .then(move |item| {
            // Compute the pause up front so the returned future owns only
            // `item` and does not borrow `delay`.
            let pause = match &item {
                Ok(value) => delay(value),
                Err(_) => Duration::default(),
            };
            sleep(pause).then(|_| futures::future::ready(item))
        })
        .boxed()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit dc07f94

Please sign in to comment.