From 2279b972396dbf931d5a1c79f64f20d2a1081a5f Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 28 Mar 2024 19:09:02 +0100 Subject: [PATCH] Migrate to JDK21 callAs --- .github/workflows/ci.yml | 4 +- pom.xml | 4 +- .../crypto/key/kms/KMSClientProvider.java | 61 +++++++------------ .../java/org/apache/hadoop/fs/FileSystem.java | 15 +---- .../hadoop/security/UserGroupInformation.java | 58 +++++++++--------- 5 files changed, 56 insertions(+), 86 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9e0c1b4..5ca262a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,9 +11,7 @@ jobs: - ubuntu-latest - macos-latest java: - - 8 - - 11 - - 17 + - 21 steps: - uses: actions/checkout@v2 - name: Setup JDK ${{ matrix.java }} diff --git a/pom.xml b/pom.xml index 9b8c507..7831d17 100644 --- a/pom.xml +++ b/pom.xml @@ -39,7 +39,7 @@ UTF-8 - 1.8 + 21 io.trino.hadoop.\$internal 1.7.36 3.3.5 @@ -413,7 +413,7 @@ org.apache.maven.plugins maven-shade-plugin - 3.2.1 + 3.5.2 package diff --git a/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java index 1bec204..5a241fc 100644 --- a/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java +++ b/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java @@ -66,13 +66,13 @@ import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; import java.security.NoSuchAlgorithmException; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; @@ -503,14 +503,10 @@ private HttpURLConnection createConnection(final URL url, String method) HttpURLConnection conn; try { final String doAsUser = getDoAsUser(); - conn = getActualUgi().doAs(new PrivilegedExceptionAction - () { - @Override - public HttpURLConnection run() throws Exception { - DelegationTokenAuthenticatedURL authUrl = - createAuthenticatedURL(); - return authUrl.openConnection(url, authToken, doAsUser); - } + conn = getActualUgi().callAs(() -> { + DelegationTokenAuthenticatedURL authUrl = + createAuthenticatedURL(); + return authUrl.openConnection(url, authToken, doAsUser); }); } catch (ConnectException ex) { String msg = "Failed to connect to: " + url.toString(); @@ -1026,15 +1022,12 @@ public Token getDelegationToken(final String renewer) throws IOException { Token token = null; try { final String doAsUser = getDoAsUser(); - token = getActualUgi().doAs(new PrivilegedExceptionAction>() { - @Override - public Token run() throws Exception { - // Not using the cached token here.. Creating a new token here - // everytime. - LOG.debug("Getting new token from {}, renewer:{}", url, renewer); - return authUrl.getDelegationToken(url, - new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser); - } + token = getActualUgi().callAs((Callable>) () -> { + // Not using the cached token here.. Creating a new token here + // everytime. + LOG.debug("Getting new token from {}, renewer:{}", url, renewer); + return authUrl.getDelegationToken(url, + new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser); }); if (token != null) { token.setService(dtService); @@ -1042,8 +1035,6 @@ public Token run() throws Exception { } else { throw new IOException("Got NULL as delegation token"); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); } catch (Exception e) { if (e instanceof IOException) { throw (IOException) e; @@ -1065,13 +1056,8 @@ public long renewDelegationToken(final Token dToken) throws IOException { token, url, doAsUser); final DelegationTokenAuthenticatedURL authUrl = createAuthenticatedURL(); - return getActualUgi().doAs( - new PrivilegedExceptionAction() { - @Override - public Long run() throws Exception { - return authUrl.renewDelegationToken(url, token, doAsUser); - } - } + return getActualUgi().callAs( + () -> authUrl.renewDelegationToken(url, token, doAsUser) ); } catch (Exception ex) { if (ex instanceof IOException) { @@ -1088,18 +1074,15 @@ public Void cancelDelegationToken(final Token dToken) throws IOException { final String doAsUser = getDoAsUser(); final DelegationTokenAuthenticatedURL.Token token = generateDelegationToken(dToken); - return getActualUgi().doAs( - new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - final URL url = createURL(null, null, null, null); - LOG.debug("Cancelling delegation token {} with url:{}, as:{}", - dToken, url, doAsUser); - final DelegationTokenAuthenticatedURL authUrl = - createAuthenticatedURL(); - authUrl.cancelDelegationToken(url, token, doAsUser); - return null; - } + return getActualUgi().callAs( + () -> { + final URL url = createURL(null, null, null, null); + LOG.debug("Cancelling delegation token {} with url:{}, as:{}", + dToken, url, doAsUser); + final DelegationTokenAuthenticatedURL authUrl = + createAuthenticatedURL(); + authUrl.cancelDelegationToken(url, token, doAsUser); + return null; } ); } catch (Exception ex) { diff --git a/src/main/java/org/apache/hadoop/fs/FileSystem.java b/src/main/java/org/apache/hadoop/fs/FileSystem.java index 6c3d625..3d878b6 100644 --- a/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -25,7 +25,6 @@ import java.lang.ref.ReferenceQueue; import java.net.URI; import java.net.URISyntaxException; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -272,12 +271,7 @@ public static FileSystem get(final URI uri, final Configuration conf, conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, user); - return ugi.doAs(new PrivilegedExceptionAction() { - @Override - public FileSystem run() throws IOException { - return get(uri, conf); - } - }); + return ugi.callAs(() -> get(uri, conf)); } /** @@ -574,12 +568,7 @@ public static FileSystem newInstance(final URI uri, final Configuration conf, conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, user); - return ugi.doAs(new PrivilegedExceptionAction() { - @Override - public FileSystem run() throws IOException { - return newInstance(uri, conf); - } - }); + return ugi.callAs(() -> newInstance(uri, conf)); } /** diff --git a/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/src/main/java/org/apache/hadoop/security/UserGroupInformation.java index 2b6a30f..636597c 100644 --- a/src/main/java/org/apache/hadoop/security/UserGroupInformation.java +++ b/src/main/java/org/apache/hadoop/security/UserGroupInformation.java @@ -35,11 +35,8 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; -import java.security.AccessControlContext; -import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; -import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -53,9 +50,11 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -575,8 +574,7 @@ public boolean hasKerberosCredentials() { @InterfaceStability.Evolving public static UserGroupInformation getCurrentUser() throws IOException { ensureInitialized(); - AccessControlContext context = AccessController.getContext(); - Subject subject = Subject.getSubject(context); + Subject subject = Subject.current(); if (subject == null || subject.getPrincipals(User.class).isEmpty()) { return getLoginUser(); } else { @@ -1862,22 +1860,6 @@ public Subject getSubject() { return subject; } - /** - * Run the given action as the user. - * @param the return type of the run method - * @param action the method to execute - * @return the value from the run method - */ - @InterfaceAudience.Public - @InterfaceStability.Evolving - public T doAs(PrivilegedAction action) { - if (LOG.isDebugEnabled()) { - LOG.debug("PrivilegedAction [as: {}][action: {}]", this, action, - new Exception()); - } - return Subject.doAs(subject, action); - } - /** * Run the given action as the user, potentially throwing an exception. * @param the return type of the run method @@ -1891,20 +1873,20 @@ public T doAs(PrivilegedAction action) { */ @InterfaceAudience.Public @InterfaceStability.Evolving - public T doAs(PrivilegedExceptionAction action + public T callAs(Callable action ) throws IOException, InterruptedException { try { if (LOG.isDebugEnabled()) { LOG.debug("PrivilegedAction [as: {}][action: {}]", this, action, new Exception()); } - return Subject.doAs(subject, action); - } catch (PrivilegedActionException pae) { - Throwable cause = pae.getCause(); - LOG.debug("PrivilegedActionException as: {}", this, cause); + return Subject.callAs(subject, action); + } catch (CompletionException coe) { + Throwable cause = coe.getCause(); + LOG.debug("CompletionException as: {}", this, cause); if (cause == null) { - throw new RuntimeException("PrivilegedActionException with no " + - "underlying cause. UGI [" + this + "]" +": " + pae, pae); + throw new RuntimeException("CompletionException with no " + + "underlying cause. UGI [" + this + "]" +": " + coe, coe); } else if (cause instanceof IOException) { throw (IOException) cause; } else if (cause instanceof Error) { @@ -1919,6 +1901,24 @@ public T doAs(PrivilegedExceptionAction action } } + /** + * Run the given action as the user, potentially throwing an exception. + * @param the return type of the run method + * @param action the method to execute + * @return the value from the run method + * @throws IOException if the action throws an IOException + * @throws Error if the action throws an Error + * @throws RuntimeException if the action throws a RuntimeException + * @throws InterruptedException if the action throws an InterruptedException + * @throws UndeclaredThrowableException if the action throws something else + */ + @InterfaceAudience.Public + @InterfaceStability.Evolving + public T doAs(PrivilegedExceptionAction action + ) throws IOException, InterruptedException { + return callAs(action::run); + } + /** * Log current UGI and token information into specified log. * @@ -2280,4 +2280,4 @@ public static void main(String [] args) throws Exception { System.out.println("Keytab " + loginUgi.isFromKeytab()); } } -} \ No newline at end of file +}