Skip to content

Commit

Permalink
Relax condition for allowing writer tasks splitting
Browse files Browse the repository at this point in the history
Current condition which governed if given partition can be split between
multiple writer tasks allowed only SCALED_WRITER_HASH_DISTRIBUTION. This
effectively disabled writing single partition from multiple writer tasks
if connector based partitioning was used, like e.g. Icebergs
  day(some_timestamp_column)
  • Loading branch information
losipiuk committed Apr 23, 2024
1 parent 9cca381 commit c920022
Showing 1 changed file with 3 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -79,9 +78,9 @@ public static HashDistributionSplitAssigner create(
int targetMinTaskCount,
int targetMaxTaskCount)
{
if (fragment.getPartitioning().equals(SCALED_WRITER_HASH_DISTRIBUTION)) {
if (fragment.getPartitioning().isScaleWriters()) {
verify(fragment.getPartitionedSources().isEmpty() && fragment.getRemoteSourceNodes().size() == 1,
"SCALED_WRITER_HASH_DISTRIBUTION fragments are expected to have exactly one remote source and no table scans");
"fragments using scale-writers partitioning are expected to have exactly one remote source and no table scans");
}
return new HashDistributionSplitAssigner(
catalogRequirement,
Expand All @@ -95,7 +94,7 @@ public static HashDistributionSplitAssigner create(
targetPartitionSizeInBytes,
targetMinTaskCount,
targetMaxTaskCount,
sourceId -> fragment.getPartitioning().equals(SCALED_WRITER_HASH_DISTRIBUTION),
sourceId -> fragment.getPartitioning().isScaleWriters(),
// never merge partitions for table write to avoid running into the maximum writers limit per task
!isWriteFragment(fragment)));
}
Expand Down

0 comments on commit c920022

Please sign in to comment.