diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupManager.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupManager.java index 0f8284708f88..258a78496f16 100644 --- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupManager.java +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupManager.java @@ -35,12 +35,14 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.plugin.raptor.legacy.RaptorErrorCode.RAPTOR_BACKUP_CORRUPTION; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.runAsync; import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.SECONDS; public class BackupManager { @@ -71,7 +73,7 @@ public BackupManager(Optional backupStore, StorageService storageSe @PreDestroy public void shutdown() { - executorService.shutdownNow(); + shutdownAndAwaitTermination(executorService, 10, SECONDS); } public CompletableFuture submit(UUID uuid, File source) diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/metadata/DatabaseMetadataModule.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/metadata/DatabaseMetadataModule.java index 4cf735884e1d..aa37ef2857f8 100644 --- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/metadata/DatabaseMetadataModule.java +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/metadata/DatabaseMetadataModule.java @@ -85,7 +85,7 @@ public void configure(Binder binder) @ForMetadata public static ConnectionFactory createConnectionFactory(H2DatabaseConfig config) { - String url = format("jdbc:h2:%s;DB_CLOSE_DELAY=-1", config.getFilename()); + String url = format("jdbc:h2:retry:%s", config.getFilename()); return () -> DriverManager.getConnection(url); } diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.java index 17452bd20054..2a416e31eecc 100644 --- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.java +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.java @@ -56,6 +56,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination; import static io.airlift.concurrent.MoreFutures.addExceptionCallback; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.units.DataSize.succinctBytes; @@ -135,8 +136,8 @@ public void start() @PreDestroy public void shutdown() { - executorService.shutdownNow(); - missingShardExecutor.shutdownNow(); + shutdownAndAwaitTermination(missingShardExecutor, 10, SECONDS); + shutdownAndAwaitTermination(executorService, 10, SECONDS); } private void scheduleRecoverMissingShards() diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/organization/ShardOrganizer.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/organization/ShardOrganizer.java index 19d31a66b8f9..ba9512c9a027 100644 --- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/organization/ShardOrganizer.java +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/organization/ShardOrganizer.java @@ -27,14 +27,15 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.Sets.newConcurrentHashSet; +import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.runAsync; import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.SECONDS; public class ShardOrganizer { @@ -43,8 +44,6 @@ public class ShardOrganizer private final ExecutorService executorService; private final ThreadPoolExecutorMBean executorMBean; - private final AtomicBoolean shutdown = new AtomicBoolean(); - // Tracks shards that are scheduled for compaction so that we do not schedule them more than once private final Set shardsInProgress = newConcurrentHashSet(); private final JobFactory jobFactory; @@ -68,9 +67,7 @@ public ShardOrganizer(JobFactory jobFactory, int threads) @PreDestroy public void shutdown() { - if (!shutdown.getAndSet(true)) { - executorService.shutdownNow(); - } + shutdownAndAwaitTermination(executorService, 10, SECONDS); } public CompletableFuture enqueue(OrganizationSet organizationSet)