From 20db73c8fc3ab47a01ecba67aeb4b37138d9cf26 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Sat, 17 Apr 2021 11:35:48 -0700 Subject: [PATCH] Remove unused symbols. Contributes to #2109 Signed-off-by: Gera Shegalov --- .../tests/datasourcev2/TestingV2Source.scala | 5 +-- .../spark/rapids/StringFunctionSuite.scala | 2 +- .../GpuBroadcastNestedLoopJoinExec.scala | 2 +- .../GpuBroadcastNestedLoopJoinExec.scala | 2 +- .../ParquetCachedBatchSerializer.scala | 18 ++++---- .../spark311/GpuInMemoryTableScanExec.scala | 2 - .../rapids/AutoCloseColumnBatchIterator.scala | 2 +- .../com/nvidia/spark/rapids/GpuCast.scala | 5 +-- .../rapids/GpuColumnarBatchSerializer.scala | 2 +- .../nvidia/spark/rapids/GpuParquetScan.scala | 7 ++- .../nvidia/spark/rapids/GpuPartitioning.scala | 5 +-- .../spark/rapids/GpuRowToColumnarExec.scala | 4 +- .../spark/rapids/HostColumnarToGpu.scala | 7 ++- .../spark/rapids/RapidsBufferStore.scala | 7 +-- .../com/nvidia/spark/rapids/RapidsConf.scala | 4 +- .../nvidia/spark/rapids/RapidsDiskStore.scala | 2 +- .../spark/rapids/RapidsHostMemoryStore.scala | 2 +- .../rapids/ShuffleReceivedBufferCatalog.scala | 3 +- ...CreateDataSourceTableAsSelectCommand.scala | 2 +- .../sql/rapids/GpuFileFormatWriter.scala | 2 +- .../spark/sql/rapids/GpuShuffleEnv.scala | 2 +- .../sql/rapids/datetimeExpressions.scala | 5 +-- .../GpuBroadcastNestedLoopJoinExec.scala | 1 - .../InternalColumnarRddConverter.scala | 2 +- .../spark/sql/rapids/mathExpressions.scala | 2 +- .../spark/rapids/AdaptiveQueryExecSuite.scala | 1 - .../nvidia/spark/rapids/AnsiCastOpSuite.scala | 4 +- .../rapids/CostBasedOptimizerSuite.scala | 44 ++++++++----------- .../com/nvidia/spark/rapids/FuzzerUtils.scala | 5 +-- .../spark/rapids/GpuBatchUtilsSuite.scala | 2 +- .../spark/rapids/ImplicitsTestSuite.scala | 2 +- .../nvidia/spark/rapids/MetaUtilsSuite.scala | 2 - .../rapids/SparkQueryCompareTestSuite.scala | 8 ++-- .../shuffle/RapidsShuffleClientSuite.scala | 2 +- .../shuffle/RapidsShuffleTestHelper.scala | 3 +- .../shuffle/WindowedBlockIteratorSuite.scala | 2 +- .../spark/udf/CatalystExpressionBuilder.scala | 10 ++--- .../com/nvidia/spark/udf/Instruction.scala | 35 +++++++-------- .../nvidia/spark/udf/LambdaReflection.scala | 6 +-- .../scala/com/nvidia/spark/udf/Plugin.scala | 2 +- .../scala/com/nvidia/spark/OpcodeSuite.scala | 40 ++++++++--------- 41 files changed, 111 insertions(+), 154 deletions(-) diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/datasourcev2/TestingV2Source.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/datasourcev2/TestingV2Source.scala index d84e4fdfce6..0af617c678d 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/datasourcev2/TestingV2Source.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/datasourcev2/TestingV2Source.scala @@ -126,7 +126,7 @@ class ArrowColumnarDataSourceV2 extends TestingV2Source { } override def createReaderFactory(): PartitionReaderFactory = { - new ColumnarReaderFactory(options) + new ColumnarReaderFactory } } @@ -137,8 +137,7 @@ class ArrowColumnarDataSourceV2 extends TestingV2Source { } } -class ColumnarReaderFactory(options: CaseInsensitiveStringMap) - extends PartitionReaderFactory { +class ColumnarReaderFactory extends PartitionReaderFactory { private final val BATCH_SIZE = 20 override def supportColumnarReads(partition: InputPartition): Boolean = true diff --git a/integration_tests/src/test/scala/com/nvidia/spark/rapids/StringFunctionSuite.scala 
b/integration_tests/src/test/scala/com/nvidia/spark/rapids/StringFunctionSuite.scala index c488db0791a..5040cded8e4 100644 --- a/integration_tests/src/test/scala/com/nvidia/spark/rapids/StringFunctionSuite.scala +++ b/integration_tests/src/test/scala/com/nvidia/spark/rapids/StringFunctionSuite.scala @@ -193,7 +193,7 @@ class StringOperatorsSuite extends SparkQueryCompareTestSuite { class StringOperatorsDiagnostics extends SparkQueryCompareTestSuite { def generateResults(gen : org.apache.spark.sql.Column => org.apache.spark.sql.Column): (Array[Row], Array[Row]) = { - val (testConf, qualifiedTestName) = setupTestConfAndQualifierName("", true, false, + val (testConf, _) = setupTestConfAndQualifierName("", true, false, new SparkConf(), Seq.empty, 0.0, false, false) runOnCpuAndGpu(TestCodepoints.validCodepointCharsDF, frame => frame.select(gen(col("strings"))), testConf) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastNestedLoopJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastNestedLoopJoinExec.scala index 85d53aa8172..1c2e359cc70 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastNestedLoopJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastNestedLoopJoinExec.scala @@ -34,7 +34,7 @@ case class GpuBroadcastNestedLoopJoinExec( joinType: JoinType, condition: Option[Expression], targetSizeBytes: Long) - extends GpuBroadcastNestedLoopJoinExecBase(left, right, join, joinType, condition, + extends GpuBroadcastNestedLoopJoinExecBase(left, right, joinType, condition, targetSizeBytes) { def getGpuBuildSide: GpuBuildSide = { diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastNestedLoopJoinExec.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastNestedLoopJoinExec.scala index d027cd41b61..350ec44d3a1 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastNestedLoopJoinExec.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastNestedLoopJoinExec.scala @@ -34,7 +34,7 @@ case class GpuBroadcastNestedLoopJoinExec( joinType: JoinType, condition: Option[Expression], targetSizeBytes: Long) - extends GpuBroadcastNestedLoopJoinExecBase(left, right, join, joinType, condition, + extends GpuBroadcastNestedLoopJoinExecBase(left, right, joinType, condition, targetSizeBytes) { def getGpuBuildSide: GpuBuildSide = { diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala index 929c76896c6..59524d5972b 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala @@ -307,7 +307,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { def putOnGpuIfNeeded(batch: ColumnarBatch): ColumnarBatch = { if (!batch.column(0).isInstanceOf[GpuColumnVector]) { val s: StructType = schema.toStructType - val gpuCB = new GpuColumnarBatchBuilder(s, batch.numRows(), batch).build(batch.numRows()) + val gpuCB = new GpuColumnarBatchBuilder(s, batch.numRows()).build(batch.numRows()) batch.close() gpuCB } else { @@ -1137,7 +1137,7 @@ class 
ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { private def getSupportedSchemaFromUnsupported( cachedAttributes: Seq[Attribute], requestedAttributes: Seq[Attribute] = Seq.empty): (Seq[Attribute], Seq[Attribute]) = { - def getSupportedDataType(dataType: DataType, nullable: Boolean = true): DataType = { + def getSupportedDataType(dataType: DataType): DataType = { dataType match { case CalendarIntervalType => intervalStructType @@ -1147,19 +1147,19 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { val newStructType = StructType( s.map { field => StructField(field.name, - getSupportedDataType(field.dataType, field.nullable), field.nullable, + getSupportedDataType(field.dataType), field.nullable, field.metadata) }) mapping.put(s, newStructType) newStructType case a@ArrayType(elementType, nullable) => val newArrayType = - ArrayType(getSupportedDataType(elementType, nullable), nullable) + ArrayType(getSupportedDataType(elementType), nullable) mapping.put(a, newArrayType) newArrayType case m@MapType(keyType, valueType, nullable) => - val newKeyType = getSupportedDataType(keyType, nullable) - val newValueType = getSupportedDataType(valueType, nullable) + val newKeyType = getSupportedDataType(keyType) + val newValueType = getSupportedDataType(valueType) val mapType = MapType(newKeyType, newValueType, nullable) mapping.put(m, mapType) mapType @@ -1180,13 +1180,13 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { AttributeReference(attribute.name, DataTypes.ByteType, nullable = true, metadata = attribute.metadata)(attribute.exprId).asInstanceOf[Attribute] case s: StructType => - AttributeReference(attribute.name, getSupportedDataType(s, attribute.nullable), + AttributeReference(attribute.name, getSupportedDataType(s), attribute.nullable, attribute.metadata)(attribute.exprId) case a: ArrayType => - AttributeReference(attribute.name, getSupportedDataType(a, attribute.nullable), + AttributeReference(attribute.name, getSupportedDataType(a), attribute.nullable, attribute.metadata)(attribute.exprId) case m: MapType => - AttributeReference(attribute.name, getSupportedDataType(m, attribute.nullable), + AttributeReference(attribute.name, getSupportedDataType(m), attribute.nullable, attribute.metadata)(attribute.exprId) case _ => attribute diff --git a/shims/spark311/src/main/scala/org/apache/spark/sql/rapids/shims/spark311/GpuInMemoryTableScanExec.scala b/shims/spark311/src/main/scala/org/apache/spark/sql/rapids/shims/spark311/GpuInMemoryTableScanExec.scala index 04f35dead25..bd04fc5f2ae 100644 --- a/shims/spark311/src/main/scala/org/apache/spark/sql/rapids/shims/spark311/GpuInMemoryTableScanExec.scala +++ b/shims/spark311/src/main/scala/org/apache/spark/sql/rapids/shims/spark311/GpuInMemoryTableScanExec.scala @@ -97,8 +97,6 @@ case class GpuInMemoryTableScanExec( lazy val readPartitions = sparkContext.longAccumulator lazy val readBatches = sparkContext.longAccumulator - private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning - private def filteredCachedBatches() = { // Right now just return the batch without filtering relation.cacheBuilder.cachedColumnBuffers diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AutoCloseColumnBatchIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AutoCloseColumnBatchIterator.scala index 20b26249c74..ebf1a8cb86f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AutoCloseColumnBatchIterator.scala +++ 
b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AutoCloseColumnBatchIterator.scala @@ -39,7 +39,7 @@ class AutoCloseColumnBatchIterator[U](itr: Iterator[U], nextBatch: Iterator[U] = } } - TaskContext.get().addTaskCompletionListener[Unit]((tc: TaskContext) => { + TaskContext.get().addTaskCompletionListener[Unit]((_: TaskContext) => { closeCurrentBatch() }) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 620bbc6de66..e576116e011 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -1227,7 +1227,7 @@ case class GpuCast( if (to.scale <= from.scale) { if (!isFrom32Bit && isTo32Bit) { // check for overflow when 64bit => 32bit - withResource(checkForOverflow(input, from, to, isFrom32Bit)) { checkedInput => + withResource(checkForOverflow(input, to, isFrom32Bit)) { checkedInput => castCheckedDecimal(checkedInput) } } else { @@ -1240,7 +1240,7 @@ case class GpuCast( } } else { // from.scale > to.scale - withResource(checkForOverflow(input, from, to, isFrom32Bit)) { checkedInput => + withResource(checkForOverflow(input, to, isFrom32Bit)) { checkedInput => castCheckedDecimal(checkedInput) } } @@ -1248,7 +1248,6 @@ case class GpuCast( def checkForOverflow( input: ColumnVector, - from: DecimalType, to: DecimalType, isFrom32Bit: Boolean): ColumnVector = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index f1454a4a378..5fa3b6aabde 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -145,7 +145,7 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends Se new Iterator[(Int, ColumnarBatch)] with Arm { var toBeReturned: Option[ColumnarBatch] = None - TaskContext.get().addTaskCompletionListener[Unit]((tc: TaskContext) => { + TaskContext.get().addTaskCompletionListener[Unit]((_: TaskContext) => { toBeReturned.foreach(_.close()) toBeReturned = None dIn.close() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 90f627c8103..27dd95c54d2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -357,7 +357,7 @@ case class GpuParquetMultiFilePartitionReaderFactory( return uri } } catch { - case e: URISyntaxException => + case _: URISyntaxException => } new File(path).getAbsoluteFile().toURI() } @@ -1089,7 +1089,6 @@ class MultiFileParquetPartitionReader( // and concatenate those together then go to the next column for ((field, colIndex) <- partitionSchema.fields.zipWithIndex) { val dataType = field.dataType - val partitionColumns = new Array[GpuColumnVector](inPartitionValues.size) withResource(new Array[GpuColumnVector](inPartitionValues.size)) { partitionColumns => for ((rowsInPart, partIndex) <- rowsPerPartition.zipWithIndex) { @@ -1565,7 +1564,7 @@ class MultiFileCloudParquetPartitionReader( // in cases close got called early for like limit() calls isDone = true currentFileHostBuffers.foreach { current => - current.memBuffersAndSizes.foreach { case (buf, size) => + current.memBuffersAndSizes.foreach { case 
(buf, _) => if (buf != null) { buf.close() } @@ -1576,7 +1575,7 @@ class MultiFileCloudParquetPartitionReader( batch = None tasks.asScala.foreach { task => if (task.isDone()) { - task.get.memBuffersAndSizes.foreach { case (buf, size) => + task.get.memBuffersAndSizes.foreach { case (buf, _) => if (buf != null) { buf.close() } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala index 729ec535f2d..0ddb0965de3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala @@ -53,7 +53,7 @@ trait GpuPartitioning extends Partitioning with Arm { val contiguousTables = withResource(table)(t => t.contiguousSplit(parts: _*)) GpuShuffleEnv.rapidsShuffleCodec match { case Some(codec) => - compressSplits(splits, codec, contiguousTables, dataTypes) + compressSplits(splits, codec, contiguousTables) case None => // GpuPackedTableColumn takes ownership of the contiguous tables closeOnExcept(contiguousTables) { cts => @@ -127,8 +127,7 @@ trait GpuPartitioning extends Partitioning with Arm { def compressSplits( outputBatches: ArrayBuffer[ColumnarBatch], codec: TableCompressionCodec, - contiguousTables: Array[ContiguousTable], - dataTypes: Array[DataType]): Unit = { + contiguousTables: Array[ContiguousTable]): Unit = { withResource(codec.createBatchCompressor(maxCompressionBatchSize, Cuda.DEFAULT_STREAM)) { compressor => // tracks batches with no data and the corresponding output index for the batch diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index 09de317bae0..9beb0602008 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -58,7 +58,7 @@ private class GpuRowToColumnConverter(schema: StructType) extends Serializable w */ final def convertBatch(rows: Array[InternalRow], schema: StructType): ColumnarBatch = { val numRows = rows.length - val builders = new GpuColumnarBatchBuilder(schema, numRows, null) + val builders = new GpuColumnarBatchBuilder(schema, numRows) rows.foreach(convert(_, builders)) builders.build(numRows) } @@ -585,7 +585,7 @@ class RowToColumnarIterator( } } - val builders = new GpuColumnarBatchBuilder(localSchema, targetRows, null) + val builders = new GpuColumnarBatchBuilder(localSchema, targetRows) try { var rowCount = 0 // Double because validity can be < 1 byte, and this is just an estimate anyways diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala index a33f04f749c..988d7e2cfa4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala @@ -67,7 +67,6 @@ object HostColumnarToGpu extends Logging { def arrowColumnarCopy( cv: ColumnVector, ab: ai.rapids.cudf.ArrowColumnBuilder, - nullable: Boolean, rows: Int): ju.List[ReferenceManager] = { val valVector = cv match { case v: ArrowColumnVector => @@ -100,7 +99,7 @@ object HostColumnarToGpu extends Logging { try { offsets = getBufferAndAddReference(ShimLoader.getSparkShims.getArrowOffsetsBuf(valVector)) } catch { - case e: UnsupportedOperationException => + case _: 
UnsupportedOperationException => // swallow the exception and assume no offsets buffer } ab.addBatch(rows, nullCount, dataBuf, validity, offsets) @@ -315,10 +314,10 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch], (batch.column(0).isInstanceOf[ArrowColumnVector] || batch.column(0).isInstanceOf[AccessibleArrowColumnVector])) { logDebug("Using GpuArrowColumnarBatchBuilder") - batchBuilder = new GpuColumnVector.GpuArrowColumnarBatchBuilder(schema, batchRowLimit, batch) + batchBuilder = new GpuColumnVector.GpuArrowColumnarBatchBuilder(schema) } else { logDebug("Using GpuColumnarBatchBuilder") - batchBuilder = new GpuColumnVector.GpuColumnarBatchBuilder(schema, batchRowLimit, null) + batchBuilder = new GpuColumnVector.GpuColumnarBatchBuilder(schema, batchRowLimit) } totalRows = 0 } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index 2ee9381897d..c2e94b83498 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -40,8 +40,7 @@ object RapidsBufferStore { */ abstract class RapidsBufferStore( val tier: StorageTier, - catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton, - deviceStorage: RapidsDeviceMemoryStore = RapidsBufferCatalog.getDeviceStorage) + catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton) extends AutoCloseable with Logging with Arm { val name: String = tier.toString @@ -63,10 +62,6 @@ abstract class RapidsBufferStore( totalBytesStored += buffer.size } - def get(id: RapidsBufferId): RapidsBufferBase = synchronized { - buffers.get(id) - } - def remove(id: RapidsBufferId): Unit = synchronized { val obj = buffers.remove(id) if (obj != null) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 7ac472d1ee4..da56a0e15f5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -76,7 +76,7 @@ object ConfHelper { v.map(stringConverter).mkString(",") } - def byteFromString(str: String, unit: ByteUnit, key: String): Long = { + def byteFromString(str: String, unit: ByteUnit): Long = { val (input, multiplier) = if (str.length() > 0 && str.charAt(0) == '-') { (str.substring(1), -1) @@ -265,7 +265,7 @@ class ConfBuilder(val key: String, val register: ConfEntry[_] => Unit) { } def bytesConf(unit: ByteUnit): TypedConfBuilder[Long] = { - new TypedConfBuilder[Long](this, byteFromString(_, unit, key)) + new TypedConfBuilder[Long](this, byteFromString(_, unit)) } def integerConf: TypedConfBuilder[Integer] = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala index 97de1b564f9..41a2700c389 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala @@ -31,7 +31,7 @@ class RapidsDiskStore( diskBlockManager: RapidsDiskBlockManager, catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton, deviceStorage: RapidsDeviceMemoryStore = RapidsBufferCatalog.getDeviceStorage) - extends RapidsBufferStore(StorageTier.DISK, catalog, deviceStorage) { + extends RapidsBufferStore(StorageTier.DISK, catalog) { private[this] val sharedBufferFiles = new 
ConcurrentHashMap[RapidsBufferId, File] override protected def createBuffer(incoming: RapidsBuffer, incomingBuffer: MemoryBuffer, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala index 5f90820dda3..0602b02ce8f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala @@ -31,7 +31,7 @@ class RapidsHostMemoryStore( maxSize: Long, catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton, deviceStorage: RapidsDeviceMemoryStore = RapidsBufferCatalog.getDeviceStorage) - extends RapidsBufferStore(StorageTier.HOST, catalog, deviceStorage) { + extends RapidsBufferStore(StorageTier.HOST, catalog) { private[this] val pool = HostMemoryBuffer.allocate(maxSize, false) private[this] val addressAllocator = new AddressSpaceAllocator(maxSize) private[this] var haveLoggedMaxExceeded = false diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShuffleReceivedBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShuffleReceivedBufferCatalog.scala index c602353d80a..8e520d96fb4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShuffleReceivedBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShuffleReceivedBufferCatalog.scala @@ -46,8 +46,7 @@ case class ShuffleReceivedBufferId( /** Catalog for lookup of shuffle buffers by block ID */ class ShuffleReceivedBufferCatalog( - catalog: RapidsBufferCatalog, - diskBlockManager: RapidsDiskBlockManager) extends Logging { + catalog: RapidsBufferCatalog) extends Logging { /** Mapping of table ID to shuffle buffer ID */ private[this] val tableMap = new ConcurrentHashMap[Int, ShuffleReceivedBufferId] diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuCreateDataSourceTableAsSelectCommand.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuCreateDataSourceTableAsSelectCommand.scala index 505471ef411..7c1b76b35b6 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuCreateDataSourceTableAsSelectCommand.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuCreateDataSourceTableAsSelectCommand.scala @@ -81,7 +81,7 @@ case class GpuCreateDataSourceTableAsSelectCommand( sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) result match { - case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty && + case _: HadoopFsRelation if table.partitionColumnNames.nonEmpty && sparkSession.sqlContext.conf.manageFilesourcePartitions => // Need to recover partitions into the metastore so our saved data is visible. 
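The change just above follows a pattern used throughout this patch: a binding that is only pattern-matched on, never read, becomes the wildcard `_`, whether in a type test (`case _: HadoopFsRelation`), a destructured field, or a catch clause (`case _: URISyntaxException`). A minimal, standalone sketch of that pattern, using made-up types rather than anything from the plugin:

object WildcardBindingExample {
  sealed trait Relation
  final case class FsRelation(path: String) extends Relation
  final case class JdbcRelation(url: String) extends Relation

  // Before: `case fs: FsRelation => ...` bound `fs` without ever reading it,
  // which unused-symbol lints flag. Matching on the type alone is enough.
  def describe(r: Relation): String = r match {
    case _: FsRelation   => "file-based relation"
    case JdbcRelation(_) => "jdbc relation" // an ignored field works the same way
  }

  // Catch clauses follow suit: bind nothing when only the fact that the
  // exception was thrown matters, not the exception object itself.
  def parseOrZero(s: String): Int =
    try s.toInt
    catch { case _: NumberFormatException => 0 }
}
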
sessionState.executePlan( diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala index b537511ccd8..196d4f06970 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala @@ -127,7 +127,7 @@ object GpuFileFormatWriter extends Logging { val empty2NullPlan = if (needConvert) GpuProjectExec(projectList, plan) else plan - val bucketIdExpression = bucketSpec.map { spec => + val bucketIdExpression = bucketSpec.map { _ => // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can // guarantee the data distribution is same between shuffle and bucketed data source, which // enables us to only shuffle one side when join a bucketed table and a normal one. diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala index 768d8352782..40da1bd67bc 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala @@ -49,7 +49,7 @@ class GpuShuffleEnv(rapidsConf: RapidsConf) extends Logging { shuffleCatalog = new ShuffleBufferCatalog(RapidsBufferCatalog.singleton, diskBlockManager) shuffleReceivedBufferCatalog = - new ShuffleReceivedBufferCatalog(RapidsBufferCatalog.singleton, diskBlockManager) + new ShuffleReceivedBufferCatalog(RapidsBufferCatalog.singleton) } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 2dc0b110f42..354ba68caa9 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -425,7 +425,6 @@ object GpuToTimestamp extends Arm { lhs: GpuColumnVector, sparkFormat: String, strfFormat: String, - timeParserPolicy: TimeParserPolicy, dtype: DType, daysScalar: String => Scalar, asTimestamp: (ColumnVector, String) => ColumnVector): ColumnVector = { @@ -441,7 +440,7 @@ object GpuToTimestamp extends Arm { withResource(daysEqual(lhs.getBase, DateUtils.TODAY)) { isToday => withResource(daysEqual(lhs.getBase, DateUtils.YESTERDAY)) { isYesterday => withResource(daysEqual(lhs.getBase, DateUtils.TOMORROW)) { isTomorrow => - withResource(lhs.getBase.isNull) { isNull => + withResource(lhs.getBase.isNull) { _ => withResource(Scalar.fromNull(dtype)) { nullValue => withResource(asTimestamp(lhs.getBase, strfFormat)) { converted => withResource(daysScalar(DateUtils.EPOCH)) { epoch => @@ -517,7 +516,6 @@ abstract class GpuToTimestamp lhs, sparkFormat, strfFormat, - timeParserPolicy, DType.TIMESTAMP_MICROSECONDS, daysScalarMicros, (col, strfFormat) => col.asTimestampMicroseconds(strfFormat)) @@ -560,7 +558,6 @@ abstract class GpuToTimestampImproved extends GpuToTimestamp { lhs, sparkFormat, strfFormat, - timeParserPolicy, DType.TIMESTAMP_SECONDS, daysScalarSeconds, (col, strfFormat) => col.asTimestampSeconds(strfFormat)) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala index c931b2847ac..42a962dcd8d 100644 --- 
a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala @@ -136,7 +136,6 @@ object GpuBroadcastNestedLoopJoinExecBase extends Arm { abstract class GpuBroadcastNestedLoopJoinExecBase( left: SparkPlan, right: SparkPlan, - join: BroadcastNestedLoopJoinExec, joinType: JoinType, condition: Option[Expression], targetSizeBytes: Long) extends BinaryExecNode with GpuExec { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/InternalColumnarRddConverter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/InternalColumnarRddConverter.scala index d552d8d4822..06034fdb0c4 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/InternalColumnarRddConverter.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/InternalColumnarRddConverter.scala @@ -404,7 +404,7 @@ private class ExternalRowToColumnarIterator( } } - val builders = new GpuColumnarBatchBuilder(localSchema, targetRows, null) + val builders = new GpuColumnarBatchBuilder(localSchema, targetRows) try { var rowCount = 0 var byteCount: Long = variableWidthColumnCount * 4 // offset bytes diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala index 6e423e907c5..a5b2dc62cfd 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala @@ -396,7 +396,7 @@ abstract class GpuRoundBase(child: Expression, scale: Expression) extends GpuBin override def doColumnar(value: GpuColumnVector, scale: Scalar): ColumnVector = { val scaleVal = dataType match { - case DecimalType.Fixed(p, s) => s + case DecimalType.Fixed(_, s) => s case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType => scale.getInt case _ => throw new IllegalArgumentException(s"Round operator doesn't support $dataType") } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala index fb50c32ad67..4c9df92ba21 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala @@ -104,7 +104,6 @@ class AdaptiveQueryExecSuite val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult( spark, "SELECT * FROM skewData1 join skewData2 ON key1 = key2") - val innerSmj = findTopLevelGpuShuffleHashJoin(innerAdaptivePlan) val shuffleExchanges = ShimLoader.getSparkShims .findOperators(innerAdaptivePlan, _.isInstanceOf[ShuffleQueryStageExec]) .map(_.asInstanceOf[ShuffleQueryStageExec]) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala index fdc8472f1de..594d9f3985d 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala @@ -58,7 +58,7 @@ class AnsiCastOpSuite extends GpuExpressionTestSuite { //static seed val r = new Random(4135277987418063300L) - val seq = for (i <- 1 to 100) yield r.nextInt(2) match { + val seq = for (_ <- 1 to 100) yield r.nextInt(2) match { case 0 => new Timestamp((upperValid * r.nextDouble()).toLong) case 1 => new 
Timestamp((lowerValid * r.nextDouble()).toLong) } @@ -744,13 +744,11 @@ class AnsiCastOpSuite extends GpuExpressionTestSuite { private def testTimestamps = testData(DataTypes.TimestampType)(_) private def testDates = testData(DataTypes.DateType)(_) - private val HIVE_BOOL_SQL_TYPE = "BOOLEAN" private val HIVE_LONG_SQL_TYPE = "BIGINT" private val HIVE_INT_SQL_TYPE = "INT" private val HIVE_SHORT_SQL_TYPE = "SMALLINT" private val HIVE_BYTE_SQL_TYPE = "TINYINT" private val HIVE_FLOAT_SQL_TYPE = "FLOAT" - private val HIVE_DOUBLE_SQL_TYPE = "DOUBLE" private val HIVE_STRING_SQL_TYPE = "STRING" private def testData(dt: DataType)(spark: SparkSession) = { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CostBasedOptimizerSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CostBasedOptimizerSuite.scala index 2965038afba..2c6d52fe2bd 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CostBasedOptimizerSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CostBasedOptimizerSuite.scala @@ -56,9 +56,9 @@ class CostBasedOptimizerSuite extends SparkQueryCompareTestSuite with BeforeAndA val optimizations: ListBuffer[Seq[Optimization]] = new ListBuffer[Seq[Optimization]]() GpuOverrides.addListener( - (plan: SparkPlanMeta[SparkPlan], - sparkPlan: SparkPlan, - costOptimizations: Seq[Optimization]) => { + (_: SparkPlanMeta[SparkPlan], + _: SparkPlan, + costOptimizations: Seq[Optimization]) => { optimizations += costOptimizations }) @@ -113,9 +113,9 @@ class CostBasedOptimizerSuite extends SparkQueryCompareTestSuite with BeforeAndA val optimizations: ListBuffer[Seq[Optimization]] = new ListBuffer[Seq[Optimization]]() GpuOverrides.addListener( - (plan: SparkPlanMeta[SparkPlan], - sparkPlan: SparkPlan, - costOptimizations: Seq[Optimization]) => { + (_: SparkPlanMeta[SparkPlan], + _: SparkPlan, + costOptimizations: Seq[Optimization]) => { optimizations += costOptimizations }) @@ -169,9 +169,9 @@ class CostBasedOptimizerSuite extends SparkQueryCompareTestSuite with BeforeAndA val optimizations: ListBuffer[Seq[Optimization]] = new ListBuffer[Seq[Optimization]]() GpuOverrides.addListener( - (plan: SparkPlanMeta[SparkPlan], - sparkPlan: SparkPlan, - costOptimizations: Seq[Optimization]) => { + (_: SparkPlanMeta[SparkPlan], + _: SparkPlan, + costOptimizations: Seq[Optimization]) => { optimizations += costOptimizations }) @@ -210,9 +210,9 @@ class CostBasedOptimizerSuite extends SparkQueryCompareTestSuite with BeforeAndA val optimizations: ListBuffer[Seq[Optimization]] = new ListBuffer[Seq[Optimization]]() GpuOverrides.addListener( - (plan: SparkPlanMeta[SparkPlan], - sparkPlan: SparkPlan, - costOptimizations: Seq[Optimization]) => { + (_: SparkPlanMeta[SparkPlan], + _: SparkPlan, + costOptimizations: Seq[Optimization]) => { optimizations += costOptimizations }) @@ -248,9 +248,9 @@ class CostBasedOptimizerSuite extends SparkQueryCompareTestSuite with BeforeAndA val optimizations: ListBuffer[Seq[Optimization]] = new ListBuffer[Seq[Optimization]]() GpuOverrides.addListener( - (plan: SparkPlanMeta[SparkPlan], - sparkPlan: SparkPlan, - costOptimizations: Seq[Optimization]) => { + (_: SparkPlanMeta[SparkPlan], + _: SparkPlan, + costOptimizations: Seq[Optimization]) => { optimizations += costOptimizations }) @@ -281,9 +281,9 @@ class CostBasedOptimizerSuite extends SparkQueryCompareTestSuite with BeforeAndA var optimizations: ListBuffer[Seq[Optimization]] = new ListBuffer[Seq[Optimization]]() GpuOverrides.addListener( - (plan: SparkPlanMeta[SparkPlan], - sparkPlan: SparkPlan, - 
costOptimizations: Seq[Optimization]) => { + (_: SparkPlanMeta[SparkPlan], + _: SparkPlan, + costOptimizations: Seq[Optimization]) => { optimizations += costOptimizations }) @@ -409,12 +409,4 @@ class CostBasedOptimizerSuite extends SparkQueryCompareTestSuite with BeforeAndA df } - private def addListener(optimizations: ListBuffer[Optimization]): Unit = { - GpuOverrides.addListener( - (plan: SparkPlanMeta[SparkPlan], - sparkPlan: SparkPlan, - costOptimizations: Seq[Optimization]) => { - optimizations.appendAll(costOptimizations) - }) - } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala b/tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala index 6bf531a7bdb..a992ba2e219 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala @@ -65,12 +65,11 @@ object FuzzerUtils { def createColumnarBatch( schema: StructType, rowCount: Int, - maxStringLen: Int = 64, options: FuzzerOptions = DEFAULT_OPTIONS, seed: Long = 0): ColumnarBatch = { val rand = new Random(seed) val r = new EnhancedRandom(rand, options) - val builders = new GpuColumnarBatchBuilder(schema, rowCount, null) + val builders = new GpuColumnarBatchBuilder(schema, rowCount) schema.fields.zipWithIndex.foreach { case (field, i) => val builder = builders.builder(i) @@ -147,7 +146,7 @@ object FuzzerUtils { def createColumnarBatch(values: Seq[Option[Any]], dataType: DataType): ColumnarBatch = { val schema = createSchema(Seq(dataType)) val rowCount = values.length - val builders = new GpuColumnarBatchBuilder(schema, rowCount, null) + val builders = new GpuColumnarBatchBuilder(schema, rowCount) schema.fields.zipWithIndex.foreach { case (field, i) => val builder = builders.builder(i) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuBatchUtilsSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuBatchUtilsSuite.scala index a3eceadb090..552722283b9 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuBatchUtilsSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuBatchUtilsSuite.scala @@ -170,7 +170,7 @@ class GpuBatchUtilsSuite extends FunSuite { * verify that the memory calculations are consistent with the numbers reported by CuDF. 
*/ private def calculateGpuMemory(schema: StructType, rows: Array[InternalRow]): Long = { - val builders = new GpuColumnarBatchBuilder(schema, rows.length, null) + val builders = new GpuColumnarBatchBuilder(schema, rows.length) try { val converters = new GpuRowToColumnConverter(schema) rows.foreach(row => converters.convert(row, builders)) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ImplicitsTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ImplicitsTestSuite.scala index 0c121cefdf0..bfcc91b4d03 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ImplicitsTestSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ImplicitsTestSuite.scala @@ -196,7 +196,7 @@ class ImplicitsTestSuite extends FlatSpec with Matchers { val resources = (0 until 10).map(new RefCountTest(_, false)) val out = resources.toArray.zipWithIndex.safeMap { - case (res, i) => + case (res, _) => res.incRefCount() } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala index b6047459e26..13a311e0c09 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala @@ -149,8 +149,6 @@ class MetaUtilsSuite extends FunSuite with Arm { withResource(buildContiguousTable()) { contigTable => val origBuffer = contigTable.getBuffer val meta = MetaUtils.buildTableMeta(10, contigTable) - val sparkTypes = Array[DataType](IntegerType, StringType, DoubleType, - DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 5)) withResource(origBuffer.sliceWithCopy(0, origBuffer.getLength)) { buffer => withResource(MetaUtils.getBatchFromMeta(buffer, meta, contiguousTableSparkTypes)) { batch => diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala index 3be5caac1c1..b7534aac837 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala @@ -661,7 +661,7 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm { } else if (cmp > 0) { return false } // else equal go on - case (o1, o2) => + case (o1, _) => throw new UnsupportedOperationException(o1.getClass + " is not supported yet") } } @@ -809,11 +809,9 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm { def testUnaryFunction( conf: SparkConf, values: Seq[Any], - expectNull: Boolean = false, maxFloatDiff: Double = 0.0, sort: Boolean = false, - repart: Integer = 1, - sortBeforeRepart: Boolean = false)(fun: DataFrame => DataFrame): Unit = { + repart: Integer = 1)(fun: DataFrame => DataFrame): Unit = { val df: SparkSession => DataFrame = (sparkSession: SparkSession) => createDataFrame(sparkSession, values) @@ -843,7 +841,7 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm { test(qualifiedTestName) { val t = Try({ - val fromGpu = withGpuSparkSession( session => { + withGpuSparkSession( session => { var data = df(session) if (repart > 0) { // repartition the data so it is turned into a projection, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala index 13e27c9355e..3edef3d7f9d 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala +++ 
b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala @@ -106,7 +106,7 @@ class RapidsShuffleClientSuite extends RapidsShuffleTestHelper { val numRows = 100000 val numBatches = 3 - RapidsShuffleTestHelper.mockDegenerateMetaResponse(mockTransport, numRows, numBatches) + RapidsShuffleTestHelper.mockDegenerateMetaResponse(mockTransport, numBatches) // initialize metadata fetch client.doFetch(shuffleRequests.map(_._1), mockHandler) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala index 9871ad2acb5..b945bbe78c8 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala @@ -193,10 +193,9 @@ object RapidsShuffleTestHelper extends MockitoSugar with Arm { def mockDegenerateMetaResponse( mockTransport: RapidsShuffleTransport, - numRows: Long, numBatches: Int, maximumResponseSize: Long = 10000): Seq[TableMeta] = { - val tableMetas = (0 until numBatches).map(b => buildDegenerateMockTableMeta()) + val tableMetas = (0 until numBatches).map(_ => buildDegenerateMockTableMeta()) val res = ShuffleMetadata.buildMetaResponse(tableMetas, maximumResponseSize) val refCountedRes = new RefCountedDirectByteBuffer(res) when(mockTransport.getMetaBuffer(any())).thenReturn(refCountedRes) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala index 65211f89394..67cf775c4e4 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIteratorSuite.scala @@ -42,7 +42,7 @@ class WindowedBlockIteratorSuite extends RapidsShuffleTestHelper { } test ("1024 1-byte blocks all fit in 1 1024-byte window") { - val mockBlocks = (0 until 1024).map { i => + val mockBlocks = (0 until 1024).map { _ => val block = mock[BlockWithSize] when(block.size).thenReturn(1) block diff --git a/udf-compiler/src/main/scala/com/nvidia/spark/udf/CatalystExpressionBuilder.scala b/udf-compiler/src/main/scala/com/nvidia/spark/udf/CatalystExpressionBuilder.scala index 5084ce1e018..7ee7b1173e1 100644 --- a/udf-compiler/src/main/scala/com/nvidia/spark/udf/CatalystExpressionBuilder.scala +++ b/udf-compiler/src/main/scala/com/nvidia/spark/udf/CatalystExpressionBuilder.scala @@ -254,7 +254,7 @@ object CatalystExpressionBuilder extends Logging { val res = expr match { case And(Literal.TrueLiteral, c) => simplifyExpr(c) case And(c, Literal.TrueLiteral) => simplifyExpr(c) - case And(Literal.FalseLiteral, c) => Literal.FalseLiteral + case And(Literal.FalseLiteral, _) => Literal.FalseLiteral case And(c1@LessThan(s1, Literal(v1, t1)), c2@LessThan(s2, Literal(v2, t2))) if s1 == s2 && t1 == t2 => { t1 match { @@ -346,7 +346,7 @@ object CatalystExpressionBuilder extends Logging { } } case And(c1, c2) => And(simplifyExpr(c1), simplifyExpr(c2)) - case Or(Literal.TrueLiteral, c) => Literal.TrueLiteral + case Or(Literal.TrueLiteral, _) => Literal.TrueLiteral case Or(Literal.FalseLiteral, c) => simplifyExpr(c) case Or(c, Literal.FalseLiteral) => simplifyExpr(c) case Or(c1@GreaterThan(s1, Literal(v1, t1)), @@ -384,13 +384,13 @@ object CatalystExpressionBuilder extends Logging { Literal(0, _)) => simplifyExpr(And(Not(c1), c2)) case LessThanOrEqual(If(c1, 
Literal(1, _), - If(c2, + If(_, Literal(-1, _), Literal(0, _))), Literal(0, _)) => simplifyExpr(Not(c1)) case GreaterThan(If(c1, Literal(1, _), - If(c2, + If(_, Literal(-1, _), Literal(0, _))), Literal(0, _)) => c1 @@ -406,7 +406,7 @@ object CatalystExpressionBuilder extends Logging { Literal(-1, _), Literal(0, _))), Literal(0, _)) => simplifyExpr(And(Not(c1), Not(c2))) - case If(c, t, f) if t == f => t + case If(_, t, f) if t == f => t // JVMachine encodes boolean array components using 1 to represent true // and 0 to represent false (see // https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-2.html#jvms-2.3.4). diff --git a/udf-compiler/src/main/scala/com/nvidia/spark/udf/Instruction.scala b/udf-compiler/src/main/scala/com/nvidia/spark/udf/Instruction.scala index 733d23aec17..b8739df7343 100644 --- a/udf-compiler/src/main/scala/com/nvidia/spark/udf/Instruction.scala +++ b/udf-compiler/src/main/scala/com/nvidia/spark/udf/Instruction.scala @@ -349,7 +349,7 @@ case class Instruction(opcode: Int, operand: Int, instructionStr: String) extend } private def pop(state: State): State = { - val State(locals, top :: rest, cond, expr) = state + val State(locals, _ :: rest, cond, expr) = state State(locals, rest, cond, expr) } @@ -404,7 +404,7 @@ case class Instruction(opcode: Int, operand: Int, instructionStr: String) extend } private def checkcast(lambdaReflection: LambdaReflection, state: State): State = { - val State(locals, top :: rest, cond, expr) = state + val State(_, top :: _, _, _) = state val typeName = lambdaReflection.lookupClassName(operand) LambdaReflection.parseTypeSig(typeName).fold{ // Defer the check until top is actually used. @@ -419,19 +419,19 @@ case class Instruction(opcode: Int, operand: Int, instructionStr: String) extend private def ifCmp(state: State, predicate: (Expression, Expression) => Expression): State = { - val State(locals, op2 :: op1 :: rest, cond, expr) = state + val State(locals, op2 :: op1 :: rest, cond, _) = state State(locals, rest, cond, Some(predicate(op1, op2))) } private def ifOp( state: State, predicate: Expression => Expression): State = { - val State(locals, top :: rest, cond, expr) = state + val State(locals, top :: rest, cond, _) = state State(locals, rest, cond, Some(predicate(top))) } private def switch(state: State): State = { - val State(locals, top :: rest, cond, expr) = state + val State(locals, top :: rest, cond, _) = state State(locals, rest, cond, Some(top)) } @@ -448,27 +448,27 @@ case class Instruction(opcode: Int, operand: Int, instructionStr: String) extend // We support only some math and string methods. 
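The invoke handling below drops the lambdaReflection argument from mathOp, predefOp, arrayOp, classTagOp and arrayBufferOp, since none of those helpers ever consult it; each call site in the same file loses the argument as well. A generic sketch of that kind of signature narrowing, with a hypothetical helper standing in for the real ones:

object DropUnusedParamExample {
  // Before: a reflection handle was threaded through but never read:
  //   def mathOp(reflection: AnyRef, name: String, args: List[Double]): Double = ...
  // After: only the inputs the body actually uses remain.
  def mathOp(name: String, args: List[Double]): Double = name match {
    case "abs"  => math.abs(args.head)
    case "sqrt" => math.sqrt(args.head)
    case other  => throw new IllegalArgumentException(s"unsupported math op: $other")
  }

  def main(argv: Array[String]): Unit = {
    // Call sites simply drop the first argument; behavior is unchanged.
    println(mathOp("sqrt", List(9.0))) // 3.0
  }
}
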
if (declaringClassName.equals("scala.math.package$")) { State(locals, - mathOp(lambdaReflection, method.getName, args) :: rest, + mathOp(method.getName, args) :: rest, cond, expr) } else if (declaringClassName.equals("scala.Predef$")) { State(locals, - predefOp(lambdaReflection, method.getName, args) :: rest, + predefOp(method.getName, args) :: rest, cond, expr) } else if (declaringClassName.equals("scala.Array$")) { State(locals, - arrayOp(lambdaReflection, method.getName, args) :: rest, + arrayOp(method.getName, args) :: rest, cond, expr) } else if (declaringClassName.equals("scala.reflect.ClassTag$")) { State(locals, - classTagOp(lambdaReflection, method.getName, args) :: rest, + classTagOp(method.getName, args) :: rest, cond, expr) } else if (declaringClassName.equals("scala.collection.mutable.ArrayBuffer$")) { State(locals, - arrayBufferOp(lambdaReflection, method.getName, args) :: rest, + arrayBufferOp(method.getName, args) :: rest, cond, expr) } else if (declaringClassName.equals("java.lang.Double")) { @@ -559,8 +559,7 @@ case class Instruction(opcode: Int, operand: Int, instructionStr: String) extend } } - private def mathOp(lambdaReflection: LambdaReflection, - methodName: String, args: List[Expression]): Expression = { + private def mathOp(methodName: String, args: List[Expression]): Expression = { // Math unary functions if (args.length != 2) { throw new SparkException( @@ -598,8 +597,7 @@ case class Instruction(opcode: Int, operand: Int, instructionStr: String) extend } } - private def predefOp(lambdaReflection: LambdaReflection, - methodName: String, args: List[Expression]): Expression = { + private def predefOp(methodName: String, args: List[Expression]): Expression = { // Make sure that the objref is scala.math.package$. args.head match { case getstatic: Repr.GetStatic => @@ -621,8 +619,7 @@ case class Instruction(opcode: Int, operand: Int, instructionStr: String) extend } } - private def arrayOp(lambdaReflection: LambdaReflection, - methodName: String, args: List[Expression]): Expression = { + private def arrayOp(methodName: String, args: List[Expression]): Expression = { // Make sure that the objref is scala.math.package$. args.head match { case getstatic: Repr.GetStatic => @@ -663,8 +660,7 @@ case class Instruction(opcode: Int, operand: Int, instructionStr: String) extend } } - private def classTagOp(lambdaReflection: LambdaReflection, - methodName: String, args: List[Expression]): Expression = { + private def classTagOp(methodName: String, args: List[Expression]): Expression = { // Make sure that the objref is scala.math.package$. args.head match { case getstatic: Repr.GetStatic => @@ -698,8 +694,7 @@ case class Instruction(opcode: Int, operand: Int, instructionStr: String) extend } } - private def arrayBufferOp(lambdaReflection: LambdaReflection, - methodName: String, args: List[Expression]): Expression = { + private def arrayBufferOp(methodName: String, args: List[Expression]): Expression = { // Make sure that the objref is scala.math.package$. args.head match { case getstatic: Repr.GetStatic => diff --git a/udf-compiler/src/main/scala/com/nvidia/spark/udf/LambdaReflection.scala b/udf-compiler/src/main/scala/com/nvidia/spark/udf/LambdaReflection.scala index 57104545c2b..8169a3caf95 100644 --- a/udf-compiler/src/main/scala/com/nvidia/spark/udf/LambdaReflection.scala +++ b/udf-compiler/src/main/scala/com/nvidia/spark/udf/LambdaReflection.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.types._ // used by the rest of the compiler. 
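The constructor change below keeps only what LambdaReflection still reads: the unused ctClass field and the dead Descriptor.getParameterTypes lookup go away, and both apply factories are adjusted in the same commit. A small sketch of trimming a private constructor this way, with placeholder types instead of the javassist ones:

final class Reflector private (
    private val pool: Map[String, String], // before, an owner handle was also stored but never read
    val methodName: String) {
  def lookup(key: String): Option[String] = pool.get(key)
}

object Reflector {
  // The factory is updated in the same change, so callers stay source-compatible.
  def apply(pool: Map[String, String], methodName: String): Reflector =
    new Reflector(pool, methodName)
}
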
// class LambdaReflection private(private val classPool: ClassPool, - private val ctClass: CtClass, private val ctMethod: CtMethod, val capturedArgs: Seq[Expression] = Seq()) { def lookupConstant(constPoolIndex: Int): Any = { @@ -63,7 +62,6 @@ class LambdaReflection private(private val classPool: ClassPool, val methodName = constPool.getInterfaceMethodrefName(constPoolIndex) val descriptor = constPool.getInterfaceMethodrefType(constPoolIndex) val className = constPool.getInterfaceMethodrefClassName(constPoolIndex) - val params = Descriptor.getParameterTypes(descriptor, classPool) classPool.getCtClass(className).getMethod(methodName, descriptor) } else { if (constPool.getTag(constPoolIndex) != ConstPool.CONST_Methodref) { @@ -140,13 +138,13 @@ object LambdaReflection { val lambdaImplName = serializedLambda.getImplMethodName ctClass.getDeclaredMethod(lambdaImplName.stripSuffix("$adapted")) } - new LambdaReflection(classPool, ctClass, ctMethod, capturedArgs) + new LambdaReflection(classPool, ctMethod, capturedArgs) } private def apply(ctMethod: CtMethod): LambdaReflection = { val ctClass = ctMethod.getDeclaringClass val classPool = ctClass.getClassPool - new LambdaReflection(classPool, ctClass, ctMethod) + new LambdaReflection(classPool, ctMethod) } def getClass(name: String): Class[_] = { diff --git a/udf-compiler/src/main/scala/com/nvidia/spark/udf/Plugin.scala b/udf-compiler/src/main/scala/com/nvidia/spark/udf/Plugin.scala index 98164e70ea1..1e3881c12dc 100644 --- a/udf-compiler/src/main/scala/com/nvidia/spark/udf/Plugin.scala +++ b/udf-compiler/src/main/scala/com/nvidia/spark/udf/Plugin.scala @@ -68,7 +68,7 @@ case class LogicalPlanRules() extends Rule[LogicalPlan] with Logging { exp } } catch { - case npe: NullPointerException => { + case _: NullPointerException => { exp } } diff --git a/udf-compiler/src/test/scala/com/nvidia/spark/OpcodeSuite.scala b/udf-compiler/src/test/scala/com/nvidia/spark/OpcodeSuite.scala index 048544483d4..551c7c1473f 100644 --- a/udf-compiler/src/test/scala/com/nvidia/spark/OpcodeSuite.scala +++ b/udf-compiler/src/test/scala/com/nvidia/spark/OpcodeSuite.scala @@ -200,7 +200,7 @@ class OpcodeSuite extends FunSuite { // tests for load and store operations, also cover +/-/* operators for int,long,double,float test("LLOAD_ odd") { val dataset = List((1,1L,1L)).toDF("x","y","z") - val myudf: (Int, Long, Long) => Long = (a,b,c) => { + val myudf: (Int, Long, Long) => Long = (_, b, c) => { (b+c)*c-b } val u = makeUdf(myudf) @@ -211,7 +211,7 @@ class OpcodeSuite extends FunSuite { test("DLOAD_ odd") { val dataset = List((1,1.0,1.0)).toDF("x","y","z") - val myudf: (Int, Double, Double) => Double = (a,b,c) => { + val myudf: (Int, Double, Double) => Double = (_, b, c) => { (b+c)*b-c } val u = makeUdf(myudf) @@ -267,7 +267,6 @@ class OpcodeSuite extends FunSuite { test("ISTORE_ all") { val myudf: () => Int = () => { var myInt : Int = 1 - var myInt2 : Int = 1 var myInt3 : Int = myInt var myInt4 : Int = myInt * myInt3 myInt4 @@ -293,7 +292,7 @@ class OpcodeSuite extends FunSuite { } test("DSTORE_ odd") { - val myudf: (Int) => Double = (a) => { + val myudf: (Int) => Double = (_) => { var myDoub : Double = 1.0 var myDoub2 : Double = 1.0 * myDoub myDoub2 @@ -306,7 +305,7 @@ class OpcodeSuite extends FunSuite { } test("ALOAD_0") { - val myudf: (String,String,String,String) => String = (a,b,c,d) => { + val myudf: (String,String,String,String) => String = (a, _, _, _) => { a } val dataset = List(("a","b","c","d")).toDF("x","y","z","w") @@ -317,7 +316,7 @@ class OpcodeSuite extends 
FunSuite { } test("ALOAD_1") { - val myudf: (String,String,String,String) => String = (a,b,c,d) => { + val myudf: (String,String,String,String) => String = (_, b, _, _) => { b } val dataset = List(("a","b","c","d")).toDF("x","y","z","w") @@ -328,7 +327,7 @@ class OpcodeSuite extends FunSuite { } test("ALOAD_2") { - val myudf: (String,String,String,String) => String = (a,b,c,d) => { + val myudf: (String,String,String,String) => String = (_, _, c, _) => { c } val dataset = List(("a","b","c","d")).toDF("x","y","z","w") @@ -339,7 +338,7 @@ class OpcodeSuite extends FunSuite { } test("ALOAD_3") { - val myudf: (String,String,String,String) => String = (a,b,c,d) => { + val myudf: (String,String,String,String) => String = (_, _, _, d) => { d } val dataset = List(("a","b","c","d")).toDF("x","y","z","w") @@ -391,7 +390,7 @@ class OpcodeSuite extends FunSuite { } test("LSTORE_3") { - val myudf: (Int, Long) => Long = (a,b) => { + val myudf: (Int, Long) => Long = (_, b) => { var myLong : Long = b myLong } @@ -515,7 +514,6 @@ class OpcodeSuite extends FunSuite { test("FSTORE_0, LSTORE_1") { val myudf: () => Float = () => { var myFloat : Float = 1.0f - var myLong : Long = 1L myFloat } val dataset = List(5.0f).toDS() @@ -538,7 +536,7 @@ class OpcodeSuite extends FunSuite { } test("ILOAD") { - val myudf: (Int, Int, Int, Int, Int, Long, Float, Double) => Int = (a,b,c,d,e,f,g,h) => { + val myudf: (Int, Int, Int, Int, Int, Long, Float, Double) => Int = (_, _, _, _, e, _, _, _) => { e } val u = makeUdf(myudf) @@ -551,8 +549,8 @@ class OpcodeSuite extends FunSuite { } test("LLOAD") { - val myudf: (Int, Int, Int, Int, Int, Long, Float, Double) => Long = (a,b,c,d,e,f,g,h) => { - f + val myudf: (Int, Int, Int, Int, Int, Long, Float, Double) => Long = { + (_, _, _, _, _, f, _, _) => f } val u = makeUdf(myudf) val dataset = List((1,2,3,4,5,1L,1.0f,1.0)).toDF("a","b","c","d","e","f","g","h") @@ -564,8 +562,8 @@ class OpcodeSuite extends FunSuite { } test("FLOAD") { - val myudf: (Int, Int, Int, Int, Int, Long, Float, Double) => Float = (a,b,c,d,e,f,g,h) => { - g + val myudf: (Int, Int, Int, Int, Int, Long, Float, Double) => Float = { + (_, _, _, _, _, _, g, _) => g } val u = makeUdf(myudf) val dataset = List((1,2,3,4,5,1L,1.0f,1.0)).toDF("a","b","c","d","e","f","g","h") @@ -577,8 +575,8 @@ class OpcodeSuite extends FunSuite { } test("DLOAD") { - val myudf: (Int, Int, Int, Int, Int, Long, Float, Double) => Double = (a,b,c,d,e,f,g,h) => { - h + val myudf: (Int, Int, Int, Int, Int, Long, Float, Double) => Double = { + (_, _, _, _, _, _, _, h) => h } val u = makeUdf(myudf) val dataset = List((1,2,3,4,5,1L,1.0f,1.0)).toDF("a","b","c","d","e","f","g","h") @@ -788,7 +786,7 @@ class OpcodeSuite extends FunSuite { } test("ALOAD opcode") { - val myudf: (Int, Int, Int, Int, String) => String = (a,b,c,d,e) => { + val myudf: (Int, Int, Int, Int, String) => String = (_, _, _, _, e) => { e } val u = makeUdf(myudf) @@ -1140,7 +1138,7 @@ class OpcodeSuite extends FunSuite { } test("float constant in a function call") { - val myudf: (Float) => Float = x => { + val myudf: (Float) => Float = _ => { val myFloat : Float = math.abs(-2.0f) myFloat } @@ -1152,7 +1150,7 @@ class OpcodeSuite extends FunSuite { } test("int constant in a function call") { - val myudf: (Int) => Int = x => { + val myudf: (Int) => Int = _ => { val myInt : Int = math.abs(-2) myInt } @@ -1419,7 +1417,7 @@ class OpcodeSuite extends FunSuite { test("FALLBACK TO CPU: loops") { val myudf: (Int, Int) => Int = (a,b) => { var myVar : Int = 0 - for (indexVar <- a to b){ + for 
(_ <- a to b){ myVar = myVar + 1 } myVar
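
Throughout the OpcodeSuite changes above, the test UDFs keep their arity, so the bytecode the udf-compiler inspects is unchanged, but they now name only the parameters they actually read (for example (_, b, c) => (b + c) * c - b). A self-contained illustration of that cleanup; makeUdf and the Spark session come from the test harness and are not reproduced here, so the sketch uses a plain Scala function:

object UnusedLambdaParamExample {
  // Before: (a, b, c) => (b + c) * c - b -- `a` was never read, which
  // unused-symbol lints (e.g. -Ywarn-unused on Scala 2.12) complain about.
  // After: the placeholder keeps the arity but drops the dead name.
  val myudf: (Int, Long, Long) => Long = (_, b, c) => (b + c) * c - b

  def main(args: Array[String]): Unit =
    println(myudf(1, 1L, 1L)) // prints 1
}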