Skip to content

Commit

Permalink
Rename OutputBuffer#fail to abort
Browse files Browse the repository at this point in the history
It seems to be more consistent with the naming in other places in the
codebase (e.g.: abortTask). Also it will help to disambiguate a failure
(when something failed inside an output buffer and must be reported) and
an abort (when a buffer is explicitly aborted by the engine).
  • Loading branch information
arhimondr committed Jan 21, 2022
1 parent f4a9b27 commit d3e017d
Show file tree
Hide file tree
Showing 13 changed files with 49 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private void initialize(Consumer<SqlTask> onDone, CounterStat failedTasks)
if (newState == FAILED || newState == ABORTED) {
// don't close buffers for a failed query
// closed buffers signal to upstream tasks that everything finished cleanly
outputBuffer.fail();
outputBuffer.abort();
}
else {
outputBuffer.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,10 @@ public void destroy()
}

@Override
public void fail()
public void abort()
{
// ignore fail if the buffer already in a terminal state.
if (stateMachine.fail()) {
// ignore abort if the buffer already in a terminal state.
if (stateMachine.abort()) {
memoryManager.setNoBlockOnFull();
forceFreeMemory();
// DO NOT destroy buffers or set no more pages. The coordinator manages the teardown of failed queries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.execution.buffer.BufferState.FAILED;
import static io.trino.execution.buffer.BufferState.ABORTED;
import static io.trino.execution.buffer.BufferState.FINISHED;
import static io.trino.execution.buffer.BufferState.FLUSHING;
import static io.trino.execution.buffer.BufferState.NO_MORE_BUFFERS;
Expand Down Expand Up @@ -321,10 +321,10 @@ public void destroy()
}

@Override
public void fail()
public void abort()
{
// ignore fail if the buffer already in a terminal state.
if (stateMachine.fail()) {
// ignore abort if the buffer already in a terminal state.
if (stateMachine.abort()) {
memoryManager.setNoBlockOnFull();
forceFreeMemory();
// DO NOT destroy buffers or set no more pages. The coordinator manages the teardown of failed queries.
Expand Down Expand Up @@ -360,8 +360,8 @@ private synchronized ClientBuffer getBuffer(OutputBufferId id)
// When no-more-buffers is set, we verify that all created buffers have been declared
buffer = new ClientBuffer(taskInstanceId, id, onPagesReleased);

// do not setup the new buffer if we are already failed
if (state != FAILED) {
// do not setup the new buffer if we are already aborted
if (state != ABORTED) {
// add initial pages
buffer.enqueuePages(initialPagesForNewBuffers);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ public enum BufferState
*/
FINISHED(false, false, true),
/**
* Buffer has failed. No more buffers or pages can be added. Readers
* Buffer has been aborted. No more buffers or pages can be added. Readers
* will be blocked, as to not communicate a finished state. It is
* assumed that the reader will be cleaned up elsewhere.
* This is the terminal state.
*/
FAILED(false, false, true);
ABORTED(false, false, true);

public static final Set<BufferState> TERMINAL_BUFFER_STATES = Stream.of(BufferState.values()).filter(BufferState::isTerminal).collect(toImmutableSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,22 +308,22 @@ public void destroy()
}

@Override
public void fail()
public void abort()
{
OutputBuffer outputBuffer = delegate;
if (outputBuffer == null) {
synchronized (this) {
if (delegate == null) {
// ignore fail if the buffer already in a terminal state.
stateMachine.fail();
// ignore abort if the buffer already in a terminal state.
stateMachine.abort();

// Do not free readers on fail
return;
}
outputBuffer = delegate;
}
}
outputBuffer.fail();
outputBuffer.abort();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ public interface OutputBuffer
void destroy();

/**
* Fail the buffer, discarding all pages, but blocking readers. It is expected that
* Abort the buffer, discarding all pages, but blocking readers. It is expected that
* readers will be unblocked when the failed query is cleaned up.
*/
void fail();
void abort();

/**
* @return the peak memory usage of this output buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import java.util.concurrent.Executor;

import static io.trino.execution.buffer.BufferState.FAILED;
import static io.trino.execution.buffer.BufferState.ABORTED;
import static io.trino.execution.buffer.BufferState.FINISHED;
import static io.trino.execution.buffer.BufferState.FLUSHING;
import static io.trino.execution.buffer.BufferState.NO_MORE_BUFFERS;
Expand Down Expand Up @@ -66,8 +66,8 @@ public boolean finish()
return state.setIf(FINISHED, oldState -> !oldState.isTerminal());
}

public boolean fail()
public boolean abort()
{
return state.setIf(FAILED, oldState -> !oldState.isTerminal());
return state.setIf(ABORTED, oldState -> !oldState.isTerminal());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,10 @@ public void destroy()
}

@Override
public void fail()
public void abort()
{
// ignore fail if the buffer already in a terminal state.
if (stateMachine.fail()) {
// ignore abort if the buffer already in a terminal state.
if (stateMachine.abort()) {
memoryManager.setNoBlockOnFull();
forceFreeMemory();
// DO NOT destroy buffers or set no more pages. The coordinator manages the teardown of failed queries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ public void destroy()
}

@Override
public void fail()
public void abort()
{
if (stateMachine.fail()) {
if (stateMachine.abort()) {
try {
exchangeSink.abort();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.execution.buffer.BufferResult.emptyResults;
import static io.trino.execution.buffer.BufferState.FAILED;
import static io.trino.execution.buffer.BufferState.ABORTED;
import static io.trino.execution.buffer.BufferState.FINISHED;
import static io.trino.execution.buffer.BufferState.FLUSHING;
import static io.trino.execution.buffer.BufferState.NO_MORE_BUFFERS;
Expand Down Expand Up @@ -790,8 +790,8 @@ public void testFailDoesNotFreeReader()
future = buffer.get(FIRST, 1, sizeOfPages(10));
assertFalse(future.isDone());

// fail the buffer
buffer.fail();
// abort the buffer
buffer.abort();

// future should have not finished
assertFalse(future.isDone());
Expand Down Expand Up @@ -828,9 +828,9 @@ public void testFailFreesWriter()
assertFalse(firstEnqueuePage.isDone());
assertFalse(secondEnqueuePage.isDone());

// fail the buffer (i.e., cancel the query)
buffer.fail();
assertEquals(buffer.getState(), FAILED);
// abort the buffer (i.e., fail the query)
buffer.abort();
assertEquals(buffer.getState(), ABORTED);

// verify the futures are completed
assertFutureIsDone(firstEnqueuePage);
Expand All @@ -857,8 +857,8 @@ public void testAddBufferAfterFail()
// verify we got one page
assertBufferResultEquals(TYPES, getFuture(future, NO_WAIT), bufferResult(0, createPage(0)));

// fail the buffer
buffer.fail();
// abort the buffer
buffer.abort();

// add a buffer
outputBuffers = outputBuffers.withBuffer(SECOND, BROADCAST_PARTITION_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.execution.buffer.BufferResult.emptyResults;
import static io.trino.execution.buffer.BufferState.FAILED;
import static io.trino.execution.buffer.BufferState.ABORTED;
import static io.trino.execution.buffer.BufferState.FINISHED;
import static io.trino.execution.buffer.BufferState.FLUSHING;
import static io.trino.execution.buffer.BufferState.NO_MORE_BUFFERS;
Expand Down Expand Up @@ -879,8 +879,8 @@ public void testFailDoesNotFreeReader()
future = buffer.get(FIRST, 1, sizeOfPages(10));
assertFalse(future.isDone());

// fail the buffer
buffer.fail();
// abort the buffer
buffer.abort();

// future should have not finished
assertFalse(future.isDone());
Expand Down Expand Up @@ -917,9 +917,9 @@ public void testFailFreesWriter()
assertFalse(firstEnqueuePage.isDone());
assertFalse(secondEnqueuePage.isDone());

// fail the buffer (i.e., cancel the query)
buffer.fail();
assertEquals(buffer.getState(), FAILED);
// abort the buffer (i.e., fail the query)
buffer.abort();
assertEquals(buffer.getState(), ABORTED);

// verify the futures are completed
assertFutureIsDone(firstEnqueuePage);
Expand All @@ -946,8 +946,8 @@ public void testAddBufferAfterFail()
// verify we got one page
assertBufferResultEquals(TYPES, getFuture(future, NO_WAIT), bufferResult(0, createPage(0)));

// fail the buffer
buffer.fail();
// abort the buffer
buffer.abort();

// add a buffer
outputBuffers = outputBuffers.withBuffer(SECOND, BROADCAST_PARTITION_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.execution.buffer.BufferResult.emptyResults;
import static io.trino.execution.buffer.BufferState.FAILED;
import static io.trino.execution.buffer.BufferState.ABORTED;
import static io.trino.execution.buffer.BufferState.FINISHED;
import static io.trino.execution.buffer.BufferState.FLUSHING;
import static io.trino.execution.buffer.BufferState.NO_MORE_BUFFERS;
Expand Down Expand Up @@ -682,8 +682,8 @@ public void testFailDoesNotFreeReader()
future = buffer.get(FIRST, 1, sizeOfPages(10));
assertFalse(future.isDone());

// fail the buffer
buffer.fail();
// abort the buffer
buffer.abort();

// future should have not finished
assertFalse(future.isDone());
Expand Down Expand Up @@ -720,9 +720,9 @@ public void testFailFreesWriter()
assertFalse(firstEnqueuePage.isDone());
assertFalse(secondEnqueuePage.isDone());

// fail the buffer (i.e., cancel the query)
buffer.fail();
assertEquals(buffer.getState(), FAILED);
// abort the buffer (i.e., fail the query)
buffer.abort();
assertEquals(buffer.getState(), ABORTED);

// verify the futures are completed
assertFutureIsDone(firstEnqueuePage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ public void destroy()
}

@Override
public void fail()
public void abort()
{
}

Expand Down

0 comments on commit d3e017d

Please sign in to comment.