Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update rust vesion to 1.57 #1395

Merged
merged 5 commits into from
Dec 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballista-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ license = "Apache-2.0"
keywords = [ "arrow", "distributed", "query", "sql" ]
edition = "2021"
publish = false
rust-version = "1.56"
rust-version = "1.57"

[dependencies]
datafusion = { path = "../datafusion" }
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
authors = ["Apache Arrow <[email protected]>"]
edition = "2021"
rust-version = "1.56"
rust-version = "1.57"

[dependencies]
ballista-core = { path = "../core", version = "0.6.0" }
Expand Down
12 changes: 6 additions & 6 deletions ballista/rust/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,22 @@ pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitio
#[derive(Debug, Clone)]
pub struct ConfigEntry {
name: String,
description: String,
data_type: DataType,
_description: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know if this was important or not to keep in ballista so rather than removing it I renamed them to start with _ to satisfy clippy

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, very much agree with you, _ is a conservative but useful way!

_data_type: DataType,
default_value: Option<String>,
}

impl ConfigEntry {
fn new(
name: String,
description: String,
data_type: DataType,
_description: String,
_data_type: DataType,
default_value: Option<String>,
) -> Self {
Self {
name,
description,
data_type,
_description,
_data_type,
default_value,
}
}
Expand Down
12 changes: 1 addition & 11 deletions ballista/rust/core/src/serde/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,23 +101,13 @@ impl From<protobuf::ExecutorMetadata> for ExecutorMeta {
}

/// Summary of executed partition
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, Default)]
pub struct PartitionStats {
pub(crate) num_rows: Option<u64>,
pub(crate) num_batches: Option<u64>,
pub(crate) num_bytes: Option<u64>,
}

impl Default for PartitionStats {
fn default() -> Self {
Self {
num_rows: None,
num_batches: None,
num_bytes: None,
}
}
}

impl fmt::Display for PartitionStats {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/executor/src/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ type FlightDataReceiver = Receiver<Result<FlightData, Status>>;
#[derive(Clone)]
pub struct BallistaFlightService {
/// Executor
executor: Arc<Executor>,
_executor: Arc<Executor>,
}

impl BallistaFlightService {
pub fn new(executor: Arc<Executor>) -> Self {
Self { executor }
pub fn new(_executor: Arc<Executor>) -> Self {
Self { _executor }
}
}

Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/scheduler/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ fn find_unresolved_shuffles(
Ok(plan
.children()
.iter()
.map(|child| find_unresolved_shuffles(child))
.map(find_unresolved_shuffles)
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
license = "Apache-2.0"
publish = false
rust-version = "1.56"
rust-version = "1.57"

[features]
simd = ["datafusion/simd"]
Expand Down
14 changes: 6 additions & 8 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,9 @@ struct BallistaBenchmarkOpt {
#[structopt(short = "i", long = "iterations", default_value = "3")]
iterations: usize,

/// Batch size when reading CSV or Parquet files
#[structopt(short = "s", long = "batch-size", default_value = "8192")]
batch_size: usize,

// /// Batch size when reading CSV or Parquet files
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These parameters are never read (aka they don't do anything) so removing them from the CLI seemed like a reasonable thing to do

// #[structopt(short = "s", long = "batch-size", default_value = "8192")]
// batch_size: usize,
/// Path to data files
#[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
path: PathBuf,
Expand All @@ -87,10 +86,9 @@ struct BallistaBenchmarkOpt {
#[structopt(short = "f", long = "format", default_value = "csv")]
file_format: String,

/// Load the data into a MemTable before executing the query
#[structopt(short = "m", long = "mem-table")]
mem_table: bool,

// /// Load the data into a MemTable before executing the query
// #[structopt(short = "m", long = "mem-table")]
// mem_table: bool,
/// Number of partitions to process in parallel
#[structopt(short = "p", long = "partitions", default_value = "2")]
partitions: usize,
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ keywords = [ "arrow", "datafusion", "ballista", "query", "sql" ]
license = "Apache-2.0"
homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
rust-version = "1.56"
rust-version = "1.57"

[dependencies]
clap = "2.33"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

FROM rust:1.56 as builder
FROM rust:1.57 as builder

COPY ./datafusion /usr/src/datafusion

Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ license = "Apache-2.0"
keywords = [ "arrow", "query", "sql" ]
edition = "2021"
publish = false
rust-version = "1.56"
rust-version = "1.57"

[[example]]
name = "avro_sql"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ include = [
"Cargo.toml",
]
edition = "2021"
rust-version = "1.56"
rust-version = "1.57"

[lib]
name = "datafusion"
Expand Down
6 changes: 5 additions & 1 deletion datafusion/src/physical_plan/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use arrow::{
};
use chrono::prelude::*;
use chrono::Duration;
use std::borrow::Borrow;

/// given a function `op` that maps a `&str` to a Result of an arrow native type,
/// returns a `PrimitiveArray` after the application
Expand Down Expand Up @@ -77,7 +78,10 @@ where
})?;

// first map is the iterator, second is for the `Option<_>`
array.iter().map(|x| x.map(|x| op(x)).transpose()).collect()
array
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.iter()
.map(|x| x.map(op.borrow()).transpose())
.collect()
}

// given an function that maps a `&str` to a arrow native type,
Expand Down
9 changes: 5 additions & 4 deletions datafusion/src/physical_plan/expressions/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ use super::{format_state_name, sum};
#[derive(Debug)]
pub struct Avg {
name: String,
data_type: DataType,
nullable: bool,
expr: Arc<dyn PhysicalExpr>,
}

Expand Down Expand Up @@ -69,11 +67,14 @@ impl Avg {
name: impl Into<String>,
data_type: DataType,
) -> Self {
// Average is always Float64, but Avg::new() has a data_type
// parameter to keep a consistent signature with the other
// Aggregate expressions.
assert_eq!(data_type, DataType::Float64);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


Self {
name: name.into(),
expr,
data_type,
nullable: true,
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/physical_plan/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ struct SortPreservingMergeStream {
receivers: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,

/// Drop helper for tasks feeding the [`receivers`](Self::receivers)
drop_helper: AbortOnDropMany<()>,
_drop_helper: AbortOnDropMany<()>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is used (its drop() impl is used) so change its name


/// For each input stream maintain a dequeue of SortKeyCursor
///
Expand Down Expand Up @@ -379,7 +379,7 @@ struct SortPreservingMergeStream {
impl SortPreservingMergeStream {
fn new(
receivers: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
drop_helper: AbortOnDropMany<()>,
_drop_helper: AbortOnDropMany<()>,
schema: SchemaRef,
expressions: &[PhysicalSortExpr],
target_batch_size: usize,
Expand All @@ -394,7 +394,7 @@ impl SortPreservingMergeStream {
schema,
cursors,
receivers,
drop_helper,
_drop_helper,
column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
sort_options: expressions.iter().map(|x| x.options).collect(),
target_batch_size,
Expand Down
5 changes: 1 addition & 4 deletions datafusion/src/physical_plan/string_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,7 @@ where
let string_array = downcast_string_arg!(args[0], "string", T);

// first map is the iterator, second is for the `Option<_>`
Ok(string_array
.iter()
.map(|string| string.map(|s| op(s)))
.collect())
Ok(string_array.iter().map(|string| string.map(&op)).collect())
}

fn handle<'a, F, R>(args: &'a [ColumnarValue], op: F, name: &str) -> Result<ColumnarValue>
Expand Down
10 changes: 1 addition & 9 deletions datafusion/src/physical_plan/windows/built_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
//! Physical exec for built-in window function expressions.

use crate::error::{DataFusionError, Result};
use crate::logical_plan::window_frames::WindowFrame;
use crate::physical_plan::{
expressions::PhysicalSortExpr,
window_functions::{BuiltInWindowFunction, BuiltInWindowFunctionExpr},
expressions::PhysicalSortExpr, window_functions::BuiltInWindowFunctionExpr,
PhysicalExpr, WindowExpr,
};
use arrow::compute::concat;
Expand All @@ -33,28 +31,22 @@ use std::sync::Arc;
/// A window expr that takes the form of a built in window function
#[derive(Debug)]
pub struct BuiltInWindowExpr {
fun: BuiltInWindowFunction,
expr: Arc<dyn BuiltInWindowFunctionExpr>,
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: Vec<PhysicalSortExpr>,
window_frame: Option<WindowFrame>,
}

impl BuiltInWindowExpr {
/// create a new built-in window function expression
pub(super) fn new(
fun: BuiltInWindowFunction,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the fun and window_frame are used to construct the BuiltInWindowFunctionExpr (so the don't need to be also encoded on the BuildInWindowExpr

expr: Arc<dyn BuiltInWindowFunctionExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
window_frame: Option<WindowFrame>,
) -> Self {
Self {
fun,
expr,
partition_by: partition_by.to_vec(),
order_by: order_by.to_vec(),
window_frame,
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions datafusion/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,9 @@ pub fn create_window_expr(
window_frame,
)),
WindowFunction::BuiltInWindowFunction(fun) => Arc::new(BuiltInWindowExpr::new(
fun.clone(),
create_built_in_window_expr(fun, args, input_schema, name)?,
partition_by,
order_by,
window_frame,
)),
})
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub enum ScalarValue {
/// large binary
LargeBinary(Option<Vec<u8>>),
/// list of nested ScalarValue (boxed to reduce size_of(ScalarValue))
#[allow(clippy::box_vec)]
#[allow(clippy::box_collection)]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh clippy, how wrong you are :)

the benefit is that the overall enum is smaller (as in the size of Vec<ScalarValue> is like 3 pointers (24 bytes) but a Box<Vec<ScalarValue>> is 8 bytes

List(Option<Box<Vec<ScalarValue>>>, Box<DataType>),
/// Date stored as a signed 32bit int
Date32(Option<i32>),
Expand All @@ -87,7 +87,7 @@ pub enum ScalarValue {
/// Interval with DayTime unit
IntervalDayTime(Option<i64>),
/// struct of nested ScalarValue (boxed to reduce size_of(ScalarValue))
#[allow(clippy::box_vec)]
#[allow(clippy::box_collection)]
Struct(Option<Box<Vec<ScalarValue>>>, Box<Vec<Field>>),
}

Expand Down
12 changes: 8 additions & 4 deletions datafusion/src/sql/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub struct CreateExternalTable {
#[derive(Debug, Clone, PartialEq)]
pub enum Statement {
/// ANSI SQL AST node
Statement(SQLStatement),
Statement(Box<SQLStatement>),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// Extension: `CREATE EXTERNAL TABLE`
CreateExternalTable(CreateExternalTable),
}
Expand Down Expand Up @@ -167,13 +167,17 @@ impl<'a> DFParser<'a> {
}
_ => {
// use the native parser
Ok(Statement::Statement(self.parser.parse_statement()?))
Ok(Statement::Statement(Box::from(
self.parser.parse_statement()?,
)))
}
}
}
_ => {
// use the native parser
Ok(Statement::Statement(self.parser.parse_statement()?))
Ok(Statement::Statement(Box::from(
self.parser.parse_statement()?,
)))
}
}
}
Expand All @@ -183,7 +187,7 @@ impl<'a> DFParser<'a> {
if self.parser.parse_keyword(Keyword::EXTERNAL) {
self.parse_create_external_table()
} else {
Ok(Statement::Statement(self.parser.parse_create()?))
Ok(Statement::Statement(Box::from(self.parser.parse_create()?)))
}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/test/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ impl ExecutionPlan for BlockingExec {
async fn execute(&self, _partition: usize) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(BlockingStream {
schema: Arc::clone(&self.schema),
refs: Arc::clone(&self.refs),
_refs: Arc::clone(&self.refs),
}))
}

Expand Down Expand Up @@ -577,7 +577,7 @@ pub struct BlockingStream {
schema: SchemaRef,

/// Ref-counting helper to check if the stream are still in memory.
refs: Arc<()>,
_refs: Arc<()>,
}

impl Stream for BlockingStream {
Expand Down
2 changes: 1 addition & 1 deletion dev/docker/ballista-base.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


# Base image extends debian:buster-slim
FROM rust:1.56.0-buster AS builder
FROM rust:1.57.0-buster AS builder

RUN apt update && apt -y install musl musl-dev musl-tools libssl-dev openssl

Expand Down
2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ description = "Build and run queries against data"
readme = "README.md"
license = "Apache-2.0"
edition = "2021"
rust-version = "1.56"
rust-version = "1.57"

[dependencies]
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
Expand Down