Skip to content

Commit

Permalink
YARN-11158. Support (Create/Renew/Cancel) DelegationToken API's for F…
Browse files Browse the repository at this point in the history
…ederation. (#5104)
  • Loading branch information
slfan1989 authored Dec 1, 2022
1 parent 5440c75 commit 4af4997
Show file tree
Hide file tree
Showing 6 changed files with 468 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ public final class RouterMetrics {
private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved;
@Metric("# of checkUserAccessToQueue failed to be retrieved")
private MutableGaugeInt numCheckUserAccessToQueueFailedRetrieved;
@Metric("# of getDelegationToken failed to be retrieved")
private MutableGaugeInt numGetDelegationTokenFailedRetrieved;
@Metric("# of renewDelegationToken failed to be retrieved")
private MutableGaugeInt numRenewDelegationTokenFailedRetrieved;
@Metric("# of renewDelegationToken failed to be retrieved")
private MutableGaugeInt numCancelDelegationTokenFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand Down Expand Up @@ -215,6 +221,12 @@ public final class RouterMetrics {
private MutableRate totalSucceededGetRMNodeLabelsRetrieved;
@Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)")
private MutableRate totalSucceededCheckUserAccessToQueueRetrieved;
@Metric("Total number of successful Retrieved GetDelegationToken and latency(ms)")
private MutableRate totalSucceededGetDelegationTokenRetrieved;
@Metric("Total number of successful Retrieved RenewDelegationToken and latency(ms)")
private MutableRate totalSucceededRenewDelegationTokenRetrieved;
@Metric("Total number of successful Retrieved CancelDelegationToken and latency(ms)")
private MutableRate totalSucceededCancelDelegationTokenRetrieved;

/**
* Provide quantile counters for all latencies.
Expand Down Expand Up @@ -262,6 +274,9 @@ public final class RouterMetrics {
private MutableQuantiles getRefreshQueuesLatency;
private MutableQuantiles getRMNodeLabelsLatency;
private MutableQuantiles checkUserAccessToQueueLatency;
private MutableQuantiles getDelegationTokenLatency;
private MutableQuantiles renewDelegationTokenLatency;
private MutableQuantiles cancelDelegationTokenLatency;

private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
Expand Down Expand Up @@ -423,6 +438,15 @@ private RouterMetrics() {

checkUserAccessToQueueLatency = registry.newQuantiles("checkUserAccessToQueueLatency",
"latency of get apptimeouts timeouts", "ops", "latency", 10);

getDelegationTokenLatency = registry.newQuantiles("getDelegationTokenLatency",
"latency of get delegation token timeouts", "ops", "latency", 10);

renewDelegationTokenLatency = registry.newQuantiles("renewDelegationTokenLatency",
"latency of renew delegation token timeouts", "ops", "latency", 10);

cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
"latency of cancel delegation token timeouts", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -655,10 +679,25 @@ public long getNumSucceededGetRMNodeLabelsRetrieved() {
}

@VisibleForTesting
public long getNumSucceededCheckUserAccessToQueueRetrievedRetrieved() {
public long getNumSucceededCheckUserAccessToQueueRetrieved() {
return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededGetDelegationTokenRetrieved() {
return totalSucceededGetDelegationTokenRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededRenewDelegationTokenRetrieved() {
return totalSucceededRenewDelegationTokenRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededCancelDelegationTokenRetrieved() {
return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -874,6 +913,21 @@ public double getLatencySucceededCheckUserAccessToQueueRetrieved() {
return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededGetDelegationTokenRetrieved() {
return totalSucceededGetDelegationTokenRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededRenewDelegationTokenRetrieved() {
return totalSucceededRenewDelegationTokenRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededCancelDelegationTokenRetrieved() {
return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean();
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -1068,6 +1122,18 @@ public int getCheckUserAccessToQueueFailedRetrieved() {
return numCheckUserAccessToQueueFailedRetrieved.value();
}

public int getDelegationTokenFailedRetrieved() {
return numGetDelegationTokenFailedRetrieved.value();
}

public int getRenewDelegationTokenFailedRetrieved() {
return numRenewDelegationTokenFailedRetrieved.value();
}

public int getCancelDelegationTokenFailedRetrieved() {
return numCancelDelegationTokenFailedRetrieved.value();
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -1283,6 +1349,21 @@ public void succeededCheckUserAccessToQueueRetrieved(long duration) {
checkUserAccessToQueueLatency.add(duration);
}

public void succeededGetDelegationTokenRetrieved(long duration) {
totalSucceededGetDelegationTokenRetrieved.add(duration);
getDelegationTokenLatency.add(duration);
}

public void succeededRenewDelegationTokenRetrieved(long duration) {
totalSucceededRenewDelegationTokenRetrieved.add(duration);
renewDelegationTokenLatency.add(duration);
}

public void succeededCancelDelegationTokenRetrieved(long duration) {
totalSucceededCancelDelegationTokenRetrieved.add(duration);
cancelDelegationTokenLatency.add(duration);
}

public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
Expand Down Expand Up @@ -1454,4 +1535,16 @@ public void incrGetRMNodeLabelsFailedRetrieved() {
public void incrCheckUserAccessToQueueFailedRetrieved() {
numCheckUserAccessToQueueFailedRetrieved.incr();
}
}

public void incrGetDelegationTokenFailedRetrieved() {
numGetDelegationTokenFailedRetrieved.incr();
}

public void incrRenewDelegationTokenFailedRetrieved() {
numRenewDelegationTokenFailedRetrieved.incr();
}

public void incrCancelDelegationTokenFailedRetrieved() {
numCancelDelegationTokenFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +38,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.EnumSet;
import java.io.IOException;

/**
Expand Down Expand Up @@ -470,6 +473,27 @@ public static void validateContainerId(String containerId)
}
}

public static boolean isAllowedDelegationTokenOp() throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
return EnumSet.of(UserGroupInformation.AuthenticationMethod.KERBEROS,
UserGroupInformation.AuthenticationMethod.KERBEROS_SSL,
UserGroupInformation.AuthenticationMethod.CERTIFICATE)
.contains(UserGroupInformation.getCurrentUser()
.getRealAuthenticationMethod());
} else {
return true;
}
}

public static String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
throws IOException {
UserGroupInformation user = UserGroupInformation.getCurrentUser();
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
// we can always renew our own tokens
return loginUser.getUserName().equals(user.getUserName())
? token.decodeIdentifier().getRenewer().toString() : user.getShortUserName();
}

public static UserGroupInformation setupUser(final String userName) {
UserGroupInformation user = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.Method;
Expand All @@ -40,7 +41,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -118,9 +118,13 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
Expand All @@ -136,6 +140,7 @@
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1392,19 +1397,103 @@ public GetContainersResponse getContainers(GetContainersRequest request)
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");

if (request == null || request.getRenewer() == null) {
routerMetrics.incrGetDelegationTokenFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing getDelegationToken request or Renewer.", null);
}

try {
// Verify that the connection is kerberos authenticated
if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
routerMetrics.incrGetDelegationTokenFailedRetrieved();
throw new IOException(
"Delegation Token can be issued only with kerberos authentication.");
}

long startTime = clock.getTime();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Text owner = new Text(ugi.getUserName());
Text realUser = null;
if (ugi.getRealUser() != null) {
realUser = new Text(ugi.getRealUser().getUserName());
}

RMDelegationTokenIdentifier tokenIdentifier =
new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()), realUser);
Token<RMDelegationTokenIdentifier> realRMDToken =
new Token<>(tokenIdentifier, this.getTokenSecretManager());

org.apache.hadoop.yarn.api.records.Token routerRMDTToken =
BuilderUtils.newDelegationToken(realRMDToken.getIdentifier(),
realRMDToken.getKind().toString(),
realRMDToken.getPassword(), realRMDToken.getService().toString());

long stopTime = clock.getTime();
routerMetrics.succeededGetDelegationTokenRetrieved((stopTime - startTime));
return GetDelegationTokenResponse.newInstance(routerRMDTToken);
} catch(IOException e) {
routerMetrics.incrGetDelegationTokenFailedRetrieved();
throw new YarnException(e);
}
}

@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
try {

if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
routerMetrics.incrRenewDelegationTokenFailedRetrieved();
throw new IOException(
"Delegation Token can be renewed only with kerberos authentication");
}

long startTime = clock.getTime();
org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken();
Token<RMDelegationTokenIdentifier> token = new Token<>(
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
String user = RouterServerUtil.getRenewerForToken(token);
long nextExpTime = this.getTokenSecretManager().renewToken(token, user);
RenewDelegationTokenResponse renewResponse =
Records.newRecord(RenewDelegationTokenResponse.class);
renewResponse.setNextExpirationTime(nextExpTime);
long stopTime = clock.getTime();
routerMetrics.succeededRenewDelegationTokenRetrieved((stopTime - startTime));
return renewResponse;

} catch (IOException e) {
routerMetrics.incrRenewDelegationTokenFailedRetrieved();
throw new YarnException(e);
}
}

@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
try {
if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
routerMetrics.incrCancelDelegationTokenFailedRetrieved();
throw new IOException(
"Delegation Token can be cancelled only with kerberos authentication");
}

long startTime = clock.getTime();
org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken();
Token<RMDelegationTokenIdentifier> token = new Token<>(
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
String user = UserGroupInformation.getCurrentUser().getUserName();
this.getTokenSecretManager().cancelToken(token, user);
long stopTime = clock.getTime();
routerMetrics.succeededCancelDelegationTokenRetrieved((stopTime - startTime));
return Records.newRecord(CancelDelegationTokenResponse.class);
} catch (IOException e) {
routerMetrics.incrCancelDelegationTokenFailedRetrieved();
throw new YarnException(e);
}
}

@Override
Expand Down
Loading

0 comments on commit 4af4997

Please sign in to comment.