Range scan support for ParquetExec (#1990)
* Filter parquet row groups by range as well

* fix

* WIP: case when expr works

* short-circuit case_when

* else

* only range part

* Update datafusion/core/src/datasource/listing/mod.rs

Co-authored-by: Andrew Lamb <[email protected]>

* test

* Update parquet.rs

* fix

Co-authored-by: Andrew Lamb <[email protected]>
yjshen and alamb authored Apr 14, 2022
1 parent 774b91b commit e7b08ed
Showing 6 changed files with 149 additions and 3 deletions.
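
This commit adds an optional byte range to PartitionedFile so that non-overlapping sections of a single Parquet file can be scanned as separate partitions. A minimal sketch of how a caller might split one file into two file groups with the new PartitionedFile::new_with_range helper (the path and size are illustrative; the real end-to-end usage is the parquet_exec_with_range test added below):

use datafusion::datasource::listing::PartitionedFile;

// Split one Parquet file into two non-overlapping byte ranges. Each inner Vec is a
// file group for FileScanConfig::file_groups, so ParquetExec scans the halves as
// separate partitions, and every row group is read by exactly one of them.
fn split_in_two(path: String, size: u64) -> Vec<Vec<PartitionedFile>> {
    let mid = (size / 2) as i64;
    vec![
        vec![PartitionedFile::new_with_range(path.clone(), size, 0, mid)],
        vec![PartitionedFile::new_with_range(path, size, mid, size as i64)],
    ]
}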
6 changes: 6 additions & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -72,11 +72,17 @@ message Statistics {
bool is_exact = 4;
}

message FileRange {
int64 start = 1;
int64 end = 2;
}

message PartitionedFile {
string path = 1;
uint64 size = 2;
uint64 last_modified_ns = 3;
repeated datafusion.ScalarValue partition_values = 4;
FileRange range = 5;
}

message CsvFormat {
14 changes: 13 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -30,7 +30,7 @@ use chrono::{TimeZone, Utc};
use datafusion::datafusion_data_access::{
object_store::local::LocalFileSystem, FileMeta, SizedFile,
};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::FunctionRegistry;

@@ -301,6 +301,18 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
})
}
}

impl TryFrom<&protobuf::FileRange> for FileRange {
type Error = BallistaError;

fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
Ok(FileRange {
start: value.start,
end: value.end,
})
}
}
14 changes: 13 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
Statistics,
};

use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::physical_plan::file_format::FileScanConfig;

use datafusion::physical_plan::expressions::{Count, Literal};
@@ -354,6 +354,18 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
})
}
}

impl TryFrom<&FileRange> for protobuf::FileRange {
type Error = BallistaError;

fn try_from(value: &FileRange) -> Result<Self, Self::Error> {
Ok(protobuf::FileRange {
start: value.start,
end: value.end,
})
}
}
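
These two TryFrom impls, together with the matching one in from_proto.rs above, let the range survive Ballista's physical plan serialization. A small round-trip sketch, assuming the module paths ballista_core::serde::protobuf and ballista_core::error::BallistaError:

use std::convert::TryInto;

use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf;
use datafusion::datasource::listing::FileRange;

// Hedged sketch: encode a FileRange to its protobuf form and decode it back,
// relying on the TryFrom impls added in to_proto.rs and from_proto.rs.
fn roundtrip(range: &FileRange) -> Result<FileRange, BallistaError> {
    let encoded: protobuf::FileRange = range.try_into()?;
    (&encoded).try_into()
}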
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -174,6 +174,7 @@ pub async fn pruned_partition_list(
Ok(PartitionedFile {
partition_values: vec![],
file_meta: f?,
range: None,
})
}),
));
@@ -217,6 +218,7 @@ pub async fn pruned_partition_list(
Ok(PartitionedFile {
partition_values,
file_meta,
range: None,
})
})
}
@@ -358,6 +360,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
ScalarValue::try_from_array(batch.column(col), row).unwrap()
})
.collect(),
range: None,
})
})
.collect()
29 changes: 28 additions & 1 deletion datafusion/core/src/datasource/listing/mod.rs
@@ -32,15 +32,28 @@ pub use table::{ListingOptions, ListingTable, ListingTableConfig};
pub type PartitionedFileStream =
Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;

/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping
/// sections of a Parquet file in parallel.
#[derive(Debug, Clone)]
pub struct FileRange {
/// Range start
pub start: i64,
/// Range end
pub end: i64,
}

#[derive(Debug, Clone)]
/// A single file or part of a file that should be read, along with its schema, statistics
/// A single file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
pub struct PartitionedFile {
/// Path for the file (e.g. URL, filesystem path, etc)
pub file_meta: FileMeta,
/// Values of partition columns to be appended to each row
pub partition_values: Vec<ScalarValue>,
// We may include row group range here for a more fine-grained parallel execution
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
}

impl PartitionedFile {
@@ -52,6 +65,19 @@ impl PartitionedFile {
last_modified: None,
},
partition_values: vec![],
range: None,
}
}

/// Create a file range without metadata or partition
pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
Self {
file_meta: FileMeta {
sized_file: SizedFile { path, size },
last_modified: None,
},
partition_values: vec![],
range: Some(FileRange { start, end }),
}
}
}
@@ -67,5 +93,6 @@ pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
PartitionedFile {
file_meta: local::local_unpartitioned_file(file),
partition_values: vec![],
range: None,
}
}
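
The range is interpreted per row group rather than per byte: a row group is scanned when the midpoint of its byte span falls inside [start, end), as the FileRange doc comment above describes. A small sketch of that selection rule (the actual filtering happens inside the parquet crate's reader, not in this module):

use datafusion::datasource::listing::FileRange;

// Hedged sketch of the "midpoint" rule: a row group occupying bytes
// [rg_start, rg_start + rg_len) is selected when its midpoint lies in
// [range.start, range.end). Ranges that tile a file therefore read every
// row group exactly once, with no overlap between partitions.
fn row_group_selected(rg_start: i64, rg_len: i64, range: &FileRange) -> bool {
    let midpoint = rg_start + rg_len / 2;
    range.start <= midpoint && midpoint < range.end
}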
86 changes: 86 additions & 0 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -337,6 +337,14 @@ impl ParquetExecStream {
file_metrics,
));
}
if let Some(range) = &file.range {
assert!(
range.start >= 0 && range.end > 0 && range.end > range.start,
"invalid range specified: {:?}",
range
);
opt = opt.with_range(range.start, range.end);
}

let file_reader = SerializedFileReader::new_with_options(
ChunkObjectReader(object_reader),
@@ -649,13 +657,15 @@ mod tests {
};

use super::*;
use crate::datasource::listing::FileRange;
use crate::execution::options::CsvReadOptions;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use arrow::array::Float32Array;
use arrow::{
array::{Int64Array, Int8Array, StringArray},
datatypes::{DataType, Field},
};
use datafusion_data_access::object_store::local;
use datafusion_expr::{col, lit};
use futures::StreamExt;
use parquet::{
@@ -1099,6 +1109,81 @@
Ok(())
}

#[tokio::test]
async fn parquet_exec_with_range() -> Result<()> {
fn file_range(file: String, start: i64, end: i64) -> PartitionedFile {
PartitionedFile {
file_meta: local::local_unpartitioned_file(file),
partition_values: vec![],
range: Some(FileRange { start, end }),
}
}

async fn assert_parquet_read(
file_groups: Vec<Vec<PartitionedFile>>,
expected_row_num: Option<usize>,
task_ctx: Arc<TaskContext>,
file_schema: SchemaRef,
) -> Result<()> {
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_groups,
file_schema,
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
},
None,
);
assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
let results = parquet_exec.execute(0, task_ctx).await?.next().await;

if let Some(expected_row_num) = expected_row_num {
let batch = results.unwrap()?;
assert_eq!(expected_row_num, batch.num_rows());
} else {
assert!(results.is_none());
}

Ok(())
}

let session_ctx = SessionContext::new();
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/alltypes_plain.parquet", testdata);
let file_schema = ParquetFormat::default()
.infer_schema(local_object_reader_stream(vec![filename.clone()]))
.await?;

let group_empty = vec![vec![file_range(filename.clone(), 0, 5)]];
let group_contain = vec![vec![file_range(filename.clone(), 5, i64::MAX)]];
let group_all = vec![vec![
file_range(filename.clone(), 0, 5),
file_range(filename.clone(), 5, i64::MAX),
]];

assert_parquet_read(
group_empty,
None,
session_ctx.task_ctx(),
file_schema.clone(),
)
.await?;
assert_parquet_read(
group_contain,
Some(8),
session_ctx.task_ctx(),
file_schema.clone(),
)
.await?;
assert_parquet_read(group_all, Some(8), session_ctx.task_ctx(), file_schema)
.await?;

Ok(())
}

#[tokio::test]
async fn parquet_exec_with_partition() -> Result<()> {
let session_ctx = SessionContext::new();
@@ -1171,6 +1256,7 @@ mod tests {
last_modified: None,
},
partition_values: vec![],
range: None,
};

let parquet_exec = ParquetExec::new(
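
For reference, ParquetExecStream hands the range to the parquet crate through its read options, and the reader then keeps only the row groups whose midpoint falls inside the range. A rough standalone sketch, assuming the parquet crate's ReadOptionsBuilder/SerializedFileReader API and a plain std::fs::File in place of the ChunkObjectReader used above:

use std::fs::File;

use parquet::errors::ParquetError;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

// Hedged sketch: open a local Parquet file with a byte range and report how many
// row groups survive the midpoint filter applied by the reader.
fn row_groups_in_range(path: &str, start: i64, end: i64) -> Result<usize, ParquetError> {
    let file = File::open(path).map_err(|e| ParquetError::General(e.to_string()))?;
    let options = ReadOptionsBuilder::new().with_range(start, end).build();
    let reader = SerializedFileReader::new_with_options(file, options)?;
    Ok(reader.metadata().num_row_groups())
}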
