diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupManager.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupManager.java index a9b664d0116f..5d9252d2c3e2 100644 --- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupManager.java +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupManager.java @@ -36,12 +36,14 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.plugin.raptor.legacy.RaptorErrorCode.RAPTOR_BACKUP_CORRUPTION; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.runAsync; import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.SECONDS; public class BackupManager { @@ -72,7 +74,7 @@ public BackupManager(Optional backupStore, StorageService storageSe @PreDestroy public void shutdown() { - executorService.shutdownNow(); + shutdownAndAwaitTermination(executorService, 10, SECONDS); } public CompletableFuture submit(UUID uuid, File source) diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/metadata/DatabaseMetadataModule.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/metadata/DatabaseMetadataModule.java index b407d706f10e..a14659fde223 100644 --- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/metadata/DatabaseMetadataModule.java +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/metadata/DatabaseMetadataModule.java @@ -86,7 +86,7 @@ public void configure(Binder binder) @ForMetadata public static ConnectionFactory createConnectionFactory(H2DatabaseConfig config) { - String url = format("jdbc:h2:%s;DB_CLOSE_DELAY=-1", config.getFilename()); + String url = format("jdbc:h2:retry:%s", config.getFilename()); return () -> DriverManager.getConnection(url); } diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.java index 1fcd9f85b5cf..78eee022d6e5 100644 --- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.java +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.java @@ -57,6 +57,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination; import static io.airlift.concurrent.MoreFutures.addExceptionCallback; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.units.DataSize.succinctBytes; @@ -136,8 +137,8 @@ public void start() @PreDestroy public void shutdown() { - executorService.shutdownNow(); - missingShardExecutor.shutdownNow(); + shutdownAndAwaitTermination(missingShardExecutor, 10, SECONDS); + shutdownAndAwaitTermination(executorService, 10, SECONDS); } private void scheduleRecoverMissingShards() @@ -171,7 +172,9 @@ private synchronized void enqueueMissingShards() } } catch (Throwable t) { - log.error(t, "Error creating shard recovery tasks"); + if (!executorService.isShutdown()) { + log.error(t, "Error creating shard recovery tasks"); + } } } diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/organization/ShardOrganizer.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/organization/ShardOrganizer.java index 085887551065..e60f0644eb0d 100644 --- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/organization/ShardOrganizer.java +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/organization/ShardOrganizer.java @@ -28,14 +28,15 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.Sets.newConcurrentHashSet; +import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.runAsync; import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.SECONDS; public class ShardOrganizer { @@ -44,8 +45,6 @@ public class ShardOrganizer private final ExecutorService executorService; private final ThreadPoolExecutorMBean executorMBean; - private final AtomicBoolean shutdown = new AtomicBoolean(); - // Tracks shards that are scheduled for compaction so that we do not schedule them more than once private final Set shardsInProgress = newConcurrentHashSet(); private final JobFactory jobFactory; @@ -69,9 +68,7 @@ public ShardOrganizer(JobFactory jobFactory, int threads) @PreDestroy public void shutdown() { - if (!shutdown.getAndSet(true)) { - executorService.shutdownNow(); - } + shutdownAndAwaitTermination(executorService, 10, SECONDS); } public CompletableFuture enqueue(OrganizationSet organizationSet)