diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 844b67ce1a877..8e547a475ada4 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -1073,12 +1073,7 @@ public void start() {
           formatThreadName("lifeline", lifelineNnAddr));
       lifelineThread.setDaemon(true);
       lifelineThread.setUncaughtExceptionHandler(
-          new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(Thread thread, Throwable t) {
-              LOG.error(thread + " terminating on unexpected exception", t);
-            }
-          });
+          (thread, t) -> LOG.error("{} terminating on unexpected exception", thread, t));
       lifelineThread.start();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
index 073576546c790..be4fa0fed54e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
@@ -123,14 +123,11 @@ void shutDownAll(List<BPOfferService> bposList) throws InterruptedException {
   synchronized void startAll() throws IOException {
     try {
       UserGroupInformation.getLoginUser().doAs(
-          new PrivilegedExceptionAction<Object>() {
-            @Override
-            public Object run() throws Exception {
-              for (BPOfferService bpos : offerServices) {
-                bpos.start();
-              }
-              return null;
+          (PrivilegedExceptionAction<Object>) () -> {
+            for (BPOfferService bpos : offerServices) {
+              bpos.start();
             }
+            return null;
           });
     } catch (InterruptedException ex) {
       IOException ioe = new IOException();
@@ -274,13 +271,10 @@ private void doRefreshNamenodes(
         nnIds.add(nnId);
       }
       try {
-        UserGroupInformation.getLoginUser()
-            .doAs(new PrivilegedExceptionAction<Object>() {
-              @Override
-              public Object run() throws Exception {
-                bpos.refreshNNList(nsToRefresh, nnIds, addrs, lifelineAddrs);
-                return null;
-              }
+        UserGroupInformation.getLoginUser().doAs(
+            (PrivilegedExceptionAction<Object>) () -> {
+              bpos.refreshNNList(nsToRefresh, nnIds, addrs, lifelineAddrs);
+              return null;
             });
       } catch (InterruptedException ex) {
         IOException ioe = new IOException();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index a82ed0045a6a3..f019f64f383fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -489,13 +489,9 @@ private void doUpgrade(final StorageDirectory bpSd,
     if (callables == null) {
       doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
     } else {
-      callables.add(new Callable<StorageDirectory>() {
-        @Override
-        public StorageDirectory call() throws Exception {
-          doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV,
-              conf);
-          return bpSd;
-        }
+      callables.add(() -> {
+        doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
+        return bpSd;
       });
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
index e4861f9774870..5ad85df9a4292 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
@@ -598,26 +598,23 @@ DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(
   public Daemon recoverBlocks(final String who,
       final Collection<RecoveringBlock> blocks) {
-    Daemon d = new Daemon(datanode.threadGroup, new Runnable() {
-      @Override
-      public void run() {
-        datanode.metrics.incrDataNodeBlockRecoveryWorkerCount();
-        try {
-          for (RecoveringBlock b : blocks) {
-            try {
-              logRecoverBlock(who, b);
-              if (b.isStriped()) {
-                new RecoveryTaskStriped((RecoveringStripedBlock) b).recover();
-              } else {
-                new RecoveryTaskContiguous(b).recover();
-              }
-            } catch (IOException e) {
-              LOG.warn("recover Block: {} FAILED: {}", b, e);
+    Daemon d = new Daemon(datanode.threadGroup, () -> {
+      datanode.metrics.incrDataNodeBlockRecoveryWorkerCount();
+      try {
+        for (RecoveringBlock b : blocks) {
+          try {
+            logRecoverBlock(who, b);
+            if (b.isStriped()) {
+              new RecoveryTaskStriped((RecoveringStripedBlock) b).recover();
+            } else {
+              new RecoveryTaskContiguous(b).recover();
             }
+          } catch (IOException e) {
+            LOG.warn("recover Block: {} FAILED: {}", b, e);
           }
-        } finally {
-          datanode.metrics.decrDataNodeBlockRecoveryWorkerCount();
         }
+      } finally {
+        datanode.metrics.decrDataNodeBlockRecoveryWorkerCount();
       }
     });
     d.start();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index d64502b6ddc86..6453d43c9966c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -124,7 +124,6 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -202,7 +201,6 @@
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -1176,16 +1174,13 @@ private void refreshVolumes(String newVolumes) throws IOException {
     Preconditions.checkNotNull(data, "Storage not yet initialized");

     for (final StorageLocation location : changedVolumes.newLocations) {
-      exceptions.add(service.submit(new Callable<IOException>() {
-        @Override
-        public IOException call() {
-          try {
-            data.addVolume(location, nsInfos);
-          } catch (IOException e) {
-            return e;
-          }
-          return null;
+      exceptions.add(service.submit(() -> {
+        try {
+          data.addVolume(location, nsInfos);
+        } catch (IOException e) {
+          return e;
         }
+        return null;
       }));
     }
@@ -2242,14 +2237,10 @@ public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
         dnAddr, addr);
     final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
     try {
-      return loginUgi
-          .doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
-            @Override
-            public InterDatanodeProtocol run() throws IOException {
-              return new InterDatanodeProtocolTranslatorPB(addr, loginUgi,
-                  conf, NetUtils.getDefaultSocketFactory(conf), socketTimeout);
-            }
-          });
+      return loginUgi.doAs(
+          (PrivilegedExceptionAction<InterDatanodeProtocol>) () ->
+              new InterDatanodeProtocolTranslatorPB(addr, loginUgi, conf,
+                  NetUtils.getDefaultSocketFactory(conf), socketTimeout));
     } catch (InterruptedException ie) {
       throw new IOException(ie.getMessage());
     }
@@ -3029,14 +3020,8 @@ public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
    */
   public DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
       final ExtendedBlock block) {
-    return new DataEncryptionKeyFactory() {
-      @Override
-      public DataEncryptionKey newDataEncryptionKey() {
-        return dnConf.encryptDataTransfer ?
-            blockPoolTokenSecretManager.generateDataEncryptionKey(
-                block.getBlockPoolId()) : null;
-      }
-    };
+    return () -> dnConf.encryptDataTransfer ?
+        blockPoolTokenSecretManager.generateDataEncryptionKey(block.getBlockPoolId()) : null;
   }

   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 782f2f36cc198..a1fe9e1230161 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -839,12 +839,9 @@ void doUpgradePreFederation(final StorageDirectory sd,
     if (callables == null) {
       doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
     } else {
-      callables.add(new Callable<StorageDirectory>() {
-        @Override
-        public StorageDirectory call() throws Exception {
-          doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
-          return sd;
-        }
+      callables.add(() -> {
+        doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
+        return sd;
       });
     }
   }
@@ -1138,17 +1135,13 @@ private static void linkBlocks(File from, File to, int oldLV,
     List<Future<Void>> futures = Lists.newArrayList();
     for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) {
       final int iCopy = i;
-      futures.add(linkWorkers.submit(new Callable<Void>() {
-        @Override
-        public Void call() throws IOException {
-          int upperBound = Math.min(iCopy + step,
-              idBasedLayoutSingleLinks.size());
-          for (int j = iCopy; j < upperBound; j++) {
-            LinkArgs cur = idBasedLayoutSingleLinks.get(j);
-            HardLink.createHardLink(cur.src(), cur.dst());
-          }
-          return null;
+      futures.add(linkWorkers.submit(() -> {
+        int upperBound = Math.min(iCopy + step, idBasedLayoutSingleLinks.size());
+        for (int j = iCopy; j < upperBound; j++) {
+          LinkArgs cur = idBasedLayoutSingleLinks.get(j);
+          HardLink.createHardLink(cur.src(), cur.dst());
         }
+        return null;
       }));
     }
     linkWorkers.shutdown();
@@ -1334,12 +1327,7 @@ static void linkBlocksHelper(File from, File to, HardLink hl,
     // from is a directory
     hl.linkStats.countDirs++;

-    String[] blockNames = from.list(new java.io.FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-        return name.startsWith(Block.BLOCK_FILE_PREFIX);
-      }
-    });
+    String[] blockNames = from.list((dir, name) -> name.startsWith(Block.BLOCK_FILE_PREFIX));

     // If we are upgrading to block ID-based layout, we don't want to recreate
     // any subdirs from the source that contain blocks, since we have a new
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 3a67b76e43467..c90797800e6ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -382,7 +382,7 @@ void restartNotifyPeers() {
     lock.lock();
     try {
       // interrupt each and every DataXceiver thread.
-      peers.values().forEach(t -> t.interrupt());
+      peers.values().forEach(Thread::interrupt);
     } finally {
       lock.unlock();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index 4126140678759..bc00962d5e8f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -527,17 +527,12 @@ private void executePlan() {
       this.scheduler = Executors.newSingleThreadExecutor();
     }

-    this.future = scheduler.submit(new Runnable() {
-      @Override
-      public void run() {
-        Thread.currentThread().setName("DiskBalancerThread");
-        LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}",
-            planFile, planID);
-        for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
-            workMap.entrySet()) {
-          blockMover.setRunnable();
-          blockMover.copyBlocks(entry.getKey(), entry.getValue());
-        }
+    this.future = scheduler.submit(() -> {
+      Thread.currentThread().setName("DiskBalancerThread");
+      LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}", planFile, planID);
+      for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry : workMap.entrySet()) {
+        blockMover.setRunnable();
+        blockMover.copyBlocks(entry.getKey(), entry.getValue());
       }
     });
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
index da6ae95918d9f..79e0eef15a793 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
@@ -225,13 +225,8 @@ public Set<FsVolumeSpi> checkAllVolumes(
         allVolumes.add(reference.getVolume());
         Futures.addCallback(olf.get(),
             new ResultHandler(reference, healthyVolumes, failedVolumes,
-                numVolumes, new Callback() {
-                  @Override
-                  public void call(Set<FsVolumeSpi> ignored1,
-                      Set<FsVolumeSpi> ignored2) {
-                    latch.countDown();
-                  }
-                }), MoreExecutors.directExecutor());
+                numVolumes, (ignored1, ignored2) -> latch.countDown()),
+            MoreExecutors.directExecutor());
       } else {
         IOUtils.cleanupWithLogger(null, reference);
         if (numVolumes.decrementAndGet() == 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
index f969c7ade288b..74d548cf7b3bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
@@ -36,7 +36,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.WeakHashMap;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -135,13 +134,8 @@ public synchronized Optional<ListenableFuture<V>> schedule(
     }

     LOG.info("Scheduling a check for {}", target);
-    final ListenableFuture<V> lfWithoutTimeout = executorService.submit(
-        new Callable<V>() {
-          @Override
-          public V call() throws Exception {
-            return target.check(context);
-          }
-        });
+    final ListenableFuture<V> lfWithoutTimeout =
+        executorService.submit(() -> target.check(context));
     final ListenableFuture<V> lf;

     if (diskCheckTimeout > 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index 54302e3c2561d..545790c7629ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -166,24 +166,19 @@ private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
   Callable<BlockReadStats> readFromBlock(final int length,
       final CorruptedBlocks corruptedBlocks) {
-    return new Callable<BlockReadStats>() {
-
-      @Override
-      public BlockReadStats call() throws Exception {
-        try {
-          getReadBuffer().limit(length);
-          return actualReadFromBlock();
-        } catch (ChecksumException e) {
-          LOG.warn("Found Checksum error for {} from {} at {}", block,
-              source, e.getPos());
-          corruptedBlocks.addCorruptedBlock(block, source);
-          throw e;
-        } catch (IOException e) {
-          LOG.info(e.getMessage());
-          throw e;
-        } finally {
-          DataNodeFaultInjector.get().interceptBlockReader();
-        }
+    return () -> {
+      try {
+        getReadBuffer().limit(length);
+        return actualReadFromBlock();
+      } catch (ChecksumException e) {
+        LOG.warn("Found Checksum error for {} from {} at {}", block, source, e.getPos());
+        corruptedBlocks.addCorruptedBlock(block, source);
+        throw e;
+      } catch (IOException e) {
+        LOG.info(e.getMessage());
+        throw e;
+      } finally {
+        DataNodeFaultInjector.get().interceptBlockReader();
       }
     };
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 23f3602a456c8..425678c7ef2d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -126,13 +126,7 @@ public class BlockPoolSlice {
   private static ForkJoinPool addReplicaThreadPool = null;
   private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
       .getRuntime().availableProcessors();
-  private static final Comparator<File> FILE_COMPARATOR =
-      new Comparator<File>() {
-        @Override
-        public int compare(File f1, File f2) {
-          return f1.getName().compareTo(f2.getName());
-        }
-      };
+  private static final Comparator<File> FILE_COMPARATOR = Comparator.comparing(File::getName);

   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private volatile GetSpaceUsed dfsUsage;
@@ -228,13 +222,10 @@ public int compare(File f1, File f2) {
       initializeAddReplicaPool(conf, (FsDatasetImpl) volume.getDataset());
     }
     // Make the dfs usage to be saved during shutdown.
-    shutdownHook = new Runnable() {
-      @Override
-      public void run() {
-        if (!dfsUsedSaved) {
-          saveDfsUsed();
-          addReplicaThreadPool.shutdownNow();
-        }
+    shutdownHook = () -> {
+      if (!dfsUsedSaved) {
+        saveDfsUsed();
+        addReplicaThreadPool.shutdownNow();
       }
     };
     ShutdownHookManager.get().addShutdownHook(shutdownHook,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index b7686c512963a..bb11ccde4f997 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -22,7 +22,6 @@
 import java.io.File;
 import java.io.FileDescriptor;
 import java.io.FileNotFoundException;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
@@ -90,13 +89,9 @@ static File getMetaFile(File f, long gs) {
   public static File findMetaFile(final File blockFile) throws IOException {
     final String prefix = blockFile.getName() + "_";
     final File parent = blockFile.getParentFile();
-    final File[] matches = parent.listFiles(new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-        return dir.equals(parent) && name.startsWith(prefix)
-            && name.endsWith(Block.METADATA_EXTENSION);
-      }
-    });
+    final File[] matches = parent.listFiles(
+        (dir, name) -> dir.equals(parent) && name.startsWith(prefix)
+            && name.endsWith(Block.METADATA_EXTENSION));

     if (matches == null || matches.length == 0) {
       throw new FileNotFoundException(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 262a24bd3aa45..711c0671cfc7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -260,23 +260,22 @@ void getAllVolumesMap(final String bpid,
         new ConcurrentHashMap<FsVolumeImpl, IOException>();
     List<Thread> replicaAddingThreads = new ArrayList<Thread>();
     for (final FsVolumeImpl v : volumes) {
-      Thread t = new Thread() {
-        public void run() {
-          try (FsVolumeReference ref = v.obtainReference()) {
-            FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
-                bpid + " on volume " + v + "...");
-            long startTime = Time.monotonicNow();
-            v.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
-            long timeTaken = Time.monotonicNow() - startTime;
-            FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
-                + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
-          } catch (IOException ioe) {
-            FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
-                "from " + v + ". Will throw later.", ioe);
Will throw later.", ioe); - unhealthyDataDirs.put(v, ioe); - } + Thread t = new Thread(() -> { + try (FsVolumeReference ref = v.obtainReference()) { + FsDatasetImpl.LOG.info("Adding replicas to map for block pool {} on volume {}...", + bpid, v); + long startTime = Time.monotonicNow(); + v.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap); + long timeTaken = Time.monotonicNow() - startTime; + FsDatasetImpl.LOG.info( + "Time to add replicas to map for block pool {} on volume {}: {}ms.", + bpid, v, timeTaken); + } catch (IOException ioe) { + FsDatasetImpl.LOG.info( + "Caught exception while adding replicas from {}. Will throw later.", v, ioe); + unhealthyDataDirs.put(v, ioe); } - }; + }); replicaAddingThreads.add(t); t.start(); } @@ -507,23 +506,19 @@ void addBlockPool(final String bpid, final Configuration conf) throws IOExceptio new ConcurrentHashMap(); List blockPoolAddingThreads = new ArrayList(); for (final FsVolumeImpl v : volumes) { - Thread t = new Thread() { - public void run() { - try (FsVolumeReference ref = v.obtainReference()) { - FsDatasetImpl.LOG.info("Scanning block pool " + bpid + - " on volume " + v + "..."); - long startTime = Time.monotonicNow(); - v.addBlockPool(bpid, conf); - long timeTaken = Time.monotonicNow() - startTime; - FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid + - " on " + v + ": " + timeTaken + "ms"); - } catch (IOException ioe) { - FsDatasetImpl.LOG.info("Caught exception while scanning " + v + - ". Will throw later.", ioe); - unhealthyDataDirs.put(v, ioe); - } + Thread t = new Thread(() -> { + try (FsVolumeReference ref = v.obtainReference()) { + FsDatasetImpl.LOG.info("Scanning block pool {} on volume {}...", bpid, v); + long startTime = Time.monotonicNow(); + v.addBlockPool(bpid, conf); + long timeTaken = Time.monotonicNow() - startTime; + FsDatasetImpl.LOG.info("Time taken to scan block pool {} on {}: {}ms", + bpid, v, timeTaken); + } catch (IOException ioe) { + FsDatasetImpl.LOG.info("Caught exception while scanning {}. 
Will throw later.", v, ioe); + unhealthyDataDirs.put(v, ioe); } - }; + }); blockPoolAddingThreads.add(t); t.start(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java index 0d42ae99e358e..a78905f126e92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java @@ -78,15 +78,10 @@ class RamDiskAsyncLazyPersistService { } private void addExecutorForVolume(final String storageId) { - ThreadFactory threadFactory = new ThreadFactory() { - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(threadGroup, r); - t.setName("Async RamDisk lazy persist worker " + - " for volume with id " + storageId); - return t; - } + ThreadFactory threadFactory = r -> { + Thread t = new Thread(threadGroup, r); + t.setName("Async RamDisk lazy persist worker for volume with id " + storageId); + return t; }; ThreadPoolExecutor executor = new ThreadPoolExecutor( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java index 409084cfe8be8..fcd45e4d05239 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java @@ -36,9 +36,7 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -106,69 +104,57 @@ public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs, } private void startDiskOutlierDetectionThread() { - slowDiskDetectionDaemon = new Daemon(new Runnable() { - @Override - public void run() { - while (shouldRun) { - if (dn.getFSDataset() != null) { - Map metadataOpStats = Maps.newHashMap(); - Map readIoStats = Maps.newHashMap(); - Map writeIoStats = Maps.newHashMap(); - FsDatasetSpi.FsVolumeReferences fsVolumeReferences = null; - try { - fsVolumeReferences = dn.getFSDataset().getFsVolumeReferences(); - Iterator volumeIterator = fsVolumeReferences - .iterator(); - while (volumeIterator.hasNext()) { - FsVolumeSpi volume = volumeIterator.next(); - DataNodeVolumeMetrics metrics = volume.getMetrics(); - String volumeName = volume.getBaseURI().getPath(); - - metadataOpStats.put(volumeName, - metrics.getMetadataOperationMean()); - readIoStats.put(volumeName, metrics.getReadIoMean()); - writeIoStats.put(volumeName, metrics.getWriteIoMean()); - } - } finally { - if (fsVolumeReferences != null) { - try { - fsVolumeReferences.close(); - } catch (IOException e) { - LOG.error("Error in releasing FS Volume references", e); - } - } + slowDiskDetectionDaemon = new Daemon(() -> { + while (shouldRun) { + if (dn.getFSDataset() != null) { + Map metadataOpStats = Maps.newHashMap(); + Map readIoStats = Maps.newHashMap(); + Map writeIoStats = Maps.newHashMap(); + 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
index bb2757287fbe0..dc06ea6dfea79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
@@ -180,19 +180,15 @@ protected void initChannel(SocketChannel ch) throws Exception {
       if (externalHttpChannel == null) {
        httpServer.channel(NioServerSocketChannel.class);
      } else {
-        httpServer.channelFactory(new ChannelFactory<NioServerSocketChannel>() {
-          @Override
-          public NioServerSocketChannel newChannel() {
-            return new NioServerSocketChannel(externalHttpChannel) {
+        httpServer.channelFactory((ChannelFactory<NioServerSocketChannel>) ()
+            -> new NioServerSocketChannel(externalHttpChannel) {
               // The channel has been bounded externally via JSVC,
               // thus bind() becomes a no-op.
               @Override
-              protected void doBind(SocketAddress localAddress)
-                  throws Exception {
+              protected void doBind(SocketAddress localAddress) {
+
               }
-            };
-          }
-        });
+            });
       }
     } else {
       this.httpServer = null;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/SimpleHttpProxyHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/SimpleHttpProxyHandler.java
index ef8d90a4c4e6c..8c873d91865a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/SimpleHttpProxyHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/SimpleHttpProxyHandler.java
@@ -75,17 +75,15 @@ public void channelInactive(ChannelHandlerContext ctx) {
   @Override
   public void channelRead(final ChannelHandlerContext ctx, Object msg) {
-    client.writeAndFlush(msg).addListener(new ChannelFutureListener() {
-      @Override
-      public void operationComplete(ChannelFuture future) {
-        if (future.isSuccess()) {
-          ctx.channel().read();
-        } else {
-          LOG.debug("Proxy failed. Cause: ", future.cause());
-          future.channel().close();
-        }
-      }
-    });
+    client.writeAndFlush(msg).addListener(
+        (ChannelFutureListener) future -> {
+          if (future.isSuccess()) {
+            ctx.channel().read();
+          } else {
+            LOG.debug("Proxy failed. Cause: ", future.cause());
+            future.channel().close();
+          }
+        });
   }

   @Override
@@ -112,23 +110,19 @@ protected void initChannel(SocketChannel ch) throws Exception {
       });
     ChannelFuture f = proxiedServer.connect(host);
     proxiedChannel = f.channel();
-    f.addListener(new ChannelFutureListener() {
-      @Override
-      public void operationComplete(ChannelFuture future) throws Exception {
-        if (future.isSuccess()) {
-          ctx.channel().pipeline().remove(HttpResponseEncoder.class);
-          HttpRequest newReq = new DefaultFullHttpRequest(HTTP_1_1, req.method(), req.uri());
-          newReq.headers().add(req.headers());
-          newReq.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
-          future.channel().writeAndFlush(newReq);
-        } else {
-          DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1,
-              INTERNAL_SERVER_ERROR);
-          resp.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
-          LOG.info("Proxy " + uri + " failed. Cause: ", future.cause());
-          ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
-          client.close();
-        }
+    f.addListener((ChannelFutureListener) future -> {
+      if (future.isSuccess()) {
+        ctx.channel().pipeline().remove(HttpResponseEncoder.class);
+        HttpRequest newReq = new DefaultFullHttpRequest(HTTP_1_1, req.method(), req.uri());
+        newReq.headers().add(req.headers());
+        newReq.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
+        future.channel().writeAndFlush(newReq);
+      } else {
+        DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR);
+        resp.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
+        LOG.info("Proxy {} failed. Cause: ", uri, future.cause());
Cause: ", uri, future.cause()); + ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE); + client.close(); } }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java index 93e24201b6173..a0ef7fc8ab247 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java @@ -30,7 +30,6 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -78,13 +77,7 @@ UserGroupInformation ugi() throws IOException { // This makes it possible to access the data stored in secure DataNode // through insecure Namenode. if (UserGroupInformation.isSecurityEnabled() && token != null) { - ugi = ugiCache.get(buildTokenCacheKey(token), - new Callable() { - @Override - public UserGroupInformation call() throws Exception { - return tokenUGI(token); - } - }); + ugi = ugiCache.get(buildTokenCacheKey(token), () -> tokenUGI(token)); } else { final String usernameFromQuery = params.userName(); final String doAsUserFromQuery = params.doAsUser(); @@ -92,15 +85,8 @@ public UserGroupInformation call() throws Exception { .getDefaultWebUserName(params.conf()) // not specified in request : usernameFromQuery; - ugi = ugiCache.get( - buildNonTokenCacheKey(doAsUserFromQuery, remoteUser), - new Callable() { - @Override - public UserGroupInformation call() throws Exception { - return nonTokenUGI(usernameFromQuery, doAsUserFromQuery, - remoteUser); - } - }); + ugi = ugiCache.get(buildNonTokenCacheKey(doAsUserFromQuery, remoteUser), + () -> nonTokenUGI(usernameFromQuery, doAsUserFromQuery, remoteUser)); } } catch (ExecutionException e) { Throwable cause = e.getCause(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java index 8de736ac0e237..d08dbfb772de5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java @@ -130,25 +130,21 @@ public void channelRead0(final ChannelHandlerContext ctx, path = params.path(); injectToken(); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { + ugi.doAs((PrivilegedExceptionAction) () -> { + try { + handle(ctx, req); + } finally { + String host; try { - handle(ctx, req); - } finally { - String host = null; - try { - host = ((InetSocketAddress)ctx.channel().remoteAddress()). 
-                getAddress().getHostAddress();
-          } catch (Exception e) {
-            LOG.warn("Error retrieving hostname: ", e);
-            host = "unknown";
-          }
-          REQLOG.info(host + " " + req.method() + " " + req.uri() + " " +
-              getResponseCode());
+          host = ((InetSocketAddress) ctx.channel().remoteAddress())
+              .getAddress().getHostAddress();
+        } catch (Exception e) {
+          LOG.warn("Error retrieving hostname: ", e);
+          host = "unknown";
         }
-        return null;
+        REQLOG.info(host + " " + req.method() + " " + req.uri() + " " + getResponseCode());
       }
+      return null;
     });
   }