[SPARK-29296][BUILD][CORE] Remove use of .par to make 2.13 support easier; add scala-2.13 profile to enable pulling in par collections library separately, for the future

### What changes were proposed in this pull request?

Scala 2.13 moves the parallel collections classes into a separate library, so first, this establishes a `scala-2.13` profile to bring that library back in, for future use.

However, the library enables the `.par` implicit conversions via a new class that does not exist in 2.12, which makes cross-building hard. This change implements a workaround suggested in scala/scala-parallel-collections#22: avoid `.par` entirely.
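As a minimal, self-contained sketch of that workaround (illustrative code, not from this patch): construct a concrete parallel collection such as `ParVector` explicitly instead of going through the `.par` conversion, and the same source compiles on 2.12 and, with the module above, 2.13:

```scala
import java.util.concurrent.ForkJoinPool

import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector

object ParWithoutDotPar {
  def main(args: Array[String]): Unit = {
    val items = Seq("a", "b", "c", "d")

    // Instead of `items.par`, wrap the sequence in a ParVector explicitly;
    // this sidesteps the implicit conversion that differs between versions.
    val parItems = new ParVector(items.toVector)
    parItems.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))

    // Parallel map, then back to a sequential Vector via .seq.
    println(parItems.map(_.toUpperCase).seq.mkString(", "))
  }
}
```

Setting `tasksupport` is optional; it pins the work to a dedicated pool instead of the default one, which is what the `UnionRDD` change below does as well.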

### Why are the changes needed?

So that Spark compiles against Scala 2.13 and can, later, actually work with 2.13.

### Does this PR introduce any user-facing change?

Should not, no.

### How was this patch tested?

Existing tests.

Closes #25980 from srowen/SPARK-29296.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
srowen committed Oct 3, 2019
1 parent 2bc3fff commit 7aca0dd
Showing 12 changed files with 78 additions and 11 deletions.
9 changes: 9 additions & 0 deletions core/pom.xml
@@ -556,6 +556,15 @@
</plugins>
</build>
</profile>
<profile>
<id>scala-2.13</id>
<dependencies>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>

</project>
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -21,6 +21,7 @@ import java.io.{IOException, ObjectOutputStream}

import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector
import scala.reflect.ClassTag

import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
@@ -75,13 +76,13 @@ class UnionRDD[T: ClassTag](

override def getPartitions: Array[Partition] = {
val parRDDs = if (isPartitionListingParallel) {
val parArray = rdds.par
val parArray = new ParVector(rdds.toVector)
parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
parArray
} else {
rdds
}
val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum)
val array = new Array[Partition](parRDDs.map(_.partitions.length).sum)
var pos = 0
for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
13 changes: 13 additions & 0 deletions pom.xml
@@ -2992,6 +2992,19 @@
<profile>
<id>scala-2.12</id>
</profile>

<profile>
<id>scala-2.13</id>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
<version>0.2.0</version>
</dependency>
</dependencies>
</dependencyManagement>
</profile>

<!--
This is a profile to enable the use of the ASF snapshot and staging repositories
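With the version pinned in this root `dependencyManagement` block, the per-module `scala-2.13` profiles need only declare the dependency without a version. Nothing here auto-activates the profile; a 2.13 build would presumably enable it explicitly, e.g. `./build/mvn -Pscala-2.13 ...` (illustrative invocation).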
12 changes: 12 additions & 0 deletions sql/catalyst/pom.xml
@@ -170,4 +170,16 @@
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>scala-2.13</id>
<dependencies>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -21,6 +21,8 @@ import java.sql.{Date, Timestamp}
import java.util.{Calendar, TimeZone}
import java.util.concurrent.TimeUnit._

import scala.collection.parallel.immutable.ParVector

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
@@ -113,7 +115,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
}

test("cast string to timestamp") {
ALL_TIMEZONES.par.foreach { tz =>
new ParVector(ALL_TIMEZONES.toVector).foreach { tz =>
def checkCastStringToTimestamp(str: String, expected: Timestamp): Unit = {
checkEvaluation(cast(Literal(str), TimestampType, Option(tz.getID)), expected)
}
12 changes: 12 additions & 0 deletions sql/core/pom.xml
@@ -215,4 +215,16 @@
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>scala-2.13</id>
<dependencies>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit._

import scala.collection.{GenMap, GenSeq}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
@@ -684,7 +685,7 @@ case class AlterTableRecoverPartitionsCommand(
val statusPar: GenSeq[FileStatus] =
if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
// parallelize the list of partitions here, then we can have better parallelism later.
val parArray = statuses.par
val parArray = new ParVector(statuses.toVector)
parArray.tasksupport = evalTaskSupport
parArray
} else {
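The `AlterTableRecoverPartitionsCommand` hunk above also shows a threshold gate: parallelize only when there is enough work, typing the result as `GenSeq` so the sequential and parallel branches unify. A hedged sketch of that idea (hypothetical helper; `GenSeq` is a 2.12 type, which is part of why this area needs further work for 2.13):

```scala
import java.util.concurrent.ForkJoinPool

import scala.collection.GenSeq
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector

object MaybeParallel {
  // Parallelize only when the input is large enough to amortize the
  // overhead; GenSeq lets both branches share one result type.
  def maybeParallel[A](xs: Seq[A], threshold: Int): GenSeq[A] = {
    if (xs.length > threshold) {
      val par = new ParVector(xs.toVector)
      par.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
      par
    } else {
      xs
    }
  }

  def main(args: Array[String]): Unit = {
    val lengths = maybeParallel(Seq.fill(1000)("x"), threshold = 100).map(_.length)
    println(lengths.sum)  // 1000
  }
}
```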
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -22,6 +22,8 @@ import java.net.{MalformedURLException, URL}
import java.sql.{Date, Timestamp}
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.parallel.immutable.ParVector

import org.apache.spark.{AccumulatorSuite, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
@@ -169,7 +171,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
"org.apache.spark.sql.catalyst.expressions.CallMethodViaReflection")

withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
spark.sessionState.functionRegistry.listFunction().par.foreach { funcId =>
val parFuncs = new ParVector(spark.sessionState.functionRegistry.listFunction().toVector)
parFuncs.foreach { funcId =>
// Examples can change settings. We clone the session to prevent tests clashing.
val clonedSpark = spark.cloneSession()
val info = clonedSpark.sessionState.catalog.lookupFunctionInfo(funcId)
sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution

import java.util.Properties
import scala.collection.parallel.immutable.ParRange

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -46,7 +46,7 @@ class SQLExecutionSuite extends SparkFunSuite {
import spark.implicits._
try {
// Should not throw IllegalArgumentException
(1 to 100).par.foreach { _ =>
new ParRange(1 to 100).foreach { _ =>
spark.sparkContext.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
}
} finally {
12 changes: 12 additions & 0 deletions streaming/pom.xml
@@ -131,4 +131,16 @@
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>scala-2.13</id>
<dependencies>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.immutable.ParVector

import org.apache.spark.internal.Logging
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
@@ -50,8 +51,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validateAtStart())
numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
inputStreamNameAndID = inputStreams.map(is => (is.name, is.id))
inputStreams.par.foreach(_.start())
inputStreamNameAndID = inputStreams.map(is => (is.name, is.id)).toSeq
new ParVector(inputStreams.toVector).foreach(_.start())
}
}

@@ -61,7 +62,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {

def stop(): Unit = {
this.synchronized {
inputStreams.par.foreach(_.stop())
new ParVector(inputStreams.toVector).foreach(_.stop())
}
}

streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ExecutionContextTaskSupport
import scala.collection.parallel.immutable.ParVector
import scala.concurrent.{Await, ExecutionContext, Future}

import org.apache.hadoop.conf.Configuration
@@ -313,7 +314,7 @@ private[streaming] object FileBasedWriteAheadLog {
val groupSize = taskSupport.parallelismLevel.max(8)

source.grouped(groupSize).flatMap { group =>
val parallelCollection = group.par
val parallelCollection = new ParVector(group.toVector)
parallelCollection.tasksupport = taskSupport
parallelCollection.map(handler)
}.flatten
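The `FileBasedWriteAheadLog` hunk above applies the same replacement inside a grouped traversal: walk the source lazily in groups and parallelize within each group, so a bounded amount of work is in flight at a time. A self-contained sketch of that pattern (hypothetical helper, not the Spark method itself):

```scala
import scala.collection.parallel.ExecutionContextTaskSupport
import scala.collection.parallel.immutable.ParVector
import scala.concurrent.ExecutionContext

object GroupedParallelMap {
  def mapInGroups[A, B](source: Iterator[A])(handler: A => B): Iterator[B] = {
    val taskSupport = new ExecutionContextTaskSupport(ExecutionContext.global)
    val groupSize = taskSupport.parallelismLevel.max(8)
    source.grouped(groupSize).flatMap { group =>
      val parallelCollection = new ParVector(group.toVector)
      parallelCollection.tasksupport = taskSupport
      // .seq hands a plain Vector back to the outer, lazy flatMap.
      parallelCollection.map(handler).seq
    }
  }

  def main(args: Array[String]): Unit = {
    println(mapInGroups((1 to 100).iterator)(_ * 2).sum)  // 10100
  }
}
```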
