Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11161. Support getAttributesToNodes, getClusterNodeAttributes, getNodesToAttributes APIs for Federation #4610

Merged
merged 10 commits into from
Jul 25, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public final class RouterMetrics {
private MutableGaugeInt numGetResourceProfilesFailedRetrieved;
@Metric("# of getResourceProfile failed to be retrieved")
private MutableGaugeInt numGetResourceProfileFailedRetrieved;
@Metric("# of getAttributesToNodes failed to be retrieved")
private MutableGaugeInt numGetAttributesToNodesFailedRetrieved;
@Metric("# of getClusterNodeAttributes failed to be retrieved")
private MutableGaugeInt numGetClusterNodeAttributesFailedRetrieved;
@Metric("# of getNodesToAttributes failed to be retrieved")
private MutableGaugeInt numGetNodesToAttributesFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand All @@ -101,14 +107,11 @@ public final class RouterMetrics {
private MutableRate totalSucceededAppsCreated;
@Metric("Total number of successful Retrieved app reports and latency(ms)")
private MutableRate totalSucceededAppsRetrieved;
@Metric("Total number of successful Retrieved multiple apps reports and "
+ "latency(ms)")
@Metric("Total number of successful Retrieved multiple apps reports and latency(ms)")
private MutableRate totalSucceededMultipleAppsRetrieved;
@Metric("Total number of successful Retrieved " +
"appAttempt reports and latency(ms)")
@Metric("Total number of successful Retrieved appAttempt reports and latency(ms)")
private MutableRate totalSucceededAppAttemptsRetrieved;
@Metric("Total number of successful Retrieved getClusterMetrics and "
+ "latency(ms)")
@Metric("Total number of successful Retrieved getClusterMetrics and latency(ms)")
private MutableRate totalSucceededGetClusterMetricsRetrieved;
@Metric("Total number of successful Retrieved getClusterNodes and latency(ms)")
private MutableRate totalSucceededGetClusterNodesRetrieved;
Expand Down Expand Up @@ -144,9 +147,14 @@ public final class RouterMetrics {
private MutableRate totalSucceededMoveApplicationAcrossQueuesRetrieved;
@Metric("Total number of successful Retrieved getResourceProfiles and latency(ms)")
private MutableRate totalSucceededGetResourceProfilesRetrieved;

@Metric("Total number of successful Retrieved getResourceProfile and latency(ms)")
private MutableRate totalSucceededGetResourceProfileRetrieved;
@Metric("Total number of successful Retrieved getAttributesToNodes and latency(ms)")
private MutableRate totalSucceededGetAttributesToNodesRetrieved;
@Metric("Total number of successful Retrieved getClusterNodeAttributes and latency(ms)")
private MutableRate totalSucceededGetClusterNodeAttributesRetrieved;
@Metric("Total number of successful Retrieved getNodesToAttributes and latency(ms)")
private MutableRate totalSucceededGetNodesToAttributesRetrieved;

/**
* Provide quantile counters for all latencies.
Expand Down Expand Up @@ -176,6 +184,10 @@ public final class RouterMetrics {
private MutableQuantiles moveApplicationAcrossQueuesLatency;
private MutableQuantiles getResourceProfilesLatency;
private MutableQuantiles getResourceProfileLatency;
private MutableQuantiles getAttributesToNodesLatency;
private MutableQuantiles getClusterNodeAttributesLatency;

private MutableQuantiles getNodesToAttributesLatency;

private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
Expand Down Expand Up @@ -274,6 +286,18 @@ private RouterMetrics() {
getResourceProfileLatency =
registry.newQuantiles("getResourceProfileLatency",
"latency of get resource profile timeouts", "ops", "latency", 10);

getAttributesToNodesLatency =
registry.newQuantiles("getAttributesToNodesLatency",
"latency of get attributes to nodes timeouts", "ops", "latency", 10);

getClusterNodeAttributesLatency =
registry.newQuantiles("getClusterNodeAttributesLatency",
"latency of get cluster node attributes timeouts", "ops", "latency", 10);

getNodesToAttributesLatency =
registry.newQuantiles("getNodesToAttributesLatency",
"latency of get nodes to attributes timeouts", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -420,6 +444,21 @@ public long getNumSucceededGetResourceProfileRetrieved() {
return totalSucceededGetResourceProfileRetrieved.lastStat().numSamples();
}

/**
 * Test accessor for the getAttributesToNodes success metric.
 *
 * @return the numSamples() value of the last stat of
 *     totalSucceededGetAttributesToNodesRetrieved, the rate metric that
 *     succeededGetAttributesToNodesRetrieved(long) adds to.
 */
@VisibleForTesting
public long getNumSucceededGetAttributesToNodesRetrieved() {
return totalSucceededGetAttributesToNodesRetrieved.lastStat().numSamples();
}

/**
 * Test accessor for the getClusterNodeAttributes success metric.
 *
 * @return the numSamples() value of the last stat of
 *     totalSucceededGetClusterNodeAttributesRetrieved, the rate metric that
 *     succeededGetClusterNodeAttributesRetrieved(long) adds to.
 */
@VisibleForTesting
public long getNumSucceededGetClusterNodeAttributesRetrieved() {
return totalSucceededGetClusterNodeAttributesRetrieved.lastStat().numSamples();
}

/**
 * Test accessor for the getNodesToAttributes success metric.
 *
 * @return the numSamples() value of the last stat of
 *     totalSucceededGetNodesToAttributesRetrieved, the rate metric that
 *     succeededGetNodesToAttributesRetrieved(long) adds to.
 */
@VisibleForTesting
public long getNumSucceededGetNodesToAttributesRetrieved() {
return totalSucceededGetNodesToAttributesRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -545,6 +584,21 @@ public double getLatencySucceededGetResourceProfileRetrieved() {
return totalSucceededGetResourceProfileRetrieved.lastStat().mean();
}

/**
 * Test accessor for the getAttributesToNodes latency.
 *
 * @return the mean() of the last stat of
 *     totalSucceededGetAttributesToNodesRetrieved; presumably milliseconds,
 *     per the metric's description -- confirm against callers.
 */
@VisibleForTesting
public double getLatencySucceededGetAttributesToNodesRetrieved() {
return totalSucceededGetAttributesToNodesRetrieved.lastStat().mean();
}

/**
 * Test accessor for the getClusterNodeAttributes latency.
 *
 * @return the mean() of the last stat of
 *     totalSucceededGetClusterNodeAttributesRetrieved; presumably
 *     milliseconds, per the metric's description -- confirm against callers.
 */
@VisibleForTesting
public double getLatencySucceededGetClusterNodeAttributesRetrieved() {
return totalSucceededGetClusterNodeAttributesRetrieved.lastStat().mean();
}

/**
 * Test accessor for the getNodesToAttributes latency.
 *
 * @return the mean() of the last stat of
 *     totalSucceededGetNodesToAttributesRetrieved; presumably milliseconds,
 *     per the metric's description -- confirm against callers.
 */
@VisibleForTesting
public double getLatencySucceededGetNodesToAttributesRetrieved() {
return totalSucceededGetNodesToAttributesRetrieved.lastStat().mean();
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -666,6 +720,18 @@ public int getResourceProfileFailedRetrieved() {
return numGetResourceProfileFailedRetrieved.value();
}

/**
 * Reads the getAttributesToNodes failure gauge.
 *
 * @return the current value of numGetAttributesToNodesFailedRetrieved, which
 *     incrGetAttributesToNodesFailedRetrieved() increments.
 */
public int getAttributesToNodesFailedRetrieved() {
return numGetAttributesToNodesFailedRetrieved.value();
}

/**
 * Reads the getClusterNodeAttributes failure gauge.
 *
 * @return the current value of numGetClusterNodeAttributesFailedRetrieved,
 *     which incrGetClusterNodeAttributesFailedRetrieved() increments.
 */
public int getClusterNodeAttributesFailedRetrieved() {
return numGetClusterNodeAttributesFailedRetrieved.value();
}

/**
 * Reads the getNodesToAttributes failure gauge.
 *
 * @return the current value of numGetNodesToAttributesFailedRetrieved, which
 *     incrGetNodesToAttributesFailedRetrieved() increments.
 */
public int getNodesToAttributesFailedRetrieved() {
return numGetNodesToAttributesFailedRetrieved.value();
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -791,6 +857,21 @@ public void succeededGetResourceProfileRetrieved(long duration) {
getResourceProfileLatency.add(duration);
}

/**
 * Records one successful getAttributesToNodes call by adding its duration to
 * both the quantile latency metric and the aggregate rate metric.
 *
 * @param duration elapsed time of the call; presumably milliseconds, since
 *     the rate metric is described as "latency(ms)" -- confirm against callers.
 */
public void succeededGetAttributesToNodesRetrieved(long duration) {
  getAttributesToNodesLatency.add(duration);
  totalSucceededGetAttributesToNodesRetrieved.add(duration);
}

/**
 * Records one successful getClusterNodeAttributes call by adding its duration
 * to both the quantile latency metric and the aggregate rate metric.
 *
 * @param duration elapsed time of the call; presumably milliseconds, since
 *     the rate metric is described as "latency(ms)" -- confirm against callers.
 */
public void succeededGetClusterNodeAttributesRetrieved(long duration) {
  getClusterNodeAttributesLatency.add(duration);
  totalSucceededGetClusterNodeAttributesRetrieved.add(duration);
}

/**
 * Records one successful getNodesToAttributes call by adding its duration to
 * both the quantile latency metric and the aggregate rate metric.
 *
 * @param duration elapsed time of the call; presumably milliseconds, since
 *     the rate metric is described as "latency(ms)" -- confirm against callers.
 */
public void succeededGetNodesToAttributesRetrieved(long duration) {
  getNodesToAttributesLatency.add(duration);
  totalSucceededGetNodesToAttributesRetrieved.add(duration);
}

/**
 * Increments the numAppsFailedCreated gauge via incr().
 */
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
Expand Down Expand Up @@ -890,4 +971,16 @@ public void incrGetResourceProfilesFailedRetrieved() {
/**
 * Increments the numGetResourceProfileFailedRetrieved gauge via incr().
 */
public void incrGetResourceProfileFailedRetrieved() {
numGetResourceProfileFailedRetrieved.incr();
}

/**
 * Increments the numGetAttributesToNodesFailedRetrieved gauge via incr();
 * the value is readable through getAttributesToNodesFailedRetrieved().
 */
public void incrGetAttributesToNodesFailedRetrieved() {
numGetAttributesToNodesFailedRetrieved.incr();
}

/**
 * Increments the numGetClusterNodeAttributesFailedRetrieved gauge via incr();
 * the value is readable through getClusterNodeAttributesFailedRetrieved().
 */
public void incrGetClusterNodeAttributesFailedRetrieved() {
numGetClusterNodeAttributesFailedRetrieved.incr();
}

/**
 * Increments the numGetNodesToAttributesFailedRetrieved gauge via incr();
 * the value is readable through getNodesToAttributesFailedRetrieved().
 */
public void incrGetNodesToAttributesFailedRetrieved() {
numGetNodesToAttributesFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ public void init(String userName) {
federationFacade = FederationStateStoreFacade.getInstance();
rand = new Random(System.currentTimeMillis());


int numThreads = getConf().getInt(
YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE);
Expand All @@ -196,11 +195,11 @@ public void init(String userName) {
}

numSubmitRetries =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This indentation is not very clean.
Let's start the call on the same line as the assignment, i.e. `numSubmitRetries = conf.getInt(`, and put the arguments on the following lines.

conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
conf.getInt(
goiri marked this conversation as resolved.
Show resolved Hide resolved
YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
goiri marked this conversation as resolved.
Show resolved Hide resolved

clientRMProxies =
new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
clientRMProxies = new ConcurrentHashMap<>();
routerMetrics = RouterMetrics.getMetrics();

returnPartialReport = conf.getBoolean(
Expand All @@ -227,19 +226,17 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster(
ApplicationClientProtocol clientRMProxy = null;
try {
boolean serviceAuthEnabled = getConf().getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
UserGroupInformation realUser = user;
if (serviceAuthEnabled) {
realUser = UserGroupInformation.createProxyUser(
user.getShortUserName(), UserGroupInformation.getLoginUser());
realUser = UserGroupInformation.createProxyUser(user.getShortUserName(),
UserGroupInformation.getLoginUser());
}
clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(),
ApplicationClientProtocol.class, subClusterId, realUser);
} catch (Exception e) {
RouterServerUtil.logAndThrowException(
"Unable to create the interface to reach the SubCluster "
+ subClusterId,
e);
"Unable to create the interface to reach the SubCluster " + subClusterId, e);
}

clientRMProxies.put(subClusterId, clientRMProxy);
Expand Down Expand Up @@ -287,8 +284,7 @@ public GetNewApplicationResponse getNewApplication(

for (int i = 0; i < numSubmitRetries; ++i) {
SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
LOG.debug(
"getNewApplication try #{} on SubCluster {}", i, subClusterId);
LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId);
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
GetNewApplicationResponse response = null;
Expand Down Expand Up @@ -410,7 +406,7 @@ public SubmitApplicationResponse submitApplication(
ApplicationId applicationId =
request.getApplicationSubmissionContext().getApplicationId();

List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
List<SubClusterId> blacklist = new ArrayList<>();

for (int i = 0; i < numSubmitRetries; ++i) {

Expand Down Expand Up @@ -561,8 +557,8 @@ public KillApplicationResponse forceKillApplication(
}

if (response == null) {
LOG.error("No response when attempting to kill the application "
+ applicationId + " to SubCluster " + subClusterId.getId());
LOG.error("No response when attempting to kill the application {} to SubCluster {}.",
applicationId, subClusterId.getId());
}

long stopTime = clock.getTime();
Expand Down Expand Up @@ -1015,7 +1011,7 @@ public GetLabelsToNodesResponse getLabelsToNodes(
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes",
new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
Collection<GetLabelsToNodesResponse> labelNodes;
try {
labelNodes = invokeAppClientProtocolMethod(true, remoteMethod,
Expand All @@ -1040,7 +1036,7 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels(
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels",
new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
Collection<GetClusterNodeLabelsResponse> nodeLabels;
try {
nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod,
Expand Down Expand Up @@ -1528,20 +1524,75 @@ public void shutdown() {
/**
 * Handles a getAttributesToNodes request by invoking the
 * "getAttributesToNodes" client method through
 * invokeAppClientProtocolMethod and merging the returned collection of
 * responses into a single response.
 *
 * <p>The failure gauge is incremented when the request (or its node
 * attributes) is missing or when the invocation throws; on success the
 * elapsed time is recorded in the router metrics.
 *
 * @param request the request; it and its node attributes must be non-null.
 * @return the result of RouterYarnClientUtils.mergeAttributesToNodesResponse
 *     applied to the collected responses.
 * @throws YarnException declared by the interface; RouterServerUtil
 *     .logAndThrowException is expected to raise it on invalid input or
 *     invocation failure -- confirm against RouterServerUtil.
 * @throws IOException declared by the interface.
 */
@Override
public GetAttributesToNodesResponse getAttributesToNodes(
    GetAttributesToNodesRequest request) throws YarnException, IOException {
  if (request == null || request.getNodeAttributes() == null) {
    routerMetrics.incrGetAttributesToNodesFailedRetrieved();
    RouterServerUtil.logAndThrowException(
        "Missing getAttributesToNodes request or nodeAttributes.", null);
  }
  long startTime = clock.getTime();
  ClientMethod remoteMethod = new ClientMethod("getAttributesToNodes",
      new Class[] {GetAttributesToNodesRequest.class}, new Object[] {request});
  Collection<GetAttributesToNodesResponse> attributesToNodesResponses = null;
  try {
    attributesToNodesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
        GetAttributesToNodesResponse.class);
  } catch (Exception ex) {
    routerMetrics.incrGetAttributesToNodesFailedRetrieved();
    RouterServerUtil.logAndThrowException(
        "Unable to get attributes to nodes due to exception.", ex);
  }
  long stopTime = clock.getTime();
  routerMetrics.succeededGetAttributesToNodesRetrieved(stopTime - startTime);
  return RouterYarnClientUtils.mergeAttributesToNodesResponse(attributesToNodesResponses);
}

@Override
public GetClusterNodeAttributesResponse getClusterNodeAttributes(
GetClusterNodeAttributesRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
GetClusterNodeAttributesRequest request) throws YarnException, IOException {
if (request == null) {
routerMetrics.incrGetClusterNodeAttributesFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getClusterNodeAttributes request.", null);
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getClusterNodeAttributes",
new Class[] {GetClusterNodeAttributesRequest.class}, new Object[] {request});
Collection<GetClusterNodeAttributesResponse> clusterNodeAttributesResponses = null;
try {
clusterNodeAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
GetClusterNodeAttributesResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetClusterNodeAttributesFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get cluster node attributes due " +
" to exception.", ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededGetClusterNodeAttributesRetrieved(stopTime - startTime);
return RouterYarnClientUtils.mergeClusterNodeAttributesResponse(clusterNodeAttributesResponses);
}

/**
 * Handles a getNodesToAttributes request by invoking the
 * "getNodesToAttributes" client method through
 * invokeAppClientProtocolMethod and merging the returned collection of
 * responses into a single response.
 *
 * <p>The failure gauge is incremented when the request (or its host names)
 * is missing or when the invocation throws; on success the elapsed time is
 * recorded in the router metrics.
 *
 * @param request the request; it and its host names must be non-null.
 * @return the result of RouterYarnClientUtils.mergeNodesToAttributesResponse
 *     applied to the collected responses.
 * @throws YarnException declared by the interface; RouterServerUtil
 *     .logAndThrowException is expected to raise it on invalid input or
 *     invocation failure -- confirm against RouterServerUtil.
 * @throws IOException declared by the interface.
 */
@Override
public GetNodesToAttributesResponse getNodesToAttributes(
    GetNodesToAttributesRequest request) throws YarnException, IOException {
  if (request == null || request.getHostNames() == null) {
    routerMetrics.incrGetNodesToAttributesFailedRetrieved();
    RouterServerUtil.logAndThrowException(
        "Missing getNodesToAttributes request or hostNames.", null);
  }
  long startTime = clock.getTime();
  ClientMethod remoteMethod = new ClientMethod("getNodesToAttributes",
      new Class[] {GetNodesToAttributesRequest.class}, new Object[] {request});
  Collection<GetNodesToAttributesResponse> nodesToAttributesResponses = null;
  try {
    nodesToAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
        GetNodesToAttributesResponse.class);
  } catch (Exception ex) {
    routerMetrics.incrGetNodesToAttributesFailedRetrieved();
    // Single literal: the previous two-part concatenation produced "due  to" (double space).
    RouterServerUtil.logAndThrowException(
        "Unable to get nodes to attributes due to exception.", ex);
  }
  long stopTime = clock.getTime();
  routerMetrics.succeededGetNodesToAttributesRetrieved(stopTime - startTime);
  return RouterYarnClientUtils.mergeNodesToAttributesResponse(nodesToAttributesResponses);
}

protected SubClusterId getApplicationHomeSubCluster(
Expand Down
Loading