Skip to content

Commit

Permalink
Drop unneeded TaskManager interface
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Mar 1, 2022
1 parent 03f17e7 commit 78cb3d5
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import io.airlift.node.NodeInfo;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.execution.SqlTaskManager;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskManager;
import io.trino.execution.TaskStatus;
import io.trino.operator.TaskStats;
import io.trino.spi.connector.ConnectorSession;
Expand Down Expand Up @@ -81,11 +81,11 @@ public class TaskSystemTable
.column("end", TIMESTAMP_TZ_MILLIS)
.build();

private final TaskManager taskManager;
private final SqlTaskManager taskManager;
private final String nodeId;

@Inject
public TaskSystemTable(TaskManager taskManager, NodeInfo nodeInfo)
public TaskSystemTable(SqlTaskManager taskManager, NodeInfo nodeInfo)
{
this.taskManager = taskManager;
this.nodeId = nodeInfo.getNodeId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public class SqlQueryExecution
private final DynamicFilterService dynamicFilterService;
private final TableExecuteContextManager tableExecuteContextManager;
private final TypeAnalyzer typeAnalyzer;
private final TaskManager coordinatorTaskManager;
private final SqlTaskManager coordinatorTaskManager;
private final ExchangeManagerRegistry exchangeManagerRegistry;
private final TaskSourceFactory taskSourceFactory;
private final TaskDescriptorStorage taskDescriptorStorage;
Expand Down Expand Up @@ -148,7 +148,7 @@ private SqlQueryExecution(
WarningCollector warningCollector,
TableExecuteContextManager tableExecuteContextManager,
TypeAnalyzer typeAnalyzer,
TaskManager coordinatorTaskManager,
SqlTaskManager coordinatorTaskManager,
ExchangeManagerRegistry exchangeManagerRegistry,
TaskSourceFactory taskSourceFactory,
TaskDescriptorStorage taskDescriptorStorage)
Expand Down Expand Up @@ -698,7 +698,7 @@ public static class SqlQueryExecutionFactory
private final DynamicFilterService dynamicFilterService;
private final TableExecuteContextManager tableExecuteContextManager;
private final TypeAnalyzer typeAnalyzer;
private final TaskManager coordinatorTaskManager;
private final SqlTaskManager coordinatorTaskManager;
private final ExchangeManagerRegistry exchangeManagerRegistry;
private final TaskSourceFactory taskSourceFactory;
private final TaskDescriptorStorage taskDescriptorStorage;
Expand All @@ -725,7 +725,7 @@ public static class SqlQueryExecutionFactory
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager,
TypeAnalyzer typeAnalyzer,
TaskManager coordinatorTaskManager,
SqlTaskManager coordinatorTaskManager,
ExchangeManagerRegistry exchangeManagerRegistry,
TaskSourceFactory taskSourceFactory,
TaskDescriptorStorage taskDescriptorStorage)
Expand Down
104 changes: 85 additions & 19 deletions core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
import static java.util.concurrent.Executors.newScheduledThreadPool;

public class SqlTaskManager
implements TaskManager, Closeable
implements Closeable
{
private static final Logger log = Logger.get(SqlTaskManager.class);

Expand Down Expand Up @@ -216,8 +216,8 @@ public void start()
}, 0, 1, TimeUnit.SECONDS);
}

@Override
@PreDestroy
@Override
public void close()
{
boolean taskCanceled = false;
Expand Down Expand Up @@ -265,15 +265,24 @@ public List<SqlTask> getAllTasks()
return ImmutableList.copyOf(tasks.asMap().values());
}

@Override
/**
* Gets all of the currently tracked tasks. This will included
* uninitialized, running, and completed tasks.
*/
public List<TaskInfo> getAllTaskInfo()
{
return tasks.asMap().values().stream()
.map(SqlTask::getTaskInfo)
.collect(toImmutableList());
}

@Override
/**
* Gets the info for the specified task. If the task has not been created
* yet, an uninitialized task is created and the info is returned.
* <p>
* NOTE: this design assumes that only tasks that will eventually exist are
* queried.
*/
public TaskInfo getTaskInfo(TaskId taskId)
{
requireNonNull(taskId, "taskId is null");
Expand All @@ -283,7 +292,9 @@ public TaskInfo getTaskInfo(TaskId taskId)
return sqlTask.getTaskInfo();
}

@Override
/**
* Gets the status for the specified task.
*/
public TaskStatus getTaskStatus(TaskId taskId)
{
requireNonNull(taskId, "taskId is null");
Expand All @@ -293,7 +304,15 @@ public TaskStatus getTaskStatus(TaskId taskId)
return sqlTask.getTaskStatus();
}

@Override
/**
* Gets future info for the task after the state changes from
* {@code current state}. If the task has not been created yet, an
* uninitialized task is created and the future is returned. If the task
* is already in a final state, the info is returned immediately.
* <p>
* NOTE: this design assumes that only tasks that will eventually exist are
* queried.
*/
public ListenableFuture<TaskInfo> getTaskInfo(TaskId taskId, long currentVersion)
{
requireNonNull(taskId, "taskId is null");
Expand All @@ -303,15 +322,26 @@ public ListenableFuture<TaskInfo> getTaskInfo(TaskId taskId, long currentVersion
return sqlTask.getTaskInfo(currentVersion);
}

@Override
/**
* Gets the unique instance id of a task. This can be used to detect a task
* that was destroyed and recreated.
*/
public String getTaskInstanceId(TaskId taskId)
{
SqlTask sqlTask = tasks.getUnchecked(taskId);
sqlTask.recordHeartbeat();
return sqlTask.getTaskInstanceId();
}

@Override
/**
* Gets future status for the task after the state changes from
* {@code current state}. If the task has not been created yet, an
* uninitialized task is created and the future is returned. If the task
* is already in a final state, the status is returned immediately.
* <p>
* NOTE: this design assumes that only tasks that will eventually exist are
* queried.
*/
public ListenableFuture<TaskStatus> getTaskStatus(TaskId taskId, long currentVersion)
{
requireNonNull(taskId, "taskId is null");
Expand All @@ -321,7 +351,6 @@ public ListenableFuture<TaskStatus> getTaskStatus(TaskId taskId, long currentVer
return sqlTask.getTaskStatus(currentVersion);
}

@Override
public VersionedDynamicFilterDomains acknowledgeAndGetNewDynamicFilterDomains(TaskId taskId, long currentDynamicFiltersVersion)
{
requireNonNull(taskId, "taskId is null");
Expand All @@ -331,7 +360,10 @@ public VersionedDynamicFilterDomains acknowledgeAndGetNewDynamicFilterDomains(Ta
return sqlTask.acknowledgeAndGetNewDynamicFilterDomains(currentDynamicFiltersVersion);
}

@Override
/**
* Updates the task plan, splitAssignments and output buffers. If the task does not
* already exist, it is created and then updated.
*/
public TaskInfo updateTask(
Session session,
TaskId taskId,
Expand Down Expand Up @@ -386,7 +418,14 @@ private TaskInfo doUpdateTask(
return sqlTask.updateTask(session, fragment, splitAssignments, outputBuffers, dynamicFilterDomains);
}

@Override
/**
* Gets results from a task either immediately or in the future. If the
* task or buffer has not been created yet, an uninitialized task is
* created and a future is returned.
* <p>
* NOTE: this design assumes that only tasks and buffers that will
* eventually exist are queried.
*/
public ListenableFuture<BufferResult> getTaskResults(TaskId taskId, OutputBufferId bufferId, long startingSequenceId, DataSize maxSize)
{
requireNonNull(taskId, "taskId is null");
Expand All @@ -397,7 +436,9 @@ public ListenableFuture<BufferResult> getTaskResults(TaskId taskId, OutputBuffer
return tasks.getUnchecked(taskId).getTaskResults(bufferId, startingSequenceId, maxSize);
}

@Override
/**
* Acknowledges previously received results.
*/
public void acknowledgeTaskResults(TaskId taskId, OutputBufferId bufferId, long sequenceId)
{
requireNonNull(taskId, "taskId is null");
Expand All @@ -407,7 +448,14 @@ public void acknowledgeTaskResults(TaskId taskId, OutputBufferId bufferId, long
tasks.getUnchecked(taskId).acknowledgeTaskResults(bufferId, sequenceId);
}

@Override
/**
* Aborts a result buffer for a task. If the task or buffer has not been
* created yet, an uninitialized task is created and a the buffer is
* aborted.
* <p>
* NOTE: this design assumes that only tasks and buffers that will
* eventually exist are queried.
*/
public TaskInfo destroyTaskResults(TaskId taskId, OutputBufferId bufferId)
{
requireNonNull(taskId, "taskId is null");
Expand All @@ -416,23 +464,32 @@ public TaskInfo destroyTaskResults(TaskId taskId, OutputBufferId bufferId)
return tasks.getUnchecked(taskId).destroyTaskResults(bufferId);
}

@Override
/**
* Cancels a task. If the task does not already exist, it is created and then
* canceled.
*/
public TaskInfo cancelTask(TaskId taskId)
{
requireNonNull(taskId, "taskId is null");

return tasks.getUnchecked(taskId).cancel();
}

@Override
/**
* Aborts a task. If the task does not already exist, it is created and then
* aborted.
*/
public TaskInfo abortTask(TaskId taskId)
{
requireNonNull(taskId, "taskId is null");

return tasks.getUnchecked(taskId).abort();
}

@Override
/**
* Fail a task. If the task does not already exist, it is created and then
* failed.
*/
public TaskInfo failTask(TaskId taskId, Throwable failure)
{
requireNonNull(taskId, "taskId is null");
Expand Down Expand Up @@ -507,20 +564,29 @@ private void updateStats()
cachedStats.resetTo(tempIoStats);
}

@Override
/**
* Adds a state change listener to the specified task.
* Listener is always notified asynchronously using a dedicated notification thread pool so, care should
* be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
* possible notifications are observed out of order due to the asynchronous execution.
*/
public void addStateChangeListener(TaskId taskId, StateChangeListener<TaskState> stateChangeListener)
{
requireNonNull(taskId, "taskId is null");
tasks.getUnchecked(taskId).addStateChangeListener(stateChangeListener);
}

@Override
/**
* Add a listener that notifies about failures of any source tasks for a given task
*/
public void addSourceTaskFailureListener(TaskId taskId, TaskFailureListener listener)
{
tasks.getUnchecked(taskId).addSourceTaskFailureListener(listener);
}

@Override
/**
* Return trace token for a given task (see Session#traceToken)
*/
public Optional<String> getTraceToken(TaskId taskId)
{
return tasks.getUnchecked(taskId).getTraceToken();
Expand Down
Loading

0 comments on commit 78cb3d5

Please sign in to comment.