Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement adaptive partitioning for FTE based on input #17024

Merged
merged 2 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ public final class SystemSessionProperties
public static final String JOIN_DISTRIBUTION_TYPE = "join_distribution_type";
public static final String JOIN_MAX_BROADCAST_TABLE_SIZE = "join_max_broadcast_table_size";
public static final String JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR = "join_multi_clause_independence_factor";
public static final String DETERMINE_PARTITION_COUNT_FOR_WRITE_ENABLED = "determine_partition_count_for_write_enabled";
public static final String MAX_HASH_PARTITION_COUNT = "max_hash_partition_count";
public static final String MIN_HASH_PARTITION_COUNT = "min_hash_partition_count";
public static final String MIN_HASH_PARTITION_COUNT_FOR_WRITE = "min_hash_partition_count_for_write";
public static final String PREFER_STREAMING_OPERATORS = "prefer_streaming_operators";
public static final String TASK_WRITER_COUNT = "task_writer_count";
public static final String TASK_PARTITIONED_WRITER_COUNT = "task_partitioned_writer_count";
Expand Down Expand Up @@ -180,7 +182,9 @@ public final class SystemSessionProperties
public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY = "fault_tolerant_execution_task_memory";
public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY_GROWTH_FACTOR = "fault_tolerant_execution_task_memory_growth_factor";
public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY_ESTIMATION_QUANTILE = "fault_tolerant_execution_task_memory_estimation_quantile";
public static final String FAULT_TOLERANT_EXECUTION_PARTITION_COUNT = "fault_tolerant_execution_partition_count";
public static final String FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT = "fault_tolerant_execution_max_partition_count";
public static final String FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT = "fault_tolerant_execution_min_partition_count";
public static final String FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT_FOR_WRITE = "fault_tolerant_execution_min_partition_count_for_write";
public static final String ADAPTIVE_PARTIAL_AGGREGATION_ENABLED = "adaptive_partial_aggregation_enabled";
public static final String ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS = "adaptive_partial_aggregation_min_rows";
public static final String ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD = "adaptive_partial_aggregation_unique_rows_ratio_threshold";
Expand Down Expand Up @@ -254,6 +258,11 @@ public SystemSessionProperties(
false,
value -> validateDoubleRange(value, JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR, 0.0, 1.0),
value -> value),
booleanProperty(
DETERMINE_PARTITION_COUNT_FOR_WRITE_ENABLED,
"Determine the number of partitions based on amount of data read and processed by the query for write queries",
false,
false),
integerProperty(
MAX_HASH_PARTITION_COUNT,
"Maximum number of partitions for distributed joins and aggregations",
Expand All @@ -266,6 +275,12 @@ public SystemSessionProperties(
queryManagerConfig.getMinHashPartitionCount(),
value -> validateIntegerValue(value, MIN_HASH_PARTITION_COUNT, 1, false),
false),
integerProperty(
MIN_HASH_PARTITION_COUNT_FOR_WRITE,
"Minimum number of partitions for distributed joins and aggregations in write queries",
queryManagerConfig.getMinHashPartitionCountForWrite(),
value -> validateIntegerValue(value, MIN_HASH_PARTITION_COUNT_FOR_WRITE, 1, false),
false),
booleanProperty(
PREFER_STREAMING_OPERATORS,
"Prefer source table layouts that produce streaming operators",
Expand Down Expand Up @@ -913,9 +928,22 @@ public SystemSessionProperties(
value -> validateDoubleRange(value, FAULT_TOLERANT_EXECUTION_TASK_MEMORY_ESTIMATION_QUANTILE, 0.0, 1.0),
false),
integerProperty(
FAULT_TOLERANT_EXECUTION_PARTITION_COUNT,
"Number of partitions for distributed joins and aggregations executed with fault tolerant execution enabled",
queryManagerConfig.getFaultTolerantExecutionPartitionCount(),
FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT,
"Maximum number of partitions for distributed joins and aggregations executed with fault tolerant execution enabled",
queryManagerConfig.getFaultTolerantExecutionMaxPartitionCount(),
value -> validateIntegerValue(value, FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, 1, false),
false),
integerProperty(
FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT,
"Minimum number of partitions for distributed joins and aggregations executed with fault tolerant execution enabled",
queryManagerConfig.getFaultTolerantExecutionMinPartitionCount(),
value -> validateIntegerValue(value, FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, 1, false),
false),
integerProperty(
FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT_FOR_WRITE,
"Minimum number of partitions for distributed joins and aggregations in write queries executed with fault tolerant execution enabled",
queryManagerConfig.getFaultTolerantExecutionMinPartitionCountForWrite(),
value -> validateIntegerValue(value, FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT_FOR_WRITE, 1, false),
false),
booleanProperty(
ADAPTIVE_PARTIAL_AGGREGATION_ENABLED,
Expand Down Expand Up @@ -1026,6 +1054,11 @@ public static double getJoinMultiClauseIndependenceFactor(Session session)
return session.getSystemProperty(JOIN_MULTI_CLAUSE_INDEPENDENCE_FACTOR, Double.class);
}

/**
 * Returns whether the partition count for write queries should be determined
 * adaptively, based on the amount of data read and processed by the query
 * (session property {@code determine_partition_count_for_write_enabled}).
 */
public static boolean isDeterminePartitionCountForWriteEnabled(Session session)
{
return session.getSystemProperty(DETERMINE_PARTITION_COUNT_FOR_WRITE_ENABLED, Boolean.class);
}

public static int getMaxHashPartitionCount(Session session)
{
return session.getSystemProperty(MAX_HASH_PARTITION_COUNT, Integer.class);
Expand All @@ -1036,6 +1069,11 @@ public static int getMinHashPartitionCount(Session session)
return session.getSystemProperty(MIN_HASH_PARTITION_COUNT, Integer.class);
}

/**
 * Returns the minimum number of hash partitions used for distributed joins and
 * aggregations in write queries (session property {@code min_hash_partition_count_for_write}).
 */
public static int getMinHashPartitionCountForWrite(Session session)
{
return session.getSystemProperty(MIN_HASH_PARTITION_COUNT_FOR_WRITE, Integer.class);
}

public static boolean preferStreamingOperators(Session session)
{
return session.getSystemProperty(PREFER_STREAMING_OPERATORS, Boolean.class);
Expand Down Expand Up @@ -1699,9 +1737,19 @@ public static double getFaultTolerantExecutionTaskMemoryEstimationQuantile(Sessi
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY_ESTIMATION_QUANTILE, Double.class);
}

public static int getFaultTolerantExecutionPartitionCount(Session session)
public static int getFaultTolerantExecutionMaxPartitionCount(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, Integer.class);
}

/**
 * Returns the minimum number of partitions for distributed joins and aggregations
 * when fault-tolerant (task-retry) execution is enabled
 * (session property {@code fault_tolerant_execution_min_partition_count}).
 */
public static int getFaultTolerantExecutionMinPartitionCount(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, Integer.class);
}

public static int getFaultTolerantExecutionMinPartitionCountForWrite(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_PARTITION_COUNT, Integer.class);
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT_FOR_WRITE, Integer.class);
}

public static boolean isAdaptivePartialAggregationEnabled(Session session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.function.IntSupplier;

import static io.trino.SystemSessionProperties.getCostEstimationWorkerCount;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionPartitionCount;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMaxPartitionCount;
import static io.trino.SystemSessionProperties.getMaxHashPartitionCount;
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static java.lang.Math.min;
Expand Down Expand Up @@ -70,7 +70,7 @@ public int estimateHashedTaskCount(Session session)
{
int partitionCount;
if (getRetryPolicy(session) == RetryPolicy.TASK) {
partitionCount = getFaultTolerantExecutionPartitionCount(session);
partitionCount = getFaultTolerantExecutionMaxPartitionCount(session);
}
else {
partitionCount = getMaxHashPartitionCount(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ public class QueryManagerConfig
private int maxConcurrentQueries = 1000;
private int maxQueuedQueries = 5000;

private boolean determinePartitionCountForWriteEnabled;
private int maxHashPartitionCount = 100;
private int minHashPartitionCount = 4;
private int minHashPartitionCountForWrite = 50;
private int maxWriterTasksCount = 100;
private Duration minQueryExpireAge = new Duration(15, TimeUnit.MINUTES);
private int maxQueryHistory = 100;
Expand Down Expand Up @@ -120,7 +122,9 @@ public class QueryManagerConfig
private DataSize faultTolerantExecutionStandardSplitSize = DataSize.of(64, MEGABYTE);
private int faultTolerantExecutionMaxTaskSplitCount = 256;
private DataSize faultTolerantExecutionTaskDescriptorStorageMaxMemory = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.15));
private int faultTolerantExecutionPartitionCount = 50;
private int faultTolerantExecutionMaxPartitionCount = 50;
private int faultTolerantExecutionMinPartitionCount = 4;
private int faultTolerantExecutionMinPartitionCountForWrite = 50;
private boolean faultTolerantExecutionForcePreferredWritePartitioningEnabled = true;

@Min(1)
Expand Down Expand Up @@ -179,6 +183,19 @@ public QueryManagerConfig setMaxQueuedQueries(int maxQueuedQueries)
return this;
}

// Whether the partition count for write queries is determined adaptively from the
// amount of data read and processed by the query; defaults to false.
public boolean isDeterminePartitionCountForWriteEnabled()
{
return determinePartitionCountForWriteEnabled;
}

// Bound to the "query.determine-partition-count-for-write-enabled" config property.
@Config("query.determine-partition-count-for-write-enabled")
@ConfigDescription("Determine the number of partitions based on amount of data read and processed by the query for write queries")
public QueryManagerConfig setDeterminePartitionCountForWriteEnabled(boolean determinePartitionCountForWriteEnabled)
{
this.determinePartitionCountForWriteEnabled = determinePartitionCountForWriteEnabled;
return this;
}

@Min(1)
public int getMaxHashPartitionCount()
{
Expand Down Expand Up @@ -208,6 +225,20 @@ public QueryManagerConfig setMinHashPartitionCount(int minHashPartitionCount)
return this;
}

// Minimum hash partition count applied to write queries; must be at least 1.
@Min(1)
public int getMinHashPartitionCountForWrite()
{
return minHashPartitionCountForWrite;
}

// Bound to the "query.min-hash-partition-count-for-write" config property.
@Config("query.min-hash-partition-count-for-write")
@ConfigDescription("Minimum number of partitions for distributed joins and aggregations in write queries")
public QueryManagerConfig setMinHashPartitionCountForWrite(int minHashPartitionCountForWrite)
{
this.minHashPartitionCountForWrite = minHashPartitionCountForWrite;
return this;
}

@Min(1)
public int getMaxWriterTasksCount()
{
Expand Down Expand Up @@ -860,16 +891,45 @@ public QueryManagerConfig setFaultTolerantExecutionTaskDescriptorStorageMaxMemor
}

@Min(1)
public int getFaultTolerantExecutionPartitionCount()
public int getFaultTolerantExecutionMaxPartitionCount()
{
return faultTolerantExecutionMaxPartitionCount;
}

// Bound to "fault-tolerant-execution-max-partition-count"; the @LegacyConfig keeps the
// old "fault-tolerant-execution-partition-count" property name working for existing deployments.
@Config("fault-tolerant-execution-max-partition-count")
@LegacyConfig("fault-tolerant-execution-partition-count")
@ConfigDescription("Maximum number of partitions for distributed joins and aggregations executed with fault tolerant execution enabled")
public QueryManagerConfig setFaultTolerantExecutionMaxPartitionCount(int faultTolerantExecutionMaxPartitionCount)
{
this.faultTolerantExecutionMaxPartitionCount = faultTolerantExecutionMaxPartitionCount;
return this;
}

// Minimum partition count under fault-tolerant execution; must be at least 1.
@Min(1)
public int getFaultTolerantExecutionMinPartitionCount()
{
return faultTolerantExecutionMinPartitionCount;
}

// Bound to the "fault-tolerant-execution-min-partition-count" config property.
@Config("fault-tolerant-execution-min-partition-count")
@ConfigDescription("Minimum number of partitions for distributed joins and aggregations executed with fault tolerant execution enabled")
public QueryManagerConfig setFaultTolerantExecutionMinPartitionCount(int faultTolerantExecutionMinPartitionCount)
{
this.faultTolerantExecutionMinPartitionCount = faultTolerantExecutionMinPartitionCount;
return this;
}

@Min(1)
public int getFaultTolerantExecutionMinPartitionCountForWrite()
{
return faultTolerantExecutionPartitionCount;
return faultTolerantExecutionMinPartitionCountForWrite;
}

@Config("fault-tolerant-execution-partition-count")
@ConfigDescription("Number of partitions for distributed joins and aggregations executed with fault tolerant execution enabled")
public QueryManagerConfig setFaultTolerantExecutionPartitionCount(int faultTolerantExecutionPartitionCount)
@Config("fault-tolerant-execution-min-partition-count-for-write")
@ConfigDescription("Minimum number of partitions for distributed joins and aggregations in write queries executed with fault tolerant execution enabled")
public QueryManagerConfig setFaultTolerantExecutionMinPartitionCountForWrite(int faultTolerantExecutionMinPartitionCountForWrite)
{
this.faultTolerantExecutionPartitionCount = faultTolerantExecutionPartitionCount;
this.faultTolerantExecutionMinPartitionCountForWrite = faultTolerantExecutionMinPartitionCountForWrite;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
import static com.google.common.util.concurrent.Futures.getDone;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionDefaultCoordinatorTaskMemory;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionDefaultTaskMemory;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionPartitionCount;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMaxPartitionCount;
import static io.trino.SystemSessionProperties.getMaxTasksWaitingForExecutionPerQuery;
import static io.trino.SystemSessionProperties.getMaxTasksWaitingForNodePerStage;
import static io.trino.SystemSessionProperties.getRetryDelayScaleFactor;
Expand Down Expand Up @@ -270,7 +270,7 @@ public synchronized void start()
FaultTolerantPartitioningSchemeFactory partitioningSchemeFactory = new FaultTolerantPartitioningSchemeFactory(
nodePartitioningManager,
session,
getFaultTolerantExecutionPartitionCount(session));
getFaultTolerantExecutionMaxPartitionCount(session));
Closer closer = Closer.create();
NodeAllocator nodeAllocator = closer.register(nodeAllocatorService.getNodeAllocator(session));
try {
Expand Down Expand Up @@ -824,11 +824,13 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, int sch
stage.getStageSpan(),
fragment,
sourceExchanges.buildOrThrow(),
partitioningSchemeFactory.get(fragment.getPartitioning()),
partitioningSchemeFactory.get(fragment.getPartitioning(), fragment.getPartitionCount()),
stage::recordGetSplitTime,
outputDataSizeEstimates.buildOrThrow()));

FaultTolerantPartitioningScheme sinkPartitioningScheme = partitioningSchemeFactory.get(fragment.getOutputPartitioningScheme().getPartitioning().getHandle());
FaultTolerantPartitioningScheme sinkPartitioningScheme = partitioningSchemeFactory.get(
fragment.getOutputPartitioningScheme().getPartitioning().getHandle(),
fragment.getOutputPartitioningScheme().getPartitionCount());
ExchangeContext exchangeContext = new ExchangeContext(queryStateMachine.getQueryId(), new ExchangeId("external-exchange-" + stage.getStageId().getId()));

boolean preserveOrderWithinPartition = rootFragment && stage.getFragment().getPartitioning().equals(SINGLE_DISTRIBUTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,43 +42,43 @@ public class FaultTolerantPartitioningSchemeFactory
{
private final NodePartitioningManager nodePartitioningManager;
private final Session session;
private final int partitionCount;
private final int maxPartitionCount;

private final Map<PartitioningHandle, FaultTolerantPartitioningScheme> cache = new HashMap<>();

public FaultTolerantPartitioningSchemeFactory(NodePartitioningManager nodePartitioningManager, Session session, int partitionCount)
public FaultTolerantPartitioningSchemeFactory(NodePartitioningManager nodePartitioningManager, Session session, int maxPartitionCount)
{
this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
this.session = requireNonNull(session, "session is null");
this.partitionCount = partitionCount;
this.maxPartitionCount = maxPartitionCount;
}

public FaultTolerantPartitioningScheme get(PartitioningHandle handle)
public FaultTolerantPartitioningScheme get(PartitioningHandle handle, Optional<Integer> partitionCount)
{
FaultTolerantPartitioningScheme result = cache.get(handle);
if (result == null) {
// Avoid using computeIfAbsent as the "get" method is called recursively from the "create" method
result = create(handle);
result = create(handle, partitionCount);
cache.put(handle, result);
}
return result;
}

private FaultTolerantPartitioningScheme create(PartitioningHandle partitioningHandle)
private FaultTolerantPartitioningScheme create(PartitioningHandle partitioningHandle, Optional<Integer> partitionCount)
{
if (partitioningHandle.getConnectorHandle() instanceof MergePartitioningHandle mergePartitioningHandle) {
return mergePartitioningHandle.getFaultTolerantPartitioningScheme(this::get);
return mergePartitioningHandle.getFaultTolerantPartitioningScheme(handle -> this.get(handle, partitionCount));
}
if (partitioningHandle.equals(FIXED_HASH_DISTRIBUTION) || partitioningHandle.equals(SCALED_WRITER_HASH_DISTRIBUTION)) {
return createSystemSchema(partitionCount);
return createSystemSchema(partitionCount.orElse(maxPartitionCount));
}
if (partitioningHandle.getCatalogHandle().isPresent()) {
Optional<ConnectorBucketNodeMap> connectorBucketNodeMap = nodePartitioningManager.getConnectorBucketNodeMap(session, partitioningHandle);
if (connectorBucketNodeMap.isEmpty()) {
return createSystemSchema(partitionCount);
return createSystemSchema(partitionCount.orElse(maxPartitionCount));
}
ToIntFunction<Split> splitToBucket = nodePartitioningManager.getSplitToBucket(session, partitioningHandle);
return createConnectorSpecificSchema(partitionCount, connectorBucketNodeMap.get(), splitToBucket);
return createConnectorSpecificSchema(partitionCount.orElse(maxPartitionCount), connectorBucketNodeMap.get(), splitToBucket);
}
return new FaultTolerantPartitioningScheme(1, Optional.empty(), Optional.empty(), Optional.empty());
}
Expand Down
Loading