diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs index 08a36bc49ea8..1e99957390d8 100644 --- a/arrow-flight/examples/flight_sql_server.rs +++ b/arrow-flight/examples/flight_sql_server.rs @@ -802,7 +802,7 @@ mod tests { fn endpoint(uri: String) -> Result { let endpoint = Endpoint::new(uri) - .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))? + .map_err(|_| ArrowError::IpcError("Cannot create endpoint".to_string()))? .connect_timeout(Duration::from_secs(20)) .timeout(Duration::from_secs(20)) .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait diff --git a/arrow-flight/src/bin/flight_sql_client.rs b/arrow-flight/src/bin/flight_sql_client.rs index e5aacc2e779a..20c8062f899e 100644 --- a/arrow-flight/src/bin/flight_sql_client.rs +++ b/arrow-flight/src/bin/flight_sql_client.rs @@ -151,7 +151,7 @@ async fn setup_client( let protocol = if args.tls { "https" } else { "http" }; let mut endpoint = Endpoint::new(format!("{}://{}:{}", protocol, args.host, port)) - .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))? + .map_err(|_| ArrowError::IpcError("Cannot create endpoint".to_string()))? .connect_timeout(Duration::from_secs(20)) .timeout(Duration::from_secs(20)) .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait @@ -162,15 +162,15 @@ async fn setup_client( if args.tls { let tls_config = ClientTlsConfig::new(); - endpoint = endpoint - .tls_config(tls_config) - .map_err(|_| ArrowError::IoError("Cannot create TLS endpoint".to_string()))?; + endpoint = endpoint.tls_config(tls_config).map_err(|_| { + ArrowError::IpcError("Cannot create TLS endpoint".to_string()) + })?; } let channel = endpoint .connect() .await - .map_err(|e| ArrowError::IoError(format!("Cannot connect to endpoint: {e}")))?; + .map_err(|e| ArrowError::IpcError(format!("Cannot connect to endpoint: {e}")))?; let mut client = FlightSqlServiceClient::new(channel); info!("connected"); diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs index d661c9640908..4b1f38ebcbb7 100644 --- a/arrow-flight/src/sql/client.rs +++ b/arrow-flight/src/sql/client.rs @@ -150,7 +150,7 @@ impl FlightSqlServiceClient { .flight_client .handshake(req) .await - .map_err(|e| ArrowError::IoError(format!("Can't handshake {e}")))?; + .map_err(|e| ArrowError::IpcError(format!("Can't handshake {e}")))?; if let Some(auth) = resp.metadata().get("authorization") { let auth = auth.to_str().map_err(|_| { ArrowError::ParseError("Can't read auth header".to_string()) @@ -390,16 +390,20 @@ impl FlightSqlServiceClient { ) -> Result, ArrowError> { for (k, v) in &self.headers { let k = AsciiMetadataKey::from_str(k.as_str()).map_err(|e| { - ArrowError::IoError(format!("Cannot convert header key \"{k}\": {e}")) + ArrowError::ParseError(format!("Cannot convert header key \"{k}\": {e}")) })?; let v = v.parse().map_err(|e| { - ArrowError::IoError(format!("Cannot convert header value \"{v}\": {e}")) + ArrowError::ParseError(format!( + "Cannot convert header value \"{v}\": {e}" + )) })?; req.metadata_mut().insert(k, v); } if let Some(token) = &self.token { let val = format!("Bearer {token}").parse().map_err(|e| { - ArrowError::IoError(format!("Cannot convert token to header value: {e}")) + ArrowError::ParseError(format!( + "Cannot convert token to header value: {e}" + )) })?; req.metadata_mut().insert("authorization", val); } @@ -504,11 +508,11 @@ impl PreparedStatement { } fn decode_error_to_arrow_error(err: prost::DecodeError) -> ArrowError { - ArrowError::IoError(err.to_string()) + ArrowError::IpcError(err.to_string()) } fn status_to_arrow_error(status: tonic::Status) -> ArrowError { - ArrowError::IoError(format!("{status:?}")) + ArrowError::IpcError(format!("{status:?}")) } // A polymorphic structure to natively represent different types of data contained in `FlightData` diff --git a/arrow-flight/tests/encode_decode.rs b/arrow-flight/tests/encode_decode.rs index 4f1a8e667ffc..71bcf4e0521a 100644 --- a/arrow-flight/tests/encode_decode.rs +++ b/arrow-flight/tests/encode_decode.rs @@ -386,7 +386,7 @@ async fn test_mismatched_schema_message() { do_test( make_primitive_batch(5), make_dictionary_batch(3), - "Error decoding ipc RecordBatch: Io error: Invalid data for schema", + "Error decoding ipc RecordBatch: Schema error: Invalid data for schema", ) .await; diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs index 07f716dea843..3569562af228 100644 --- a/arrow-ipc/src/convert.rs +++ b/arrow-ipc/src/convert.rs @@ -150,12 +150,12 @@ pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result Result Result ArrayReader<'a> { fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> { self.nodes.next().ok_or_else(|| { - ArrowError::IoError(format!( + ArrowError::SchemaError(format!( "Invalid data for schema. {} refers to node not found in schema", field )) @@ -402,10 +402,10 @@ pub fn read_record_batch( metadata: &MetadataVersion, ) -> Result { let buffers = batch.buffers().ok_or_else(|| { - ArrowError::IoError("Unable to get buffers from IPC RecordBatch".to_string()) + ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string()) })?; let field_nodes = batch.nodes().ok_or_else(|| { - ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string()) + ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string()) })?; let batch_compression = batch.compression(); let compression = batch_compression @@ -462,7 +462,7 @@ pub fn read_dictionary( metadata: &crate::MetadataVersion, ) -> Result<(), ArrowError> { if batch.isDelta() { - return Err(ArrowError::IoError( + return Err(ArrowError::InvalidArgumentError( "delta dictionary batches not supported".to_string(), )); } @@ -569,14 +569,14 @@ impl FileReader { let mut magic_buffer: [u8; 6] = [0; 6]; reader.read_exact(&mut magic_buffer)?; if magic_buffer != super::ARROW_MAGIC { - return Err(ArrowError::IoError( + return Err(ArrowError::ParseError( "Arrow file does not contain correct header".to_string(), )); } reader.seek(SeekFrom::End(-6))?; reader.read_exact(&mut magic_buffer)?; if magic_buffer != super::ARROW_MAGIC { - return Err(ArrowError::IoError( + return Err(ArrowError::ParseError( "Arrow file does not contain correct footer".to_string(), )); } @@ -592,11 +592,11 @@ impl FileReader { reader.read_exact(&mut footer_data)?; let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| { - ArrowError::IoError(format!("Unable to get root as footer: {err:?}")) + ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")) })?; let blocks = footer.recordBatches().ok_or_else(|| { - ArrowError::IoError( + ArrowError::ParseError( "Unable to get record batches from IPC Footer".to_string(), ) })?; @@ -633,7 +633,9 @@ impl FileReader { reader.read_exact(&mut block_data)?; let message = crate::root_as_message(&block_data[..]).map_err(|err| { - ArrowError::IoError(format!("Unable to get root as message: {err:?}")) + ArrowError::ParseError(format!( + "Unable to get root as message: {err:?}" + )) })?; match message.header_type() { @@ -657,7 +659,7 @@ impl FileReader { )?; } t => { - return Err(ArrowError::IoError(format!( + return Err(ArrowError::ParseError(format!( "Expecting DictionaryBatch in dictionary blocks, found {t:?}." ))); } @@ -705,7 +707,7 @@ impl FileReader { /// Sets the current block to the index, allowing random reads pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> { if index >= self.total_blocks { - Err(ArrowError::IoError(format!( + Err(ArrowError::InvalidArgumentError(format!( "Cannot set batch to index {} from {} total batches", index, self.total_blocks ))) @@ -732,25 +734,25 @@ impl FileReader { let mut block_data = vec![0; meta_len as usize]; self.reader.read_exact(&mut block_data)?; let message = crate::root_as_message(&block_data[..]).map_err(|err| { - ArrowError::IoError(format!("Unable to get root as footer: {err:?}")) + ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")) })?; // some old test data's footer metadata is not set, so we account for that if self.metadata_version != crate::MetadataVersion::V1 && message.version() != self.metadata_version { - return Err(ArrowError::IoError( + return Err(ArrowError::IpcError( "Could not read IPC message as metadata versions mismatch".to_string(), )); } match message.header_type() { - crate::MessageHeader::Schema => Err(ArrowError::IoError( + crate::MessageHeader::Schema => Err(ArrowError::IpcError( "Not expecting a schema when messages are read".to_string(), )), crate::MessageHeader::RecordBatch => { let batch = message.header_as_record_batch().ok_or_else(|| { - ArrowError::IoError( + ArrowError::IpcError( "Unable to read IPC message as record batch".to_string(), ) })?; @@ -774,7 +776,7 @@ impl FileReader { crate::MessageHeader::NONE => { Ok(None) } - t => Err(ArrowError::IoError(format!( + t => Err(ArrowError::InvalidArgumentError(format!( "Reading types other than record batches not yet supported, unable to read {t:?}" ))), } @@ -886,11 +888,11 @@ impl StreamReader { reader.read_exact(&mut meta_buffer)?; let message = crate::root_as_message(meta_buffer.as_slice()).map_err(|err| { - ArrowError::IoError(format!("Unable to get root as message: {err:?}")) + ArrowError::ParseError(format!("Unable to get root as message: {err:?}")) })?; // message header is a Schema, so read it let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| { - ArrowError::IoError("Unable to read IPC message as schema".to_string()) + ArrowError::ParseError("Unable to read IPC message as schema".to_string()) })?; let schema = crate::convert::fb_to_schema(ipc_schema); @@ -965,16 +967,16 @@ impl StreamReader { let vecs = &meta_buffer.to_vec(); let message = crate::root_as_message(vecs).map_err(|err| { - ArrowError::IoError(format!("Unable to get root as message: {err:?}")) + ArrowError::ParseError(format!("Unable to get root as message: {err:?}")) })?; match message.header_type() { - crate::MessageHeader::Schema => Err(ArrowError::IoError( + crate::MessageHeader::Schema => Err(ArrowError::IpcError( "Not expecting a schema when messages are read".to_string(), )), crate::MessageHeader::RecordBatch => { let batch = message.header_as_record_batch().ok_or_else(|| { - ArrowError::IoError( + ArrowError::IpcError( "Unable to read IPC message as record batch".to_string(), ) })?; @@ -986,7 +988,7 @@ impl StreamReader { } crate::MessageHeader::DictionaryBatch => { let batch = message.header_as_dictionary_batch().ok_or_else(|| { - ArrowError::IoError( + ArrowError::IpcError( "Unable to read IPC message as dictionary batch".to_string(), ) })?; @@ -1004,7 +1006,7 @@ impl StreamReader { crate::MessageHeader::NONE => { Ok(None) } - t => Err(ArrowError::IoError( + t => Err(ArrowError::InvalidArgumentError( format!("Reading types other than record batches not yet supported, unable to read {t:?} ") )), } diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 1c56613d8f24..9c418d76e485 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -757,7 +757,7 @@ impl FileWriter { /// Write a record batch to the file pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> { if self.finished { - return Err(ArrowError::IoError( + return Err(ArrowError::IpcError( "Cannot write record batch to file writer as it is closed".to_string(), )); } @@ -794,7 +794,7 @@ impl FileWriter { /// Write footer and closing tag, then mark the writer as done pub fn finish(&mut self) -> Result<(), ArrowError> { if self.finished { - return Err(ArrowError::IoError( + return Err(ArrowError::IpcError( "Cannot write footer to file writer as it is closed".to_string(), )); } @@ -909,7 +909,7 @@ impl StreamWriter { /// Write a record batch to the stream pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> { if self.finished { - return Err(ArrowError::IoError( + return Err(ArrowError::IpcError( "Cannot write record batch to stream writer as it is closed".to_string(), )); } @@ -930,7 +930,7 @@ impl StreamWriter { /// Write continuation bytes, and mark the stream as done pub fn finish(&mut self) -> Result<(), ArrowError> { if self.finished { - return Err(ArrowError::IoError( + return Err(ArrowError::IpcError( "Cannot write footer to stream writer as it is closed".to_string(), )); } diff --git a/arrow-schema/src/error.rs b/arrow-schema/src/error.rs index cd236c0871a6..8ea533db89af 100644 --- a/arrow-schema/src/error.rs +++ b/arrow-schema/src/error.rs @@ -35,7 +35,8 @@ pub enum ArrowError { DivideByZero, CsvError(String), JsonError(String), - IoError(String), + IoError(String, std::io::Error), + IpcError(String), InvalidArgumentError(String), ParquetError(String), /// Error during import or export to/from the C Data Interface @@ -53,7 +54,7 @@ impl ArrowError { impl From for ArrowError { fn from(error: std::io::Error) -> Self { - ArrowError::IoError(error.to_string()) + ArrowError::IoError(error.to_string(), error) } } @@ -65,7 +66,7 @@ impl From for ArrowError { impl From> for ArrowError { fn from(error: std::io::IntoInnerError) -> Self { - ArrowError::IoError(error.to_string()) + ArrowError::IoError(error.to_string(), error.into()) } } @@ -84,7 +85,8 @@ impl Display for ArrowError { ArrowError::DivideByZero => write!(f, "Divide by zero error"), ArrowError::CsvError(desc) => write!(f, "Csv error: {desc}"), ArrowError::JsonError(desc) => write!(f, "Json error: {desc}"), - ArrowError::IoError(desc) => write!(f, "Io error: {desc}"), + ArrowError::IoError(desc, _) => write!(f, "Io error: {desc}"), + ArrowError::IpcError(desc) => write!(f, "Ipc error: {desc}"), ArrowError::InvalidArgumentError(desc) => { write!(f, "Invalid argument error: {desc}") } diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index a9d2e8ab6bf2..7005cadc623c 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -258,7 +258,7 @@ fn get_error_code(err: &ArrowError) -> i32 { match err { ArrowError::NotYetImplemented(_) => ENOSYS, ArrowError::MemoryError(_) => ENOMEM, - ArrowError::IoError(_) => EIO, + ArrowError::IoError(_, _) => EIO, _ => EINVAL, } }