Skip to content

Commit

Permalink
YARN-11525. [Federation] Router CLI Supports Save the SubClusterPolic…
Browse files Browse the repository at this point in the history
…yConfiguration Of Queues. (#5816)
  • Loading branch information
slfan1989 authored Jul 19, 2023
1 parent 84dd624 commit 23ecc32
Show file tree
Hide file tree
Showing 26 changed files with 1,322 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;


@Private
public interface ResourceManagerAdministrationProtocol extends GetUserMappingsProtocol {
Expand Down Expand Up @@ -173,4 +176,17 @@ NodesToAttributesMappingResponse mapAttributesToNodes(
@Idempotent
DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterRequest request)
throws YarnException, IOException;

/**
* Saves the policy information of a queue when running in YARN Federation mode.
*
* @param request saveFederationQueuePolicy request.
* @return response from saveFederationQueuePolicy.
* @throws YarnException exceptions from yarn servers.
* @throws IOException if an IO error occurred.
*/
@Private
@Idempotent
SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
SaveFederationQueuePolicyRequest request) throws YarnException, IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Queue weights for representing Federation.
 *
 * <p>The record carries three string-encoded settings (router weight, AMRM weight and
 * headRoomAlpha). The static {@code check*} helpers validate the string forms and report
 * every invalid input as a {@link YarnException}.
 */
@Private
@Unstable
public abstract class FederationQueueWeight {

  /** Separator between the entries of a weight list, e.g. {@code SC-1:0.7,SC-2:0.3}. */
  private static final String COMMA = ",";

  /** Separator between a subCluster name and its weight, e.g. {@code SC-1:0.7}. */
  private static final String COLON = ":";

  /** Tolerance used when checking that the weights add up to 1. */
  private static final double WEIGHT_SUM_TOLERANCE = 1e-6;

  /**
   * The FederationQueueWeight object consists of three parts:
   * routerWeight, amrmWeight, and headRoomAlpha.
   *
   * @param routerWeight Weight for routing applications to different subclusters.
   * We will route the application to different subclusters based on the configured weights.
   * Assuming we have two subclusters, SC-1 and SC-2,
   * with a weight of 0.7 for SC-1 and 0.3 for SC-2,
   * the application will be allocated in such a way
   * that 70% of the applications will be assigned to SC-1 and 30% to SC-2.
   *
   * @param amrmWeight Weight for resource request from ApplicationMaster (AM) to
   * different subclusters' Resource Manager (RM).
   * Assuming we have two subclusters, SC-1 and SC-2,
   * with a weight of 0.6 for SC-1 and 0.4 for SC-2,
   * When AM requesting resources,
   * 60% of the requests will be made to the Resource Manager (RM) of SC-1
   * and 40% to the RM of SC-2.
   *
   * @param headRoomAlpha
   * used by policies that balance weight-based and load-based considerations in their decisions.
   * For policies that use this parameter,
   * values close to 1 indicate that most of the decision
   * should be based on currently observed headroom from various sub-clusters,
   * values close to zero, indicate that the decision should be
   * mostly based on weights and practically ignore current load.
   *
   * @return FederationQueueWeight
   */
  @Private
  @Unstable
  public static FederationQueueWeight newInstance(String routerWeight,
      String amrmWeight, String headRoomAlpha) {
    FederationQueueWeight federationQueueWeight = Records.newRecord(FederationQueueWeight.class);
    federationQueueWeight.setRouterWeight(routerWeight);
    federationQueueWeight.setAmrmWeight(amrmWeight);
    federationQueueWeight.setHeadRoomAlpha(headRoomAlpha);
    return federationQueueWeight;
  }

  /**
   * @return the router weight string.
   */
  @Public
  @Unstable
  public abstract String getRouterWeight();

  /**
   * @param routerWeight the router weight string.
   */
  @Public
  @Unstable
  public abstract void setRouterWeight(String routerWeight);

  /**
   * @return the AMRM weight string.
   */
  @Public
  @Unstable
  public abstract String getAmrmWeight();

  /**
   * @param amrmWeight the AMRM weight string.
   */
  @Public
  @Unstable
  public abstract void setAmrmWeight(String amrmWeight);

  /**
   * @return the headRoomAlpha string.
   */
  @Public
  @Unstable
  public abstract String getHeadRoomAlpha();

  /**
   * @param headRoomAlpha the headRoomAlpha string.
   */
  @Public
  @Unstable
  public abstract void setHeadRoomAlpha(String headRoomAlpha);

  /**
   * Check if the subCluster Queue Weight Ratio are valid.
   *
   * This method can be used to validate RouterPolicyWeight and AMRMPolicyWeight.
   * The expected format is a comma separated list of {@code name:weight} pairs,
   * e.g. {@code SC-1:0.7,SC-2:0.3}, whose weights add up to 1.
   *
   * @param subClusterWeight the weight ratios of subClusters.
   * @throws YarnException if the value is blank, an entry is not a {@code name:weight} pair,
   * a weight is not a number, or the weights do not add up to 1.
   */
  public static void checkSubClusterQueueWeightRatioValid(String subClusterWeight)
      throws YarnException {
    // The subClusterWeight cannot be empty.
    if (StringUtils.isBlank(subClusterWeight)) {
      throw new YarnException("subClusterWeight can't be empty!");
    }

    // SC-1:0.7,SC-2:0.3 -> [SC-1:0.7,SC-2:0.3]
    String[] subClusterWeights = subClusterWeight.split(COMMA);
    Map<String, Double> subClusterWeightMap = new LinkedHashMap<>();
    for (String subClusterWeightItem : subClusterWeights) {
      // SC-1:0.7 -> [SC-1,0.7]
      // Each entry must split into exactly a name and a weight.
      String[] subClusterWeightItems = subClusterWeightItem.split(COLON);
      if (subClusterWeightItems.length != 2) {
        throw new YarnException("The subClusterWeight cannot be empty," +
            " and the subClusterWeight size must be 2. (eg.SC-1:0.2)");
      }

      // Report a malformed weight as a YarnException rather than letting the
      // unchecked NumberFormatException escape to the caller.
      double weight;
      try {
        weight = Double.parseDouble(subClusterWeightItems[1]);
      } catch (NumberFormatException e) {
        throw new YarnException("The weight of subCluster " + subClusterWeightItems[0] +
            " must be a number, but got '" + subClusterWeightItems[1] + "'.", e);
      }
      subClusterWeightMap.put(subClusterWeightItems[0], weight);
    }

    // The sum of weight ratios for subClusters must be equal to 1 (within tolerance).
    double sum = subClusterWeightMap.values().stream().mapToDouble(Double::doubleValue).sum();
    boolean isValid = Math.abs(sum - 1.0) < WEIGHT_SUM_TOLERANCE;

    if (!isValid) {
      throw new YarnException("The sum of ratios for all subClusters must be equal to 1.");
    }
  }

  /**
   * Check if HeadRoomAlpha is a number and is between 0 and 1.
   *
   * @param headRoomAlpha headroomalpha.
   * @throws YarnException if the value is not a number or is outside the range 0-1.
   */
  public static void checkHeadRoomAlphaValid(String headRoomAlpha) throws YarnException {
    if (!isNumeric(headRoomAlpha)) {
      throw new YarnException("HeadRoomAlpha must be a number.");
    }

    // isNumeric also accepts forms such as hex ("0x1") or type-suffixed ("1L") literals
    // that Double.parseDouble rejects; treat those as "not a number" as well.
    double dHeadRoomAlpha;
    try {
      dHeadRoomAlpha = Double.parseDouble(headRoomAlpha);
    } catch (NumberFormatException e) {
      throw new YarnException("HeadRoomAlpha must be a number.", e);
    }

    if (!(dHeadRoomAlpha >= 0 && dHeadRoomAlpha <= 1)) {
      throw new YarnException("HeadRoomAlpha must be between 0-1.");
    }
  }

  /**
   * Determines whether the given value is a number.
   *
   * @param value given value.
   * @return true, is a number, false, not a number.
   */
  protected static boolean isNumeric(String value) {
    return NumberUtils.isCreatable(value);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;

/**
 * Request record used in Yarn Federation mode to save the policy of a queue.
 *
 * It carries the queue, the weight of the queue,
 * and the PolicyManagerClassName information of the queue.
 */
@Private
@Unstable
public abstract class SaveFederationQueuePolicyRequest {

  /**
   * Creates a request record and populates its three fields.
   *
   * @param queue the queue.
   * @param federationQueueWeight the weight of the queue.
   * @param policyManagerClassName the PolicyManagerClassName of the queue.
   * @return the populated SaveFederationQueuePolicyRequest.
   */
  @Private
  @Unstable
  public static SaveFederationQueuePolicyRequest newInstance(
      String queue, FederationQueueWeight federationQueueWeight, String policyManagerClassName) {
    final Class<SaveFederationQueuePolicyRequest> recordType =
        SaveFederationQueuePolicyRequest.class;
    final SaveFederationQueuePolicyRequest saveRequest = Records.newRecord(recordType);
    saveRequest.setQueue(queue);
    saveRequest.setFederationQueueWeight(federationQueueWeight);
    saveRequest.setPolicyManagerClassName(policyManagerClassName);
    return saveRequest;
  }

  /**
   * @return the weight of the queue.
   */
  @Public
  @Unstable
  public abstract FederationQueueWeight getFederationQueueWeight();

  /**
   * @param federationQueueWeight the weight of the queue.
   */
  @Private
  @Unstable
  public abstract void setFederationQueueWeight(FederationQueueWeight federationQueueWeight);

  /**
   * @return the queue.
   */
  @Public
  @Unstable
  public abstract String getQueue();

  /**
   * @param queue the queue.
   */
  @Public
  @Unstable
  public abstract void setQueue(String queue);

  /**
   * @return the PolicyManagerClassName of the queue.
   */
  @Public
  @Unstable
  public abstract String getPolicyManagerClassName();

  /**
   * @param className the PolicyManagerClassName of the queue.
   */
  @Public
  @Unstable
  public abstract void setPolicyManagerClassName(String className);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;

/**
 * Response record of saveFederationQueuePolicy; it carries a single message string.
 */
@Private
@Unstable
public abstract class SaveFederationQueuePolicyResponse {

  /**
   * Creates a response record without setting a message.
   *
   * @return a new SaveFederationQueuePolicyResponse.
   */
  public static SaveFederationQueuePolicyResponse newInstance() {
    return Records.newRecord(SaveFederationQueuePolicyResponse.class);
  }

  /**
   * Creates a response record and sets its message.
   *
   * @param msg the message to store in the response.
   * @return a new SaveFederationQueuePolicyResponse carrying {@code msg}.
   */
  public static SaveFederationQueuePolicyResponse newInstance(String msg) {
    final SaveFederationQueuePolicyResponse result = newInstance();
    result.setMessage(msg);
    return result;
  }

  /**
   * @return the message of this response.
   */
  @Public
  @Unstable
  public abstract String getMessage();

  /**
   * @param msg the message of this response.
   */
  @Public
  @Unstable
  public abstract void setMessage(String msg);
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ service ResourceManagerAdministrationProtocolService {
rpc refreshClusterMaxPriority(RefreshClusterMaxPriorityRequestProto) returns (RefreshClusterMaxPriorityResponseProto);
rpc mapAttributesToNodes(NodesToAttributesMappingRequestProto) returns (NodesToAttributesMappingResponseProto);
rpc deregisterSubCluster(DeregisterSubClusterRequestProto) returns (DeregisterSubClusterResponseProto);
// Saves the federation policy of a queue (see SaveFederationQueuePolicyRequestProto).
rpc saveFederationQueuePolicy(SaveFederationQueuePolicyRequestProto) returns (SaveFederationQueuePolicyResponseProto);
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ message DeregisterSubClusterResponseProto {
repeated DeregisterSubClustersProto deregisterSubClusters = 1;
}

// Request of saveFederationQueuePolicy: the queue and its federation weights are
// mandatory, the policy manager class name is optional.
message SaveFederationQueuePolicyRequestProto {
required string queue = 1;
required FederationQueueWeightProto federationQueueWeight = 2;
optional string policyManagerClassName = 3;
}

// Response of saveFederationQueuePolicy: a single message string.
// NOTE(review): `message` is required, but the Java record also offers a no-arg
// newInstance() that sets no message - confirm the PB implementation always
// populates this field before the proto is built.
message SaveFederationQueuePolicyResponseProto {
required string message = 1;
}

//////////////////////////////////////////////////////////////////
///////////// RM Failover related records ////////////////////////
//////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ message DeregisterSubClustersProto {
optional string subClusterState = 5;
}

// Federation weights of a queue; all three values are transported as strings.
// Presumably routerWeight/amrmWeight use the "SC-1:0.7,SC-2:0.3" form and
// headRoomAlpha a number between 0 and 1, as validated by the Java-side
// FederationQueueWeight helpers - confirm against the callers.
message FederationQueueWeightProto {
optional string routerWeight = 1;
optional string amrmWeight = 2;
optional string headRoomAlpha = 3;
}

////////////////////////////////////////////////////////////////////////
////// From AM_RM_Protocol /////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 23ecc32

Please sign in to comment.