Skip to content

Commit

Permalink
Use ClosingBinder
Browse files Browse the repository at this point in the history
There is no logic difference here. This commit replaces existing code
with usage of closing binder without changing any logic.
  • Loading branch information
kokosing committed Mar 15, 2024
1 parent 1809890 commit 0d4f855
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@
import io.trino.sql.rewrite.ShowStatsRewrite;
import io.trino.sql.rewrite.StatementRewrite;
import io.trino.sql.rewrite.StatementRewrite.Rewrite;
import jakarta.annotation.PreDestroy;

import java.util.List;
import java.util.concurrent.ExecutorService;
Expand All @@ -143,6 +142,7 @@
import static io.airlift.jaxrs.JaxrsBinder.jaxrsBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.plugin.base.ClosingBinder.closingBinder;
import static io.trino.server.InternalCommunicationHttpClientModule.internalHttpClientModule;
import static io.trino.util.Executors.decorateWithVersion;
import static java.util.concurrent.Executors.newCachedThreadPool;
Expand Down Expand Up @@ -361,7 +361,11 @@ List<OutputStatsEstimatorFactory> getCompositeOutputDataSizeEstimatorDelegateFac
install(new QueryExecutionFactoryModule());

// cleanup
binder.bind(ExecutorCleanup.class).asEagerSingleton();
closingBinder(binder)
.registerExecutor(ExecutorService.class, ForStatementResource.class)
.registerExecutor(ScheduledExecutorService.class, ForStatementResource.class)
.registerExecutor(ExecutorService.class, ForQueryExecution.class)
.registerExecutor(ScheduledExecutorService.class, ForScheduler.class);
}

// working around circular dependency Metadata <-> PlannerContext
Expand Down Expand Up @@ -458,30 +462,4 @@ private void bindLowMemoryTaskKiller(LowMemoryTaskKillerPolicy policy, Class<? e
.to(clazz)
.in(Scopes.SINGLETON)));
}

public static class ExecutorCleanup
{
private final List<ExecutorService> executors;

@Inject
public ExecutorCleanup(
@ForStatementResource ExecutorService statementResponseExecutor,
@ForStatementResource ScheduledExecutorService statementTimeoutExecutor,
@ForQueryExecution ExecutorService queryExecutionExecutor,
@ForScheduler ScheduledExecutorService schedulerExecutor)
{
executors = ImmutableList.<ExecutorService>builder()
.add(statementResponseExecutor)
.add(statementTimeoutExecutor)
.add(queryExecutionExecutor)
.add(schedulerExecutor)
.build();
}

@PreDestroy
public void shutdown()
{
executors.forEach(ExecutorService::shutdownNow);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.server;

import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Provides;
Expand Down Expand Up @@ -155,9 +154,7 @@
import io.trino.type.TypeSignatureKeyDeserializer;
import io.trino.util.EmbedVersion;
import io.trino.util.FinalizerService;
import jakarta.annotation.PreDestroy;

import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand All @@ -177,6 +174,7 @@
import static io.trino.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.TOPOLOGY;
import static io.trino.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.UNIFORM;
import static io.trino.operator.RetryPolicy.TASK;
import static io.trino.plugin.base.ClosingBinder.closingBinder;
import static io.trino.server.InternalCommunicationHttpClientModule.internalHttpClientModule;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
Expand Down Expand Up @@ -501,7 +499,10 @@ protected void setup(Binder binder)
newOptionalBinder(binder, RuleStatsRecorder.class);

// cleanup
binder.bind(ExecutorCleanup.class).in(Scopes.SINGLETON);
closingBinder(binder)
.registerExecutor(ScheduledExecutorService.class, ForExchange.class)
.registerExecutor(ExecutorService.class, ForAsyncHttp.class)
.registerExecutor(ScheduledExecutorService.class, ForAsyncHttp.class);
}

private static class RegisterFunctionBundles
Expand Down Expand Up @@ -602,27 +603,4 @@ public static ScheduledExecutorService createAsyncHttpTimeoutExecutor(TaskManage
{
return newScheduledThreadPool(config.getHttpTimeoutThreads(), daemonThreadsNamed("async-http-timeout-%s"));
}

public static class ExecutorCleanup
{
private final List<ExecutorService> executors;

@Inject
public ExecutorCleanup(
@ForExchange ScheduledExecutorService exchangeExecutor,
@ForAsyncHttp ExecutorService httpResponseExecutor,
@ForAsyncHttp ScheduledExecutorService httpTimeoutExecutor)
{
executors = ImmutableList.of(
exchangeExecutor,
httpResponseExecutor,
httpTimeoutExecutor);
}

@PreDestroy
public void shutdown()
{
executors.forEach(ExecutorService::shutdownNow);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,19 @@
*/
package io.trino.transaction;

import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.trino.metadata.CatalogManager;
import io.trino.spi.VersionEmbedder;
import jakarta.annotation.PreDestroy;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.base.ClosingBinder.closingBinder;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;

Expand All @@ -39,7 +36,9 @@ public class InMemoryTransactionManagerModule
public void configure(Binder binder)
{
configBinder(binder).bindConfig(TransactionManagerConfig.class);
binder.bind(ExecutorCleanup.class).asEagerSingleton();
closingBinder(binder)
.registerExecutor(ExecutorService.class, ForTransactionManager.class)
.registerExecutor(ScheduledExecutorService.class, ForTransactionManager.class);
}

@Provides
Expand Down Expand Up @@ -69,26 +68,4 @@ public static TransactionManager createTransactionManager(
{
return InMemoryTransactionManager.create(config, idleCheckExecutor, catalogManager, versionEmbedder.embedVersion(finishingExecutor));
}

public static class ExecutorCleanup
{
private final List<ExecutorService> executors;

@Inject
public ExecutorCleanup(
@ForTransactionManager ExecutorService transactionFinishingExecutor,
@ForTransactionManager ScheduledExecutorService transactionIdleExecutor)
{
executors = ImmutableList.<ExecutorService>builder()
.add(transactionFinishingExecutor)
.add(transactionIdleExecutor)
.build();
}

@PreDestroy
public void shutdown()
{
executors.forEach(ExecutorService::shutdownNow);
}
}
}

0 comments on commit 0d4f855

Please sign in to comment.