diff --git a/Cargo.toml b/Cargo.toml index 0aab11698b36..652eca131369 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,5 +38,5 @@ lto = true codegen-units = 1 [patch.crates-io] -arrow2 = { git = "https://github.com/jorgecarleitao/arrow2.git", rev = "v0.10.0" } -parquet2 = { git = "https://github.com/jorgecarleitao/parquet2.git", rev = "v0.10.1" } +arrow2 = { git = "https://github.com/jorgecarleitao/arrow2.git", rev = "v0.12.0" } +parquet2 = { git = "https://github.com/jorgecarleitao/parquet2.git", rev = "v0.13.2" } diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml index 9bc59bd4c1bd..0410e05ef7c3 100644 --- a/ballista-examples/Cargo.toml +++ b/ballista-examples/Cargo.toml @@ -31,8 +31,8 @@ rust-version = "1.59" [dependencies] datafusion = { path = "../datafusion" } ballista = { path = "../ballista/rust/client", version = "0.6.0"} -prost = "0.9" -tonic = "0.6" +prost = "0.10" +tonic = "0.7" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } futures = "0.3" num_cpus = "1.13.0" diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index 83cf1992fd6d..fa4fa8bdba01 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -35,19 +35,19 @@ async-trait = "0.1.41" futures = "0.3" hashbrown = "0.12" log = "0.4" -prost = "0.9" -prost-types = "0.9" +prost = "0.10" +prost-types = "0.10" serde = {version = "1", features = ["derive"]} sqlparser = "0.15" tokio = "1.0" -tonic = "0.6" +tonic = "0.7" uuid = { version = "0.8", features = ["v4"] } chrono = { version = "0.4", default-features = false } clap = { version = "3", features = ["derive", "cargo"] } parse_arg = "0.1.3" -arrow-format = { version = "0.4", features = ["flight-data", "flight-service"] } -arrow = { package = "arrow2", version="0.10", features = ["io_ipc", "io_flight"] } +arrow-format = { version = "0.6", features = ["flight-data", "flight-service"] } +arrow = { package = "arrow2", version="0.12", features = ["io_ipc", "io_flight"] } datafusion = { path = "../../../datafusion", version = "7.0.0" } datafusion-proto = { path = "../../../datafusion-proto", version = "7.0.0" } @@ -58,4 +58,4 @@ parking_lot = "0.12" tempfile = "3" [build-dependencies] -tonic-build = { version = "0.6" } +tonic-build = { version = "0.7" } diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs index ed8886f67f9b..de094c11f8b6 100644 --- a/ballista/rust/core/src/client.rs +++ b/ballista/rust/core/src/client.rs @@ -34,7 +34,7 @@ use arrow_format::flight::data::{FlightData, Ticket}; use arrow_format::flight::service::flight_service_client::FlightServiceClient; use datafusion::arrow::{ datatypes::SchemaRef, - error::{ArrowError, Result as ArrowResult}, + error::{Error as ArrowError, Result as ArrowResult}, }; use datafusion::field_util::SchemaExt; use datafusion::physical_plan::RecordBatchStream; diff --git a/ballista/rust/core/src/error.rs b/ballista/rust/core/src/error.rs index 64b20e37bb51..e330aac42b9b 100644 --- a/ballista/rust/core/src/error.rs +++ b/ballista/rust/core/src/error.rs @@ -23,7 +23,7 @@ use std::{ io, result, }; -use datafusion::arrow::error::ArrowError; +use datafusion::arrow::error::Error as ArrowError; use datafusion::error::DataFusionError; use sqlparser::parser; diff --git a/ballista/rust/core/src/memory_stream.rs b/ballista/rust/core/src/memory_stream.rs index 8b137891791f..6e778edd7530 100644 --- a/ballista/rust/core/src/memory_stream.rs +++ 
b/ballista/rust/core/src/memory_stream.rs @@ -1 +1,16 @@ - +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. \ No newline at end of file diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index 241af8edf2e9..9561d6269a26 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -29,8 +29,8 @@ edition = "2018" snmalloc = ["snmalloc-rs"] [dependencies] -arrow-format = { version = "0.4", features = ["flight-data", "flight-service"] } -arrow = { package = "arrow2", version="0.10", features = ["io_ipc"] } +arrow-format = { version = "0.6", features = ["flight-data", "flight-service"] } +arrow = { package = "arrow2", version="0.12", features = ["io_ipc"] } anyhow = "1" async-trait = "0.1.41" ballista-core = { path = "../core", version = "0.6.0" } @@ -43,7 +43,7 @@ snmalloc-rs = {version = "0.2", optional = true} tempfile = "3" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = "0.6" +tonic = "0.7" uuid = { version = "0.8", features = ["v4"] } hyper = "0.14.4" parking_lot = "0.12" diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs index ddb2c972a70f..47946070fd73 100644 --- a/ballista/rust/executor/src/execution_loop.rs +++ b/ballista/rust/executor/src/execution_loop.rs @@ -62,16 +62,14 @@ pub async fn poll_loop // to avoid going in sleep mode between polling let mut active_job = false; - let poll_work_result: anyhow::Result< - tonic::Response, - tonic::Status, - > = scheduler - .poll_work(PollWorkParams { - metadata: Some(executor.metadata.clone()), - can_accept_task: available_tasks_slots.load(Ordering::SeqCst) > 0, - task_status, - }) - .await; + let poll_work_result: Result, tonic::Status> = + scheduler + .poll_work(PollWorkParams { + metadata: Some(executor.metadata.clone()), + can_accept_task: available_tasks_slots.load(Ordering::SeqCst) > 0, + task_status, + }) + .await; let task_status_sender = task_status_sender.clone(); diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs index a936768006e7..918a833d2279 100644 --- a/ballista/rust/executor/src/flight_service.rs +++ b/ballista/rust/executor/src/flight_service.rs @@ -35,7 +35,7 @@ use arrow_format::flight::data::{ }; use arrow_format::flight::service::flight_service_server::FlightService; use datafusion::arrow::{ - error::ArrowError, io::ipc::read::FileReader, io::ipc::write::WriteOptions, + error::Error as ArrowError, io::ipc::read::FileReader, io::ipc::write::WriteOptions, }; use futures::{Stream, StreamExt}; use log::{info, warn}; diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml index 
2ff0073756a1..0b5e37f3ccbe 100644 --- a/ballista/rust/scheduler/Cargo.toml +++ b/ballista/rust/scheduler/Cargo.toml @@ -44,13 +44,13 @@ http-body = "0.4" hyper = "0.14.4" log = "0.4" parse_arg = "0.1.3" -prost = "0.9" +prost = "0.10" rand = "0.8" serde = {version = "1", features = ["derive"]} sled_package = { package = "sled", version = "0.34", optional = true } tokio = { version = "1.0", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"], optional = true } -tonic = "0.6" +tonic = "0.7" tower = { version = "0.4" } warp = "0.3" parking_lot = "0.12" @@ -62,7 +62,7 @@ uuid = { version = "0.8", features = ["v4"] } [build-dependencies] configure_me_codegen = "0.4.1" -tonic-build = { version = "0.6" } +tonic-build = { version = "0.7" } [package.metadata.configure_me.bin] scheduler = "scheduler_config_spec.toml" diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 1b4c08949911..c520db2e5d1c 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -32,7 +32,8 @@ simd = ["datafusion/simd"] snmalloc = ["snmalloc-rs"] [dependencies] -arrow = { package = "arrow2", version="0.10", features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "io_print", "ahash", "compute_merge_sort", "compute", "regex"] } +arrow = { package = "arrow2", version="0.12", features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "io_print", "ahash", "compute_merge_sort", "compute", "regex"] } +parquet2 = "0.13" datafusion = { path = "../datafusion" } ballista = { path = "../ballista/rust/client" } structopt = { version = "0.3", default-features = false } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 7671c78f228b..fae5ba9e247d 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -19,6 +19,7 @@ use arrow::array::ArrayRef; use arrow::chunk::Chunk; +use datafusion::arrow::io::print; use futures::future::join_all; use rand::prelude::*; use std::ops::Div; @@ -31,8 +32,6 @@ use std::{ time::{Instant, SystemTime}, }; -use datafusion::arrow::io::print; - use datafusion::datasource::{ listing::{ListingOptions, ListingTable}, object_store::local::LocalFileSystem, @@ -52,7 +51,8 @@ use datafusion::{ datasource::file_format::parquet::ParquetFormat, record_batch::RecordBatch, }; -use arrow::io::parquet::write::{Compression, Version, WriteOptions}; +use arrow::io::parquet::write::Version; +use parquet2::compression::CompressionOptions; use ballista::prelude::{ BallistaConfig, BallistaContext, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, }; @@ -665,14 +665,15 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> { match opt.file_format.as_str() { "csv" => ctx.write_csv(csv, output_path).await?, "parquet" => { + let compression = match opt.compression.as_str() { - "none" => Compression::Uncompressed, - "snappy" => Compression::Snappy, - "brotli" => Compression::Brotli, - "gzip" => Compression::Gzip, - "lz4" => Compression::Lz4, - "lz0" => Compression::Lzo, - "zstd" => Compression::Zstd, + "none" => CompressionOptions::Uncompressed, + "snappy" => CompressionOptions::Snappy, + "brotli" => CompressionOptions::Brotli(None), + "gzip" => CompressionOptions::Gzip(None), + "lz4" => CompressionOptions::Lz4, + "lz0" => CompressionOptions::Lzo, + "zstd" => CompressionOptions::Zstd(None), other => { return Err(DataFusionError::NotImplemented(format!( "Invalid compression format: {}", @@ -680,11 +681,10 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> { ))) } }; - - let options = WriteOptions { - compression, + 
let options = arrow::io::parquet::write::WriteOptions { write_statistics: false, version: Version::V1, + compression, }; ctx.write_parquet(csv, output_path, options).await? } diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index e546c5bdb68c..1667474b3f61 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -32,7 +32,7 @@ clap = { version = "3", features = ["derive", "cargo"] } rustyline = "9.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } datafusion = { path = "../datafusion", version = "7.0.0" } -arrow = { package = "arrow2", version="0.10", features = ["io_print"] } +arrow = { package = "arrow2", version="0.12", features = ["io_print"] } ballista = { path = "../ballista/rust/client", version = "0.6.0", optional=true } env_logger = "0.9" mimalloc = { version = "*", default-features = false } diff --git a/datafusion-common/Cargo.toml b/datafusion-common/Cargo.toml index 069fa7e06c86..7cddf2409001 100644 --- a/datafusion-common/Cargo.toml +++ b/datafusion-common/Cargo.toml @@ -37,8 +37,8 @@ pyarrow = ["pyo3"] jit = ["cranelift-module"] [dependencies] -arrow = { package = "arrow2", version = "0.10", default-features = false } -parquet = { package = "parquet2", version = "0.10", default_features = false, features = ["stream"], optional = true } +arrow = { package = "arrow2", version = "0.12.0", default-features = false } +parquet = { package = "parquet2", version = "0.13", default_features = false, optional = true } pyo3 = { version = "0.16", optional = true } sqlparser = "0.15" ordered-float = "2.10" diff --git a/datafusion-common/src/error.rs b/datafusion-common/src/error.rs index 5aa63c1f8655..5264a5af2288 100644 --- a/datafusion-common/src/error.rs +++ b/datafusion-common/src/error.rs @@ -22,13 +22,13 @@ use std::fmt::{Display, Formatter}; use std::io; use std::result; -use arrow::error::ArrowError; +use arrow::error::Error as ArrowError; #[cfg(feature = "avro")] use avro_rs::Error as AvroError; #[cfg(feature = "jit")] use cranelift_module::ModuleError; #[cfg(feature = "parquet")] -use parquet::error::ParquetError; +use parquet::error::Error as ParquetError; use sqlparser::parser::ParserError; /// Result type for operations that could result in an [DataFusionError] @@ -181,7 +181,7 @@ impl error::Error for DataFusionError {} #[cfg(test)] mod test { use crate::error::DataFusionError; - use arrow::error::ArrowError; + use arrow::error::Error as ArrowError; #[test] fn arrow_error_to_datafusion() { diff --git a/datafusion-common/src/field_util.rs b/datafusion-common/src/field_util.rs index 639e484980ad..bf22ea58e8f5 100644 --- a/datafusion-common/src/field_util.rs +++ b/datafusion-common/src/field_util.rs @@ -19,7 +19,7 @@ use arrow::array::{ArrayRef, StructArray}; use arrow::datatypes::{DataType, Field, Metadata, Schema}; -use arrow::error::ArrowError; +use arrow::error::Error as ArrowError; use std::borrow::Borrow; use std::collections::BTreeMap; diff --git a/datafusion-common/src/record_batch.rs b/datafusion-common/src/record_batch.rs index a1fa3101ecf0..309977146ce1 100644 --- a/datafusion-common/src/record_batch.rs +++ b/datafusion-common/src/record_batch.rs @@ -23,7 +23,7 @@ use arrow::array::*; use arrow::chunk::Chunk; use arrow::compute::filter::{build_filter, filter}; use arrow::datatypes::*; -use arrow::error::{ArrowError, Result}; +use arrow::error::{Error as ArrowError, Result}; /// A two-dimensional dataset with a number of /// columns ([`Array`]) and rows and defined 
[`Schema`](crate::datatypes::Schema). diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 599dc09a0840..5578d305aafb 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -34,11 +34,11 @@ path = "examples/avro_sql.rs" required-features = ["datafusion/avro"] [dev-dependencies] -arrow-format = { version = "0.4", features = ["flight-service", "flight-data"] } -arrow = { package = "arrow2", version="0.10", features = ["io_ipc", "io_flight"] } +arrow-format = { version = "0.6", features = ["flight-service", "flight-data"] } +arrow = { package = "arrow2", version="0.12", features = ["io_ipc", "io_flight"] } datafusion = { path = "../datafusion" } -prost = "0.9" -tonic = "0.6" +prost = "0.10" +tonic = "0.7" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } futures = "0.3" num_cpus = "1.13.0" diff --git a/datafusion-expr/Cargo.toml b/datafusion-expr/Cargo.toml index a02119793b04..c4142581ab33 100644 --- a/datafusion-expr/Cargo.toml +++ b/datafusion-expr/Cargo.toml @@ -36,6 +36,6 @@ path = "src/lib.rs" [dependencies] datafusion-common = { path = "../datafusion-common", version = "7.0.0" } -arrow = { package = "arrow2", version = "0.10", default-features = false } +arrow = { package = "arrow2", version = "0.12", default-features = false } sqlparser = "0.15" ahash = { version = "0.7", default-features = false } diff --git a/datafusion-physical-expr/Cargo.toml b/datafusion-physical-expr/Cargo.toml index fc3f2257ca20..bf52d107aafb 100644 --- a/datafusion-physical-expr/Cargo.toml +++ b/datafusion-physical-expr/Cargo.toml @@ -41,7 +41,7 @@ unicode_expressions = ["unicode-segmentation"] [dependencies] datafusion-common = { path = "../datafusion-common", version = "7.0.0" } datafusion-expr = { path = "../datafusion-expr", version = "7.0.0" } -arrow = { package = "arrow2", version = "0.10" } +arrow = { package = "arrow2", version = "0.12" } paste = "^1.0" ahash = { version = "0.7", default-features = false } ordered-float = "2.10" diff --git a/datafusion-physical-expr/src/arrow_temporal_util.rs b/datafusion-physical-expr/src/arrow_temporal_util.rs index fdc841846393..991e85e631b1 100644 --- a/datafusion-physical-expr/src/arrow_temporal_util.rs +++ b/datafusion-physical-expr/src/arrow_temporal_util.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use arrow::error::{ArrowError, Result}; +use arrow::error::{Error as ArrowError, Result}; use chrono::{prelude::*, LocalResult}; /// Accepts a string in RFC3339 / ISO8601 standard format and some diff --git a/datafusion-physical-expr/src/expressions/binary.rs b/datafusion-physical-expr/src/expressions/binary.rs index ab0479053114..5446ae99528f 100644 --- a/datafusion-physical-expr/src/expressions/binary.rs +++ b/datafusion-physical-expr/src/expressions/binary.rs @@ -936,7 +936,7 @@ mod tests { use crate::expressions::{col, lit}; use crate::test_util::create_decimal_array; use arrow::datatypes::{Field, SchemaRef}; - use arrow::error::ArrowError; + use arrow::error::Error as ArrowError; use datafusion_common::field_util::SchemaExt; // TODO add iter for decimal array diff --git a/datafusion-physical-expr/src/regex_expressions.rs b/datafusion-physical-expr/src/regex_expressions.rs index fd8b7a35203e..3d15ed9d850b 100644 --- a/datafusion-physical-expr/src/regex_expressions.rs +++ b/datafusion-physical-expr/src/regex_expressions.rs @@ -28,7 +28,7 @@ use std::any::type_name; use std::sync::Arc; use arrow::array::*; -use arrow::error::ArrowError; +use arrow::error::Error as ArrowError; use datafusion_common::{DataFusionError, Result}; diff --git a/datafusion-physical-expr/src/test_util.rs b/datafusion-physical-expr/src/test_util.rs index 50b199473e2b..86af407e2873 100644 --- a/datafusion-physical-expr/src/test_util.rs +++ b/datafusion-physical-expr/src/test_util.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ use arrow::datatypes::DataType; #[cfg(test)] diff --git a/datafusion-proto/Cargo.toml b/datafusion-proto/Cargo.toml index b71e434531cc..9633af33f3f8 100644 --- a/datafusion-proto/Cargo.toml +++ b/datafusion-proto/Cargo.toml @@ -36,7 +36,7 @@ path = "src/lib.rs" [dependencies] datafusion = { path = "../datafusion", version = "7.0.0" } -prost = "0.9" +prost = "0.10" [build-dependencies] -tonic-build = { version = "0.6" } +tonic-build = { version = "0.7" } diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 380f65f16f5a..fb6d1ee71c0c 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -60,7 +60,7 @@ datafusion-jit = { path = "../datafusion-jit", version = "7.0.0", optional = tru datafusion-physical-expr = { path = "../datafusion-physical-expr", version = "7.0.0" } ahash = { version = "0.7", default-features = false } hashbrown = { version = "0.12", features = ["raw"] } -parquet = { package = "parquet2", version = "0.10", default_features = false, features = ["stream"] } +parquet = { package = "parquet2", version = "0.13", default_features = false } sqlparser = "0.15" paste = "^1.0" num_cpus = "1.13.0" @@ -86,14 +86,14 @@ comfy-table = { version = "5.0", default-features = false } [dependencies.arrow] package = "arrow2" -version="0.10" +version="0.12" features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"] [dev-dependencies] criterion = "0.3" doc-comment = "0.3" fuzz-utils = { path = "fuzz-utils" } -parquet-format-async-temp = "0.2" +parquet-format-async-temp = "0.3" [[bench]] name = "aggregate_query_sql" diff --git a/datafusion/benches/parquet_query_sql.rs b/datafusion/benches/parquet_query_sql.rs index bc5c300a26ca..343dea5d53f3 100644 --- a/datafusion/benches/parquet_query_sql.rs +++ b/datafusion/benches/parquet_query_sql.rs @@ -29,7 +29,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use datafusion::prelude::ExecutionContext; use datafusion_common::field_util::SchemaExt; use datafusion_common::record_batch::RecordBatch; -use parquet::compression::Compression; +use parquet::compression::{Compression, CompressionOptions}; use parquet::encoding::Encoding; use parquet::write::Version; use rand::distributions::uniform::SampleUniform; @@ -153,7 +153,7 @@ fn generate_file() -> NamedTempFile { let options = arrow::io::parquet::write::WriteOptions { write_statistics: true, - compression: Compression::Uncompressed, + compression: CompressionOptions::Uncompressed, version: Version::V2, }; @@ -172,12 +172,11 @@ fn generate_file() -> NamedTempFile { iter.into_iter(), schema.as_ref(), options, - vec![Encoding::Plain].repeat(schema.fields().len()), + vec![vec![Encoding::Plain]].repeat(schema.fields().len()), ) .unwrap(); for rg in row_groups { - let (group, len) = rg.unwrap(); - writer.write(group, len).unwrap(); + writer.write(rg.unwrap()).unwrap(); } } let (_total_size, mut w) = writer.end(None).unwrap(); diff --git a/datafusion/fuzz-utils/Cargo.toml b/datafusion/fuzz-utils/Cargo.toml index 46c5b1e186cd..a7fa65da59ee 100644 --- a/datafusion/fuzz-utils/Cargo.toml +++ b/datafusion/fuzz-utils/Cargo.toml @@ -24,6 +24,6 @@ edition = "2021" [dependencies] datafusion-common = { path = "../../datafusion-common", version = "^7.0.0" } -arrow = { package = "arrow2", version="0.10", features = ["io_print"] } +arrow = { package = "arrow2", version="0.12", features = ["io_print"] } rand = "0.8" env_logger = "0.9.0" diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs index 04e469574e12..03b46f7f3421 100644 
--- a/datafusion/src/dataframe.rs +++ b/datafusion/src/dataframe.rs @@ -26,7 +26,6 @@ use std::sync::Arc; use crate::physical_plan::SendableRecordBatchStream; use async_trait::async_trait; -use parquet::write::WriteOptions; /// DataFrame represents a logical set of rows with the same named columns. /// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or @@ -414,6 +413,6 @@ pub trait DataFrame: Send + Sync { async fn write_parquet( &self, path: &str, - writer_properties: Option, + writer_properties: Option, ) -> Result<()>; } diff --git a/datafusion/src/datasource/memory.rs b/datafusion/src/datasource/memory.rs index 735abe0b7e29..9a9119c79116 100644 --- a/datafusion/src/datasource/memory.rs +++ b/datafusion/src/datasource/memory.rs @@ -165,7 +165,7 @@ mod tests { use arrow::array::Int32Array; use arrow::datatypes::{DataType, Field, Schema}; - use arrow::error::ArrowError; + use arrow::error::Error as ArrowError; use std::collections::BTreeMap; #[tokio::test] diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index cfc99c6e0751..bf98ba6a4cd2 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -89,7 +89,6 @@ use crate::variable::{VarProvider, VarType}; use crate::{dataframe::DataFrame, physical_plan::udaf::AggregateUDF}; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use parquet::write::WriteOptions; use super::{ disk_manager::DiskManagerConfig, @@ -723,7 +722,7 @@ impl ExecutionContext { &self, plan: Arc, path: impl AsRef, - writer_properties: WriteOptions, + writer_properties: arrow::io::parquet::write::WriteOptions, ) -> Result<()> { plan_to_parquet(self, plan, path, Some(writer_properties)).await } diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index 9d50546d78e4..bf7bd663bdfd 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -20,7 +20,6 @@ use async_trait::async_trait; use datafusion_common::field_util::{FieldExt, SchemaExt}; use parking_lot::Mutex; -use parquet::write::WriteOptions; use std::any::Any; use std::sync::Arc; @@ -327,7 +326,7 @@ impl DataFrame for DataFrameImpl { async fn write_parquet( &self, path: &str, - writer_properties: Option, + writer_properties: Option, ) -> Result<()> { let plan = self.create_physical_plan().await?; let state = self.ctx_state.lock().clone(); diff --git a/datafusion/src/physical_plan/common.rs b/datafusion/src/physical_plan/common.rs index 59a3dc32994f..e14f55e1f202 100644 --- a/datafusion/src/physical_plan/common.rs +++ b/datafusion/src/physical_plan/common.rs @@ -26,7 +26,7 @@ use crate::record_batch::RecordBatch; use arrow::compute::aggregate::estimated_bytes_size; use arrow::compute::concatenate::concatenate; use arrow::datatypes::{Schema, SchemaRef}; -use arrow::error::ArrowError; +use arrow::error::Error as ArrowError; use arrow::error::Result as ArrowResult; use arrow::io::ipc::write::{FileWriter, WriteOptions}; use datafusion_common::field_util::SchemaExt; diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs index 4c7800695c03..ee34ce3555a3 100644 --- a/datafusion/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/src/physical_plan/file_format/file_stream.rs @@ -29,7 +29,7 @@ use crate::{ }; use arrow::{ datatypes::SchemaRef, - error::{ArrowError, Result as ArrowResult}, + error::{Error as 
ArrowError, Result as ArrowResult}, }; use datafusion_common::record_batch::RecordBatch; use futures::Stream; diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs index f773929c5ca0..1c83ea8a536d 100644 --- a/datafusion/src/physical_plan/file_format/mod.rs +++ b/datafusion/src/physical_plan/file_format/mod.rs @@ -28,7 +28,7 @@ pub use self::parquet::ParquetExec; use arrow::{ array::{ArrayRef, DictionaryArray}, datatypes::{DataType, Field, Schema, SchemaRef}, - error::{ArrowError, Result as ArrowResult}, + error::{Error as ArrowError, Result as ArrowResult}, }; pub use avro::AvroExec; pub(crate) use csv::plan_to_csv; diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index f89976f14d8c..59c9280c34d1 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -44,7 +44,7 @@ use crate::{ use arrow::{ array::ArrayRef, datatypes::{Schema, SchemaRef}, - error::{ArrowError, Result as ArrowResult}, + error::{Error as ArrowError, Result as ArrowResult}, }; use datafusion_common::field_util::SchemaExt; use datafusion_common::Column; @@ -56,7 +56,7 @@ use parquet::statistics::{ PrimitiveStatistics as ParquetPrimitiveStatistics, }; -use arrow::io::parquet::write::RowGroupIterator; +use arrow::io::parquet::write::{transverse, RowGroupIterator}; use fmt::Debug; use tokio::task::JoinHandle; @@ -71,7 +71,6 @@ use crate::physical_plan::file_format::SchemaAdapter; use async_trait::async_trait; use parquet::encoding::Encoding; use parquet::metadata::RowGroupMetaData; -use parquet::write::WriteOptions; use super::PartitionColumnProjector; @@ -384,7 +383,7 @@ macro_rules! get_min_max_values { let scalar_values : Vec = $self.row_group_metadata .iter() .flat_map(|meta| { - meta.column(column_index).statistics() + meta.columns()[column_index].statistics() }) .map(|stats| { get_statistic!(stats.as_ref().unwrap(), $attr) @@ -414,7 +413,7 @@ macro_rules! 
get_null_count_values { let scalar_values: Vec = $self .row_group_metadata .iter() - .flat_map(|meta| meta.column(column_index).statistics()) + .flat_map(|meta| meta.columns()[column_index].statistics()) .flatten() .map(|stats| ScalarValue::Int64(stats.null_count())) .collect(); @@ -446,7 +445,7 @@ fn build_row_group_predicate( pruning_predicate: &PruningPredicate, metrics: ParquetFileMetrics, row_group_metadata: &[RowGroupMetaData], -) -> Box bool> { +) -> Box bool + Send + Sync> { let parquet_schema = pruning_predicate.schema().as_ref(); let pruning_stats = RowGroupPruningStatistics { @@ -564,7 +563,7 @@ pub async fn plan_to_parquet( context: &ExecutionContext, plan: Arc, path: impl AsRef, - writer_properties: Option, + writer_properties: Option, ) -> Result<()> { let options = writer_properties.clone().ok_or_else(|| { DataFusionError::Execution("missing parquet writer properties".to_string()) @@ -574,6 +573,7 @@ pub async fn plan_to_parquet( // create directory to contain the Parquet files (one per partition) let fs_path = Path::new(path); let runtime = context.runtime_env(); + match fs::create_dir(fs_path) { Ok(()) => { let mut tasks = vec![]; @@ -587,8 +587,17 @@ pub async fn plan_to_parquet( plan.schema().as_ref().clone(), options, )?; - writer.start()?; + let stream = plan.execute(i, runtime.clone()).await?; + + let encodings: Vec> = plan + .schema() + .as_ref() + .fields + .iter() + .map(|f| transverse(&f.data_type, |_| Encoding::Plain)) + .collect(); + let handle: JoinHandle> = task::spawn(async move { stream .map(|batch| { @@ -597,13 +606,12 @@ pub async fn plan_to_parquet( iter.into_iter(), plan.schema().as_ref(), options, - vec![Encoding::Plain] - .repeat(plan.schema().as_ref().fields.len()), + encodings.clone(), ) .unwrap(); + for rg in row_groups { - let (group, len) = rg?; - writer.write(group, len)?; + writer.write(rg.unwrap())?; } crate::error::Result::<()>::Ok(()) }) @@ -642,17 +650,19 @@ mod tests { use crate::prelude::ExecutionConfig; use ::parquet::statistics::Statistics as ParquetStatistics; use arrow::datatypes::{DataType, Field}; - use arrow::io::parquet; use arrow::io::parquet::read::ColumnChunkMetaData; use arrow::io::parquet::write::{ - to_parquet_schema, ColumnDescriptor, Compression, Encoding, FileWriter, - RowGroupIterator, SchemaDescriptor, Version, WriteOptions, + to_parquet_schema, Encoding, FileWriter, RowGroupIterator, SchemaDescriptor, + Version, WriteOptions, }; use datafusion_common::field_util::{FieldExt, SchemaExt}; use futures::StreamExt; - use parquet_format_async_temp::RowGroup; + use parquet::schema::types::{PhysicalType, PrimitiveType as ParquetPrimitiveType}; + use parquet::write::WriteOptions as ParquetWriteOptions; + use parquet_format_async_temp::{RowGroup, Type}; use std::fs::File; use std::io::Write; + use parquet::compression::CompressionOptions; use tempfile::TempDir; /// writes each RecordBatch as an individual parquet file and then @@ -673,27 +683,31 @@ mod tests { .expect("cloning file descriptor"); let options = WriteOptions { write_statistics: true, - compression: Compression::Uncompressed, + compression: CompressionOptions::Uncompressed, version: Version::V2, }; let schema_ref = &batch.schema().clone(); + let encodings = schema_ref + .fields + .iter() + .map(|f| transverse(&f.data_type, |_| Encoding::Plain)) + .collect(); let iter = vec![Ok(batch.into())]; let row_groups = RowGroupIterator::try_new( iter.into_iter(), schema_ref, options, - vec![Encoding::Plain].repeat(schema_ref.fields.len()), + encodings, ) .unwrap(); let mut 
writer = FileWriter::try_new(file, schema_ref.as_ref().clone(), options) .unwrap(); - writer.start().unwrap(); for rg in row_groups { - let (group, len) = rg.unwrap(); - writer.write(group, len).unwrap(); + let group = rg.unwrap(); + writer.write(group).unwrap(); } writer.end(None).unwrap(); output @@ -1086,12 +1100,12 @@ mod tests { let mut results = parquet_exec.execute(0, runtime).await?; let batch = results.next().await.unwrap(); // invalid file should produce an error to that effect + let error = batch.unwrap_err(); assert_contains!( - batch.unwrap_err().to_string(), - "External error: Parquet error: Arrow: IO error" + error.to_string(), + "External error: IO error" ); assert!(results.next().await.is_none()); - Ok(()) } @@ -1101,14 +1115,14 @@ mod tests { } fn parquet_primitive_column_stats( - column_descr: ColumnDescriptor, + primitive_type: ParquetPrimitiveType, min: Option, max: Option, distinct: Option, nulls: i64, ) -> ParquetPrimitiveStatistics { ParquetPrimitiveStatistics:: { - descriptor: column_descr, + primitive_type, min_value: min, max_value: max, null_count: Some(nulls), @@ -1129,7 +1143,7 @@ mod tests { let rgm1 = get_row_group_meta_data( &schema_descr, vec![&parquet_primitive_column_stats::( - schema_descr.column(0).clone(), + ParquetPrimitiveType::from_physical("c1".to_string(), PhysicalType::Int32), Some(1), Some(10), None, @@ -1139,7 +1153,7 @@ mod tests { let rgm2 = get_row_group_meta_data( &schema_descr, vec![&parquet_primitive_column_stats::( - schema_descr.column(0).clone(), + ParquetPrimitiveType::from_physical("c1".to_string(), PhysicalType::Int32), Some(11), Some(20), None, @@ -1175,7 +1189,7 @@ mod tests { let rgm1 = get_row_group_meta_data( &schema_descr, vec![&parquet_primitive_column_stats::( - schema_descr.column(0).clone(), + ParquetPrimitiveType::from_physical("c1".to_string(), PhysicalType::Int32), None, None, None, @@ -1185,7 +1199,7 @@ mod tests { let rgm2 = get_row_group_meta_data( &schema_descr, vec![&parquet_primitive_column_stats::( - schema_descr.column(0).clone(), + ParquetPrimitiveType::from_physical("c1".to_string(), PhysicalType::Int32), Some(11), Some(20), None, @@ -1227,14 +1241,14 @@ mod tests { &schema_descr, vec![ &parquet_primitive_column_stats::( - schema_descr.column(0).clone(), + ParquetPrimitiveType::from_physical("c1".to_string(), PhysicalType::Int32), Some(1), Some(10), None, 0, ), &parquet_primitive_column_stats::( - schema_descr.column(0).clone(), + ParquetPrimitiveType::from_physical("c1".to_string(), PhysicalType::Int32), Some(1), Some(10), None, @@ -1246,14 +1260,14 @@ mod tests { &schema_descr, vec![ &parquet_primitive_column_stats::( - schema_descr.column(0).clone(), + ParquetPrimitiveType::from_physical("c1".to_string(), PhysicalType::Int32), Some(11), Some(20), None, 0, ), &parquet_primitive_column_stats::( - schema_descr.column(0).clone(), + ParquetPrimitiveType::from_physical("c1".to_string(), PhysicalType::Int32), Some(11), Some(20), None, @@ -1305,7 +1319,7 @@ mod tests { &schema_descr, vec![ &parquet_primitive_column_stats::( - schema_descr.column(0).clone(), + ParquetPrimitiveType::from_physical("c1".to_string(), PhysicalType::Int32), Some(1), Some(10), None, @@ -1323,7 +1337,7 @@ mod tests { &schema_descr, vec![ &parquet_primitive_column_stats::( - schema_descr.column(0).clone(), + ParquetPrimitiveType::from_physical("c1".to_string(), PhysicalType::Int32), Some(11), Some(20), None, @@ -1411,23 +1425,29 @@ mod tests { let mut chunks = vec![]; let mut columns = vec![]; for (i, s) in 
column_statistics.into_iter().enumerate() { - let column_descr = schema_descr.column(i); - let type_ = match column_descr.type_() { - parquet::write::ParquetType::PrimitiveType { physical_type, .. } => { - ::parquet::schema::types::physical_type_to_type(physical_type).0 - } - _ => { - panic!("Trying to write a row group of a non-physical type") + let column_descr = &schema_descr.columns()[i]; + + let type_ = match column_descr.descriptor.primitive_type.physical_type { + PhysicalType::Boolean => { Type::BOOLEAN } + PhysicalType::Int32 => { Type::INT32 } + PhysicalType::Int64 => { Type::INT64 } + PhysicalType::Int96 => { Type::INT96 } + PhysicalType::Float => { Type::FLOAT } + PhysicalType::Double => { Type::DOUBLE } + PhysicalType::ByteArray => { Type::BYTE_ARRAY } + PhysicalType::FixedLenByteArray(_) => { + Type::FIXED_LEN_BYTE_ARRAY } }; + let column_chunk = ColumnChunk { file_path: None, file_offset: 0, meta_data: Some(ColumnMetaData::new( - type_, + type_, // parquet_format_async_temp::parquet_format::Type Vec::new(), - column_descr.path_in_schema().to_vec(), - Compression::Uncompressed.into(), + column_descr.path_in_schema.clone(), + CompressionOptions::Uncompressed.into(), 0, 0, 0, @@ -1446,16 +1466,23 @@ mod tests { crypto_metadata: None, encrypted_column_metadata: None, }; - let column = ColumnChunkMetaData::try_from_thrift( - column_descr.clone(), + let column = ColumnChunkMetaData::new( column_chunk.clone(), - ) - .unwrap(); + column_descr.clone(), + ); columns.push(column); chunks.push(column_chunk); } let rg = RowGroup::new(chunks, 0, 0, None, None, None, None); - RowGroupMetaData::try_from_thrift(schema_descr, rg).unwrap() + + let total_byte_size = rg.total_byte_size as usize; + let num_rows = rg.num_rows as usize; + let mut columns = vec![]; + for (cc, d) in rg.columns.into_iter().zip(schema_descr.columns()) { + let cc = ColumnChunkMetaData::new(cc, d.clone()); + columns.push(cc); + } + RowGroupMetaData::new(columns, num_rows, total_byte_size) } fn populate_csv_partitions( @@ -1506,7 +1533,15 @@ mod tests { // execute a simple query and write the results to parquet let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; let df = ctx.sql("SELECT c1, c2 FROM test").await?; - df.write_parquet(&out_dir, None).await?; + df.write_parquet( + &out_dir, + Some(arrow::io::parquet::write::WriteOptions { + version: Version::V2, + write_statistics: true, + compression: CompressionOptions::Uncompressed, + }), + ) + .await?; // write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, None).await?; // create a new context and verify that the results were saved to a partitioned csv file diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs index 7653c6858689..5f57c9e42968 100644 --- a/datafusion/src/physical_plan/functions.rs +++ b/datafusion/src/physical_plan/functions.rs @@ -42,7 +42,7 @@ use crate::{ scalar::ScalarValue, }; use arrow::array::{Array, PrimitiveArray, Utf8Array}; -use arrow::error::{ArrowError, Result as ArrowResult}; +use arrow::error::{Error as ArrowError, Result as ArrowResult}; use arrow::types::{NativeType, Offset}; use arrow::{ array::ArrayRef, @@ -740,9 +740,9 @@ where .windows(2) .map(|offset| op(offset[1] - offset[0])); - let values = arrow::buffer::Buffer::from_trusted_len_iter(values); + let values = arrow::buffer::Buffer::from_iter(values); - let data_type = if O::is_large() { + let data_type = if O::IS_LARGE { DataType::Int64 } else { DataType::Int32 diff --git 
a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index e870d3935f73..d46f2041fe64 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -42,7 +42,7 @@ use arrow::compute::{concatenate, take}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::{ array::Array, - error::{ArrowError, Result as ArrowResult}, + error::{Error as ArrowError, Result as ArrowResult}, }; use arrow::{array::ArrayRef, compute::cast}; use hashbrown::raw::RawTable; diff --git a/datafusion/src/physical_plan/metrics/baseline.rs b/datafusion/src/physical_plan/metrics/baseline.rs index b77cd633f336..972600ffc636 100644 --- a/datafusion/src/physical_plan/metrics/baseline.rs +++ b/datafusion/src/physical_plan/metrics/baseline.rs @@ -20,7 +20,7 @@ use std::task::Poll; use super::{Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time, Timestamp}; -use arrow::error::ArrowError; +use arrow::error::Error as ArrowError; use datafusion_common::record_batch::RecordBatch; /// Helper for creating and tracking common "baseline" metrics for diff --git a/datafusion/src/physical_plan/metrics/tracker.rs b/datafusion/src/physical_plan/metrics/tracker.rs index bfeb85313c6c..24099df087c7 100644 --- a/datafusion/src/physical_plan/metrics/tracker.rs +++ b/datafusion/src/physical_plan/metrics/tracker.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use std::task::Poll; use crate::record_batch::RecordBatch; -use arrow::error::ArrowError; +use arrow::error::Error as ArrowError; /// Simplified version of tracking memory consumer, /// see also: [`Tracking`](crate::execution::memory_manager::ConsumerType::Tracking) diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs index b6ea2294cdd8..59c1862b198d 100644 --- a/datafusion/src/physical_plan/repartition.rs +++ b/datafusion/src/physical_plan/repartition.rs @@ -522,7 +522,7 @@ mod tests { }; use arrow::array::{ArrayRef, Utf8Array}; use arrow::datatypes::{DataType, Field, Schema}; - use arrow::error::ArrowError; + use arrow::error::Error as ArrowError; use datafusion_common::field_util::SchemaExt; use datafusion_common::record_batch::RecordBatch; use futures::FutureExt; diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs index 8b137891791f..b248758bc120 100644 --- a/datafusion/src/physical_plan/sort.rs +++ b/datafusion/src/physical_plan/sort.rs @@ -1 +1,16 @@ - +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
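
For reference (not part of the patch): the parquet writer hunks in this patch (benchmarks/src/bin/tpch.rs, datafusion/benches/parquet_query_sql.rs, datafusion/src/physical_plan/file_format/parquet.rs, and datafusion/tests/parquet_pruning.rs) all migrate to the same arrow2 0.12 write path: `Compression` becomes `CompressionOptions` (Brotli/Gzip/Zstd now carry an optional level), per-column encodings become nested `Vec<Vec<Encoding>>` built with `transverse`, `FileWriter` no longer needs `start()`, and `write()` takes the row group without an explicit length. Below is a minimal sketch of that path, assuming arrow2 0.12 is used directly (inside DataFusion the crate is aliased as `arrow`); the one-column schema and in-memory buffer are only for illustration.

use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::parquet::write::{
    transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version,
    WriteOptions,
};

fn write_parquet_sketch() -> Result<()> {
    let schema = Schema::from(vec![Field::new("c1", DataType::Int32, false)]);
    let chunk = Chunk::new(vec![
        Arc::new(Int32Array::from_slice(&[1, 2, 3])) as Arc<dyn Array>,
    ]);

    let options = WriteOptions {
        write_statistics: true,
        // 0.10's `Compression::Zstd` is now `CompressionOptions::Zstd(None)`, etc.
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
    };

    // One encoding list per field; `transverse` walks nested types so each
    // parquet leaf column gets an encoding.
    let encodings: Vec<Vec<Encoding>> = schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
        .collect();

    let row_groups = RowGroupIterator::try_new(
        vec![Ok(chunk)].into_iter(),
        &schema,
        options,
        encodings,
    )?;

    // No `writer.start()` in 0.12, and `write()` no longer takes a length.
    let mut buffer: Vec<u8> = vec![];
    let mut writer = FileWriter::try_new(&mut buffer, schema.clone(), options)?;
    for rg in row_groups {
        writer.write(rg?)?;
    }
    writer.end(None)?;
    Ok(())
}

The error-type changes follow the same mechanical pattern throughout: arrow2 0.12 exposes `arrow2::error::Error` and parquet2 0.13 exposes `parquet2::error::Error`, so the remaining hunks only re-alias the `use` statements (`Error as ArrowError`, `Error as ParquetError`) and leave call sites untouched.
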
diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index 99c6870c808d..1f8fbc56c37e 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -34,7 +34,7 @@ use arrow::array::growable::make_growable; use arrow::{ compute::sort::SortOptions, datatypes::SchemaRef, - error::{ArrowError, Result as ArrowResult}, + error::{Error as ArrowError, Result as ArrowResult}, }; use async_trait::async_trait; use datafusion_common::field_util::SchemaExt; diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs b/datafusion/src/physical_plan/windows/window_agg_exec.rs index 3427a81efff2..a6ef5f810159 100644 --- a/datafusion/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs @@ -32,7 +32,7 @@ use crate::record_batch::RecordBatch; use arrow::{ array::ArrayRef, datatypes::{Schema, SchemaRef}, - error::{ArrowError, Result as ArrowResult}, + error::{Error as ArrowError, Result as ArrowResult}, }; use async_trait::async_trait; use datafusion_common::field_util::SchemaExt; diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs index 13bfa909c91a..2d27bfcc5185 100644 --- a/datafusion/src/test/exec.rs +++ b/datafusion/src/test/exec.rs @@ -29,7 +29,7 @@ use tokio::sync::Barrier; use crate::record_batch::RecordBatch; use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, - error::{ArrowError, Result as ArrowResult}, + error::{Error as ArrowError, Result as ArrowResult}, }; use datafusion_common::field_util::SchemaExt; use futures::Stream; @@ -239,7 +239,7 @@ impl ExecutionPlan for MockExec { } fn clone_error(e: &ArrowError) -> ArrowError { - use ArrowError::*; + use arrow::error::Error::InvalidArgumentError; match e { InvalidArgumentError(msg) => InvalidArgumentError(msg.to_string()), _ => unimplemented!(), diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs index d74b5ec8d72a..c52b3ea55911 100644 --- a/datafusion/tests/parquet_pruning.rs +++ b/datafusion/tests/parquet_pruning.rs @@ -25,7 +25,7 @@ use arrow::io::parquet::write::{FileWriter, RowGroupIterator}; use arrow::{ array::{Array, ArrayRef, Float64Array, Int32Array, Int64Array, Utf8Array}, datatypes::{DataType, Field, Schema}, - io::parquet::write::{Compression, Encoding, Version, WriteOptions}, + io::parquet::write::{CompressionOptions, Encoding, Version, WriteOptions}, }; use chrono::{Datelike, Duration}; use datafusion::record_batch::RecordBatch; @@ -624,18 +624,18 @@ async fn make_test_file(scenario: Scenario) -> NamedTempFile { let schema = batches[0].schema(); let options = WriteOptions { - compression: Compression::Uncompressed, + compression: CompressionOptions::Uncompressed, write_statistics: true, version: Version::V1, }; - let encodings: Vec = schema + let encodings: Vec> = schema .fields() .iter() .map(|field| { if let DataType::Dictionary(_, _, _) = field.data_type() { - Encoding::RleDictionary + vec![Encoding::RleDictionary] } else { - Encoding::Plain + vec![Encoding::Plain] } }) .collect(); @@ -650,10 +650,9 @@ async fn make_test_file(scenario: Scenario) -> NamedTempFile { let mut writer = FileWriter::try_new(&mut file, schema.as_ref().clone(), options).unwrap(); - writer.start().unwrap(); for rg in row_groups.unwrap() { - let (group, len) = rg.unwrap(); - writer.write(group, len).unwrap(); + let group = rg.unwrap(); + writer.write(group).unwrap(); } 
writer.end(None).unwrap(); diff --git a/datafusion/tests/sql_integration.rs b/datafusion/tests/sql_integration.rs index 8b137891791f..b248758bc120 100644 --- a/datafusion/tests/sql_integration.rs +++ b/datafusion/tests/sql_integration.rs @@ -1 +1,16 @@ - +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index 4b7722dbaced..0ac405ef8351 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -63,7 +63,7 @@ use futures::{Stream, StreamExt}; use arrow::{ array::{Int64Array, Utf8Array}, datatypes::SchemaRef, - error::ArrowError, + error::Error as ArrowError, }; use datafusion::record_batch::RecordBatch; use datafusion::{