Skip to content

Commit

Permalink
Add thread-per-split executor
Browse files Browse the repository at this point in the history
* Uses a fair queue based on Completely Fair Scheduler
* Runs each split in a separate thread
  • Loading branch information
martint committed Jul 13, 2023
1 parent 21a6276 commit 831170a
Show file tree
Hide file tree
Showing 25 changed files with 3,076 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"task.level-absolute-priority"})
public class TaskManagerConfig
{
private boolean threadPerDriverSchedulerEnabled = true;
private boolean perOperatorCpuTimerEnabled = true;
private boolean taskCpuTimerEnabled = true;
private boolean statisticsCpuTimerEnabled = true;
Expand Down Expand Up @@ -107,6 +108,18 @@ public class TaskManagerConfig

private BigDecimal levelTimeMultiplier = new BigDecimal(2.0);

@Config("experimental.thread-per-split-scheduler-enabled")
public TaskManagerConfig setThreadPerDriverSchedulerEnabled(boolean enabled)
{
this.threadPerDriverSchedulerEnabled = enabled;
return this;
}

public boolean isThreadPerDriverSchedulerEnabled()
{
return threadPerDriverSchedulerEnabled;
}

@MinDuration("1ms")
@MaxDuration("10s")
@NotNull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution.executor2;

import com.google.common.base.Ticker;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.SetThreadName;
import io.airlift.stats.CpuTimer;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.trino.execution.SplitRunner;
import io.trino.execution.TaskId;
import io.trino.execution.executor2.scheduler.Schedulable;
import io.trino.execution.executor2.scheduler.SchedulerContext;
import io.trino.tracing.TrinoAttributes;

import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

class SplitProcessor
implements Schedulable
{
private static final Duration SPLIT_RUN_QUANTA = new Duration(1, TimeUnit.SECONDS);

private final TaskId taskId;
private final int splitId;
private final SplitRunner split;
private final Tracer tracer;

public SplitProcessor(TaskId taskId, int splitId, SplitRunner split, Tracer tracer)
{
this.taskId = requireNonNull(taskId, "taskId is null");
this.splitId = splitId;
this.split = requireNonNull(split, "split is null");
this.tracer = requireNonNull(tracer, "tracer is null");
}

@Override
public void run(SchedulerContext context)
{
Span splitSpan = tracer.spanBuilder("split")
.setParent(Context.current().with(split.getPipelineSpan()))
.setAttribute(TrinoAttributes.QUERY_ID, taskId.getQueryId().toString())
.setAttribute(TrinoAttributes.STAGE_ID, taskId.getStageId().toString())
.setAttribute(TrinoAttributes.TASK_ID, taskId.toString())
.setAttribute(TrinoAttributes.PIPELINE_ID, taskId.getStageId() + "-" + split.getPipelineId())
.setAttribute(TrinoAttributes.SPLIT_ID, taskId + "-" + splitId)
.startSpan();

Span processSpan = newSpan(splitSpan, null);

CpuTimer timer = new CpuTimer(Ticker.systemTicker(), false);
long previousCpuNanos = 0;
long previousScheduledNanos = 0;
try (SetThreadName ignored = new SetThreadName("SplitRunner-%s-%s", taskId, splitId)) {
while (!split.isFinished()) {
ListenableFuture<Void> blocked = split.processFor(SPLIT_RUN_QUANTA);
CpuTimer.CpuDuration elapsed = timer.elapsedTime();

long scheduledNanos = elapsed.getWall().roundTo(NANOSECONDS);
processSpan.setAttribute(TrinoAttributes.SPLIT_SCHEDULED_TIME_NANOS, scheduledNanos - previousScheduledNanos);
previousScheduledNanos = scheduledNanos;

long cpuNanos = elapsed.getCpu().roundTo(NANOSECONDS);
processSpan.setAttribute(TrinoAttributes.SPLIT_CPU_TIME_NANOS, cpuNanos - previousCpuNanos);
previousCpuNanos = cpuNanos;

if (!split.isFinished()) {
if (blocked.isDone()) {
processSpan.addEvent("yield");
processSpan.end();
if (!context.maybeYield()) {
return;
}
}
else {
processSpan.addEvent("blocked");
processSpan.end();
if (!context.block(blocked)) {
return;
}
}
processSpan = newSpan(splitSpan, processSpan);
}
}
}
finally {
processSpan.end();

splitSpan.setAttribute(TrinoAttributes.SPLIT_CPU_TIME_NANOS, timer.elapsedTime().getCpu().roundTo(NANOSECONDS));
splitSpan.setAttribute(TrinoAttributes.SPLIT_SCHEDULED_TIME_NANOS, context.getScheduledNanos());
splitSpan.setAttribute(TrinoAttributes.SPLIT_BLOCK_TIME_NANOS, context.getBlockedNanos());
splitSpan.setAttribute(TrinoAttributes.SPLIT_WAIT_TIME_NANOS, context.getWaitNanos());
splitSpan.end();
}
}

private Span newSpan(Span parent, Span previous)
{
SpanBuilder builder = tracer.spanBuilder("process")
.setParent(Context.current().with(parent));

if (previous != null) {
builder.addLink(previous.getSpanContext());
}

return builder.startSpan();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution.executor2;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Tracer;
import io.trino.execution.SplitRunner;
import io.trino.execution.TaskId;
import io.trino.execution.TaskManagerConfig;
import io.trino.execution.executor.RunningSplitInfo;
import io.trino.execution.executor.TaskExecutor;
import io.trino.execution.executor.TaskHandle;
import io.trino.execution.executor2.scheduler.FairScheduler;
import io.trino.execution.executor2.scheduler.Group;
import io.trino.execution.executor2.scheduler.Schedulable;
import io.trino.execution.executor2.scheduler.SchedulerContext;
import io.trino.spi.VersionEmbedder;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.DoubleSupplier;
import java.util.function.Predicate;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;

@ThreadSafe
public class ThreadPerDriverTaskExecutor
implements TaskExecutor
{
private final FairScheduler scheduler;
private final Tracer tracer;
private final VersionEmbedder versionEmbedder;
private volatile boolean closed;

@Inject
public ThreadPerDriverTaskExecutor(TaskManagerConfig config, Tracer tracer, VersionEmbedder versionEmbedder)
{
this(tracer, versionEmbedder, new FairScheduler(config.getMaxWorkerThreads(), "SplitRunner-%d", Ticker.systemTicker()));
}

@VisibleForTesting
public ThreadPerDriverTaskExecutor(Tracer tracer, VersionEmbedder versionEmbedder, FairScheduler scheduler)
{
this.scheduler = scheduler;
this.tracer = requireNonNull(tracer, "tracer is null");
this.versionEmbedder = requireNonNull(versionEmbedder, "versionEmbedder is null");
}

@PostConstruct
@Override
public synchronized void start()
{
scheduler.start();
}

@PreDestroy
@Override
public synchronized void stop()
{
closed = true;
scheduler.close();
}

@Override
public synchronized TaskHandle addTask(
TaskId taskId,
DoubleSupplier utilizationSupplier,
int initialSplitConcurrency,
Duration splitConcurrencyAdjustFrequency,
OptionalInt maxDriversPerTask)
{
checkArgument(!closed, "Executor is already closed");

Group group = scheduler.createGroup(taskId.toString());
return new TaskEntry(taskId, group);
}

@Override
public synchronized void removeTask(TaskHandle handle)
{
TaskEntry entry = (TaskEntry) handle;

if (!entry.isDestroyed()) {
scheduler.removeGroup(entry.group());
entry.destroy();
}
}

@Override
public synchronized List<ListenableFuture<Void>> enqueueSplits(TaskHandle handle, boolean intermediate, List<? extends SplitRunner> splits)
{
checkArgument(!closed, "Executor is already closed");

TaskEntry entry = (TaskEntry) handle;

List<ListenableFuture<Void>> futures = new ArrayList<>();
for (SplitRunner split : splits) {
entry.addSplit(split);

int splitId = entry.nextSplitId();
ListenableFuture<Void> done = scheduler.submit(entry.group(), splitId, new VersionEmbedderBridge(versionEmbedder, new SplitProcessor(entry.taskId(), splitId, split, tracer)));
done.addListener(split::close, directExecutor());
futures.add(done);
}

return futures;
}

@Override
public String getMaxActiveSplitsInfo()
{
return ""; // TODO
}

@Override
public Set<TaskId> getStuckSplitTaskIds(Duration processingDurationThreshold, Predicate<RunningSplitInfo> filter)
{
// TODO
return ImmutableSet.of();
}

private static class TaskEntry
implements TaskHandle
{
private final TaskId taskId;
private final Group group;
private final AtomicInteger nextSplitId = new AtomicInteger();
private volatile boolean destroyed;

@GuardedBy("this")
private Set<SplitRunner> splits = new HashSet<>();

public TaskEntry(TaskId taskId, Group group)
{
this.taskId = taskId;
this.group = group;
}

public TaskId taskId()
{
return taskId;
}

public Group group()
{
return group;
}

public synchronized void destroy()
{
destroyed = true;

for (SplitRunner split : splits) {
split.close();
}
}

public synchronized void addSplit(SplitRunner split)
{
checkArgument(!destroyed, "Task already destroyed: %s", taskId);
splits.add(split);
}

public int nextSplitId()
{
return nextSplitId.incrementAndGet();
}

@Override
public boolean isDestroyed()
{
return destroyed;
}
}

private record VersionEmbedderBridge(VersionEmbedder versionEmbedder, Schedulable delegate)
implements Schedulable
{
@Override
public void run(SchedulerContext context)
{
Runnable adapter = () -> delegate.run(context);
versionEmbedder.embedVersion(adapter).run();
}
}
}
Loading

0 comments on commit 831170a

Please sign in to comment.