Skip to content

Commit

Permalink
Fix ShardCleaner not running on HDFS
Browse files Browse the repository at this point in the history
ShardCleaner needs to clean up files even on HDFS.
  • Loading branch information
highker committed Sep 3, 2019
1 parent 1dfa608 commit 97b9c11
Showing 1 changed file with 4 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.units.Duration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

Expand Down Expand Up @@ -55,8 +55,6 @@

import static com.facebook.presto.raptor.metadata.ShardDao.CLEANABLE_SHARDS_BATCH_SIZE;
import static com.facebook.presto.raptor.metadata.ShardDao.CLEANUP_TRANSACTIONS_BATCH_SIZE;
import static com.facebook.presto.raptor.storage.LocalOrcDataEnvironment.tryGetLocalFileSystem;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Sets.difference;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
Expand Down Expand Up @@ -88,7 +86,7 @@ public class ShardCleaner
private final Duration backupCleanTime;
private final ScheduledExecutorService scheduler;
private final ExecutorService backupExecutor;
private final Optional<RawLocalFileSystem> localFileSystem;
private final FileSystem fileSystem;
private final Duration maxCompletedTransactionAge;

private final AtomicBoolean started = new AtomicBoolean();
Expand Down Expand Up @@ -162,8 +160,7 @@ public ShardCleaner(
this.backupCleanTime = requireNonNull(backupCleanTime, "backupCleanTime is null");
this.scheduler = newScheduledThreadPool(2, daemonThreadsNamed("shard-cleaner-%s"));
this.backupExecutor = newFixedThreadPool(backupDeletionThreads, daemonThreadsNamed("shard-cleaner-backup-%s"));
this.localFileSystem = tryGetLocalFileSystem(requireNonNull(environment, "environment is null"));
checkState((!backupStore.isPresent() || localFileSystem.isPresent()), "cannot support backup for remote file system");
this.fileSystem = requireNonNull(environment, "environment is null").getFileSystem();
this.maxCompletedTransactionAge = requireNonNull(maxCompletedTransactionAge, "maxCompletedTransactionAge is null");
}

Expand Down Expand Up @@ -529,6 +526,6 @@ private static Timestamp maxTimestamp(Duration duration)
private void deleteFile(Path file)
throws IOException
{
localFileSystem.get().delete(file, false);
fileSystem.delete(file, false);
}
}

0 comments on commit 97b9c11

Please sign in to comment.