diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 2c99f5be14b6..9084619e4a1d 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -1266,4 +1266,41 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn row_writer_resize_test() -> Result<()> { + let schema = Arc::new(Schema::new(vec![arrow::datatypes::Field::new( + "column_1", + DataType::Utf8, + false, + )])); + + let data = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(arrow::array::StringArray::from(vec![ + Some("2a0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), + Some("3a0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000800"), + ])) + ], + )?; + + let table = crate::datasource::MemTable::try_new(schema, vec![vec![data]])?; + + let ctx = SessionContext::new(); + ctx.register_table("test", Arc::new(table))?; + + let sql = r#" + SELECT + COUNT(1) + FROM + test + GROUP BY + column_1"#; + + let df = ctx.sql(sql).await.unwrap(); + df.show_limit(10).await.unwrap(); + + Ok(()) + } } diff --git a/datafusion/row/src/writer.rs b/datafusion/row/src/writer.rs index 2992ec175b65..fcfaae976c45 100644 --- a/datafusion/row/src/writer.rs +++ b/datafusion/row/src/writer.rs @@ -256,7 +256,7 @@ impl RowWriter { pub(crate) fn end_padding(&mut self) { let payload_width = self.current_width(); self.row_width = round_upto_power_of_2(payload_width, 8); - if self.data.capacity() < self.row_width { + if self.data.len() < self.row_width { self.data.resize(self.row_width, 0); } } @@ -269,29 +269,29 @@ impl RowWriter { /// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width pub fn write_row( - row: &mut RowWriter, + row_writer: &mut RowWriter, row_idx: usize, schema: &Schema, columns: &[ArrayRef], ) -> usize { // Get the row from the batch denoted by 
row_idx - if row.null_free() { + if row_writer.null_free() { for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) { - write_field(i, row_idx, col, f.data_type(), row); + write_field(i, row_idx, col, f.data_type(), row_writer); } } else { for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) { if !col.is_null(row_idx) { - row.set_non_null_at(i); - write_field(i, row_idx, col, f.data_type(), row); + row_writer.set_non_null_at(i); + write_field(i, row_idx, col, f.data_type(), row_writer); } else { - row.set_null_at(i); + row_writer.set_null_at(i); } } } - row.end_padding(); - row.row_width + row_writer.end_padding(); + row_writer.row_width } macro_rules! fn_write_field { @@ -349,9 +349,8 @@ pub(crate) fn write_field_utf8( let from = from.as_any().downcast_ref::<StringArray>().unwrap(); let s = from.value(row_idx); let new_width = to.current_width() + s.as_bytes().len(); - if new_width > to.data.capacity() { - // double the capacity to avoid repeated resize - to.data.resize(max(to.data.capacity() * 2, new_width), 0); + if new_width > to.data.len() { + to.data.resize(max(to.data.capacity(), new_width), 0); } to.set_utf8(col_idx, s); } @@ -365,9 +364,8 @@ pub(crate) fn write_field_binary( let from = from.as_any().downcast_ref::<BinaryArray>().unwrap(); let s = from.value(row_idx); let new_width = to.current_width() + s.len(); - if new_width > to.data.capacity() { - // double the capacity to avoid repeated resize - to.data.resize(max(to.data.capacity() * 2, new_width), 0); + if new_width > to.data.len() { + to.data.resize(max(to.data.capacity(), new_width), 0); } to.set_binary(col_idx, s); }