From 0bfa06030398a2ec8176ccce5a0fdf302c1148f7 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 30 Nov 2022 09:58:15 +0100 Subject: [PATCH] fix: do NOT convert errors to strings Wrap them into proper containers instead. Fixes #4434. --- datafusion-cli/src/command.rs | 12 +++--------- datafusion-cli/src/main.rs | 4 ++-- datafusion-cli/src/object_storage.rs | 4 ++-- datafusion-cli/src/print_format.rs | 6 +++--- .../src/datasource/listing_table_factory.rs | 6 +++--- .../core/src/physical_plan/repartition.rs | 17 ++++++++++++----- .../physical-expr/src/regex_expressions.rs | 8 ++++---- 7 files changed, 29 insertions(+), 28 deletions(-) diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index f1b6f67e3faf7..d9704f3ba6334 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -59,22 +59,16 @@ impl Command { ) -> Result<()> { let now = Instant::now(); match self { - Self::Help => print_options - .print_batches(&[all_commands_info()], now) - .map_err(|e| DataFusionError::Execution(e.to_string())), + Self::Help => print_options.print_batches(&[all_commands_info()], now), Self::ListTables => { let df = ctx.sql("SHOW TABLES").await?; let batches = df.collect().await?; - print_options - .print_batches(&batches, now) - .map_err(|e| DataFusionError::Execution(e.to_string())) + print_options.print_batches(&batches, now) } Self::DescribeTable(name) => { let df = ctx.sql(&format!("SHOW COLUMNS FROM {}", name)).await?; let batches = df.collect().await?; - print_options - .print_batches(&batches, now) - .map_err(|e| DataFusionError::Execution(e.to_string())) + print_options.print_batches(&batches, now) } Self::Include(filename) => { if let Some(filename) = filename { diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index a6a3daa2e98ac..c4e198ff009c8 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -146,8 +146,8 @@ fn create_runtime_env() -> Result { let object_store_provider = DatafusionCliObjectStoreProvider {}; let object_store_registry = ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider))); - let rn_config = RuntimeConfig::new() - .with_object_store_registry(Arc::new(object_store_registry)); + let rn_config = + RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)); RuntimeEnv::new(rn_config) } diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 64c48840eefb5..177b12717ce0f 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -60,7 +60,7 @@ fn build_s3_object_store(url: &Url) -> Result let host = get_host_name(url)?; match AmazonS3Builder::from_env().with_bucket_name(host).build() { Ok(s3) => Ok(Arc::new(s3)), - Err(err) => Err(DataFusionError::Execution(err.to_string())), + Err(err) => Err(DataFusionError::External(Box::new(err))), } } @@ -73,7 +73,7 @@ fn build_gcs_object_store(url: &Url) -> Result Ok(Arc::new(gcs)), - Err(err) => Err(DataFusionError::Execution(err.to_string())), + Err(err) => Err(DataFusionError::External(Box::new(err))), } } diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index c3eb096054a39..6f23efb1adc42 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -49,7 +49,7 @@ macro_rules! batches_to_json { writer.write_batches($batches)?; writer.finish()?; } - String::from_utf8(bytes).map_err(|e| DataFusionError::Execution(e.to_string()))? + String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))? }}; } @@ -64,8 +64,8 @@ fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result>>()? .into_iter() diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs index fdcb448d2caff..9492fb7497a62 100644 --- a/datafusion/core/src/physical_plan/repartition.rs +++ b/datafusion/core/src/physical_plan/repartition.rs @@ -30,7 +30,7 @@ use crate::physical_plan::{ }; use arrow::array::{ArrayRef, UInt64Builder}; use arrow::datatypes::SchemaRef; -use arrow::error::Result as ArrowResult; +use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; use log::debug; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -482,18 +482,25 @@ impl RepartitionExec { match input_task.await { // Error in joining task Err(e) => { + let e = Arc::new(e); + for (_, tx) in txs { - let err = DataFusionError::Execution(format!("Join Error: {}", e)); - let err = Err(err.into()); + let err = Err(ArrowError::ExternalError(Box::new( + DataFusionError::Context( + "Join Error".to_string(), + Box::new(DataFusionError::External(Box::new(Arc::clone(&e)))), + ), + ))); tx.send(Some(err)).ok(); } } // Error from running input task Ok(Err(e)) => { + let e = Arc::new(e); + for (_, tx) in txs { // wrap it because need to send error to all output partitions - let err = DataFusionError::Execution(e.to_string()); - let err = Err(err.into()); + let err = Err(ArrowError::ExternalError(Box::new(e.clone()))); tx.send(Some(err)).ok(); } } diff --git a/datafusion/physical-expr/src/regex_expressions.rs b/datafusion/physical-expr/src/regex_expressions.rs index a0e7546644d31..4a5b51b8a2fed 100644 --- a/datafusion/physical-expr/src/regex_expressions.rs +++ b/datafusion/physical-expr/src/regex_expressions.rs @@ -136,7 +136,7 @@ pub fn regexp_replace(args: &[ArrayRef]) -> Result patterns.insert(pattern.to_string(), re.clone()); Ok(re) }, - Err(err) => Err(DataFusionError::Execution(err.to_string())), + Err(err) => Err(DataFusionError::External(Box::new(err))), } } }; @@ -182,7 +182,7 @@ pub fn regexp_replace(args: &[ArrayRef]) -> Result patterns.insert(pattern, re.clone()); Ok(re) }, - Err(err) => Err(DataFusionError::Execution(err.to_string())), + Err(err) => Err(DataFusionError::External(Box::new(err))), } } }; @@ -254,8 +254,8 @@ fn _regexp_replace_static_pattern_replace( None => (pattern.to_string(), 1), }; - let re = Regex::new(&pattern) - .map_err(|err| DataFusionError::Execution(err.to_string()))?; + let re = + Regex::new(&pattern).map_err(|err| DataFusionError::External(Box::new(err)))?; // Replaces the posix groups in the replacement string // with rust ones.