diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 67cd81ee91a96..397d81f92f60b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -214,6 +214,15 @@ public class CommonConfigurationKeysPublic { public static final String FS_TRASH_INTERVAL_KEY = "fs.trash.interval"; /** Default value for FS_TRASH_INTERVAL_KEY */ public static final long FS_TRASH_INTERVAL_DEFAULT = 0; + /** + * @see + * + * core-default.xml + */ + public static final String FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY = + "fs.trash.clean.trashroot.enable"; + /** Default value for FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY. */ + public static final boolean FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT = false; /** * @see * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java index f4228dea69f49..2fb4bff09f9fa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java @@ -19,6 +19,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY; @@ -70,6 +72,8 @@ public class TrashPolicyDefault extends TrashPolicy { private long emptierInterval; + private boolean cleanNonCheckpointUnderTrashRoot; + public TrashPolicyDefault() { } private TrashPolicyDefault(FileSystem fs, Configuration conf) @@ -90,6 +94,8 @@ public void initialize(Configuration conf, FileSystem fs, Path home) { this.emptierInterval = (long)(conf.getFloat( FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT) * MSECS_PER_MINUTE); + this.cleanNonCheckpointUnderTrashRoot = conf.getBoolean( + FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY, FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT); } @Override @@ -101,6 +107,8 @@ public void initialize(Configuration conf, FileSystem fs) { this.emptierInterval = (long)(conf.getFloat( FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT) * MSECS_PER_MINUTE); + this.cleanNonCheckpointUnderTrashRoot = conf.getBoolean( + FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY, FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT); if (deletionInterval < 0) { LOG.warn("Invalid value {} for deletion interval," + " deletion interaval can not be negative." @@ -374,8 +382,14 @@ private void deleteCheckpoint(Path trashRoot, boolean deleteImmediately) try { time = getTimeFromCheckpoint(name); } catch (ParseException e) { - LOG.warn("Unexpected item in trash: "+dir+". 
Ignoring."); - continue; + if (cleanNonCheckpointUnderTrashRoot) { + fs.delete(path, true); + LOG.warn("Unexpected item in trash: " + dir + ". Deleting."); + continue; + } else { + LOG.warn("Unexpected item in trash: " + dir + ". Ignoring."); + continue; + } } if (((now - deletionInterval) > time) || deleteImmediately) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java index e43b176d0bfe9..4461c118625a1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java @@ -110,6 +110,7 @@ public abstract class CachingBlockManager extends BlockManager { * @param prefetchingStatistics statistics for this stream. * @param conf the configuration. * @param localDirAllocator the local dir allocator instance. + * @param maxBlocksCount max blocks count to be kept in cache at any time. * @throws IllegalArgumentException if bufferPoolSize is zero or negative. */ public CachingBlockManager( @@ -118,7 +119,8 @@ public CachingBlockManager( int bufferPoolSize, PrefetchingStatistics prefetchingStatistics, Configuration conf, - LocalDirAllocator localDirAllocator) { + LocalDirAllocator localDirAllocator, + int maxBlocksCount) { super(blockData); Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize"); @@ -129,16 +131,16 @@ public CachingBlockManager( this.numReadErrors = new AtomicInteger(); this.cachingDisabled = new AtomicBoolean(); this.prefetchingStatistics = requireNonNull(prefetchingStatistics); + this.conf = requireNonNull(conf); if (this.getBlockData().getFileSize() > 0) { this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(), this.prefetchingStatistics); - this.cache = this.createCache(); + this.cache = this.createCache(maxBlocksCount); } this.ops = new BlockOperations(); this.ops.setDebug(false); - this.conf = requireNonNull(conf); this.localDirAllocator = localDirAllocator; } @@ -557,8 +559,8 @@ private void addToCacheAndRelease(BufferData data, Future blockFuture, } } - protected BlockCache createCache() { - return new SingleFilePerBlockCache(prefetchingStatistics); + protected BlockCache createCache(int maxBlocksCount) { + return new SingleFilePerBlockCache(prefetchingStatistics, maxBlocksCount); } protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchConstants.java new file mode 100644 index 0000000000000..785023f523cd5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchConstants.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.impl.prefetch; + +import java.util.concurrent.TimeUnit; + +/** + * Constants used by prefetch implementations. + */ +public final class PrefetchConstants { + + private PrefetchConstants() { + } + + /** + * Timeout to be used by close, while acquiring prefetch block write lock. + * Value = {@value PREFETCH_WRITE_LOCK_TIMEOUT} + */ + static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5; + + /** + * Lock timeout unit to be used by the thread while acquiring prefetch block write lock. + * Value = {@value PREFETCH_WRITE_LOCK_TIMEOUT_UNIT} + */ + static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS; + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java index e043fbd904be8..a84a79eb77851 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java @@ -47,6 +47,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.util.Preconditions; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull; @@ -61,27 +62,42 @@ public class SingleFilePerBlockCache implements BlockCache { /** * Blocks stored in this cache. */ - private final Map blocks = new ConcurrentHashMap<>(); + private final Map blocks; /** - * Number of times a block was read from this cache. - * Used for determining cache utilization factor. + * Total max blocks count, to be considered as baseline for LRU cache eviction. */ - private int numGets = 0; + private final int maxBlocksCount; - private final AtomicBoolean closed; + /** + * The lock to be shared by LRU based linked list updates. + */ + private final ReentrantReadWriteLock blocksLock; - private final PrefetchingStatistics prefetchingStatistics; + /** + * Head of the linked list. + */ + private Entry head; + + /** + * Tail of the linked list. + */ + private Entry tail; /** - * Timeout to be used by close, while acquiring prefetch block write lock. + * Total size of the linked list. */ - private static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5; + private int entryListSize; /** - * Lock timeout unit to be used by the thread while acquiring prefetch block write lock. + * Number of times a block was read from this cache. + * Used for determining cache utilization factor. */ - private static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS; + private int numGets = 0; + + private final AtomicBoolean closed; + + private final PrefetchingStatistics prefetchingStatistics; /** * File attributes attached to any intermediate temporary file created during index creation. 
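The fields added above bound the cache at `maxBlocksCount` and thread a doubly linked LRU list (`head`/`tail`) through its entries; the hunks that follow wire the list maintenance into `getEntry()`/`put()` and evict the tail once the bound is exceeded. A minimal usage sketch, not part of the patch, assuming the same-package helpers (`EmptyPrefetchingStatistics`, a `hadoop.tmp.dir`-backed `LocalDirAllocator`) behave as in the updated `TestBlockCache`:

```java
package org.apache.hadoop.fs.impl.prefetch;

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;

/** Illustrative sketch only; mirrors the updated TestBlockCache usage. */
public final class LruEvictionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    LocalDirAllocator allocator = new LocalDirAllocator("hadoop.tmp.dir");

    // Keep at most two blocks; the third put() evicts the least recently used one.
    BlockCache cache =
        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2);

    ByteBuffer block = ByteBuffer.allocate(16);
    cache.put(0, block, conf, allocator);
    cache.put(1, block, conf, allocator);
    cache.put(2, block, conf, allocator); // block 0's cache file is deleted

    System.out.println(cache.size());     // expected: 2
    cache.close();
  }
}
```

For S3A, the bound comes from `fs.s3a.prefetch.max.blocks.count` (default 4), as added to `Constants` and passed through `S3ACachingBlockManager` later in this patch.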
@@ -103,6 +119,8 @@ private enum LockType { READ, WRITE } + private Entry previous; + private Entry next; Entry(int blockNumber, Path path, int size, long checksum) { this.blockNumber = blockNumber; @@ -110,6 +128,8 @@ private enum LockType { this.size = size; this.checksum = checksum; this.lock = new ReentrantReadWriteLock(); + this.previous = null; + this.next = null; } @Override @@ -166,16 +186,37 @@ private boolean takeLock(LockType lockType, long timeout, TimeUnit unit) { } return false; } + + private Entry getPrevious() { + return previous; + } + + private void setPrevious(Entry previous) { + this.previous = previous; + } + + private Entry getNext() { + return next; + } + + private void setNext(Entry next) { + this.next = next; + } } /** * Constructs an instance of a {@code SingleFilePerBlockCache}. * * @param prefetchingStatistics statistics for this stream. + * @param maxBlocksCount max blocks count to be kept in cache at any time. */ - public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) { + public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics, int maxBlocksCount) { this.prefetchingStatistics = requireNonNull(prefetchingStatistics); this.closed = new AtomicBoolean(false); + this.maxBlocksCount = maxBlocksCount; + Preconditions.checkArgument(maxBlocksCount > 0, "maxBlocksCount should be more than 0"); + blocks = new ConcurrentHashMap<>(); + blocksLock = new ReentrantReadWriteLock(); } /** @@ -247,9 +288,60 @@ private Entry getEntry(int blockNumber) { throw new IllegalStateException(String.format("block %d not found in cache", blockNumber)); } numGets++; + addToLinkedListHead(entry); return entry; } + /** + * Helper method to add the given entry to the head of the linked list. + * + * @param entry Block entry to add. + */ + private void addToLinkedListHead(Entry entry) { + blocksLock.writeLock().lock(); + try { + addToHeadOfLinkedList(entry); + } finally { + blocksLock.writeLock().unlock(); + } + } + + /** + * Add the given entry to the head of the linked list. + * + * @param entry Block entry to add. + */ + private void addToHeadOfLinkedList(Entry entry) { + if (head == null) { + head = entry; + tail = entry; + } + LOG.debug( + "Block num {} to be added to the head. Current head block num: {} and tail block num: {}", + entry.blockNumber, head.blockNumber, tail.blockNumber); + if (entry != head) { + Entry prev = entry.getPrevious(); + Entry nxt = entry.getNext(); + // no-op if the block is already evicted + if (!blocks.containsKey(entry.blockNumber)) { + return; + } + if (prev != null) { + prev.setNext(nxt); + } + if (nxt != null) { + nxt.setPrevious(prev); + } + entry.setPrevious(null); + entry.setNext(head); + head.setPrevious(entry); + head = entry; + if (prev != null && prev.getNext() == null) { + tail = prev; + } + } + } + /** * Puts the given block in this cache. * @@ -278,6 +370,7 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf, } finally { entry.releaseLock(Entry.LockType.READ); } + addToLinkedListHead(entry); return; } @@ -299,9 +392,65 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf, // Update stream_read_blocks_in_cache stats only after blocks map is updated with new file // entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache. 
// If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of - // the input stream can lead to the removal of the cache file even before blocks is added with - // the new cache file, leading to incorrect value of stream_read_blocks_in_cache. + // the input stream can lead to the removal of the cache file even before blocks is added + // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache. prefetchingStatistics.blockAddedToFileCache(); + addToLinkedListAndEvictIfRequired(entry); + } + + /** + * Add the given entry to the head of the linked list and if the LRU cache size + * exceeds the max limit, evict tail of the LRU linked list. + * + * @param entry Block entry to add. + */ + private void addToLinkedListAndEvictIfRequired(Entry entry) { + blocksLock.writeLock().lock(); + try { + addToHeadOfLinkedList(entry); + entryListSize++; + if (entryListSize > maxBlocksCount && !closed.get()) { + Entry elementToPurge = tail; + tail = tail.getPrevious(); + if (tail == null) { + tail = head; + } + tail.setNext(null); + elementToPurge.setPrevious(null); + deleteBlockFileAndEvictCache(elementToPurge); + } + } finally { + blocksLock.writeLock().unlock(); + } + } + + /** + * Delete cache file as part of the block cache LRU eviction. + * + * @param elementToPurge Block entry to evict. + */ + private void deleteBlockFileAndEvictCache(Entry elementToPurge) { + boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); + if (!lockAcquired) { + LOG.error("Cache file {} deletion would not be attempted as write lock could not" + + " be acquired within {} {}", elementToPurge.path, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); + } else { + try { + if (Files.deleteIfExists(elementToPurge.path)) { + entryListSize--; + prefetchingStatistics.blockRemovedFromFileCache(); + blocks.remove(elementToPurge.blockNumber); + } + } catch (IOException e) { + LOG.warn("Failed to delete cache file {}", elementToPurge.path, e); + } finally { + elementToPurge.releaseLock(Entry.LockType.WRITE); + } + } } private static final Set CREATE_OPTIONS = @@ -337,30 +486,38 @@ protected Path getCacheFilePath(final Configuration conf, public void close() throws IOException { if (closed.compareAndSet(false, true)) { LOG.debug(getStats()); - int numFilesDeleted = 0; - - for (Entry entry : blocks.values()) { - boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT, - PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); - if (!lockAcquired) { - LOG.error("Cache file {} deletion would not be attempted as write lock could not" - + " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT, - PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); - continue; - } - try { - Files.deleteIfExists(entry.path); + deleteCacheFiles(); + } + } + + /** + * Delete cache files as part of the close call. 
+ */ + private void deleteCacheFiles() { + int numFilesDeleted = 0; + for (Entry entry : blocks.values()) { + boolean lockAcquired = + entry.takeLock(Entry.LockType.WRITE, PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); + if (!lockAcquired) { + LOG.error("Cache file {} deletion would not be attempted as write lock could not" + + " be acquired within {} {}", entry.path, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); + continue; + } + try { + if (Files.deleteIfExists(entry.path)) { prefetchingStatistics.blockRemovedFromFileCache(); numFilesDeleted++; - } catch (IOException e) { - LOG.warn("Failed to delete cache file {}", entry.path, e); - } finally { - entry.releaseLock(Entry.LockType.WRITE); } + } catch (IOException e) { + LOG.warn("Failed to delete cache file {}", entry.path, e); + } finally { + entry.releaseLock(Entry.LockType.WRITE); } - - LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted); } + LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index dd543deb8a5a5..5f841bd233d34 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -974,6 +974,14 @@ + + fs.trash.clean.trashroot.enable + false + Whether clean some directories and files + in Trash home which are not under checkpoint directory. + + + fs.protected.directories diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java index 5b8c10b3fa6f9..30c9a31fda4ea 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java @@ -32,6 +32,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.junit.After; import org.junit.Before; @@ -786,6 +787,55 @@ public void testTrashEmptier() throws Exception { emptierThread.join(); } + /** + * Test trash emptier can delete non-checkpoint dir or not. + * @throws Exception + */ + @Test() + public void testTrashEmptierCleanDirNotInCheckpointDir() throws Exception { + Configuration conf = new Configuration(); + // Trash with 12 second deletes and 6 seconds checkpoints. + conf.set(FS_TRASH_INTERVAL_KEY, "0.2"); // 12 seconds + conf.setClass("fs.file.impl", TestLFS.class, FileSystem.class); + conf.set(FS_TRASH_CHECKPOINT_INTERVAL_KEY, "0.1"); // 6 seconds + conf.setBoolean(FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY, true); + FileSystem fs = FileSystem.getLocal(conf); + conf.set("fs.default.name", fs.getUri().toString()); + + Trash trash = new Trash(conf); + + // Start Emptier in background. + Runnable emptier = trash.getEmptier(); + Thread emptierThread = new Thread(emptier); + emptierThread.start(); + + FsShell shell = new FsShell(); + shell.setConf(conf); + shell.init(); + + // Make sure the .Trash dir existed. + mkdir(fs, shell.getCurrentTrashDir()); + assertTrue(fs.exists(shell.getCurrentTrashDir())); + // Create a directory under .Trash directly. 
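// Because fs.trash.clean.trashroot.enable is set to true above, the emptier is
// expected to delete this non-checkpoint entry directly under the trash root.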
+ Path myPath = new Path(shell.getCurrentTrashDir().getParent(), "test_dirs"); + mkdir(fs, myPath); + assertTrue(fs.exists(myPath)); + + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + return !fs.exists(myPath); + } catch (IOException e) { + // Do nothing. + } + return false; + } + }, 6000, 60000); + emptierThread.interrupt(); + emptierThread.join(); + } + @After public void tearDown() throws IOException { File trashDir = new File(TEST_DIR.toUri().getPath()); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java index 3b60c1c795336..b32ce20a37354 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java @@ -45,7 +45,7 @@ public class TestBlockCache extends AbstractHadoopTestBase { public void testArgChecks() throws Exception { // Should not throw. BlockCache cache = - new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance()); + new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2); ByteBuffer buffer = ByteBuffer.allocate(16); @@ -55,7 +55,7 @@ public void testArgChecks() throws Exception { intercept(NullPointerException.class, null, - () -> new SingleFilePerBlockCache(null)); + () -> new SingleFilePerBlockCache(null, 2)); } @@ -63,7 +63,7 @@ public void testArgChecks() throws Exception { @Test public void testPutAndGet() throws Exception { BlockCache cache = - new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance()); + new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2); ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE); for (byte i = 0; i < BUFFER_SIZE; i++) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java index 81c40fe6346cc..90dcd83ddba1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java @@ -75,6 +75,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -867,6 +868,15 @@ public static ErasureCodingPolicyInfo toECPolicyInfo(Map m) { return new ErasureCodingPolicyInfo(ecPolicy, ecPolicyState); } + public static Map getErasureCodeCodecs(Map json) { + Map map = new HashMap<>(); + Map m = (Map) json.get("ErasureCodingCodecs"); + m.forEach((key, value) -> { + map.put((String) key, (String) value); + }); + return map; + } + private static List toDiffList( List objs) { if (objs == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index f65ec98a9d782..5210692ab324e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -2206,6 +2206,19 @@ Collection decodeResponse(Map 
json) { }.run(); } + public Map getAllErasureCodingCodecs() + throws IOException { + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_EC_CODECS); + final HttpOpParam.Op op = GetOpParam.Op.GETECCODECS; + return new FsPathResponseRunner>(op, null) { + @Override + Map decodeResponse(Map json) { + return JsonUtilClient.getErasureCodeCodecs(json); + } + }.run(); + } + @VisibleForTesting InetSocketAddress[] getResolvedNNAddr() { return nnAddrs; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java index 093609843ae3c..3efe37b61f80a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java @@ -68,6 +68,7 @@ public enum Op implements HttpOpParam.Op { GETFILELINKSTATUS(false, HttpURLConnection.HTTP_OK), GETSTATUS(false, HttpURLConnection.HTTP_OK), GETECPOLICIES(false, HttpURLConnection.HTTP_OK), + GETECCODECS(false, HttpURLConnection.HTTP_OK), GETSNAPSHOTLIST(false, HttpURLConnection.HTTP_OK); final boolean redirect; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index fab3619cb2cea..e09fe7fbd9bc4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1696,7 +1696,7 @@ public void verifyReplication(String src, public boolean isSufficientlyReplicated(BlockInfo b) { // Compare against the lesser of the minReplication and number of live DNs. 
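// hasMinStorage() derives the required minimum from the block itself:
// minReplication for contiguous blocks, the number of data units for striped
// (erasure-coded) blocks.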
final int liveReplicas = countNodes(b).liveReplicas(); - if (liveReplicas >= minReplication) { + if (hasMinStorage(b, liveReplicas)) { return true; } // getNumLiveDataNodes() is very expensive and we minimize its use by diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 2096f18d31abf..0ed1304cb8f02 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -77,6 +77,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT; @@ -356,7 +360,9 @@ public class DataNode extends ReconfigurableBase DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, FS_DU_INTERVAL_KEY, FS_GETSPACEUSED_JITTER_KEY, - FS_GETSPACEUSED_CLASSNAME)); + FS_GETSPACEUSED_CLASSNAME, + DFS_DISK_BALANCER_ENABLED, + DFS_DISK_BALANCER_PLAN_VALID_INTERVAL)); public static final String METRICS_LOG_NAME = "DataNodeMetricsLog"; @@ -706,6 +712,9 @@ public String reconfigurePropertyImpl(String property, String newVal) case FS_GETSPACEUSED_JITTER_KEY: case FS_GETSPACEUSED_CLASSNAME: return reconfDfsUsageParameters(property, newVal); + case DFS_DISK_BALANCER_ENABLED: + case DFS_DISK_BALANCER_PLAN_VALID_INTERVAL: + return reconfDiskBalancerParameters(property, newVal); default: break; } @@ -951,6 +960,44 @@ private String reconfDfsUsageParameters(String property, String newVal) } } + private String reconfDiskBalancerParameters(String property, String newVal) + throws ReconfigurationException { + String result = null; + try { + LOG.info("Reconfiguring {} to {}", property, newVal); + if (property.equals(DFS_DISK_BALANCER_ENABLED)) { + if (newVal != null && !newVal.equalsIgnoreCase("true") + && !newVal.equalsIgnoreCase("false")) { + throw new IllegalArgumentException("Not a valid Boolean value for " + property); + } + boolean enable = (newVal == null ? 
DFS_DISK_BALANCER_ENABLED_DEFAULT : + Boolean.parseBoolean(newVal)); + getDiskBalancer().setDiskBalancerEnabled(enable); + result = Boolean.toString(enable); + } else if (property.equals(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL)) { + if (newVal == null) { + // set to default + long defaultInterval = getConf().getTimeDuration( + DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + getDiskBalancer().setPlanValidityInterval(defaultInterval); + result = DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT; + } else { + long newInterval = getConf() + .getTimeDurationHelper(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + newVal, TimeUnit.MILLISECONDS); + getDiskBalancer().setPlanValidityInterval(newInterval); + result = newVal; + } + } + LOG.info("RECONFIGURE* changed {} to {}", property, result); + return result; + } catch (IllegalArgumentException | IOException e) { + throw new ReconfigurationException(property, newVal, getConf().get(property), e); + } + } + /** * Get a list of the keys of the re-configurable properties in configuration. */ @@ -4201,7 +4248,8 @@ public List getVolumeReport() throws IOException { return volumeInfoList; } - private DiskBalancer getDiskBalancer() throws IOException { + @VisibleForTesting + public DiskBalancer getDiskBalancer() throws IOException { if (this.diskBalancer == null) { throw new IOException("DiskBalancer is not initialized"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index f97dbbfcd6e8a..bf88e6fe88bb0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -406,6 +406,7 @@ public void run() { } try { reconcile(); + dataset.setLastDirScannerFinishTime(System.currentTimeMillis()); } catch (Exception e) { // Log and continue - allows Executor to run again next cycle LOG.error( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index 4126140678759..e2f9877483156 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -82,14 +82,14 @@ public class DiskBalancer { private final BlockMover blockMover; private final ReentrantLock lock; private final ConcurrentHashMap workMap; - private boolean isDiskBalancerEnabled = false; + private volatile boolean isDiskBalancerEnabled = false; private ExecutorService scheduler; private Future future; private String planID; private String planFile; private DiskBalancerWorkStatus.Result currentResult; private long bandwidth; - private long planValidityInterval; + private volatile long planValidityInterval; private final Configuration config; /** @@ -341,6 +341,58 @@ private void checkDiskBalancerEnabled() } } + /** + * Sets Disk balancer is to enable or not to enable. + * + * @param diskBalancerEnabled + * true, enable diskBalancer, otherwise false to disable it. 
+ */ + public void setDiskBalancerEnabled(boolean diskBalancerEnabled) { + isDiskBalancerEnabled = diskBalancerEnabled; + } + + /** + * Returns the value indicating if diskBalancer is enabled. + * + * @return boolean. + */ + @VisibleForTesting + public boolean isDiskBalancerEnabled() { + return isDiskBalancerEnabled; + } + + /** + * Sets maximum amount of time disk balancer plan is valid. + * + * @param planValidityInterval - maximum amount of time in the unit of milliseconds. + */ + public void setPlanValidityInterval(long planValidityInterval) { + this.config.setTimeDuration(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + planValidityInterval, TimeUnit.MILLISECONDS); + this.planValidityInterval = planValidityInterval; + } + + /** + * Gets maximum amount of time disk balancer plan is valid. + * + * @return the maximum amount of time in milliseconds. + */ + @VisibleForTesting + public long getPlanValidityInterval() { + return planValidityInterval; + } + + /** + * Gets maximum amount of time disk balancer plan is valid in config. + * + * @return the maximum amount of time in milliseconds. + */ + @VisibleForTesting + public long getPlanValidityIntervalInConfig() { + return config.getTimeDuration(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + } + /** * Verifies that user provided plan is valid. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 4cad7aa4d36bc..4ab7e1be84523 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -692,4 +692,10 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block, * Get the volume list. */ List getVolumeList(); + + /** + * Set the last time in milliseconds when the directory scanner successfully ran. + * @param time the last time in milliseconds when the directory scanner successfully ran. + */ + default void setLastDirScannerFinishTime(long time) {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index d81b5411c531b..f1115efcc21dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -284,7 +284,8 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) private long maxDirScannerNotifyCount; private long curDirScannerNotifyCount; private long lastDirScannerNotifyTime; - + private volatile long lastDirScannerFinishTime; + /** * An FSDataset has a directory where it loads its data files. 
*/ @@ -3811,5 +3812,15 @@ void stopAllDataxceiverThreads(FsVolumeImpl volume) { public List getVolumeList() { return volumes.getVolumes(); } + + @Override + public long getLastDirScannerFinishTime() { + return this.lastDirScannerFinishTime; + } + + @Override + public void setLastDirScannerFinishTime(long time) { + this.lastDirScannerFinishTime = time; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetricHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetricHelper.java index b1a2d4f956ce9..65fd92ec78438 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetricHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetricHelper.java @@ -69,10 +69,11 @@ public static void getMetrics(MetricsCollector collector, " of blocks cached"), beanClass.getNumBlocksCached()) .addGauge(Interns.info("NumBlocksFailedToCache", "Datanode number of " + "blocks failed to cache"), beanClass.getNumBlocksFailedToCache()) - .addGauge(Interns.info("NumBlocksFailedToUnCache", "Datanode number of" + + .addGauge(Interns.info("NumBlocksFailedToUnCache", "Datanode number of" + " blocks failed in cache eviction"), - beanClass.getNumBlocksFailedToUncache()); - + beanClass.getNumBlocksFailedToUncache()) + .addGauge(Interns.info("LastDirectoryScannerFinishTime", + "Finish time of the last directory scan"), beanClass.getLastDirScannerFinishTime()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java index c2f175b97d132..0bfb14e232f23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java @@ -122,4 +122,9 @@ public interface FSDatasetMBean extends MetricsSource { * Returns the number of blocks that the datanode was unable to uncache */ public long getNumBlocksFailedToUncache(); + + /** + * Returns the last time in milliseconds when the directory scanner successfully ran. 
+ */ + long getLastDirScannerFinishTime(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index 8495256d4b726..c00ff5d8ec5e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -33,6 +33,7 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import javax.servlet.ServletContext; @@ -1413,6 +1414,11 @@ protected Response get( final String js = JsonUtil.toJsonString(ecPolicyInfos); return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); } + case GETECCODECS: { + Map ecCodecs = cp.getErasureCodingCodecs(); + final String js = JsonUtil.toJsonString("ErasureCodingCodecs", ecCodecs); + return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); + } default: throw new UnsupportedOperationException(op + " is not supported"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index 435e615b30d64..01ae1b3e47724 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -390,6 +390,7 @@ Usage: hdfs dfsadmin [-fetchImage ] hdfs dfsadmin [-allowSnapshot ] hdfs dfsadmin [-disallowSnapshot ] + hdfs dfsadmin [-provisionSnapshotTrash [-all]] hdfs dfsadmin [-shutdownDatanode [upgrade]] hdfs dfsadmin [-evictWriters ] hdfs dfsadmin [-getDatanodeInfo ] @@ -428,6 +429,7 @@ Usage: | `-fetchImage` \ | Downloads the most recent fsimage from the NameNode and saves it in the specified local directory. | | `-allowSnapshot` \ | Allowing snapshots of a directory to be created. If the operation completes successfully, the directory becomes snapshottable. See the [HDFS Snapshot Documentation](./HdfsSnapshots.html) for more information. | | `-disallowSnapshot` \ | Disallowing snapshots of a directory to be created. All snapshots of the directory must be deleted before disallowing snapshots. See the [HDFS Snapshot Documentation](./HdfsSnapshots.html) for more information. | +| `-provisionSnapshotTrash` \ [-all] | Provision trash root in one or all snapshottable directories. See the [HDFS Snapshot Documentation](./HdfsSnapshots.html) for more information. | | `-shutdownDatanode` \ [upgrade] | Submit a shutdown request for the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-shutdownDatanode) for the detail. | | `-evictWriters` \ | Make the datanode evict all clients that are writing a block. This is useful if decommissioning is hung due to slow writers. | | `-getDatanodeInfo` \ | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. 
| diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsSnapshots.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsSnapshots.md index af9f83ad1c860..0617bfc5bbdfc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsSnapshots.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsSnapshots.md @@ -151,6 +151,22 @@ All snapshots of the directory must be deleted before disallowing snapshots. See also the corresponding Java API `void disallowSnapshot(Path path)` in `HdfsAdmin`. +#### Provision SnapshotTrash + +Provision trash root in one or all snapshottable directories. + +* Command: + + hdfs dfsadmin -provisionSnapshotTrash [-all] + +* Arguments: + + | --- | --- | + | path | The path of the snapshottable directory. | + | -all | Which is an optional argument, when it is set will provision trash root in all snapshottable directories. | + +See also the corresponding Java API +`void provisionSnapshotTrash(Path path)` in `HdfsAdmin`. ### User Operations diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md index baee66211aeae..969d288b4c97b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md @@ -62,6 +62,7 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop * [`GETFILELINKSTATUS`](#Get_File_Link_Status) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileLinkStatus) * [`GETSTATUS`](#Get_Status) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStatus) * [`GETECPOLICIES`](#Get_EC_Policies) + * [`GETECCODECS`](#Get_EC_Codecs) * HTTP PUT * [`CREATE`](#Create_and_Write_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create) * [`MKDIRS`](#Make_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs) @@ -1252,6 +1253,26 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileLi See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStatus +### Get EC Codecs + +* Submit a HTTP GET request. 
+ + curl -i "http://:/webhdfs/v1/?op=GETALLECCODECS" + + The client receives a response with a [`ECCodecs` JSON object](#EC_Codecs_JSON_Schema): + + HTTP/1.1 200 OK + Content-Type: application/json + Transfer-Encoding: chunked + + { + "ErasureCodeCodecs": { + "rs": "rs_native, rs_java", + "rs-legacy": "rs-legacy_java", + "xor":"xor_native, xor_java" + } + } + Storage Policy Operations ------------------------- @@ -3244,6 +3265,18 @@ var blockLocationProperties = } ``` +### EC Codecs JSON Schema + +```json +{ + "ErasureCodingCodecs": { + "rs": "rs_native, rs_java", + "rs-legacy": "rs-legacy_java", + "xor": "xor_native, xor_java" + } +} +``` + HTTP Query Parameter Dictionary ------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 1d479f34e0258..8c75ca9f75208 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -939,6 +939,11 @@ public long getNumBlocksFailedToUncache() { return 0L; } + @Override + public long getLastDirScannerFinishTime() { + return 0L; + } + /** * Get metrics from the metrics source * @@ -1632,5 +1637,10 @@ public MountVolumeMap getMountVolumeMap() { public List getVolumeList() { return null; } + + @Override + public void setLastDirScannerFinishTime(long time) { + throw new UnsupportedOperationException(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java index d9578ca02a949..a14ee2554f03a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java @@ -46,6 +46,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -57,6 +61,7 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurationException; @@ -794,4 +799,56 @@ public void testDfsUsageKlass() throws ReconfigurationException, InterruptedExce Thread.sleep(5000); assertTrue(counter > lastCounter); } + + @Test + public void testDiskBalancerParameters() throws Exception { + for (int i = 0; i < NUM_DATA_NODE; i++) { + DataNode dn = cluster.getDataNodes().get(i); + + // Verify 
DFS_DISK_BALANCER_ENABLED. + // Try invalid values. + LambdaTestUtils.intercept(ReconfigurationException.class, + "Could not change property dfs.disk.balancer.enabled from 'true' to 'text'", + () -> dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "text")); + + // Set default value. + dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, null); + assertEquals(dn.getConf().getBoolean(DFS_DISK_BALANCER_ENABLED, + DFS_DISK_BALANCER_ENABLED_DEFAULT), dn.getDiskBalancer().isDiskBalancerEnabled()); + + // Set DFS_DISK_BALANCER_ENABLED to false. + dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "false"); + assertFalse(dn.getDiskBalancer().isDiskBalancerEnabled()); + + // Set DFS_DISK_BALANCER_ENABLED to true. + dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "true"); + assertTrue(dn.getDiskBalancer().isDiskBalancerEnabled()); + + // Verify DFS_DISK_BALANCER_PLAN_VALID_INTERVAL. + // Try invalid values. + LambdaTestUtils.intercept(ReconfigurationException.class, + "Could not change property dfs.disk.balancer.plan.valid.interval from " + + "'1d' to 'text'", + () -> dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "text")); + + // Set default value. + dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, null); + assertEquals(dn.getConf().getTimeDuration(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS), + dn.getDiskBalancer().getPlanValidityInterval()); + assertEquals(dn.getConf().getTimeDuration(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS), + dn.getDiskBalancer().getPlanValidityIntervalInConfig()); + + // Set value is 6 then 6 milliseconds. + dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "" + 6); + assertEquals(6, dn.getDiskBalancer().getPlanValidityInterval()); + assertEquals(6, dn.getDiskBalancer().getPlanValidityIntervalInConfig()); + + // Set value with time unit. 
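// getTimeDurationHelper() returns the duration in milliseconds, so "1m" below becomes 60000.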
+ dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "1m"); + assertEquals(60000, dn.getDiskBalancer().getPlanValidityInterval()); + assertEquals(60000, dn.getDiskBalancer().getPlanValidityIntervalInConfig()); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index 74c70cec76967..244b60e13867c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -23,6 +23,7 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -1304,6 +1305,24 @@ public void testLocalReplicaUpdateWithReplica() throws Exception { assertEquals(realBlkFile, localReplica.getBlockFile()); } + @Test(timeout = 60000) + public void testLastDirScannerFinishTimeIsUpdated() throws Exception { + Configuration conf = getConfiguration(); + conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 3L); + cluster = new MiniDFSCluster.Builder(conf).build(); + try { + cluster.waitActive(); + bpid = cluster.getNamesystem().getBlockPoolId(); + final DataNode dn = cluster.getDataNodes().get(0); + fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0)); + long lastDirScannerFinishTime = fds.getLastDirScannerFinishTime(); + dn.getDirectoryScanner().run(); + assertNotEquals(lastDirScannerFinishTime, fds.getLastDirScannerFinishTime()); + } finally { + cluster.shutdown(); + } + } + public long getRandomBlockId() { return Math.abs(new Random().nextLong()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 413a2e6b594f2..1bd42e0bdfbeb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -477,4 +477,10 @@ public MountVolumeMap getMountVolumeMap() { public List getVolumeList() { return null; } + + @Override + public long getLastDirScannerFinishTime() { + return 0L; + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 0ee7eb3ec1556..1c94e351c2b52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -238,7 +238,6 @@ public void setUp() throws IOException { for (String bpid : BLOCK_POOL_IDS) { dataset.addBlockPool(bpid, conf); } - assertEquals(NUM_INIT_VOLUMES, getNumVolumes()); assertEquals(0, dataset.getNumFailedVolumes()); } @@ -250,6 
+249,13 @@ public void checkDataSetLockManager() { assertNull(manager.getLastException()); } + @Test + public void testSetLastDirScannerFinishTime() throws IOException { + assertEquals(dataset.getLastDirScannerFinishTime(), 0L); + dataset.setLastDirScannerFinishTime(System.currentTimeMillis()); + assertNotEquals(0L, dataset.getLastDirScannerFinishTime()); + } + @Test public void testAddVolumes() throws IOException { final int numNewVolumes = 3; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java index 21c9a5937e9bf..258f812dffa07 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java @@ -197,7 +197,7 @@ public void testGetDiskBalancerInvalidSetting() throws Exception { } @Test - public void testgetDiskBalancerBandwidth() throws Exception { + public void testGetDiskBalancerBandwidth() throws Exception { RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); DataNode dataNode = rpcTestHelper.getDataNode(); String planHash = rpcTestHelper.getPlanHash(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java index 771caefd20a7b..0cf696f504b9b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java @@ -394,7 +394,7 @@ private void updateCountForQuota(int i) { FSNamesystem fsn = cluster.getNamesystem(); fsn.writeLock(); try { - getFSDirectory().updateCountForQuota(1); + getFSDirectory().updateCountForQuota(i); } finally { fsn.writeUnlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index 4b1db53e8838b..78664e27ca286 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -346,7 +346,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException, Interr final List outs = Lists.newArrayList(); final List errs = Lists.newArrayList(); getReconfigurableProperties("datanode", address, outs, errs); - assertEquals(20, outs.size()); + assertEquals(22, outs.size()); assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java index 6bd737324887f..18382d201e3e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java @@ -2318,6 +2318,30 @@ public void testGetErasureCodingPolicies() throws Exception { } } + @Test + public void getAllErasureCodingCodecs() throws Exception { + final 
Configuration conf = WebHdfsTestUtil.createConf(); + cluster = new MiniDFSCluster.Builder(conf).build(); + try { + cluster.waitActive(); + + final WebHdfsFileSystem webHdfs = + WebHdfsTestUtil.getWebHdfsFileSystem(conf, + WebHdfsConstants.WEBHDFS_SCHEME); + + final DistributedFileSystem dfs = cluster.getFileSystem(); + + Map webHdfsEcCodecs = webHdfs.getAllErasureCodingCodecs(); + + Map dfsEcCodecs = dfs.getAllErasureCodingCodecs(); + + //Validate erasureCodingCodecs are the same as DistributedFileSystem + assertEquals(webHdfsEcCodecs, dfsEcCodecs); + } finally { + cluster.shutdown(); + } + } + /** * Get FileStatus JSONObject from ListStatus response. */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index ce65cf67772d9..6c1d6371d179d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1276,4 +1276,16 @@ private Constants() { public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED = "fs.s3a.capability.multipart.uploads.enabled"; + /** + * Prefetch max blocks count config. + * Value = {@value} + */ + public static final String PREFETCH_MAX_BLOCKS_COUNT = "fs.s3a.prefetch.max.blocks.count"; + + /** + * Default value for max blocks count config. + * Value = {@value} + */ + public static final int DEFAULT_PREFETCH_MAX_BLOCKS_COUNT = 4; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java index c166943c00ead..a02922053aa39 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java @@ -33,6 +33,9 @@ import org.apache.hadoop.fs.impl.prefetch.Validate; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT; + /** * Provides access to S3 file one block at a time. */ @@ -67,7 +70,13 @@ public S3ACachingBlockManager( Configuration conf, LocalDirAllocator localDirAllocator) { - super(futurePool, blockData, bufferPoolSize, streamStatistics, conf, localDirAllocator); + super(futurePool, + blockData, + bufferPoolSize, + streamStatistics, + conf, + localDirAllocator, + conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)); Validate.checkNotNull(reader, "reader"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java new file mode 100644 index 0000000000000..bbe01887588cc --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.test.LambdaTestUtils; + +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; + +/** + * Test the prefetching input stream with LRU cache eviction on S3ACachingInputStream. 
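
The class javadoc above names the behaviour under test: once the cache holds fs.s3a.prefetch.max.blocks.count blocks, caching one more evicts the block that was touched least recently. As a simplified model of that policy only (this is not the SingleFilePerBlockCache implementation), an access-ordered LinkedHashMap shows which block number gets dropped:

    import java.util.LinkedHashMap;
    import java.util.Map;

    /** Toy LRU index: block number -> path of the cached block file. */
    class LruBlockIndex extends LinkedHashMap<Integer, String> {
      private final int maxBlocks;

      LruBlockIndex(int maxBlocks) {
        super(16, 0.75f, true);   // accessOrder=true: get() refreshes an entry's recency
        this.maxBlocks = maxBlocks;
      }

      @Override
      protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
        return size() > maxBlocks;   // drop the least recently used entry once past the cap
      }
    }

With maxBlocks = 4, inserting blocks 0 through 5 leaves {2, 3, 4, 5} in the index, which is consistent with the assertion later in the test that the STREAM_READ_BLOCKS_IN_FILE_CACHE gauge settles at the configured maximum.
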
+ */ +@RunWith(Parameterized.class) +public class ITestS3APrefetchingLruEviction extends AbstractS3ACostTest { + + private final String maxBlocks; + + @Parameterized.Parameters(name = "max-blocks-{0}") + public static Collection params() { + return Arrays.asList(new Object[][]{ + {"1"}, + {"2"}, + {"3"}, + {"4"} + }); + } + + public ITestS3APrefetchingLruEviction(final String maxBlocks) { + super(true); + this.maxBlocks = maxBlocks; + } + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3APrefetchingLruEviction.class); + + private static final int S_1K = 1024; + // Path for file which should have length > block size so S3ACachingInputStream is used + private Path largeFile; + private FileSystem largeFileFS; + private int blockSize; + + private static final int TIMEOUT_MILLIS = 5000; + private static final int INTERVAL_MILLIS = 500; + + @Override + public Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY); + S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_MAX_BLOCKS_COUNT); + conf.setBoolean(PREFETCH_ENABLED_KEY, true); + conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks)); + return conf; + } + + @Override + public void teardown() throws Exception { + super.teardown(); + cleanupWithLogger(LOG, largeFileFS); + largeFileFS = null; + } + + private void openFS() throws Exception { + Configuration conf = getConfiguration(); + String largeFileUri = S3ATestUtils.getCSVTestFile(conf); + + largeFile = new Path(largeFileUri); + blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE); + largeFileFS = new S3AFileSystem(); + largeFileFS.initialize(new URI(largeFileUri), getConfiguration()); + } + + @Test + public void testSeeksWithLruEviction() throws Throwable { + IOStatistics ioStats; + openFS(); + + ExecutorService executorService = Executors.newFixedThreadPool(5, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("testSeeksWithLruEviction-%d") + .build()); + CountDownLatch countDownLatch = new CountDownLatch(7); + + try (FSDataInputStream in = largeFileFS.open(largeFile)) { + ioStats = in.getIOStatistics(); + // tests to add multiple blocks in the prefetch cache + // and let LRU eviction take place as more cache entries + // are added with multiple block reads. 
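
The comment above describes the intent of the reads submitted next: which prefetch block a read touches is simply its starting byte offset divided by the block size, so the seven submissions cover blocks 0 through 5 plus one backward read that lands in block 0 again. A hypothetical helper (not part of the patch) states that arithmetic:

    final class BlockMath {
      private BlockMath() {
      }

      /** Illustrative only: index of the prefetch block containing {@code position}. */
      static int blockNumberFor(long position, long blockSize) {
        return (int) (position / blockSize);
      }
    }

    // With the configured prefetch block size:
    //   BlockMath.blockNumberFor(0, blockSize)              -> 0  (partial read of block 0)
    //   BlockMath.blockNumberFor(blockSize, blockSize)      -> 1
    //   BlockMath.blockNumberFor(blockSize * 5L, blockSize) -> 5
    //   BlockMath.blockNumberFor(S_1K * 5, blockSize)       -> 0  (the backward seek re-reads block 0)
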
+ + // Don't read block 0 completely + executorService.submit(() -> readFullyWithPositionedRead(countDownLatch, + in, + 0, + blockSize - S_1K * 10)); + + // Seek to block 1 and don't read completely + executorService.submit(() -> readFullyWithPositionedRead(countDownLatch, + in, + blockSize, + 2 * S_1K)); + + // Seek to block 2 and don't read completely + executorService.submit(() -> readFullyWithSeek(countDownLatch, + in, + blockSize * 2L, + 2 * S_1K)); + + // Seek to block 3 and don't read completely + executorService.submit(() -> readFullyWithPositionedRead(countDownLatch, + in, + blockSize * 3L, + 2 * S_1K)); + + // Seek to block 4 and don't read completely + executorService.submit(() -> readFullyWithSeek(countDownLatch, + in, + blockSize * 4L, + 2 * S_1K)); + + // Seek to block 5 and don't read completely + executorService.submit(() -> readFullyWithPositionedRead(countDownLatch, + in, + blockSize * 5L, + 2 * S_1K)); + + // backward seek, can't use block 0 as it is evicted + executorService.submit(() -> readFullyWithSeek(countDownLatch, + in, + S_1K * 5, + 2 * S_1K)); + + countDownLatch.await(); + + // expect 3 blocks as rest are to be evicted by LRU + LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> { + LOG.info("IO stats: {}", ioStats); + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, + Integer.parseInt(maxBlocks)); + }); + // let LRU evictions settle down, if any + Thread.sleep(TIMEOUT_MILLIS); + } finally { + executorService.shutdownNow(); + executorService.awaitTermination(5, TimeUnit.SECONDS); + } + LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> { + LOG.info("IO stats: {}", ioStats); + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); + }); + } + + /** + * Read the bytes from the given position in the stream to a new buffer using the positioned + * readable. + * + * @param countDownLatch count down latch to mark the operation completed. + * @param in input stream. + * @param position position in the given input stream to seek from. + * @param len the number of bytes to read. + * @return true if the read operation is successful. + */ + private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDataInputStream in, + long position, int len) { + byte[] buffer = new byte[blockSize]; + // Don't read block 0 completely + try { + in.readFully(position, buffer, 0, len); + countDownLatch.countDown(); + return true; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Read the bytes from the given position in the stream to a new buffer using seek followed by + * input stream read. + * + * @param countDownLatch count down latch to mark the operation completed. + * @param in input stream. + * @param position position in the given input stream to seek from. + * @param len the number of bytes to read. + * @return true if the read operation is successful. 
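
The two helpers differ only in which read API they exercise: readFullyWithPositionedRead uses the PositionedReadable call, which reads at an absolute offset without moving the stream's current position, while readFullyWithSeek moves the position with seek() and then reads from it. A compressed sketch of the two call shapes on an already opened FSDataInputStream (the variable and method names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;

    final class ReadStyles {
      private ReadStyles() {
      }

      /** Positioned read: the stream's current position is left untouched. */
      static void positioned(FSDataInputStream in, long pos, byte[] buf, int len) throws IOException {
        in.readFully(pos, buf, 0, len);
      }

      /** Seek + read: the position moves to pos and then advances by len. */
      static void seekAndRead(FSDataInputStream in, long pos, byte[] buf, int len) throws IOException {
        in.seek(pos);
        in.readFully(buf, 0, len);
      }
    }
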
+ */ + private boolean readFullyWithSeek(CountDownLatch countDownLatch, FSDataInputStream in, + long position, int len) { + byte[] buffer = new byte[blockSize]; + // Don't read block 0 completely + try { + in.seek(position); + in.readFully(buffer, 0, len); + countDownLatch.countDown(); + return true; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java index cf6aa7ba1aa89..6cf2ab241e239 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.apache.hadoop.fs.impl.prefetch.SingleFilePerBlockCache; import org.apache.hadoop.fs.impl.prefetch.Validate; +import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.Invoker; import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; import org.apache.hadoop.fs.s3a.S3AFileStatus; @@ -314,7 +315,8 @@ public static class FakeS3FilePerBlockCache extends SingleFilePerBlockCache { private final int writeDelay; public FakeS3FilePerBlockCache(int readDelay, int writeDelay) { - super(new EmptyS3AStatisticsContext().newInputStreamStatistics()); + super(new EmptyS3AStatisticsContext().newInputStreamStatistics(), + Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT); this.files = new ConcurrentHashMap<>(); this.readDelay = readDelay; this.writeDelay = writeDelay; @@ -387,7 +389,7 @@ public int read(ByteBuffer buffer, long offset, int size) } @Override - protected BlockCache createCache() { + protected BlockCache createCache(int maxBlocksCount) { final int readDelayMs = 50; final int writeDelayMs = 200; return new FakeS3FilePerBlockCache(readDelayMs, writeDelayMs); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java index cbfa643ee5362..8ec94d469da64 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java @@ -37,7 +37,9 @@ import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT; import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assert.assertEquals; @@ -173,6 +175,10 @@ protected void cachePut(int blockNumber, super.cachePut(blockNumber, buffer); } } + + public Configuration getConf() { + return CONF; + } } // @Ignore @@ -285,8 +291,11 @@ streamStatistics, conf, new LocalDirAllocator( blockManager.requestCaching(data); } - waitForCaching(blockManager, blockData.getNumBlocks()); - assertEquals(blockData.getNumBlocks(), blockManager.numCached()); + waitForCaching(blockManager, Math.min(blockData.getNumBlocks(), + conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT))); + assertEquals(Math.min(blockData.getNumBlocks(), + 
conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)), + blockManager.numCached()); assertEquals(0, this.totalErrors(blockManager)); } @@ -330,8 +339,11 @@ public void testCachingOfGetHelper(boolean forceCachingFailure) } blockManager.requestCaching(data); - waitForCaching(blockManager, expectedNumSuccesses); - assertEquals(expectedNumSuccesses, blockManager.numCached()); + waitForCaching(blockManager, Math.min(expectedNumSuccesses, blockManager.getConf() + .getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT))); + assertEquals(Math.min(expectedNumSuccesses, blockManager.getConf() + .getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)), + blockManager.numCached()); if (forceCachingFailure) { assertEquals(expectedNumErrors, this.totalErrors(blockManager)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index eceb7b25e48ad..e82fcefb6ac06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; @@ -497,6 +498,7 @@ private class MonitoringThread extends Thread { public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { + long start = Time.monotonicNow(); // Print the processTrees for debugging. if (LOG.isDebugEnabled()) { StringBuilder tmp = new StringBuilder("[ "); @@ -587,6 +589,9 @@ public void run() { // Save the aggregated utilization of the containers setContainersUtilization(trackedContainersUtilization); + long duration = Time.monotonicNow() - start; + LOG.debug("Finished monitoring container cost {} ms", duration); + // Publish the container utilization metrics to node manager // metrics system. 
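
The ContainersMonitorImpl change above times each pass of the monitoring loop with the monotonic clock and feeds the elapsed milliseconds into the new ContainersMonitorCostTime metric. A minimal sketch of that measure-and-report pattern, assuming NodeManagerMetrics.create() for a standalone metrics instance (the sleep merely stands in for one monitoring pass):

    import org.apache.hadoop.util.Time;
    import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;

    public class MonitorTimingSketch {
      public static void main(String[] args) throws InterruptedException {
        NodeManagerMetrics metrics = NodeManagerMetrics.create();
        long start = Time.monotonicNow();     // monotonic: unaffected by wall-clock adjustments
        Thread.sleep(10);                     // placeholder for the per-container monitoring work
        long duration = Time.monotonicNow() - start;
        metrics.addContainerMonitorCostTime(duration);  // surfaces as ContainersMonitorCostTime
      }
    }
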
NodeManagerMetrics nmMetrics = context.getNodeManagerMetrics(); @@ -597,6 +602,7 @@ public void run() { trackedContainersUtilization.getVirtualMemory()); nmMetrics.setContainerCpuUtilization( trackedContainersUtilization.getCPU()); + nmMetrics.addContainerMonitorCostTime(duration); } try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java index 775196f582887..86c67f74d79ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java @@ -118,6 +118,9 @@ public class NodeManagerMetrics { @Metric("Container localization time in milliseconds") MutableRate localizationDurationMillis; + @Metric("ContainerMonitor time cost in milliseconds") + MutableGaugeLong containersMonitorCostTime; + // CHECKSTYLE:ON:VisibilityModifier private JvmMetrics jvmMetrics = null; @@ -481,4 +484,9 @@ public void localizationCacheHitMiss(long size) { public void localizationComplete(long downloadMillis) { localizationDurationMillis.add(downloadMillis); } + + public void addContainerMonitorCostTime(long duration) { + containersMonitorCostTime.incr(duration); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java index 84216665156e8..a20f27d15a315 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java @@ -130,9 +130,12 @@ public void testReferenceOfSingletonJvmMetrics() { // Update resource and check available resource again metrics.addResource(total); + metrics.addContainerMonitorCostTime(200L); + MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics"); assertGauge("AvailableGB", 12, rb); assertGauge("AvailableVCores", 19, rb); + assertGauge("ContainersMonitorCostTime", 200L, rb); } public static void checkMetrics(int launched, int completed, int failed,