fix: do not write empty parquet file/add on writer close; accurately track unflushed row group size (delta-io#2123)

# Description
When writing a batch with the writer, if that batch triggers a flush and no more
batches follow, then on close an empty parquet file (zero rows) is written. A
matching Add entry also lands in the log, looking like:

```
    Add {
        path: "part-00002-3da49db6-e5e9-4426-8839-0092a56cc155-c000.parquet",
        partition_values: {},
        size: 346,
        modification_time: 1706297596165,
        data_change: true,
        stats: Some(
            "{\"numRecords\":0,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}}",
        ),
        tags: None,
        deletion_vector: None,
        base_row_id: None,
        default_row_commit_version: None,
        clustering_provider: None,
        stats_parsed: None,
    },
```

The empty stats struct causes issues with scans in DataFusion.
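
For context, a minimal repro sketch mirroring the new `test_do_not_write_empty_file_on_close` test below. The import paths and the visibility of `PartitionWriter`/`PartitionWriterConfig` outside the crate are assumptions; the constructor signatures follow the test helper in this diff and may differ in other releases:

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use arrow::record_batch::RecordBatch;
// Assumed paths; these types live in crates/deltalake-core/src/operations/writer.rs.
use deltalake_core::operations::writer::{PartitionWriter, PartitionWriterConfig};
use deltalake_core::DeltaTableBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A single batch large enough to exceed the target file size on its own.
    let ids = Arc::new(StringArray::from(vec!["A"; 10_000]));
    let values = Arc::new(Int32Array::from((0..10_000).collect::<Vec<i32>>()));
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Utf8, true),
        Field::new("value", DataType::Int32, true),
    ]));
    let batch = RecordBatch::try_new(schema, vec![ids, values])?;

    let object_store = DeltaTableBuilder::from_uri("memory://")
        .build_storage()?
        .object_store();

    // Low target file size plus a batch-sized write_batch_size: the single
    // write() call flushes everything, leaving nothing buffered for close().
    let config = PartitionWriterConfig::try_new(
        batch.schema(),
        BTreeMap::new(),
        None,         // writer properties
        Some(9_000),  // target file size
        Some(10_000), // write batch size
    )?;
    let mut writer = PartitionWriter::try_with_config(object_store, config)?;
    writer.write(&batch).await?;

    // Before this fix, close() also wrote a zero-row parquet file and returned a
    // second Add with "numRecords": 0; now only the real file's Add comes back.
    let adds = writer.close().await?;
    assert_eq!(adds.len(), 1);
    Ok(())
}
```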

This change also makes the writer's internal buffer tracking include the data
the parquet writer is buffering in its in-progress row group.
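
For intuition, here is a standalone sketch (not part of this commit; it only assumes the `parquet` crate's `ArrowWriter`, which the writer wraps) showing that written rows sit in an in-progress row group and are not reflected in the underlying buffer until that row group is flushed — which is why the old `self.buffer.len()` check under-counted:

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from((0..10_000).collect::<Vec<i32>>()))],
    )?;

    let mut sink: Vec<u8> = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut sink, schema, None)?;
    writer.write(&batch)?;

    // The written rows are still buffered in the open row group; the sink only
    // receives them once the row group is closed (here, on close()).
    let unflushed = writer.in_progress_size();
    writer.close()?;

    println!("bytes buffered in the in-progress row group: {unflushed}");
    println!("bytes actually in the sink after close: {}", sink.len());
    Ok(())
}
```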
alexwilcoxson-rel authored and RobinLin666 committed Feb 2, 2024
1 parent 7fdb33b commit c9ee664
Showing 1 changed file with 66 additions and 6 deletions.
72 changes: 66 additions & 6 deletions crates/deltalake-core/src/operations/writer.rs
@@ -322,6 +322,11 @@ impl PartitionWriter {
// replace counter / buffers and close the current writer
let (writer, buffer) = self.reset_writer()?;
let metadata = writer.close()?;
// don't write empty file
if metadata.num_rows == 0 {
return Ok(());
}

let buffer = match buffer.into_inner() {
Some(buffer) => Bytes::from(buffer),
None => return Ok(()), // Nothing to write
@@ -367,8 +372,12 @@ impl PartitionWriter {
let length = usize::min(self.config.write_batch_size, max_offset - offset);
self.write_batch(&batch.slice(offset, length))?;
// flush currently buffered data to disk once we meet or exceed the target file size.
if self.buffer.len() >= self.config.target_file_size {
debug!("Writing file with size {:?} to disk.", self.buffer.len());
let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size();
if estimated_size >= self.config.target_file_size {
debug!(
"Writing file with estimated size {:?} to disk.",
estimated_size
);
self.flush_arrow_writer().await?;
}
}
@@ -401,7 +410,7 @@ mod tests {
let batch = get_record_batch(None, false);

// write single un-partitioned batch
let mut writer = get_writer(object_store.clone(), &batch, None, None);
let mut writer = get_writer(object_store.clone(), &batch, None, None, None);
writer.write(&batch).await.unwrap();
let files = list(object_store.as_ref(), None).await.unwrap();
assert_eq!(files.len(), 0);
@@ -433,8 +442,8 @@ mod tests {
let properties = WriterProperties::builder()
.set_max_row_group_size(1024)
.build();
// configure small target file size and and row group size so we can observe multiple files written
let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000));
// configure small target file size and row group size so we can observe multiple files written
let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000), None);
writer.write(&batch).await.unwrap();

// check that we have written more than one file, and no more than 1 is below target size
@@ -446,18 +455,69 @@
assert!(target_file_count >= adds.len() as i32 - 1)
}

#[tokio::test]
async fn test_unflushed_row_group_size() {
let base_int = Arc::new(Int32Array::from((0..10000).collect::<Vec<i32>>()));
let base_str = Arc::new(StringArray::from(vec!["A"; 10000]));
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("value", DataType::Int32, true),
]));
let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap();

let object_store = DeltaTableBuilder::from_uri("memory://")
.build_storage()
.unwrap()
.object_store();
// configure small target file size so we can observe multiple files written
let mut writer = get_writer(object_store, &batch, None, Some(10_000), None);
writer.write(&batch).await.unwrap();

// check that we have written more than one file, and no more than 1 is below target size
let adds = writer.close().await.unwrap();
assert!(adds.len() > 1);
let target_file_count = adds
.iter()
.fold(0, |acc, add| acc + (add.size > 10_000) as i32);
assert!(target_file_count >= adds.len() as i32 - 1)
}

#[tokio::test]
async fn test_do_not_write_empty_file_on_close() {
let base_int = Arc::new(Int32Array::from((0..10000 as i32).collect::<Vec<i32>>()));
let base_str = Arc::new(StringArray::from(vec!["A"; 10000]));
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("value", DataType::Int32, true),
]));
let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap();

let object_store = DeltaTableBuilder::from_uri("memory://")
.build_storage()
.unwrap()
.object_store();
// configure high batch size and low file size to observe one file written and flushed immediately
// upon writing batch, then ensures the buffer is empty upon closing writer
let mut writer = get_writer(object_store, &batch, None, Some(9000), Some(10000));
writer.write(&batch).await.unwrap();

let adds = writer.close().await.unwrap();
assert!(adds.len() == 1);
}

fn get_writer(
object_store: ObjectStoreRef,
batch: &RecordBatch,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
) -> PartitionWriter {
let config = PartitionWriterConfig::try_new(
batch.schema(),
BTreeMap::new(),
writer_properties,
target_file_size,
None,
write_batch_size,
)
.unwrap();
PartitionWriter::try_with_config(object_store, config).unwrap()
