
Commit 8bb9fd2

Merge remote-tracking branch 'apache/main' into alamb/API_for_decorrelate
alamb committed May 17, 2024
2 parents e2ca5a7 + 32b63ff commit 8bb9fd2
Showing 77 changed files with 1,756 additions and 2,030 deletions.
15 changes: 5 additions & 10 deletions datafusion-examples/examples/advanced_udaf.rs
@@ -31,8 +31,8 @@ use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::{cast::as_float64_array, ScalarValue};
use datafusion_expr::{
function::AccumulatorArgs, Accumulator, AggregateUDF, AggregateUDFImpl,
GroupsAccumulator, Signature,
function::{AccumulatorArgs, StateFieldsArgs},
Accumulator, AggregateUDF, AggregateUDFImpl, GroupsAccumulator, Signature,
};

/// This example shows how to use the full AggregateUDFImpl API to implement a user
@@ -92,21 +92,16 @@ impl AggregateUDFImpl for GeoMeanUdaf {
}

/// This is the description of the state. The accumulator's `state()` must match the types here.
fn state_fields(
&self,
_name: &str,
value_type: DataType,
_ordering_fields: Vec<arrow_schema::Field>,
) -> Result<Vec<arrow_schema::Field>> {
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<arrow_schema::Field>> {
Ok(vec![
Field::new("prod", value_type, true),
Field::new("prod", args.return_type.clone(), true),
Field::new("n", DataType::UInt32, true),
])
}

/// Tell DataFusion that this aggregate supports the more performant `GroupsAccumulator`
/// which is used for cases when there are grouping columns in the query
fn groups_accumulator_supported(&self) -> bool {
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
true
}

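For reference, below is a minimal sketch (not part of this commit) of how a user-defined aggregate might implement the reworked methods. It assumes the `StateFieldsArgs` and `AccumulatorArgs` argument structs shown in the diff above; only `args.return_type` is visible here, so any other field usage would need to be checked against the actual API. The hypothetical `MyUdaf` type and the remaining required `AggregateUDFImpl` methods are elided.

use arrow_schema::{DataType, Field};
use datafusion::error::Result;
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::AggregateUDFImpl;

// Hypothetical aggregate used only for illustration.
struct MyUdaf;

// Fragment of an AggregateUDFImpl: the other required trait methods
// (as_any, name, signature, return_type, accumulator, ...) are omitted.
impl AggregateUDFImpl for MyUdaf {
    // The per-parameter arguments are now bundled into a single
    // `StateFieldsArgs`, so new fields can be added without breaking
    // existing implementations.
    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
        Ok(vec![
            // intermediate running sum, typed like the return value
            Field::new("sum", args.return_type.clone(), true),
            // number of rows seen so far
            Field::new("count", DataType::UInt64, true),
        ])
    }

    // `groups_accumulator_supported` now receives `AccumulatorArgs`, so the
    // answer can depend on the inputs rather than being a constant.
    fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
        true
    }
}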
11 changes: 3 additions & 8 deletions datafusion-examples/examples/simplify_udaf_expression.rs
@@ -17,7 +17,7 @@

use arrow_schema::{Field, Schema};
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use datafusion_expr::function::AggregateFunctionSimplification;
use datafusion_expr::function::{AggregateFunctionSimplification, StateFieldsArgs};
use datafusion_expr::simplify::SimplifyInfo;

use std::{any::Any, sync::Arc};
@@ -70,16 +70,11 @@ impl AggregateUDFImpl for BetterAvgUdaf {
unimplemented!("should not be invoked")
}

fn state_fields(
&self,
_name: &str,
_value_type: DataType,
_ordering_fields: Vec<arrow_schema::Field>,
) -> Result<Vec<arrow_schema::Field>> {
fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<arrow_schema::Field>> {
unimplemented!("should not be invoked")
}

fn groups_accumulator_supported(&self) -> bool {
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
true
}

6 changes: 1 addition & 5 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -40,7 +40,7 @@ use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::IpcWriteOptions;
use arrow::ipc::{root_as_message, CompressionType};
use arrow_schema::{ArrowError, Schema, SchemaRef};
use datafusion_common::{not_impl_err, DataFusionError, FileType, Statistics};
use datafusion_common::{not_impl_err, DataFusionError, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
@@ -136,10 +136,6 @@ impl FileFormat for ArrowFormat {
order_requirements,
)) as _)
}

fn file_type(&self) -> FileType {
FileType::ARROW
}
}

/// Implements [`DataSink`] for writing to arrow_ipc files
5 changes: 0 additions & 5 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -23,7 +23,6 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::FileType;
use datafusion_physical_expr::PhysicalExpr;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

@@ -89,10 +88,6 @@ impl FileFormat for AvroFormat {
let exec = AvroExec::new(conf);
Ok(Arc::new(exec))
}

fn file_type(&self) -> FileType {
FileType::AVRO
}
}

#[cfg(test)]
9 changes: 3 additions & 6 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -42,7 +42,7 @@ use arrow::datatypes::SchemaRef;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use datafusion_common::config::CsvOptions;
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_common::{exec_err, not_impl_err, DataFusionError};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;
@@ -280,10 +280,6 @@ impl FileFormat for CsvFormat {
order_requirements,
)) as _)
}

fn file_type(&self) -> FileType {
FileType::CSV
}
}

impl CsvFormat {
@@ -549,8 +545,9 @@ mod tests {

use arrow::compute::concat_batches;
use datafusion_common::cast::as_string_array;
use datafusion_common::internal_err;
use datafusion_common::stats::Precision;
use datafusion_common::{internal_err, GetExt};
use datafusion_common::{FileType, GetExt};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_expr::{col, lit};

6 changes: 1 addition & 5 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -43,7 +43,7 @@ use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use arrow_array::RecordBatch;
use datafusion_common::config::JsonOptions;
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{not_impl_err, FileType};
use datafusion_common::not_impl_err;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;
@@ -184,10 +184,6 @@ impl FileFormat for JsonFormat {
order_requirements,
)) as _)
}

fn file_type(&self) -> FileType {
FileType::JSON
}
}

impl Default for JsonSerializer {
5 changes: 1 addition & 4 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -41,7 +41,7 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};

use datafusion_common::{not_impl_err, FileType};
use datafusion_common::not_impl_err;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};

use async_trait::async_trait;
@@ -104,9 +104,6 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
}

/// Returns the FileType corresponding to this FileFormat
fn file_type(&self) -> FileType;
}

#[cfg(test)]
6 changes: 1 addition & 5 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -47,7 +47,7 @@ use datafusion_common::config::TableParquetOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::stats::Precision;
use datafusion_common::{
exec_err, internal_datafusion_err, not_impl_err, DataFusionError, FileType,
exec_err, internal_datafusion_err, not_impl_err, DataFusionError,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
@@ -286,10 +286,6 @@ impl FileFormat for ParquetFormat {
order_requirements,
)) as _)
}

fn file_type(&self) -> FileType {
FileType::PARQUET
}
}

fn summarize_min_max(
79 changes: 52 additions & 27 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -154,7 +154,7 @@ impl ExecutionPlan for NdJsonExec {
target_partitions: usize,
config: &datafusion_common::config::ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if self.file_compression_type == FileCompressionType::GZIP {
if self.file_compression_type.is_compressed() {
return Ok(None);
}
let repartition_file_min_size = config.optimizer.repartition_file_min_size;
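The guard above now disables repartitioning for every compressed variant rather than only GZIP. As a hedged sketch of the semantics this relies on (assumed behavior, not the actual DataFusion source of `is_compressed`):

// Sketch only: anything other than UNCOMPRESSED is treated as compressed,
// so GZIP, BZIP2, XZ and ZSTD all skip the repartitioning optimization,
// while plain files keep it.
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;

fn is_compressed_sketch(t: &FileCompressionType) -> bool {
    *t != FileCompressionType::UNCOMPRESSED
}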
@@ -383,7 +383,6 @@ mod tests {
use std::path::Path;

use super::*;
use crate::assert_batches_eq;
use crate::dataframe::DataFrameWriteOptions;
use crate::datasource::file_format::file_compression_type::FileTypeExt;
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
@@ -393,18 +392,15 @@
CsvReadOptions, NdJsonReadOptions, SessionConfig, SessionContext,
};
use crate::test::partitioned_file_groups;
use crate::{assert_batches_eq, assert_batches_sorted_eq};

use arrow::array::Array;
use arrow::datatypes::{Field, SchemaBuilder};
use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array};
use datafusion_common::FileType;
use flate2::write::GzEncoder;
use flate2::Compression;
use object_store::chunked::ChunkedStore;
use object_store::local::LocalFileSystem;
use rstest::*;
use std::fs::File;
use std::io;
use tempfile::TempDir;
use url::Url;

@@ -892,36 +888,65 @@

Ok(())
}
fn compress_file(path: &str, output_path: &str) -> io::Result<()> {
let input_file = File::open(path)?;
let mut reader = BufReader::new(input_file);

let output_file = File::create(output_path)?;
let writer = std::io::BufWriter::new(output_file);

let mut encoder = GzEncoder::new(writer, Compression::default());
io::copy(&mut reader, &mut encoder)?;

encoder.finish()?;
Ok(())
}
#[rstest(
file_compression_type,
case::uncompressed(FileCompressionType::UNCOMPRESSED),
case::gzip(FileCompressionType::GZIP),
case::bzip2(FileCompressionType::BZIP2),
case::xz(FileCompressionType::XZ),
case::zstd(FileCompressionType::ZSTD)
)]
#[cfg(feature = "compression")]
#[tokio::test]
async fn test_disable_parallel_for_json_gz() -> Result<()> {
async fn test_json_with_repartitioing(
file_compression_type: FileCompressionType,
) -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(4);
let ctx = SessionContext::new_with_config(config);
let path = format!("{TEST_DATA_BASE}/1.json");
let compressed_path = format!("{}.gz", &path);
compress_file(&path, &compressed_path)?;

let tmp_dir = TempDir::new()?;
let (store_url, file_groups, _) =
prepare_store(&ctx.state(), file_compression_type, tmp_dir.path()).await;

// It's important to have fewer than `target_partitions` file groups, to
// trigger repartitioning.
assert_eq!(
file_groups.len(),
1,
"Expected prepared store with single file group"
);

let path = file_groups
.first()
.unwrap()
.first()
.unwrap()
.object_meta
.location
.as_ref();

let url: &Url = store_url.as_ref();
let path_buf = Path::new(url.path()).join(path);
let path = path_buf.to_str().unwrap();
let ext = FileType::JSON
.get_ext_with_compression(file_compression_type.to_owned())
.unwrap();

let read_option = NdJsonReadOptions::default()
.file_compression_type(FileCompressionType::GZIP)
.file_extension("gz");
let df = ctx.read_json(compressed_path.clone(), read_option).await?;
.file_compression_type(file_compression_type)
.file_extension(ext.as_str());

let df = ctx.read_json(path, read_option).await?;
let res = df.collect().await;
fs::remove_file(&compressed_path)?;
assert_batches_eq!(

// Output sort order is nondeterministic due to multiple
// target partitions. To handle this, the assertion compares
// sorted results.
assert_batches_sorted_eq!(
&[
"+-----+------------------+---------------+------+",
"| a | b | c | d |",