From dcae5794315b67b672d7c057463e83585412945d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 11 Nov 2021 06:59:04 -0500 Subject: [PATCH 1/6] Update tonic/prost deps --- Cargo.toml | 8 ++++++++ ballista-examples/Cargo.toml | 4 ++-- ballista/rust/core/Cargo.toml | 6 +++--- ballista/rust/core/src/client.rs | 14 +++++++++----- ballista/rust/executor/Cargo.toml | 2 +- ballista/rust/scheduler/Cargo.toml | 6 +++--- datafusion-examples/Cargo.toml | 4 ++-- 7 files changed, 28 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c722851e72de..38cdd3ce8f6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,3 +33,11 @@ exclude = ["python"] [profile.release] lto = true codegen-units = 1 + + +# experimentally try arrow 6.x with upgraded prost/tonic +# from https://github.com/apache/arrow-rs/pull/945 +[patch.crates-io] +arrow = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/try_backporting_deps" } +arrow-flight = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/try_backporting_deps" } +parquet = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/try_backporting_deps" } diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml index a2d2fd65656d..338f69994bfd 100644 --- a/ballista-examples/Cargo.toml +++ b/ballista-examples/Cargo.toml @@ -31,8 +31,8 @@ rust-version = "1.57" [dependencies] datafusion = { path = "../datafusion" } ballista = { path = "../ballista/rust/client", version = "0.6.0"} -prost = "0.8" -tonic = "0.5" +prost = "0.9" +tonic = "0.6" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } futures = "0.3" num_cpus = "1.13.0" diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index 16ec07acc98d..f0d5cc7fc9f5 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -35,11 +35,11 @@ async-trait = "0.1.36" futures = "0.3" hashbrown = "0.11" log = "0.4" -prost = "0.8" +prost = "0.9" serde = {version = "1", features = ["derive"]} sqlparser = "0.13" tokio = "1.0" -tonic = "0.5" +tonic = "0.6" uuid = { version = "0.8", features = ["v4"] } chrono = { version = "0.4", default-features = false } @@ -54,4 +54,4 @@ datafusion = { path = "../../../datafusion", version = "6.0.0" } tempfile = "3" [build-dependencies] -tonic-build = { version = "0.5" } +tonic-build = { version = "0.6" } diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs index 26c8d22b405d..4b82c34644f9 100644 --- a/ballista/rust/core/src/client.rs +++ b/ballista/rust/core/src/client.rs @@ -17,7 +17,7 @@ //! Client API for sending requests to executors. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::{collections::HashMap, pin::Pin}; use std::{ convert::{TryFrom, TryInto}, @@ -135,13 +135,16 @@ impl BallistaClient { } struct FlightDataStream { - stream: Streaming, + stream: Mutex>, schema: SchemaRef, } impl FlightDataStream { pub fn new(stream: Streaming, schema: SchemaRef) -> Self { - Self { stream, schema } + Self { + stream: Mutex::new(stream), + schema, + } } } @@ -149,10 +152,11 @@ impl Stream for FlightDataStream { type Item = ArrowResult; fn poll_next( - mut self: std::pin::Pin<&mut Self>, + self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.stream.poll_next_unpin(cx).map(|x| match x { + let mut stream = self.stream.lock().expect("mutex is bad"); + stream.poll_next_unpin(cx).map(|x| match x { Some(flight_data_chunk_result) => { let converted_chunk = flight_data_chunk_result .map_err(|e| ArrowError::from_external_error(Box::new(e))) diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index 00f3aab745ff..18896f525ead 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -43,7 +43,7 @@ snmalloc-rs = {version = "0.2", features= ["cache-friendly"], optional = true} tempfile = "3" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = "0.5" +tonic = "0.6" uuid = { version = "0.8", features = ["v4"] } [dev-dependencies] diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml index a71be406fecc..0bacccf031d8 100644 --- a/ballista/rust/scheduler/Cargo.toml +++ b/ballista/rust/scheduler/Cargo.toml @@ -44,13 +44,13 @@ http-body = "0.4" hyper = "0.14.4" log = "0.4" parse_arg = "0.1.3" -prost = "0.8" +prost = "0.9" rand = "0.8" serde = {version = "1", features = ["derive"]} sled_package = { package = "sled", version = "0.34", optional = true } tokio = { version = "1.0", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"], optional = true } -tonic = "0.5" +tonic = "0.6" tower = { version = "0.4" } warp = "0.3" @@ -60,7 +60,7 @@ uuid = { version = "0.8", features = ["v4"] } [build-dependencies] configure_me_codegen = "0.4.1" -tonic-build = { version = "0.5" } +tonic-build = { version = "0.6" } [package.metadata.configure_me.bin] scheduler = "scheduler_config_spec.toml" diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index f7ef66d99bde..f536e4094ebd 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -36,8 +36,8 @@ required-features = ["datafusion/avro"] [dev-dependencies] arrow-flight = { version = "6.4.0" } datafusion = { path = "../datafusion" } -prost = "0.8" -tonic = "0.5" +prost = "0.9" +tonic = "0.6" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } futures = "0.3" num_cpus = "1.13.0" From 8dc7e8e26d503e555339ef9242072c884a0b4c74 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 5 Jan 2022 15:59:49 -0500 Subject: [PATCH 2/6] Update to arrow 7.0.0-SNAPSHOT --- Cargo.toml | 9 ++++----- ballista/rust/core/Cargo.toml | 5 +---- ballista/rust/executor/Cargo.toml | 4 ++-- datafusion-examples/Cargo.toml | 2 +- datafusion/Cargo.toml | 4 ++-- 5 files changed, 10 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 38cdd3ce8f6d..39e0c852dacd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,9 +35,8 @@ lto = true codegen-units = 1 -# experimentally try arrow 6.x with upgraded prost/tonic -# from https://github.com/apache/arrow-rs/pull/945 +# TEMP: patch crates to test arrow [patch.crates-io] -arrow = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/try_backporting_deps" } -arrow-flight = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/try_backporting_deps" } -parquet = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/try_backporting_deps" } +arrow = { git = "https://github.com/apache/arrow-rs.git", branch = "master" } +arrow-flight = { git = "https://github.com/apache/arrow-rs.git", branch = "master" } +parquet = { git = "https://github.com/apache/arrow-rs.git", branch = "master" } diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index f0d5cc7fc9f5..a1a5ff0b1cc8 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -43,10 +43,7 @@ tonic = "0.6" uuid = { version = "0.8", features = ["v4"] } chrono = { version = "0.4", default-features = false } -# workaround for https://github.com/apache/arrow-datafusion/issues/1498 -# should be able to remove when we update arrow-flight -quote = "=1.0.10" -arrow-flight = { version = "6.4.0" } +arrow-flight = { version = "7.0.0-SNAPSHOT" } datafusion = { path = "../../../datafusion", version = "6.0.0" } diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index 18896f525ead..d7fe13d15cbe 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -29,8 +29,8 @@ edition = "2018" snmalloc = ["snmalloc-rs"] [dependencies] -arrow = { version = "6.4.0" } -arrow-flight = { version = "6.4.0" } +arrow = { version = "7.0.0-SNAPSHOT" } +arrow-flight = { version = "7.0.0-SNAPSHOT" } anyhow = "1" async-trait = "0.1.36" ballista-core = { path = "../core", version = "0.6.0" } diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index f536e4094ebd..dd5de8d320b6 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -34,7 +34,7 @@ path = "examples/avro_sql.rs" required-features = ["datafusion/avro"] [dev-dependencies] -arrow-flight = { version = "6.4.0" } +arrow-flight = { version = "7.0.0-SNAPSHOT" } datafusion = { path = "../datafusion" } prost = "0.9" tonic = "0.6" diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index b9192826120e..2b14c53a80cb 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -52,8 +52,8 @@ avro = ["avro-rs", "num-traits"] [dependencies] ahash = { version = "0.7", default-features = false } hashbrown = { version = "0.11", features = ["raw"] } -arrow = { version = "6.4.0", features = ["prettyprint"] } -parquet = { version = "6.4.0", features = ["arrow"] } +arrow = { version = "7.0.0-SNAPSHOT", features = ["prettyprint"] } +parquet = { version = "7.0.0-SNAPSHOT", features = ["arrow"] } sqlparser = "0.13" paste = "^1.0" num_cpus = "1.13.0" From 777a6f7d2a1b09ff35f6e9b7e24076d7cd817f36 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 5 Jan 2022 16:06:47 -0500 Subject: [PATCH 3/6] Update datafusion and tests for arrow changes --- ballista/rust/core/proto/ballista.proto | 8 ++ .../rust/core/src/serde/logical_plan/mod.rs | 89 +++++++++++-------- .../core/src/serde/logical_plan/to_proto.rs | 37 ++++++-- ballista/rust/core/src/serde/mod.rs | 20 ++++- datafusion-cli/Cargo.toml | 2 +- .../src/physical_plan/file_format/csv.rs | 2 + .../physical_plan/sort_preserving_merge.rs | 33 +++++-- datafusion/src/test_util.rs | 8 +- datafusion/tests/parquet_pruning.rs | 4 +- datafusion/tests/sql/explain_analyze.rs | 20 +++-- datafusion/tests/sql/select.rs | 4 +- datafusion/tests/user_defined_plan.rs | 4 +- 12 files changed, 159 insertions(+), 72 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 493fb97b82b1..3ea0d1977ffa 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -1011,6 +1011,7 @@ enum TimeUnit{ enum IntervalUnit{ YearMonth = 0; DayTime = 1; + MonthDayNano = 2; } message Decimal{ @@ -1036,11 +1037,18 @@ message Struct{ repeated Field sub_field_types = 1; } +enum UnionMode{ + sparse = 0; + dense = 1; +} + message Union{ repeated Field union_types = 1; + UnionMode union_mode = 2; } + message ScalarListValue{ ScalarType datatype = 1; repeated ScalarValue values = 2; diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index a0f481a80325..94d4e8ec5c61 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -24,6 +24,7 @@ mod roundtrip_tests { use super::super::{super::error::Result, protobuf}; use crate::error::BallistaError; use core::panic; + use datafusion::arrow::datatypes::UnionMode; use datafusion::logical_plan::Repartition; use datafusion::{ arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit}, @@ -413,25 +414,31 @@ mod roundtrip_tests { true, ), ]), - DataType::Union(vec![ - Field::new("nullable", DataType::Boolean, false), - Field::new("name", DataType::Utf8, false), - Field::new("datatype", DataType::Binary, false), - ]), - DataType::Union(vec![ - Field::new("nullable", DataType::Boolean, false), - Field::new("name", DataType::Utf8, false), - Field::new("datatype", DataType::Binary, false), - Field::new( - "nested_struct", - DataType::Struct(vec![ - Field::new("nullable", DataType::Boolean, false), - Field::new("name", DataType::Utf8, false), - Field::new("datatype", DataType::Binary, false), - ]), - true, - ), - ]), + DataType::Union( + vec![ + Field::new("nullable", DataType::Boolean, false), + Field::new("name", DataType::Utf8, false), + Field::new("datatype", DataType::Binary, false), + ], + UnionMode::Dense, + ), + DataType::Union( + vec![ + Field::new("nullable", DataType::Boolean, false), + Field::new("name", DataType::Utf8, false), + Field::new("datatype", DataType::Binary, false), + Field::new( + "nested_struct", + DataType::Struct(vec![ + Field::new("nullable", DataType::Boolean, false), + Field::new("name", DataType::Utf8, false), + Field::new("datatype", DataType::Binary, false), + ]), + true, + ), + ], + UnionMode::Sparse, + ), DataType::Dictionary( Box::new(DataType::Utf8), Box::new(DataType::Struct(vec![ @@ -558,25 +565,31 @@ mod roundtrip_tests { true, ), ]), - DataType::Union(vec![ - Field::new("nullable", DataType::Boolean, false), - Field::new("name", DataType::Utf8, false), - Field::new("datatype", DataType::Binary, false), - ]), - DataType::Union(vec![ - Field::new("nullable", DataType::Boolean, false), - Field::new("name", DataType::Utf8, false), - Field::new("datatype", DataType::Binary, false), - Field::new( - "nested_struct", - DataType::Struct(vec![ - Field::new("nullable", DataType::Boolean, false), - Field::new("name", DataType::Utf8, false), - Field::new("datatype", DataType::Binary, false), - ]), - true, - ), - ]), + DataType::Union( + vec![ + Field::new("nullable", DataType::Boolean, false), + Field::new("name", DataType::Utf8, false), + Field::new("datatype", DataType::Binary, false), + ], + UnionMode::Sparse, + ), + DataType::Union( + vec![ + Field::new("nullable", DataType::Boolean, false), + Field::new("name", DataType::Utf8, false), + Field::new("datatype", DataType::Binary, false), + Field::new( + "nested_struct", + DataType::Struct(vec![ + Field::new("nullable", DataType::Boolean, false), + Field::new("name", DataType::Utf8, false), + Field::new("datatype", DataType::Binary, false), + ]), + true, + ), + ], + UnionMode::Dense, + ), DataType::Dictionary( Box::new(DataType::Utf8), Box::new(DataType::Struct(vec![ diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 47b5df47cd73..6726810ecb2e 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -22,7 +22,7 @@ use super::super::proto_error; use crate::serde::{byte_to_string, protobuf, BallistaError}; use datafusion::arrow::datatypes::{ - DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, + DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode, }; use datafusion::datasource::file_format::avro::AvroFormat; use datafusion::datasource::file_format::csv::CsvFormat; @@ -60,6 +60,7 @@ impl protobuf::IntervalUnit { match interval_unit { IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth, IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime, + IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano, } } @@ -71,6 +72,7 @@ impl protobuf::IntervalUnit { Some(interval_unit) => Ok(match interval_unit { protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth, protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime, + protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano, }), None => Err(proto_error( "Error converting i32 to DateUnit: Passed invalid variant", @@ -239,12 +241,22 @@ impl TryInto for &protobuf::ArrowType { DataType::Struct(fields) } protobuf::arrow_type::ArrowTypeEnum::Union(union) => { + let union_mode = protobuf::UnionMode::from_i32(union.union_mode) + .ok_or_else(|| { + proto_error( + "Protobuf deserialization error: Unknown union mode type", + ) + })?; + let union_mode = match union_mode { + protobuf::UnionMode::Dense => UnionMode::Dense, + protobuf::UnionMode::Sparse => UnionMode::Sparse, + }; let union_types = union .union_types .iter() .map(|field| field.try_into()) .collect::, _>>()?; - DataType::Union(union_types) + DataType::Union(union_types, union_mode) } protobuf::arrow_type::ArrowTypeEnum::Dictionary(boxed_dict) => { let dict_ref = boxed_dict.as_ref(); @@ -346,12 +358,19 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum { .map(|field| field.into()) .collect::>(), }), - DataType::Union(union_types) => ArrowTypeEnum::Union(protobuf::Union { - union_types: union_types - .iter() - .map(|field| field.into()) - .collect::>(), - }), + DataType::Union(union_types, union_mode) => { + let union_mode = match union_mode { + UnionMode::Sparse => protobuf::UnionMode::Sparse, + UnionMode::Dense => protobuf::UnionMode::Dense, + }; + ArrowTypeEnum::Union(protobuf::Union { + union_types: union_types + .iter() + .map(|field| field.into()) + .collect::>(), + union_mode: union_mode.into(), + }) + } DataType::Dictionary(key_type, value_type) => { ArrowTypeEnum::Dictionary(Box::new(protobuf::Dictionary { key: Some(Box::new(key_type.as_ref().into())), @@ -495,7 +514,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype { | DataType::FixedSizeList(_, _) | DataType::LargeList(_) | DataType::Struct(_) - | DataType::Union(_) + | DataType::Union(_, _) | DataType::Dictionary(_, _) | DataType::Map(_, _) | DataType::Decimal(_, _) => { diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs index f5442c40e660..8c7c2f7c28e2 100644 --- a/ballista/rust/core/src/serde/mod.rs +++ b/ballista/rust/core/src/serde/mod.rs @@ -20,6 +20,7 @@ use std::{convert::TryInto, io::Cursor}; +use datafusion::arrow::datatypes::UnionMode; use datafusion::logical_plan::{JoinConstraint, JoinType, Operator}; use datafusion::physical_plan::aggregates::AggregateFunction; use datafusion::physical_plan::window_functions::BuiltInWindowFunction; @@ -242,13 +243,24 @@ impl TryInto .map(|field| field.try_into()) .collect::, _>>()?, ), - arrow_type::ArrowTypeEnum::Union(union) => DataType::Union( - union + arrow_type::ArrowTypeEnum::Union(union) => { + let union_mode = protobuf::UnionMode::from_i32(union.union_mode) + .ok_or_else(|| { + proto_error( + "Protobuf deserialization error: Unknown union mode type", + ) + })?; + let union_mode = match union_mode { + protobuf::UnionMode::Dense => UnionMode::Dense, + protobuf::UnionMode::Sparse => UnionMode::Sparse, + }; + let union_types = union .union_types .iter() .map(|field| field.try_into()) - .collect::, _>>()?, - ), + .collect::, _>>()?; + DataType::Union(union_types, union_mode) + } arrow_type::ArrowTypeEnum::Dictionary(dict) => { let pb_key_datatype = dict .as_ref() diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 394bd1e3a29b..35b202353f6f 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -31,5 +31,5 @@ clap = "2.33" rustyline = "9.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } datafusion = { path = "../datafusion", version = "6.0.0" } -arrow = { version = "6.4.0" } +arrow = { version = "7.0.0-SNAPSHOT" } ballista = { path = "../ballista/rust/client", version = "0.6.0" } diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index efea300bc8ee..f250baa1b36c 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -116,6 +116,7 @@ impl ExecutionPlan for CsvExec { let fun = move |file, remaining: &Option| { let bounds = remaining.map(|x| (0, x + start_line)); + let datetime_format = None; Box::new(csv::Reader::new( file, Arc::clone(&file_schema), @@ -124,6 +125,7 @@ impl ExecutionPlan for CsvExec { batch_size, bounds, file_projection.clone(), + datetime_format, )) as BatchIter }; diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index c90c6531b59b..632658058e3b 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -975,8 +975,12 @@ mod tests { let basic = basic_sort(csv.clone(), sort.clone()).await; let partition = partition_sort(csv, sort).await; - let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap(); - let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap(); + let basic = arrow::util::pretty::pretty_format_batches(&[basic]) + .unwrap() + .to_string(); + let partition = arrow::util::pretty::pretty_format_batches(&[partition]) + .unwrap() + .to_string(); assert_eq!( basic, partition, @@ -1072,8 +1076,12 @@ mod tests { assert_eq!(basic.num_rows(), 300); assert_eq!(partition.num_rows(), 300); - let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap(); - let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap(); + let basic = arrow::util::pretty::pretty_format_batches(&[basic]) + .unwrap() + .to_string(); + let partition = arrow::util::pretty::pretty_format_batches(&[partition]) + .unwrap() + .to_string(); assert_eq!(basic, partition); } @@ -1106,9 +1114,12 @@ mod tests { assert_eq!(basic.num_rows(), 300); assert_eq!(merged.iter().map(|x| x.num_rows()).sum::(), 300); - let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap(); - let partition = - arrow::util::pretty::pretty_format_batches(merged.as_slice()).unwrap(); + let basic = arrow::util::pretty::pretty_format_batches(&[basic]) + .unwrap() + .to_string(); + let partition = arrow::util::pretty::pretty_format_batches(merged.as_slice()) + .unwrap() + .to_string(); assert_eq!(basic, partition); } @@ -1245,8 +1256,12 @@ mod tests { let merged = merged.remove(0); let basic = basic_sort(batches, sort.clone()).await; - let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap(); - let partition = arrow::util::pretty::pretty_format_batches(&[merged]).unwrap(); + let basic = arrow::util::pretty::pretty_format_batches(&[basic]) + .unwrap() + .to_string(); + let partition = arrow::util::pretty::pretty_format_batches(&[merged]) + .unwrap() + .to_string(); assert_eq!( basic, partition, diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs index f1fb4dba015f..af6650361123 100644 --- a/datafusion/src/test_util.rs +++ b/datafusion/src/test_util.rs @@ -38,7 +38,9 @@ macro_rules! assert_batches_eq { let expected_lines: Vec = $EXPECTED_LINES.iter().map(|&s| s.into()).collect(); - let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap(); + let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS) + .unwrap() + .to_string(); let actual_lines: Vec<&str> = formatted.trim().lines().collect(); @@ -72,7 +74,9 @@ macro_rules! assert_batches_sorted_eq { expected_lines.as_mut_slice()[2..num_lines - 1].sort_unstable() } - let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap(); + let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS) + .unwrap() + .to_string(); // fix for windows: \r\n --> let mut actual_lines: Vec<&str> = formatted.trim().lines().collect(); diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs index 194563a240eb..ee27a33f86f2 100644 --- a/datafusion/tests/parquet_pruning.rs +++ b/datafusion/tests/parquet_pruning.rs @@ -528,7 +528,7 @@ impl ContextWithParquet { .collect() .await .expect("getting input"); - let pretty_input = pretty_format_batches(&input).unwrap(); + let pretty_input = pretty_format_batches(&input).unwrap().to_string(); let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan"); let physical_plan = self @@ -564,7 +564,7 @@ impl ContextWithParquet { let result_rows = results.iter().map(|b| b.num_rows()).sum(); - let pretty_results = pretty_format_batches(&results).unwrap(); + let pretty_results = pretty_format_batches(&results).unwrap().to_string(); let sql = sql.into(); TestOutput { diff --git a/datafusion/tests/sql/explain_analyze.rs b/datafusion/tests/sql/explain_analyze.rs index 47e729038c3b..a9cef73521eb 100644 --- a/datafusion/tests/sql/explain_analyze.rs +++ b/datafusion/tests/sql/explain_analyze.rs @@ -42,7 +42,9 @@ async fn explain_analyze_baseline_metrics() { let plan = ctx.optimize(&plan).unwrap(); let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); let results = collect(physical_plan.clone()).await.unwrap(); - let formatted = arrow::util::pretty::pretty_format_batches(&results).unwrap(); + let formatted = arrow::util::pretty::pretty_format_batches(&results) + .unwrap() + .to_string(); println!("Query Output:\n\n{}", formatted); assert_metrics!( @@ -548,13 +550,17 @@ async fn explain_analyze_runs_optimizers() { let sql = "EXPLAIN SELECT count(*) from alltypes_plain"; let actual = execute_to_batches(&mut ctx, sql).await; - let actual = arrow::util::pretty::pretty_format_batches(&actual).unwrap(); + let actual = arrow::util::pretty::pretty_format_batches(&actual) + .unwrap() + .to_string(); assert_contains!(actual, expected); // EXPLAIN ANALYZE should work the same let sql = "EXPLAIN ANALYZE SELECT count(*) from alltypes_plain"; let actual = execute_to_batches(&mut ctx, sql).await; - let actual = arrow::util::pretty::pretty_format_batches(&actual).unwrap(); + let actual = arrow::util::pretty::pretty_format_batches(&actual) + .unwrap() + .to_string(); assert_contains!(actual, expected); } @@ -760,7 +766,9 @@ async fn csv_explain_analyze() { register_aggregate_csv_by_sql(&mut ctx).await; let sql = "EXPLAIN ANALYZE SELECT count(*), c1 FROM aggregate_test_100 group by c1"; let actual = execute_to_batches(&mut ctx, sql).await; - let formatted = arrow::util::pretty::pretty_format_batches(&actual).unwrap(); + let formatted = arrow::util::pretty::pretty_format_batches(&actual) + .unwrap() + .to_string(); // Only test basic plumbing and try to avoid having to change too // many things. explain_analyze_baseline_metrics covers the values @@ -780,7 +788,9 @@ async fn csv_explain_analyze_verbose() { let sql = "EXPLAIN ANALYZE VERBOSE SELECT count(*), c1 FROM aggregate_test_100 group by c1"; let actual = execute_to_batches(&mut ctx, sql).await; - let formatted = arrow::util::pretty::pretty_format_batches(&actual).unwrap(); + let formatted = arrow::util::pretty::pretty_format_batches(&actual) + .unwrap() + .to_string(); let verbose_needle = "Output Rows"; assert_contains!(formatted, verbose_needle); diff --git a/datafusion/tests/sql/select.rs b/datafusion/tests/sql/select.rs index 8d0d12f18d1e..cfe0faccf20c 100644 --- a/datafusion/tests/sql/select.rs +++ b/datafusion/tests/sql/select.rs @@ -495,7 +495,9 @@ async fn use_between_expression_in_select_query() -> Result<()> { let sql = "EXPLAIN SELECT c1 BETWEEN 2 AND 3 FROM test"; let actual = execute_to_batches(&mut ctx, sql).await; - let formatted = arrow::util::pretty::pretty_format_batches(&actual).unwrap(); + let formatted = arrow::util::pretty::pretty_format_batches(&actual) + .unwrap() + .to_string(); // Only test that the projection exprs arecorrect, rather than entire output let needle = "ProjectionExec: expr=[c1@0 >= 2 AND c1@0 <= 3 as test.c1 BETWEEN Int64(2) AND Int64(3)]"; diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index d3c6083adefb..b603f6a87701 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -94,7 +94,9 @@ use datafusion::logical_plan::{DFSchemaRef, Limit}; async fn exec_sql(ctx: &mut ExecutionContext, sql: &str) -> Result { let df = ctx.sql(sql).await?; let batches = df.collect().await?; - pretty_format_batches(&batches).map_err(DataFusionError::ArrowError) + pretty_format_batches(&batches) + .map_err(DataFusionError::ArrowError) + .map(|d| d.to_string()) } /// Create a test table. From e7003cd66c9b394b450fe6d306dd2f824d5484fd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 5 Jan 2022 16:39:03 -0500 Subject: [PATCH 4/6] fix doc tests --- datafusion/src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs index df9efafaeb38..aa178618319f 100644 --- a/datafusion/src/lib.rs +++ b/datafusion/src/lib.rs @@ -57,7 +57,8 @@ //! let results: Vec = df.collect().await?; //! //! // format the results -//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?; +//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)? +//! .to_string(); //! //! let expected = vec![ //! "+---+--------------------------+", @@ -92,7 +93,8 @@ //! let results: Vec = df.collect().await?; //! //! // format the results -//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?; +//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)? +//! .to_string(); //! //! let expected = vec![ //! "+---+----------------+", From 1ddfb0fb6bf5fb0ef1b61181d5750f0649e47d68 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 6 Jan 2022 07:09:00 -0500 Subject: [PATCH 5/6] Update avro support --- datafusion/src/avro_to_arrow/schema.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/src/avro_to_arrow/schema.rs b/datafusion/src/avro_to_arrow/schema.rs index c6eda8017012..2e9a17de38db 100644 --- a/datafusion/src/avro_to_arrow/schema.rs +++ b/datafusion/src/avro_to_arrow/schema.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit}; +use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit, UnionMode}; use crate::error::{DataFusionError, Result}; use arrow::datatypes::Field; use avro_rs::schema::Name; @@ -103,7 +103,7 @@ fn schema_to_field_with_props( .iter() .map(|s| schema_to_field_with_props(s, None, has_nullable, None)) .collect::>>()?; - DataType::Union(fields) + DataType::Union(fields, UnionMode::Dense) } } AvroSchema::Record { name, fields, .. } => { @@ -201,6 +201,7 @@ fn default_field_name(dt: &DataType) -> &str { DataType::Interval(unit) => match unit { IntervalUnit::YearMonth => "intervalyear", IntervalUnit::DayTime => "intervalmonth", + IntervalUnit::MonthDayNano => "intervalmonthdaynano", }, DataType::Binary => "varbinary", DataType::FixedSizeBinary(_) => "fixedsizebinary", @@ -211,7 +212,7 @@ fn default_field_name(dt: &DataType) -> &str { DataType::FixedSizeList(_, _) => "fixed_size_list", DataType::LargeList(_) => "largelist", DataType::Struct(_) => "struct", - DataType::Union(_) => "union", + DataType::Union(_, _) => "union", DataType::Dictionary(_, _) => "map", DataType::Map(_, _) => unimplemented!("Map support not implemented"), DataType::Decimal(_, _) => "decimal", From 4078a8fda04b2c154b5bd342cf282b7ac4bd7545 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 12 Jan 2022 07:14:14 -0500 Subject: [PATCH 6/6] Use released arrow 7.0.0 --- Cargo.toml | 7 ------- ballista/rust/core/Cargo.toml | 2 +- ballista/rust/executor/Cargo.toml | 4 ++-- datafusion-cli/Cargo.toml | 2 +- datafusion-examples/Cargo.toml | 2 +- datafusion/Cargo.toml | 4 ++-- 6 files changed, 7 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 39e0c852dacd..c722851e72de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,10 +33,3 @@ exclude = ["python"] [profile.release] lto = true codegen-units = 1 - - -# TEMP: patch crates to test arrow -[patch.crates-io] -arrow = { git = "https://github.com/apache/arrow-rs.git", branch = "master" } -arrow-flight = { git = "https://github.com/apache/arrow-rs.git", branch = "master" } -parquet = { git = "https://github.com/apache/arrow-rs.git", branch = "master" } diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index a1a5ff0b1cc8..bbf8e274c5cd 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -43,7 +43,7 @@ tonic = "0.6" uuid = { version = "0.8", features = ["v4"] } chrono = { version = "0.4", default-features = false } -arrow-flight = { version = "7.0.0-SNAPSHOT" } +arrow-flight = { version = "7.0.0" } datafusion = { path = "../../../datafusion", version = "6.0.0" } diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index d7fe13d15cbe..c01bb20681db 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -29,8 +29,8 @@ edition = "2018" snmalloc = ["snmalloc-rs"] [dependencies] -arrow = { version = "7.0.0-SNAPSHOT" } -arrow-flight = { version = "7.0.0-SNAPSHOT" } +arrow = { version = "7.0.0" } +arrow-flight = { version = "7.0.0" } anyhow = "1" async-trait = "0.1.36" ballista-core = { path = "../core", version = "0.6.0" } diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 35b202353f6f..d5347d8e0009 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -31,5 +31,5 @@ clap = "2.33" rustyline = "9.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } datafusion = { path = "../datafusion", version = "6.0.0" } -arrow = { version = "7.0.0-SNAPSHOT" } +arrow = { version = "7.0.0" } ballista = { path = "../ballista/rust/client", version = "0.6.0" } diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index dd5de8d320b6..24d453e0b1d4 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -34,7 +34,7 @@ path = "examples/avro_sql.rs" required-features = ["datafusion/avro"] [dev-dependencies] -arrow-flight = { version = "7.0.0-SNAPSHOT" } +arrow-flight = { version = "7.0.0" } datafusion = { path = "../datafusion" } prost = "0.9" tonic = "0.6" diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 2b14c53a80cb..46e2cbec56e2 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -52,8 +52,8 @@ avro = ["avro-rs", "num-traits"] [dependencies] ahash = { version = "0.7", default-features = false } hashbrown = { version = "0.11", features = ["raw"] } -arrow = { version = "7.0.0-SNAPSHOT", features = ["prettyprint"] } -parquet = { version = "7.0.0-SNAPSHOT", features = ["arrow"] } +arrow = { version = "7.0.0", features = ["prettyprint"] } +parquet = { version = "7.0.0", features = ["arrow"] } sqlparser = "0.13" paste = "^1.0" num_cpus = "1.13.0"