Fault tolerant scheduler 2.0 #14205
Conversation
On top of #14072, still WIP
Force-pushed from 4482925 to 03f90be
Force-pushed from 03f90be to aa023f0
Benchmark results:
Detailed: https://gist.github.com/arhimondr/02ccc06a4f145fcd5f47b91ff068c013
Force-pushed from aa023f0 to 214cad4
Rebased
Force-pushed from a3474c9 to 6f3f2e7
Ready for review
Force-pushed from 6f3f2e7 to 56deff7
Partial review
...-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java
import static java.util.Objects.requireNonNull;

public interface EventDrivenTaskSource
It does not look like you need this interface. You only need the Callback for tests. Also, it looks awkward to have Callback internal to EventDrivenTaskSource, as at the interface level those two are not related at all. I would suggest just leaving Callback as an interface; it can be moved to top level or to the StageTaskSource implementation.
Initially I was trying to model it after the existing TaskSource. But indeed, I don't think the EventDrivenTaskSourceFactory and EventDrivenTaskSource interfaces are needed. Going to remove them and rename:
StageEventDrivenTaskSourceFactory -> EventDrivenTaskSourceFactory
StageTaskSource -> EventDrivenTaskSource
Also going to move the Callback to the EventDrivenTaskSource and move the EventDrivenTaskSource out of the StageEventDrivenTaskSourceFactory (basically reducing the number of nested classes)
    partitionUpdates = ImmutableList.copyOf(requireNonNull(partitionUpdates, "partitionUpdates is null"));
}

void update(EventDrivenTaskSource.Callback callback)
nit: I'd suggest marking the constructor and update as public to clearly mark what the public interface of the class is
(relevant to other classes too)
I think that's the only relevant class after moving classes around and removing unnecessary interfaces. Please let me know if I missed anything.
...trino-main/src/main/java/io/trino/execution/scheduler/StageEventDrivenTaskSourceFactory.java
PartitionAssignment partitionAssignment = openAssignments.get(hostRequirement);
long splitSizeInBytes = getSplitSizeInBytes(split);
if (partitionAssignment != null && partitionAssignment.getAssignedDataSizeInBytes() + splitSizeInBytes > targetPartitionSizeInBytes) {
    partitionAssignment.setFull(true);
verify(partitionAssignment.getAssignedDataSizeInBytes() > 0)? Or maybe there is a chance it would not be true if a split reports empty size?
Maybe for connectors which misbehave and report zero-size splits we should also mark the assignment as full based on split count?
Maybe for connectors which misbehave and report zero-size splits we should also mark as full based on splits count?

Totally forgot to implement that. We do indeed have the fault_tolerant_execution_max_task_split_count property. Implemented.

Another, less straightforward problem is actually around another session property I forgot about: fault_tolerant_execution_min_task_split_count. This property is there to ensure enough splits are assigned to a single task for it to utilize thread-level parallelism. Usually, when the file format is "splittable", it doesn't really matter. However, for non-splittable formats, where only a single split is generated per entire file, it seems like a good idea to provide enough splits for a single task to utilize all available threads.

One goal I was trying to achieve with ArbitraryDistributionSplitAssigner was to remove the Arbitrary / Source distribution duality (which in essence are the same). However, fault_tolerant_execution_min_task_split_count only makes sense for table scan splits and doesn't make much sense for RemoteSplits (which can provide parallelism even within a single split). Currently in ArbitraryDistributionSplitAssigner I'm trying to make as little difference as possible between a RemoteSplit and a ConnectorSplit. Implementing fault_tolerant_execution_min_task_split_count will most certainly make that more difficult. I'm a little bit on the fence about whether we really want fault_tolerant_execution_min_task_split_count, or whether we should consider non-splittable formats a niche use case.
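The count-based fallback discussed above could be sketched roughly like this (a minimal illustration, not the actual Trino code; the class, field names, and the cap value are hypothetical, mirroring the fault_tolerant_execution_max_task_split_count idea):

```java
// Sketch: mark a partition assignment full once either a size target or a
// split-count cap is reached, so zero-size splits still close partitions out.
public class SplitCountCapSketch {
    static final long TARGET_PARTITION_SIZE_BYTES = 4L * 1024 * 1024 * 1024;
    static final int MAX_TASK_SPLIT_COUNT = 256; // hypothetical cap

    private long assignedBytes;
    private int assignedSplits;
    private boolean full;

    public void addSplit(long splitSizeBytes) {
        assignedBytes += splitSizeBytes;
        assignedSplits++;
        // size-based check, plus a count-based fallback for zero-size splits
        if (assignedBytes >= TARGET_PARTITION_SIZE_BYTES || assignedSplits >= MAX_TASK_SPLIT_COUNT) {
            full = true;
        }
    }

    public boolean isFull() {
        return full;
    }

    public static void main(String[] args) {
        SplitCountCapSketch partition = new SplitCountCapSketch();
        // a misbehaving connector reports every split as zero bytes
        for (int i = 0; i < 1000; i++) {
            if (partition.isFull()) {
                break;
            }
            partition.addSplit(0);
        }
        System.out.println(partition.isFull()); // the count cap kicked in
    }
}
```

Without the count fallback, a stream of zero-size splits would never trip the size check and the assignment would grow without bound.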
...trino-main/src/main/java/io/trino/execution/scheduler/StageEventDrivenTaskSourceFactory.java
}

@Override
public synchronized void start()
startIfNotStarted?
Also maybe split the method into two - one for creating the SplitLoaders and the other for starting them up with top-level exception handling.
The indentation depth is below my comfort level right now.
startIfNotStarted?

We usually call it just start in other places. Do you think this one should be different?
I think (maybe this is not really the case) that typically a start() method would throw if the "object being started" is already started, not just ignore the call.
Hmm, we can do that. I don't know why I implemented it to simply ignore the call. I don't think it is ever called more than once.
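A start-once guard of the kind suggested here might look like the following (illustrative only; not the actual scheduler code, and the class name is made up):

```java
public class StartOnceSketch {
    private boolean started;

    // throws rather than silently ignoring a second start, per the suggestion above
    public synchronized void start() {
        if (started) {
            throw new IllegalStateException("already started");
        }
        started = true;
        // ... create the split loaders and start them here ...
    }

    public static void main(String[] args) {
        StartOnceSketch source = new StartOnceSketch();
        source.start();
        try {
            source.start();
            System.out.println("second start accepted");
        } catch (IllegalStateException e) {
            System.out.println("second start rejected");
        }
    }
}
```

The throwing variant turns a silent double-start (likely a caller bug) into an immediate, debuggable failure.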
...trino-main/src/main/java/io/trino/execution/scheduler/StageEventDrivenTaskSourceFactory.java
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestArbitraryDistributionSplitAssigner
hmm - the test logic is not simpler than the logic in the tested code. I wonder if it is possible to make the assertions more explicit and not bloat this?
While the assignment algorithm is relatively straightforward, the different sequences of interaction are what I wanted to test. The idea is to implement the algorithm in a straightforward way and make sure that different sequences of interaction with the assigner produce the same result.
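That testing idea - drive the assigner through many interaction sequences and check they all agree with a single straightforward reference implementation - can be illustrated with a toy assigner (everything here is hypothetical; the real test exercises ArbitraryDistributionSplitAssigner):

```java
import java.util.ArrayList;
import java.util.List;

// Toy assigner: groups incoming splits into partitions of at most 3 splits each.
public class AssignerEquivalenceSketch {
    static class IncrementalAssigner {
        final List<Integer> assignments = new ArrayList<>();
        private int currentPartition;
        private int splitsInCurrentPartition;

        // splits may arrive in batches of any size, across many calls
        void addSplits(int count) {
            for (int i = 0; i < count; i++) {
                assignments.add(currentPartition);
                if (++splitsInCurrentPartition == 3) {
                    currentPartition++;
                    splitsInCurrentPartition = 0;
                }
            }
        }
    }

    // straightforward reference implementation: everything assigned in one shot
    static List<Integer> reference(int splitCount) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < splitCount; i++) {
            result.add(i / 3);
        }
        return result;
    }

    public static void main(String[] args) {
        int splitCount = 10;
        // feed the same splits in every uniform batch size and compare
        for (int batchSize = 1; batchSize <= splitCount; batchSize++) {
            IncrementalAssigner assigner = new IncrementalAssigner();
            for (int fed = 0; fed < splitCount; fed += batchSize) {
                assigner.addSplits(Math.min(batchSize, splitCount - fed));
            }
            if (!assigner.assignments.equals(reference(splitCount))) {
                throw new AssertionError("mismatch at batch size " + batchSize);
            }
        }
        System.out.println("all interaction sequences agree");
    }
}
```

The reference stays trivially simple, so the test's complexity lives in enumerating interaction sequences rather than in re-deriving the algorithm.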
...-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java
Lots of dumb questions. Sorry
this.plan.set(requireNonNull(plan, "plan is null"));
}

public StageInfo getStageInfo()
should this be called getRootStageInfo? (I know the current naming matches the QueryScheduler interface)
It is a tree internally, including stage infos for all stages. It is more of a QueryInfo at this point. Not sure if getRootStageInfo wouldn't be interpreted as "stage info for the root stage only".
}

@Override
public BasicStageStats getBasicStageStats()
getBasicRootStageStats()?
Those stats are aggregate stats across all stages
...-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java
}
}

public Optional<RemoteTask> schedule(int partitionId, InternalNode node)
I am too dumb to follow this one.
Yeah, probably the most beefy method of the entire scheduler. Essentially what it does is schedule a task.
And to schedule a task you need to have splits and output selectors.
This method obtains splits either from an open descriptor (if a descriptor is still being built) or from a sealed descriptor stored in the task descriptor storage. It also merges in output selectors.
Open to suggestions on how to make it more readable.
Actually it is not that bad on the second go. What you could do is extract:
private Set<PlanNodeId> getRemoteSourceIds()
{
// this can be cached
Set<PlanNodeId> remoteSourceIds = new HashSet<>();
for (RemoteSourceNode remoteSource : stage.getFragment().getRemoteSourceNodes()) {
remoteSourceIds.add(remoteSource.getId());
}
return remoteSourceIds;
}
private Map<PlanNodeId, ExchangeSourceOutputSelector> getMergedSourceOutputSelectors()
{
ImmutableMap.Builder<PlanNodeId, ExchangeSourceOutputSelector> outputSelectors = ImmutableMap.builder();
for (RemoteSourceNode remoteSource : stage.getFragment().getRemoteSourceNodes()) {
ExchangeSourceOutputSelector mergedSelector = null;
for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) {
ExchangeSourceOutputSelector sourceFragmentSelector = sourceOutputSelectors.get(sourceFragmentId);
if (sourceFragmentSelector == null) {
continue;
}
if (mergedSelector == null) {
mergedSelector = sourceFragmentSelector;
}
else {
mergedSelector = mergedSelector.merge(sourceFragmentSelector);
}
}
if (mergedSelector != null) {
outputSelectors.put(remoteSource.getId(), mergedSelector);
}
}
return outputSelectors.buildOrThrow();
}
Then you will have just
Set<PlanNodeId> remoteSourceIds = getRemoteSourceIds();
Map<PlanNodeId, ExchangeSourceOutputSelector> outputSelectors = getMergedSourceOutputSelectors();
in schedule().
Maybe you could also build some common interface adapter over TaskDescriptor
and OpenTaskDescriptor
with methods:
public ListMultimap<PlanNodeId, Split> getSplits();
public boolean wasNoMoreSplits(PlanNodeId remoteSourcePlanNodeId);
then you could mostly unify the handling of both. But not sure if that is worth the fuss.
Great suggestions. Refactored, it seems to be way more readable now. Please take a look.
Thanks
updateOutputSize(outputStats);

// task tescriptor has been created
typo tescriptor
Also, what is the case where a task completes but the descriptor is not created yet? LIMIT?
In this implementation we don't wait for the descriptor to be created before scheduling. It is possible that a task finishes (as you mentioned, in the LIMIT case) before its task descriptor is sealed. In such a case we don't need to store it: since the task has already finished, there will be no retry.
In such case we don't need to store it, as since the task is already finished there will be no retry
Makes sense. Do we also drop sealed task descriptors from storage when a task completes successfully?
Yes, see StagePartition#taskFinished
👍
...-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java
Thanks for the review. Went through the first section of comments. Going to continue tomorrow.
Force-pushed from 56deff7 to cc44e88
Force-pushed from cc44e88 to 756a430
@losipiuk Updated
...-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java
@@ -95,6 +95,7 @@
private DataSize faultTolerantExecutionTaskDescriptorStorageMaxMemory = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.15));
private int faultTolerantExecutionPartitionCount = 50;
private boolean faultTolerantPreserveInputPartitionsInWriteStage = true;
private boolean faultTolerantExecutionEventDriverSchedulerEnabled = true;
Not sure about this one. Could we get some adoption while keeping it false for a release or two?
I ran multiple rounds of full scale testing and it seems to work fine. I would probably leave it on by default while leaving the old implementation as a fallback option if something goes wrong.
👍
}
}

for (ExchangeSourceHandleSource handleSource : handleSources) {
bail out quickly if failure != null?
These checks are important. I'm trying to verify that sources are getting closed in case of a failure.
Force-pushed from 756a430 to d2c7e1e
Updated
core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java
result
    .addPartition(new Partition(0, new NodeRequirements(Optional.empty(), hostRequirement)))
    .sealPartition(0)
    .setNoMorePartitions();
Nit: formatted
That's what auto-format does for me. Is it different on your end?
PriorityQueue<PartitionAssignment> assignments = new PriorityQueue<>();
assignments.add(new PartitionAssignment(new TaskPartition(), 0));
for (int outputPartitionId = 0; outputPartitionId < partitionCount; outputPartitionId++) {
    long outputPartitionSize = mergedEstimate.getPartitionSizeInBytes(outputPartitionId);
    if (assignments.peek().assignedDataSizeInBytes() + outputPartitionSize > targetPartitionSizeInBytes
            && assignments.size() < partitionCount) {
        assignments.add(new PartitionAssignment(new TaskPartition(), 0));
    }
    PartitionAssignment assignment = assignments.poll();
    result.put(outputPartitionId, assignment.taskPartition());
    assignments.add(new PartitionAssignment(assignment.taskPartition(), assignment.assignedDataSizeInBytes() + outputPartitionSize));
}
I'm lost in this part of the logic. Can you elaborate?
When HashDistributionSplitAssigner is created, an input size estimate is provided (see Map<PlanNodeId, OutputDataSizeEstimate> outputDataSizeEstimates). Based on the estimates provided, the HashDistributionSplitAssigner assigns output partitions to tasks (to avoid small tasks). In the previous version (with a full barrier) this was done based on information obtained from ExchangeSourceHandle. However, if speculative execution is allowed, this must be done based on "estimates", as the ExchangeSourceHandles may not yet be available.
My question is more like: if we have a fixed number of partitions, shouldn't we simply try to distribute data evenly among the assignments? What's the sense of trying to respect targetPartitionSizeInBytes?
It could work, but you still need the same input size statistics. Otherwise you do not know how many partitions you should group together to be handled by a single task.
@losipiuk: in the final result there is no information around size stats. It's a mapping from outputPartitionId to TaskPartitions. So I'm just more confused now...
The targetPartitionSizeInBytes is needed to avoid creating tiny partitions. For example, if the total data size is only 1GB, it should be enough to create a single partition mapping all the output partitions to a single task.
What I see is that we output the map of partitionIds to a new TaskPartition(), so I'm not sure we are achieving the purposes stated above. Am I missing something obvious?
I have added the following code to print out the returned result:
Map<Integer, TaskPartition> resultToBePrinted = result.buildOrThrow();
resultToBePrinted.forEach((partitionId, taskPartition) -> {
System.out.println(partitionId);
if (taskPartition.isIdAssigned()) {
System.out.println(taskPartition.getId());
}
else {
System.out.println("Not assigned");
}
});
I'm seeing:
0
Not assigned
1
Not assigned
2
Not assigned
So not sure this code piece has any effect at all.
Oh, I see.
TaskPartition is a placeholder. Basically what this algorithm does is group certain output partitions to be processed by certain task partitions.
For example:
- Output partitions 1,2,3 must be processed by a separate task
- Output partitions 4,5 must be processed by a different task
- Output partitions 6,7 must be processed by yet another task
However, we are trying to avoid assigning a certain numeric id to a task at this step. The problem is that the output data size is only an estimate, and in reality not all the tasks may have data to process.
For example, when reading a bucketed table, what we know is that there are 1000 buckets. So we assign 1000 task partitions, one for each bucket. But then it is possible that data is missing for a certain bucket, which would create a confusing hole in task ids (you may end up with tasks 1.0.0, 1.1.0, 1.5.0, missing the 1.2.0, 1.3.0, 1.4.0 tasks). Assigning numeric ids lazily allows avoiding such gaps.
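The lazy-id idea can be sketched end to end (a standalone simplification of the quoted HashDistributionSplitAssigner logic; the class names, method, and sizes are made up for illustration):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.PriorityQueue;

public class LazyTaskPartitionSketch {
    // placeholder that only receives a numeric id once a task is actually created
    static class TaskPartition {
        private OptionalInt id = OptionalInt.empty();

        void assignId(int value) { id = OptionalInt.of(value); }
        boolean isIdAssigned() { return id.isPresent(); }
    }

    // greedy packing of estimated output partition sizes into task partitions
    static Map<Integer, TaskPartition> assign(long[] estimatedSizes, long targetPartitionSizeInBytes) {
        Map<Integer, TaskPartition> result = new HashMap<>();
        List<TaskPartition> taskPartitions = new ArrayList<>();
        taskPartitions.add(new TaskPartition());
        // entries are {taskPartitionIndex, assignedBytes}, ordered by assignedBytes
        PriorityQueue<long[]> assignments = new PriorityQueue<>(Comparator.comparingLong(a -> a[1]));
        assignments.add(new long[] {0, 0});
        for (int outputPartition = 0; outputPartition < estimatedSizes.length; outputPartition++) {
            long size = estimatedSizes[outputPartition];
            // open a new task partition when even the least-loaded one would overflow
            if (assignments.peek()[1] + size > targetPartitionSizeInBytes) {
                taskPartitions.add(new TaskPartition());
                assignments.add(new long[] {taskPartitions.size() - 1, 0});
            }
            long[] assignment = assignments.poll();
            result.put(outputPartition, taskPartitions.get((int) assignment[0]));
            assignments.add(new long[] {assignment[0], assignment[1] + size});
        }
        return result;
    }

    public static void main(String[] args) {
        // five 40-byte output partitions with a 100-byte target pack into three tasks
        Map<Integer, TaskPartition> result = assign(new long[] {40, 40, 40, 40, 40}, 100);
        System.out.println(new HashSet<>(result.values()).size());
        // no numeric ids assigned yet: real ids are handed out lazily, so empty
        // buckets never leave holes in the task id sequence
        System.out.println(result.values().stream().noneMatch(TaskPartition::isIdAssigned));
    }
}
```

The sketch shows why the printed placeholders above say "Not assigned": the grouping is decided up front, while ids only appear when a task partition actually gets data.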
Force-pushed from d2c7e1e to 4a2e852
Updated
private SubPlan optimizePlan(SubPlan plan)
{
    // Re-optimize plan here based on available runtime statistics.
    // Fragments changed due to re-optimization as well as their downstream stages are expected to be assigned new fragment ids.
    return plan;
}
So this is a TODO item?
Yeah, basically I just wanted to show where the plan can be mutated. The adaptive planner will come later.
...-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java
if (e == Event.ABORT) {
    return false;
}
if (e == Event.WAKE_UP) {
What's the sense of having the WAKE_UP event?
This is to be used as a generic event to wake up the scheduler (for example, when something happened but no state modification is needed).
Then do we need to schedule anything in this case? It doesn't feel like we do.
This is currently used to notify the scheduler that a node has been acquired. There's no need for any extra information to be passed to the scheduler through the event; it just lets the scheduler know that further progress can be made.
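The pattern being described here - a payload-free nudge into a single-threaded event loop - can be sketched as follows (a hypothetical simplification, not the scheduler's actual loop):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class EventLoopSketch {
    enum Event { WAKE_UP, ABORT }

    // drains events until ABORT; every non-ABORT event triggers one scheduling pass
    static int run(BlockingQueue<Event> queue) throws InterruptedException {
        int schedulingPasses = 0;
        while (true) {
            Event e = queue.take();
            if (e == Event.ABORT) {
                return schedulingPasses;
            }
            // WAKE_UP carries no payload: something (e.g. a node acquisition)
            // happened, and the scheduler should simply try to make progress
            schedulingPasses++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
        queue.add(Event.WAKE_UP); // e.g. the node allocator signals a node is ready
        queue.add(Event.WAKE_UP);
        queue.add(Event.ABORT);
        System.out.println(run(queue)); // two scheduling passes, then shutdown
    }
}
```

Because every event funnels through one queue, producers never need to share scheduler state; they only need a handle to the queue.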
Move TableInfo extraction to TableInfo to make it reusable
The new scheduler allows changing the query plan dynamically during execution and supports speculative execution, and it provides a single view into the query task queue, allowing a priority to be set for any given task
Force-pushed from 4a2e852 to 4e09b35
Updated
Description
This PR lays down the foundation for future advancements in fault tolerant execution. The proposed structure of the scheduler is aimed at making it possible to implement:
The scheduler is implemented as an event loop to minimize the need for synchronization and allow developers to think about scheduling as a single-threaded process
Non-technical explanation
N/A
TODO
Release notes
(x) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: