diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 6fefdca23e1b..53f7e2943dad 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -250,8 +250,17 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { current_decoder.get(&mut out[range]) } - fn skip_values(&mut self, _num_values: usize) -> Result { - Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) + fn skip_values(&mut self, num_values: usize) -> Result { + let encoding = self + .current_encoding + .expect("current_encoding should be set"); + + let current_decoder = self + .decoders + .get_mut(&encoding) + .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding)); + + current_decoder.skip(num_values) } } diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index 86ccefbd85eb..7b6fb04a74bb 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -613,6 +613,8 @@ pub(crate) mod private { decoder: &mut PlainDecoderDetails, ) -> Result; + fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result; + /// Return the encoded size for a type fn dict_encoding_size(&self) -> (usize, usize) { (std::mem::size_of::(), 1) @@ -690,6 +692,14 @@ pub(crate) mod private { Ok(values_read) } + fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result { + let bit_reader = decoder.bit_reader.as_mut().unwrap(); + let num_values = std::cmp::min(num_values, decoder.num_values); + let values_read = bit_reader.skip(num_values, 1); + decoder.num_values -= values_read; + Ok(values_read) + } + #[inline] fn as_i64(&self) -> Result { Ok(*self as i64) @@ -764,6 +774,23 @@ pub(crate) mod private { Ok(num_values) } + #[inline] + fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result { + let data = decoder.data.as_ref().expect("set_data should have been called"); + let num_values = num_values.min(decoder.num_values); + let bytes_left = data.len() - decoder.start; + let bytes_to_skip = std::mem::size_of::() * num_values; + + if bytes_left < bytes_to_skip { + return Err(eof_err!("Not enough bytes to skip")); + } + + decoder.start += bytes_to_skip; + decoder.num_values -= num_values; + + Ok(num_values) + } + #[inline] fn as_i64(&$self) -> Result { $as_i64 @@ -853,6 +880,24 @@ pub(crate) mod private { Ok(num_values) } + fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result { + let data = decoder + .data + .as_ref() + .expect("set_data should have been called"); + let num_values = std::cmp::min(num_values, decoder.num_values); + let bytes_left = data.len() - decoder.start; + let bytes_to_skip = 12 * num_values; + + if bytes_left < bytes_to_skip { + return Err(eof_err!("Not enough bytes to skip")); + } + decoder.start += bytes_to_skip; + decoder.num_values -= num_values; + + Ok(num_values) + } + #[inline] fn as_any(&self) -> &dyn std::any::Any { self @@ -936,6 +981,24 @@ pub(crate) mod private { Ok(num_values) } + fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result { + let data = decoder + .data + .as_mut() + .expect("set_data should have been called"); + let num_values = num_values.min(decoder.num_values); + + for _ in 0..num_values { + let len: usize = + read_num_bytes!(u32, 4, data.start_from(decoder.start).as_ref()) + as usize; + decoder.start += std::mem::size_of::() + len; + } + decoder.num_values -= num_values; + + Ok(num_values) + } + #[inline] fn dict_encoding_size(&self) -> (usize, usize) { (std::mem::size_of::(), self.len()) @@ -1005,6 +1068,28 @@ pub(crate) mod private { Ok(num_values) } + fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result { + assert!(decoder.type_length > 0); + + let data = decoder + .data + .as_mut() + .expect("set_data should have been called"); + let num_values = std::cmp::min(num_values, decoder.num_values); + for _ in 0..num_values { + let len = decoder.type_length as usize; + + if data.len() < decoder.start + len { + return Err(eof_err!("Not enough bytes to skip")); + } + + decoder.start += len; + } + decoder.num_values -= num_values; + + Ok(num_values) + } + #[inline] fn dict_encoding_size(&self) -> (usize, usize) { (std::mem::size_of::(), self.len()) diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs index b33514aaf629..58aa592d1424 100644 --- a/parquet/src/encodings/decoding.rs +++ b/parquet/src/encodings/decoding.rs @@ -206,6 +206,9 @@ pub trait Decoder: Send { /// Returns the encoding for this decoder. fn encoding(&self) -> Encoding; + + /// Skip the specified number of values in this decoder stream. + fn skip(&mut self, num_values: usize) -> Result; } /// Gets a decoder for the column descriptor `descr` and encoding type `encoding`. @@ -291,6 +294,11 @@ impl Decoder for PlainDecoder { fn get(&mut self, buffer: &mut [T::T]) -> Result { T::T::decode(buffer, &mut self.inner) } + + #[inline] + fn skip(&mut self, num_values: usize) -> Result { + T::T::skip(&mut self.inner, num_values) + } } // ---------------------------------------------------------------------- @@ -363,6 +371,15 @@ impl Decoder for DictDecoder { fn encoding(&self) -> Encoding { Encoding::RLE_DICTIONARY } + + fn skip(&mut self, num_values: usize) -> Result { + assert!(self.rle_decoder.is_some()); + assert!(self.has_dictionary, "Must call set_dict() first!"); + + let rle = self.rle_decoder.as_mut().unwrap(); + let num_values = cmp::min(num_values, self.num_values); + rle.skip(num_values) + } } // ---------------------------------------------------------------------- @@ -419,6 +436,14 @@ impl Decoder for RleValueDecoder { self.values_left -= values_read; Ok(values_read) } + + #[inline] + fn skip(&mut self, num_values: usize) -> Result { + let num_values = cmp::min(num_values, self.values_left); + let values_skipped = self.decoder.skip(num_values)?; + self.values_left -= values_skipped; + Ok(values_skipped) + } } // ---------------------------------------------------------------------- @@ -681,6 +706,8 @@ where Ok(to_read) } + + fn values_left(&self) -> usize { self.values_left } @@ -688,6 +715,11 @@ where fn encoding(&self) -> Encoding { Encoding::DELTA_BINARY_PACKED } + + fn skip(&mut self, num_values: usize) -> Result { + let mut buffer = vec![T::T::default(); num_values]; + self.get(&mut buffer) + } } // ---------------------------------------------------------------------- @@ -791,6 +823,25 @@ impl Decoder for DeltaLengthByteArrayDecoder { fn encoding(&self) -> Encoding { Encoding::DELTA_LENGTH_BYTE_ARRAY } + + fn skip(&mut self, num_values: usize) -> Result { + match T::get_physical_type() { + Type::BYTE_ARRAY => { + let num_values = cmp::min(num_values, self.num_values); + + let next_offset: i32 = self.lengths[self.current_idx..self.current_idx + num_values].iter().sum(); + + self.current_idx += num_values; + self.offset += next_offset as usize; + + self.num_values -= num_values; + Ok(num_values) + } + other_type => Err(general_err!( + "DeltaLengthByteArrayDecoder not support {}, only support byte array", other_type + )), + } + } } // ---------------------------------------------------------------------- @@ -922,6 +973,11 @@ impl Decoder for DeltaByteArrayDecoder { fn encoding(&self) -> Encoding { Encoding::DELTA_BYTE_ARRAY } + + fn skip(&mut self, num_values: usize) -> Result { + let mut buffer = vec![T::T::default(); num_values]; + self.get(&mut buffer) + } } #[cfg(test)] @@ -995,6 +1051,32 @@ mod tests { ); } + #[test] + fn test_plain_skip_int32() { + let data = vec![42, 18, 52]; + let data_bytes = Int32Type::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 3, + 1, + -1, + &data[1..], + ); + } + + #[test] + fn test_plain_skip_all_int32() { + let data = vec![42, 18, 52]; + let data_bytes = Int32Type::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 3, + 5, + -1, + &[], + ); + } + #[test] fn test_plain_decode_int32_spaced() { let data = [42, 18, 52]; @@ -1014,6 +1096,7 @@ mod tests { ); } + #[test] fn test_plain_decode_int64() { let data = vec![42, 18, 52]; @@ -1028,6 +1111,33 @@ mod tests { ); } + #[test] + fn test_plain_skip_int64() { + let data = vec![42, 18, 52]; + let data_bytes = Int64Type::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 3, + 2, + -1, + &data[2..], + ); + } + + #[test] + fn test_plain_skip_all_int64() { + let data = vec![42, 18, 52]; + let data_bytes = Int64Type::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 3, + 3, + -1, + &[], + ); + } + + #[test] fn test_plain_decode_float() { let data = vec![3.14, 2.414, 12.51]; @@ -1042,6 +1152,58 @@ mod tests { ); } + #[test] + fn test_plain_skip_float() { + let data = vec![3.14, 2.414, 12.51]; + let data_bytes = FloatType::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 3, + 1, + -1, + &data[1..], + ); + } + + #[test] + fn test_plain_skip_all_float() { + let data = vec![3.14, 2.414, 12.51]; + let data_bytes = FloatType::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 3, + 4, + -1, + &[], + ); + } + + #[test] + fn test_plain_skip_double() { + let data = vec![3.14f64, 2.414f64, 12.51f64]; + let data_bytes = DoubleType::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 3, + 1, + -1, + &data[1..], + ); + } + + #[test] + fn test_plain_skip_all_double() { + let data = vec![3.14f64, 2.414f64, 12.51f64]; + let data_bytes = DoubleType::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 3, + 5, + -1, + &[], + ); + } + #[test] fn test_plain_decode_double() { let data = vec![3.14f64, 2.414f64, 12.51f64]; @@ -1074,6 +1236,40 @@ mod tests { ); } + #[test] + fn test_plain_skip_int96() { + let mut data = vec![Int96::new(); 4]; + data[0].set_data(11, 22, 33); + data[1].set_data(44, 55, 66); + data[2].set_data(10, 20, 30); + data[3].set_data(40, 50, 60); + let data_bytes = Int96Type::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 4, + 2, + -1, + &data[2..], + ); + } + + #[test] + fn test_plain_skip_all_int96() { + let mut data = vec![Int96::new(); 4]; + data[0].set_data(11, 22, 33); + data[1].set_data(44, 55, 66); + data[2].set_data(10, 20, 30); + data[3].set_data(40, 50, 60); + let data_bytes = Int96Type::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 4, + 8, + -1, + &[], + ); + } + #[test] fn test_plain_decode_bool() { let data = vec![ @@ -1090,6 +1286,37 @@ mod tests { ); } + #[test] + fn test_plain_skip_bool() { + let data = vec![ + false, true, false, false, true, false, true, true, false, true, + ]; + let data_bytes = BoolType::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 10, + 5, + -1, + &data[5..], + ); + } + + #[test] + fn test_plain_skip_all_bool() { + let data = vec![ + false, true, false, false, true, false, true, true, false, true, + ]; + let data_bytes = BoolType::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 10, + 20, + -1, + &[], + ); + } + + #[test] fn test_plain_decode_byte_array() { let mut data = vec![ByteArray::new(); 2]; @@ -1106,6 +1333,36 @@ mod tests { ); } + #[test] + fn test_plain_skip_byte_array() { + let mut data = vec![ByteArray::new(); 2]; + data[0].set_data(ByteBufferPtr::new(String::from("hello").into_bytes())); + data[1].set_data(ByteBufferPtr::new(String::from("parquet").into_bytes())); + let data_bytes = ByteArrayType::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 2, + 1, + -1, + &data[1..], + ); + } + + #[test] + fn test_plain_skip_all_byte_array() { + let mut data = vec![ByteArray::new(); 2]; + data[0].set_data(ByteBufferPtr::new(String::from("hello").into_bytes())); + data[1].set_data(ByteBufferPtr::new(String::from("parquet").into_bytes())); + let data_bytes = ByteArrayType::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 2, + 2, + -1, + &[], + ); + } + #[test] fn test_plain_decode_fixed_len_byte_array() { let mut data = vec![FixedLenByteArray::default(); 3]; @@ -1123,6 +1380,38 @@ mod tests { ); } + #[test] + fn test_plain_skip_fixed_len_byte_array() { + let mut data = vec![FixedLenByteArray::default(); 3]; + data[0].set_data(ByteBufferPtr::new(String::from("bird").into_bytes())); + data[1].set_data(ByteBufferPtr::new(String::from("come").into_bytes())); + data[2].set_data(ByteBufferPtr::new(String::from("flow").into_bytes())); + let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 3, + 1, + 4, + &data[1..], + ); + } + + #[test] + fn test_plain_skip_all_fixed_len_byte_array() { + let mut data = vec![FixedLenByteArray::default(); 3]; + data[0].set_data(ByteBufferPtr::new(String::from("bird").into_bytes())); + data[1].set_data(ByteBufferPtr::new(String::from("come").into_bytes())); + data[2].set_data(ByteBufferPtr::new(String::from("flow").into_bytes())); + let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]); + test_plain_skip::( + ByteBufferPtr::new(data_bytes), + 3, + 6, + 4, + &[], + ); + } + fn test_plain_decode( data: ByteBufferPtr, num_values: usize, @@ -1139,6 +1428,34 @@ mod tests { assert_eq!(buffer, expected); } + fn test_plain_skip( + data: ByteBufferPtr, + num_values: usize, + skip: usize, + type_length: i32, + expected: &[T::T], + ) { + let mut decoder: PlainDecoder = PlainDecoder::new(type_length); + let result = decoder.set_data(data, num_values); + assert!(result.is_ok()); + let skipped = decoder.skip(skip).expect("skipping values"); + + if skip >= num_values { + assert_eq!(skipped, num_values); + + let mut buffer = vec![T::T::default(); 1]; + let remaining = decoder.get(&mut buffer).expect("getting remaining values"); + assert_eq!(remaining, 0); + } else { + assert_eq!(skipped, skip); + let mut buffer = vec![T::T::default(); num_values - skip]; + let remaining = decoder.get(&mut buffer).expect("getting remaining values"); + assert_eq!(remaining, num_values - skip); + assert_eq!(decoder.values_left(), 0); + assert_eq!(buffer, expected); + } + } + fn test_plain_decode_spaced( data: ByteBufferPtr, num_values: usize, @@ -1217,12 +1534,29 @@ mod tests { test_delta_bit_packed_decode::(vec![block_data]); } + #[test] + fn test_skip_delta_bit_packed_int32_repeat() { + let block_data = vec![ + 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, + 3, 4, 5, 6, 7, 8, + ]; + test_skip::(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 10); + test_skip::(block_data, Encoding::DELTA_BINARY_PACKED, 100); + } + #[test] fn test_delta_bit_packed_int32_uneven() { let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11]; test_delta_bit_packed_decode::(vec![block_data]); } + #[test] + fn test_skip_delta_bit_packed_int32_uneven() { + let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11]; + test_skip::(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5); + test_skip::(block_data, Encoding::DELTA_BINARY_PACKED, 100); + } + #[test] fn test_delta_bit_packed_int32_same_values() { let block_data = vec![ @@ -1238,21 +1572,55 @@ mod tests { test_delta_bit_packed_decode::(vec![block_data]); } + #[test] + fn test_skip_delta_bit_packed_int32_same_values() { + let block_data = vec![ + 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, + 127, + ]; + test_skip::(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5); + test_skip::(block_data, Encoding::DELTA_BINARY_PACKED, 100); + + let block_data = vec![ + -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, + -127, -127, -127, + ]; + test_skip::(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5); + test_skip::(block_data, Encoding::DELTA_BINARY_PACKED, 100); + + } + #[test] fn test_delta_bit_packed_int32_min_max() { let block_data = vec![ - i32::min_value(), - i32::max_value(), - i32::min_value(), - i32::max_value(), - i32::min_value(), - i32::max_value(), - i32::min_value(), - i32::max_value(), + i32::MIN, + i32::MIN, + i32::MIN, + i32::MAX, + i32::MIN, + i32::MAX, + i32::MIN, + i32::MAX, ]; test_delta_bit_packed_decode::(vec![block_data]); } + #[test] + fn test_skip_delta_bit_packed_int32_min_max() { + let block_data = vec![ + i32::MIN, + i32::MIN, + i32::MIN, + i32::MAX, + i32::MIN, + i32::MAX, + i32::MIN, + i32::MAX, + ]; + test_skip::(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5); + test_skip::(block_data, Encoding::DELTA_BINARY_PACKED, 100); + } + #[test] fn test_delta_bit_packed_int32_multiple_blocks() { // Test multiple 'put' calls on the same encoder @@ -1493,6 +1861,44 @@ mod tests { assert_eq!(result, expected); } + fn test_skip(data: Vec, encoding: Encoding, skip: usize) { + // Type length should not really matter for encode/decode test, + // otherwise change it based on type + let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type()); + + // Encode data + let mut encoder = + get_encoder::(col_descr.clone(), encoding).expect("get encoder"); + + encoder.put(&data).expect("ok to encode"); + + let bytes = encoder.flush_buffer().expect("ok to flush buffer"); + + let mut decoder = get_decoder::(col_descr, encoding).expect("get decoder"); + decoder + .set_data(bytes, data.len()) + .expect("ok to set data"); + + if skip >= data.len() { + let skipped = decoder.skip(skip).expect("ok to skip"); + assert_eq!(skipped, data.len()); + + let skipped_again = decoder.skip(skip).expect("ok to skip again"); + assert_eq!(skipped_again, 0); + } else { + let skipped = decoder.skip(skip).expect("ok to skip"); + assert_eq!(skipped, skip); + + let remaining = data.len() - skip; + + let expected = &data[skip..]; + let mut buffer = vec![T::T::default(); remaining]; + let fetched = decoder.get(&mut buffer).expect("ok to decode"); + assert_eq!(remaining,fetched); + assert_eq!(&buffer, expected); + } + } + fn create_and_check_decoder( encoding: Encoding, err: Option, diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 5f6f91a8bd0a..808c8f0d49a6 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -434,6 +434,39 @@ impl RleDecoder { Ok(values_read) } + #[inline(never)] + pub fn skip(&mut self, num_values: usize) -> Result { + let mut values_skipped = 0; + while values_skipped < num_values { + if self.rle_left > 0 { + let num_values = cmp::min(num_values - values_skipped, self.rle_left as usize); + self.rle_left -= num_values as u32; + values_skipped += num_values; + } else if self.bit_packed_left > 0 { + let mut num_values = + cmp::min(num_values - values_skipped, self.bit_packed_left as usize); + let bit_reader = + self.bit_reader.as_mut().expect("bit_reader should be set"); + + num_values = bit_reader.skip( + num_values, + self.bit_width as usize, + ); + if num_values == 0 { + // Handle writers which truncate the final block + self.bit_packed_left = 0; + continue; + } + self.bit_packed_left -= num_values as u32; + values_skipped += num_values; + } else if !self.reload() { + break; + } + } + + Ok(values_skipped) + } + #[inline(never)] pub fn get_batch_with_dict( &mut self, @@ -538,6 +571,23 @@ mod tests { assert_eq!(buffer, expected); } + #[test] + fn test_rle_skip_int32() { + // Test data: 0-7 with bit width 3 + // 00000011 10001000 11000110 11111010 + let data = ByteBufferPtr::new(vec![0x03, 0x88, 0xC6, 0xFA]); + let mut decoder: RleDecoder = RleDecoder::new(3); + decoder.set_data(data); + let expected = vec![2, 3, 4, 5, 6, 7]; + let skipped = decoder.skip(2).expect("skipping values"); + assert_eq!(skipped, 2); + + let mut buffer = vec![0; 6]; + let remaining = decoder.get_batch::(&mut buffer).expect("getting remaining"); + assert_eq!(remaining, 6); + assert_eq!(buffer, expected); + } + #[test] fn test_rle_consume_flush_buffer() { let data = vec![1, 1, 1, 2, 2, 3, 3, 3]; @@ -596,6 +646,48 @@ mod tests { assert_eq!(buffer, expected); } + #[test] + fn test_rle_skip_bool() { + // RLE test data: 50 1s followed by 50 0s + // 01100100 00000001 01100100 00000000 + let data1 = ByteBufferPtr::new(vec![0x64, 0x01, 0x64, 0x00]); + + // Bit-packing test data: alternating 1s and 0s, 100 total + // 100 / 8 = 13 groups + // 00011011 10101010 ... 00001010 + let data2 = ByteBufferPtr::new(vec![ + 0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, + 0x0A, + ]); + + let mut decoder: RleDecoder = RleDecoder::new(1); + decoder.set_data(data1); + let mut buffer = vec![true; 50]; + let expected = vec![false; 50]; + + let skipped = decoder.skip(50).expect("skipping first 50"); + assert_eq!(skipped, 50); + let remainder = decoder.get_batch::(&mut buffer).expect("getting remaining 50"); + assert_eq!(remainder, 50); + assert_eq!(buffer, expected); + + decoder.set_data(data2); + let mut buffer = vec![false; 50]; + let mut expected = vec![]; + for i in 0..50 { + if i % 2 == 0 { + expected.push(false); + } else { + expected.push(true); + } + } + let skipped = decoder.skip(50).expect("skipping first 50"); + assert_eq!(skipped, 50); + let remainder = decoder.get_batch::(&mut buffer).expect("getting remaining 50"); + assert_eq!(remainder, 50); + assert_eq!(buffer, expected); + } + #[test] fn test_rle_decode_with_dict_int32() { // Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s @@ -631,6 +723,45 @@ mod tests { assert_eq!(buffer, expected); } + #[test] + fn test_rle_skip_dict() { + // Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s + // 00000110 00000000 00001000 00000001 00001010 00000010 + let dict = vec![10, 20, 30]; + let data = ByteBufferPtr::new(vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02]); + let mut decoder: RleDecoder = RleDecoder::new(3); + decoder.set_data(data); + let mut buffer = vec![0; 10]; + let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30]; + let skipped = decoder.skip(2).expect("skipping two values"); + assert_eq!(skipped, 2); + let remainder = decoder.get_batch_with_dict::(&dict, &mut buffer, 10).expect("getting remainder"); + assert_eq!(remainder, 10); + assert_eq!(buffer, expected); + + // Test bit-pack encoding: 345345345455 (2 groups: 8 and 4) + // 011 100 101 011 100 101 011 100 101 100 101 101 + // 00000011 01100011 11000111 10001110 00000011 01100101 00001011 + let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"]; + let data = ByteBufferPtr::new(vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B]); + let mut decoder: RleDecoder = RleDecoder::new(3); + decoder.set_data(data); + let mut buffer = vec![""; 8]; + let expected = vec![ + "eee", "fff", "ddd", "eee", "fff", "eee", "fff", + "fff", + ]; + let skipped = decoder.skip(4).expect("skipping four values"); + assert_eq!(skipped, 4); + let remainder = decoder.get_batch_with_dict::<&str>( + dict.as_slice(), + buffer.as_mut_slice(), + 8, + ).expect("getting remainder"); + assert_eq!(remainder, 8); + assert_eq!(buffer, expected); + } + fn validate_rle( values: &[i64], bit_width: u8, diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs index c4c1f96f9f4c..29269c4ad7e2 100644 --- a/parquet/src/util/bit_util.rs +++ b/parquet/src/util/bit_util.rs @@ -519,6 +519,28 @@ impl BitReader { Some(from_ne_slice(v.as_bytes())) } + /// Skip one value of size `num_bits`. + /// + /// Returns `false` if there are no more values to skip, `true` otherwise. + pub fn skip_value(&mut self, num_bits: usize) -> bool { + assert!(num_bits <= 64); + + if self.byte_offset * 8 + self.bit_offset + num_bits > self.total_bytes * 8 { + return false; + } + + self.bit_offset += num_bits; + + if self.bit_offset >= 64 { + self.byte_offset += 8; + self.bit_offset -= 64; + + self.reload_buffer_values(); + } + + true + } + /// Read multiple values from their packed representation /// /// # Panics @@ -605,6 +627,47 @@ impl BitReader { values_to_read } + /// Skip num_value values with num_bits bit width + /// + /// Return the number of values skipped (up to num_values) + pub fn skip(&mut self, num_values: usize, num_bits: usize) -> usize { + assert!(num_bits <= 64); + + let mut num_values = num_values; + let needed_bits = num_bits * num_values; + let remaining_bits = (self.total_bytes - self.byte_offset) * 8 - self.bit_offset; + if remaining_bits < needed_bits { + num_values = remaining_bits / num_bits; + } + + let mut values_skipped = 0; + + // First align bit offset to byte offset + if self.bit_offset != 0 { + while values_skipped < num_values && self.bit_offset != 0 { + self + .skip_value(num_bits); + values_skipped += 1; + } + } + + while num_values - values_skipped >= 32 { + self.byte_offset += 4 * num_bits; + values_skipped += 32; + } + + + assert!(num_values - values_skipped < 32); + + self.reload_buffer_values(); + while values_skipped < num_values { + self.skip_value(num_bits); + values_skipped += 1; + } + + num_values + } + /// Reads up to `num_bytes` to `buf` returning the number of bytes read pub(crate) fn get_aligned_bytes( &mut self, @@ -759,6 +822,43 @@ mod tests { assert_eq!(bit_reader.get_value::(4), Some(3)); } + #[test] + fn test_bit_reader_skip_value() { + let buffer = vec![255, 0]; + let mut bit_reader = BitReader::from(buffer); + let skipped = bit_reader.skip_value(1); + assert!(skipped); + assert_eq!(bit_reader.get_value::(1), Some(1)); + let skipped = bit_reader.skip_value(2); + assert!(skipped); + assert_eq!(bit_reader.get_value::(2), Some(3)); + let skipped = bit_reader.skip_value(1); + assert!(skipped); + assert_eq!(bit_reader.get_value::(4), Some(1)); + let skipped = bit_reader.skip_value(1); + assert!(skipped); + assert_eq!(bit_reader.get_value::(4), Some(0)); + let skipped = bit_reader.skip_value(1); + assert!(!skipped); + } + + #[test] + fn test_bit_reader_skip() { + let buffer = vec![255, 0]; + let mut bit_reader = BitReader::from(buffer); + let skipped = bit_reader.skip(1,1); + assert_eq!(skipped, 1); + assert_eq!(bit_reader.get_value::(1), Some(1)); + let skipped = bit_reader.skip(2,2); + assert_eq!(skipped, 2); + assert_eq!(bit_reader.get_value::(2), Some(3)); + let skipped = bit_reader.skip(4,1); + assert_eq!(skipped, 4); + assert_eq!(bit_reader.get_value::(4), Some(0)); + let skipped = bit_reader.skip(1,1); + assert_eq!(skipped, 0); + } + #[test] fn test_bit_reader_get_value_boundary() { let buffer = vec![10, 0, 0, 0, 20, 0, 30, 0, 0, 0, 40, 0]; @@ -769,6 +869,17 @@ mod tests { assert_eq!(bit_reader.get_value::(16), Some(40)); } + #[test] + fn test_bit_reader_skip_boundary() { + let buffer = vec![10, 0, 0, 0, 20, 0, 30, 0, 0, 0, 40, 0]; + let mut bit_reader = BitReader::from(buffer); + assert_eq!(bit_reader.get_value::(32), Some(10)); + let skipped = bit_reader.skip_value(16); + assert!(skipped); + assert_eq!(bit_reader.get_value::(32), Some(30)); + assert_eq!(bit_reader.get_value::(16), Some(40)); + } + #[test] fn test_bit_reader_get_aligned() { // 01110101 11001011