Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
Andrew Or committed May 15, 2015
2 parents 0ca1801 + 8e3822a commit f6de871
Showing 26 changed files with 292 additions and 239 deletions.
@@ -186,8 +186,9 @@ function renderDagVizForJob(svgContainer) {
var stageId = metadata.attr("stage-id");
var containerId = VizConstants.graphPrefix + stageId;
// Link each graph to the corresponding stage page (TODO: handle stage attempts)
var stageLink = "/stages/stage/?id=" +
stageId.replace(VizConstants.stagePrefix, "") + "&attempt=0&expandDagViz=true";
var stageLink = $("#stage-" + stageId.replace(VizConstants.stagePrefix, "") + "-0")
.find("a")
.attr("href") + "&expandDagViz=true";
var container = svgContainer
.append("a")
.attr("xlink:href", stageLink)
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -371,6 +371,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
throw new SparkException("An application name must be set in your configuration")
}

// System property spark.yarn.app.id must be set if user code is run by the AM on a YARN cluster
// yarn-standalone is deprecated, but still supported
if ((master == "yarn-cluster" || master == "yarn-standalone") &&
!_conf.contains("spark.yarn.app.id")) {
throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
}

if (_conf.getBoolean("spark.logConf", false)) {
logInfo("Spark configuration:\n" + _conf.toDebugString)
}
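The guard above makes SparkContext fail fast when it is constructed directly with a yarn-cluster master: spark-submit (via the YARN ApplicationMaster) is what sets spark.yarn.app.id on the driver, so its absence means the context is not actually running on a cluster. A minimal sketch of what a driver program now sees; the app name and error handling are illustrative, not part of this commit:

import org.apache.spark.{SparkConf, SparkContext, SparkException}

// Constructing a SparkContext with a yarn-cluster master outside of spark-submit
// now throws immediately instead of failing later in a harder-to-diagnose way.
val conf = new SparkConf().setAppName("example").setMaster("yarn-cluster")
try {
  new SparkContext(conf) // spark.yarn.app.id is not set, so this throws SparkException
} catch {
  case e: SparkException => println("Rejected: " + e.getMessage)
}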
16 changes: 3 additions & 13 deletions core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -239,15 +239,6 @@ private[spark] object ClosureCleaner extends Logging {
logDebug(s" + fields accessed by starting closure: " + accessedFields.size)
accessedFields.foreach { f => logDebug(" " + f) }

val inInterpreter = {
try {
val interpClass = Class.forName("spark.repl.Main")
interpClass.getMethod("interp").invoke(null) != null
} catch {
case _: ClassNotFoundException => true
}
}

// List of outer (class, object) pairs, ordered from outermost to innermost
// Note that all outer objects but the outermost one (first one in this list) must be closures
var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
@@ -274,7 +265,7 @@
// required fields from the original object. We need the parent here because the Java
// language specification requires the first constructor parameter of any closure to be
// its enclosing object.
val clone = instantiateClass(cls, parent, inInterpreter)
val clone = instantiateClass(cls, parent)
for (fieldName <- accessedFields(cls)) {
val field = cls.getDeclaredField(fieldName)
field.setAccessible(true)
@@ -327,9 +318,8 @@

private def instantiateClass(
cls: Class[_],
enclosingObject: AnyRef,
inInterpreter: Boolean): AnyRef = {
if (!inInterpreter) {
enclosingObject: AnyRef): AnyRef = {
if (!Utils.isInInterpreter) {
// This is a bona fide closure class, whose constructor has no effects
// other than to set its fields, so use its constructor
val cons = cls.getConstructors()(0)
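With the REPL check gone from this file, instantiateClass only needs the class and its enclosing object; the comment above explains why the enclosing object matters (the compiler passes it as the closure's first constructor parameter). After instantiation, ClosureCleaner copies just the accessed fields onto the clone via reflection. A standalone sketch of that reflective field-copy pattern, with illustrative names (copyField and Box are not Spark APIs):

import java.lang.reflect.Field

// Copy one named field from `from` onto `to`, the same pattern as the
// getDeclaredField / setAccessible loop over accessedFields(cls) above.
def copyField(from: AnyRef, to: AnyRef, fieldName: String): Unit = {
  val field: Field = from.getClass.getDeclaredField(fieldName)
  field.setAccessible(true)
  field.set(to, field.get(from))
}

class Box { var value: Int = 0 }
val original = new Box
original.value = 42
val clone = new Box
copyField(original, clone, "value") // clone.value is now 42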
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1795,6 +1795,20 @@ private[spark] object Utils extends Logging {
}
}

lazy val isInInterpreter: Boolean = {
try {
val interpClass = classForName("spark.repl.Main")
interpClass.getMethod("interp").invoke(null) != null
} catch {
// Returning true seems to be a mistake.
// Currently changing it to false causes test failures in Streaming.
// For a more detailed discussion, please refer to
// https://github.com/apache/spark/pull/5835#issuecomment-101042271 and subsequent comments.
// Addressing this change is tracked as https://issues.apache.org/jira/browse/SPARK-7527
case _: ClassNotFoundException => true
}
}

/**
* Return a well-formed URI for the file described by a user input string.
*
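Hoisting the check into a lazy val means the reflective lookup of spark.repl.Main runs at most once per JVM rather than on every closure cleaning; note that, per the comment and SPARK-7527, the committed code deliberately returns true when the REPL class is missing. A sketch of the same detect-once pattern with a placeholder class name (com.example.SomeRepl is not a real class, and this sketch returns false on a miss, unlike the code above):

// Lazy vals are evaluated on first access and then cached, so the Class.forName
// probe runs at most once per JVM.
object ReplProbe {
  lazy val present: Boolean =
    try {
      Class.forName("com.example.SomeRepl") // placeholder class name
      true
    } catch {
      case _: ClassNotFoundException => false
    }
}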
10 changes: 8 additions & 2 deletions core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -165,6 +165,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
}

// Test that GC causes RDD cleanup after dereferencing the RDD
// Note rdd is used after previous GC to avoid early collection by the JVM
val postGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
rdd = null // Make RDD out of scope
runGC()
@@ -181,9 +182,9 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
}
rdd.count() // Defeat early collection by the JVM

// Test that GC causes shuffle cleanup after dereferencing the RDD
rdd.count() // Defeat any early collection of rdd variable by the JVM
val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
rdd = null // Make RDD out of scope, so that corresponding shuffle goes out of scope
runGC()
@@ -201,6 +202,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
}

// Test that GC causes broadcast cleanup after dereferencing the broadcast variable
// Note broadcast is used after previous GC to avoid early collection by the JVM
val postGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
broadcast = null // Make broadcast variable out of scope
runGC()
@@ -226,7 +228,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {

// the checkpoint is not cleaned by default (without the configuration set)
var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Nil)
rdd = null // Make RDD out of scope
rdd = null // Make RDD out of scope, ok if collected earlier
runGC()
postGCTester.assertCleanup()
assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))
@@ -245,6 +247,9 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
// Confirm the checkpoint directory exists
assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))

// Reference rdd to defeat any early collection by the JVM
rdd.count()

// Test that GC causes checkpoint data cleanup after dereferencing the RDD
postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
rdd = null // Make RDD out of scope
@@ -352,6 +357,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
}
rdd.count() // Defeat early collection by the JVM

// Test that GC causes shuffle cleanup after dereferencing the RDD
val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
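The rdd.count() calls and comments added in this suite all guard against the same thing: the JVM may treat a local reference as dead after its last use, so the RDD or broadcast can be collected before the pre-GC assertions run, making the test flaky. Touching the reference after runGC() keeps it reachable until the test nulls it out on purpose. A standalone sketch of the keep-alive idea (illustrative only, not the actual suite):

import java.lang.ref.WeakReference

var obj: Array[Byte] = new Array[Byte](1 << 20)
val weak = new WeakReference(obj)

System.gc()
println(obj.length)       // keep-alive use, mirroring rdd.count() in the tests
obj = null                // now drop the reference deliberately
System.gc()
println(weak.get == null) // likely true once the referent has been collected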
mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -273,7 +273,8 @@ class DenseMatrix(

override def copy: DenseMatrix = new DenseMatrix(numRows, numCols, values.clone())

private[mllib] def map(f: Double => Double) = new DenseMatrix(numRows, numCols, values.map(f))
private[mllib] def map(f: Double => Double) = new DenseMatrix(numRows, numCols, values.map(f),
isTransposed)

private[mllib] def update(f: Double => Double): DenseMatrix = {
val len = values.length
@@ -535,7 +536,7 @@ class SparseMatrix(
}

private[mllib] def map(f: Double => Double) =
new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.map(f))
new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.map(f), isTransposed)

private[mllib] def update(f: Double => Double): SparseMatrix = {
val len = values.length
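Both map methods now propagate isTransposed, so mapping a transposed matrix no longer reinterprets its values array in the wrong storage order. A standalone illustration of the bug shape (Mini is an invented class, not MLlib code; like MLlib's DenseMatrix, it is column-major unless isTransposed is set):

// Invented miniature of the layout: column-major by default, row-major when transposed.
case class Mini(numRows: Int, numCols: Int, values: Array[Double], isTransposed: Boolean = false) {
  def apply(i: Int, j: Int): Double =
    if (isTransposed) values(i * numCols + j) else values(j * numRows + i)
  def buggyMap(f: Double => Double): Mini = Mini(numRows, numCols, values.map(f))              // drops the flag
  def fixedMap(f: Double => Double): Mini = Mini(numRows, numCols, values.map(f), isTransposed) // keeps it
}

val t = Mini(2, 3, Array[Double](1, 2, 3, 4, 5, 6), isTransposed = true)
println(t(0, 1))                    // 2.0
println(t.buggyMap(identity)(0, 1)) // 3.0 -- the layout silently changed
println(t.fixedMap(identity)(0, 1)) // 2.0 -- matches the original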
6 changes: 6 additions & 0 deletions python/pyspark/mllib/clustering.py
@@ -212,6 +212,9 @@ def predict(self, x):
if isinstance(x, RDD):
cluster_labels = self.predictSoft(x).map(lambda z: z.index(max(z)))
return cluster_labels
else:
raise TypeError("x should be represented by an RDD, "
"but got %s." % type(x))

def predictSoft(self, x):
"""
@@ -225,6 +228,9 @@ def predictSoft(self, x):
membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector),
_convert_to_vector(self._weights), means, sigmas)
return membership_matrix.map(lambda x: pyarray.array('d', x))
else:
raise TypeError("x should be represented by an RDD, "
"but got %s." % type(x))


class GaussianMixture(object):
6 changes: 5 additions & 1 deletion sbin/start-master.sh
@@ -22,6 +22,8 @@
sbin="`dirname "$0"`"
sbin="`cd "$sbin"; pwd`"

ORIGINAL_ARGS="$@"

START_TACHYON=false

while (( "$#" )); do
@@ -53,7 +55,9 @@ if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
SPARK_MASTER_WEBUI_PORT=8080
fi

"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \
--ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
$ORIGINAL_ARGS

if [ "$START_TACHYON" == "true" ]; then
"$sbin"/../tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP
14 changes: 7 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.ParserDialect
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json._
import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -610,7 +610,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
} else if (conf.parquetUseDataSourceApi) {
val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
baseRelationToDataFrame(
new FSBasedParquetRelation(
new ParquetRelation2(
globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
} else {
DataFrame(this, parquet.ParquetRelation(
@@ -989,7 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def jdbc(url: String, table: String): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), new Properties())
}

/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1002,7 +1002,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), properties)
}

/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1020,7 +1020,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
@Experimental
def jdbc(
url: String,
table: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
@@ -1056,7 +1056,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
val parts = JDBCRelation.columnPartition(partitioning)
jdbc(url, table, parts, properties)
}

/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1093,7 +1093,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
jdbc(url, table, parts, properties)
}

private def jdbc(
url: String,
table: String,
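Besides the FSBasedParquetRelation to ParquetRelation2 rename and trailing-whitespace trims, the context lines show SQLContext's partitioned JDBC readers, which split a table scan into numPartitions ranges over columnName between lowerBound and upperBound via JDBCRelation.columnPartition. A standalone illustration of that slicing idea (slice is an invented helper, not Spark's JDBCRelation.columnPartition):

// Invented helper: split [lowerBound, upperBound) over `column` into contiguous
// WHERE clauses, one per partition, so each partition reads a disjoint slice.
def slice(column: String, lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[String] = {
  val stride = (upperBound - lowerBound) / numPartitions
  (0 until numPartitions).map { i =>
    val lo = lowerBound + i * stride
    val hi = if (i == numPartitions - 1) upperBound else lo + stride
    s"$column >= $lo AND $column < $hi"
  }
}

slice("id", 0L, 100L, 4).foreach(println)
// id >= 0 AND id < 25
// id >= 25 AND id < 50
// id >= 50 AND id < 75
// id >= 75 AND id < 100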