Skip to content

Commit

Permalink
Fix supervise 2.2.0 (apache#131)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gschiavon authored and jlopezmalla committed Jan 23, 2018
1 parent 1f8a11f commit 78e0938
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 2.2.0.5 (upcoming)

* Fixed Supervise mode
* Separate stderr and stdout in dispatcher
* Fix history server stderr/stdout. It is now possible to set the log level through SPARK_LOG_LEVEL

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ package object config {
.stringConf
.createOptional

// Seconds the Mesos master waits to hear from the driver during a temporary
// disconnection before tearing down all executors (see .doc below). Default 0.0
// — NOTE(review): presumably the Mesos default of "tear down immediately";
// confirm against Mesos FrameworkInfo.failover_timeout semantics.
private[spark] val DRIVER_FAILOVER_TIMEOUT =
ConfigBuilder("spark.mesos.driver.failoverTimeout")
.doc("Amount of time in seconds that the master will wait to hear from the driver, " +
"during a temporary disconnection, before tearing down all the executors.")
.doubleConf
.createWithDefault(0.0)

private[spark] val DRIVER_CONSTRAINTS =
ConfigBuilder("spark.mesos.driver.constraints")
.doc("Attribute based constraints on mesos resource offers. Applied by the dispatcher " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,8 @@ private[spark] class MesosClusterScheduler(
}

private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
  // When the driver is being retried, append a "-retry-N" suffix so each
  // attempt registers under a distinct framework id.
  val retrySuffix = desc.retryState.fold("")(state => s"-retry-${state.retries}")
  s"$frameworkId-${desc.submissionId}$retrySuffix"
}

private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{Collections, List => JList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock

import scala.collection.JavaConverters._
Expand All @@ -29,6 +30,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver

import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointAddress
Expand Down Expand Up @@ -168,6 +170,15 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

override def start() {
super.start()

val startedBefore = IdHelper.startedBefore.getAndSet(true)

val suffix = if (startedBefore) {
f"-${IdHelper.nextSCNumber.incrementAndGet()}%04d"
} else {
""
}

val driver = createSchedulerDriver(
master,
MesosCoarseGrainedSchedulerBackend.this,
Expand All @@ -176,11 +187,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
None,
None,
sc.conf.getOption("spark.mesos.driver.frameworkId")
Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
)

unsetFrameworkID(sc)
startScheduler(driver)
}

Expand Down Expand Up @@ -722,3 +732,9 @@ private class Slave(val hostname: String) {
var taskFailures = 0
var shuffleRegistered = false
}

object IdHelper {
  // Atomic state: several Spark contexts may be initialized concurrently in
  // the same JVM, so plain vars would race.
  private[mesos] val startedBefore = new AtomicBoolean(false)
  private[mesos] val nextSCNumber = new AtomicLong(0L)
}

0 comments on commit 78e0938

Please sign in to comment.