From a8ce923ca671a3f94cb96bf0513b92c229a32bff Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Thu, 25 Mar 2021 18:37:52 -0500
Subject: [PATCH] Update order by to not load native libraries when sorting
 (#2022)

Signed-off-by: Robert (Bobby) Evans
---
 .../com/nvidia/spark/rapids/GpuHashPartitioning.scala  |  4 ++--
 .../main/scala/com/nvidia/spark/rapids/SortUtils.scala | 10 +++++-----
 .../spark/sql/rapids/GpuFileFormatDataWriter.scala     |  6 +++---
 .../execution/python/GpuWindowInPandasExecBase.scala   |  9 +++++----
 4 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioning.scala
index ebb764270bb..82ddd40a672 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioning.scala
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids
 
 import scala.collection.mutable.ArrayBuffer
 
-import ai.rapids.cudf.{ColumnVector, DType, NvtxColor, NvtxRange, Table}
+import ai.rapids.cudf.{ColumnVector, DType, NvtxColor, NvtxRange, OrderByArg, Table}
 
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution}
@@ -65,7 +65,7 @@ case class GpuHashPartitioning(expressions: Seq[Expression], numPartitions: Int)
         allColumns += parts
         allColumns ++= GpuColumnVector.extractBases(batch)
         withResource(new Table(allColumns: _*)) { fullTable =>
-          fullTable.orderBy(Table.asc(0))
+          fullTable.orderBy(OrderByArg.asc(0))
         }
       }
     }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala
index b4361d7631d..cb57d905ac7 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import ai.rapids.cudf.{ColumnVector, NvtxColor, Table}
+import ai.rapids.cudf.{ColumnVector, NvtxColor, OrderByArg, Table}
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Expression, NullsFirst, NullsLast, SortOrder}
 import org.apache.spark.sql.types.DataType
@@ -33,11 +33,11 @@ object SortUtils extends Arm {
     case _ => None
   }
 
-  def getOrder(order: SortOrder, index: Int): Table.OrderByArg =
+  def getOrder(order: SortOrder, index: Int): OrderByArg =
     if (order.isAscending) {
-      Table.asc(index, order.nullOrdering == NullsFirst)
+      OrderByArg.asc(index, order.nullOrdering == NullsFirst)
     } else {
-      Table.desc(index, order.nullOrdering == NullsLast)
+      OrderByArg.desc(index, order.nullOrdering == NullsLast)
     }
 }
 
@@ -88,7 +88,7 @@ class GpuSorter(
   private[this] lazy val (sortOrdersThatNeedComputation, cudfOrdering, cpuOrderingInternal) = {
     val sortOrdersThatNeedsComputation = mutable.ArrayBuffer[SortOrder]()
     val cpuOrdering = mutable.ArrayBuffer[SortOrder]()
-    val cudfOrdering = mutable.ArrayBuffer[Table.OrderByArg]()
+    val cudfOrdering = mutable.ArrayBuffer[OrderByArg]()
     var newColumnIndex = numInputColumns
     // Remove duplicates in the ordering itself because
     // there is no need to do it twice.
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala
index 34a40f3d6d3..4ef35319792 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2020, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2021, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids
 
 import scala.collection.mutable
 
-import ai.rapids.cudf.{ContiguousTable, Table}
+import ai.rapids.cudf.{ContiguousTable, OrderByArg, Table}
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import org.apache.hadoop.fs.Path
@@ -276,7 +276,7 @@ class GpuDynamicPartitionDataWriter(
     val columnIds = 0 until t.getNumberOfColumns
     val distinct = t.groupBy(columnIds: _*).aggregate()
     try {
-      distinct.orderBy(columnIds.map(Table.asc(_, nullsSmallest)): _*)
+      distinct.orderBy(columnIds.map(OrderByArg.asc(_, nullsSmallest)): _*)
     } finally {
       distinct.close()
     }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala
index f79a7d9d9f6..53812b31095 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala
@@ -16,14 +16,15 @@
 
 package org.apache.spark.sql.rapids.execution.python
 
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 import ai.rapids.cudf
-import ai.rapids.cudf.{Aggregation, Table}
+import ai.rapids.cudf.{Aggregation, OrderByArg}
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.GpuMetric._
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.TaskContext
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
@@ -125,7 +126,7 @@ class GroupingIterator(
         }
       }
       val orderedTable = withResource(cntTable) { table =>
-        table.orderBy(partitionIndices.map(id => Table.asc(id, true)): _*)
+        table.orderBy(partitionIndices.map(id => OrderByArg.asc(id, true)): _*)
       }
       val (countHostCol, numRows) = withResource(orderedTable) { table =>
         // Yes copying the data to host, it would be OK since just copying the aggregated
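
---

Note (not part of the patch): a minimal sketch of the pattern this change
applies in every file above. It uses only the cudf Java API calls that appear
in the diff (OrderByArg.asc/OrderByArg.desc and Table.orderBy); the object and
method names below are hypothetical and exist only for illustration.

import ai.rapids.cudf.{OrderByArg, Table}

object OrderByArgSketch {
  // Before this patch: Table.asc/Table.desc were static helpers on Table, so
  // merely building a sort specification initialized the Table class and, per
  // the commit subject, loaded the native libraries:
  //   val oldOrder: Table.OrderByArg = Table.asc(0, true)

  // After this patch: OrderByArg is a standalone class, so a sort ordering can
  // be described without touching native code. The second argument matches the
  // isNullSmallest flag used in the diff (nulls sort first for ascending).
  def ascendingNullsFirst(index: Int): OrderByArg = OrderByArg.asc(index, true)

  // Native code is only involved once an actual Table is sorted with it.
  def sortByFirstColumn(t: Table): Table = t.orderBy(ascendingNullsFirst(0))
}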