From e248328b39f52073422a12fd0388208de41be1c7 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 29 Aug 2014 11:47:49 -0700 Subject: [PATCH 01/15] [SPARK-3307] [PySpark] Fix doc string of SparkContext.broadcast() remove invalid docs Author: Davies Liu Closes #2202 from davies/keep and squashes the following commits: aa3b44f [Davies Liu] remove invalid docs --- python/pyspark/context.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index a90870ed3a353..82f76de31afc1 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -568,8 +568,6 @@ def broadcast(self, value): L{Broadcast} object for reading it in distributed functions. The variable will be sent to each cluster only once. - - :keep: Keep the `value` in driver or not. """ ser = CompressedSerializer(PickleSerializer()) # pass large object by py4j is very slow and need much memory From 53aa8316e88980c6f46d3b9fc90d935a4738a370 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Fri, 29 Aug 2014 15:23:32 -0700 Subject: [PATCH 02/15] [Docs] SQL doc formatting and typo fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As [reported on the dev list](http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Release-Apache-Spark-1-1-0-RC2-tp8107p8131.html): * Code fencing with triple-backticks doesn’t seem to work like it does on GitHub. Newlines are lost. Instead, use 4-space indent to format small code blocks. * Nested bullets need 2 leading spaces, not 1. * Spellcheck! Author: Nicholas Chammas Author: nchammas Closes #2201 from nchammas/sql-doc-fixes and squashes the following commits: 873f889 [Nicholas Chammas] [Docs] fix skip-api flag 5195e0c [Nicholas Chammas] [Docs] SQL doc formatting and typo fixes 3b26c8d [nchammas] [Spark QA] Link to console output on test time out --- docs/README.md | 2 +- docs/sql-programming-guide.md | 109 ++++++++++++++++------------------ 2 files changed, 52 insertions(+), 59 deletions(-) diff --git a/docs/README.md b/docs/README.md index fd7ba4e0d72ea..0a0126c5747d1 100644 --- a/docs/README.md +++ b/docs/README.md @@ -30,7 +30,7 @@ called `_site` containing index.html as well as the rest of the compiled files. You can modify the default Jekyll build as follows: # Skip generating API docs (which takes a while) - $ SKIP_SCALADOC=1 jekyll build + $ SKIP_API=1 jekyll build # Serve content locally on port 4000 $ jekyll serve --watch # Build the site with extra features used on the live page diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index c41f2804a6021..8f7fb5431cfb6 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -474,10 +474,10 @@ anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD) Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/). However, since Hive has a large number of dependencies, it is not included in the default Spark assembly. -In order to use Hive you must first run '`sbt/sbt -Phive assembly/assembly`' (or use `-Phive` for maven). +In order to use Hive you must first run "`sbt/sbt -Phive assembly/assembly`" (or use `-Phive` for maven). This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries -(SerDes) in order to acccess data stored in Hive. +(SerDes) in order to access data stored in Hive. 
Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. @@ -576,9 +576,8 @@ evaluated by the SQL execution engine. A full list of the functions supported c ## Running the Thrift JDBC server -The Thrift JDBC server implemented here corresponds to the [`HiveServer2`] -(https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2) in Hive 0.12. You can test -the JDBC server with the beeline script comes with either Spark or Hive 0.12. +The Thrift JDBC server implemented here corresponds to the [`HiveServer2`](https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2) +in Hive 0.12. You can test the JDBC server with the beeline script comes with either Spark or Hive 0.12. To start the JDBC server, run the following in the Spark directory: @@ -597,7 +596,7 @@ Connect to the JDBC server in beeline with: Beeline will ask you for a username and password. In non-secure mode, simply enter the username on your machine and a blank password. For secure mode, please follow the instructions given in the -[beeline documentation](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients) +[beeline documentation](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients). Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. @@ -616,11 +615,10 @@ In Shark, default reducer number is 1 and is controlled by the property `mapred. SQL deprecates this property by a new property `spark.sql.shuffle.partitions`, whose default value is 200. Users may customize this property via `SET`: -``` -SET spark.sql.shuffle.partitions=10; -SELECT page, count(*) c FROM logs_last_month_cached -GROUP BY page ORDER BY c DESC LIMIT 10; -``` + SET spark.sql.shuffle.partitions=10; + SELECT page, count(*) c + FROM logs_last_month_cached + GROUP BY page ORDER BY c DESC LIMIT 10; You may also put this property in `hive-site.xml` to override the default value. @@ -630,22 +628,18 @@ For now, the `mapred.reduce.tasks` property is still recognized, and is converte #### Caching The `shark.cache` table property no longer exists, and tables whose name end with `_cached` are no -longer automcatically cached. Instead, we provide `CACHE TABLE` and `UNCACHE TABLE` statements to +longer automatically cached. Instead, we provide `CACHE TABLE` and `UNCACHE TABLE` statements to let user control table caching explicitly: -``` -CACHE TABLE logs_last_month; -UNCACHE TABLE logs_last_month; -``` + CACHE TABLE logs_last_month; + UNCACHE TABLE logs_last_month; -**NOTE** `CACHE TABLE tbl` is lazy, it only marks table `tbl` as "need to by cached if necessary", +**NOTE:** `CACHE TABLE tbl` is lazy, it only marks table `tbl` as "need to by cached if necessary", but doesn't actually cache it until a query that touches `tbl` is executed. To force the table to be cached, you may simply count the table immediately after executing `CACHE TABLE`: -``` -CACHE TABLE logs_last_month; -SELECT COUNT(1) FROM logs_last_month; -``` + CACHE TABLE logs_last_month; + SELECT COUNT(1) FROM logs_last_month; Several caching related features are not supported yet: @@ -655,7 +649,7 @@ Several caching related features are not supported yet: ### Compatibility with Apache Hive -#### Deploying in Exising Hive Warehouses +#### Deploying in Existing Hive Warehouses Spark SQL Thrift JDBC server is designed to be "out of the box" compatible with existing Hive installations. 
You do not need to modify your existing Hive Metastore or change the data placement @@ -666,50 +660,50 @@ or partitioning of your tables. Spark SQL supports the vast majority of Hive features, such as: * Hive query statements, including: - * `SELECT` - * `GROUP BY - * `ORDER BY` - * `CLUSTER BY` - * `SORT BY` + * `SELECT` + * `GROUP BY` + * `ORDER BY` + * `CLUSTER BY` + * `SORT BY` * All Hive operators, including: - * Relational operators (`=`, `⇔`, `==`, `<>`, `<`, `>`, `>=`, `<=`, etc) - * Arthimatic operators (`+`, `-`, `*`, `/`, `%`, etc) - * Logical operators (`AND`, `&&`, `OR`, `||`, etc) - * Complex type constructors - * Mathemtatical functions (`sign`, `ln`, `cos`, etc) - * String functions (`instr`, `length`, `printf`, etc) + * Relational operators (`=`, `⇔`, `==`, `<>`, `<`, `>`, `>=`, `<=`, etc) + * Arithmetic operators (`+`, `-`, `*`, `/`, `%`, etc) + * Logical operators (`AND`, `&&`, `OR`, `||`, etc) + * Complex type constructors + * Mathematical functions (`sign`, `ln`, `cos`, etc) + * String functions (`instr`, `length`, `printf`, etc) * User defined functions (UDF) * User defined aggregation functions (UDAF) -* User defined serialization formats (SerDe's) +* User defined serialization formats (SerDes) * Joins - * `JOIN` - * `{LEFT|RIGHT|FULL} OUTER JOIN` - * `LEFT SEMI JOIN` - * `CROSS JOIN` + * `JOIN` + * `{LEFT|RIGHT|FULL} OUTER JOIN` + * `LEFT SEMI JOIN` + * `CROSS JOIN` * Unions -* Sub queries - * `SELECT col FROM ( SELECT a + b AS col from t1) t2` +* Sub-queries + * `SELECT col FROM ( SELECT a + b AS col from t1) t2` * Sampling * Explain * Partitioned tables * All Hive DDL Functions, including: - * `CREATE TABLE` - * `CREATE TABLE AS SELECT` - * `ALTER TABLE` + * `CREATE TABLE` + * `CREATE TABLE AS SELECT` + * `ALTER TABLE` * Most Hive Data types, including: - * `TINYINT` - * `SMALLINT` - * `INT` - * `BIGINT` - * `BOOLEAN` - * `FLOAT` - * `DOUBLE` - * `STRING` - * `BINARY` - * `TIMESTAMP` - * `ARRAY<>` - * `MAP<>` - * `STRUCT<>` + * `TINYINT` + * `SMALLINT` + * `INT` + * `BIGINT` + * `BOOLEAN` + * `FLOAT` + * `DOUBLE` + * `STRING` + * `BINARY` + * `TIMESTAMP` + * `ARRAY<>` + * `MAP<>` + * `STRUCT<>` #### Unsupported Hive Functionality @@ -749,8 +743,7 @@ releases of Spark SQL. Hive automatically converts the join into a map join. We are adding this auto conversion in the next release. * Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you - need to control the degree of parallelism post-shuffle using "SET - spark.sql.shuffle.partitions=[num_tasks];". We are going to add auto-setting of parallelism in the + need to control the degree of parallelism post-shuffle using "`SET spark.sql.shuffle.partitions=[num_tasks];`". We are going to add auto-setting of parallelism in the next release. * Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still launches tasks to compute the result. From 2f1519defaba4f3c7d536669f909bfd9e13e4069 Mon Sep 17 00:00:00 2001 From: William Benton Date: Fri, 29 Aug 2014 15:26:59 -0700 Subject: [PATCH 03/15] SPARK-2813: [SQL] Implement SQRT() directly in Spark SQL This PR adds a native implementation for SQL SQRT() and thus avoids delegating this function to Hive. 
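For illustration only, a short usage sketch mirroring the test cases added further down in this patch. It assumes a `sqlContext` handle and a registered `testData` table with an integer `key` column, as in the Spark SQL test suites; it is not part of the patch itself.

    // SQRT is now parsed by SqlParser and evaluated natively by Catalyst,
    // instead of being delegated to a Hive UDF.
    val roots = sqlContext.sql("SELECT SQRT(key) FROM testData").collect()

    // String arguments are coerced to DOUBLE first, and NULL inputs return NULL.
    val coerced = sqlContext.sql("SELECT SQRT(CAST(key AS STRING)) FROM testData").collect()
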
Author: William Benton Closes #1750 from willb/spark-2813 and squashes the following commits: 22c8a79 [William Benton] Fixed missed newline from rebase d673861 [William Benton] Added string coercions for SQRT and associated test case e125df4 [William Benton] Added ExpressionEvaluationSuite test cases for SQRT 7b84bcd [William Benton] SQL SQRT now properly returns NULL for NULL inputs 8256971 [William Benton] added SQRT test to SqlQuerySuite 504d2e5 [William Benton] Added native SQRT implementation --- .../org/apache/spark/sql/catalyst/SqlParser.scala | 2 ++ .../sql/catalyst/analysis/HiveTypeCoercion.scala | 2 ++ .../sql/catalyst/expressions/arithmetic.scala | 13 +++++++++++++ .../expressions/ExpressionEvaluationSuite.scala | 13 +++++++++++++ .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 14 ++++++++++++++ .../scala/org/apache/spark/sql/hive/HiveQl.scala | 2 ++ 6 files changed, 46 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 2c73a80f64ebf..4f166c06b6997 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -122,6 +122,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val EXCEPT = Keyword("EXCEPT") protected val SUBSTR = Keyword("SUBSTR") protected val SUBSTRING = Keyword("SUBSTRING") + protected val SQRT = Keyword("SQRT") // Use reflection to find the reserved words defined in this class. protected val reservedWords = @@ -323,6 +324,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { (SUBSTR | SUBSTRING) ~> "(" ~> expression ~ "," ~ expression ~ "," ~ expression <~ ")" ^^ { case s ~ "," ~ p ~ "," ~ l => Substring(s,p,l) } | + SQRT ~> "(" ~> expression <~ ")" ^^ { case exp => Sqrt(exp) } | ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ { case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index 15eb5982a4a91..ecfcd62d2063f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -227,6 +227,8 @@ trait HiveTypeCoercion { Sum(Cast(e, DoubleType)) case Average(e) if e.dataType == StringType => Average(Cast(e, DoubleType)) + case Sqrt(e) if e.dataType == StringType => + Sqrt(Cast(e, DoubleType)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala index aae86a3628be1..56f042891a2e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala @@ -33,6 +33,19 @@ case class UnaryMinus(child: Expression) extends UnaryExpression { } } +case class Sqrt(child: Expression) extends UnaryExpression { + type EvaluatedType = Any + + def dataType = child.dataType + override def foldable = child.foldable + def nullable = child.nullable + override def toString = s"SQRT($child)" + + override def eval(input: Row): Any = { + n1(child, input, ((na,a) => math.sqrt(na.toDouble(a)))) + } +} + 
abstract class BinaryArithmetic extends BinaryExpression { self: Product => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala index f1df817c41362..b961346dfc995 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala @@ -577,4 +577,17 @@ class ExpressionEvaluationSuite extends FunSuite { checkEvaluation(s.substring(0, 2), "ex", row) checkEvaluation(s.substring(0), "example", row) } + + test("SQRT") { + val inputSequence = (1 to (1<<24) by 511).map(_ * (1L<<24)) + val expectedResults = inputSequence.map(l => math.sqrt(l.toDouble)) + val rowSequence = inputSequence.map(l => new GenericRow(Array[Any](l.toDouble))) + val d = 'a.double.at(0) + + for ((row, expected) <- rowSequence zip expectedResults) { + checkEvaluation(Sqrt(d), expected, row) + } + + checkEvaluation(Sqrt(Literal(null, DoubleType)), null, new GenericRow(Array[Any](null))) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 9b2a36d33fca7..4047bc0672bbb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -34,6 +34,20 @@ class SQLQuerySuite extends QueryTest { "test") } + test("SQRT") { + checkAnswer( + sql("SELECT SQRT(key) FROM testData"), + (1 to 100).map(x => Row(math.sqrt(x.toDouble))).toSeq + ) + } + + test("SQRT with automatic string casts") { + checkAnswer( + sql("SELECT SQRT(CAST(key AS STRING)) FROM testData"), + (1 to 100).map(x => Row(math.sqrt(x.toDouble))).toSeq + ) + } + test("SPARK-2407 Added Parser of SQL SUBSTR()") { checkAnswer( sql("SELECT substr(tableName, 1, 2) FROM tableName"), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index fa3adfdf5855c..a4dd6be5f9e35 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -889,6 +889,7 @@ private[hive] object HiveQl { val WHEN = "(?i)WHEN".r val CASE = "(?i)CASE".r val SUBSTR = "(?i)SUBSTR(?:ING)?".r + val SQRT = "(?i)SQRT".r protected def nodeToExpr(node: Node): Expression = node match { /* Attribute References */ @@ -958,6 +959,7 @@ private[hive] object HiveQl { case Token(DIV(), left :: right:: Nil) => Cast(Divide(nodeToExpr(left), nodeToExpr(right)), LongType) case Token("%", left :: right:: Nil) => Remainder(nodeToExpr(left), nodeToExpr(right)) + case Token("TOK_FUNCTION", Token(SQRT(), Nil) :: arg :: Nil) => Sqrt(nodeToExpr(arg)) /* Comparisons */ case Token("=", left :: right:: Nil) => EqualTo(nodeToExpr(left), nodeToExpr(right)) From 287c0ac7722dd4bc51b921ccc6f0e3c1625b5ff4 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 29 Aug 2014 15:29:43 -0700 Subject: [PATCH 04/15] [SPARK-3234][Build] Fixed environment variables that rely on deprecated command line options in make-distribution.sh Please refer to [SPARK-3234](https://issues.apache.org/jira/browse/SPARK-3234) for details. 
Author: Cheng Lian Closes #2208 from liancheng/spark-3234 and squashes the following commits: fb26de8 [Cheng Lian] Fixed SPARK-3234 --- make-distribution.sh | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/make-distribution.sh b/make-distribution.sh index f7a6a9d838bb6..ee1399071112d 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -113,7 +113,17 @@ if ! which mvn &>/dev/null; then echo -e "Download Maven from https://maven.apache.org/" exit -1; fi + VERSION=$(mvn help:evaluate -Dexpression=project.version 2>/dev/null | grep -v "INFO" | tail -n 1) +SPARK_HADOOP_VERSION=$(mvn help:evaluate -Dexpression=hadoop.version $@ 2>/dev/null\ + | grep -v "INFO"\ + | tail -n 1) +SPARK_HIVE=$(mvn help:evaluate -Dexpression=project.activeProfiles $@ 2>/dev/null\ + | grep -v "INFO"\ + | fgrep --count "hive";\ + # Reset exit status to 0, otherwise the script stops here if the last grep finds nothing\ + # because we use "set -o pipefail" + echo -n) JAVA_CMD="$JAVA_HOME"/bin/java JAVA_VERSION=$("$JAVA_CMD" -version 2>&1) @@ -175,7 +185,7 @@ cp "$FWDIR"/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/" mkdir -p "$DISTDIR/examples/src/main" cp -r "$FWDIR"/examples/src/main "$DISTDIR/examples/src/" -if [ "$SPARK_HIVE" == "true" ]; then +if [ "$SPARK_HIVE" == "1" ]; then cp "$FWDIR"/lib_managed/jars/datanucleus*.jar "$DISTDIR/lib/" fi From dc4d577c6549df58f42c0e22cac354554d169896 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Fri, 29 Aug 2014 15:32:26 -0700 Subject: [PATCH 05/15] [SPARK-3198] [SQL] Remove the TreeNode.id Thus id property of the TreeNode API does save time in a faster way to compare 2 TreeNodes, it is kind of performance bottleneck during the expression object creation in a multi-threading env (because of the memory barrier). Fortunately, the tree node comparison only happen once in master, so even we remove it, the entire performance will not be affected. 
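To make the replacement concrete, here is a small self-contained sketch of the reference-equality wrapper idea that the patch introduces as `TreeNodeRef`. The `NodeRef` name and the String-based demo are illustrative only; they are not the Catalyst class.

    // Instead of stamping every tree node with a global AtomicLong id (which serializes
    // node construction on a memory barrier), wrap nodes so that hash-based collections
    // key on object identity rather than structural equality.
    class NodeRef[T <: AnyRef](val node: T) {
      override def equals(other: Any): Boolean = other match {
        case that: NodeRef[_] => that.node eq node
        case _                => false
      }
      // (The actual TreeNodeRef in this patch reuses the wrapped node's own hashCode.)
      override def hashCode: Int = System.identityHashCode(node)
    }

    val a = new String("plan")
    val b = new String("plan")                    // structurally equal to `a`, different instance
    val seen = Map(new NodeRef(a) -> "partial result")

    assert(seen.contains(new NodeRef(a)))         // same instance => found
    assert(!seen.contains(new NodeRef(b)))        // equal but distinct instance => not found
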
Author: Cheng Hao Closes #2155 from chenghao-intel/treenode and squashes the following commits: 7cf2cd2 [Cheng Hao] Remove the implicit keyword for TreeNodeRef and some other small issues 5873415 [Cheng Hao] Remove the TreeNode.id --- .../sql/catalyst/planning/patterns.scala | 11 +++++---- .../spark/sql/catalyst/plans/QueryPlan.scala | 12 +++++----- .../spark/sql/catalyst/trees/TreeNode.scala | 24 ++----------------- .../spark/sql/catalyst/trees/package.scala | 11 +++++++++ .../sql/catalyst/trees/TreeNodeSuite.scala | 5 +++- .../sql/execution/GeneratedAggregate.scala | 10 ++++---- .../spark/sql/execution/debug/package.scala | 7 +++--- .../spark/sql/execution/pythonUdfs.scala | 2 +- 8 files changed, 40 insertions(+), 42 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 90923fe31a063..f0fd9a8b9a46e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.catalyst.planning import scala.annotation.tailrec -import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.trees.TreeNodeRef +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -134,8 +135,8 @@ object PartialAggregation { // Only do partial aggregation if supported by all aggregate expressions. if (allAggregates.size == partialAggregates.size) { // Create a map of expressions to their partial evaluations for all aggregate expressions. - val partialEvaluations: Map[Long, SplitEvaluation] = - partialAggregates.map(a => (a.id, a.asPartial)).toMap + val partialEvaluations: Map[TreeNodeRef, SplitEvaluation] = + partialAggregates.map(a => (new TreeNodeRef(a), a.asPartial)).toMap // We need to pass all grouping expressions though so the grouping can happen a second // time. However some of them might be unnamed so we alias them allowing them to be @@ -148,8 +149,8 @@ object PartialAggregation { // Replace aggregations with a new expression that computes the result from the already // computed partial evaluations and grouping values. 
val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp { - case e: Expression if partialEvaluations.contains(e.id) => - partialEvaluations(e.id).finalEvaluation + case e: Expression if partialEvaluations.contains(new TreeNodeRef(e)) => + partialEvaluations(new TreeNodeRef(e)).finalEvaluation case e: Expression if namedGroupingExpressions.contains(e) => namedGroupingExpressions(e).toAttribute }).asInstanceOf[Seq[NamedExpression]] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 1e177e28f80b3..af9e4d86e995a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -50,11 +50,11 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy @inline def transformExpressionDown(e: Expression) = { val newE = e.transformDown(rule) - if (newE.id != e.id && newE != e) { + if (newE.fastEquals(e)) { + e + } else { changed = true newE - } else { - e } } @@ -82,11 +82,11 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy @inline def transformExpressionUp(e: Expression) = { val newE = e.transformUp(rule) - if (newE.id != e.id && newE != e) { + if (newE.fastEquals(e)) { + e + } else { changed = true newE - } else { - e } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 96ce35939e2cc..2013ae4f7bd13 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -19,11 +19,6 @@ package org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.errors._ -object TreeNode { - private val currentId = new java.util.concurrent.atomic.AtomicLong - protected def nextId() = currentId.getAndIncrement() -} - /** Used by [[TreeNode.getNodeNumbered]] when traversing the tree for a given number */ private class MutableInt(var i: Int) @@ -33,29 +28,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { /** Returns a Seq of the children of this node */ def children: Seq[BaseType] - /** - * A globally unique id for this specific instance. Not preserved across copies. - * Unlike `equals`, `id` can be used to differentiate distinct but structurally - * identical branches of a tree. - */ - val id = TreeNode.nextId() - - /** - * Returns true if other is the same [[catalyst.trees.TreeNode TreeNode]] instance. Unlike - * `equals` this function will return false for different instances of structurally identical - * trees. - */ - def sameInstance(other: TreeNode[_]): Boolean = { - this.id == other.id - } - /** * Faster version of equality which short-circuits when two treeNodes are the same instance. 
* We don't just override Object.Equals, as doing so prevents the scala compiler from from * generating case class `equals` methods */ def fastEquals(other: TreeNode[_]): Boolean = { - sameInstance(other) || this == other + this.eq(other) || this == other } /** @@ -393,3 +372,4 @@ trait UnaryNode[BaseType <: TreeNode[BaseType]] { def child: BaseType def children = child :: Nil } + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala index d725a92c06f7b..79a8e06d4b4d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala @@ -37,4 +37,15 @@ package object trees extends Logging { // Since we want tree nodes to be lightweight, we create one logger for all treenode instances. protected override def logName = "catalyst.trees" + /** + * A [[TreeNode]] companion for reference equality for Hash based Collection. + */ + class TreeNodeRef(val obj: TreeNode[_]) { + override def equals(o: Any) = o match { + case that: TreeNodeRef => that.obj.eq(obj) + case _ => false + } + + override def hashCode = if (obj == null) 0 else obj.hashCode + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index 296202543e2ca..036fd3fa1d6a1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala @@ -51,7 +51,10 @@ class TreeNodeSuite extends FunSuite { val after = before transform { case Literal(5, _) => Literal(1)} assert(before === after) - assert(before.map(_.id) === after.map(_.id)) + // Ensure that the objects after are the same objects before the transformation. + before.map(identity[Expression]).zip(after.map(identity[Expression])).foreach { + case (b, a) => assert(b eq a) + } } test("collect") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala index 31ad5e8aabb0e..b3edd5020fa8c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.trees._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.types._ @@ -141,9 +142,10 @@ case class GeneratedAggregate( val computationSchema = computeFunctions.flatMap(_.schema) - val resultMap: Map[Long, Expression] = aggregatesToCompute.zip(computeFunctions).map { - case (agg, func) => agg.id -> func.result - }.toMap + val resultMap: Map[TreeNodeRef, Expression] = + aggregatesToCompute.zip(computeFunctions).map { + case (agg, func) => new TreeNodeRef(agg) -> func.result + }.toMap val namedGroups = groupingExpressions.zipWithIndex.map { case (ne: NamedExpression, _) => (ne, ne) @@ -156,7 +158,7 @@ case class GeneratedAggregate( // The set of expressions that produce the final output given the aggregation buffer and the // grouping expressions. 
val resultExpressions = aggregateExpressions.map(_.transform { - case e: Expression if resultMap.contains(e.id) => resultMap(e.id) + case e: Expression if resultMap.contains(new TreeNodeRef(e)) => resultMap(new TreeNodeRef(e)) case e: Expression if groupMap.contains(e) => groupMap(e) }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 5b896c55b7393..8ff757bbe3508 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -23,6 +23,7 @@ import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.SparkContext._ import org.apache.spark.sql.{SchemaRDD, Row} +import org.apache.spark.sql.catalyst.trees.TreeNodeRef /** * :: DeveloperApi :: @@ -43,10 +44,10 @@ package object debug { implicit class DebugQuery(query: SchemaRDD) { def debug(): Unit = { val plan = query.queryExecution.executedPlan - val visited = new collection.mutable.HashSet[Long]() + val visited = new collection.mutable.HashSet[TreeNodeRef]() val debugPlan = plan transform { - case s: SparkPlan if !visited.contains(s.id) => - visited += s.id + case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) => + visited += new TreeNodeRef(s) DebugNode(s) } println(s"Results returned: ${debugPlan.execute().count()}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index aef6ebf86b1eb..3dc8be2456781 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -98,7 +98,7 @@ private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] { logical.Project( l.output, l.transformExpressions { - case p: PythonUDF if p.id == udf.id => evaluation.resultAttribute + case p: PythonUDF if p.fastEquals(udf) => evaluation.resultAttribute }.withNewChildren(newChildren)) } } From b1eccfc88a13b937d42bbae8a0c3f09cffc5ae47 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 29 Aug 2014 15:34:59 -0700 Subject: [PATCH 06/15] [SQL] Turns on in-memory columnar compression in HiveCompatibilitySuite `HiveCompatibilitySuite` already turns on in-memory columnar caching, it would be good to also enable compression to improve test coverage. 
Author: Cheng Lian Closes #2190 from liancheng/compression-on and squashes the following commits: 88b536c [Cheng Lian] Code cleanup, narrowed field visibility d13efd2 [Cheng Lian] Turns on in-memory columnar compression in HiveCompatibilitySuite --- .../hive/execution/HiveCompatibilitySuite.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 035fd3214bd1d..b589994bd25fa 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -22,6 +22,7 @@ import java.util.{Locale, TimeZone} import org.scalatest.BeforeAndAfter +import org.apache.spark.sql.SQLConf import org.apache.spark.sql.hive.test.TestHive /** @@ -29,29 +30,31 @@ import org.apache.spark.sql.hive.test.TestHive */ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // TODO: bundle in jar files... get from classpath - lazy val hiveQueryDir = TestHive.getHiveFile("ql" + File.separator + "src" + - File.separator + "test" + File.separator + "queries" + File.separator + "clientpositive") + private lazy val hiveQueryDir = TestHive.getHiveFile( + "ql/src/test/queries/clientpositive".split("/").mkString(File.separator)) - var originalTimeZone: TimeZone = _ - var originalLocale: Locale = _ + private val originalTimeZone = TimeZone.getDefault + private val originalLocale = Locale.getDefault + private val originalUseCompression = TestHive.useCompression def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f) override def beforeAll() { + // Enable in-memory columnar caching TestHive.cacheTables = true // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*) - originalTimeZone = TimeZone.getDefault TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) - // Add Locale setting - originalLocale = Locale.getDefault Locale.setDefault(Locale.US) + // Enable in-memory columnar compression + TestHive.setConf(SQLConf.COMPRESS_CACHED, "true") } override def afterAll() { TestHive.cacheTables = false TimeZone.setDefault(originalTimeZone) Locale.setDefault(originalLocale) + TestHive.setConf(SQLConf.COMPRESS_CACHED, originalUseCompression.toString) } /** A list of tests deemed out of scope currently and thus completely disregarded. */ From d94a44d7caaf3fe7559d9ad7b10872fa16cf81ca Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 29 Aug 2014 15:36:04 -0700 Subject: [PATCH 07/15] [SPARK-3269][SQL] Decreases initial buffer size for row set to prevent OOM When a large batch size is specified, `SparkSQLOperationManager` OOMs even if the whole result set is much smaller than the batch size. 
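A short self-contained sketch of the idea behind the one-line fix below; the names here are illustrative, while the real change lives in SparkSQLOperationManager.

    import scala.collection.mutable.ArrayBuffer

    // The JDBC client may request a huge fetch size even when the result set is tiny.
    // Cap the buffer's *initial* capacity; `maxRows` still bounds how many rows are read,
    // and the ArrayBuffer grows on demand if more than 1024 rows actually arrive.
    def collectBatch[T](iter: Iterator[T], maxRows: Int): ArrayBuffer[T] = {
      val rows = new ArrayBuffer[T](math.min(maxRows, 1024))
      var count = 0
      while (count < maxRows && iter.hasNext) {
        rows += iter.next()
        count += 1
      }
      rows
    }
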
Author: Cheng Lian Closes #2171 from liancheng/jdbc-fetch-size and squashes the following commits: 5e1623b [Cheng Lian] Decreases initial buffer size for row set to prevent OOM --- .../hive/thriftserver/server/SparkSQLOperationManager.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 6eccb1ba6d4dc..f12b5a69a09f7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -66,9 +66,10 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManage if (!iter.hasNext) { new RowSet() } else { - val maxRows = maxRowsL.toInt // Do you really want a row batch larger than Int Max? No. + // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int + val maxRows = maxRowsL.toInt var curRow = 0 - var rowSet = new ArrayBuffer[Row](maxRows) + var rowSet = new ArrayBuffer[Row](maxRows.min(1024)) while (curRow < maxRows && iter.hasNext) { val sparkRow = iter.next() From 634d04b87c2744d645e9c26e746ba2006371d9b5 Mon Sep 17 00:00:00 2001 From: "qiping.lqp" Date: Fri, 29 Aug 2014 15:37:43 -0700 Subject: [PATCH 08/15] [SPARK-3291][SQL]TestcaseName in createQueryTest should not contain ":" ":" is not allowed to appear in a file name of Windows system. If file name contains ":", this file can't be checked out in a Windows system and developers using Windows must be careful to not commit the deletion of such files, Which is very inconvenient. 
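For clarity, a tiny self-contained sketch of the guard being added; the helper name below is made up, and in the patch the check is a plain assert inside createQueryTest.

    // Golden answer files are named after the test case, so any character that is illegal
    // in a Windows file name must be rejected up front.
    def checkGoldenFileName(testCaseName: String): Unit = {
      require(!testCaseName.contains(":"),
        s"Test case name '$testCaseName' contains ':', which cannot appear in a Windows file name")
    }

    checkGoldenFileName("case sensitivity when query Hive table")   // passes
    // checkGoldenFileName("case sensitivity: Hive table")          // would fail, as intended
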
Author: qiping.lqp Closes #2191 from chouqin/querytest and squashes the following commits: 0e943a1 [qiping.lqp] rename golden file 60a863f [qiping.lqp] TestcaseName in createQueryTest should not contain ":" --- ...y when query Hive table-0-5d14d21a239daa42b086cc895215009a} | 0 .../apache/spark/sql/hive/execution/HiveComparisonTest.scala | 3 +++ .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) rename sql/hive/src/test/resources/golden/{case sensitivity: Hive table-0-5d14d21a239daa42b086cc895215009a => case sensitivity when query Hive table-0-5d14d21a239daa42b086cc895215009a} (100%) diff --git a/sql/hive/src/test/resources/golden/case sensitivity: Hive table-0-5d14d21a239daa42b086cc895215009a b/sql/hive/src/test/resources/golden/case sensitivity when query Hive table-0-5d14d21a239daa42b086cc895215009a similarity index 100% rename from sql/hive/src/test/resources/golden/case sensitivity: Hive table-0-5d14d21a239daa42b086cc895215009a rename to sql/hive/src/test/resources/golden/case sensitivity when query Hive table-0-5d14d21a239daa42b086cc895215009a diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 502ce8fb297e9..671c3b162f875 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -195,6 +195,9 @@ abstract class HiveComparisonTest val installHooksCommand = "(?i)SET.*hooks".r def createQueryTest(testCaseName: String, sql: String, reset: Boolean = true) { + // testCaseName must not contain ':', which is not allowed to appear in a filename of Windows + assert(!testCaseName.contains(":")) + // If test sharding is enable, skip tests that are not in the correct shard. shardInfo.foreach { case (shardId, numShards) if testCaseName.hashCode % numShards != shardId => return diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 6d925e56e6838..c4abb3eb4861f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -309,7 +309,7 @@ class HiveQuerySuite extends HiveComparisonTest { } } - createQueryTest("case sensitivity: Hive table", + createQueryTest("case sensitivity when query Hive table", "SELECT srcalias.KEY, SRCALIAS.value FROM sRc SrCAlias WHERE SrCAlias.kEy < 15") test("case sensitivity: registered table") { From 98ddbe6cdbe4141df3d008dcb675ecd682c97492 Mon Sep 17 00:00:00 2001 From: Zdenek Farana Date: Fri, 29 Aug 2014 15:39:15 -0700 Subject: [PATCH 09/15] [SPARK-3173][SQL] Timestamp support in the parser If you have a table with TIMESTAMP column, that column can't be used in WHERE clause properly - it is not evaluated properly. [More](https://issues.apache.org/jira/browse/SPARK-3173) Motivation: http://www.aproint.com/aggregation-with-spark-sql/ - [x] modify SqlParser so it supports casting to TIMESTAMP (workaround for item 2) - [x] the string literal should be converted into Timestamp if the column is Timestamp. 
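To make the behaviour concrete, a usage sketch mirroring the new test cases below. It assumes a `sqlContext` and a registered `timestamps` table with a TIMESTAMP column named `time`, as in TestData; it is not part of the patch.

    // Explicit cast now parses: TIMESTAMP is a new keyword accepted by CAST in SqlParser.
    sqlContext.sql(
      "SELECT time FROM timestamps WHERE time = CAST('1970-01-01 00:00:00.001' AS TIMESTAMP)").collect()

    // With the new coercion rules, a string literal compared against (or IN-listed with)
    // a TIMESTAMP column is cast to TIMESTAMP instead of falling back to a DOUBLE comparison.
    sqlContext.sql("SELECT time FROM timestamps WHERE time = '1970-01-01 00:00:00.001'").collect()
    sqlContext.sql(
      "SELECT time FROM timestamps WHERE time IN ('1970-01-01 00:00:00.001', '1970-01-01 00:00:00.002')").collect()
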
Author: Zdenek Farana Author: Zdenek Farana Closes #2084 from byF/SPARK-3173 and squashes the following commits: 442b59d [Zdenek Farana] Fixed test merge conflict 2dbf4f6 [Zdenek Farana] Merge remote-tracking branch 'origin/SPARK-3173' into SPARK-3173 65b6215 [Zdenek Farana] Fixed timezone sensitivity in the test 47b27b4 [Zdenek Farana] Now works in the case of "StringLiteral=TimestampColumn" 96a661b [Zdenek Farana] Code style change 491dfcf [Zdenek Farana] Added test cases for SPARK-3173 4446b1e [Zdenek Farana] A string literal is casted into Timestamp when the column is Timestamp. 59af397 [Zdenek Farana] Added a new TIMESTAMP keyword; CAST to TIMESTAMP now can be used in SQL expression. --- .../apache/spark/sql/catalyst/SqlParser.scala | 3 +- .../catalyst/analysis/HiveTypeCoercion.scala | 10 +++++ .../org/apache/spark/sql/SQLQuerySuite.scala | 43 ++++++++++++++++++- 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 4f166c06b6997..a88bd859fc85e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -114,6 +114,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val STRING = Keyword("STRING") protected val SUM = Keyword("SUM") protected val TABLE = Keyword("TABLE") + protected val TIMESTAMP = Keyword("TIMESTAMP") protected val TRUE = Keyword("TRUE") protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") @@ -359,7 +360,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { literal protected lazy val dataType: Parser[DataType] = - STRING ^^^ StringType + STRING ^^^ StringType | TIMESTAMP ^^^ TimestampType } class SqlLexical(val keywords: Seq[String]) extends StdLexical { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index ecfcd62d2063f..d6758eb5b6a32 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -218,11 +218,21 @@ trait HiveTypeCoercion { case a: BinaryArithmetic if a.right.dataType == StringType => a.makeCopy(Array(a.left, Cast(a.right, DoubleType))) + case p: BinaryPredicate if p.left.dataType == StringType + && p.right.dataType == TimestampType => + p.makeCopy(Array(Cast(p.left, TimestampType), p.right)) + case p: BinaryPredicate if p.left.dataType == TimestampType + && p.right.dataType == StringType => + p.makeCopy(Array(p.left, Cast(p.right, TimestampType))) + case p: BinaryPredicate if p.left.dataType == StringType && p.right.dataType != StringType => p.makeCopy(Array(Cast(p.left, DoubleType), p.right)) case p: BinaryPredicate if p.left.dataType != StringType && p.right.dataType == StringType => p.makeCopy(Array(p.left, Cast(p.right, DoubleType))) + case i @ In(a,b) if a.dataType == TimestampType && b.forall(_.dataType == StringType) => + i.makeCopy(Array(a,b.map(Cast(_,TimestampType)))) + case Sum(e) if e.dataType == StringType => Sum(Cast(e, DoubleType)) case Average(e) if e.dataType == StringType => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4047bc0672bbb..1ac205937714c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -19,15 +19,28 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.test._ +import org.scalatest.BeforeAndAfterAll +import java.util.TimeZone /* Implicits */ import TestSQLContext._ import TestData._ -class SQLQuerySuite extends QueryTest { +class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { // Make sure the tables are loaded. TestData + var origZone: TimeZone = _ + override protected def beforeAll() { + origZone = TimeZone.getDefault + TimeZone.setDefault(TimeZone.getTimeZone("UTC")) + } + + override protected def afterAll() { + TimeZone.setDefault(origZone) + } + + test("SPARK-2041 column name equals tablename") { checkAnswer( sql("SELECT tableName FROM tableName"), @@ -63,6 +76,34 @@ class SQLQuerySuite extends QueryTest { "st") } + test("SPARK-3173 Timestamp support in the parser") { + checkAnswer(sql( + "SELECT time FROM timestamps WHERE time=CAST('1970-01-01 00:00:00.001' AS TIMESTAMP)"), + Seq(Seq(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.001")))) + + checkAnswer(sql( + "SELECT time FROM timestamps WHERE time='1970-01-01 00:00:00.001'"), + Seq(Seq(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.001")))) + + checkAnswer(sql( + "SELECT time FROM timestamps WHERE '1970-01-01 00:00:00.001'=time"), + Seq(Seq(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.001")))) + + checkAnswer(sql( + """SELECT time FROM timestamps WHERE time<'1970-01-01 00:00:00.003' + AND time>'1970-01-01 00:00:00.001'"""), + Seq(Seq(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.002")))) + + checkAnswer(sql( + "SELECT time FROM timestamps WHERE time IN ('1970-01-01 00:00:00.001','1970-01-01 00:00:00.002')"), + Seq(Seq(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.001")), + Seq(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.002")))) + + checkAnswer(sql( + "SELECT time FROM timestamps WHERE time='123'"), + Nil) + } + test("index into array") { checkAnswer( sql("SELECT data, data[0], data[0] + data[1], data[0 + 1] FROM arrayData"), From 13901764f4e9ed3de03e420d88ab42bdce5d5140 Mon Sep 17 00:00:00 2001 From: wangfei Date: Fri, 29 Aug 2014 17:37:15 -0700 Subject: [PATCH 10/15] [SPARK-3296][mllib] spark-example should be run-example in head notation of DenseKMeans and SparseNaiveBayes `./bin/spark-example` should be `./bin/run-example` in DenseKMeans and SparseNaiveBayes Author: wangfei Closes #2193 from scwf/run-example and squashes the following commits: 207eb3a [wangfei] spark-example should be run-example 27a8999 [wangfei] ./bin/spark-example should be ./bin/run-example --- .../scala/org/apache/spark/examples/mllib/DenseKMeans.scala | 2 +- .../org/apache/spark/examples/mllib/SparseNaiveBayes.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala index f96bc1bf00b92..89dfa26c2299c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala @@ -27,7 +27,7 @@ import org.apache.spark.mllib.linalg.Vectors /** * An example k-means app. 
Run with * {{{ - * ./bin/spark-example org.apache.spark.examples.mllib.DenseKMeans [options] + * ./bin/run-example org.apache.spark.examples.mllib.DenseKMeans [options] * }}} * If you use it as a template to create your own app, please use `spark-submit` to submit your app. */ diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala index 88acd9dbb0878..952fa2a5109a4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala @@ -27,7 +27,7 @@ import org.apache.spark.mllib.util.MLUtils /** * An example naive Bayes app. Run with * {{{ - * ./bin/spark-example org.apache.spark.examples.mllib.SparseNaiveBayes [options] + * ./bin/run-example org.apache.spark.examples.mllib.SparseNaiveBayes [options] * }}} * If you use it as a template to create your own app, please use `spark-submit` to submit your app. */ From 32b18dd52cf8920903819f23e406271ecd8ac6bb Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 29 Aug 2014 18:16:47 -0700 Subject: [PATCH 11/15] [SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions Author: Cheng Lian Closes #2213 from liancheng/spark-3320 and squashes the following commits: 45a0139 [Cheng Lian] Fixed typo in InMemoryColumnarQuerySuite f67067d [Cheng Lian] Fixed SPARK-3320 --- .../columnar/InMemoryColumnarTableScan.scala | 49 +++++++------------ .../scala/org/apache/spark/sql/TestData.scala | 5 ++ .../columnar/InMemoryColumnarQuerySuite.scala | 19 +++++-- 3 files changed, 39 insertions(+), 34 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index bc36bacd00b13..cb055cd74a5e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -104,40 +104,29 @@ private[sql] case class InMemoryColumnarTableScan( override def execute() = { relation.cachedColumnBuffers.mapPartitions { iterator => // Find the ordinals of the requested columns. If none are requested, use the first. 
- val requestedColumns = - if (attributes.isEmpty) { - Seq(0) - } else { - attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) - } - - new Iterator[Row] { - private[this] var columnBuffers: Array[ByteBuffer] = null - private[this] var columnAccessors: Seq[ColumnAccessor] = null - nextBatch() - - private[this] val nextRow = new GenericMutableRow(columnAccessors.length) - - def nextBatch() = { - columnBuffers = iterator.next() - columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_)) - } + val requestedColumns = if (attributes.isEmpty) { + Seq(0) + } else { + attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) + } - override def next() = { - if (!columnAccessors.head.hasNext) { - nextBatch() - } + iterator + .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_))) + .flatMap { columnAccessors => + val nextRow = new GenericMutableRow(columnAccessors.length) + new Iterator[Row] { + override def next() = { + var i = 0 + while (i < nextRow.length) { + columnAccessors(i).extractTo(nextRow, i) + i += 1 + } + nextRow + } - var i = 0 - while (i < nextRow.length) { - columnAccessors(i).extractTo(nextRow, i) - i += 1 + override def hasNext = columnAccessors.head.hasNext } - nextRow } - - override def hasNext = columnAccessors.head.hasNext || iterator.hasNext - } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index c3ec82fb69778..eb33a61c6e811 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -151,4 +151,9 @@ object TestData { TimestampField(new Timestamp(i)) }) timestamps.registerTempTable("timestamps") + + case class IntField(i: Int) + // An RDD with 4 elements and 8 partitions + val withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 4).map(IntField), 8) + withEmptyParts.registerTempTable("withEmptyParts") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala index fdd2799a53268..0e3c67f5eed29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala @@ -17,14 +17,13 @@ package org.apache.spark.sql.columnar -import org.apache.spark.sql.{QueryTest, TestData} import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.execution.SparkLogicalPlan import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.{SQLConf, QueryTest, TestData} class InMemoryColumnarQuerySuite extends QueryTest { - import TestData._ - import TestSQLContext._ + import org.apache.spark.sql.TestData._ + import org.apache.spark.sql.test.TestSQLContext._ test("simple columnar query") { val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan @@ -93,4 +92,16 @@ class InMemoryColumnarQuerySuite extends QueryTest { sql("SELECT time FROM timestamps"), timestamps.collect().toSeq) } + + test("SPARK-3320 regression: batched column buffer building should work with empty partitions") { + checkAnswer( + sql("SELECT * FROM withEmptyParts"), + withEmptyParts.collect().toSeq) + + TestSQLContext.cacheTable("withEmptyParts") + + checkAnswer( + sql("SELECT * FROM withEmptyParts"), + withEmptyParts.collect().toSeq) + } } From a004a8d879a85af3be0aefa3f331116d4aabb1e4 
Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Fri, 29 Aug 2014 22:24:35 -0700 Subject: [PATCH 12/15] BUILD: Adding back CDH4 as per user requests --- dev/create-release/create-release.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index 7549fbbe66654..281e8d4de6d71 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -118,6 +118,7 @@ make_binary_release() { } make_binary_release "hadoop1" "-Phive -Dhadoop.version=1.0.4" & +make_binary_release "cdh4" "-Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0" & make_binary_release "hadoop2.3" "-Phadoop-2.3 -Phive -Pyarn" & make_binary_release "hadoop2.4" "-Phadoop-2.4 -Phive -Pyarn" & make_binary_release "hadoop2.4-without-hive" "-Phadoop-2.4 -Pyarn" & From 7e662af332beb171dc89027a2562d0949d69cfa0 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 29 Aug 2014 22:52:32 -0700 Subject: [PATCH 13/15] [SPARK-3305] Remove unused import from UI classes. Author: Kousuke Saruta Closes #2200 from sarutak/SPARK-3305 and squashes the following commits: 3cbd6ee [Kousuke Saruta] Removed unused import from classes related to UI --- .../src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 1 - .../scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 02df4e8fe61af..b0e3bb3b552fd 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -21,7 +21,6 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.storage.StorageLevel import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index f7f918fd521a9..eaeb861f59e5a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui.jobs -import scala.collection.mutable.{HashMap, ListBuffer, Map} +import scala.collection.mutable.{HashMap, ListBuffer} import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi From acea92806c91535162a9fdcb1cce579e7f1f91c7 Mon Sep 17 00:00:00 2001 From: Raymond Liu Date: Fri, 29 Aug 2014 23:05:18 -0700 Subject: [PATCH 14/15] [SPARK-2288] Hide ShuffleBlockManager behind ShuffleManager By Hiding the shuffleblockmanager behind Shufflemanager, we decouple the shuffle data's block mapping management work from Diskblockmananger. This give a more clear interface and more easy for other shuffle manager to implement their own block management logic. the jira ticket have more details. 
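An abbreviated, self-contained sketch of the seam this patch introduces. The stand-in types below are defined only so the snippet compiles on its own; the real traits live in org.apache.spark.shuffle and org.apache.spark.storage and carry more members than shown here.

    import java.io.File
    import java.nio.ByteBuffer

    // Stand-in types (the real ShuffleBlockId and FileSegment are Spark storage classes).
    case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
    case class FileSegment(file: File, offset: Long, length: Long)

    // Each shuffle implementation now owns the mapping from a shuffle block id to the
    // bytes (or file segment) backing it, instead of DiskBlockManager/BlockManager
    // hard-coding that knowledge.
    trait ShuffleBlockManager {
      def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer]
      def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer]
      def stop(): Unit
    }

    trait ShuffleManager {
      // The block manager asks the active ShuffleManager for this, rather than owning one itself.
      def shuffleBlockManager: ShuffleBlockManager
    }
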
Author: Raymond Liu Closes #1241 from colorant/shuffle and squashes the following commits: 0e01ae3 [Raymond Liu] Move ShuffleBlockmanager behind shuffleManager --- .../FileShuffleBlockManager.scala} | 81 +++++----- .../shuffle/IndexShuffleBlockManager.scala | 121 +++++++++++++++ .../spark/shuffle/ShuffleBlockManager.scala | 38 +++++ .../apache/spark/shuffle/ShuffleManager.scala | 9 +- .../shuffle/hash/HashShuffleManager.scala | 18 ++- .../shuffle/hash/HashShuffleWriter.scala | 10 +- .../shuffle/sort/SortShuffleManager.scala | 49 +++--- .../shuffle/sort/SortShuffleWriter.scala | 32 +--- .../spark/storage/BlockFetcherIterator.scala | 2 +- .../org/apache/spark/storage/BlockId.scala | 8 + .../apache/spark/storage/BlockManager.scala | 22 ++- .../storage/BlockManagerSlaveActor.scala | 4 +- .../spark/storage/DiskBlockManager.scala | 34 +---- .../org/apache/spark/storage/DiskStore.scala | 41 +++-- .../spark/storage/TachyonBlockManager.scala | 4 +- .../util/collection/ExternalSorter.scala | 38 +---- .../hash/HashShuffleManagerSuite.scala | 111 ++++++++++++++ .../storage/BlockFetcherIteratorSuite.scala | 36 ++--- .../spark/storage/BlockManagerSuite.scala | 7 +- .../spark/storage/DiskBlockManagerSuite.scala | 143 +----------------- project/MimaExcludes.scala | 2 + .../spark/tools/StoragePerfTester.scala | 9 +- 22 files changed, 466 insertions(+), 353 deletions(-) rename core/src/main/scala/org/apache/spark/{storage/ShuffleBlockManager.scala => shuffle/FileShuffleBlockManager.scala} (83%) create mode 100644 core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala create mode 100644 core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala create mode 100644 core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala similarity index 83% rename from core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala rename to core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala index b8f5d3a5b02aa..76e3932a9bb91 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala @@ -15,22 +15,22 @@ * limitations under the License. */ -package org.apache.spark.storage +package org.apache.spark.shuffle import java.io.File +import java.nio.ByteBuffer import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ -import org.apache.spark.Logging +import org.apache.spark.{SparkEnv, SparkConf, Logging} +import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.serializer.Serializer -import org.apache.spark.shuffle.ShuffleManager -import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup +import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup +import org.apache.spark.storage._ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap} import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} -import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.executor.ShuffleWriteMetrics /** A group of writers for a ShuffleMapTask, one writer per reducer. 
*/ private[spark] trait ShuffleWriterGroup { @@ -61,20 +61,18 @@ private[spark] trait ShuffleWriterGroup { * each block stored in each file. In order to find the location of a shuffle block, we search the * files within a ShuffleFileGroups associated with the block's reducer. */ -// TODO: Factor this into a separate class for each ShuffleManager implementation + private[spark] -class ShuffleBlockManager(blockManager: BlockManager, - shuffleManager: ShuffleManager) extends Logging { - def conf = blockManager.conf +class FileShuffleBlockManager(conf: SparkConf) + extends ShuffleBlockManager with Logging { + + private lazy val blockManager = SparkEnv.get.blockManager // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. // TODO: Remove this once the shuffle file consolidation feature is stable. - val consolidateShuffleFiles = + private val consolidateShuffleFiles = conf.getBoolean("spark.shuffle.consolidateFiles", false) - // Are we using sort-based shuffle? - val sortBasedShuffle = shuffleManager.isInstanceOf[SortShuffleManager] - private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024 /** @@ -93,22 +91,11 @@ class ShuffleBlockManager(blockManager: BlockManager, val completedMapTasks = new ConcurrentLinkedQueue[Int]() } - type ShuffleId = Int private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState] private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf) - /** - * Register a completed map without getting a ShuffleWriterGroup. Used by sort-based shuffle - * because it just writes a single file by itself. - */ - def addCompletedMap(shuffleId: Int, mapId: Int, numBuckets: Int): Unit = { - shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets)) - val shuffleState = shuffleStates(shuffleId) - shuffleState.completedMapTasks.add(mapId) - } - /** * Get a ShuffleWriterGroup for the given map task, which will register it as complete * when the writers are closed successfully @@ -181,17 +168,30 @@ class ShuffleBlockManager(blockManager: BlockManager, /** * Returns the physical file segment in which the given BlockId is located. - * This function should only be called if shuffle file consolidation is enabled, as it is - * an error condition if we don't find the expected block. */ - def getBlockLocation(id: ShuffleBlockId): FileSegment = { - // Search all file groups associated with this shuffle. - val shuffleState = shuffleStates(id.shuffleId) - for (fileGroup <- shuffleState.allFileGroups) { - val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId) - if (segment.isDefined) { return segment.get } + private def getBlockLocation(id: ShuffleBlockId): FileSegment = { + if (consolidateShuffleFiles) { + // Search all file groups associated with this shuffle. 
+ val shuffleState = shuffleStates(id.shuffleId) + val iter = shuffleState.allFileGroups.iterator + while (iter.hasNext) { + val segment = iter.next.getFileSegmentFor(id.mapId, id.reduceId) + if (segment.isDefined) { return segment.get } + } + throw new IllegalStateException("Failed to find shuffle block: " + id) + } else { + val file = blockManager.diskBlockManager.getFile(id) + new FileSegment(file, 0, file.length()) } - throw new IllegalStateException("Failed to find shuffle block: " + id) + } + + override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = { + val segment = getBlockLocation(blockId) + blockManager.diskStore.getBytes(segment) + } + + override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] = { + Left(getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])) } /** Remove all the blocks / files and metadata related to a particular shuffle. */ @@ -207,14 +207,7 @@ class ShuffleBlockManager(blockManager: BlockManager, private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = { shuffleStates.get(shuffleId) match { case Some(state) => - if (sortBasedShuffle) { - // There's a single block ID for each map, plus an index file for it - for (mapId <- state.completedMapTasks) { - val blockId = new ShuffleBlockId(shuffleId, mapId, 0) - blockManager.diskBlockManager.getFile(blockId).delete() - blockManager.diskBlockManager.getFile(blockId.name + ".index").delete() - } - } else if (consolidateShuffleFiles) { + if (consolidateShuffleFiles) { for (fileGroup <- state.allFileGroups; file <- fileGroup.files) { file.delete() } @@ -240,13 +233,13 @@ class ShuffleBlockManager(blockManager: BlockManager, shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId)) } - def stop() { + override def stop() { metadataCleaner.cancel() } } private[spark] -object ShuffleBlockManager { +object FileShuffleBlockManager { /** * A group of shuffle files, one per reducer. * A particular mapper will be assigned a single ShuffleFileGroup to write its output to. diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala new file mode 100644 index 0000000000000..8bb9efc46cc58 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io._ +import java.nio.ByteBuffer + +import org.apache.spark.SparkEnv +import org.apache.spark.storage._ + +/** + * Create and maintain the shuffle blocks' mapping between logic block and physical file location. 
+ * Data of shuffle blocks from the same map task are stored in a single consolidated data file. + * The offsets of the data blocks in the data file are stored in a separate index file. + * + * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data" + * as the filename postfix for data file, and ".index" as the filename postfix for index file. + * + */ +private[spark] +class IndexShuffleBlockManager extends ShuffleBlockManager { + + private lazy val blockManager = SparkEnv.get.blockManager + + /** + * Mapping to a single shuffleBlockId with reduce ID 0. + * */ + def consolidateId(shuffleId: Int, mapId: Int): ShuffleBlockId = { + ShuffleBlockId(shuffleId, mapId, 0) + } + + def getDataFile(shuffleId: Int, mapId: Int): File = { + blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0)) + } + + private def getIndexFile(shuffleId: Int, mapId: Int): File = { + blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0)) + } + + /** + * Remove data file and index file that contain the output data from one map. + * */ + def removeDataByMap(shuffleId: Int, mapId: Int): Unit = { + var file = getDataFile(shuffleId, mapId) + if (file.exists()) { + file.delete() + } + + file = getIndexFile(shuffleId, mapId) + if (file.exists()) { + file.delete() + } + } + + /** + * Write an index file with the offsets of each block, plus a final offset at the end for the + * end of the output file. This will be used by getBlockLocation to figure out where each block + * begins and ends. + * */ + def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]) = { + val indexFile = getIndexFile(shuffleId, mapId) + val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) + try { + // We take in lengths of each block, need to convert it to offsets. + var offset = 0L + out.writeLong(offset) + + for (length <- lengths) { + offset += length + out.writeLong(offset) + } + } finally { + out.close() + } + } + + /** + * Get the location of a block in a map output file. Uses the index file we create for it. + * */ + private def getBlockLocation(blockId: ShuffleBlockId): FileSegment = { + // The block is actually going to be a range of a single map output file for this map, so + // find out the consolidated file, then the offset within that from our index + val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) + + val in = new DataInputStream(new FileInputStream(indexFile)) + try { + in.skip(blockId.reduceId * 8) + val offset = in.readLong() + val nextOffset = in.readLong() + new FileSegment(getDataFile(blockId.shuffleId, blockId.mapId), offset, nextOffset - offset) + } finally { + in.close() + } + } + + override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = { + val segment = getBlockLocation(blockId) + blockManager.diskStore.getBytes(segment) + } + + override def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] = { + Left(getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])) + } + + override def stop() = {} +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala new file mode 100644 index 0000000000000..4240580250046 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.nio.ByteBuffer + +import org.apache.spark.storage.{FileSegment, ShuffleBlockId} + +private[spark] +trait ShuffleBlockManager { + type ShuffleId = Int + + /** + * Get shuffle block data managed by the local ShuffleBlockManager. + * @return Some(ByteBuffer) if block found, otherwise None. + */ + def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] + + def getBlockData(blockId: ShuffleBlockId): Either[FileSegment, ByteBuffer] + + def stop(): Unit +} + diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index 9c859b8b4a118..801ae54086053 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -49,8 +49,13 @@ private[spark] trait ShuffleManager { endPartition: Int, context: TaskContext): ShuffleReader[K, C] - /** Remove a shuffle's metadata from the ShuffleManager. */ - def unregisterShuffle(shuffleId: Int) + /** + * Remove a shuffle's metadata from the ShuffleManager. + * @return true if the metadata removed successfully, otherwise false. + */ + def unregisterShuffle(shuffleId: Int): Boolean + + def shuffleBlockManager: ShuffleBlockManager /** Shut down this ShuffleManager. */ def stop(): Unit diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala index df98d18fa8193..62e0629b34400 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala @@ -25,6 +25,9 @@ import org.apache.spark.shuffle._ * mapper (possibly reusing these across waves of tasks). */ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager { + + private val fileShuffleBlockManager = new FileShuffleBlockManager(conf) + /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */ override def registerShuffle[K, V, C]( shuffleId: Int, @@ -49,12 +52,21 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager /** Get a writer for a given partition. Called on executors by map tasks. */ override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) : ShuffleWriter[K, V] = { - new HashShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) + new HashShuffleWriter( + shuffleBlockManager, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) } /** Remove a shuffle's metadata from the ShuffleManager. 
*/ - override def unregisterShuffle(shuffleId: Int): Unit = {} + override def unregisterShuffle(shuffleId: Int): Boolean = { + shuffleBlockManager.removeShuffle(shuffleId) + } + + override def shuffleBlockManager: FileShuffleBlockManager = { + fileShuffleBlockManager + } /** Shut down this ShuffleManager. */ - override def stop(): Unit = {} + override def stop(): Unit = { + shuffleBlockManager.stop() + } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 51e454d9313c9..4b9454d75abb7 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -17,14 +17,15 @@ package org.apache.spark.shuffle.hash -import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleWriter} -import org.apache.spark.{Logging, MapOutputTracker, SparkEnv, TaskContext} -import org.apache.spark.storage.{BlockObjectWriter} -import org.apache.spark.serializer.Serializer +import org.apache.spark._ import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.MapStatus +import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle._ +import org.apache.spark.storage.BlockObjectWriter private[spark] class HashShuffleWriter[K, V]( + shuffleBlockManager: FileShuffleBlockManager, handle: BaseShuffleHandle[K, V, _], mapId: Int, context: TaskContext) @@ -43,7 +44,6 @@ private[spark] class HashShuffleWriter[K, V]( metrics.shuffleWriteMetrics = Some(writeMetrics) private val blockManager = SparkEnv.get.blockManager - private val shuffleBlockManager = blockManager.shuffleBlockManager private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null)) private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, writeMetrics) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 6dcca47ea7c0c..b727438ae7e47 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -17,14 +17,17 @@ package org.apache.spark.shuffle.sort -import java.io.{DataInputStream, FileInputStream} +import java.util.concurrent.ConcurrentHashMap +import org.apache.spark.{SparkConf, TaskContext, ShuffleDependency} import org.apache.spark.shuffle._ -import org.apache.spark.{TaskContext, ShuffleDependency} import org.apache.spark.shuffle.hash.HashShuffleReader -import org.apache.spark.storage.{DiskBlockManager, FileSegment, ShuffleBlockId} -private[spark] class SortShuffleManager extends ShuffleManager { +private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager { + + private val indexShuffleBlockManager = new IndexShuffleBlockManager() + private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]() + /** * Register a shuffle with the manager and obtain a handle for it to pass to tasks. */ @@ -52,29 +55,29 @@ private[spark] class SortShuffleManager extends ShuffleManager { /** Get a writer for a given partition. Called on executors by map tasks. 
*/ override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) : ShuffleWriter[K, V] = { - new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) + val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]] + shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps) + new SortShuffleWriter( + shuffleBlockManager, baseShuffleHandle, mapId, context) } /** Remove a shuffle's metadata from the ShuffleManager. */ - override def unregisterShuffle(shuffleId: Int): Unit = {} + override def unregisterShuffle(shuffleId: Int): Boolean = { + if (shuffleMapNumber.containsKey(shuffleId)) { + val numMaps = shuffleMapNumber.remove(shuffleId) + (0 until numMaps).map{ mapId => + shuffleBlockManager.removeDataByMap(shuffleId, mapId) + } + } + true + } - /** Shut down this ShuffleManager. */ - override def stop(): Unit = {} + override def shuffleBlockManager: IndexShuffleBlockManager = { + indexShuffleBlockManager + } - /** Get the location of a block in a map output file. Uses the index file we create for it. */ - def getBlockLocation(blockId: ShuffleBlockId, diskManager: DiskBlockManager): FileSegment = { - // The block is actually going to be a range of a single map output file for this map, so - // figure out the ID of the consolidated file, then the offset within that from our index - val consolidatedId = blockId.copy(reduceId = 0) - val indexFile = diskManager.getFile(consolidatedId.name + ".index") - val in = new DataInputStream(new FileInputStream(indexFile)) - try { - in.skip(blockId.reduceId * 8) - val offset = in.readLong() - val nextOffset = in.readLong() - new FileSegment(diskManager.getFile(consolidatedId), offset, nextOffset - offset) - } finally { - in.close() - } + /** Shut down this ShuffleManager. */ + override def stop(): Unit = { + shuffleBlockManager.stop() } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index b8c9ad46ab035..89a78d6982ba0 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -17,29 +17,25 @@ package org.apache.spark.shuffle.sort -import java.io.File - import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.MapStatus -import org.apache.spark.shuffle.{ShuffleWriter, BaseShuffleHandle} +import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, BaseShuffleHandle} import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.collection.ExternalSorter private[spark] class SortShuffleWriter[K, V, C]( + shuffleBlockManager: IndexShuffleBlockManager, handle: BaseShuffleHandle[K, V, C], mapId: Int, context: TaskContext) extends ShuffleWriter[K, V] with Logging { private val dep = handle.dependency - private val numPartitions = dep.partitioner.numPartitions private val blockManager = SparkEnv.get.blockManager private var sorter: ExternalSorter[K, V, _] = null - private var outputFile: File = null - private var indexFile: File = null // Are we in the process of stopping? 
Because map tasks can call stop() with success = true // and then call stop() with success = false if they get an exception, we want to make sure @@ -69,17 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C]( sorter.insertAll(records) } - // Create a single shuffle file with reduce ID 0 that we'll write all results to. We'll later - // serve different ranges of this file using an index file that we create at the end. - val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0) - - outputFile = blockManager.diskBlockManager.getFile(blockId) - indexFile = blockManager.diskBlockManager.getFile(blockId.name + ".index") - - val partitionLengths = sorter.writePartitionedFile(blockId, context) - - // Register our map output with the ShuffleBlockManager, which handles cleaning it over time - blockManager.shuffleBlockManager.addCompletedMap(dep.shuffleId, mapId, numPartitions) + val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) + val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId) + val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) + shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths) mapStatus = new MapStatus(blockManager.blockManagerId, partitionLengths.map(MapOutputTracker.compressSize)) @@ -95,13 +84,8 @@ private[spark] class SortShuffleWriter[K, V, C]( if (success) { return Option(mapStatus) } else { - // The map task failed, so delete our output file if we created one - if (outputFile != null) { - outputFile.delete() - } - if (indexFile != null) { - indexFile.delete() - } + // The map task failed, so delete our output data. + shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId) return None } } finally { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index d07e6a1b1836c..e35b7fe62c753 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -197,7 +197,7 @@ object BlockFetcherIterator { for (id <- localBlocksToFetch) { try { readMetrics.localBlocksFetched += 1 - results.put(new FetchResult(id, 0, () => getLocalFromDisk(id, serializer).get)) + results.put(new FetchResult(id, 0, () => getLocalShuffleFromDisk(id, serializer).get)) logDebug("Got local block " + id) } catch { case e: Exception => { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index c1756ac905417..a83a3f468ae5f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -58,6 +58,11 @@ case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends Blo def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId } +@DeveloperApi +case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { + def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" +} + @DeveloperApi case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index" @@ -92,6 +97,7 @@ private[spark] case class TestBlockId(id: String) extends BlockId { object BlockId { val RDD = "rdd_([0-9]+)_([0-9]+)".r val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r + val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r val 
SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r val TASKRESULT = "taskresult_([0-9]+)".r @@ -104,6 +110,8 @@ object BlockId { RDDBlockId(rddId.toInt, splitIndex.toInt) case SHUFFLE(shuffleId, mapId, reduceId) => ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) + case SHUFFLE_DATA(shuffleId, mapId, reduceId) => + ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) case SHUFFLE_INDEX(shuffleId, mapId, reduceId) => ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) case BROADCAST(broadcastId, field) => diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index cfe5b6c50aea2..a714142763243 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -64,8 +64,8 @@ private[spark] class BlockManager( extends BlockDataProvider with Logging { private val port = conf.getInt("spark.blockManager.port", 0) - val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager) - val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf) + + val diskBlockManager = new DiskBlockManager(this, conf) val connectionManager = new ConnectionManager(port, conf, securityManager, "Connection manager for block manager") @@ -83,7 +83,7 @@ private[spark] class BlockManager( val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}" val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998") val tachyonBlockManager = - new TachyonBlockManager(shuffleBlockManager, tachyonStorePath, tachyonMaster) + new TachyonBlockManager(this, tachyonStorePath, tachyonMaster) tachyonInitialized = true new TachyonStore(this, tachyonBlockManager) } @@ -215,7 +215,7 @@ private[spark] class BlockManager( override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] = { val bid = BlockId(blockId) if (bid.isShuffle) { - Left(diskBlockManager.getBlockLocation(bid)) + shuffleManager.shuffleBlockManager.getBlockData(bid.asInstanceOf[ShuffleBlockId]) } else { val blockBytesOpt = doGetLocal(bid, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] if (blockBytesOpt.isDefined) { @@ -333,8 +333,14 @@ private[spark] class BlockManager( * shuffle blocks. It is safe to do so without a lock on block info since disk store * never deletes (recent) items. 
*/ - def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { - diskStore.getValues(blockId, serializer).orElse { + def getLocalShuffleFromDisk( + blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { + + val shuffleBlockManager = shuffleManager.shuffleBlockManager + val values = shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]).map( + bytes => this.dataDeserialize(blockId, bytes, serializer)) + + values.orElse { throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be") } } @@ -355,7 +361,8 @@ private[spark] class BlockManager( // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work if (blockId.isShuffle) { - diskStore.getBytes(blockId) match { + val shuffleBlockManager = shuffleManager.shuffleBlockManager + shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]) match { case Some(bytes) => Some(bytes) case None => @@ -1045,7 +1052,6 @@ private[spark] class BlockManager( def stop(): Unit = { connectionManager.stop() - shuffleBlockManager.stop() diskBlockManager.stop() actorSystem.stop(slaveActor) blockInfo.clear() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala index c194e0fed3367..14ae2f38c5670 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala @@ -21,7 +21,7 @@ import scala.concurrent.Future import akka.actor.{ActorRef, Actor} -import org.apache.spark.{Logging, MapOutputTracker} +import org.apache.spark.{Logging, MapOutputTracker, SparkEnv} import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.ActorLogReceive @@ -55,7 +55,7 @@ class BlockManagerSlaveActor( if (mapOutputTracker != null) { mapOutputTracker.unregisterShuffle(shuffleId) } - blockManager.shuffleBlockManager.removeShuffle(shuffleId) + SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId) } case RemoveBroadcast(broadcastId, tellMaster) => diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index ec022ce9c048a..a715594f198c2 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -21,11 +21,9 @@ import java.io.File import java.text.SimpleDateFormat import java.util.{Date, Random, UUID} -import org.apache.spark.{SparkConf, SparkEnv, Logging} +import org.apache.spark.{SparkConf, Logging} import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.network.netty.PathResolver import org.apache.spark.util.Utils -import org.apache.spark.shuffle.sort.SortShuffleManager /** * Creates and maintains the logical mapping between logical blocks and physical on-disk @@ -36,13 +34,11 @@ import org.apache.spark.shuffle.sort.SortShuffleManager * Block files are hashed among the directories listed in spark.local.dir (or in * SPARK_LOCAL_DIRS, if it's set). 
*/ -private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, conf: SparkConf) - extends PathResolver with Logging { +private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf) + extends Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 - - private val subDirsPerLocalDir = - shuffleBlockManager.conf.getInt("spark.diskStore.subDirectories", 64) + private val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64) /* Create one local directory for each path mentioned in spark.local.dir; then, inside this * directory, create multiple subdirectories that we will hash files into, in order to avoid @@ -56,26 +52,6 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, addShutdownHook() - /** - * Returns the physical file segment in which the given BlockId is located. If the BlockId has - * been mapped to a specific FileSegment by the shuffle layer, that will be returned. - * Otherwise, we assume the Block is mapped to the whole file identified by the BlockId. - */ - def getBlockLocation(blockId: BlockId): FileSegment = { - val env = SparkEnv.get // NOTE: can be null in unit tests - if (blockId.isShuffle && env != null && env.shuffleManager.isInstanceOf[SortShuffleManager]) { - // For sort-based shuffle, let it figure out its blocks - val sortShuffleManager = env.shuffleManager.asInstanceOf[SortShuffleManager] - sortShuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId], this) - } else if (blockId.isShuffle && shuffleBlockManager.consolidateShuffleFiles) { - // For hash-based shuffle with consolidated files, ShuffleBlockManager takes care of this - shuffleBlockManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]) - } else { - val file = getFile(blockId.name) - new FileSegment(file, 0, file.length()) - } - } - def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) @@ -105,7 +81,7 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, /** Check if disk block manager has a block. */ def containsBlock(blockId: BlockId): Boolean = { - getBlockLocation(blockId).file.exists() + getFile(blockId.name).exists() } /** List all the files currently stored on disk by the disk manager. 
*/ diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index c83261dd91b36..e9304f6bb45d0 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{FileOutputStream, RandomAccessFile} +import java.io.{File, FileOutputStream, RandomAccessFile} import java.nio.ByteBuffer import java.nio.channels.FileChannel.MapMode @@ -34,7 +34,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L) override def getSize(blockId: BlockId): Long = { - diskManager.getBlockLocation(blockId).length + diskManager.getFile(blockId.name).length } override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = { @@ -89,25 +89,33 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc } } - override def getBytes(blockId: BlockId): Option[ByteBuffer] = { - val segment = diskManager.getBlockLocation(blockId) - val channel = new RandomAccessFile(segment.file, "r").getChannel + private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = { + val channel = new RandomAccessFile(file, "r").getChannel try { // For small files, directly read rather than memory map - if (segment.length < minMemoryMapBytes) { - val buf = ByteBuffer.allocate(segment.length.toInt) - channel.read(buf, segment.offset) + if (length < minMemoryMapBytes) { + val buf = ByteBuffer.allocate(length.toInt) + channel.read(buf, offset) buf.flip() Some(buf) } else { - Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length)) + Some(channel.map(MapMode.READ_ONLY, offset, length)) } } finally { channel.close() } } + override def getBytes(blockId: BlockId): Option[ByteBuffer] = { + val file = diskManager.getFile(blockId.name) + getBytes(file, 0, file.length) + } + + def getBytes(segment: FileSegment): Option[ByteBuffer] = { + getBytes(segment.file, segment.offset, segment.length) + } + override def getValues(blockId: BlockId): Option[Iterator[Any]] = { getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) } @@ -117,24 +125,25 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc * shuffle short-circuit code. */ def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { + // TODO: Should bypass getBytes and use a stream based implementation, so that + // we won't use a lot of memory during e.g. external sort merge. getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer)) } override def remove(blockId: BlockId): Boolean = { - val fileSegment = diskManager.getBlockLocation(blockId) - val file = fileSegment.file - if (file.exists() && file.length() == fileSegment.length) { + val file = diskManager.getFile(blockId.name) + // If consolidation mode is used With HashShuffleMananger, the physical filename for the block + // is different from blockId.name. So the file returns here will not be exist, thus we avoid to + // delete the whole consolidated file by mistake. 
+ if (file.exists()) { file.delete() } else { - if (fileSegment.length < file.length()) { - logWarning(s"Could not delete block associated with only a part of a file: $blockId") - } false } } override def contains(blockId: BlockId): Boolean = { - val file = diskManager.getBlockLocation(blockId).file + val file = diskManager.getFile(blockId.name) file.exists() } } diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index a6cbe3aa440ff..6908a59a79e60 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -35,7 +35,7 @@ import org.apache.spark.util.Utils * @param rootDirs The directories to use for storing block files. Data will be hashed among these. */ private[spark] class TachyonBlockManager( - shuffleManager: ShuffleBlockManager, + blockManager: BlockManager, rootDirs: String, val master: String) extends Logging { @@ -49,7 +49,7 @@ private[spark] class TachyonBlockManager( private val MAX_DIR_CREATION_ATTEMPTS = 10 private val subDirsPerTachyonDir = - shuffleManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt + blockManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName; // then, inside this directory, create multiple subdirectories that we will hash files into, diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 5d8a648d9551e..782b979e2e93d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -719,20 +719,20 @@ private[spark] class ExternalSorter[K, V, C]( def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2) /** - * Write all the data added into this ExternalSorter into a file in the disk store, creating - * an .index file for it as well with the offsets of each partition. This is called by the - * SortShuffleWriter and can go through an efficient path of just concatenating binary files - * if we decided to avoid merge-sorting. + * Write all the data added into this ExternalSorter into a file in the disk store. This is + * called by the SortShuffleWriter and can go through an efficient path of just concatenating + * binary files if we decided to avoid merge-sorting. * * @param blockId block ID to write to. The index file will be blockId.name + ".index". * @param context a TaskContext for a running Spark task, for us to update shuffle metrics. 
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker) */ - def writePartitionedFile(blockId: BlockId, context: TaskContext): Array[Long] = { - val outputFile = blockManager.diskBlockManager.getFile(blockId) + def writePartitionedFile( + blockId: BlockId, + context: TaskContext, + outputFile: File): Array[Long] = { // Track location of each range in the output file - val offsets = new Array[Long](numPartitions + 1) val lengths = new Array[Long](numPartitions) if (bypassMergeSort && partitionWriters != null) { @@ -750,7 +750,6 @@ private[spark] class ExternalSorter[K, V, C]( in.close() in = null lengths(i) = size - offsets(i + 1) = offsets(i) + lengths(i) } } finally { if (out != null) { @@ -772,11 +771,7 @@ private[spark] class ExternalSorter[K, V, C]( } writer.commitAndClose() val segment = writer.fileSegment() - offsets(id + 1) = segment.offset + segment.length lengths(id) = segment.length - } else { - // The partition is empty; don't create a new writer to avoid writing headers, etc - offsets(id + 1) = offsets(id) } } } @@ -784,23 +779,6 @@ private[spark] class ExternalSorter[K, V, C]( context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled context.taskMetrics.diskBytesSpilled += diskBytesSpilled - // Write an index file with the offsets of each block, plus a final offset at the end for the - // end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure - // out where each block begins and ends. - - val diskBlockManager = blockManager.diskBlockManager - val indexFile = diskBlockManager.getFile(blockId.name + ".index") - val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) - try { - var i = 0 - while (i < numPartitions + 1) { - out.writeLong(offsets(i)) - i += 1 - } - } finally { - out.close() - } - lengths } @@ -811,7 +789,7 @@ private[spark] class ExternalSorter[K, V, C]( if (writer.isOpen) { writer.commitAndClose() } - blockManager.getLocalFromDisk(writer.blockId, ser).get.asInstanceOf[Iterator[Product2[K, C]]] + blockManager.diskStore.getValues(writer.blockId, ser).get.asInstanceOf[Iterator[Product2[K, C]]] } def stop(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala new file mode 100644 index 0000000000000..6061e544e79b4 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle.hash + +import java.io.{File, FileWriter} + +import scala.language.reflectiveCalls + +import org.scalatest.FunSuite + +import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf} +import org.apache.spark.executor.ShuffleWriteMetrics +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.shuffle.FileShuffleBlockManager +import org.apache.spark.storage.{ShuffleBlockId, FileSegment} + +class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { + private val testConf = new SparkConf(false) + + private def checkSegments(segment1: FileSegment, segment2: FileSegment) { + assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath) + assert (segment1.offset === segment2.offset) + assert (segment1.length === segment2.length) + } + + test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") { + + val conf = new SparkConf(false) + // reset after EACH object write. This is to ensure that there are bytes appended after + // an object is written. So if the codepaths assume writeObject is end of data, this should + // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc. + conf.set("spark.serializer.objectStreamReset", "1") + conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") + conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager") + + sc = new SparkContext("local", "test", conf) + + val shuffleBlockManager = + SparkEnv.get.shuffleManager.shuffleBlockManager.asInstanceOf[FileShuffleBlockManager] + + val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf), + new ShuffleWriteMetrics) + for (writer <- shuffle1.writers) { + writer.write("test1") + writer.write("test2") + } + for (writer <- shuffle1.writers) { + writer.commitAndClose() + } + + val shuffle1Segment = shuffle1.writers(0).fileSegment() + shuffle1.releaseWriters(success = true) + + val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf), + new ShuffleWriteMetrics) + + for (writer <- shuffle2.writers) { + writer.write("test3") + writer.write("test4") + } + for (writer <- shuffle2.writers) { + writer.commitAndClose() + } + val shuffle2Segment = shuffle2.writers(0).fileSegment() + shuffle2.releaseWriters(success = true) + + // Now comes the test : + // Write to shuffle 3; and close it, but before registering it, check if the file lengths for + // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length + // of block based on remaining data in file : which could mess things up when there is concurrent read + // and writes happening to the same shuffle group. + + val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf), + new ShuffleWriteMetrics) + for (writer <- shuffle3.writers) { + writer.write("test3") + writer.write("test4") + } + for (writer <- shuffle3.writers) { + writer.commitAndClose() + } + // check before we register. 
+ checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)).left.get) + shuffle3.releaseWriters(success = true) + checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)).left.get) + shuffleBlockManager.removeShuffle(1) + + } + + + def writeToFile(file: File, numBytes: Int) { + val writer = new FileWriter(file, true) + for (i <- 0 until numBytes) writer.write(i) + writer.close() + } +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala index fbfcb5156d496..3c86f6bafcaa3 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala @@ -60,11 +60,11 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { } // 3rd block is going to fail - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any()) - doAnswer(answer).when(blockManager).getLocalFromDisk(meq(blIds(2)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any()) + doAnswer(answer).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any()) val bmId = BlockManagerId("test-client", "test-client", 1) val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( @@ -76,24 +76,24 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { iterator.initialize() - // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk. - verify(blockManager, times(0)).getLocalFromDisk(any(), any()) + // Without exhausting the iterator, the iterator should be lazy and not call getLocalShuffleFromDisk. 
+ verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements") // the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully assert(iterator.next()._2.isDefined, "1st element should be defined but is not actually defined") - verify(blockManager, times(1)).getLocalFromDisk(any(), any()) + verify(blockManager, times(1)).getLocalShuffleFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element") assert(iterator.next()._2.isDefined, "2nd element should be defined but is not actually defined") - verify(blockManager, times(2)).getLocalFromDisk(any(), any()) + verify(blockManager, times(2)).getLocalShuffleFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements") // 3rd fetch should be failed intercept[Exception] { iterator.next() } - verify(blockManager, times(3)).getLocalFromDisk(any(), any()) + verify(blockManager, times(3)).getLocalShuffleFromDisk(any(), any()) } @@ -115,11 +115,11 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { val optItr = mock(classOf[Option[Iterator[Any]]]) // All blocks should be fetched successfully - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(2)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any()) val bmId = BlockManagerId("test-client", "test-client", 1) val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( @@ -131,8 +131,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { iterator.initialize() - // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk. - verify(blockManager, times(0)).getLocalFromDisk(any(), any()) + // Without exhausting the iterator, the iterator should be lazy and not call getLocalShuffleFromDisk. 
+ verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements") assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined") @@ -145,7 +145,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements") assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined") - verify(blockManager, times(5)).getLocalFromDisk(any(), any()) + verify(blockManager, times(5)).getLocalShuffleFromDisk(any(), any()) } test("block fetch from remote fails using BasicBlockFetcherIterator") { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index bdcea07e5714f..14ffadab99cae 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -49,6 +49,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.implicitConversions import scala.language.postfixOps +import org.apache.spark.shuffle.ShuffleBlockManager class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester { @@ -823,11 +824,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter // be nice to refactor classes involved in disk storage in a way that // allows for easier testing. val blockManager = mock(classOf[BlockManager]) - val shuffleBlockManager = mock(classOf[ShuffleBlockManager]) - when(shuffleBlockManager.conf).thenReturn(conf) - val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf) - when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString)) + val diskBlockManager = new DiskBlockManager(blockManager, conf) + val diskStoreMapped = new DiskStore(blockManager, diskBlockManager) diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY) val mapped = diskStoreMapped.getBytes(blockId).get diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index aabaeadd7a071..26082ded8ca7a 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -26,6 +26,7 @@ import scala.language.reflectiveCalls import akka.actor.Props import com.google.common.io.Files +import org.mockito.Mockito.{mock, when} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} import org.apache.spark.SparkConf @@ -40,18 +41,8 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before private var rootDir1: File = _ private var rootDirs: String = _ - // This suite focuses primarily on consolidation features, - // so we coerce consolidation if not already enabled. 
- testConf.set("spark.shuffle.consolidateFiles", "true") - - private val shuffleManager = new HashShuffleManager(testConf.clone) - - val shuffleBlockManager = new ShuffleBlockManager(null, shuffleManager) { - override def conf = testConf.clone - var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]() - override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id) - } - + val blockManager = mock(classOf[BlockManager]) + when(blockManager.conf).thenReturn(testConf) var diskBlockManager: DiskBlockManager = _ override def beforeAll() { @@ -73,22 +64,17 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before override def beforeEach() { val conf = testConf.clone conf.set("spark.local.dir", rootDirs) - diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf) - shuffleBlockManager.idToSegmentMap.clear() + diskBlockManager = new DiskBlockManager(blockManager, conf) } override def afterEach() { diskBlockManager.stop() - shuffleBlockManager.idToSegmentMap.clear() } test("basic block creation") { val blockId = new TestBlockId("test") - assertSegmentEquals(blockId, blockId.name, 0, 0) - val newFile = diskBlockManager.getFile(blockId) writeToFile(newFile, 10) - assertSegmentEquals(blockId, blockId.name, 0, 10) assert(diskBlockManager.containsBlock(blockId)) newFile.delete() assert(!diskBlockManager.containsBlock(blockId)) @@ -101,127 +87,6 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before assert(diskBlockManager.getAllBlocks.toSet === ids.toSet) } - test("block appending") { - val blockId = new TestBlockId("test") - val newFile = diskBlockManager.getFile(blockId) - writeToFile(newFile, 15) - assertSegmentEquals(blockId, blockId.name, 0, 15) - val newFile2 = diskBlockManager.getFile(blockId) - assert(newFile === newFile2) - writeToFile(newFile2, 12) - assertSegmentEquals(blockId, blockId.name, 0, 27) - newFile.delete() - } - - test("block remapping") { - val filename = "test" - val blockId0 = new ShuffleBlockId(1, 2, 3) - val newFile = diskBlockManager.getFile(filename) - writeToFile(newFile, 15) - shuffleBlockManager.idToSegmentMap(blockId0) = new FileSegment(newFile, 0, 15) - assertSegmentEquals(blockId0, filename, 0, 15) - - val blockId1 = new ShuffleBlockId(1, 2, 4) - val newFile2 = diskBlockManager.getFile(filename) - writeToFile(newFile2, 12) - shuffleBlockManager.idToSegmentMap(blockId1) = new FileSegment(newFile, 15, 12) - assertSegmentEquals(blockId1, filename, 15, 12) - - assert(newFile === newFile2) - newFile.delete() - } - - private def checkSegments(segment1: FileSegment, segment2: FileSegment) { - assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath) - assert (segment1.offset === segment2.offset) - assert (segment1.length === segment2.length) - } - - test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") { - - val serializer = new JavaSerializer(testConf) - val confCopy = testConf.clone - // reset after EACH object write. This is to ensure that there are bytes appended after - // an object is written. So if the codepaths assume writeObject is end of data, this should - // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc. - confCopy.set("spark.serializer.objectStreamReset", "1") - - val securityManager = new org.apache.spark.SecurityManager(confCopy) - // Do not use the shuffleBlockManager above ! 
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy, - securityManager) - val master = new BlockManagerMaster( - actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))), - confCopy) - val store = new BlockManager("", actorSystem, master , serializer, confCopy, - securityManager, null, shuffleManager) - - try { - - val shuffleManager = store.shuffleBlockManager - - val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer, new ShuffleWriteMetrics) - for (writer <- shuffle1.writers) { - writer.write("test1") - writer.write("test2") - } - for (writer <- shuffle1.writers) { - writer.commitAndClose() - } - - val shuffle1Segment = shuffle1.writers(0).fileSegment() - shuffle1.releaseWriters(success = true) - - val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf), - new ShuffleWriteMetrics) - - for (writer <- shuffle2.writers) { - writer.write("test3") - writer.write("test4") - } - for (writer <- shuffle2.writers) { - writer.commitAndClose() - } - val shuffle2Segment = shuffle2.writers(0).fileSegment() - shuffle2.releaseWriters(success = true) - - // Now comes the test : - // Write to shuffle 3; and close it, but before registering it, check if the file lengths for - // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length - // of block based on remaining data in file : which could mess things up when there is concurrent read - // and writes happening to the same shuffle group. - - val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf), - new ShuffleWriteMetrics) - for (writer <- shuffle3.writers) { - writer.write("test3") - writer.write("test4") - } - for (writer <- shuffle3.writers) { - writer.commitAndClose() - } - // check before we register. 
- checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0))) - shuffle3.releaseWriters(success = true) - checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0))) - shuffleManager.removeShuffle(1) - } finally { - - if (store != null) { - store.stop() - } - actorSystem.shutdown() - actorSystem.awaitTermination() - } - } - - def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) { - val segment = diskBlockManager.getBlockLocation(blockId) - assert(segment.file.getName === filename) - assert(segment.offset === offset) - assert(segment.length === length) - } - def writeToFile(file: File, numBytes: Int) { val writer = new FileWriter(file, true) for (i <- 0 until numBytes) writer.write(i) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 300589394b96f..fe8ffe6d97a05 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -58,6 +58,8 @@ object MimaExcludes { "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"), ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.storage.DiskStore.getValues"), ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.storage.MemoryStore.Entry") ) ++ diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala index 17bf7c2541d13..db58eb642b56d 100644 --- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala +++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala @@ -20,10 +20,11 @@ package org.apache.spark.tools import java.util.concurrent.{CountDownLatch, Executors} import java.util.concurrent.atomic.AtomicLong +import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.SparkContext import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.util.Utils -import org.apache.spark.executor.ShuffleWriteMetrics /** * Internal utility for micro-benchmarking shuffle write performance. @@ -50,13 +51,15 @@ object StoragePerfTester { System.setProperty("spark.shuffle.compress", "false") System.setProperty("spark.shuffle.sync", "true") + System.setProperty("spark.shuffle.manager", + "org.apache.spark.shuffle.hash.HashShuffleManager") // This is only used to instantiate a BlockManager. All thread scheduling is done manually. val sc = new SparkContext("local[4]", "Write Tester") - val blockManager = sc.env.blockManager + val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager] def writeOutputBytes(mapId: Int, total: AtomicLong) = { - val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, + val shuffle = hashShuffleManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, new KryoSerializer(sc.conf), new ShuffleWriteMetrics()) val writers = shuffle.writers for (i <- 1 to recordsPerMap) { From d90434c03564558a4208f64e15b20009eabe3645 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 29 Aug 2014 23:21:38 -0700 Subject: [PATCH 15/15] Manually close old pull requests Closes #1824