Remove unused symbols. Contributes to NVIDIA#2109
Signed-off-by: Gera Shegalov <[email protected]>
gerashegalov committed Apr 17, 2021
1 parent 57afdc8 commit 20db73c
Showing 41 changed files with 111 additions and 154 deletions.
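
Most hunks below follow one of two mechanical patterns: a parameter, field, or method that is never read is deleted outright, or an unused binding is replaced with the `_` placeholder. A minimal sketch of both patterns — the names are hypothetical and not taken from this diff, and `-Ywarn-unused` is the usual Scala 2.12 flag for surfacing such symbols, not necessarily this project's build setting:

```scala
// Compile with -Ywarn-unused to have scalac flag symbols like these.
object UnusedSymbolCleanup {
  // Before: `options` was accepted but never read.
  //   class ColumnarReaderFactory(options: Map[String, String])
  // After: the dead parameter is dropped entirely.
  class ColumnarReaderFactory

  private def setup(): (String, String) = ("conf", "qualifiedName")

  def main(args: Array[String]): Unit = {
    // An unused tuple component becomes `_` instead of a named binding,
    // which silences unused-symbol lints without changing behavior.
    val (conf, _) = setup()
    println(conf)
  }
}
```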
@@ -126,7 +126,7 @@ class ArrowColumnarDataSourceV2 extends TestingV2Source {
   }
 
   override def createReaderFactory(): PartitionReaderFactory = {
-    new ColumnarReaderFactory(options)
+    new ColumnarReaderFactory
   }
 }
 
@@ -137,8 +137,7 @@ class ArrowColumnarDataSourceV2 extends TestingV2Source {
   }
 }
 
-class ColumnarReaderFactory(options: CaseInsensitiveStringMap)
-  extends PartitionReaderFactory {
+class ColumnarReaderFactory extends PartitionReaderFactory {
   private final val BATCH_SIZE = 20
 
   override def supportColumnarReads(partition: InputPartition): Boolean = true
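
A side note on the hunk above: a plain (non-`val`) Scala constructor parameter that is never referenced does not even become a field of the class, so the old `options` parameter bought nothing except a heavier signature and an obligation at every call site. A small sketch of that language rule, with illustrative names:

```scala
// `unused` is never referenced after construction, so scalac retains no
// field for it; the parameter exists only to burden callers.
class Before(unused: Map[String, String]) {
  def batchSize: Int = 20
}

// Same behavior, simpler contract for callers.
class After {
  def batchSize: Int = 20
}
```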
@@ -193,7 +193,7 @@ class StringOperatorsSuite extends SparkQueryCompareTestSuite {
 class StringOperatorsDiagnostics extends SparkQueryCompareTestSuite {
   def generateResults(gen : org.apache.spark.sql.Column => org.apache.spark.sql.Column):
       (Array[Row], Array[Row]) = {
-    val (testConf, qualifiedTestName) = setupTestConfAndQualifierName("", true, false,
+    val (testConf, _) = setupTestConfAndQualifierName("", true, false,
       new SparkConf(), Seq.empty, 0.0, false, false)
     runOnCpuAndGpu(TestCodepoints.validCodepointCharsDF,
       frame => frame.select(gen(col("strings"))), testConf)
@@ -34,7 +34,7 @@ case class GpuBroadcastNestedLoopJoinExec(
     joinType: JoinType,
     condition: Option[Expression],
     targetSizeBytes: Long)
-  extends GpuBroadcastNestedLoopJoinExecBase(left, right, join, joinType, condition,
+  extends GpuBroadcastNestedLoopJoinExecBase(left, right, joinType, condition,
     targetSizeBytes) {
 
   def getGpuBuildSide: GpuBuildSide = {
@@ -34,7 +34,7 @@ case class GpuBroadcastNestedLoopJoinExec(
     joinType: JoinType,
     condition: Option[Expression],
     targetSizeBytes: Long)
-  extends GpuBroadcastNestedLoopJoinExecBase(left, right, join, joinType, condition,
+  extends GpuBroadcastNestedLoopJoinExecBase(left, right, joinType, condition,
     targetSizeBytes) {
 
   def getGpuBuildSide: GpuBuildSide = {
@@ -307,7 +307,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
     def putOnGpuIfNeeded(batch: ColumnarBatch): ColumnarBatch = {
       if (!batch.column(0).isInstanceOf[GpuColumnVector]) {
         val s: StructType = schema.toStructType
-        val gpuCB = new GpuColumnarBatchBuilder(s, batch.numRows(), batch).build(batch.numRows())
+        val gpuCB = new GpuColumnarBatchBuilder(s, batch.numRows()).build(batch.numRows())
         batch.close()
         gpuCB
       } else {
@@ -1137,7 +1137,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
   private def getSupportedSchemaFromUnsupported(
       cachedAttributes: Seq[Attribute],
       requestedAttributes: Seq[Attribute] = Seq.empty): (Seq[Attribute], Seq[Attribute]) = {
-    def getSupportedDataType(dataType: DataType, nullable: Boolean = true): DataType = {
+    def getSupportedDataType(dataType: DataType): DataType = {
       dataType match {
         case CalendarIntervalType =>
           intervalStructType
@@ -1147,19 +1147,19 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
           val newStructType = StructType(
             s.map { field =>
               StructField(field.name,
-                getSupportedDataType(field.dataType, field.nullable), field.nullable,
+                getSupportedDataType(field.dataType), field.nullable,
                 field.metadata)
             })
           mapping.put(s, newStructType)
           newStructType
         case a@ArrayType(elementType, nullable) =>
           val newArrayType =
-            ArrayType(getSupportedDataType(elementType, nullable), nullable)
+            ArrayType(getSupportedDataType(elementType), nullable)
           mapping.put(a, newArrayType)
           newArrayType
         case m@MapType(keyType, valueType, nullable) =>
-          val newKeyType = getSupportedDataType(keyType, nullable)
-          val newValueType = getSupportedDataType(valueType, nullable)
+          val newKeyType = getSupportedDataType(keyType)
+          val newValueType = getSupportedDataType(valueType)
           val mapType = MapType(newKeyType, newValueType, nullable)
           mapping.put(m, mapType)
           mapType
@@ -1180,13 +1180,13 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
           AttributeReference(attribute.name, DataTypes.ByteType, nullable = true,
             metadata = attribute.metadata)(attribute.exprId).asInstanceOf[Attribute]
         case s: StructType =>
-          AttributeReference(attribute.name, getSupportedDataType(s, attribute.nullable),
+          AttributeReference(attribute.name, getSupportedDataType(s),
            attribute.nullable, attribute.metadata)(attribute.exprId)
         case a: ArrayType =>
-          AttributeReference(attribute.name, getSupportedDataType(a, attribute.nullable),
+          AttributeReference(attribute.name, getSupportedDataType(a),
            attribute.nullable, attribute.metadata)(attribute.exprId)
         case m: MapType =>
-          AttributeReference(attribute.name, getSupportedDataType(m, attribute.nullable),
+          AttributeReference(attribute.name, getSupportedDataType(m),
            attribute.nullable, attribute.metadata)(attribute.exprId)
         case _ =>
           attribute
@@ -97,8 +97,6 @@ case class GpuInMemoryTableScanExec(
   lazy val readPartitions = sparkContext.longAccumulator
   lazy val readBatches = sparkContext.longAccumulator
 
-  private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
-
   private def filteredCachedBatches() = {
     // Right now just return the batch without filtering
     relation.cacheBuilder.cachedColumnBuffers
@@ -39,7 +39,7 @@ class AutoCloseColumnBatchIterator[U](itr: Iterator[U], nextBatch: Iterator[U] =
     }
   }
 
-  TaskContext.get().addTaskCompletionListener[Unit]((tc: TaskContext) => {
+  TaskContext.get().addTaskCompletionListener[Unit]((_: TaskContext) => {
     closeCurrentBatch()
   })
 
@@ -1227,7 +1227,7 @@ case class GpuCast(
       if (to.scale <= from.scale) {
         if (!isFrom32Bit && isTo32Bit) {
           // check for overflow when 64bit => 32bit
-          withResource(checkForOverflow(input, from, to, isFrom32Bit)) { checkedInput =>
+          withResource(checkForOverflow(input, to, isFrom32Bit)) { checkedInput =>
             castCheckedDecimal(checkedInput)
           }
         } else {
@@ -1240,15 +1240,14 @@ case class GpuCast(
       }
     } else {
       // from.scale > to.scale
-      withResource(checkForOverflow(input, from, to, isFrom32Bit)) { checkedInput =>
+      withResource(checkForOverflow(input, to, isFrom32Bit)) { checkedInput =>
         castCheckedDecimal(checkedInput)
       }
     }
   }
 
   def checkForOverflow(
       input: ColumnVector,
-      from: DecimalType,
       to: DecimalType,
       isFrom32Bit: Boolean): ColumnVector = {
 
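
The `withResource(...) { ... }` calls threaded through the hunk above come from the plugin's `Arm` trait: a loan pattern that guarantees `close()` is called on an `AutoCloseable` (here, cuDF column vectors) on every exit path. A simplified sketch of the idea, not the project's exact implementation:

```scala
object ResourceLoan {
  // Lend `resource` to `block` and close it even if `block` throws.
  def withResource[T <: AutoCloseable, V](resource: T)(block: T => V): V =
    try {
      block(resource)
    } finally {
      resource.close()
    }
}
```

Dropping the unused `from` parameter changes only the `checkForOverflow` signature; the resource-management shape of each call site is untouched.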
@@ -145,7 +145,7 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends Se
     new Iterator[(Int, ColumnarBatch)] with Arm {
       var toBeReturned: Option[ColumnarBatch] = None
 
-      TaskContext.get().addTaskCompletionListener[Unit]((tc: TaskContext) => {
+      TaskContext.get().addTaskCompletionListener[Unit]((_: TaskContext) => {
         toBeReturned.foreach(_.close())
         toBeReturned = None
         dIn.close()
@@ -357,7 +357,7 @@ case class GpuParquetMultiFilePartitionReaderFactory(
         return uri
       }
     } catch {
-      case e: URISyntaxException =>
+      case _: URISyntaxException =>
     }
     new File(path).getAbsoluteFile().toURI()
   }
@@ -1089,7 +1089,6 @@ class MultiFileParquetPartitionReader(
     // and concatenate those together then go to the next column
     for ((field, colIndex) <- partitionSchema.fields.zipWithIndex) {
      val dataType = field.dataType
-      val partitionColumns = new Array[GpuColumnVector](inPartitionValues.size)
      withResource(new Array[GpuColumnVector](inPartitionValues.size)) {
        partitionColumns =>
          for ((rowsInPart, partIndex) <- rowsPerPartition.zipWithIndex) {
@@ -1565,7 +1564,7 @@ class MultiFileCloudParquetPartitionReader(
     // in cases close got called early for like limit() calls
     isDone = true
     currentFileHostBuffers.foreach { current =>
-      current.memBuffersAndSizes.foreach { case (buf, size) =>
+      current.memBuffersAndSizes.foreach { case (buf, _) =>
        if (buf != null) {
          buf.close()
        }
@@ -1576,7 +1575,7 @@
     batch = None
     tasks.asScala.foreach { task =>
       if (task.isDone()) {
-        task.get.memBuffersAndSizes.foreach { case (buf, size) =>
+        task.get.memBuffersAndSizes.foreach { case (buf, _) =>
          if (buf != null) {
            buf.close()
          }
@@ -53,7 +53,7 @@ trait GpuPartitioning extends Partitioning with Arm {
     val contiguousTables = withResource(table)(t => t.contiguousSplit(parts: _*))
     GpuShuffleEnv.rapidsShuffleCodec match {
       case Some(codec) =>
-        compressSplits(splits, codec, contiguousTables, dataTypes)
+        compressSplits(splits, codec, contiguousTables)
       case None =>
         // GpuPackedTableColumn takes ownership of the contiguous tables
         closeOnExcept(contiguousTables) { cts =>
@@ -127,8 +127,7 @@ trait GpuPartitioning extends Partitioning with Arm {
   def compressSplits(
       outputBatches: ArrayBuffer[ColumnarBatch],
       codec: TableCompressionCodec,
-      contiguousTables: Array[ContiguousTable],
-      dataTypes: Array[DataType]): Unit = {
+      contiguousTables: Array[ContiguousTable]): Unit = {
     withResource(codec.createBatchCompressor(maxCompressionBatchSize,
         Cuda.DEFAULT_STREAM)) { compressor =>
       // tracks batches with no data and the corresponding output index for the batch
@@ -58,7 +58,7 @@ private class GpuRowToColumnConverter(schema: StructType) extends Serializable w
    */
   final def convertBatch(rows: Array[InternalRow], schema: StructType): ColumnarBatch = {
     val numRows = rows.length
-    val builders = new GpuColumnarBatchBuilder(schema, numRows, null)
+    val builders = new GpuColumnarBatchBuilder(schema, numRows)
     rows.foreach(convert(_, builders))
     builders.build(numRows)
   }
@@ -585,7 +585,7 @@ class RowToColumnarIterator(
       }
     }
 
-    val builders = new GpuColumnarBatchBuilder(localSchema, targetRows, null)
+    val builders = new GpuColumnarBatchBuilder(localSchema, targetRows)
     try {
       var rowCount = 0
       // Double because validity can be < 1 byte, and this is just an estimate anyways
@@ -67,7 +67,6 @@ object HostColumnarToGpu extends Logging {
   def arrowColumnarCopy(
       cv: ColumnVector,
       ab: ai.rapids.cudf.ArrowColumnBuilder,
-      nullable: Boolean,
       rows: Int): ju.List[ReferenceManager] = {
     val valVector = cv match {
       case v: ArrowColumnVector =>
@@ -100,7 +99,7 @@ object HostColumnarToGpu extends Logging {
     try {
       offsets = getBufferAndAddReference(ShimLoader.getSparkShims.getArrowOffsetsBuf(valVector))
     } catch {
-      case e: UnsupportedOperationException =>
+      case _: UnsupportedOperationException =>
         // swallow the exception and assume no offsets buffer
     }
     ab.addBatch(rows, nullCount, dataBuf, validity, offsets)
@@ -315,10 +314,10 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
         (batch.column(0).isInstanceOf[ArrowColumnVector] ||
           batch.column(0).isInstanceOf[AccessibleArrowColumnVector])) {
       logDebug("Using GpuArrowColumnarBatchBuilder")
-      batchBuilder = new GpuColumnVector.GpuArrowColumnarBatchBuilder(schema, batchRowLimit, batch)
+      batchBuilder = new GpuColumnVector.GpuArrowColumnarBatchBuilder(schema)
     } else {
       logDebug("Using GpuColumnarBatchBuilder")
-      batchBuilder = new GpuColumnVector.GpuColumnarBatchBuilder(schema, batchRowLimit, null)
+      batchBuilder = new GpuColumnVector.GpuColumnarBatchBuilder(schema, batchRowLimit)
     }
     totalRows = 0
   }
@@ -40,8 +40,7 @@ object RapidsBufferStore {
  */
 abstract class RapidsBufferStore(
     val tier: StorageTier,
-    catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton,
-    deviceStorage: RapidsDeviceMemoryStore = RapidsBufferCatalog.getDeviceStorage)
+    catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton)
   extends AutoCloseable with Logging with Arm {
 
   val name: String = tier.toString
@@ -63,10 +62,6 @@ abstract class RapidsBufferStore(
     totalBytesStored += buffer.size
   }
 
-  def get(id: RapidsBufferId): RapidsBufferBase = synchronized {
-    buffers.get(id)
-  }
-
   def remove(id: RapidsBufferId): Unit = synchronized {
     val obj = buffers.remove(id)
     if (obj != null) {
@@ -76,7 +76,7 @@ object ConfHelper {
     v.map(stringConverter).mkString(",")
   }
 
-  def byteFromString(str: String, unit: ByteUnit, key: String): Long = {
+  def byteFromString(str: String, unit: ByteUnit): Long = {
     val (input, multiplier) =
       if (str.length() > 0 && str.charAt(0) == '-') {
         (str.substring(1), -1)
@@ -265,7 +265,7 @@ class ConfBuilder(val key: String, val register: ConfEntry[_] => Unit) {
   }
 
   def bytesConf(unit: ByteUnit): TypedConfBuilder[Long] = {
-    new TypedConfBuilder[Long](this, byteFromString(_, unit, key))
+    new TypedConfBuilder[Long](this, byteFromString(_, unit))
   }
 
   def integerConf: TypedConfBuilder[Integer] = {
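
The updated call site `byteFromString(_, unit)` uses Scala's placeholder syntax for partial application: it expands to `s => byteFromString(s, unit)`, a `String => Long`, which is the converter `TypedConfBuilder[Long]` expects, so removing the unused `key` argument leaves the surrounding expression type-correct. A self-contained illustration with made-up values:

```scala
object PartialApplicationDemo {
  def byteFromString(str: String, multiplier: Long): Long =
    str.trim.toLong * multiplier

  def main(args: Array[String]): Unit = {
    // `byteFromString(_, 1024L)` desugars to `s => byteFromString(s, 1024L)`.
    val toBytesKiB: String => Long = byteFromString(_, 1024L)
    println(toBytesKiB("8")) // prints 8192
  }
}
```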
@@ -31,7 +31,7 @@ class RapidsDiskStore(
     diskBlockManager: RapidsDiskBlockManager,
     catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton,
     deviceStorage: RapidsDeviceMemoryStore = RapidsBufferCatalog.getDeviceStorage)
-  extends RapidsBufferStore(StorageTier.DISK, catalog, deviceStorage) {
+  extends RapidsBufferStore(StorageTier.DISK, catalog) {
   private[this] val sharedBufferFiles = new ConcurrentHashMap[RapidsBufferId, File]
 
   override protected def createBuffer(incoming: RapidsBuffer, incomingBuffer: MemoryBuffer,
@@ -31,7 +31,7 @@ class RapidsHostMemoryStore(
     maxSize: Long,
     catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton,
     deviceStorage: RapidsDeviceMemoryStore = RapidsBufferCatalog.getDeviceStorage)
-  extends RapidsBufferStore(StorageTier.HOST, catalog, deviceStorage) {
+  extends RapidsBufferStore(StorageTier.HOST, catalog) {
   private[this] val pool = HostMemoryBuffer.allocate(maxSize, false)
   private[this] val addressAllocator = new AddressSpaceAllocator(maxSize)
   private[this] var haveLoggedMaxExceeded = false
@@ -46,8 +46,7 @@ case class ShuffleReceivedBufferId(
 
 /** Catalog for lookup of shuffle buffers by block ID */
 class ShuffleReceivedBufferCatalog(
-    catalog: RapidsBufferCatalog,
-    diskBlockManager: RapidsDiskBlockManager) extends Logging {
+    catalog: RapidsBufferCatalog) extends Logging {
   /** Mapping of table ID to shuffle buffer ID */
   private[this] val tableMap = new ConcurrentHashMap[Int, ShuffleReceivedBufferId]
 
@@ -81,7 +81,7 @@ case class GpuCreateDataSourceTableAsSelectCommand(
     sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false)
 
     result match {
-      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+      case _: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
           sparkSession.sqlContext.conf.manageFilesourcePartitions =>
         // Need to recover partitions into the metastore so our saved data is visible.
         sessionState.executePlan(
@@ -127,7 +127,7 @@ object GpuFileFormatWriter extends Logging {
 
     val empty2NullPlan = if (needConvert) GpuProjectExec(projectList, plan) else plan
 
-    val bucketIdExpression = bucketSpec.map { spec =>
+    val bucketIdExpression = bucketSpec.map { _ =>
       // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
       // guarantee the data distribution is same between shuffle and bucketed data source, which
       // enables us to only shuffle one side when join a bucketed table and a normal one.
@@ -49,7 +49,7 @@ class GpuShuffleEnv(rapidsConf: RapidsConf) extends Logging {
     shuffleCatalog =
       new ShuffleBufferCatalog(RapidsBufferCatalog.singleton, diskBlockManager)
     shuffleReceivedBufferCatalog =
-      new ShuffleReceivedBufferCatalog(RapidsBufferCatalog.singleton, diskBlockManager)
+      new ShuffleReceivedBufferCatalog(RapidsBufferCatalog.singleton)
   }
 }
 
@@ -425,7 +425,6 @@ object GpuToTimestamp extends Arm {
       lhs: GpuColumnVector,
       sparkFormat: String,
       strfFormat: String,
-      timeParserPolicy: TimeParserPolicy,
       dtype: DType,
       daysScalar: String => Scalar,
       asTimestamp: (ColumnVector, String) => ColumnVector): ColumnVector = {
@@ -441,7 +440,7 @@
     withResource(daysEqual(lhs.getBase, DateUtils.TODAY)) { isToday =>
       withResource(daysEqual(lhs.getBase, DateUtils.YESTERDAY)) { isYesterday =>
         withResource(daysEqual(lhs.getBase, DateUtils.TOMORROW)) { isTomorrow =>
-          withResource(lhs.getBase.isNull) { isNull =>
+          withResource(lhs.getBase.isNull) { _ =>
            withResource(Scalar.fromNull(dtype)) { nullValue =>
              withResource(asTimestamp(lhs.getBase, strfFormat)) { converted =>
                withResource(daysScalar(DateUtils.EPOCH)) { epoch =>
@@ -517,7 +516,6 @@ abstract class GpuToTimestamp
       lhs,
       sparkFormat,
       strfFormat,
-      timeParserPolicy,
       DType.TIMESTAMP_MICROSECONDS,
       daysScalarMicros,
       (col, strfFormat) => col.asTimestampMicroseconds(strfFormat))
@@ -560,7 +558,6 @@ abstract class GpuToTimestampImproved extends GpuToTimestamp {
       lhs,
       sparkFormat,
       strfFormat,
-      timeParserPolicy,
       DType.TIMESTAMP_SECONDS,
       daysScalarSeconds,
       (col, strfFormat) => col.asTimestampSeconds(strfFormat))
@@ -136,7 +136,6 @@ object GpuBroadcastNestedLoopJoinExecBase extends Arm {
 abstract class GpuBroadcastNestedLoopJoinExecBase(
     left: SparkPlan,
     right: SparkPlan,
-    join: BroadcastNestedLoopJoinExec,
     joinType: JoinType,
     condition: Option[Expression],
     targetSizeBytes: Long) extends BinaryExecNode with GpuExec {
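
This final hunk is the root of the two shim changes near the top of the diff: once the abstract base class loses its unused `join: BroadcastNestedLoopJoinExec` parameter, every concrete `GpuBroadcastNestedLoopJoinExec` subclass has to drop that argument in the same commit or compilation fails. A minimal sketch of the coupled change, with toy types standing in for the Spark plan classes:

```scala
// Base class after the cleanup: the unused `join` parameter is gone.
abstract class JoinExecBase(left: String, right: String, joinType: String)

// Each per-Spark-version subclass shrinks its super-constructor call in
// lockstep; the compiler enforces that no call site is missed.
case class JoinExec(l: String, r: String, jt: String)
    extends JoinExecBase(l, r, jt)
```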