Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add underlying std::io::Error to IoError and add IpcError variant #4726

Merged
Merged 1 commit on Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ mod tests {

fn endpoint(uri: String) -> Result<Endpoint, ArrowError> {
let endpoint = Endpoint::new(uri)
.map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
.map_err(|_| ArrowError::IpcError("Cannot create endpoint".to_string()))?
.connect_timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20))
.tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
Expand Down
10 changes: 5 additions & 5 deletions arrow-flight/src/bin/flight_sql_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async fn setup_client(
let protocol = if args.tls { "https" } else { "http" };

let mut endpoint = Endpoint::new(format!("{}://{}:{}", protocol, args.host, port))
.map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
.map_err(|_| ArrowError::IpcError("Cannot create endpoint".to_string()))?
.connect_timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20))
.tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
Expand All @@ -162,15 +162,15 @@ async fn setup_client(

if args.tls {
let tls_config = ClientTlsConfig::new();
endpoint = endpoint
.tls_config(tls_config)
.map_err(|_| ArrowError::IoError("Cannot create TLS endpoint".to_string()))?;
endpoint = endpoint.tls_config(tls_config).map_err(|_| {
ArrowError::IpcError("Cannot create TLS endpoint".to_string())
})?;
}

let channel = endpoint
.connect()
.await
.map_err(|e| ArrowError::IoError(format!("Cannot connect to endpoint: {e}")))?;
.map_err(|e| ArrowError::IpcError(format!("Cannot connect to endpoint: {e}")))?;

let mut client = FlightSqlServiceClient::new(channel);
info!("connected");
Expand Down
16 changes: 10 additions & 6 deletions arrow-flight/src/sql/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl FlightSqlServiceClient<Channel> {
.flight_client
.handshake(req)
.await
.map_err(|e| ArrowError::IoError(format!("Can't handshake {e}")))?;
.map_err(|e| ArrowError::IpcError(format!("Can't handshake {e}")))?;
if let Some(auth) = resp.metadata().get("authorization") {
let auth = auth.to_str().map_err(|_| {
ArrowError::ParseError("Can't read auth header".to_string())
Expand Down Expand Up @@ -390,16 +390,20 @@ impl FlightSqlServiceClient<Channel> {
) -> Result<tonic::Request<T>, ArrowError> {
for (k, v) in &self.headers {
let k = AsciiMetadataKey::from_str(k.as_str()).map_err(|e| {
ArrowError::IoError(format!("Cannot convert header key \"{k}\": {e}"))
ArrowError::ParseError(format!("Cannot convert header key \"{k}\": {e}"))
})?;
let v = v.parse().map_err(|e| {
ArrowError::IoError(format!("Cannot convert header value \"{v}\": {e}"))
ArrowError::ParseError(format!(
"Cannot convert header value \"{v}\": {e}"
))
})?;
req.metadata_mut().insert(k, v);
}
if let Some(token) = &self.token {
let val = format!("Bearer {token}").parse().map_err(|e| {
ArrowError::IoError(format!("Cannot convert token to header value: {e}"))
ArrowError::ParseError(format!(
"Cannot convert token to header value: {e}"
))
})?;
req.metadata_mut().insert("authorization", val);
}
Expand Down Expand Up @@ -504,11 +508,11 @@ impl PreparedStatement<Channel> {
}

fn decode_error_to_arrow_error(err: prost::DecodeError) -> ArrowError {
ArrowError::IoError(err.to_string())
ArrowError::IpcError(err.to_string())
}

fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
ArrowError::IoError(format!("{status:?}"))
ArrowError::IpcError(format!("{status:?}"))
}

// A polymorphic structure to natively represent different types of data contained in `FlightData`
Expand Down
2 changes: 1 addition & 1 deletion arrow-flight/tests/encode_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ async fn test_mismatched_schema_message() {
do_test(
make_primitive_batch(5),
make_dictionary_batch(3),
"Error decoding ipc RecordBatch: Io error: Invalid data for schema",
"Error decoding ipc RecordBatch: Schema error: Invalid data for schema",
)
.await;

Expand Down
4 changes: 2 additions & 2 deletions arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowErr
if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
Ok(schema)
} else {
Err(ArrowError::IoError(
Err(ArrowError::ParseError(
"Unable to get head as schema".to_string(),
))
}
} else {
Err(ArrowError::IoError(
Err(ArrowError::ParseError(
"Unable to get root as message".to_string(),
))
}
Expand Down
54 changes: 28 additions & 26 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];

let dict_id = field.dict_id().ok_or_else(|| {
ArrowError::IoError(format!("Field {field} does not have dict id"))
ArrowError::ParseError(format!("Field {field} does not have dict id"))
})?;

let value_array =
reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
ArrowError::IoError(format!(
ArrowError::ParseError(format!(
"Cannot find a dictionary batch with dict id: {dict_id}"
))
})?;
Expand Down Expand Up @@ -193,7 +193,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
let null_count = node.null_count();

if length != null_count {
return Err(ArrowError::IoError(format!(
return Err(ArrowError::SchemaError(format!(
"Field {field} of NullArray has unequal null_count {null_count} and len {length}"
)));
}
Expand Down Expand Up @@ -325,7 +325,7 @@ impl<'a> ArrayReader<'a> {

fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
self.nodes.next().ok_or_else(|| {
ArrowError::IoError(format!(
ArrowError::SchemaError(format!(
"Invalid data for schema. {} refers to node not found in schema",
field
))
Expand Down Expand Up @@ -402,10 +402,10 @@ pub fn read_record_batch(
metadata: &MetadataVersion,
) -> Result<RecordBatch, ArrowError> {
let buffers = batch.buffers().ok_or_else(|| {
ArrowError::IoError("Unable to get buffers from IPC RecordBatch".to_string())
ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
})?;
let field_nodes = batch.nodes().ok_or_else(|| {
ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
})?;
let batch_compression = batch.compression();
let compression = batch_compression
Expand Down Expand Up @@ -462,7 +462,7 @@ pub fn read_dictionary(
metadata: &crate::MetadataVersion,
) -> Result<(), ArrowError> {
if batch.isDelta() {
return Err(ArrowError::IoError(
return Err(ArrowError::InvalidArgumentError(
"delta dictionary batches not supported".to_string(),
));
}
Expand Down Expand Up @@ -569,14 +569,14 @@ impl<R: Read + Seek> FileReader<R> {
let mut magic_buffer: [u8; 6] = [0; 6];
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != super::ARROW_MAGIC {
return Err(ArrowError::IoError(
return Err(ArrowError::ParseError(
"Arrow file does not contain correct header".to_string(),
));
}
reader.seek(SeekFrom::End(-6))?;
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != super::ARROW_MAGIC {
return Err(ArrowError::IoError(
return Err(ArrowError::ParseError(
"Arrow file does not contain correct footer".to_string(),
));
}
Expand All @@ -592,11 +592,11 @@ impl<R: Read + Seek> FileReader<R> {
reader.read_exact(&mut footer_data)?;

let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as footer: {err:?}"))
ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
})?;

let blocks = footer.recordBatches().ok_or_else(|| {
ArrowError::IoError(
ArrowError::ParseError(
"Unable to get record batches from IPC Footer".to_string(),
)
})?;
Expand Down Expand Up @@ -633,7 +633,9 @@ impl<R: Read + Seek> FileReader<R> {
reader.read_exact(&mut block_data)?;

let message = crate::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {err:?}"))
ArrowError::ParseError(format!(
"Unable to get root as message: {err:?}"
))
})?;

match message.header_type() {
Expand All @@ -657,7 +659,7 @@ impl<R: Read + Seek> FileReader<R> {
)?;
}
t => {
return Err(ArrowError::IoError(format!(
return Err(ArrowError::ParseError(format!(
"Expecting DictionaryBatch in dictionary blocks, found {t:?}."
)));
}
Expand Down Expand Up @@ -705,7 +707,7 @@ impl<R: Read + Seek> FileReader<R> {
/// Sets the current block to the index, allowing random reads
pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
if index >= self.total_blocks {
Err(ArrowError::IoError(format!(
Err(ArrowError::InvalidArgumentError(format!(
"Cannot set batch to index {} from {} total batches",
index, self.total_blocks
)))
Expand All @@ -732,25 +734,25 @@ impl<R: Read + Seek> FileReader<R> {
let mut block_data = vec![0; meta_len as usize];
self.reader.read_exact(&mut block_data)?;
let message = crate::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as footer: {err:?}"))
ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
})?;

// some old test data's footer metadata is not set, so we account for that
if self.metadata_version != crate::MetadataVersion::V1
&& message.version() != self.metadata_version
{
return Err(ArrowError::IoError(
return Err(ArrowError::IpcError(
"Could not read IPC message as metadata versions mismatch".to_string(),
));
}

match message.header_type() {
crate::MessageHeader::Schema => Err(ArrowError::IoError(
crate::MessageHeader::Schema => Err(ArrowError::IpcError(
"Not expecting a schema when messages are read".to_string(),
)),
crate::MessageHeader::RecordBatch => {
let batch = message.header_as_record_batch().ok_or_else(|| {
ArrowError::IoError(
ArrowError::IpcError(
"Unable to read IPC message as record batch".to_string(),
)
})?;
Expand All @@ -774,7 +776,7 @@ impl<R: Read + Seek> FileReader<R> {
crate::MessageHeader::NONE => {
Ok(None)
}
t => Err(ArrowError::IoError(format!(
t => Err(ArrowError::InvalidArgumentError(format!(
"Reading types other than record batches not yet supported, unable to read {t:?}"
))),
}
Expand Down Expand Up @@ -886,11 +888,11 @@ impl<R: Read> StreamReader<R> {
reader.read_exact(&mut meta_buffer)?;

let message = crate::root_as_message(meta_buffer.as_slice()).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {err:?}"))
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})?;
// message header is a Schema, so read it
let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| {
ArrowError::IoError("Unable to read IPC message as schema".to_string())
ArrowError::ParseError("Unable to read IPC message as schema".to_string())
})?;
let schema = crate::convert::fb_to_schema(ipc_schema);

Expand Down Expand Up @@ -965,16 +967,16 @@ impl<R: Read> StreamReader<R> {

let vecs = &meta_buffer.to_vec();
let message = crate::root_as_message(vecs).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {err:?}"))
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})?;

match message.header_type() {
crate::MessageHeader::Schema => Err(ArrowError::IoError(
crate::MessageHeader::Schema => Err(ArrowError::IpcError(
"Not expecting a schema when messages are read".to_string(),
)),
crate::MessageHeader::RecordBatch => {
let batch = message.header_as_record_batch().ok_or_else(|| {
ArrowError::IoError(
ArrowError::IpcError(
"Unable to read IPC message as record batch".to_string(),
)
})?;
Expand All @@ -986,7 +988,7 @@ impl<R: Read> StreamReader<R> {
}
crate::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().ok_or_else(|| {
ArrowError::IoError(
ArrowError::IpcError(
"Unable to read IPC message as dictionary batch".to_string(),
)
})?;
Expand All @@ -1004,7 +1006,7 @@ impl<R: Read> StreamReader<R> {
crate::MessageHeader::NONE => {
Ok(None)
}
t => Err(ArrowError::IoError(
t => Err(ArrowError::InvalidArgumentError(
format!("Reading types other than record batches not yet supported, unable to read {t:?} ")
)),
}
Expand Down
8 changes: 4 additions & 4 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ impl<W: Write> FileWriter<W> {
/// Write a record batch to the file
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
if self.finished {
return Err(ArrowError::IoError(
return Err(ArrowError::IpcError(
"Cannot write record batch to file writer as it is closed".to_string(),
));
}
Expand Down Expand Up @@ -794,7 +794,7 @@ impl<W: Write> FileWriter<W> {
/// Write footer and closing tag, then mark the writer as done
pub fn finish(&mut self) -> Result<(), ArrowError> {
if self.finished {
return Err(ArrowError::IoError(
return Err(ArrowError::IpcError(
"Cannot write footer to file writer as it is closed".to_string(),
));
}
Expand Down Expand Up @@ -909,7 +909,7 @@ impl<W: Write> StreamWriter<W> {
/// Write a record batch to the stream
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
if self.finished {
return Err(ArrowError::IoError(
return Err(ArrowError::IpcError(
"Cannot write record batch to stream writer as it is closed".to_string(),
));
}
Expand All @@ -930,7 +930,7 @@ impl<W: Write> StreamWriter<W> {
/// Write continuation bytes, and mark the stream as done
pub fn finish(&mut self) -> Result<(), ArrowError> {
if self.finished {
return Err(ArrowError::IoError(
return Err(ArrowError::IpcError(
"Cannot write footer to stream writer as it is closed".to_string(),
));
}
Expand Down
10 changes: 6 additions & 4 deletions arrow-schema/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub enum ArrowError {
DivideByZero,
CsvError(String),
JsonError(String),
IoError(String),
IoError(String, std::io::Error),
IpcError(String),
InvalidArgumentError(String),
ParquetError(String),
/// Error during import or export to/from the C Data Interface
Expand All @@ -53,7 +54,7 @@ impl ArrowError {

impl From<std::io::Error> for ArrowError {
fn from(error: std::io::Error) -> Self {
ArrowError::IoError(error.to_string())
ArrowError::IoError(error.to_string(), error)
}
}

Expand All @@ -65,7 +66,7 @@ impl From<std::string::FromUtf8Error> for ArrowError {

impl<W: Write> From<std::io::IntoInnerError<W>> for ArrowError {
fn from(error: std::io::IntoInnerError<W>) -> Self {
ArrowError::IoError(error.to_string())
ArrowError::IoError(error.to_string(), error.into())
}
}

Expand All @@ -84,7 +85,8 @@ impl Display for ArrowError {
ArrowError::DivideByZero => write!(f, "Divide by zero error"),
ArrowError::CsvError(desc) => write!(f, "Csv error: {desc}"),
ArrowError::JsonError(desc) => write!(f, "Json error: {desc}"),
ArrowError::IoError(desc) => write!(f, "Io error: {desc}"),
ArrowError::IoError(desc, _) => write!(f, "Io error: {desc}"),
ArrowError::IpcError(desc) => write!(f, "Ipc error: {desc}"),
ArrowError::InvalidArgumentError(desc) => {
write!(f, "Invalid argument error: {desc}")
}
Expand Down
2 changes: 1 addition & 1 deletion arrow/src/ffi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ fn get_error_code(err: &ArrowError) -> i32 {
match err {
ArrowError::NotYetImplemented(_) => ENOSYS,
ArrowError::MemoryError(_) => ENOMEM,
ArrowError::IoError(_) => EIO,
ArrowError::IoError(_, _) => EIO,
_ => EINVAL,
}
}
Expand Down