Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: IPC writer should truncate string array with all empty string #2314

Merged
merged 5 commits into from
Aug 4, 2022
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions arrow/src/ipc/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,16 @@ fn get_buffer_element_width(spec: &BufferSpec) -> usize {
}
}

/// Returns byte width for binary value_offset buffer spec.
#[inline]
fn get_value_offset_byte_width(data_type: &DataType) -> usize {
match data_type {
DataType::Binary | DataType::Utf8 => 4,
DataType::LargeBinary | DataType::LargeUtf8 => 8,
_ => unreachable!(),
}
}

/// Returns the number of total bytes in base binary arrays.
fn get_binary_buffer_len(array_data: &ArrayData) -> usize {
if array_data.is_empty() {
Expand Down Expand Up @@ -1005,13 +1015,16 @@ fn write_array_data(
data_type,
DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8
) {
let total_bytes = get_binary_buffer_len(array_data);
let value_buffer = &array_data.buffers()[1];
let offset_buffer = &array_data.buffers()[0];
let value_offset_byte_width = get_value_offset_byte_width(data_type);
let min_length = array_data.len() * value_offset_byte_width;
JasonLi-cn marked this conversation as resolved.
Show resolved Hide resolved
if buffer_need_truncate(
array_data.offset(),
value_buffer,
&BufferSpec::VariableWidth,
total_bytes,
offset_buffer,
&BufferSpec::FixedWidth {
byte_width: value_offset_byte_width,
},
min_length,
) {
// Rebase offsets and truncate values
let (new_offsets, byte_offset) =
Expand All @@ -1029,6 +1042,8 @@ fn write_array_data(

offset = write_buffer(new_offsets.as_slice(), buffers, arrow_data, offset);

let total_bytes = get_binary_buffer_len(array_data);
let value_buffer = &array_data.buffers()[1];
let buffer_length = min(total_bytes, value_buffer.len() - byte_offset);
let buffer_slice =
&value_buffer.as_slice()[byte_offset..(byte_offset + buffer_length)];
Expand Down Expand Up @@ -1832,4 +1847,21 @@ mod tests {
assert!(structs.column(1).is_null(1));
assert_eq!(record_batch_slice, deserialized_batch);
}

#[test]
fn truncate_ipc_string_array_with_all_empty_string() {
fn create_batch() -> RecordBatch {
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
let a =
StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
}

let record_batch = create_batch();
let record_batch_slice = record_batch.slice(0, 1);
let deserialized_batch = deserialize(serialize(&record_batch_slice));

assert!(serialize(&record_batch).len() > serialize(&record_batch_slice).len());
assert_eq!(record_batch_slice, deserialized_batch);
}
}