Skip to content

Commit

Permalink
Decrease a lock contention in PipelinedStageExecution
Browse files Browse the repository at this point in the history
Taking a monitor of io.trino.execution.scheduler.PipelinedStageExection
in the updateTaskStatus method causes a high lock contention. Make this
method lock-less.
  • Loading branch information
radek-kondziolka authored and sopel39 committed Oct 10, 2022
1 parent a2104a3 commit 426e759
Showing 1 changed file with 48 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,8 @@ public class PipelinedStageExecution
// current stage task tracking
@GuardedBy("this")
private final Set<TaskId> allTasks = new HashSet<>();
@GuardedBy("this")
private final Set<TaskId> finishedTasks = new HashSet<>();
@GuardedBy("this")
private final Set<TaskId> flushingTasks = new HashSet<>();
private final Set<TaskId> finishedTasks = ConcurrentHashMap.newKeySet();
private final Set<TaskId> flushingTasks = ConcurrentHashMap.newKeySet();

// source task tracking
@GuardedBy("this")
Expand Down Expand Up @@ -219,16 +217,16 @@ public synchronized void transitionToSchedulingSplits()
}

@Override
public synchronized void schedulingComplete()
public void schedulingComplete()
{
if (!stateMachine.transitionToScheduled()) {
return;
}

if (isFlushing()) {
if (isStageFlushing()) {
stateMachine.transitionToFlushing();
}
if (finishedTasks.containsAll(allTasks)) {
if (isStageFinished()) {
stateMachine.transitionToFinished();
}

Expand Down Expand Up @@ -333,13 +331,13 @@ public synchronized Optional<RemoteTask> scheduleTask(
return Optional.of(task);
}

private synchronized void updateTaskStatus(TaskStatus taskStatus)
private void updateTaskStatus(TaskStatus taskStatus)
{
State stageState = stateMachine.getState();
if (stageState.isDone()) {
return;
}

boolean newFlushingOrFinishedTaskObserved = false;
TaskState taskState = taskStatus.getState();

switch (taskState) {
Expand All @@ -360,11 +358,10 @@ private synchronized void updateTaskStatus(TaskStatus taskStatus)
fail(new TrinoException(GENERIC_INTERNAL_ERROR, "A task is in the ABORTED state but stage is " + stageState));
break;
case FLUSHING:
flushingTasks.add(taskStatus.getTaskId());
newFlushingOrFinishedTaskObserved = addFlushingTask(taskStatus.getTaskId());
break;
case FINISHED:
finishedTasks.add(taskStatus.getTaskId());
flushingTasks.remove(taskStatus.getTaskId());
newFlushingOrFinishedTaskObserved = addFinishedTask(taskStatus.getTaskId());
break;
default:
}
Expand All @@ -373,11 +370,14 @@ private synchronized void updateTaskStatus(TaskStatus taskStatus)
if (taskState == TaskState.RUNNING) {
stateMachine.transitionToRunning();
}
if (isFlushing()) {
stateMachine.transitionToFlushing();
}
if (finishedTasks.containsAll(allTasks)) {
stateMachine.transitionToFinished();
// avoid extra synchronization if no new flushing or finished task was observed
if (newFlushingOrFinishedTaskObserved) {
if (isStageFlushing()) {
stateMachine.transitionToFlushing();
}
if (isStageFinished()) {
stateMachine.transitionToFinished();
}
}
}
}
Expand All @@ -389,6 +389,37 @@ private synchronized boolean isStageFlushing()
&& allTasks.stream().allMatch(taskId -> finishedTasks.contains(taskId) || flushingTasks.contains(taskId));
}

private synchronized boolean isStageFinished()
{
return finishedTasks.containsAll(allTasks);
}

private boolean addFlushingTask(TaskId taskId)
{
if (!flushingTasks.contains(taskId) && !finishedTasks.contains(taskId)) {
synchronized (this) {
// We need to check whether that task is not already finished. It could happen because of out of order of
// task status events
if (!finishedTasks.contains(taskId)) {
return flushingTasks.add(taskId);
}
}
}
return false;
}

private boolean addFinishedTask(TaskId taskId)
{
if (!finishedTasks.contains(taskId)) {
synchronized (this) {
boolean added = finishedTasks.add(taskId);
flushingTasks.remove(taskId);
return added;
}
}
return false;
}

private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo executionFailureInfo)
{
if (executionFailureInfo.getRemoteHost() == null || failureDetector.getState(executionFailureInfo.getRemoteHost()) != GONE) {
Expand Down

0 comments on commit 426e759

Please sign in to comment.