
Add connector SPI for scale writers options #18561

Merged
merged 1 commit into trinodb:master on Aug 17, 2023

Conversation

gaurav8297
Member

Description

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Aug 7, 2023
@github-actions github-actions bot added the tests:hive, iceberg (Iceberg connector), delta-lake (Delta Lake connector) and hive (Hive connector) labels Aug 7, 2023
@gaurav8297 gaurav8297 marked this pull request as draft August 7, 2023 00:40
@raunaqmorarka raunaqmorarka requested a review from sopel39 August 7, 2023 04:41
@gaurav8297 gaurav8297 marked this pull request as ready for review August 8, 2023 06:41

import static java.util.Objects.requireNonNull;

public record WriterScalingOptions(boolean isWriterTasksScalingEnabled, boolean isPerTaskWriterScalingEnabled, Optional<Integer> taskScalingMaxWriterCount)
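For context, a self-contained sketch of how this record might look once complete. The `DISABLED` constant and the null check in the compact constructor are illustrative assumptions, not necessarily the merged code:

```java
import java.util.Optional;

import static java.util.Objects.requireNonNull;

// Plausible completion of the record under review; constant and
// validation are assumptions for illustration
public record WriterScalingOptions(
        boolean isWriterTasksScalingEnabled,
        boolean isPerTaskWriterScalingEnabled,
        Optional<Integer> taskScalingMaxWriterCount)
{
    public static final WriterScalingOptions DISABLED =
            new WriterScalingOptions(false, false, Optional.empty());

    public WriterScalingOptions
    {
        // Records accept null components by default; fail fast instead
        requireNonNull(taskScalingMaxWriterCount, "taskScalingMaxWriterCount is null");
    }
}
```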
Member

taskScalingMaxWriterCount - is this per task or per table writer node?

Member Author

It is per task

Member Author

We also have another method getMaxWriterTasks in the SPI which helps to control the maximum number of tasks. How about if we put it in the WriterScalingOptions itself? cc @raunaqmorarka

Member

I'm not clear on why getMaxWriterTasks was introduced. If it was to limit the maximum amount of write parallelism to an external data source, then the presence of per-task scaling already breaks that.
Per task writer scaling should probably be disabled when getMaxWriterTasks returns something non-empty, as it appears that jdbc connectors call this "write_parallelism".
We should probably deal with that separately to avoid getting blocked on that.
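The rule proposed here can be sketched as a small predicate. The standalone class and method names are hypothetical, not part of the SPI:

```java
import java.util.Optional;

// Hypothetical sketch of the proposal above: if a connector reports a
// maxWriterTasks limit (as JDBC connectors do via "write_parallelism"),
// per-task writer scaling is switched off so the limit actually bounds
// total write parallelism.
final class WriterScalingPolicy
{
    private WriterScalingPolicy() {}

    // Per-task scaling is only allowed when the connector declared no task limit
    static boolean isPerTaskScalingAllowed(boolean perTaskScalingEnabled, Optional<Integer> maxWriterTasks)
    {
        return perTaskScalingEnabled && maxWriterTasks.isEmpty();
    }
}
```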

Member

+1

Contributor
@radek-kondziolka Aug 9, 2023

getMaxWriterTasks was introduced to limit the number of writing tasks. From what I know, JDBC connectors need and use it - I know this from talks with different people, but I haven't confirmed it, so I'm not sure. It doesn't interfere with parallelism within a task, because it statically limits the number of tasks.

If getMaxWriterTasks is not empty, we could disable "local" scaling, or respect this limit in local scaling.
@gaurav8297, is it hard to disable or respect this number?

Member Author
@gaurav8297 gaurav8297 Aug 10, 2023

> then the presence of per-task scaling already breaks that.

Yes, but after this PR we won't have writer scaling enabled for JDBC connectors.

As @radek-starburst mentioned, getMaxWriterTasks is there to limit the number of writing tasks; it has no control over what happens within a task. If a connector wants to control parallelism with scaling, then it should ideally set the values of both maxWriterTasks and perTaskMaxWriterCount.

Or, maybe instead of having two properties, we could have just one property, maxWriters, which controls the total number of writers in the cluster. We can look into this in the future.
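The single-property idea could work roughly like this. This is purely illustrative arithmetic; `WriterBudget` and `perTaskWriterLimit` are made-up names:

```java
// Illustrative sketch of the single-property idea: a cluster-wide
// maxWriters budget split across writer tasks yields each task's
// per-task writer cap. Purely hypothetical, not the SPI.
final class WriterBudget
{
    private WriterBudget() {}

    static int perTaskWriterLimit(int maxWriters, int writerTasks)
    {
        // Ceiling division so the whole budget stays usable even when
        // it does not divide evenly across tasks
        return (maxWriters + writerTasks - 1) / writerTasks;
    }
}
```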

Member Author

Also, I think this property applies even when scaling is disabled, in which case we assign writing tasks to all the available workers.

Member

I can see that getMaxWriterTasks limits the number of writing tasks. The question is: what underlying problem were we trying to solve by doing that? If it was about limiting parallelism, then we need to change getMaxWriterTasks to getMaxWriterParallelism, and implementing that should disable within-task writer scaling. We can do it as a follow-up, since landing this PR will sidestep the problem for now, but we should fix the API.

Contributor

> We can do it as a follow-up since landing this PR will side step the problem for now, but we should fix the API.

Yes, we should do it.

@findepi
Member

findepi commented Aug 8, 2023

Can you please move Remove supportsReportingWrittenBytes from SPI to separate PR?

Member

@raunaqmorarka raunaqmorarka left a comment

lgtm
Needs eyes from other reviewers as well



@findepi
Member

findepi commented Aug 9, 2023

the CI seems red

Member

@sopel39 sopel39 left a comment

lgtm % comments. I agree with @raunaqmorarka on 2ac76ee#r1287956305


Member

+1

@findepi
Member

findepi commented Aug 9, 2023

> Can you please move Remove supportsReportingWrittenBytes from SPI to separate PR?

Thanks for the thumbs up. Feel free to link the PR here once it's created.

@gaurav8297 gaurav8297 force-pushed the scale_writer_flag branch 2 times, most recently from 6164754 to 32525ce on August 10, 2023 02:56
Member

@sopel39 sopel39 left a comment

mind test failures

.add(node.getPartitioningScheme().getPartitioning().getHandle())
.addAll(collectPartitioningHandles(node.getSources()))
return ImmutableList.<ExchangeNode>builder()
.add(node)
Member

nit: that has quadratic cost (copying lists over and over)
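The nit can be illustrated with a simplified node type: building a fresh list at every recursion level copies each sublist once per ancestor, which is quadratic on deep plans, while threading one accumulator through the traversal stays linear. `Node` and the method names below are stand-ins, not the planner classes:

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates the reviewer's point on a simplified tree type
final class CollectExample
{
    record Node(int id, List<Node> sources) {}

    // Quadratic on deep trees: each level allocates a new list and
    // copies every descendant's result again
    static List<Integer> collectCopying(Node node)
    {
        List<Integer> result = new ArrayList<>();
        result.add(node.id());
        for (Node source : node.sources()) {
            result.addAll(collectCopying(source));
        }
        return result;
    }

    // Linear: one accumulator shared by the whole traversal
    static void collectInto(Node node, List<Integer> accumulator)
    {
        accumulator.add(node.id());
        for (Node source : node.sources()) {
            collectInto(source, accumulator);
        }
    }
}
```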


import static java.util.Objects.requireNonNull;

public record WriterScalingOptions(boolean isWriterTasksScalingEnabled, boolean isPerTaskWriterScalingEnabled, Optional<Integer> perTaskMaxScaledWriterCount)
Member

@gaurav8297 @raunaqmorarka do we want to keep perTaskMaxScaledWriterCount as it seems redundant per previous discussion?

Member Author

I think we should keep it for now, since it helps to limit parallelism. In the future, we can look into combining both maxWriterTasks and this property into a single one.

Using WriterScalingOptions, a connector can
control scaling by providing the following
configurations:

1. isWriterTasksScalingEnabled
2. isPerTaskWriterScalingEnabled
3. perTaskMaxScaledWriterCount

Additionally, for now scaling is only enabled
for the Hive, Iceberg, and Delta Lake connectors.
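A hedged sketch of how a connector might opt in under the new SPI. The record is repeated here only for self-containment, and `ExampleMetadata` with its method name are assumptions about the call site rather than the actual SPI surface:

```java
import java.util.Optional;

// Minimal stand-in for the SPI record (repeated for self-containment)
record WriterScalingOptions(
        boolean isWriterTasksScalingEnabled,
        boolean isPerTaskWriterScalingEnabled,
        Optional<Integer> perTaskMaxScaledWriterCount) {}

// Hypothetical connector metadata: task-level and per-task scaling
// enabled, with each task capped at 8 scaled writers
class ExampleMetadata
{
    public WriterScalingOptions getWriterScalingOptions()
    {
        return new WriterScalingOptions(true, true, Optional.of(8));
    }
}
```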
@sopel39 sopel39 merged commit 996171c into trinodb:master Aug 17, 2023
@sopel39
Member

sopel39 commented Aug 17, 2023

thx!

@sopel39 sopel39 mentioned this pull request Aug 17, 2023
@github-actions github-actions bot added this to the 424 milestone Aug 17, 2023
6 participants