Skip to content

Commit

Permalink
UTF-8 Validation (apache#786)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jan 12, 2022
1 parent 578ca91 commit 8572713
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 51 deletions.
137 changes: 88 additions & 49 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{read_records, ArrayReader};
use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::buffer::{
BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
};
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::Encoding;
use crate::basic::{ConvertedType, Encoding};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::{ColumnValueDecoder, ValuesBufferSlice};
use crate::data_type::Int32Type;
Expand Down Expand Up @@ -192,7 +211,16 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
self.offsets.len() - 1
}

fn try_push(&mut self, data: &[u8]) -> Result<()> {
fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
if validate_utf8 {
if let Err(e) = std::str::from_utf8(data) {
return Err(ParquetError::General(format!(
"encountered non UTF-8 data: {}",
e
)));
}
}

self.values.extend_from_slice(data);

let index_offset = I::from_usize(self.values.len())
Expand Down Expand Up @@ -297,15 +325,18 @@ impl<I: ScalarValue> ValuesBufferSlice for OffsetBuffer<I> {
/// Column value decoder for parquet byte array data, materialising values
/// into an `OffsetBuffer<I>` and optionally validating each value as UTF-8.
struct ByteArrayDecoder<I: ScalarValue> {
    /// Values decoded from the dictionary page, populated by `set_dict`;
    /// `None` until a dictionary page has been seen.
    dict: Option<OffsetBuffer<I>>,
    /// Decoder for the current data page, installed by `set_data`;
    /// `None` before the first data page.
    decoder: Option<StringDecoder>,
    /// When `true`, each decoded value is checked to be valid UTF-8
    /// (set when the column's converted type is `UTF8`).
    validate_utf8: bool,
}

impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder for ByteArrayDecoder<I> {
type Slice = OffsetBuffer<I>;

fn new(_: &ColumnDescPtr) -> Self {
fn new(desc: &ColumnDescPtr) -> Self {
let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
Self {
dict: None,
decoder: None,
validate_utf8,
}
}

Expand All @@ -327,7 +358,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder for ByteArrayDecoder<I
}

let mut buffer = OffsetBuffer::default();
let mut decoder = PlainDecoder::new(buf, num_values as usize);
let mut decoder = PlainDecoder::new(buf, num_values as usize, self.validate_utf8);
decoder.read(&mut buffer, usize::MAX)?;
self.dict = Some(buffer);
Ok(())
Expand All @@ -340,16 +371,20 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder for ByteArrayDecoder<I
num_values: usize,
) -> Result<()> {
let decoder = match encoding {
Encoding::PLAIN => StringDecoder::Plain(PlainDecoder::new(data, num_values)),
Encoding::PLAIN => StringDecoder::Plain(PlainDecoder::new(
data,
num_values,
self.validate_utf8,
)),
Encoding::RLE_DICTIONARY => {
StringDecoder::Dictionary(DictionaryDecoder::new(data))
}
Encoding::DELTA_LENGTH_BYTE_ARRAY => {
StringDecoder::DeltaLength(DeltaLengthDecoder::new(data, num_values)?)
}
Encoding::DELTA_BYTE_ARRAY => {
StringDecoder::DeltaStrings(DeltaStringsDecoder::new(data, num_values)?)
}
Encoding::DELTA_LENGTH_BYTE_ARRAY => StringDecoder::DeltaLength(
DeltaLengthDecoder::new(data, num_values, self.validate_utf8)?,
),
Encoding::DELTA_BYTE_ARRAY => StringDecoder::DeltaByteArray(
DeltaByteArrayDecoder::new(data, num_values, self.validate_utf8)?,
),
_ => {
return Err(general_err!(
"unsupported encoding for byte array: {}",
Expand All @@ -370,7 +405,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder for ByteArrayDecoder<I
d.read(out, dict, len)
}
StringDecoder::DeltaLength(d) => d.read(out, len),
StringDecoder::DeltaStrings(d) => d.read(out, len),
StringDecoder::DeltaByteArray(d) => d.read(out, len),
}
}
}
Expand All @@ -379,20 +414,22 @@ enum StringDecoder {
Plain(PlainDecoder),
Dictionary(DictionaryDecoder),
DeltaLength(DeltaLengthDecoder),
DeltaStrings(DeltaStringsDecoder),
DeltaByteArray(DeltaByteArrayDecoder),
}

/// Decoder for [`Encoding::PLAIN`] byte array data
struct PlainDecoder {
    /// The raw bytes of the page being decoded.
    buf: ByteBufferPtr,
    /// Current read position within `buf`; advanced past each value as it
    /// is decoded.
    offset: usize,
    /// Number of values left to decode from `buf`.
    remaining_values: usize,
    /// When `true`, each decoded value is checked to be valid UTF-8 before
    /// being pushed to the output buffer.
    validate_utf8: bool,
}

impl PlainDecoder {
fn new(buf: ByteBufferPtr, values: usize) -> Self {
fn new(buf: ByteBufferPtr, values: usize, validate_utf8: bool) -> Self {
Self {
buf,
validate_utf8,
offset: 0,
remaining_values: values,
}
Expand Down Expand Up @@ -435,7 +472,7 @@ impl PlainDecoder {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}

output.try_push(&buf[start_offset..end_offset])?;
output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?;

self.offset = end_offset;
read += 1;
Expand All @@ -451,10 +488,11 @@ struct DeltaLengthDecoder {
data: ByteBufferPtr,
length_offset: usize,
data_offset: usize,
validate_utf8: bool,
}

impl DeltaLengthDecoder {
fn new(data: ByteBufferPtr, values: usize) -> Result<Self> {
fn new(data: ByteBufferPtr, values: usize, validate_utf8: bool) -> Result<Self> {
let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
len_decoder.set_data(data.all(), values)?;
let mut lengths = vec![0; values];
Expand All @@ -463,6 +501,7 @@ impl DeltaLengthDecoder {
Ok(Self {
lengths,
data,
validate_utf8,
length_offset: 0,
data_offset: len_decoder.get_offset(),
})
Expand All @@ -474,43 +513,48 @@ impl DeltaLengthDecoder {
len: usize,
) -> Result<usize> {
let to_read = len.min(self.lengths.len() - self.length_offset);

output.offsets.reserve(to_read);

let mut to_read_bytes: usize = 0;
let mut offset = output.values.len();
let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];

for length in &self.lengths[self.length_offset..self.length_offset + to_read] {
offset = offset.saturating_add(*length as usize);
to_read_bytes += *length as usize;
let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
output.values.reserve(total_bytes);

let offset_i = I::from_usize(offset)
.ok_or_else(|| general_err!("index overflow decoding byte array"))?;
output.offsets.push(offset_i)
if self.data_offset + total_bytes > self.data.len() {
return Err(ParquetError::EOF(
"Insufficient delta length byte array bytes".to_string(),
));
}

output.values.extend_from_slice(
&self.data.as_ref()[self.data_offset..self.data_offset + to_read_bytes],
);
let mut start_offset = self.data_offset;
for length in src_lengths {
let end_offset = start_offset + *length as usize;
output.try_push(
&self.data.as_ref()[start_offset..end_offset],
self.validate_utf8,
)?;
start_offset = end_offset;
}

self.data_offset += to_read_bytes;
self.data_offset += start_offset;
self.length_offset += to_read;
Ok(to_read)
}
}

/// Decoder for [`Encoding::DELTA_BYTE_ARRAY`]
struct DeltaStringsDecoder {
struct DeltaByteArrayDecoder {
prefix_lengths: Vec<i32>,
suffix_lengths: Vec<i32>,
data: ByteBufferPtr,
length_offset: usize,
data_offset: usize,
last_value: Vec<u8>,
validate_utf8: bool,
}

impl DeltaStringsDecoder {
fn new(data: ByteBufferPtr, values: usize) -> Result<Self> {
impl DeltaByteArrayDecoder {
fn new(data: ByteBufferPtr, values: usize, validate_utf8: bool) -> Result<Self> {
let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
prefix.set_data(data.all(), values)?;
let mut prefix_lengths = vec![0; values];
Expand All @@ -528,6 +572,7 @@ impl DeltaStringsDecoder {
length_offset: 0,
data_offset: prefix.get_offset() + suffix.get_offset(),
last_value: vec![],
validate_utf8,
})
}

Expand All @@ -547,31 +592,23 @@ impl DeltaStringsDecoder {
.iter()
.zip(&self.suffix_lengths[length_range]);

let mut offset = output.values.len();
let data = self.data.as_ref();

for (prefix_length, suffix_length) in iter {
let total_length = *prefix_length as usize + *suffix_length as usize;
let prefix_length = *prefix_length as usize;
let suffix_length = *suffix_length as usize;

if self.data_offset + total_length > self.data.len() {
if self.data_offset + suffix_length > self.data.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}

offset = offset.saturating_add(total_length);

let offset_i = I::from_usize(offset)
.ok_or_else(|| general_err!("index overflow decoding byte array"))?;
output.offsets.push(offset_i);

self.last_value.truncate(*prefix_length as usize);
self.last_value.truncate(prefix_length);
self.last_value.extend_from_slice(
&data[self.data_offset..self.data_offset + total_length],
&data[self.data_offset..self.data_offset + suffix_length],
);
output.try_push(&self.last_value, self.validate_utf8)?;

output.values.reserve(total_length);
output.values.extend_from_slice(&self.last_value);

self.data_offset += total_length;
self.data_offset += suffix_length;
}

self.length_offset += to_read;
Expand Down Expand Up @@ -630,7 +667,9 @@ impl DictionaryDecoder {
}
let start_offset = offsets[index].to_usize().unwrap();
let end_offset = offsets[index + 1].to_usize().unwrap();
output.try_push(&values[start_offset..end_offset])?;

// Dictionary values are verified when decoding dictionary page
output.try_push(&values[start_offset..end_offset], false)?;
}

self.index_offset += to_read;
Expand Down
37 changes: 37 additions & 0 deletions parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ mod tests {
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::file::writer::{FileWriter, SerializedFileWriter};
use crate::schema::types::{Type, TypePtr};
use crate::util::cursor::SliceableCursor;
use crate::util::test_common::{get_temp_filename, RandGen};
use arrow::array::*;
use arrow::datatypes::DataType as ArrowDataType;
Expand Down Expand Up @@ -868,4 +869,40 @@ mod tests {
batch.unwrap();
}
}

#[test]
fn test_invalid_utf8() {
    // Raw bytes of a parquet file whose single column contains a value
    // that is not valid UTF-8 (note the 255 bytes embedded in "he\xffllo").
    let data = vec![
        80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4,
        21, 0, 18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255,
        108, 111, 0, 0, 0, 3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28,
        21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38,
        8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111,
        0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116, 21, 2, 0, 21, 12, 37, 2,
        24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28, 38, 110, 28,
        21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38,
        8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111,
        0, 0, 0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32,
        78, 97, 116, 105, 118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108,
        101, 109, 101, 110, 116, 97, 116, 105, 111, 110, 32, 111, 102, 32, 65, 114,
        114, 111, 119, 0, 130, 0, 0, 0, 80, 65, 82, 49,
    ];

    // Wrap the in-memory bytes and build an arrow reader over them.
    let cursor = SliceableCursor::new(data);
    let parquet_reader = SerializedFileReader::new(cursor).unwrap();
    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));

    let mut batches = arrow_reader
        .get_record_reader_by_columns(vec![0], 10)
        .unwrap();

    // Reading the first batch must now fail with a UTF-8 validation error
    // rather than silently producing a corrupt StringArray.
    let error = batches.next().unwrap().unwrap_err();
    let message = error.to_string();
    assert!(
        message.contains("invalid utf-8 sequence"),
        "{}",
        message
    );
}
}
4 changes: 2 additions & 2 deletions parquet/src/arrow/record_reader/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ impl<T: ScalarValue> ScalarBuffer<T> {
self.len == 0
}

pub fn reserve(&mut self, len: usize) {
self.buffer.reserve(len * std::mem::size_of::<T>());
pub fn reserve(&mut self, additional: usize) {
self.buffer.reserve(additional * std::mem::size_of::<T>());
}

pub fn resize(&mut self, len: usize) {
Expand Down

0 comments on commit 8572713

Please sign in to comment.