Add in a GpuMemoryLeaseManager #7361

Closed · wants to merge 7 commits
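
In outline: every call site that previously consumed a fixed target batch size now routes it through `GpuMemoryLeaseManager.getAdjustedTargetBatchSize`, and `CoalesceSizeGoal.targetSizeBytes` becomes an `Option[Long]` so that `None` defers the size decision to the lease manager at runtime. The manager itself is not among the hunks quoted in this view, so the interface sketch below is an assumption reconstructed from the call sites (the coalesce goal and Java reader pass an option; `GpuGenerateExec` appears to pass the plain configured `Long`):

```scala
// Hypothetical sketch only: GpuMemoryLeaseManager is introduced elsewhere in
// this PR, so both signatures and the fallback below are inferred from the
// call sites visible in this diff.
object GpuMemoryLeaseManager {
  // Assumed fallback when no explicit target is configured; the real manager
  // presumably negotiates this against available GPU memory.
  private val defaultLeaseBytes: Long = 1L << 30 // 1 GiB, illustrative

  /** Scala call sites pass Option[Long]; None defers to the lease manager. */
  def getAdjustedTargetBatchSize(target: Option[Long]): Long =
    target.getOrElse(defaultLeaseBytes)

  /** Other call sites already hold a concrete configured size. */
  def getAdjustedTargetBatchSize(target: Long): Long =
    math.min(target, defaultLeaseBytes) // illustrative "adjustment"
}
```
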
```diff
@@ -72,7 +72,9 @@ protected InternalRowToColumnarBatchIterator(
     this.input = input;
     int sizePerRowEstimate = CudfUnsafeRow.getRowSizeEstimate(schema);
     numRowsEstimate = (int)Math.max(1,
-        Math.min(Integer.MAX_VALUE - 1, goal.targetSizeBytes() / sizePerRowEstimate));
+        Math.min(Integer.MAX_VALUE - 1,
+            GpuMemoryLeaseManager.getAdjustedTargetBatchSize(goal.targetSizeBytes()) /
+                sizePerRowEstimate));
    dataLength = ((long) sizePerRowEstimate) * numRowsEstimate;
    rapidsTypes = new DType[schema.length];
    outputTypes = new DataType[schema.length];
```
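
A worked instance of the estimate above, with invented numbers for both inputs:

```scala
// Assumed values: a 1 GiB adjusted target and a 64-byte-per-row estimate.
val adjustedTarget = 1L << 30  // stand-in for getAdjustedTargetBatchSize(...)
val sizePerRowEstimate = 64L   // stand-in for CudfUnsafeRow.getRowSizeEstimate
val numRowsEstimate = math.max(1L,
  math.min(Int.MaxValue - 1L, adjustedTarget / sizePerRowEstimate)).toInt
// numRowsEstimate == 16777216 (2^24), so dataLength lands right at 1 GiB
```
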
```diff
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.stream.Collectors;

+import com.nvidia.spark.rapids.GpuMemoryLeaseManager;
 import com.nvidia.spark.rapids.GpuMetric;
 import com.nvidia.spark.rapids.MultiFileReaderUtils;
 import com.nvidia.spark.rapids.RapidsConf;
```

```diff
@@ -307,7 +308,7 @@ static class ReadTask implements InputPartition, Serializable {
     private final int maxBatchSizeRows;
     private final long maxBatchSizeBytes;

-    private final long targetBatchSizeBytes;
+    private final scala.Option<Object> targetBatchSizeBytes;
     private final String parquetDebugDumpPrefix;
     private final int numThreads;
     private final int maxNumFileProcessed;
```

```diff
@@ -368,7 +369,7 @@ public long getMaxBatchSizeBytes() {
     }

     public long getTargetBatchSizeBytes() {
-      return targetBatchSizeBytes;
+      return GpuMemoryLeaseManager.getAdjustedTargetBatchSize(targetBatchSizeBytes);
    }

    public String getParquetDebugDumpPrefix() {
```
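
`scala.Option<Object>` is simply how Java sees a Scala `Option[Long]`: the primitive is boxed and its type parameter erased. The getter keeps its primitive `long` signature by resolving the option through the lease manager. A hedged sketch of that resolution on the Scala side (the helper name is invented for illustration):

```scala
// Hypothetical helper, not part of this diff: resolving an optional target
// size. From Java the same Option[Long] surfaces as scala.Option<Object>
// because the Long is boxed and type-erased.
def resolveTargetBatchSize(target: Option[Long], fallback: Long): Long =
  target.getOrElse(fallback) // None means "let GpuMemoryLeaseManager decide"
```
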
```diff
@@ -116,7 +116,9 @@ object CoalesceGoal {
         // Nothing is the same so there is no guarantee
         BatchedByKey(Seq.empty)(Seq.empty)
       }
-    case (TargetSize(aSize), TargetSize(bSize)) if aSize > bSize => a
+    case (TargetSize(Some(aSize)), TargetSize(Some(bSize))) if aSize > bSize => a
+    case (TargetSize(Some(_)), TargetSize(None)) => a
+    case (TargetSize(None), TargetSize(Some(_))) => b
     case _ => b
   }
```

```diff
@@ -131,7 +133,9 @@ object CoalesceGoal {
       } else {
         null
       }
-    case (TargetSize(aSize), TargetSize(bSize)) if aSize < bSize => a
+    case (TargetSize(Some(aSize)), TargetSize(Some(bSize))) if aSize < bSize => a
+    case (TargetSize(Some(_)), TargetSize(None)) => a
+    case (TargetSize(None), TargetSize(Some(_))) => b
     case _ => b
   }
```

```diff
@@ -145,7 +149,8 @@ object CoalesceGoal {
       aOrder.zip(bOrder).forall {
         case (a, b) => a.satisfies(b)
       }
-    case (TargetSize(foundSize), TargetSize(requiredSize)) => foundSize >= requiredSize
+    case (TargetSize(Some(foundSize)), TargetSize(Some(requiredSize))) => foundSize >= requiredSize
+    case (TargetSize(None), TargetSize(None)) => true
    case _ => false // found is null so it is not satisfied
  }
}
```
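
The pattern matches extend the same way in all three places above: two concrete sizes compare numerically, a concrete size beats an unset (`None`) one in either position, and two unset goals trivially satisfy each other. A self-contained illustration of the merge rule, using a simplified stand-in type rather than the plugin's classes:

```scala
// Standalone illustration of the Option-based merge the new cases implement;
// TargetSize here is a simplified stand-in, not the plugin's class.
final case class TargetSize(targetSizeBytes: Option[Long])

def maxRequirement(a: TargetSize, b: TargetSize): TargetSize = (a, b) match {
  case (TargetSize(Some(x)), TargetSize(Some(y))) if x > y => a
  case (TargetSize(Some(_)), TargetSize(None)) => a // a concrete size wins
  case (TargetSize(None), TargetSize(Some(_))) => b // over an unset one
  case _ => b
}

// maxRequirement(TargetSize(Some(1L << 30)), TargetSize(None))
//   == TargetSize(Some(1073741824))
// maxRequirement(TargetSize(None), TargetSize(None)) == TargetSize(None)
```
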
```diff
@@ -163,7 +168,11 @@ sealed abstract class CoalesceGoal extends GpuUnevaluable with ShimExpression {

 sealed abstract class CoalesceSizeGoal extends CoalesceGoal {

-  val targetSizeBytes: Long = Integer.MAX_VALUE
+  /**
+   * The size in bytes, but if it is not set, then the target size is determined
+   * at runtime by the GpuMemoryLeaseManager.
+   */
+  val targetSizeBytes: Option[Long] = Some(Integer.MAX_VALUE)
 }

 /**
```

```diff
@@ -179,7 +188,7 @@ trait RequireSingleBatchLike
  */
 case object RequireSingleBatch extends CoalesceSizeGoal with RequireSingleBatchLike {

-  override val targetSizeBytes: Long = Long.MaxValue
+  override val targetSizeBytes: Option[Long] = Some(Long.MaxValue)

   /** Override toString to improve readability of Spark explain output */
   override def toString: String = "RequireSingleBatch"
```

```diff
@@ -197,7 +206,7 @@ case object RequireSingleBatch extends CoalesceSizeGoal with RequireSingleBatchLike
 case class RequireSingleBatchWithFilter(filterExpression: GpuExpression)
   extends CoalesceSizeGoal with RequireSingleBatchLike {

-  override val targetSizeBytes: Long = Long.MaxValue
+  override val targetSizeBytes: Option[Long] = Some(Long.MaxValue)

   /** Override toString to improve readability of Spark explain output */
   override def toString: String = "RequireSingleBatchWithFilter"
```

```diff
@@ -207,10 +216,10 @@ case class RequireSingleBatchWithFilter(filterExpression: GpuExpression)
  * is estimated in some cases so it may go over a little, but it should generally be
  * very close to the target size. Generally you should not go over 2 GiB to avoid
  * limitations in cudf for nested type columns.
- * @param targetSizeBytes the size of each batch in bytes.
+ * @param targetSizeBytes the size of each batch in bytes, if it is set.
  */
-case class TargetSize(override val targetSizeBytes: Long) extends CoalesceSizeGoal {
-  require(targetSizeBytes <= Integer.MAX_VALUE,
+case class TargetSize(override val targetSizeBytes: Option[Long]) extends CoalesceSizeGoal {
+  require(targetSizeBytes.getOrElse(Int.MaxValue.toLong) <= Int.MaxValue,
     "Target cannot exceed 2GB without checks for cudf row count limit")
 }
```

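With this signature change, constructors wrap a configured size in `Some`, and the `require` only guards values that are actually set; `TargetSize(None)` defers the size choice to the lease manager. Illustrative values (assuming the plugin's `TargetSize` above is in scope):

```scala
// Illustrative only; sizes are arbitrary.
val explicit = TargetSize(Some(512L * 1024 * 1024)) // 512 MiB, passes the require
val deferred = TargetSize(None)                     // size resolved at runtime
// TargetSize(Some(4L << 30)) would trip the require: above the 2 GiB cudf guard.
```
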
```diff
@@ -246,6 +255,8 @@ abstract class AbstractGpuCoalesceIterator(
     opTime: GpuMetric,
     opName: String) extends Iterator[ColumnarBatch] with Arm with Logging {

+  private val targetSize =
+    GpuMemoryLeaseManager.getAdjustedTargetBatchSize(goal.targetSizeBytes)
   private val iter = new CollectTimeIterator(s"$opName: collect", batches, streamTime)

   private var batchInitialized: Boolean = false
```

```diff
@@ -448,7 +459,7 @@
       }
     } else if (batchRowLimit > 0 && wouldBeRows > batchRowLimit) {
       saveOnDeck(cb)
-    } else if (wouldBeBytes > goal.targetSizeBytes && numBytes > 0) {
+    } else if (wouldBeBytes > targetSize && numBytes > 0) {
      // There are no explicit checks for the concatenate result exceeding the cudf 2^31
      // row count limit for any column. We are relying on cudf's concatenate to throw
      // an exception if this occurs and limiting performance-oriented goals to under
```
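
A simplified, self-contained model of the size check above (not the plugin's types): batches accumulate until the next one would push the running total past the resolved target, at which point it is held back, loosely mirroring `saveOnDeck`.

```scala
// Simplified, self-contained model of the size check: batch sizes accumulate
// until the next one would exceed the resolved target, then a new group starts.
def splitAtTarget(sizes: Seq[Long], targetSize: Long): Seq[Seq[Long]] =
  sizes.foldLeft(Vector(Vector.empty[Long])) { (acc, next) =>
    val current = acc.last
    val wouldBeBytes = current.sum + next
    if (wouldBeBytes > targetSize && current.nonEmpty)
      acc :+ Vector(next)            // hold the batch back for the next group
    else
      acc.init :+ (current :+ next)  // keep concatenating into the current group
  }

// splitAtTarget(Seq(400, 400, 400), 1000) == Vector(Vector(400, 400), Vector(400))
```
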
```diff
@@ -739,7 +739,8 @@ case class GpuGenerateExec(
         projIn,
         othersProjectList.length,
         outer,
-        new RapidsConf(conf).gpuTargetBatchSizeBytes)
+        GpuMemoryLeaseManager.getAdjustedTargetBatchSize(
+          new RapidsConf(conf).gpuTargetBatchSizeBytes))
       // 2. split up input batch with indices
       makeSplitIterator(projIn, splitIndices).map { splitIn =>
         withResource(splitIn) { splitIn =>
```