Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11483. [Federation] Router AdminCLI Supports Clean Finish Apps. #6251

Merged
merged 7 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse;

@Private
public interface ResourceManagerAdministrationProtocol extends GetUserMappingsProtocol {
Expand Down Expand Up @@ -218,4 +220,17 @@ BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies(
@Idempotent
QueryFederationQueuePoliciesResponse listFederationQueuePolicies(
QueryFederationQueuePoliciesRequest request) throws YarnException, IOException;

/**
* Deletes a federation application when YARN runs in Federation mode.
*
* @param request identifies the application to delete
*                (see {@link DeleteFederationApplicationRequest#getApplication()}).
* @return the response for this call; it carries a text message
*         (see {@link DeleteFederationApplicationResponse#getMessage()}).
* @throws YarnException exceptions from yarn servers.
* @throws IOException if an IO error occurred.
*/
@Private
@Idempotent
DeleteFederationApplicationResponse deleteFederationApplication(
DeleteFederationApplicationRequest request) throws YarnException, IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;

/**
 * Request record asking for an application to be cleaned up from the
 * FederationStateStore.
 *
 * <p>The clean-up is triggered explicitly by a user. Its usual purpose is to
 * remove an expired application; removing an application that has not expired
 * is possible as well, but is not recommended.
 */
@Private
@Unstable
public abstract class DeleteFederationApplicationRequest {

  /**
   * Builds a new request record for the given application.
   *
   * @param application the application to delete, as supplied by the caller.
   * @return a freshly created record whose application field is set.
   */
  @Private
  @Unstable
  public static DeleteFederationApplicationRequest newInstance(String application) {
    DeleteFederationApplicationRequest deleteRequest =
        Records.newRecord(DeleteFederationApplicationRequest.class);
    deleteRequest.setApplication(application);
    return deleteRequest;
  }

  /**
   * Returns the application this request targets.
   *
   * @return the application string stored in this record.
   */
  @Public
  @Unstable
  public abstract String getApplication();

  /**
   * Sets the application this request targets.
   *
   * @param application the application string to store in this record.
   */
  @Public
  @Unstable
  public abstract void setApplication(String application);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;

/**
 * Response record returned by the delete-federation-application call.
 * Its only payload is a text message.
 */
@Private
@Unstable
public abstract class DeleteFederationApplicationResponse {

  /**
   * Creates an empty response record (no message set).
   *
   * @return a freshly created record.
   */
  public static DeleteFederationApplicationResponse newInstance() {
    return Records.newRecord(DeleteFederationApplicationResponse.class);
  }

  /**
   * Creates a response record that carries the given message.
   *
   * @param msg the message to store in the record.
   * @return a freshly created record with its message set.
   */
  public static DeleteFederationApplicationResponse newInstance(String msg) {
    DeleteFederationApplicationResponse result = newInstance();
    result.setMessage(msg);
    return result;
  }

  /**
   * Returns the message carried by this response.
   *
   * @return the message stored in this record.
   */
  @Public
  @Unstable
  public abstract String getMessage();

  /**
   * Sets the message carried by this response.
   *
   * @param msg the message to store in this record.
   */
  @Public
  @Unstable
  public abstract void setMessage(String msg);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ service ResourceManagerAdministrationProtocolService {
rpc saveFederationQueuePolicy(SaveFederationQueuePolicyRequestProto) returns (SaveFederationQueuePolicyResponseProto);
rpc batchSaveFederationQueuePolicies(BatchSaveFederationQueuePoliciesRequestProto) returns (BatchSaveFederationQueuePoliciesResponseProto);
rpc listFederationQueuePolicies(QueryFederationQueuePoliciesRequestProto) returns (QueryFederationQueuePoliciesResponseProto);
rpc deleteFederationApplication(DeleteFederationApplicationRequestProto) returns (DeleteFederationApplicationResponseProto);
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ message QueryFederationQueuePoliciesResponseProto {
repeated FederationQueueWeightProto federationQueueWeights = 5;
}

// Request of the deleteFederationApplication rpc.
message DeleteFederationApplicationRequestProto {
// The application to delete, passed as a plain string.
optional string application = 1;
}

// Response of the deleteFederationApplication rpc.
message DeleteFederationApplicationResponseProto {
// Text message returned to the caller.
optional string message = 1;
}

//////////////////////////////////////////////////////////////////
///////////// RM Failover related records ////////////////////////
//////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.client.cli;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.cli.Option;
Expand All @@ -42,6 +43,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
Expand Down Expand Up @@ -227,12 +230,37 @@ public class RouterCLI extends Configured implements Tool {
.addExample(POLICY_LIST_USAGE.args, POLICY_LIST_USAGE_EXAMPLE_1)
.addExample(POLICY_LIST_USAGE.args, POLICY_LIST_USAGE_EXAMPLE_2);

// Command3: application

/** Top-level flag that selects the application command group. */
private static final String CMD_APPLICATION = "-application";

/** Long option name of the sub-command that deletes an application. */
private static final String OPTION_DELETE_APP = "delete";

/** Usage text of the application delete sub-command. */
protected static final UsageInfo APPLICATION_DELETE_USAGE = new UsageInfo(
    "--delete <application_id>",
    "This command is used to delete the specified application.");

/** Description shown with the delete example. */
protected static final String APPLICATION_DELETE_USAGE_EXAMPLE_DESC =
    "If we want to delete application_1440536969523_0001.";

/** Example command line for the delete sub-command. */
protected static final String APPLICATION_DELETE_USAGE_EXAMPLE_1 =
    "yarn routeradmin -application --delete application_1440536969523_0001";

/**
 * Usage information of the application command group: the delete usage, its
 * example description and its example, all keyed by the delete usage args.
 */
protected static final RouterCmdUsageInfos APPLICATION_USAGEINFOS =
    new RouterCmdUsageInfos()
        .addUsageInfo(APPLICATION_DELETE_USAGE)
        .addExampleDescs(APPLICATION_DELETE_USAGE.args, APPLICATION_DELETE_USAGE_EXAMPLE_DESC)
        .addExample(APPLICATION_DELETE_USAGE.args, APPLICATION_DELETE_USAGE_EXAMPLE_1);

// Immutable lookup table from each top-level command flag to the usage
// information registered for it. It currently holds three commands:
// deregisterSubCluster, policy and application.
protected final static Map<String, RouterCmdUsageInfos> ADMIN_USAGE =
ImmutableMap.<String, RouterCmdUsageInfos>builder()
// Command1: deregisterSubCluster
.put(CMD_DEREGISTERSUBCLUSTER, DEREGISTER_SUBCLUSTER_USAGEINFOS)
// Command2: policy
.put(CMD_POLICY, POLICY_USAGEINFOS)
// Command3: application
.put(CMD_APPLICATION, APPLICATION_USAGEINFOS)
.build();

public RouterCLI() {
Expand Down Expand Up @@ -814,6 +842,50 @@ protected int handListPolicies(int pageSize, int currentPage, String queue, List
}
}

/**
 * Sends a delete request for the given application through the admin
 * protocol and prints the message of the response to standard output.
 *
 * <p>Any exception raised while building the request, creating the protocol
 * client or performing the call is logged and converted into
 * {@code EXIT_ERROR}; nothing is rethrown.
 *
 * @param application the application to delete, as given on the command line.
 * @return {@code EXIT_SUCCESS} when the call returned,
 *         {@code EXIT_ERROR} when any exception occurred.
 */
private int handleDeleteApplication(String application) {
  LOG.info("Delete Application = {}.", application);
  try {
    DeleteFederationApplicationRequest request =
        DeleteFederationApplicationRequest.newInstance(application);
    ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
    DeleteFederationApplicationResponse response =
        adminProtocol.deleteFederationApplication(request);
    System.out.println(response.getMessage());
    return EXIT_SUCCESS;
  } catch (Exception e) {
    // Name this handler in the log; the previous text ("handleSavePolicy error.")
    // pointed at a different command and misled anyone reading the logs.
    LOG.error("handleDeleteApplication error.", e);
    return EXIT_ERROR;
  }
}

/**
 * Handles the {@code -application} command group.
 *
 * <p>Currently the only sub-command is {@code --delete <application_id>}.
 * When no recognised sub-command is present, the usage of the command is
 * printed and an error code is returned, so that an incomplete invocation is
 * not reported as a success.
 *
 * @param args the full command line; {@code args[0]} is the command flag.
 * @return the result of the delete handler when {@code --delete} is given,
 *         otherwise {@code EXIT_ERROR}.
 * @throws IOException declared for callers; not raised directly by this method.
 * @throws YarnException declared for callers; not raised directly by this method.
 * @throws ParseException if the command line cannot be parsed for a reason
 *         other than a missing option argument.
 */
private int handleApplication(String[] args)
    throws IOException, YarnException, ParseException {
  // Prepare Options.
  Options opts = new Options();
  opts.addOption("application", false,
      "We provide a set of commands to query and clean applications.");
  Option deleteOpt = new Option(null, OPTION_DELETE_APP, true,
      "We will clean up the provided application.");
  opts.addOption(deleteOpt);

  // Parse command line arguments.
  CommandLine cliParser;
  try {
    cliParser = new DefaultParser().parse(opts, args);
  } catch (MissingArgumentException ex) {
    System.out.println("Missing argument for options");
    printUsage(args[0]);
    return EXIT_ERROR;
  }

  if (cliParser.hasOption(OPTION_DELETE_APP)) {
    String application = cliParser.getOptionValue(OPTION_DELETE_APP);
    return handleDeleteApplication(application);
  }

  // No sub-command was supplied: tell the user how to use the command
  // instead of silently exiting with a success code.
  printUsage(args[0]);
  return EXIT_ERROR;
}

@Override
public int run(String[] args) throws Exception {
YarnConfiguration yarnConf = getConf() == null ?
Expand All @@ -839,6 +911,8 @@ public int run(String[] args) throws Exception {
return handleDeregisterSubCluster(args);
} else if (CMD_POLICY.equals(cmd)) {
return handlePolicy(args);
} else if (CMD_APPLICATION.equals(cmd)) {
return handleApplication(args);
} else {
System.out.println("No related commands found.");
printHelp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public void setup() throws Exception {
return QueryFederationQueuePoliciesResponse.newInstance(1, 1, 1, 10, weights);
});


Configuration config = new Configuration();
config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);

Expand Down Expand Up @@ -271,7 +272,7 @@ public void testListPolicies() throws Exception {
@Test
public void testBuildHelpMsg() throws Exception {
Map<String, RouterCLI.RouterCmdUsageInfos> adminUsage = rmAdminCLI.getAdminUsage();
assertEquals(2, adminUsage.size());
assertEquals(3, adminUsage.size());

RouterCLI.RouterCmdUsageInfos deregisterSubClusterUsageInfos =
adminUsage.get("-deregisterSubCluster");
Expand All @@ -291,5 +292,11 @@ public void testBuildHelpMsg() throws Exception {
policyExamplesMap.forEach((cmd, cmdExamples) -> {
assertEquals(2, cmdExamples.size());
});

RouterCLI.RouterCmdUsageInfos applicationUsageInfos = adminUsage.get("-application");
assertNotNull(applicationUsageInfos);
Map<String, List<String>> applicationExamplesMap = applicationUsageInfos.getExamples();
assertNotNull(applicationExamplesMap);
assertEquals(1, applicationExamplesMap.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.QueryFederationQueuePoliciesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationApplicationRequestProto;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
Expand Down Expand Up @@ -86,6 +87,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
Expand Down Expand Up @@ -122,6 +125,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.QueryFederationQueuePoliciesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.QueryFederationQueuePoliciesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeleteFederationApplicationRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeleteFederationApplicationResponsePBImpl;

import org.apache.hadoop.thirdparty.protobuf.ServiceException;

Expand Down Expand Up @@ -420,4 +425,18 @@ public QueryFederationQueuePoliciesResponse listFederationQueuePolicies(
}
return null;
}

/**
 * Client-side stub: converts the request record to its protobuf form, invokes
 * the remote call through the proxy and wraps the returned proto in a
 * response record.
 *
 * @param request the request record; it is cast to its PB implementation.
 * @return the response record built from the proto returned by the proxy.
 * @throws YarnException declared for the exception unwrapped from the RPC failure.
 * @throws IOException declared for the exception unwrapped from the RPC failure.
 */
@Override
public DeleteFederationApplicationResponse deleteFederationApplication(
    DeleteFederationApplicationRequest request) throws YarnException, IOException {
  DeleteFederationApplicationRequestPBImpl pbRequest =
      (DeleteFederationApplicationRequestPBImpl) request;
  DeleteFederationApplicationRequestProto wireRequest = pbRequest.getProto();
  try {
    return new DeleteFederationApplicationResponsePBImpl(
        proxy.deleteFederationApplication(null, wireRequest));
  } catch (ServiceException se) {
    RPCUtil.unwrapAndThrowException(se);
    // NOTE(review): presumably unwrapAndThrowException always throws, which
    // would make this return unreachable in practice - confirm against RPCUtil.
    return null;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationApplicationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeleteFederationApplicationRequestProto;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
Expand Down Expand Up @@ -87,6 +89,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
Expand Down Expand Up @@ -123,6 +127,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.QueryFederationQueuePoliciesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.QueryFederationQueuePoliciesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeleteFederationApplicationRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeleteFederationApplicationResponsePBImpl;

import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
Expand Down Expand Up @@ -445,4 +451,20 @@ public QueryFederationQueuePoliciesResponseProto listFederationQueuePolicies(
throw new ServiceException(e);
}
}

/**
 * Server-side translator: wraps the incoming proto in a request record,
 * delegates to the real protocol implementation and returns the proto held
 * by the resulting response record.
 *
 * @param controller the RPC controller; not used by this method.
 * @param proto the protobuf form of the delete request.
 * @return the protobuf form of the response produced by the delegate.
 * @throws ServiceException wrapping any {@link YarnException} or
 *         {@link IOException} thrown by the delegate.
 */
@Override
public DeleteFederationApplicationResponseProto deleteFederationApplication(
    RpcController controller, DeleteFederationApplicationRequestProto proto)
    throws ServiceException {
  try {
    DeleteFederationApplicationResponse result = real.deleteFederationApplication(
        new DeleteFederationApplicationRequestPBImpl(proto));
    return ((DeleteFederationApplicationResponsePBImpl) result).getProto();
  } catch (YarnException | IOException cause) {
    throw new ServiceException(cause);
  }
}
}
Loading