Skip to content

Commit

Permalink
HDFS-16898. Make write lock fine-grain in method processCommandFromActor
Browse files Browse the repository at this point in the history
  • Loading branch information
admin authored and admin committed Feb 16, 2023
1 parent 8c9c68c commit 106f85a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -678,15 +678,20 @@ boolean processCommandFromActor(DatanodeCommand cmd,
actor.reRegister();
return false;
}
writeLock();
boolean isActiveActor;
InetSocketAddress nnSocketAddress;
readLock();
try {
if (actor == bpServiceToActive) {
return processCommandFromActive(cmd, actor);
} else {
return processCommandFromStandby(cmd, actor);
}
isActiveActor = (actor == bpServiceToActive);
nnSocketAddress = actor.getNNSocketAddress();
} finally {
writeUnlock();
readUnlock();
}

if (isActiveActor) {
return processCommandFromActive(cmd, nnSocketAddress);
} else {
return processCommandFromStandby(cmd, nnSocketAddress);
}
}

Expand Down Expand Up @@ -714,7 +719,7 @@ private String blockIdArrayToString(long ids[]) {
* @throws IOException
*/
private boolean processCommandFromActive(DatanodeCommand cmd,
BPServiceActor actor) throws IOException {
InetSocketAddress nnSocketAddress) throws IOException {
final BlockCommand bcmd =
cmd instanceof BlockCommand? (BlockCommand)cmd: null;
final BlockIdCommand blockIdCmd =
Expand Down Expand Up @@ -768,7 +773,7 @@ assert getBlockPoolId().equals(bp) :
dn.finalizeUpgradeForPool(bp);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
String who = "NameNode at " + actor.getNNSocketAddress();
String who = "NameNode at " + nnSocketAddress;
dn.getBlockRecoveryWorker().recoverBlocks(who,
((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
Expand Down Expand Up @@ -810,10 +815,11 @@ assert getBlockPoolId().equals(bp) :
* DNA_REGISTER which should be handled earlier itself.
*/
private boolean processCommandFromStandby(DatanodeCommand cmd,
BPServiceActor actor) throws IOException {
InetSocketAddress nnSocketAddress) throws IOException {
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action from standby: DNA_ACCESSKEYUPDATE");
LOG.info("DatanodeCommand action from standby NN {}: DNA_ACCESSKEYUPDATE",
nnSocketAddress);
if (dn.isBlockTokenEnabled) {
dn.blockPoolTokenSecretManager.addKeys(
getBlockPoolId(),
Expand All @@ -829,10 +835,12 @@ private boolean processCommandFromStandby(DatanodeCommand cmd,
case DatanodeProtocol.DNA_CACHE:
case DatanodeProtocol.DNA_UNCACHE:
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
LOG.warn("Got a command from standby NN {} - ignoring command: {}",
nnSocketAddress, cmd.getAction());
break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
LOG.warn("Unknown DatanodeCommand action: {} from standby NN {}",
cmd.getAction(), nnSocketAddress);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1487,7 +1487,7 @@ private boolean processCommand(DatanodeCommand[] cmds) {
dn.getMetrics().addNumProcessedCommands(processCommandsMs);
}
if (processCommandsMs > dnConf.getProcessCommandsThresholdMs()) {
LOG.info("Took {} ms to process {} commands from NN",
LOG.warn("Took {} ms to process {} commands from NN",
processCommandsMs, cmds.length);
}
}
Expand Down

0 comments on commit 106f85a

Please sign in to comment.