Merge branch 'master' into SPARK-3454_w_jersey
squito committed Mar 23, 2015
2 parents 2382bef + 9f3273b commit 0be5120
Showing 270 changed files with 3,369 additions and 1,230 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.3.0-SNAPSHOT</version>
<version>1.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.3.0-SNAPSHOT</version>
<version>1.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

4 changes: 2 additions & 2 deletions core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.3.0-SNAPSHOT</version>
<version>1.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -290,7 +290,7 @@
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
<version>0.5.0</version>
<version>0.6.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

listenerBus.start()
listenerBus.start(this)
}

/** Post the application start event */
6 changes: 1 addition & 5 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -151,11 +151,7 @@ case object TaskKilled extends TaskFailedReason {
* Task requested the driver to commit, but was denied.
*/
@DeveloperApi
case class TaskCommitDenied(
jobID: Int,
partitionID: Int,
attemptID: Int)
extends TaskFailedReason {
case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
}
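
For readers scanning the diff, a minimal sketch of how the flattened case class behaves; the ID values below are made up, and the expected string simply follows the toErrorString implementation shown above.

    // Illustrative sketch only: TaskCommitDenied is a @DeveloperApi case class in
    // the org.apache.spark package, so it can be constructed and pattern-matched.
    import org.apache.spark.TaskCommitDenied

    val reason = TaskCommitDenied(jobID = 2, partitionID = 7, attemptID = 0)
    // reason.toErrorString ==
    //   "TaskCommitDenied (Driver denied task commit) for job: 2, partition: 7, attempt: 0"
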
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskState.scala
@@ -27,6 +27,8 @@ private[spark] object TaskState extends Enumeration {

type TaskState = Value

def isFailed(state: TaskState) = (LOST == state) || (FAILED == state)

def isFinished(state: TaskState) = FINISHED_STATES.contains(state)

def toMesos(state: TaskState): MesosTaskState = state match {
@@ -46,5 +48,6 @@
case MesosTaskState.TASK_FAILED => FAILED
case MesosTaskState.TASK_KILLED => KILLED
case MesosTaskState.TASK_LOST => LOST
case MesosTaskState.TASK_ERROR => LOST
}
}
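
As a quick illustration of the new helper, a minimal sketch (this is Spark-internal code: TaskState is private[spark], so it only compiles inside the org.apache.spark package tree):

    import org.apache.spark.TaskState

    // LOST and FAILED now both count as failed; a normally finished task does not.
    assert(TaskState.isFailed(TaskState.LOST))
    assert(TaskState.isFailed(TaskState.FAILED))
    assert(!TaskState.isFailed(TaskState.FINISHED))

The added Mesos case above likewise folds TASK_ERROR into LOST, so Mesos task errors are reported as failed tasks.
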
46 changes: 37 additions & 9 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

@@ -227,24 +228,51 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
*
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
* In addition, users can control the partitioning of the output RDD, the serializer that is used
* for the shuffle, and whether to perform map-side aggregation (if a mapper can produce multiple
* items with the same key).
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairRDD[K, C] = {
implicit val ctag: ClassTag[C] = fakeClassTag
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner,
mapSideCombine: Boolean,
serializer: Serializer): JavaPairRDD[K, C] = {
implicit val ctag: ClassTag[C] = fakeClassTag
fromRDD(rdd.combineByKey(
createCombiner,
mergeValue,
mergeCombiners,
partitioner
partitioner,
mapSideCombine,
serializer
))
}

/**
* Simplified version of combineByKey that hash-partitions the output RDD.
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
* "combined type" C * Note that V and C can be different -- for example, one might group an
* RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
* functions:
*
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
*
* In addition, users can control the partitioning of the output RDD. This method automatically
* uses map-side aggregation in shuffling the RDD.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairRDD[K, C] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, true, null)
}
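
To make the three aggregation functions concrete, here is a short sketch against the Scala pair-RDD API that JavaPairRDD delegates to via fromRDD(rdd.combineByKey(...)). It assumes an existing SparkContext named sc; the data and variable names are illustrative only.

    import org.apache.spark.HashPartitioner

    val pairs = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
    val grouped = pairs.combineByKey(
      (v: String) => List(v),                            // createCombiner: V => C
      (c: List[String], v: String) => v :: c,            // mergeValue: (C, V) => C
      (c1: List[String], c2: List[String]) => c1 ::: c2, // mergeCombiners: (C, C) => C
      new HashPartitioner(2))
    // grouped is an RDD[(Int, List[String])], e.g. (1, List("b", "a")) and (2, List("c"))

The partitioner-only overload above corresponds to calling the full overload with mapSideCombine = true and serializer = null, as the delegation on the previous lines shows.
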

/**
* Simplified version of combineByKey that hash-partitions the output RDD and uses map-side
* aggregation.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -488,7 +516,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
* partitioner/parallelism level and using map-side aggregation.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -101,12 +101,23 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])

/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be >= 0
*/
def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T] =
sample(withReplacement, fraction, Utils.random.nextLong)

/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be >= 0
* @param seed seed for the random number generator
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
wrapRDD(rdd.sample(withReplacement, fraction, seed))
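
A brief sketch of the sampling behaviour documented above, written against the Scala RDD API that JavaRDD wraps via wrapRDD(rdd.sample(...)); it assumes an existing SparkContext named sc.

    val nums = sc.parallelize(1 to 1000)

    // Without replacement: each element is kept with probability ~0.1 (fraction in [0, 1]).
    val roughTenth = nums.sample(withReplacement = false, fraction = 0.1, seed = 42L)

    // With replacement: each element is drawn about twice on average (fraction >= 0),
    // so the result may contain duplicates.
    val oversampled = nums.sample(withReplacement = true, fraction = 2.0, seed = 42L)
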
FaultToleranceTest.scala
@@ -33,6 +33,7 @@ import org.json4s.jackson.JsonMethods

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
import org.apache.spark.util.Utils

/**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
Expand Down Expand Up @@ -405,8 +406,7 @@ private object SparkDocker {

private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
val ipPromise = promise[String]()
val outFile = File.createTempFile("fault-tolerance-test", "")
outFile.deleteOnExit()
val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
val outStream: FileWriter = new FileWriter(outFile)
def findIpAndLog(line: String): Unit = {
if (line.startsWith("CONTAINER_IP=")) {
FsHistoryProvider.scala
@@ -91,7 +91,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def getRunner(operateFun: () => Unit): Runnable = {
new Runnable() {
override def run() = Utils.logUncaughtExceptions {
override def run() = Utils.tryOrExit {
operateFun()
}
}
@@ -238,7 +238,8 @@
} catch {
case e: Exception =>
logError(
s"Exception encountered when attempting to load application log ${fileStatus.getPath}")
s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
e)
None
}
}.toSeq.sortWith(compareAppInfo)
CommitDeniedException.scala
@@ -22,14 +22,12 @@ import org.apache.spark.{TaskCommitDenied, TaskEndReason}
/**
* Exception thrown when a task attempts to commit output to HDFS but is denied by the driver.
*/
class CommitDeniedException(
private[spark] class CommitDeniedException(
msg: String,
jobID: Int,
splitID: Int,
attemptID: Int)
extends Exception(msg) {

def toTaskEndReason: TaskEndReason = new TaskCommitDenied(jobID, splitID, attemptID)

def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptID)
}
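
A minimal usage sketch (Spark-internal: the class is now private[spark], so this only compiles inside the org.apache.spark package tree; the message and IDs are made up):

    val denied = new CommitDeniedException(
      "commit denied by driver", jobID = 2, splitID = 7, attemptID = 0)
    val endReason = denied.toTaskEndReason  // == TaskCommitDenied(2, 7, 0)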

(Diffs for the remaining changed files are not shown here.)
