diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesRequest.java new file mode 100644 index 00000000000000..c71c61aca9f1a9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesRequest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; + +/** + * In Federation mode, + * we will support batch save queues policies to FederationStateStore. + */ +@Private +@Unstable +public abstract class BatchSaveFederationQueuePoliciesRequest { + + @Private + @Unstable + public static BatchSaveFederationQueuePoliciesRequest newInstance( + List federationQueueWeights) { + BatchSaveFederationQueuePoliciesRequest request = + Records.newRecord(BatchSaveFederationQueuePoliciesRequest.class); + request.setFederationQueueWeights(federationQueueWeights); + return request; + } + + @Public + @Unstable + public abstract List getFederationQueueWeights(); + + @Private + @Unstable + public abstract void setFederationQueueWeights( + List federationQueueWeights); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java index c63ee1b713d0d9..3b0b8c947bac2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java @@ -75,6 +75,19 @@ public static FederationQueueWeight newInstance(String routerWeight, return federationQueueWeight; } + @Private + @Unstable + public static FederationQueueWeight newInstance(String routerWeight, + String amrmWeight, String headRoomAlpha, String queue, String policyManagerClassName) { + FederationQueueWeight federationQueueWeight = Records.newRecord(FederationQueueWeight.class); + federationQueueWeight.setRouterWeight(routerWeight); + federationQueueWeight.setAmrmWeight(amrmWeight); + federationQueueWeight.setHeadRoomAlpha(headRoomAlpha); + federationQueueWeight.setQueue(queue); + federationQueueWeight.setPolicyManagerClassName(policyManagerClassName); + return federationQueueWeight; + } + @Public @Unstable public abstract String getRouterWeight(); @@ -166,4 +179,20 @@ public static void checkHeadRoomAlphaValid(String headRoomAlpha) throws YarnExce protected static boolean isNumeric(String value) { return NumberUtils.isCreatable(value); } + + @Public + @Unstable + public abstract String getQueue(); + + @Public + @Unstable + public abstract void setQueue(String queue); + + @Public + @Unstable + public abstract String getPolicyManagerClassName(); + + @Public + @Unstable + public abstract void setPolicyManagerClassName(String policyManagerClassName); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index 4e330fb1e632f4..06e11f913b6b1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -180,6 +180,14 @@ message SaveFederationQueuePolicyResponseProto { required string message = 1; } +message BatchSaveFederationQueuePoliciesRequestProto { + repeated FederationQueueWeightProto federationQueueWeights = 1; +} + +message BatchSaveFederationQueuePoliciesResponseProto { + required string message = 1; +} + ////////////////////////////////////////////////////////////////// ///////////// RM Failover related records //////////////////////// ////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 847919091cfbe4..71c102f4f8577d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -444,6 +444,8 @@ message FederationQueueWeightProto { optional string routerWeight = 1; optional string amrmWeight = 2; optional string headRoomAlpha = 3; + optional string queue = 4; + optional string policyManagerClassName = 5; } //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java index 1c5873f28bd668..2233f790525c67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java @@ -71,7 +71,8 @@ public class RouterCLI extends Configured implements Tool { "set the state of the subCluster to SC_LOST.")) // Command2: policy .put("-policy", new UsageInfo( - "[-s|--save [queue;router weight;amrm weight;headroomalpha]]", + "[-s|--save [queue;router weight;amrm weight;headroomalpha]] " + + "[-bs|--batch-save [--format xml,json] [-f|--input-file fileName]]", "We provide a set of commands for Policy:" + " Include list policies, save policies, batch save policies. " + " (Note: The policy type will be directly read from the" + @@ -102,8 +103,18 @@ public class RouterCLI extends Configured implements Tool { // Command2: policy // save policy private static final String OPTION_S = "s"; + + private static final String OPTION_BATCH_S = "bs"; + private static final String OPTION_SAVE = "save"; + + private static final String OPTION_BATCH_SAVE = "batch-save"; + + private static final String OPTION_FORMAT = "format"; + private static final String CMD_POLICY = "-policy"; + private static final String FORMAT_XML = "xml"; + private static final String FORMAT_JSON = "json"; public RouterCLI() { super(); @@ -161,7 +172,8 @@ private static void printHelp() { .append("The full syntax is: \n\n") .append("routeradmin\n") .append(" [-deregisterSubCluster [-sc|--subClusterId [subCluster Id]]\n") - .append(" [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha]]\n") + .append(" [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha] " + + "[-bs|--batch-save [--format xml,json] [-f|--input-file fileName]]]\n") .append(" [-help [cmd]]").append("\n"); StringBuilder helpBuilder = new StringBuilder(); System.out.println(summary); @@ -304,7 +316,20 @@ private int handlePolicy(String[] args) "We will save the policy information of the queue, " + "including queue and weight information"); saveOpt.setOptionalArg(true); + Option batchSaveOpt = new Option(OPTION_BATCH_S, OPTION_BATCH_SAVE, false, + "We will save queue policies in bulk, " + + "where users can provide XML or JSON files containing the policies. " + + "This command will parse the file contents and store the results " + + "in the FederationStateStore."); + Option formatOpt = new Option(null, "format", true, + "Users can specify the file format using this option. " + + "Currently, there are two supported file formats: XML and JSON. " + + "These files contain the policy information for storing queue policies."); + formatOpt.setOptionalArg(true); + opts.addOption(saveOpt); + opts.addOption(batchSaveOpt); + opts.addOption(formatOpt); // Parse command line arguments. CommandLine cliParser; @@ -317,6 +342,7 @@ private int handlePolicy(String[] args) } // Try to parse the cmd save. + // Save a single queue policy if (cliParser.hasOption(OPTION_S) || cliParser.hasOption(OPTION_SAVE)) { String policy = cliParser.getOptionValue(OPTION_S); if (StringUtils.isBlank(policy)) { @@ -325,6 +351,21 @@ private int handlePolicy(String[] args) return handleSavePolicy(policy); } + // Save Queue Policies in Batches + if (cliParser.hasOption(OPTION_BATCH_S) || cliParser.hasOption(OPTION_BATCH_SAVE)) { + if (cliParser.hasOption(OPTION_FORMAT)) { + String format = cliParser.getOptionValue(OPTION_FORMAT); + if (StringUtils.isBlank(format) || + StringUtils.equalsAnyIgnoreCase(format, FORMAT_XML, FORMAT_JSON)) { + System.out.println("We currently only support policy configuration files " + + "in XML and JSON formats."); + return EXIT_ERROR; + } + System.out.println("format:" + format); + } + System.out.println("We need to specify the format of the input file."); + } + return EXIT_ERROR; } @@ -342,6 +383,12 @@ private int handleSavePolicy(String policy) { } } + private int handBatchSavePolicies(String format, String policyFile) { + LOG.info("Batch Save Federation Policies. Format = {}, PolicyFile = {}.", + format, policyFile); + return 1; + } + /** * We will parse the policy, and it has specific formatting requirements. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/MemoryPageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/MemoryPageUtils.java new file mode 100644 index 00000000000000..42a80742ed3ca9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/MemoryPageUtils.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.client.util; + +import java.util.ArrayList; +import java.util.List; + +/** + * This is a memory paging utility that is used to paginate a dataset. + * + * This class is designed to support batch entry queue policies. + */ +public class MemoryPageUtils { + private List dataList; + private int pageSize; + + /** + * MemoryPageUtils constructor. + * + * @param pageSize Number of records returned per page. + */ + public MemoryPageUtils(int pageSize) { + this.pageSize = pageSize; + this.dataList = new ArrayList<>(); + } + + public void addToMemory(T data) { + dataList.add(data); + } + + public List readFromMemory(int pageNumber) { + int startIndex = pageNumber * pageSize; + int endIndex = Math.min(startIndex + pageSize, dataList.size()); + if (startIndex >= dataList.size()) { + return null; + } + return dataList.subList(startIndex, endIndex); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMemoryPageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMemoryPageUtils.java new file mode 100644 index 00000000000000..4783c73d669e00 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMemoryPageUtils.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.client; + +import org.apache.hadoop.yarn.client.util.MemoryPageUtils; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * The purpose of this class is to test + * whether the memory paging function is as expected. + */ +public class TestMemoryPageUtils { + + @Test + public void testMemoryPage() { + // We design such a unit test for testing pagination, and we prepare 6 pieces of policy data. + // If 1 page is followed by 5 pieces of data, we will get 2 pages. + // Page 1 will contain 5 records and page 2 will contain 1 record. + MemoryPageUtils policies = new MemoryPageUtils<>(5); + policies.addToMemory("policy-1"); + policies.addToMemory("policy-2"); + policies.addToMemory("policy-3"); + policies.addToMemory("policy-4"); + policies.addToMemory("policy-5"); + policies.addToMemory("policy-6"); + + // Page 1 will return 5 records. + List firstPage = policies.readFromMemory(0); + assertEquals(5, firstPage.size()); + + // Page 2 will return 1 records + List secondPage = policies.readFromMemory(1); + assertEquals(1, secondPage.size()); + + // Page 10, This is a wrong number of pages, we will get null. + List tenPage = policies.readFromMemory(10); + assertNull(tenPage); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java index c7989e11af561e..ea3b57fed2060b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java @@ -120,8 +120,8 @@ private DeregisterSubClusterResponse generateAllSubClusterData() { public void testHelp() throws Exception { ByteArrayOutputStream dataOut = new ByteArrayOutputStream(); ByteArrayOutputStream dataErr = new ByteArrayOutputStream(); - System.setOut(new PrintStream(dataOut)); - System.setErr(new PrintStream(dataErr)); + // System.setOut(new PrintStream(dataOut)); + // System.setErr(new PrintStream(dataErr)); String[] args = {"-help"}; rmAdminCLI.run(args); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/federation-weights.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/federation-weights.xml new file mode 100644 index 00000000000000..fd0db3319fee84 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/federation-weights.xml @@ -0,0 +1,48 @@ + + + + + + + + + root.a + + + SC-1 + 0.7 + + + SC-2 + 0.3 + + + + + SC-1 + 0.6 + + + SC-2 + 0.4 + + + 1.0 + + + \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java index 4ca7f783bd97e9..0c9aa711ba7470 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java @@ -114,6 +114,46 @@ public void setHeadRoomAlpha(String headRoomAlpha) { builder.setHeadRoomAlpha(headRoomAlpha); } + @Override + public String getQueue() { + FederationQueueWeightProtoOrBuilder p = this.viaProto ? this.proto : this.builder; + boolean hasQueue = p.hasQueue(); + if (hasQueue) { + return p.getQueue(); + } + return null; + } + + @Override + public void setQueue(String queue) { + maybeInitBuilder(); + if (queue == null) { + builder.clearQueue(); + return; + } + builder.setQueue(queue); + } + + @Override + public String getPolicyManagerClassName() { + FederationQueueWeightProtoOrBuilder p = this.viaProto ? this.proto : this.builder; + boolean hasPolicyManagerClassName = p.hasPolicyManagerClassName(); + if (hasPolicyManagerClassName) { + return p.getPolicyManagerClassName(); + } + return null; + } + + @Override + public void setPolicyManagerClassName(String policyManagerClassName) { + maybeInitBuilder(); + if (policyManagerClassName == null) { + builder.clearPolicyManagerClassName(); + return; + } + builder.setPolicyManagerClassName(policyManagerClassName); + } + @Override public int hashCode() { return getProto().hashCode();