Skip to content

Commit

Permalink
fix RowWriter index out of bounds error (#2968)
Browse files Browse the repository at this point in the history
* fix writer index out of bounds

* fix comments

* removed obsolete comments
  • Loading branch information
comphead authored Jul 29, 2022
1 parent 176f432 commit 811bad4
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 15 deletions.
37 changes: 37 additions & 0 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1266,4 +1266,41 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn row_writer_resize_test() -> Result<()> {
let schema = Arc::new(Schema::new(vec![arrow::datatypes::Field::new(
"column_1",
DataType::Utf8,
false,
)]));

let data = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow::array::StringArray::from(vec![
Some("2a0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"),
Some("3a0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000800"),
]))
],
)?;

let table = crate::datasource::MemTable::try_new(schema, vec![vec![data]])?;

let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(table))?;

let sql = r#"
SELECT
COUNT(1)
FROM
test
GROUP BY
column_1"#;

let df = ctx.sql(sql).await.unwrap();
df.show_limit(10).await.unwrap();

Ok(())
}
}
28 changes: 13 additions & 15 deletions datafusion/row/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl RowWriter {
pub(crate) fn end_padding(&mut self) {
let payload_width = self.current_width();
self.row_width = round_upto_power_of_2(payload_width, 8);
if self.data.capacity() < self.row_width {
if self.data.len() < self.row_width {
self.data.resize(self.row_width, 0);
}
}
Expand All @@ -269,29 +269,29 @@ impl RowWriter {

/// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width
pub fn write_row(
row: &mut RowWriter,
row_writer: &mut RowWriter,
row_idx: usize,
schema: &Schema,
columns: &[ArrayRef],
) -> usize {
// Get the row from the batch denoted by row_idx
if row.null_free() {
if row_writer.null_free() {
for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) {
write_field(i, row_idx, col, f.data_type(), row);
write_field(i, row_idx, col, f.data_type(), row_writer);
}
} else {
for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) {
if !col.is_null(row_idx) {
row.set_non_null_at(i);
write_field(i, row_idx, col, f.data_type(), row);
row_writer.set_non_null_at(i);
write_field(i, row_idx, col, f.data_type(), row_writer);
} else {
row.set_null_at(i);
row_writer.set_null_at(i);
}
}
}

row.end_padding();
row.row_width
row_writer.end_padding();
row_writer.row_width
}

macro_rules! fn_write_field {
Expand Down Expand Up @@ -349,9 +349,8 @@ pub(crate) fn write_field_utf8(
let from = from.as_any().downcast_ref::<StringArray>().unwrap();
let s = from.value(row_idx);
let new_width = to.current_width() + s.as_bytes().len();
if new_width > to.data.capacity() {
// double the capacity to avoid repeated resize
to.data.resize(max(to.data.capacity() * 2, new_width), 0);
if new_width > to.data.len() {
to.data.resize(max(to.data.capacity(), new_width), 0);
}
to.set_utf8(col_idx, s);
}
Expand All @@ -365,9 +364,8 @@ pub(crate) fn write_field_binary(
let from = from.as_any().downcast_ref::<BinaryArray>().unwrap();
let s = from.value(row_idx);
let new_width = to.current_width() + s.len();
if new_width > to.data.capacity() {
// double the capacity to avoid repeated resize
to.data.resize(max(to.data.capacity() * 2, new_width), 0);
if new_width > to.data.len() {
to.data.resize(max(to.data.capacity(), new_width), 0);
}
to.set_binary(col_idx, s);
}
Expand Down

0 comments on commit 811bad4

Please sign in to comment.