From 7fbc02b3a739615d3e4d31b5056e512a69bb3c11 Mon Sep 17 00:00:00 2001
From: Alex Wilcoxson
Date: Sat, 27 Jan 2024 17:51:30 -0600
Subject: [PATCH] =?UTF-8?q?fix:=20do=20not=20write=20empty=20parquet=20fil?=
 =?UTF-8?q?e/add=20on=20writer=20close;=20accurately=20=E2=80=A6=20(#2123)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

…track unflushed row group size

# Description

When writing a batch with the writer, if that batch would result in a flush and no more batches follow, then on close an empty parquet file (no rows) is written. This includes an Add in the log that looks like

```
Add {
    path: "part-00002-3da49db6-e5e9-4426-8839-0092a56cc155-c000.parquet",
    partition_values: {},
    size: 346,
    modification_time: 1706297596165,
    data_change: true,
    stats: Some(
        "{\"numRecords\":0,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}}",
    ),
    tags: None,
    deletion_vector: None,
    base_row_id: None,
    default_row_commit_version: None,
    clustering_provider: None,
    stats_parsed: None,
},
```

The empty stats struct causes issues with scans in DataFusion.

Also changed the writer so that its internal buffer accounting includes the data the parquet writer is still buffering in its in-progress row group, so the flush check sees the true estimated file size.
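To make the buffer-accounting point concrete, here is a minimal sketch (assuming the `arrow-array`, `arrow-schema`, and `parquet` crates as direct dependencies; it uses the parquet `ArrowWriter` directly rather than the `PartitionWriter` touched by this patch). Rows sit in the writer's in-progress row group until a row group boundary or close, so the bytes already flushed to the output undercount the eventual file size; `in_progress_size()` is what the flush check below adds to close that gap.

```
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from((0..10_000).collect::<Vec<i32>>()))],
    )?;

    // write into an in-memory buffer with default properties
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, None)?;
    writer.write(&batch)?;

    // 10_000 rows are buffered, but the default max_row_group_size has not
    // been reached, so no row group has been flushed to the output yet
    assert!(writer.flushed_row_groups().is_empty());
    assert_eq!(writer.in_progress_rows(), 10_000);
    println!("unflushed row group estimate: {} bytes", writer.in_progress_size());

    // finishing the file flushes the in-progress row group
    let bytes = writer.into_inner()?;
    println!("final file size: {} bytes", bytes.len());
    Ok(())
}
```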
---
 .../deltalake-core/src/operations/writer.rs | 72 +++++++++++++++++--
 1 file changed, 66 insertions(+), 6 deletions(-)

diff --git a/crates/deltalake-core/src/operations/writer.rs b/crates/deltalake-core/src/operations/writer.rs
index d1249f1766..5d8808fa3c 100644
--- a/crates/deltalake-core/src/operations/writer.rs
+++ b/crates/deltalake-core/src/operations/writer.rs
@@ -322,6 +322,11 @@ impl PartitionWriter {
         // replace counter / buffers and close the current writer
         let (writer, buffer) = self.reset_writer()?;
         let metadata = writer.close()?;
+        // don't write empty file
+        if metadata.num_rows == 0 {
+            return Ok(());
+        }
+
         let buffer = match buffer.into_inner() {
             Some(buffer) => Bytes::from(buffer),
             None => return Ok(()), // Nothing to write
@@ -367,8 +372,12 @@ impl PartitionWriter {
             let length = usize::min(self.config.write_batch_size, max_offset - offset);
             self.write_batch(&batch.slice(offset, length))?;
             // flush currently buffered data to disk once we meet or exceed the target file size.
-            if self.buffer.len() >= self.config.target_file_size {
-                debug!("Writing file with size {:?} to disk.", self.buffer.len());
+            let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size();
+            if estimated_size >= self.config.target_file_size {
+                debug!(
+                    "Writing file with estimated size {:?} to disk.",
+                    estimated_size
+                );
                 self.flush_arrow_writer().await?;
             }
         }
@@ -401,7 +410,7 @@ mod tests {
         let batch = get_record_batch(None, false);

         // write single un-partitioned batch
-        let mut writer = get_writer(object_store.clone(), &batch, None, None);
+        let mut writer = get_writer(object_store.clone(), &batch, None, None, None);
         writer.write(&batch).await.unwrap();
         let files = list(object_store.as_ref(), None).await.unwrap();
         assert_eq!(files.len(), 0);
@@ -433,8 +442,8 @@ mod tests {
         let properties = WriterProperties::builder()
             .set_max_row_group_size(1024)
             .build();
-        // configure small target file size and and row group size so we can observe multiple files written
-        let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000));
+        // configure small target file size and row group size so we can observe multiple files written
+        let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000), None);
         writer.write(&batch).await.unwrap();

         // check that we have written more than one file, and no more than 1 is below target size
@@ -446,18 +455,69 @@ mod tests {
         assert!(target_file_count >= adds.len() as i32 - 1)
     }

+    #[tokio::test]
+    async fn test_unflushed_row_group_size() {
+        let base_int = Arc::new(Int32Array::from((0..10000).collect::<Vec<i32>>()));
+        let base_str = Arc::new(StringArray::from(vec!["A"; 10000]));
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Utf8, true),
+            Field::new("value", DataType::Int32, true),
+        ]));
+        let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap();
+
+        let object_store = DeltaTableBuilder::from_uri("memory://")
+            .build_storage()
+            .unwrap()
+            .object_store();
+        // configure small target file size so we can observe multiple files written
+        let mut writer = get_writer(object_store, &batch, None, Some(10_000), None);
+        writer.write(&batch).await.unwrap();
+
+        // check that we have written more than one file, and no more than 1 is below target size
+        let adds = writer.close().await.unwrap();
+        assert!(adds.len() > 1);
+        let target_file_count = adds
+            .iter()
+            .fold(0, |acc, add| acc + (add.size > 10_000) as i32);
+        assert!(target_file_count >= adds.len() as i32 - 1)
+    }
+
+    #[tokio::test]
+    async fn test_do_not_write_empty_file_on_close() {
+        let base_int = Arc::new(Int32Array::from((0..10000 as i32).collect::<Vec<i32>>()));
+        let base_str = Arc::new(StringArray::from(vec!["A"; 10000]));
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Utf8, true),
+            Field::new("value", DataType::Int32, true),
+        ]));
+        let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap();
+
+        let object_store = DeltaTableBuilder::from_uri("memory://")
+            .build_storage()
+            .unwrap()
+            .object_store();
+        // configure high batch size and low file size to observe one file written and flushed immediately
+        // upon writing the batch, then ensure the buffer is empty upon closing the writer
+        let mut writer = get_writer(object_store, &batch, None, Some(9000), Some(10000));
+        writer.write(&batch).await.unwrap();
+
+        let adds = writer.close().await.unwrap();
+        assert!(adds.len() == 1);
+    }
+
     fn get_writer(
         object_store: ObjectStoreRef,
         batch: &RecordBatch,
         writer_properties: Option<WriterProperties>,
         target_file_size: Option<usize>,
+        write_batch_size: Option<usize>,
     ) -> PartitionWriter {
         let config = PartitionWriterConfig::try_new(
             batch.schema(),
             BTreeMap::new(),
             writer_properties,
             target_file_size,
-            None,
+            write_batch_size,
         )
         .unwrap();
         PartitionWriter::try_with_config(object_store, config).unwrap()
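Read together, the guarded close path and the new `test_do_not_write_empty_file_on_close` describe this contract. Below is a sketch of the same scenario using only the APIs visible in this diff (`PartitionWriterConfig`, `PartitionWriter`), with `object_store` and `batch` built as in the tests above and error handling elided; before this fix, `close()` emitted a second, zero-row file carrying the empty-stats Add shown in the description.

```
// target_file_size is below the batch's encoded size and write_batch_size
// covers all rows, so the single write() flushes a complete file immediately
let config = PartitionWriterConfig::try_new(
    batch.schema(),
    BTreeMap::new(),   // unpartitioned
    None,              // default writer properties
    Some(9_000),       // target_file_size
    Some(10_000),      // write_batch_size
)?;
let mut writer = PartitionWriter::try_with_config(object_store, config)?;
writer.write(&batch).await?;

// close() now finds an empty in-progress file (num_rows == 0) and skips it
let adds = writer.close().await?;
assert_eq!(adds.len(), 1);
```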