Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making databaseAccount read dynamic after V4 master merge #8466

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,28 @@

package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.LocationHelper;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionPolicy;
import com.azure.cosmos.DatabaseAccount;
import com.azure.cosmos.implementation.caches.AsyncCache;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.LocationHelper;
import org.apache.commons.collections4.list.UnmodifiableList;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.net.URISyntaxException;
import java.net.URI;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -45,9 +47,13 @@ public class GlobalEndpointManager implements AutoCloseable {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final Scheduler scheduler = Schedulers.fromExecutor(executor);
private volatile boolean isClosed;
private final AsyncCache<String, DatabaseAccount> databaseAccountAsyncCache;
private AtomicBoolean firstTimeDatabaseAccountInitialization = new AtomicBoolean(true);
private volatile DatabaseAccount latestDatabaseAccount;

public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPolicy connectionPolicy, Configs configs) {
this.backgroundRefreshLocationTimeIntervalInMS = configs.getUnavailableLocationsExpirationTimeInSeconds() * 1000;
this.databaseAccountAsyncCache = new AsyncCache<>();
try {
this.locationCache = new LocationCache(
new ArrayList<>(connectionPolicy.getPreferredLocations() != null ?
Expand Down Expand Up @@ -159,6 +165,21 @@ public Mono<Void> refreshLocationAsync(DatabaseAccount databaseAccount, boolean
});
}

public Mono<DatabaseAccount> getDatabaseAccountFromCache(URI defaultEndpoint) {
return this.databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, () -> this.owner.getDatabaseAccountFromEndpoint(defaultEndpoint).flatMap(databaseAccount -> {
if (databaseAccount != null) {
this.latestDatabaseAccount = databaseAccount;
}

Mono<Void> refreshLocationCompletable = this.refreshLocationAsync(databaseAccount, false);
return refreshLocationCompletable.then(Mono.just(databaseAccount));
}).single());
}

public DatabaseAccount getLatestDatabaseAccount() {
return this.latestDatabaseAccount;
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
}

private Mono<Void> refreshLocationPrivateAsync(DatabaseAccount databaseAccount) {
return Mono.defer(() -> {
logger.debug("refreshLocationPrivateAsync() refreshing locations");
Expand Down Expand Up @@ -253,8 +274,27 @@ private Mono<Void> startRefreshLocationTimerAsync(boolean initialization) {
}

private Mono<DatabaseAccount> getDatabaseAccountAsync(URI serviceEndpoint) {
return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint)
.doOnNext(i -> logger.debug("account retrieved: {}", i)).single();
final GlobalEndpointManager that = this;
Callable<Mono<DatabaseAccount>> fetchDatabaseAccount = () -> {
return that.owner.getDatabaseAccountFromEndpoint(serviceEndpoint).doOnNext(databaseAccount -> {
if(databaseAccount != null) {
this.latestDatabaseAccount = databaseAccount;
}

logger.debug("account retrieved: {}", databaseAccount);
}).single();
};

Mono<DatabaseAccount> obsoleteValueMono = databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, fetchDatabaseAccount);
return obsoleteValueMono.flatMap(obsoleteValue -> {
if (firstTimeDatabaseAccountInitialization.compareAndSet(true, false)) {
return Mono.just(obsoleteValue);
}
return databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, obsoleteValue, fetchDatabaseAccount).doOnError(t -> {
//Putting back the old value in cache, this will avoid cache corruption
databaseAccountAsyncCache.set(StringUtils.EMPTY, obsoleteValue);
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
});
});
}

public boolean isClosed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,29 +250,12 @@ private RxDocumentClientImpl(URI serviceEndpoint,
}

private void initializeGatewayConfigurationReader() {
String resourceToken;
if(this.tokenResolver != null) {
resourceToken = this.tokenResolver.getAuthorizationToken(RequestVerb.GET, "", CosmosResourceType.System, null);
} else if(!this.hasAuthKeyResourceToken && this.authorizationTokenProvider == null) {
resourceToken = this.firstResourceTokenFromPermissionFeed;
} else {
assert this.masterKeyOrResourceToken != null || this.cosmosKeyCredential != null;
resourceToken = this.masterKeyOrResourceToken;
}

this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint,
this.hasAuthKeyResourceToken,
resourceToken,
this.connectionPolicy,
this.authorizationTokenProvider,
this.reactorHttpClient);

DatabaseAccount databaseAccount = this.gatewayConfigurationReader.initializeReaderAsync().block();
this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint, this.globalEndpointManager);
DatabaseAccount databaseAccount = this.globalEndpointManager.getLatestDatabaseAccount();
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
this.useMultipleWriteLocations = this.connectionPolicy.getUsingMultipleWriteLocations() && BridgeInternal.isEnableMultipleWriteLocations(databaseAccount);

// TODO: add support for openAsync
// https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589
this.globalEndpointManager.refreshLocationAsync(databaseAccount, false).block();
}

public void init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,11 @@
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionPolicy;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.RequestVerb;
import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider;
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.DatabaseAccount;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ReplicationPolicy;
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import reactor.core.publisher.Mono;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -42,118 +24,28 @@
*/
public class GatewayServiceConfigurationReader {

public static final String GATEWAY_READER_NOT_INITIALIZED = "GatewayServiceConfigurationReader has not been initialized";

public ReplicationPolicy userReplicationPolicy;
private ReplicationPolicy systemReplicationPolicy;
private ConsistencyLevel consistencyLevel;
private volatile boolean initialized;
private URI serviceEndpoint;
private final ConnectionPolicy connectionPolicy;
private Map<String, Object> queryEngineConfiguration;
private final BaseAuthorizationTokenProvider baseAuthorizationTokenProvider;
private final boolean hasAuthKeyResourceToken;
private final String authKeyResourceToken;
private HttpClient httpClient;
private GlobalEndpointManager globalEndpointManager;

public GatewayServiceConfigurationReader(URI serviceEndpoint, boolean hasResourceToken, String resourceToken,
ConnectionPolicy connectionPolicy, BaseAuthorizationTokenProvider baseAuthorizationTokenProvider,
HttpClient httpClient) {
public GatewayServiceConfigurationReader(URI serviceEndpoint, GlobalEndpointManager globalEndpointManager) {
this.serviceEndpoint = serviceEndpoint;
this.baseAuthorizationTokenProvider = baseAuthorizationTokenProvider;
this.hasAuthKeyResourceToken = hasResourceToken;
this.authKeyResourceToken = resourceToken;
this.connectionPolicy = connectionPolicy;
this.httpClient = httpClient;
this.globalEndpointManager = globalEndpointManager;
this.globalEndpointManager.getDatabaseAccountFromCache(this.serviceEndpoint).block();
}

public ReplicationPolicy getUserReplicationPolicy() {
this.throwIfNotInitialized();
return this.userReplicationPolicy;
return BridgeInternal.getReplicationPolicy(this.globalEndpointManager.getLatestDatabaseAccount());
}

public ReplicationPolicy getSystemReplicationPolicy() {
this.throwIfNotInitialized();
return this.systemReplicationPolicy;
}

public boolean enableAuthorization() {
return true;
return BridgeInternal.getSystemReplicationPolicy(this.globalEndpointManager.getLatestDatabaseAccount());
}

public ConsistencyLevel getDefaultConsistencyLevel() {
this.throwIfNotInitialized();
return this.consistencyLevel;
}

public void setDefaultConsistencyLevel(ConsistencyLevel value) {
this.throwIfNotInitialized();
this.consistencyLevel = value;
return BridgeInternal.getConsistencyPolicy(this.globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel();
}

public Map<String, Object> getQueryEngineConfiguration() {
this.throwIfNotInitialized();
return this.queryEngineConfiguration;
}

private Mono<DatabaseAccount> getDatabaseAccountAsync(URI serviceEndpoint) {

HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.set(HttpConstants.HttpHeaders.VERSION, HttpConstants.Versions.CURRENT_VERSION);

UserAgentContainer userAgentContainer = new UserAgentContainer();
String userAgentSuffix = this.connectionPolicy.getUserAgentSuffix();
if (userAgentSuffix != null && userAgentSuffix.length() > 0) {
userAgentContainer.setSuffix(userAgentSuffix);
}

httpHeaders.set(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent());
httpHeaders.set(HttpConstants.HttpHeaders.API_TYPE, Constants.Properties.SQL_API_TYPE);
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved

String xDate = Utils.nowAsRFC1123();
httpHeaders.set(HttpConstants.HttpHeaders.X_DATE, xDate);

String authorizationToken;
if (this.hasAuthKeyResourceToken || baseAuthorizationTokenProvider == null) {
authorizationToken = HttpUtils.urlEncode(this.authKeyResourceToken);
} else {
// Retrieve the document service properties.
Map<String, String> header = new HashMap<>();
header.put(HttpConstants.HttpHeaders.X_DATE, xDate);
authorizationToken = baseAuthorizationTokenProvider
.generateKeyAuthorizationSignature(RequestVerb.GET, serviceEndpoint, header);
}
httpHeaders.set(HttpConstants.HttpHeaders.AUTHORIZATION, authorizationToken);

HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, serviceEndpoint, serviceEndpoint.getPort(), httpHeaders);
Mono<HttpResponse> httpResponse = httpClient.send(httpRequest);
return toDatabaseAccountObservable(httpResponse, httpRequest);
}

public Mono<DatabaseAccount> initializeReaderAsync() {
return GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(this.serviceEndpoint,

new ArrayList<>(this.connectionPolicy.getPreferredLocations()), url -> {
return getDatabaseAccountAsync(url);

}).doOnSuccess(databaseAccount -> {
userReplicationPolicy = BridgeInternal.getReplicationPolicy(databaseAccount);
systemReplicationPolicy = BridgeInternal.getSystemReplicationPolicy(databaseAccount);
queryEngineConfiguration = BridgeInternal.getQueryEngineConfiuration(databaseAccount);
consistencyLevel = BridgeInternal.getConsistencyPolicy(databaseAccount).getDefaultConsistencyLevel();
initialized = true;
});
}

private Mono<DatabaseAccount> toDatabaseAccountObservable(Mono<HttpResponse> httpResponse, HttpRequest httpRequest) {

return HttpClientUtils.parseResponseAsync(httpResponse, httpRequest)
.map(rxDocumentServiceResponse -> rxDocumentServiceResponse.getResource(DatabaseAccount.class));
}

private void throwIfNotInitialized() {
if (!this.initialized) {
throw new IllegalArgumentException(GATEWAY_READER_NOT_INITIALIZED);
}
return BridgeInternal.getQueryEngineConfiuration(this.globalEndpointManager.getLatestDatabaseAccount());
}
}
Loading