Skip to content

Commit

Permalink
improve BOOLEAN writing logic and report error on encoding fail
Browse files Browse the repository at this point in the history
When writing BOOLEAN data, writing more than 2048 rows of data will
overflow the hard-coded 256 buffer set for the bit-writer in the
PlainEncoder. Once this occurs, further attempts to write to the encoder
fail, becuase capacity is exceeded, but the errors are silently ignored.

This fix improves the error detection and reporting at the point of
encoding and modifies the logic for bit_writing (BOOLEANS). The
bit_writer is initially allocated 256 bytes (as at present), then each
time the capacity is exceeded the capacity is incremented by another
256 bytes.

This certainly resolves the current problem, but it's not exactly a
great fix because the capacity of the bit_writer could now grow
substantially.

Other data types seem to have a more sophisticated mechanism for writing
data which doesn't involve growing or having a fixed size buffer. It
would be desirable to make the BOOLEAN type use this same mechanism if
possible, but that level of change is more intrusive and probably
requires greater knowledge of the implementation than I possess.

resolves: apache#349
  • Loading branch information
garyanaplan committed Jun 10, 2021
1 parent 0c00776 commit db8af9b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
6 changes: 5 additions & 1 deletion parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,11 @@ pub(crate) mod private {
bit_writer: &mut BitWriter,
) -> Result<()> {
for value in values {
bit_writer.put_value(*value as u64, 1);
if !bit_writer.put_value(*value as u64, 1) {
return Err(ParquetError::EOF(
"unable to put boolean value".to_string(),
));
}
}
Ok(())
}
Expand Down
6 changes: 6 additions & 0 deletions parquet/src/encodings/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pub struct PlainEncoder<T: DataType> {
buffer: ByteBuffer,
bit_writer: BitWriter,
desc: ColumnDescPtr,
bw_bytes_written: usize,
_phantom: PhantomData<T>,
}

Expand All @@ -124,6 +125,7 @@ impl<T: DataType> PlainEncoder<T> {
buffer: byte_buffer,
bit_writer: BitWriter::new(256),
desc,
bw_bytes_written: 0,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -153,7 +155,11 @@ impl<T: DataType> Encoder<T> for PlainEncoder<T> {

#[inline]
fn put(&mut self, values: &[T::T]) -> Result<()> {
if self.bw_bytes_written + values.len() >= self.bit_writer.capacity() {
self.bit_writer.extend(256);
}
T::T::encode(values, &mut self.buffer, &mut self.bit_writer)?;
self.bw_bytes_written += values.len();
Ok(())
}
}
Expand Down
14 changes: 14 additions & 0 deletions parquet/src/util/bit_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,20 @@ impl BitWriter {
}
}

/// Extend buffer size
#[inline]
pub fn extend(&mut self, increment: usize) {
self.max_bytes += increment;
let extra = vec![0; increment];
self.buffer.extend(extra);
}

/// Report buffer size
#[inline]
pub fn capacity(&mut self) -> usize {
self.max_bytes
}

/// Consumes and returns the current buffer.
#[inline]
pub fn consume(mut self) -> Vec<u8> {
Expand Down

0 comments on commit db8af9b

Please sign in to comment.