Remove easy unused symbols (#2172)
Signed-off-by: Gera Shegalov <[email protected]>
gerashegalov authored Apr 20, 2021
1 parent a50f9dd commit 512ba88
Showing 47 changed files with 140 additions and 202 deletions.
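The edits below follow a handful of mechanical patterns: a factory class whose constructor argument was never read becomes a singleton object, unused constructor and method parameters are dropped and every call site updated, unused local and pattern bindings are replaced with _, and dead imports, fields, and methods are deleted outright. As orientation, here is a minimal Scala sketch of those patterns using hypothetical names, not code from this repository:

// Hypothetical before/after sketch of the cleanup patterns applied in this commit.
object UnusedSymbolCleanup {

  // Before: a factory class whose constructor parameter is never read.
  class OldReaderFactory(options: Map[String, String]) {
    def createReader(): String = "reader"
  }

  // After: a parameterless singleton object does the same job.
  object NewReaderFactory {
    def createReader(): String = "reader"
  }

  def demo(): Unit = {
    // Unused tuple elements and catch bindings are discarded with _.
    val (conf, _) = ("conf-value", "never-read")
    try println(conf + " " + NewReaderFactory.createReader())
    catch { case _: RuntimeException => () }
  }
}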
@@ -125,9 +125,7 @@ class ArrowColumnarDataSourceV2 extends TestingV2Source {
ArrowInputPartition(TestingV2Source.dataTypesToUse, 100, 101))
}

- override def createReaderFactory(): PartitionReaderFactory = {
- new ColumnarReaderFactory(options)
- }
+ override def createReaderFactory(): PartitionReaderFactory = ColumnarReaderFactory
}

override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
@@ -137,8 +135,7 @@ class ArrowColumnarDataSourceV2 extends TestingV2Source {
}
}

- class ColumnarReaderFactory(options: CaseInsensitiveStringMap)
- extends PartitionReaderFactory {
+ object ColumnarReaderFactory extends PartitionReaderFactory {
private final val BATCH_SIZE = 20

override def supportColumnarReads(partition: InputPartition): Boolean = true
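Because ColumnarReaderFactory no longer carries any per-instance state, createReaderFactory() in the test source can simply return the shared ColumnarReaderFactory object instead of constructing one around the unused options.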
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2020, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -193,7 +193,7 @@ class StringOperatorsSuite extends SparkQueryCompareTestSuite {
class StringOperatorsDiagnostics extends SparkQueryCompareTestSuite {
def generateResults(gen : org.apache.spark.sql.Column => org.apache.spark.sql.Column):
(Array[Row], Array[Row]) = {
- val (testConf, qualifiedTestName) = setupTestConfAndQualifierName("", true, false,
+ val (testConf, _) = setupTestConfAndQualifierName("", true, false,
new SparkConf(), Seq.empty, 0.0, false, false)
runOnCpuAndGpu(TestCodepoints.validCodepointCharsDF,
frame => frame.select(gen(col("strings"))), testConf)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@ case class GpuBroadcastNestedLoopJoinExec(
joinType: JoinType,
condition: Option[Expression],
targetSizeBytes: Long)
- extends GpuBroadcastNestedLoopJoinExecBase(left, right, join, joinType, condition,
+ extends GpuBroadcastNestedLoopJoinExecBase(left, right, joinType, condition,
targetSizeBytes) {

def getGpuBuildSide: GpuBuildSide = {
@@ -34,7 +34,7 @@ case class GpuBroadcastNestedLoopJoinExec(
joinType: JoinType,
condition: Option[Expression],
targetSizeBytes: Long)
- extends GpuBroadcastNestedLoopJoinExecBase(left, right, join, joinType, condition,
+ extends GpuBroadcastNestedLoopJoinExecBase(left, right, joinType, condition,
targetSizeBytes) {

def getGpuBuildSide: GpuBuildSide = {
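The same one-line change appears twice because GpuBroadcastNestedLoopJoinExec is duplicated per supported Spark version in the shim layer; once the unused join argument is removed from the shared GpuBroadcastNestedLoopJoinExecBase constructor, each shim copy has to drop it from its super call as well.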
@@ -307,7 +307,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
def putOnGpuIfNeeded(batch: ColumnarBatch): ColumnarBatch = {
if (!batch.column(0).isInstanceOf[GpuColumnVector]) {
val s: StructType = schema.toStructType
- val gpuCB = new GpuColumnarBatchBuilder(s, batch.numRows(), batch).build(batch.numRows())
+ val gpuCB = new GpuColumnarBatchBuilder(s, batch.numRows()).build(batch.numRows())
batch.close()
gpuCB
} else {
@@ -1137,7 +1137,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
private def getSupportedSchemaFromUnsupported(
cachedAttributes: Seq[Attribute],
requestedAttributes: Seq[Attribute] = Seq.empty): (Seq[Attribute], Seq[Attribute]) = {
- def getSupportedDataType(dataType: DataType, nullable: Boolean = true): DataType = {
+ def getSupportedDataType(dataType: DataType): DataType = {
dataType match {
case CalendarIntervalType =>
intervalStructType
@@ -1147,19 +1147,19 @@
val newStructType = StructType(
s.map { field =>
StructField(field.name,
- getSupportedDataType(field.dataType, field.nullable), field.nullable,
+ getSupportedDataType(field.dataType), field.nullable,
field.metadata)
})
mapping.put(s, newStructType)
newStructType
case a@ArrayType(elementType, nullable) =>
val newArrayType =
- ArrayType(getSupportedDataType(elementType, nullable), nullable)
+ ArrayType(getSupportedDataType(elementType), nullable)
mapping.put(a, newArrayType)
newArrayType
case m@MapType(keyType, valueType, nullable) =>
- val newKeyType = getSupportedDataType(keyType, nullable)
- val newValueType = getSupportedDataType(valueType, nullable)
+ val newKeyType = getSupportedDataType(keyType)
+ val newValueType = getSupportedDataType(valueType)
val mapType = MapType(newKeyType, newValueType, nullable)
mapping.put(m, mapType)
mapType
@@ -1180,13 +1180,13 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
AttributeReference(attribute.name, DataTypes.ByteType, nullable = true,
metadata = attribute.metadata)(attribute.exprId).asInstanceOf[Attribute]
case s: StructType =>
- AttributeReference(attribute.name, getSupportedDataType(s, attribute.nullable),
+ AttributeReference(attribute.name, getSupportedDataType(s),
attribute.nullable, attribute.metadata)(attribute.exprId)
case a: ArrayType =>
- AttributeReference(attribute.name, getSupportedDataType(a, attribute.nullable),
+ AttributeReference(attribute.name, getSupportedDataType(a),
attribute.nullable, attribute.metadata)(attribute.exprId)
case m: MapType =>
- AttributeReference(attribute.name, getSupportedDataType(m, attribute.nullable),
+ AttributeReference(attribute.name, getSupportedDataType(m),
attribute.nullable, attribute.metadata)(attribute.exprId)
case _ =>
attribute
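In getSupportedDataType the nullable flag was accepted but never consulted: nullability still travels on the enclosing StructField, ArrayType, and MapType, so the recursive calls only need the data type itself.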
@@ -97,8 +97,6 @@ case class GpuInMemoryTableScanExec(
lazy val readPartitions = sparkContext.longAccumulator
lazy val readBatches = sparkContext.longAccumulator

- private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning

private def filteredCachedBatches() = {
// Right now just return the batch without filtering
relation.cacheBuilder.cachedColumnBuffers
@@ -276,13 +276,8 @@ public static final class GpuArrowColumnarBatchBuilder extends GpuColumnarBatchB
/**
* A collection of builders for building up columnar data from Arrow data.
* @param schema the schema of the batch.
- * @param rows the maximum number of rows in this batch.
- * @param batch if this is going to copy a ColumnarBatch in a non GPU format that batch
- * we are going to copy. If not this may be null. This is used to get an idea
- * of how big to allocate buffers that do not necessarily correspond to the
- * number of rows.
*/
- public GpuArrowColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch) {
+ public GpuArrowColumnarBatchBuilder(StructType schema) {
fields = schema.fields();
int len = fields.length;
builders = new ai.rapids.cudf.ArrowColumnBuilder[len];
@@ -315,9 +310,9 @@ protected ColumnVector buildAndPutOnDevice(int builderIndex) {
return gcv;
}

- public void copyColumnar(ColumnVector cv, int colNum, boolean nullable, int rows) {
+ public void copyColumnar(ColumnVector cv, int colNum, boolean ignored, int rows) {
referenceHolders[colNum].addReferences(
- HostColumnarToGpu.arrowColumnarCopy(cv, builder(colNum), nullable, rows)
+ HostColumnarToGpu.arrowColumnarCopy(cv, builder(colNum), rows)
);
}
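In the Arrow builder the third parameter of copyColumnar cannot simply be deleted, presumably because the method implements a shared builder signature; it is renamed to ignored and dropped from the arrowColumnarCopy call instead.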

@@ -345,12 +340,8 @@ public static final class GpuColumnarBatchBuilder extends GpuColumnarBatchBuilde
* A collection of builders for building up columnar data.
* @param schema the schema of the batch.
* @param rows the maximum number of rows in this batch.
- * @param batch if this is going to copy a ColumnarBatch in a non GPU format that batch
- * we are going to copy. If not this may be null. This is used to get an idea
- * of how big to allocate buffers that do not necessarily correspond to the
- * number of rows.
*/
- public GpuColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch) {
+ public GpuColumnarBatchBuilder(StructType schema, int rows) {
fields = schema.fields();
int len = fields.length;
builders = new ai.rapids.cudf.HostColumnVector.ColumnBuilder[len];
@@ -423,7 +414,7 @@ public void close() {
}

private static final class ArrowBufReferenceHolder {
- private List<ReferenceManager> references = new ArrayList<>();
+ private final List<ReferenceManager> references = new ArrayList<>();

public void addReferences(List<ReferenceManager> refs) {
references.addAll(refs);
@@ -495,7 +486,7 @@ public static DType getNonNestedRapidsType(DataType type) {
* returning an empty batch from an operator is almost always the wrong thing to do.
*/
public static ColumnarBatch emptyBatch(StructType schema) {
- try (GpuColumnarBatchBuilder builder = new GpuColumnarBatchBuilder(schema, 0, null)) {
+ try (GpuColumnarBatchBuilder builder = new GpuColumnarBatchBuilder(schema, 0)) {
return builder.build(0);
}
}
@@ -514,7 +505,7 @@ public static ColumnarBatch emptyBatch(List<Attribute> format) {
* when serializing an empty broadcast table.
*/
public static HostColumnVector[] emptyHostColumns(StructType schema) {
- try (GpuColumnarBatchBuilder builder = new GpuColumnarBatchBuilder(schema, 0, null)) {
+ try (GpuColumnarBatchBuilder builder = new GpuColumnarBatchBuilder(schema, 0)) {
return builder.buildHostColumns();
}
}
@@ -911,8 +902,8 @@ public static long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
public static long getTotalDeviceMemoryUsed(GpuColumnVector[] vectors) {
long sum = 0;
HashSet<Long> found = new HashSet<>();
- for (int i = 0; i < vectors.length; i++) {
- ai.rapids.cudf.ColumnVector cv = vectors[i].getBase();
+ for (GpuColumnVector vector : vectors) {
+ ai.rapids.cudf.ColumnVector cv = vector.getBase();
long id = cv.getNativeView();
if (found.add(id)) {
sum += cv.getDeviceMemorySize();
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@

package com.nvidia.spark.rapids;

- import ai.rapids.cudf.DType;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -1,6 +1,6 @@

/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -78,8 +78,4 @@ public final RapidsHostColumnVector incRefCount() {
public final ai.rapids.cudf.HostColumnVector getBase() {
return cudfCv;
}

- public GpuColumnVector copyToDevice() {
- return new GpuColumnVector(type, cudfCv.copyToDevice());
- }
}
@@ -1,6 +1,6 @@

/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,9 +31,6 @@
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

- import java.math.BigDecimal;
- import java.math.RoundingMode;

/**
* A GPU accelerated version of the Spark ColumnVector.
* Most of the standard Spark APIs should never be called, as they assume that the data
@@ -145,7 +145,7 @@ public ColumnarBatch next() {
devColumn = hostColumn.copyToDevice();
}
}
- try (NvtxRange range = buildRange;
+ try (NvtxRange ignored = buildRange;
ColumnVector cv = devColumn;
Table tab = Table.convertFromRows(cv, rapidsTypes)) {
return GpuColumnVector.from(tab, outputTypes);
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -39,7 +39,7 @@ class AutoCloseColumnBatchIterator[U](itr: Iterator[U], nextBatch: Iterator[U] =
}
}

- TaskContext.get().addTaskCompletionListener[Unit]((tc: TaskContext) => {
+ TaskContext.get().addTaskCompletionListener[Unit]((_: TaskContext) => {
closeCurrentBatch()
})

@@ -1227,7 +1227,7 @@ case class GpuCast(
if (to.scale <= from.scale) {
if (!isFrom32Bit && isTo32Bit) {
// check for overflow when 64bit => 32bit
- withResource(checkForOverflow(input, from, to, isFrom32Bit)) { checkedInput =>
+ withResource(checkForOverflow(input, to, isFrom32Bit)) { checkedInput =>
castCheckedDecimal(checkedInput)
}
} else {
@@ -1240,15 +1240,14 @@
}
} else {
// from.scale > to.scale
- withResource(checkForOverflow(input, from, to, isFrom32Bit)) { checkedInput =>
+ withResource(checkForOverflow(input, to, isFrom32Bit)) { checkedInput =>
castCheckedDecimal(checkedInput)
}
}
}

def checkForOverflow(
input: ColumnVector,
- from: DecimalType,
to: DecimalType,
isFrom32Bit: Boolean): ColumnVector = {

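checkForOverflow derives its bound from the target DecimalType and the 32-bit flag alone, so the from parameter and the matching argument at both call sites are removed.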
@@ -145,7 +145,7 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends Se
new Iterator[(Int, ColumnarBatch)] with Arm {
var toBeReturned: Option[ColumnarBatch] = None

- TaskContext.get().addTaskCompletionListener[Unit]((tc: TaskContext) => {
+ TaskContext.get().addTaskCompletionListener[Unit]((_: TaskContext) => {
toBeReturned.foreach(_.close())
toBeReturned = None
dIn.close()
@@ -357,7 +357,7 @@ case class GpuParquetMultiFilePartitionReaderFactory(
return uri
}
} catch {
- case e: URISyntaxException =>
+ case _: URISyntaxException =>
}
new File(path).getAbsoluteFile().toURI()
}
@@ -1090,7 +1090,6 @@ class MultiFileParquetPartitionReader(
// and concatenate those together then go to the next column
for ((field, colIndex) <- partitionSchema.fields.zipWithIndex) {
val dataType = field.dataType
- val partitionColumns = new Array[GpuColumnVector](inPartitionValues.size)
withResource(new Array[GpuColumnVector](inPartitionValues.size)) {
partitionColumns =>
for ((rowsInPart, partIndex) <- rowsPerPartition.zipWithIndex) {
@@ -1566,7 +1565,7 @@ class MultiFileCloudParquetPartitionReader(
// in cases close got called early for like limit() calls
isDone = true
currentFileHostBuffers.foreach { current =>
- current.memBuffersAndSizes.foreach { case (buf, size) =>
+ current.memBuffersAndSizes.foreach { case (buf, _) =>
if (buf != null) {
buf.close()
}
@@ -1577,7 +1576,7 @@
batch = None
tasks.asScala.foreach { task =>
if (task.isDone()) {
- task.get.memBuffersAndSizes.foreach { case (buf, size) =>
+ task.get.memBuffersAndSizes.foreach { case (buf, _) =>
if (buf != null) {
buf.close()
}
@@ -53,7 +53,7 @@ trait GpuPartitioning extends Partitioning with Arm {
val contiguousTables = withResource(table)(t => t.contiguousSplit(parts: _*))
GpuShuffleEnv.rapidsShuffleCodec match {
case Some(codec) =>
- compressSplits(splits, codec, contiguousTables, dataTypes)
+ compressSplits(splits, codec, contiguousTables)
case None =>
// GpuPackedTableColumn takes ownership of the contiguous tables
closeOnExcept(contiguousTables) { cts =>
@@ -127,8 +127,7 @@ trait GpuPartitioning extends Partitioning with Arm {
def compressSplits(
outputBatches: ArrayBuffer[ColumnarBatch],
codec: TableCompressionCodec,
- contiguousTables: Array[ContiguousTable],
- dataTypes: Array[DataType]): Unit = {
+ contiguousTables: Array[ContiguousTable]): Unit = {
withResource(codec.createBatchCompressor(maxCompressionBatchSize,
Cuda.DEFAULT_STREAM)) { compressor =>
// tracks batches with no data and the corresponding output index for the batch
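compressSplits never consulted the dataTypes it was handed; the compressor operates on the contiguous tables directly, so the partitioning path now passes only the output batches, the codec, and the tables.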
@@ -58,7 +58,7 @@ private class GpuRowToColumnConverter(schema: StructType) extends Serializable w
*/
final def convertBatch(rows: Array[InternalRow], schema: StructType): ColumnarBatch = {
val numRows = rows.length
- val builders = new GpuColumnarBatchBuilder(schema, numRows, null)
+ val builders = new GpuColumnarBatchBuilder(schema, numRows)
rows.foreach(convert(_, builders))
builders.build(numRows)
}
@@ -585,7 +585,7 @@ class RowToColumnarIterator(
}
}

- val builders = new GpuColumnarBatchBuilder(localSchema, targetRows, null)
+ val builders = new GpuColumnarBatchBuilder(localSchema, targetRows)
try {
var rowCount = 0
// Double because validity can be < 1 byte, and this is just an estimate anyways
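For callers, the visible change is the constructor arity. A hedged usage sketch of the trimmed builder, assuming (as the changes above suggest) that GpuColumnarBatchBuilder is the nested builder class of com.nvidia.spark.rapids.GpuColumnVector and that a GPU-enabled spark-rapids runtime is initialized:

// Hedged sketch: building a batch with the two-argument builder, mirroring the
// updated call sites above. Assumes an initialized spark-rapids GPU runtime.
import com.nvidia.spark.rapids.GpuColumnVector
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object BuilderUsageSketch {
  def emptyIdBatch(): Unit = {
    val schema = StructType(Seq(StructField("id", IntegerType, nullable = true)))
    val builder = new GpuColumnVector.GpuColumnarBatchBuilder(schema, 0) // no trailing ColumnarBatch
    try {
      val batch = builder.build(0) // same pattern as emptyBatch() in the diff above
      batch.close()
    } finally {
      builder.close()
    }
  }
}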
