Skip to content

Commit

Permalink
Preserve bitmask (apache#1037)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jan 11, 2022
1 parent c6f98f7 commit c92e79b
Show file tree
Hide file tree
Showing 6 changed files with 444 additions and 81 deletions.
5 changes: 5 additions & 0 deletions arrow/src/array/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,11 @@ impl BooleanBufferBuilder {
);
}

/// Returns the packed bits
pub fn as_slice(&self) -> &[u8] {
self.buffer.as_slice()
}

#[inline]
pub fn finish(&mut self) -> Buffer {
let buf = std::mem::replace(&mut self.buffer, MutableBuffer::new(0));
Expand Down
9 changes: 7 additions & 2 deletions parquet/benches/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,13 @@ fn create_int32_primitive_array_reader(
column_desc: ColumnDescPtr,
) -> impl ArrayReader {
use parquet::arrow::array_reader::PrimitiveArrayReader;
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap()
PrimitiveArrayReader::<Int32Type>::new_with_options(
Box::new(page_iterator),
column_desc,
None,
true,
)
.unwrap()
}

fn create_string_arrow_array_reader(
Expand Down
92 changes: 62 additions & 30 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,17 @@ where
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
) -> Result<Self> {
Self::new_with_options(pages, column_desc, arrow_type, false)
}

/// Construct primitive array reader with ability to only compute null mask and not
/// buffer level data
pub fn new_with_options(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
null_mask_only: bool,
) -> Result<Self> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
Expand All @@ -256,7 +267,7 @@ where
.clone(),
};

let record_reader = RecordReader::<T>::new(column_desc.clone());
let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);

Ok(Self {
data_type,
Expand Down Expand Up @@ -1350,19 +1361,26 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
let mut new_context = context.clone();
new_context.path.append(vec![cur_type.name().to_string()]);

match cur_type.get_basic_info().repetition() {
let null_mask_only = match cur_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
false
}
Repetition::OPTIONAL => {
new_context.def_level += 1;

// Can just compute null mask if no parent
context.def_level == 0 && context.rep_level == 0
}
_ => (),
}
_ => false,
};

let reader =
self.build_for_primitive_type_inner(cur_type.clone(), &new_context)?;
let reader = self.build_for_primitive_type_inner(
cur_type.clone(),
&new_context,
null_mask_only,
)?;

if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
Err(ArrowError(
Expand Down Expand Up @@ -1641,6 +1659,7 @@ impl<'a> ArrayReaderBuilder {
&self,
cur_type: TypePtr,
context: &'a ArrayReaderBuilderContext,
null_mask_only: bool,
) -> Result<Box<dyn ArrayReader>> {
let column_desc = Arc::new(ColumnDescriptor::new(
cur_type.clone(),
Expand All @@ -1658,30 +1677,39 @@ impl<'a> ArrayReaderBuilder {
.map(|f| f.data_type().clone());

match cur_type.get_physical_type() {
PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::<BoolType>::new(
page_iterator,
column_desc,
arrow_type,
)?)),
PhysicalType::BOOLEAN => Ok(Box::new(
PrimitiveArrayReader::<BoolType>::new_with_options(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::INT32 => {
if let Some(ArrowType::Null) = arrow_type {
Ok(Box::new(NullArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
)?))
} else {
Ok(Box::new(PrimitiveArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
arrow_type,
)?))
Ok(Box::new(
PrimitiveArrayReader::<Int32Type>::new_with_options(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
))
}
}
PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::<Int64Type>::new(
page_iterator,
column_desc,
arrow_type,
)?)),
PhysicalType::INT64 => Ok(Box::new(
PrimitiveArrayReader::<Int64Type>::new_with_options(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::INT96 => {
// get the optional timezone information from arrow type
let timezone = arrow_type
Expand All @@ -1705,18 +1733,22 @@ impl<'a> ArrayReaderBuilder {
arrow_type,
)?))
}
PhysicalType::FLOAT => Ok(Box::new(PrimitiveArrayReader::<FloatType>::new(
page_iterator,
column_desc,
arrow_type,
)?)),
PhysicalType::DOUBLE => {
Ok(Box::new(PrimitiveArrayReader::<DoubleType>::new(
PhysicalType::FLOAT => Ok(Box::new(
PrimitiveArrayReader::<FloatType>::new_with_options(
page_iterator,
column_desc,
arrow_type,
)?))
}
null_mask_only,
)?,
)),
PhysicalType::DOUBLE => Ok(Box::new(
PrimitiveArrayReader::<DoubleType>::new_with_options(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::BYTE_ARRAY => {
if cur_type.get_basic_info().converted_type() == ConvertedType::UTF8 {
if let Some(ArrowType::LargeUtf8) = arrow_type {
Expand Down
21 changes: 14 additions & 7 deletions parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,19 @@ where
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Slice = V::Slice>,
{
/// Create a new [`GenericRecordReader`]
pub fn new(desc: ColumnDescPtr) -> Self {
let def_levels =
(desc.max_def_level() > 0).then(|| DefinitionLevelBuffer::new(&desc));
Self::new_with_options(desc, false)
}

/// Create a new [`GenericRecordReader`] with the ability to only generate the bitmask
///
/// If `null_mask_only` is true only the null bitmask will be generated and
/// [`Self::consume_def_levels`] and [`Self::consume_rep_levels`] will always return `None`
///
pub(crate) fn new_with_options(desc: ColumnDescPtr, null_mask_only: bool) -> Self {
let def_levels = (desc.max_def_level() > 0)
.then(|| DefinitionLevelBuffer::new(&desc, null_mask_only));

let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);

Expand Down Expand Up @@ -171,7 +181,7 @@ where
/// as record values, e.g. those from `self.num_values` to `self.values_written`.
pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
Ok(match self.def_levels.as_mut() {
Some(x) => Some(x.split_off(self.num_values)),
Some(x) => x.split_levels(self.num_values),
None => None,
})
}
Expand Down Expand Up @@ -221,10 +231,7 @@ where
.as_mut()
.map(|levels| levels.spare_capacity_mut(batch_size));

let def_levels = self
.def_levels
.as_mut()
.map(|levels| levels.spare_capacity_mut(batch_size));
let def_levels = self.def_levels.as_mut();

let values = self.records.spare_capacity_mut(batch_size);

Expand Down
5 changes: 5 additions & 0 deletions parquet/src/arrow/record_reader/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ impl<T: ScalarValue> ScalarBuffer<T> {
self.len == 0
}

pub fn resize(&mut self, len: usize) {
self.buffer.resize(len * std::mem::size_of::<T>(), 0);
self.len = len;
}

#[inline]
pub fn as_slice(&self) -> &[T] {
let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
Expand Down
Loading

0 comments on commit c92e79b

Please sign in to comment.