Fix #SPARK-1149 Bad partitioners can cause Spark to hang
Author: liguoqiang <[email protected]>

Closes apache#44 from witgo/SPARK-1149 and squashes the following commits:

3dcdcaf [liguoqiang] Merge branch 'master' into SPARK-1149
8425395 [liguoqiang] Merge remote-tracking branch 'upstream/master' into SPARK-1149
3dad595 [liguoqiang] review comment
e3e56aa [liguoqiang] Merge branch 'master' into SPARK-1149
b0d5c07 [liguoqiang] review comment
d0a6005 [liguoqiang] review comment
3395ee7 [liguoqiang] Merge remote-tracking branch 'upstream/master' into SPARK-1149
ac006a3 [liguoqiang] code Formatting
3feb3a8 [liguoqiang] Merge branch 'master' into SPARK-1149
adc443e [liguoqiang] partitions check  bugfix
928e1e3 [liguoqiang] Added a unit test for PairRDDFunctions.lookup with bad partitioner
db6ecc5 [liguoqiang] Merge branch 'master' into SPARK-1149
1e3331e [liguoqiang] Merge branch 'master' into SPARK-1149
3348619 [liguoqiang] Optimize performance for partitions check
61e5a87 [liguoqiang] Merge branch 'master' into SPARK-1149
e68210a [liguoqiang] add partition index check to submitJob
3a65903 [liguoqiang] make the code more readable
6bb725e [liguoqiang] fix #SPARK-1149 Bad partitioners can cause Spark to hang
liguoqiang authored and James Z.M. Gao committed Mar 15, 2014
1 parent 9c487dc commit e3ee6e9
Showing 2 changed files with 22 additions and 0 deletions.
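
For context on the failure mode: a Partitioner whose getPartition returns an index outside [0, numPartitions) makes lookup() submit a job for a partition that does not exist, and per the issue title the job hangs instead of failing. A minimal repro sketch of our own (not part of this commit; assumes a 2014-era local-mode SparkContext):

    import org.apache.spark.{Partitioner, SparkContext}

    object Spark1149Repro {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "spark-1149-repro")
        val pairs = sc.parallelize(Array((1, 2), (3, 4)))

        val bad = new Partitioner {
          def numPartitions: Int = 2
          // Java-style % keeps the sign of the dividend, so a key whose
          // hashCode is negative maps to a negative partition index.
          def getPartition(key: Any): Int = key.hashCode() % 2
        }

        // lookup(-1) runs a job on partition getPartition(-1) == -1.
        // Before this commit that job hung; with the require added below
        // in runJob it fails fast with an IllegalArgumentException.
        pairs.partitionBy(bad).lookup(-1)
      }
    }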
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -847,6 +847,9 @@ class SparkContext(
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    partitions.foreach { p =>
      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite)
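
For reference, Scala's require throws java.lang.IllegalArgumentException (message prefixed "requirement failed: ") when its predicate is false, so an out-of-range index now fails at submission time instead of hanging. A standalone sketch of the guard's behavior (variable names are ours, not from the diff):

    val numPartitions = 4                // stand-in for rdd.partitions.size
    val requested = Seq(0, 3, -1)
    requested.foreach { p =>
      require(p >= 0 && p < numPartitions, s"Invalid partition requested: $p")
    }
    // throws java.lang.IllegalArgumentException:
    //   requirement failed: Invalid partition requested: -1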
@@ -950,6 +953,9 @@ class SparkContext(
      resultHandler: (Int, U) => Unit,
      resultFunc: => R): SimpleFutureAction[R] =
  {
    partitions.foreach { p =>
      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
    }
    val cleanF = clean(processPartition)
    val callSite = getCallSite
    val waiter = dagScheduler.submitJob(
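
Because this guard runs before the job is handed to the DAGScheduler, validation in submitJob is synchronous: a caller passing a bad index gets an immediate IllegalArgumentException rather than a SimpleFutureAction that never completes. A hypothetical caller-side sketch (RDD and handler names are ours):

    val data = sc.parallelize(1 to 100, 4)
    // Throws IllegalArgumentException immediately; no future is returned.
    sc.submitJob(
      data,
      (it: Iterator[Int]) => it.sum,   // processPartition
      Seq(-1),                         // invalid partition index
      (index: Int, sum: Int) => (),    // resultHandler
      ()                               // resultFunc
    )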
16 changes: 16 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -375,6 +375,22 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
    assert(shuffled.lookup(5) === Seq(6,7))
    assert(shuffled.lookup(-1) === Seq())
  }

test("lookup with bad partitioner") {
val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))

val p = new Partitioner {
def numPartitions: Int = 2

def getPartition(key: Any): Int = key.hashCode() % 2
}
val shuffled = pairs.partitionBy(p)

assert(shuffled.partitioner === Some(p))
assert(shuffled.lookup(1) === Seq(2))
intercept[IllegalArgumentException] {shuffled.lookup(-1)}
}
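
Why this partitioner is "bad": Scala's % keeps the sign of the dividend, so (-1).hashCode % 2 evaluates to -1, outside [0, numPartitions). Spark's own HashPartitioner avoids this by taking a non-negative modulo; a sketch of that idiom (not code from this commit):

    def nonNegativeMod(x: Int, mod: Int): Int = {
      val rawMod = x % mod
      rawMod + (if (rawMod < 0) mod else 0)
    }
    nonNegativeMod(-1, 2)   // == 1, a valid partition index
    -1 % 2                  // == -1, which the new require now rejects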

}

/*
