Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed API bug in async read of IPC metadata #969

Merged
merged 1 commit into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 34 additions & 46 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;

use arrow_format::ipc::{
planus::{ReadAsRoot, Vector},
BlockRef, FooterRef, MessageHeaderRef, MessageRef,
BlockRef, MessageHeaderRef, MessageRef,
};
use futures::{
stream::BoxStream, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream, StreamExt,
Expand All @@ -18,8 +18,7 @@ use crate::error::{ArrowError, Result};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};

use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch};
use super::reader::get_serialized_batch;
use super::schema::fb_to_schema;
use super::reader::{deserialize_footer, get_serialized_batch};
use super::Dictionaries;
use super::FileMetadata;

Expand Down Expand Up @@ -114,62 +113,51 @@ impl<'a> Stream for FileStream<'a> {
}
}

/// Reads the footer's length and magic number in footer
async fn read_footer_len<R: AsyncRead + AsyncSeek + Unpin>(reader: &mut R) -> Result<usize> {
// read footer length and magic number in footer
reader.seek(SeekFrom::End(-10)).await?;
let mut footer: [u8; 10] = [0; 10];

reader.read_exact(&mut footer).await?;
let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());

if footer[4..] != ARROW_MAGIC {
return Err(ArrowError::OutOfSpec(
"Arrow file does not contain correct footer".to_string(),
));
}
footer_len
.try_into()
.map_err(|_| ArrowError::oos("The footer's lenght must be a positive number"))
}

/// Read the metadata from an IPC file.
pub async fn read_file_metadata_async<R>(mut reader: R) -> Result<FileMetadata>
pub async fn read_file_metadata_async<R>(reader: &mut R) -> Result<FileMetadata>
where
R: AsyncRead + AsyncSeek + Unpin,
{
// Check header
let mut magic = [0; 6];
reader.read_exact(&mut magic).await?;
if magic != ARROW_MAGIC {
return Err(ArrowError::OutOfSpec(
"file does not contain correct Arrow header".to_string(),
));
}
// Check footer
reader.seek(SeekFrom::End(-6)).await?;
reader.read_exact(&mut magic).await?;
if magic != ARROW_MAGIC {
return Err(ArrowError::OutOfSpec(
"file does not contain correct Arrow footer".to_string(),
));
}
// Get footer size
let mut footer_size = [0; 4];
reader.seek(SeekFrom::End(-10)).await?;
reader.read_exact(&mut footer_size).await?;
let footer_size = i32::from_le_bytes(footer_size);
let footer_size = read_footer_len(reader).await?;
// Read footer
let mut footer = vec![0; footer_size as usize];
reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?;
reader.read_exact(&mut footer).await?;
let footer = FooterRef::read_as_root(&footer[..])
.map_err(|err| ArrowError::OutOfSpec(format!("unable to get root as footer: {:?}", err)))?;

let blocks = footer.record_batches()?.ok_or_else(|| {
ArrowError::OutOfSpec("unable to get record batches from footer".to_string())
})?;
let schema = footer
.schema()?
.ok_or_else(|| ArrowError::OutOfSpec("unable to get schema from footer".to_string()))?;
let (schema, ipc_schema) = fb_to_schema(schema)?;
let dictionary_blocks = footer.dictionaries()?;
let dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(reader, &schema.fields[..], &ipc_schema, blocks).await?
let (mut metadata, dictionary_blocks) = deserialize_footer(&footer)?;

metadata.dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(
reader,
&metadata.schema.fields,
&metadata.ipc_schema,
blocks,
)
.await?
} else {
Default::default()
};

Ok(FileMetadata {
schema,
ipc_schema,
blocks: blocks
.iter()
.map(|block| Ok(block.try_into()?))
.collect::<Result<Vec<_>>>()?,
dictionaries,
})
Ok(metadata)
}

async fn read_dictionaries<R>(
Expand Down
139 changes: 83 additions & 56 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,63 +108,89 @@ fn read_dictionaries<R: Read + Seek>(
Ok(dictionaries)
}

/// Read the IPC file's metadata
pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata> {
// check if header and footer contain correct magic bytes
let mut magic_buffer: [u8; 6] = [0; 6];
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != ARROW_MAGIC {
return Err(ArrowError::OutOfSpec(
"Arrow file does not contain correct header".to_string(),
));
}
reader.seek(SeekFrom::End(-6))?;
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != ARROW_MAGIC {
/// Reads the footer's length and magic number in footer
fn read_footer_len<R: Read + Seek>(reader: &mut R) -> Result<usize> {
// read footer length and magic number in footer
reader.seek(SeekFrom::End(-10))?;
let mut footer: [u8; 10] = [0; 10];

reader.read_exact(&mut footer)?;
let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());

if footer[4..] != ARROW_MAGIC {
return Err(ArrowError::OutOfSpec(
"Arrow file does not contain correct footer".to_string(),
));
}
// read footer length
let mut footer_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::End(-10))?;
reader.read_exact(&mut footer_size)?;
let footer_len = i32::from_le_bytes(footer_size);

// read footer
let mut footer_data = vec![0; footer_len as usize];
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
reader.read_exact(&mut footer_data)?;
footer_len
.try_into()
.map_err(|_| ArrowError::oos("The footer's lenght must be a positive number"))
}

let footer = arrow_format::ipc::FooterRef::read_as_root(&footer_data)
pub(super) fn deserialize_footer(
footer_data: &[u8],
) -> Result<(FileMetadata, Option<Vector<arrow_format::ipc::BlockRef>>)> {
let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data)
.map_err(|err| ArrowError::OutOfSpec(format!("Unable to get root as footer: {:?}", err)))?;

let blocks = footer.record_batches()?.ok_or_else(|| {
ArrowError::OutOfSpec("Unable to get record batches from footer".to_string())
})?;

let blocks = blocks
.iter()
.map(|block| Ok(block.try_into()?))
.collect::<Result<Vec<_>>>()?;

let ipc_schema = footer
.schema()?
.ok_or_else(|| ArrowError::OutOfSpec("Unable to get the schema from footer".to_string()))?;
let (schema, ipc_schema) = fb_to_schema(ipc_schema)?;

let dictionary_blocks = footer.dictionaries()?;
Ok((
FileMetadata {
schema,
ipc_schema,
blocks,
dictionaries: Default::default(),
},
footer.dictionaries()?,
))
}

/// Read the IPC file's metadata
pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata> {
// check if header contain the correct magic bytes
let mut magic_buffer: [u8; 6] = [0; 6];
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != ARROW_MAGIC {
return Err(ArrowError::OutOfSpec(
"Arrow file does not contain correct header".to_string(),
));
}

let footer_len = read_footer_len(reader)?;

// read footer
let mut footer_data = vec![0; footer_len as usize];
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
reader.read_exact(&mut footer_data)?;

let dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(reader, &schema.fields, &ipc_schema, blocks)?
let (mut metadata, dictionary_blocks) = deserialize_footer(&footer_data)?;

// read dictionaries
metadata.dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(
reader,
&metadata.schema.fields,
&metadata.ipc_schema,
blocks,
)?
} else {
Default::default()
};

Ok(FileMetadata {
schema,
ipc_schema,
blocks: blocks
.iter()
.map(|block| Ok(block.try_into()?))
.collect::<Result<Vec<_>>>()?,
dictionaries,
})
Ok(metadata)
}

pub(super) fn get_serialized_batch<'a>(
Expand Down Expand Up @@ -272,26 +298,27 @@ impl<R: Read + Seek> Iterator for FileReader<R> {

fn next(&mut self) -> Option<Self::Item> {
// get current block
if self.current_block < self.metadata.blocks.len() {
let block = self.current_block;
self.current_block += 1;
let chunk = read_batch(
&mut self.reader,
&self.metadata,
self.projection.as_ref().map(|x| x.0.as_ref()),
block,
&mut self.buffer,
);

let chunk = if let Some((projection, map, _)) = &self.projection {
// re-order according to projection
chunk.map(|chunk| apply_projection(chunk, projection, map))
} else {
chunk
};
Some(chunk)
} else {
None
if self.current_block == self.metadata.blocks.len() {
return None;
}

let block = self.current_block;
self.current_block += 1;

let chunk = read_batch(
&mut self.reader,
&self.metadata,
self.projection.as_ref().map(|x| x.0.as_ref()),
block,
&mut self.buffer,
);

let chunk = if let Some((projection, map, _)) = &self.projection {
// re-order according to projection
chunk.map(|chunk| apply_projection(chunk, projection, map))
} else {
chunk
};
Some(chunk)
}
}