diff --git a/core/trino-main/src/main/java/io/trino/connector/system/TaskSystemTable.java b/core/trino-main/src/main/java/io/trino/connector/system/TaskSystemTable.java index 89e5e839bf4f..b8d172c8c938 100644 --- a/core/trino-main/src/main/java/io/trino/connector/system/TaskSystemTable.java +++ b/core/trino-main/src/main/java/io/trino/connector/system/TaskSystemTable.java @@ -16,8 +16,8 @@ import io.airlift.node.NodeInfo; import io.airlift.units.DataSize; import io.airlift.units.Duration; +import io.trino.execution.SqlTaskManager; import io.trino.execution.TaskInfo; -import io.trino.execution.TaskManager; import io.trino.execution.TaskStatus; import io.trino.operator.TaskStats; import io.trino.spi.connector.ConnectorSession; @@ -81,11 +81,11 @@ public class TaskSystemTable .column("end", TIMESTAMP_TZ_MILLIS) .build(); - private final TaskManager taskManager; + private final SqlTaskManager taskManager; private final String nodeId; @Inject - public TaskSystemTable(TaskManager taskManager, NodeInfo nodeInfo) + public TaskSystemTable(SqlTaskManager taskManager, NodeInfo nodeInfo) { this.taskManager = taskManager; this.nodeId = nodeInfo.getNodeId(); diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java index 2e51b7cd00ea..a0fff71f1121 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java @@ -118,7 +118,7 @@ public class SqlQueryExecution private final DynamicFilterService dynamicFilterService; private final TableExecuteContextManager tableExecuteContextManager; private final TypeAnalyzer typeAnalyzer; - private final TaskManager coordinatorTaskManager; + private final SqlTaskManager coordinatorTaskManager; private final ExchangeManagerRegistry exchangeManagerRegistry; private final TaskSourceFactory taskSourceFactory; private final TaskDescriptorStorage taskDescriptorStorage; @@ -148,7 +148,7 @@ private SqlQueryExecution( WarningCollector warningCollector, TableExecuteContextManager tableExecuteContextManager, TypeAnalyzer typeAnalyzer, - TaskManager coordinatorTaskManager, + SqlTaskManager coordinatorTaskManager, ExchangeManagerRegistry exchangeManagerRegistry, TaskSourceFactory taskSourceFactory, TaskDescriptorStorage taskDescriptorStorage) @@ -698,7 +698,7 @@ public static class SqlQueryExecutionFactory private final DynamicFilterService dynamicFilterService; private final TableExecuteContextManager tableExecuteContextManager; private final TypeAnalyzer typeAnalyzer; - private final TaskManager coordinatorTaskManager; + private final SqlTaskManager coordinatorTaskManager; private final ExchangeManagerRegistry exchangeManagerRegistry; private final TaskSourceFactory taskSourceFactory; private final TaskDescriptorStorage taskDescriptorStorage; @@ -725,7 +725,7 @@ public static class SqlQueryExecutionFactory DynamicFilterService dynamicFilterService, TableExecuteContextManager tableExecuteContextManager, TypeAnalyzer typeAnalyzer, - TaskManager coordinatorTaskManager, + SqlTaskManager coordinatorTaskManager, ExchangeManagerRegistry exchangeManagerRegistry, TaskSourceFactory taskSourceFactory, TaskDescriptorStorage taskDescriptorStorage) diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java b/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java index a53eb2ee3899..35db246a860a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java @@ -84,7 +84,7 @@ import static java.util.concurrent.Executors.newScheduledThreadPool; public class SqlTaskManager - implements TaskManager, Closeable + implements Closeable { private static final Logger log = Logger.get(SqlTaskManager.class); @@ -216,8 +216,8 @@ public void start() }, 0, 1, TimeUnit.SECONDS); } - @Override @PreDestroy + @Override public void close() { boolean taskCanceled = false; @@ -265,7 +265,10 @@ public List getAllTasks() return ImmutableList.copyOf(tasks.asMap().values()); } - @Override + /** + * Gets all of the currently tracked tasks. This will included + * uninitialized, running, and completed tasks. + */ public List getAllTaskInfo() { return tasks.asMap().values().stream() @@ -273,7 +276,13 @@ public List getAllTaskInfo() .collect(toImmutableList()); } - @Override + /** + * Gets the info for the specified task. If the task has not been created + * yet, an uninitialized task is created and the info is returned. + *

+ * NOTE: this design assumes that only tasks that will eventually exist are + * queried. + */ public TaskInfo getTaskInfo(TaskId taskId) { requireNonNull(taskId, "taskId is null"); @@ -283,7 +292,9 @@ public TaskInfo getTaskInfo(TaskId taskId) return sqlTask.getTaskInfo(); } - @Override + /** + * Gets the status for the specified task. + */ public TaskStatus getTaskStatus(TaskId taskId) { requireNonNull(taskId, "taskId is null"); @@ -293,7 +304,15 @@ public TaskStatus getTaskStatus(TaskId taskId) return sqlTask.getTaskStatus(); } - @Override + /** + * Gets future info for the task after the state changes from + * {@code current state}. If the task has not been created yet, an + * uninitialized task is created and the future is returned. If the task + * is already in a final state, the info is returned immediately. + *

+ * NOTE: this design assumes that only tasks that will eventually exist are + * queried. + */ public ListenableFuture getTaskInfo(TaskId taskId, long currentVersion) { requireNonNull(taskId, "taskId is null"); @@ -303,7 +322,10 @@ public ListenableFuture getTaskInfo(TaskId taskId, long currentVersion return sqlTask.getTaskInfo(currentVersion); } - @Override + /** + * Gets the unique instance id of a task. This can be used to detect a task + * that was destroyed and recreated. + */ public String getTaskInstanceId(TaskId taskId) { SqlTask sqlTask = tasks.getUnchecked(taskId); @@ -311,7 +333,15 @@ public String getTaskInstanceId(TaskId taskId) return sqlTask.getTaskInstanceId(); } - @Override + /** + * Gets future status for the task after the state changes from + * {@code current state}. If the task has not been created yet, an + * uninitialized task is created and the future is returned. If the task + * is already in a final state, the status is returned immediately. + *

+ * NOTE: this design assumes that only tasks that will eventually exist are + * queried. + */ public ListenableFuture getTaskStatus(TaskId taskId, long currentVersion) { requireNonNull(taskId, "taskId is null"); @@ -321,7 +351,6 @@ public ListenableFuture getTaskStatus(TaskId taskId, long currentVer return sqlTask.getTaskStatus(currentVersion); } - @Override public VersionedDynamicFilterDomains acknowledgeAndGetNewDynamicFilterDomains(TaskId taskId, long currentDynamicFiltersVersion) { requireNonNull(taskId, "taskId is null"); @@ -331,7 +360,10 @@ public VersionedDynamicFilterDomains acknowledgeAndGetNewDynamicFilterDomains(Ta return sqlTask.acknowledgeAndGetNewDynamicFilterDomains(currentDynamicFiltersVersion); } - @Override + /** + * Updates the task plan, splitAssignments and output buffers. If the task does not + * already exist, it is created and then updated. + */ public TaskInfo updateTask( Session session, TaskId taskId, @@ -386,7 +418,14 @@ private TaskInfo doUpdateTask( return sqlTask.updateTask(session, fragment, splitAssignments, outputBuffers, dynamicFilterDomains); } - @Override + /** + * Gets results from a task either immediately or in the future. If the + * task or buffer has not been created yet, an uninitialized task is + * created and a future is returned. + *

+ * NOTE: this design assumes that only tasks and buffers that will + * eventually exist are queried. + */ public ListenableFuture getTaskResults(TaskId taskId, OutputBufferId bufferId, long startingSequenceId, DataSize maxSize) { requireNonNull(taskId, "taskId is null"); @@ -397,7 +436,9 @@ public ListenableFuture getTaskResults(TaskId taskId, OutputBuffer return tasks.getUnchecked(taskId).getTaskResults(bufferId, startingSequenceId, maxSize); } - @Override + /** + * Acknowledges previously received results. + */ public void acknowledgeTaskResults(TaskId taskId, OutputBufferId bufferId, long sequenceId) { requireNonNull(taskId, "taskId is null"); @@ -407,7 +448,14 @@ public void acknowledgeTaskResults(TaskId taskId, OutputBufferId bufferId, long tasks.getUnchecked(taskId).acknowledgeTaskResults(bufferId, sequenceId); } - @Override + /** + * Aborts a result buffer for a task. If the task or buffer has not been + * created yet, an uninitialized task is created and a the buffer is + * aborted. + *

+ * NOTE: this design assumes that only tasks and buffers that will + * eventually exist are queried. + */ public TaskInfo destroyTaskResults(TaskId taskId, OutputBufferId bufferId) { requireNonNull(taskId, "taskId is null"); @@ -416,7 +464,10 @@ public TaskInfo destroyTaskResults(TaskId taskId, OutputBufferId bufferId) return tasks.getUnchecked(taskId).destroyTaskResults(bufferId); } - @Override + /** + * Cancels a task. If the task does not already exist, it is created and then + * canceled. + */ public TaskInfo cancelTask(TaskId taskId) { requireNonNull(taskId, "taskId is null"); @@ -424,7 +475,10 @@ public TaskInfo cancelTask(TaskId taskId) return tasks.getUnchecked(taskId).cancel(); } - @Override + /** + * Aborts a task. If the task does not already exist, it is created and then + * aborted. + */ public TaskInfo abortTask(TaskId taskId) { requireNonNull(taskId, "taskId is null"); @@ -432,7 +486,10 @@ public TaskInfo abortTask(TaskId taskId) return tasks.getUnchecked(taskId).abort(); } - @Override + /** + * Fail a task. If the task does not already exist, it is created and then + * failed. + */ public TaskInfo failTask(TaskId taskId, Throwable failure) { requireNonNull(taskId, "taskId is null"); @@ -507,20 +564,29 @@ private void updateStats() cachedStats.resetTo(tempIoStats); } - @Override + /** + * Adds a state change listener to the specified task. + * Listener is always notified asynchronously using a dedicated notification thread pool so, care should + * be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is + * possible notifications are observed out of order due to the asynchronous execution. + */ public void addStateChangeListener(TaskId taskId, StateChangeListener stateChangeListener) { requireNonNull(taskId, "taskId is null"); tasks.getUnchecked(taskId).addStateChangeListener(stateChangeListener); } - @Override + /** + * Add a listener that notifies about failures of any source tasks for a given task + */ public void addSourceTaskFailureListener(TaskId taskId, TaskFailureListener listener) { tasks.getUnchecked(taskId).addSourceTaskFailureListener(listener); } - @Override + /** + * Return trace token for a given task (see Session#traceToken) + */ public Optional getTraceToken(TaskId taskId) { return tasks.getUnchecked(taskId).getTraceToken(); diff --git a/core/trino-main/src/main/java/io/trino/execution/TaskManager.java b/core/trino-main/src/main/java/io/trino/execution/TaskManager.java deleted file mode 100644 index 6502ac93b500..000000000000 --- a/core/trino-main/src/main/java/io/trino/execution/TaskManager.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.execution; - -import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.units.DataSize; -import io.trino.Session; -import io.trino.execution.DynamicFiltersCollector.VersionedDynamicFilterDomains; -import io.trino.execution.StateMachine.StateChangeListener; -import io.trino.execution.buffer.BufferResult; -import io.trino.execution.buffer.OutputBuffers; -import io.trino.execution.buffer.OutputBuffers.OutputBufferId; -import io.trino.spi.predicate.Domain; -import io.trino.sql.planner.PlanFragment; -import io.trino.sql.planner.plan.DynamicFilterId; - -import java.util.List; -import java.util.Map; -import java.util.Optional; - -public interface TaskManager -{ - /** - * Gets all of the currently tracked tasks. This will included - * uninitialized, running, and completed tasks. - */ - List getAllTaskInfo(); - - /** - * Gets the info for the specified task. If the task has not been created - * yet, an uninitialized task is created and the info is returned. - *

- * NOTE: this design assumes that only tasks that will eventually exist are - * queried. - */ - TaskInfo getTaskInfo(TaskId taskId); - - /** - * Gets the status for the specified task. - */ - TaskStatus getTaskStatus(TaskId taskId); - - /** - * Gets future info for the task after the state changes from - * {@code current state}. If the task has not been created yet, an - * uninitialized task is created and the future is returned. If the task - * is already in a final state, the info is returned immediately. - *

- * NOTE: this design assumes that only tasks that will eventually exist are - * queried. - */ - ListenableFuture getTaskInfo(TaskId taskId, long currentVersion); - - /** - * Gets the unique instance id of a task. This can be used to detect a task - * that was destroyed and recreated. - */ - String getTaskInstanceId(TaskId taskId); - - /** - * Gets future status for the task after the state changes from - * {@code current state}. If the task has not been created yet, an - * uninitialized task is created and the future is returned. If the task - * is already in a final state, the status is returned immediately. - *

- * NOTE: this design assumes that only tasks that will eventually exist are - * queried. - */ - ListenableFuture getTaskStatus(TaskId taskId, long currentVersion); - - VersionedDynamicFilterDomains acknowledgeAndGetNewDynamicFilterDomains(TaskId taskId, long currentDynamicFiltersVersion); - - /** - * Updates the task plan, splitAssignments and output buffers. If the task does not - * already exist, it is created and then updated. - */ - TaskInfo updateTask( - Session session, - TaskId taskId, - Optional fragment, - List splitAssignments, - OutputBuffers outputBuffers, - Map dynamicFilterDomains); - - /** - * Cancels a task. If the task does not already exist, it is created and then - * canceled. - */ - TaskInfo cancelTask(TaskId taskId); - - /** - * Aborts a task. If the task does not already exist, it is created and then - * aborted. - */ - TaskInfo abortTask(TaskId taskId); - - /** - * Fail a task. If the task does not already exist, it is created and then - * failed. - */ - TaskInfo failTask(TaskId taskId, Throwable failure); - - /** - * Gets results from a task either immediately or in the future. If the - * task or buffer has not been created yet, an uninitialized task is - * created and a future is returned. - *

- * NOTE: this design assumes that only tasks and buffers that will - * eventually exist are queried. - */ - ListenableFuture getTaskResults(TaskId taskId, OutputBufferId bufferId, long startingSequenceId, DataSize maxSize); - - /** - * Acknowledges previously received results. - */ - void acknowledgeTaskResults(TaskId taskId, OutputBufferId bufferId, long sequenceId); - - /** - * Aborts a result buffer for a task. If the task or buffer has not been - * created yet, an uninitialized task is created and a the buffer is - * aborted. - *

- * NOTE: this design assumes that only tasks and buffers that will - * eventually exist are queried. - */ - TaskInfo destroyTaskResults(TaskId taskId, OutputBufferId bufferId); - - /** - * Adds a state change listener to the specified task. - * Listener is always notified asynchronously using a dedicated notification thread pool so, care should - * be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is - * possible notifications are observed out of order due to the asynchronous execution. - */ - void addStateChangeListener(TaskId taskId, StateChangeListener stateChangeListener); - - /** - * Add a listener that notifies about failures of any source tasks for a given task - */ - void addSourceTaskFailureListener(TaskId taskId, TaskFailureListener listener); - - /** - * Return trace token for a given task (see Session#traceToken) - */ - Optional getTraceToken(TaskId taskId); -} diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java index 115c9dd835c9..4d702603d702 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java @@ -38,6 +38,7 @@ import io.trino.execution.RemoteTask; import io.trino.execution.RemoteTaskFactory; import io.trino.execution.SqlStage; +import io.trino.execution.SqlTaskManager; import io.trino.execution.StageId; import io.trino.execution.StageInfo; import io.trino.execution.StateMachine; @@ -46,7 +47,6 @@ import io.trino.execution.TableInfo; import io.trino.execution.TaskFailureListener; import io.trino.execution.TaskId; -import io.trino.execution.TaskManager; import io.trino.execution.TaskStatus; import io.trino.execution.scheduler.policy.ExecutionPolicy; import io.trino.execution.scheduler.policy.ExecutionSchedule; @@ -223,7 +223,7 @@ public SqlQueryScheduler( TableExecuteContextManager tableExecuteContextManager, Metadata metadata, SplitSourceFactory splitSourceFactory, - TaskManager coordinatorTaskManager, + SqlTaskManager coordinatorTaskManager, ExchangeManagerRegistry exchangeManagerRegistry, TaskSourceFactory taskSourceFactory, TaskDescriptorStorage taskDescriptorStorage) @@ -790,7 +790,7 @@ private static class CoordinatorStagesScheduler private final StageManager stageManager; private final List stageExecutions; private final AtomicReference distributedStagesScheduler; - private final TaskManager coordinatorTaskManager; + private final SqlTaskManager coordinatorTaskManager; private final AtomicBoolean scheduled = new AtomicBoolean(); @@ -801,7 +801,7 @@ public static CoordinatorStagesScheduler create( FailureDetector failureDetector, Executor executor, AtomicReference distributedStagesScheduler, - TaskManager coordinatorTaskManager) + SqlTaskManager coordinatorTaskManager) { Map outputBuffersForStagesConsumedByCoordinator = createOutputBuffersForStagesConsumedByCoordinator(stageManager); Map> bucketToPartitionForStagesConsumedByCoordinator = createBucketToPartitionForStagesConsumedByCoordinator(stageManager); @@ -887,7 +887,7 @@ private CoordinatorStagesScheduler( StageManager stageManager, List stageExecutions, AtomicReference distributedStagesScheduler, - TaskManager coordinatorTaskManager) + SqlTaskManager coordinatorTaskManager) { this.queryStateMachine = requireNonNull(queryStateMachine, "queryStateMachine is null"); this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null"); diff --git a/core/trino-main/src/main/java/io/trino/server/GracefulShutdownHandler.java b/core/trino-main/src/main/java/io/trino/server/GracefulShutdownHandler.java index 307bcc0f4133..d3b27ae5f189 100644 --- a/core/trino-main/src/main/java/io/trino/server/GracefulShutdownHandler.java +++ b/core/trino-main/src/main/java/io/trino/server/GracefulShutdownHandler.java @@ -16,8 +16,8 @@ import io.airlift.bootstrap.LifeCycleManager; import io.airlift.log.Logger; import io.airlift.units.Duration; +import io.trino.execution.SqlTaskManager; import io.trino.execution.TaskInfo; -import io.trino.execution.TaskManager; import javax.annotation.concurrent.GuardedBy; import javax.inject.Inject; @@ -48,7 +48,7 @@ public class GracefulShutdownHandler private final ScheduledExecutorService shutdownHandler = newSingleThreadScheduledExecutor(threadsNamed("shutdown-handler-%s")); private final ExecutorService lifeCycleStopper = newSingleThreadExecutor(threadsNamed("lifecycle-stopper-%s")); private final LifeCycleManager lifeCycleManager; - private final TaskManager sqlTaskManager; + private final SqlTaskManager sqlTaskManager; private final boolean isCoordinator; private final ShutdownAction shutdownAction; private final Duration gracePeriod; @@ -58,7 +58,7 @@ public class GracefulShutdownHandler @Inject public GracefulShutdownHandler( - TaskManager sqlTaskManager, + SqlTaskManager sqlTaskManager, ServerConfig serverConfig, ShutdownAction shutdownAction, LifeCycleManager lifeCycleManager) diff --git a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java index f25635d1b8a3..285aaa975652 100644 --- a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java +++ b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java @@ -15,7 +15,6 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; -import com.google.inject.Key; import com.google.inject.Provides; import com.google.inject.Scopes; import io.airlift.concurrent.BoundedExecutor; @@ -47,7 +46,6 @@ import io.trino.execution.SqlTaskManager; import io.trino.execution.TableExecuteContextManager; import io.trino.execution.TaskManagementExecutor; -import io.trino.execution.TaskManager; import io.trino.execution.TaskManagerConfig; import io.trino.execution.executor.MultilevelSplitQueue; import io.trino.execution.executor.TaskExecutor; @@ -292,7 +290,6 @@ protected void setup(Binder binder) newExporter(binder).export(TaskExecutorResource.class).withGeneratedName(); binder.bind(TaskManagementExecutor.class).in(Scopes.SINGLETON); binder.bind(SqlTaskManager.class).in(Scopes.SINGLETON); - binder.bind(TaskManager.class).to(Key.get(SqlTaskManager.class)); binder.bind(TableExecuteContextManager.class).in(Scopes.SINGLETON); // memory revoking scheduler @@ -307,7 +304,7 @@ protected void setup(Binder binder) binder.bind(LocalMemoryManager.class).in(Scopes.SINGLETON); binder.bind(LocalMemoryManagerExporter.class).in(Scopes.SINGLETON); newOptionalBinder(binder, VersionEmbedder.class).setDefault().to(EmbedVersion.class).in(Scopes.SINGLETON); - newExporter(binder).export(TaskManager.class).withGeneratedName(); + newExporter(binder).export(SqlTaskManager.class).withGeneratedName(); binder.bind(TaskExecutor.class).in(Scopes.SINGLETON); newExporter(binder).export(TaskExecutor.class).withGeneratedName(); binder.bind(MultilevelSplitQueue.class).in(Scopes.SINGLETON); diff --git a/core/trino-main/src/main/java/io/trino/server/TaskResource.java b/core/trino-main/src/main/java/io/trino/server/TaskResource.java index 2f9e00872e36..bb21a6546865 100644 --- a/core/trino-main/src/main/java/io/trino/server/TaskResource.java +++ b/core/trino-main/src/main/java/io/trino/server/TaskResource.java @@ -26,9 +26,9 @@ import io.trino.Session; import io.trino.execution.FailureInjector; import io.trino.execution.FailureInjector.InjectedFailure; +import io.trino.execution.SqlTaskManager; import io.trino.execution.TaskId; import io.trino.execution.TaskInfo; -import io.trino.execution.TaskManager; import io.trino.execution.TaskState; import io.trino.execution.TaskStatus; import io.trino.execution.buffer.BufferResult; @@ -94,7 +94,7 @@ public class TaskResource private static final Duration ADDITIONAL_WAIT_TIME = new Duration(5, SECONDS); private static final Duration DEFAULT_MAX_WAIT_TIME = new Duration(2, SECONDS); - private final TaskManager taskManager; + private final SqlTaskManager taskManager; private final SessionPropertyManager sessionPropertyManager; private final Executor responseExecutor; private final ScheduledExecutorService timeoutExecutor; @@ -104,7 +104,7 @@ public class TaskResource @Inject public TaskResource( - TaskManager taskManager, + SqlTaskManager taskManager, SessionPropertyManager sessionPropertyManager, @ForAsyncHttp BoundedExecutor responseExecutor, @ForAsyncHttp ScheduledExecutorService timeoutExecutor, diff --git a/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java b/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java index d8d76b446d46..62ad6cc5677b 100644 --- a/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java +++ b/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java @@ -50,8 +50,8 @@ import io.trino.execution.QueryInfo; import io.trino.execution.QueryManager; import io.trino.execution.SqlQueryManager; +import io.trino.execution.SqlTaskManager; import io.trino.execution.StateMachine.StateChangeListener; -import io.trino.execution.TaskManager; import io.trino.execution.resourcegroups.InternalResourceGroupManager; import io.trino.memory.ClusterMemoryManager; import io.trino.memory.LocalMemoryManager; @@ -170,7 +170,7 @@ public static Builder builder() private final Announcer announcer; private final DispatchManager dispatchManager; private final SqlQueryManager queryManager; - private final TaskManager taskManager; + private final SqlTaskManager taskManager; private final GracefulShutdownHandler gracefulShutdownHandler; private final ShutdownAction shutdownAction; private final MBeanServer mBeanServer; @@ -339,7 +339,7 @@ private TestingTrinoServer( nodeManager = injector.getInstance(InternalNodeManager.class); serviceSelectorManager = injector.getInstance(ServiceSelectorManager.class); gracefulShutdownHandler = injector.getInstance(GracefulShutdownHandler.class); - taskManager = injector.getInstance(TaskManager.class); + taskManager = injector.getInstance(SqlTaskManager.class); shutdownAction = injector.getInstance(ShutdownAction.class); mBeanServer = injector.getInstance(MBeanServer.class); announcer = injector.getInstance(Announcer.class); @@ -555,7 +555,7 @@ public GracefulShutdownHandler getGracefulShutdownHandler() return gracefulShutdownHandler; } - public TaskManager getTaskManager() + public SqlTaskManager getTaskManager() { return taskManager; } diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestGracefulShutdown.java b/testing/trino-tests/src/test/java/io/trino/tests/TestGracefulShutdown.java index c590088576b2..66f03dc3896f 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestGracefulShutdown.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestGracefulShutdown.java @@ -19,7 +19,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.trino.Session; -import io.trino.execution.TaskManager; +import io.trino.execution.SqlTaskManager; import io.trino.server.BasicQueryInfo; import io.trino.server.testing.TestingTrinoServer; import io.trino.server.testing.TestingTrinoServer.TestShutdownAction; @@ -88,7 +88,7 @@ public void testShutdown() .findFirst() .get(); - TaskManager taskManager = worker.getTaskManager(); + SqlTaskManager taskManager = worker.getTaskManager(); // wait until tasks show up on the worker while (taskManager.getAllTaskInfo().isEmpty()) {