From 30dd9c9f892797b9002b8e140baa6af4879ba8c2 Mon Sep 17 00:00:00 2001 From: DreaMer963 Date: Fri, 3 Dec 2021 11:34:32 +0800 Subject: [PATCH 1/5] Update rust version to 1.57 --- ballista-examples/Cargo.toml | 2 +- ballista/rust/client/Cargo.toml | 2 +- ballista/rust/scheduler/src/state/mod.rs | 2 +- benchmarks/Cargo.toml | 2 +- datafusion-cli/Cargo.toml | 2 +- datafusion-cli/Dockerfile | 2 +- datafusion-examples/Cargo.toml | 2 +- datafusion/Cargo.toml | 2 +- datafusion/src/physical_plan/datetime_expressions.rs | 6 +++++- datafusion/src/scalar.rs | 4 ++-- datafusion/src/sql/parser.rs | 12 ++++++++---- dev/docker/ballista-base.dockerfile | 2 +- python/Cargo.toml | 2 +- 13 files changed, 25 insertions(+), 17 deletions(-) diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml index 65cdec400e1d..a2d2fd65656d 100644 --- a/ballista-examples/Cargo.toml +++ b/ballista-examples/Cargo.toml @@ -26,7 +26,7 @@ license = "Apache-2.0" keywords = [ "arrow", "distributed", "query", "sql" ] edition = "2021" publish = false -rust-version = "1.56" +rust-version = "1.57" [dependencies] datafusion = { path = "../datafusion" } diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml index f444689021d2..7736e949d29f 100644 --- a/ballista/rust/client/Cargo.toml +++ b/ballista/rust/client/Cargo.toml @@ -24,7 +24,7 @@ homepage = "https://github.com/apache/arrow-datafusion" repository = "https://github.com/apache/arrow-datafusion" authors = ["Apache Arrow "] edition = "2021" -rust-version = "1.56" +rust-version = "1.57" [dependencies] ballista-core = { path = "../core", version = "0.6.0" } diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index 64f5953de8c0..ef6de8312702 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -567,7 +567,7 @@ fn find_unresolved_shuffles( Ok(plan .children() .iter() - .map(|child| find_unresolved_shuffles(child)) 
+ .map(find_unresolved_shuffles) .collect::>>()? .into_iter() .flatten() diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 4fc41b33a7ec..c042778265ab 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -25,7 +25,7 @@ homepage = "https://github.com/apache/arrow-datafusion" repository = "https://github.com/apache/arrow-datafusion" license = "Apache-2.0" publish = false -rust-version = "1.56" +rust-version = "1.57" [features] simd = ["datafusion/simd"] diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 0434f090da01..27227507db4f 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -24,7 +24,7 @@ keywords = [ "arrow", "datafusion", "ballista", "query", "sql" ] license = "Apache-2.0" homepage = "https://github.com/apache/arrow-datafusion" repository = "https://github.com/apache/arrow-datafusion" -rust-version = "1.56" +rust-version = "1.57" [dependencies] clap = "2.33" diff --git a/datafusion-cli/Dockerfile b/datafusion-cli/Dockerfile index fe177b6dcb99..fed14188fded 100644 --- a/datafusion-cli/Dockerfile +++ b/datafusion-cli/Dockerfile @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. 
-FROM rust:1.56 as builder +FROM rust:1.57 as builder COPY ./datafusion /usr/src/datafusion diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 3e8a6ec77f6d..d81100f70ac7 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -26,7 +26,7 @@ license = "Apache-2.0" keywords = [ "arrow", "query", "sql" ] edition = "2021" publish = false -rust-version = "1.56" +rust-version = "1.57" [[example]] name = "avro_sql" diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index fbe84e3ed0a0..73ed9c08ae59 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -31,7 +31,7 @@ include = [ "Cargo.toml", ] edition = "2021" -rust-version = "1.56" +rust-version = "1.57" [lib] name = "datafusion" diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs index a776c42f3e9d..d10312798d3f 100644 --- a/datafusion/src/physical_plan/datetime_expressions.rs +++ b/datafusion/src/physical_plan/datetime_expressions.rs @@ -42,6 +42,7 @@ use arrow::{ }; use chrono::prelude::*; use chrono::Duration; +use std::borrow::Borrow; /// given a function `op` that maps a `&str` to a Result of an arrow native type, /// returns a `PrimitiveArray` after the application @@ -77,7 +78,10 @@ where })?; // first map is the iterator, second is for the `Option<_>` - array.iter().map(|x| x.map(|x| op(x)).transpose()).collect() + array + .iter() + .map(|x| x.map(op.borrow()).transpose()) + .collect() } // given an function that maps a `&str` to a arrow native type, diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index 4a5a2c34a393..c06ccb1c81f3 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -68,7 +68,7 @@ pub enum ScalarValue { /// large binary LargeBinary(Option>), /// list of nested ScalarValue (boxed to reduce size_of(ScalarValue)) - #[allow(clippy::box_vec)] + #[allow(clippy::box_collection)] List(Option>>, Box), /// Date stored as a 
signed 32bit int Date32(Option), @@ -87,7 +87,7 @@ pub enum ScalarValue { /// Interval with DayTime unit IntervalDayTime(Option), /// struct of nested ScalarValue (boxed to reduce size_of(ScalarValue)) - #[allow(clippy::box_vec)] + #[allow(clippy::box_collection)] Struct(Option>>, Box>), } diff --git a/datafusion/src/sql/parser.rs b/datafusion/src/sql/parser.rs index 49ae86906c14..335257764442 100644 --- a/datafusion/src/sql/parser.rs +++ b/datafusion/src/sql/parser.rs @@ -85,7 +85,7 @@ pub struct CreateExternalTable { #[derive(Debug, Clone, PartialEq)] pub enum Statement { /// ANSI SQL AST node - Statement(SQLStatement), + Statement(Box), /// Extension: `CREATE EXTERNAL TABLE` CreateExternalTable(CreateExternalTable), } @@ -167,13 +167,17 @@ impl<'a> DFParser<'a> { } _ => { // use the native parser - Ok(Statement::Statement(self.parser.parse_statement()?)) + Ok(Statement::Statement(Box::from( + self.parser.parse_statement()?, + ))) } } } _ => { // use the native parser - Ok(Statement::Statement(self.parser.parse_statement()?)) + Ok(Statement::Statement(Box::from( + self.parser.parse_statement()?, + ))) } } } @@ -183,7 +187,7 @@ impl<'a> DFParser<'a> { if self.parser.parse_keyword(Keyword::EXTERNAL) { self.parse_create_external_table() } else { - Ok(Statement::Statement(self.parser.parse_create()?)) + Ok(Statement::Statement(Box::from(self.parser.parse_create()?))) } } diff --git a/dev/docker/ballista-base.dockerfile b/dev/docker/ballista-base.dockerfile index df4f32c0ce99..cf845e076016 100644 --- a/dev/docker/ballista-base.dockerfile +++ b/dev/docker/ballista-base.dockerfile @@ -23,7 +23,7 @@ # Base image extends debian:buster-slim -FROM rust:1.56.0-buster AS builder +FROM rust:1.57.0-buster AS builder RUN apt update && apt -y install musl musl-dev musl-tools libssl-dev openssl diff --git a/python/Cargo.toml b/python/Cargo.toml index 568f3c7b35d3..974a6140644e 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -25,7 +25,7 @@ description = "Build and run 
queries against data" readme = "README.md" license = "Apache-2.0" edition = "2021" -rust-version = "1.56" +rust-version = "1.57" [dependencies] tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } From f17bb24420bf439a4614d2aa0eaee39f0d3ff0f4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 3 Dec 2021 08:50:11 -0500 Subject: [PATCH 2/5] Clean ups related to clippy --- datafusion/src/physical_plan/expressions/average.rs | 3 +-- datafusion/src/physical_plan/sort_preserving_merge.rs | 6 +++--- datafusion/src/physical_plan/string_expressions.rs | 5 +---- datafusion/src/test/exec.rs | 4 ++-- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/average.rs b/datafusion/src/physical_plan/expressions/average.rs index 2e218191f668..4ac477220869 100644 --- a/datafusion/src/physical_plan/expressions/average.rs +++ b/datafusion/src/physical_plan/expressions/average.rs @@ -37,8 +37,8 @@ use super::{format_state_name, sum}; #[derive(Debug)] pub struct Avg { name: String, + #[allow(dead_code)] data_type: DataType, - nullable: bool, expr: Arc, } @@ -73,7 +73,6 @@ impl Avg { name: name.into(), expr, data_type, - nullable: true, } } } diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index 3f4827ba6a0a..c90c6531b59b 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -346,7 +346,7 @@ struct SortPreservingMergeStream { receivers: Vec>>, /// Drop helper for tasks feeding the [`receivers`](Self::receivers) - drop_helper: AbortOnDropMany<()>, + _drop_helper: AbortOnDropMany<()>, /// For each input stream maintain a dequeue of SortKeyCursor /// @@ -379,7 +379,7 @@ struct SortPreservingMergeStream { impl SortPreservingMergeStream { fn new( receivers: Vec>>, - drop_helper: AbortOnDropMany<()>, + _drop_helper: AbortOnDropMany<()>, schema: SchemaRef, 
expressions: &[PhysicalSortExpr], target_batch_size: usize, @@ -394,7 +394,7 @@ impl SortPreservingMergeStream { schema, cursors, receivers, - drop_helper, + _drop_helper, column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(), sort_options: expressions.iter().map(|x| x.options).collect(), target_batch_size, diff --git a/datafusion/src/physical_plan/string_expressions.rs b/datafusion/src/physical_plan/string_expressions.rs index e6d234fbc2fb..a9e4c2fc54b1 100644 --- a/datafusion/src/physical_plan/string_expressions.rs +++ b/datafusion/src/physical_plan/string_expressions.rs @@ -117,10 +117,7 @@ where let string_array = downcast_string_arg!(args[0], "string", T); // first map is the iterator, second is for the `Option<_>` - Ok(string_array - .iter() - .map(|string| string.map(|s| op(s))) - .collect()) + Ok(string_array.iter().map(|string| string.map(&op)).collect()) } fn handle<'a, F, R>(args: &'a [ColumnarValue], op: F, name: &str) -> Result diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs index fd10b9c3fc90..4a9534feae00 100644 --- a/datafusion/src/test/exec.rs +++ b/datafusion/src/test/exec.rs @@ -549,7 +549,7 @@ impl ExecutionPlan for BlockingExec { async fn execute(&self, _partition: usize) -> Result { Ok(Box::pin(BlockingStream { schema: Arc::clone(&self.schema), - refs: Arc::clone(&self.refs), + _refs: Arc::clone(&self.refs), })) } @@ -577,7 +577,7 @@ pub struct BlockingStream { schema: SchemaRef, /// Ref-counting helper to check if the stream are still in memory. 
- refs: Arc<()>, + _refs: Arc<()>, } impl Stream for BlockingStream { From bfc372c8dc428b289451c528b1311c6f15f5aeaa Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 3 Dec 2021 08:56:26 -0500 Subject: [PATCH 3/5] remove more unused fields --- ballista/rust/core/src/config.rs | 12 ++++++------ ballista/rust/core/src/serde/scheduler/mod.rs | 12 +----------- datafusion/src/physical_plan/windows/built_in.rs | 10 +--------- datafusion/src/physical_plan/windows/mod.rs | 2 -- 4 files changed, 8 insertions(+), 28 deletions(-) diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs index dcc0bdb06cde..5d7b3c5cacb0 100644 --- a/ballista/rust/core/src/config.rs +++ b/ballista/rust/core/src/config.rs @@ -31,22 +31,22 @@ pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitio #[derive(Debug, Clone)] pub struct ConfigEntry { name: String, - description: String, - data_type: DataType, + _description: String, + _data_type: DataType, default_value: Option, } impl ConfigEntry { fn new( name: String, - description: String, - data_type: DataType, + _description: String, + _data_type: DataType, default_value: Option, ) -> Self { Self { name, - description, - data_type, + _description, + _data_type, default_value, } } diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index a20d955f28b2..8c13c3210eef 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -101,23 +101,13 @@ impl From for ExecutorMeta { } /// Summary of executed partition -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Default)] pub struct PartitionStats { pub(crate) num_rows: Option, pub(crate) num_batches: Option, pub(crate) num_bytes: Option, } -impl Default for PartitionStats { - fn default() -> Self { - Self { - num_rows: None, - num_batches: None, - num_bytes: None, - } - } -} - impl fmt::Display for PartitionStats { fn fmt(&self, f: 
&mut fmt::Formatter) -> fmt::Result { write!( diff --git a/datafusion/src/physical_plan/windows/built_in.rs b/datafusion/src/physical_plan/windows/built_in.rs index 82040de6ef5c..de627cbcd27c 100644 --- a/datafusion/src/physical_plan/windows/built_in.rs +++ b/datafusion/src/physical_plan/windows/built_in.rs @@ -18,10 +18,8 @@ //! Physical exec for built-in window function expressions. use crate::error::{DataFusionError, Result}; -use crate::logical_plan::window_frames::WindowFrame; use crate::physical_plan::{ - expressions::PhysicalSortExpr, - window_functions::{BuiltInWindowFunction, BuiltInWindowFunctionExpr}, + expressions::PhysicalSortExpr, window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowExpr, }; use arrow::compute::concat; @@ -33,28 +31,22 @@ use std::sync::Arc; /// A window expr that takes the form of a built in window function #[derive(Debug)] pub struct BuiltInWindowExpr { - fun: BuiltInWindowFunction, expr: Arc, partition_by: Vec>, order_by: Vec, - window_frame: Option, } impl BuiltInWindowExpr { /// create a new built-in window function expression pub(super) fn new( - fun: BuiltInWindowFunction, expr: Arc, partition_by: &[Arc], order_by: &[PhysicalSortExpr], - window_frame: Option, ) -> Self { Self { - fun, expr, partition_by: partition_by.to_vec(), order_by: order_by.to_vec(), - window_frame, } } } diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs index 8b182f9a6138..497cbc3c446d 100644 --- a/datafusion/src/physical_plan/windows/mod.rs +++ b/datafusion/src/physical_plan/windows/mod.rs @@ -64,11 +64,9 @@ pub fn create_window_expr( window_frame, )), WindowFunction::BuiltInWindowFunction(fun) => Arc::new(BuiltInWindowExpr::new( - fun.clone(), create_built_in_window_expr(fun, args, input_schema, name)?, partition_by, order_by, - window_frame, )), }) } From 31e6db8be62df42fb5888cf00e960bdf61ba0b5a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 3 Dec 2021 08:57:32 -0500 Subject: 
[PATCH 4/5] Remove some more unused code --- ballista/rust/executor/src/flight_service.rs | 6 +++--- benchmarks/src/bin/tpch.rs | 14 ++++++-------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs index 27b1a33b7c87..cf5ab179813b 100644 --- a/ballista/rust/executor/src/flight_service.rs +++ b/ballista/rust/executor/src/flight_service.rs @@ -54,12 +54,12 @@ type FlightDataReceiver = Receiver>; #[derive(Clone)] pub struct BallistaFlightService { /// Executor - executor: Arc, + _executor: Arc, } impl BallistaFlightService { - pub fn new(executor: Arc) -> Self { - Self { executor } + pub fn new(_executor: Arc) -> Self { + Self { _executor } } } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 7bc6510ac2ed..2e074d2b5ce7 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -75,10 +75,9 @@ struct BallistaBenchmarkOpt { #[structopt(short = "i", long = "iterations", default_value = "3")] iterations: usize, - /// Batch size when reading CSV or Parquet files - #[structopt(short = "s", long = "batch-size", default_value = "8192")] - batch_size: usize, - + // /// Batch size when reading CSV or Parquet files + // #[structopt(short = "s", long = "batch-size", default_value = "8192")] + // batch_size: usize, /// Path to data files #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] path: PathBuf, @@ -87,10 +86,9 @@ struct BallistaBenchmarkOpt { #[structopt(short = "f", long = "format", default_value = "csv")] file_format: String, - /// Load the data into a MemTable before executing the query - #[structopt(short = "m", long = "mem-table")] - mem_table: bool, - + // /// Load the data into a MemTable before executing the query + // #[structopt(short = "m", long = "mem-table")] + // mem_table: bool, /// Number of partitions to process in parallel #[structopt(short = "p", long = "partitions", default_value = 
"2")] partitions: usize, From 4c8b3373e72510c6ace61117540fd29c4cc5b5cb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 3 Dec 2021 10:11:56 -0500 Subject: [PATCH 5/5] remove data_type from Average, but keep in constructor --- datafusion/src/physical_plan/expressions/average.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/average.rs b/datafusion/src/physical_plan/expressions/average.rs index 4ac477220869..17d3041453d0 100644 --- a/datafusion/src/physical_plan/expressions/average.rs +++ b/datafusion/src/physical_plan/expressions/average.rs @@ -37,8 +37,6 @@ use super::{format_state_name, sum}; #[derive(Debug)] pub struct Avg { name: String, - #[allow(dead_code)] - data_type: DataType, expr: Arc, } @@ -69,10 +67,14 @@ impl Avg { name: impl Into, data_type: DataType, ) -> Self { + // Average is always Float64, but Avg::new() has a data_type + // parameter to keep a consistent signature with the other + // Aggregate expressions. + assert_eq!(data_type, DataType::Float64); + Self { name: name.into(), expr, } } }