Support async commit for ExchangeSink #10699
Can you provide some rationale? Would be nice to have it in the commit message anyway.
core/trino-main/src/main/java/io/trino/execution/buffer/ArbitraryOutputBuffer.java
import static io.trino.execution.buffer.BufferState.OPEN;
import static io.trino.execution.buffer.BufferState.TERMINAL_BUFFER_STATES;

public class OutputBufferStateMachine
It looks like we are not using the `boolean stateChanged` return values most of the time. Would it make sense to return `void` for methods where we do not care about the returned value?
We need a boolean for most of the methods. The return value is not used for `noMoreBuffers` and `fail`. But I thought it might be better to be consistent with other methods.
        .orElseGet(() -> new TrinoException(GENERIC_INTERNAL_ERROR, "Output buffer is failed but the failure cause is missing"));
taskStateMachine.failed(failureCause);
return;
}
What about `ABORTED`? Why is it not expected here? Worth a comment?
core/trino-main/src/main/java/io/trino/execution/buffer/BufferState.java
LGTM
e95aacb
to
af6b14f
Compare
@martint you may want to look at changes in BufferState
af6b14f
to
d62fd13
Compare
core/trino-main/src/main/java/io/trino/execution/buffer/ArbitraryOutputBuffer.java
core/trino-main/src/main/java/io/trino/execution/buffer/BroadcastOutputBuffer.java
@@ -412,7 +407,7 @@ private void noMoreBuffers()

private void checkFlushComplete()
{
    if (state.get() != FLUSHING && state.get() != NO_MORE_BUFFERS) {
This should probably be `BufferState state = stateMachine.get` first, and then you should perform the check. Otherwise the state could move from `NO_MORE_BUFFERS` to `FLUSHING` between the `stateMachine.getState()` calls, which seems racy.
Yeah, it does seem weird. I also thought about that. I don't know exactly why it is implemented this way. At the end of the day I decided not to touch it and to keep the change as close to mechanical as possible.
Still, I think this should be fixed (separate commit). I can imagine the state transitioning from `NO_MORE_BUFFERS` to `FLUSHING`, and then this method will destroy buffers.
It's been like that for a very long time. It doesn't seem to be likely that the implementation is incorrect. But I agree, it's super confusing. Let me add a commit that simplifies it.
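The single-read shape suggested in this thread can be sketched as follows. This is a minimal illustration, not the Trino implementation: the class name, the reduced `BufferState` enum, and the `isFlushCandidate` method are hypothetical, and a plain `AtomicReference` stands in for the real state machine.

```java
import java.util.concurrent.atomic.AtomicReference;

class SingleReadStateCheck
{
    // Hypothetical, reduced set of buffer states used only for this sketch
    enum BufferState { OPEN, NO_MORE_BUFFERS, FLUSHING, FINISHED }

    private final AtomicReference<BufferState> state = new AtomicReference<>(BufferState.OPEN);

    void setState(BufferState newState)
    {
        state.set(newState);
    }

    // Reads the state exactly once, so both comparisons see the same snapshot
    // even if another thread moves NO_MORE_BUFFERS -> FLUSHING concurrently.
    boolean isFlushCandidate()
    {
        BufferState current = state.get();
        return current == BufferState.FLUSHING || current == BufferState.NO_MORE_BUFFERS;
    }

    public static void main(String[] args)
    {
        SingleReadStateCheck check = new SingleReadStateCheck();
        System.out.println("OPEN qualifies: " + check.isFlushCandidate());
        check.setState(BufferState.NO_MORE_BUFFERS);
        System.out.println("NO_MORE_BUFFERS qualifies: " + check.isFlushCandidate());
    }
}
```

With two separate reads, the first comparison could see `NO_MORE_BUFFERS` and the second `FLUSHING`, so a check written as `a != FLUSHING && b != NO_MORE_BUFFERS` could pass even though the state was an accepted one at both moments.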
core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java
core/trino-main/src/main/java/io/trino/execution/buffer/PartitionedOutputBuffer.java
core/trino-main/src/main/java/io/trino/execution/buffer/ArbitraryOutputBuffer.java
}

// The only terminal state that remains is ABORTED.
// Buffer is expected to be aborted only if the task itself is aborted. In this scenario the following statement is expected to be noop.
> In this scenario the following statement is expected to be noop.
Why? Because the task is aborted, so this line should never execute?
Failing an aborted task is a noop, as the `ABORTED` state is a terminal state.
{
    requireNonNull(throwable, "throwable is null");

    failureCause.compareAndSet(null, throwable);
Verify that `failureCause` is not overwritten (that it is null)?
This method is allowed to be called multiple times (similar to how it is implemented in other state machines). The contract is that the method has to preserve only the first failure that made the transition.
Could you add a comment: "the method has to preserve only the first failure that made the transition"?
The code seems to be self explanatory and aligns with what is done in other state machines. How strongly do you feel about having an explicit comment here?
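The "preserve only the first failure" contract discussed in this thread can be sketched as below. This is an illustration under assumptions: the class and its reduced `State` enum are hypothetical, and `AtomicReference#compareAndSet` is swapped in for the `state.setIf(...)` call seen in the diff, which belongs to Trino's own state machine.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

class FirstFailureWins
{
    // Hypothetical, reduced state set: OPEN is the only non-terminal state here
    enum State { OPEN, FAILED, ABORTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);
    private final AtomicReference<Throwable> failureCause = new AtomicReference<>();

    // May be called many times; only the first cause is stored
    boolean fail(Throwable throwable)
    {
        Objects.requireNonNull(throwable, "throwable is null");
        failureCause.compareAndSet(null, throwable);
        // Transition only out of the non-terminal state; returns whether this call made the transition
        return state.compareAndSet(State.OPEN, State.FAILED);
    }

    boolean abort()
    {
        return state.compareAndSet(State.OPEN, State.ABORTED);
    }

    // The cause is only exposed while the state is FAILED
    Optional<Throwable> getFailureCause()
    {
        return state.get() == State.FAILED ? Optional.ofNullable(failureCause.get()) : Optional.empty();
    }

    public static void main(String[] args)
    {
        FirstFailureWins machine = new FirstFailureWins();
        machine.fail(new RuntimeException("first"));
        machine.fail(new RuntimeException("second"));
        System.out.println(machine.getFailureCause().map(Throwable::getMessage).orElse("none"));
    }
}
```

Because the cause is stored before the transition is attempted, a cause can be recorded for a buffer that ends up `ABORTED`. The sketch handles that the way the thread describes: the cause is only consulted when the state is `FAILED`.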
core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java
/**
 * Notifies the exchange that the write operation has been aborted
 *
 * @return future that will be resolved when the abort operation either succeeds or fails
What should `abort` do when `finish` is already running?
What should `abort` do when another `abort` is running?
> What should abort do when finish is already running?
I think it can be implementation specific. The implementation may decide to keep the `finish` running, or may decide to cancel `finish` and abort. It doesn't really make a difference from the engine perspective.
> What should abort do when another abort is running?
Same here. It is implementation specific. As long as the sink is properly invalidated the engine doesn't really care what happens underneath, as the task is already aborted / failed anyway. Regardless, I guess it is better to make the `abort` method idempotent. I will change it to first transition the buffer to the `ABORTED` state and then call `ExchangeSink#abort`, the result of which is technically ignored anyway (`abort` is only called when the task itself is failed or aborted).
 */
- void finish();
+ CompletableFuture<?> finish();
What should `finish` do when `abort` is already running? The contract is undefined here.
What should `finish` do when another `finish` is running?
> what should finish do when abort is already running?
`finish` should never be called after abort. If it is, it's a bug. Let me document it.
> Contract is undefined here what should finish do when another finish is running?
`finish` shouldn't be called when another `finish` is running. If it is, it's a bug.
Updated the Javadoc.
finally {
    updateMemoryUsage(exchangeSink.getMemoryUsage());
}
if (stateMachine.getState().canAddPages()) {
Is it racy with `setNoMorePages`? E.g. `abort` can be called after `setNoMorePages` started running `finish`.
`setNoMorePages` starts running `finish` after transitioning the state. If `destroy` is called before `setNoMorePages` it means that the task got cancelled prematurely and the buffer has to be invalidated. If there's a race and `setNoMorePages` is called at the same time as `destroy`, it is legit to `finish` the sink, as the data written to the sink at that point is complete.
I don't fully understand. If you say this race is fine, then why do we need the `if (stateMachine.getState().canAddPages()) {` check here? In case of a race (between `setNoMorePages` and `destroy`), it would be as if this check did not exist.
In normal flow there shouldn't be a race. When the output is completely written and `setNoMorePages` is called, the task is only finished after `ExchangeSink#finish` is done and the buffer is transitioned to the `FINISHED` state. When the task itself is transitioned to `FINISHED` the `destroy` method is called, and we don't want the sink to be aborted under normal circumstances. That's why there's a check.
However, a race is possible when all the data is written but the task is cancelled before `ExchangeSink#finish` is completed. This shouldn't happen in practice, as the scheduler is not expected to cancel tasks that are writing to a spooling exchange. However, from the interface perspective it is possible. I was thinking about what's the best way to handle this situation. When the output is complete and the task is cancelled, the output itself is valid. So letting it finish should be perfectly fine. However, sending an "abort" to the sink gives the `ExchangeSink` implementation a chance to cancel the commit if possible.
Discussed offline.
Removing the check to ensure `abort` is always called if the `finish` hasn't succeeded.
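The outcome of this thread (always abort unless `finish` has already succeeded, and make the abort idempotent by transitioning the state first) can be sketched roughly as follows. Everything here is hypothetical: the `Sink` interface is a stand-in rather than the real `ExchangeSink`, the state set is reduced, and handling of a failed `finish` is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class AbortUnlessFinished
{
    enum State { OPEN, NO_MORE_PAGES, FINISHED, ABORTED }

    // Hypothetical stand-in for a sink with non-blocking finish and abort
    interface Sink
    {
        CompletableFuture<Void> finish();

        CompletableFuture<Void> abort();
    }

    private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);
    private final Sink sink;

    AbortUnlessFinished(Sink sink)
    {
        this.sink = sink;
    }

    State getState()
    {
        return state.get();
    }

    void setNoMorePages()
    {
        // Transition first, then start the asynchronous commit
        if (!state.compareAndSet(State.OPEN, State.NO_MORE_PAGES)) {
            return;
        }
        sink.finish().whenComplete((result, failure) -> {
            if (failure == null) {
                // Has no effect if the buffer was aborted in the meantime
                state.compareAndSet(State.NO_MORE_PAGES, State.FINISHED);
            }
        });
    }

    void destroy()
    {
        while (true) {
            State current = state.get();
            if (current == State.FINISHED || current == State.ABORTED) {
                return; // already terminal, nothing to abort
            }
            // Only the caller that wins the transition calls abort, so abort runs at most once
            if (state.compareAndSet(current, State.ABORTED)) {
                sink.abort(); // result intentionally ignored in this sketch
                return;
            }
        }
    }

    public static void main(String[] args)
    {
        AtomicInteger aborts = new AtomicInteger();
        CompletableFuture<Void> pendingFinish = new CompletableFuture<>();
        Sink sink = new Sink()
        {
            @Override
            public CompletableFuture<Void> finish()
            {
                return pendingFinish;
            }

            @Override
            public CompletableFuture<Void> abort()
            {
                aborts.incrementAndGet();
                return CompletableFuture.completedFuture(null);
            }
        };
        AbortUnlessFinished buffer = new AbortUnlessFinished(sink);
        buffer.setNoMorePages();
        buffer.destroy();
        buffer.destroy();
        System.out.println("state=" + buffer.getState() + ", abort calls=" + aborts.get());
    }
}
```

In this shape a `destroy` that races with a pending `finish` still sends an abort, which gives the sink a chance to cancel the commit, while a `destroy` after a successful `finish` does nothing.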
d62fd13
to
5d2d3a0
Compare
It feels that
Currently it is not blocking. It returns a future and the
c2ec962
to
df84888
Compare
Need to wait for futures to complete here: https://github.com/trinodb/trino/blob/master/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestExchangeManager.java#L167,L170
Good catch
e7d4c69
to
2a9a495
Compare
Rebased on top of #10507. Applied necessary changes to
lgtm % comments
    log.warn(e, "error closing buffer");
}
finally {
    memoryContext.setBytes(0);
nit: can we have a test for this?
That would probably require creating `Exchange` mocks that can throw an exception on close. I wonder if it's worth it given that we don't have memory counting tests even for happy path scenarios.
{
    requireNonNull(throwable, "throwable is null");

    failureCause.compareAndSet(null, throwable);
Could you add a comment: "the method has to preserve only the first failure that made the transition"?
    failureCause.compareAndSet(null, throwable);
    return state.setIf(FAILED, oldState -> !oldState.isTerminal());
}
The failure cause can be set before the state transitions to `FAILED`. Are we sure that won't cause any trouble?
The `failureCause` is only expected to be explored when the buffer is in the `FAILED` state. If the buffer transitioned to `ABORTED` in the meantime, the failure cause is not expected to be queried.
core/trino-main/src/main/java/io/trino/execution/buffer/OutputBuffer.java
        updateMemoryUsage(exchangeSink.getMemoryUsage());
    }
}
// Abort the buffer if it hasn't been finished. This is possible when a task is cancelled early by the coordinator.
That description is confusing:
> This is possible when a task is cancelled early by the coordinator.
and
> Task cancellation is not supported as the task output is expected to be deterministic.
Both can't be true at the same time, right?
Task cancellation is not expected to be requested by the coordinator. It can only be requested if there's a bug in the scheduler. However, if this situation happens (e.g. due to a bug) it is safer to invalidate the buffer with `abort` to avoid publishing incomplete data to the exchange service.
Added one more sentence to elaborate on it.
core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java
try {
    exchangeSink.abort().whenComplete((result, failure) -> {
        if (failure != null) {
            log.warn(failure, "Error aborting exchange sink");
error?
    });
}
catch (RuntimeException e) {
    log.warn(e, "Error aborting exchange sink");
error?
    exchangeSource.close();
}
catch (RuntimeException e) {
    log.warn(e, "error closing exchange source");
error?
core/trino-main/src/main/java/io/trino/operator/DeduplicatingDirectExchangeBuffer.java
2a9a495
to
9c3fde7
Compare
Encapsulate state transition logic shared between all output buffers in a single place. This will also help with extending the state machine to support failing a buffer with a specific exception that can be stored in the OutputBufferStateMachine
Preparation needed to allow failure handling
To be consistent with OutputBuffer#destroy() which does essentially the same operation but for all the buffers.
It seems to be more consistent with the naming in other places in the codebase (e.g.: abortTask). Also it will help to disambiguate a failure (when something failed inside an output buffer and must be reported) and an abort (when a buffer is explicitly aborted by the engine).
ExchangeSink#finish is called to commit the ExchangeSink when noMorePages is set on the SpoolingExchangeOutputBuffer. The setNoMorePages method is assumed to be lightweight and is called from a thread pool designed to handle lightweight task notifications. By default the thread pool has only 5 threads. It is not ideal to simply increase the thread pool size, as it is hard to know what specific output buffer will be used and whether any heavyweight processing on "noMorePages" is needed. Instead this commit changes the finish and abort operations on ExchangeSink to be non-blocking. With this approach the ExchangeSink is free to implement its own commit strategy without blocking the engine thread pools.
The isBlocked method accesses fields that must be accessed under a lock
9c3fde7
to
12ede0b
Compare
This came up during a discussion with @linzebing. It looks like currently the `noMorePages` and `destroy` methods could be called from a tiny thread pool designed to handle lightweight task notifications, for example:
https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java#L640
https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java#L568
https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java#L602
Where the `notificationExecutor` is shared between all tasks and by default only has 5 threads in the pool: https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java#L79
The commit operation on `ExchangeSink` could be quite time consuming (as it may require flushing existing buffers, creating files and so on). So it looks like it is better to provide a non-blocking `ExchangeSink` interface.
I will update the commit message.
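The motivation above can be illustrated with a small sketch: when `finish` returns a future instead of blocking, even a one-thread notification pool stays available while the commit is still in progress. All names here are hypothetical (`AsyncSink`, `allowCommit`, the executors); the latch only simulates a slow commit and is not how a real sink would work.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class NonBlockingCommit
{
    // Hypothetical sink that performs its (slow) commit on its own executor
    static class AsyncSink
    {
        private final ExecutorService commitExecutor = Executors.newSingleThreadExecutor();
        private final CountDownLatch commitAllowed = new CountDownLatch(1);

        // Returns immediately; the future completes when the simulated commit is done
        CompletableFuture<Void> finish()
        {
            return CompletableFuture.runAsync(() -> {
                try {
                    commitAllowed.await(); // stands in for flushing buffers, creating files, ...
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            }, commitExecutor);
        }

        void allowCommit()
        {
            commitAllowed.countDown();
        }

        void shutdown()
        {
            commitExecutor.shutdown();
        }
    }

    public static void main(String[] args)
            throws Exception
    {
        // A deliberately tiny pool, standing in for the shared notification executor
        ExecutorService notificationExecutor = Executors.newFixedThreadPool(1);
        AsyncSink sink = new AsyncSink();
        try {
            Callable<CompletableFuture<Void>> notification = sink::finish;
            CompletableFuture<Void> commit = notificationExecutor.submit(notification).get(5, TimeUnit.SECONDS);

            // The only notification thread is free again although the commit is still pending
            String next = notificationExecutor.submit(() -> "next notification handled").get(5, TimeUnit.SECONDS);
            System.out.println(next + ", commit done: " + commit.isDone());

            sink.allowCommit();
            commit.get(5, TimeUnit.SECONDS);
            System.out.println("commit done: " + commit.isDone());
        }
        finally {
            notificationExecutor.shutdown();
            sink.shutdown();
        }
    }
}
```

Had `finish` blocked until the commit completed, the second submission would have waited behind it on the single notification thread.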