Deduplication buffer spooling #10507
Conversation
```diff
@@ -28,6 +28,8 @@ private FaultTolerantExecutionConnectorTestHelper() {}
                 .put("query.initial-hash-partitions", "5")
                 .put("fault-tolerant-execution-target-task-input-size", "10MB")
                 .put("fault-tolerant-execution-target-task-split-count", "4")
+                // to trigger spilling
+                .put("exchange.deduplication-buffer-size", "1kB")
```
Can we have a test (just a single use case) to verify that spilling actually happens when this value is low, and does not happen when it is high?
I tried to cover it with unit tests. I may try to extract statistics for the deduplication buffer and check whether `spilledBytes` is not 0.
It's actually not that easy to extract operator statistics. There's a unit test that verifies whether spilling actually takes place. Verifying it here would mostly verify whether the configuration parameter has been correctly passed to the buffer, as it is extremely unlikely that the hundreds of queries executed do not exceed the 1kB limit. I wonder if it's worth it?
Not pushing too much. But getting stats should not be that hard with `AbstractTestQueryFramework#assertQueryStats`.
The spilling-related stats are stored in the `OperatorInfo` within the `OperatorStats`. To get the right `OperatorStats` you somehow need to know the `stageId`, `pipelineId` and the `operatorId` for the correct `ExchangeOperator`. While it is possible to deduce these values (or even hard code them) it is not the only challenge.
For `SELECT` queries the `DeduplicationBuffer` is used in the `ExchangeClient` created to fetch final query results (in `Query.java`). The stats from this buffer are not available anywhere at the moment.
So that leaves only the `INSERT` queries to play with. But for `INSERT` queries the data size produced by table writers is not very predictable. It may vary based on seemingly unrelated changes to the connector and may cause weird failures.
In theory it is possible to set the limit to something extremely low, e.g. `10B`. But then all queries would spill, while I wanted to exercise the "non-spilling" path as well.
While I would feel more comfortable if spilling was somehow explicitly asserted (at least for a single query in the test suite), I am not extremely worried about it.
The `DeduplicationBuffer` is tested for spilling scenarios with a unit test. That assures that the spilling implementation in the `DeduplicationBuffer` works.
It looks like there are just two possible scenarios in which the spilling code path might not get exercised by the integration tests:
- The property is not passed correctly to the `DeduplicationBuffer`. I consider it rather unlikely. If the property name changes, the test will fail with an unknown property name. Also, I think changes that impede property propagation are rather unlikely.
- There are no queries that produce more than 1kB of output. While most of the queries don't produce much output, there are many queries that produce significant output. To verify that, I added logging to both the spilling and non-spilling paths, and both of them are exercised by many tests.
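For reference, a rough sketch of what such an assertion could look like via `AbstractTestQueryFramework#assertQueryStats`. The spill counters being surfaced through the exchange operator's `OperatorStats` (the `getSpilledDataSize()` accessor below) is an assumption for illustration, not the confirmed API:

```java
// Sketch only: assert that at least one exchange operator reported spilled data.
assertQueryStats(
        getSession(),
        "SELECT orderkey FROM orders",
        queryStats -> {
            long spilledBytes = queryStats.getOperatorSummaries().stream()
                    .filter(stats -> stats.getOperatorType().equals("ExchangeOperator"))
                    .mapToLong(stats -> stats.getSpilledDataSize().toBytes())
                    .sum();
            // with a 1kB buffer, spilling is expected for this query
            assertThat(spilledBytes).isGreaterThan(0);
        },
        results -> assertThat(results.getRowCount()).isGreaterThan(0));
```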
```diff
@@ -106,6 +106,8 @@ protected final QueryRunner createQueryRunner()
                 .put("failure-injection.request-timeout", new Duration(REQUEST_TIMEOUT.toMillis() * 2, MILLISECONDS).toString())
                 .put("exchange.http-client.idle-timeout", REQUEST_TIMEOUT.toString())
                 .put("query.initial-hash-partitions", "5")
+                // to trigger spilling
+                .put("exchange.deduplication-buffer-size", "1kB")
```
.put("exchange.deduplication-buffer-size", "1kB") | |
.put("exchange.deduplication-buffer-size", "1KB") |
Same below
I don't know why, but kilobytes are actually lower-case: https://github.com/airlift/units/blob/0e46afc2cf1196da4cb5d772f6b07482b648c860/src/main/java/io/airlift/units/DataSize.java#L302
I guess that is just the way it is: https://en.wikipedia.org/wiki/Metric_prefix
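That matches SI usage: among the multiplier prefixes used for byte sizes, "kilo" is the only one abbreviated lower-case ("k"), while "M", "G", "T" are upper-case, and airlift's `DataSize` follows that spelling. A quick sketch of the parsing behavior, assuming unit matching is case-sensitive per the linked source:

```java
import io.airlift.units.DataSize;

public class DataSizeUnits
{
    public static void main(String[] args)
    {
        // "kB" matches airlift's canonical unit spelling and parses fine
        DataSize ok = DataSize.valueOf("1kB");
        System.out.println(ok.toBytes());

        // "KB" is not a recognized unit, so this presumably throws IllegalArgumentException
        try {
            DataSize.valueOf("1KB");
        }
        catch (IllegalArgumentException e) {
            System.out.println("1KB rejected: " + e.getMessage());
        }
    }
}
```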
```java
ExchangeSinkHandle sinkHandle = exchange.addSink(0);
sinkInstanceHandle = exchange.instantiateSink(sinkHandle, 0);
exchange.noMoreSinks();
exchangeSink = exchangeManager.createSink(sinkInstanceHandle, true);
```
For exchange spooling on the local file system and S3, does `preserveRecordsOrder` make a difference at all?
The `preserveRecordsOrder` flag tells the `ExchangeSink` to preserve the order in which records are added to a partition. Thus, for example, writing a single partition into multiple files in a randomized fashion should not be allowed. It doesn't impact the current (local file system) implementation, as the records are appended to a single file in order. For the S3 implementation I guess it should be the same.
```java
long result = 0;
for (Slice page : pages) {
    result += page.getRetainedSize();
}
return result;
```
Suggested change:
```diff
-long result = 0;
-for (Slice page : pages) {
-    result += page.getRetainedSize();
-}
-return result;
+return pages.stream().mapToLong(Slice::getRetainedSize).sum();
```
I copied it from the streaming buffer, where this operation is done using a loop (probably for performance reasons?). It shouldn't matter that much, but at the same time it doesn't change readability much either.
```java
if (!pageBuffer.isEmpty()) {
    for (Map.Entry<TaskId, List<Slice>> entry : asMap(pageBuffer).entrySet()) {
        writeToSink(entry.getKey(), entry.getValue());
```
In theory we could keep them in the buffer and not write to the sink, and only spill the data that overflows.
The sink is likely to have a buffer of its own. Maintaining two buffers would increase the memory footprint and make the implementation more complex.
```diff
-DirectExchangeClient client = directExchangeClientSupplier.get(systemMemoryContext, taskFailureListener, retryPolicy);
+DirectExchangeClient client = directExchangeClientSupplier.get(
+        taskId.getQueryId(),
+        new ExchangeId(format("direct-exchange-%s-%s", taskId.getStageId().getId(), sourceId)),
```
Why do we need to include `sourceId` here? If I remember correctly, there will be only one deduplication buffer on the coordinator?
Because a single stage may be consuming more than a single source.
Does it hold for stages not involving the coordinator? I don't see you include `sourceId` there.
This is somewhat confusing, to be honest. External exchanges are named after the producing stage, which is always unique. Direct exchanges are named after the consuming stage (which may consume data from multiple producing stages). Naming a direct exchange after a producing stage is not possible, as a single consumer may consume data from multiple source stages.
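Roughly, the two naming schemes look like this. Only the direct-exchange format appears in this diff; the external-exchange format string and the stage/source identifiers are guesses for illustration:

```java
import static java.lang.String.format;

// Assumed identifiers, for illustration only.
int producingStageId = 2;
int consumingStageId = 3;
PlanNodeId sourceId = new PlanNodeId("20");

// External (spooling) exchange: keyed by the producing stage, which is unique.
ExchangeId external = new ExchangeId(format("external-exchange-%s", producingStageId));

// Direct exchange: keyed by the consuming stage plus the source, because one
// consuming stage may read from several producing stages.
ExchangeId direct = new ExchangeId(format("direct-exchange-%s-%s", consumingStageId, sourceId));
```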
```java
try (Closer closer = Closer.create()) {
    closer.register(exchange);
    if (exchangeSink != null) {
        closer.register(exchangeSink::abort);
```
Why abort?
Good catch. I forgot to nullify it when creating an output. The idea is: when it's non-null it means it's active and has to be aborted; when it's null it either hasn't been created yet or has already been closed.
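A minimal sketch of that invariant, with a hypothetical method name around the snippet above:

```java
import com.google.common.io.Closer;

import java.io.IOException;

// exchangeSink != null  <=>  the sink is still active and must be aborted on close
private synchronized void closeSink()
        throws IOException
{
    try (Closer closer = Closer.create()) {
        closer.register(exchange);
        if (exchangeSink != null) {
            ExchangeSink sink = exchangeSink;
            exchangeSink = null; // from here on: either never created or already closed
            closer.register(sink::abort);
        }
    }
}
```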
```java
}

@NotThreadSafe
private interface OutputSource
```
Feels that it's more like `InputSource` instead of `OutputSource`.
Read it more as: "this is a source of the buffer output".
```java
if (exchangeSource == null) {
    CompletableFuture<List<ExchangeSourceHandle>> sourceHandlesFuture = exchange.getSourceHandles();
    if (!sourceHandlesFuture.isDone()) {
        return null;
```
This is not expected, right? Should we throw an exception in this case?
The order of invocation of `isBlocked` and `pollPage` is inherently racy. It is possible that `isBlocked` returns "not blocked" (because of the `outputReady` future) but then the source becomes blocked on the `sourceHandlesFuture`.
I don't fully understand. When `isBlocked` returns "not blocked", then `sourceHandlesFuture` must have been unblocked, otherwise `isBlocked` won't return "not blocked". When `outputReady` is done, you return `outputSource.isBlocked()`.
The contract between `getNext()` and `isBlocked()` invocation order is weak. The implementation usually shouldn't assume that `isBlocked` is always in a non-blocked state when `getNext` is invoked.

The contract is weak because the engine is allowed to call `isBlocked` and `getNext` from multiple threads. When more than a single thread calls `isBlocked` and receives a signal that the buffer is not blocked, both threads may decide to call `getNext`, but it is possible that the buffer will have data for only one of them.

Under these circumstances it is hard to rely on strong guarantees. For example, the engine may sometimes allow "spurious" wakeups and call `getNext` even when `isBlocked` hasn't been resolved. Thus it doesn't feel safe to assert here.
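In code terms, a defensive consumer treats a null page after a "not blocked" signal as "lost the race, try again" rather than as an error. A sketch of a hypothetical consumer loop, assuming the buffer shape visible in this PR (`process` is a placeholder):

```java
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;

import static com.google.common.util.concurrent.Futures.getUnchecked;

void drain(DirectExchangeBuffer buffer)
{
    while (!buffer.isFinished()) {
        ListenableFuture<Void> blocked = buffer.isBlocked();
        if (!blocked.isDone()) {
            getUnchecked(blocked); // park until the buffer signals progress
            continue;
        }
        Slice page = buffer.pollPage();
        if (page == null) {
            continue; // spurious wakeup, or another thread won the race
        }
        process(page);
    }
}
```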
```java
    }
    return buffer.slice(Integer.BYTES * 3, buffer.length() - Integer.BYTES * 3);
}
close();
```
Is this `close()` redundant? We register `OutputSource` in the closer anyways.
It is to have only the `finished` variable instead of both `finished` and `closed`, which seems redundant. It sets `finished` to `true` and closes the source. It would also be correct to set `finished` to `true` here directly, but then the source would have to be closed in `close` and controlled by a second `closed` variable.
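A minimal sketch of the two-flag alternative described above, purely to contrast the designs (hypothetical class, not the PR's code):

```java
// Alternative: getNext() only flips `finished`; resource release is deferred
// to close(), which needs its own `closed` guard.
private static class TwoFlagSource
{
    private boolean finished;
    private boolean closed;

    public String getNext()
    {
        if (finished) {
            return null;
        }
        finished = true; // set directly instead of calling close()
        return "last value";
    }

    public void close()
    {
        if (closed) {
            return;
        }
        closed = true;
        // release underlying resources here
    }
}
```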
```java
if (exchangeSource != null) {
    CompletableFuture<?> blocked = exchangeSource.isBlocked();
    if (!blocked.isDone()) {
        return nonCancellationPropagating(asVoid(toListenableFuture(blocked)));
    }
}
CompletableFuture<List<ExchangeSourceHandle>> sourceHandles = exchange.getSourceHandles();
if (!sourceHandles.isDone()) {
    return nonCancellationPropagating(asVoid(toListenableFuture(sourceHandles)));
}
```
It feels that the logical sequence is to check `sourceHandles` and then check `exchangeSource`.
The `if (exchangeSource != null)` implies that the source is already created, thus the `sourceHandles` are already available.
```java
@Config("exchange.deduplication-buffer-size")
public DirectExchangeClientConfig setDeduplicationBufferSize(DataSize deduplicationBufferSize)
{
    this.deduplicationBufferSize = deduplicationBufferSize;
```
Do we need `requireNonNull` here? I am not sure. It is not common, but I don't know if this is an omission or not.
For stats we usually prefer validation by `@NotNull`, which I forgot to add :-)
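For context, the usual airlift config pattern puts the constraint annotation on the getter, so a null value fails config validation rather than construction. A sketch (my reconstruction, not the exact code from this PR):

```java
@NotNull
public DataSize getDeduplicationBufferSize()
{
    return deduplicationBufferSize;
}

@Config("exchange.deduplication-buffer-size")
public DirectExchangeClientConfig setDeduplicationBufferSize(DataSize deduplicationBufferSize)
{
    this.deduplicationBufferSize = deduplicationBufferSize;
    return this;
}
```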
```java
{
    if (failure != null) {
        return;
    }

    if (inputFinished) {
        if (outputSource != null) {
```
Check the `outputReady` future instead? This would be more consistent with the other checks in this class.
Future is unblocked asynchronously =\
```java
}
catch (TrinoException e) {
    if (EXCHANGE_MANAGER_NOT_CONFIGURED.toErrorCode().equals(e.getErrorCode())) {
        throw new TrinoException(EXCHANGE_MANAGER_NOT_CONFIGURED, "Exchange manager must be configured for failure recovery capabilities to be fully functional");
```
Why do you need to rethrow with a changed message? Is the original one poor? Maybe we should just change it.
Also, it would be nice to verify the configuration earlier than when we actually need to use the spooling exchange. Otherwise we may not notice a configuration problem for a very long time, until we hit the affected code path.
Actually, I believe it is being validated in some other place, but then it would be nice to add a comment saying that this is the case.
The default message just says "Exchange manager is not configured". The idea was to provide a message that is more actionable and easier to understand.
As an afterthought, the alternative message is not very actionable either. I was first thinking about changing the message to:
> Capacity of the deduplication buffer required for failure recovery to function has been exceeded. Please configure an exchange manager to allow the buffer to spill to external storage.

But then I thought that the message of the error thrown by the `ExchangeManager` should be more actionable in general.
I'm going to remove this message override and improve the message in the `ExchangeManager` as a separate commit.
> Otherwise we may not notice a configuration problem for a very long time, until we hit the affected code path.

I'm not sure we should always eagerly disallow failure recovery when the `ExchangeManager` is not configured. For example, query-level retries may function properly if the buffer capacity is never exceeded. For task-level retries some queries (for example simple selects) may function properly as well.
For task-level retries the set of working queries is so limited that I strongly feel we should check at the very beginning that an exchange manager is configured.
For query-level retries... maybe not. Yet the cost of configuring it to use local disk is small. And then it is easier for Trino users to know that when they have configured a cluster and it works for some test queries, it will actually work for other ones too.
On the other hand, it makes life harder for people who run Trino on disk-less machines.
```java
verify(writeBuffer != null, "writeBuffer is expected to be initialized");
for (Slice page : pages) {
    // wait for the sink to unblock
    getUnchecked(exchangeSink.isBlocked());
```
Do we need to explicitly wait here? What are the semantics of `exchangeSink.add` if the sink is blocked?
Due to the inherently racy nature I would assume that the explicit wait could be omitted (e.g. a sink can technically become blocked in between `isBlocked` and `add`). However, it still feels better to follow the contract.
Nice. If the buffer is already in the failed state it won't be possible to propagate a failed check anyway.
Once the documentation is available we may also want to extend the message with a reference to failure recovery documentation.
@arhimondr let me know if there is still something to be addressed, or if this is good to go.
Nothing from my side. But let's wait for @linzebing to take another look.
```java
if (!outputReady.isDone()) {
    return nonCancellationPropagating(outputReady);
}

checkState(outputSource != null, "outputSource is expected to be set");
return outputSource.isBlocked();
```
Suggested change:
```diff
-if (!outputReady.isDone()) {
-    return nonCancellationPropagating(outputReady);
-}
-checkState(outputSource != null, "outputSource is expected to be set");
-return outputSource.isBlocked();
+return nonCancellationPropagating(Futures.transformAsync(outputReady, ignored -> {
+    synchronized (this) {
+        if (outputSource != null) {
+            return outputSource.isBlocked();
+        }
+        return immediateVoidFuture();
+    }
+}, directExecutor()));
```
Similarly for `ExchangeOutputSource::isBlocked()`. Thoughts?