Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it possible to only scan part of a parquet file in a partition #1990

Merged
merged 16 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,17 @@ message Statistics {
bool is_exact = 4;
}

// A portion of a file described by a start and an end position.
// Converted field-for-field to and from the Rust-side `FileRange`.
message FileRange {
// Start position of the range.
int64 start = 1;
// End position of the range.
int64 end = 2;
}

// Wire representation of a file (optionally restricted to a range) to be scanned.
message PartitionedFile {
// Location of the file.
string path = 1;
// Size of the file (presumably in bytes; confirm against the producer).
uint64 size = 2;
// Last-modified timestamp; the `_ns` suffix suggests nanoseconds (confirm the epoch with the serializer).
uint64 last_modified_ns = 3;
// Partition column values associated with this file.
repeated datafusion.ScalarValue partition_values = 4;
// Optional range within the file; when unset, the deserialized file carries no range.
FileRange range = 5;
}

message CsvFormat {
Expand Down
14 changes: 13 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use chrono::{TimeZone, Utc};
use datafusion::datafusion_data_access::{
object_store::local::LocalFileSystem, FileMeta, SizedFile,
};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::FunctionRegistry;

Expand Down Expand Up @@ -301,6 +301,18 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
})
}
}

impl TryFrom<&protobuf::FileRange> for FileRange {
    type Error = BallistaError;

    /// Builds a [`FileRange`] from its protobuf message by copying `start` and `end`.
    ///
    /// The conversion itself never produces an error; the fallible signature lets it
    /// be used with `try_into()` alongside the other serde conversions.
    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
        let (start, end) = (value.start, value.end);
        Ok(Self { start, end })
    }
}
Expand Down
14 changes: 13 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use datafusion::physical_plan::{
Statistics,
};

use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::physical_plan::file_format::FileScanConfig;

use datafusion::physical_plan::expressions::{Count, Literal};
Expand Down Expand Up @@ -354,6 +354,18 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
})
}
}

impl TryFrom<&FileRange> for protobuf::FileRange {
    type Error = BallistaError;

    /// Builds the protobuf message for a [`FileRange`] by copying `start` and `end`.
    ///
    /// The conversion itself never produces an error; the fallible signature lets it
    /// be used with `try_into()` alongside the other serde conversions.
    fn try_from(value: &FileRange) -> Result<Self, Self::Error> {
        let (start, end) = (value.start, value.end);
        Ok(Self { start, end })
    }
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub async fn pruned_partition_list(
Ok(PartitionedFile {
partition_values: vec![],
file_meta: f?,
range: None,
})
}),
));
Expand Down Expand Up @@ -217,6 +218,7 @@ pub async fn pruned_partition_list(
Ok(PartitionedFile {
partition_values,
file_meta,
range: None,
})
})
}
Expand Down Expand Up @@ -358,6 +360,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
ScalarValue::try_from_array(batch.column(col), row).unwrap()
})
.collect(),
range: None,
})
})
.collect()
Expand Down
27 changes: 26 additions & 1 deletion datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,26 @@ pub use table::{ListingOptions, ListingTable, ListingTableConfig};
pub type PartitionedFileStream =
Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;

/// File part identified by [start, end) positions.
yjshen marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Debug, Clone, PartialEq, Eq)]
/// Half-open interval: `start` is included, `end` is excluded.
pub struct FileRange {
    /// Inclusive start position of the range
    pub start: i64,
    /// Exclusive end position of the range
    pub end: i64,
}

#[derive(Debug, Clone)]
/// A single file or part of a file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
pub struct PartitionedFile {
/// Metadata of the file: its path and size, plus an optional last-modified time
pub file_meta: FileMeta,
/// Values of partition columns to be appended to each row
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution;
/// `None` means no range restriction is attached to this file
pub range: Option<FileRange>,
}

impl PartitionedFile {
Expand All @@ -52,6 +63,19 @@ impl PartitionedFile {
last_modified: None,
},
partition_values: vec![],
range: None,
}
}

/// Build a [`PartitionedFile`] limited to the `start`..`end` range of the file at
/// `path`, with no last-modified time and no partition values.
pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
    let sized_file = SizedFile { path, size };
    let file_meta = FileMeta {
        sized_file,
        last_modified: None,
    };
    let range = Some(FileRange { start, end });

    Self {
        file_meta,
        partition_values: Vec::new(),
        range,
    }
}
}
Expand All @@ -67,5 +91,6 @@ pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
PartitionedFile {
file_meta: local::local_unpartitioned_file(file),
partition_values: vec![],
range: None,
}
}
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,9 @@ fn read_partition(
file_metrics,
));
}
if let Some(range) = &partitioned_file.range {
opt = opt.with_range(range.start, range.end);
yjshen marked this conversation as resolved.
Show resolved Hide resolved
}

let file_reader = SerializedFileReader::new_with_options(
ChunkObjectReader(object_reader),
Expand Down Expand Up @@ -1072,6 +1075,7 @@ mod tests {
last_modified: None,
},
partition_values: vec![],
range: None,
};

let parquet_exec = ParquetExec::new(
Expand Down