HDFS-16718. Improve Code with Lambda in org.apahce.hadoop.hdfs.server.datanode packages
zengqiang.xu committed Aug 6, 2022
1 parent dbf73e1 commit 7eea2d3
Showing 20 changed files with 196 additions and 328 deletions.
@@ -1073,12 +1073,7 @@ public void start() {
           formatThreadName("lifeline", lifelineNnAddr));
       lifelineThread.setDaemon(true);
       lifelineThread.setUncaughtExceptionHandler(
-          new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(Thread thread, Throwable t) {
-              LOG.error(thread + " terminating on unexpected exception", t);
-            }
-          });
+          (thread, t) -> LOG.error("{} terminating on unexpected exception", thread, t));
       lifelineThread.start();
     }
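Note: Thread.UncaughtExceptionHandler has a single abstract method, uncaughtException(Thread, Throwable), so the two-argument lambda is a drop-in replacement for the anonymous class. The rewrite also moves the log call from string concatenation to SLF4J's {} placeholder; because the Throwable stays the last argument, the stack trace is still printed. A minimal self-contained sketch (the thread name and exception here are invented for illustration):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class UncaughtHandlerDemo {
      private static final Logger LOG =
          LoggerFactory.getLogger(UncaughtHandlerDemo.class);

      public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
          throw new IllegalStateException("simulated failure");  // hypothetical
        }, "lifeline-demo");
        // The lambda implements Thread.UncaughtExceptionHandler. SLF4J fills {}
        // with the thread, and the trailing Throwable argument logs the stack trace.
        worker.setUncaughtExceptionHandler(
            (thread, t) -> LOG.error("{} terminating on unexpected exception", thread, t));
        worker.start();
        worker.join();
      }
    }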

@@ -123,14 +123,11 @@ void shutDownAll(List<BPOfferService> bposList) throws InterruptedException {
   synchronized void startAll() throws IOException {
     try {
       UserGroupInformation.getLoginUser().doAs(
-          new PrivilegedExceptionAction<Object>() {
-            @Override
-            public Object run() throws Exception {
-              for (BPOfferService bpos : offerServices) {
-                bpos.start();
-              }
-              return null;
-            }
+          (PrivilegedExceptionAction<Object>) () -> {
+            for (BPOfferService bpos : offerServices) {
+              bpos.start();
+            }
+            return null;
           });
     } catch (InterruptedException ex) {
       IOException ioe = new IOException();
@@ -274,13 +271,10 @@ private void doRefreshNamenodes(
         nnIds.add(nnId);
       }
       try {
-        UserGroupInformation.getLoginUser()
-            .doAs(new PrivilegedExceptionAction<Object>() {
-              @Override
-              public Object run() throws Exception {
-                bpos.refreshNNList(nsToRefresh, nnIds, addrs, lifelineAddrs);
-                return null;
-              }
+        UserGroupInformation.getLoginUser().doAs(
+            (PrivilegedExceptionAction<Object>) () -> {
+              bpos.refreshNNList(nsToRefresh, nnIds, addrs, lifelineAddrs);
+              return null;
             });
       } catch (InterruptedException ex) {
         IOException ioe = new IOException();
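Note: the (PrivilegedExceptionAction<Object>) cast in the two hunks above is not decoration. UserGroupInformation.doAs is overloaded for both PrivilegedAction<T> and PrivilegedExceptionAction<T>, whose single methods have identical parameter lists, so an implicitly typed lambda leaves the call ambiguous; the cast picks the overload whose run() may throw a checked exception. The plain Callable conversions later in this commit need no such cast because their target type is unambiguous. A toy sketch of the ambiguity, with simplified stand-ins for the doAs overloads (no actual privileged execution here):

    import java.io.IOException;
    import java.security.PrivilegedAction;
    import java.security.PrivilegedExceptionAction;

    public class DoAsCastDemo {
      // Simplified stand-ins for UserGroupInformation.doAs, overloaded the same way.
      static <T> T doAs(PrivilegedAction<T> action) {
        return action.run();
      }
      static <T> T doAs(PrivilegedExceptionAction<T> action) throws Exception {
        return action.run();
      }

      public static void main(String[] args) throws Exception {
        // doAs(() -> null);          // does not compile: ambiguous between overloads
        doAs((PrivilegedExceptionAction<Object>) () -> {
          if (args.length > 100) {    // placeholder for work that may fail
            throw new IOException("hypothetical checked exception");
          }
          return null;
        });
      }
    }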
@@ -489,13 +489,9 @@ private void doUpgrade(final StorageDirectory bpSd,
     if (callables == null) {
       doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
     } else {
-      callables.add(new Callable<StorageDirectory>() {
-        @Override
-        public StorageDirectory call() throws Exception {
-          doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV,
-              conf);
-          return bpSd;
-        }
+      callables.add(() -> {
+        doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
+        return bpSd;
       });
     }
   }
@@ -598,26 +598,23 @@ DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(

   public Daemon recoverBlocks(final String who,
       final Collection<RecoveringBlock> blocks) {
-    Daemon d = new Daemon(datanode.threadGroup, new Runnable() {
-      @Override
-      public void run() {
-        datanode.metrics.incrDataNodeBlockRecoveryWorkerCount();
-        try {
-          for (RecoveringBlock b : blocks) {
-            try {
-              logRecoverBlock(who, b);
-              if (b.isStriped()) {
-                new RecoveryTaskStriped((RecoveringStripedBlock) b).recover();
-              } else {
-                new RecoveryTaskContiguous(b).recover();
-              }
-            } catch (IOException e) {
-              LOG.warn("recover Block: {} FAILED: {}", b, e);
-            }
-          }
-        } finally {
-          datanode.metrics.decrDataNodeBlockRecoveryWorkerCount();
-        }
-      }
+    Daemon d = new Daemon(datanode.threadGroup, () -> {
+      datanode.metrics.incrDataNodeBlockRecoveryWorkerCount();
+      try {
+        for (RecoveringBlock b : blocks) {
+          try {
+            logRecoverBlock(who, b);
+            if (b.isStriped()) {
+              new RecoveryTaskStriped((RecoveringStripedBlock) b).recover();
+            } else {
+              new RecoveryTaskContiguous(b).recover();
+            }
+          } catch (IOException e) {
+            LOG.warn("recover Block: {} FAILED: {}", b, e);
+          }
+        }
+      } finally {
+        datanode.metrics.decrDataNodeBlockRecoveryWorkerCount();
+      }
     });
     d.start();
@@ -124,7 +124,6 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -202,7 +201,6 @@
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -1176,16 +1174,13 @@ private void refreshVolumes(String newVolumes) throws IOException {

     Preconditions.checkNotNull(data, "Storage not yet initialized");
     for (final StorageLocation location : changedVolumes.newLocations) {
-      exceptions.add(service.submit(new Callable<IOException>() {
-        @Override
-        public IOException call() {
-          try {
-            data.addVolume(location, nsInfos);
-          } catch (IOException e) {
-            return e;
-          }
-          return null;
-        }
+      exceptions.add(service.submit(() -> {
+        try {
+          data.addVolume(location, nsInfos);
+        } catch (IOException e) {
+          return e;
+        }
+        return null;
       }));
     }

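Note: no cast is needed here. The lambda's block body returns a value, so of the two ExecutorService.submit overloads only submit(Callable<T>) is applicable; submit(Runnable) requires a void-compatible body. And since Callable is no longer spelled out anywhere in the file, its import can go, which is what the first import hunk above reflects. A minimal sketch of the overload selection:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class SubmitOverloadDemo {
      public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newSingleThreadExecutor();
        // Value-returning body: the compiler picks submit(Callable<T>) and the
        // result is a Future<String>; a void body would pick submit(Runnable).
        Future<String> f = service.submit(() -> {
          return "ok";
        });
        System.out.println(f.get());
        service.shutdown();
      }
    }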
@@ -2242,14 +2237,10 @@ public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
         dnAddr, addr);
     final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
     try {
-      return loginUgi
-          .doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
-            @Override
-            public InterDatanodeProtocol run() throws IOException {
-              return new InterDatanodeProtocolTranslatorPB(addr, loginUgi,
-                  conf, NetUtils.getDefaultSocketFactory(conf), socketTimeout);
-            }
-          });
+      return loginUgi.doAs(
+          (PrivilegedExceptionAction<InterDatanodeProtocol>) () ->
+              new InterDatanodeProtocolTranslatorPB(addr, loginUgi, conf,
+                  NetUtils.getDefaultSocketFactory(conf), socketTimeout));
     } catch (InterruptedException ie) {
       throw new IOException(ie.getMessage());
     }
@@ -3029,14 +3020,8 @@ public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
    */
   public DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
       final ExtendedBlock block) {
-    return new DataEncryptionKeyFactory() {
-      @Override
-      public DataEncryptionKey newDataEncryptionKey() {
-        return dnConf.encryptDataTransfer ?
-            blockPoolTokenSecretManager.generateDataEncryptionKey(
-                block.getBlockPoolId()) : null;
-      }
-    };
+    return () -> dnConf.encryptDataTransfer ?
+        blockPoolTokenSecretManager.generateDataEncryptionKey(block.getBlockPoolId()) : null;
   }

   /**
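Note: this hunk also explains the second import removal above. Once the anonymous class is gone, DataEncryptionKey is named only in the interface's own declaration, not in this file, so its import is dead. The pattern, a single conditional expression implementing a one-method factory, is sketched below with a hypothetical stand-in interface (the real factory returns a DataEncryptionKey):

    public class FactoryLambdaDemo {
      // Hypothetical stand-in for DataEncryptionKeyFactory: one abstract method.
      interface KeyFactory {
        String newKey();
      }

      static KeyFactory keyFactoryFor(boolean encrypt, String blockPoolId) {
        // The whole method body is one conditional expression, as in the hunk above.
        return () -> encrypt ? "key-for-" + blockPoolId : null;
      }

      public static void main(String[] args) {
        System.out.println(keyFactoryFor(true, "bp-1").newKey());  // key-for-bp-1
      }
    }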
@@ -839,12 +839,9 @@ void doUpgradePreFederation(final StorageDirectory sd,
     if (callables == null) {
       doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
     } else {
-      callables.add(new Callable<StorageDirectory>() {
-        @Override
-        public StorageDirectory call() throws Exception {
-          doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
-          return sd;
-        }
+      callables.add(() -> {
+        doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
+        return sd;
       });
     }
   }
@@ -1138,17 +1135,13 @@ private static void linkBlocks(File from, File to, int oldLV,
     List<Future<Void>> futures = Lists.newArrayList();
     for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) {
       final int iCopy = i;
-      futures.add(linkWorkers.submit(new Callable<Void>() {
-        @Override
-        public Void call() throws IOException {
-          int upperBound = Math.min(iCopy + step,
-              idBasedLayoutSingleLinks.size());
-          for (int j = iCopy; j < upperBound; j++) {
-            LinkArgs cur = idBasedLayoutSingleLinks.get(j);
-            HardLink.createHardLink(cur.src(), cur.dst());
-          }
-          return null;
-        }
+      futures.add(linkWorkers.submit(() -> {
+        int upperBound = Math.min(iCopy + step, idBasedLayoutSingleLinks.size());
+        for (int j = iCopy; j < upperBound; j++) {
+          LinkArgs cur = idBasedLayoutSingleLinks.get(j);
+          HardLink.createHardLink(cur.src(), cur.dst());
+        }
+        return null;
       }));
     }
     linkWorkers.shutdown();
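Note: the final int iCopy = i; line survives the rewrite for the same reason it existed before: a lambda, like an anonymous class, can capture only local variables that are final or effectively final, and the loop variable i is reassigned on every iteration. A sketch of the constraint:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;

    public class CaptureDemo {
      public static void main(String[] args) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i += 5) {
          final int iCopy = i;     // effectively final copy, safe to capture
          // tasks.add(() -> i);   // does not compile: i is reassigned by the loop
          tasks.add(() -> iCopy);
        }
        System.out.println(tasks.size());  // 2
      }
    }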
@@ -1334,12 +1327,7 @@ static void linkBlocksHelper(File from, File to, HardLink hl,
     // from is a directory
     hl.linkStats.countDirs++;

-    String[] blockNames = from.list(new java.io.FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-        return name.startsWith(Block.BLOCK_FILE_PREFIX);
-      }
-    });
+    String[] blockNames = from.list((dir, name) -> name.startsWith(Block.BLOCK_FILE_PREFIX));

     // If we are upgrading to block ID-based layout, we don't want to recreate
     // any subdirs from the source that contain blocks, since we have a new
@@ -382,7 +382,7 @@ void restartNotifyPeers() {
     lock.lock();
     try {
       // interrupt each and every DataXceiver thread.
-      peers.values().forEach(t -> t.interrupt());
+      peers.values().forEach(Thread::interrupt);
     } finally {
       lock.unlock();
     }
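Note: Thread::interrupt is an unbound instance-method reference; each element of peers.values() becomes the receiver, so it is behaviorally identical to t -> t.interrupt() with less noise. A runnable sketch:

    import java.util.Arrays;
    import java.util.List;

    public class MethodRefDemo {
      public static void main(String[] args) throws InterruptedException {
        Thread a = new Thread(() -> { }, "xceiver-a");
        Thread b = new Thread(() -> { }, "xceiver-b");
        List<Thread> threads = Arrays.asList(a, b);
        a.start();
        b.start();
        // Unbound method reference: equivalent to threads.forEach(t -> t.interrupt()).
        threads.forEach(Thread::interrupt);
        a.join();
        b.join();
      }
    }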
@@ -527,17 +527,12 @@ private void executePlan() {
       this.scheduler = Executors.newSingleThreadExecutor();
     }

-    this.future = scheduler.submit(new Runnable() {
-      @Override
-      public void run() {
-        Thread.currentThread().setName("DiskBalancerThread");
-        LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}",
-            planFile, planID);
-        for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
-            workMap.entrySet()) {
-          blockMover.setRunnable();
-          blockMover.copyBlocks(entry.getKey(), entry.getValue());
-        }
-      }
+    this.future = scheduler.submit(() -> {
+      Thread.currentThread().setName("DiskBalancerThread");
+      LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}", planFile, planID);
+      for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry : workMap.entrySet()) {
+        blockMover.setRunnable();
+        blockMover.copyBlocks(entry.getKey(), entry.getValue());
+      }
     });
   }
@@ -225,13 +225,8 @@ public Set<FsVolumeSpi> checkAllVolumes(
         allVolumes.add(reference.getVolume());
         Futures.addCallback(olf.get(),
             new ResultHandler(reference, healthyVolumes, failedVolumes,
-                numVolumes, new Callback() {
-              @Override
-              public void call(Set<FsVolumeSpi> ignored1,
-                  Set<FsVolumeSpi> ignored2) {
-                latch.countDown();
-              }
-            }), MoreExecutors.directExecutor());
+                numVolumes, (ignored1, ignored2) -> latch.countDown()),
+            MoreExecutors.directExecutor());
       } else {
         IOUtils.cleanupWithLogger(null, reference);
         if (numVolumes.decrementAndGet() == 0) {
@@ -36,7 +36,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.WeakHashMap;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -135,13 +134,8 @@ public synchronized Optional<ListenableFuture<V>> schedule(
     }

     LOG.info("Scheduling a check for {}", target);
-    final ListenableFuture<V> lfWithoutTimeout = executorService.submit(
-        new Callable<V>() {
-          @Override
-          public V call() throws Exception {
-            return target.check(context);
-          }
-        });
+    final ListenableFuture<V> lfWithoutTimeout =
+        executorService.submit(() -> target.check(context));
     final ListenableFuture<V> lf;

     if (diskCheckTimeout > 0) {
@@ -166,24 +166,19 @@ private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,

   Callable<BlockReadStats> readFromBlock(final int length,
       final CorruptedBlocks corruptedBlocks) {
-    return new Callable<BlockReadStats>() {
-
-      @Override
-      public BlockReadStats call() throws Exception {
-        try {
-          getReadBuffer().limit(length);
-          return actualReadFromBlock();
-        } catch (ChecksumException e) {
-          LOG.warn("Found Checksum error for {} from {} at {}", block,
-              source, e.getPos());
-          corruptedBlocks.addCorruptedBlock(block, source);
-          throw e;
-        } catch (IOException e) {
-          LOG.info(e.getMessage());
-          throw e;
-        } finally {
-          DataNodeFaultInjector.get().interceptBlockReader();
-        }
-      }
+    return () -> {
+      try {
+        getReadBuffer().limit(length);
+        return actualReadFromBlock();
+      } catch (ChecksumException e) {
+        LOG.warn("Found Checksum error for {} from {} at {}", block, source, e.getPos());
+        corruptedBlocks.addCorruptedBlock(block, source);
+        throw e;
+      } catch (IOException e) {
+        LOG.info(e.getMessage());
+        throw e;
+      } finally {
+        DataNodeFaultInjector.get().interceptBlockReader();
+      }
     };
   }
@@ -126,13 +126,7 @@ public class BlockPoolSlice {
   private static ForkJoinPool addReplicaThreadPool = null;
   private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
       .getRuntime().availableProcessors();
-  private static final Comparator<File> FILE_COMPARATOR =
-      new Comparator<File>() {
-        @Override
-        public int compare(File f1, File f2) {
-          return f1.getName().compareTo(f2.getName());
-        }
-      };
+  private static final Comparator<File> FILE_COMPARATOR = Comparator.comparing(File::getName);

   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private volatile GetSpaceUsed dfsUsage;
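Note: Comparator.comparing builds the comparator from a key-extractor function, here File::getName, and is equivalent to (f1, f2) -> f1.getName().compareTo(f2.getName()). A short sketch:

    import java.io.File;
    import java.util.Arrays;
    import java.util.Comparator;

    public class ComparatorDemo {
      public static void main(String[] args) {
        File[] files = { new File("b.txt"), new File("a.txt") };
        // Key-extractor form; the extracted keys must be Comparable.
        Arrays.sort(files, Comparator.comparing(File::getName));
        System.out.println(Arrays.toString(files));  // [a.txt, b.txt]
      }
    }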
@@ -228,13 +222,10 @@ public int compare(File f1, File f2) {
     initializeAddReplicaPool(conf, (FsDatasetImpl) volume.getDataset());
   }
   // Make the dfs usage to be saved during shutdown.
-    shutdownHook = new Runnable() {
-      @Override
-      public void run() {
-        if (!dfsUsedSaved) {
-          saveDfsUsed();
-          addReplicaThreadPool.shutdownNow();
-        }
-      }
+    shutdownHook = () -> {
+      if (!dfsUsedSaved) {
+        saveDfsUsed();
+        addReplicaThreadPool.shutdownNow();
+      }
     };
     ShutdownHookManager.get().addShutdownHook(shutdownHook,