diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index 8cc7d9a0e23..87b41ffc99b 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"] [dependencies] arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] } -arrow-format = { version = "0.7", features = ["full"] } +arrow-format = { version = "0.7", features = ["flight-data", "flight-service"] } async-trait = "0.1.41" clap = { version = "^3", features = ["derive"] } futures = "0.3" diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index 3cdcd17f2e7..8955a3e121d 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -21,23 +21,15 @@ use arrow2::{ array::Array, chunk::Chunk, datatypes::*, - io::ipc::{ - read::{self, Dictionaries}, - write, IpcSchema, - }, io::{ flight::{self, deserialize_batch, serialize_batch}, - ipc::IpcField, + ipc::{read::Dictionaries, write, IpcField, IpcSchema}, }, }; -use arrow_format::flight::service::flight_service_client::FlightServiceClient; -use arrow_format::ipc; -use arrow_format::{ - flight::data::{ - flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket, - }, - ipc::planus::ReadAsRoot, +use arrow_format::flight::data::{ + flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket, }; +use arrow_format::flight::service::flight_service_client::FlightServiceClient; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use tonic::{Request, Streaming}; @@ -233,30 +225,14 @@ async fn read_dictionaries( dictionaries: &mut Dictionaries, ) -> Option { let mut data = stream.next().await?.ok()?; - let mut message = - ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing first message"); - - while let ipc::MessageHeaderRef::DictionaryBatch(chunk) = message - .header() - .expect("Header to be valid flatbuffers") - .expect("Header to be present") - { - let length = data.data_body.len(); - let mut reader = std::io::Cursor::new(&data.data_body); - read::read_dictionary( - chunk, - fields, - ipc_schema, - dictionaries, - &mut reader, - 0, - length as u64, - &mut Default::default(), - ) - .expect("Error reading dictionary"); + let existing = dictionaries.len(); + loop { + flight::deserialize_dictionary(&data, fields, ipc_schema, dictionaries).ok()?; + if dictionaries.len() == existing { + break; + } data = stream.next().await?.ok()?; - message = ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing message"); } Some(data) diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 9d21cf0c975..89df0a041b5 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -26,14 +26,13 @@ use tonic::{transport::Server, Request, Response, Status, Streaming}; use arrow_format::flight::data::flight_descriptor::*; use arrow_format::flight::data::*; use arrow_format::flight::service::flight_service_server::*; -use arrow_format::ipc::planus::ReadAsRoot; -use arrow_format::ipc::MessageHeaderRef; use arrow2::array::Array; use arrow2::chunk::Chunk; -use arrow2::datatypes::{Field, Schema}; +use arrow2::datatypes::Schema; use arrow2::io::flight::{ - deserialize_schemas, serialize_batch, serialize_schema, serialize_schema_to_info, + deserialize_message, deserialize_schemas, serialize_batch, serialize_schema, + serialize_schema_to_info, }; use arrow2::io::ipc; use arrow2::io::ipc::read::Dictionaries; @@ -228,14 +227,14 @@ impl FlightService for Service { let stream = try_stream! { pin_mut!(input_stream); for await item in input_stream { - let FlightData {data_header, data_body, app_metadata, ..} = item.map_err(|_| Status::invalid_argument(format!("Invalid")))?; - save_message(&data_header, - &data_body, - &schema, + let data = item.map_err(|_| Status::invalid_argument(format!("Invalid")))?; + let maybe_chunk = deserialize_message(&data, &schema.fields, &ipc_schema, - &mut dictionaries, - &mut chunks)?; - yield PutResult {app_metadata} + &mut dictionaries).map_err(|_| Status::invalid_argument(format!("Invalid")))?; + if let Some(chunk) = maybe_chunk { + chunks.push(chunk) + } + yield PutResult {app_metadata: data.app_metadata} } let dataset = IntegrationDataset { schema, @@ -269,86 +268,3 @@ impl FlightService for Service { Err(Status::unimplemented("do_exchange")) } } - -fn chunk_from_message( - batch: arrow_format::ipc::RecordBatchRef<'_>, - data_body: &[u8], - fields: &[Field], - ipc_schema: &ipc::IpcSchema, - dictionaries: &mut Dictionaries, -) -> Result>, Status> { - let length = data_body.len(); - let mut reader = std::io::Cursor::new(data_body); - - ipc::read::read_record_batch( - batch, - fields, - ipc_schema, - None, - None, - dictionaries, - arrow_format::ipc::MetadataVersion::V5, - &mut reader, - 0, - length as u64, - &mut Default::default(), - ) - .map_err(|e| Status::internal(format!("Could not convert to Chunk: {:?}", e))) -} - -fn dictionary_from_message( - dict_batch: arrow_format::ipc::DictionaryBatchRef<'_>, - data_body: &[u8], - fields: &[Field], - ipc_schema: &ipc::IpcSchema, - dictionaries: &mut Dictionaries, -) -> Result<(), Status> { - let length = data_body.len(); - let mut reader = std::io::Cursor::new(data_body); - - ipc::read::read_dictionary( - dict_batch, - fields, - ipc_schema, - dictionaries, - &mut reader, - 0, - length as u64, - &mut Default::default(), - ) - .map_err(|e| Status::internal(format!("Could not convert to Dictionary: {:?}", e))) -} - -fn save_message( - header: &[u8], - body: &[u8], - schema: &Schema, - ipc_schema: &ipc::IpcSchema, - dictionaries: &mut Dictionaries, - chunks: &mut Vec>>, -) -> Result<(), Status> { - let message = arrow_format::ipc::MessageRef::read_as_root(header) - .map_err(|e| Status::internal(format!("Could not parse message: {:?}", e)))?; - let header = message - .header() - .map_err(|x| Status::internal(x.to_string()))? - .ok_or_else(|| Status::internal("Message must contain a header".to_string()))?; - - match header { - MessageHeaderRef::RecordBatch(batch) => { - let chunk = chunk_from_message(batch, body, &schema.fields, ipc_schema, dictionaries)?; - - chunks.push(chunk); - } - MessageHeaderRef::DictionaryBatch(dict_batch) => { - dictionary_from_message(dict_batch, body, &schema.fields, ipc_schema, dictionaries)?; - } - t => { - return Err(Status::internal(format!( - "Reading types other than record batches not yet supported, unable to read {:?}", - t - ))); - } - } - Ok(()) -} diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index 4072c0d0d09..17f7c20d884 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -14,6 +14,7 @@ use crate::{ io::ipc::write::common::{encode_chunk, DictionaryTracker, EncodedData}, }; +use super::ipc::read::Dictionaries; use super::ipc::{IpcField, IpcSchema}; pub use super::ipc::write::default_ipc_fields; @@ -114,7 +115,7 @@ pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> { read::deserialize_schema(bytes) } -/// Deserializes [`FlightData`] to [`Chunk`]. +/// Deserializes [`FlightData`] representing a record batch message to [`Chunk`]. pub fn deserialize_batch( data: &FlightData, fields: &[Field], @@ -149,3 +150,102 @@ pub fn deserialize_batch( )), } } + +/// Deserializes [`FlightData`], assuming it to be a dictionary message, into `dictionaries`. +pub fn deserialize_dictionary( + data: &FlightData, + fields: &[Field], + ipc_schema: &IpcSchema, + dictionaries: &mut read::Dictionaries, +) -> Result<()> { + let message = ipc::MessageRef::read_as_root(&data.data_header)?; + + let chunk = if let ipc::MessageHeaderRef::DictionaryBatch(chunk) = message + .header()? + .ok_or_else(|| Error::oos("Header is required"))? + { + chunk + } else { + return Ok(()); + }; + + let length = data.data_body.len(); + let mut reader = std::io::Cursor::new(&data.data_body); + read::read_dictionary( + chunk, + fields, + ipc_schema, + dictionaries, + &mut reader, + 0, + length as u64, + &mut Default::default(), + )?; + + Ok(()) +} + +/// Deserializes [`FlightData`] into either a [`Chunk`] (when the message is a record batch) +/// or by upserting into `dictionaries` (when the message is a dictionary) +pub fn deserialize_message( + data: &FlightData, + fields: &[Field], + ipc_schema: &IpcSchema, + dictionaries: &mut Dictionaries, +) -> Result>>> { + let FlightData { + data_header, + data_body, + .. + } = data; + + let message = arrow_format::ipc::MessageRef::read_as_root(data_header)?; + let header = message + .header()? + .ok_or_else(|| Error::oos("IPC Message must contain a header"))?; + + match header { + ipc::MessageHeaderRef::RecordBatch(batch) => { + let length = data_body.len(); + let mut reader = std::io::Cursor::new(data_body); + + let chunk = read::read_record_batch( + batch, + fields, + ipc_schema, + None, + None, + dictionaries, + arrow_format::ipc::MetadataVersion::V5, + &mut reader, + 0, + length as u64, + &mut Default::default(), + )?; + + Ok(chunk.into()) + } + ipc::MessageHeaderRef::DictionaryBatch(dict_batch) => { + let length = data_body.len(); + let mut reader = std::io::Cursor::new(data_body); + + read::read_dictionary( + dict_batch, + fields, + ipc_schema, + dictionaries, + &mut reader, + 0, + length as u64, + &mut Default::default(), + )?; + Ok(None) + } + t => { + return Err(Error::nyi(format!( + "Reading types other than record batches not yet supported, unable to read {:?}", + t + ))); + } + } +} diff --git a/src/io/ipc/read/file.rs b/src/io/ipc/read/file.rs new file mode 100644 index 00000000000..ad8dc348110 --- /dev/null +++ b/src/io/ipc/read/file.rs @@ -0,0 +1,319 @@ +use ahash::AHashMap; +use std::convert::TryInto; +use std::io::{Read, Seek, SeekFrom}; + +use crate::array::Array; +use crate::chunk::Chunk; +use crate::datatypes::Schema; +use crate::error::{Error, Result}; +use crate::io::ipc::IpcSchema; + +use super::super::{ARROW_MAGIC, CONTINUATION_MARKER}; +use super::common::*; +use super::schema::fb_to_schema; +use super::Dictionaries; +use super::OutOfSpecKind; +use arrow_format::ipc::planus::ReadAsRoot; + +/// Metadata of an Arrow IPC file, written in the footer of the file. +#[derive(Debug, Clone)] +pub struct FileMetadata { + /// The schema that is read from the file footer + pub schema: Schema, + + /// The files' [`IpcSchema`] + pub ipc_schema: IpcSchema, + + /// The blocks in the file + /// + /// A block indicates the regions in the file to read to get data + pub blocks: Vec, + + /// Dictionaries associated to each dict_id + pub(crate) dictionaries: Option>, + + /// The total size of the file in bytes + pub size: u64, +} + +fn read_dictionary_message( + reader: &mut R, + offset: u64, + data: &mut Vec, +) -> Result<()> { + let mut message_size: [u8; 4] = [0; 4]; + reader.seek(SeekFrom::Start(offset))?; + reader.read_exact(&mut message_size)?; + if message_size == CONTINUATION_MARKER { + reader.read_exact(&mut message_size)?; + }; + let message_length = i32::from_le_bytes(message_size); + + let message_length: usize = message_length + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + data.clear(); + data.try_reserve(message_length)?; + reader + .by_ref() + .take(message_length as u64) + .read_to_end(data)?; + + Ok(()) +} + +pub(crate) fn get_dictionary_batch<'a>( + message: &'a arrow_format::ipc::MessageRef, +) -> Result> { + let header = message + .header() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))? + .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?; + match header { + arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => Ok(batch), + _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), + } +} + +fn read_dictionary_block( + reader: &mut R, + metadata: &FileMetadata, + block: &arrow_format::ipc::Block, + dictionaries: &mut Dictionaries, + message_scratch: &mut Vec, + dictionary_scratch: &mut Vec, +) -> Result<()> { + let offset: u64 = block + .offset + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; + let length: u64 = block + .meta_data_length + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; + read_dictionary_message(reader, offset, message_scratch)?; + + let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; + + let batch = get_dictionary_batch(&message)?; + + read_dictionary( + batch, + &metadata.schema.fields, + &metadata.ipc_schema, + dictionaries, + reader, + offset + length, + metadata.size, + dictionary_scratch, + ) +} + +/// Reads all file's dictionaries, if any +/// This function is IO-bounded +pub fn read_file_dictionaries( + reader: &mut R, + metadata: &FileMetadata, + scratch: &mut Vec, +) -> Result { + let mut dictionaries = Default::default(); + + let blocks = if let Some(blocks) = &metadata.dictionaries { + blocks + } else { + return Ok(AHashMap::new()); + }; + // use a temporary smaller scratch for the messages + let mut message_scratch = Default::default(); + + for block in blocks { + read_dictionary_block( + reader, + metadata, + block, + &mut dictionaries, + &mut message_scratch, + scratch, + )?; + } + Ok(dictionaries) +} + +/// Reads the footer's length and magic number in footer +fn read_footer_len(reader: &mut R) -> Result<(u64, usize)> { + // read footer length and magic number in footer + let end = reader.seek(SeekFrom::End(-10))? + 10; + + let mut footer: [u8; 10] = [0; 10]; + + reader.read_exact(&mut footer)?; + let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap()); + + if footer[4..] != ARROW_MAGIC { + return Err(Error::from(OutOfSpecKind::InvalidFooter)); + } + let footer_len = footer_len + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + Ok((end, footer_len)) +} + +pub(super) fn deserialize_footer(footer_data: &[u8], size: u64) -> Result { + let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data) + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferFooter(err)))?; + + let blocks = footer + .record_batches() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))? + .ok_or_else(|| Error::from(OutOfSpecKind::MissingRecordBatches))?; + + let blocks = blocks + .iter() + .map(|block| { + block + .try_into() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferRecordBatches(err))) + }) + .collect::>>()?; + + let ipc_schema = footer + .schema() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferSchema(err)))? + .ok_or_else(|| Error::from(OutOfSpecKind::MissingSchema))?; + let (schema, ipc_schema) = fb_to_schema(ipc_schema)?; + + let dictionaries = footer + .dictionaries() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferDictionaries(err)))? + .map(|dictionaries| { + dictionaries + .into_iter() + .map(|block| { + block.try_into().map_err(|err| { + Error::from(OutOfSpecKind::InvalidFlatbufferRecordBatches(err)) + }) + }) + .collect::>>() + }) + .transpose()?; + + Ok(FileMetadata { + schema, + ipc_schema, + blocks, + dictionaries, + size, + }) +} + +/// Read the Arrow IPC file's metadata +pub fn read_file_metadata(reader: &mut R) -> Result { + // check if header contain the correct magic bytes + let mut magic_buffer: [u8; 6] = [0; 6]; + let start = reader.seek(SeekFrom::Current(0))?; + reader.read_exact(&mut magic_buffer)?; + if magic_buffer != ARROW_MAGIC { + return Err(Error::from(OutOfSpecKind::InvalidHeader)); + } + + let (end, footer_len) = read_footer_len(reader)?; + + // read footer + reader.seek(SeekFrom::End(-10 - footer_len as i64))?; + + let mut serialized_footer = vec![]; + serialized_footer.try_reserve(footer_len)?; + reader + .by_ref() + .take(footer_len as u64) + .read_to_end(&mut serialized_footer)?; + + deserialize_footer(&serialized_footer, end - start) +} + +pub(crate) fn get_record_batch( + message: arrow_format::ipc::MessageRef, +) -> Result { + let header = message + .header() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))? + .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?; + match header { + arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => Ok(batch), + _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), + } +} + +/// Reads the record batch at position `index` from the reader. +/// +/// This function is useful for random access to the file. For example, if +/// you have indexed the file somewhere else, this allows pruning +/// certain parts of the file. +/// # Panics +/// This function panics iff `index >= metadata.blocks.len()` +#[allow(clippy::too_many_arguments)] +pub fn read_batch( + reader: &mut R, + dictionaries: &Dictionaries, + metadata: &FileMetadata, + projection: Option<&[usize]>, + limit: Option, + index: usize, + message_scratch: &mut Vec, + data_scratch: &mut Vec, +) -> Result>> { + let block = metadata.blocks[index]; + + let offset: u64 = block + .offset + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let length: u64 = block + .meta_data_length + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + // read length + reader.seek(SeekFrom::Start(offset))?; + let mut meta_buf = [0; 4]; + reader.read_exact(&mut meta_buf)?; + if meta_buf == CONTINUATION_MARKER { + // continuation marker encountered, read message next + reader.read_exact(&mut meta_buf)?; + } + let meta_len = i32::from_le_bytes(meta_buf) + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; + + message_scratch.clear(); + message_scratch.try_reserve(meta_len)?; + reader + .by_ref() + .take(meta_len as u64) + .read_to_end(message_scratch)?; + + let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; + + let batch = get_record_batch(message)?; + + read_record_batch( + batch, + &metadata.schema.fields, + &metadata.ipc_schema, + projection, + limit, + dictionaries, + message + .version() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferVersion(err)))?, + reader, + offset + length, + metadata.size, + data_scratch, + ) +} diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 82999c28ea0..bc74890cc25 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -14,7 +14,7 @@ use crate::error::{Error, Result}; use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER}; use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch}; -use super::reader::{deserialize_footer, get_record_batch}; +use super::file::{deserialize_footer, get_record_batch}; use super::Dictionaries; use super::FileMetadata; use super::OutOfSpecKind; diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index 2645d91440f..88f1deaf3a7 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -12,8 +12,9 @@ mod array; mod common; mod deserialize; mod error; +pub(crate) mod file; mod read_basic; -pub(crate) mod reader; +mod reader; mod schema; mod stream; @@ -28,11 +29,10 @@ pub mod stream_async; pub mod file_async; pub(crate) use common::first_dict_field; -pub use common::{read_dictionary, read_record_batch}; -pub use reader::{ - read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, FileReader, -}; -pub use schema::deserialize_schema; +pub(crate) use common::{read_dictionary, read_record_batch}; +pub use file::{read_batch, read_file_dictionaries, read_file_metadata, FileMetadata}; +pub use reader::FileReader; +pub(crate) use schema::deserialize_schema; pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState}; /// how dictionaries are tracked in this crate diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 37b8437c271..da584fadeed 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -1,322 +1,14 @@ use ahash::AHashMap; -use std::convert::TryInto; -use std::io::{Read, Seek, SeekFrom}; +use std::io::{Read, Seek}; use crate::array::Array; use crate::chunk::Chunk; use crate::datatypes::Schema; -use crate::error::{Error, Result}; -use crate::io::ipc::IpcSchema; +use crate::error::Result; -use super::super::{ARROW_MAGIC, CONTINUATION_MARKER}; use super::common::*; -use super::schema::fb_to_schema; use super::Dictionaries; -use super::OutOfSpecKind; -use arrow_format::ipc::planus::ReadAsRoot; - -/// Metadata of an Arrow IPC file, written in the footer of the file. -#[derive(Debug, Clone)] -pub struct FileMetadata { - /// The schema that is read from the file footer - pub schema: Schema, - - /// The files' [`IpcSchema`] - pub ipc_schema: IpcSchema, - - /// The blocks in the file - /// - /// A block indicates the regions in the file to read to get data - pub blocks: Vec, - - /// Dictionaries associated to each dict_id - pub(crate) dictionaries: Option>, - - /// The total size of the file in bytes - pub size: u64, -} - -fn read_dictionary_message( - reader: &mut R, - offset: u64, - data: &mut Vec, -) -> Result<()> { - let mut message_size: [u8; 4] = [0; 4]; - reader.seek(SeekFrom::Start(offset))?; - reader.read_exact(&mut message_size)?; - if message_size == CONTINUATION_MARKER { - reader.read_exact(&mut message_size)?; - }; - let message_length = i32::from_le_bytes(message_size); - - let message_length: usize = message_length - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - data.clear(); - data.try_reserve(message_length)?; - reader - .by_ref() - .take(message_length as u64) - .read_to_end(data)?; - - Ok(()) -} - -pub(crate) fn get_dictionary_batch<'a>( - message: &'a arrow_format::ipc::MessageRef, -) -> Result> { - let header = message - .header() - .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))? - .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?; - match header { - arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => Ok(batch), - _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), - } -} - -fn read_dictionary_block( - reader: &mut R, - metadata: &FileMetadata, - block: &arrow_format::ipc::Block, - dictionaries: &mut Dictionaries, - message_scratch: &mut Vec, - dictionary_scratch: &mut Vec, -) -> Result<()> { - let offset: u64 = block - .offset - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - let length: u64 = block - .meta_data_length - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - read_dictionary_message(reader, offset, message_scratch)?; - - let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) - .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; - - let batch = get_dictionary_batch(&message)?; - - read_dictionary( - batch, - &metadata.schema.fields, - &metadata.ipc_schema, - dictionaries, - reader, - offset + length, - metadata.size, - dictionary_scratch, - ) -} - -/// Reads all file's dictionaries, if any -/// This function is IO-bounded -pub fn read_file_dictionaries( - reader: &mut R, - metadata: &FileMetadata, - scratch: &mut Vec, -) -> Result { - let mut dictionaries = Default::default(); - - let blocks = if let Some(blocks) = &metadata.dictionaries { - blocks - } else { - return Ok(AHashMap::new()); - }; - // use a temporary smaller scratch for the messages - let mut message_scratch = Default::default(); - - for block in blocks { - read_dictionary_block( - reader, - metadata, - block, - &mut dictionaries, - &mut message_scratch, - scratch, - )?; - } - Ok(dictionaries) -} - -/// Reads the footer's length and magic number in footer -fn read_footer_len(reader: &mut R) -> Result<(u64, usize)> { - // read footer length and magic number in footer - let end = reader.seek(SeekFrom::End(-10))? + 10; - - let mut footer: [u8; 10] = [0; 10]; - - reader.read_exact(&mut footer)?; - let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap()); - - if footer[4..] != ARROW_MAGIC { - return Err(Error::from(OutOfSpecKind::InvalidFooter)); - } - let footer_len = footer_len - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - Ok((end, footer_len)) -} - -pub(super) fn deserialize_footer(footer_data: &[u8], size: u64) -> Result { - let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data) - .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferFooter(err)))?; - - let blocks = footer - .record_batches() - .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))? - .ok_or_else(|| Error::from(OutOfSpecKind::MissingRecordBatches))?; - - let blocks = blocks - .iter() - .map(|block| { - block - .try_into() - .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferRecordBatches(err))) - }) - .collect::>>()?; - - let ipc_schema = footer - .schema() - .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferSchema(err)))? - .ok_or_else(|| Error::from(OutOfSpecKind::MissingSchema))?; - let (schema, ipc_schema) = fb_to_schema(ipc_schema)?; - - let dictionaries = footer - .dictionaries() - .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferDictionaries(err)))? - .map(|dictionaries| { - dictionaries - .into_iter() - .map(|block| { - block.try_into().map_err(|err| { - Error::from(OutOfSpecKind::InvalidFlatbufferRecordBatches(err)) - }) - }) - .collect::>>() - }) - .transpose()?; - - Ok(FileMetadata { - schema, - ipc_schema, - blocks, - dictionaries, - size, - }) -} - -/// Read the Arrow IPC file's metadata -pub fn read_file_metadata(reader: &mut R) -> Result { - // check if header contain the correct magic bytes - let mut magic_buffer: [u8; 6] = [0; 6]; - let start = reader.seek(SeekFrom::Current(0))?; - reader.read_exact(&mut magic_buffer)?; - if magic_buffer != ARROW_MAGIC { - return Err(Error::from(OutOfSpecKind::InvalidHeader)); - } - - let (end, footer_len) = read_footer_len(reader)?; - - // read footer - reader.seek(SeekFrom::End(-10 - footer_len as i64))?; - - let mut serialized_footer = vec![]; - serialized_footer.try_reserve(footer_len)?; - reader - .by_ref() - .take(footer_len as u64) - .read_to_end(&mut serialized_footer)?; - - deserialize_footer(&serialized_footer, end - start) -} - -pub(crate) fn get_record_batch( - message: arrow_format::ipc::MessageRef, -) -> Result { - let header = message - .header() - .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))? - .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?; - match header { - arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => Ok(batch), - _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), - } -} - -/// Reads the record batch at position `index` from the reader. -/// -/// This function is useful for random access to the file. For example, if -/// you have indexed the file somewhere else, this allows pruning -/// certain parts of the file. -/// # Panics -/// This function panics iff `index >= metadata.blocks.len()` -#[allow(clippy::too_many_arguments)] -pub fn read_batch( - reader: &mut R, - dictionaries: &Dictionaries, - metadata: &FileMetadata, - projection: Option<&[usize]>, - limit: Option, - index: usize, - message_scratch: &mut Vec, - data_scratch: &mut Vec, -) -> Result>> { - let block = metadata.blocks[index]; - - let offset: u64 = block - .offset - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let length: u64 = block - .meta_data_length - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - // read length - reader.seek(SeekFrom::Start(offset))?; - let mut meta_buf = [0; 4]; - reader.read_exact(&mut meta_buf)?; - if meta_buf == CONTINUATION_MARKER { - // continuation marker encountered, read message next - reader.read_exact(&mut meta_buf)?; - } - let meta_len = i32::from_le_bytes(meta_buf) - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - - message_scratch.clear(); - message_scratch.try_reserve(meta_len)?; - reader - .by_ref() - .take(meta_len as u64) - .read_to_end(message_scratch)?; - - let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) - .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; - - let batch = get_record_batch(message)?; - - read_record_batch( - batch, - &metadata.schema.fields, - &metadata.ipc_schema, - projection, - limit, - dictionaries, - message - .version() - .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferVersion(err)))?, - reader, - offset + length, - metadata.size, - data_scratch, - ) -} +use super::{read_batch, read_file_dictionaries, FileMetadata}; /// An iterator of [`Chunk`]s from an Arrow IPC file. pub struct FileReader { diff --git a/src/mmap/mod.rs b/src/mmap/mod.rs index 55bd7d01f80..d8bd90c3a2a 100644 --- a/src/mmap/mod.rs +++ b/src/mmap/mod.rs @@ -8,7 +8,7 @@ use crate::datatypes::{DataType, Field}; use crate::error::Error; use crate::ffi::mmap; -use crate::io::ipc::read::reader::{get_dictionary_batch, get_record_batch}; +use crate::io::ipc::read::file::{get_dictionary_batch, get_record_batch}; use crate::io::ipc::read::{first_dict_field, Dictionaries, FileMetadata}; use crate::io::ipc::read::{IpcBuffer, Node, OutOfSpecKind}; use crate::io::ipc::{IpcField, CONTINUATION_MARKER};