From 6d51289a73f469371098988d2eb8cd527115c76f Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Sun, 24 Nov 2024 18:22:20 +0800 Subject: [PATCH] fix(spill): Explicitly delete spill file for FileBasedSpillBuf (#654) --- .../apache/spark/sql/blaze/memory/SpillBuf.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/memory/SpillBuf.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/memory/SpillBuf.scala index d68c3375c..5e0c4284e 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/memory/SpillBuf.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/memory/SpillBuf.scala @@ -15,13 +15,11 @@ */ package org.apache.spark.sql.blaze.memory -import java.io.RandomAccessFile +import java.io.{File, RandomAccessFile} import java.nio.ByteBuffer import java.nio.channels.FileChannel import java.util - import org.apache.spark.internal.Logging - import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled @@ -85,11 +83,13 @@ class MemBasedSpillBuf extends SpillBuf with Logging { } } val endTimeNs = System.nanoTime - new FileBasedSpillBuf(channel, endTimeNs - startTimeNs) + new FileBasedSpillBuf(file, channel, endTimeNs - startTimeNs) } } -class FileBasedSpillBuf(fileChannel: FileChannel, var diskIOTimeNs: Long) extends SpillBuf { +class FileBasedSpillBuf(file: File, fileChannel: FileChannel, var diskIOTimeNs: Long) + extends SpillBuf + with Logging { private var readPosition: Long = 0 override def write(buf: ByteBuffer): Unit = { @@ -114,5 +114,8 @@ class FileBasedSpillBuf(fileChannel: FileChannel, var diskIOTimeNs: Long) extend override def release(): Unit = { fileChannel.close() + if (!file.delete()) { + logWarning(s"Was unable to delete spill file: ${file.getAbsolutePath}") + } } }