Skip to content

Commit

Permalink
Support non-blocking finish/abort for ExchangeSink
Browse files Browse the repository at this point in the history
ExchangeSink#finish is called to commit the ExchangeSink when noMorePages
is set on the SpoolingExchangeOutputBuffer. The setNoMorePages method
is assumed to be lightweight and is called from a thread pool designed
to handle lightweight task notifications. By default, that thread pool
has only 5 threads.

Simply increasing the thread pool size is not ideal, as it is hard to
know which specific output buffer will be used and whether any
heavyweight processing on "noMorePages" is needed. Instead, this commit
changes the finish and abort operations on ExchangeSink to be
non-blocking. With this approach, the ExchangeSink is free to implement
its own commit strategy without blocking the engine thread pools.
  • Loading branch information
arhimondr committed Jan 21, 2022
1 parent d3e017d commit af6b14f
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.trino.operator.StageExecutionDescriptor;
import io.trino.operator.TaskContext;
import io.trino.spi.SplitWeight;
import io.trino.spi.TrinoException;
import io.trino.sql.planner.LocalExecutionPlanner.LocalExecutionPlan;
import io.trino.sql.planner.plan.PlanNodeId;

Expand Down Expand Up @@ -76,6 +77,7 @@
import static io.trino.execution.SqlTaskExecution.SplitsState.FINISHED;
import static io.trino.execution.SqlTaskExecution.SplitsState.NO_MORE_SPLITS;
import static io.trino.operator.PipelineExecutionStrategy.UNGROUPED_EXECUTION;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
Expand Down Expand Up @@ -640,13 +642,27 @@ private synchronized void checkTaskCompletion()
outputBuffer.setNoMorePages();

BufferState bufferState = outputBuffer.getState();
if (bufferState != BufferState.FINISHED) {
if (!bufferState.isTerminal()) {
taskStateMachine.transitionToFlushing();
return;
}

// Cool! All done!
taskStateMachine.finished();
if (bufferState == BufferState.FINISHED) {
// Cool! All done!
taskStateMachine.finished();
return;
}

if (bufferState == BufferState.FAILED) {
Throwable failureCause = outputBuffer.getFailureCause()
.orElseGet(() -> new TrinoException(GENERIC_INTERNAL_ERROR, "Output buffer is failed but the failure cause is missing"));
taskStateMachine.failed(failureCause);
return;
}

// The only terminal state that remains is ABORTED.
// Buffer is expected to be aborted only if the task itself is aborted. In this scenario the following statement is expected to be noop.
taskStateMachine.failed(new TrinoException(GENERIC_INTERNAL_ERROR, "Unexpected buffer state: " + bufferState));
}

@Override
Expand Down Expand Up @@ -1111,7 +1127,7 @@ public CheckTaskCompletionOnBufferFinish(SqlTaskExecution sqlTaskExecution)
@Override
public void stateChanged(BufferState newState)
{
if (newState == BufferState.FINISHED) {
if (newState.isTerminal()) {
SqlTaskExecution sqlTaskExecution = sqlTaskExecutionReference.get();
if (sqlTaskExecution != null) {
sqlTaskExecution.checkTaskCompletion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ public long getPeakMemoryUsage()
return memoryManager.getPeakMemoryUsage();
}

/**
 * Reports the failure cause held by this buffer's state machine.
 *
 * @return whatever {@code stateMachine.getFailureCause()} currently reports
 */
@Override
public Optional<Throwable> getFailureCause()
{
    Optional<Throwable> recordedCause = stateMachine.getFailureCause();
    return recordedCause;
}

@VisibleForTesting
void forceFreeMemory()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -337,6 +338,12 @@ public long getPeakMemoryUsage()
return memoryManager.getPeakMemoryUsage();
}

/**
 * Exposes the failure cause tracked by the underlying state machine.
 *
 * @return the value obtained from {@code stateMachine.getFailureCause()}
 */
@Override
public Optional<Throwable> getFailureCause()
{
    Optional<Throwable> cause = stateMachine.getFailureCause();
    return cause;
}

@VisibleForTesting
void forceFreeMemory()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ public enum BufferState
* assumed that the reader will be cleaned up elsewhere.
* This is the terminal state.
*/
ABORTED(false, false, true);
ABORTED(false, false, true),

/**
* Buffer is failed. No more buffers or pages can be added. The task will be failed.
* This is the terminal state.
*/
FAILED(false, false, true);

public static final Set<BufferState> TERMINAL_BUFFER_STATES = Stream.of(BufferState.values()).filter(BufferState::isTerminal).collect(toImmutableSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
Expand Down Expand Up @@ -337,6 +338,12 @@ public long getPeakMemoryUsage()
return 0;
}

/**
 * Looks up the failure cause on this buffer's own state machine.
 *
 * @return the result of {@code stateMachine.getFailureCause()}
 */
@Override
public Optional<Throwable> getFailureCause()
{
    Optional<Throwable> failure = stateMachine.getFailureCause();
    return failure;
}

@Nullable
private OutputBuffer getDelegateOutputBuffer()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.execution.buffer.OutputBuffers.OutputBufferId;

import java.util.List;
import java.util.Optional;

public interface OutputBuffer
{
Expand Down Expand Up @@ -114,4 +115,9 @@ public interface OutputBuffer
* @return the peak memory usage of this output buffer.
*/
long getPeakMemoryUsage();

/**
* Returns non empty failure cause if the buffer is in state {@link BufferState#FAILED}
*/
Optional<Throwable> getFailureCause();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,24 @@
import io.trino.execution.StateMachine;
import io.trino.execution.TaskId;

import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

import static io.trino.execution.buffer.BufferState.ABORTED;
import static io.trino.execution.buffer.BufferState.FAILED;
import static io.trino.execution.buffer.BufferState.FINISHED;
import static io.trino.execution.buffer.BufferState.FLUSHING;
import static io.trino.execution.buffer.BufferState.NO_MORE_BUFFERS;
import static io.trino.execution.buffer.BufferState.NO_MORE_PAGES;
import static io.trino.execution.buffer.BufferState.OPEN;
import static io.trino.execution.buffer.BufferState.TERMINAL_BUFFER_STATES;
import static java.util.Objects.requireNonNull;

public class OutputBufferStateMachine
{
private final StateMachine<BufferState> state;
private final AtomicReference<Throwable> failureCause = new AtomicReference<>();

public OutputBufferStateMachine(TaskId taskId, Executor executor)
{
Expand Down Expand Up @@ -70,4 +75,17 @@ public boolean abort()
{
return state.setIf(ABORTED, oldState -> !oldState.isTerminal());
}

/**
 * Records a failure cause and attempts to move the buffer into the {@code FAILED} state.
 * <p>
 * The cause is stored with a compare-and-set against {@code null}, so only the first
 * recorded cause is kept. The transition is attempted only from a non-terminal state.
 *
 * @param throwable the reason for the failure; must not be null
 * @return the outcome reported by the state machine for the conditional transition
 *         (presumably {@code true} when the state actually changed)
 */
public boolean fail(Throwable throwable)
{
    requireNonNull(throwable, "throwable is null");

    // Store the cause before touching the state, so it is already set when FAILED becomes observable
    failureCause.compareAndSet(null, throwable);

    boolean transitioned = state.setIf(FAILED, current -> !current.isTerminal());
    return transitioned;
}

/**
 * Returns the currently recorded failure cause.
 *
 * @return the recorded cause, or an empty {@link Optional} when none has been recorded
 */
public Optional<Throwable> getFailureCause()
{
    Throwable recordedCause = failureCause.get();
    return Optional.ofNullable(recordedCause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.memory.context.LocalMemoryContext;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
Expand Down Expand Up @@ -266,6 +267,12 @@ public long getPeakMemoryUsage()
return memoryManager.getPeakMemoryUsage();
}

/**
 * Forwards the failure-cause query to this buffer's state machine.
 *
 * @return the value produced by {@code stateMachine.getFailureCause()}
 */
@Override
public Optional<Throwable> getFailureCause()
{
    Optional<Throwable> causeFromStateMachine = stateMachine.getFailureCause();
    return causeFromStateMachine;
}

@VisibleForTesting
void forceFreeMemory()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.spi.exchange.ExchangeSink;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

Expand Down Expand Up @@ -166,34 +167,51 @@ public void enqueue(int partition, List<Slice> pages)
@Override
public void setNoMorePages()
{
stateMachine.noMorePages();
destroy();
if (stateMachine.noMorePages()) {
exchangeSink.finish().whenComplete((value, failure) -> {
try {
if (failure != null) {
stateMachine.fail(failure);
}
else {
stateMachine.finish();
}
}
finally {
updateMemoryUsage(exchangeSink.getMemoryUsage());
}
});
}
}

@Override
public void destroy()
{
if (stateMachine.finish()) {
try {
exchangeSink.finish();
}
finally {
updateMemoryUsage(exchangeSink.getMemoryUsage());
}
if (stateMachine.getState().canAddPages()) {
// This situation is possible if the task has been cancelled
// Task cancellation is not supported (and not expected to be requested by the scheduler)
// when external exchange is active as the task output is expected to be deterministic
// As a safety precaution abort the sink to mark the output as invalid
abort();
}
}

@Override
public void abort()
{
if (stateMachine.abort()) {
exchangeSink.abort().whenComplete((value, failure) -> {
try {
exchangeSink.abort();
if (failure != null) {
stateMachine.fail(failure);
}
else {
stateMachine.abort();
}
}
finally {
updateMemoryUsage(0);
updateMemoryUsage(exchangeSink.getMemoryUsage());
}
}
});
}

@Override
Expand All @@ -202,6 +220,12 @@ public long getPeakMemoryUsage()
return peakMemoryUsage.get();
}

/**
 * Returns the failure cause held by this buffer's state machine.
 *
 * @return the result of {@code stateMachine.getFailureCause()}
 */
@Override
public Optional<Throwable> getFailureCause()
{
    Optional<Throwable> sinkFailure = stateMachine.getFailureCause();
    return sinkFailure;
}

private void updateMemoryUsage(long bytes)
{
LocalMemoryContext context = getSystemMemoryContextOrNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import static java.lang.Math.toIntExact;
import static java.nio.file.Files.createFile;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;

public class LocalFileSystemExchangeSink
implements ExchangeSink
Expand Down Expand Up @@ -109,10 +111,10 @@ public synchronized long getMemoryUsage()
}

@Override
public synchronized void finish()
public synchronized CompletableFuture<?> finish()
{
if (closed) {
return;
return completedFuture(null);
}
try {
for (SliceOutput output : outputs.values()) {
Expand All @@ -133,17 +135,18 @@ public synchronized void finish()
}
catch (Throwable t) {
abort();
throw t;
return failedFuture(t);
}
committed = true;
closed = true;
return completedFuture(null);
}

@Override
public synchronized void abort()
public synchronized CompletableFuture<?> abort()
{
if (closed) {
return;
return completedFuture(null);
}
closed = true;
for (SliceOutput output : outputs.values()) {
Expand All @@ -161,5 +164,6 @@ public synchronized void abort()
catch (IOException e) {
log.warn(e, "Error cleaning output directory");
}
return completedFuture(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,12 @@ public long getPeakMemoryUsage()
{
return 0;
}

/**
 * Always reports that no failure cause is available.
 *
 * @return an empty {@link Optional}
 */
@Override
public Optional<Throwable> getFailureCause()
{
    Optional<Throwable> noFailure = Optional.empty();
    return noFailure;
}
}

private static class SumModuloPartitionFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ public interface ExchangeSink

/**
* Notifies the exchange sink that no more data will be appended
*
* @return future that will be resolved when the finish operation either succeeds or fails
*/
void finish();
CompletableFuture<?> finish();

/**
* Notifies the exchange that the write operation has been aborted
*
* @return future that will be resolved when the abort operation either succeeds or fails
*/
void abort();
CompletableFuture<?> abort();
}

0 comments on commit af6b14f

Please sign in to comment.