
Added support for limit pushdown to IPC reading #1135

Merged · 3 commits · Jul 5, 2022
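This change threads an optional row limit through the IPC readers. Below is a minimal sketch of how the new argument might be used with the file reader, assuming the post-PR signature `FileReader::new(reader, metadata, projection, limit)`; the function name is illustrative, not part of the crate.

```rust
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::ipc::read;

/// Sketch: iterate over an IPC file but ask the reader for at most `limit` rows.
fn read_limited(path: &str, limit: usize) -> Result<()> {
    let mut file = File::open(path)?;
    let metadata = read::read_file_metadata(&mut file)?;

    // `None`: no column projection. `Some(limit)`: push the row limit down
    // into the reader instead of truncating chunks after the fact.
    let reader = read::FileReader::new(file, metadata, None, Some(limit));

    for maybe_chunk in reader {
        let chunk = maybe_chunk?;
        println!("chunk with {} rows", chunk.len());
    }
    Ok(())
}
```

The example diffs below keep passing `None` for the new argument, i.e. no limit.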
2 changes: 1 addition & 1 deletion examples/extension.rs
@@ -51,6 +51,6 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
fn read_ipc(buf: &[u8]) -> Result<Chunk<Box<dyn Array>>> {
let mut cursor = Cursor::new(buf);
let metadata = read::read_file_metadata(&mut cursor)?;
let mut reader = read::FileReader::new(cursor, metadata, None);
let mut reader = read::FileReader::new(cursor, metadata, None, None);
reader.next().unwrap()
}
18 changes: 10 additions & 8 deletions examples/ipc_file_read.rs
@@ -9,7 +9,7 @@ use arrow2::io::print;

/// Simplest way: read all record batches from the file. This can be used e.g. for random access.
#[allow(clippy::type_complexity)]
fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Box<dyn Array>>>)> {
fn read_chunks(path: &str) -> Result<(Schema, Vec<Chunk<Box<dyn Array>>>)> {
let mut file = File::open(path)?;

// read the files' metadata. At this point, we can distribute the read whatever we like.
@@ -18,10 +18,10 @@ fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Box<dyn Array>>>)> {
let schema = metadata.schema.clone();

// Simplest way: use the reader, an iterator over batches.
let reader = read::FileReader::new(file, metadata, None);
let reader = read::FileReader::new(file, metadata, None, None);

let columns = reader.collect::<Result<Vec<_>>>()?;
Ok((schema, columns))
let chunks = reader.collect::<Result<Vec<_>>>()?;
Ok((schema, chunks))
}

/// Random access way: read a single record batch from the file. This can be used e.g. for random access.
@@ -36,13 +36,15 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk<Box<dyn Array>>)> {
// advanced way: read the dictionary
let dictionaries = read::read_file_dictionaries(&mut file, &metadata, &mut Default::default())?;

// and the chunk
let chunk_index = 0;

let chunk = read::read_batch(
&mut file,
&dictionaries,
&metadata,
None,
None,
chunk_index,
&mut Default::default(),
&mut Default::default(),
@@ -57,12 +59,12 @@ fn main() -> Result<()> {

let file_path = &args[1];

let (schema, batches) = read_batches(file_path)?;
let (schema, chunks) = read_chunks(file_path)?;
let names = schema.fields.iter().map(|f| &f.name).collect::<Vec<_>>();
println!("{}", print::write(&batches, &names));
println!("{}", print::write(&chunks, &names));

let (schema, batch) = read_batch(file_path)?;
let (schema, chunk) = read_batch(file_path)?;
let names = schema.fields.iter().map(|f| &f.name).collect::<Vec<_>>();
println!("{}", print::write(&[batch], &names));
println!("{}", print::write(&[chunk], &names));
Ok(())
}
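The random-access path gains the same argument. A self-contained sketch of reading one chunk with a row limit, assuming the argument order shown in the diff above (projection, then limit, then chunk index); the function name is illustrative.

```rust
use std::fs::File;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::ipc::read;

/// Sketch: read chunk `chunk_index`, keeping at most `limit` rows.
fn read_one_chunk_limited(
    path: &str,
    chunk_index: usize,
    limit: usize,
) -> Result<Chunk<Box<dyn Array>>> {
    let mut file = File::open(path)?;
    let metadata = read::read_file_metadata(&mut file)?;
    let dictionaries =
        read::read_file_dictionaries(&mut file, &metadata, &mut Default::default())?;

    read::read_batch(
        &mut file,
        &dictionaries,
        &metadata,
        None,        // projection: keep all columns
        Some(limit), // limit: at most `limit` rows
        chunk_index,
        &mut Default::default(),
        &mut Default::default(),
    )
}
```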
10 changes: 5 additions & 5 deletions examples/ipc_file_write.rs
@@ -6,15 +6,15 @@ use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::ipc::write;

fn write_batches(path: &str, schema: Schema, columns: &[Chunk<Box<dyn Array>>]) -> Result<()> {
fn write_batches(path: &str, schema: Schema, chunks: &[Chunk<Box<dyn Array>>]) -> Result<()> {
let file = File::create(path)?;

let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::new(file, schema, None, options);

writer.start()?;
for columns in columns {
writer.write(columns, None)?
for chunk in chunks {
writer.write(chunk, None)?
}
writer.finish()
}
@@ -34,9 +34,9 @@ fn main() -> Result<()> {
let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]);
let b = Utf8Array::<i32>::from_slice(&["a", "b", "c", "d", "e"]);

let batch = Chunk::try_new(vec![a.boxed(), b.boxed()])?;
let chunk = Chunk::try_new(vec![a.boxed(), b.boxed()])?;

// write it
write_batches(file_path, schema, &[batch])?;
write_batches(file_path, schema, &[chunk])?;
Ok(())
}
2 changes: 1 addition & 1 deletion examples/parquet_write_parallel/src/main.rs
@@ -58,7 +58,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {

// declare encodings
let encodings = (&schema.fields)
.par_iter()
.iter()
.map(|f| transverse(&f.data_type, encoding_map))
.collect::<Vec<_>>();

2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-file-to-stream.rs
@@ -32,7 +32,7 @@ fn main() -> Result<()> {
let filename = args.file_name;
let mut f = File::open(filename)?;
let metadata = read::read_file_metadata(&mut f)?;
let mut reader = read::FileReader::new(f, metadata.clone(), None);
let mut reader = read::FileReader::new(f, metadata.clone(), None, None);

let options = write::WriteOptions { compression: None };
let mut writer = write::StreamWriter::new(std::io::stdout(), options);
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-json-integration-test.rs
@@ -80,7 +80,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>

let mut arrow_file = File::open(arrow_name)?;
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(arrow_file, metadata.clone(), None);
let reader = read::FileReader::new(arrow_file, metadata.clone(), None, None);

let names = metadata
.schema
@@ -118,7 +118,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
// open Arrow file
let mut arrow_file = File::open(arrow_name)?;
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(arrow_file, metadata, None);
let reader = read::FileReader::new(arrow_file, metadata, None, None);
let arrow_schema = reader.schema();

// compare schemas
@@ -291,6 +291,7 @@ async fn record_batch_from_message(
fields,
ipc_schema,
None,
None,
dictionaries,
arrow_format::ipc::MetadataVersion::V5,
&mut reader,
1 change: 1 addition & 0 deletions src/io/flight/mod.rs
@@ -137,6 +137,7 @@ pub fn deserialize_batch(
fields,
ipc_schema,
None,
None,
dictionaries,
message.version()?,
&mut reader,
8 changes: 4 additions & 4 deletions src/io/ipc/mod.rs
@@ -58,10 +58,10 @@
//! // Fetch some of the data and get the reader back
//! let mut reader = File::open(&path)?;
//! let metadata = read_file_metadata(&mut reader)?;
//! let mut filereader = FileReader::new(reader, metadata, None);
//! let row1 = filereader.next().unwrap(); // [[-1, 1], [1, -1]]
//! let row2 = filereader.next().unwrap(); // [[-1, 1], [1, -1]]
//! let mut reader = filereader.into_inner();
//! let mut reader = FileReader::new(reader, metadata, None, None);
//! let row1 = reader.next().unwrap(); // [[-1, 1], [1, -1]]
//! let row2 = reader.next().unwrap(); // [[-1, 1], [1, -1]]
//! let mut reader = reader.into_inner();
//! // Do more stuff with the reader, like seeking ahead.
//! # Ok::<(), Error>(())
//! ```
5 changes: 4 additions & 1 deletion src/io/ipc/read/array/binary.rs
@@ -18,6 +18,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> Result<BinaryArray<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
@@ -34,13 +35,15 @@
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);

let offsets: Buffer<O> = read_buffer(
buffers,
Expand All @@ -54,7 +57,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<O>::from(vec![O::default()])))?;

let last_offset = offsets.as_slice()[offsets.len() - 1].to_usize();
let last_offset = offsets.last().unwrap().to_usize();
let values = read_buffer(
buffers,
last_offset,
13 changes: 8 additions & 5 deletions src/io/ipc/read/array/boolean.rs
@@ -17,6 +17,7 @@ pub fn read_boolean<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> Result<BooleanArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
@@ -26,21 +27,23 @@
))
})?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let validity = read_validity(
buffers,
field_node,
reader,
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);

let values = read_bitmap(
buffers,
length,
2 changes: 2 additions & 0 deletions src/io/ipc/read/array/dictionary.rs
@@ -18,6 +18,7 @@ pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
dictionaries: &Dictionaries,
block_offset: u64,
compression: Option<Compression>,
limit: Option<usize>,
is_little_endian: bool,
scratch: &mut Vec<u8>,
) -> Result<DictionaryArray<T>>
@@ -48,6 +49,7 @@ where
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

3 changes: 3 additions & 0 deletions src/io/ipc/read/array/fixed_size_binary.rs
@@ -17,6 +17,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> Result<FixedSizeBinaryArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
@@ -33,13 +34,15 @@
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);

let length = length.saturating_mul(FixedSizeBinaryArray::maybe_get_size(&data_type)?);
let values = read_buffer(
7 changes: 6 additions & 1 deletion src/io/ipc/read/array/fixed_size_list.rs
@@ -21,6 +21,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
version: Version,
scratch: &mut Vec<u8>,
) -> Result<FixedSizeListArray> {
@@ -38,10 +39,13 @@ pub fn read_fixed_size_list<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let (field, _) = FixedSizeListArray::get_child_and_size(&data_type);
let (field, size) = FixedSizeListArray::get_child_and_size(&data_type);

let limit = limit.map(|x| x.saturating_mul(size));

let values = read(
field_nodes,
Expand All @@ -53,6 +57,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
limit,
version,
scratch,
)?;
6 changes: 6 additions & 0 deletions src/io/ipc/read/array/list.rs
@@ -23,6 +23,7 @@ pub fn read_list<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
version: Version,
scratch: &mut Vec<u8>,
) -> Result<ListArray<O>>
@@ -43,13 +44,15 @@ where
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);

let offsets = read_buffer::<O, _>(
buffers,
@@ -63,6 +66,8 @@ where
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<O>::from(vec![O::default()])))?;

let last_offset = offsets.last().unwrap().to_usize();

let field = ListArray::<O>::get_child_field(&data_type);

let values = read(
Expand All @@ -75,6 +80,7 @@ where
block_offset,
is_little_endian,
compression,
Some(last_offset),
version,
scratch,
)?;
6 changes: 6 additions & 0 deletions src/io/ipc/read/array/map.rs
@@ -22,6 +22,7 @@ pub fn read_map<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
version: Version,
scratch: &mut Vec<u8>,
) -> Result<MapArray> {
@@ -39,13 +40,15 @@ pub fn read_map<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);

let offsets = read_buffer::<i32, _>(
buffers,
@@ -61,6 +64,8 @@

let field = MapArray::get_field(&data_type);

let last_offset: usize = offsets.last().copied().unwrap() as usize;

let field = read(
field_nodes,
field,
Expand All @@ -71,6 +76,7 @@ pub fn read_map<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
Some(last_offset),
version,
scratch,
)?;
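For nested types, the hunks above also push the limit into the child arrays: variable-size lists and maps cap the children at the last offset still referenced by the (limited) parent, while fixed-size lists scale the limit by the element size. A rough illustration of those two rules follows; these are free-standing helpers written for this note, not the crate's API.

```rust
/// How many child values a limited list/map still references: once the offsets
/// buffer has been truncated to the limited parent length, it is simply the
/// last offset (the `Some(last_offset)` passed down in list.rs and map.rs).
fn variable_size_child_limit(truncated_offsets: &[i32]) -> Option<usize> {
    truncated_offsets.last().map(|last| *last as usize)
}

/// Fixed-size lists own exactly `size` child values per slot, so the child
/// limit is the parent limit scaled by `size` (see fixed_size_list.rs).
fn fixed_size_child_limit(limit: Option<usize>, size: usize) -> Option<usize> {
    limit.map(|l| l.saturating_mul(size))
}

fn main() {
    assert_eq!(variable_size_child_limit(&[0, 3, 7]), Some(7));
    assert_eq!(fixed_size_child_limit(Some(5), 4), Some(20));
}
```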