From 3338b9c52acf02c4e34229ba72d23410232dcb53 Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 28 Jan 2023 12:01:32 -0800 Subject: [PATCH 1/2] Replace ArrowError with DataFusionError in DF context --- .../core/src/physical_plan/aggregates/mod.rs | 3 +- .../physical_plan/aggregates/no_grouping.rs | 9 ++-- .../src/physical_plan/aggregates/row_hash.rs | 14 ++---- datafusion/core/src/physical_plan/analyze.rs | 3 +- .../src/physical_plan/coalesce_batches.rs | 4 +- .../src/physical_plan/coalesce_partitions.rs | 8 ++-- datafusion/core/src/physical_plan/common.rs | 27 +++++------ .../core/src/physical_plan/file_format/csv.rs | 2 +- .../physical_plan/file_format/file_stream.rs | 26 +++++----- .../core/src/physical_plan/file_format/mod.rs | 7 ++- .../src/physical_plan/file_format/parquet.rs | 8 +++- datafusion/core/src/physical_plan/filter.rs | 10 ++-- .../src/physical_plan/joins/cross_join.rs | 8 ++-- .../core/src/physical_plan/joins/hash_join.rs | 17 +++---- .../physical_plan/joins/nested_loop_join.rs | 16 +++---- .../physical_plan/joins/sort_merge_join.rs | 42 ++++++++--------- .../core/src/physical_plan/joins/utils.rs | 14 +++--- datafusion/core/src/physical_plan/limit.rs | 5 +- datafusion/core/src/physical_plan/memory.rs | 3 +- .../src/physical_plan/metrics/baseline.rs | 9 ++-- .../core/src/physical_plan/metrics/tracker.rs | 7 +-- datafusion/core/src/physical_plan/mod.rs | 5 +- .../core/src/physical_plan/projection.rs | 8 ++-- .../core/src/physical_plan/repartition/mod.rs | 35 ++++++-------- .../core/src/physical_plan/sorts/sort.rs | 47 +++++++++---------- .../sorts/sort_preserving_merge.rs | 20 ++++---- datafusion/core/src/physical_plan/stream.rs | 17 ++++--- .../core/src/physical_plan/streaming.rs | 4 +- datafusion/core/src/physical_plan/union.rs | 5 +- .../windows/bounded_window_agg_exec.rs | 13 ++--- .../physical_plan/windows/window_agg_exec.rs | 24 +++++----- .../core/src/scheduler/pipeline/execution.rs | 4 +- datafusion/core/src/scheduler/task.rs | 5 +- datafusion/core/src/test/exec.rs | 17 ++++--- datafusion/core/tests/custom_sources.rs | 5 +- datafusion/core/tests/sql/timestamp.rs | 2 +- datafusion/core/tests/sql/window.rs | 2 +- datafusion/core/tests/user_defined_plan.rs | 20 ++++---- 38 files changed, 220 insertions(+), 255 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 0acbef9a5a47..8ca50fec85f8 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -722,7 +722,6 @@ mod tests { use crate::{assert_batches_sorted_eq, physical_plan::common}; use arrow::array::{Float64Array, UInt32Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::{lit, ApproxDistinct, Count, Median}; @@ -1038,7 +1037,7 @@ mod tests { } impl Stream for TestYieldingStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs index 4d9de842f6f0..6356009a4915 100644 --- a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs +++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs @@ -25,7 +25,6 @@ use crate::physical_plan::aggregates::{ use 
crate::physical_plan::metrics::{BaselineMetrics, RecordOutput}; use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; use arrow::datatypes::SchemaRef; -use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; use datafusion_common::Result; use datafusion_physical_expr::{AggregateExpr, PhysicalExpr}; @@ -38,7 +37,7 @@ use futures::stream::{Stream, StreamExt}; /// stream struct for aggregation without grouping columns pub(crate) struct AggregateStream { - stream: BoxStream<'static, ArrowResult>, + stream: BoxStream<'static, Result>, schema: SchemaRef, } @@ -112,7 +111,7 @@ impl AggregateStream { .and_then(|allocated| this.reservation.try_grow(allocated)) { Ok(_) => continue, - Err(e) => Err(ArrowError::ExternalError(Box::new(e))), + Err(e) => Err(e), } } Some(Err(e)) => Err(e), @@ -120,9 +119,9 @@ impl AggregateStream { this.finished = true; let timer = this.baseline_metrics.elapsed_compute().timer(); let result = finalize_aggregation(&this.accumulators, &this.mode) - .map_err(|e| ArrowError::ExternalError(Box::new(e))) .and_then(|columns| { RecordBatch::try_new(this.schema.clone(), columns) + .map_err(Into::into) }) .record_output(&this.baseline_metrics); @@ -146,7 +145,7 @@ impl AggregateStream { } impl Stream for AggregateStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index 994cdd8860a6..5a5e36a2480e 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -42,13 +42,10 @@ use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use arrow::array::{new_null_array, PrimitiveArray}; +use arrow::array::{Array, UInt32Builder}; use arrow::compute::cast; use arrow::datatypes::{DataType, Schema, UInt32Type}; use arrow::{array::ArrayRef, compute}; -use arrow::{ - array::{Array, UInt32Builder}, - error::{ArrowError, Result as ArrowResult}, -}; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_expr::Accumulator; @@ -226,7 +223,7 @@ impl GroupedHashAggregateStream { } impl Stream for GroupedHashAggregateStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -252,9 +249,7 @@ impl Stream for GroupedHashAggregateStream { }); if let Err(e) = result { - return Poll::Ready(Some(Err( - ArrowError::ExternalError(Box::new(e)), - ))); + return Poll::Ready(Some(Err(e))); } } // inner had error, return to caller @@ -569,7 +564,7 @@ impl std::fmt::Debug for RowAggregationState { impl GroupedHashAggregateStream { /// Create a RecordBatch with all group keys and accumulator' states or values. 
- fn create_batch_from_map(&mut self) -> ArrowResult> { + fn create_batch_from_map(&mut self) -> Result> { let skip_items = self.row_group_skip_position; if skip_items > self.row_aggr_state.group_states.len() { return Ok(None); @@ -624,7 +619,6 @@ impl GroupedHashAggregateStream { // the intermediate GroupByScalar type was not the same as the // output cast(&item, field.data_type()) - .map_err(DataFusionError::ArrowError) }?; results.push(result); } diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs index 2e7285aac597..2e12251c2f9b 100644 --- a/datafusion/core/src/physical_plan/analyze.rs +++ b/datafusion/core/src/physical_plan/analyze.rs @@ -200,7 +200,8 @@ impl ExecutionPlan for AnalyzeExec { Arc::new(type_builder.finish()), Arc::new(plan_builder.finish()), ], - ); + ) + .map_err(Into::into); // again ignore error tx.send(maybe_batch).await.ok(); }); diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index 2f7ebd85a5f9..2e7211fc3ae4 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -180,7 +180,7 @@ struct CoalesceBatchesStream { } impl Stream for CoalesceBatchesStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, @@ -200,7 +200,7 @@ impl CoalesceBatchesStream { fn poll_next_inner( self: &mut Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { // Get a clone (uses same underlying atomic) as self gets borrowed below let cloned_time = self.baseline_metrics.elapsed_compute().clone(); diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs index 1b847372eed6..8ff8a37aa36a 100644 --- a/datafusion/core/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs @@ -25,8 +25,8 @@ use std::task::Poll; use futures::Stream; use tokio::sync::mpsc; +use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use arrow::{datatypes::SchemaRef, error::Result as ArrowResult}; use super::common::AbortOnDropMany; use super::expressions::PhysicalSortExpr; @@ -138,7 +138,7 @@ impl ExecutionPlan for CoalescePartitionsExec { // least one result in an attempt to maximize // parallelism. let (sender, receiver) = - mpsc::channel::>(input_partitions); + mpsc::channel::>(input_partitions); // spawn independent tasks whose resulting streams (of batches) // are sent to the channel for consumption. 
@@ -185,14 +185,14 @@ impl ExecutionPlan for CoalescePartitionsExec { struct MergeStream { schema: SchemaRef, - input: mpsc::Receiver>, + input: mpsc::Receiver>, baseline_metrics: BaselineMetrics, #[allow(unused)] drop_helper: AbortOnDropMany<()>, } impl Stream for MergeStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index c3e243d83904..2515689e136a 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -25,7 +25,6 @@ use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statist use arrow::compute::concat; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::ArrowError; -use arrow::error::Result as ArrowResult; use arrow::ipc::writer::{FileWriter, IpcWriteOptions}; use arrow::record_batch::RecordBatch; use datafusion_physical_expr::utils::ordering_satisfy; @@ -68,7 +67,7 @@ impl SizedRecordBatchStream { } impl Stream for SizedRecordBatchStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -92,10 +91,7 @@ impl RecordBatchStream for SizedRecordBatchStream { /// Create a vector of record batches from a stream pub async fn collect(stream: SendableRecordBatchStream) -> Result> { - stream - .try_collect::>() - .await - .map_err(DataFusionError::from) + stream.try_collect::>().await } /// Merge two record batch references into a single record batch. @@ -104,15 +100,16 @@ pub fn merge_batches( first: &RecordBatch, second: &RecordBatch, schema: SchemaRef, -) -> ArrowResult { +) -> Result { let columns = (0..schema.fields.len()) .map(|index| { let first_column = first.column(index).as_ref(); let second_column = second.column(index).as_ref(); concat(&[first_column, second_column]) }) - .collect::>>()?; - RecordBatch::try_new(schema, columns) + .collect::, ArrowError>>() + .map_err(Into::::into)?; + RecordBatch::try_new(schema, columns).map_err(Into::into) } /// Merge a slice of record batch references into a single record batch, or @@ -121,7 +118,7 @@ pub fn merge_batches( pub fn merge_multiple_batches( batches: &[&RecordBatch], schema: SchemaRef, -) -> ArrowResult> { +) -> Result> { Ok(if batches.is_empty() { None } else { @@ -134,7 +131,8 @@ pub fn merge_multiple_batches( .collect::>(), ) }) - .collect::>>()?; + .collect::, ArrowError>>() + .map_err(Into::::into)?; Some(RecordBatch::try_new(schema, columns)?) }) } @@ -190,7 +188,7 @@ fn build_file_list_recurse( /// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender pub(crate) fn spawn_execution( input: Arc, - output: mpsc::Sender>, + output: mpsc::Sender>, partition: usize, context: Arc, ) -> JoinHandle<()> { @@ -199,8 +197,7 @@ pub(crate) fn spawn_execution( Err(e) => { // If send fails, plan being torn down, // there is no place to send the error. 
- let arrow_error = ArrowError::ExternalError(Box::new(e)); - output.send(Err(arrow_error)).await.ok(); + output.send(Err(e)).await.ok(); debug!( "Stopping execution: error executing input: {}", displayable(input.as_ref()).one_line() @@ -524,7 +521,7 @@ impl IPCWriter { /// Finish the writer pub fn finish(&mut self) -> Result<()> { - self.writer.finish().map_err(DataFusionError::ArrowError) + self.writer.finish().map_err(Into::into) } /// Path write to diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 6c3b1371ae2e..e6bedbbb9311 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -452,7 +452,7 @@ mod tests { let err = it.next().await.unwrap().unwrap_err().to_string(); assert_eq!( err, - "Csv error: incorrect number of fields for line 1, expected 14 got 13" + "Arrow error: Csv error: incorrect number of fields for line 1, expected 14 got 13" ); Ok(()) } diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs index b336b19ef473..89c2cecb47d9 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs @@ -27,7 +27,8 @@ use std::task::{Context, Poll}; use std::time::Instant; use arrow::datatypes::SchemaRef; -use arrow::{error::Result as ArrowResult, record_batch::RecordBatch}; +use arrow::error::ArrowError; +use arrow::record_batch::RecordBatch; use datafusion_common::ScalarValue; use futures::future::BoxFuture; use futures::stream::BoxStream; @@ -45,7 +46,7 @@ use crate::physical_plan::RecordBatchStream; /// A fallible future that resolves to a stream of [`RecordBatch`] pub type FileOpenFuture = - BoxFuture<'static, Result>>>; + BoxFuture<'static, Result>>>; /// Generic API for opening a file using an [`ObjectStore`] and resolving to a /// stream of [`RecordBatch`] @@ -96,7 +97,7 @@ enum FileStreamState { /// Partitioning column values for the current batch_iter partition_values: Vec, /// The reader instance - reader: BoxStream<'static, ArrowResult>, + reader: BoxStream<'static, Result>, }, /// Encountered an error Error, @@ -201,10 +202,7 @@ impl FileStream { }) } - fn poll_inner( - &mut self, - cx: &mut Context<'_>, - ) -> Poll>> { + fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { loop { match &mut self.state { FileStreamState::Idle => { @@ -230,7 +228,7 @@ impl FileStream { } Err(e) => { self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e.into()))); + return Poll::Ready(Some(Err(e))); } } } @@ -249,7 +247,7 @@ impl FileStream { } Err(e) => { self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e.into()))); + return Poll::Ready(Some(Err(e))); } }, FileStreamState::Scan { @@ -260,7 +258,11 @@ impl FileStream { self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); let result = result - .and_then(|b| self.pc_projector.project(b, partition_values)) + .and_then(|b| { + self.pc_projector + .project(b, partition_values) + .map_err(|e| ArrowError::ExternalError(e.into())) + }) .map(|batch| match &mut self.remain { Some(remain) => { if *remain > batch.num_rows() { @@ -280,7 +282,7 @@ impl FileStream { self.state = FileStreamState::Error } self.file_stream_metrics.time_scanning_total.start(); - return Poll::Ready(Some(result)); + return Poll::Ready(Some(result.map_err(Into::into))); } None => { 
self.file_stream_metrics.time_scanning_until_data.stop(); @@ -297,7 +299,7 @@ impl FileStream { } impl Stream for FileStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index de4b58c1edda..8f2bae7447d5 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -35,7 +35,6 @@ use arrow::{ array::{ArrayData, ArrayRef, DictionaryArray}, buffer::Buffer, datatypes::{DataType, Field, Schema, SchemaRef, UInt16Type}, - error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; pub use avro::AvroExec; @@ -379,12 +378,12 @@ impl PartitionColumnProjector { &mut self, file_batch: RecordBatch, partition_values: &[ScalarValue], - ) -> ArrowResult { + ) -> Result { let expected_cols = self.projected_schema.fields().len() - self.projected_partition_indexes.len(); if file_batch.columns().len() != expected_cols { - return Err(ArrowError::SchemaError(format!( + return Err(DataFusionError::Execution(format!( "Unexpected batch schema from file, expected {} cols but got {}", expected_cols, file_batch.columns().len() @@ -401,7 +400,7 @@ impl PartitionColumnProjector { ), ) } - RecordBatch::try_new(Arc::clone(&self.projected_schema), cols) + RecordBatch::try_new(Arc::clone(&self.projected_schema), cols).map_err(Into::into) } } diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 8f63947e3029..5e81602fe36d 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -640,7 +640,11 @@ pub async fn plan_to_parquet( let handle: tokio::task::JoinHandle> = tokio::task::spawn(async move { stream - .map(|batch| writer.write(&batch?)) + .map(|batch| { + writer + .write(&batch?) + .map_err(DataFusionError::ParquetError) + }) .try_collect() .await .map_err(DataFusionError::from)?; @@ -863,7 +867,7 @@ mod tests { .write_parquet(&out_dir, None) .await .expect_err("should fail because input file does not match inferred schema"); - assert_eq!("Parquet error: Arrow: underlying Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}")); + assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}")); Ok(()) } diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs index 45c1cc25d61d..38d31a883580 100644 --- a/datafusion/core/src/physical_plan/filter.rs +++ b/datafusion/core/src/physical_plan/filter.rs @@ -33,7 +33,6 @@ use crate::physical_plan::{ }; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, SchemaRef}; -use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_expr::Operator; @@ -239,20 +238,21 @@ struct FilterExecStream { fn batch_filter( batch: &RecordBatch, predicate: &Arc, -) -> ArrowResult { +) -> Result { predicate .evaluate(batch) .map(|v| v.into_array(batch.num_rows())) - .map_err(DataFusionError::into) .and_then(|array| { Ok(as_boolean_array(&array)?) 
// apply filter array to record batch - .and_then(|filter_array| filter_record_batch(batch, filter_array)) + .and_then(|filter_array| { + filter_record_batch(batch, filter_array).map_err(Into::into) + }) }) } impl Stream for FilterExecStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs index 82733cee79eb..522cebe0cf0b 100644 --- a/datafusion/core/src/physical_plan/joins/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -23,7 +23,6 @@ use futures::{Stream, TryStreamExt}; use std::{any::Any, sync::Arc, task::Poll}; use arrow::datatypes::{Schema, SchemaRef}; -use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use crate::execution::context::TaskContext; @@ -345,7 +344,7 @@ fn build_batch( batch: &RecordBatch, left_data: &RecordBatch, schema: &Schema, -) -> ArrowResult { +) -> Result { // Repeat value on the left n times let arrays = left_data .columns() @@ -364,11 +363,12 @@ fn build_batch( .cloned() .collect(), ) + .map_err(Into::into) } #[async_trait] impl Stream for CrossJoinStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -384,7 +384,7 @@ impl CrossJoinStream { fn poll_next_impl( &mut self, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll>> { + ) -> std::task::Poll>> { let left_data = match ready!(self.left_fut.get(cx)) { Ok(left_data) => left_data, Err(e) => return Poll::Ready(Some(Err(e))), diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 16f143560173..e75b4072e6ed 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -43,7 +43,6 @@ use futures::{ready, Stream, StreamExt, TryStreamExt}; use arrow::array::Array; use arrow::datatypes::{ArrowNativeType, DataType}; use arrow::datatypes::{Schema, SchemaRef}; -use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; use arrow::array::{ @@ -1146,7 +1145,7 @@ impl HashJoinStream { fn poll_next_impl( &mut self, cx: &mut std::task::Context<'_>, - ) -> Poll>> { + ) -> Poll>> { let build_timer = self.join_metrics.build_time.timer(); let left_data = match ready!(self.left_fut.get(cx)) { Ok(left_data) => left_data, @@ -1220,12 +1219,9 @@ impl HashJoinStream { self.join_metrics.output_rows.add(batch.num_rows()); Some(result) } - Err(_) => { - // TODO why the type of result stream is `Result`, and not the `DataFusionError` - Some(Err(ArrowError::ComputeError( - "Build left right indices error".to_string(), - ))) - } + Err(_) => Some(Err(DataFusionError::Execution( + "Build left right indices error".to_string(), + ))), }; timer.done(); result @@ -1272,7 +1268,7 @@ impl HashJoinStream { } impl Stream for HashJoinStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -1296,7 +1292,6 @@ mod tests { use arrow::array::UInt32Builder; use arrow::array::UInt64Builder; use arrow::datatypes::Field; - use arrow::error::ArrowError; use datafusion_expr::Operator; use super::*; @@ -2964,7 +2959,7 @@ mod tests { // right input stream returns one good batch and then one error. // The error should be returned. 
- let err = Err(ArrowError::ComputeError("bad data error".to_string())); + let err = Err(DataFusionError::Execution("bad data error".to_string())); let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); let on = vec![( diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs index 8343925125c0..1e7f6c4db0ca 100644 --- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs +++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs @@ -34,9 +34,8 @@ use arrow::array::{ BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, UInt64Builder, }; use arrow::datatypes::{Schema, SchemaRef}; -use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; -use datafusion_common::Statistics; +use datafusion_common::{DataFusionError, Statistics}; use datafusion_expr::JoinType; use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortExpr}; use futures::{ready, Stream, StreamExt, TryStreamExt}; @@ -356,7 +355,7 @@ impl NestedLoopJoinStream { fn poll_next_impl( &mut self, cx: &mut std::task::Context<'_>, - ) -> Poll>> { + ) -> Poll>> { // all left row let left_data = match ready!(self.left_fut.get(cx)) { Ok(left_data) => left_data, @@ -400,12 +399,9 @@ impl NestedLoopJoinStream { let mut left_indices_builder = UInt64Builder::new(); let mut right_indices_builder = UInt32Builder::new(); let left_right_indices = match indices_result { - Err(_) => { - // TODO why the type of result stream is `Result`, and not the `DataFusionError` - Err(ArrowError::ComputeError( - "Build left right indices error".to_string(), - )) - } + Err(_) => Err(DataFusionError::Execution( + "Build left right indices error".to_string(), + )), Ok(indices) => { for (left_side, right_side) in indices { left_indices_builder.append_values( @@ -486,7 +482,7 @@ impl NestedLoopJoinStream { } impl Stream for NestedLoopJoinStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index 28df317a8ac3..14d08f729285 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -32,7 +32,7 @@ use std::task::{Context, Poll}; use arrow::array::*; use arrow::compute::{concat_batches, take, SortOptions}; use arrow::datatypes::{DataType, SchemaRef, TimeUnit}; -use arrow::error::{ArrowError, Result as ArrowResult}; +use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use futures::{Stream, StreamExt}; @@ -574,7 +574,7 @@ impl RecordBatchStream for SMJStream { } impl Stream for SMJStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, @@ -712,7 +712,7 @@ impl SMJStream { } /// Poll next streamed row - fn poll_streamed_row(&mut self, cx: &mut Context) -> Poll>> { + fn poll_streamed_row(&mut self, cx: &mut Context) -> Poll>> { loop { match &self.streamed_state { StreamedState::Init => { @@ -755,10 +755,7 @@ impl SMJStream { } /// Poll next buffered batches - fn poll_buffered_batches( - &mut self, - cx: &mut Context, - ) -> Poll>> { + fn poll_buffered_batches(&mut self, cx: &mut Context) -> Poll>> { loop { match &self.buffered_state { BufferedState::Init => { @@ -856,7 +853,7 @@ impl SMJStream { } /// Get comparison result of streamed row and buffered batches - fn 
compare_streamed_buffered(&self) -> ArrowResult { + fn compare_streamed_buffered(&self) -> Result { if self.streamed_state == StreamedState::Exhausted { return Ok(Ordering::Greater); } @@ -876,7 +873,7 @@ impl SMJStream { /// Produce join and fill output buffer until reaching target batch size /// or the join is finished - fn join_partial(&mut self) -> ArrowResult<()> { + fn join_partial(&mut self) -> Result<()> { let mut join_streamed = false; let mut join_buffered = false; @@ -960,7 +957,7 @@ impl SMJStream { Ok(()) } - fn freeze_all(&mut self) -> ArrowResult<()> { + fn freeze_all(&mut self) -> Result<()> { self.freeze_streamed()?; self.freeze_buffered(self.buffered_data.batches.len())?; Ok(()) @@ -970,7 +967,7 @@ impl SMJStream { // no longer needed: // 1. freezes all indices joined to streamed side // 2. freezes NULLs joined to dequeued buffered batch to "release" it - fn freeze_dequeuing_buffered(&mut self) -> ArrowResult<()> { + fn freeze_dequeuing_buffered(&mut self) -> Result<()> { self.freeze_streamed()?; self.freeze_buffered(1)?; Ok(()) @@ -980,7 +977,7 @@ impl SMJStream { // NULLs on streamed side. // // Applicable only in case of Full join. - fn freeze_buffered(&mut self, batch_count: usize) -> ArrowResult<()> { + fn freeze_buffered(&mut self, batch_count: usize) -> Result<()> { if !matches!(self.join_type, JoinType::Full) { return Ok(()); } @@ -998,7 +995,8 @@ impl SMJStream { .columns() .iter() .map(|column| take(column, &buffered_indices, None)) - .collect::>>()?; + .collect::, ArrowError>>() + .map_err(Into::::into)?; let mut streamed_columns = self .streamed_schema @@ -1018,7 +1016,7 @@ impl SMJStream { // Produces and stages record batch for all output indices found // for current streamed batch and clears staged output indices. - fn freeze_streamed(&mut self) -> ArrowResult<()> { + fn freeze_streamed(&mut self) -> Result<()> { for chunk in self.streamed_batch.output_indices.iter_mut() { let streamed_indices = chunk.streamed_indices.finish(); @@ -1032,7 +1030,8 @@ impl SMJStream { .columns() .iter() .map(|column| take(column, &streamed_indices, None)) - .collect::>>()?; + .collect::, ArrowError>>() + .map_err(Into::::into)?; let buffered_indices: UInt64Array = chunk.buffered_indices.finish(); @@ -1045,7 +1044,8 @@ impl SMJStream { .columns() .iter() .map(|column| take(column, &buffered_indices, None)) - .collect::>>()? + .collect::, ArrowError>>() + .map_err(Into::::into)? 
} else { self.buffered_schema .fields() @@ -1071,7 +1071,7 @@ impl SMJStream { Ok(()) } - fn output_record_batch_and_reset(&mut self) -> ArrowResult { + fn output_record_batch_and_reset(&mut self) -> Result { let record_batch = concat_batches(&self.schema, &self.output_record_batches)?; self.join_metrics.output_batches.add(1); self.join_metrics.output_rows.add(record_batch.num_rows()); @@ -1163,7 +1163,7 @@ fn compare_join_arrays( right: usize, sort_options: &[SortOptions], null_equals_null: bool, -) -> ArrowResult { +) -> Result { let mut res = Ordering::Equal; for ((left_array, right_array), sort_options) in left_arrays.iter().zip(right_arrays).zip(sort_options) @@ -1231,7 +1231,7 @@ fn compare_join_arrays( DataType::Date32 => compare_value!(Date32Array), DataType::Date64 => compare_value!(Date64Array), _ => { - return Err(ArrowError::NotYetImplemented( + return Err(DataFusionError::NotImplemented( "Unsupported data type in sort merge join comparator".to_owned(), )); } @@ -1250,7 +1250,7 @@ fn is_join_arrays_equal( left: usize, right_arrays: &[ArrayRef], right: usize, -) -> ArrowResult { +) -> Result { let mut is_equal = true; for (left_array, right_array) in left_arrays.iter().zip(right_arrays) { macro_rules! compare_value { @@ -1297,7 +1297,7 @@ fn is_join_arrays_equal( DataType::Date32 => compare_value!(Date32Array), DataType::Date64 => compare_value!(Date64Array), _ => { - return Err(ArrowError::NotYetImplemented( + return Err(DataFusionError::NotImplemented( "Unsupported data type in sort merge join comparator".to_owned(), )); } diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index 29b611bf7731..ae344fb61f6a 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -27,7 +27,6 @@ use arrow::array::{ }; use arrow::compute; use arrow::datatypes::{Field, Schema, UInt32Type, UInt64Type}; -use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::ScalarValue; @@ -681,7 +680,7 @@ impl OnceFut { } /// Get the result of the computation if it is ready, without consuming it - pub(crate) fn get(&mut self, cx: &mut Context<'_>) -> Poll> { + pub(crate) fn get(&mut self, cx: &mut Context<'_>) -> Poll> { if let OnceFutState::Pending(fut) = &mut self.state { let r = ready!(fut.poll_unpin(cx)); self.state = OnceFutState::Ready(r); @@ -693,7 +692,7 @@ impl OnceFut { OnceFutState::Ready(r) => Poll::Ready( r.as_ref() .map(|r| r.as_ref()) - .map_err(|e| ArrowError::ExternalError(Box::new(e.clone()))), + .map_err(|e| DataFusionError::External(Box::new(e.clone()))), ), } } @@ -789,7 +788,7 @@ pub(crate) fn build_batch_from_indices( left_indices: UInt64Array, right_indices: UInt32Array, column_indices: &[ColumnIndex], -) -> ArrowResult { +) -> Result { // build the columns of the new [RecordBatch]: // 1. pick whether the column is from the left or right // 2. 
based on the pick, `take` items from the different RecordBatches @@ -821,7 +820,7 @@ pub(crate) fn build_batch_from_indices( }; columns.push(array); } - RecordBatch::try_new(Arc::new(schema.clone()), columns) + RecordBatch::try_new(Arc::new(schema.clone()), columns).map_err(Into::into) } /// The input is the matched indices for left and right and @@ -938,7 +937,8 @@ pub(crate) fn get_semi_indices( #[cfg(test)] mod tests { use super::*; - use arrow::datatypes::DataType; + use arrow::error::Result as ArrowResult; + use arrow::{datatypes::DataType, error::ArrowError}; use datafusion_common::ScalarValue; use std::pin::Pin; @@ -991,7 +991,7 @@ mod tests { ) -> Poll { match ready!(self.0.get(cx)) { Ok(()) => Poll::Ready(Ok(())), - Err(e) => Poll::Ready(Err(e)), + Err(e) => Poll::Ready(Err(e.into())), } } } diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index 1b97089f7b62..e633c32920fb 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -31,7 +31,6 @@ use crate::physical_plan::{ }; use arrow::array::ArrayRef; use arrow::datatypes::SchemaRef; -use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use super::expressions::PhysicalSortExpr; @@ -412,7 +411,7 @@ impl LimitStream { fn poll_and_skip( &mut self, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { let input = self.input.as_mut().unwrap(); loop { let poll = input.poll_next_unpin(cx); @@ -469,7 +468,7 @@ impl LimitStream { } impl Stream for LimitStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs index a15b59552294..557daca61db9 100644 --- a/datafusion/core/src/physical_plan/memory.rs +++ b/datafusion/core/src/physical_plan/memory.rs @@ -29,7 +29,6 @@ use super::{ }; use crate::error::Result; use arrow::datatypes::SchemaRef; -use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use crate::execution::context::TaskContext; @@ -178,7 +177,7 @@ impl MemoryStream { } impl Stream for MemoryStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, diff --git a/datafusion/core/src/physical_plan/metrics/baseline.rs b/datafusion/core/src/physical_plan/metrics/baseline.rs index 8dff5ee3fd77..9c4557fb139c 100644 --- a/datafusion/core/src/physical_plan/metrics/baseline.rs +++ b/datafusion/core/src/physical_plan/metrics/baseline.rs @@ -19,9 +19,10 @@ use std::task::Poll; -use arrow::{error::ArrowError, record_batch::RecordBatch}; +use arrow::record_batch::RecordBatch; use super::{Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time, Timestamp}; +use crate::error::Result; /// Helper for creating and tracking common "baseline" metrics for /// each operator @@ -141,8 +142,8 @@ impl BaselineMetrics { /// returning the same poll result pub fn record_poll( &self, - poll: Poll>>, - ) -> Poll>> { + poll: Poll>>, + ) -> Poll>> { if let Poll::Ready(maybe_batch) = &poll { match maybe_batch { Some(Ok(batch)) => { @@ -210,7 +211,7 @@ impl RecordOutput for Option { } } -impl RecordOutput for arrow::error::Result { +impl RecordOutput for Result { fn record_output(self, bm: &BaselineMetrics) -> Self { if let Ok(record_batch) = &self { record_batch.record_output(bm); diff --git a/datafusion/core/src/physical_plan/metrics/tracker.rs b/datafusion/core/src/physical_plan/metrics/tracker.rs index 
435cf19d8df8..f0980e6394b0 100644 --- a/datafusion/core/src/physical_plan/metrics/tracker.rs +++ b/datafusion/core/src/physical_plan/metrics/tracker.rs @@ -23,8 +23,9 @@ use crate::physical_plan::metrics::{ use std::sync::Arc; use std::task::Poll; +use crate::error::Result; use crate::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; -use arrow::{error::ArrowError, record_batch::RecordBatch}; +use arrow::record_batch::RecordBatch; /// Wraps a [`BaselineMetrics`] and records memory usage on a [`MemoryReservation`] #[derive(Debug)] @@ -96,8 +97,8 @@ impl MemTrackingMetrics { /// returning the same poll result pub fn record_poll( &self, - poll: Poll>>, - ) -> Poll>> { + poll: Poll>>, + ) -> Poll>> { self.metrics.record_poll(poll) } } diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 0ec7b16ef131..d0a4e556689b 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -27,7 +27,6 @@ use crate::error::Result; use crate::physical_plan::expressions::PhysicalSortExpr; use arrow::datatypes::SchemaRef; -use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; pub use datafusion_expr::Accumulator; @@ -44,7 +43,7 @@ use std::task::{Context, Poll}; use std::{any::Any, pin::Pin}; /// Trait for types that stream [arrow::record_batch::RecordBatch] -pub trait RecordBatchStream: Stream> { +pub trait RecordBatchStream: Stream> { /// Returns the schema of this `RecordBatchStream`. /// /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this @@ -76,7 +75,7 @@ impl RecordBatchStream for EmptyRecordBatchStream { } impl Stream for EmptyRecordBatchStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( self: Pin<&mut Self>, diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index f699a760b477..f35b7cee8cf9 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -32,7 +32,6 @@ use crate::physical_plan::{ Partitioning, PhysicalExpr, }; use arrow::datatypes::{Field, Schema, SchemaRef}; -use arrow::error::Result as ArrowResult; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use log::debug; @@ -318,7 +317,7 @@ fn stats_projection( } impl ProjectionStream { - fn batch_project(&self, batch: &RecordBatch) -> ArrowResult { + fn batch_project(&self, batch: &RecordBatch) -> Result { // records time on drop let _timer = self.baseline_metrics.elapsed_compute().timer(); let arrays = self @@ -332,8 +331,9 @@ impl ProjectionStream { let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); RecordBatch::try_new_with_options(self.schema.clone(), arrays, &options) + .map_err(Into::into) } else { - RecordBatch::try_new(self.schema.clone(), arrays) + RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into) } } } @@ -347,7 +347,7 @@ struct ProjectionStream { } impl Stream for ProjectionStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 4965979951ed..072bdfa3e6d0 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -32,7 +32,6 @@ use crate::physical_plan::{ }; use arrow::array::{ArrayRef, UInt64Builder}; use 
arrow::datatypes::SchemaRef; -use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; use log::debug; @@ -53,7 +52,7 @@ use tokio::task::JoinHandle; mod distributor_channels; -type MaybeBatch = Option>; +type MaybeBatch = Option>; type SharedMemoryReservation = Arc>; /// Inner state of [`RepartitionExec`]. @@ -545,7 +544,7 @@ impl RepartitionExec { /// channels. async fn wait_for_task( input_task: AbortOnDropSingle>, - txs: HashMap>>>, + txs: HashMap>>>, ) { // wait for completion, and propagate error // note we ignore errors on send (.ok) as that means the receiver has already shutdown. @@ -555,12 +554,10 @@ impl RepartitionExec { let e = Arc::new(e); for (_, tx) in txs { - let err = Err(ArrowError::ExternalError(Box::new( - DataFusionError::Context( - "Join Error".to_string(), - Box::new(DataFusionError::External(Box::new(Arc::clone(&e)))), - ), - ))); + let err = Err(DataFusionError::Context( + "Join Error".to_string(), + Box::new(DataFusionError::External(Box::new(Arc::clone(&e)))), + )); tx.send(Some(err)).await.ok(); } } @@ -570,7 +567,7 @@ impl RepartitionExec { for (_, tx) in txs { // wrap it because need to send error to all output partitions - let err = Err(ArrowError::ExternalError(Box::new(e.clone()))); + let err = Err(DataFusionError::External(Box::new(e.clone()))); tx.send(Some(err)).await.ok(); } } @@ -607,7 +604,7 @@ struct RepartitionStream { } impl Stream for RepartitionStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, @@ -672,12 +669,9 @@ mod tests { }, }, }; + use arrow::array::{ArrayRef, StringArray}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; - use arrow::{ - array::{ArrayRef, StringArray}, - error::ArrowError, - }; use datafusion_common::cast::as_string_array; use futures::FutureExt; use std::collections::HashSet; @@ -808,9 +802,7 @@ mod tests { repartition(&schema, partitions, Partitioning::RoundRobinBatch(5)).await }); - let output_partitions = join_handle - .await - .map_err(|e| DataFusionError::Internal(e.to_string()))??; + let output_partitions = join_handle.await.unwrap().unwrap(); assert_eq!(5, output_partitions.len()); assert_eq!(30, output_partitions[0].len()); @@ -892,7 +884,7 @@ mod tests { // input stream returns one good batch and then one error. The // error should be returned. 
- let err = Err(ArrowError::ComputeError("bad data error".to_string())); + let err = Err(DataFusionError::Execution("bad data error".to_string())); let schema = batch.schema(); let input = MockExec::new(vec![Ok(batch), err], schema); @@ -1179,8 +1171,9 @@ mod tests { // pull partitions for i in 0..exec.partitioning.partition_count() { let mut stream = exec.execute(i, task_ctx.clone())?; - let err = - DataFusionError::ArrowError(stream.next().await.unwrap().unwrap_err()); + let err = DataFusionError::ArrowError( + stream.next().await.unwrap().unwrap_err().into(), + ); let err = err.find_root(); assert!( matches!(err, DataFusionError::ResourcesExhausted(_)), diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index b75fe0d80d6c..bc4117c8f028 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -42,11 +42,11 @@ use arrow::array::{make_array, Array, ArrayRef, MutableArrayData}; pub use arrow::compute::SortOptions; use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions}; use arrow::datatypes::SchemaRef; -use arrow::error::{ArrowError, Result as ArrowResult}; +use arrow::error::ArrowError; use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; use datafusion_physical_expr::EquivalenceProperties; -use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use log::{debug, error}; use std::any::Any; use std::cmp::{min, Ordering}; @@ -503,7 +503,7 @@ impl SortedSizedRecordBatchStream { } impl Stream for SortedSizedRecordBatchStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -531,7 +531,8 @@ impl Stream for SortedSizedRecordBatchStream { make_array(mutable.freeze()) }) .collect::>(); - let batch = RecordBatch::try_new(self.schema.clone(), output); + let batch = + RecordBatch::try_new(self.schema.clone(), output).map_err(Into::into); let poll = Poll::Ready(Some(batch)); self.metrics.record_poll(poll) } @@ -575,10 +576,8 @@ fn read_spill_as_stream( path: NamedTempFile, schema: SchemaRef, ) -> Result { - let (sender, receiver): ( - Sender>, - Receiver>, - ) = tokio::sync::mpsc::channel(2); + let (sender, receiver): (Sender>, Receiver>) = + tokio::sync::mpsc::channel(2); let join_handle = task::spawn_blocking(move || { if let Err(e) = read_spill(sender, path.path()) { error!("Failure while reading spill file: {:?}. 
Error: {}", path, e); @@ -592,7 +591,7 @@ fn read_spill_as_stream( } fn write_sorted( - mut receiver: Receiver>, + mut receiver: Receiver>, path: PathBuf, schema: SchemaRef, ) -> Result<()> { @@ -610,12 +609,12 @@ fn write_sorted( Ok(()) } -fn read_spill(sender: Sender>, path: &Path) -> Result<()> { +fn read_spill(sender: Sender>, path: &Path) -> Result<()> { let file = BufReader::new(File::open(path)?); let reader = FileReader::try_new(file, None)?; for batch in reader { sender - .blocking_send(batch) + .blocking_send(batch.map_err(Into::into)) .map_err(|e| DataFusionError::Execution(format!("{e}")))?; } Ok(()) @@ -771,17 +770,14 @@ impl ExecutionPlan for SortExec { Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), - futures::stream::once( - do_sort( - input, - partition, - self.expr.clone(), - self.metrics_set.clone(), - context, - self.fetch(), - ) - .map_err(|e| ArrowError::ExternalError(Box::new(e))), - ) + futures::stream::once(do_sort( + input, + partition, + self.expr.clone(), + self.metrics_set.clone(), + context, + self.fetch(), + )) .try_flatten(), ))) } @@ -818,7 +814,7 @@ fn sort_batch( schema: SchemaRef, expr: &[PhysicalSortExpr], fetch: Option, -) -> ArrowResult { +) -> Result { let sort_columns = expr .iter() .map(|e| e.evaluate_to_sort_column(&batch)) @@ -843,8 +839,9 @@ fn sort_batch( }), ) }) - .collect::>>()?, - )?; + .collect::, ArrowError>>()?, + ) + .map_err(Into::::into)?; let sort_arrays = sort_columns .into_iter() diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 658a5f9fc176..a324bcc486fd 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -23,12 +23,10 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use arrow::error::ArrowError; use arrow::row::{RowConverter, SortField}; use arrow::{ array::{make_array as make_arrow_array, MutableArrayData}, datatypes::SchemaRef, - error::Result as ArrowResult, record_batch::RecordBatch, }; use futures::stream::{Fuse, FusedStream}; @@ -397,7 +395,7 @@ impl SortPreservingMergeStream { &mut self, cx: &mut Context<'_>, idx: usize, - ) -> Poll> { + ) -> Poll> { if self.cursors[idx] .as_ref() .map(|cursor| !cursor.is_finished()) @@ -432,9 +430,7 @@ impl SortPreservingMergeStream { let rows = match self.row_converter.convert_columns(&cols) { Ok(rows) => rows, Err(e) => { - return Poll::Ready(Err(ArrowError::ExternalError( - Box::new(e), - ))); + return Poll::Ready(Err(DataFusionError::ArrowError(e))); } }; @@ -462,7 +458,7 @@ impl SortPreservingMergeStream { /// Drains the in_progress row indexes, and builds a new RecordBatch from them /// /// Will then drop any batches for which all rows have been yielded to the output - fn build_record_batch(&mut self) -> ArrowResult { + fn build_record_batch(&mut self) -> Result { // Mapping from stream index to the index of the first buffer from that stream let mut buffer_idx = 0; let mut stream_to_buffer_idx = Vec::with_capacity(self.batches.len()); @@ -544,12 +540,12 @@ impl SortPreservingMergeStream { } } - RecordBatch::try_new(self.schema.clone(), columns) + RecordBatch::try_new(self.schema.clone(), columns).map_err(Into::into) } } impl Stream for SortPreservingMergeStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, @@ -565,7 +561,7 @@ impl SortPreservingMergeStream { fn poll_next_inner( self: &mut Pin<&mut Self>, 
cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { if self.aborted { return Poll::Ready(None); } @@ -621,7 +617,7 @@ impl SortPreservingMergeStream { fn init_loser_tree( self: &mut Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let num_streams = self.streams.num_streams(); if !self.loser_tree.is_empty() { @@ -674,7 +670,7 @@ impl SortPreservingMergeStream { fn update_loser_tree( self: &mut Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { if self.loser_tree_adjusted { return Poll::Ready(Ok(())); } diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs index 77be217e4cfa..2190022bc505 100644 --- a/datafusion/core/src/physical_plan/stream.rs +++ b/datafusion/core/src/physical_plan/stream.rs @@ -17,9 +17,8 @@ //! Stream wrappers for physical operators -use arrow::{ - datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch, -}; +use crate::error::Result; +use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use futures::{Stream, StreamExt}; use pin_project_lite::pin_project; use tokio::task::JoinHandle; @@ -34,7 +33,7 @@ use super::{RecordBatchStream, SendableRecordBatchStream}; pub struct RecordBatchReceiverStream { schema: SchemaRef, - inner: ReceiverStream>, + inner: ReceiverStream>, #[allow(dead_code)] drop_helper: AbortOnDropSingle<()>, @@ -45,7 +44,7 @@ impl RecordBatchReceiverStream { /// batches of the specified schema from `inner` pub fn create( schema: &SchemaRef, - rx: tokio::sync::mpsc::Receiver>, + rx: tokio::sync::mpsc::Receiver>, join_handle: JoinHandle<()>, ) -> SendableRecordBatchStream { let schema = schema.clone(); @@ -59,7 +58,7 @@ impl RecordBatchReceiverStream { } impl Stream for RecordBatchReceiverStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -103,9 +102,9 @@ impl std::fmt::Debug for RecordBatchStreamAdapter { impl Stream for RecordBatchStreamAdapter where - S: Stream>, + S: Stream>, { - type Item = ArrowResult; + type Item = Result; fn poll_next( self: std::pin::Pin<&mut Self>, @@ -121,7 +120,7 @@ where impl RecordBatchStream for RecordBatchStreamAdapter where - S: Stream>, + S: Stream>, { fn schema(&self) -> SchemaRef { self.schema.clone() diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs index cd9ff1938e0d..efd43aca6b15 100644 --- a/datafusion/core/src/physical_plan/streaming.rs +++ b/datafusion/core/src/physical_plan/streaming.rs @@ -111,7 +111,9 @@ impl ExecutionPlan for StreamingTableExec { Ok(match self.projection.clone() { Some(projection) => Box::pin(RecordBatchStreamAdapter::new( self.projected_schema.clone(), - stream.map(move |x| x.and_then(|b| b.project(projection.as_ref()))), + stream.map(move |x| { + x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into)) + }), )), None => stream, }) diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index df78058082f5..2f665e79c511 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -25,7 +25,6 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::{any::Any, sync::Arc}; -use arrow::error::Result as ArrowResult; use arrow::{ datatypes::{Field, Schema, SchemaRef}, record_batch::RecordBatch, @@ -358,7 +357,7 @@ impl RecordBatchStream for CombinedRecordBatchStream { } impl Stream for CombinedRecordBatchStream { - type Item = ArrowResult; + type Item = Result; fn 
poll_next( mut self: Pin<&mut Self>, @@ -426,7 +425,7 @@ impl RecordBatchStream for ObservedStream { } impl futures::Stream for ObservedStream { - type Item = arrow::error::Result; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs index 7fc3c638097f..c6f1aff43e47 100644 --- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs @@ -35,7 +35,6 @@ use arrow::compute::{concat, lexicographical_partition_ranges, SortColumn}; use arrow::{ array::ArrayRef, datatypes::{Schema, SchemaRef}, - error::Result as ArrowResult, record_batch::RecordBatch, }; use datafusion_common::{DataFusionError, ScalarValue}; @@ -414,7 +413,7 @@ impl PartitionByHandler for SortedPartitionByBoundedWindowStream { } impl Stream for SortedPartitionByBoundedWindowStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, @@ -449,7 +448,7 @@ impl SortedPartitionByBoundedWindowStream { } } - fn compute_aggregates(&mut self) -> ArrowResult { + fn compute_aggregates(&mut self) -> Result { // calculate window cols for (cur_window_expr, state) in self.window_expr.iter().zip(&mut self.window_agg_states) @@ -462,7 +461,7 @@ impl SortedPartitionByBoundedWindowStream { if let Some(columns_to_show) = columns_to_show { let n_generated = columns_to_show[0].len(); self.prune_state(n_generated)?; - RecordBatch::try_new(schema, columns_to_show) + RecordBatch::try_new(schema, columns_to_show).map_err(Into::into) } else { Ok(RecordBatch::new_empty(schema)) } @@ -472,7 +471,7 @@ impl SortedPartitionByBoundedWindowStream { fn poll_next_inner( &mut self, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { if self.finished { return Poll::Ready(None); } @@ -653,9 +652,7 @@ impl SortedPartitionByBoundedWindowStream { end: num_rows, }] } else { - lexicographical_partition_ranges(partition_columns) - .map_err(DataFusionError::ArrowError)? 
- .collect::>() + lexicographical_partition_ranges(partition_columns)?.collect() }) } } diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index fbd05fa88485..2c98561d3c03 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -32,10 +32,10 @@ use crate::physical_plan::{ use arrow::compute::{ concat, concat_batches, lexicographical_partition_ranges, SortColumn, }; +use arrow::error::ArrowError; use arrow::{ array::ArrayRef, datatypes::{Schema, SchemaRef}, - error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; use datafusion_common::DataFusionError; @@ -326,7 +326,7 @@ impl WindowAggStream { } } - fn compute_aggregates(&self) -> ArrowResult { + fn compute_aggregates(&self) -> Result { // record compute time on drop let _timer = self.baseline_metrics.elapsed_compute().timer(); @@ -348,20 +348,18 @@ impl WindowAggStream { // Calculate window cols for partition_point in partition_points { let length = partition_point.end - partition_point.start; - partition_results.push( - compute_window_aggregates( - &self.window_expr, - &batch.slice(partition_point.start, length), - ) - .map_err(|e| ArrowError::ExternalError(Box::new(e)))?, - ) + partition_results.push(compute_window_aggregates( + &self.window_expr, + &batch.slice(partition_point.start, length), + )?) } let columns = transpose(partition_results) .iter() .map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::>())) .collect::>() .into_iter() - .collect::>>()?; + .collect::, ArrowError>>() + .map_err(Into::::into)?; // combine with the original cols // note the setup of window aggregates is that they newly calculated window @@ -369,7 +367,7 @@ impl WindowAggStream { let mut batch_columns = batch.columns().to_vec(); // calculate window cols batch_columns.extend_from_slice(&columns); - RecordBatch::try_new(self.schema.clone(), batch_columns) + RecordBatch::try_new(self.schema.clone(), batch_columns).map_err(Into::into) } /// Evaluates the partition points given the sort columns. 
If the sort columns are @@ -393,7 +391,7 @@ impl WindowAggStream { } impl Stream for WindowAggStream { - type Item = ArrowResult; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, @@ -409,7 +407,7 @@ impl WindowAggStream { fn poll_next_inner( &mut self, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { if self.finished { return Poll::Ready(None); } diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs index 18d1aa2aa245..ea1643867bb8 100644 --- a/datafusion/core/src/scheduler/pipeline/execution.rs +++ b/datafusion/core/src/scheduler/pipeline/execution.rs @@ -26,7 +26,7 @@ use futures::{Stream, StreamExt}; use parking_lot::Mutex; use crate::arrow::datatypes::SchemaRef; -use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch}; +use crate::arrow::record_batch::RecordBatch; use crate::error::Result; use crate::execution::context::TaskContext; use crate::physical_plan::expressions::PhysicalSortExpr; @@ -186,7 +186,7 @@ struct InputPartitionStream { } impl Stream for InputPartitionStream { - type Item = ArrowResult; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut partition = self.partition.lock(); diff --git a/datafusion/core/src/scheduler/task.rs b/datafusion/core/src/scheduler/task.rs index 9283810e787d..ffa9728a4b4a 100644 --- a/datafusion/core/src/scheduler/task.rs +++ b/datafusion/core/src/scheduler/task.rs @@ -23,7 +23,6 @@ use crate::scheduler::{ Spawner, }; use arrow::datatypes::SchemaRef; -use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use futures::channel::mpsc; use futures::task::ArcWake; @@ -261,14 +260,14 @@ struct ExecutionResultStream { } impl Stream for ExecutionResultStream { - type Item = arrow::error::Result; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { let opt = ready!(self.receiver.poll_next_unpin(cx)).flatten(); - Poll::Ready(opt.map(|r| r.map_err(|e| ArrowError::ExternalError(Box::new(e))))) + Poll::Ready(opt) } } diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs index 956774b62cad..b191735469e1 100644 --- a/datafusion/core/src/test/exec.rs +++ b/datafusion/core/src/test/exec.rs @@ -27,7 +27,6 @@ use tokio::sync::Barrier; use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, - error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; use futures::Stream; @@ -89,7 +88,7 @@ impl TestStream { } impl Stream for TestStream { - type Item = ArrowResult; + type Item = Result; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { let next_batch = self.index.value(); @@ -120,7 +119,7 @@ impl RecordBatchStream for TestStream { #[derive(Debug)] pub struct MockExec { /// the results to send back - data: Vec>, + data: Vec>, schema: SchemaRef, } @@ -129,7 +128,7 @@ impl MockExec { /// record batches in this Exec. Note the batches are not produced /// immediately (the caller has to actually yield and another task /// must run) to ensure any poll loops are correct. 
-    pub fn new(data: Vec<ArrowResult<RecordBatch>>, schema: SchemaRef) -> Self {
+    pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
         Self { data, schema }
     }
 }
@@ -216,7 +215,7 @@ impl ExecutionPlan for MockExec {

     // Panics if one of the batches is an error
     fn statistics(&self) -> Statistics {
-        let data: ArrowResult<Vec<RecordBatch>> = self
+        let data: Result<Vec<RecordBatch>> = self
             .data
             .iter()
             .map(|r| match r {
@@ -231,10 +230,10 @@ impl ExecutionPlan for MockExec {
     }
 }

-fn clone_error(e: &ArrowError) -> ArrowError {
-    use ArrowError::*;
+fn clone_error(e: &DataFusionError) -> DataFusionError {
+    use DataFusionError::*;
     match e {
-        ComputeError(msg) => ComputeError(msg.to_string()),
+        Execution(msg) => Execution(msg.to_string()),
         _ => unimplemented!(),
     }
 }
@@ -683,7 +682,7 @@ pub struct BlockingStream {
 }

 impl Stream for BlockingStream {
-    type Item = ArrowResult<RecordBatch>;
+    type Item = Result<RecordBatch>;

     fn poll_next(
         self: Pin<&mut Self>,
diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs
index ba17c56e4efa..9842f1b596e7 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -18,7 +18,6 @@
 use arrow::array::{Int32Array, Int64Array};
 use arrow::compute::kernels::aggregate;
 use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef};
-use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
 use datafusion::from_slice::FromSlice;
@@ -85,7 +84,7 @@ impl RecordBatchStream for TestCustomRecordBatchStream {
 }

 impl Stream for TestCustomRecordBatchStream {
-    type Item = ArrowResult<RecordBatch>;
+    type Item = Result<RecordBatch>;

     fn poll_next(
         self: Pin<&mut Self>,
@@ -93,7 +92,7 @@ impl Stream for TestCustomRecordBatchStream {
     ) -> Poll<Option<Self::Item>> {
         if self.nb_batch > 0 {
             self.get_mut().nb_batch -= 1;
-            Poll::Ready(Some(TEST_CUSTOM_RECORD_BATCH!()))
+            Poll::Ready(Some(TEST_CUSTOM_RECORD_BATCH!().map_err(Into::into)))
         } else {
             Poll::Ready(None)
         }
diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs
index c7dce1b00aa0..0e3a30a30c65 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -1350,7 +1350,7 @@ async fn date_bin() {
     let result = try_execute_to_batches(&ctx, sql).await;
     assert_eq!(
         result.err().unwrap().to_string(),
-        "Arrow error: External error: This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
+        "This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
     );
 }
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 22feeed2cb49..08d6f1b32928 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -1474,7 +1474,7 @@ async fn window_frame_creation() -> Result<()> {
     let results = df.collect().await;
     assert_contains!(
         results.err().unwrap().to_string(),
-        "Arrow error: External error: Internal error: Operator - is not implemented for types UInt32(1) and Utf8(\"1 DAY\")"
+        "External error: Internal error: Operator - is not implemented for types UInt32(1) and Utf8(\"1 DAY\"). This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
     );

     Ok(())
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index eca11184e71b..af7eb61f4605 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -63,7 +63,6 @@ use futures::{Stream, StreamExt};
 use arrow::{
     array::{Int64Array, StringArray},
     datatypes::SchemaRef,
-    error::ArrowError,
     record_batch::RecordBatch,
     util::pretty::pretty_format_batches,
 };
@@ -563,7 +562,7 @@ fn accumulate_batch(
 }

 impl Stream for TopKReader {
-    type Item = Result<RecordBatch, ArrowError>;
+    type Item = Result<RecordBatch>;

     fn poll_next(
         mut self: std::pin::Pin<&mut Self>,
@@ -590,13 +589,16 @@ impl Stream for TopKReader {
             self.state.iter().rev().unzip();

         let customer: Vec<&str> = customer.iter().map(|&s| &**s).collect();
-        Poll::Ready(Some(RecordBatch::try_new(
-            schema,
-            vec![
-                Arc::new(StringArray::from(customer)),
-                Arc::new(Int64Array::from(revenue)),
-            ],
-        )))
+        Poll::Ready(Some(
+            RecordBatch::try_new(
+                schema,
+                vec![
+                    Arc::new(StringArray::from(customer)),
+                    Arc::new(Int64Array::from(revenue)),
+                ],
+            )
+            .map_err(Into::into),
+        ))
             }
             other => other,
         }

From a1f4b5a7f160f908eea1d708103c2cbf3dbba98d Mon Sep 17 00:00:00 2001
From: comphead
Date: Sat, 28 Jan 2023 16:37:25 -0800
Subject: [PATCH 2/2] fix comments

---
 datafusion/core/src/physical_plan/filter.rs                | 4 +---
 datafusion/core/src/physical_plan/joins/sort_merge_join.rs | 6 ++----
 datafusion/core/src/physical_plan/joins/utils.rs           | 2 +-
 datafusion/core/src/physical_plan/sorts/sort.rs            | 3 +--
 .../src/physical_plan/windows/bounded_window_agg_exec.rs   | 2 +-
 .../core/src/physical_plan/windows/window_agg_exec.rs      | 5 ++---
 6 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 38d31a883580..44283b49c5df 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -245,9 +245,7 @@ fn batch_filter(
         .and_then(|array| {
             Ok(as_boolean_array(&array)?)
                 // apply filter array to record batch
-                .and_then(|filter_array| {
-                    filter_record_batch(batch, filter_array).map_err(Into::into)
-                })
+                .and_then(|filter_array| Ok(filter_record_batch(batch, filter_array)?))
         })
 }
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 14d08f729285..edcd4bfa8eb1 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -1030,8 +1030,7 @@ impl SMJStream {
                 .columns()
                 .iter()
                 .map(|column| take(column, &streamed_indices, None))
-                .collect::<Result<Vec<ArrayRef>, ArrowError>>()
-                .map_err(Into::<DataFusionError>::into)?;
+                .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;

             let buffered_indices: UInt64Array = chunk.buffered_indices.finish();

@@ -1044,8 +1043,7 @@ impl SMJStream {
                 .columns()
                 .iter()
                 .map(|column| take(column, &buffered_indices, None))
-                .collect::<Result<Vec<ArrayRef>, ArrowError>>()
-                .map_err(Into::<DataFusionError>::into)?
+                .collect::<Result<Vec<ArrayRef>, ArrowError>>()?
             } else {
                 self.buffered_schema
                     .fields()
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs
index ae344fb61f6a..3724585238e6 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -820,7 +820,7 @@ pub(crate) fn build_batch_from_indices(
         };
         columns.push(array);
     }
-    RecordBatch::try_new(Arc::new(schema.clone()), columns).map_err(Into::into)
+    Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
 }

 /// The input is the matched indices for left and right and
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index bc4117c8f028..ad08504c3b86 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -840,8 +840,7 @@ fn sort_batch(
                 )
             })
             .collect::<Result<Vec<ArrayRef>, ArrowError>>()?,
-    )
-    .map_err(Into::<DataFusionError>::into)?;
+    )?;

     let sort_arrays = sort_columns
         .into_iter()
diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index c6f1aff43e47..8ad88ffc3aca 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -461,7 +461,7 @@ impl SortedPartitionByBoundedWindowStream {
         if let Some(columns_to_show) = columns_to_show {
             let n_generated = columns_to_show[0].len();
             self.prune_state(n_generated)?;
-            RecordBatch::try_new(schema, columns_to_show).map_err(Into::into)
+            Ok(RecordBatch::try_new(schema, columns_to_show)?)
         } else {
             Ok(RecordBatch::new_empty(schema))
         }
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 2c98561d3c03..c1f7087269b6 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -358,8 +358,7 @@ impl WindowAggStream {
             .map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
             .collect::<Vec<_>>()
             .into_iter()
-            .collect::<Result<Vec<ArrayRef>, ArrowError>>()
-            .map_err(Into::<DataFusionError>::into)?;
+            .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;

         // combine with the original cols
         // note the setup of window aggregates is that they newly calculated window
@@ -367,7 +366,7 @@ impl WindowAggStream {
         let mut batch_columns = batch.columns().to_vec();
         // calculate window cols
         batch_columns.extend_from_slice(&columns);
-        RecordBatch::try_new(self.schema.clone(), batch_columns).map_err(Into::into)
+        Ok(RecordBatch::try_new(self.schema.clone(), batch_columns)?)
     }

     /// Evaluates the partition points given the sort columns. If the sort columns are
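
Reviewer note, not part of the patch: nearly every hunk above relies on the same mechanism. Rust's `?` operator applies `From` conversions automatically, so once `impl From<ArrowError> for DataFusionError` is in scope, `Ok(expr?)` performs the same conversion as the explicit `.map_err(Into::into)` it replaces. Below is a minimal, self-contained sketch of the idiom; the two error enums are simplified stand-ins, not the real `arrow` / `datafusion_common` definitions:

    // Simplified stand-in for arrow::error::ArrowError.
    #[derive(Debug)]
    enum ArrowError {
        ComputeError(String),
    }

    // Simplified stand-in for datafusion_common::DataFusionError.
    #[derive(Debug)]
    enum DataFusionError {
        ArrowError(ArrowError),
    }

    // The conversion the patch depends on: with this impl in scope,
    // `?` lifts an ArrowError into a DataFusionError without a map_err.
    impl From<ArrowError> for DataFusionError {
        fn from(e: ArrowError) -> Self {
            DataFusionError::ArrowError(e)
        }
    }

    // A fallible "kernel" returning the Arrow-side error type.
    fn arrow_kernel(fail: bool) -> Result<u32, ArrowError> {
        if fail {
            Err(ArrowError::ComputeError("overflow".to_string()))
        } else {
            Ok(42)
        }
    }

    // Style settled on in the "fix comments" commit: `Ok(expr?)` rather
    // than `expr.map_err(Into::into)`; both compile to the same conversion.
    fn datafusion_op(fail: bool) -> Result<u32, DataFusionError> {
        Ok(arrow_kernel(fail)?)
    }

    fn main() {
        println!("{:?}", datafusion_op(false)); // Ok(42)
        println!("{:?}", datafusion_op(true)); // Err(ArrowError(ComputeError("overflow")))
    }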
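
A second idiom worth calling out: the join, sort, and window hunks collect an iterator of per-column `Result`s into a single `Result<Vec<_>, ArrowError>` before applying `?`. Collecting into `Result` short-circuits at the first `Err`, which is how a failed `take`/`concat` on one column aborts the whole batch. A small sketch of that behavior, with a hypothetical `take_column` helper and a plain `String` standing in for the error type:

    // Hypothetical per-column operation; fails on column 2.
    fn take_column(i: usize) -> Result<Vec<i32>, String> {
        if i == 2 {
            Err(format!("column {i}: index out of bounds"))
        } else {
            Ok(vec![i as i32; 3])
        }
    }

    // Iterator<Item = Result<T, E>> collapses into Result<Vec<T>, E>,
    // stopping at the first error, exactly like the column loops above.
    fn gather(n: usize) -> Result<Vec<Vec<i32>>, String> {
        (0..n).map(take_column).collect::<Result<Vec<_>, String>>()
    }

    fn main() {
        assert!(gather(2).is_ok()); // columns 0 and 1 succeed
        assert_eq!(
            gather(4).unwrap_err(), // stops at column 2; column 3 never runs
            "column 2: index out of bounds"
        );
        println!("short-circuit verified");
    }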