[SQL] Update SparkSQL and ScalaTest in branch-1.0 to match master.
#511 and #863 got left out of branch-1.0 since we were really close to the release. Now that they have been tested a little, I see no reason to leave them out.

Author: Michael Armbrust <[email protected]>
Author: witgo <[email protected]>

Closes #1078 from marmbrus/branch-1.0 and squashes the following commits:

22be674 [witgo]  [SPARK-1841]: update scalatest to version 2.1.5
fc8fc79 [Michael Armbrust] Include #1071 as well.
c5d0adf [Michael Armbrust] Update SparkSQL in branch-1.0 to match master.
marmbrus authored and rxin committed Jun 13, 2014
1 parent 00b4317 commit 7e3e9af
Showing 30 changed files with 1,039 additions and 254 deletions.
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -235,7 +235,7 @@
</dependency>
<dependency>
<groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
+ <artifactId>easymockclassextension</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -25,7 +25,7 @@ import scala.language.postfixOps
import scala.util.Random

import org.scalatest.{BeforeAndAfter, FunSuite}
- import org.scalatest.concurrent.Eventually
+ import org.scalatest.concurrent.{PatienceConfiguration, Eventually}
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

@@ -76,7 +76,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
tester.assertCleanup()

// Verify that shuffles can be re-executed after cleaning up
- assert(rdd.collect().toList === collected)
+ assert(rdd.collect().toList.equals(collected))
}

test("cleanup broadcast") {
@@ -285,7 +285,7 @@ class CleanerTester(
sc.cleaner.get.attachListener(cleanerListener)

/** Assert that all the stuff has been cleaned up */
- def assertCleanup()(implicit waitTimeout: Eventually.Timeout) {
+ def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
try {
eventually(waitTimeout, interval(100 millis)) {
assert(isAllCleanedUp)
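
The switch from Eventually.Timeout to PatienceConfiguration.Timeout reflects ScalaTest 2.x moving the timeout/interval configuration types into PatienceConfiguration. A minimal, self-contained sketch of the same pattern (hypothetical suite name, assuming scalatest 2.1.5 on the test classpath):

    import java.util.concurrent.atomic.AtomicBoolean

    import scala.language.postfixOps

    import org.scalatest.FunSuite
    import org.scalatest.concurrent.PatienceConfiguration
    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    class PatienceExampleSuite extends FunSuite {

      // Callers supply their own patience, mirroring assertCleanup() above.
      def awaitTrue(condition: => Boolean)
          (implicit waitTimeout: PatienceConfiguration.Timeout): Unit = {
        eventually(waitTimeout, interval(100 millis)) {
          assert(condition)
        }
      }

      test("waits for a background flag to flip") {
        val done = new AtomicBoolean(false)
        new Thread(new Runnable {
          def run(): Unit = { Thread.sleep(200); done.set(true) }
        }).start()
        awaitTrue(done.get)(timeout(10 seconds))
      }
    }
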
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -23,11 +23,11 @@ class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {

// This test suite should run all tests in ShuffleSuite with Netty shuffle mode.

- override def beforeAll(configMap: Map[String, Any]) {
+ override def beforeAll() {
System.setProperty("spark.shuffle.use.netty", "true")
}

- override def afterAll(configMap: Map[String, Any]) {
+ override def afterAll() {
System.setProperty("spark.shuffle.use.netty", "false")
}
}
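
ScalaTest 2.x no longer declares the 1.x beforeAll(configMap: Map[String, Any]) / afterAll(configMap: Map[String, Any]) signatures in BeforeAndAfterAll (the config-map variants now take a ConfigMap), which is why these overrides drop the parameter. A minimal sketch of the no-argument form, with a hypothetical suite name:

    import org.scalatest.{BeforeAndAfterAll, FunSuite}

    class NettyFlagExampleSuite extends FunSuite with BeforeAndAfterAll {

      // Runs once before any test in this suite.
      override def beforeAll(): Unit = {
        System.setProperty("spark.shuffle.use.netty", "true")
      }

      // Runs once after all tests in this suite.
      override def afterAll(): Unit = {
        System.setProperty("spark.shuffle.use.netty", "false")
      }

      test("the flag is set while the suite runs") {
        assert(System.getProperty("spark.shuffle.use.netty") == "true")
      }
    }
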
5 changes: 3 additions & 2 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -275,8 +275,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {

// we can optionally shuffle to keep the upstream parallel
val coalesced5 = data.coalesce(1, shuffle = true)
- assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
-   null)
+ val isEquals = coalesced5.dependencies.head.rdd.dependencies.head.rdd.
+   asInstanceOf[ShuffledRDD[_, _, _]] != null
+ assert(isEquals)

// when shuffling, we can increase the number of partitions
val coalesced6 = data.coalesce(20, shuffle = true)
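
This hunk, like the ones in TimeStampedHashMapSuite and ReplSuite further down, hoists a multi-line expression into a val and asserts on the resulting Boolean. ScalaTest 2.x implements assert as a macro that re-inspects the asserted expression to build richer failure messages; keeping that expression trivial appears to be the motivation here, though the commit message does not say so. The pattern in isolation:

    // Same shape as the change above: evaluate the complex, multi-line expression into a val,
    // then assert on the plain Boolean result.
    val nested: Any = List(List(1, 2), List(3, 4))
    val isEquals = nested.asInstanceOf[List[List[Int]]].
      flatten.sum == 10
    assert(isEquals)
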
@@ -23,7 +23,7 @@ import scala.language.reflectiveCalls

import akka.actor._
import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
- import org.scalatest.{BeforeAndAfter, FunSuite}
+ import org.scalatest.{BeforeAndAfter, FunSuiteLike}

import org.apache.spark._
import org.apache.spark.rdd.RDD
@@ -37,7 +37,7 @@ class BuggyDAGEventProcessActor extends Actor {
}
}

- class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuite
+ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
with ImplicitSender with BeforeAndAfter with LocalSparkContext {

val conf = new SparkConf
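
FunSuite became a class in ScalaTest 2.0, with its reusable behaviour extracted into the FunSuiteLike trait; since TestKit is itself a class and Scala allows only one superclass, the suite now mixes in FunSuiteLike. A stripped-down sketch of the same arrangement (hypothetical suite name, assuming akka-testkit on the test classpath):

    import akka.actor.ActorSystem
    import akka.testkit.TestKit
    import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}

    // TestKit takes the single superclass slot, so the ScalaTest style comes in as a trait.
    class ActorExampleSuite extends TestKit(ActorSystem("ActorExampleSuite"))
      with FunSuiteLike with BeforeAndAfterAll {

      override def afterAll(): Unit = {
        system.shutdown()  // stop the ActorSystem created for this suite
      }

      test("the test kit exposes its actor system") {
        assert(system.name == "ActorExampleSuite")
      }
    }
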
@@ -105,7 +105,8 @@ class TimeStampedHashMapSuite extends FunSuite {
map("k1") = strongRef
map("k2") = "v2"
map("k3") = "v3"
- assert(map("k1") === strongRef)
+ val isEquals = map("k1") == strongRef
+ assert(isEquals)

// clear strong reference to "k1"
strongRef = null
15 changes: 11 additions & 4 deletions pom.xml
@@ -458,25 +458,31 @@
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
- <version>1.9.1</version>
+ <version>2.1.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
+ <artifactId>easymockclassextension</artifactId>
<version>3.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
- <version>1.8.5</version>
+ <version>1.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
- <version>1.10.0</version>
+ <version>1.11.3</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
@@ -778,6 +784,7 @@
<arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
+ <arg>-language:postfixOps</arg>
</args>
<jvmArgs>
<jvmArg>-Xms1024m</jvmArg>
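
The new -language:postfixOps compiler flag matters for the test changes in this same commit: duration literals such as 100 millis (ScalaTest's SpanSugar) use postfix method syntax, which the existing -feature flag otherwise reports with a warning. The per-file equivalent looks like this (illustrative object name):

    import scala.language.postfixOps        // per-file alternative to the -language:postfixOps flag

    import org.scalatest.time.Span
    import org.scalatest.time.SpanSugar._

    object PostfixDurations {
      val pollInterval: Span = 100 millis   // postfix call: no dot, no parentheses
      val patience: Span = 10 seconds
    }
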
22 changes: 11 additions & 11 deletions project/SparkBuild.scala
@@ -270,16 +270,17 @@ object SparkBuild extends Build {
*/

libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.17.Final",
"org.eclipse.jetty" % "jetty-server" % jettyVersion,
"org.eclipse.jetty" % "jetty-util" % jettyVersion,
"org.eclipse.jetty" % "jetty-plus" % jettyVersion,
"org.eclipse.jetty" % "jetty-security" % jettyVersion,
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymock" % "3.1" % "test",
"org.mockito" % "mockito-all" % "1.8.5" % "test"
"io.netty" % "netty-all" % "4.0.17.Final",
"org.eclipse.jetty" % "jetty-server" % jettyVersion,
"org.eclipse.jetty" % "jetty-util" % jettyVersion,
"org.eclipse.jetty" % "jetty-plus" % jettyVersion,
"org.eclipse.jetty" % "jetty-security" % jettyVersion,
"org.scalatest" %% "scalatest" % "2.1.5" % "test",
"org.scalacheck" %% "scalacheck" % "1.11.3" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymockclassextension" % "3.1" % "test",
"org.mockito" % "mockito-all" % "1.9.0" % "test",
"junit" % "junit" % "4.10" % "test"
),

testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
@@ -476,7 +477,6 @@ object SparkBuild extends Build {
// this non-deterministically. TODO: FIX THIS.
parallelExecution in Test := false,
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"com.typesafe" %% "scalalogging-slf4j" % "1.0.1"
)
)
6 changes: 4 additions & 2 deletions repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -57,12 +57,14 @@ class ReplSuite extends FunSuite {
}

def assertContains(message: String, output: String) {
- assert(output.contains(message),
+ val isContain = output.contains(message)
+ assert(isContain,
"Interpreter output did not contain '" + message + "':\n" + output)
}

def assertDoesNotContain(message: String, output: String) {
- assert(!output.contains(message),
+ val isContain = output.contains(message)
+ assert(!isContain,
"Interpreter output contained '" + message + "':\n" + output)
}

@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical

- import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
import org.apache.spark.sql.catalyst.types.StringType

/**
@@ -26,35 +26,37 @@ import org.apache.spark.sql.catalyst.types.StringType
*/
abstract class Command extends LeafNode {
self: Product =>
- def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
+ def output: Seq[Attribute] = Seq.empty
}

/**
* Returned for commands supported by a given parser, but not catalyst. In general these are DDL
* commands that are passed directly to another system.
*/
- case class NativeCommand(cmd: String) extends Command
+ case class NativeCommand(cmd: String) extends Command {
+ override def output =
+ Seq(BoundReference(0, AttributeReference("result", StringType, nullable = false)()))
+ }

/**
* Commands of the form "SET (key) (= value)".
*/
case class SetCommand(key: Option[String], value: Option[String]) extends Command {
override def output = Seq(
AttributeReference("key", StringType, nullable = false)(),
AttributeReference("value", StringType, nullable = false)()
)
BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
}

/**
* Returned by a parser when the users only wants to see what query plan would be executed, without
* actually performing the execution.
*/
case class ExplainCommand(plan: LogicalPlan) extends Command {
- override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+ override def output =
+ Seq(BoundReference(0, AttributeReference("plan", StringType, nullable = false)()))
}

/**
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
*/
case class CacheCommand(tableName: String, doCache: Boolean) extends Command
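
The commands now describe their output with BoundReference, which pairs an attribute with a fixed ordinal so that expressions over a command's result rows can read values by position. The Catalyst classes are not reproduced here; the following standalone toy only models that ordinal-binding idea (all names are illustrative, not Spark APIs):

    // Toy model: a "bound" attribute remembers its column position and reads rows by ordinal.
    final case class Attr(name: String)
    final case class Bound(ordinal: Int, attr: Attr) {
      def eval(row: IndexedSeq[Any]): Any = row(ordinal)
    }

    object BoundOutputExample extends App {
      // A SET-style command could describe its result as two bound columns:
      val output = Seq(Bound(0, Attr("key")), Bound(1, Attr("value")))
      val row = Vector("spark.sql.shuffle.partitions", "10")
      val pairs = output.map(b => b.attr.name -> b.eval(row))
      println(pairs)  // List((key,spark.sql.shuffle.partitions), (value,10))
    }
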

@@ -161,7 +161,7 @@ class FilterPushdownSuite extends OptimizerTest {

comparePlans(optimized, correctAnswer)
}

test("joins: push down left outer join #1") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
45 changes: 10 additions & 35 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
- import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
+ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

import org.apache.spark.sql.columnar.InMemoryRelation
@@ -147,14 +147,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* @group userf
*/
- def sql(sqlText: String): SchemaRDD = {
- val result = new SchemaRDD(this, parseSql(sqlText))
- // We force query optimization to happen right away instead of letting it happen lazily like
- // when using the query DSL. This is so DDL commands behave as expected. This is only
- // generates the RDD lineage for DML queries, but do not perform any execution.
- result.queryExecution.toRdd
- result
- }
+ def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))

/** Returns the specified table as a SchemaRDD */
def table(tableName: String): SchemaRDD =
@@ -220,17 +213,21 @@
* final desired output requires complex expressions to be evaluated or when columns can be
* further eliminated out after filtering has been done.
*
+ * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
+ * away by the filter pushdown optimization.
+ *
* The required attributes for both filtering and expression evaluation are passed to the
* provided `scanBuilder` function so that it can avoid unnecessary column materialization.
*/
def pruneFilterProject(
projectList: Seq[NamedExpression],
filterPredicates: Seq[Expression],
+ prunePushedDownFilters: Seq[Expression] => Seq[Expression],
scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {

val projectSet = projectList.flatMap(_.references).toSet
val filterSet = filterPredicates.flatMap(_.references).toSet
- val filterCondition = filterPredicates.reduceLeftOption(And)
+ val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And)

// Right now we still use a projection even if the only evaluation is applying an alias
// to a column. Since this is a no-op, it could be avoided. However, using this
@@ -255,8 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] val planner = new SparkPlanner

@transient
- protected[sql] lazy val emptyResult =
- sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
+ protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1)

/**
* Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
@@ -276,35 +272,14 @@
protected abstract class QueryExecution {
def logical: LogicalPlan

- def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
- case SetCommand(key, value) =>
- // Only this case needs to be executed eagerly. The other cases will
- // be taken care of when the actual results are being extracted.
- // In the case of HiveContext, sqlConf is overridden to also pass the
- // pair into its HiveConf.
- if (key.isDefined && value.isDefined) {
- set(key.get, value.get)
- }
- // It doesn't matter what we return here, since this is only used
- // to force the evaluation to happen eagerly. To query the results,
- // one must use SchemaRDD operations to extract them.
- emptyResult
- case _ => executedPlan.execute()
- }

lazy val analyzed = analyzer(logical)
lazy val optimizedPlan = optimizer(analyzed)
// TODO: Don't just pick the first one...
lazy val sparkPlan = planner(optimizedPlan).next()
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

/** Internal version of the RDD. Avoids copies and has no schema */
- lazy val toRdd: RDD[Row] = {
- logical match {
- case s: SetCommand => eagerlyProcess(s)
- case _ => executedPlan.execute()
- }
- }
+ lazy val toRdd: RDD[Row] = executedPlan.execute()

protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
@@ -326,7 +301,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* TODO: We only support primitive types, add support for nested types.
*/
private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
- val schema = rdd.first.map { case (fieldName, obj) =>
+ val schema = rdd.first().map { case (fieldName, obj) =>
val dataType = obj.getClass match {
case c: Class[_] if c == classOf[java.lang.String] => StringType
case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
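
The new prunePushedDownFilters hook lets a planning strategy drop predicates that the underlying scan already evaluates (filter pushdown, e.g. into a Parquet scan), so only the remainder is turned into a Filter above the scan. A Catalyst-free toy model of that flow, with illustrative names rather than Spark APIs:

    object PruneFilterExample extends App {
      final case class Predicate(column: String, value: Int)

      // Mirrors the new parameter: a strategy passes in a function that removes the
      // predicates its scan already evaluates; only the rest becomes a Filter condition.
      def remainingFilter(
          filterPredicates: Seq[Predicate],
          prunePushedDownFilters: Seq[Predicate] => Seq[Predicate]): Option[Seq[Predicate]] = {
        val remaining = prunePushedDownFilters(filterPredicates)
        if (remaining.isEmpty) None else Some(remaining)
      }

      // A hypothetical scan that can evaluate predicates on the "year" column by itself:
      val pushedIntoScan: Seq[Predicate] => Seq[Predicate] = _.filterNot(_.column == "year")

      val leftOver = remainingFilter(Seq(Predicate("year", 2014), Predicate("id", 7)), pushedIntoScan)
      println(leftOver)  // Some(List(Predicate(id,7))): only the un-pushed predicate is re-checked
    }
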
@@ -97,7 +97,7 @@ import java.util.{Map => JMap}
@AlphaComponent
class SchemaRDD(
@transient val sqlContext: SQLContext,
- @transient protected[spark] val logicalPlan: LogicalPlan)
+ @transient val baseLogicalPlan: LogicalPlan)
extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {

def baseSchemaRDD = this
15 changes: 13 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -20,13 +20,14 @@ package org.apache.spark.sql
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._
+ import org.apache.spark.sql.execution.SparkLogicalPlan

/**
* Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java)
*/
private[sql] trait SchemaRDDLike {
@transient val sqlContext: SQLContext
- @transient protected[spark] val logicalPlan: LogicalPlan
+ @transient val baseLogicalPlan: LogicalPlan

private[sql] def baseSchemaRDD: SchemaRDD

@@ -48,7 +49,17 @@
*/
@transient
@DeveloperApi
- lazy val queryExecution = sqlContext.executePlan(logicalPlan)
+ lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)

+ @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
+ // For various commands (like DDL) and queries with side effects, we force query optimization to
+ // happen right away to let these side effects take place eagerly.
+ case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+ queryExecution.toRdd
+ SparkLogicalPlan(queryExecution.executedPlan)
+ case _ =>
+ baseLogicalPlan
+ }

override def toString =
s"""${super.toString}
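
Taken together with the SQLContext change above, sql() no longer forces execution itself; instead SchemaRDDLike eagerly runs commands and other side-effecting plans when the SchemaRDD is constructed, while ordinary queries stay lazy until an action runs. A rough usage sketch against the 1.0 API shown in this diff (local master, hypothetical people table):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object EagerCommandExample extends App {
      val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("eager-vs-lazy"))
      val sqlContext = new SQLContext(sc)
      import sqlContext._

      case class Person(name: String, age: Int)
      sc.parallelize(Seq(Person("Ann", 34), Person("Bo", 12))).registerAsTable("people")

      // A command such as SET takes effect as soon as the SchemaRDD is created...
      sql("SET spark.sql.shuffle.partitions=10")

      // ...while a query only builds its plan here and executes on an action.
      val adults = sql("SELECT name FROM people WHERE age >= 18")
      println(adults.collect().mkString(", "))

      sc.stop()
    }
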
