perf: apply projection when reading checkpoint parquet
alexwilcoxson-rel committed Jul 29, 2024
1 parent 13af7cb commit fda0498
Showing 1 changed file with 131 additions and 8 deletions.
139 changes: 131 additions & 8 deletions crates/core/src/kernel/snapshot/log_segment.rs
@@ -9,8 +9,9 @@ use itertools::Itertools;
 use lazy_static::lazy_static;
 use object_store::path::Path;
 use object_store::{Error as ObjectStoreError, ObjectMeta, ObjectStore};
-use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
+use parquet::arrow::ProjectionMask;
 use regex::Regex;
 use serde::{Deserialize, Serialize};
 use tracing::debug;
@@ -250,19 +251,45 @@ impl LogSegment {
     pub(super) fn checkpoint_stream(
         &self,
         store: Arc<dyn ObjectStore>,
-        _read_schema: &Schema,
+        read_schema: &Schema,
         config: &DeltaTableConfig,
     ) -> BoxStream<'_, DeltaResult<RecordBatch>> {
         let batch_size = config.log_batch_size;
+        let read_schema = Arc::new(read_schema.clone());
         futures::stream::iter(self.checkpoint_files.clone())
             .map(move |meta| {
                 let store = store.clone();
+                let read_schema = read_schema.clone();
                 async move {
-                    let reader = ParquetObjectReader::new(store, meta);
-                    let options = ArrowReaderOptions::new(); //.with_page_index(enable_page_index);
-                    let builder =
-                        ParquetRecordBatchStreamBuilder::new_with_options(reader, options).await?;
-                    builder.with_batch_size(batch_size).build()
+                    let mut reader = ParquetObjectReader::new(store, meta);
+                    let options = ArrowReaderOptions::new();
+                    let reader_meta = ArrowReaderMetadata::load_async(&mut reader, options).await?;
+
+                    // Create a projection selecting the read_schema fields from the parquet file's arrow schema
+                    let projection = reader_meta
+                        .schema()
+                        .fields
+                        .iter()
+                        .enumerate()
+                        .filter_map(|(i, f)| {
+                            if read_schema.fields.contains_key(f.name()) {
+                                Some(i)
+                            } else {
+                                None
+                            }
+                        })
+                        .collect::<Vec<_>>();
+                    let projection =
+                        ProjectionMask::roots(reader_meta.parquet_schema(), projection);
+
+                    // Note: batches in the output stream have all-null rows for action types not
+                    // present in the projection. When a RowFilter was used to remove null rows, the
+                    // performance got worse when projecting all fields, and was no better when
+                    // projecting a subset. The all-null rows are filtered out anyway when the batch stream is consumed.
+                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, reader_meta)
+                        .with_projection(projection.clone())
+                        .with_batch_size(batch_size)
+                        .build()
                 }
             })
             .buffered(config.log_buffer_size)
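
The core of the change is the ProjectionMask built from the file's own schema. As a point of reference, here is a minimal, self-contained sketch of the same parquet-crate projection API using the synchronous reader on a local file; the function name, path, and column names are placeholder assumptions, and the commit itself uses the async ParquetRecordBatchStreamBuilder over an object store:

use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;

fn read_projected(path: &str, keep: &[&str]) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Map the wanted column names to root (top-level) column indices in the
    // file's arrow schema; ProjectionMask::roots selects whole root subtrees.
    let indices: Vec<usize> = builder
        .schema()
        .fields()
        .iter()
        .enumerate()
        .filter(|(_, f)| keep.contains(&f.name().as_str()))
        .map(|(i, _)| i)
        .collect();
    let mask = ProjectionMask::roots(builder.parquet_schema(), indices);

    // Only the projected columns are decoded; the others are never read.
    for batch in builder.with_projection(mask).build()? {
        let batch = batch?;
        println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}

A call like read_projected("checkpoint.parquet", &["add", "metaData", "protocol"]) would decode only those three root columns, which is where the I/O and decode savings in this commit come from.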
@@ -514,7 +541,13 @@ pub(super) mod tests {
     use deltalake_test::utils::*;
     use tokio::task::JoinHandle;
 
-    use crate::checkpoints::create_checkpoint_from_table_uri_and_cleanup;
+    use crate::{
+        checkpoints::{create_checkpoint_for, create_checkpoint_from_table_uri_and_cleanup},
+        kernel::{Action, Add, Format, Remove},
+        operations::transaction::{CommitBuilder, TableReference},
+        protocol::{DeltaOperation, SaveMode},
+        DeltaTableBuilder,
+    };
 
     use super::*;
 
@@ -737,4 +770,94 @@ pub(super) mod tests {
             assert!(!path.is_commit_file());
         }
     }
+
+    #[tokio::test]
+    async fn test_checkpoint_stream_parquet_read() {
+        let metadata = Metadata {
+            id: "test".to_string(),
+            format: Format::new("parquet".to_string(), None),
+            schema_string: r#"{"type":"struct", "fields": []}"#.to_string(),
+            ..Default::default()
+        };
+        let protocol = Protocol::default();
+
+        let mut actions = vec![Action::Metadata(metadata), Action::Protocol(protocol)];
+        for i in 0..10 {
+            actions.push(Action::Add(Add {
+                path: format!("part-{}.parquet", i),
+                modification_time: chrono::Utc::now().timestamp_millis() as i64,
+                ..Default::default()
+            }));
+        }
+
+        let log_store = DeltaTableBuilder::from_uri("memory:///".to_string())
+            .build_storage()
+            .unwrap();
+        let op = DeltaOperation::Write {
+            mode: SaveMode::Overwrite,
+            partition_by: None,
+            predicate: None,
+        };
+        let commit = CommitBuilder::default()
+            .with_actions(actions)
+            .build(None, log_store.clone(), op)
+            .await
+            .unwrap();
+
+        let mut actions = Vec::new();
+        // remove all but one file
+        for i in 0..9 {
+            actions.push(Action::Remove(Remove {
+                path: format!("part-{}.parquet", i),
+                deletion_timestamp: Some(chrono::Utc::now().timestamp_millis() as i64),
+                ..Default::default()
+            }))
+        }
+
+        let op = DeltaOperation::Delete { predicate: None };
+        let table_data = &commit.snapshot as &dyn TableReference;
+        let commit = CommitBuilder::default()
+            .with_actions(actions)
+            .build(Some(table_data), log_store.clone(), op)
+            .await
+            .unwrap();
+
+        create_checkpoint_for(commit.version, &commit.snapshot, log_store.as_ref())
+            .await
+            .unwrap();
+
+        let batches = LogSegment::try_new(
+            &Path::default(),
+            Some(commit.version),
+            log_store.object_store().as_ref(),
+        )
+        .await
+        .unwrap()
+        .checkpoint_stream(
+            log_store.object_store(),
+            &StructType::new(vec![
+                ActionType::Metadata.schema_field().clone(),
+                ActionType::Protocol.schema_field().clone(),
+                ActionType::Add.schema_field().clone(),
+            ]),
+            &Default::default(),
+        )
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        let batch = arrow::compute::concat_batches(&batches[0].schema(), batches.iter()).unwrap();
+
+        // There are 9 remove action rows, but all their columns are null
+        // because the removes are not projected in the schema.
+        // These get filtered out upstream, and there was no perf
+        // benefit when applying a row filter.
+        // In addition there is 1 add, 1 metadata, and 1 protocol row.
+        assert_eq!(batch.num_rows(), 12);
+
+        assert_eq!(batch.schema().fields().len(), 3);
+        assert!(batch.schema().field_with_name("metaData").is_ok());
+        assert!(batch.schema().field_with_name("protocol").is_ok());
+        assert!(batch.schema().field_with_name("add").is_ok());
+    }
 }
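
As the comments above note, rows for action types outside the projection come back all-null and are filtered out when the stream is consumed downstream. A sketch of how such rows could be dropped with arrow's compute kernels, assuming a standalone helper (this is an illustration, not the crate's actual replay logic):

use arrow::array::BooleanArray;
use arrow::compute::{filter_record_batch, is_not_null, or};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Keep only rows where at least one projected column is non-null.
fn drop_all_null_rows(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    let mut keep = BooleanArray::from(vec![false; batch.num_rows()]);
    for column in batch.columns() {
        // OR together per-column validity: a row survives if any column is set.
        keep = or(&keep, &is_not_null(column.as_ref())?)?;
    }
    filter_record_batch(batch, &keep)
}

Applied to the concatenated batch in the test above, such a filter would reduce the 12 rows to the 3 non-null action rows (1 add, 1 metaData, 1 protocol), which is why the commit can safely skip a RowFilter at read time.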
