Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/upgrade_to_arrow
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 21, 2024
2 parents 794eb38 + ede665b commit 5e0ea62
Show file tree
Hide file tree
Showing 26 changed files with 1,017 additions and 301 deletions.
16 changes: 13 additions & 3 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async-compression = { version = "0.4.0", features = [
], optional = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bzip2 = { version = "0.4.3", optional = true }
bzip2 = { version = "0.5.0", optional = true }
chrono = { workspace = true }
dashmap = { workspace = true }
datafusion-catalog = { workspace = true }
Expand Down
67 changes: 41 additions & 26 deletions datafusion/core/src/catalog_common/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use arrow_array::builder::BooleanBuilder;
use arrow_array::builder::{BooleanBuilder, UInt8Builder};
use async_trait::async_trait;
use datafusion_common::error::Result;
use datafusion_common::DataFusionError;
Expand Down Expand Up @@ -247,6 +247,7 @@ impl InformationSchemaConfig {
return_type,
"SCALAR",
udf.documentation().map(|d| d.description.to_string()),
udf.documentation().map(|d| d.syntax_example.to_string()),
)
}
}
Expand All @@ -266,6 +267,7 @@ impl InformationSchemaConfig {
return_type,
"AGGREGATE",
udaf.documentation().map(|d| d.description.to_string()),
udaf.documentation().map(|d| d.syntax_example.to_string()),
)
}
}
Expand All @@ -285,6 +287,7 @@ impl InformationSchemaConfig {
return_type,
"WINDOW",
udwf.documentation().map(|d| d.description.to_string()),
udwf.documentation().map(|d| d.syntax_example.to_string()),
)
}
}
Expand All @@ -308,7 +311,8 @@ impl InformationSchemaConfig {
args: Option<&Vec<(String, String)>>,
arg_types: Vec<String>,
return_type: Option<String>,
is_variadic: bool| {
is_variadic: bool,
rid: u8| {
for (position, type_name) in arg_types.iter().enumerate() {
let param_name =
args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
Expand All @@ -322,6 +326,7 @@ impl InformationSchemaConfig {
type_name,
None::<&str>,
is_variadic,
rid,
);
}
if let Some(return_type) = return_type {
Expand All @@ -335,48 +340,52 @@ impl InformationSchemaConfig {
return_type.as_str(),
None::<&str>,
false,
rid,
);
}
};

for (func_name, udf) in udfs {
let args = udf.documentation().and_then(|d| d.arguments.clone());
let combinations = get_udf_args_and_return_types(udf)?;
for (arg_types, return_type) in combinations {
for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
add_parameters(
func_name,
args.as_ref(),
arg_types,
return_type,
Self::is_variadic(udf.signature()),
rid as u8,
);
}
}

for (func_name, udaf) in udafs {
let args = udaf.documentation().and_then(|d| d.arguments.clone());
let combinations = get_udaf_args_and_return_types(udaf)?;
for (arg_types, return_type) in combinations {
for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
add_parameters(
func_name,
args.as_ref(),
arg_types,
return_type,
Self::is_variadic(udaf.signature()),
rid as u8,
);
}
}

for (func_name, udwf) in udwfs {
let args = udwf.documentation().and_then(|d| d.arguments.clone());
let combinations = get_udwf_args_and_return_types(udwf)?;
for (arg_types, return_type) in combinations {
for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
add_parameters(
func_name,
args.as_ref(),
arg_types,
return_type,
Self::is_variadic(udwf.signature()),
rid as u8,
);
}
}
Expand Down Expand Up @@ -1095,6 +1104,7 @@ impl InformationSchemaRoutines {
Field::new("data_type", DataType::Utf8, true),
Field::new("function_type", DataType::Utf8, true),
Field::new("description", DataType::Utf8, true),
Field::new("syntax_example", DataType::Utf8, true),
]));

Self { schema, config }
Expand All @@ -1114,6 +1124,7 @@ impl InformationSchemaRoutines {
data_type: StringBuilder::new(),
function_type: StringBuilder::new(),
description: StringBuilder::new(),
syntax_example: StringBuilder::new(),
}
}
}
Expand All @@ -1131,6 +1142,7 @@ struct InformationSchemaRoutinesBuilder {
data_type: StringBuilder,
function_type: StringBuilder,
description: StringBuilder,
syntax_example: StringBuilder,
}

impl InformationSchemaRoutinesBuilder {
Expand All @@ -1145,6 +1157,7 @@ impl InformationSchemaRoutinesBuilder {
data_type: Option<impl AsRef<str>>,
function_type: impl AsRef<str>,
description: Option<impl AsRef<str>>,
syntax_example: Option<impl AsRef<str>>,
) {
self.specific_catalog.append_value(catalog_name.as_ref());
self.specific_schema.append_value(schema_name.as_ref());
Expand All @@ -1157,6 +1170,7 @@ impl InformationSchemaRoutinesBuilder {
self.data_type.append_option(data_type.as_ref());
self.function_type.append_value(function_type.as_ref());
self.description.append_option(description);
self.syntax_example.append_option(syntax_example);
}

fn finish(&mut self) -> RecordBatch {
Expand All @@ -1174,6 +1188,7 @@ impl InformationSchemaRoutinesBuilder {
Arc::new(self.data_type.finish()),
Arc::new(self.function_type.finish()),
Arc::new(self.description.finish()),
Arc::new(self.syntax_example.finish()),
],
)
.unwrap()
Expand Down Expand Up @@ -1222,6 +1237,12 @@ impl InformationSchemaParameters {
Field::new("data_type", DataType::Utf8, false),
Field::new("parameter_default", DataType::Utf8, true),
Field::new("is_variadic", DataType::Boolean, false),
// `rid` (short for `routine id`) is used to differentiate parameters from different signatures
// (It serves as the group-by key when generating the `SHOW FUNCTIONS` query).
// For example, the following signatures have different `rid` values:
// - `date_trunc(Utf8, Timestamp(Microsecond, Some("+TZ"))) -> Timestamp(Microsecond, Some("+TZ"))`
// - `date_trunc(Utf8View, Timestamp(Nanosecond, None)) -> Timestamp(Nanosecond, None)`
Field::new("rid", DataType::UInt8, false),
]));

Self { schema, config }
Expand All @@ -1239,7 +1260,7 @@ impl InformationSchemaParameters {
data_type: StringBuilder::new(),
parameter_default: StringBuilder::new(),
is_variadic: BooleanBuilder::new(),
inserted: HashSet::new(),
rid: UInt8Builder::new(),
}
}
}
Expand All @@ -1255,8 +1276,7 @@ struct InformationSchemaParametersBuilder {
data_type: StringBuilder,
parameter_default: StringBuilder,
is_variadic: BooleanBuilder,
// use HashSet to avoid duplicate rows. The key is (specific_name, ordinal_position, parameter_mode, data_type)
inserted: HashSet<(String, u64, String, String)>,
rid: UInt8Builder,
}

impl InformationSchemaParametersBuilder {
Expand All @@ -1272,25 +1292,19 @@ impl InformationSchemaParametersBuilder {
data_type: impl AsRef<str>,
parameter_default: Option<impl AsRef<str>>,
is_variadic: bool,
rid: u8,
) {
let key = (
specific_name.as_ref().to_string(),
ordinal_position,
parameter_mode.as_ref().to_string(),
data_type.as_ref().to_string(),
);
if self.inserted.insert(key) {
self.specific_catalog
.append_value(specific_catalog.as_ref());
self.specific_schema.append_value(specific_schema.as_ref());
self.specific_name.append_value(specific_name.as_ref());
self.ordinal_position.append_value(ordinal_position);
self.parameter_mode.append_value(parameter_mode.as_ref());
self.parameter_name.append_option(parameter_name.as_ref());
self.data_type.append_value(data_type.as_ref());
self.parameter_default.append_option(parameter_default);
self.is_variadic.append_value(is_variadic);
}
self.specific_catalog
.append_value(specific_catalog.as_ref());
self.specific_schema.append_value(specific_schema.as_ref());
self.specific_name.append_value(specific_name.as_ref());
self.ordinal_position.append_value(ordinal_position);
self.parameter_mode.append_value(parameter_mode.as_ref());
self.parameter_name.append_option(parameter_name.as_ref());
self.data_type.append_value(data_type.as_ref());
self.parameter_default.append_option(parameter_default);
self.is_variadic.append_value(is_variadic);
self.rid.append_value(rid);
}

fn finish(&mut self) -> RecordBatch {
Expand All @@ -1306,6 +1320,7 @@ impl InformationSchemaParametersBuilder {
Arc::new(self.data_type.finish()),
Arc::new(self.parameter_default.finish()),
Arc::new(self.is_variadic.finish()),
Arc::new(self.rid.finish()),
],
)
.unwrap()
Expand Down
33 changes: 28 additions & 5 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<Cs
))
}

/// Auto finish the wrapped BzEncoder on drop
///
/// Newtype wrapper around a [`BzEncoder`] used when writing bzip2-compressed
/// test fixtures. Dropping a `BzEncoder` without finishing it can leave the
/// compressed stream without its trailing data (see the comment near the
/// writer teardown below: drop must trigger `finish`, which writes additional
/// bytes). This wrapper's `Drop` impl calls `try_finish` so the stream is
/// always terminated, mirroring the auto-finish behavior relied on for
/// `ZstdEncoder`.
#[cfg(feature = "compression")]
struct AutoFinishBzEncoder<W: Write>(BzEncoder<W>);

#[cfg(feature = "compression")]
impl<W: Write> Write for AutoFinishBzEncoder<W> {
    /// Forward the bytes to the inner encoder unchanged.
    fn write(&mut self, bytes: &[u8]) -> std::io::Result<usize> {
        Write::write(&mut self.0, bytes)
    }

    /// Flush whatever the inner encoder has buffered.
    fn flush(&mut self) -> std::io::Result<()> {
        Write::flush(&mut self.0)
    }
}

#[cfg(feature = "compression")]
impl<W: Write> Drop for AutoFinishBzEncoder<W> {
    /// Finish the bzip2 stream so its trailing compressed data is written.
    ///
    /// The result is deliberately discarded: `drop` cannot propagate errors.
    /// This lives in test support code, so a failed finish would surface as
    /// a corrupt fixture in the consuming test rather than being silently
    /// load-bearing.
    fn drop(&mut self) {
        let _ = self.0.try_finish();
    }
}

/// Returns file groups [`Vec<Vec<PartitionedFile>>`] for scanning `partitions` of `filename`
pub fn partitioned_file_groups(
path: &str,
Expand Down Expand Up @@ -147,9 +169,10 @@ pub fn partitioned_file_groups(
Box::new(encoder)
}
#[cfg(feature = "compression")]
FileCompressionType::BZIP2 => {
Box::new(BzEncoder::new(file, BzCompression::default()))
}
FileCompressionType::BZIP2 => Box::new(AutoFinishBzEncoder(BzEncoder::new(
file,
BzCompression::default(),
))),
#[cfg(not(feature = "compression"))]
FileCompressionType::GZIP
| FileCompressionType::BZIP2
Expand Down Expand Up @@ -183,8 +206,8 @@ pub fn partitioned_file_groups(
}
}

// Must drop the stream before creating ObjectMeta below as drop
// triggers finish for ZstdEncoder which writes additional data
// Must drop the stream before creating ObjectMeta below as drop triggers
// finish for ZstdEncoder/BzEncoder which writes additional data
for mut w in writers.into_iter() {
w.flush().unwrap();
}
Expand Down
7 changes: 3 additions & 4 deletions datafusion/functions-nested/src/distance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,23 @@

//! [ScalarUDFImpl] definitions for array_distance function.
use crate::utils::{downcast_arg, make_scalar_function};
use crate::utils::make_scalar_function;
use arrow_array::{
Array, ArrayRef, Float64Array, LargeListArray, ListArray, OffsetSizeTrait,
};
use arrow_schema::DataType;
use arrow_schema::DataType::{FixedSizeList, Float64, LargeList, List};
use core::any::type_name;
use datafusion_common::cast::{
as_float32_array, as_float64_array, as_generic_list_array, as_int32_array,
as_int64_array,
};
use datafusion_common::utils::coerced_fixed_size_list_to_list;
use datafusion_common::DataFusionError;
use datafusion_common::{exec_err, Result};
use datafusion_common::{exec_err, internal_datafusion_err, Result};
use datafusion_expr::scalar_doc_sections::DOC_SECTION_ARRAY;
use datafusion_expr::{
ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
};
use datafusion_functions::{downcast_arg, downcast_named_arg};
use std::any::Any;
use std::sync::{Arc, OnceLock};

Expand Down
7 changes: 3 additions & 4 deletions datafusion/functions-nested/src/length.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@

//! [`ScalarUDFImpl`] definitions for array_length function.
use crate::utils::{downcast_arg, make_scalar_function};
use crate::utils::make_scalar_function;
use arrow_array::{
Array, ArrayRef, Int64Array, LargeListArray, ListArray, OffsetSizeTrait, UInt64Array,
};
use arrow_schema::DataType;
use arrow_schema::DataType::{FixedSizeList, LargeList, List, UInt64};
use core::any::type_name;
use datafusion_common::cast::{as_generic_list_array, as_int64_array};
use datafusion_common::DataFusionError;
use datafusion_common::{exec_err, plan_err, Result};
use datafusion_common::{exec_err, internal_datafusion_err, plan_err, Result};
use datafusion_expr::scalar_doc_sections::DOC_SECTION_ARRAY;
use datafusion_expr::{
ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
};
use datafusion_functions::{downcast_arg, downcast_named_arg};
use std::any::Any;
use std::sync::{Arc, OnceLock};

Expand Down
Loading

0 comments on commit 5e0ea62

Please sign in to comment.