Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flag if task is speculative to TaskStatus #17316

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public RemoteTask createRemoteTask(
Span stageSpan,
TaskId taskId,
InternalNode node,
boolean speculative,
PlanFragment fragment,
Multimap<PlanNodeId, Split> initialSplits,
OutputBuffers outputBuffers,
Expand All @@ -62,6 +63,7 @@ public RemoteTask createRemoteTask(
stageSpan,
taskId,
node,
speculative,
fragment,
initialSplits,
outputBuffers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface RemoteTask

void setOutputBuffers(OutputBuffers outputBuffers);

void setSpeculative(boolean speculative);

/**
* Listener is always notified asynchronously using a dedicated notification thread pool so, care should
* be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ RemoteTask createRemoteTask(
Span stageSpan,
TaskId taskId,
InternalNode node,
boolean speculative,
PlanFragment fragment,
Multimap<PlanNodeId, Split> initialSplits,
OutputBuffers outputBuffers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ public synchronized Optional<RemoteTask> createTask(
OutputBuffers outputBuffers,
Multimap<PlanNodeId, Split> splits,
Set<PlanNodeId> noMoreSplits,
Optional<DataSize> estimatedMemory)
Optional<DataSize> estimatedMemory,
boolean speculative)
{
if (stateMachine.getState().isDone()) {
return Optional.empty();
Expand All @@ -261,6 +262,7 @@ public synchronized Optional<RemoteTask> createTask(
stateMachine.getStageSpan(),
taskId,
node,
speculative,
stateMachine.getFragment().withBucketToPartition(bucketToPartition),
splits,
outputBuffers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class SqlTask
private final String taskInstanceId;
private final URI location;
private final String nodeId;
private final AtomicBoolean speculative = new AtomicBoolean(false);
private final TaskStateMachine taskStateMachine;
private final OutputBuffer outputBuffer;
private final QueryContext queryContext;
Expand Down Expand Up @@ -376,6 +377,7 @@ else if (taskHolder.getTaskExecution() != null) {
state,
location,
nodeId,
speculative.get(),
failures,
queuedPartitionedDrivers,
runningPartitionedDrivers,
Expand Down Expand Up @@ -468,7 +470,8 @@ public TaskInfo updateTask(
Optional<PlanFragment> fragment,
List<SplitAssignment> splitAssignments,
OutputBuffers outputBuffers,
Map<DynamicFilterId, Domain> dynamicFilterDomains)
Map<DynamicFilterId, Domain> dynamicFilterDomains,
boolean speculative)
{
try {
// trace token must be set first to make sure failure injection for getTaskResults requests works as expected
Expand All @@ -495,6 +498,9 @@ public TaskInfo updateTask(
taskExecution.addSplitAssignments(splitAssignments);
taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains);
}

// update speculative flag
this.speculative.set(speculative);
}
catch (Error e) {
failed(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,11 @@ public TaskInfo updateTask(
Optional<PlanFragment> fragment,
List<SplitAssignment> splitAssignments,
OutputBuffers outputBuffers,
Map<DynamicFilterId, Domain> dynamicFilterDomains)
Map<DynamicFilterId, Domain> dynamicFilterDomains,
boolean speculative)
{
try {
return versionEmbedder.embedVersion(() -> doUpdateTask(session, taskId, stageSpan, fragment, splitAssignments, outputBuffers, dynamicFilterDomains)).call();
return versionEmbedder.embedVersion(() -> doUpdateTask(session, taskId, stageSpan, fragment, splitAssignments, outputBuffers, dynamicFilterDomains, speculative)).call();
}
catch (Exception e) {
throwIfUnchecked(e);
Expand All @@ -489,7 +490,8 @@ private TaskInfo doUpdateTask(
Optional<PlanFragment> fragment,
List<SplitAssignment> splitAssignments,
OutputBuffers outputBuffers,
Map<DynamicFilterId, Domain> dynamicFilterDomains)
Map<DynamicFilterId, Domain> dynamicFilterDomains,
boolean speculative)
{
requireNonNull(session, "session is null");
requireNonNull(taskId, "taskId is null");
Expand Down Expand Up @@ -528,7 +530,7 @@ private TaskInfo doUpdateTask(
});

sqlTask.recordHeartbeat();
return sqlTask.updateTask(session, stageSpan, fragment, splitAssignments, outputBuffers, dynamicFilterDomains);
return sqlTask.updateTask(session, stageSpan, fragment, splitAssignments, outputBuffers, dynamicFilterDomains, speculative);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ public String toString()
.toString();
}

public static TaskInfo createInitialTask(TaskId taskId, URI location, String nodeId, Optional<List<PipelinedBufferInfo>> pipelinedBufferStates, TaskStats taskStats)
public static TaskInfo createInitialTask(TaskId taskId, URI location, String nodeId, boolean speculative, Optional<List<PipelinedBufferInfo>> pipelinedBufferStates, TaskStats taskStats)
{
return new TaskInfo(
initialTaskStatus(taskId, location, nodeId),
initialTaskStatus(taskId, location, nodeId, speculative),
DateTime.now(),
new OutputBufferInfo(
"UNINITIALIZED",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class TaskStatus
private final TaskState state;
private final URI self;
private final String nodeId;
private final boolean speculative;

private final int queuedPartitionedDrivers;
private final long queuedPartitionedSplitsWeight;
Expand Down Expand Up @@ -80,6 +81,7 @@ public TaskStatus(
@JsonProperty("state") TaskState state,
@JsonProperty("self") URI self,
@JsonProperty("nodeId") String nodeId,
boolean speculative,
@JsonProperty("failures") List<ExecutionFailureInfo> failures,
@JsonProperty("queuedPartitionedDrivers") int queuedPartitionedDrivers,
@JsonProperty("runningPartitionedDrivers") int runningPartitionedDrivers,
Expand All @@ -104,6 +106,7 @@ public TaskStatus(
this.state = requireNonNull(state, "state is null");
this.self = requireNonNull(self, "self is null");
this.nodeId = requireNonNull(nodeId, "nodeId is null");
this.speculative = speculative;

checkArgument(queuedPartitionedDrivers >= 0, "queuedPartitionedDrivers must be positive");
this.queuedPartitionedDrivers = queuedPartitionedDrivers;
Expand Down Expand Up @@ -268,7 +271,7 @@ public String toString()
.toString();
}

public static TaskStatus initialTaskStatus(TaskId taskId, URI location, String nodeId)
public static TaskStatus initialTaskStatus(TaskId taskId, URI location, String nodeId, boolean speculative)
{
return new TaskStatus(
taskId,
Expand All @@ -277,6 +280,7 @@ public static TaskStatus initialTaskStatus(TaskId taskId, URI location, String n
PLANNED,
location,
nodeId,
speculative,
ImmutableList.of(),
0,
0,
Expand All @@ -303,6 +307,7 @@ public static TaskStatus failWith(TaskStatus taskStatus, TaskState state, List<E
state,
taskStatus.getSelf(),
taskStatus.getNodeId(),
false,
exceptions,
taskStatus.getQueuedPartitionedDrivers(),
taskStatus.getRunningPartitionedDrivers(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,24 @@ public boolean remove(E element)

@Override
public E poll()
{
Entry<E> entry = pollEntry();
if (entry == null) {
return null;
}
return entry.getValue();
}

public Prioritized<E> pollPrioritized()
{
Entry<E> entry = pollEntry();
if (entry == null) {
return null;
}
return new Prioritized<>(entry.getValue(), entry.getPriority());
}

private Entry<E> pollEntry()
{
Iterator<Entry<E>> iterator = queue.iterator();
if (!iterator.hasNext()) {
Expand All @@ -89,7 +107,7 @@ public E poll()
Entry<E> entry = iterator.next();
iterator.remove();
checkState(index.remove(entry.getValue()) != null, "Failed to remove entry from index");
return entry.getValue();
return entry;
}

@Override
Expand Down Expand Up @@ -149,4 +167,26 @@ public long getGeneration()
return generation;
}
}

public static class Prioritized<V>
{
private final V value;
private final long priority;

public Prioritized(V value, long priority)
{
this.value = requireNonNull(value, "value is null");
this.priority = priority;
}

public V getValue()
{
return value;
}

public long getPriority()
{
return priority;
}
}
}
Loading