Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-16705. RBF: Support healthMonitor timeout configurable and cache NN and client proxy in NamenodeHeartbeatService #4662

Merged
merged 1 commit into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;

Expand All @@ -25,6 +27,7 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
Expand Down Expand Up @@ -85,6 +88,10 @@ public class NamenodeHeartbeatService extends PeriodicService {
private NNHAServiceTarget localTarget;
/** Cache HA protocol. */
private HAServiceProtocol localTargetHAProtocol;
/** Cache NN protocol. */
private NamenodeProtocol namenodeProtocol;
/** Cache Client protocol. */
private ClientProtocol clientProtocol;
/** RPC address for the namenode. */
private String rpcAddress;
/** Service RPC address for the namenode. */
Expand All @@ -100,6 +107,9 @@ public class NamenodeHeartbeatService extends PeriodicService {

private String resolvedHost;
private String originalNnId;

private int healthMonitorTimeoutMs = (int) DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;

/**
* Create a new Namenode status updater.
* @param resolver Namenode resolver service to handle NN registration.
Expand Down Expand Up @@ -211,6 +221,15 @@ protected void serviceInit(Configuration configuration) throws Exception {
DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));

long timeoutMs = conf.getTimeDuration(DFS_ROUTER_HEALTH_MONITOR_TIMEOUT,
DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
if (timeoutMs < 0) {
LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
"Using value of : 0ms instead.", timeoutMs, DFS_ROUTER_HEALTH_MONITOR_TIMEOUT);
this.healthMonitorTimeoutMs = 0;
} else {
this.healthMonitorTimeoutMs = (int) timeoutMs;
}

super.serviceInit(configuration);
}
Expand Down Expand Up @@ -309,66 +328,26 @@ protected NamenodeStatusReport getNamenodeStatusReport() {
LOG.debug("Probing NN at service address: {}", serviceAddress);

URI serviceURI = new URI("hdfs://" + serviceAddress);
// Read the filesystem info from RPC (required)
NamenodeProtocol nn = NameNodeProxies
.createProxy(this.conf, serviceURI, NamenodeProtocol.class)
.getProxy();

if (nn != null) {
NamespaceInfo info = nn.versionRequest();
if (info != null) {
report.setNamespaceInfo(info);
}
}
// Read the filesystem info from RPC (required)
updateNameSpaceInfoParameters(serviceURI, report);
if (!report.registrationValid()) {
return report;
}

// Check for safemode from the client protocol. Currently optional, but
// should be required at some point for QoS
try {
ClientProtocol client = NameNodeProxies
.createProxy(this.conf, serviceURI, ClientProtocol.class)
.getProxy();
if (client != null) {
boolean isSafeMode = client.setSafeMode(
SafeModeAction.SAFEMODE_GET, false);
report.setSafeMode(isSafeMode);
}
} catch (Exception e) {
LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
}
updateSafeModeParameters(serviceURI, report);

// Read the stats from JMX (optional)
updateJMXParameters(webAddress, report);

if (localTarget != null) {
// Try to get the HA status
try {
// Determine if NN is active
// TODO: dynamic timeout
if (localTargetHAProtocol == null) {
localTargetHAProtocol = localTarget.getHealthMonitorProxy(conf, 30*1000);
LOG.debug("Get HA status with address {}", lifelineAddress);
}
HAServiceStatus status = localTargetHAProtocol.getServiceStatus();
report.setHAServiceState(status.getState());
} catch (Throwable e) {
if (e.getMessage().startsWith("HA for namenode is not enabled")) {
LOG.error("HA for {} is not enabled", getNamenodeDesc());
localTarget = null;
} else {
// Failed to fetch HA status, ignoring failure
LOG.error("Cannot fetch HA status for {}: {}",
getNamenodeDesc(), e.getMessage(), e);
}
localTargetHAProtocol = null;
}
}
} catch(IOException e) {
// Try to get the HA status
updateHAStatusParameters(report);
} catch (IOException e) {
LOG.error("Cannot communicate with {}: {}",
getNamenodeDesc(), e.getMessage());
} catch(Throwable e) {
} catch (Throwable e) {
// Generic error that we don't know about
LOG.error("Unexpected exception while communicating with {}: {}",
getNamenodeDesc(), e.getMessage(), e);
Expand Down Expand Up @@ -399,6 +378,59 @@ private static String getNnHeartBeatServiceName(String nsId, String nnId) {
(nnId == null ? "" : " " + nnId);
}

/**
 * Fetch the namespace information of a Namenode over RPC and store it in the report.
 * The {@link NamenodeProtocol} proxy is created on first use and then kept in
 * {@code namenodeProtocol} so later calls reuse it.
 * @param serviceURI Server address of the Namenode to monitor.
 * @param report Namenode status report to fill in with the namespace information.
 * @throws IOException Rethrown to the caller after the cached proxy has been
 *                     discarded, so that the next call creates a fresh proxy.
 *                     The namespace information is what identifies the
 *                     nameservice, so the failure is not swallowed here.
 */
private void updateNameSpaceInfoParameters(URI serviceURI,
    NamenodeStatusReport report) throws IOException {
  try {
    NamenodeProtocol proxy = this.namenodeProtocol;
    if (proxy == null) {
      // No cached proxy yet (or it was dropped after a failure): build one.
      proxy = NameNodeProxies
          .createProxy(this.conf, serviceURI, NamenodeProtocol.class)
          .getProxy();
      this.namenodeProtocol = proxy;
    }
    if (proxy == null) {
      return;
    }
    NamespaceInfo namespaceInfo = proxy.versionRequest();
    if (namespaceInfo != null) {
      report.setNamespaceInfo(namespaceInfo);
    }
  } catch (IOException ioe) {
    // Drop the cached proxy so the next call starts from a new one.
    this.namenodeProtocol = null;
    throw ioe;
  }
}

/**
* Get the safemode information for a Namenode via RPC and add them to the report.
* Safemode is only one status of NameNode and is useless for RBF identify one NameNode.
* So If there are some IOExceptions, RBF can just ignore it and try to collect
* other information form namenode continue.
* @param serviceURI Server address of the Namenode to monitor.
* @param report Namenode status report updating with safemode information data.
*/
private void updateSafeModeParameters(URI serviceURI, NamenodeStatusReport report) {
try {
if (this.clientProtocol == null) {
this.clientProtocol = NameNodeProxies
.createProxy(this.conf, serviceURI, ClientProtocol.class)
.getProxy();
}
if (clientProtocol != null) {
boolean isSafeMode = clientProtocol.setSafeMode(SafeModeAction.SAFEMODE_GET, false);
report.setSafeMode(isSafeMode);
}
} catch (Exception e) {
LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
goiri marked this conversation as resolved.
Show resolved Hide resolved
this.clientProtocol = null;
}
}

/**
* Get the parameters for a Namenode from JMX and add them to the report.
* @param address Web interface of the Namenode to monitor.
Expand All @@ -415,6 +447,34 @@ private void updateJMXParameters(
}
}

/**
 * Get the HA status for a Namenode via RPC and add it to the report.
 * Does nothing when {@code localTarget} is null. The health monitor proxy is
 * created on first use with the configured {@code healthMonitorTimeoutMs} and
 * cached in {@code localTargetHAProtocol}. Any failure is logged and the
 * cached proxy is discarded so that the next call creates a new one. If the
 * failure message starts with "HA for namenode is not enabled",
 * {@code localTarget} is also cleared, which turns later calls into no-ops.
 * @param report Namenode status report to update with the HA service state.
 */
private void updateHAStatusParameters(NamenodeStatusReport report) {
  if (localTarget == null) {
    return;
  }
  try {
    // Determine if NN is active
    if (localTargetHAProtocol == null) {
      localTargetHAProtocol = localTarget.getHealthMonitorProxy(
          conf, this.healthMonitorTimeoutMs);
      LOG.debug("Get HA status with address {}", lifelineAddress);
    }
    HAServiceStatus status = localTargetHAProtocol.getServiceStatus();
    report.setHAServiceState(status.getState());
  } catch (Throwable e) {
    // getMessage() may be null (e.g. a bare NullPointerException); calling
    // startsWith on it would throw from inside this handler and skip the
    // proxy reset below.
    String message = e.getMessage();
    if (message != null && message.startsWith("HA for namenode is not enabled")) {
      LOG.error("HA for {} is not enabled", getNamenodeDesc());
      localTarget = null;
    } else {
      // Failed to fetch HA status, ignoring failure
      LOG.error("Cannot fetch HA status for {}", getNamenodeDesc(), e);
    }
    // Always drop the cached proxy after a failure.
    localTargetHAProtocol = null;
  }
}

/**
* Fetches NamenodeInfo metrics from namenode.
* @param address Web interface of the Namenode to monitor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_ROUTER_PREFIX + "heartbeat.interval";
public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(5);
public static final String DFS_ROUTER_HEALTH_MONITOR_TIMEOUT =
FEDERATION_ROUTER_PREFIX + "health.monitor.timeout";
public static final long DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT =
TimeUnit.SECONDS.toMillis(30);
public static final String DFS_ROUTER_MONITOR_NAMENODE =
FEDERATION_ROUTER_PREFIX + "monitor.namenode";
public static final String DFS_ROUTER_MONITOR_NAMENODE_RESOLUTION_ENABLED =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,14 @@
</description>
</property>

<property>
<name>dfs.federation.router.health.monitor.timeout</name>
<value>30s</value>
<description>
Timeout for the Router to obtain the HAServiceStatus from a NameNode.
</description>
</property>

<property>
<name>dfs.federation.router.heartbeat-state.interval</name>
<value>5s</value>
Expand Down