Skip to content

Commit

Permalink
fix(spill): Delete spill file when dropping for rust FileSpill (#660)
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston authored Nov 26, 2024
1 parent 6d51289 commit 2f3aed3
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions native-engine/datafusion-ext-plans/src/memmgr/spill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::{
any::Any,
fs,
fs::{File, OpenOptions},
io::{BufReader, BufWriter, Cursor, Read, Seek, Write},
sync::Arc,
Expand All @@ -26,6 +27,7 @@ use blaze_jni_bridge::{
};
use datafusion::{common::Result, parquet::file::reader::Length, physical_plan::metrics::Time};
use jni::{objects::GlobalRef, sys::jlong};
use log::warn;

use crate::memmgr::metrics::SpillMetrics;

Expand Down Expand Up @@ -83,7 +85,7 @@ pub fn try_new_spill(spill_metrics: &SpillMetrics) -> Result<Box<dyn Spill>> {

/// A spill structure which write data to temporary files
/// used in driver side or executor side with on-heap memory is full
struct FileSpill(File, SpillMetrics);
struct FileSpill(File, SpillMetrics, Option<String>);
impl FileSpill {
fn try_new(spill_metrics: &SpillMetrics) -> Result<Self> {
if is_jni_bridge_inited() {
Expand All @@ -98,10 +100,10 @@ impl FileSpill {
.write(true)
.read(true)
.open(&file_name)?;
Ok(Self(file, spill_metrics.clone()))
Ok(Self(file, spill_metrics.clone(), Some(file_name)))
} else {
let file = tempfile::tempfile()?;
Ok(Self(file, spill_metrics.clone()))
Ok(Self(file, spill_metrics.clone(), None))
}
}
}
Expand Down Expand Up @@ -145,7 +147,15 @@ impl Drop for FileSpill {
self.1.disk_spill_size.add(self.0.len() as usize);
self.1
.disk_spill_iotime
.add_duration(Duration::from_nanos(self.1.mem_spill_iotime.value() as u64))
.add_duration(Duration::from_nanos(self.1.mem_spill_iotime.value() as u64));
if let Some(file_path) = &self.2 {
if let Err(e) = fs::remove_file(file_path) {
warn!(
"Was unable to delete spill file: {}. error: {}",
file_path, e
);
}
}
}
}

Expand Down

0 comments on commit 2f3aed3

Please sign in to comment.