Skip to content

Commit

Permalink
HDFS-16016. BPServiceActor to provide new thread to handle IBR (apach…
Browse files Browse the repository at this point in the history
…e#2998)

Contributed by Viraj Jasani
  • Loading branch information
virajjasani authored Jun 14, 2021
1 parent e31d060 commit c1bf3cb
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -66,6 +68,7 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
Expand Down Expand Up @@ -94,6 +97,8 @@ class BPServiceActor implements Runnable {

volatile long lastCacheReport = 0;
private final Scheduler scheduler;
private final Object sendIBRLock;
private final ExecutorService ibrExecutorService;

Thread bpThread;
DatanodeProtocolClientSideTranslatorPB bpNamenode;
Expand Down Expand Up @@ -149,6 +154,10 @@ enum RunningState {
}
commandProcessingThread = new CommandProcessingThread(this);
commandProcessingThread.start();
sendIBRLock = new Object();
ibrExecutorService = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ibr-executor-%d").build());
}

public DatanodeRegistration getBpRegistration() {
Expand Down Expand Up @@ -368,8 +377,10 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
// we have a chance that we will miss the delHint information
// or we will report an RBW replica after the BlockReport already reports
// a FINALIZED one.
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId(), getRpcMetricSuffix());
synchronized (sendIBRLock) {
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId(), getRpcMetricSuffix());
}

long brCreateStartTime = monotonicNow();
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
Expand Down Expand Up @@ -600,6 +611,9 @@ void stop() {
if (commandProcessingThread != null) {
commandProcessingThread.interrupt();
}
if (ibrExecutorService != null && !ibrExecutorService.isShutdown()) {
ibrExecutorService.shutdownNow();
}
}

//This must be called only by blockPoolManager
Expand All @@ -614,13 +628,18 @@ void join() {
} catch (InterruptedException ie) { }
}

//Cleanup method to be called by current thread before exiting.
// Cleanup method to be called by current thread before exiting.
// Any Thread / ExecutorService started by BPServiceActor can be shut down
// here.
/**
 * Tears down the resources held by this actor. Runs while holding this
 * actor's monitor, since the method is {@code synchronized}.
 */
private synchronized void cleanUp() {
  // Clear the run flag first, before any resource is released.
  shouldServiceRun = false;

  // Hand the NameNode proxy and the lifeline sender to IOUtils for cleanup;
  // no logger is supplied (null).
  IOUtils.cleanupWithLogger(null, bpNamenode);
  IOUtils.cleanupWithLogger(null, lifelineSender);

  // Let the owning block pool service know this actor is going away.
  bpos.shutdownActor(this);

  // Nothing more to do if the IBR executor was already shut down.
  if (ibrExecutorService.isShutdown()) {
    return;
  }
  ibrExecutorService.shutdownNow();
}

private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
Expand Down Expand Up @@ -706,11 +725,6 @@ private void offerService() throws Exception {
commandProcessingThread.enqueue(resp.getCommands());
}
}
if (!dn.areIBRDisabledForTests() &&
(ibrManager.sendImmediately()|| sendHeartbeat)) {
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId(), getRpcMetricSuffix());
}

List<DatanodeCommand> cmds = null;
boolean forceFullBr =
Expand Down Expand Up @@ -874,6 +888,10 @@ public void run() {
initialRegistrationComplete.countDown();
}

// IBR tasks to be handled separately from offerService() in order to
// improve performance of offerService(), which can now focus only on
// FBR and heartbeat.
ibrExecutorService.submit(new IBRTaskHandler());
while (shouldRun()) {
try {
offerService();
Expand Down Expand Up @@ -1104,6 +1122,34 @@ private void sendLifeline() throws IOException {
}
}

/**
 * Runnable that sends incremental block reports (IBRs) in a loop for as
 * long as {@code shouldRun()} holds. It is submitted to the actor's IBR
 * executor so that IBRs are handled separately from offerService().
 */
class IBRTaskHandler implements Runnable {

  @Override
  public void run() {
    LOG.info("Starting IBR Task Handler.");
    while (shouldRun()) {
      try {
        reportThenWait();
      } catch (Throwable t) {
        // Log any failure, pause, then try again on the next iteration.
        LOG.error("Exception in IBRTaskHandler.", t);
        sleepAndLogInterrupts(5000, "offering IBR service");
      }
    }
  }

  /**
   * Performs one iteration: sends pending IBRs when they are due, then
   * waits for the next IBR or for the heartbeat wait time to pass.
   */
  private void reportThenWait() throws Exception {
    final long now = scheduler.monotonicNow();
    final boolean heartbeatDue = scheduler.isHeartbeatDue(now);

    // Same short-circuit order as before: the test-only switch is checked
    // first, then the manager's own urgency, then the heartbeat timer.
    final boolean ibrDue = !dn.areIBRDisabledForTests()
        && (ibrManager.sendImmediately() || heartbeatDue);

    if (ibrDue) {
      // sendIBRLock is also taken by blockReport() around its sendIBRs call.
      synchronized (sendIBRLock) {
        ibrManager.sendIBRs(bpNamenode, bpRegistration,
            bpos.getBlockPoolId(), getRpcMetricSuffix());
      }
    }

    // Nothing left to do for now; wait until the heartbeat timer elapses
    // or new work arrives, after which the caller loops again.
    ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
  }

}

/**
* Utility class that wraps the timestamp computations for scheduling
* heartbeats and block reports.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,19 @@ public void testDatanodeReportMissingBlock() throws Exception {
// all bad datanodes
}
cluster.triggerHeartbeats(); // IBR delete ack
lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
assertEquals(0, lb.getLocations().length);
int retries = 0;
while (true) {
lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
if (0 != lb.getLocations().length) {
retries++;
if (retries > 7) {
Assert.fail("getLocatedBlocks failed after 7 retries");
}
Thread.sleep(2000);
} else {
break;
}
}
} finally {
cluster.shutdown();
}
Expand Down Expand Up @@ -223,4 +234,4 @@ static DataNode findDatanode(String id, List<DataNode> datanodes) {
throw new IllegalStateException("Datnode " + id + " not in datanode list: "
+ datanodes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.IOException;

import org.mockito.exceptions.base.MockitoAssertionError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -156,7 +157,7 @@ public void testReportBlockDeleted() throws InterruptedException, IOException {

// Sleep for a very short time since IBR is generated
// asynchronously.
Thread.sleep(2000);
Thread.sleep(1000);

// Ensure that no block report was generated immediately.
// Deleted blocks are reported when the IBR timer elapses.
Expand All @@ -167,13 +168,24 @@ public void testReportBlockDeleted() throws InterruptedException, IOException {

// Trigger a heartbeat, this also triggers an IBR.
DataNodeTestUtils.triggerHeartbeat(singletonDn);
Thread.sleep(2000);

// Ensure that the deleted block is reported.
Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
any(DatanodeRegistration.class),
anyString(),
any(StorageReceivedDeletedBlocks[].class));
int retries = 0;
while (true) {
try {
Mockito.verify(nnSpy, atLeastOnce()).blockReceivedAndDeleted(
any(DatanodeRegistration.class),
anyString(),
any(StorageReceivedDeletedBlocks[].class));
break;
} catch (MockitoAssertionError e) {
if (retries > 7) {
throw e;
}
retries++;
Thread.sleep(2000);
}
}

} finally {
cluster.shutdown();
Expand Down

0 comments on commit c1bf3cb

Please sign in to comment.