Skip to content

Commit

Permalink
Ensure we return a sorted listing when using a local client (#344)
Browse files Browse the repository at this point in the history
This ensures that calls to `list_files` are returned sorted if a
`LocalFileSystem` `ObjectStore` is used.

It's not clean for a few reasons:
1. We have to materialize the entire iter in order to sort it
2. We can't easily detect if we're local just by using `type_of` or
similar. We have a foreign trait object that doesn't have an `as_any` on
it, so we can force the reference into an `Any` which would allow us to
use [`is`](https://doc.rust-lang.org/std/any/trait.Any.html#method.is),
and we can't _add_ an implementation of something like `as_any` because
`dyn ObjectStore` isn't `Sized`. So this resorts to `format`ing the
object at creation (since `ObjectStore` requires `Display`) and checking
if it starts with `LocalFileSystem`....

Adds a test that when using local client things come back sorted now.
Without these changes the test failed.

I think we should merge (or something like it) and then also see about
adding a `list_sorted` and `list_with_offset_sorted` to `ObjectStore`

---------

Co-authored-by: Nick Lanham <[email protected]>
  • Loading branch information
nicklan and Nick Lanham authored Oct 18, 2024
1 parent 9b2e7e3 commit 6205711
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 8 deletions.
62 changes: 59 additions & 3 deletions kernel/src/engine/default/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use std::sync::Arc;

use bytes::Bytes;
use futures::stream::StreamExt;
use itertools::Itertools;
use object_store::path::Path;
use object_store::DynObjectStore;
use object_store::{DynObjectStore, ObjectStore};
use url::Url;

use crate::engine::default::executor::TaskExecutor;
Expand All @@ -12,15 +13,22 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};
#[derive(Debug)]
pub struct ObjectStoreFileSystemClient<E: TaskExecutor> {
inner: Arc<DynObjectStore>,
has_ordered_listing: bool,
table_root: Path,
task_executor: Arc<E>,
readahead: usize,
}

impl<E: TaskExecutor> ObjectStoreFileSystemClient<E> {
pub fn new(store: Arc<DynObjectStore>, table_root: Path, task_executor: Arc<E>) -> Self {
pub(crate) fn new(
store: Arc<DynObjectStore>,
has_ordered_listing: bool,
table_root: Path,
task_executor: Arc<E>,
) -> Self {
Self {
inner: store,
has_ordered_listing,
table_root,
task_executor,
readahead: 10,
Expand Down Expand Up @@ -72,7 +80,14 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
}
});

Ok(Box::new(receiver.into_iter()))
if !self.has_ordered_listing {
// This FS doesn't return things in the order we require
let mut fms: Vec<FileMeta> = receiver.into_iter().try_collect()?;
fms.sort_unstable();
Ok(Box::new(fms.into_iter().map(Ok)))
} else {
Ok(Box::new(receiver.into_iter()))
}
}

/// Read data specified by the start and end offset from the file.
Expand Down Expand Up @@ -144,6 +159,8 @@ mod tests {
use object_store::{local::LocalFileSystem, ObjectStore};

use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::engine::default::DefaultEngine;
use crate::Engine;

use itertools::Itertools;

Expand Down Expand Up @@ -174,6 +191,7 @@ mod tests {
let prefix = Path::from(url.path());
let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
prefix,
Arc::new(TokioBackgroundExecutor::new()),
);
Expand All @@ -195,4 +213,42 @@ mod tests {
assert_eq!(data[1], Bytes::from("data"));
assert_eq!(data[2], Bytes::from("el-da"));
}

#[tokio::test]
async fn test_default_engine_listing() {
let tmp = tempfile::tempdir().unwrap();
let tmp_store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
let data = Bytes::from("kernel-data");

let expected_names: Vec<String> = (0..10)
.map(|i| format!("_delta_log/{:0>20}.json", i))
.collect();

// put them in in reverse order
for name in expected_names.iter().rev() {
tmp_store
.put(&Path::from(name.as_str()), data.clone().into())
.await
.unwrap();
}

let url = Url::from_directory_path(tmp.path()).unwrap();
let store = Arc::new(LocalFileSystem::new());
let prefix = Path::from_url_path(url.path()).expect("Couldn't get path");
let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
let client = engine.get_file_system_client();

let files = client.list_from(&Url::parse("file://").unwrap()).unwrap();
let mut len = 0;
for (file, expected) in files.zip(expected_names.iter()) {
assert!(
file.as_ref().unwrap().location.path().ends_with(expected),
"{} does not end with {}",
file.unwrap().location.path(),
expected
);
len += 1;
}
assert_eq!(len, 10, "list_from should have returned 10 files");
}
}
22 changes: 22 additions & 0 deletions kernel/src/engine/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,31 @@ impl<E: TaskExecutor> DefaultEngine<E> {
/// - `table_root_path`: The root path of the table within storage.
/// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor].
pub fn new(store: Arc<DynObjectStore>, table_root: Path, task_executor: Arc<E>) -> Self {
// HACK to check if we're using a LocalFileSystem from ObjectStore. We need this because
// local filesystem doesn't return a sorted list by default. Although the `object_store`
// crate explicitly says it _does not_ return a sorted listing, in practice all the cloud
// implementations actually do:
// - AWS:
// [`ListObjectsV2`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html)
// states: "For general purpose buckets, ListObjectsV2 returns objects in lexicographical
// order based on their key names." (Directory buckets are out of scope for now)
// - Azure: Docs state
// [here](https://learn.microsoft.com/en-us/rest/api/storageservices/enumerating-blob-resources):
// "A listing operation returns an XML response that contains all or part of the requested
// list. The operation returns entities in alphabetical order."
// - GCP: The [main](https://cloud.google.com/storage/docs/xml-api/get-bucket-list) doc
// doesn't indicate order, but [this
// page](https://cloud.google.com/storage/docs/xml-api/get-bucket-list) does say: "This page
// shows you how to list the [objects](https://cloud.google.com/storage/docs/objects) stored
// in your Cloud Storage buckets, which are ordered in the list lexicographically by name."
// So we just need to know if we're local and then if so, we sort the returned file list in
// `filesystem.rs`
let store_str = format!("{}", store);
let is_local = store_str.starts_with("LocalFileSystem");
Self {
file_system: Arc::new(ObjectStoreFileSystemClient::new(
store.clone(),
!is_local,
table_root,
task_executor.clone(),
)),
Expand Down
14 changes: 13 additions & 1 deletion kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
rust_2021_compatibility
)]

use std::ops::Range;
use std::sync::Arc;
use std::{cmp::Ordering, ops::Range};

use bytes::Bytes;
use url::Url;
Expand Down Expand Up @@ -111,6 +111,18 @@ pub struct FileMeta {
pub size: usize,
}

impl Ord for FileMeta {
fn cmp(&self, other: &Self) -> Ordering {
self.location.cmp(&other.location)
}
}

impl PartialOrd for FileMeta {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

/// Trait for implementing an Expression evaluator.
///
/// It contains one Expression which can be evaluated on multiple ColumnarBatches.
Expand Down
26 changes: 22 additions & 4 deletions kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,15 @@ fn list_log_files_with_checkpoint(
)));
}

// NOTE this will sort in reverse order
commit_files.sort_unstable_by(|a, b| b.version.cmp(&a.version));
debug_assert!(
commit_files
.windows(2)
.all(|cfs| cfs[0].version <= cfs[1].version),
"fs_client.list_from() didn't return a sorted listing! {:?}",
commit_files
);
// We assume listing returned ordered, we want reverse order
let commit_files = commit_files.into_iter().rev().collect();

Ok((commit_files, checkpoint_files))
}
Expand Down Expand Up @@ -443,8 +450,16 @@ fn list_log_files(
}

commit_files.retain(|f| f.version as i64 > max_checkpoint_version);
// NOTE this will sort in reverse order
commit_files.sort_unstable_by(|a, b| b.version.cmp(&a.version));

debug_assert!(
commit_files
.windows(2)
.all(|cfs| cfs[0].version <= cfs[1].version),
"fs_client.list_from() didn't return a sorted listing! {:?}",
commit_files
);
// We assume listing returned ordered, we want reverse order
let commit_files = commit_files.into_iter().rev().collect();

Ok((commit_files, checkpoint_files))
}
Expand Down Expand Up @@ -523,6 +538,7 @@ mod tests {
let prefix = Path::from(url.path());
let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
prefix,
Arc::new(TokioBackgroundExecutor::new()),
);
Expand Down Expand Up @@ -582,6 +598,7 @@ mod tests {

let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
Expand Down Expand Up @@ -626,6 +643,7 @@ mod tests {

let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
Expand Down

0 comments on commit 6205711

Please sign in to comment.