From 41351c0d1f394015536d9df2f10bd89b467fbf57 Mon Sep 17 00:00:00 2001
From: ustcfy <96854327+ustcfy@users.noreply.github.com>
Date: Fri, 27 Sep 2024 09:37:02 +0800
Subject: [PATCH] Fix FileAlreadyExistsException in LORE dump process (#11484)

* Updated parameters to enable file overwriting when dumping.

Signed-off-by: ustcfy

* Validate LORE dump root path before execution

Signed-off-by: ustcfy

* Add loreOutputRootPathChecked map for tracking lore output root path checks.

Signed-off-by: ustcfy

* Delay path and filesystem initialization until actually needed.

Signed-off-by: ustcfy

* Add test and update dev/lore.md doc.

Signed-off-by: ustcfy

* Format code to ensure line length does not exceed 100 characters

Signed-off-by: ustcfy

* Format code to ensure line length does not exceed 100 characters

Signed-off-by: ustcfy

* Improved resource management by using withResource.

Signed-off-by: ustcfy

* Update docs/dev/lore.md

Co-authored-by: Renjie Liu

* Improved resource management by using withResource.

Signed-off-by: ustcfy

* Removed withResource for the FileSystem instance.

Signed-off-by: ustcfy

* Update docs/dev/lore.md

Co-authored-by: Gera Shegalov

---------

Signed-off-by: ustcfy
Signed-off-by: ustcfy
Co-authored-by: Renjie Liu
Co-authored-by: Gera Shegalov
---
 docs/dev/lore.md                              |  5 ++-
 .../nvidia/spark/rapids/lore/GpuLore.scala    | 42 +++++++++++++------
 .../com/nvidia/spark/rapids/lore/dump.scala   |  2 +-
 .../spark/rapids/lore/GpuLoreSuite.scala      | 21 ++++++++++
 4 files changed, 56 insertions(+), 14 deletions(-)

diff --git a/docs/dev/lore.md b/docs/dev/lore.md
index 4a7725b4bfd..b155df54052 100644
--- a/docs/dev/lore.md
+++ b/docs/dev/lore.md
@@ -38,7 +38,10 @@ partitions.
 
 You also need to set `spark.rapids.sql.lore.dumpPath` to tell LORE where to dump the data, the
 value of which should point to a directory. All dumped data of a query will live in this
-directory. A typical directory hierarchy would look like this:
+directory. Note: the dump directory must either not exist (it will be created) or be empty;
+otherwise an `IllegalArgumentException` is thrown to prevent overwriting existing data.
+
+A typical directory hierarchy would look like this:
 
 ```console
 + loreId-10/
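For context, here is how the documented check surfaces in a user session. This is a minimal sketch, not part of the patch: `spark.rapids.sql.lore.dumpPath` is taken from the doc text above, while the `spark.rapids.sql.lore.idsToDump` key and the `1[*]` dump-id syntax are assumptions based on the LORE docs.

```scala
// Hypothetical Spark shell session; /tmp/lore is an example path.
spark.conf.set("spark.rapids.sql.lore.dumpPath", "/tmp/lore")
spark.conf.set("spark.rapids.sql.lore.idsToDump", "1[*]")  // assumed key and syntax

val df = spark.range(0, 1000, 1, 100)
  .selectExpr("id % 10 as key", "id % 100 as value")

// With this patch, a non-empty /tmp/lore makes the first action fail fast
// with IllegalArgumentException instead of failing partway through the dump
// with FileAlreadyExistsException as before.
df.collect()
```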
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala
index a51a1e13a5e..312b277f077 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala
@@ -89,23 +89,21 @@ object GpuLore {
   }
 
   def dumpObject[T: ClassTag](obj: T, path: Path, hadoopConf: Configuration): Unit = {
-    withResource(path.getFileSystem(hadoopConf)) { fs =>
-      withResource(fs.create(path, false)) { fout =>
-        val serializerStream = SparkEnv.get.serializer.newInstance().serializeStream(fout)
-        withResource(serializerStream) { ser =>
-          ser.writeObject(obj)
-        }
+    val fs = path.getFileSystem(hadoopConf)
+    withResource(fs.create(path, true)) { fout =>
+      val serializerStream = SparkEnv.get.serializer.newInstance().serializeStream(fout)
+      withResource(serializerStream) { ser =>
+        ser.writeObject(obj)
       }
     }
   }
 
   def loadObject[T: ClassTag](path: Path, hadoopConf: Configuration): T = {
-    withResource(path.getFileSystem(hadoopConf)) { fs =>
-      withResource(fs.open(path)) { fin =>
-        val serializerStream = SparkEnv.get.serializer.newInstance().deserializeStream(fin)
-        withResource(serializerStream) { ser =>
-          ser.readObject().asInstanceOf[T]
-        }
+    val fs = path.getFileSystem(hadoopConf)
+    withResource(fs.open(path)) { fin =>
+      val serializerStream = SparkEnv.get.serializer.newInstance().deserializeStream(fin)
+      withResource(serializerStream) { ser =>
+        ser.readObject().asInstanceOf[T]
       }
     }
   }
@@ -186,6 +184,12 @@ object GpuLore {
      idGen.computeIfAbsent(executionId, _ => new AtomicInteger(0)).getAndIncrement()
    }
  }
+  /**
+   * Executions that have checked the lore output root path.
+   * Key is [[SQLExecution.EXECUTION_ID_KEY]].
+   */
+  private val loreOutputRootPathChecked: ConcurrentHashMap[String, Boolean] =
+    new ConcurrentHashMap[String, Boolean]()
 
   def tagForLore(sparkPlan: SparkPlan, rapidsConf: RapidsConf): SparkPlan = {
     val loreDumpIds = rapidsConf.loreDumpIds
@@ -197,6 +201,20 @@ object GpuLore {
          s"when ${RapidsConf.LORE_DUMP_IDS.key} is set."))
 
       val spark = SparkShimImpl.sessionFromPlan(sparkPlan)
+
+      Option(spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).foreach {
+        executionId =>
+          loreOutputRootPathChecked.computeIfAbsent(executionId, _ => {
+            val path = new Path(loreOutputRootPath)
+            val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+            if (fs.exists(path) && fs.listStatus(path).nonEmpty) {
+              throw new IllegalArgumentException(
+                s"LORE dump path $loreOutputRootPath already exists and is not empty.")
+            }
+            true
+          })
+      }
+
       val hadoopConf = {
         val sc = spark.sparkContext
         sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
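The root-path guard added to `tagForLore` above relies on `ConcurrentHashMap.computeIfAbsent` evaluating its mapping function atomically and at most once per key. A standalone sketch of the same pattern, using hypothetical names rather than plugin code:

```scala
import java.util.concurrent.ConcurrentHashMap

object CheckOnceSketch {
  private val checked = new ConcurrentHashMap[String, Boolean]()

  // Concurrent callers with the same executionId run `validate` only once:
  // computeIfAbsent evaluates the mapping function atomically. If `validate`
  // throws, no mapping is recorded, so a later call for the same key
  // re-runs the validation.
  def checkOnce(executionId: String)(validate: => Unit): Unit =
    checked.computeIfAbsent(executionId, _ => { validate; true })
}
```

One property worth noting: because a thrown exception leaves the key unmapped, a failed check is retried on the next query of the same execution, while entries for successful checks are never removed, so a long-lived session keeps one small entry per checked execution. The patch accepts the same trade-off for `loreOutputRootPathChecked`.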
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala
index 1b9967e1bf4..ee0c7a7bd7a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala
@@ -72,7 +72,7 @@ class GpuLoreDumpRDD(info: LoreDumpRDDInfo, input: RDD[ColumnarBatch])
   private def dumpCurrentBatch(): ColumnarBatch = {
     val outputPath = pathOfBatch(split.index, batchIdx)
     val outputStream = outputPath.getFileSystem(info.hadoopConf.value.value)
-      .create(outputPath, false)
+      .create(outputPath, true)
     DumpUtils.dumpToParquet(nextBatch.get, outputStream)
     nextBatch.get
   }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala
index 7db46718e89..057fbd7ecc3 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala
@@ -17,6 +17,7 @@
 package com.nvidia.spark.rapids.lore
 
 import com.nvidia.spark.rapids.{FunSuiteWithTempDir, GpuColumnarToRowExec, RapidsConf, SparkQueryCompareTestSuite}
+import com.nvidia.spark.rapids.Arm.withResource
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -147,6 +148,26 @@ class GpuLoreSuite extends SparkQueryCompareTestSuite with FunSuiteWithTempDir w
     }
   }
 
+  test("Non-empty lore dump path") {
+    withGpuSparkSession { spark =>
+      spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath)
+      spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "3[*]")
+
+      // Create a file in the root path
+      val path = new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/test")
+      val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      withResource(fs.create(path, true)) { _ =>
+      }
+
+      val df = spark.range(0, 1000, 1, 100)
+        .selectExpr("id % 10 as key", "id % 100 as value")
+
+      assertThrows[IllegalArgumentException] {
+        df.collect()
+      }
+    }
+  }
+
   private def doTestReplay(loreDumpIds: String)(dfFunc: SparkSession => DataFrame) = {
     val loreId = OutputLoreId.parse(loreDumpIds).head._1
     withGpuSparkSession { spark =>
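A final note on the hunks that stopped wrapping `path.getFileSystem(...)` in `withResource`: by default Hadoop caches `FileSystem` instances per scheme and authority, so closing one instance invalidates it for every other holder of the same cached object. A minimal sketch of the hazard, assuming the default local filesystem with caching left enabled:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()
// The same scheme/authority resolves to the same cached FileSystem instance
// (unless fs.<scheme>.impl.disable.cache is set to true).
val fs1 = new Path("/tmp/a").getFileSystem(conf)
val fs2 = new Path("/tmp/b").getFileSystem(conf)
assert(fs1 eq fs2)
// fs1.close()  // would also break fs2 and any other user of the cached
//              // instance, which is why the patch no longer closes it.
```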