Improve scaling speed and some cleanup #18005
Conversation
lgtm % comments
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterExchanger.java
@@ -246,6 +247,11 @@ public void setFinishedFuture(ListenableFuture<Void> finishedFuture)
        checkState(this.finishedFuture.getAndSet(requireNonNull(finishedFuture, "finishedFuture is null")) == null, "finishedFuture already set");
    }

    public void recordWriterInputDataSize(long sizeInBytes)
Do we need extra methods for this? I think this value is just the same as inputDataSize for TableWriter.
We can do this. However, then we would have to sum inputDataSize for only the TableWriter operator type in DriverContext to get writerInputDataSize.
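As a rough sketch of the alternative being weighed here (summing inputDataSize over only the TableWriter operator type), the following uses simplified stand-in types; OperatorStats and the operator-type string are hypothetical placeholders, not Trino's actual DriverContext API:

```java
import java.util.List;

// Hedged sketch: derive writerInputDataSize by filtering per-operator stats
// down to TableWriter operators and summing their inputDataSize.
public class WriterInputSize
{
    // Simplified stand-in for Trino's per-operator statistics
    public record OperatorStats(String operatorType, long inputDataSize) {}

    public static long writerInputDataSize(List<OperatorStats> operators)
    {
        return operators.stream()
                .filter(stats -> stats.operatorType().equals("TableWriterOperator"))
                .mapToLong(OperatorStats::inputDataSize)
                .sum();
    }

    public static void main(String[] args)
    {
        List<OperatorStats> operators = List.of(
                new OperatorStats("ScanFilterAndProjectOperator", 1_000),
                new OperatorStats("TableWriterOperator", 400),
                new OperatorStats("TableWriterOperator", 600));
        // Only the two TableWriter operators count toward the total
        System.out.println(writerInputDataSize(operators)); // prints 1000
    }
}
```

The trade-off surfaced in the thread is visible here: this avoids a dedicated recording method, but requires filtering by operator type at aggregation time.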
core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
core/trino-main/src/main/java/io/trino/operator/TaskContext.java
core/trino-main/src/test/java/io/trino/operator/exchange/TestLocalExchange.java
core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
@@ -1471,11 +1471,13 @@ default Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
        return Optional.empty();
    }

    @Deprecated
Maybe add a comment that it's now used only in test code and is not needed for any engine logic anymore. cc @findepi
Unused methods should be removed from here. For test-only functionality, this should be handled purely on the test side.
We still need to give it one release iteration IMO. cc @kokosing
I don't see a reason to keep an unused method in ConnectorMetadata.
If someone wants to support two versions at a time, they can still compile their ConnectorMetadata impl (just without the @Override annotation).
Anyway, I don't feel strongly about one version or not; just please do follow up. You should be able to create a commit/PR without waiting for anything.
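A minimal sketch of the compatibility point made here: an implementation can keep a method that matches a removed SPI interface method simply by dropping the @Override annotation, so the same source compiles against both SPI versions. The interface and class names below are hypothetical stand-ins, not Trino's actual SPI:

```java
// Hypothetical stand-in for a newer SPI version of ConnectorMetadata in which
// supportsReportingWrittenBytes() has been removed.
interface ConnectorMetadataSketch
{
    default String version()
    {
        return "new-spi";
    }
}

class MyConnectorMetadata implements ConnectorMetadataSketch
{
    // No @Override: this compiles whether or not the interface still declares
    // the method, so one source tree can target two SPI versions at a time.
    public boolean supportsReportingWrittenBytes()
    {
        return true;
    }
}

public class CompatSketch
{
    public static void main(String[] args)
    {
        MyConnectorMetadata metadata = new MyConnectorMetadata();
        System.out.println(metadata.supportsReportingWrittenBytes()); // prints true
    }
}
```

With @Override present, the same class would fail to compile against the SPI version that no longer declares the method, which is the scenario the comment is addressing.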
@findepi I think we have one-version-backward compatibility policy
@gaurav8297 can you put up a PR that will remove these deprecated methods? to be merged after 422 goes out
looks like this is part of #18561
created another pr #18617
Since writers are both CPU and IO bound, we should increase them to the same value as the number of worker threads.
Benchmarks:
Before: Input Size: 6B rows, Time: 7:24 mins
After: Input Size: 6B rows, Time: 4:31 mins
This is a preparatory commit to remove writer scaling's dependency on physicalWrittenBytes. Instead, we will use the uncompressed data processed by the writer.
This is a hidden property which is required to test end-to-end scaling of partitioned writers in different scenarios.
Since writers are both CPU and IO bound, we should increase them to the same value as the number of worker threads.
Benchmarks:
Before: Input Size: 3B rows with 3 skewed partitions, Time: 3:43 mins
After: Input Size: 3B rows with 3 skewed partitions, Time: 2:16 mins
Now we don't have a dependency on physicalWrittenBytes for writer scaling in either the partitioned or the unpartitioned case. Thus, we don't have to depend on the supportsReportingWrittenBytes flag coming from the connector. Besides, any connector can utilize writer scaling now.
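To illustrate the idea in these commits, here is a hedged sketch of scaling writers based on the uncompressed input data each writer has processed, rather than on physicalWrittenBytes. The class name, threshold, and fields are hypothetical, not Trino's actual implementation:

```java
// Illustrative sketch: decide when to add a writer from uncompressed writer
// input bytes, so scaling no longer needs the connector to report written bytes.
public class ScaleWriterSketch
{
    // Assumed minimum uncompressed input per writer before scaling up (32 MB)
    private static final long MIN_BYTES_PER_WRITER = 32L << 20;

    private long writerInputBytes;
    private int writerCount = 1;

    // Mirrors the recordWriterInputDataSize(long) method discussed in review
    public void recordWriterInputDataSize(long sizeInBytes)
    {
        writerInputBytes += sizeInBytes;
    }

    // Add a writer only once the current writers have, on average, each
    // processed the minimum amount of uncompressed input data
    public int updateWriterCount(int maxWriters)
    {
        if (writerCount < maxWriters && writerInputBytes >= (long) writerCount * MIN_BYTES_PER_WRITER) {
            writerCount++;
        }
        return writerCount;
    }

    public static void main(String[] args)
    {
        ScaleWriterSketch scaler = new ScaleWriterSketch();
        scaler.recordWriterInputDataSize(64L << 20); // 64 MB of uncompressed input
        System.out.println(scaler.updateWriterCount(4)); // prints 2
    }
}
```

Because the signal is the writer's own input rather than bytes the connector reports as written, any connector can participate in writer scaling, which is the point of dropping supportsReportingWrittenBytes.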
The test attempts to ensure that queries will fail if one of the nodes holding a partition of the data goes offline. However, it's fundamentally broken, as it doesn't do anything to ensure that the node that's being shut down actually contains any data for the table in question. With the recent changes in trinodb#18005 that enable writer scaling for any connector, it is very likely that the table will land in a subset of the nodes and cause the test to fail if the "wrong" node is shut down.
Hey @gaurav8297! Nice one! I was wondering about the benchmarks that you've conducted.
I'm asking because these results seemed great to me even before the change. We're trying to insert with benthos, and Trino chokes on 1000 rows/sec (even with the blackhole connector). Of course our coordinator is kind of small (16 vCPUs, 32 GB RAM), but I'd expect it to be able to ingest more.
Hard to tell what your bottleneck is (it might not be the insert). Maybe if you shared the query JSON it would be possible to tell more. What is your target connector?
Hey @sopel39, thanks for the quick response!
Our Setup
Our Investigation
So in our investigation we've discovered that the PLANNING phase takes a long time:
And if we zoom in on the PLANNER:
which is 3.69 seconds.
Sidenote
On the other hand, these are the results of the query you've run for this PR:
Which look fine, so it's probably for different reasons. Do you want me to create a separate issue for this?
Please create an issue for this
Description
Benchmarks
For unpartitioned case:
Before:
After:
For partitioned case:
Before:
After:
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: