Merge branch 'main' into init-integration-tests
nicklan authored Oct 18, 2024
2 parents 132b4ec + 6205711 commit 33500bf
Showing 16 changed files with 199 additions and 89 deletions.
7 changes: 3 additions & 4 deletions ffi/src/scan.rs
@@ -135,10 +135,9 @@ fn scan_impl(
     if let Some(predicate) = predicate {
         let mut visitor_state = KernelExpressionVisitorState::new();
         let exprid = (predicate.visitor)(predicate.predicate, &mut visitor_state);
-        if let Some(predicate) = unwrap_kernel_expression(&mut visitor_state, exprid) {
-            debug!("Got predicate: {}", predicate);
-            scan_builder = scan_builder.with_predicate(predicate);
-        }
+        let predicate = unwrap_kernel_expression(&mut visitor_state, exprid);
+        debug!("Got predicate: {:#?}", predicate);
+        scan_builder = scan_builder.with_predicate(predicate.map(Arc::new));
     }
     Ok(Arc::new(scan_builder.build()?).into())
 }
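
The pattern in this hunk — converting an owned Option<Expression> into the shared Option<ExpressionRef> via `.map(Arc::new)` — repeats across the rest of the commit. A minimal, self-contained sketch of the conversion (the `Expression` enum below is a stand-in, not the kernel's real type):

    use std::sync::Arc;

    // Stand-in for the kernel's Expression; only the shape matters here.
    #[derive(Debug)]
    enum Expression {
        Column(String),
        IsNotNull(Box<Expression>),
    }

    // Mirrors the new kernel alias: a shared, immutable expression handle.
    type ExpressionRef = Arc<Expression>;

    fn main() {
        let predicate: Option<Expression> = Some(Expression::IsNotNull(Box::new(
            Expression::Column("txn.appId".into()),
        )));
        // `.map(Arc::new)` lifts Option<Expression> to Option<ExpressionRef>,
        // so one predicate can be handed to many readers without deep clones.
        let shared: Option<ExpressionRef> = predicate.map(Arc::new);
        println!("{shared:#?}");
    }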
9 changes: 5 additions & 4 deletions kernel/src/actions/set_transaction.rs
@@ -1,9 +1,9 @@
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};
 
 use crate::actions::visitors::SetTransactionVisitor;
 use crate::actions::{get_log_schema, SetTransaction, SET_TRANSACTION_NAME};
 use crate::snapshot::Snapshot;
-use crate::{DeltaResult, Engine, EngineData, Expression as Expr, SchemaRef};
+use crate::{DeltaResult, Engine, EngineData, Expression, ExpressionRef, SchemaRef};
 
 pub use crate::actions::visitors::SetTransactionMap;
 pub struct SetTransactionScanner {
@@ -52,10 +52,11 @@ impl SetTransactionScanner {
         // checkpoint part when partitioned by `add.path` like the Delta spec requires. There's no
         // point filtering by a particular app id, even if we have one, because app ids all land in
         // a single checkpoint part with a large min/max range (because they're usually uuids).
-        let meta_predicate = Expr::column("txn.appId").is_not_null();
+        static META_PREDICATE: LazyLock<Option<ExpressionRef>> =
+            LazyLock::new(|| Some(Arc::new(Expression::column("txn.appId").is_not_null())));
         self.snapshot
             .log_segment
-            .replay(engine, schema.clone(), schema, Some(meta_predicate))
+            .replay(engine, schema.clone(), schema, META_PREDICATE.clone())
     }
 
     /// Scan the Delta Log for the latest transaction entry of an application
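
The `LazyLock` introduced here builds the static predicate once, on first access, and every subsequent replay shares it through a cheap `Arc` clone. A small sketch of the same pattern, with a hypothetical `Filter` type standing in for the kernel's `Expression`:

    use std::sync::{Arc, LazyLock};

    #[derive(Debug)]
    struct Filter(String); // hypothetical stand-in for Expression

    // Built lazily on first use; the Option matches the replay signature.
    static META_PREDICATE: LazyLock<Option<Arc<Filter>>> =
        LazyLock::new(|| Some(Arc::new(Filter("txn.appId IS NOT NULL".into()))));

    fn main() {
        // The first access runs the closure; later accesses reuse the result.
        let a = META_PREDICATE.clone();
        let b = META_PREDICATE.clone();
        // Both clones point at the same allocation.
        assert!(Arc::ptr_eq(a.as_ref().unwrap(), b.as_ref().unwrap()));
    }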
62 changes: 59 additions & 3 deletions kernel/src/engine/default/filesystem.rs
@@ -2,8 +2,9 @@ use std::sync::Arc;
 
 use bytes::Bytes;
 use futures::stream::StreamExt;
+use itertools::Itertools;
 use object_store::path::Path;
-use object_store::DynObjectStore;
+use object_store::{DynObjectStore, ObjectStore};
 use url::Url;
 
 use crate::engine::default::executor::TaskExecutor;
@@ -12,15 +13,22 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};
 #[derive(Debug)]
 pub struct ObjectStoreFileSystemClient<E: TaskExecutor> {
     inner: Arc<DynObjectStore>,
+    has_ordered_listing: bool,
     table_root: Path,
     task_executor: Arc<E>,
     readahead: usize,
 }
 
 impl<E: TaskExecutor> ObjectStoreFileSystemClient<E> {
-    pub fn new(store: Arc<DynObjectStore>, table_root: Path, task_executor: Arc<E>) -> Self {
+    pub(crate) fn new(
+        store: Arc<DynObjectStore>,
+        has_ordered_listing: bool,
+        table_root: Path,
+        task_executor: Arc<E>,
+    ) -> Self {
         Self {
             inner: store,
+            has_ordered_listing,
             table_root,
             task_executor,
             readahead: 10,
@@ -72,7 +80,14 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
             }
         });
 
-        Ok(Box::new(receiver.into_iter()))
+        if !self.has_ordered_listing {
+            // This FS doesn't return things in the order we require
+            let mut fms: Vec<FileMeta> = receiver.into_iter().try_collect()?;
+            fms.sort_unstable();
+            Ok(Box::new(fms.into_iter().map(Ok)))
+        } else {
+            Ok(Box::new(receiver.into_iter()))
+        }
     }
 
     /// Read data specified by the start and end offset from the file.
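
The fallback branch drains the channel with itertools' `try_collect` — so the first listing error is surfaced rather than sorting partial results — then relies on `FileMeta`'s ordering to restore the lexicographic order that log replay expects. A sketch of that collect-then-sort shape, with a hypothetical `Meta` type in place of `FileMeta`:

    use itertools::Itertools; // provides try_collect

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
    struct Meta {
        location: String, // hypothetical stand-in for FileMeta's file URL
    }

    fn into_sorted(
        iter: impl Iterator<Item = Result<Meta, std::io::Error>>,
    ) -> Result<Vec<Meta>, std::io::Error> {
        // try_collect stops at the first Err instead of discarding it.
        let mut metas: Vec<Meta> = iter.try_collect()?;
        metas.sort_unstable(); // lexicographic by location, like commit log file names
        Ok(metas)
    }

    fn main() {
        let unordered = vec![
            Ok(Meta { location: "00000000000000000001.json".into() }),
            Ok(Meta { location: "00000000000000000000.json".into() }),
        ];
        let metas = into_sorted(unordered.into_iter()).unwrap();
        assert!(metas[0].location < metas[1].location);
    }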
@@ -144,6 +159,8 @@ mod tests {
     use object_store::{local::LocalFileSystem, ObjectStore};
 
     use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
+    use crate::engine::default::DefaultEngine;
+    use crate::Engine;
 
     use itertools::Itertools;
 
@@ -174,6 +191,7 @@ mod tests {
         let prefix = Path::from(url.path());
         let client = ObjectStoreFileSystemClient::new(
             store,
+            false, // don't have ordered listing
             prefix,
             Arc::new(TokioBackgroundExecutor::new()),
         );
@@ -195,4 +213,42 @@ mod tests {
         assert_eq!(data[1], Bytes::from("data"));
         assert_eq!(data[2], Bytes::from("el-da"));
     }
+
+    #[tokio::test]
+    async fn test_default_engine_listing() {
+        let tmp = tempfile::tempdir().unwrap();
+        let tmp_store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
+        let data = Bytes::from("kernel-data");
+
+        let expected_names: Vec<String> = (0..10)
+            .map(|i| format!("_delta_log/{:0>20}.json", i))
+            .collect();
+
+        // put them in, in reverse order
+        for name in expected_names.iter().rev() {
+            tmp_store
+                .put(&Path::from(name.as_str()), data.clone().into())
+                .await
+                .unwrap();
+        }
+
+        let url = Url::from_directory_path(tmp.path()).unwrap();
+        let store = Arc::new(LocalFileSystem::new());
+        let prefix = Path::from_url_path(url.path()).expect("Couldn't get path");
+        let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
+        let client = engine.get_file_system_client();
+
+        let files = client.list_from(&Url::parse("file://").unwrap()).unwrap();
+        let mut len = 0;
+        for (file, expected) in files.zip(expected_names.iter()) {
+            assert!(
+                file.as_ref().unwrap().location.path().ends_with(expected),
+                "{} does not end with {}",
+                file.unwrap().location.path(),
+                expected
+            );
+            len += 1;
+        }
+        assert_eq!(len, 10, "list_from should have returned 10 files");
+    }
 }
5 changes: 3 additions & 2 deletions kernel/src/engine/default/json.rs
@@ -17,7 +17,8 @@ use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
 use crate::engine::arrow_utils::parse_json as arrow_parse_json;
 use crate::schema::SchemaRef;
 use crate::{
-    DeltaResult, EngineData, Error, Expression, FileDataReadResultIterator, FileMeta, JsonHandler,
+    DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta,
+    JsonHandler,
 };
 
 #[derive(Debug)]
@@ -72,7 +73,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
         &self,
         files: &[FileMeta],
         physical_schema: SchemaRef,
-        _predicate: Option<Expression>,
+        _predicate: Option<ExpressionRef>,
     ) -> DeltaResult<FileDataReadResultIterator> {
         if files.is_empty() {
             return Ok(Box::new(std::iter::empty()));
22 changes: 22 additions & 0 deletions kernel/src/engine/default/mod.rs
@@ -67,9 +67,31 @@ impl<E: TaskExecutor> DefaultEngine<E> {
     /// - `table_root_path`: The root path of the table within storage.
     /// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor].
     pub fn new(store: Arc<DynObjectStore>, table_root: Path, task_executor: Arc<E>) -> Self {
+        // HACK to check if we're using a LocalFileSystem from ObjectStore. We need this because
+        // the local filesystem doesn't return a sorted list by default. Although the `object_store`
+        // crate explicitly says it _does not_ return a sorted listing, in practice all the cloud
+        // implementations actually do:
+        // - AWS:
+        //   [`ListObjectsV2`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html)
+        //   states: "For general purpose buckets, ListObjectsV2 returns objects in lexicographical
+        //   order based on their key names." (Directory buckets are out of scope for now)
+        // - Azure: Docs state
+        //   [here](https://learn.microsoft.com/en-us/rest/api/storageservices/enumerating-blob-resources):
+        //   "A listing operation returns an XML response that contains all or part of the requested
+        //   list. The operation returns entities in alphabetical order."
+        // - GCP: The [main](https://cloud.google.com/storage/docs/xml-api/get-bucket-list) doc
+        //   doesn't indicate order, but [this
+        //   page](https://cloud.google.com/storage/docs/listing-objects) does say: "This page
+        //   shows you how to list the [objects](https://cloud.google.com/storage/docs/objects) stored
+        //   in your Cloud Storage buckets, which are ordered in the list lexicographically by name."
+        // So we just need to know if we're local and then if so, we sort the returned file list in
+        // `filesystem.rs`
+        let store_str = format!("{}", store);
+        let is_local = store_str.starts_with("LocalFileSystem");
         Self {
             file_system: Arc::new(ObjectStoreFileSystemClient::new(
                 store.clone(),
+                !is_local,
                 table_root,
                 task_executor.clone(),
             )),
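
The check itself leans on `object_store`'s `Display` implementations, which begin with the concrete store's type name (e.g. `LocalFileSystem(...)`). A small sketch of that detection, assuming those `Display` strings stay stable across versions:

    use std::sync::Arc;

    use object_store::local::LocalFileSystem;
    use object_store::memory::InMemory;
    use object_store::DynObjectStore;

    // Assumption: every ObjectStore's Display output starts with its type name.
    fn is_local(store: &Arc<DynObjectStore>) -> bool {
        format!("{}", store).starts_with("LocalFileSystem")
    }

    fn main() {
        let local: Arc<DynObjectStore> = Arc::new(LocalFileSystem::new());
        let memory: Arc<DynObjectStore> = Arc::new(InMemory::new());
        assert!(is_local(&local));
        assert!(!is_local(&memory));
    }

Matching on a formatted string is brittle — a rename in `object_store` would silently flip the flag — which is exactly why the comment labels it a HACK.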
18 changes: 12 additions & 6 deletions kernel/src/engine/default/parquet.rs
@@ -16,7 +16,9 @@ use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_s
 use crate::engine::default::executor::TaskExecutor;
 use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
 use crate::schema::SchemaRef;
-use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};
+use crate::{
+    DeltaResult, Error, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler,
+};
 
 #[derive(Debug)]
 pub struct DefaultParquetHandler<E: TaskExecutor> {
@@ -48,7 +50,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
         &self,
         files: &[FileMeta],
         physical_schema: SchemaRef,
-        predicate: Option<Expression>,
+        predicate: Option<ExpressionRef>,
     ) -> DeltaResult<FileDataReadResultIterator> {
         if files.is_empty() {
             return Ok(Box::new(std::iter::empty()));
@@ -90,7 +92,7 @@ struct ParquetOpener {
     // projection: Arc<[usize]>,
     batch_size: usize,
     table_schema: SchemaRef,
-    predicate: Option<Expression>,
+    predicate: Option<ExpressionRef>,
     limit: Option<usize>,
     store: Arc<DynObjectStore>,
 }
@@ -99,7 +101,7 @@ impl ParquetOpener {
     pub(crate) fn new(
         batch_size: usize,
         table_schema: SchemaRef,
-        predicate: Option<Expression>,
+        predicate: Option<ExpressionRef>,
         store: Arc<DynObjectStore>,
     ) -> Self {
         Self {
@@ -166,14 +168,18 @@ impl FileOpener for ParquetOpener {
 /// Implements [`FileOpener`] for opening a parquet file from a presigned URL
 struct PresignedUrlOpener {
     batch_size: usize,
-    predicate: Option<Expression>,
+    predicate: Option<ExpressionRef>,
     limit: Option<usize>,
     table_schema: SchemaRef,
     client: reqwest::Client,
 }
 
 impl PresignedUrlOpener {
-    pub(crate) fn new(batch_size: usize, schema: SchemaRef, predicate: Option<Expression>) -> Self {
+    pub(crate) fn new(
+        batch_size: usize,
+        schema: SchemaRef,
+        predicate: Option<ExpressionRef>,
+    ) -> Self {
         Self {
             batch_size,
             table_schema: schema,
6 changes: 3 additions & 3 deletions kernel/src/engine/sync/json.rs
@@ -7,7 +7,7 @@ use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::arrow_utils::parse_json as arrow_parse_json;
 use crate::schema::SchemaRef;
 use crate::{
-    DeltaResult, EngineData, Expression, FileDataReadResultIterator, FileMeta, JsonHandler,
+    DeltaResult, EngineData, ExpressionRef, FileDataReadResultIterator, FileMeta, JsonHandler,
 };
 
 pub(crate) struct SyncJsonHandler;
@@ -16,7 +16,7 @@ fn try_create_from_json(
     file: File,
     _schema: SchemaRef,
     arrow_schema: ArrowSchemaRef,
-    _predicate: Option<&Expression>,
+    _predicate: Option<ExpressionRef>,
 ) -> DeltaResult<impl Iterator<Item = DeltaResult<ArrowEngineData>>> {
     let json = arrow_json::ReaderBuilder::new(arrow_schema)
         .build(BufReader::new(file))?
@@ -29,7 +29,7 @@ impl JsonHandler for SyncJsonHandler {
         &self,
         files: &[FileMeta],
         schema: SchemaRef,
-        predicate: Option<Expression>,
+        predicate: Option<ExpressionRef>,
     ) -> DeltaResult<FileDataReadResultIterator> {
         read_files(files, schema, predicate, try_create_from_json)
     }
8 changes: 4 additions & 4 deletions kernel/src/engine/sync/mod.rs
@@ -3,7 +3,7 @@
 use super::arrow_expression::ArrowExpressionHandler;
 use crate::engine::arrow_data::ArrowEngineData;
 use crate::{
-    DeltaResult, Engine, Error, Expression, ExpressionHandler, FileDataReadResultIterator,
+    DeltaResult, Engine, Error, ExpressionHandler, ExpressionRef, FileDataReadResultIterator,
     FileMeta, FileSystemClient, JsonHandler, ParquetHandler, SchemaRef,
 };
 
@@ -60,12 +60,12 @@ impl Engine for SyncEngine {
 fn read_files<F, I>(
     files: &[FileMeta],
     schema: SchemaRef,
-    predicate: Option<Expression>,
+    predicate: Option<ExpressionRef>,
     mut try_create_from_file: F,
 ) -> DeltaResult<FileDataReadResultIterator>
 where
     I: Iterator<Item = DeltaResult<ArrowEngineData>> + Send + 'static,
-    F: FnMut(File, SchemaRef, ArrowSchemaRef, Option<&Expression>) -> DeltaResult<I>
+    F: FnMut(File, SchemaRef, ArrowSchemaRef, Option<ExpressionRef>) -> DeltaResult<I>
         + Send
         + 'static,
 {
@@ -88,7 +88,7 @@ where
                 File::open(path)?,
                 schema.clone(),
                 arrow_schema.clone(),
-                predicate.as_ref(),
+                predicate.clone(),
             )
         })
         // Flatten to Iterator<DeltaResult<DeltaResult<ArrowEngineData>>>
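
Switching the closure argument from `Option<&Expression>` to an owned `Option<ExpressionRef>` works because cloning the predicate per file is now just a reference-count bump, and it frees the closure from borrowing. A minimal sketch of that shape, with a hypothetical `Expr` type:

    use std::sync::Arc;

    #[derive(Debug)]
    struct Expr(String); // hypothetical stand-in for the kernel's Expression

    type ExprRef = Arc<Expr>;

    // Mirrors read_files: one closure invoked per file, each call getting
    // its own owned handle to the shared predicate.
    fn for_each_file<F>(files: &[&str], predicate: Option<ExprRef>, mut open: F)
    where
        F: FnMut(&str, Option<ExprRef>),
    {
        for &file in files {
            // Cloning Option<Arc<_>> only bumps a reference count.
            open(file, predicate.clone());
        }
    }

    fn main() {
        let pred: Option<ExprRef> = Some(Arc::new(Expr("x IS NOT NULL".into())));
        for_each_file(&["a.parquet", "b.parquet"], pred, |file, p| {
            println!("opening {file} with {p:?}");
        });
    }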
8 changes: 4 additions & 4 deletions kernel/src/engine/sync/parquet.rs
@@ -8,15 +8,15 @@ use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
 use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
 use crate::schema::SchemaRef;
-use crate::{DeltaResult, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};
+use crate::{DeltaResult, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler};
 
 pub(crate) struct SyncParquetHandler;
 
 fn try_create_from_parquet(
     file: File,
     schema: SchemaRef,
     _arrow_schema: ArrowSchemaRef,
-    predicate: Option<&Expression>,
+    predicate: Option<ExpressionRef>,
 ) -> DeltaResult<impl Iterator<Item = DeltaResult<ArrowEngineData>>> {
     let metadata = ArrowReaderMetadata::load(&file, Default::default())?;
     let parquet_schema = metadata.schema();
@@ -27,7 +27,7 @@ fn try_create_from_parquet(
         builder = builder.with_projection(mask);
     }
     if let Some(predicate) = predicate {
-        builder = builder.with_row_group_filter(predicate);
+        builder = builder.with_row_group_filter(predicate.as_ref());
     }
     Ok(builder.build()?.map(move |data| {
         let reordered = reorder_struct_array(data?.into(), &requested_ordering)?;
@@ -40,7 +40,7 @@ impl ParquetHandler for SyncParquetHandler {
         &self,
         files: &[FileMeta],
         schema: SchemaRef,
-        predicate: Option<Expression>,
+        predicate: Option<ExpressionRef>,
     ) -> DeltaResult<FileDataReadResultIterator> {
         read_files(files, schema, predicate, try_create_from_parquet)
     }
2 changes: 2 additions & 0 deletions kernel/src/expressions/mod.rs
@@ -121,6 +121,8 @@ pub enum UnaryOperator {
     IsNull,
 }
 
+pub type ExpressionRef = std::sync::Arc<Expression>;
+
 /// A SQL expression.
 ///
 /// These expressions do not track or validate data types, other than the type
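
This alias is the pivot for every signature change above: `read_json_files` and `read_parquet_files` both take `Option<ExpressionRef>` now, so one predicate can be built once and shared across handlers. A hedged sketch of that flow, with trimmed stand-in traits (the real handler signatures also carry files and schemas):

    use std::sync::Arc;

    // Stand-in for the kernel's Expression type.
    #[derive(Debug)]
    enum Expression {
        Column(String),
        IsNotNull(Box<Expression>),
    }

    type ExpressionRef = Arc<Expression>;

    // Trimmed stand-ins for JsonHandler / ParquetHandler.
    trait JsonRead {
        fn read_json(&self, predicate: Option<ExpressionRef>);
    }
    trait ParquetRead {
        fn read_parquet(&self, predicate: Option<ExpressionRef>);
    }

    struct Stub;
    impl JsonRead for Stub {
        fn read_json(&self, predicate: Option<ExpressionRef>) {
            println!("json: {predicate:?}");
        }
    }
    impl ParquetRead for Stub {
        fn read_parquet(&self, predicate: Option<ExpressionRef>) {
            println!("parquet: {predicate:?}");
        }
    }

    fn main() {
        let pred: Option<ExpressionRef> = Some(Arc::new(Expression::IsNotNull(Box::new(
            Expression::Column("txn.appId".into()),
        ))));
        // Built once, shared everywhere: each handler call costs one Arc clone.
        Stub.read_json(pred.clone());
        Stub.read_parquet(pred);
    }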