Skip to content

Commit

Permalink
Migrate to JDK21 callAs
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo authored and electrum committed May 7, 2024
1 parent cef916d commit 2279b97
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 86 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ jobs:
- ubuntu-latest
- macos-latest
java:
- 8
- 11
- 17
- 21
steps:
- uses: actions/checkout@v2
- name: Setup JDK ${{ matrix.java }}
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.targetJdk>1.8</project.build.targetJdk>
<project.build.targetJdk>21</project.build.targetJdk>
<shadeBase>io.trino.hadoop.\$internal</shadeBase>
<dep.slf4j.version>1.7.36</dep.slf4j.version>
<dep.hadoop.version>3.3.5</dep.hadoop.version>
Expand Down Expand Up @@ -413,7 +413,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<version>3.5.2</version>
<executions>
<execution>
<phase>package</phase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
Expand Down Expand Up @@ -503,14 +503,10 @@ private HttpURLConnection createConnection(final URL url, String method)
HttpURLConnection conn;
try {
final String doAsUser = getDoAsUser();
conn = getActualUgi().doAs(new PrivilegedExceptionAction
<HttpURLConnection>() {
@Override
public HttpURLConnection run() throws Exception {
DelegationTokenAuthenticatedURL authUrl =
createAuthenticatedURL();
return authUrl.openConnection(url, authToken, doAsUser);
}
conn = getActualUgi().callAs(() -> {
DelegationTokenAuthenticatedURL authUrl =
createAuthenticatedURL();
return authUrl.openConnection(url, authToken, doAsUser);
});
} catch (ConnectException ex) {
String msg = "Failed to connect to: " + url.toString();
Expand Down Expand Up @@ -1026,24 +1022,19 @@ public Token<?> getDelegationToken(final String renewer) throws IOException {
Token<?> token = null;
try {
final String doAsUser = getDoAsUser();
token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
@Override
public Token<?> run() throws Exception {
// Not using the cached token here.. Creating a new token here
// everytime.
LOG.debug("Getting new token from {}, renewer:{}", url, renewer);
return authUrl.getDelegationToken(url,
new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser);
}
token = getActualUgi().callAs((Callable<Token<?>>) () -> {
// Not using the cached token here.. Creating a new token here
// everytime.
LOG.debug("Getting new token from {}, renewer:{}", url, renewer);
return authUrl.getDelegationToken(url,
new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser);
});
if (token != null) {
token.setService(dtService);
LOG.info("New token created: ({})", token);
} else {
throw new IOException("Got NULL as delegation token");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
if (e instanceof IOException) {
throw (IOException) e;
Expand All @@ -1065,13 +1056,8 @@ public long renewDelegationToken(final Token<?> dToken) throws IOException {
token, url, doAsUser);
final DelegationTokenAuthenticatedURL authUrl =
createAuthenticatedURL();
return getActualUgi().doAs(
new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws Exception {
return authUrl.renewDelegationToken(url, token, doAsUser);
}
}
return getActualUgi().callAs(
() -> authUrl.renewDelegationToken(url, token, doAsUser)
);
} catch (Exception ex) {
if (ex instanceof IOException) {
Expand All @@ -1088,18 +1074,15 @@ public Void cancelDelegationToken(final Token<?> dToken) throws IOException {
final String doAsUser = getDoAsUser();
final DelegationTokenAuthenticatedURL.Token token =
generateDelegationToken(dToken);
return getActualUgi().doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
final URL url = createURL(null, null, null, null);
LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
dToken, url, doAsUser);
final DelegationTokenAuthenticatedURL authUrl =
createAuthenticatedURL();
authUrl.cancelDelegationToken(url, token, doAsUser);
return null;
}
return getActualUgi().callAs(
() -> {
final URL url = createURL(null, null, null, null);
LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
dToken, url, doAsUser);
final DelegationTokenAuthenticatedURL authUrl =
createAuthenticatedURL();
authUrl.cancelDelegationToken(url, token, doAsUser);
return null;
}
);
} catch (Exception ex) {
Expand Down
15 changes: 2 additions & 13 deletions src/main/java/org/apache/hadoop/fs/FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.lang.ref.ReferenceQueue;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -272,12 +271,7 @@ public static FileSystem get(final URI uri, final Configuration conf,
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
UserGroupInformation ugi =
UserGroupInformation.getBestUGI(ticketCachePath, user);
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws IOException {
return get(uri, conf);
}
});
return ugi.callAs(() -> get(uri, conf));
}

/**
Expand Down Expand Up @@ -574,12 +568,7 @@ public static FileSystem newInstance(final URI uri, final Configuration conf,
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
UserGroupInformation ugi =
UserGroupInformation.getBestUGI(ticketCachePath, user);
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws IOException {
return newInstance(uri, conf);
}
});
return ugi.callAs(() -> newInstance(uri, conf));
}

/**
Expand Down
58 changes: 29 additions & 29 deletions src/main/java/org/apache/hadoop/security/UserGroupInformation.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@
import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -53,9 +50,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -575,8 +574,7 @@ public boolean hasKerberosCredentials() {
@InterfaceStability.Evolving
public static UserGroupInformation getCurrentUser() throws IOException {
ensureInitialized();
AccessControlContext context = AccessController.getContext();
Subject subject = Subject.getSubject(context);
Subject subject = Subject.current();
if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
return getLoginUser();
} else {
Expand Down Expand Up @@ -1862,22 +1860,6 @@ public Subject getSubject() {
return subject;
}

/**
* Run the given action as the user.
* @param <T> the return type of the run method
* @param action the method to execute
* @return the value from the run method
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public <T> T doAs(PrivilegedAction<T> action) {
if (LOG.isDebugEnabled()) {
LOG.debug("PrivilegedAction [as: {}][action: {}]", this, action,
new Exception());
}
return Subject.doAs(subject, action);
}

/**
* Run the given action as the user, potentially throwing an exception.
* @param <T> the return type of the run method
Expand All @@ -1891,20 +1873,20 @@ public <T> T doAs(PrivilegedAction<T> action) {
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public <T> T doAs(PrivilegedExceptionAction<T> action
public <T> T callAs(Callable<T> action
) throws IOException, InterruptedException {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("PrivilegedAction [as: {}][action: {}]", this, action,
new Exception());
}
return Subject.doAs(subject, action);
} catch (PrivilegedActionException pae) {
Throwable cause = pae.getCause();
LOG.debug("PrivilegedActionException as: {}", this, cause);
return Subject.callAs(subject, action);
} catch (CompletionException coe) {
Throwable cause = coe.getCause();
LOG.debug("CompletionException as: {}", this, cause);
if (cause == null) {
throw new RuntimeException("PrivilegedActionException with no " +
"underlying cause. UGI [" + this + "]" +": " + pae, pae);
throw new RuntimeException("CompletionException with no " +
"underlying cause. UGI [" + this + "]" +": " + coe, coe);
} else if (cause instanceof IOException) {
throw (IOException) cause;
} else if (cause instanceof Error) {
Expand All @@ -1919,6 +1901,24 @@ public <T> T doAs(PrivilegedExceptionAction<T> action
}
}

/**
* Run the given action as the user, potentially throwing an exception.
* @param <T> the return type of the run method
* @param action the method to execute
* @return the value from the run method
* @throws IOException if the action throws an IOException
* @throws Error if the action throws an Error
* @throws RuntimeException if the action throws a RuntimeException
* @throws InterruptedException if the action throws an InterruptedException
* @throws UndeclaredThrowableException if the action throws something else
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public <T> T doAs(PrivilegedExceptionAction<T> action
) throws IOException, InterruptedException {
return callAs(action::run);
}

/**
* Log current UGI and token information into specified log.
*
Expand Down Expand Up @@ -2280,4 +2280,4 @@ public static void main(String [] args) throws Exception {
System.out.println("Keytab " + loginUgi.isFromKeytab());
}
}
}
}

0 comments on commit 2279b97

Please sign in to comment.