Merge branch 'main' into cse-numeric-aliases
peter-toth committed Jun 14, 2024
2 parents f76087c + cc60278 commit 476953d
Showing 93 changed files with 1,726 additions and 2,197 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -52,7 +52,7 @@ homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
-rust-version = "1.73"
+rust-version = "1.75"
version = "39.0.0"

[workspace.dependencies]
@@ -107,7 +107,7 @@ doc-comment = "0.3"
env_logger = "0.11"
futures = "0.3"
half = { version = "2.2.1", default-features = false }
-hashbrown = { version = "0.14", features = ["raw"] }
+hashbrown = { version = "0.14.5", features = ["raw"] }
indexmap = "2.0.0"
itertools = "0.12"
log = "^0.4"
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -26,7 +26,7 @@ license = "Apache-2.0"
homepage = "https://datafusion.apache.org"
repository = "https://github.com/apache/datafusion"
# Specify MSRV here as `cargo msrv` doesn't support workspace version
-rust-version = "1.73"
+rust-version = "1.75"
readme = "README.md"

[dependencies]
14 changes: 7 additions & 7 deletions datafusion/common/src/utils/mod.rs
@@ -354,7 +354,7 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
pub fn array_into_list_array(arr: ArrayRef) -> ListArray {
let offsets = OffsetBuffer::from_lengths([arr.len()]);
ListArray::new(
-Arc::new(Field::new("item", arr.data_type().to_owned(), true)),
+Arc::new(Field::new_list_field(arr.data_type().to_owned(), true)),
offsets,
arr,
None,
@@ -366,7 +366,7 @@ pub fn array_into_list_array(arr: ArrayRef) -> ListArray {
pub fn array_into_large_list_array(arr: ArrayRef) -> LargeListArray {
let offsets = OffsetBuffer::from_lengths([arr.len()]);
LargeListArray::new(
-Arc::new(Field::new("item", arr.data_type().to_owned(), true)),
+Arc::new(Field::new_list_field(arr.data_type().to_owned(), true)),
offsets,
arr,
None,
@@ -379,7 +379,7 @@ pub fn array_into_fixed_size_list_array(
) -> FixedSizeListArray {
let list_size = list_size as i32;
FixedSizeListArray::new(
-Arc::new(Field::new("item", arr.data_type().to_owned(), true)),
+Arc::new(Field::new_list_field(arr.data_type().to_owned(), true)),
list_size,
arr,
None,
@@ -420,7 +420,7 @@ pub fn arrays_into_list_array(
let data_type = arr[0].data_type().to_owned();
let values = arr.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
Ok(ListArray::new(
-Arc::new(Field::new("item", data_type, true)),
+Arc::new(Field::new_list_field(data_type, true)),
OffsetBuffer::from_lengths(lens),
arrow::compute::concat(values.as_slice())?,
None,
@@ -435,7 +435,7 @@ pub fn arrays_into_list_array(
/// use datafusion_common::utils::base_type;
/// use std::sync::Arc;
///
-/// let data_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
+/// let data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
/// assert_eq!(base_type(&data_type), DataType::Int32);
///
/// let data_type = DataType::Int32;
@@ -458,10 +458,10 @@ pub fn base_type(data_type: &DataType) -> DataType {
/// use datafusion_common::utils::coerced_type_with_base_type_only;
/// use std::sync::Arc;
///
-/// let data_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
+/// let data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
/// let base_type = DataType::Float64;
/// let coerced_type = coerced_type_with_base_type_only(&data_type, &base_type);
-/// assert_eq!(coerced_type, DataType::List(Arc::new(Field::new("item", DataType::Float64, true))));
+/// assert_eq!(coerced_type, DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))));
pub fn coerced_type_with_base_type_only(
data_type: &DataType,
base_type: &DataType,
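The repeated change in this file swaps hand-built `Field::new("item", ...)` list element fields for arrow's `Field::new_list_field` helper. A minimal sketch of the intended equivalence, assuming the arrow-rs helper that hard-codes the conventional `"item"` element name:

```rust
use arrow_schema::{DataType, Field};

fn main() {
    // the old spelling: name the list element field "item" by hand
    let by_hand = Field::new("item", DataType::Int32, true);
    // the new spelling: let the helper supply the standard "item" name
    let via_helper = Field::new_list_field(DataType::Int32, true);
    assert_eq!(by_hand, via_helper);
}
```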
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -30,7 +30,7 @@ authors = { workspace = true }
# Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with
# "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'"
# https://github.com/foresterre/cargo-msrv/issues/590
-rust-version = "1.73"
+rust-version = "1.75"

[lints]
workspace = true
13 changes: 5 additions & 8 deletions datafusion/core/src/dataframe/mod.rs
@@ -50,12 +50,11 @@ use datafusion_common::{
};
use datafusion_expr::lit;
use datafusion_expr::{
-avg, count, max, min, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown,
+avg, max, min, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown,
UNNAMED_TABLE,
};
use datafusion_expr::{case, is_null};
-use datafusion_functions_aggregate::expr_fn::sum;
-use datafusion_functions_aggregate::expr_fn::{median, stddev};
+use datafusion_functions_aggregate::expr_fn::{count, median, stddev, sum};

use async_trait::async_trait;

@@ -854,10 +853,7 @@ impl DataFrame {
/// ```
pub async fn count(self) -> Result<usize> {
let rows = self
-.aggregate(
-vec![],
-vec![datafusion_expr::count(Expr::Literal(COUNT_STAR_EXPANSION))],
-)?
+.aggregate(vec![], vec![count(Expr::Literal(COUNT_STAR_EXPANSION))])?
.collect()
.await?;
let len = *rows
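The hunks above are two halves of one migration: `count` moves from `datafusion_expr` to `datafusion_functions_aggregate::expr_fn`, and `DataFrame::count` now builds its aggregate from the new import. Nothing changes for callers; a minimal usage sketch (the CSV path is hypothetical):

```rust
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

async fn row_count() -> Result<usize> {
    let ctx = SessionContext::new();
    // hypothetical input file, for illustration only
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::new())
        .await?;
    // internally runs an aggregate equivalent to `SELECT count(1)`
    df.count().await
}
```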
@@ -1594,9 +1590,10 @@ mod tests {
use datafusion_common::{Constraint, Constraints};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::{
-array_agg, cast, count_distinct, create_udf, expr, lit, BuiltInWindowFunction,
+array_agg, cast, create_udf, expr, lit, BuiltInWindowFunction,
ScalarFunctionImplementation, Volatility, WindowFrame, WindowFunctionDefinition,
};
+use datafusion_functions_aggregate::expr_fn::count_distinct;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};

73 changes: 73 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -455,13 +455,26 @@ async fn fetch_schema(
}

/// Read and parse the statistics of the Parquet file at location `path`
+///
+/// See [`statistics_from_parquet_meta`] for more details
async fn fetch_statistics(
store: &dyn ObjectStore,
table_schema: SchemaRef,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
) -> Result<Statistics> {
let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
+statistics_from_parquet_meta(&metadata, table_schema).await
+}
+
+/// Convert statistics in [`ParquetMetaData`] into [`Statistics`]
+///
+/// The statistics are calculated for each column in the table schema
+/// using the row group statistics in the parquet metadata.
+pub async fn statistics_from_parquet_meta(
+metadata: &ParquetMetaData,
+table_schema: SchemaRef,
+) -> Result<Statistics> {
let file_metadata = metadata.file_metadata();

let file_schema = parquet_to_arrow_schema(
@@ -1402,6 +1415,66 @@ mod tests {
Ok(())
}

+#[tokio::test]
+async fn test_statistics_from_parquet_metadata() -> Result<()> {
+// Data for column c1: ["Foo", null, "bar"]
+let c1: ArrayRef =
+Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
+
+// Data for column c2: [1, 2, null]
+let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
+
+// Use store_parquet to write each batch to its own file
+// . batch1 written into first file and includes:
+//   - column c1 that has 3 rows with one null. Stats min and max of the string column are missing for this test even though the column has values
+// . batch2 written into second file and includes:
+//   - column c2 that has 3 rows with one null. Stats min and max of the int column are available: 1 and 2 respectively
+let store = Arc::new(LocalFileSystem::new()) as _;
+let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?;
+
+let state = SessionContext::new().state();
+let format = ParquetFormat::default();
+let schema = format.infer_schema(&state, &store, &files).await.unwrap();
+
+let null_i64 = ScalarValue::Int64(None);
+let null_utf8 = ScalarValue::Utf8(None);
+
+// Fetch statistics for first file
+let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
+let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
+assert_eq!(stats.num_rows, Precision::Exact(3));
+// column c1
+let c1_stats = &stats.column_statistics[0];
+assert_eq!(c1_stats.null_count, Precision::Exact(1));
+assert_eq!(c1_stats.max_value, Precision::Absent);
+assert_eq!(c1_stats.min_value, Precision::Absent);
+// column c2: missing from the file so the table treats all 3 rows as null
+let c2_stats = &stats.column_statistics[1];
+assert_eq!(c2_stats.null_count, Precision::Exact(3));
+assert_eq!(c2_stats.max_value, Precision::Exact(null_i64.clone()));
+assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
+
+// Fetch statistics for second file
+let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[1], None).await?;
+let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
+assert_eq!(stats.num_rows, Precision::Exact(3));
+// column c1: missing from the file so the table treats all 3 rows as null
+let c1_stats = &stats.column_statistics[0];
+assert_eq!(c1_stats.null_count, Precision::Exact(3));
+assert_eq!(c1_stats.max_value, Precision::Exact(null_utf8.clone()));
+assert_eq!(c1_stats.min_value, Precision::Exact(null_utf8.clone()));
+// column c2
+let c2_stats = &stats.column_statistics[1];
+assert_eq!(c2_stats.null_count, Precision::Exact(1));
+assert_eq!(c2_stats.max_value, Precision::Exact(2i64.into()));
+assert_eq!(c2_stats.min_value, Precision::Exact(1i64.into()));
+
+Ok(())
+}

#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
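Making `statistics_from_parquet_meta` public separates statistics extraction from the object-store fetch, so a caller that already holds `ParquetMetaData` (for example, from an external metadata cache) can skip the extra round trip. A sketch of the call pattern, mirroring `fetch_statistics` above and assuming both functions are publicly reachable at these paths, as the test suggests:

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Statistics;
use datafusion::datasource::file_format::parquet::{
    fetch_parquet_metadata, statistics_from_parquet_meta,
};
use datafusion::error::Result;
use object_store::{ObjectMeta, ObjectStore};

async fn stats_with_one_fetch(
    store: &dyn ObjectStore,
    table_schema: SchemaRef,
    file: &ObjectMeta,
) -> Result<Statistics> {
    // fetch (or look up in a cache) the file's metadata once...
    let metadata = fetch_parquet_metadata(store, file, None).await?;
    // ...then derive table-level statistics without touching the store again
    statistics_from_parquet_meta(&metadata, table_schema).await
}
```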
49 changes: 39 additions & 10 deletions datafusion/core/src/datasource/listing/table.rs
@@ -547,20 +547,49 @@ impl ListingOptions {
}
}

-/// Reads data from one or more files via an
-/// [`ObjectStore`]. For example, from
-/// local files or objects from AWS S3. Implements [`TableProvider`],
-/// a DataFusion data source.
+/// Reads data from one or more files as a single table.
///
-/// # Features
+/// Implements [`TableProvider`], a DataFusion data source. The files are read
+/// using an [`ObjectStore`] instance, for example from local files or objects
+/// from AWS S3.
///
-/// 1. Merges schemas if the files have compatible but not identical schemas
+/// For example, given the `table1` directory (or object store prefix)
///
-/// 2. Hive-style partitioning support, where a path such as
-/// `/files/date=1/1/2022/data.parquet` is injected as a `date` column.
+/// ```text
+/// table1
+/// ├── file1.parquet
+/// └── file2.parquet
+/// ```
+///
+/// A `ListingTable` would read the files `file1.parquet` and `file2.parquet` as
+/// a single table, merging the schemas if the files have compatible but not
+/// identical schemas.
///
+/// Given the `table2` directory (or object store prefix)
+///
+/// ```text
+/// table2
+/// ├── date=2024-06-01
+/// │   ├── file3.parquet
+/// │   └── file4.parquet
+/// └── date=2024-06-02
+///     └── file5.parquet
+/// ```
+///
+/// A `ListingTable` would read the files `file3.parquet`, `file4.parquet`, and
+/// `file5.parquet` as a single table, again merging schemas if necessary.
+///
+/// Given the hive-style partitioning structure (e.g., directories named
+/// `date=2024-06-01` and `date=2024-06-02`), `ListingTable` also adds a `date`
+/// column when reading the table:
+/// * The files in `table2/date=2024-06-01` will have the value `2024-06-01`
+/// * The files in `table2/date=2024-06-02` will have the value `2024-06-02`.
+///
+/// If the query has a predicate like `WHERE date = '2024-06-01'`
+/// only the corresponding directory will be read.
///
-/// 3. Projection pushdown for formats that support it such as such as
-/// Parquet
+/// `ListingTable` also supports filter and projection pushdown for formats
+/// that support it, such as Parquet.
///
/// # Example
///
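A hedged sketch of building such a partitioned table over the `table2` layout above; the local path is illustrative, and the calls follow this era's `ListingTable` builder types:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::DataType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn table2_provider() -> Result<Arc<ListingTable>> {
    let ctx = SessionContext::new();
    let state = ctx.state();

    // illustrative local path; any ObjectStore URL works the same way
    let table_url = ListingTableUrl::parse("file:///data/table2/")?;

    // expose the hive-style `date=...` directories as a `date` column
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet")
        .with_table_partition_cols(vec![("date".to_string(), DataType::Utf8)]);

    // infer the file schema; partition columns are appended on top of it
    let schema = options.infer_schema(&state, &table_url).await?;

    let config = ListingTableConfig::new(table_url)
        .with_listing_options(options)
        .with_schema(schema);
    Ok(Arc::new(ListingTable::try_new(config)?))
}
```

With that provider registered via `ctx.register_table`, a query such as `SELECT * FROM table2 WHERE date = '2024-06-01'` should list only the matching directory.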
@@ -384,7 +384,7 @@ mod test {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
RowGroupAccess::Selection(
-// select / skip all 20 rows in row group 1
+// specifies all 20 rows in row group 1
vec![
RowSelector::select(5),
RowSelector::skip(7),
@@ -463,7 +463,7 @@
fn test_invalid_too_few() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
-// select 12 rows, but row group 1 has 20
+// specify only 12 rows in selection, but row group 1 has 20
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
),
@@ -484,7 +484,7 @@
fn test_invalid_too_many() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
-// select 22 rows, but row group 1 has only 20
+// specify 22 rows in selection, but row group 1 has only 20
RowGroupAccess::Selection(
vec![
RowSelector::select(10),
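The two failing cases bracket the invariant these tests pin down: a `RowGroupAccess::Selection` must account for exactly the row count of its row group. A minimal sketch of a valid plan for a 20-row row group (import paths assume the public re-exports of these types):

```rust
use datafusion::datasource::physical_plan::parquet::{
    ParquetAccessPlan, RowGroupAccess,
};
use parquet::arrow::arrow_reader::RowSelector;

fn valid_plan() -> ParquetAccessPlan {
    ParquetAccessPlan::new(vec![
        // row group 0: read everything
        RowGroupAccess::Scan,
        // row group 1: selectors must sum to the group's 20 rows
        // (5 selected + 7 skipped + 8 selected = 20)
        RowGroupAccess::Selection(
            vec![
                RowSelector::select(5),
                RowSelector::skip(7),
                RowSelector::select(8),
            ]
            .into(),
        ),
    ])
}
```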
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -156,9 +156,8 @@ pub use writer::plan_to_parquet
/// used to implement external indexes on top of parquet files and select only
/// portions of the files.
///
-/// The `ParquetExec` will try and further reduce any provided
-/// `ParquetAccessPlan` further based on the contents of `ParquetMetadata` and
-/// other settings.
+/// The `ParquetExec` will try to reduce any provided `ParquetAccessPlan`
+/// further based on the contents of `ParquetMetadata` and other settings.
///
/// ## Example of providing a ParquetAccessPlan
///
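The elided example section demonstrates handing a plan to `ParquetExec`; a hedged sketch of the pattern, assuming the `with_extensions` hook on `PartitionedFile` that this era of the codebase uses for access plans:

```rust
use std::sync::Arc;

use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::{
    ParquetAccessPlan, RowGroupAccess,
};

fn file_with_plan() -> PartitionedFile {
    // skip row group 0 entirely, scan all of row group 1
    let access_plan =
        ParquetAccessPlan::new(vec![RowGroupAccess::Skip, RowGroupAccess::Scan]);
    // attach the plan; ParquetExec may still prune it further using
    // ParquetMetadata, predicates, and other settings
    PartitionedFile::new("my_file.parquet", 1234)
        .with_extensions(Arc::new(access_plan))
}
```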
@@ -238,6 +238,8 @@ fn create_initial_plan(

// check row group count matches the plan
return Ok(access_plan.clone());
+} else {
+debug!("ParquetExec Ignoring unknown extension specified for {file_name}");
}
}

