Skip to content

Commit

Permalink
[Spark 4.0] Account for PartitionedFileUtil.splitFiles signature change. (#10857)
Browse files Browse the repository at this point in the history

* Account for PartitionedFileUtil.splitFiles signature change.

Fixes #10299.

In Apache Spark 4.0, the signature of `PartitionedFileUtil.splitFiles` was changed
to remove unused parameters (apache/spark@eabea643c74).  This causes the Spark RAPIDS
plugin build to break with Spark 4.0.

This commit introduces a shim to account for the signature change.

Signed-off-by: MithunR <[email protected]>

* Common base for PartitionFileUtilsShims.

Signed-off-by: MithunR <[email protected]>

* Reusing existing PartitionedFileUtilsShims.

* More refactor, for pre-3.5 compile.

* Updated Copyright date.

* Fixed style error.

* Re-fixed the copyright year.

* Added missing import.

---------

Signed-off-by: MithunR <[email protected]>
  • Loading branch information
mythrocks authored May 31, 2024
1 parent 4024ef6 commit 822ad9b
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,7 @@
{"spark": "341"}
{"spark": "342"}
{"spark": "343"}
{"spark": "350"}
{"spark": "351"}
{"spark": "400"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

object PartitionedFileUtilsShim {
// Wrapper for case class constructor so Java code can access
// the default values across Spark versions.
def newPartitionedFile(
partitionValues: InternalRow,
filePath: String,
start: Long,
length: Long): PartitionedFile = PartitionedFile(partitionValues,
SparkPath.fromPathString(filePath), start, length)

def withNewLocations(pf: PartitionedFile, locations: Seq[String]): PartitionedFile = {
pf.copy(locations = locations.toArray)
}
}
object PartitionedFileUtilsShim extends PartitionedFileUtilsShimBase
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "340"}
{"spark": "341"}
{"spark": "342"}
{"spark": "343"}
{"spark": "350"}
{"spark": "351"}
{"spark": "400"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

trait PartitionedFileUtilsShimBase {

  /**
   * Builds a [[PartitionedFile]] from a plain string path.
   *
   * Wraps the case class constructor so that Java callers can rely on the
   * constructor defaults consistently across the shimmed Spark versions.
   *
   * @param partitionValues partition column values for this file
   * @param filePath        file path as a plain string; converted to a SparkPath
   * @param start           byte offset where this split begins
   * @param length          number of bytes in this split
   * @return a new PartitionedFile covering [start, start + length)
   */
  def newPartitionedFile(partitionValues: InternalRow,
      filePath: String,
      start: Long,
      length: Long): PartitionedFile = {
    val sparkPath = SparkPath.fromPathString(filePath)
    PartitionedFile(partitionValues, sparkPath, start, length)
  }

  /** Returns a copy of `pf` with its preferred host locations replaced. */
  def withNewLocations(pf: PartitionedFile, locations: Seq[String]): PartitionedFile =
    pf.copy(locations = locations.toArray)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,8 +20,10 @@ spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionedFile}

object PartitionedFileUtilsShim {
// Wrapper for case class constructor so Java code can access
Expand All @@ -37,4 +39,14 @@ object PartitionedFileUtilsShim {
def withNewLocations(pf: PartitionedFile, locations: Seq[String]): PartitionedFile = {
pf.copy(locations = locations)
}

// In Spark 4.0, PartitionedFileUtil.splitFiles lost its `sparkSession` parameter.
// This pre-Spark-4.0 shim keeps the `sparkSession` parameter.
def splitFiles(sparkSession: SparkSession,
file: FileStatusWithMetadata,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
PartitionedFileUtil.splitFiles(sparkSession, file, isSplitable, maxSplitBytes, partitionValues)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.execution.rapids.shims

import com.nvidia.spark.rapids.shims.PartitionedFileUtilsShim
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory, PartitionedFile}

trait SplitFiles {
Expand All @@ -49,7 +49,7 @@ trait SplitFiles {

selectedPartitions.flatMap { partition =>
partition.files.flatMap { f =>
PartitionedFileUtil.splitFiles(
PartitionedFileUtilsShim.splitFiles(
sparkSession,
f,
isSplitable = canBeSplit(f.getPath, hadoopConf),
Expand All @@ -71,7 +71,7 @@ trait SplitFiles {
val filePath = file.getPath
val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
PartitionedFileUtilsShim.splitFiles(
sparkSession = relation.sparkSession,
file = file,
isSplitable = isSplitable,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "350"}
{"spark": "351"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionedFile}

object PartitionedFileUtilsShim extends PartitionedFileUtilsShimBase {

  /**
   * Splits `file` into [[PartitionedFile]] chunks of at most `maxSplitBytes`
   * by delegating to Spark's `PartitionedFileUtil.splitFiles`.
   *
   * Spark 4.0 removed the `sparkSession` parameter from
   * `PartitionedFileUtil.splitFiles`; this pre-4.0 shim keeps the parameter
   * and forwards it, so callers compile unchanged on every shimmed version.
   */
  def splitFiles(sparkSession: SparkSession,
      file: FileStatusWithMetadata,
      isSplitable: Boolean,
      maxSplitBytes: Long,
      partitionValues: InternalRow): Seq[PartitionedFile] =
    PartitionedFileUtil.splitFiles(
      sparkSession,
      file,
      isSplitable,
      maxSplitBytes,
      partitionValues)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "400"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionedFile}

object PartitionedFileUtilsShim extends PartitionedFileUtilsShimBase {

  /**
   * Splits `file` into [[PartitionedFile]] chunks of at most `maxSplitBytes`
   * by delegating to Spark's `PartitionedFileUtil.splitFiles`.
   *
   * Spark 4.0 dropped the unused `sparkSession` parameter from
   * `PartitionedFileUtil.splitFiles`. This Spark-4.0+ shim keeps the
   * parameter in its own signature so callers compile unchanged across all
   * shimmed versions, but deliberately ignores it when delegating.
   */
  def splitFiles(sparkSession: SparkSession,
      file: FileStatusWithMetadata,
      isSplitable: Boolean,
      maxSplitBytes: Long,
      partitionValues: InternalRow): Seq[PartitionedFile] =
    PartitionedFileUtil.splitFiles(file, isSplitable, maxSplitBytes, partitionValues)
} // object PartitionedFileUtilsShim

0 comments on commit 822ad9b

Please sign in to comment.