diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index f1207b1e5ec..e79ab232114 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -83,18 +83,17 @@ impl RecordBatchDecoder<'_> { field: &Field, variadic_counts: &mut VecDeque, ) -> Result { - let reader = self; let data_type = field.data_type(); match data_type { Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array( - reader.next_node(field)?, + self.next_node(field)?, data_type, &[ - reader.next_buffer()?, - reader.next_buffer()?, - reader.next_buffer()?, + self.next_buffer()?, + self.next_buffer()?, + self.next_buffer()?, ], - reader.require_alignment, + self.require_alignment, ), BinaryView | Utf8View => { let count = variadic_counts @@ -104,55 +103,55 @@ impl RecordBatchDecoder<'_> { )))?; let count = count + 2; // view and null buffer. let buffers = (0..count) - .map(|_| reader.next_buffer()) + .map(|_| self.next_buffer()) .collect::, _>>()?; create_primitive_array( - reader.next_node(field)?, + self.next_node(field)?, data_type, &buffers, - reader.require_alignment, + self.require_alignment, ) } FixedSizeBinary(_) => create_primitive_array( - reader.next_node(field)?, + self.next_node(field)?, data_type, - &[reader.next_buffer()?, reader.next_buffer()?], - reader.require_alignment, + &[self.next_buffer()?, self.next_buffer()?], + self.require_alignment, ), List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => { - let list_node = reader.next_node(field)?; - let list_buffers = [reader.next_buffer()?, reader.next_buffer()?]; - let values = reader.create_array(list_field, variadic_counts)?; + let list_node = self.next_node(field)?; + let list_buffers = [self.next_buffer()?, self.next_buffer()?]; + let values = self.create_array(list_field, variadic_counts)?; create_list_array( list_node, data_type, &list_buffers, values, - reader.require_alignment, + self.require_alignment, ) } FixedSizeList(ref list_field, _) => { - let list_node = reader.next_node(field)?; - let list_buffers = [reader.next_buffer()?]; - let values = reader.create_array(list_field, variadic_counts)?; + let list_node = self.next_node(field)?; + let list_buffers = [self.next_buffer()?]; + let values = self.create_array(list_field, variadic_counts)?; create_list_array( list_node, data_type, &list_buffers, values, - reader.require_alignment, + self.require_alignment, ) } Struct(struct_fields) => { - let struct_node = reader.next_node(field)?; - let null_buffer = reader.next_buffer()?; + let struct_node = self.next_node(field)?; + let null_buffer = self.next_buffer()?; // read the arrays for each field let mut struct_arrays = vec![]; // TODO investigate whether just knowing the number of buffers could // still work for struct_field in struct_fields { - let child = reader.create_array(struct_field, variadic_counts)?; + let child = self.create_array(struct_field, variadic_counts)?; struct_arrays.push(child); } let null_count = struct_node.null_count() as usize; @@ -175,9 +174,9 @@ impl RecordBatchDecoder<'_> { Ok(Arc::new(struct_array)) } RunEndEncoded(run_ends_field, values_field) => { - let run_node = reader.next_node(field)?; - let run_ends = reader.create_array(run_ends_field, variadic_counts)?; - let values = reader.create_array(values_field, variadic_counts)?; + let run_node = self.next_node(field)?; + let run_ends = self.create_array(run_ends_field, variadic_counts)?; + let values = self.create_array(values_field, variadic_counts)?; let run_array_length = run_node.length() as usize; let array_data = ArrayData::builder(data_type.clone()) @@ -185,22 +184,22 @@ impl RecordBatchDecoder<'_> { .offset(0) .add_child_data(run_ends.into_data()) .add_child_data(values.into_data()) - .align_buffers(!reader.require_alignment) + .align_buffers(!self.require_alignment) .build()?; Ok(make_array(array_data)) } // Create dictionary array from RecordBatch Dictionary(_, _) => { - let index_node = reader.next_node(field)?; - let index_buffers = [reader.next_buffer()?, reader.next_buffer()?]; + let index_node = self.next_node(field)?; + let index_buffers = [self.next_buffer()?, self.next_buffer()?]; #[allow(deprecated)] let dict_id = field.dict_id().ok_or_else(|| { ArrowError::ParseError(format!("Field {field} does not have dict id")) })?; - let value_array = reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| { + let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| { ArrowError::ParseError(format!( "Cannot find a dictionary batch with dict id: {dict_id}" )) @@ -211,26 +210,26 @@ impl RecordBatchDecoder<'_> { data_type, &index_buffers, value_array.clone(), - reader.require_alignment, + self.require_alignment, ) } Union(fields, mode) => { - let union_node = reader.next_node(field)?; + let union_node = self.next_node(field)?; let len = union_node.length() as usize; // In V4, union types has validity bitmap // In V5 and later, union types have no validity bitmap - if reader.version < MetadataVersion::V5 { - reader.next_buffer()?; + if self.version < MetadataVersion::V5 { + self.next_buffer()?; } let type_ids: ScalarBuffer = - reader.next_buffer()?.slice_with_length(0, len).into(); + self.next_buffer()?.slice_with_length(0, len).into(); let value_offsets = match mode { UnionMode::Dense => { let offsets: ScalarBuffer = - reader.next_buffer()?.slice_with_length(0, len * 4).into(); + self.next_buffer()?.slice_with_length(0, len * 4).into(); Some(offsets) } UnionMode::Sparse => None, @@ -239,7 +238,7 @@ impl RecordBatchDecoder<'_> { let mut children = Vec::with_capacity(fields.len()); for (_id, field) in fields.iter() { - let child = reader.create_array(field, variadic_counts)?; + let child = self.create_array(field, variadic_counts)?; children.push(child); } @@ -247,7 +246,7 @@ impl RecordBatchDecoder<'_> { Ok(Arc::new(array)) } Null => { - let node = reader.next_node(field)?; + let node = self.next_node(field)?; let length = node.length(); let null_count = node.null_count(); @@ -260,17 +259,17 @@ impl RecordBatchDecoder<'_> { let array_data = ArrayData::builder(data_type.clone()) .len(length as usize) .offset(0) - .align_buffers(!reader.require_alignment) + .align_buffers(!self.require_alignment) .build()?; // no buffer increases Ok(Arc::new(NullArray::from(array_data))) } _ => create_primitive_array( - reader.next_node(field)?, + self.next_node(field)?, data_type, - &[reader.next_buffer()?, reader.next_buffer()?], - reader.require_alignment, + &[self.next_buffer()?, self.next_buffer()?], + self.require_alignment, ), } }