Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-16898. Remove write lock for processCommandFromActor of DataNode to reduce impact on heartbeat #5330

Merged
merged 1 commit into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -679,15 +679,20 @@ boolean processCommandFromActor(DatanodeCommand cmd,
actor.reRegister();
return false;
}
writeLock();
boolean isActiveActor;
InetSocketAddress nnSocketAddress;
readLock();
try {
if (actor == bpServiceToActive) {
return processCommandFromActive(cmd, actor);
} else {
return processCommandFromStandby(cmd, actor);
}
isActiveActor = (actor == bpServiceToActive);
nnSocketAddress = actor.getNNSocketAddress();
} finally {
writeUnlock();
readUnlock();
}

if (isActiveActor) {
return processCommandFromActive(cmd, nnSocketAddress);
} else {
return processCommandFromStandby(cmd, nnSocketAddress);
}
}

Expand Down Expand Up @@ -715,7 +720,7 @@ private String blockIdArrayToString(long ids[]) {
* @throws IOException
*/
private boolean processCommandFromActive(DatanodeCommand cmd,
BPServiceActor actor) throws IOException {
InetSocketAddress nnSocketAddress) throws IOException {
final BlockCommand bcmd =
cmd instanceof BlockCommand? (BlockCommand)cmd: null;
final BlockIdCommand blockIdCmd =
Expand Down Expand Up @@ -768,7 +773,7 @@ assert getBlockPoolId().equals(bp) :
dn.finalizeUpgradeForPool(bp);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
String who = "NameNode at " + actor.getNNSocketAddress();
String who = "NameNode at " + nnSocketAddress;
dn.getBlockRecoveryWorker().recoverBlocks(who,
((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
Expand Down Expand Up @@ -810,11 +815,11 @@ assert getBlockPoolId().equals(bp) :
* DNA_REGISTER which should be handled earlier itself.
*/
private boolean processCommandFromStandby(DatanodeCommand cmd,
BPServiceActor actor) throws IOException {
InetSocketAddress nnSocketAddress) throws IOException {
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action from standby NN {}: DNA_ACCESSKEYUPDATE",
actor.getNNSocketAddress());
nnSocketAddress);
if (dn.isBlockTokenEnabled) {
dn.blockPoolTokenSecretManager.addKeys(
getBlockPoolId(),
Expand All @@ -831,11 +836,11 @@ private boolean processCommandFromStandby(DatanodeCommand cmd,
case DatanodeProtocol.DNA_UNCACHE:
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
LOG.warn("Got a command from standby NN {} - ignoring command: {}",
actor.getNNSocketAddress(), cmd.getAction());
nnSocketAddress, cmd.getAction());
break;
default:
LOG.warn("Unknown DatanodeCommand action: {} from standby NN {}",
cmd.getAction(), actor.getNNSocketAddress());
cmd.getAction(), nnSocketAddress);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,7 @@ private boolean processCommand(DatanodeCommand[] cmds) {
dn.getMetrics().addNumProcessedCommands(processCommandsMs);
}
if (processCommandsMs > dnConf.getProcessCommandsThresholdMs()) {
LOG.info("Took {} ms to process {} commands from NN",
LOG.warn("Took {} ms to process {} commands from NN",
processCommandsMs, cmds.length);
}
}
Expand Down