diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java index fd6029cb64721..a4967960a0331 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java @@ -58,6 +58,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; + @Private public interface ResourceManagerAdministrationProtocol extends GetUserMappingsProtocol { @@ -173,4 +176,17 @@ NodesToAttributesMappingResponse mapAttributesToNodes( @Idempotent DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterRequest request) throws YarnException, IOException; + + /** + * In YARN-Federation mode, We will be storing the Policy information for Queues. + * + * @param request saveFederationQueuePolicy Request + * @return Response from saveFederationQueuePolicy. + * @throws YarnException exceptions from yarn servers. + * @throws IOException if an IO error occurred. + */ + @Private + @Idempotent + SaveFederationQueuePolicyResponse saveFederationQueuePolicy( + SaveFederationQueuePolicyRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java new file mode 100644 index 0000000000000..c63ee1b713d0d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Records; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Queue weights for representing Federation. + */ +@Private +@Unstable +public abstract class FederationQueueWeight { + + /** + * The FederationQueueWeight object consists of three parts: + * routerWeight, amrmWeight, and headRoomAlpha. + * + * @param routerWeight Weight for routing applications to different subclusters. + * We will route the application to different subclusters based on the configured weights. + * Assuming we have two subclusters, SC-1 and SC-2, + * with a weight of 0.7 for SC-1 and 0.3 for SC-2, + * the application will be allocated in such a way + * that 70% of the applications will be assigned to SC-1 and 30% to SC-2. + * + * @param amrmWeight Weight for resource request from ApplicationMaster (AM) to + * different subclusters' Resource Manager (RM). + * Assuming we have two subclusters, SC-1 and SC-2, + * with a weight of 0.6 for SC-1 and 0.4 for SC-2, + * When AM requesting resources, + * 60% of the requests will be made to the Resource Manager (RM) of SC-1 + * and 40% to the RM of SC-2. + * + * @param headRoomAlpha + * used by policies that balance weight-based and load-based considerations in their decisions. + * For policies that use this parameter, + * values close to 1 indicate that most of the decision + * should be based on currently observed headroom from various sub-clusters, + * values close to zero, indicate that the decision should be + * mostly based on weights and practically ignore current load. + * + * @return FederationQueueWeight + */ + @Private + @Unstable + public static FederationQueueWeight newInstance(String routerWeight, + String amrmWeight, String headRoomAlpha) { + FederationQueueWeight federationQueueWeight = Records.newRecord(FederationQueueWeight.class); + federationQueueWeight.setRouterWeight(routerWeight); + federationQueueWeight.setAmrmWeight(amrmWeight); + federationQueueWeight.setHeadRoomAlpha(headRoomAlpha); + return federationQueueWeight; + } + + @Public + @Unstable + public abstract String getRouterWeight(); + + @Public + @Unstable + public abstract void setRouterWeight(String routerWeight); + + @Public + @Unstable + public abstract String getAmrmWeight(); + + @Public + @Unstable + public abstract void setAmrmWeight(String amrmWeight); + + @Public + @Unstable + public abstract String getHeadRoomAlpha(); + + @Public + @Unstable + public abstract void setHeadRoomAlpha(String headRoomAlpha); + + private static final String COMMA = ","; + private static final String COLON = ":"; + + /** + * Check if the subCluster Queue Weight Ratio are valid. + * + * This method can be used to validate RouterPolicyWeight and AMRMPolicyWeight. + * + * @param subClusterWeight the weight ratios of subClusters. + * @throws YarnException exceptions from yarn servers. + */ + public static void checkSubClusterQueueWeightRatioValid(String subClusterWeight) + throws YarnException { + // The subClusterWeight cannot be empty. + if (StringUtils.isBlank(subClusterWeight)) { + throw new YarnException("subClusterWeight can't be empty!"); + } + + // SC-1:0.7,SC-2:0.3 -> [SC-1:0.7,SC-2:0.3] + String[] subClusterWeights = subClusterWeight.split(COMMA); + Map subClusterWeightMap = new LinkedHashMap<>(); + for (String subClusterWeightItem : subClusterWeights) { + // SC-1:0.7 -> [SC-1,0.7] + // We require that the parsing result is not empty and must have a length of 2. + String[] subClusterWeightItems = subClusterWeightItem.split(COLON); + if (subClusterWeightItems == null || subClusterWeightItems.length != 2) { + throw new YarnException("The subClusterWeight cannot be empty," + + " and the subClusterWeight size must be 2. (eg.SC-1,0.2)"); + } + subClusterWeightMap.put(subClusterWeightItems[0], Double.valueOf(subClusterWeightItems[1])); + } + + // The sum of weight ratios for subClusters must be equal to 1. + double sum = subClusterWeightMap.values().stream().mapToDouble(Double::doubleValue).sum(); + boolean isValid = Math.abs(sum - 1.0) < 1e-6; // Comparing with a tolerance of 1e-6 + + if (!isValid) { + throw new YarnException("The sum of ratios for all subClusters must be equal to 1."); + } + } + + /** + * Check if HeadRoomAlpha is a number and is between 0 and 1. + * + * @param headRoomAlpha headroomalpha. + * @throws YarnException exceptions from yarn servers. + */ + public static void checkHeadRoomAlphaValid(String headRoomAlpha) throws YarnException { + if (!isNumeric(headRoomAlpha)) { + throw new YarnException("HeadRoomAlpha must be a number."); + } + + double dHeadRoomAlpha = Double.parseDouble(headRoomAlpha); + if (!(dHeadRoomAlpha >= 0 && dHeadRoomAlpha <= 1)) { + throw new YarnException("HeadRoomAlpha must be between 0-1."); + } + } + + /** + * Determines whether the given value is a number. + * + * @param value given value. + * @return true, is a number, false, not a number. + */ + protected static boolean isNumeric(String value) { + return NumberUtils.isCreatable(value); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SaveFederationQueuePolicyRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SaveFederationQueuePolicyRequest.java new file mode 100644 index 0000000000000..c8a201a97ef8a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SaveFederationQueuePolicyRequest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + * In Yarn Federation mode, this class is used to save the queue policy interface. + * + * This class stores the queue, the weight of the queue, + * and the PolicyManagerClassName information of the queue. + */ +@Private +@Unstable +public abstract class SaveFederationQueuePolicyRequest { + + @Private + @Unstable + public static SaveFederationQueuePolicyRequest newInstance( + String queue, FederationQueueWeight federationQueueWeight, String policyManagerClassName) { + SaveFederationQueuePolicyRequest request = + Records.newRecord(SaveFederationQueuePolicyRequest.class); + request.setQueue(queue); + request.setFederationQueueWeight(federationQueueWeight); + request.setPolicyManagerClassName(policyManagerClassName); + return request; + } + + @Public + @Unstable + public abstract FederationQueueWeight getFederationQueueWeight(); + + @Private + @Unstable + public abstract void setFederationQueueWeight(FederationQueueWeight federationQueueWeight); + + @Public + @Unstable + public abstract String getQueue(); + + @Public + @Unstable + public abstract void setQueue(String queue); + + @Public + @Unstable + public abstract String getPolicyManagerClassName(); + + @Public + @Unstable + public abstract void setPolicyManagerClassName(String className); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SaveFederationQueuePolicyResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SaveFederationQueuePolicyResponse.java new file mode 100644 index 0000000000000..827734f676e72 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SaveFederationQueuePolicyResponse.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +@Private +@Unstable +public abstract class SaveFederationQueuePolicyResponse { + public static SaveFederationQueuePolicyResponse newInstance() { + return Records.newRecord(SaveFederationQueuePolicyResponse.class); + } + + public static SaveFederationQueuePolicyResponse newInstance(String msg) { + SaveFederationQueuePolicyResponse response = + Records.newRecord(SaveFederationQueuePolicyResponse.class); + response.setMessage(msg); + return response; + } + + @Public + @Unstable + public abstract String getMessage(); + + @Public + @Unstable + public abstract void setMessage(String msg); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto index 0546e334d238e..809817fa9f9d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto @@ -48,4 +48,5 @@ service ResourceManagerAdministrationProtocolService { rpc refreshClusterMaxPriority(RefreshClusterMaxPriorityRequestProto) returns (RefreshClusterMaxPriorityResponseProto); rpc mapAttributesToNodes(NodesToAttributesMappingRequestProto) returns (NodesToAttributesMappingResponseProto); rpc deregisterSubCluster(DeregisterSubClusterRequestProto) returns (DeregisterSubClusterResponseProto); + rpc saveFederationQueuePolicy(SaveFederationQueuePolicyRequestProto) returns (SaveFederationQueuePolicyResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index b5165cbd3720d..4e330fb1e632f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -170,6 +170,16 @@ message DeregisterSubClusterResponseProto { repeated DeregisterSubClustersProto deregisterSubClusters = 1; } +message SaveFederationQueuePolicyRequestProto { + required string queue = 1; + required FederationQueueWeightProto federationQueueWeight = 2; + optional string policyManagerClassName = 3; +} + +message SaveFederationQueuePolicyResponseProto { + required string message = 1; +} + ////////////////////////////////////////////////////////////////// ///////////// RM Failover related records //////////////////////// ////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 0c272e2b5636f..847919091cfbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -440,6 +440,12 @@ message DeregisterSubClustersProto { optional string subClusterState = 5; } +message FederationQueueWeightProto { + optional string routerWeight = 1; + optional string amrmWeight = 2; + optional string headRoomAlpha = 3; +} + //////////////////////////////////////////////////////////////////////// ////// From AM_RM_Protocol ///////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java index a571f7c52757f..1c5873f28bd66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java @@ -38,6 +38,11 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStreamWriter; @@ -48,14 +53,34 @@ import java.util.List; import java.util.Map; +import static org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight.checkHeadRoomAlphaValid; +import static org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight.checkSubClusterQueueWeightRatioValid; + public class RouterCLI extends Configured implements Tool { + + private static final Logger LOG = LoggerFactory.getLogger(RouterCLI.class); + protected final static Map ADMIN_USAGE = - ImmutableMap.builder().put("-deregisterSubCluster", - new UsageInfo("[-sc|--subClusterId [subCluster Id]]", + ImmutableMap.builder() + // Command1: deregisterSubCluster + .put("-deregisterSubCluster", new UsageInfo( + "[-sc|--subClusterId [subCluster Id]]", "Deregister SubCluster, If the interval between the heartbeat time of the subCluster " + "and the current time exceeds the timeout period, " + - "set the state of the subCluster to SC_LOST")).build(); + "set the state of the subCluster to SC_LOST.")) + // Command2: policy + .put("-policy", new UsageInfo( + "[-s|--save [queue;router weight;amrm weight;headroomalpha]]", + "We provide a set of commands for Policy:" + + " Include list policies, save policies, batch save policies. " + + " (Note: The policy type will be directly read from the" + + " yarn.federation.policy-manager in the local yarn-site.xml.)" + + " eg. (routeradmin -policy [-s|--save] root.a;SC-1:0.7,SC-2:0.3;SC-1:0.7,SC-2:0.3;1.0)")) + .build(); + + // Common Constant + private static final String SEMICOLON = ";"; // Command Constant private static final String CMD_EMPTY = ""; @@ -74,6 +99,12 @@ public class RouterCLI extends Configured implements Tool { private static final String CMD_DEREGISTERSUBCLUSTER = "-deregisterSubCluster"; private static final String CMD_HELP = "-help"; + // Command2: policy + // save policy + private static final String OPTION_S = "s"; + private static final String OPTION_SAVE = "save"; + private static final String CMD_POLICY = "-policy"; + public RouterCLI() { super(); } @@ -128,9 +159,10 @@ private static void printHelp() { summary.append("routeradmin is the command to execute ") .append("YARN Federation administrative commands.\n") .append("The full syntax is: \n\n") - .append("routeradmin") - .append(" [-deregisterSubCluster [-sc|--subClusterId [subCluster Id]]") - .append(" [-help [cmd]]").append("\n"); + .append("routeradmin\n") + .append(" [-deregisterSubCluster [-sc|--subClusterId [subCluster Id]]\n") + .append(" [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha]]\n") + .append(" [-help [cmd]]").append("\n"); StringBuilder helpBuilder = new StringBuilder(); System.out.println(summary); @@ -260,6 +292,98 @@ private int deregisterSubCluster() throws IOException, YarnException { return EXIT_SUCCESS; } + private int handlePolicy(String[] args) + throws IOException, YarnException, ParseException { + + // Prepare Options. + Options opts = new Options(); + opts.addOption("policy", false, + "We provide a set of commands for Policy Include list policies, " + + "save policies, batch save policies."); + Option saveOpt = new Option(OPTION_S, OPTION_SAVE, true, + "We will save the policy information of the queue, " + + "including queue and weight information"); + saveOpt.setOptionalArg(true); + opts.addOption(saveOpt); + + // Parse command line arguments. + CommandLine cliParser; + try { + cliParser = new GnuParser().parse(opts, args); + } catch (MissingArgumentException ex) { + System.out.println("Missing argument for options"); + printUsage(args[0]); + return EXIT_ERROR; + } + + // Try to parse the cmd save. + if (cliParser.hasOption(OPTION_S) || cliParser.hasOption(OPTION_SAVE)) { + String policy = cliParser.getOptionValue(OPTION_S); + if (StringUtils.isBlank(policy)) { + policy = cliParser.getOptionValue(OPTION_SAVE); + } + return handleSavePolicy(policy); + } + + return EXIT_ERROR; + } + + private int handleSavePolicy(String policy) { + LOG.info("Save Federation Policy = {}.", policy); + try { + SaveFederationQueuePolicyRequest request = parsePolicy(policy); + ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); + SaveFederationQueuePolicyResponse response = adminProtocol.saveFederationQueuePolicy(request); + System.out.println(response.getMessage()); + return EXIT_SUCCESS; + } catch (YarnException | IOException e) { + LOG.error("handleSavePolicy error.", e); + return EXIT_ERROR; + } + } + + /** + * We will parse the policy, and it has specific formatting requirements. + * + * 1. queue,router weight,amrm weight,headroomalpha {@link FederationQueueWeight}. + * 2. the sum of weights for all sub-clusters in routerWeight/amrmWeight should be 1. + * + * @param policy queue weight. + * @return If the conversion is correct, we will get the FederationQueueWeight, + * otherwise an exception will be thrown. + * @throws YarnException exceptions from yarn servers. + */ + protected SaveFederationQueuePolicyRequest parsePolicy(String policy) throws YarnException { + + String[] policyItems = policy.split(SEMICOLON); + if (policyItems == null || policyItems.length != 4) { + throw new YarnException("The policy cannot be empty or the policy is incorrect. \n" + + " Required information to provide: queue,router weight,amrm weight,headroomalpha \n" + + " eg. root.a;SC-1:0.7,SC-2:0.3;SC-1:0.7,SC-2:0.3;1.0"); + } + + String queue = policyItems[0]; + String routerWeight = policyItems[1]; + String amrmWeight = policyItems[2]; + String headroomalpha = policyItems[3]; + + LOG.info("Policy: [Queue = {}, RouterWeight = {}, AmRmWeight = {}, Headroomalpha = {}]", + queue, routerWeight, amrmWeight, headroomalpha); + + checkSubClusterQueueWeightRatioValid(routerWeight); + checkSubClusterQueueWeightRatioValid(amrmWeight); + checkHeadRoomAlphaValid(headroomalpha); + + FederationQueueWeight federationQueueWeight = + FederationQueueWeight.newInstance(routerWeight, amrmWeight, headroomalpha); + String policyManager = getConf().get(YarnConfiguration.FEDERATION_POLICY_MANAGER, + YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER); + SaveFederationQueuePolicyRequest request = SaveFederationQueuePolicyRequest.newInstance( + queue, federationQueueWeight, policyManager); + + return request; + } + @Override public int run(String[] args) throws Exception { YarnConfiguration yarnConf = getConf() == null ? @@ -287,6 +411,10 @@ public int run(String[] args) throws Exception { return handleDeregisterSubCluster(args); } + if (CMD_POLICY.equals(cmd)) { + return handlePolicy(args); + } + return EXIT_SUCCESS; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java index 157a33cdf6291..c7989e11af561 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java @@ -19,11 +19,16 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; import org.junit.Before; import org.junit.Test; import org.mockito.stubbing.Answer; @@ -35,6 +40,7 @@ import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -49,6 +55,7 @@ public class TestRouterCLI { public void setup() throws Exception { admin = mock(ResourceManagerAdministrationProtocol.class); + when(admin.deregisterSubCluster(any(DeregisterSubClusterRequest.class))) .thenAnswer((Answer) invocationOnMock -> { // Step1. parse subClusterId. @@ -63,6 +70,14 @@ public void setup() throws Exception { } }); + when(admin.saveFederationQueuePolicy(any(SaveFederationQueuePolicyRequest.class))) + .thenAnswer((Answer) invocationOnMock -> { + // Step1. parse subClusterId. + Object obj = invocationOnMock.getArgument(0); + SaveFederationQueuePolicyRequest request = (SaveFederationQueuePolicyRequest) obj; + return SaveFederationQueuePolicyResponse.newInstance("success"); + }); + Configuration config = new Configuration(); config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); @@ -114,6 +129,9 @@ public void testHelp() throws Exception { args = new String[]{"-help", "-deregisterSubCluster"}; rmAdminCLI.run(args); + + args = new String[]{"-help", "-policy"}; + rmAdminCLI.run(args); } @Test @@ -152,4 +170,51 @@ public void testDeregisterSubClusters() throws Exception { assertEquals(0, rmAdminCLI.run(args)); } + + @Test + public void testParsePolicy() throws Exception { + // Case1, If policy is empty. + String errMsg1 = "The policy cannot be empty or the policy is incorrect. \n" + + " Required information to provide: queue,router weight,amrm weight,headroomalpha \n" + + " eg. root.a;SC-1:0.7,SC-2:0.3;SC-1:0.7,SC-2:0.3;1.0"; + LambdaTestUtils.intercept(YarnException.class, errMsg1, () -> rmAdminCLI.parsePolicy("")); + + // Case2, If policy is incomplete, We need 4 items, but only 2 of them are provided. + LambdaTestUtils.intercept(YarnException.class, errMsg1, + () -> rmAdminCLI.parsePolicy("root.a;SC-1:0.1,SC-2:0.9;")); + + // Case3, If policy is incomplete, The weight of a subcluster is missing. + String errMsg2 = "The subClusterWeight cannot be empty, " + + "and the subClusterWeight size must be 2. (eg.SC-1,0.2)"; + LambdaTestUtils.intercept(YarnException.class, errMsg2, + () -> rmAdminCLI.parsePolicy("root.a;SC-1:0.1,SC-2;SC-1:0.1,SC-2;0.3,1.0")); + + // Case4, The policy is complete, but the sum of weights for each subcluster is not equal to 1. + String errMsg3 = "The sum of ratios for all subClusters must be equal to 1."; + LambdaTestUtils.intercept(YarnException.class, errMsg3, + () -> rmAdminCLI.parsePolicy("root.a;SC-1:0.1,SC-2:0.8;SC-1:0.1,SC-2;0.3,1.0")); + + // If policy is root.a;SC-1:0.7,SC-2:0.3;SC-1:0.7,SC-2:0.3;1.0 + String policy = "root.a;SC-1:0.7,SC-2:0.3;SC-1:0.6,SC-2:0.4;1.0"; + SaveFederationQueuePolicyRequest request = rmAdminCLI.parsePolicy(policy); + FederationQueueWeight federationQueueWeight = request.getFederationQueueWeight(); + assertNotNull(federationQueueWeight); + assertEquals("SC-1:0.7,SC-2:0.3", federationQueueWeight.getRouterWeight()); + assertEquals("SC-1:0.6,SC-2:0.4", federationQueueWeight.getAmrmWeight()); + assertEquals("1.0", federationQueueWeight.getHeadRoomAlpha()); + } + + @Test + public void testSavePolicy() throws Exception { + PrintStream oldOutPrintStream = System.out; + ByteArrayOutputStream dataOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(dataOut)); + oldOutPrintStream.println(dataOut); + + String[] args = {"-policy", "-s", "root.a;SC-1:0.1,SC-2:0.9;SC-1:0.7,SC-2:0.3;1.0"}; + assertEquals(0, rmAdminCLI.run(args)); + + args = new String[]{"-policy", "-save", "root.a;SC-1:0.1,SC-2:0.9;SC-1:0.7,SC-2:0.3;1.0"}; + assertEquals(0, rmAdminCLI.run(args)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java index 3d64db4f4f1bf..3242fe1400bdd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; @@ -78,6 +79,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl; @@ -108,6 +111,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyResponsePBImpl; import org.apache.hadoop.thirdparty.protobuf.ServiceException; @@ -362,4 +367,18 @@ public DeregisterSubClusterResponse deregisterSubCluster( return null; } } + + @Override + public SaveFederationQueuePolicyResponse saveFederationQueuePolicy( + SaveFederationQueuePolicyRequest request) throws YarnException, IOException { + SaveFederationQueuePolicyRequestProto requestProto = + ((SaveFederationQueuePolicyRequestPBImpl) request).getProto(); + try { + return new SaveFederationQueuePolicyResponsePBImpl( + proxy.saveFederationQueuePolicy(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java index f6c48a289221a..e393523ed2e33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java @@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyResponseProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; @@ -75,6 +77,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl; @@ -105,6 +109,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyResponsePBImpl; import org.apache.hadoop.thirdparty.protobuf.RpcController; import org.apache.hadoop.thirdparty.protobuf.ServiceException; @@ -379,4 +385,18 @@ public DeregisterSubClusterResponseProto deregisterSubCluster(RpcController cont throw new ServiceException(e); } } + + @Override + public SaveFederationQueuePolicyResponseProto saveFederationQueuePolicy(RpcController controller, + SaveFederationQueuePolicyRequestProto proto) throws ServiceException { + SaveFederationQueuePolicyRequest request = new SaveFederationQueuePolicyRequestPBImpl(proto); + try { + SaveFederationQueuePolicyResponse response = real.saveFederationQueuePolicy(request); + return ((SaveFederationQueuePolicyResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java new file mode 100644 index 0000000000000..4ca7f783bd97e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnProtos.FederationQueueWeightProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.FederationQueueWeightProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; + +@Private +@Unstable +public class FederationQueueWeightPBImpl extends FederationQueueWeight { + + private FederationQueueWeightProto proto = FederationQueueWeightProto.getDefaultInstance(); + private FederationQueueWeightProto.Builder builder = null; + private boolean viaProto = false; + + public FederationQueueWeightPBImpl() { + this.builder = FederationQueueWeightProto.newBuilder(); + } + + public FederationQueueWeightPBImpl(FederationQueueWeightProto proto) { + this.proto = proto; + this.viaProto = true; + } + + private synchronized void maybeInitBuilder() { + if (this.viaProto || this.builder == null) { + this.builder = FederationQueueWeightProto.newBuilder(proto); + } + this.viaProto = false; + } + + public FederationQueueWeightProto getProto() { + this.proto = this.viaProto ? this.proto : this.builder.build(); + this.viaProto = true; + return this.proto; + } + + @Override + public String getRouterWeight() { + FederationQueueWeightProtoOrBuilder p = this.viaProto ? this.proto : this.builder; + boolean hasRouterWeight = p.hasRouterWeight(); + if (hasRouterWeight) { + return p.getRouterWeight(); + } + return null; + } + + @Override + public void setRouterWeight(String routerWeight) { + maybeInitBuilder(); + if (routerWeight == null) { + builder.clearRouterWeight(); + return; + } + builder.setRouterWeight(routerWeight); + } + + @Override + public String getAmrmWeight() { + FederationQueueWeightProtoOrBuilder p = this.viaProto ? this.proto : this.builder; + boolean hasAmrmWeight = p.hasAmrmWeight(); + if (hasAmrmWeight) { + return p.getAmrmWeight(); + } + return null; + } + + @Override + public void setAmrmWeight(String amrmWeight) { + maybeInitBuilder(); + if (amrmWeight == null) { + builder.clearAmrmWeight(); + return; + } + builder.setAmrmWeight(amrmWeight); + } + + @Override + public String getHeadRoomAlpha() { + FederationQueueWeightProtoOrBuilder p = this.viaProto ? this.proto : this.builder; + boolean hasHeadRoomAlpha = p.hasHeadRoomAlpha(); + if (hasHeadRoomAlpha) { + return p.getHeadRoomAlpha(); + } + return null; + } + + @Override + public void setHeadRoomAlpha(String headRoomAlpha) { + maybeInitBuilder(); + if (headRoomAlpha == null) { + builder.clearHeadRoomAlpha(); + return; + } + builder.setHeadRoomAlpha(headRoomAlpha); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof FederationQueueWeight)) { + return false; + } + FederationQueueWeightPBImpl otherImpl = this.getClass().cast(other); + return new EqualsBuilder() + .append(this.getProto(), otherImpl.getProto()) + .isEquals(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SaveFederationQueuePolicyRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SaveFederationQueuePolicyRequestPBImpl.java new file mode 100644 index 0000000000000..e97f1dab5c2f3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SaveFederationQueuePolicyRequestPBImpl.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.proto.YarnProtos.FederationQueueWeightProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; + +@Private +@Unstable +public class SaveFederationQueuePolicyRequestPBImpl extends SaveFederationQueuePolicyRequest { + + private SaveFederationQueuePolicyRequestProto proto = + SaveFederationQueuePolicyRequestProto.getDefaultInstance(); + private SaveFederationQueuePolicyRequestProto.Builder builder = null; + private boolean viaProto = false; + private FederationQueueWeight federationQueueWeight = null; + + public SaveFederationQueuePolicyRequestPBImpl() { + builder = SaveFederationQueuePolicyRequestProto.newBuilder(); + } + + public SaveFederationQueuePolicyRequestPBImpl(SaveFederationQueuePolicyRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SaveFederationQueuePolicyRequestProto.newBuilder(proto); + } + viaProto = false; + } + + public SaveFederationQueuePolicyRequestProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SaveFederationQueuePolicyRequest)) { + return false; + } + + SaveFederationQueuePolicyRequestPBImpl otherImpl = this.getClass().cast(other); + return new EqualsBuilder() + .append(this.getProto(), otherImpl.getProto()) + .isEquals(); + } + + @Override + public FederationQueueWeight getFederationQueueWeight() { + if (this.federationQueueWeight != null) { + return this.federationQueueWeight; + } + SaveFederationQueuePolicyRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasFederationQueueWeight()) { + return null; + } + this.federationQueueWeight = convertFromProtoFormat(p.getFederationQueueWeight()); + return this.federationQueueWeight; + } + + @Override + public void setFederationQueueWeight(FederationQueueWeight pFederationQueueWeight) { + if (pFederationQueueWeight == null) { + throw new IllegalArgumentException("FederationQueueWeight cannot be null."); + } + maybeInitBuilder(); + this.federationQueueWeight = pFederationQueueWeight; + mergeLocalToBuilder(); + } + + private void mergeLocalToBuilder() { + if (this.federationQueueWeight != null) { + builder.setFederationQueueWeight(convertToProtoFormat(this.federationQueueWeight)); + } + } + + @Override + public String getQueue() { + SaveFederationQueuePolicyRequestProtoOrBuilder p = viaProto ? proto : builder; + boolean hasQueue = p.hasQueue(); + if (hasQueue) { + return p.getQueue(); + } + return null; + } + + @Override + public void setQueue(String queue) { + maybeInitBuilder(); + if (queue == null) { + builder.clearQueue(); + return; + } + builder.setQueue(queue); + } + + @Override + public String getPolicyManagerClassName() { + SaveFederationQueuePolicyRequestProtoOrBuilder p = viaProto ? proto : builder; + boolean hasPolicyManagerClassName = p.hasPolicyManagerClassName(); + if (hasPolicyManagerClassName) { + return p.getPolicyManagerClassName(); + } + return null; + } + + @Override + public void setPolicyManagerClassName(String className) { + maybeInitBuilder(); + if (className == null) { + builder.clearPolicyManagerClassName(); + return; + } + builder.setPolicyManagerClassName(className); + } + + private FederationQueueWeightProto convertToProtoFormat( + FederationQueueWeight pFederationQueueWeight) { + return ((FederationQueueWeightPBImpl) pFederationQueueWeight).getProto(); + } + + private FederationQueueWeight convertFromProtoFormat( + FederationQueueWeightProto federationQueueWeightProto) { + return new FederationQueueWeightPBImpl(federationQueueWeightProto); + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SaveFederationQueuePolicyResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SaveFederationQueuePolicyResponsePBImpl.java new file mode 100644 index 0000000000000..3221c678c2338 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SaveFederationQueuePolicyResponsePBImpl.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyResponseProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyResponseProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; + +@Private +@Unstable +public class SaveFederationQueuePolicyResponsePBImpl extends SaveFederationQueuePolicyResponse { + + private SaveFederationQueuePolicyResponseProto proto = + SaveFederationQueuePolicyResponseProto.getDefaultInstance(); + private SaveFederationQueuePolicyResponseProto.Builder builder = null; + private boolean viaProto = false; + + public SaveFederationQueuePolicyResponsePBImpl() { + builder = SaveFederationQueuePolicyResponseProto.newBuilder(); + } + + public SaveFederationQueuePolicyResponsePBImpl(SaveFederationQueuePolicyResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public SaveFederationQueuePolicyResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public String getMessage() { + SaveFederationQueuePolicyResponseProtoOrBuilder p = viaProto ? proto : builder; + boolean hasMessage = p.hasMessage(); + if (hasMessage) { + return p.getMessage(); + } + return null; + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SaveFederationQueuePolicyResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public void setMessage(String msg) { + maybeInitBuilder(); + if (msg == null) { + builder.clearMessage(); + return; + } + builder.setMessage(msg); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java index ad03fb09a4e85..02af630eaf88a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java @@ -84,4 +84,9 @@ public boolean equals(Object obj) { public int hashCode() { return new HashCodeBuilder().append(this.id).toHashCode(); } + + @Override + public String toString() { + return "SubClusterIdInfo{ id='" + id + '\'' + '}'; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 50460e7340431..d1b08ecaed5bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -311,6 +312,18 @@ public SubClusterPolicyConfiguration getPolicyConfiguration(final String queue) } } + /** + * Set a policy configuration into the state store. + * + * @param policyConf the policy configuration to set + * @throws YarnException if the request is invalid/fails + */ + public void setPolicyConfiguration(SubClusterPolicyConfiguration policyConf) + throws YarnException { + stateStore.setPolicyConfiguration( + SetSubClusterPolicyConfigurationRequest.newInstance(policyConf)); + } + /** * Get the policies that is represented as * {@link SubClusterPolicyConfiguration} for all currently active queues in diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index c0ca3b5d8a5ea..14bb941c9d61e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -175,6 +175,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; /** @@ -964,6 +966,12 @@ public DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterReq return null; } + @Override + public SaveFederationQueuePolicyResponse saveFederationQueuePolicy( + SaveFederationQueuePolicyRequest request) throws YarnException, IOException { + return null; + } + @VisibleForTesting public HashMap> getApplicationContainerIdMap() { return applicationContainerIdMap; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index e50550901afa2..d9eade8fe2d69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -97,6 +97,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; @@ -1054,6 +1056,13 @@ public DeregisterSubClusterResponse deregisterSubCluster( "Please call Router's deregisterSubCluster to set."); } + @Override + public SaveFederationQueuePolicyResponse saveFederationQueuePolicy( + SaveFederationQueuePolicyRequest request) throws YarnException, IOException { + throw new YarnException("It is not allowed to call the RM's saveFederationQueuePolicy. " + + " Please call Router's deregisterSubCluster to set Policy."); + } + private void validateAttributesExists( List nodesToAttributes) throws IOException { NodeAttributesManager nodeAttributesManager = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 17a9cfc71ebe5..f9769f7026c3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -149,6 +149,8 @@ public final class RouterMetrics { private MutableGaugeInt numRefreshUserToGroupsMappingsFailedRetrieved; @Metric("# of deregisterSubCluster failed to be retrieved") private MutableGaugeInt numDeregisterSubClusterFailedRetrieved; + @Metric("# of saveFederationQueuePolicy failed to be retrieved") + private MutableGaugeInt numSaveFederationQueuePolicyFailedRetrieved; @Metric("# of refreshAdminAcls failed to be retrieved") private MutableGaugeInt numRefreshAdminAclsFailedRetrieved; @Metric("# of refreshServiceAcls failed to be retrieved") @@ -295,6 +297,8 @@ public final class RouterMetrics { private MutableRate totalSucceededGetSchedulerInfoRetrieved; @Metric("Total number of successful Retrieved DeregisterSubCluster and latency(ms)") private MutableRate totalSucceededDeregisterSubClusterRetrieved; + @Metric("Total number of successful Retrieved SaveFederationQueuePolicy and latency(ms)") + private MutableRate totalSucceededSaveFederationQueuePolicyRetrieved; @Metric("Total number of successful Retrieved RefreshAdminAcls and latency(ms)") private MutableRate totalSucceededRefreshAdminAclsRetrieved; @Metric("Total number of successful Retrieved RefreshServiceAcls and latency(ms)") @@ -381,6 +385,7 @@ public final class RouterMetrics { private MutableQuantiles refreshSuperUserGroupsConfLatency; private MutableQuantiles refreshUserToGroupsMappingsLatency; private MutableQuantiles refreshDeregisterSubClusterLatency; + private MutableQuantiles saveFederationQueuePolicyLatency; private MutableQuantiles refreshAdminAclsLatency; private MutableQuantiles refreshServiceAclsLatency; private MutableQuantiles replaceLabelsOnNodesLatency; @@ -592,6 +597,9 @@ private RouterMetrics() { refreshDeregisterSubClusterLatency = registry.newQuantiles("refreshDeregisterSubClusterLatency", "latency of deregister subcluster timeouts", "ops", "latency", 10); + saveFederationQueuePolicyLatency = registry.newQuantiles("saveFederationQueuePolicyLatency", + "latency of refresh subcluster timeouts", "ops", "latency", 10); + refreshAdminAclsLatency = registry.newQuantiles("refreshAdminAclsLatency", "latency of refresh admin acls timeouts", "ops", "latency", 10); @@ -921,6 +929,11 @@ public long getNumSucceededDeregisterSubClusterRetrieved() { return totalSucceededDeregisterSubClusterRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededSaveFederationQueuePolicyRetrieved() { + return totalSucceededSaveFederationQueuePolicyRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public long getNumSucceededRefreshAdminAclsRetrieved() { return totalSucceededRefreshAdminAclsRetrieved.lastStat().numSamples(); @@ -1266,6 +1279,11 @@ public double getLatencySucceededDeregisterSubClusterRetrieved() { return totalSucceededDeregisterSubClusterRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededSaveFederationQueuePolicyRetrieved() { + return totalSucceededSaveFederationQueuePolicyRetrieved.lastStat().mean(); + } + @VisibleForTesting public double getLatencySucceededRefreshAdminAclsRetrieved() { return totalSucceededRefreshAdminAclsRetrieved.lastStat().mean(); @@ -1561,6 +1579,10 @@ public int getDeregisterSubClusterFailedRetrieved() { return numDeregisterSubClusterFailedRetrieved.value(); } + public int getSaveFederationQueuePolicyFailedRetrieved() { + return numSaveFederationQueuePolicyFailedRetrieved.value(); + } + public int getNumRefreshAdminAclsFailedRetrieved() { return numRefreshAdminAclsFailedRetrieved.value(); } @@ -1913,6 +1935,11 @@ public void succeededDeregisterSubClusterRetrieved(long duration) { refreshDeregisterSubClusterLatency.add(duration); } + public void succeededSaveFederationQueuePolicyRetrieved(long duration) { + totalSucceededSaveFederationQueuePolicyRetrieved.add(duration); + saveFederationQueuePolicyLatency.add(duration); + } + public void succeededRefreshAdminAclsRetrieved(long duration) { totalSucceededRefreshAdminAclsRetrieved.add(duration); refreshAdminAclsLatency.add(duration); @@ -2191,6 +2218,10 @@ public void incrDeregisterSubClusterFailedRetrieved() { numDeregisterSubClusterFailedRetrieved.incr(); } + public void incrSaveFederationQueuePolicyFailedRetrieved() { + numSaveFederationQueuePolicyFailedRetrieved.incr(); + } + public void incrRefreshAdminAclsFailedRetrieved() { numRefreshAdminAclsFailedRetrieved.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java index df5b4d5835df9..20aa0dda5fdb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java @@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -209,4 +211,10 @@ public DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterReq throws YarnException, IOException { return rmAdminProxy.deregisterSubCluster(request); } + + @Override + public SaveFederationQueuePolicyResponse saveFederationQueuePolicy( + SaveFederationQueuePolicyRequest request) throws YarnException, IOException { + return rmAdminProxy.saveFederationQueuePolicy(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java index 1f4cdc0a7f9cc..8125d4eb2b99c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java @@ -63,10 +63,16 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters; +import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.router.RouterMetrics; import org.apache.hadoop.yarn.server.router.RouterServerUtil; @@ -79,6 +85,7 @@ import java.util.List; import java.util.ArrayList; import java.util.Map; +import java.util.HashMap; import java.util.Collection; import java.util.Set; import java.util.Date; @@ -96,6 +103,9 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep private static final Logger LOG = LoggerFactory.getLogger(FederationRMAdminInterceptor.class); + private static final String COMMA = ","; + private static final String COLON = ":"; + private Map adminRMProxies; private FederationStateStoreFacade federationFacade; private final Clock clock = new MonotonicClock(); @@ -855,13 +865,118 @@ public DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterReq } catch (Exception e) { routerMetrics.incrDeregisterSubClusterFailedRetrieved(); RouterServerUtil.logAndThrowException(e, - "Unable to deregisterSubCluster due to exception. " + e.getMessage()); + "Unable to deregisterSubCluster due to exception. " + e.getMessage()); } routerMetrics.incrDeregisterSubClusterFailedRetrieved(); throw new YarnException("Unable to deregisterSubCluster."); } + /** + * Save the Queue Policy for the Federation. + * + * @param request saveFederationQueuePolicy Request. + * @return Response from saveFederationQueuePolicy. + * @throws YarnException exceptions from yarn servers. + * @throws IOException if an IO error occurred. + */ + @Override + public SaveFederationQueuePolicyResponse saveFederationQueuePolicy( + SaveFederationQueuePolicyRequest request) throws YarnException, IOException { + + // Parameter validation. + + if (request == null) { + routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing SaveFederationQueuePolicy request.", null); + } + + FederationQueueWeight federationQueueWeight = request.getFederationQueueWeight(); + if (federationQueueWeight == null) { + routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing FederationQueueWeight information.", null); + } + + String queue = request.getQueue(); + if (StringUtils.isBlank(queue)) { + routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing Queue information.", null); + } + + String amRmWeight = federationQueueWeight.getAmrmWeight(); + FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight); + + String routerWeight = federationQueueWeight.getRouterWeight(); + FederationQueueWeight.checkSubClusterQueueWeightRatioValid(routerWeight); + + String headRoomAlpha = federationQueueWeight.getHeadRoomAlpha(); + FederationQueueWeight.checkHeadRoomAlphaValid(headRoomAlpha); + + try { + long startTime = clock.getTime(); + // Step1, get parameters. + String policyManagerClassName = request.getPolicyManagerClassName(); + + + // Step2, parse amRMPolicyWeights. + Map amRMPolicyWeights = getSubClusterWeightMap(amRmWeight); + LOG.debug("amRMPolicyWeights = {}.", amRMPolicyWeights); + + // Step3, parse routerPolicyWeights. + Map routerPolicyWeights = getSubClusterWeightMap(routerWeight); + LOG.debug("routerWeights = {}.", amRMPolicyWeights); + + // Step4, Initialize WeightedPolicyInfo. + WeightedPolicyInfo weightedPolicyInfo = new WeightedPolicyInfo(); + weightedPolicyInfo.setHeadroomAlpha(Float.parseFloat(headRoomAlpha)); + weightedPolicyInfo.setAMRMPolicyWeights(amRMPolicyWeights); + weightedPolicyInfo.setRouterPolicyWeights(routerPolicyWeights); + + // Step5, Set SubClusterPolicyConfiguration. + SubClusterPolicyConfiguration policyConfiguration = + SubClusterPolicyConfiguration.newInstance(queue, policyManagerClassName, + weightedPolicyInfo.toByteBuffer()); + federationFacade.setPolicyConfiguration(policyConfiguration); + long stopTime = clock.getTime(); + routerMetrics.succeededSaveFederationQueuePolicyRetrieved(stopTime - startTime); + return SaveFederationQueuePolicyResponse.newInstance("save policy success."); + } catch (Exception e) { + routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to saveFederationQueuePolicy due to exception. " + e.getMessage()); + } + + routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved(); + throw new YarnException("Unable to saveFederationQueuePolicy."); + } + + /** + * Get the Map of SubClusterWeight. + * + * This method can parse the Weight information of Router and + * the Weight information of AMRMProxy. + * + * An example of a parsed string is as follows: + * SC-1:0.7,SC-2:0.3 + * + * @param policyWeight policyWeight. + * @return Map of SubClusterWeight. + */ + private Map getSubClusterWeightMap(String policyWeight) + throws YarnException { + FederationQueueWeight.checkSubClusterQueueWeightRatioValid(policyWeight); + Map result = new HashMap<>(); + String[] policyWeights = policyWeight.split(COMMA); + for (String policyWeightItem : policyWeights) { + String[] subClusterWeight = policyWeightItem.split(COLON); + String subClusterId = subClusterWeight[0]; + SubClusterIdInfo subClusterIdInfo = new SubClusterIdInfo(subClusterId); + String weight = subClusterWeight[1]; + result.put(subClusterIdInfo, Float.valueOf(weight)); + } + return result; + } + /** * deregisterSubCluster by SubClusterId. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java index a436cc1b275d5..3c63ee9bd7f69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java @@ -65,6 +65,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider; import org.apache.hadoop.yarn.util.LRUCacheHashMap; @@ -392,4 +394,11 @@ public DeregisterSubClusterResponse deregisterSubCluster( RequestInterceptorChainWrapper pipeline = getInterceptorChain(); return pipeline.getRootInterceptor().deregisterSubCluster(request); } + + @Override + public SaveFederationQueuePolicyResponse saveFederationQueuePolicy( + SaveFederationQueuePolicyRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().saveFederationQueuePolicy(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index bfa3afb9b3abd..c53ecdd472e2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -623,6 +623,11 @@ public void getGroupsForUserFailed() { LOG.info("Mocked: failed getGroupsForUser call"); metrics.incrGetGroupsForUserFailedRetrieved(); } + + public void getSaveFederationQueuePolicyFailedRetrieved() { + LOG.info("Mocked: failed refreshClusterMaxPriority call"); + metrics.incrSaveFederationQueuePolicyFailedRetrieved(); + } } // Records successes for all calls @@ -952,6 +957,12 @@ public void getGroupsForUsersRetrieved(long duration) { duration); metrics.succeededGetGroupsForUsersRetrieved(duration); } + + public void getSaveFederationQueuePolicyRetrieved(long duration) { + LOG.info("Mocked: successful SaveFederationQueuePolicy call with duration {}", + duration); + metrics.succeededSaveFederationQueuePolicyRetrieved(duration); + } } @Test @@ -2208,4 +2219,26 @@ public void testGetGroupsForUserRetrieved() { Assert.assertEquals(225, metrics.getLatencySucceededGetGroupsForUsersRetrieved(), ASSERT_DOUBLE_DELTA); } + + @Test + public void testSaveFederationQueuePolicyFailedRetrieved() { + long totalBadBefore = metrics.getSaveFederationQueuePolicyFailedRetrieved(); + badSubCluster.getSaveFederationQueuePolicyFailedRetrieved(); + Assert.assertEquals(totalBadBefore + 1, metrics.getSaveFederationQueuePolicyFailedRetrieved()); + } + + @Test + public void testSaveFederationQueuePolicyRetrieved() { + long totalGoodBefore = metrics.getNumSucceededSaveFederationQueuePolicyRetrieved(); + goodSubCluster.getSaveFederationQueuePolicyRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededSaveFederationQueuePolicyRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededSaveFederationQueuePolicyRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getSaveFederationQueuePolicyRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededSaveFederationQueuePolicyRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededSaveFederationQueuePolicyRetrieved(), ASSERT_DOUBLE_DELTA); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java index 99c098a1b4203..da44923128af9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java @@ -52,6 +52,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; /** * Mock interceptor that does not do anything other than forwarding it to the @@ -161,4 +163,10 @@ public DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterReq throws YarnException, IOException { return getNextInterceptor().deregisterSubCluster(request); } + + @Override + public SaveFederationQueuePolicyResponse saveFederationQueuePolicy( + SaveFederationQueuePolicyRequest request) throws YarnException, IOException { + return getNextInterceptor().saveFederationQueuePolicy(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java index 2cfab9ca681c2..2c4cda2d2a9e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java @@ -56,8 +56,15 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; +import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; import org.junit.Assert; @@ -66,6 +73,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -603,4 +611,93 @@ public void testGetGroupsForUserNormalRequest() throws Exception { assertEquals(1, groups.length); assertEquals("admin", groups[0]); } + + @Test + public void testSaveFederationQueuePolicyErrorRequest() throws Exception { + // null request. + LambdaTestUtils.intercept(YarnException.class, "Missing SaveFederationQueuePolicy request.", + () -> interceptor.saveFederationQueuePolicy(null)); + + // federationQueueWeight is null. + LambdaTestUtils.intercept( + IllegalArgumentException.class, "FederationQueueWeight cannot be null.", + () -> SaveFederationQueuePolicyRequest.newInstance("root.a", null, "-")); + + // queue is null + FederationQueueWeight federationQueueWeight = + FederationQueueWeight.newInstance("SC-1:0.7,SC-2:0.3", "SC-1:0.7,SC-2:0.3", "1.0"); + SaveFederationQueuePolicyRequest request = + SaveFederationQueuePolicyRequest.newInstance("", federationQueueWeight, "-"); + LambdaTestUtils.intercept(YarnException.class, "Missing Queue information.", + () -> interceptor.saveFederationQueuePolicy(request)); + + // routerWeight / amrmWeight + // The sum of the routerWeight is not equal to 1. + FederationQueueWeight federationQueueWeight2 = FederationQueueWeight.newInstance( + "SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0"); + SaveFederationQueuePolicyRequest request2 = + SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight2, "-"); + LambdaTestUtils.intercept(YarnException.class, + "The sum of ratios for all subClusters must be equal to 1.", + () -> interceptor.saveFederationQueuePolicy(request2)); + } + + @Test + public void testSaveFederationQueuePolicyRequest() throws IOException, YarnException { + + // We design unit tests, including 2 SubCluster (SC-1, SC-2) + // Router Weight: SC-1=0.7,SC-2=0.3 + // AMRM Weight: SC-1=0.6,SC-2=0.4 + // headRoomAlpha: 1.0 + String queue = "root.a"; + String subCluster1 = "SC-1"; + String subCluster2 = "SC-2"; + String routerWeight = "SC-1:0.7,SC-2:0.3"; + String amrmWeight = "SC-1:0.6,SC-2:0.4"; + String headRoomAlpha = "1.0"; + + // Step1. Write FederationQueue information to stateStore. + String policyTypeName = WeightedLocalityPolicyManager.class.getCanonicalName(); + FederationQueueWeight federationQueueWeight = + FederationQueueWeight.newInstance(routerWeight, amrmWeight, headRoomAlpha); + SaveFederationQueuePolicyRequest request = + SaveFederationQueuePolicyRequest.newInstance(queue, federationQueueWeight, policyTypeName); + SaveFederationQueuePolicyResponse response = interceptor.saveFederationQueuePolicy(request); + assertNotNull(response); + assertEquals("save policy success.", response.getMessage()); + + // Step2. We query Policy information from FederationStateStore. + FederationStateStoreFacade federationFacade = interceptor.getFederationFacade(); + SubClusterPolicyConfiguration policyConfiguration = + federationFacade.getPolicyConfiguration(queue); + assertNotNull(policyConfiguration); + assertEquals(queue, policyConfiguration.getQueue()); + + ByteBuffer params = policyConfiguration.getParams(); + assertNotNull(params); + WeightedPolicyInfo weightedPolicyInfo = WeightedPolicyInfo.fromByteBuffer(params); + assertNotNull(weightedPolicyInfo); + + SubClusterIdInfo sc1 = new SubClusterIdInfo(subCluster1); + SubClusterIdInfo sc2 = new SubClusterIdInfo(subCluster2); + + // Step3. We will compare the accuracy of routerPolicyWeights and amrmPolicyWeights. + Map routerPolicyWeights = weightedPolicyInfo.getRouterPolicyWeights(); + Float sc1Weight = routerPolicyWeights.get(sc1); + assertNotNull(sc1Weight); + assertEquals(0.7f, sc1Weight.floatValue(), 0.00001); + + Float sc2Weight = routerPolicyWeights.get(sc2); + assertNotNull(sc2Weight); + assertEquals(0.3f, sc2Weight.floatValue(), 0.00001); + + Map amrmPolicyWeights = weightedPolicyInfo.getAMRMPolicyWeights(); + Float sc1AMRMWeight = amrmPolicyWeights.get(sc1); + assertNotNull(sc1AMRMWeight); + assertEquals(0.6f, sc1AMRMWeight.floatValue(), 0.00001); + + Float sc2AMRMWeight = amrmPolicyWeights.get(sc2); + assertNotNull(sc2AMRMWeight); + assertEquals(0.4f, sc2AMRMWeight.floatValue(), 0.00001); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md index e24f30e70958e..bdd9d82859a90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md @@ -235,9 +235,10 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**. |`yarn.federation.subcluster-resolver.class` | `org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl` | The class used to resolve which subcluster a node belongs to, and which subcluster(s) a rack belongs to. | |`yarn.federation.machine-list` | `` | Path of machine-list file used by `SubClusterResolver`. Each line of the file is a node with sub-cluster and rack information. Below is the example:

node1, subcluster1, rack1
node2, subcluster2, rack1
node3, subcluster3, rack2
node4, subcluster3, rack2 | -**How to configure the policy-manager?** +How to configure the policy-manager? +-------------------- -- Router Policy +Router Policy Router Policy defines the logic for determining the routing of an application submission and determines the HomeSubCluster for the application. @@ -263,7 +264,7 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**. - WeightedRandomRouterPolicy - This policy implements a weighted random sample among currently active sub-clusters. -- AMRM Policy +AMRM Policy AMRM Proxy defines the logic to split the resource request list received by AM among RMs. @@ -282,7 +283,7 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**. - RejectAMRMProxyPolicy - This policy simply rejects all requests. Useful to prevent apps from accessing any sub-cluster. -- Policy Manager +Policy Manager The PolicyManager is providing a combination of RouterPolicy and AMRMPolicy. @@ -316,6 +317,38 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**. - WeightedLocalityPolicyManager - Policy that allows operator to configure "weights" for routing. This picks a LocalityRouterPolicy for the router and a LocalityMulticastAMRMProxyPolicy for the amrmproxy as they are designed to work together. +How to configure the queue policy? +-------------------- + +We will provide a set of commands to view and save queue policies. + +The Queue Policy(SubClusterPolicyConfiguration) include the following: + +| Property | Description | +|:---------------|:----------------------------------------------------------------------| +| `queue` | `Queue for Job submission` | +| `policyType` | `Policy Manager Class name, Default is UniformBroadcastPolicyManager` | +| `policyParams` | `It stores serialized objects of WeightedPolicyInfo.` | + +WeightedPolicyInfo include the following: + +- RouterWeight + + Weight for routing applications to different subclusters. We will route the application to different subclusters based on the configured weights. + Assuming we have two subclusters, SC-1 and SC-2, with a weight of 0.7 for SC-1 and 0.3 for SC-2, + the application will be allocated in such a way that 70% of the applications will be assigned to SC-1 and 30% to SC-2. + +- AmRMWeight + + Weight for resource request from ApplicationMaster (AM) to different subclusters' Resource Manager (RM). + Assuming we have two subclusters, SC-1 and SC-2, with a weight of 0.6 for SC-1 and 0.4 for SC-2, + When AM requesting resources, 60% of the requests will be made to the Resource Manager (RM) of SC-1 and 40% to the RM of SC-2. + +- HeadRoomAlpha + + used by policies that balance weight-based and load-based considerations in their decisions. + For policies that use this parameter, values close to 1 indicate that most of the decision should be based on currently observed headroom from various sub-clusters, values close to zero, indicate that the decision should be mostly based on weights and practically ignore current load. + ### ON RMs: These are extra configurations that should appear in the **conf/yarn-site.xml** at each ResourceManager.