diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index bab783a9e4bd..e7b1a75086d4 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -72,11 +72,17 @@ message Statistics {
   bool is_exact = 4;
 }
 
+message FileRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
 message PartitionedFile {
   string path = 1;
   uint64 size = 2;
   uint64 last_modified_ns = 3;
   repeated datafusion.ScalarValue partition_values = 4;
+  FileRange range = 5;
 }
 
 message CsvFormat {
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index cc7f866e91a0..7e759bd606eb 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -30,7 +30,7 @@ use chrono::{TimeZone, Utc};
 use datafusion::datafusion_data_access::{
     object_store::local::LocalFileSystem, FileMeta, SizedFile,
 };
-use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::execution::context::ExecutionProps;
 use datafusion::logical_plan::FunctionRegistry;
 
@@ -301,6 +301,18 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
                 .iter()
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
+            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
+        })
+    }
+}
+
+impl TryFrom<&protobuf::FileRange> for FileRange {
+    type Error = BallistaError;
+
+    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
+        Ok(FileRange {
+            start: value.start,
+            end: value.end,
         })
     }
 }
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 5b9fb1b25235..3a1f24d0f6d1 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
     Statistics,
 };
 
-use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::physical_plan::file_format::FileScanConfig;
 
 use datafusion::physical_plan::expressions::{Count, Literal};
@@ -354,6 +354,18 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
                 .iter()
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
+            range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
+        })
+    }
+}
+
+impl TryFrom<&FileRange> for protobuf::FileRange {
+    type Error = BallistaError;
+
+    fn try_from(value: &FileRange) -> Result<Self, Self::Error> {
+        Ok(protobuf::FileRange {
+            start: value.start,
+            end: value.end,
         })
     }
 }
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index a236bf1e91bb..a6fd125437db 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -174,6 +174,7 @@ pub async fn pruned_partition_list(
             Ok(PartitionedFile {
                 partition_values: vec![],
                 file_meta: f?,
+                range: None,
             })
         }),
     ));
@@ -217,6 +218,7 @@ pub async fn pruned_partition_list(
                 Ok(PartitionedFile {
                     partition_values,
                     file_meta,
+                    range: None,
                 })
             })
     }
@@ -358,6 +360,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
                         ScalarValue::try_from_array(batch.column(col), row).unwrap()
                     })
                     .collect(),
+                range: None,
             })
         })
         .collect()
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 200349b5efb0..d7932b38f62e 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -32,7 +32,19 @@ pub use table::{ListingOptions, ListingTable, ListingTableConfig};
 pub type PartitionedFileStream =
     Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
 
+/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
+/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping
+/// sections of a Parquet file in parallel.
 #[derive(Debug, Clone)]
+pub struct FileRange {
+    /// Range start
+    pub start: i64,
+    /// Range end
+    pub end: i64,
+}
+
+#[derive(Debug, Clone)]
+/// A single file or part of a file that should be read, along with its schema, statistics
 /// A single file that should be read, along with its schema, statistics
 /// and partition column values that need to be appended to each row.
 pub struct PartitionedFile {
@@ -40,7 +52,8 @@ pub struct PartitionedFile {
     pub file_meta: FileMeta,
     /// Values of partition columns to be appended to each row
     pub partition_values: Vec<ScalarValue>,
-    // We may include row group range here for a more fine-grained parallel execution
+    /// An optional file range for a more fine-grained parallel execution
+    pub range: Option<FileRange>,
 }
 
 impl PartitionedFile {
@@ -52,6 +65,19 @@ impl PartitionedFile {
                 last_modified: None,
             },
             partition_values: vec![],
+            range: None,
+        }
+    }
+
+    /// Create a file range without metadata or partition
+    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
+        Self {
+            file_meta: FileMeta {
+                sized_file: SizedFile { path, size },
+                last_modified: None,
+            },
+            partition_values: vec![],
+            range: Some(FileRange { start, end }),
         }
     }
 }
@@ -67,5 +93,6 @@ pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
     PartitionedFile {
         file_meta: local::local_unpartitioned_file(file),
         partition_values: vec![],
+        range: None,
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 071a07fa0880..cfc99a71a29a 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -337,6 +337,14 @@ impl ParquetExecStream {
                 file_metrics,
             ));
         }
+        if let Some(range) = &file.range {
+            assert!(
+                range.start >= 0 && range.end > 0 && range.end > range.start,
+                "invalid range specified: {:?}",
+                range
+            );
+            opt = opt.with_range(range.start, range.end);
+        }
 
         let file_reader = SerializedFileReader::new_with_options(
             ChunkObjectReader(object_reader),
@@ -649,6 +657,7 @@ mod tests {
     };
 
     use super::*;
+    use crate::datasource::listing::FileRange;
     use crate::execution::options::CsvReadOptions;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use arrow::array::Float32Array;
@@ -656,6 +665,7 @@ mod tests {
         array::{Int64Array, Int8Array, StringArray},
         datatypes::{DataType, Field},
     };
+    use datafusion_data_access::object_store::local;
     use datafusion_expr::{col, lit};
     use futures::StreamExt;
     use parquet::{
@@ -1099,6 +1109,81 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_exec_with_range() -> Result<()> {
+        fn file_range(file: String, start: i64, end: i64) -> PartitionedFile {
+            PartitionedFile {
+                file_meta: local::local_unpartitioned_file(file),
+                partition_values: vec![],
+                range: Some(FileRange { start, end }),
+            }
+        }
+
+        async fn assert_parquet_read(
+            file_groups: Vec<Vec<PartitionedFile>>,
+            expected_row_num: Option<usize>,
+            task_ctx: Arc<TaskContext>,
+            file_schema: SchemaRef,
+        ) -> Result<()> {
+            let parquet_exec = ParquetExec::new(
+                FileScanConfig {
+                    object_store: Arc::new(LocalFileSystem {}),
+                    file_groups,
+                    file_schema,
+                    statistics: Statistics::default(),
+                    projection: None,
+                    limit: None,
+                    table_partition_cols: vec![],
+                },
+                None,
+            );
+            assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
+            let results = parquet_exec.execute(0, task_ctx).await?.next().await;
+
+            if let Some(expected_row_num) = expected_row_num {
+                let batch = results.unwrap()?;
+                assert_eq!(expected_row_num, batch.num_rows());
+            } else {
+                assert!(results.is_none());
+            }
+
+            Ok(())
+        }
+
+        let session_ctx = SessionContext::new();
+        let testdata = crate::test_util::parquet_test_data();
+        let filename = format!("{}/alltypes_plain.parquet", testdata);
+        let file_schema = ParquetFormat::default()
+            .infer_schema(local_object_reader_stream(vec![filename.clone()]))
+            .await?;
+
+        let group_empty = vec![vec![file_range(filename.clone(), 0, 5)]];
+        let group_contain = vec![vec![file_range(filename.clone(), 5, i64::MAX)]];
+        let group_all = vec![vec![
+            file_range(filename.clone(), 0, 5),
+            file_range(filename.clone(), 5, i64::MAX),
+        ]];
+
+        assert_parquet_read(
+            group_empty,
+            None,
+            session_ctx.task_ctx(),
+            file_schema.clone(),
+        )
+        .await?;
+        assert_parquet_read(
+            group_contain,
+            Some(8),
+            session_ctx.task_ctx(),
+            file_schema.clone(),
+        )
+        .await?;
+        assert_parquet_read(group_all, Some(8), session_ctx.task_ctx(), file_schema)
+            .await?;
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn parquet_exec_with_partition() -> Result<()> {
         let session_ctx = SessionContext::new();
@@ -1171,6 +1256,7 @@ mod tests {
                 last_modified: None,
             },
             partition_values: vec![],
+            range: None,
         };
 
         let parquet_exec = ParquetExec::new(
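The `PartitionedFile::new_with_range` constructor added above is the entry point for fanning one Parquet file out across several scan partitions. A minimal sketch of that usage follows; it is not part of this diff, and the helper name `split_file_into_ranges` and the even byte-split policy are assumptions for illustration:

```rust
use datafusion::datasource::listing::PartitionedFile;

/// Split one Parquet file of `size` bytes into up to `partitions` contiguous,
/// non-overlapping byte ranges, one single-file group per range. Because the
/// reader only scans row groups whose midpoint lies in [start, end), every
/// row group is read by exactly one of the resulting groups.
fn split_file_into_ranges(
    path: &str,
    size: u64,
    partitions: u64,
) -> Vec<Vec<PartitionedFile>> {
    assert!(partitions > 0, "need at least one target partition");
    let step = (size + partitions - 1) / partitions; // ceiling division covers the tail
    (0..partitions)
        .map(|i| (i * step, ((i + 1) * step).min(size)))
        .filter(|(start, end)| start < end) // drop empty trailing ranges
        .map(|(start, end)| {
            vec![PartitionedFile::new_with_range(
                path.to_string(),
                size,
                start as i64,
                end as i64,
            )]
        })
        .collect()
}
```

Each inner `Vec<PartitionedFile>` becomes one file group, i.e. one output partition of `ParquetExec`, mirroring how the `parquet_exec_with_range` test above drives one range per group.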
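The `[start, end)` semantics described in the `FileRange` doc comment are midpoint-based, which is why the test's `[0, 5)` range reads nothing while `[5, i64::MAX)` yields all 8 rows: the row group midpoints of `alltypes_plain.parquet` all fall past byte offset 5. A rough illustration of the rule, not the parquet crate's actual implementation:

```rust
/// Midpoint rule sketched from the FileRange doc comment: a row group whose
/// bytes span [rg_start, rg_start + rg_len) is scanned by a reader restricted
/// to [range_start, range_end) iff its byte midpoint falls inside that range,
/// so partitioning a file into non-overlapping ranges scans each row group once.
fn row_group_in_range(rg_start: i64, rg_len: i64, range_start: i64, range_end: i64) -> bool {
    let mid = rg_start + rg_len / 2;
    range_start <= mid && mid < range_end
}
```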