Commit 790a24c: Implement task level failure recovery

arhimondr authored and martint committed Jan 21, 2022
1 parent: a4264ea

Showing 28 changed files with 4,930 additions and 23 deletions.
@@ -149,6 +149,8 @@ public final class SystemSessionProperties
public static final String RETRY_INITIAL_DELAY = "retry_initial_delay";
public static final String RETRY_MAX_DELAY = "retry_max_delay";
public static final String HIDE_INACCESSIBLE_COLUMNS = "hide_inaccessible_columns";
public static final String FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE = "fault_tolerant_execution_target_task_input_size";
public static final String FAULT_TOLERANT_EXECUTION_TARGET_TASK_SPLIT_COUNT = "fault_tolerant_execution_target_task_split_count";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -693,6 +695,16 @@ public SystemSessionProperties(
"When enabled non-accessible columns are silently filtered from results from SELECT * statements",
featuresConfig.isHideInaccesibleColumns(),
value -> validateHideInaccesibleColumns(value, featuresConfig.isHideInaccesibleColumns()),
false),
dataSizeProperty(
FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE,
"Target size in bytes of all task inputs for a single fault tolerant task",
queryManagerConfig.getFaultTolerantExecutionTargetTaskInputSize(),
false),
integerProperty(
FAULT_TOLERANT_EXECUTION_TARGET_TASK_SPLIT_COUNT,
"Target number of splits for a single fault tolerant task",
queryManagerConfig.getFaultTolerantExecutionTargetTaskSplitCount(),
false));
}

@@ -1227,6 +1239,11 @@ public static RetryPolicy getRetryPolicy(Session session)
throw new TrinoException(NOT_SUPPORTED, "Dynamic filtering is not supported with automatic task retries enabled");
}
}
if (retryPolicy == RetryPolicy.TASK) {
if (isGroupedExecutionEnabled(session) || isDynamicScheduleForGroupedExecution(session)) {
throw new TrinoException(NOT_SUPPORTED, "Grouped execution is not supported with task level retries enabled");
}
}
return retryPolicy;
}
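
A quick illustration of the new guard. The session below is a sketch, not part of the commit: it assumes the standard io.trino.testing.TestingSession.testSessionBuilder() helper and the pre-existing retry_policy, enable_dynamic_filtering, and grouped_execution session property names; dynamic filtering is switched off so that only the grouped-execution check added here comes into play.

// Hypothetical sketch: task-level retries and grouped execution are mutually exclusive
Session session = testSessionBuilder()
        .setSystemProperty("retry_policy", "TASK")
        .setSystemProperty("enable_dynamic_filtering", "false")
        .setSystemProperty("grouped_execution", "true")
        .build();
// getRetryPolicy(session) is expected to fail with
// TrinoException(NOT_SUPPORTED, "Grouped execution is not supported with task level retries enabled")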

@@ -1249,4 +1266,14 @@ public static boolean isHideInaccesibleColumns(Session session)
{
return session.getSystemProperty(HIDE_INACCESSIBLE_COLUMNS, Boolean.class);
}

public static DataSize getFaultTolerantExecutionTargetTaskInputSize(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE, DataSize.class);
}

public static int getFaultTolerantExecutionTargetTaskSplitCount(Session session)
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TARGET_TASK_SPLIT_COUNT, Integer.class);
}
}
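
The two accessors above follow the existing pattern of config-backed session properties: the session value wins when set, otherwise the QueryManagerConfig default applies. A minimal usage sketch, assuming the standard io.trino.testing.TestingSession.testSessionBuilder() helper (the helper and the chosen values are not part of this commit):

// Hypothetical sketch: override the new properties for a single session
Session session = testSessionBuilder()
        .setSystemProperty(FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE, "2GB")
        .setSystemProperty(FAULT_TOLERANT_EXECUTION_TARGET_TASK_SPLIT_COUNT, "32")
        .build();
DataSize targetInputSize = getFaultTolerantExecutionTargetTaskInputSize(session); // 2GB
int targetSplitCount = getFaultTolerantExecutionTargetTaskSplitCount(session);    // 32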
@@ -29,6 +29,7 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

@@ -77,6 +78,9 @@ public class QueryManagerConfig
private Duration retryInitialDelay = new Duration(10, SECONDS);
private Duration retryMaxDelay = new Duration(1, MINUTES);

private DataSize faultTolerantExecutionTargetTaskInputSize = DataSize.of(1, GIGABYTE);
private int faultTolerantExecutionTargetTaskSplitCount = 16;

@Min(1)
public int getScheduleSplitBatchSize()
{
@@ -441,4 +445,30 @@ public QueryManagerConfig setRetryMaxDelay(Duration retryMaxDelay)
this.retryMaxDelay = retryMaxDelay;
return this;
}

@NotNull
public DataSize getFaultTolerantExecutionTargetTaskInputSize()
{
return faultTolerantExecutionTargetTaskInputSize;
}

@Config("fault-tolerant-execution-target-task-input-size")
public QueryManagerConfig setFaultTolerantExecutionTargetTaskInputSize(DataSize faultTolerantExecutionTargetTaskInputSize)
{
this.faultTolerantExecutionTargetTaskInputSize = faultTolerantExecutionTargetTaskInputSize;
return this;
}

@Min(1)
public int getFaultTolerantExecutionTargetTaskSplitCount()
{
return faultTolerantExecutionTargetTaskSplitCount;
}

@Config("fault-tolerant-execution-target-task-split-count")
public QueryManagerConfig setFaultTolerantExecutionTargetTaskSplitCount(int faultTolerantExecutionTargetTaskSplitCount)
{
this.faultTolerantExecutionTargetTaskSplitCount = faultTolerantExecutionTargetTaskSplitCount;
return this;
}
}
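
The new knobs default to 1GB of input and 16 splits per fault-tolerant task, and the @Config annotations expose them as the coordinator properties fault-tolerant-execution-target-task-input-size and fault-tolerant-execution-target-task-split-count. A sketch of the fluent setters in isolation (illustrative only; the override values are arbitrary):

// Hypothetical sketch: programmatic overrides of the defaults declared above
QueryManagerConfig config = new QueryManagerConfig()
        .setFaultTolerantExecutionTargetTaskInputSize(DataSize.of(2, GIGABYTE))
        .setFaultTolerantExecutionTargetTaskSplitCount(32);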
@@ -22,11 +22,13 @@
import io.trino.connector.CatalogName;
import io.trino.cost.CostCalculator;
import io.trino.cost.StatsCalculator;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.QueryPreparer.PreparedQuery;
import io.trino.execution.StateMachine.StateChangeListener;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.SplitSchedulerStats;
import io.trino.execution.scheduler.SqlQueryScheduler;
import io.trino.execution.scheduler.TaskSourceFactory;
import io.trino.execution.scheduler.policy.ExecutionPolicy;
import io.trino.execution.warnings.WarningCollector;
import io.trino.failuredetector.FailureDetector;
@@ -117,6 +119,8 @@ public class SqlQueryExecution
private final TableExecuteContextManager tableExecuteContextManager;
private final TypeAnalyzer typeAnalyzer;
private final TaskManager coordinatorTaskManager;
private final ExchangeManagerRegistry exchangeManagerRegistry;
private final TaskSourceFactory taskSourceFactory;

private SqlQueryExecution(
PreparedQuery preparedQuery,
@@ -143,7 +147,9 @@ private SqlQueryExecution(
WarningCollector warningCollector,
TableExecuteContextManager tableExecuteContextManager,
TypeAnalyzer typeAnalyzer,
TaskManager coordinatorTaskManager)
TaskManager coordinatorTaskManager,
ExchangeManagerRegistry exchangeManagerRegistry,
TaskSourceFactory taskSourceFactory)
{
try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
this.slug = requireNonNull(slug, "slug is null");
@@ -199,6 +205,8 @@ private SqlQueryExecution(
this.remoteTaskFactory = new MemoryTrackingRemoteTaskFactory(requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"), stateMachine);
this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
this.coordinatorTaskManager = requireNonNull(coordinatorTaskManager, "coordinatorTaskManager is null");
this.exchangeManagerRegistry = requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
this.taskSourceFactory = requireNonNull(taskSourceFactory, "taskSourceFactory is null");
}
}

@@ -511,7 +519,9 @@ private void planDistribution(PlanRoot plan)
tableExecuteContextManager,
plannerContext.getMetadata(),
splitSourceFactory,
coordinatorTaskManager);
coordinatorTaskManager,
exchangeManagerRegistry,
taskSourceFactory);

queryScheduler.set(scheduler);

@@ -697,6 +707,8 @@ public static class SqlQueryExecutionFactory
private final TableExecuteContextManager tableExecuteContextManager;
private final TypeAnalyzer typeAnalyzer;
private final TaskManager coordinatorTaskManager;
private final ExchangeManagerRegistry exchangeManagerRegistry;
private final TaskSourceFactory taskSourceFactory;

@Inject
SqlQueryExecutionFactory(
@@ -720,7 +732,9 @@ public static class SqlQueryExecutionFactory
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager,
TypeAnalyzer typeAnalyzer,
TaskManager coordinatorTaskManager)
TaskManager coordinatorTaskManager,
ExchangeManagerRegistry exchangeManagerRegistry,
TaskSourceFactory taskSourceFactory)
{
requireNonNull(config, "config is null");
this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
@@ -744,6 +758,8 @@ public static class SqlQueryExecutionFactory
this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
this.coordinatorTaskManager = requireNonNull(coordinatorTaskManager, "coordinatorTaskManager is null");
this.exchangeManagerRegistry = requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
this.taskSourceFactory = requireNonNull(taskSourceFactory, "taskSourceFactory is null");
}

@Override
@@ -782,7 +798,9 @@ public QueryExecution createQueryExecution(
warningCollector,
tableExecuteContextManager,
typeAnalyzer,
coordinatorTaskManager);
coordinatorTaskManager,
exchangeManagerRegistry,
taskSourceFactory);
}
}
}
@@ -32,6 +32,8 @@ public BucketNodeMap(ToIntFunction<Split> splitToBucket)

public abstract int getBucketCount();

public abstract int getNodeCount();

public abstract Optional<InternalNode> getAssignedNode(int bucketedId);

public abstract void assignBucketToNode(int bucketedId, InternalNode node);
@@ -42,4 +44,9 @@ public final Optional<InternalNode> getAssignedNode(Split split)
{
return getAssignedNode(splitToBucket.applyAsInt(split));
}

public final int getBucket(Split split)
{
return splitToBucket.applyAsInt(split);
}
}
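
The new getBucket accessor exposes the split-to-bucket function directly, so a scheduler can partition splits by bucket without resolving node assignments first, while getNodeCount reports how many nodes the buckets are mapped to. A usage sketch, assuming Guava's ArrayListMultimap and pre-existing splits and bucketNodeMap variables (none of this is part of the commit):

// Hypothetical sketch: group splits by bucket, e.g. when building per-task split assignments
Multimap<Integer, Split> splitsByBucket = ArrayListMultimap.create();
for (Split split : splits) {
    splitsByBucket.put(bucketNodeMap.getBucket(split), split);
}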