Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Improved read of IPC
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jun 29, 2022
1 parent e89fe34 commit 339610e
Show file tree
Hide file tree
Showing 20 changed files with 162 additions and 167 deletions.
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_binary<O: Offset, R: Read + Seek>(
Expand All @@ -18,7 +18,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<BinaryArray<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::array::{DictionaryArray, DictionaryKey};
use crate::error::{Error, Result};

use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node};
use super::{read_primitive, skip_primitive};

#[allow(clippy::too_many_arguments)]
Expand All @@ -19,7 +19,7 @@ pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
block_offset: u64,
compression: Option<Compression>,
is_little_endian: bool,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<DictionaryArray<T>>
where
Vec<u8>: TryInto<T::Bytes>,
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/fixed_size_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_binary<R: Read + Seek>(
Expand All @@ -17,7 +17,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<FixedSizeBinaryArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_list<R: Read + Seek>(
Expand All @@ -22,7 +22,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<FixedSizeListArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
6 changes: 2 additions & 4 deletions src/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{
Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version,
};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_list<O: Offset, R: Read + Seek>(
Expand All @@ -26,7 +24,7 @@ pub fn read_list<O: Offset, R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<ListArray<O>>
where
Vec<u8>: TryInto<O::Bytes>,
Expand Down
6 changes: 2 additions & 4 deletions src/io/ipc/read/array/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{
Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version,
};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_map<R: Read + Seek>(
Expand All @@ -25,7 +23,7 @@ pub fn read_map<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<MapArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::error::{Error, Result};
use crate::{array::PrimitiveArray, types::NativeType};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_primitive<T: NativeType, R: Read + Seek>(
Expand All @@ -17,7 +17,7 @@ pub fn read_primitive<T: NativeType, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<PrimitiveArray<T>>
where
Vec<u8>: TryInto<T::Bytes>,
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_struct<R: Read + Seek>(
Expand All @@ -22,7 +22,7 @@ pub fn read_struct<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<StructArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
6 changes: 2 additions & 4 deletions src/io/ipc/read/array/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{
Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version,
};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_union<R: Read + Seek>(
Expand All @@ -25,7 +23,7 @@ pub fn read_union<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<UnionArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_utf8<O: Offset, R: Read + Seek>(
Expand All @@ -18,7 +18,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<Utf8Array<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
5 changes: 2 additions & 3 deletions src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::datatypes::{DataType, Field};
use crate::error::{Error, Result};
use crate::io::ipc::read::OutOfSpecKind;
use crate::io::ipc::{IpcField, IpcSchema};
use crate::io::ReadBuffer;

use super::deserialize::{read, skip};
use super::Dictionaries;
Expand Down Expand Up @@ -86,7 +85,7 @@ pub fn read_record_batch<R: Read + Seek>(
reader: &mut R,
block_offset: u64,
file_size: u64,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<Chunk<Box<dyn Array>>> {
assert_eq!(fields.len(), ipc_schema.fields.len());
let buffers = batch
Expand Down Expand Up @@ -234,7 +233,7 @@ pub fn read_dictionary<R: Read + Seek>(
reader: &mut R,
block_offset: u64,
file_size: u64,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<()> {
if batch
.is_delta()
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::error::Result;
use crate::io::ipc::IpcField;

use super::{array::*, Dictionaries};
use super::{IpcBuffer, Node, ReadBuffer};
use super::{IpcBuffer, Node};

#[allow(clippy::too_many_arguments)]
pub fn read<R: Read + Seek>(
Expand All @@ -24,7 +24,7 @@ pub fn read<R: Read + Seek>(
is_little_endian: bool,
compression: Option<BodyCompressionRef>,
version: MetadataVersion,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<Box<dyn Array>> {
use PhysicalType::*;
let data_type = field.data_type.clone();
Expand Down
63 changes: 42 additions & 21 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use super::reader::{deserialize_footer, get_serialized_batch};
use super::Dictionaries;
use super::FileMetadata;
use super::OutOfSpecKind;
use super::ReadBuffer;

/// Async reader for Arrow IPC files
pub struct FileStream<'a> {
Expand Down Expand Up @@ -142,9 +141,14 @@ where
{
let footer_size = read_footer_len(reader).await?;
// Read footer
let mut footer = vec![0; footer_size];
reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?;
reader.read_exact(&mut footer).await?;

let mut footer = vec![];
footer.try_reserve(footer_size)?;
reader
.take(footer_size as u64)
.read_to_end(&mut footer)
.await?;

deserialize_footer(&footer, u64::MAX)
}
Expand All @@ -156,9 +160,9 @@ async fn read_batch<R>(
metadata: &FileMetadata,
projection: Option<&[usize]>,
block: usize,
meta_buffer: &mut ReadBuffer,
block_buffer: &mut ReadBuffer,
scratch: &mut ReadBuffer,
meta_buffer: &mut Vec<u8>,
block_buffer: &mut Vec<u8>,
scratch: &mut Vec<u8>,
) -> Result<Chunk<Box<dyn Array>>>
where
R: AsyncRead + AsyncSeek + Unpin,
Expand All @@ -181,10 +185,14 @@ where
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?;

meta_buffer.set_len(meta_len);
reader.read_exact(meta_buffer.as_mut()).await?;
meta_buffer.clear();
meta_buffer.try_reserve(meta_len)?;
(&mut reader)
.take(meta_len as u64)
.read_to_end(meta_buffer)
.await?;

let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer.as_ref())
let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer)
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

let batch = get_serialized_batch(&message)?;
Expand All @@ -195,9 +203,14 @@ where
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?;

block_buffer.set_len(block_length);
reader.read_exact(block_buffer.as_mut()).await?;
let mut cursor = std::io::Cursor::new(block_buffer.as_ref());
block_buffer.clear();
block_buffer.try_reserve(block_length)?;
reader
.take(block_length as u64)
.read_to_end(block_buffer)
.await?;

let mut cursor = std::io::Cursor::new(&block_buffer);

read_record_batch(
batch,
Expand All @@ -220,14 +233,14 @@ async fn read_dictionaries<R>(
fields: &[Field],
ipc_schema: &IpcSchema,
blocks: &[Block],
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<Dictionaries>
where
R: AsyncRead + AsyncSeek + Unpin,
{
let mut dictionaries = Default::default();
let mut data: ReadBuffer = vec![].into();
let mut buffer: ReadBuffer = vec![].into();
let mut data: Vec<u8> = vec![];
let mut buffer: Vec<u8> = vec![];

for block in blocks {
let offset: u64 = block
Expand All @@ -250,11 +263,15 @@ where
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))?
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?;

buffer.set_len(length);
match header {
MessageHeaderRef::DictionaryBatch(batch) => {
reader.read_exact(buffer.as_mut()).await?;
let mut cursor = std::io::Cursor::new(buffer.as_ref());
buffer.clear();
buffer.try_reserve(length)?;
(&mut reader)
.take(length as u64)
.read_to_end(&mut buffer)
.await?;
let mut cursor = std::io::Cursor::new(&buffer);
read_dictionary(
batch,
fields,
Expand All @@ -272,7 +289,7 @@ where
Ok(dictionaries)
}

async fn read_dictionary_message<R>(mut reader: R, offset: u64, data: &mut ReadBuffer) -> Result<()>
async fn read_dictionary_message<R>(mut reader: R, offset: u64, data: &mut Vec<u8>) -> Result<()>
where
R: AsyncRead + AsyncSeek + Unpin,
{
Expand All @@ -288,8 +305,12 @@ where
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

data.set_len(footer_size);
reader.read_exact(data.as_mut()).await?;
data.clear();
data.try_reserve(footer_size)?;
(&mut reader)
.take(footer_size as u64)
.read_to_end(data)
.await?;

Ok(())
}
Expand Down
2 changes: 0 additions & 2 deletions src/io/ipc/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ pub mod stream_async;
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))]
pub mod file_async;

use super::super::ReadBuffer;

pub use common::{read_dictionary, read_record_batch};
pub use reader::{
read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, FileReader,
Expand Down
Loading

0 comments on commit 339610e

Please sign in to comment.