From d59289c46d0cd9f0661e33129ae53fa5060a20e6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 27 Jan 2025 05:28:56 -0500 Subject: [PATCH 1/2] Rename `ArrayReader` to `RecordBatchDecoder` --- arrow-ipc/src/reader.rs | 27 +++++++++++++++------------ arrow-ipc/src/reader/stream.rs | 4 ++-- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index ff509c4f576..f1207b1e5ec 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -65,7 +65,7 @@ fn read_buffer( (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data), } } -impl ArrayReader<'_> { +impl RecordBatchDecoder<'_> { /// Coordinates reading arrays based on data types. /// /// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView) @@ -370,8 +370,11 @@ fn create_dictionary_array( } } -/// State for decoding arrays from an encoded [`RecordBatch`] -struct ArrayReader<'a> { +/// State for decoding Arrow arrays from an [IPC RecordBatch] structure to +/// [`RecordBatch`] +/// +/// [IPC RecordBatch]: crate::RecordBatch +struct RecordBatchDecoder<'a> { /// The flatbuffers encoded record batch batch: crate::RecordBatch<'a>, /// The output schema @@ -389,14 +392,14 @@ struct ArrayReader<'a> { /// The buffers comprising this array buffers: VectorIter<'a, crate::Buffer>, /// Projection (subset of columns) to read, if any - /// See [`ArrayReader::with_projection`] for details + /// See [`RecordBatchDecoder::with_projection`] for details projection: Option<&'a [usize]>, /// Are buffers required to already be aligned? See - /// [`ArrayReader::with_require_alignment`] for details + /// [`RecordBatchDecoder::with_require_alignment`] for details require_alignment: bool, } -impl<'a> ArrayReader<'a> { +impl<'a> RecordBatchDecoder<'a> { /// Create a reader for decoding arrays from an encoded [`RecordBatch`] fn try_new( buf: &'a Buffer, @@ -604,7 +607,7 @@ pub fn read_record_batch( projection: Option<&[usize]>, metadata: &MetadataVersion, ) -> Result { - ArrayReader::try_new(buf, batch, schema, dictionaries_by_id, metadata)? + RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)? .with_projection(projection) .with_require_alignment(false) .read_record_batch() @@ -652,7 +655,7 @@ fn read_dictionary_impl( let value = value_type.as_ref().clone(); let schema = Schema::new(vec![Field::new("", value, true)]); // Read a single column - let record_batch = ArrayReader::try_new( + let record_batch = RecordBatchDecoder::try_new( buf, batch.data().unwrap(), Arc::new(schema), @@ -876,7 +879,7 @@ impl FileDecoder { ArrowError::IpcError("Unable to read IPC message as record batch".to_string()) })?; // read the block that makes up the record batch into a buffer - ArrayReader::try_new( + RecordBatchDecoder::try_new( &buf.slice(block.metaDataLength() as _), batch, self.schema.clone(), @@ -1426,7 +1429,7 @@ impl StreamReader { let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize); self.reader.read_exact(&mut buf)?; - ArrayReader::try_new( + RecordBatchDecoder::try_new( &buf.into(), batch, self.schema(), @@ -2277,7 +2280,7 @@ mod tests { assert_ne!(b.as_ptr().align_offset(8), 0); let ipc_batch = message.header_as_record_batch().unwrap(); - let roundtrip = ArrayReader::try_new( + let roundtrip = RecordBatchDecoder::try_new( &b, ipc_batch, batch.schema(), @@ -2316,7 +2319,7 @@ mod tests { assert_ne!(b.as_ptr().align_offset(8), 0); let ipc_batch = message.header_as_record_batch().unwrap(); - let result = ArrayReader::try_new( + let result = RecordBatchDecoder::try_new( &b, ipc_batch, batch.schema(), diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs index e66896f151d..174e69c1f67 100644 --- a/arrow-ipc/src/reader/stream.rs +++ b/arrow-ipc/src/reader/stream.rs @@ -24,7 +24,7 @@ use arrow_buffer::{Buffer, MutableBuffer}; use arrow_schema::{ArrowError, SchemaRef}; use crate::convert::MessageBuffer; -use crate::reader::{read_dictionary_impl, ArrayReader}; +use crate::reader::{read_dictionary_impl, RecordBatchDecoder}; use crate::{MessageHeader, CONTINUATION_MARKER}; /// A low-level interface for reading [`RecordBatch`] data from a stream of bytes @@ -211,7 +211,7 @@ impl StreamDecoder { let schema = self.schema.clone().ok_or_else(|| { ArrowError::IpcError("Missing schema".to_string()) })?; - let batch = ArrayReader::try_new( + let batch = RecordBatchDecoder::try_new( &body, batch, schema, From 483800234f14968c84f8296039d7a4b9a3e6879c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 27 Jan 2025 05:30:49 -0500 Subject: [PATCH 2/2] Remove alias for `self` --- arrow-ipc/src/reader.rs | 83 ++++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 42 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index f1207b1e5ec..e79ab232114 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -83,18 +83,17 @@ impl RecordBatchDecoder<'_> { field: &Field, variadic_counts: &mut VecDeque, ) -> Result { - let reader = self; let data_type = field.data_type(); match data_type { Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array( - reader.next_node(field)?, + self.next_node(field)?, data_type, &[ - reader.next_buffer()?, - reader.next_buffer()?, - reader.next_buffer()?, + self.next_buffer()?, + self.next_buffer()?, + self.next_buffer()?, ], - reader.require_alignment, + self.require_alignment, ), BinaryView | Utf8View => { let count = variadic_counts @@ -104,55 +103,55 @@ impl RecordBatchDecoder<'_> { )))?; let count = count + 2; // view and null buffer. let buffers = (0..count) - .map(|_| reader.next_buffer()) + .map(|_| self.next_buffer()) .collect::, _>>()?; create_primitive_array( - reader.next_node(field)?, + self.next_node(field)?, data_type, &buffers, - reader.require_alignment, + self.require_alignment, ) } FixedSizeBinary(_) => create_primitive_array( - reader.next_node(field)?, + self.next_node(field)?, data_type, - &[reader.next_buffer()?, reader.next_buffer()?], - reader.require_alignment, + &[self.next_buffer()?, self.next_buffer()?], + self.require_alignment, ), List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => { - let list_node = reader.next_node(field)?; - let list_buffers = [reader.next_buffer()?, reader.next_buffer()?]; - let values = reader.create_array(list_field, variadic_counts)?; + let list_node = self.next_node(field)?; + let list_buffers = [self.next_buffer()?, self.next_buffer()?]; + let values = self.create_array(list_field, variadic_counts)?; create_list_array( list_node, data_type, &list_buffers, values, - reader.require_alignment, + self.require_alignment, ) } FixedSizeList(ref list_field, _) => { - let list_node = reader.next_node(field)?; - let list_buffers = [reader.next_buffer()?]; - let values = reader.create_array(list_field, variadic_counts)?; + let list_node = self.next_node(field)?; + let list_buffers = [self.next_buffer()?]; + let values = self.create_array(list_field, variadic_counts)?; create_list_array( list_node, data_type, &list_buffers, values, - reader.require_alignment, + self.require_alignment, ) } Struct(struct_fields) => { - let struct_node = reader.next_node(field)?; - let null_buffer = reader.next_buffer()?; + let struct_node = self.next_node(field)?; + let null_buffer = self.next_buffer()?; // read the arrays for each field let mut struct_arrays = vec![]; // TODO investigate whether just knowing the number of buffers could // still work for struct_field in struct_fields { - let child = reader.create_array(struct_field, variadic_counts)?; + let child = self.create_array(struct_field, variadic_counts)?; struct_arrays.push(child); } let null_count = struct_node.null_count() as usize; @@ -175,9 +174,9 @@ impl RecordBatchDecoder<'_> { Ok(Arc::new(struct_array)) } RunEndEncoded(run_ends_field, values_field) => { - let run_node = reader.next_node(field)?; - let run_ends = reader.create_array(run_ends_field, variadic_counts)?; - let values = reader.create_array(values_field, variadic_counts)?; + let run_node = self.next_node(field)?; + let run_ends = self.create_array(run_ends_field, variadic_counts)?; + let values = self.create_array(values_field, variadic_counts)?; let run_array_length = run_node.length() as usize; let array_data = ArrayData::builder(data_type.clone()) @@ -185,22 +184,22 @@ impl RecordBatchDecoder<'_> { .offset(0) .add_child_data(run_ends.into_data()) .add_child_data(values.into_data()) - .align_buffers(!reader.require_alignment) + .align_buffers(!self.require_alignment) .build()?; Ok(make_array(array_data)) } // Create dictionary array from RecordBatch Dictionary(_, _) => { - let index_node = reader.next_node(field)?; - let index_buffers = [reader.next_buffer()?, reader.next_buffer()?]; + let index_node = self.next_node(field)?; + let index_buffers = [self.next_buffer()?, self.next_buffer()?]; #[allow(deprecated)] let dict_id = field.dict_id().ok_or_else(|| { ArrowError::ParseError(format!("Field {field} does not have dict id")) })?; - let value_array = reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| { + let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| { ArrowError::ParseError(format!( "Cannot find a dictionary batch with dict id: {dict_id}" )) @@ -211,26 +210,26 @@ impl RecordBatchDecoder<'_> { data_type, &index_buffers, value_array.clone(), - reader.require_alignment, + self.require_alignment, ) } Union(fields, mode) => { - let union_node = reader.next_node(field)?; + let union_node = self.next_node(field)?; let len = union_node.length() as usize; // In V4, union types has validity bitmap // In V5 and later, union types have no validity bitmap - if reader.version < MetadataVersion::V5 { - reader.next_buffer()?; + if self.version < MetadataVersion::V5 { + self.next_buffer()?; } let type_ids: ScalarBuffer = - reader.next_buffer()?.slice_with_length(0, len).into(); + self.next_buffer()?.slice_with_length(0, len).into(); let value_offsets = match mode { UnionMode::Dense => { let offsets: ScalarBuffer = - reader.next_buffer()?.slice_with_length(0, len * 4).into(); + self.next_buffer()?.slice_with_length(0, len * 4).into(); Some(offsets) } UnionMode::Sparse => None, @@ -239,7 +238,7 @@ impl RecordBatchDecoder<'_> { let mut children = Vec::with_capacity(fields.len()); for (_id, field) in fields.iter() { - let child = reader.create_array(field, variadic_counts)?; + let child = self.create_array(field, variadic_counts)?; children.push(child); } @@ -247,7 +246,7 @@ impl RecordBatchDecoder<'_> { Ok(Arc::new(array)) } Null => { - let node = reader.next_node(field)?; + let node = self.next_node(field)?; let length = node.length(); let null_count = node.null_count(); @@ -260,17 +259,17 @@ impl RecordBatchDecoder<'_> { let array_data = ArrayData::builder(data_type.clone()) .len(length as usize) .offset(0) - .align_buffers(!reader.require_alignment) + .align_buffers(!self.require_alignment) .build()?; // no buffer increases Ok(Arc::new(NullArray::from(array_data))) } _ => create_primitive_array( - reader.next_node(field)?, + self.next_node(field)?, data_type, - &[reader.next_buffer()?, reader.next_buffer()?], - reader.require_alignment, + &[self.next_buffer()?, self.next_buffer()?], + self.require_alignment, ), } }