Commit

Remove alias for self
alamb committed Jan 27, 2025
1 parent d59289c commit 4838002
Showing 1 changed file with 41 additions and 42 deletions.
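The diff below drops the local `reader` alias for `self` in `RecordBatchDecoder::create_array` and calls the decoder's methods on `self` directly. As a minimal, hedged sketch of the pattern (a hypothetical `Decoder` type, not the actual arrow-ipc `RecordBatchDecoder`), removing a `let reader = self;` binding changes nothing about behavior; the alias and `self` refer to the same receiver:

// Hypothetical stand-in type to illustrate the refactor; field and method
// names are assumptions, not the arrow-ipc API.
struct Decoder {
    position: usize,
    require_alignment: bool,
}

impl Decoder {
    // Before: `self` is hidden behind a local alias.
    fn next_with_alias(&mut self) -> usize {
        let reader = self;
        reader.position += 1;
        reader.position
    }

    // After: the same body written against `self` directly.
    fn next(&mut self) -> usize {
        self.position += 1;
        self.position
    }
}

fn main() {
    let mut d = Decoder { position: 0, require_alignment: true };
    assert_eq!(d.next_with_alias(), 1);
    assert_eq!(d.next(), 2);
    println!("require_alignment = {}", d.require_alignment);
}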
83 changes: 41 additions & 42 deletions arrow-ipc/src/reader.rs
@@ -83,18 +83,17 @@ impl RecordBatchDecoder<'_> {
field: &Field,
variadic_counts: &mut VecDeque<i64>,
) -> Result<ArrayRef, ArrowError> {
- let reader = self;
let data_type = field.data_type();
match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
- reader.next_node(field)?,
+ self.next_node(field)?,
data_type,
&[
- reader.next_buffer()?,
- reader.next_buffer()?,
- reader.next_buffer()?,
+ self.next_buffer()?,
+ self.next_buffer()?,
+ self.next_buffer()?,
],
- reader.require_alignment,
+ self.require_alignment,
),
BinaryView | Utf8View => {
let count = variadic_counts
@@ -104,55 +103,55 @@ impl RecordBatchDecoder<'_> {
)))?;
let count = count + 2; // view and null buffer.
let buffers = (0..count)
- .map(|_| reader.next_buffer())
+ .map(|_| self.next_buffer())
.collect::<Result<Vec<_>, _>>()?;
create_primitive_array(
- reader.next_node(field)?,
+ self.next_node(field)?,
data_type,
&buffers,
- reader.require_alignment,
+ self.require_alignment,
)
}
FixedSizeBinary(_) => create_primitive_array(
- reader.next_node(field)?,
+ self.next_node(field)?,
data_type,
- &[reader.next_buffer()?, reader.next_buffer()?],
- reader.require_alignment,
+ &[self.next_buffer()?, self.next_buffer()?],
+ self.require_alignment,
),
List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
- let list_node = reader.next_node(field)?;
- let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
- let values = reader.create_array(list_field, variadic_counts)?;
+ let list_node = self.next_node(field)?;
+ let list_buffers = [self.next_buffer()?, self.next_buffer()?];
+ let values = self.create_array(list_field, variadic_counts)?;
create_list_array(
list_node,
data_type,
&list_buffers,
values,
- reader.require_alignment,
+ self.require_alignment,
)
}
FixedSizeList(ref list_field, _) => {
- let list_node = reader.next_node(field)?;
- let list_buffers = [reader.next_buffer()?];
- let values = reader.create_array(list_field, variadic_counts)?;
+ let list_node = self.next_node(field)?;
+ let list_buffers = [self.next_buffer()?];
+ let values = self.create_array(list_field, variadic_counts)?;
create_list_array(
list_node,
data_type,
&list_buffers,
values,
- reader.require_alignment,
+ self.require_alignment,
)
}
Struct(struct_fields) => {
- let struct_node = reader.next_node(field)?;
- let null_buffer = reader.next_buffer()?;
+ let struct_node = self.next_node(field)?;
+ let null_buffer = self.next_buffer()?;

// read the arrays for each field
let mut struct_arrays = vec![];
// TODO investigate whether just knowing the number of buffers could
// still work
for struct_field in struct_fields {
- let child = reader.create_array(struct_field, variadic_counts)?;
+ let child = self.create_array(struct_field, variadic_counts)?;
struct_arrays.push(child);
}
let null_count = struct_node.null_count() as usize;
@@ -175,32 +174,32 @@ impl RecordBatchDecoder<'_> {
Ok(Arc::new(struct_array))
}
RunEndEncoded(run_ends_field, values_field) => {
- let run_node = reader.next_node(field)?;
- let run_ends = reader.create_array(run_ends_field, variadic_counts)?;
- let values = reader.create_array(values_field, variadic_counts)?;
+ let run_node = self.next_node(field)?;
+ let run_ends = self.create_array(run_ends_field, variadic_counts)?;
+ let values = self.create_array(values_field, variadic_counts)?;

let run_array_length = run_node.length() as usize;
let array_data = ArrayData::builder(data_type.clone())
.len(run_array_length)
.offset(0)
.add_child_data(run_ends.into_data())
.add_child_data(values.into_data())
- .align_buffers(!reader.require_alignment)
+ .align_buffers(!self.require_alignment)
.build()?;

Ok(make_array(array_data))
}
// Create dictionary array from RecordBatch
Dictionary(_, _) => {
- let index_node = reader.next_node(field)?;
- let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];
+ let index_node = self.next_node(field)?;
+ let index_buffers = [self.next_buffer()?, self.next_buffer()?];

#[allow(deprecated)]
let dict_id = field.dict_id().ok_or_else(|| {
ArrowError::ParseError(format!("Field {field} does not have dict id"))
})?;

- let value_array = reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
+ let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
ArrowError::ParseError(format!(
"Cannot find a dictionary batch with dict id: {dict_id}"
))
@@ -211,26 +210,26 @@ impl RecordBatchDecoder<'_> {
data_type,
&index_buffers,
value_array.clone(),
- reader.require_alignment,
+ self.require_alignment,
)
}
Union(fields, mode) => {
- let union_node = reader.next_node(field)?;
+ let union_node = self.next_node(field)?;
let len = union_node.length() as usize;

// In V4, union types has validity bitmap
// In V5 and later, union types have no validity bitmap
- if reader.version < MetadataVersion::V5 {
- reader.next_buffer()?;
+ if self.version < MetadataVersion::V5 {
+ self.next_buffer()?;
}

let type_ids: ScalarBuffer<i8> =
- reader.next_buffer()?.slice_with_length(0, len).into();
+ self.next_buffer()?.slice_with_length(0, len).into();

let value_offsets = match mode {
UnionMode::Dense => {
let offsets: ScalarBuffer<i32> =
- reader.next_buffer()?.slice_with_length(0, len * 4).into();
+ self.next_buffer()?.slice_with_length(0, len * 4).into();
Some(offsets)
}
UnionMode::Sparse => None,
@@ -239,15 +238,15 @@ impl RecordBatchDecoder<'_> {
let mut children = Vec::with_capacity(fields.len());

for (_id, field) in fields.iter() {
- let child = reader.create_array(field, variadic_counts)?;
+ let child = self.create_array(field, variadic_counts)?;
children.push(child);
}

let array = UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?;
Ok(Arc::new(array))
}
Null => {
- let node = reader.next_node(field)?;
+ let node = self.next_node(field)?;
let length = node.length();
let null_count = node.null_count();

Expand All @@ -260,17 +259,17 @@ impl RecordBatchDecoder<'_> {
let array_data = ArrayData::builder(data_type.clone())
.len(length as usize)
.offset(0)
- .align_buffers(!reader.require_alignment)
+ .align_buffers(!self.require_alignment)
.build()?;

// no buffer increases
Ok(Arc::new(NullArray::from(array_data)))
}
_ => create_primitive_array(
- reader.next_node(field)?,
+ self.next_node(field)?,
data_type,
- &[reader.next_buffer()?, reader.next_buffer()?],
- reader.require_alignment,
+ &[self.next_buffer()?, self.next_buffer()?],
+ self.require_alignment,
),
}
}
