
[SPARK-2947] DAGScheduler resubmit the stage into an infinite loop #1877

Closed
wants to merge 2 commits

Conversation

witgo
Contributor

witgo commented Aug 10, 2014

No description provided.

@SparkQA

SparkQA commented Aug 10, 2014

QA tests have started for PR 1877. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18281/consoleFull

@SparkQA

SparkQA commented Aug 10, 2014

QA results for PR 1877:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18281/consoleFull

@mateiz
Contributor

mateiz commented Aug 10, 2014

@witgo can you explain how this happens and why the fix works, and add a unit test for it? We can't really merge something like this without a test.

@@ -1024,31 +1024,33 @@ class DAGScheduler(
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable
val failedStage = stageIdToStage(task.stageId)
runningStages -= failedStage
// TODO: Cancel running tasks in the stage
Contributor Author

@mateiz
There is no cancellation of running tasks in the stage, so when any one of the still-running tasks throws a fetch-failure exception, the following code is executed again:

failedStages += failedStage
failedStages += mapStage

and the stage is unnecessarily resubmitted by resubmitFailedStages.
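
To make the failure mode concrete, here is a minimal sketch (simplified stand-in types, not the actual DAGScheduler code) of how every duplicate FetchFailed event re-enqueues the same stages:

    import scala.collection.mutable

    // Simplified stand-ins for the scheduler's state; the real logic lives in
    // org.apache.spark.scheduler.DAGScheduler.
    object ResubmitLoopSketch {
      case class Stage(id: Int)

      val runningStages = mutable.Set.empty[Stage]
      val failedStages  = mutable.Set.empty[Stage]

      // Invoked once per FetchFailed task-end event. Because running tasks are
      // never cancelled, several tasks of the same stage can each report a
      // FetchFailed, and each event re-enqueues both stages, which
      // resubmitFailedStages then resubmits over and over.
      def onFetchFailed(failedStage: Stage, mapStage: Stage): Unit = {
        runningStages -= failedStage
        failedStages += failedStage
        failedStages += mapStage
      }
    }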

@witgo
Contributor Author

witgo commented Aug 11, 2014

It will take some time to add a test for this.

witgo changed the title from [WIP][SPARK-2947] DAGScheduler resubmit the stage into an infinite loop to [SPARK-2947] DAGScheduler resubmit the stage into an infinite loop on Aug 12, 2014
@SparkQA

SparkQA commented Aug 12, 2014

QA tests have started for PR 1877. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18367/consoleFull

@SparkQA

SparkQA commented Aug 12, 2014

QA results for PR 1877:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18367/consoleFull

@davies
Contributor

davies commented Aug 25, 2014

@witgo Could you rebase this PR onto master? There are some conflicts right now.

@SparkQA

SparkQA commented Aug 26, 2014

QA tests have started for PR 1877 at commit 3484c29.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 26, 2014

QA tests have finished for PR 1877 at commit 3484c29.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "$
    • $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "$
    • class KMeansModel (val clusterCenters: Array[Vector]) extends Serializable
    • class BoundedFloat(float):
    • class JoinedRow2 extends Row
    • class JoinedRow3 extends Row
    • class JoinedRow4 extends Row
    • class JoinedRow5 extends Row
    • class GenericRow(protected[sql] val values: Array[Any]) extends Row
    • abstract class MutableValue extends Serializable
    • final class MutableInt extends MutableValue
    • final class MutableFloat extends MutableValue
    • final class MutableBoolean extends MutableValue
    • final class MutableDouble extends MutableValue
    • final class MutableShort extends MutableValue
    • final class MutableLong extends MutableValue
    • final class MutableByte extends MutableValue
    • final class MutableAny extends MutableValue
    • final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow
    • case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate
    • case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression
    • case class CollectHashSetFunction(
    • case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression
    • case class CombineSetsAndCountFunction(
    • case class CountDistinctFunction(
    • case class MaxOf(left: Expression, right: Expression) extends Expression
    • case class NewSet(elementType: DataType) extends LeafExpression
    • case class AddItemToSet(item: Expression, set: Expression) extends Expression
    • case class CombineSets(left: Expression, right: Expression) extends BinaryExpression
    • case class CountSet(child: Expression) extends UnaryExpression
    • case class ExplainCommand(plan: LogicalPlan, extended: Boolean = false) extends Command

@witgo
Contributor Author

witgo commented Aug 28, 2014

@rxin could you take a look at this PR? Thanks!

@rxin
Contributor

rxin commented Aug 28, 2014

Can you explain what problem you are seeing?

@SparkQA

SparkQA commented Aug 28, 2014

QA tests have started for PR 1877 at commit c4b0f91.

  • This patch merges cleanly.

@witgo
Contributor Author

witgo commented Aug 28, 2014

SPARK-3224 is the same problem.
This PR adds some boundary checks and removes some redundant code.

@@ -472,6 +472,44 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assert(sparkListener.failedStages.size == 1)
}

test("run trivial shuffle with repeated fetch failure") {
Contributor

Can you change this and/or the name of the test at line 438? They are currently almost identical, so it's unclear what the point of each test is.

@SparkQA

SparkQA commented Aug 28, 2014

QA tests have finished for PR 1877 at commit c4b0f91.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

witgo force-pushed the SPARK-2947 branch 2 times, most recently from 089577f to bf6f81a on August 29, 2014 15:55
@SparkQA

SparkQA commented Aug 29, 2014

QA tests have started for PR 1877 at commit bf6f81a.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 29, 2014

QA tests have finished for PR 1877 at commit bf6f81a.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


  // It is likely that we receive multiple FetchFailed for a single stage (because we have
  // multiple tasks running concurrently on different executors). In that case, it is possible
  // the fetch failure has already been handled by the scheduler.
- if (runningStages.contains(failedStage)) {
+ if (runningStages.contains(failedStage) && stage.pendingTasks.contains(task)) {
Contributor Author

@rxin Because running tasks in the stage are not cancelled, the stage.pendingTasks.contains(task) check is necessary.
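
A tiny sketch of the predicate's intent (names follow the diff above; simplified types, not the merged code):

    import scala.collection.mutable

    object PendingTaskGuardSketch {
      case class Task(id: Int)
      class Stage(val id: Int) { val pendingTasks = mutable.Set.empty[Task] }

      // Handle a FetchFailed only if the stage is still running AND the failing
      // task is still tracked as pending; a stale event from an uncancelled
      // task whose failure was already processed is dropped instead of
      // re-enqueueing the stage.
      def shouldHandle(runningStages: mutable.Set[Stage], failedStage: Stage, task: Task): Boolean =
        runningStages.contains(failedStage) && failedStage.pendingTasks.contains(task)
    }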

@witgo
Contributor Author

witgo commented Sep 9, 2014

screenshots:
[image: qq20140909-1]

@SparkQA

SparkQA commented Sep 13, 2014

QA tests have started for PR 1877 at commit bf6f81a.

  • This patch merges cleanly.

-        if (bmAddress != null) {
-          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
+          // TODO: mark the executor as failed only if there were lots of fetch failures on it
+          if (bmAddress != null) {
Contributor

Once you put this inside the conditional statement, only one executor failure will be handled for each stage. That means if two executors fail, the second one gets ignored by the DAGScheduler, doesn't it?

Contributor Author

@rxin Yes, the processing logic was modified unnecessarily here; that was an oversight on my part.
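
A sketch of the arrangement rxin is asking to preserve (simplified signatures and a println stand-in, not the merged code): the executor-lost handling stays outside the stage-level guard, so every FetchFailed that carries a block-manager address is acted on even after the stage's own failure has been handled.

    import scala.collection.mutable

    object ExecutorLostPlacementSketch {
      case class BlockManagerId(executorId: String)
      case class Task(stageId: Int, epoch: Long)
      class Stage(val id: Int) { val pendingTasks = mutable.Set.empty[Task] }

      val runningStages = mutable.Set.empty[Stage]
      val failedStages  = mutable.Set.empty[Stage]

      // Stand-in for DAGScheduler.handleExecutorLost.
      def handleExecutorLost(executorId: String, epoch: Option[Long]): Unit =
        println(s"marking executor $executorId as lost (epoch $epoch)")

      def onFetchFailed(failedStage: Stage, mapStage: Stage, task: Task,
                        bmAddress: BlockManagerId): Unit = {
        // Stage-level bookkeeping runs only for the first unhandled failure...
        if (runningStages.contains(failedStage) && failedStage.pendingTasks.contains(task)) {
          runningStages -= failedStage
          failedStages += failedStage
          failedStages += mapStage
        }
        // ...but the executor-level handling stays outside that guard, so a
        // second failing executor is not silently ignored once the stage has
        // already been marked as failed.
        if (bmAddress != null) {
          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
        }
      }
    }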

@SparkQA

SparkQA commented Sep 13, 2014

QA tests have finished for PR 1877 at commit bf6f81a.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • throw new IllegalStateException("The main method in the given main class must be static")
    • class TaskCompletionListenerException(errorMessages: Seq[String]) extends Exception
    • class Dummy(object):

@SparkQA

SparkQA commented Sep 14, 2014

QA tests have started for PR 1877 at commit 958d7db.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 14, 2014

QA tests have finished for PR 1877 at commit 958d7db.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class RatingDeserializer(FramedSerializer):
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder extends compression.Encoder[IntegerType.type]
    • class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[IntegerType.type])
    • class Encoder extends compression.Encoder[LongType.type]
    • class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[LongType.type])

@witgo witgo closed this Dec 3, 2014
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
…der (apache#1877)

Parquet footer metadata is currently always read twice in the vectorized Parquet reader.
When the NameNode is under high pressure, reading it twice costs time. We can avoid the second read by reading all row-group metadata in advance and filtering row groups against the pushed-down filters, so there is no need to read the footer metadata a second time.
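
A rough Scala sketch of the idea using parquet-mr's footer API (readFooter and NO_FILTER are existing parquet-mr calls, but the object and method here are illustrative, not the commit's code):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.format.converter.ParquetMetadataConverter
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.metadata.ParquetMetadata

    object FooterOnceSketch {
      // Read the footer a single time with ALL row-group metadata included, and
      // reuse the in-memory result for row-group filtering, instead of issuing
      // a second footer read once push-down filters are known.
      def readFooterOnce(conf: Configuration, file: Path): ParquetMetadata = {
        val footer = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER)
        // footer.getBlocks holds every row group; pushed-down predicates can be
        // evaluated against this list locally, avoiding another footer read.
        footer
      }
    }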

Reduces footer reads in the vectorized Parquet reader.

No user-facing change.

Covered by existing tests.

Closes apache#39950 from yabola/skip_footer.

Authored-by: chenliang.lu <[email protected]>

Signed-off-by: Chao Sun <[email protected]>
Co-authored-by: chenliang.lu <[email protected]>