Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Communication] - Common - Remove all Future and replace with Mono #18052

Merged
merged 15 commits into from
Dec 11, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public CommunicationBearerTokenCredential(CommunicationTokenCredential communica
@Override
public Mono<AccessToken> getToken(TokenRequestContext request) {
try {
return Mono.just(credential.getToken().get());
return credential.getToken();
} catch (InterruptedException ex) {
return Mono.error(ex);
} catch (ExecutionException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
package com.azure.communication.common;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.azure.core.util.logging.ClientLogger;

import reactor.core.publisher.Mono;

import com.azure.core.credential.AccessToken;

import java.io.IOException;
Expand All @@ -29,46 +28,11 @@ public final class CommunicationTokenCredential implements AutoCloseable {
private final ClientLogger logger = new ClientLogger(CommunicationTokenCredential.class);

private AccessToken accessToken;
private Future<AccessToken> tokenFuture;
private final TokenParser tokenParser = new TokenParser();
private TokenRefresher refresher;
private FetchingTask fetchingTask;
private boolean isClosed = false;

private static class TokenImmediate implements Future<AccessToken> {
private final AccessToken accessToken;

TokenImmediate(AccessToken accessToken) {
this.accessToken = accessToken;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return true;
}

@Override
public AccessToken get() throws InterruptedException, ExecutionException {
return this.accessToken;
}

@Override
public AccessToken get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return this.accessToken;
}
}

/**
* Create with serialized JWT token
*
Expand All @@ -77,7 +41,6 @@ public AccessToken get(long timeout, TimeUnit unit)
public CommunicationTokenCredential(String initialToken) {
Objects.requireNonNull(initialToken, "'initialToken' cannot be null.");
setToken(initialToken);
tokenFuture = new TokenImmediate(accessToken);
}

/**
Expand All @@ -92,52 +55,55 @@ public CommunicationTokenCredential(TokenRefresher tokenRefresher) {

/**
* Create with serialized JWT token and a token supplier to auto-refresh the
* token before it expires. Callback function tokenRefresher will be called ahead
* of the token expiry by the number of minutes specified by
* token before it expires. Callback function tokenRefresher will be called
* ahead of the token expiry by the number of minutes specified by
* CallbackOffsetMinutes defaulted to two minutes. To modify this default, call
* setCallbackOffsetMinutes after construction
*
* @param tokenRefresher implementation to supply fresh token when reqested
* @param initialToken serialized JWT token
* @param refreshProactively when set to true, turn on proactive fetching to
* call tokenRefresher before token expiry by minutes
* set with setCallbackOffsetMinutes or default value
* of two minutes
* @param refreshProactively when set to true, turn on proactive fetching to call
* tokenRefresher before token expiry by minutes set
* with setCallbackOffsetMinutes or default value of
* two minutes
*/
public CommunicationTokenCredential(
TokenRefresher tokenRefresher,
String initialToken,
boolean refreshProactively)
{
public CommunicationTokenCredential(TokenRefresher tokenRefresher, String initialToken,
boolean refreshProactively) {
this(tokenRefresher);
Objects.requireNonNull(initialToken, "'initialToken' cannot be null.");
setToken(initialToken);
tokenFuture = new TokenImmediate(accessToken);
if (refreshProactively) {
OffsetDateTime nextFetchTime = accessToken.getExpiresAt().minusMinutes(DEFAULT_EXPIRING_OFFSET_MINUTES);
fetchingTask = new FetchingTask(this, nextFetchTime);
}
}


/**
* Get Azure core access token from credential
*
* @return Asynchronous call to fetch actual token
* @throws ExecutionException when supplier throws this exception
* @throws InterruptedException when supplier throws this exception
*/
public Future<AccessToken> getToken() throws InterruptedException, ExecutionException {
public Mono<AccessToken> getToken() throws InterruptedException, ExecutionException {
if (isClosed) {
throw logger.logExceptionAsError(
new RuntimeException("getToken called on closed CommunicationTokenCredential object"));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, return a Mono.error() here instead of throwing an exception.

if ((accessToken == null || accessToken.isExpired()) // no valid token to return
&& refresher != null // can refresh
&& (tokenFuture == null || tokenFuture.isDone())) { // no fetching in progress, proactive or on-demand
fetchFreshToken();
if ((accessToken == null || accessToken.isExpired()) && refresher != null) {
synchronized (this) {
// no valid token to return and can refresh
if ((accessToken == null || accessToken.isExpired()) && refresher != null) {
return fetchFreshToken()
.map(token -> {
accessToken = tokenParser.parseJWTToken(token);
return accessToken;
});
}
}
}

return tokenFuture;
return Mono.just(accessToken);
}

@Override
Expand All @@ -155,61 +121,22 @@ boolean hasProactiveFetcher() {
return fetchingTask != null;
}

private void setToken(String freshToken) {
private void setToken(String freshToken) {
accessToken = tokenParser.parseJWTToken(freshToken);

if (fetchingTask != null) {
OffsetDateTime nextFetchTime = accessToken.getExpiresAt().minusMinutes(DEFAULT_EXPIRING_OFFSET_MINUTES);
fetchingTask.setNextFetchTime(nextFetchTime);
}
}

private Future<String> fetchFreshToken() {
Future<String> fetchFuture = refresher.getFetchTokenFuture();
if (fetchFuture == null) {
throw logger.logExceptionAsError(
new RuntimeException("TokenRefresher returned null when getFetchTokenFuture is called"));
}
tokenFuture = new TokenFuture(fetchFuture);
return fetchFuture;
}

private class TokenFuture implements Future<AccessToken> {
private final Future<String> clientTokenFuture;

TokenFuture(Future<String> tokenStringFuture) {
this.clientTokenFuture = tokenStringFuture;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return clientTokenFuture.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return clientTokenFuture.isCancelled();
}

@Override
public boolean isDone() {
return clientTokenFuture.isDone();
}

@Override
public AccessToken get() throws InterruptedException, ExecutionException {
String freshToken = clientTokenFuture.get();
setToken(freshToken);
return accessToken;
}

@Override
public AccessToken get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
String freshToken = clientTokenFuture.get(timeout, unit);
setToken(freshToken);
return accessToken;
private Mono<String> fetchFreshToken() {
Mono<String> tokenAsync = refresher.getTokenAsync();
if (tokenAsync == null) {
throw logger.logExceptionAsError(
new RuntimeException("TokenRefresher returned null when getTokenAsync is called"));
}
return tokenAsync;
}

private static class FetchingTask {
Expand Down Expand Up @@ -246,7 +173,7 @@ private synchronized void stopTimer() {
expiringTimer = null;
}

private Future<String> fetchFreshToken() {
private Mono<String> fetchFreshToken() {
return host.fetchFreshToken();
}

Expand All @@ -265,8 +192,8 @@ private class TokenExpiringTask extends TimerTask {
@Override
public void run() {
try {
Future<String> tokenStringFuture = tokenCache.fetchFreshToken();
tokenCache.setToken(tokenStringFuture.get());
Mono<String> tokenAsync = tokenCache.fetchFreshToken();
tokenCache.setToken(tokenAsync.block());
} catch (Exception exception) {
logger.logExceptionAsError(new RuntimeException(exception));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT License.
package com.azure.communication.common;

import java.util.concurrent.Future;
import reactor.core.publisher.Mono;

/**
* Interface to provide capacity to fetch fresh token
Expand All @@ -12,5 +12,5 @@ public interface TokenRefresher {
* Asynchronous call to fetch a fresh token
* @return Wrapper for asynchronous call
*/
Future<String> getFetchTokenFuture();
Mono<String> getTokenAsync();
}
Loading