Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/external_parque…
Browse files Browse the repository at this point in the history
…t_index
  • Loading branch information
alamb committed May 29, 2024
2 parents 8d5237e + 0905426 commit 230d785
Show file tree
Hide file tree
Showing 99 changed files with 22,605 additions and 19,733 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ members = [
"datafusion/physical-plan",
"datafusion/proto",
"datafusion/proto/gen",
"datafusion/proto-common",
"datafusion/proto-common/gen",
"datafusion/sql",
"datafusion/sqllogictest",
"datafusion/substrait",
Expand Down Expand Up @@ -89,6 +91,7 @@ datafusion-physical-expr = { path = "datafusion/physical-expr", version = "38.0.
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "38.0.0", default-features = false }
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "38.0.0" }
datafusion-proto = { path = "datafusion/proto", version = "38.0.0" }
datafusion-proto-common = { path = "datafusion/proto-common", version = "38.0.0" }
datafusion-sql = { path = "datafusion/sql", version = "38.0.0" }
datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "38.0.0" }
datafusion-substrait = { path = "datafusion/substrait", version = "38.0.0" }
Expand Down
12 changes: 8 additions & 4 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ use arrow::{
compute::kernels::cast::{cast_with_options, CastOptions},
datatypes::{
i256, ArrowDictionaryKeyType, ArrowNativeType, ArrowTimestampType, DataType,
Date32Type, Field, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type,
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
Date32Type, Date64Type, Field, Float32Type, Int16Type, Int32Type, Int64Type,
Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
IntervalYearMonthType, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
UInt16Type, UInt32Type, UInt64Type, UInt8Type, DECIMAL128_MAX_PRECISION,
Expand Down Expand Up @@ -3179,8 +3179,12 @@ impl fmt::Display for ScalarValue {
ScalarValue::List(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::LargeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::FixedSizeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::Date32(e) => format_option!(f, e)?,
ScalarValue::Date64(e) => format_option!(f, e)?,
ScalarValue::Date32(e) => {
format_option!(f, e.map(|v| Date32Type::to_naive_date(v).to_string()))?
}
ScalarValue::Date64(e) => {
format_option!(f, e.map(|v| Date64Type::to_naive_date(v).to_string()))?
}
ScalarValue::Time32Second(e) => format_option!(f, e)?,
ScalarValue::Time32Millisecond(e) => format_option!(f, e)?,
ScalarValue::Time64Microsecond(e) => format_option!(f, e)?,
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,9 +734,7 @@ mod tests {
let mut cfg = SessionConfig::new();
cfg.options_mut().catalog.has_header = true;
let session_state = SessionState::new_with_config_rt(cfg, runtime);

let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();

let path = Path::from("csv/aggregate_test_100.csv");
let csv = CsvFormat::default().with_has_header(true);
let records_to_read = csv.options().schema_infer_max_rec;
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::datasource::TableProvider;
use crate::execution::context::SessionState;

use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::Result;
use datafusion_common::{arrow_datafusion_err, DataFusionError, FileType};
use datafusion_expr::CreateExternalTable;

Expand All @@ -56,13 +57,14 @@ impl TableProviderFactory for ListingTableFactory {
&self,
state: &SessionState,
cmd: &CreateExternalTable,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
let mut table_options = state.default_table_options();
) -> Result<Arc<dyn TableProvider>> {
let file_type = FileType::from_str(cmd.file_type.as_str()).map_err(|_| {
DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type))
})?;
let mut table_options = state.default_table_options();
table_options.set_file_format(file_type.clone());
table_options.alter_with_string_hash_map(&cmd.options)?;

let file_extension = get_extension(cmd.location.as_str());
let file_format: Arc<dyn FileFormat> = match file_type {
FileType::CSV => {
Expand Down
127 changes: 114 additions & 13 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,84 @@ use crate::datasource::schema_adapter::{
pub use metrics::ParquetFileMetrics;
pub use statistics::{RequestedStatistics, StatisticsConverter};

/// Execution plan for scanning one or more Parquet partitions
/// Execution plan for reading one or more Parquet files.
///
/// ```text
/// ▲
/// │
/// │ Produce a stream of
/// │ RecordBatches
/// │
/// ┌───────────────────────┐
/// │ │
/// │ ParquetExec │
/// │ │
/// └───────────────────────┘
/// ▲
/// │ Asynchronously read from one
/// │ or more parquet files via
/// │ ObjectStore interface
/// │
/// │
/// .───────────────────.
/// │ )
/// │`───────────────────'│
/// │ ObjectStore │
/// │.───────────────────.│
/// │ )
/// `───────────────────'
///
/// ```
/// # Features
///
/// Supports the following optimizations:
///
/// * Concurrent reads: Can read from one or more files in parallel as multiple
/// partitions, including concurrently reading multiple row groups from a single
/// file.
///
/// * Predicate push down: skips row groups and pages based on
/// min/max/null_counts in the row group metadata, the page index and bloom
/// filters.
///
/// * Projection pushdown: reads and decodes only the columns required.
///
/// * Limit pushdown: stop execution early after some number of rows are read.
///
/// * Custom readers: customize reading parquet files, e.g. to cache metadata,
/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
/// details.
///
/// * Schema adapters: read parquet files with different schemas into a unified
/// table schema. This can be used to implement "schema evolution". See
/// [`SchemaAdapterFactory`] for more details.
///
/// * metadata_size_hint: controls the number of bytes read from the end of the
/// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a
/// custom reader is used, it supplies the metadata directly and this parameter
/// is ignored. See [`Self::with_parquet_file_reader_factory`] for more details.
///
/// # Execution Overview
///
/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
/// configured to open parquet files with a [`ParquetOpener`].
///
/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open
/// the file.
///
/// * Step 3: The `ParquetOpener` gets the file metadata via
/// [`ParquetFileReaderFactory`] and applies any predicates
/// and projections to determine what pages must be read.
///
/// * Step 4: The stream begins reading data, fetching the required pages
/// and incrementally decoding them.
///
/// * Step 5: As each [`RecordBatch]` is read, it may be adapted by a
/// [`SchemaAdapter`] to match the table schema. By default missing columns are
/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
/// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter
#[derive(Debug, Clone)]
pub struct ParquetExec {
/// Base configuration for this scan
Expand All @@ -86,9 +163,9 @@ pub struct ParquetExec {
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
predicate: Option<Arc<dyn PhysicalExpr>>,
/// Optional predicate for pruning row groups
/// Optional predicate for pruning row groups (derived from `predicate`)
pruning_predicate: Option<Arc<PruningPredicate>>,
/// Optional predicate for pruning pages
/// Optional predicate for pruning pages (derived from `predicate`)
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
/// Optional hint for the size of the parquet metadata
metadata_size_hint: Option<usize>,
Expand Down Expand Up @@ -190,11 +267,13 @@ impl ParquetExec {

/// Optional user defined parquet file reader factory.
///
/// `ParquetFileReaderFactory` complements `TableProvider`, It enables users to provide custom
/// implementation for data access operations.
/// You can use [`ParquetFileReaderFactory`] to more precisely control how
/// data is read from parquet files (e.g. skip re-reading metadata, coalesce
/// I/O operations, etc).
///
/// If custom `ParquetFileReaderFactory` is provided, then data access operations will be routed
/// to this factory instead of `ObjectStore`.
/// The default reader factory reads directly from an [`ObjectStore`]
/// instance using individual I/O operations for the footer and then for
/// each page.
pub fn with_parquet_file_reader_factory(
mut self,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
Expand Down Expand Up @@ -643,11 +722,21 @@ fn should_enable_page_index(
.unwrap_or(false)
}

/// Factory of parquet file readers.
/// Interface for reading parquet files.
///
/// Provides means to implement custom data access interface.
/// The combined implementations of [`ParquetFileReaderFactory`] and
/// [`AsyncFileReader`] can be used to provide custom data access operations
/// such as pre-cached data, I/O coalescing, etc.
///
/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
/// Provides `AsyncFileReader` over parquet file specified in `FileMeta`
/// Provides an `AsyncFileReader` for reading data from a parquet file specified
///
/// # Arguments
/// * partition_index - Index of the partition (for reporting metrics)
/// * file_meta - The file to be read
/// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer
/// * metrics - Execution metrics
fn create_reader(
&self,
partition_index: usize,
Expand All @@ -657,20 +746,32 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
) -> Result<Box<dyn AsyncFileReader + Send>>;
}

/// Default parquet reader factory.
/// Default implementation of [`ParquetFileReaderFactory`]
///
/// This implementation:
/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
/// 2. Reads the footer and page metadata on demand.
/// 3. Does not cache metadata or coalesce I/O operations.
#[derive(Debug)]
pub struct DefaultParquetFileReaderFactory {
store: Arc<dyn ObjectStore>,
}

impl DefaultParquetFileReaderFactory {
/// Create a factory.
/// Create a new `DefaultParquetFileReaderFactory`.
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self { store }
}
}

/// Implements [`AsyncFileReader`] for a parquet file in object storage
/// Implements [`AsyncFileReader`] for a parquet file in object storage.
///
/// This implementation uses the [`ParquetObjectReader`] to read data from the
/// object store on demand, as required, tracking the number of bytes read.
///
/// This implementation does not coalesce I/O operations or cache bytes. Such
/// optimizations can be done either at the object store level or by providing a
/// custom implementation of [`ParquetFileReaderFactory`].
pub(crate) struct ParquetFileReader {
file_metrics: ParquetFileMetrics,
inner: ParquetObjectReader,
Expand Down
28 changes: 17 additions & 11 deletions datafusion/core/src/datasource/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

//! Schema Adapter provides a method of translating the RecordBatches that come out of the
//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
//!
//! Adapter provides a method of translating the RecordBatches that come out of the
//! physical format into how they should be used by DataFusion. For instance, a schema
//! can be stored external to a parquet file that maps parquet logical types to arrow types.
Expand All @@ -26,35 +28,38 @@ use datafusion_common::plan_err;
use std::fmt::Debug;
use std::sync::Arc;

/// Factory of schema adapters.
/// Factory for creating [`SchemaAdapter`]
///
/// Provides means to implement custom schema adaptation.
/// This interface provides a way to implement custom schema adaptation logic
/// for ParquetExec (for example, to fill missing columns with default value
/// other than null)
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
/// Provides `SchemaAdapter`.
fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
}

/// A utility which can adapt file-level record batches to a table schema which may have a schema
/// Adapt file-level [`RecordBatch`]es to a table schema, which may have a schema
/// obtained from merging multiple file-level schemas.
///
/// This is useful for enabling schema evolution in partitioned datasets.
///
/// This has to be done in two stages.
///
/// 1. Before reading the file, we have to map projected column indexes from the table schema to
/// the file schema.
/// 1. Before reading the file, we have to map projected column indexes from the
/// table schema to the file schema.
///
/// 2. After reading a record batch we need to map the read columns back to the expected columns
/// indexes and insert null-valued columns wherever the file schema was missing a colum present
/// in the table schema.
/// 2. After reading a record batch map the read columns back to the expected
/// columns indexes and insert null-valued columns wherever the file schema was
/// missing a column present in the table schema.
pub trait SchemaAdapter: Send + Sync {
/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

/// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
/// Creates a `SchemaMapping` that can be used to cast or map the columns
/// from the file schema to the table schema.
///
/// If the provided `file_schema` contains columns of a different type to the expected
/// `table_schema`, the method will attempt to cast the array data from the file schema
Expand All @@ -68,7 +73,8 @@ pub trait SchemaAdapter: Send + Sync {
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Transforms a RecordBatch from the physical layer to a RecordBatch that meets the table schema.
/// Creates a `SchemaMapping` that can be used to cast or map the columns
/// from the file schema to the table schema.
pub trait SchemaMapper: Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/expr_api/simplification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ fn select_date_plus_interval() -> Result<()> {

// Note that constant folder runs and folds the entire
// expression down to a single constant (true)
let expected = r#"Projection: Date32("18636") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408")
let expected = r#"Projection: Date32("2021-01-09") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408")
TableScan: test"#;
let actual = get_optimized_plan_formatted(plan, &time);

Expand Down
Loading

0 comments on commit 230d785

Please sign in to comment.