
Task level retry and Stage level retry #261

Merged · 10 commits · Oct 2, 2022

Conversation

@mingmwang (Contributor) commented Sep 21, 2022

Which issue does this PR close?

Closes #140.

Rationale for this change

What changes are included in this PR?

1. Task level retry
a) Add task attempt num; use a job-level unique task_id to represent the task.
b) Define a couple of failure reasons for FailedTask:

pub enum FailedReason {
        #[prost(message, tag="4")]
        ExecutionError(super::ExecutionError),
        #[prost(message, tag="5")]
        FetchPartitionError(super::FetchPartitionError),
        #[prost(message, tag="6")]
        IoError(super::IoError),
        #[prost(message, tag="7")]
        ExecutorLost(super::ExecutorLost),
        /// A successful task's result is lost due to executor lost
        #[prost(message, tag="8")]
        ResultLost(super::ResultLost),
        #[prost(message, tag="9")]
        TaskKilled(super::TaskKilled),
    }

c) Determine the real task failure reason when returning the TaskStatus to the Scheduler.
d) Based on the failure reason, the scheduler decides whether to reschedule the task and bumps the attempt number of the failed task (see the retry-decision sketch after this list).

2. Stage level retry and shuffle read failure handling
a) Add stage attempt num.
b) When there are shuffle partition fetch failures, the current running stage will be rolled back and the map stages will be resubmitted (see the rollback sketch below).
c) Handle delayed fetch-failure task updates.
d) Cancel the running tasks if the Scheduler decides to fail the stage/job.
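To make 1c/1d concrete, below is a minimal Rust sketch of such a retry decision. It is illustrative only: the simplified FailedReason mirrors the variants above without the prost annotations, and should_retry_task / max_task_failures are hypothetical names rather than the actual scheduler API.

enum FailedReason {
    ExecutionError,
    FetchPartitionError,
    IoError,
    ExecutorLost,
    ResultLost,
    TaskKilled,
}

fn should_retry_task(reason: &FailedReason, attempt: usize, max_task_failures: usize) -> bool {
    match reason {
        // Transient losses: reschedule on another executor with a bumped attempt number.
        FailedReason::IoError | FailedReason::ExecutorLost | FailedReason::ResultLost => {
            attempt < max_task_failures
        }
        // Fetch failures trigger a stage-level rollback instead of a plain task retry.
        FailedReason::FetchPartitionError => false,
        // In this sketch, execution errors and explicit kills are not retried.
        FailedReason::ExecutionError | FailedReason::TaskKilled => false,
    }
}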

The following two items are not covered in this PR, and we may need to revisit the related logic in the future:

  1. If the plans/expressions in the stage are not deterministic, we need to revisit the resubmit logic.
  2. If we have a map stage whose shuffle output can be reused by multiple reduce stages, we need to revisit the stage retry logic.
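And a similarly hedged sketch of the stage rollback in 2b; StageState and roll_back_for_fetch_failure are invented stand-ins for the real ExecutionGraph logic:

use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum StageState {
    Unresolved,
    Resolved,
    Running,
    Successful,
}

// On a shuffle fetch failure, roll the running reduce stage back to Unresolved,
// bump its stage attempt, and resubmit the map stage whose output was lost
// (Successful -> Running).
fn roll_back_for_fetch_failure(
    stages: &mut HashMap<usize, StageState>,
    stage_attempts: &mut HashMap<usize, usize>,
    running_stage_id: usize,
    map_stage_id: usize,
) {
    stages.insert(running_stage_id, StageState::Unresolved);
    *stage_attempts.entry(running_stage_id).or_insert(0) += 1;
    stages.insert(map_stage_id, StageState::Running);
    *stage_attempts.entry(map_stage_id).or_insert(0) += 1;
}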

Are there any user-facing changes?

@yahoNanJing marked this pull request as draft on September 22, 2022 19:15.
@mingmwang marked this pull request as ready for review on September 25, 2022 13:59.
@mingmwang (Contributor, Author):

@thinkharderdev @andygrove @yahoNanJing
Please help review my PR.

@mingmwang (Contributor, Author):

Added 10+ unit tests to cover the different cases.

@mingmwang changed the title from "Failed task retry" to "Task level retry and Stage level retry" on Sep 25, 2022.
@yahoNanJing (Contributor):

With this PR, the state machine for the stage becomes as follows:

[StageStateMachine diagram]

The transitions marked in red are newly added to deal with error-task recovery.
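Since the diagram is an image, here is a rough textual reconstruction as a Rust sketch, based only on the states and transitions described in this thread (the Failed state and any other transitions are omitted):

enum StageState { Unresolved, Resolved, Running, Successful }

// The newly added (red) recovery transitions, per the example below:
//   Running    -> Unresolved  (running stage hits shuffle fetch failures)
//   Successful -> Running     (map stage resubmitted to regenerate lost output)
//   Resolved   -> Unresolved  (downstream stage loses a ready input)
fn is_recovery_transition(from: StageState, to: StageState) -> bool {
    matches!(
        (from, to),
        (StageState::Running, StageState::Unresolved)
            | (StageState::Successful, StageState::Running)
            | (StageState::Resolved, StageState::Unresolved)
    )
}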

@yahoNanJing (Contributor) commented Sep 28, 2022:

For example, one SQL stage graph snapshot is as follows:

                       Stage 4 (Resolved)
                     ↗                    ↘
 Stage 1 (Successful)                       Stage 5 (Unresolved)
                     ↘                    ↗
                       Stage 2 (Successful) -> Stage 3 (Running)

Both Stage1 and Stage2 have output shuffle data residing on Executor1. Unluckily, Executor1 gets lost. Tasks for the running Stage3 will then fail because they cannot fetch shuffle data. For this kind of scenario and error, with this PR the ExecutionGraph will be able to continue running after a proper task reset.

  1. First, Stage3 will be converted from Running to Unresolved, and it will ask Stage2, which it depends on, to rerun the related tasks to prepare its input data.
  2. Then Stage2 will be converted from Successful to Running. Unluckily, the related rerun tasks for Stage2 will also fail because they cannot fetch shuffle data. Stage2 will then be converted from Running to Unresolved and will ask Stage1, which it depends on, to rerun the related tasks to prepare its input data.
  3. Then Stage1 will be converted from Successful to Running.
  4. Then Stage4, which depends on Stage1, will be converted from Resolved to Unresolved.

The stage graph snapshot will become as follows:

                       Stage 4 (Unresolved)
                     ↗                    ↘
 Stage 1 (Running)                          Stage 5 (Unresolved)
                     ↘                    ↗
                       Stage 2 (Unresolved) -> Stage 3 (Unresolved)

Once Stage1 finishes successfully, the graph snapshot will become as follows:

                       Stage 4 (Resolved)
                     ↗                    ↘
 Stage 1 (Successful)                       Stage 5 (Unresolved)
                     ↘                    ↗
                       Stage 2 (Resolved) -> Stage 3 (Unresolved)

...

The purpose of introducing the task attempt and stage attempt is as follows:

  • Stage attempt:
    For a running stage, all of the stored task statuses should belong to the same stage attempt. Task status updates carrying an older attempt number will be ignored.
  • Task attempt:
    Some errors, like shuffle write IO errors, are retryable. It is feasible to reschedule such a task to another executor for a new attempt. The task attempt number is reset to 0 when the stage starts a new attempt.
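A tiny sketch of the stage-attempt check (hypothetical field names, not the actual types):

struct TaskStatusUpdate {
    task_id: usize,
    stage_attempt: usize,
}

// Updates carrying an older stage attempt number belong to a superseded attempt
// whose task info has already been discarded, so they are ignored.
fn accept_update(update: &TaskStatusUpdate, current_stage_attempt: usize) -> bool {
    update.stage_attempt == current_stage_attempt
}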

);
let mut should_ignore = true;
// handle delayed failed tasks if the stage's next attempt is still in UnResolved status.
if let Some(task_status::Status::Failed(failed_task)) =
Contributor:

Here, only failed tasks are dealt with. Once a running stage is converted into an unresolved one, it discards all of its task info, so updating the successful task info would be meaningless.

@thinkharderdev (Contributor):

@mingmwang I'm pretty swamped this week so won't have time to review this until this weekend.

events.push(StageEvent::StageCompleted(stage_id));
// if this stage is completed, we want to combine the stage metrics to plan's metric set and print out the plan
let is_final_successful = running_stage.is_successful()
&& !reset_running_stages.contains_key(&stage_id);
Contributor:

When this running stage was converted from a successful stage, there may be in-flight tasks of the stages that depend on it. When we find those tasks failing due to a data fetch failure, we also need to reset this running stage. Therefore the reset_running_stages check here is necessary.

@yahoNanJing (Contributor):

Later, maybe we can make this error task recovery feature configurable.

@yahoNanJing (Contributor):

The overall design of this PR looks good to me, and it will be really useful for dealing with executor-lost and executor bad-disk issues. @thinkharderdev, @andygrove, @avantgardnerio, could you help review this PR?

@andygrove (Member):

Thanks, @mingmwang. I plan on starting to review this tomorrow.

.await
/// Partition reader trait; different partition readers can have different implementations
#[async_trait]
trait PartitionReader: Send + Sync + Clone {
@yahoNanJing (Contributor) commented Sep 29, 2022:

A trait is much more extensible than conditional compilation 👍

@@ -424,54 +424,87 @@ message ExecutionGraph {
uint64 output_partitions = 5;
repeated PartitionLocation output_locations = 6;
string scheduler_id = 7;
uint32 tid_gen = 8;
Member:

nit: tid_gen isn't a descriptive name. I am guessing this is short for task_id_gen?

@mingmwang (Contributor, Author) commented Sep 30, 2022:

Sure, will change to task_id_gen.

Comment on lines 558 to 559
uint32 map_partition_id = 1;
PartitionId partition_id = 2;
Member:

Could you add comments explaining these different partition ids?

Contributor (Author):

Sure, will add more comments in the code. The newly added map_partition_id is the partition_id of the map stage that produces this shuffle data, while the original PartitionId partition_id has different meanings in different places: sometimes it stands for a task_id, and here it stands for a shuffle partition id (a composition of map_stage_id + reduce partition id + job_id), mixing map and reduce info together.
So if we do not need to consider backward compatibility, I would suggest unnesting this and making PartitionLocation a plain struct like below:

message PartitionLocation {
  string job_id = 1;
  uint32 map_stage_id = 2;
  uint32 map_partition_id = 3;
  uint32 partition_id = 4;
  ExecutorMetadata executor_meta = 5;
  PartitionStats partition_stats = 6;
  string path = 7;
}

Member:

It is late here for me, but I think that makes sense. I will review it again tomorrow.

@andygrove (Member):

@mingmwang I have taken a first pass through this PR and I think it looks good. I am going to spend some time testing out the PR locally. It would be good to wait for Dan to review as well before we merge this.

@andygrove (Member):

I tested this locally, and it worked really well!

I ran one scheduler and two executors, and I could kill one executor and still see a query complete successfully. This is not the case on the master branch.

I am happy to approve this once feedback has been addressed.

@mingmwang (Contributor, Author):

> I tested this locally, and it worked really well!
>
> I ran one scheduler and two executors, and I could kill one executor and still see a query complete successfully. This is not the case on the master branch.
>
> I am happy to approve this once feedback has been addressed.

Thank you. We will also start chaos-monkey testing next month to verify the recent changes.

@andygrove (Member) left a comment:

Thanks @mingmwang. This is a very nice improvement in stability.

@thinkharderdev (Contributor) left a comment:

Had a few questions but this looks really great! Thanks for your work on this @mingmwang!

Comment on lines +734 to +738
message ExecutorLost {
}

message ResultLost {
}
Contributor:

Not sure I understand the difference between these two errors

Contributor (Author):

> Not sure I understand the difference between these two errors

Yes, good question. In the current code base, neither of these two errors is used directly by the executor tasks.
They are used by the Scheduler: when we see a FetchPartitionError task update from a reduce task, the related map task's status is changed to ResultLost. Of course, most of the time ResultLost will be caused by ExecutorLost.
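A small sketch of that scheduler-side mapping (names invented for illustration, not the actual Ballista types):

enum TaskFailure {
    FetchPartitionError { map_task_id: usize },
    ExecutorLost,
    ResultLost,
}

// A reduce-side fetch failure implies the map task's output is gone, so the
// scheduler marks the corresponding map task as ResultLost.
fn derive_map_task_status(failure: &TaskFailure) -> Option<(usize, TaskFailure)> {
    match failure {
        TaskFailure::FetchPartitionError { map_task_id } => {
            Some((*map_task_id, TaskFailure::ResultLost))
        }
        _ => None,
    }
}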

Comment on lines +262 to +263
retryable: true,
count_to_failures: true,
Contributor:

I'm not sure I understand retryable and count_to_failures. If a task is retryable, wouldn't it count toward failures in all cases? Conversely, if it's not retryable then we don't need to count the failure at all.

Contributor (Author):

Yes, having both retryable and count_to_failures is to support the case where we might have some specific error that we want to retry forever until the task succeeds.
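For illustration, a sketch of how the two flags could combine (hypothetical names):

struct RetryPolicy {
    retryable: bool,
    count_to_failures: bool,
}

fn should_reschedule(policy: &RetryPolicy, failure_count: usize, max_failures: usize) -> bool {
    if !policy.retryable {
        return false;
    }
    if policy.count_to_failures {
        // Normal case: retry until the failure budget is exhausted.
        failure_count < max_failures
    } else {
        // Special case: an error we want to retry forever until it succeeds.
        true
    }
}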

}

#[allow(dead_code)]
// TODO
struct LocalPartitionReader {}
Contributor:

Nice! I assume where this is going is making the executors smart enough to read partitions on the same physical machine directly from disk.

Contributor (Author):

> Nice! I assume where this is going is making the executors smart enough to read partitions on the same physical machine directly from disk.

Yes, exactly.
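For illustration, a speculative sketch of what LocalPartitionReader might grow into, assuming tokio and async_trait; the read_partition signature is invented here and is not the trait's actual shape:

use async_trait::async_trait;

#[async_trait]
trait PartitionReader: Send + Sync {
    async fn read_partition(&self, path: &str) -> std::io::Result<Vec<u8>>;
}

struct LocalPartitionReader;

#[async_trait]
impl PartitionReader for LocalPartitionReader {
    async fn read_partition(&self, path: &str) -> std::io::Result<Vec<u8>> {
        // Same-host case: bypass the Flight/network path and read the shuffle
        // file directly from local disk.
        tokio::fs::read(path).await
    }
}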

task_id_gen: usize,
/// Failed stage attempts; record the failed stage attempts to limit the retry times.
/// Map from Stage ID -> Set<STAGE_ATTEMPT_NUM>
failed_stage_attempts: HashMap<usize, HashSet<usize>>,
Contributor:

I don't get why we are saving a HashSet of attempts. Shouldn't it just be usize?

@mingmwang (Contributor, Author) commented Oct 2, 2022:

> I don't get why we are saving a HashSet of attempts. Shouldn't it just be usize?

The purpose of using a HashSet is to record exactly which distinct attempts failed. With a plain usize we would lose that detail.
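A short sketch of the bookkeeping (max_stage_failures is a hypothetical config value):

use std::collections::{HashMap, HashSet};

// Returns true if the stage still has retry budget left. Recording distinct
// attempt numbers in a set means duplicate failure reports for the same
// attempt do not burn the budget twice.
fn record_failed_attempt(
    failed_stage_attempts: &mut HashMap<usize, HashSet<usize>>,
    stage_id: usize,
    attempt: usize,
    max_stage_failures: usize,
) -> bool {
    let attempt_set = failed_stage_attempts.entry(stage_id).or_default();
    attempt_set.insert(attempt);
    attempt_set.len() < max_stage_failures
}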

@yahoNanJing (Contributor):

Thanks @mingmwang for this huge step in error recovery, and thanks @andygrove and @thinkharderdev for reviewing this PR. Since we have all approved it, I'll merge.

@yahoNanJing merged commit f5bfef0 into apache:master on Oct 2, 2022.