Skip to content

Commit

Permalink
YARN-11161. Support getAttributesToNodes, getClusterNodeAttributes, getNodesToAttributes APIs for Federation (#4610)
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Jul 25, 2022
1 parent 2f49eec commit edeb995
Show file tree
Hide file tree
Showing 7 changed files with 604 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public final class RouterMetrics {
private MutableGaugeInt numGetResourceProfilesFailedRetrieved;
@Metric("# of getResourceProfile failed to be retrieved")
private MutableGaugeInt numGetResourceProfileFailedRetrieved;
@Metric("# of getAttributesToNodes failed to be retrieved")
private MutableGaugeInt numGetAttributesToNodesFailedRetrieved;
@Metric("# of getClusterNodeAttributes failed to be retrieved")
private MutableGaugeInt numGetClusterNodeAttributesFailedRetrieved;
@Metric("# of getNodesToAttributes failed to be retrieved")
private MutableGaugeInt numGetNodesToAttributesFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand All @@ -101,14 +107,11 @@ public final class RouterMetrics {
private MutableRate totalSucceededAppsCreated;
@Metric("Total number of successful Retrieved app reports and latency(ms)")
private MutableRate totalSucceededAppsRetrieved;
@Metric("Total number of successful Retrieved multiple apps reports and "
+ "latency(ms)")
@Metric("Total number of successful Retrieved multiple apps reports and latency(ms)")
private MutableRate totalSucceededMultipleAppsRetrieved;
@Metric("Total number of successful Retrieved " +
"appAttempt reports and latency(ms)")
@Metric("Total number of successful Retrieved appAttempt reports and latency(ms)")
private MutableRate totalSucceededAppAttemptsRetrieved;
@Metric("Total number of successful Retrieved getClusterMetrics and "
+ "latency(ms)")
@Metric("Total number of successful Retrieved getClusterMetrics and latency(ms)")
private MutableRate totalSucceededGetClusterMetricsRetrieved;
@Metric("Total number of successful Retrieved getClusterNodes and latency(ms)")
private MutableRate totalSucceededGetClusterNodesRetrieved;
Expand Down Expand Up @@ -144,9 +147,14 @@ public final class RouterMetrics {
private MutableRate totalSucceededMoveApplicationAcrossQueuesRetrieved;
@Metric("Total number of successful Retrieved getResourceProfiles and latency(ms)")
private MutableRate totalSucceededGetResourceProfilesRetrieved;

@Metric("Total number of successful Retrieved getResourceProfile and latency(ms)")
private MutableRate totalSucceededGetResourceProfileRetrieved;
@Metric("Total number of successful Retrieved getAttributesToNodes and latency(ms)")
private MutableRate totalSucceededGetAttributesToNodesRetrieved;
@Metric("Total number of successful Retrieved getClusterNodeAttributes and latency(ms)")
private MutableRate totalSucceededGetClusterNodeAttributesRetrieved;
@Metric("Total number of successful Retrieved getNodesToAttributes and latency(ms)")
private MutableRate totalSucceededGetNodesToAttributesRetrieved;

/**
* Provide quantile counters for all latencies.
Expand Down Expand Up @@ -176,6 +184,10 @@ public final class RouterMetrics {
private MutableQuantiles moveApplicationAcrossQueuesLatency;
private MutableQuantiles getResourceProfilesLatency;
private MutableQuantiles getResourceProfileLatency;
private MutableQuantiles getAttributesToNodesLatency;
private MutableQuantiles getClusterNodeAttributesLatency;

private MutableQuantiles getNodesToAttributesLatency;

private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
Expand Down Expand Up @@ -274,6 +286,18 @@ private RouterMetrics() {
getResourceProfileLatency =
registry.newQuantiles("getResourceProfileLatency",
"latency of get resource profile timeouts", "ops", "latency", 10);

getAttributesToNodesLatency =
registry.newQuantiles("getAttributesToNodesLatency",
"latency of get attributes to nodes timeouts", "ops", "latency", 10);

getClusterNodeAttributesLatency =
registry.newQuantiles("getClusterNodeAttributesLatency",
"latency of get cluster node attributes timeouts", "ops", "latency", 10);

getNodesToAttributesLatency =
registry.newQuantiles("getNodesToAttributesLatency",
"latency of get nodes to attributes timeouts", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -420,6 +444,21 @@ public long getNumSucceededGetResourceProfileRetrieved() {
return totalSucceededGetResourceProfileRetrieved.lastStat().numSamples();
}

/**
 * Reports the sample count of the last stat held by the successful
 * getAttributesToNodes rate metric.
 *
 * @return number of samples in the last stat of
 *         {@code totalSucceededGetAttributesToNodesRetrieved}
 */
@VisibleForTesting
public long getNumSucceededGetAttributesToNodesRetrieved() {
  final MutableRate succeededRate = this.totalSucceededGetAttributesToNodesRetrieved;
  return succeededRate.lastStat().numSamples();
}

/**
 * Reports the sample count of the last stat held by the successful
 * getClusterNodeAttributes rate metric.
 *
 * @return number of samples in the last stat of
 *         {@code totalSucceededGetClusterNodeAttributesRetrieved}
 */
@VisibleForTesting
public long getNumSucceededGetClusterNodeAttributesRetrieved() {
  final MutableRate succeededRate = this.totalSucceededGetClusterNodeAttributesRetrieved;
  return succeededRate.lastStat().numSamples();
}

/**
 * Reports the sample count of the last stat held by the successful
 * getNodesToAttributes rate metric.
 *
 * @return number of samples in the last stat of
 *         {@code totalSucceededGetNodesToAttributesRetrieved}
 */
@VisibleForTesting
public long getNumSucceededGetNodesToAttributesRetrieved() {
  final MutableRate succeededRate = this.totalSucceededGetNodesToAttributesRetrieved;
  return succeededRate.lastStat().numSamples();
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -545,6 +584,21 @@ public double getLatencySucceededGetResourceProfileRetrieved() {
return totalSucceededGetResourceProfileRetrieved.lastStat().mean();
}

/**
 * Reports the mean of the last stat held by the successful
 * getAttributesToNodes rate metric.
 *
 * @return mean value from the last stat of
 *         {@code totalSucceededGetAttributesToNodesRetrieved}
 */
@VisibleForTesting
public double getLatencySucceededGetAttributesToNodesRetrieved() {
  final MutableRate succeededRate = this.totalSucceededGetAttributesToNodesRetrieved;
  return succeededRate.lastStat().mean();
}

/**
 * Reports the mean of the last stat held by the successful
 * getClusterNodeAttributes rate metric.
 *
 * @return mean value from the last stat of
 *         {@code totalSucceededGetClusterNodeAttributesRetrieved}
 */
@VisibleForTesting
public double getLatencySucceededGetClusterNodeAttributesRetrieved() {
  final MutableRate succeededRate = this.totalSucceededGetClusterNodeAttributesRetrieved;
  return succeededRate.lastStat().mean();
}

/**
 * Reports the mean of the last stat held by the successful
 * getNodesToAttributes rate metric.
 *
 * @return mean value from the last stat of
 *         {@code totalSucceededGetNodesToAttributesRetrieved}
 */
@VisibleForTesting
public double getLatencySucceededGetNodesToAttributesRetrieved() {
  final MutableRate succeededRate = this.totalSucceededGetNodesToAttributesRetrieved;
  return succeededRate.lastStat().mean();
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -666,6 +720,18 @@ public int getResourceProfileFailedRetrieved() {
return numGetResourceProfileFailedRetrieved.value();
}

/**
 * Reads the current value of the failed getAttributesToNodes gauge.
 *
 * @return value of {@code numGetAttributesToNodesFailedRetrieved}
 */
public int getAttributesToNodesFailedRetrieved() {
  final int failedCount = this.numGetAttributesToNodesFailedRetrieved.value();
  return failedCount;
}

/**
 * Reads the current value of the failed getClusterNodeAttributes gauge.
 *
 * @return value of {@code numGetClusterNodeAttributesFailedRetrieved}
 */
public int getClusterNodeAttributesFailedRetrieved() {
  final int failedCount = this.numGetClusterNodeAttributesFailedRetrieved.value();
  return failedCount;
}

/**
 * Reads the current value of the failed getNodesToAttributes gauge.
 *
 * @return value of {@code numGetNodesToAttributesFailedRetrieved}
 */
public int getNodesToAttributesFailedRetrieved() {
  final int failedCount = this.numGetNodesToAttributesFailedRetrieved.value();
  return failedCount;
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -791,6 +857,21 @@ public void succeededGetResourceProfileRetrieved(long duration) {
getResourceProfileLatency.add(duration);
}

/**
 * Records one successful getAttributesToNodes call by adding its duration
 * to both the aggregate rate metric and the latency quantiles.
 *
 * @param duration value added to both metrics (the rate's description
 *                 labels latency in ms)
 */
public void succeededGetAttributesToNodesRetrieved(long duration) {
  // Aggregate rate first, then the quantile tracker, as in the sibling recorders.
  this.totalSucceededGetAttributesToNodesRetrieved.add(duration);
  this.getAttributesToNodesLatency.add(duration);
}

/**
 * Records one successful getClusterNodeAttributes call by adding its
 * duration to both the aggregate rate metric and the latency quantiles.
 *
 * @param duration value added to both metrics (the rate's description
 *                 labels latency in ms)
 */
public void succeededGetClusterNodeAttributesRetrieved(long duration) {
  // Aggregate rate first, then the quantile tracker, as in the sibling recorders.
  this.totalSucceededGetClusterNodeAttributesRetrieved.add(duration);
  this.getClusterNodeAttributesLatency.add(duration);
}

/**
 * Records one successful getNodesToAttributes call by adding its duration
 * to both the aggregate rate metric and the latency quantiles.
 *
 * @param duration value added to both metrics (the rate's description
 *                 labels latency in ms)
 */
public void succeededGetNodesToAttributesRetrieved(long duration) {
  // Aggregate rate first, then the quantile tracker, as in the sibling recorders.
  this.totalSucceededGetNodesToAttributesRetrieved.add(duration);
  this.getNodesToAttributesLatency.add(duration);
}

/**
 * Increments the {@code numAppsFailedCreated} gauge.
 */
public void incrAppsFailedCreated() {
  this.numAppsFailedCreated.incr();
}
Expand Down Expand Up @@ -890,4 +971,16 @@ public void incrGetResourceProfilesFailedRetrieved() {
/**
 * Increments the gauge counting getResourceProfile calls that failed to
 * be retrieved.
 */
public void incrGetResourceProfileFailedRetrieved() {
  this.numGetResourceProfileFailedRetrieved.incr();
}

/**
 * Increments the gauge counting getAttributesToNodes calls that failed to
 * be retrieved.
 */
public void incrGetAttributesToNodesFailedRetrieved() {
  this.numGetAttributesToNodesFailedRetrieved.incr();
}

/**
 * Increments the gauge counting getClusterNodeAttributes calls that failed
 * to be retrieved.
 */
public void incrGetClusterNodeAttributesFailedRetrieved() {
  this.numGetClusterNodeAttributesFailedRetrieved.incr();
}

/**
 * Increments the gauge counting getNodesToAttributes calls that failed to
 * be retrieved.
 */
public void incrGetNodesToAttributesFailedRetrieved() {
  this.numGetNodesToAttributesFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ public void init(String userName) {
federationFacade = FederationStateStoreFacade.getInstance();
rand = new Random(System.currentTimeMillis());


int numThreads = getConf().getInt(
YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE);
Expand All @@ -195,12 +194,11 @@ public void init(String userName) {
LOG.error(e.getMessage());
}

numSubmitRetries =
conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
numSubmitRetries = conf.getInt(
YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);

clientRMProxies =
new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
clientRMProxies = new ConcurrentHashMap<>();
routerMetrics = RouterMetrics.getMetrics();

returnPartialReport = conf.getBoolean(
Expand All @@ -227,19 +225,17 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster(
ApplicationClientProtocol clientRMProxy = null;
try {
boolean serviceAuthEnabled = getConf().getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
UserGroupInformation realUser = user;
if (serviceAuthEnabled) {
realUser = UserGroupInformation.createProxyUser(
user.getShortUserName(), UserGroupInformation.getLoginUser());
realUser = UserGroupInformation.createProxyUser(user.getShortUserName(),
UserGroupInformation.getLoginUser());
}
clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(),
ApplicationClientProtocol.class, subClusterId, realUser);
} catch (Exception e) {
RouterServerUtil.logAndThrowException(
"Unable to create the interface to reach the SubCluster "
+ subClusterId,
e);
"Unable to create the interface to reach the SubCluster " + subClusterId, e);
}

clientRMProxies.put(subClusterId, clientRMProxy);
Expand Down Expand Up @@ -287,8 +283,7 @@ public GetNewApplicationResponse getNewApplication(

for (int i = 0; i < numSubmitRetries; ++i) {
SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
LOG.debug(
"getNewApplication try #{} on SubCluster {}", i, subClusterId);
LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId);
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
GetNewApplicationResponse response = null;
Expand Down Expand Up @@ -410,7 +405,7 @@ public SubmitApplicationResponse submitApplication(
ApplicationId applicationId =
request.getApplicationSubmissionContext().getApplicationId();

List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
List<SubClusterId> blacklist = new ArrayList<>();

for (int i = 0; i < numSubmitRetries; ++i) {

Expand Down Expand Up @@ -561,8 +556,8 @@ public KillApplicationResponse forceKillApplication(
}

if (response == null) {
LOG.error("No response when attempting to kill the application "
+ applicationId + " to SubCluster " + subClusterId.getId());
LOG.error("No response when attempting to kill the application {} to SubCluster {}.",
applicationId, subClusterId.getId());
}

long stopTime = clock.getTime();
Expand Down Expand Up @@ -1015,7 +1010,7 @@ public GetLabelsToNodesResponse getLabelsToNodes(
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes",
new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
Collection<GetLabelsToNodesResponse> labelNodes;
try {
labelNodes = invokeAppClientProtocolMethod(true, remoteMethod,
Expand All @@ -1040,7 +1035,7 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels(
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels",
new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
Collection<GetClusterNodeLabelsResponse> nodeLabels;
try {
nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod,
Expand Down Expand Up @@ -1528,20 +1523,75 @@ public void shutdown() {
/**
 * Serves a getAttributesToNodes request by invoking the same-named method
 * through {@code invokeAppClientProtocolMethod} and merging the collected
 * responses into a single response.
 *
 * <p>The failure gauge is incremented when the request (or its node
 * attributes) is missing or when the remote invocation raises; on success
 * the elapsed time is recorded in the router metrics.
 *
 * @param request the getAttributesToNodes request; must be non-null and
 *                carry non-null node attributes
 * @return the result of
 *         {@code RouterYarnClientUtils.mergeAttributesToNodesResponse}
 * @throws YarnException declared by the interface; presumably raised by
 *                       {@code RouterServerUtil.logAndThrowException}
 * @throws IOException declared by the interface
 */
@Override
public GetAttributesToNodesResponse getAttributesToNodes(
    GetAttributesToNodesRequest request) throws YarnException, IOException {
  if (request == null || request.getNodeAttributes() == null) {
    routerMetrics.incrGetAttributesToNodesFailedRetrieved();
    // NOTE(review): relies on logAndThrowException always throwing - confirm.
    RouterServerUtil.logAndThrowException(
        "Missing getAttributesToNodes request or nodeAttributes.", null);
  }
  long startTime = clock.getTime();
  ClientMethod remoteMethod = new ClientMethod("getAttributesToNodes",
      new Class[] {GetAttributesToNodesRequest.class}, new Object[] {request});
  // Initialized to null because the compiler cannot see that the catch
  // branch is expected to leave the method by throwing.
  Collection<GetAttributesToNodesResponse> attributesToNodesResponses = null;
  try {
    attributesToNodesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
        GetAttributesToNodesResponse.class);
  } catch (Exception ex) {
    routerMetrics.incrGetAttributesToNodesFailedRetrieved();
    RouterServerUtil.logAndThrowException(
        "Unable to get attributes to nodes due to exception.", ex);
  }
  long stopTime = clock.getTime();
  routerMetrics.succeededGetAttributesToNodesRetrieved(stopTime - startTime);
  return RouterYarnClientUtils.mergeAttributesToNodesResponse(attributesToNodesResponses);
}

@Override
public GetClusterNodeAttributesResponse getClusterNodeAttributes(
GetClusterNodeAttributesRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
GetClusterNodeAttributesRequest request) throws YarnException, IOException {
if (request == null) {
routerMetrics.incrGetClusterNodeAttributesFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getClusterNodeAttributes request.", null);
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getClusterNodeAttributes",
new Class[] {GetClusterNodeAttributesRequest.class}, new Object[] {request});
Collection<GetClusterNodeAttributesResponse> clusterNodeAttributesResponses = null;
try {
clusterNodeAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
GetClusterNodeAttributesResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetClusterNodeAttributesFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get cluster node attributes due " +
" to exception.", ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededGetClusterNodeAttributesRetrieved(stopTime - startTime);
return RouterYarnClientUtils.mergeClusterNodeAttributesResponse(clusterNodeAttributesResponses);
}

/**
 * Serves a getNodesToAttributes request by invoking the same-named method
 * through {@code invokeAppClientProtocolMethod} and merging the collected
 * responses into a single response.
 *
 * <p>The failure gauge is incremented when the request (or its host names)
 * is missing or when the remote invocation raises; on success the elapsed
 * time is recorded in the router metrics.
 *
 * @param request the getNodesToAttributes request; must be non-null and
 *                carry non-null host names
 * @return the result of
 *         {@code RouterYarnClientUtils.mergeNodesToAttributesResponse}
 * @throws YarnException declared by the interface; presumably raised by
 *                       {@code RouterServerUtil.logAndThrowException}
 * @throws IOException declared by the interface
 */
@Override
public GetNodesToAttributesResponse getNodesToAttributes(
    GetNodesToAttributesRequest request) throws YarnException, IOException {
  if (request == null || request.getHostNames() == null) {
    routerMetrics.incrGetNodesToAttributesFailedRetrieved();
    // NOTE(review): relies on logAndThrowException always throwing - confirm.
    RouterServerUtil.logAndThrowException(
        "Missing getNodesToAttributes request or hostNames.", null);
  }
  long startTime = clock.getTime();
  ClientMethod remoteMethod = new ClientMethod("getNodesToAttributes",
      new Class[] {GetNodesToAttributesRequest.class}, new Object[] {request});
  // Initialized to null because the compiler cannot see that the catch
  // branch is expected to leave the method by throwing.
  Collection<GetNodesToAttributesResponse> nodesToAttributesResponses = null;
  try {
    nodesToAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
        GetNodesToAttributesResponse.class);
  } catch (Exception ex) {
    routerMetrics.incrGetNodesToAttributesFailedRetrieved();
    // Single literal: the previous "due " + " to exception." produced a double space.
    RouterServerUtil.logAndThrowException(
        "Unable to get nodes to attributes due to exception.", ex);
  }
  long stopTime = clock.getTime();
  routerMetrics.succeededGetNodesToAttributesRetrieved(stopTime - startTime);
  return RouterYarnClientUtils.mergeNodesToAttributesResponse(nodesToAttributesResponses);
}

protected SubClusterId getApplicationHomeSubCluster(
Expand Down
Loading

0 comments on commit edeb995

Please sign in to comment.