diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index d3212b6384eaa..2d24c0f680e1b 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -34,6 +34,8 @@ import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; @@ -66,6 +68,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionUtil; @@ -94,6 +97,8 @@ class BPServiceActor implements Runnable { volatile long lastCacheReport = 0; private final Scheduler scheduler; + private final Object sendIBRLock; + private final ExecutorService ibrExecutorService; Thread bpThread; DatanodeProtocolClientSideTranslatorPB bpNamenode; @@ -149,6 +154,10 @@ enum RunningState { } commandProcessingThread = new CommandProcessingThread(this); commandProcessingThread.start(); + sendIBRLock = new Object(); + ibrExecutorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("ibr-executor-%d").build()); } public DatanodeRegistration getBpRegistration() { @@ -368,8 +377,10 @@ List blockReport(long fullBrLeaseId) throws IOException { // we have a chance that we will miss the delHint information // or we will report an RBW replica after the BlockReport already reports // a FINALIZED one. - ibrManager.sendIBRs(bpNamenode, bpRegistration, - bpos.getBlockPoolId(), getRpcMetricSuffix()); + synchronized (sendIBRLock) { + ibrManager.sendIBRs(bpNamenode, bpRegistration, + bpos.getBlockPoolId(), getRpcMetricSuffix()); + } long brCreateStartTime = monotonicNow(); Map perVolumeBlockLists = @@ -600,6 +611,9 @@ void stop() { if (commandProcessingThread != null) { commandProcessingThread.interrupt(); } + if (ibrExecutorService != null && !ibrExecutorService.isShutdown()) { + ibrExecutorService.shutdownNow(); + } } //This must be called only by blockPoolManager @@ -614,13 +628,18 @@ void join() { } catch (InterruptedException ie) { } } - //Cleanup method to be called by current thread before exiting. + // Cleanup method to be called by current thread before exiting. + // Any Thread / ExecutorService started by BPServiceActor can be shutdown + // here. private synchronized void cleanUp() { shouldServiceRun = false; IOUtils.cleanupWithLogger(null, bpNamenode); IOUtils.cleanupWithLogger(null, lifelineSender); bpos.shutdownActor(this); + if (!ibrExecutorService.isShutdown()) { + ibrExecutorService.shutdownNow(); + } } private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException { @@ -706,11 +725,6 @@ private void offerService() throws Exception { commandProcessingThread.enqueue(resp.getCommands()); } } - if (!dn.areIBRDisabledForTests() && - (ibrManager.sendImmediately()|| sendHeartbeat)) { - ibrManager.sendIBRs(bpNamenode, bpRegistration, - bpos.getBlockPoolId(), getRpcMetricSuffix()); - } List cmds = null; boolean forceFullBr = @@ -874,6 +888,10 @@ public void run() { initialRegistrationComplete.countDown(); } + // IBR tasks to be handled separately from offerService() in order to + // improve performance of offerService(), which can now focus only on + // FBR and heartbeat. + ibrExecutorService.submit(new IBRTaskHandler()); while (shouldRun()) { try { offerService(); @@ -1104,6 +1122,34 @@ private void sendLifeline() throws IOException { } } + class IBRTaskHandler implements Runnable { + + @Override + public void run() { + LOG.info("Starting IBR Task Handler."); + while (shouldRun()) { + try { + final long startTime = scheduler.monotonicNow(); + final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime); + if (!dn.areIBRDisabledForTests() && + (ibrManager.sendImmediately() || sendHeartbeat)) { + synchronized (sendIBRLock) { + ibrManager.sendIBRs(bpNamenode, bpRegistration, + bpos.getBlockPoolId(), getRpcMetricSuffix()); + } + } + // There is no work to do; sleep until heartbeat timer elapses, + // or work arrives, and then iterate again. + ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime()); + } catch (Throwable t) { + LOG.error("Exception in IBRTaskHandler.", t); + sleepAndLogInterrupts(5000, "offering IBR service"); + } + } + } + + } + /** * Utility class that wraps the timestamp computations for scheduling * heartbeats and block reports. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java index 69dbf6438af93..de738eef177a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java @@ -172,8 +172,19 @@ public void testDatanodeReportMissingBlock() throws Exception { // all bad datanodes } cluster.triggerHeartbeats(); // IBR delete ack - lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0); - assertEquals(0, lb.getLocations().length); + int retries = 0; + while (true) { + lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0); + if (0 != lb.getLocations().length) { + retries++; + if (retries > 7) { + Assert.fail("getLocatedBlocks failed after 7 retries"); + } + Thread.sleep(2000); + } else { + break; + } + } } finally { cluster.shutdown(); } @@ -223,4 +234,4 @@ static DataNode findDatanode(String id, List datanodes) { throw new IllegalStateException("Datnode " + id + " not in datanode list: " + datanodes); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java index 4221ecaf2a064..e848cbfb37ffb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java @@ -25,6 +25,7 @@ import java.io.IOException; +import org.mockito.exceptions.base.MockitoAssertionError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -156,7 +157,7 @@ public void testReportBlockDeleted() throws InterruptedException, IOException { // Sleep for a very short time since IBR is generated // asynchronously. - Thread.sleep(2000); + Thread.sleep(1000); // Ensure that no block report was generated immediately. // Deleted blocks are reported when the IBR timer elapses. @@ -167,13 +168,24 @@ public void testReportBlockDeleted() throws InterruptedException, IOException { // Trigger a heartbeat, this also triggers an IBR. DataNodeTestUtils.triggerHeartbeat(singletonDn); - Thread.sleep(2000); // Ensure that the deleted block is reported. - Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted( - any(DatanodeRegistration.class), - anyString(), - any(StorageReceivedDeletedBlocks[].class)); + int retries = 0; + while (true) { + try { + Mockito.verify(nnSpy, atLeastOnce()).blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); + break; + } catch (MockitoAssertionError e) { + if (retries > 7) { + throw e; + } + retries++; + Thread.sleep(2000); + } + } } finally { cluster.shutdown();