Skip to content

Commit

Permalink
YARN-11536. [Federation] Router CLI Supports Batch Save the SubCluste…
Browse files Browse the repository at this point in the history
…rPolicyConfiguration Of Queues.
  • Loading branch information
slfan1989 committed Jul 26, 2023
1 parent 82c8070 commit cff2b62
Show file tree
Hide file tree
Showing 10 changed files with 344 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;

import java.util.List;

/**
* In Federation mode,
* we will support batch save queues policies to FederationStateStore.
*/
@Private
@Unstable
public abstract class BatchSaveFederationQueuePoliciesRequest {

@Private
@Unstable
public static BatchSaveFederationQueuePoliciesRequest newInstance(
List<FederationQueueWeight> federationQueueWeights) {
BatchSaveFederationQueuePoliciesRequest request =
Records.newRecord(BatchSaveFederationQueuePoliciesRequest.class);
request.setFederationQueueWeights(federationQueueWeights);
return request;
}

@Public
@Unstable
public abstract List<FederationQueueWeight> getFederationQueueWeights();

@Private
@Unstable
public abstract void setFederationQueueWeights(
List<FederationQueueWeight> federationQueueWeights);
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ public static FederationQueueWeight newInstance(String routerWeight,
return federationQueueWeight;
}

@Private
@Unstable
public static FederationQueueWeight newInstance(String routerWeight,
String amrmWeight, String headRoomAlpha, String queue, String policyManagerClassName) {
FederationQueueWeight federationQueueWeight = Records.newRecord(FederationQueueWeight.class);
federationQueueWeight.setRouterWeight(routerWeight);
federationQueueWeight.setAmrmWeight(amrmWeight);
federationQueueWeight.setHeadRoomAlpha(headRoomAlpha);
federationQueueWeight.setQueue(queue);
federationQueueWeight.setPolicyManagerClassName(policyManagerClassName);
return federationQueueWeight;
}

@Public
@Unstable
public abstract String getRouterWeight();
Expand Down Expand Up @@ -166,4 +179,20 @@ public static void checkHeadRoomAlphaValid(String headRoomAlpha) throws YarnExce
protected static boolean isNumeric(String value) {
return NumberUtils.isCreatable(value);
}

@Public
@Unstable
public abstract String getQueue();

@Public
@Unstable
public abstract void setQueue(String queue);

@Public
@Unstable
public abstract String getPolicyManagerClassName();

@Public
@Unstable
public abstract void setPolicyManagerClassName(String policyManagerClassName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ message SaveFederationQueuePolicyResponseProto {
required string message = 1;
}

message BatchSaveFederationQueuePoliciesRequestProto {
repeated FederationQueueWeightProto federationQueueWeights = 1;
}

message BatchSaveFederationQueuePoliciesResponseProto {
required string message = 1;
}

//////////////////////////////////////////////////////////////////
///////////// RM Failover related records ////////////////////////
//////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ message FederationQueueWeightProto {
optional string routerWeight = 1;
optional string amrmWeight = 2;
optional string headRoomAlpha = 3;
optional string queue = 4;
optional string policyManagerClassName = 5;
}

////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public class RouterCLI extends Configured implements Tool {
"set the state of the subCluster to SC_LOST."))
// Command2: policy
.put("-policy", new UsageInfo(
"[-s|--save [queue;router weight;amrm weight;headroomalpha]]",
"[-s|--save [queue;router weight;amrm weight;headroomalpha]] " +
"[-bs|--batch-save [--format xml,json] [-f|--input-file fileName]]",
"We provide a set of commands for Policy:" +
" Include list policies, save policies, batch save policies. " +
" (Note: The policy type will be directly read from the" +
Expand Down Expand Up @@ -102,8 +103,18 @@ public class RouterCLI extends Configured implements Tool {
// Command2: policy
// save policy
private static final String OPTION_S = "s";

private static final String OPTION_BATCH_S = "bs";

private static final String OPTION_SAVE = "save";

private static final String OPTION_BATCH_SAVE = "batch-save";

private static final String OPTION_FORMAT = "format";

private static final String CMD_POLICY = "-policy";
private static final String FORMAT_XML = "xml";
private static final String FORMAT_JSON = "json";

public RouterCLI() {
super();
Expand Down Expand Up @@ -161,7 +172,8 @@ private static void printHelp() {
.append("The full syntax is: \n\n")
.append("routeradmin\n")
.append(" [-deregisterSubCluster [-sc|--subClusterId [subCluster Id]]\n")
.append(" [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha]]\n")
.append(" [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha] " +
"[-bs|--batch-save [--format xml,json] [-f|--input-file fileName]]]\n")
.append(" [-help [cmd]]").append("\n");
StringBuilder helpBuilder = new StringBuilder();
System.out.println(summary);
Expand Down Expand Up @@ -304,7 +316,20 @@ private int handlePolicy(String[] args)
"We will save the policy information of the queue, " +
"including queue and weight information");
saveOpt.setOptionalArg(true);
Option batchSaveOpt = new Option(OPTION_BATCH_S, OPTION_BATCH_SAVE, false,
"We will save queue policies in bulk, " +
"where users can provide XML or JSON files containing the policies. " +
"This command will parse the file contents and store the results " +
"in the FederationStateStore.");
Option formatOpt = new Option(null, "format", true,
"Users can specify the file format using this option. " +
"Currently, there are two supported file formats: XML and JSON. " +
"These files contain the policy information for storing queue policies.");
formatOpt.setOptionalArg(true);

opts.addOption(saveOpt);
opts.addOption(batchSaveOpt);
opts.addOption(formatOpt);

// Parse command line arguments.
CommandLine cliParser;
Expand All @@ -317,6 +342,7 @@ private int handlePolicy(String[] args)
}

// Try to parse the cmd save.
// Save a single queue policy
if (cliParser.hasOption(OPTION_S) || cliParser.hasOption(OPTION_SAVE)) {
String policy = cliParser.getOptionValue(OPTION_S);
if (StringUtils.isBlank(policy)) {
Expand All @@ -325,6 +351,21 @@ private int handlePolicy(String[] args)
return handleSavePolicy(policy);
}

// Save Queue Policies in Batches
if (cliParser.hasOption(OPTION_BATCH_S) || cliParser.hasOption(OPTION_BATCH_SAVE)) {
if (cliParser.hasOption(OPTION_FORMAT)) {
String format = cliParser.getOptionValue(OPTION_FORMAT);
if (StringUtils.isBlank(format) ||
StringUtils.equalsAnyIgnoreCase(format, FORMAT_XML, FORMAT_JSON)) {
System.out.println("We currently only support policy configuration files " +
"in XML and JSON formats.");
return EXIT_ERROR;
}
System.out.println("format:" + format);
}
System.out.println("We need to specify the format of the input file.");
}

return EXIT_ERROR;
}

Expand All @@ -342,6 +383,12 @@ private int handleSavePolicy(String policy) {
}
}

private int handBatchSavePolicies(String format, String policyFile) {
LOG.info("Batch Save Federation Policies. Format = {}, PolicyFile = {}.",
format, policyFile);
return 1;
}

/**
* We will parse the policy, and it has specific formatting requirements.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.util;

import java.util.ArrayList;
import java.util.List;

/**
* This is a memory paging utility that is used to paginate a dataset.
*
* This class is designed to support batch entry queue policies.
*/
public class MemoryPageUtils<T> {
private List<T> dataList;
private int pageSize;

/**
* MemoryPageUtils constructor.
*
* @param pageSize Number of records returned per page.
*/
public MemoryPageUtils(int pageSize) {
this.pageSize = pageSize;
this.dataList = new ArrayList<>();
}

public void addToMemory(T data) {
dataList.add(data);
}

public List<T> readFromMemory(int pageNumber) {
int startIndex = pageNumber * pageSize;
int endIndex = Math.min(startIndex + pageSize, dataList.size());
if (startIndex >= dataList.size()) {
return null;
}
return dataList.subList(startIndex, endIndex);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;

import org.apache.hadoop.yarn.client.util.MemoryPageUtils;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

/**
* The purpose of this class is to test
* whether the memory paging function is as expected.
*/
public class TestMemoryPageUtils {

@Test
public void testMemoryPage() {
// We design such a unit test for testing pagination, and we prepare 6 pieces of policy data.
// If 1 page is followed by 5 pieces of data, we will get 2 pages.
// Page 1 will contain 5 records and page 2 will contain 1 record.
MemoryPageUtils<String> policies = new MemoryPageUtils<>(5);
policies.addToMemory("policy-1");
policies.addToMemory("policy-2");
policies.addToMemory("policy-3");
policies.addToMemory("policy-4");
policies.addToMemory("policy-5");
policies.addToMemory("policy-6");

// Page 1 will return 5 records.
List<String> firstPage = policies.readFromMemory(0);
assertEquals(5, firstPage.size());

// Page 2 will return 1 records
List<String> secondPage = policies.readFromMemory(1);
assertEquals(1, secondPage.size());

// Page 10, This is a wrong number of pages, we will get null.
List<String> tenPage = policies.readFromMemory(10);
assertNull(tenPage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ private DeregisterSubClusterResponse generateAllSubClusterData() {
public void testHelp() throws Exception {
ByteArrayOutputStream dataOut = new ByteArrayOutputStream();
ByteArrayOutputStream dataErr = new ByteArrayOutputStream();
System.setOut(new PrintStream(dataOut));
System.setErr(new PrintStream(dataErr));
// System.setOut(new PrintStream(dataOut));
// System.setErr(new PrintStream(dataErr));

String[] args = {"-help"};
rmAdminCLI.run(args);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<federationWeights>
<weight>
<queue>
<name>root.a</name>
<amrmPolicyWeights>
<subClusterIdInfo>
<id>SC-1</id>
<weight>0.7</weight>
</subClusterIdInfo>
<subClusterIdInfo>
<id>SC-2</id>
<weight>0.3</weight>
</subClusterIdInfo>
</amrmPolicyWeights>
<routerPolicyWeights>
<subClusterIdInfo>
<id>SC-1</id>
<weight>0.6</weight>
</subClusterIdInfo>
<subClusterIdInfo>
<id>SC-2</id>
<weight>0.4</weight>
</subClusterIdInfo>
</routerPolicyWeights>
<headroomAlpha>1.0</headroomAlpha>
</queue>
</weight>
</federationWeights>
Loading

0 comments on commit cff2b62

Please sign in to comment.