Make it possible to only scan part of a parquet file in a partition #1990

Merged · 16 commits · Apr 14, 2022

Changes from all commits
6 changes: 6 additions & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -72,11 +72,17 @@ message Statistics {
bool is_exact = 4;
}

message FileRange {
int64 start = 1;
int64 end = 2;
}

message PartitionedFile {
string path = 1;
uint64 size = 2;
uint64 last_modified_ns = 3;
repeated datafusion.ScalarValue partition_values = 4;
FileRange range = 5;
}

message CsvFormat {
14 changes: 13 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -30,7 +30,7 @@ use chrono::{TimeZone, Utc};
use datafusion::datafusion_data_access::{
object_store::local::LocalFileSystem, FileMeta, SizedFile,
};
-use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::FunctionRegistry;

@@ -301,6 +301,18 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
})
}
}

impl TryFrom<&protobuf::FileRange> for FileRange {
type Error = BallistaError;

fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
Ok(FileRange {
start: value.start,
end: value.end,
})
}
}
14 changes: 13 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
Statistics,
};

-use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::physical_plan::file_format::FileScanConfig;

use datafusion::physical_plan::expressions::{Count, Literal};
@@ -354,6 +354,18 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
})
}
}

impl TryFrom<&FileRange> for protobuf::FileRange {
type Error = BallistaError;

fn try_from(value: &FileRange) -> Result<Self, Self::Error> {
Ok(protobuf::FileRange {
start: value.start,
end: value.end,
})
}
}
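Note: the two TryFrom impls in from_proto.rs and to_proto.rs are symmetric, so a
range survives a round trip through its protobuf form. A minimal sketch of that
property, assuming the generated protobuf::FileRange type and both impls are in
scope (the roundtrip helper itself is hypothetical, not part of this PR):

use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf;
use datafusion::datasource::listing::FileRange;

// Hypothetical helper: encode a FileRange into its protobuf message and decode
// it back. Both conversions only copy the start/end fields, so the result
// should equal the input.
fn roundtrip(range: &FileRange) -> Result<FileRange, BallistaError> {
    let encoded: protobuf::FileRange = range.try_into()?;
    (&encoded).try_into()
}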
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -174,6 +174,7 @@ pub async fn pruned_partition_list(
Ok(PartitionedFile {
partition_values: vec![],
file_meta: f?,
range: None,
})
}),
));
@@ -217,6 +218,7 @@ pub async fn pruned_partition_list(
Ok(PartitionedFile {
partition_values,
file_meta,
range: None,
})
})
}
@@ -358,6 +360,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
ScalarValue::try_from_array(batch.column(col), row).unwrap()
})
.collect(),
range: None,
})
})
.collect()
29 changes: 28 additions & 1 deletion datafusion/core/src/datasource/listing/mod.rs
@@ -32,15 +32,28 @@ pub use table::{ListingOptions, ListingTable, ListingTableConfig};
pub type PartitionedFileStream =
Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;

/// A byte range [start, end) within a Parquet file. Only row groups whose data
/// "midpoint" lies within the range are scanned, so non-overlapping ranges can
/// be used to scan disjoint sections of a Parquet file in parallel.
#[derive(Debug, Clone)]
pub struct FileRange {
/// Range start
pub start: i64,
/// Range end
pub end: i64,
}

#[derive(Debug, Clone)]
-/// A single file that should be read, along with its schema, statistics
+/// A single file or part of a file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
pub struct PartitionedFile {
/// Path for the file (e.g. URL, filesystem path, etc)
pub file_meta: FileMeta,
/// Values of partition columns to be appended to each row
pub partition_values: Vec<ScalarValue>,
-    // We may include row group range here for a more fine-grained parallel execution
+    /// An optional file range for a more fine-grained parallel execution
+    pub range: Option<FileRange>,
}

impl PartitionedFile {
@@ -52,6 +65,19 @@ impl PartitionedFile {
last_modified: None,
},
partition_values: vec![],
range: None,
}
}

/// Create a PartitionedFile covering a byte range of a file, without metadata or partition values
pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
Self {
file_meta: FileMeta {
sized_file: SizedFile { path, size },
last_modified: None,
},
partition_values: vec![],
range: Some(FileRange { start, end }),
}
}
}
@@ -67,5 +93,6 @@ pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
PartitionedFile {
file_meta: local::local_unpartitioned_file(file),
partition_values: vec![],
range: None,
}
}
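Note: because each row group is assigned to whichever range contains its byte
midpoint, non-overlapping ranges that together cover [0, size) scan every row
group exactly once. A sketch of how a caller might use new_with_range to split
one file into n such partitions (split_file is a hypothetical helper, not part
of this PR):

use datafusion::datasource::listing::PartitionedFile;

// Hypothetical helper: split a file of `size` bytes into up to `n` contiguous,
// non-overlapping byte ranges, one PartitionedFile per range. Assumes n > 0.
fn split_file(path: &str, size: u64, n: u64) -> Vec<PartitionedFile> {
    let chunk = (size + n - 1) / n; // ceiling division so the ranges cover the whole file
    (0..n)
        .filter(|i| i * chunk < size) // drop empty trailing ranges when n > size
        .map(|i| {
            let start = (i * chunk) as i64;
            let end = ((i + 1) * chunk).min(size) as i64;
            PartitionedFile::new_with_range(path.to_string(), size, start, end)
        })
        .collect()
}

Each resulting range satisfies the invariant asserted in parquet.rs below
(start >= 0 and end > start), and together the ranges cover the file exactly once.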
86 changes: 86 additions & 0 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -337,6 +337,14 @@ impl ParquetExecStream {
file_metrics,
));
}
if let Some(range) = &file.range {
assert!(
range.start >= 0 && range.end > 0 && range.end > range.start,
"invalid range specified: {:?}",
range
);
opt = opt.with_range(range.start, range.end);
}

let file_reader = SerializedFileReader::new_with_options(
ChunkObjectReader(object_reader),
@@ -649,13 +657,15 @@ mod tests {
};

use super::*;
use crate::datasource::listing::FileRange;
use crate::execution::options::CsvReadOptions;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use arrow::array::Float32Array;
use arrow::{
array::{Int64Array, Int8Array, StringArray},
datatypes::{DataType, Field},
};
use datafusion_data_access::object_store::local;
use datafusion_expr::{col, lit};
use futures::StreamExt;
use parquet::{
@@ -1099,6 +1109,81 @@
Ok(())
}

#[tokio::test]
async fn parquet_exec_with_range() -> Result<()> {
fn file_range(file: String, start: i64, end: i64) -> PartitionedFile {
PartitionedFile {
file_meta: local::local_unpartitioned_file(file),
partition_values: vec![],
range: Some(FileRange { start, end }),
}
}

async fn assert_parquet_read(
file_groups: Vec<Vec<PartitionedFile>>,
expected_row_num: Option<usize>,
task_ctx: Arc<TaskContext>,
file_schema: SchemaRef,
) -> Result<()> {
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_groups,
file_schema,
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
},
None,
);
assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
let results = parquet_exec.execute(0, task_ctx).await?.next().await;

if let Some(expected_row_num) = expected_row_num {
let batch = results.unwrap()?;
assert_eq!(expected_row_num, batch.num_rows());
} else {
assert!(results.is_none());
}

Ok(())
}

let session_ctx = SessionContext::new();
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/alltypes_plain.parquet", testdata);
let file_schema = ParquetFormat::default()
.infer_schema(local_object_reader_stream(vec![filename.clone()]))
.await?;

let group_empty = vec![vec![file_range(filename.clone(), 0, 5)]];
let group_contain = vec![vec![file_range(filename.clone(), 5, i64::MAX)]];
let group_all = vec![vec![
file_range(filename.clone(), 0, 5),
file_range(filename.clone(), 5, i64::MAX),
]];

assert_parquet_read(
group_empty,
None,
session_ctx.task_ctx(),
file_schema.clone(),
)
.await?;
assert_parquet_read(
group_contain,
Some(8),
session_ctx.task_ctx(),
file_schema.clone(),
)
.await?;
assert_parquet_read(group_all, Some(8), session_ctx.task_ctx(), file_schema)
.await?;

Ok(())
}

#[tokio::test]
async fn parquet_exec_with_partition() -> Result<()> {
let session_ctx = SessionContext::new();
@@ -1171,6 +1256,7 @@ mod tests {
last_modified: None,
},
partition_values: vec![],
range: None,
};

let parquet_exec = ParquetExec::new(
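Note: the with_range option passed to SerializedFileReader above selects row
groups by the midpoint rule documented on FileRange. A standalone sketch of that
rule (a paraphrase of the documented behavior, not code from the parquet crate):

// A row group spanning bytes `offset..offset + len` is scanned by a reader
// restricted to [start, end) iff its byte midpoint falls inside the range.
fn row_group_in_range(start: i64, end: i64, offset: i64, len: i64) -> bool {
    let midpoint = offset + len / 2;
    start <= midpoint && midpoint < end
}

This is consistent with the parquet_exec_with_range test above: alltypes_plain.parquet
holds a single row group whose midpoint lies past byte 5, so the [0, 5) range
matches nothing while [5, i64::MAX) scans all 8 rows.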