From 6e819d6c2b9280198c67fa16df3e54c79ce46ca2 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 11 Apr 2023 18:44:15 +0100 Subject: [PATCH] Update arrow 37 (#5782) * Update arrow 37 * Fix avro * Fix sql_planner benchmark * Update arrow pin * Format * Fix test * Remove pin * Update version * Fix logical merge conflicts * Pyarrow clippy * More clippy * fixup --------- Co-authored-by: Andrew Lamb --- Cargo.toml | 12 +- benchmarks/src/tpch.rs | 14 +- datafusion-cli/Cargo.lock | 61 +++---- datafusion-cli/Cargo.toml | 2 +- datafusion-examples/Cargo.toml | 2 +- datafusion/common/src/dfschema.rs | 111 ++++++------ datafusion/common/src/pyarrow.rs | 3 +- datafusion/common/src/scalar.rs | 167 +++++++++--------- datafusion/core/benches/sql_planner.rs | 4 +- .../src/avro_to_arrow/arrow_array_reader.rs | 28 ++- datafusion/core/src/avro_to_arrow/schema.rs | 13 +- .../core/src/datasource/file_format/csv.rs | 6 +- .../src/datasource/file_format/parquet.rs | 14 +- .../core/src/datasource/listing/table.rs | 10 +- datafusion/core/src/physical_plan/empty.rs | 4 +- .../src/physical_plan/file_format/avro.rs | 8 +- .../src/physical_plan/file_format/json.rs | 10 +- .../core/src/physical_plan/file_format/mod.rs | 2 +- .../src/physical_plan/file_format/parquet.rs | 15 +- .../src/physical_plan/joins/cross_join.rs | 4 +- .../core/src/physical_plan/joins/utils.rs | 9 +- datafusion/core/src/physical_plan/planner.rs | 2 +- .../core/src/physical_plan/sorts/sort.rs | 3 +- .../windows/bounded_window_agg_exec.rs | 11 +- .../core/src/physical_plan/windows/mod.rs | 2 +- .../physical_plan/windows/window_agg_exec.rs | 10 +- datafusion/core/src/test/mod.rs | 2 +- .../core/tests/parquet/custom_reader.rs | 8 +- datafusion/core/tests/sql/aggregates.rs | 4 +- datafusion/core/tests/sql/mod.rs | 6 +- datafusion/core/tests/sql/parquet.rs | 12 +- datafusion/core/tests/sql/select.rs | 12 +- datafusion/core/tests/sql/timestamp.rs | 13 +- .../sqllogictests/test_files/arrow_typeof.slt | 6 +- .../core/tests/user_defined_aggregates.rs | 6 +- datafusion/expr/src/aggregate_function.rs | 3 +- datafusion/expr/src/field_util.rs | 2 +- datafusion/expr/src/function.rs | 14 +- datafusion/expr/src/logical_plan/builder.rs | 16 +- datafusion/expr/src/logical_plan/display.rs | 2 +- datafusion/optimizer/src/optimizer.rs | 2 +- .../src/unwrap_cast_in_comparison.rs | 8 +- .../src/aggregate/approx_percentile_cont.rs | 4 +- .../physical-expr/src/aggregate/array_agg.rs | 30 ++-- .../src/aggregate/array_agg_distinct.rs | 28 ++- .../physical-expr/src/aggregate/average.rs | 8 +- .../physical-expr/src/aggregate/build_in.rs | 16 +- .../physical-expr/src/aggregate/count.rs | 2 +- .../src/aggregate/count_distinct.rs | 8 +- .../physical-expr/src/aggregate/median.rs | 2 +- .../src/aggregate/sum_distinct.rs | 4 +- .../physical-expr/src/datetime_expressions.rs | 4 +- .../physical-expr/src/expressions/binary.rs | 6 +- .../src/expressions/binary/kernels_arrow.rs | 10 +- .../src/expressions/get_indexed_field.rs | 18 +- .../physical-expr/src/expressions/literal.rs | 2 +- datafusion/physical-expr/src/functions.rs | 6 +- datafusion/physical-expr/src/physical_expr.rs | 4 +- datafusion/physical-expr/src/type_coercion.rs | 3 +- datafusion/physical-expr/src/utils.rs | 5 +- .../physical-expr/src/window/lead_lag.rs | 2 +- .../proto/src/logical_plan/from_proto.rs | 39 ++-- datafusion/proto/src/logical_plan/mod.rs | 126 ++++++------- datafusion/proto/src/logical_plan/to_proto.rs | 28 +-- datafusion/proto/src/physical_plan/mod.rs | 6 +- datafusion/sql/src/expr/arrow_cast.rs | 2 +- datafusion/sql/src/planner.rs | 4 +- .../substrait/src/logical_plan/consumer.rs | 2 +- .../substrait/tests/roundtrip_logical_plan.rs | 10 +- 69 files changed, 497 insertions(+), 525 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index be99180947c4..228f414adca8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,12 +46,12 @@ repository = "https://github.com/apache/arrow-datafusion" rust-version = "1.64" [workspace.dependencies] -arrow = { version = "36.0.0", features = ["prettyprint"] } -arrow-flight = { version = "36.0.0", features = ["flight-sql-experimental"] } -arrow-buffer = { version = "36.0.0", default-features = false } -arrow-schema = { version = "36.0.0", default-features = false } -arrow-array = { version = "36.0.0", default-features = false, features = ["chrono-tz"] } -parquet = { version = "36.0.0", features = ["arrow", "async"] } +arrow = { version = "37.0.0", features = ["prettyprint"] } +arrow-flight = { version = "37.0.0", features = ["flight-sql-experimental"] } +arrow-buffer = { version = "37.0.0", default-features = false } +arrow-schema = { version = "37.0.0", default-features = false } +arrow-array = { version = "37.0.0", default-features = false, features = ["chrono-tz"] } +parquet = { version = "37.0.0", features = ["arrow", "async"] } [profile.release] codegen-units = 1 diff --git a/benchmarks/src/tpch.rs b/benchmarks/src/tpch.rs index 47a69f62d198..4e144edcc86a 100644 --- a/benchmarks/src/tpch.rs +++ b/benchmarks/src/tpch.rs @@ -16,7 +16,7 @@ // under the License. use arrow::array::{Array, ArrayRef}; -use arrow::datatypes::SchemaRef; +use arrow::datatypes::{Fields, SchemaBuilder, SchemaRef}; use arrow::record_batch::RecordBatch; use std::fs; use std::ops::{Div, Mul}; @@ -45,11 +45,9 @@ pub const TPCH_TABLES: &[&str] = &[ /// The `.tbl` file contains a trailing column pub fn get_tbl_tpch_table_schema(table: &str) -> Schema { - let mut schema = get_tpch_table_schema(table); - schema - .fields - .push(Field::new("__placeholder", DataType::Utf8, false)); - schema + let mut schema = SchemaBuilder::from(get_tpch_table_schema(table).fields); + schema.push(Field::new("__placeholder", DataType::Utf8, false)); + schema.finish() } /// Get the schema for the benchmarks derived from TPC-H @@ -476,7 +474,7 @@ pub async fn transform_actual_result( // we need to round the decimal columns and trim the Utf8 columns // we also need to rewrite the batches to use a compatible schema let ctx = SessionContext::new(); - let fields = result[0] + let fields: Fields = result[0] .schema() .fields() .iter() @@ -485,7 +483,7 @@ pub async fn transform_actual_result( Some(i) => f.name()[i + 1..].to_string(), _ => f.name().to_string(), }; - f.clone().with_name(simple_name) + f.as_ref().clone().with_name(simple_name) }) .collect(); let result_schema = SchemaRef::new(Schema::new(fields)); diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index cd997b134d0c..616a0ba9da1e 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -68,9 +68,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "990dfa1a9328504aa135820da1c95066537b69ad94c04881b785f64328e0fa6b" +checksum = "1aea9fcb25bbb70f7f922f95b99ca29c1013dab47f6df61a6f24861842dd7f2e" dependencies = [ "ahash", "arrow-arith", @@ -90,9 +90,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2b2e52de0ab54173f9b08232b7184c26af82ee7ab4ac77c83396633c90199fa" +checksum = "8d967b42f7b12c91fd78acd396b20c2973b184c8866846674abbb00c963e93ab" dependencies = [ "arrow-array", "arrow-buffer", @@ -105,9 +105,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e10849b60c17dbabb334be1f4ef7550701aa58082b71335ce1ed586601b2f423" +checksum = "3190f208ee7aa0f3596fa0098d42911dec5e123ca88c002a08b24877ad14c71e" dependencies = [ "ahash", "arrow-buffer", @@ -122,9 +122,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0746ae991b186be39933147117f8339eb1c4bbbea1c8ad37e7bf5851a1a06ba" +checksum = "5d33c733c5b6c44a0fc526f29c09546e04eb56772a7a21e48e602f368be381f6" dependencies = [ "half", "num", @@ -132,9 +132,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b88897802515d7b193e38b27ddd9d9e43923d410a9e46307582d756959ee9595" +checksum = "abd349520b6a1ed4924ae2afc9d23330a3044319e4ec3d5b124c09e4d440ae87" dependencies = [ "arrow-array", "arrow-buffer", @@ -149,9 +149,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c8220d9741fc37961262710ceebd8451a5b393de57c464f0267ffdda1775c0a" +checksum = "c80af3c3e290a2a7e1cc518f1471dff331878cb4af9a5b088bf030b89debf649" dependencies = [ "arrow-array", "arrow-buffer", @@ -168,9 +168,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "533f937efa1aaad9dc86f6a0e382c2fa736a4943e2090c946138079bdf060cef" +checksum = "b1c8361947aaa96d331da9df3f7a08bdd8ab805a449994c97f5c4d24c4b7e2cf" dependencies = [ "arrow-buffer", "arrow-schema", @@ -180,9 +180,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18b75296ff01833f602552dff26a423fc213db8e5049b540ca4a00b1c957e41c" +checksum = "9a46ee000b9fbd1e8db6e8b26acb8c760838512b39d8c9f9d73892cb55351d50" dependencies = [ "arrow-array", "arrow-buffer", @@ -194,9 +194,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e501d3de4d612c90677594896ca6c0fa075665a7ff980dc4189bb531c17e19f6" +checksum = "4bf2366607be867ced681ad7f272371a5cf1fc2941328eef7b4fee14565166fb" dependencies = [ "arrow-array", "arrow-buffer", @@ -208,14 +208,15 @@ dependencies = [ "indexmap", "lexical-core", "num", + "serde", "serde_json", ] [[package]] name = "arrow-ord" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33d2671eb3793f9410230ac3efb0e6d36307be8a2dac5fad58ac9abde8e9f01e" +checksum = "304069901c867200e21ec868ae7521165875470ef2f1f6d58f979a443d63997e" dependencies = [ "arrow-array", "arrow-buffer", @@ -228,9 +229,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc11fa039338cebbf4e29cf709c8ac1d6a65c7540063d4a25f991ab255ca85c8" +checksum = "0d57fe8ceef3392fdd493269d8a2d589de17bafce151aacbffbddac7a57f441a" dependencies = [ "ahash", "arrow-array", @@ -243,15 +244,15 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d04f17f7b86ded0b5baf98fe6123391c4343e031acc3ccc5fa604cc180bff220" +checksum = "a16b88a93ac8350f0200b1cd336a1f887315925b8dd7aa145a37b8bdbd8497a4" [[package]] name = "arrow-select" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "163e35de698098ff5f5f672ada9dc1f82533f10407c7a11e2cd09f3bcf31d18a" +checksum = "98e8a4d6ca37d5212439b24caad4d80743fcbb706706200dd174bb98e68fe9d8" dependencies = [ "arrow-array", "arrow-buffer", @@ -262,9 +263,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfdfbed1b10209f0dc68e6aa4c43dc76079af65880965c7c3b73f641f23d4aba" +checksum = "cbb594efa397eb6a546f42b1f8df3d242ea84dbfda5232e06035dc2b2e2c8459" dependencies = [ "arrow-array", "arrow-buffer", @@ -1815,9 +1816,9 @@ dependencies = [ [[package]] name = "parquet" -version = "36.0.0" +version = "37.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "321a15f8332645759f29875b07f8233d16ed8ec1b3582223de81625a9f8506b7" +checksum = "b5022d98333271f4ca3e87bab760498e61726bf5a6ca919123c80517e20ded29" dependencies = [ "ahash", "arrow-array", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index e3ae99f962f9..7fda75eb073d 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -29,7 +29,7 @@ rust-version = "1.62" readme = "README.md" [dependencies] -arrow = "36.0.0" +arrow = "37.0.0" async-trait = "0.1.41" clap = { version = "3", features = ["derive", "cargo"] } datafusion = { path = "../datafusion/core", version = "22.0.0" } diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index d459bb52fe43..f53316d00bd9 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -56,6 +56,6 @@ prost-derive = { version = "0.11", default-features = false } serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.82" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } -tonic = "0.8" +tonic = "0.9" url = "2.2" uuid = "1.2" diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index bb5820cdaab8..77f20f21e24c 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -28,7 +28,7 @@ use crate::utils::quote_identifier; use crate::{field_not_found, Column, OwnedTableReference, TableReference}; use arrow::compute::can_cast_types; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; use std::fmt::{Display, Formatter}; /// A reference-counted reference to a `DFSchema`. @@ -428,8 +428,7 @@ impl DFSchema { | (DataType::Map(f1, _), DataType::Map(f2, _)) => { Self::field_is_semantically_equal(f1, f2) } - (DataType::Struct(fields1), DataType::Struct(fields2)) - | (DataType::Union(fields1, _, _), DataType::Union(fields2, _, _)) => { + (DataType::Struct(fields1), DataType::Struct(fields2)) => { let iter1 = fields1.iter(); let iter2 = fields2.iter(); fields1.len() == fields2.len() && @@ -438,6 +437,15 @@ impl DFSchema { .zip(iter2) .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2)) } + (DataType::Union(fields1, _), DataType::Union(fields2, _)) => { + let iter1 = fields1.iter(); + let iter2 = fields2.iter(); + fields1.len() == fields2.len() && + // all fields have to be the same + iter1 + .zip(iter2) + .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2)) + } _ => dt1 == dt2, } } @@ -489,20 +497,16 @@ impl DFSchema { impl From for Schema { /// Convert DFSchema into a Schema fn from(df_schema: DFSchema) -> Self { - Schema::new_with_metadata( - df_schema.fields.into_iter().map(|f| f.field).collect(), - df_schema.metadata, - ) + let fields: Fields = df_schema.fields.into_iter().map(|f| f.field).collect(); + Schema::new_with_metadata(fields, df_schema.metadata) } } impl From<&DFSchema> for Schema { /// Convert DFSchema reference into a Schema fn from(df_schema: &DFSchema) -> Self { - Schema::new_with_metadata( - df_schema.fields.iter().map(|f| f.field.clone()).collect(), - df_schema.metadata.clone(), - ) + let fields: Fields = df_schema.fields.iter().map(|f| f.field.clone()).collect(); + Schema::new_with_metadata(fields, df_schema.metadata.clone()) } } @@ -632,7 +636,7 @@ pub struct DFField { /// Optional qualifier (usually a table or relation name) qualifier: Option, /// Arrow field definition - field: Field, + field: FieldRef, } impl DFField { @@ -645,7 +649,7 @@ impl DFField { ) -> Self { DFField { qualifier: qualifier.map(|s| s.into()), - field: Field::new(name, data_type, nullable), + field: Arc::new(Field::new(name, data_type, nullable)), } } @@ -653,26 +657,18 @@ impl DFField { pub fn new_unqualified(name: &str, data_type: DataType, nullable: bool) -> Self { DFField { qualifier: None, - field: Field::new(name, data_type, nullable), - } - } - - /// Create an unqualified field from an existing Arrow field - pub fn from(field: Field) -> Self { - Self { - qualifier: None, - field, + field: Arc::new(Field::new(name, data_type, nullable)), } } /// Create a qualified field from an existing Arrow field pub fn from_qualified<'a>( qualifier: impl Into>, - field: Field, + field: impl Into, ) -> Self { Self { qualifier: Some(qualifier.into().to_owned_reference()), - field, + field: field.into(), } } @@ -722,7 +718,7 @@ impl DFField { } /// Get the arrow field - pub fn field(&self) -> &Field { + pub fn field(&self) -> &FieldRef { &self.field } @@ -733,6 +729,21 @@ impl DFField { } } +impl From for DFField { + fn from(value: FieldRef) -> Self { + Self { + qualifier: None, + field: value, + } + } +} + +impl From for DFField { + fn from(value: Field) -> Self { + Self::from(Arc::new(value)) + } +} + #[cfg(test)] mod tests { use crate::assert_contains; @@ -941,13 +952,11 @@ mod tests { #[test] fn equivalent_names_and_types() { - let field1_i16_t = DFField::from(Field::new("f1", DataType::Int16, true)); - let field1_i16_t_meta = DFField::from( - field1_i16_t - .field() - .clone() - .with_metadata(test_metadata_n(2)), - ); + let arrow_field1 = Field::new("f1", DataType::Int16, true); + let arrow_field1_meta = arrow_field1.clone().with_metadata(test_metadata_n(2)); + + let field1_i16_t = DFField::from(arrow_field1); + let field1_i16_t_meta = DFField::from(arrow_field1_meta); let field1_i16_t_qualified = DFField::from_qualified("foo", field1_i16_t.field().clone()); let field1_i16_f = DFField::from(Field::new("f1", DataType::Int16, false)); @@ -960,43 +969,43 @@ mod tests { let field_dict_t = DFField::from(Field::new("f_dict", dict.clone(), true)); let field_dict_f = DFField::from(Field::new("f_dict", dict, false)); - let list_t = DFField::from(Field::new( + let list_t = DFField::from(Field::new_list( "f_list", - DataType::List(Box::new(field1_i16_t.field().clone())), + field1_i16_t.field().clone(), true, )); - let list_f = DFField::from(Field::new( + let list_f = DFField::from(Field::new_list( "f_list", - DataType::List(Box::new(field1_i16_f.field().clone())), + field1_i16_f.field().clone(), false, )); - let list_f_name = DFField::from(Field::new( + let list_f_name = DFField::from(Field::new_list( "f_list", - DataType::List(Box::new(field2_i16_t.field().clone())), + field2_i16_t.field().clone(), false, )); - let struct_t = DFField::from(Field::new( + let struct_t = DFField::from(Field::new_struct( "f_struct", - DataType::Struct(vec![field1_i16_t.field().clone()]), + vec![field1_i16_t.field().clone()], true, )); - let struct_f = DFField::from(Field::new( + let struct_f = DFField::from(Field::new_struct( "f_struct", - DataType::Struct(vec![field1_i16_f.field().clone()]), + vec![field1_i16_f.field().clone()], false, )); - let struct_f_meta = DFField::from(Field::new( + let struct_f_meta = DFField::from(Field::new_struct( "f_struct", - DataType::Struct(vec![field1_i16_t_meta.field().clone()]), + vec![field1_i16_t_meta.field().clone()], false, )); - let struct_f_type = DFField::from(Field::new( + let struct_f_type = DFField::from(Field::new_struct( "f_struct", - DataType::Struct(vec![field1_i32_t.field().clone()]), + vec![field1_i32_t.field().clone()], false, )); @@ -1204,14 +1213,16 @@ mod tests { } #[test] fn test_dfschema_to_schema_convertion() { - let mut a: DFField = DFField::new(Some("table1"), "a", DataType::Int64, false); - let mut b: DFField = DFField::new(Some("table1"), "b", DataType::Int64, false); let mut a_metadata = HashMap::new(); a_metadata.insert("key".to_string(), "value".to_string()); - a.field.set_metadata(a_metadata); + let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata); + let mut b_metadata = HashMap::new(); b_metadata.insert("key".to_string(), "value".to_string()); - b.field.set_metadata(b_metadata); + let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata); + + let a: DFField = DFField::from_qualified("table1", a_field); + let b: DFField = DFField::from_qualified("table1", b_field); let df_schema = Arc::new( DFSchema::new_with_metadata([a, b].to_vec(), HashMap::new()).unwrap(), diff --git a/datafusion/common/src/pyarrow.rs b/datafusion/common/src/pyarrow.rs index e8024ab40412..14f5380fa6e1 100644 --- a/datafusion/common/src/pyarrow.rs +++ b/datafusion/common/src/pyarrow.rs @@ -20,6 +20,7 @@ use crate::{DataFusionError, ScalarValue}; use arrow::array::ArrayData; use arrow::pyarrow::PyArrowConvert; +use arrow_array::Array; use pyo3::exceptions::PyException; use pyo3::prelude::PyErr; use pyo3::types::PyList; @@ -52,7 +53,7 @@ impl PyArrowConvert for ScalarValue { fn to_pyarrow(&self, py: Python) -> PyResult { let array = self.to_array(); // convert to pyarrow array using C data interface - let pyarray = array.data().to_pyarrow(py)?; + let pyarray = array.to_data().to_pyarrow(py)?; let pyscalar = pyarray.call_method1(py, "__getitem__", (0,))?; Ok(pyscalar) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index f0e171b4d036..8c26ea9b0432 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -31,6 +31,7 @@ use crate::cast::{ }; use crate::delta::shift_months; use crate::error::{DataFusionError, Result}; +use arrow::datatypes::{FieldRef, Fields, SchemaBuilder}; use arrow::{ array::*, compute::kernels::cast::{cast, cast_with_options, CastOptions}, @@ -99,7 +100,7 @@ pub enum ScalarValue { /// large binary LargeBinary(Option>), /// list of nested ScalarValue - List(Option>, Box), + List(Option>, FieldRef), /// Date stored as a signed 32bit int days since UNIX epoch 1970-01-01 Date32(Option), /// Date stored as a signed 64bit int milliseconds since UNIX epoch 1970-01-01 @@ -113,13 +114,13 @@ pub enum ScalarValue { /// Time stored as a signed 64bit int as nanoseconds since midnight Time64Nanosecond(Option), /// Timestamp Second - TimestampSecond(Option, Option), + TimestampSecond(Option, Option>), /// Timestamp Milliseconds - TimestampMillisecond(Option, Option), + TimestampMillisecond(Option, Option>), /// Timestamp Microseconds - TimestampMicrosecond(Option, Option), + TimestampMicrosecond(Option, Option>), /// Timestamp Nanoseconds - TimestampNanosecond(Option, Option), + TimestampNanosecond(Option, Option>), /// Number of elapsed whole months IntervalYearMonth(Option), /// Number of elapsed days and milliseconds (no leap seconds) @@ -130,7 +131,7 @@ pub enum ScalarValue { /// Nanoseconds is encoded as a 64-bit signed integer (no leap seconds). IntervalMonthDayNano(Option), /// struct of nested ScalarValue - Struct(Option>, Box>), + Struct(Option>, Fields), /// Dictionary type: index type and value Dictionary(Box, Box), } @@ -642,8 +643,8 @@ macro_rules! impl_op { ts_sub_to_interval::( ts_lhs.checked_mul(1_000).ok_or_else(err)?, ts_rhs.checked_mul(1_000).ok_or_else(err)?, - &tz_lhs, - &tz_rhs, + tz_lhs.as_deref(), + tz_rhs.as_deref(), ) }, ( @@ -652,8 +653,8 @@ macro_rules! impl_op { ) => ts_sub_to_interval::( *ts_lhs, *ts_rhs, - tz_lhs, - tz_rhs, + tz_lhs.as_deref(), + tz_rhs.as_deref(), ), ( ScalarValue::TimestampMicrosecond(Some(ts_lhs), tz_lhs), @@ -667,8 +668,8 @@ macro_rules! impl_op { ts_sub_to_interval::( ts_lhs.checked_mul(1_000).ok_or_else(err)?, ts_rhs.checked_mul(1_000).ok_or_else(err)?, - tz_lhs, - tz_rhs, + tz_lhs.as_deref(), + tz_rhs.as_deref(), ) }, ( @@ -677,8 +678,8 @@ macro_rules! impl_op { ) => ts_sub_to_interval::( *ts_lhs, *ts_rhs, - tz_lhs, - tz_rhs, + tz_lhs.as_deref(), + tz_rhs.as_deref(), ), _ => impl_op_arithmetic!($LHS, $RHS, -) } @@ -978,8 +979,8 @@ pub const NANOSECOND_MODE: bool = true; fn ts_sub_to_interval( lhs_ts: i64, rhs_ts: i64, - lhs_tz: &Option, - rhs_tz: &Option, + lhs_tz: Option<&str>, + rhs_tz: Option<&str>, ) -> Result { let parsed_lhs_tz = parse_timezones(lhs_tz)?; let parsed_rhs_tz = parse_timezones(rhs_tz)?; @@ -1013,9 +1014,9 @@ fn ts_sub_to_interval( /// This function parses the timezone from string to Tz. /// If it cannot parse or timezone field is [`None`], it returns [`None`]. -pub fn parse_timezones(tz: &Option) -> Result> { +pub fn parse_timezones(tz: Option<&str>) -> Result> { if let Some(tz) = tz { - let parsed_tz: Tz = FromStr::from_str(tz).map_err(|_| { + let parsed_tz: Tz = tz.parse().map_err(|_| { DataFusionError::Execution("cannot parse given timezone".to_string()) })?; Ok(Some(parsed_tz)) @@ -1481,7 +1482,7 @@ macro_rules! build_list { // the return on the macro is necessary, to short-circuit and return ArrayRef None => { return new_null_array( - &DataType::List(Box::new(Field::new( + &DataType::List(Arc::new(Field::new( "item", DataType::$SCALAR_TY, true, @@ -1502,7 +1503,7 @@ macro_rules! build_timestamp_list { // the return on the macro is necessary, to short-circuit and return ArrayRef None => { return new_null_array( - &DataType::List(Box::new(Field::new( + &DataType::List(Arc::new(Field::new( "item", DataType::Timestamp($TIME_UNIT, $TIME_ZONE), true, @@ -1679,7 +1680,7 @@ impl ScalarValue { /// Create a new nullable ScalarValue::List with the specified child_type pub fn new_list(scalars: Option>, child_type: DataType) -> Self { - Self::List(scalars, Box::new(Field::new("item", child_type, true))) + Self::List(scalars, Arc::new(Field::new("item", child_type, true))) } /// Create a zero value in the given type. @@ -1800,7 +1801,7 @@ impl ScalarValue { ScalarValue::Binary(_) => DataType::Binary, ScalarValue::FixedSizeBinary(sz, _) => DataType::FixedSizeBinary(*sz), ScalarValue::LargeBinary(_) => DataType::LargeBinary, - ScalarValue::List(_, field) => DataType::List(Box::new(Field::new( + ScalarValue::List(_, field) => DataType::List(Arc::new(Field::new( "item", field.data_type().clone(), true, @@ -1818,7 +1819,7 @@ impl ScalarValue { ScalarValue::IntervalMonthDayNano(_) => { DataType::Interval(IntervalUnit::MonthDayNano) } - ScalarValue::Struct(_, fields) => DataType::Struct(fields.as_ref().clone()), + ScalarValue::Struct(_, fields) => DataType::Struct(fields.clone()), ScalarValue::Dictionary(k, v) => { DataType::Dictionary(k.clone(), Box::new(v.get_datatype())) } @@ -2279,7 +2280,7 @@ impl ScalarValue { .iter() .zip(columns) .map(|(field, column)| -> Result<(Field, ArrayRef)> { - Ok((field.clone(), Self::iter_to_array(column)?)) + Ok((field.as_ref().clone(), Self::iter_to_array(column)?)) }) .collect::>>()?; @@ -2351,7 +2352,7 @@ impl ScalarValue { | DataType::Duration(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) - | DataType::Union(_, _, _) + | DataType::Union(_, _) | DataType::Map(_, _) | DataType::RunEndEncoded(_, _) => { return Err(DataFusionError::Internal(format!( @@ -2449,8 +2450,8 @@ impl ScalarValue { let array_data = ArrayDataBuilder::new(data_type.clone()) .len(offsets_array.len() - 1) .null_bit_buffer(Some(valid.finish())) - .add_buffer(offsets_array.data().buffers()[0].clone()) - .add_child_data(flat_array.data().clone()); + .add_buffer(offsets_array.values().inner().clone()) + .add_child_data(flat_array.to_data()); let list_array = ListArray::from(array_data.build()?); Ok(list_array) @@ -2602,7 +2603,7 @@ impl ScalarValue { } _ => ScalarValue::iter_to_array_list( repeat(self.clone()).take(size), - &DataType::List(Box::new(Field::new( + &DataType::List(Arc::new(Field::new( "item", field.data_type().clone(), true, @@ -2679,7 +2680,7 @@ impl ScalarValue { .iter() .zip(values.iter()) .map(|(field, value)| { - (field.clone(), value.to_array_of_size(size)) + (field.as_ref().clone(), value.to_array_of_size(size)) }) .collect(); @@ -2691,7 +2692,7 @@ impl ScalarValue { .map(|field| { let none_field = Self::try_from(field.data_type()) .expect("Failed to construct null ScalarValue from Struct field type"); - (field.clone(), none_field.to_array_of_size(size)) + (field.as_ref().clone(), none_field.to_array_of_size(size)) }) .collect(); @@ -2861,7 +2862,7 @@ impl ScalarValue { let col_scalar = ScalarValue::try_from_array(col_array, index)?; field_values.push(col_scalar); } - Self::Struct(Some(field_values), Box::new(fields.clone())) + Self::Struct(Some(field_values), fields.clone()) } DataType::FixedSizeList(nested_type, _len) => { let list_array = as_fixed_size_list_array(array)?; @@ -3095,13 +3096,14 @@ impl ScalarValue { | ScalarValue::IntervalYearMonth(_) | ScalarValue::IntervalDayTime(_) | ScalarValue::IntervalMonthDayNano(_) => 0, - ScalarValue::Utf8(s) - | ScalarValue::LargeUtf8(s) - | ScalarValue::TimestampSecond(_, s) + ScalarValue::Utf8(s) | ScalarValue::LargeUtf8(s) => { + s.as_ref().map(|s| s.capacity()).unwrap_or_default() + } + ScalarValue::TimestampSecond(_, s) | ScalarValue::TimestampMillisecond(_, s) | ScalarValue::TimestampMicrosecond(_, s) | ScalarValue::TimestampNanosecond(_, s) => { - s.as_ref().map(|s| s.capacity()).unwrap_or_default() + s.as_ref().map(|s| s.len()).unwrap_or_default() } ScalarValue::Binary(b) | ScalarValue::FixedSizeBinary(_, b) @@ -3126,7 +3128,7 @@ impl ScalarValue { .unwrap_or_default() // `fields` is boxed, so it is NOT already included in `self` + std::mem::size_of_val(fields) - + (std::mem::size_of::() * fields.capacity()) + + (std::mem::size_of::() * fields.len()) + fields.iter().map(|field| field.size() - std::mem::size_of_val(field)).sum::() } ScalarValue::Dictionary(dt, sv) => { @@ -3212,14 +3214,14 @@ impl FromStr for ScalarValue { impl From> for ScalarValue { fn from(value: Vec<(&str, ScalarValue)>) -> Self { - let (fields, scalars): (Vec<_>, Vec<_>) = value + let (fields, scalars): (SchemaBuilder, Vec<_>) = value .into_iter() .map(|(name, scalar)| { (Field::new(name, scalar.get_datatype(), false), scalar) }) .unzip(); - Self::Struct(Some(scalars), Box::new(fields)) + Self::Struct(Some(scalars), fields.finish().fields) } } @@ -3383,9 +3385,7 @@ impl TryFrom<&DataType> for ScalarValue { DataType::List(ref nested_type) => { ScalarValue::new_list(None, nested_type.data_type().clone()) } - DataType::Struct(fields) => { - ScalarValue::Struct(None, Box::new(fields.clone())) - } + DataType::Struct(fields) => ScalarValue::Struct(None, fields.clone()), DataType::Null => ScalarValue::Null, _ => { return Err(DataFusionError::NotImplemented(format!( @@ -3819,7 +3819,7 @@ mod tests { fn scalar_list_null_to_array() { let list_array_ref = ScalarValue::List( None, - Box::new(Field::new("item", DataType::UInt64, false)), + Arc::new(Field::new("item", DataType::UInt64, false)), ) .to_array(); let list_array = as_list_array(&list_array_ref).unwrap(); @@ -3837,7 +3837,7 @@ mod tests { ScalarValue::UInt64(None), ScalarValue::UInt64(Some(101)), ]), - Box::new(Field::new("item", DataType::UInt64, false)), + Arc::new(Field::new("item", DataType::UInt64, false)), ) .to_array(); @@ -4241,7 +4241,7 @@ mod tests { i64_vals, TimestampSecondArray, TimestampSecond, - Some("UTC".to_owned()) + Some("UTC".into()) ), make_test_case!( i64_vals, @@ -4253,7 +4253,7 @@ mod tests { i64_vals, TimestampMillisecondArray, TimestampMillisecond, - Some("UTC".to_owned()) + Some("UTC".into()) ), make_test_case!( i64_vals, @@ -4265,7 +4265,7 @@ mod tests { i64_vals, TimestampMicrosecondArray, TimestampMicrosecond, - Some("UTC".to_owned()) + Some("UTC".into()) ), make_test_case!( i64_vals, @@ -4277,7 +4277,7 @@ mod tests { i64_vals, TimestampNanosecondArray, TimestampNanosecond, - Some("UTC".to_owned()) + Some("UTC".into()) ), make_test_case!(i32_vals, IntervalYearMonthArray, IntervalYearMonth), make_test_case!(i64_vals, IntervalDayTimeArray, IntervalDayTime), @@ -4340,11 +4340,11 @@ mod tests { assert_eq!( List( Some(vec![Int32(Some(1)), Int32(Some(5))]), - Box::new(Field::new("item", DataType::Int32, false)), + Arc::new(Field::new("item", DataType::Int32, false)), ) .partial_cmp(&List( Some(vec![Int32(Some(1)), Int32(Some(5))]), - Box::new(Field::new("item", DataType::Int32, false)), + Arc::new(Field::new("item", DataType::Int32, false)), )), Some(Ordering::Equal) ); @@ -4352,11 +4352,11 @@ mod tests { assert_eq!( List( Some(vec![Int32(Some(10)), Int32(Some(5))]), - Box::new(Field::new("item", DataType::Int32, false)), + Arc::new(Field::new("item", DataType::Int32, false)), ) .partial_cmp(&List( Some(vec![Int32(Some(1)), Int32(Some(5))]), - Box::new(Field::new("item", DataType::Int32, false)), + Arc::new(Field::new("item", DataType::Int32, false)), )), Some(Ordering::Greater) ); @@ -4364,11 +4364,11 @@ mod tests { assert_eq!( List( Some(vec![Int32(Some(1)), Int32(Some(5))]), - Box::new(Field::new("item", DataType::Int32, false)), + Arc::new(Field::new("item", DataType::Int32, false)), ) .partial_cmp(&List( Some(vec![Int32(Some(10)), Int32(Some(5))]), - Box::new(Field::new("item", DataType::Int32, false)), + Arc::new(Field::new("item", DataType::Int32, false)), )), Some(Ordering::Less) ); @@ -4377,11 +4377,11 @@ mod tests { assert_eq!( List( Some(vec![Int64(Some(1)), Int64(Some(5))]), - Box::new(Field::new("item", DataType::Int64, false)), + Arc::new(Field::new("item", DataType::Int64, false)), ) .partial_cmp(&List( Some(vec![Int32(Some(1)), Int32(Some(5))]), - Box::new(Field::new("item", DataType::Int32, false)), + Arc::new(Field::new("item", DataType::Int32, false)), )), None ); @@ -4469,7 +4469,7 @@ mod tests { let field_f = Field::new("f", DataType::Int64, false); let field_d = Field::new( "D", - DataType::Struct(vec![field_e.clone(), field_f.clone()]), + DataType::Struct(vec![field_e.clone(), field_f.clone()].into()), false, ); @@ -4483,12 +4483,13 @@ mod tests { ("f", ScalarValue::from(3i64)), ]), ]), - Box::new(vec![ + vec![ field_a.clone(), field_b.clone(), field_c.clone(), field_d.clone(), - ]), + ] + .into(), ); // Check Display @@ -4640,7 +4641,7 @@ mod tests { let field_a = Field::new("A", DataType::Utf8, false); let field_primitive_list = Field::new( "primitive_list", - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), false, ); @@ -4651,17 +4652,17 @@ mod tests { ScalarValue::from(2i32), ScalarValue::from(3i32), ]), - Box::new(Field::new("item", DataType::Int32, false)), + Arc::new(Field::new("item", DataType::Int32, false)), ); let l1 = ScalarValue::List( Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]), - Box::new(Field::new("item", DataType::Int32, false)), + Arc::new(Field::new("item", DataType::Int32, false)), ); let l2 = ScalarValue::List( Some(vec![ScalarValue::from(6i32)]), - Box::new(Field::new("item", DataType::Int32, false)), + Arc::new(Field::new("item", DataType::Int32, false)), ); // Define struct scalars @@ -4849,7 +4850,7 @@ mod tests { DataType::Int32, ), ]), - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), ); let l2 = ScalarValue::new_list( @@ -4863,7 +4864,7 @@ mod tests { DataType::Int32, ), ]), - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), ); let l3 = ScalarValue::new_list( @@ -4871,7 +4872,7 @@ mod tests { Some(vec![ScalarValue::from(9i32)]), DataType::Int32, )]), - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), ); let array = ScalarValue::iter_to_array(vec![l1, l2, l3]).unwrap(); @@ -4913,25 +4914,25 @@ mod tests { fn scalar_timestamp_ns_utc_timezone() { let scalar = ScalarValue::TimestampNanosecond( Some(1599566400000000000), - Some("UTC".to_owned()), + Some("UTC".into()), ); assert_eq!( scalar.get_datatype(), - DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned())) + DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())) ); let array = scalar.to_array(); assert_eq!(array.len(), 1); assert_eq!( array.data_type(), - &DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned())) + &DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())) ); let newscalar = ScalarValue::try_from_array(&array, 0).unwrap(); assert_eq!( newscalar.get_datatype(), - DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned())) + DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())) ); } @@ -5408,7 +5409,7 @@ mod tests { .unwrap() .timestamp_nanos(), ), - Some("+12:00".to_string()), + Some("+12:00".into()), ), ScalarValue::TimestampNanosecond( Some( @@ -5418,7 +5419,7 @@ mod tests { .unwrap() .timestamp_nanos(), ), - Some("+00:00".to_string()), + Some("+00:00".into()), ), ScalarValue::new_interval_mdn(0, 0, 0), ), @@ -5432,7 +5433,7 @@ mod tests { .unwrap() .timestamp_micros(), ), - Some("+01:00".to_string()), + Some("+01:00".into()), ), ScalarValue::TimestampMicrosecond( Some( @@ -5442,7 +5443,7 @@ mod tests { .unwrap() .timestamp_micros(), ), - Some("-01:00".to_string()), + Some("-01:00".into()), ), ScalarValue::new_interval_mdn(0, sign * 59, 0), ), @@ -5456,7 +5457,7 @@ mod tests { .unwrap() .timestamp_millis(), ), - Some("+10:10".to_string()), + Some("+10:10".into()), ), ScalarValue::TimestampMillisecond( Some( @@ -5466,7 +5467,7 @@ mod tests { .unwrap() .timestamp_millis(), ), - Some("+01:00".to_string()), + Some("+01:00".into()), ), ScalarValue::new_interval_dt(sign * 60, 0), ), @@ -5481,7 +5482,7 @@ mod tests { .unwrap() .timestamp(), ), - Some("-11:59".to_string()), + Some("-11:59".into()), ), ScalarValue::TimestampSecond( Some( @@ -5491,7 +5492,7 @@ mod tests { .unwrap() .timestamp(), ), - Some("+11:59".to_string()), + Some("+11:59".into()), ), ScalarValue::new_interval_dt(sign * 59, 0), ), @@ -5506,7 +5507,7 @@ mod tests { .unwrap() .timestamp_millis(), ), - Some("+06:00".to_string()), + Some("+06:00".into()), ), ScalarValue::TimestampMillisecond( Some( @@ -5516,7 +5517,7 @@ mod tests { .unwrap() .timestamp_millis(), ), - Some("-12:00".to_string()), + Some("-12:00".into()), ), ScalarValue::new_interval_dt(0, sign * -43_200_000), ), @@ -5606,7 +5607,7 @@ mod tests { .unwrap() .timestamp(), ), - Some("+23:59".to_string()), + Some("+23:59".into()), ), ScalarValue::TimestampSecond( Some( @@ -5616,7 +5617,7 @@ mod tests { .unwrap() .timestamp(), ), - Some("-23:59".to_string()), + Some("-23:59".into()), ), ScalarValue::new_interval_dt(0, 0), ), @@ -5630,7 +5631,7 @@ mod tests { .unwrap() .timestamp(), ), - Some("Europe/Istanbul".to_string()), + Some("Europe/Istanbul".into()), ), ScalarValue::TimestampSecond( Some( @@ -5640,7 +5641,7 @@ mod tests { .unwrap() .timestamp(), ), - Some("America/Los_Angeles".to_string()), + Some("America/Los_Angeles".into()), ), ScalarValue::new_interval_dt(0, 0), ), diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index 559de4ba3f17..7a41b6bec6f5 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -22,7 +22,7 @@ extern crate datafusion; mod data_utils; use crate::criterion::Criterion; -use arrow::datatypes::{DataType, Field, Schema}; +use arrow::datatypes::{DataType, Field, Fields, Schema}; use datafusion::datasource::MemTable; use datafusion::execution::context::SessionContext; use std::sync::Arc; @@ -49,7 +49,7 @@ fn physical_plan(ctx: &SessionContext, sql: &str) { /// Create schema with the specified number of columns pub fn create_schema(column_prefix: &str, num_columns: usize) -> Schema { - let fields = (0..num_columns) + let fields: Fields = (0..num_columns) .map(|i| Field::new(format!("{column_prefix}{i}"), DataType::Int32, true)) .collect(); Schema::new(fields) diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs index 313c2a1596ea..a4254c86ce51 100644 --- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs @@ -41,7 +41,7 @@ use apache_avro::{ AvroResult, Error as AvroError, Reader as AvroReader, }; use arrow::array::{BinaryArray, GenericListArray}; -use arrow::datatypes::SchemaRef; +use arrow::datatypes::{Fields, SchemaRef}; use arrow::error::ArrowError::SchemaError; use arrow::error::Result as ArrowResult; use num_traits::NumCast; @@ -112,8 +112,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { let projection = self.projection.clone().unwrap_or_default(); let arrays = self.build_struct_array(rows.as_slice(), self.schema.fields(), &projection); - let projected_fields: Vec = if projection.is_empty() { - self.schema.fields().to_vec() + let projected_fields = if projection.is_empty() { + self.schema.fields().clone() } else { projection .iter() @@ -504,12 +504,12 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { DataType::List(field) => { let child = self.build_nested_list_array::(&flatten_values(rows), field)?; - child.data().clone() + child.to_data() } DataType::LargeList(field) => { let child = self.build_nested_list_array::(&flatten_values(rows), field)?; - child.data().clone() + child.to_data() } DataType::Struct(fields) => { // extract list values, with non-lists converted to Value::Null @@ -545,13 +545,12 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { }) .collect(); let rows = rows.iter().collect::>>(); - let arrays = - self.build_struct_array(rows.as_slice(), fields.as_slice(), &[])?; + let arrays = self.build_struct_array(rows.as_slice(), fields, &[])?; let data_type = DataType::Struct(fields.clone()); ArrayDataBuilder::new(data_type) .len(rows.len()) .null_bit_buffer(Some(null_buffer.into())) - .child_data(arrays.into_iter().map(|a| a.data().clone()).collect()) + .child_data(arrays.into_iter().map(|a| a.to_data()).collect()) .build() .unwrap() } @@ -562,7 +561,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { } }; // build list - let list_data = ArrayData::builder(DataType::List(Box::new(list_field.clone()))) + let list_data = ArrayData::builder(DataType::List(Arc::new(list_field.clone()))) .len(list_len) .add_buffer(Buffer::from_slice_ref(&offsets)) .add_child_data(array_data) @@ -583,7 +582,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { fn build_struct_array( &self, rows: RecordSlice, - struct_fields: &[Field], + struct_fields: &Fields, projection: &[String], ) -> ArrowResult> { let arrays: ArrowResult> = struct_fields @@ -756,9 +755,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { let data = ArrayDataBuilder::new(data_type) .len(len) .null_bit_buffer(Some(null_buffer.into())) - .child_data( - arrays.into_iter().map(|a| a.data().clone()).collect(), - ) + .child_data(arrays.into_iter().map(|a| a.to_data()).collect()) .build()?; make_array(data) } @@ -798,7 +795,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { }) .collect::>>(); let array = values.iter().collect::>(); - array.data().clone() + array.to_data() } fn field_lookup<'b>( @@ -970,6 +967,7 @@ mod test { as_int32_array, as_int64_array, as_list_array, as_timestamp_microsecond_array, }; use std::fs::File; + use std::sync::Arc; fn build_reader(name: &str, batch_size: usize) -> Reader { let testdata = crate::test_util::arrow_test_data(); @@ -1025,7 +1023,7 @@ mod test { let a_array = as_list_array(batch.column(col_id_index)).unwrap(); assert_eq!( *a_array.data_type(), - DataType::List(Box::new(Field::new("bigint", DataType::Int64, true))) + DataType::List(Arc::new(Field::new("bigint", DataType::Int64, true))) ); let array = a_array.value(0); assert_eq!(*array.data_type(), DataType::Int64); diff --git a/datafusion/core/src/avro_to_arrow/schema.rs b/datafusion/core/src/avro_to_arrow/schema.rs index a3d5986da83d..d4c881ca54eb 100644 --- a/datafusion/core/src/avro_to_arrow/schema.rs +++ b/datafusion/core/src/avro_to_arrow/schema.rs @@ -20,9 +20,10 @@ use crate::error::{DataFusionError, Result}; use apache_avro::schema::{Alias, Name}; use apache_avro::types::Value; use apache_avro::Schema as AvroSchema; -use arrow::datatypes::Field; +use arrow::datatypes::{Field, UnionFields}; use std::collections::HashMap; use std::convert::TryFrom; +use std::sync::Arc; /// Converts an avro schema to an arrow schema pub fn to_arrow_schema(avro_schema: &apache_avro::Schema) -> Result { @@ -70,7 +71,7 @@ fn schema_to_field_with_props( AvroSchema::Double => DataType::Float64, AvroSchema::Bytes => DataType::Binary, AvroSchema::String => DataType::Utf8, - AvroSchema::Array(item_schema) => DataType::List(Box::new( + AvroSchema::Array(item_schema) => DataType::List(Arc::new( schema_to_field_with_props(item_schema, None, false, None)?, )), AvroSchema::Map(value_schema) => { @@ -104,12 +105,12 @@ fn schema_to_field_with_props( .iter() .map(|s| schema_to_field_with_props(s, None, has_nullable, None)) .collect::>>()?; - let type_ids = (0_i8..fields.len() as i8).collect(); - DataType::Union(fields, type_ids, UnionMode::Dense) + let type_ids = 0_i8..fields.len() as i8; + DataType::Union(UnionFields::new(type_ids, fields), UnionMode::Dense) } } AvroSchema::Record { name, fields, .. } => { - let fields: Result> = fields + let fields: Result<_> = fields .iter() .map(|field| { let mut props = HashMap::new(); @@ -214,7 +215,7 @@ fn default_field_name(dt: &DataType) -> &str { DataType::FixedSizeList(_, _) => "fixed_size_list", DataType::LargeList(_) => "largelist", DataType::Struct(_) => "struct", - DataType::Union(_, _, _) => "union", + DataType::Union(_, _) => "union", DataType::Dictionary(_, _) => "map", DataType::Map(_, _) => unimplemented!("Map support not implemented"), DataType::RunEndEncoded(_, _) => { diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index fcab651e3514..b3969bad9032 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -22,7 +22,7 @@ use std::any::Any; use std::collections::HashSet; use std::sync::Arc; -use arrow::datatypes::{DataType, Field, Schema}; +use arrow::datatypes::{DataType, Field, Fields, Schema}; use arrow::{self, datatypes::SchemaRef}; use async_trait::async_trait; use bytes::{Buf, Bytes}; @@ -242,7 +242,7 @@ impl CsvFormat { )); } - column_type_possibilities.iter_mut().zip(fields).for_each( + column_type_possibilities.iter_mut().zip(&fields).for_each( |(possibilities, field)| { possibilities.insert(field.data_type().clone()); }, @@ -287,7 +287,7 @@ fn build_schema_helper(names: Vec, types: &[HashSet]) -> Schem _ => Field::new(field_name, DataType::Utf8, true), } }) - .collect(); + .collect::(); Schema::new(fields) } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index ba18e9f62c25..0ffc16cb11b2 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -20,8 +20,8 @@ use std::any::Any; use std::sync::Arc; -use arrow::datatypes::Schema; use arrow::datatypes::SchemaRef; +use arrow::datatypes::{Fields, Schema}; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; use datafusion_common::DataFusionError; @@ -38,7 +38,7 @@ use super::FileScanConfig; use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, }; -use crate::arrow::datatypes::{DataType, Field}; +use crate::arrow::datatypes::DataType; use crate::config::ConfigOptions; use crate::datasource::{create_max_min_accs, get_col_stats}; @@ -132,9 +132,9 @@ fn clear_metadata( .fields() .iter() .map(|field| { - field.clone().with_metadata(Default::default()) // clear meta + field.as_ref().clone().with_metadata(Default::default()) // clear meta }) - .collect::>(); + .collect::(); Schema::new(fields) }) } @@ -209,7 +209,7 @@ impl FileFormat for ParquetFormat { fn summarize_min_max( max_values: &mut [Option], min_values: &mut [Option], - fields: &[Field], + fields: &Fields, i: usize, stat: &ParquetStatistics, ) { @@ -468,7 +468,7 @@ async fn fetch_statistics( )?; let num_fields = table_schema.fields().len(); - let fields = table_schema.fields().to_vec(); + let fields = table_schema.fields(); let mut num_rows = 0; let mut total_byte_size = 0; @@ -502,7 +502,7 @@ async fn fetch_statistics( summarize_min_max( &mut max_values, &mut min_values, - &fields, + fields, table_idx, stats, ) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index f85492d8c2ed..4845cb49a070 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -21,7 +21,7 @@ use std::str::FromStr; use std::{any::Any, sync::Arc}; use arrow::compute::SortOptions; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; use async_trait::async_trait; use dashmap::DashMap; use datafusion_common::ToDFSchema; @@ -575,16 +575,16 @@ impl ListingTable { })?; // Add the partition columns to the file schema - let mut table_fields = file_schema.fields().clone(); + let mut builder = SchemaBuilder::from(file_schema.fields()); for (part_col_name, part_col_type) in &options.table_partition_cols { - table_fields.push(Field::new(part_col_name, part_col_type.clone(), false)); + builder.push(Field::new(part_col_name, part_col_type.clone(), false)); } let infinite_source = options.infinite_source; let table = Self { table_paths: config.table_paths, file_schema, - table_schema: Arc::new(Schema::new(table_fields)), + table_schema: Arc::new(builder.finish()), options, definition: None, collected_statistics: Default::default(), @@ -836,7 +836,7 @@ mod tests { logical_expr::{col, lit}, test::{columns, object_store::register_test_store}, }; - use arrow::datatypes::DataType; + use arrow::datatypes::{DataType, Schema}; use chrono::DateTime; use datafusion_common::assert_contains; use rstest::*; diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs index 18a712b6cf42..d35db5b645f0 100644 --- a/datafusion/core/src/physical_plan/empty.rs +++ b/datafusion/core/src/physical_plan/empty.rs @@ -25,7 +25,7 @@ use crate::physical_plan::{ memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning, }; use arrow::array::{ArrayRef, NullArray}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use log::debug; @@ -77,7 +77,7 @@ impl EmptyExec { .map(|i| { Field::new(format!("placeholder_{i}"), DataType::Null, true) }) - .collect(), + .collect::(), )), (0..n_field) .map(|_i| { diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index ed27dfac0317..7a4883b0a92a 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -216,7 +216,7 @@ mod tests { use crate::prelude::SessionContext; use crate::scalar::ScalarValue; use crate::test::object_store::local_unpartitioned_file; - use arrow::datatypes::{DataType, Field, Schema}; + use arrow::datatypes::{DataType, Field, SchemaBuilder}; use futures::StreamExt; use object_store::local::LocalFileSystem; use object_store::ObjectStore; @@ -324,10 +324,10 @@ mod tests { .infer_schema(&state, &object_store, &[meta.clone()]) .await?; - let mut fields = actual_schema.fields().clone(); - fields.push(Field::new("missing_col", DataType::Int32, true)); + let mut builder = SchemaBuilder::from(actual_schema.fields()); + builder.push(Field::new("missing_col", DataType::Int32, true)); - let file_schema = Arc::new(Schema::new(fields)); + let file_schema = Arc::new(builder.finish()); // Include the missing column in the projection let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]); diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index ebbae7417889..379833bf4c33 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -271,7 +271,7 @@ pub async fn plan_to_json( #[cfg(test)] mod tests { use arrow::array::Array; - use arrow::datatypes::{Field, Schema}; + use arrow::datatypes::{Field, SchemaBuilder}; use futures::StreamExt; use object_store::local::LocalFileSystem; @@ -471,11 +471,11 @@ mod tests { let (object_store_url, file_groups, actual_schema) = prepare_store(&state, file_compression_type.to_owned()).await; - let mut fields = actual_schema.fields().clone(); - fields.push(Field::new("missing_col", DataType::Int32, true)); - let missing_field_idx = fields.len() - 1; + let mut builder = SchemaBuilder::from(actual_schema.fields()); + builder.push(Field::new("missing_col", DataType::Int32, true)); - let file_schema = Arc::new(Schema::new(fields)); + let file_schema = Arc::new(builder.finish()); + let missing_field_idx = file_schema.fields.len() - 1; let exec = NdJsonExec::new( FileScanConfig { diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index fdf34e75de8d..141a737314dc 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -532,7 +532,7 @@ where let mut builder = ArrayData::builder(data_type) .len(len) .add_buffer(sliced_key_buffer); - builder = builder.add_child_data(dict_vals.data().clone()); + builder = builder.add_child_data(dict_vals.to_data()); Arc::new(DictionaryArray::::from( builder.build().unwrap(), )) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 92be32f47649..349fa68a4b99 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -826,7 +826,7 @@ mod tests { use arrow::record_batch::RecordBatch; use arrow::{ array::{Int64Array, Int8Array, StringArray}, - datatypes::{DataType, Field}, + datatypes::{DataType, Field, SchemaBuilder}, }; use chrono::{TimeZone, Utc}; use datafusion_common::ScalarValue; @@ -971,9 +971,9 @@ mod tests { field_name: &str, array: ArrayRef, ) -> RecordBatch { - let mut fields = batch.schema().fields().clone(); + let mut fields = SchemaBuilder::from(batch.schema().fields()); fields.push(Field::new(field_name, array.data_type().clone(), true)); - let schema = Arc::new(Schema::new(fields)); + let schema = Arc::new(fields.finish()); let mut columns = batch.columns().to_vec(); columns.push(array); @@ -982,7 +982,7 @@ mod tests { fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch { columns.into_iter().fold( - RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), + RecordBatch::new_empty(Arc::new(Schema::empty())), |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()), ) } @@ -1009,11 +1009,8 @@ mod tests { let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); // batch1: c1(string) - let batch1 = add_to_batch( - &RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), - "c1", - c1, - ); + let batch1 = + add_to_batch(&RecordBatch::new_empty(Arc::new(Schema::empty())), "c1", c1); // batch2: c1(string) and c2(int64) let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs index 8492e5e6b20d..44a91865e27c 100644 --- a/datafusion/core/src/physical_plan/joins/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -22,7 +22,7 @@ use futures::{ready, StreamExt}; use futures::{Stream, TryStreamExt}; use std::{any::Any, sync::Arc, task::Poll}; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{Fields, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use crate::execution::context::TaskContext; @@ -68,7 +68,7 @@ impl CrossJoinExec { /// Create a new [CrossJoinExec]. pub fn new(left: Arc, right: Arc) -> Self { // left then right - let all_columns = { + let all_columns: Fields = { let left_schema = left.schema(); let right_schema = right.schema(); let left_fields = left_schema.fields().iter(); diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index ac98e51b02d8..9e25e0353772 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -22,7 +22,7 @@ use arrow::array::{ UInt32Builder, UInt64Array, }; use arrow::compute; -use arrow::datatypes::{Field, Schema}; +use arrow::datatypes::{Field, Schema, SchemaBuilder}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; @@ -351,7 +351,7 @@ pub fn build_join_schema( right: &Schema, join_type: &JoinType, ) -> (Schema, Vec) { - let (fields, column_indices): (Vec, Vec) = match join_type { + let (fields, column_indices): (SchemaBuilder, Vec) = match join_type { JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { let left_fields = left .fields() @@ -417,7 +417,7 @@ pub fn build_join_schema( .unzip(), }; - (Schema::new(fields), column_indices) + (fields.finish(), column_indices) } /// A [`OnceAsync`] can be used to run an async closure once, with subsequent calls @@ -1066,6 +1066,7 @@ impl BuildProbeJoinMetrics { #[cfg(test)] mod tests { use super::*; + use arrow::datatypes::Fields; use arrow::error::Result as ArrowResult; use arrow::{datatypes::DataType, error::ArrowError}; use datafusion_common::ScalarValue; @@ -1202,7 +1203,7 @@ mod tests { .iter() .cloned() .chain(right_out.fields().iter().cloned()) - .collect(); + .collect::(); let expected_schema = Schema::new(expected_fields); assert_eq!( diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 01efa2f7964c..2064357f7d03 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -2176,7 +2176,7 @@ mod tests { fn struct_literal() -> Expr { let struct_literal = ScalarValue::Struct( None, - Box::new(vec![Field::new("foo", DataType::Boolean, false)]), + vec![Field::new("foo", DataType::Boolean, false)].into(), ); lit(struct_literal) } diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 6037bc5c8b62..1651ed1c6c9d 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -519,8 +519,9 @@ impl Stream for SortedSizedRecordBatchStream { let arrays = self .batches .iter() - .map(|b| b.column(i).data()) + .map(|b| b.column(i).to_data()) .collect::>(); + let arrays = arrays.iter().collect(); let mut mutable = MutableArrayData::new(arrays, false, num_rows); for x in slices.iter() { mutable.extend( diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs index 443327e51632..cc35541fc6c9 100644 --- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs @@ -33,7 +33,7 @@ use crate::physical_plan::{ use arrow::{ array::{Array, ArrayRef}, compute::{concat, concat_batches, SortColumn}, - datatypes::{Schema, SchemaRef}, + datatypes::{Schema, SchemaBuilder, SchemaRef}, record_batch::RecordBatch, }; use datafusion_common::{DataFusionError, ScalarValue}; @@ -270,13 +270,14 @@ fn create_schema( input_schema: &Schema, window_expr: &[Arc], ) -> Result { - let mut fields = Vec::with_capacity(input_schema.fields().len() + window_expr.len()); - fields.extend_from_slice(input_schema.fields()); + let capacity = input_schema.fields().len() + window_expr.len(); + let mut builder = SchemaBuilder::with_capacity(capacity); + builder.extend(input_schema.fields.iter().cloned()); // append results to the schema for expr in window_expr { - fields.push(expr.field()?); + builder.push(expr.field()?); } - Ok(Schema::new(fields)) + Ok(builder.finish()) } /// This trait defines the interface for updating the state and calculating diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index 5b22e4f02efe..d418e4b3d5d9 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -315,7 +315,7 @@ mod tests { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let array = &values[0]; - self.0 += (array.len() - array.data().null_count()) as i64; + self.0 += (array.len() - array.null_count()) as i64; Ok(()) } diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index 9e738cafd98a..520784fb9a9c 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -31,6 +31,7 @@ use crate::physical_plan::{ SendableRecordBatchStream, Statistics, WindowExpr, }; use arrow::compute::{concat, concat_batches}; +use arrow::datatypes::SchemaBuilder; use arrow::error::ArrowError; use arrow::{ array::ArrayRef, @@ -269,13 +270,14 @@ fn create_schema( input_schema: &Schema, window_expr: &[Arc], ) -> Result { - let mut fields = Vec::with_capacity(input_schema.fields().len() + window_expr.len()); - fields.extend_from_slice(input_schema.fields()); + let capacity = input_schema.fields().len() + window_expr.len(); + let mut builder = SchemaBuilder::with_capacity(capacity); + builder.extend(input_schema.fields().iter().cloned()); // append results to the schema for expr in window_expr { - fields.push(expr.field()?); + builder.push(expr.field()?); } - Ok(Schema::new(fields)) + Ok(builder.finish()) } /// Compute the window aggregate columns diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 54a8272c5f8d..e2035fb22761 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -277,7 +277,7 @@ pub fn make_partition(sz: i32) -> RecordBatch { /// Return a RecordBatch with a single array with row_count sz pub fn make_batch_no_column(sz: usize) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![])); + let schema = Arc::new(Schema::empty()); let options = RecordBatchOptions::new().with_row_count(Option::from(sz)); RecordBatch::try_new_with_options(schema, vec![], &options).unwrap() diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 041497eb74fd..62ca6ae8596a 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -16,7 +16,7 @@ // under the License. use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray}; -use arrow::datatypes::{Field, Schema}; +use arrow::datatypes::{Field, Schema, SchemaBuilder}; use arrow::record_batch::RecordBatch; use bytes::Bytes; use datafusion::assert_batches_sorted_eq; @@ -147,15 +147,15 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch { columns.into_iter().fold( - RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), + RecordBatch::new_empty(Arc::new(Schema::empty())), |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()), ) } fn add_to_batch(batch: &RecordBatch, field_name: &str, array: ArrayRef) -> RecordBatch { - let mut fields = batch.schema().fields().clone(); + let mut fields = SchemaBuilder::from(batch.schema().fields()); fields.push(Field::new(field_name, array.data_type().clone(), true)); - let schema = Arc::new(Schema::new(fields)); + let schema = Arc::new(fields.finish()); let mut columns = batch.columns().to_vec(); columns.push(array); diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 2496331a019c..10f838e24963 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -36,9 +36,9 @@ async fn csv_query_array_agg_distinct() -> Result<()> { // Since ARRAY_AGG(DISTINCT) ordering is nondeterministic, check the schema and contents. assert_eq!( *actual[0].schema(), - Schema::new(vec![Field::new( + Schema::new(vec![Field::new_list( "ARRAYAGG(DISTINCT aggregate_test_100.c2)", - DataType::List(Box::new(Field::new("item", DataType::UInt32, true))), + Field::new("item", DataType::UInt32, true), false ),]) ); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 8351dee7964a..8ccde3ed9ecb 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -1283,7 +1283,7 @@ where make_timestamp_tz_table::(None) } -fn make_timestamp_tz_table(tz: Option) -> Result> +fn make_timestamp_tz_table(tz: Option>) -> Result> where A: ArrowTimestampType, { @@ -1319,8 +1319,8 @@ where } fn make_timestamp_tz_sub_table( - tz1: Option, - tz2: Option, + tz1: Option>, + tz2: Option>, ) -> Result> where A: ArrowTimestampType, diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index 725324d2df81..5aa81e047b70 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -190,7 +190,7 @@ async fn window_fn_timestamp_tz() { let ty = batch.column(1).data_type().clone(); assert_eq!( - DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".to_owned())), + DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())), ty ); } @@ -232,16 +232,12 @@ async fn parquet_list_columns() { .unwrap(); let schema = Arc::new(Schema::new(vec![ - Field::new( + Field::new_list( "int64_list", - DataType::List(Box::new(Field::new("item", DataType::Int64, true))), - true, - ), - Field::new( - "utf8_list", - DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), + Field::new("item", DataType::Int64, true), true, ), + Field::new_list("utf8_list", Field::new("item", DataType::Utf8, true), true), ])); let sql = "SELECT int64_list, utf8_list FROM list_columns"; diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index b6017f826c76..6218640b9c9f 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -278,9 +278,9 @@ async fn use_between_expression_in_select_query() -> Result<()> { #[tokio::test] async fn query_get_indexed_field() -> Result<()> { let ctx = SessionContext::new(); - let schema = Arc::new(Schema::new(vec![Field::new( + let schema = Arc::new(Schema::new(vec![Field::new_list( "some_list", - DataType::List(Box::new(Field::new("item", DataType::Int64, true))), + Field::new("item", DataType::Int64, true), false, )])); let builder = PrimitiveBuilder::::with_capacity(3); @@ -317,11 +317,11 @@ async fn query_get_indexed_field() -> Result<()> { #[tokio::test] async fn query_nested_get_indexed_field() -> Result<()> { let ctx = SessionContext::new(); - let nested_dt = DataType::List(Box::new(Field::new("item", DataType::Int64, true))); + let nested_dt = DataType::List(Arc::new(Field::new("item", DataType::Int64, true))); // Nested schema of { "some_list": [[i64]] } let schema = Arc::new(Schema::new(vec![Field::new( "some_list", - DataType::List(Box::new(Field::new("item", nested_dt.clone(), true))), + DataType::List(Arc::new(Field::new("item", nested_dt.clone(), true))), false, )])); @@ -380,12 +380,12 @@ async fn query_nested_get_indexed_field() -> Result<()> { #[tokio::test] async fn query_nested_get_indexed_field_on_struct() -> Result<()> { let ctx = SessionContext::new(); - let nested_dt = DataType::List(Box::new(Field::new("item", DataType::Int64, true))); + let nested_dt = DataType::List(Arc::new(Field::new("item", DataType::Int64, true))); // Nested schema of { "some_struct": { "bar": [i64] } } let struct_fields = vec![Field::new("bar", nested_dt.clone(), true)]; let schema = Arc::new(Schema::new(vec![Field::new( "some_struct", - DataType::Struct(struct_fields.clone()), + DataType::Struct(struct_fields.clone().into()), false, )])); diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs index d6fffddcbf23..a1284cf9bbad 100644 --- a/datafusion/core/tests/sql/timestamp.rs +++ b/datafusion/core/tests/sql/timestamp.rs @@ -133,7 +133,7 @@ async fn timestamp_minmax() -> Result<()> { let ctx = SessionContext::new(); let table_a = make_timestamp_tz_table::(None)?; let table_b = - make_timestamp_tz_table::(Some("+00:00".to_owned()))?; + make_timestamp_tz_table::(Some("+00:00".into()))?; ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; @@ -156,10 +156,9 @@ async fn timestamp_coercion() -> Result<()> { { let ctx = SessionContext::new(); let table_a = - make_timestamp_tz_table::(Some("+00:00".to_owned()))?; - let table_b = make_timestamp_tz_table::(Some( - "+00:00".to_owned(), - ))?; + make_timestamp_tz_table::(Some("+00:00".into()))?; + let table_b = + make_timestamp_tz_table::(Some("+00:00".into()))?; ctx.register_table("table_a", table_a)?; ctx.register_table("table_b", table_b)?; @@ -1030,8 +1029,8 @@ async fn test_ts_dt_binary_ops() -> Result<()> { async fn timestamp_sub_with_tz() -> Result<()> { let ctx = SessionContext::new(); let table_a = make_timestamp_tz_sub_table::( - Some("America/Los_Angeles".to_string()), - Some("Europe/Istanbul".to_string()), + Some("America/Los_Angeles".into()), + Some("Europe/Istanbul".into()), )?; ctx.register_table("table_a", table_a)?; diff --git a/datafusion/core/tests/sqllogictests/test_files/arrow_typeof.slt b/datafusion/core/tests/sqllogictests/test_files/arrow_typeof.slt index 18e7fee6c735..b17521aeeaa1 100644 --- a/datafusion/core/tests/sqllogictests/test_files/arrow_typeof.slt +++ b/datafusion/core/tests/sqllogictests/test_files/arrow_typeof.slt @@ -180,10 +180,10 @@ drop table foo statement ok create table foo as select - arrow_cast(100, 'Decimal128(3,2)') as col_d128 + arrow_cast(100, 'Decimal128(5,2)') as col_d128 -- Can't make a decimal 156: -- This feature is not implemented: Can't create a scalar from array of type "Decimal256(3, 2)" - --arrow_cast(100, 'Decimal256(3,2)') as col_d256 + --arrow_cast(100, 'Decimal256(5,2)') as col_d256 ; @@ -195,7 +195,7 @@ SELECT -- arrow_typeof(col_d256), FROM foo; ---- -Decimal128(3, 2) +Decimal128(5, 2) statement ok diff --git a/datafusion/core/tests/user_defined_aggregates.rs b/datafusion/core/tests/user_defined_aggregates.rs index 25183a1b21dd..b36e497f1f9c 100644 --- a/datafusion/core/tests/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined_aggregates.rs @@ -18,6 +18,7 @@ //! This module contains end to end demonstrations of creating //! user defined aggregate functions +use arrow::datatypes::Fields; use std::sync::Arc; use datafusion::{ @@ -151,7 +152,7 @@ impl FirstSelector { } /// Return the schema fields - fn fields() -> Vec { + fn fields() -> Fields { vec![ Field::new("value", DataType::Float64, true), Field::new( @@ -160,6 +161,7 @@ impl FirstSelector { true, ), ] + .into() } fn update(&mut self, val: f64, time: i64) { @@ -201,7 +203,7 @@ impl FirstSelector { /// return this selector as a single scalar (struct) value fn to_scalar(&self) -> ScalarValue { - ScalarValue::Struct(Some(self.to_state()), Box::new(Self::fields())) + ScalarValue::Struct(Some(self.to_state()), Self::fields()) } } diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs index 968fd26ab0f5..6a4354d20470 100644 --- a/datafusion/expr/src/aggregate_function.rs +++ b/datafusion/expr/src/aggregate_function.rs @@ -20,6 +20,7 @@ use crate::{type_coercion::aggregates::*, Signature, TypeSignature, Volatility}; use arrow::datatypes::{DataType, Field}; use datafusion_common::{DataFusionError, Result}; +use std::sync::Arc; use std::{fmt, str::FromStr}; /// Enum of all built-in aggregate functions @@ -145,7 +146,7 @@ pub fn return_type( AggregateFunction::Stddev => stddev_return_type(&coerced_data_types[0]), AggregateFunction::StddevPop => stddev_return_type(&coerced_data_types[0]), AggregateFunction::Avg => avg_return_type(&coerced_data_types[0]), - AggregateFunction::ArrayAgg => Ok(DataType::List(Box::new(Field::new( + AggregateFunction::ArrayAgg => Ok(DataType::List(Arc::new(Field::new( "item", coerced_data_types[0].clone(), true, diff --git a/datafusion/expr/src/field_util.rs b/datafusion/expr/src/field_util.rs index 1bc88a54e8c4..985e92d437cf 100644 --- a/datafusion/expr/src/field_util.rs +++ b/datafusion/expr/src/field_util.rs @@ -41,7 +41,7 @@ pub fn get_indexed_field(data_type: &DataType, key: &ScalarValue) -> Result Err(DataFusionError::Plan(format!( "Field {s} not found in struct" ))), - Some(f) => Ok(f.clone()), + Some(f) => Ok(f.as_ref().clone()), } } } diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs index 092ecd226c35..3b335b477c3f 100644 --- a/datafusion/expr/src/function.rs +++ b/datafusion/expr/src/function.rs @@ -24,7 +24,7 @@ use crate::{ array_expressions, conditional_expressions, struct_expressions, Accumulator, BuiltinScalarFunction, Signature, TypeSignature, }; -use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit}; +use arrow::datatypes::{DataType, Field, Fields, IntervalUnit, TimeUnit}; use datafusion_common::{DataFusionError, Result}; use std::sync::Arc; @@ -112,7 +112,7 @@ pub fn return_type( // Some built-in functions' return type depends on the incoming type. match fun { BuiltinScalarFunction::MakeArray => Ok(DataType::FixedSizeList( - Box::new(Field::new("item", input_expr_types[0].clone(), true)), + Arc::new(Field::new("item", input_expr_types[0].clone(), true)), input_expr_types.len() as i32, )), BuiltinScalarFunction::Ascii => Ok(DataType::Int32), @@ -233,7 +233,7 @@ pub fn return_type( } BuiltinScalarFunction::Now => Ok(DataType::Timestamp( TimeUnit::Nanosecond, - Some("+00:00".to_owned()), + Some("+00:00".into()), )), BuiltinScalarFunction::CurrentDate => Ok(DataType::Date32), BuiltinScalarFunction::CurrentTime => Ok(DataType::Time64(TimeUnit::Nanosecond)), @@ -244,10 +244,10 @@ pub fn return_type( BuiltinScalarFunction::Upper => utf8_to_str_type(&input_expr_types[0], "upper"), BuiltinScalarFunction::RegexpMatch => Ok(match input_expr_types[0] { DataType::LargeUtf8 => { - DataType::List(Box::new(Field::new("item", DataType::LargeUtf8, true))) + DataType::List(Arc::new(Field::new("item", DataType::LargeUtf8, true))) } DataType::Utf8 => { - DataType::List(Box::new(Field::new("item", DataType::Utf8, true))) + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))) } DataType::Null => DataType::Null, _ => { @@ -263,7 +263,7 @@ pub fn return_type( _ => Ok(DataType::Float64), }, - BuiltinScalarFunction::Struct => Ok(DataType::Struct(vec![])), + BuiltinScalarFunction::Struct => Ok(DataType::Struct(Fields::empty())), BuiltinScalarFunction::Atan2 => match &input_expr_types[0] { DataType::Float32 => Ok(DataType::Float32), @@ -515,7 +515,7 @@ pub fn signature(fun: &BuiltinScalarFunction) -> Signature { ]), TypeSignature::Exact(vec![ DataType::Utf8, - DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".to_owned())), + DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), ]), ], fun.volatility(), diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 421ad0f4457c..cd507ec4080d 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1058,7 +1058,7 @@ pub fn build_join_schema( let right_fields_nullable: Vec = right_fields .iter() .map(|f| { - let field = f.field().clone().with_nullable(true); + let field = f.field().as_ref().clone().with_nullable(true); if let Some(q) = f.qualifier() { DFField::from_qualified(q, field) } else { @@ -1866,19 +1866,19 @@ mod tests { fn nested_table_scan(table_name: &str) -> Result { // Create a schema with a scalar field, a list of strings, and a list of structs. - let struct_field = Box::new(Field::new( + let struct_field = Field::new_struct( "item", - DataType::Struct(vec![ + vec![ Field::new("a", DataType::UInt32, false), Field::new("b", DataType::UInt32, false), - ]), + ], false, - )); - let string_field = Box::new(Field::new("item", DataType::Utf8, false)); + ); + let string_field = Field::new("item", DataType::Utf8, false); let schema = Schema::new(vec![ Field::new("scalar", DataType::UInt32, false), - Field::new("strings", DataType::List(string_field), false), - Field::new("structs", DataType::List(struct_field), false), + Field::new_list("strings", string_field, false), + Field::new_list("structs", struct_field, false), ]); table_scan(Some(table_name), &schema, None) diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index 8c8cb9bcf241..68f567478079 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -257,7 +257,7 @@ mod tests { #[test] fn test_display_empty_schema() { - let schema = Schema::new(vec![]); + let schema = Schema::empty(); assert_eq!("[]", format!("{}", display_schema(&schema))); } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 6d02c46cc0f5..2d3c1e816ff6 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -592,7 +592,7 @@ mod tests { let metadata = [("key".into(), format!("value {i}"))].into_iter().collect(); - let new_arrow_field = f.field().clone().with_metadata(metadata); + let new_arrow_field = f.field().as_ref().clone().with_metadata(metadata); if let Some(qualifier) = f.qualifier() { DFField::from_qualified(qualifier.clone(), new_arrow_field) } else { diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index 296b3b33c960..d3f46472aa4e 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -789,7 +789,7 @@ mod tests { } fn lit_timestamp_nano_utc(ts: i64) -> Expr { - let utc = Some("+0:00".to_string()); + let utc = Some("+0:00".into()); lit(ScalarValue::TimestampNanosecond(Some(ts), utc)) } @@ -803,7 +803,7 @@ mod tests { // this is the type that now() returns fn timestamp_nano_utc_type() -> DataType { - let utc = Some("+0:00".to_string()); + let utc = Some("+0:00".into()); DataType::Timestamp(TimeUnit::Nanosecond, utc) } @@ -957,7 +957,7 @@ mod tests { TimeUnit::Microsecond, TimeUnit::Nanosecond, ] { - let utc = Some("+0:00".to_string()); + let utc = Some("+0:00".into()); // No timezone, utc timezone let (lit_tz_none, lit_tz_utc) = match time_unit { TimeUnit::Second => ( @@ -1054,7 +1054,7 @@ mod tests { // int64 to list expect_cast( ScalarValue::Int64(Some(12345)), - DataType::List(Box::new(Field::new("f", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("f", DataType::Int32, true))), ExpectedCast::NoValue, ); } diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs index 083671c6b55d..fcb8e914ff89 100644 --- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs +++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs @@ -210,9 +210,9 @@ impl AggregateExpr for ApproxPercentileCont { DataType::Float64, false, ), - Field::new( + Field::new_list( format_state_name(&self.name, "centroids"), - DataType::List(Box::new(Field::new("item", DataType::Float64, true))), + Field::new("item", DataType::Float64, true), false, ), ]) diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs index e68603ae9bf8..35f6caa643a8 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg.rs @@ -56,13 +56,9 @@ impl AggregateExpr for ArrayAgg { } fn field(&self) -> Result { - Ok(Field::new( + Ok(Field::new_list( &self.name, - DataType::List(Box::new(Field::new( - "item", - self.input_data_type.clone(), - true, - ))), + Field::new("item", self.input_data_type.clone(), true), false, )) } @@ -74,13 +70,9 @@ impl AggregateExpr for ArrayAgg { } fn state_fields(&self) -> Result> { - Ok(vec![Field::new( + Ok(vec![Field::new_list( format_state_name(&self.name, "array_agg"), - DataType::List(Box::new(Field::new( - "item", - self.input_data_type.clone(), - true, - ))), + Field::new("item", self.input_data_type.clone(), true), false, )]) } @@ -209,7 +201,7 @@ mod tests { DataType::Int32, ), ]), - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), ); let l2 = ScalarValue::new_list( @@ -223,7 +215,7 @@ mod tests { DataType::Int32, ), ]), - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), ); let l3 = ScalarValue::new_list( @@ -231,26 +223,26 @@ mod tests { Some(vec![ScalarValue::from(9i32)]), DataType::Int32, )]), - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), ); let list = ScalarValue::new_list( Some(vec![l1.clone(), l2.clone(), l3.clone()]), - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), ); let array = ScalarValue::iter_to_array(vec![l1, l2, l3]).unwrap(); generic_test_op!( array, - DataType::List(Box::new(Field::new( + DataType::List(Arc::new(Field::new_list( "item", - DataType::List(Box::new(Field::new("item", DataType::Int32, true,))), + Field::new("item", DataType::Int32, true), true, ))), ArrayAgg, list, - DataType::List(Box::new(Field::new("item", DataType::Int32, true,))) + DataType::List(Arc::new(Field::new("item", DataType::Int32, true,))) ) } } diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index 29a9ed718205..d292e8baca86 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -65,13 +65,9 @@ impl AggregateExpr for DistinctArrayAgg { } fn field(&self) -> Result { - Ok(Field::new( + Ok(Field::new_list( &self.name, - DataType::List(Box::new(Field::new( - "item", - self.input_data_type.clone(), - true, - ))), + Field::new("item", self.input_data_type.clone(), true), false, )) } @@ -83,13 +79,9 @@ impl AggregateExpr for DistinctArrayAgg { } fn state_fields(&self) -> Result> { - Ok(vec![Field::new( + Ok(vec![Field::new_list( format_state_name(&self.name, "distinct_array_agg"), - DataType::List(Box::new(Field::new( - "item", - self.input_data_type.clone(), - true, - ))), + Field::new("item", self.input_data_type.clone(), true), false, )]) } @@ -245,7 +237,7 @@ mod tests { DataType::Int32, ), ]), - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), ); // [[6], [7, 8]] @@ -260,7 +252,7 @@ mod tests { DataType::Int32, ), ]), - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), ); // [[9]] @@ -269,12 +261,12 @@ mod tests { Some(vec![ScalarValue::from(9i32)]), DataType::Int32, )]), - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), ); let list = ScalarValue::new_list( Some(vec![l1.clone(), l2.clone(), l3.clone()]), - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), ); // Duplicate l1 in the input array and check that it is deduped in the output. @@ -283,9 +275,9 @@ mod tests { check_distinct_array_agg( array, list, - DataType::List(Box::new(Field::new( + DataType::List(Arc::new(Field::new_list( "item", - DataType::List(Box::new(Field::new("item", DataType::Int32, true))), + Field::new("item", DataType::Int32, true), true, ))), ) diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index 7ad484ac4b92..eaf30ea10f82 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -189,7 +189,7 @@ impl Accumulator for AvgAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let values = &values[0]; - self.count += (values.len() - values.data().null_count()) as u64; + self.count += (values.len() - values.null_count()) as u64; self.sum = self .sum .add(&sum::sum_batch(values, &self.sum_data_type)?)?; @@ -198,8 +198,8 @@ impl Accumulator for AvgAccumulator { fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let values = &values[0]; - self.count -= (values.len() - values.data().null_count()) as u64; - let delta = sum_batch(values, &self.sum_data_type)?; + self.count -= (values.len() - values.null_count()) as u64; + let delta = sum_batch(values, &self.sum.get_datatype())?; self.sum = self.sum.sub(&delta)?; Ok(()) } @@ -269,7 +269,7 @@ impl RowAccumulator for AvgRowAccumulator { ) -> Result<()> { let values = &values[0]; // count - let delta = (values.len() - values.data().null_count()) as u64; + let delta = (values.len() - values.null_count()) as u64; accessor.add_u64(self.state_index(), delta); // sum diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs index 91415b06866d..d3b85d706d7b 100644 --- a/datafusion/physical-expr/src/aggregate/build_in.rs +++ b/datafusion/physical-expr/src/aggregate/build_in.rs @@ -324,13 +324,9 @@ mod tests { assert!(result_agg_phy_exprs.as_any().is::()); assert_eq!("c1", result_agg_phy_exprs.name()); assert_eq!( - Field::new( + Field::new_list( "c1", - DataType::List(Box::new(Field::new( - "item", - data_type.clone(), - true, - ))), + Field::new("item", data_type.clone(), true,), false, ), result_agg_phy_exprs.field().unwrap() @@ -367,13 +363,9 @@ mod tests { assert!(result_distinct.as_any().is::()); assert_eq!("c1", result_distinct.name()); assert_eq!( - Field::new( + Field::new_list( "c1", - DataType::List(Box::new(Field::new( - "item", - data_type.clone(), - true, - ))), + Field::new("item", data_type.clone(), true,), false, ), result_agg_phy_exprs.field().unwrap() diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 03a5c60a94c0..ae581b5c1ad9 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -81,7 +81,7 @@ fn null_count_for_multiple_cols(values: &[ArrayRef]) -> usize { if values.len() > 1 { let result_bool_buf: Option = values .iter() - .map(|a| a.data().nulls()) + .map(|a| a.nulls()) .fold(None, |acc, b| match (acc, b) { (Some(acc), Some(b)) => Some(acc.bitand(b.inner())), (Some(acc), None) => Some(acc), diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index 078e38ceb09d..be62dcea6c11 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -69,13 +69,9 @@ impl AggregateExpr for DistinctCount { } fn state_fields(&self) -> Result> { - Ok(vec![Field::new( + Ok(vec![Field::new_list( format_state_name(&self.name, "count distinct"), - DataType::List(Box::new(Field::new( - "item", - self.state_data_type.clone(), - true, - ))), + Field::new("item", self.state_data_type.clone(), true), false, )]) } diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs index c933f1f75b59..e42e91c3a295 100644 --- a/datafusion/physical-expr/src/aggregate/median.rs +++ b/datafusion/physical-expr/src/aggregate/median.rs @@ -72,7 +72,7 @@ impl AggregateExpr for Median { fn state_fields(&self) -> Result> { //Intermediate state is a list of the elements we have collected so far let field = Field::new("item", self.data_type.clone(), true); - let data_type = DataType::List(Box::new(field)); + let data_type = DataType::List(Arc::new(field)); Ok(vec![Field::new( format_state_name(&self.name, "median"), diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs index dab2348ba3de..f1815d05a350 100644 --- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs @@ -67,9 +67,9 @@ impl AggregateExpr for DistinctSum { fn state_fields(&self) -> Result> { // State field is a List which stores items to rebuild hash set. - Ok(vec![Field::new( + Ok(vec![Field::new_list( format_state_name(&self.name, "sum distinct"), - DataType::List(Box::new(Field::new("item", self.data_type.clone(), true))), + Field::new("item", self.data_type.clone(), true), false, )]) } diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index c71a6141f0b6..6da65f659214 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -175,7 +175,7 @@ pub fn make_now( move |_arg| { Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( now_ts, - Some("+00:00".to_owned()), + Some("+00:00".into()), ))) } } @@ -354,7 +354,7 @@ pub fn date_bin(args: &[ColumnarValue]) -> Result { // Default to unix EPOCH let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( Some(0), - Some("+00:00".to_owned()), + Some("+00:00".into()), )); date_bin_impl(&args[0], &args[1], &origin) } else if args.len() == 3 { diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 77a3a8b4b3a0..001614f4da9b 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -1247,8 +1247,10 @@ macro_rules! ts_sub_op { let prim_array_rhs = $caster(&$rhs)?; let ret: PrimitiveArray<$type_out> = arrow::compute::try_binary(prim_array_lhs, prim_array_rhs, |ts1, ts2| { - let (parsed_lhs_tz, parsed_rhs_tz) = - (parse_timezones($lhs_tz)?, parse_timezones($rhs_tz)?); + let (parsed_lhs_tz, parsed_rhs_tz) = ( + parse_timezones($lhs_tz.as_deref())?, + parse_timezones($rhs_tz.as_deref())?, + ); let (naive_lhs, naive_rhs) = calculate_naives::<$mode>( ts1.mul_wrapping($coef), parsed_lhs_tz, diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs index 57cf6a1cf80d..836ea93450be 100644 --- a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs +++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs @@ -114,16 +114,16 @@ where { let left_values = left.values(); let right_values = right.values(); - let left_data = left.data(); - let right_data = right.data(); + let left_nulls = left.nulls(); + let right_nulls = right.nulls(); - let array_len = left_data.len().min(right_data.len()); + let array_len = left.len().min(right.len()); let distinct = arrow_buffer::MutableBuffer::collect_bool(array_len, |i| { op( left_values[i], right_values[i], - left_data.is_null(i), - right_data.is_null(i), + left_nulls.map(|x| x.is_null(i)).unwrap_or_default(), + right_nulls.map(|x| x.is_null(i)).unwrap_or_default(), ) }); let array_data = ArrayData::builder(arrow_schema::DataType::Boolean) diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs b/datafusion/physical-expr/src/expressions/get_indexed_field.rs index e0f0e884a1f6..c07641796aa4 100644 --- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs +++ b/datafusion/physical-expr/src/expressions/get_indexed_field.rs @@ -213,9 +213,9 @@ mod tests { } fn list_schema(col: &str) -> Schema { - Schema::new(vec![Field::new( + Schema::new(vec![Field::new_list( col, - DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), + Field::new("item", DataType::Utf8, true), true, )]) } @@ -329,15 +329,11 @@ mod tests { let struct_col = "s"; let fields = vec![ Field::new("foo", DataType::Int64, true), - Field::new( - "bar", - DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), - true, - ), + Field::new_list("bar", Field::new("item", DataType::Utf8, true), true), ]; let schema = Schema::new(vec![Field::new( struct_col, - DataType::Struct(fields.clone()), + DataType::Struct(fields.clone().into()), true, )]); let struct_col = build_struct(fields, list_of_tuples.clone()); @@ -409,11 +405,7 @@ mod tests { fn get_indexed_field_list_out_of_bounds() { let fields = vec![ Field::new("id", DataType::Int64, true), - Field::new( - "a", - DataType::List(Box::new(Field::new("item", DataType::Float64, true))), - true, - ), + Field::new_list("a", Field::new("item", DataType::Float64, true), true), ]; let schema = Schema::new(fields); diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 403768aa39e7..013169ccf785 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -145,7 +145,7 @@ mod tests { #[test] fn literal_bounds_analysis() -> Result<()> { - let schema = Schema::new(vec![]); + let schema = Schema::empty(); let context = AnalysisContext::new(&schema, vec![]); let literal_expr = lit(42i32); diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index b60877f6ef90..f2297a535584 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -2798,7 +2798,7 @@ mod tests { assert_eq!( expr.data_type(&schema)?, // type equals to a common coercion - DataType::FixedSizeList(Box::new(Field::new("item", expected_type, true)), 2) + DataType::FixedSizeList(Arc::new(Field::new("item", expected_type, true)), 2) ); // evaluate works @@ -2860,7 +2860,7 @@ mod tests { // type is correct assert_eq!( expr.data_type(&schema)?, - DataType::List(Box::new(Field::new("item", DataType::Utf8, true))) + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))) ); // evaluate works @@ -2899,7 +2899,7 @@ mod tests { // type is correct assert_eq!( expr.data_type(&schema)?, - DataType::List(Box::new(Field::new("item", DataType::Utf8, true))) + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))) ); // evaluate works diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index e2588cc2aa81..04b4393676f0 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -280,13 +280,13 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { /// * `mask` - Boolean values used to determine where to put the `truthy` values /// * `truthy` - All values of this array are to scatter according to `mask` into final result. fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result { - let truthy = truthy.data(); + let truthy = truthy.to_data(); // update the mask so that any null values become false // (SlicesIterator doesn't respect nulls) let mask = and_kleene(mask, &is_not_null(mask)?)?; - let mut mutable = MutableArrayData::new(vec![truthy], true, mask.len()); + let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len()); // the SlicesIterator slices only the true values. So the gaps left by this iterator we need to // fill with falsy values diff --git a/datafusion/physical-expr/src/type_coercion.rs b/datafusion/physical-expr/src/type_coercion.rs index 8e55b739f373..7fe002a85906 100644 --- a/datafusion/physical-expr/src/type_coercion.rs +++ b/datafusion/physical-expr/src/type_coercion.rs @@ -68,6 +68,7 @@ mod tests { use super::*; use crate::expressions::col; use arrow::datatypes::{DataType, Field, Schema}; + use arrow_schema::Fields; use datafusion_common::DataFusionError; use datafusion_expr::Volatility; @@ -79,7 +80,7 @@ mod tests { t.iter() .enumerate() .map(|(i, t)| Field::new(format!("c{i}"), t.clone(), true)) - .collect(), + .collect::(), ) }; diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index e95aa28b37b3..1dd7a2609ae1 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -839,10 +839,7 @@ mod tests { }, ]; let finer = Some(&finer[..]); - let empty_schema = &Arc::new(Schema { - fields: vec![], - metadata: Default::default(), - }); + let empty_schema = &Arc::new(Schema::empty()); assert!(ordering_satisfy(finer, crude, || { EquivalenceProperties::new(empty_schema.clone()) })); diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index e2dfd52daf71..8d97d5ebc0b3 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -159,7 +159,7 @@ fn shift_with_default_value( let value_len = array.len() as i64; if offset == 0 { - Ok(arrow::array::make_array(array.data_ref().clone())) + Ok(array.clone()) } else if offset == i64::MIN || offset.abs() >= value_len { create_empty_array(value, array.data_type(), array.len()) } else { diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 75d28ba97731..eff450c98a67 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -25,7 +25,8 @@ use crate::protobuf::{ PlaceholderNode, RollupNode, }; use arrow::datatypes::{ - DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, TimeUnit, UnionMode, + DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, TimeUnit, + UnionFields, UnionMode, }; use datafusion::execution::registry::FunctionRegistry; use datafusion_common::{ @@ -184,7 +185,7 @@ impl TryFrom<&protobuf::DfField> for DFField { type Error = Error; fn try_from(df_field: &protobuf::DfField) -> Result { - let field = df_field.field.as_ref().required("field")?; + let field: Field = df_field.field.as_ref().required("field")?; Ok(match &df_field.qualifier { Some(q) => DFField::from_qualified(q.relation.clone(), field), @@ -279,7 +280,7 @@ impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType { parse_i32_to_time_unit(time_unit)?, match timezone.len() { 0 => None, - _ => Some(timezone.to_owned()), + _ => Some(timezone.as_str().into()), }, ), arrow_type::ArrowTypeEnum::Time32(time_unit) => { @@ -298,25 +299,25 @@ impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType { arrow_type::ArrowTypeEnum::List(list) => { let list_type = list.as_ref().field_type.as_deref().required("field_type")?; - DataType::List(Box::new(list_type)) + DataType::List(Arc::new(list_type)) } arrow_type::ArrowTypeEnum::LargeList(list) => { let list_type = list.as_ref().field_type.as_deref().required("field_type")?; - DataType::LargeList(Box::new(list_type)) + DataType::LargeList(Arc::new(list_type)) } arrow_type::ArrowTypeEnum::FixedSizeList(list) => { let list_type = list.as_ref().field_type.as_deref().required("field_type")?; let list_size = list.list_size; - DataType::FixedSizeList(Box::new(list_type), list_size) + DataType::FixedSizeList(Arc::new(list_type), list_size) } arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct( strct .sub_field_types .iter() - .map(|field| field.try_into()) - .collect::, _>>()?, + .map(Field::try_from) + .collect::>()?, ), arrow_type::ArrowTypeEnum::Union(union) => { let union_mode = protobuf::UnionMode::from_i32(union.union_mode) @@ -325,19 +326,19 @@ impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType { protobuf::UnionMode::Dense => UnionMode::Dense, protobuf::UnionMode::Sparse => UnionMode::Sparse, }; - let union_types = union + let union_fields = union .union_types .iter() .map(TryInto::try_into) - .collect::, _>>()?; + .collect::, _>>()?; // Default to index based type ids if not provided - let type_ids = match union.type_ids.is_empty() { - true => (0..union_types.len() as i8).collect(), + let type_ids: Vec<_> = match union.type_ids.is_empty() { + true => (0..union_fields.len() as i8).collect(), false => union.type_ids.iter().map(|i| *i as i8).collect(), }; - DataType::Union(union_types, type_ids, union_mode) + DataType::Union(UnionFields::new(type_ids, union_fields), union_mode) } arrow_type::ArrowTypeEnum::Dictionary(dict) => { let key_datatype = dict.as_ref().key.as_deref().required("key")?; @@ -348,7 +349,7 @@ impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType { let field: Field = map.as_ref().field_type.as_deref().required("field_type")?; let keys_sorted = map.keys_sorted; - DataType::Map(Box::new(field), keys_sorted) + DataType::Map(Arc::new(field), keys_sorted) } }) } @@ -588,7 +589,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { } = &scalar_list; let field: Field = field.as_ref().required("field")?; - let field = Box::new(field); + let field = Arc::new(field); let values: Result, Error> = values.iter().map(|val| val.try_into()).collect(); @@ -643,7 +644,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { let timezone = if v.timezone.is_empty() { None } else { - Some(v.timezone.clone()) + Some(v.timezone.as_str().into()) }; let ts_value = @@ -702,10 +703,10 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { let fields = v .fields .iter() - .map(|f| f.try_into()) - .collect::, _>>()?; + .map(Field::try_from) + .collect::>()?; - Self::Struct(values, Box::new(fields)) + Self::Struct(values, fields) } Value::FixedSizeBinaryValue(v) => { Self::FixedSizeBinary(v.length, Some(v.clone().values)) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index ff86fc86347d..5aa956c97e0a 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1382,7 +1382,7 @@ mod roundtrip_tests { logical_plan_to_bytes, logical_plan_to_bytes_with_extension_codec, }; use crate::logical_plan::LogicalExtensionCodec; - use arrow::datatypes::{Schema, SchemaRef}; + use arrow::datatypes::{Fields, Schema, SchemaRef, UnionFields}; use arrow::{ array::ArrayRef, datatypes::{ @@ -1444,8 +1444,8 @@ mod roundtrip_tests { roundtrip_json_test(&proto); } - fn new_box_field(name: &str, dt: DataType, nullable: bool) -> Box { - Box::new(Field::new(name, dt, nullable)) + fn new_arc_field(name: &str, dt: DataType, nullable: bool) -> Arc { + Arc::new(Field::new(name, dt, nullable)) } #[tokio::test] @@ -1796,7 +1796,7 @@ mod roundtrip_tests { // Should fail due to empty values ScalarValue::Struct( Some(vec![]), - Box::new(vec![Field::new("item", DataType::Int16, true)]), + vec![Field::new("item", DataType::Int16, true)].into(), ), // Should fail due to inconsistent types in the list ScalarValue::new_list( @@ -1804,14 +1804,14 @@ mod roundtrip_tests { ScalarValue::Int16(None), ScalarValue::Float32(Some(32.0)), ]), - DataType::List(new_box_field("item", DataType::Int16, true)), + DataType::List(new_arc_field("item", DataType::Int16, true)), ), ScalarValue::new_list( Some(vec![ ScalarValue::Float32(None), ScalarValue::Float32(Some(32.0)), ]), - DataType::List(new_box_field("item", DataType::Int16, true)), + DataType::List(new_arc_field("item", DataType::Int16, true)), ), ScalarValue::new_list( Some(vec![ @@ -1824,7 +1824,7 @@ mod roundtrip_tests { Some(vec![ ScalarValue::new_list( None, - DataType::List(new_box_field("level2", DataType::Float32, true)), + DataType::List(new_arc_field("level2", DataType::Float32, true)), ), ScalarValue::new_list( Some(vec![ @@ -1834,20 +1834,20 @@ mod roundtrip_tests { ScalarValue::Float32(Some(2.0)), ScalarValue::Float32(Some(1.0)), ]), - DataType::List(new_box_field("level2", DataType::Float32, true)), + DataType::List(new_arc_field("level2", DataType::Float32, true)), ), ScalarValue::new_list( None, - DataType::List(new_box_field( + DataType::List(new_arc_field( "lists are typed inconsistently", DataType::Int16, true, )), ), ]), - DataType::List(new_box_field( + DataType::List(new_arc_field( "level1", - DataType::List(new_box_field("level2", DataType::Float32, true)), + DataType::List(new_arc_field("level2", DataType::Float32, true)), true, )), ), @@ -1943,19 +1943,19 @@ mod roundtrip_tests { ScalarValue::Time64Nanosecond(None), ScalarValue::TimestampNanosecond(Some(0), None), ScalarValue::TimestampNanosecond(Some(i64::MAX), None), - ScalarValue::TimestampNanosecond(Some(0), Some("UTC".to_string())), + ScalarValue::TimestampNanosecond(Some(0), Some("UTC".into())), ScalarValue::TimestampNanosecond(None, None), ScalarValue::TimestampMicrosecond(Some(0), None), ScalarValue::TimestampMicrosecond(Some(i64::MAX), None), - ScalarValue::TimestampMicrosecond(Some(0), Some("UTC".to_string())), + ScalarValue::TimestampMicrosecond(Some(0), Some("UTC".into())), ScalarValue::TimestampMicrosecond(None, None), ScalarValue::TimestampMillisecond(Some(0), None), ScalarValue::TimestampMillisecond(Some(i64::MAX), None), - ScalarValue::TimestampMillisecond(Some(0), Some("UTC".to_string())), + ScalarValue::TimestampMillisecond(Some(0), Some("UTC".into())), ScalarValue::TimestampMillisecond(None, None), ScalarValue::TimestampSecond(Some(0), None), ScalarValue::TimestampSecond(Some(i64::MAX), None), - ScalarValue::TimestampSecond(Some(0), Some("UTC".to_string())), + ScalarValue::TimestampSecond(Some(0), Some("UTC".into())), ScalarValue::TimestampSecond(None, None), ScalarValue::IntervalDayTime(Some(IntervalDayTimeType::make_value(0, 0))), ScalarValue::IntervalDayTime(Some(IntervalDayTimeType::make_value(1, 2))), @@ -1998,7 +1998,7 @@ mod roundtrip_tests { DataType::Float32, ), ]), - DataType::List(new_box_field("item", DataType::Float32, true)), + DataType::List(new_arc_field("item", DataType::Float32, true)), ), ScalarValue::Dictionary( Box::new(DataType::Int32), @@ -2017,14 +2017,14 @@ mod roundtrip_tests { ScalarValue::Int32(Some(23)), ScalarValue::Boolean(Some(false)), ]), - Box::new(vec![ + Fields::from(vec![ Field::new("a", DataType::Int32, true), Field::new("b", DataType::Boolean, false), ]), ), ScalarValue::Struct( None, - Box::new(vec![ + Fields::from(vec![ Field::new("a", DataType::Int32, true), Field::new("a", DataType::Boolean, false), ]), @@ -2074,10 +2074,10 @@ mod roundtrip_tests { DataType::Utf8, DataType::LargeUtf8, // Recursive list tests - DataType::List(new_box_field("level1", DataType::Boolean, true)), - DataType::List(new_box_field( + DataType::List(new_arc_field("level1", DataType::Boolean, true)), + DataType::List(new_arc_field( "Level1", - DataType::List(new_box_field("level2", DataType::Date32, true)), + DataType::List(new_arc_field("level2", DataType::Date32, true)), true, )), ]; @@ -2136,10 +2136,10 @@ mod roundtrip_tests { DataType::LargeUtf8, DataType::Decimal128(7, 12), // Recursive list tests - DataType::List(new_box_field("Level1", DataType::Binary, true)), - DataType::List(new_box_field( + DataType::List(new_arc_field("Level1", DataType::Binary, true)), + DataType::List(new_arc_field( "Level1", - DataType::List(new_box_field( + DataType::List(new_arc_field( "Level2", DataType::FixedSizeBinary(53), false, @@ -2147,11 +2147,11 @@ mod roundtrip_tests { true, )), // Fixed size lists - DataType::FixedSizeList(new_box_field("Level1", DataType::Binary, true), 4), + DataType::FixedSizeList(new_arc_field("Level1", DataType::Binary, true), 4), DataType::FixedSizeList( - new_box_field( + new_arc_field( "Level1", - DataType::List(new_box_field( + DataType::List(new_arc_field( "Level2", DataType::FixedSizeBinary(53), false, @@ -2161,74 +2161,78 @@ mod roundtrip_tests { 41, ), // Struct Testing - DataType::Struct(vec![ + DataType::Struct(Fields::from(vec![ Field::new("nullable", DataType::Boolean, false), Field::new("name", DataType::Utf8, false), Field::new("datatype", DataType::Binary, false), - ]), - DataType::Struct(vec![ + ])), + DataType::Struct(Fields::from(vec![ Field::new("nullable", DataType::Boolean, false), Field::new("name", DataType::Utf8, false), Field::new("datatype", DataType::Binary, false), Field::new( "nested_struct", - DataType::Struct(vec![ + DataType::Struct(Fields::from(vec![ Field::new("nullable", DataType::Boolean, false), Field::new("name", DataType::Utf8, false), Field::new("datatype", DataType::Binary, false), - ]), + ])), true, ), - ]), + ])), DataType::Union( - vec![ - Field::new("nullable", DataType::Boolean, false), - Field::new("name", DataType::Utf8, false), - Field::new("datatype", DataType::Binary, false), - ], - vec![7, 5, 3], + UnionFields::new( + vec![7, 5, 3], + vec![ + Field::new("nullable", DataType::Boolean, false), + Field::new("name", DataType::Utf8, false), + Field::new("datatype", DataType::Binary, false), + ], + ), UnionMode::Sparse, ), DataType::Union( - vec![ - Field::new("nullable", DataType::Boolean, false), - Field::new("name", DataType::Utf8, false), - Field::new("datatype", DataType::Binary, false), - Field::new( - "nested_struct", - DataType::Struct(vec![ - Field::new("nullable", DataType::Boolean, false), - Field::new("name", DataType::Utf8, false), - Field::new("datatype", DataType::Binary, false), - ]), - true, - ), - ], - vec![5, 8, 1], + UnionFields::new( + vec![5, 8, 1], + vec![ + Field::new("nullable", DataType::Boolean, false), + Field::new("name", DataType::Utf8, false), + Field::new("datatype", DataType::Binary, false), + Field::new_struct( + "nested_struct", + vec![ + Field::new("nullable", DataType::Boolean, false), + Field::new("name", DataType::Utf8, false), + Field::new("datatype", DataType::Binary, false), + ], + true, + ), + ], + ), UnionMode::Dense, ), DataType::Dictionary( Box::new(DataType::Utf8), - Box::new(DataType::Struct(vec![ + Box::new(DataType::Struct(Fields::from(vec![ Field::new("nullable", DataType::Boolean, false), Field::new("name", DataType::Utf8, false), Field::new("datatype", DataType::Binary, false), - ])), + ]))), ), DataType::Dictionary( Box::new(DataType::Decimal128(10, 50)), Box::new(DataType::FixedSizeList( - new_box_field("Level1", DataType::Binary, true), + new_arc_field("Level1", DataType::Binary, true), 4, )), ), DataType::Map( - new_box_field( + new_arc_field( "entries", - DataType::Struct(vec![ + DataType::Struct(Fields::from(vec![ Field::new("keys", DataType::Utf8, false), Field::new("values", DataType::Int32, true), - ]), + ])), true, ), false, @@ -2263,7 +2267,7 @@ mod roundtrip_tests { ScalarValue::TimestampNanosecond(None, None), ScalarValue::List( None, - Box::new(Field::new("item", DataType::Boolean, false)), + Arc::new(Field::new("item", DataType::Boolean, false)), ), ]; diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index b2a30f6cd714..9465c73612b1 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -151,7 +151,7 @@ impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum { DataType::Timestamp(time_unit, timezone) => { Self::Timestamp(protobuf::Timestamp { time_unit: protobuf::TimeUnit::from(time_unit) as i32, - timezone: timezone.to_owned().unwrap_or_default(), + timezone: timezone.as_deref().unwrap_or("").to_string(), }) } DataType::Date32 => Self::Date32(EmptyMessage {}), @@ -188,21 +188,21 @@ impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum { DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct { sub_field_types: struct_fields .iter() - .map(|field| field.try_into()) + .map(|field| field.as_ref().try_into()) .collect::, Error>>()?, }), - DataType::Union(union_types, type_ids, union_mode) => { + DataType::Union(fields, union_mode) => { let union_mode = match union_mode { UnionMode::Sparse => protobuf::UnionMode::Sparse, UnionMode::Dense => protobuf::UnionMode::Dense, }; Self::Union(protobuf::Union { - union_types: union_types + union_types: fields .iter() - .map(|field| field.try_into()) + .map(|(_, field)| field.as_ref().try_into()) .collect::, Error>>()?, union_mode: union_mode.into(), - type_ids: type_ids.iter().map(|x| *x as i32).collect(), + type_ids: fields.iter().map(|(x, _)| x as i32).collect(), }) } DataType::Dictionary(key_type, value_type) => { @@ -262,7 +262,7 @@ impl TryFrom<&Schema> for protobuf::Schema { columns: schema .fields() .iter() - .map(protobuf::Field::try_from) + .map(|f| f.as_ref().try_into()) .collect::, Error>>()?, }) } @@ -276,7 +276,7 @@ impl TryFrom for protobuf::Schema { columns: schema .fields() .iter() - .map(protobuf::Field::try_from) + .map(|f| f.as_ref().try_into()) .collect::, Error>>()?, }) } @@ -287,7 +287,7 @@ impl TryFrom<&DFField> for protobuf::DfField { fn try_from(f: &DFField) -> Result { Ok(Self { - field: Some(f.field().try_into()?), + field: Some(f.field().as_ref().try_into()?), qualifier: f.qualifier().map(|r| protobuf::ColumnRelation { relation: r.to_string(), }), @@ -1045,7 +1045,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { datafusion::scalar::ScalarValue::TimestampMicrosecond(val, tz) => { create_proto_scalar(val.as_ref(), &data_type, |s| { Value::TimestampValue(protobuf::ScalarTimestampValue { - timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(), + timezone: tz.as_deref().unwrap_or("").to_string(), value: Some( protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue( *s, @@ -1057,7 +1057,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { datafusion::scalar::ScalarValue::TimestampNanosecond(val, tz) => { create_proto_scalar(val.as_ref(), &data_type, |s| { Value::TimestampValue(protobuf::ScalarTimestampValue { - timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(), + timezone: tz.as_deref().unwrap_or("").to_string(), value: Some( protobuf::scalar_timestamp_value::Value::TimeNanosecondValue( *s, @@ -1090,7 +1090,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { datafusion::scalar::ScalarValue::TimestampSecond(val, tz) => { create_proto_scalar(val.as_ref(), &data_type, |s| { Value::TimestampValue(protobuf::ScalarTimestampValue { - timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(), + timezone: tz.as_deref().unwrap_or("").to_string(), value: Some( protobuf::scalar_timestamp_value::Value::TimeSecondValue(*s), ), @@ -1100,7 +1100,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { datafusion::scalar::ScalarValue::TimestampMillisecond(val, tz) => { create_proto_scalar(val.as_ref(), &data_type, |s| { Value::TimestampValue(protobuf::ScalarTimestampValue { - timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(), + timezone: tz.as_deref().unwrap_or("").to_string(), value: Some( protobuf::scalar_timestamp_value::Value::TimeMillisecondValue( *s, @@ -1219,7 +1219,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { let fields = fields .iter() - .map(|f| f.try_into()) + .map(|f| f.as_ref().try_into()) .collect::, _>>()?; Ok(protobuf::ScalarValue { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index d45eeaed6016..8fd57f002b22 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1631,11 +1631,7 @@ mod roundtrip_tests { fn roundtrip_get_indexed_field() -> Result<()> { let fields = vec![ Field::new("id", DataType::Int64, true), - Field::new( - "a", - DataType::List(Box::new(Field::new("item", DataType::Float64, true))), - true, - ), + Field::new_list("a", Field::new("item", DataType::Float64, true), true), ]; let schema = Schema::new(fields); diff --git a/datafusion/sql/src/expr/arrow_cast.rs b/datafusion/sql/src/expr/arrow_cast.rs index 91a05e60f6dc..043d078d8d09 100644 --- a/datafusion/sql/src/expr/arrow_cast.rs +++ b/datafusion/sql/src/expr/arrow_cast.rs @@ -246,7 +246,7 @@ impl<'a> Parser<'a> { self.expect_token(Token::Comma)?; let timezone = self.parse_timezone("Timestamp")?; self.expect_token(Token::RParen)?; - Ok(DataType::Timestamp(time_unit, timezone)) + Ok(DataType::Timestamp(time_unit, timezone.map(Into::into))) } /// Parses the next Time32 (called after `Time32` has been consumed) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index f872aa5676e2..ceec01037425 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -290,7 +290,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { SQLDataType::Array(Some(inner_sql_type)) => { let data_type = self.convert_simple_data_type(inner_sql_type)?; - Ok(DataType::List(Box::new(Field::new( + Ok(DataType::List(Arc::new(Field::new( "field", data_type, true, )))) } @@ -333,7 +333,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Timestamp Without Time zone None }; - Ok(DataType::Timestamp(TimeUnit::Nanosecond, tz)) + Ok(DataType::Timestamp(TimeUnit::Nanosecond, tz.map(Into::into))) } SQLDataType::Date => Ok(DataType::Date32), SQLDataType::Time(None, tz_info) => { diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index b807a9acf7b4..6e67a5d7c24c 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -845,7 +845,7 @@ fn from_substrait_type(dt: &substrait::proto::Type) -> Result { "List type must have inner type".to_string(), ) })?)?; - let field = Box::new(Field::new("list_item", inner_type, true)); + let field = Arc::new(Field::new("list_item", inner_type, true)); match list.type_variation_reference { DEFAULT_CONTAINER_TYPE_REF => Ok(DataType::List(field)), LARGE_CONTAINER_TYPE_REF => Ok(DataType::LargeList(field)), diff --git a/datafusion/substrait/tests/roundtrip_logical_plan.rs b/datafusion/substrait/tests/roundtrip_logical_plan.rs index 0b012ffded06..2a79f61eb83f 100644 --- a/datafusion/substrait/tests/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/roundtrip_logical_plan.rs @@ -492,14 +492,10 @@ mod tests { Field::new("fixed_size_binary_col", DataType::FixedSizeBinary(42), true), Field::new("utf8_col", DataType::Utf8, true), Field::new("large_utf8_col", DataType::LargeUtf8, true), - Field::new( - "list_col", - DataType::List(Box::new(Field::new("item", DataType::Int64, true))), - true, - ), - Field::new( + Field::new_list("list_col", Field::new("item", DataType::Int64, true), true), + Field::new_list( "large_list_col", - DataType::LargeList(Box::new(Field::new("item", DataType::Int64, true))), + Field::new("item", DataType::Int64, true), true, ), Field::new("decimal_128_col", DataType::Decimal128(10, 2), true),