From 1d8856a1dd10b9afa60d3da9baba7096c8256ed6 Mon Sep 17 00:00:00 2001 From: Naveen Kumar Singh Date: Mon, 24 Feb 2020 16:29:06 -0500 Subject: [PATCH 1/6] making databaseAccount update dynamic --- .../implementation/GlobalEndpointManager.java | 50 +++++- .../implementation/RxDocumentClientImpl.java | 21 +-- .../GatewayServiceConfigurationReader.java | 124 +------------- ...GatewayServiceConfigurationReaderTest.java | 160 ++++++++++-------- .../GatewayServiceConfiguratorReaderMock.java | 1 - .../GlobalEndPointManagerTest.java | 14 +- .../directconnectivity/ReflectionUtils.java | 13 ++ .../azure/cosmos/rx/TokenResolverTest.java | 4 +- 8 files changed, 165 insertions(+), 222 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index dcc3762eaf3db..0e630b8f84b4a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -3,12 +3,15 @@ package com.azure.cosmos.implementation; -import com.azure.cosmos.implementation.routing.LocationCache; -import com.azure.cosmos.implementation.routing.LocationHelper; import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConnectionPolicy; import com.azure.cosmos.DatabaseAccount; +import com.azure.cosmos.DatabaseAccountLocation; +import com.azure.cosmos.implementation.caches.AsyncCache; +import com.azure.cosmos.implementation.routing.LocationCache; +import com.azure.cosmos.implementation.routing.LocationHelper; import org.apache.commons.collections4.list.UnmodifiableList; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -16,13 +19,13 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import java.net.URISyntaxException; import java.net.URI; import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,9 +48,13 @@ public class GlobalEndpointManager implements AutoCloseable { private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final Scheduler scheduler = Schedulers.fromExecutor(executor); private volatile boolean isClosed; + private final AsyncCache databaseAccountAsyncCache; + private AtomicBoolean firstTimeDatabaseAccountInitialization = new AtomicBoolean(true); + private volatile DatabaseAccount latestDatabaseAccount; public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPolicy connectionPolicy, Configs configs) { this.backgroundRefreshLocationTimeIntervalInMS = configs.getUnavailableLocationsExpirationTimeInSeconds() * 1000; + this.databaseAccountAsyncCache = new AsyncCache<>(); try { this.locationCache = new LocationCache( new ArrayList<>(connectionPolicy.getPreferredLocations() != null ? @@ -159,6 +166,20 @@ public Mono refreshLocationAsync(DatabaseAccount databaseAccount, boolean }); } + public Mono getDatabaseAccountFromCache(URI defaultEndpoint) { + return this.databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, () -> this.owner.getDatabaseAccountFromEndpoint(defaultEndpoint).single().doOnSuccess(databaseAccount -> { + if(databaseAccount != null) { + this.latestDatabaseAccount = databaseAccount; + } + + this.refreshLocationAsync(databaseAccount, false); + })); + } + + public DatabaseAccount getLatestDatabaseAccount() { + return this.latestDatabaseAccount; + } + private Mono refreshLocationPrivateAsync(DatabaseAccount databaseAccount) { return Mono.defer(() -> { logger.debug("refreshLocationPrivateAsync() refreshing locations"); @@ -253,8 +274,27 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) { } private Mono getDatabaseAccountAsync(URI serviceEndpoint) { - return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint) - .doOnNext(i -> logger.debug("account retrieved: {}", i)).single(); + final GlobalEndpointManager that = this; + Callable> fetchDatabaseAccount = () -> { + return that.owner.getDatabaseAccountFromEndpoint(serviceEndpoint).doOnNext(databaseAccount -> { + if(databaseAccount != null) { + this.latestDatabaseAccount = databaseAccount; + } + + logger.debug("account retrieved: {}", databaseAccount); + }).single(); + }; + + Mono obsoleteValueMono = databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, fetchDatabaseAccount); + return obsoleteValueMono.flatMap(obsoleteValue -> { + if (firstTimeDatabaseAccountInitialization.compareAndSet(true, false)) { + return Mono.just(obsoleteValue); + } + return databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, obsoleteValue, fetchDatabaseAccount).doOnError(t -> { + //Putting back the old value in cache, this will avoid cache corruption + databaseAccountAsyncCache.set(StringUtils.EMPTY, obsoleteValue); + }); + }); } public boolean isClosed() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index a5e22fe5b5c19..e09fb1680f970 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -250,29 +250,12 @@ private RxDocumentClientImpl(URI serviceEndpoint, } private void initializeGatewayConfigurationReader() { - String resourceToken; - if(this.tokenResolver != null) { - resourceToken = this.tokenResolver.getAuthorizationToken(RequestVerb.GET, "", CosmosResourceType.System, null); - } else if(!this.hasAuthKeyResourceToken && this.authorizationTokenProvider == null) { - resourceToken = this.firstResourceTokenFromPermissionFeed; - } else { - assert this.masterKeyOrResourceToken != null || this.cosmosKeyCredential != null; - resourceToken = this.masterKeyOrResourceToken; - } - - this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint, - this.hasAuthKeyResourceToken, - resourceToken, - this.connectionPolicy, - this.authorizationTokenProvider, - this.reactorHttpClient); - - DatabaseAccount databaseAccount = this.gatewayConfigurationReader.initializeReaderAsync().block(); + this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint, this.globalEndpointManager); + DatabaseAccount databaseAccount = this.globalEndpointManager.getLatestDatabaseAccount(); this.useMultipleWriteLocations = this.connectionPolicy.getUsingMultipleWriteLocations() && BridgeInternal.isEnableMultipleWriteLocations(databaseAccount); // TODO: add support for openAsync // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589 - this.globalEndpointManager.refreshLocationAsync(databaseAccount, false).block(); } public void init() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java index d9f562885cac4..a9af7d876f30b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java @@ -4,29 +4,11 @@ package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.BridgeInternal; -import com.azure.cosmos.ConnectionPolicy; import com.azure.cosmos.ConsistencyLevel; -import com.azure.cosmos.RequestVerb; -import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider; -import com.azure.cosmos.implementation.Constants; -import com.azure.cosmos.DatabaseAccount; import com.azure.cosmos.implementation.GlobalEndpointManager; -import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ReplicationPolicy; -import com.azure.cosmos.implementation.UserAgentContainer; -import com.azure.cosmos.implementation.Utils; -import com.azure.cosmos.implementation.http.HttpClient; -import com.azure.cosmos.implementation.http.HttpHeaders; -import com.azure.cosmos.implementation.http.HttpRequest; -import com.azure.cosmos.implementation.http.HttpResponse; -import io.netty.handler.codec.http.HttpMethod; -import reactor.core.publisher.Mono; -import java.net.MalformedURLException; import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.Map; /** @@ -42,118 +24,28 @@ */ public class GatewayServiceConfigurationReader { - public static final String GATEWAY_READER_NOT_INITIALIZED = "GatewayServiceConfigurationReader has not been initialized"; - - public ReplicationPolicy userReplicationPolicy; - private ReplicationPolicy systemReplicationPolicy; - private ConsistencyLevel consistencyLevel; - private volatile boolean initialized; private URI serviceEndpoint; - private final ConnectionPolicy connectionPolicy; - private Map queryEngineConfiguration; - private final BaseAuthorizationTokenProvider baseAuthorizationTokenProvider; - private final boolean hasAuthKeyResourceToken; - private final String authKeyResourceToken; - private HttpClient httpClient; + private GlobalEndpointManager globalEndpointManager; - public GatewayServiceConfigurationReader(URI serviceEndpoint, boolean hasResourceToken, String resourceToken, - ConnectionPolicy connectionPolicy, BaseAuthorizationTokenProvider baseAuthorizationTokenProvider, - HttpClient httpClient) { + public GatewayServiceConfigurationReader(URI serviceEndpoint, GlobalEndpointManager globalEndpointManager) { this.serviceEndpoint = serviceEndpoint; - this.baseAuthorizationTokenProvider = baseAuthorizationTokenProvider; - this.hasAuthKeyResourceToken = hasResourceToken; - this.authKeyResourceToken = resourceToken; - this.connectionPolicy = connectionPolicy; - this.httpClient = httpClient; + this.globalEndpointManager = globalEndpointManager; + this.globalEndpointManager.getDatabaseAccountFromCache(this.serviceEndpoint).block(); } public ReplicationPolicy getUserReplicationPolicy() { - this.throwIfNotInitialized(); - return this.userReplicationPolicy; + return BridgeInternal.getReplicationPolicy(this.globalEndpointManager.getLatestDatabaseAccount()); } public ReplicationPolicy getSystemReplicationPolicy() { - this.throwIfNotInitialized(); - return this.systemReplicationPolicy; - } - - public boolean enableAuthorization() { - return true; + return BridgeInternal.getSystemReplicationPolicy(this.globalEndpointManager.getLatestDatabaseAccount()); } public ConsistencyLevel getDefaultConsistencyLevel() { - this.throwIfNotInitialized(); - return this.consistencyLevel; - } - - public void setDefaultConsistencyLevel(ConsistencyLevel value) { - this.throwIfNotInitialized(); - this.consistencyLevel = value; + return BridgeInternal.getConsistencyPolicy(this.globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel(); } public Map getQueryEngineConfiguration() { - this.throwIfNotInitialized(); - return this.queryEngineConfiguration; - } - - private Mono getDatabaseAccountAsync(URI serviceEndpoint) { - - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.set(HttpConstants.HttpHeaders.VERSION, HttpConstants.Versions.CURRENT_VERSION); - - UserAgentContainer userAgentContainer = new UserAgentContainer(); - String userAgentSuffix = this.connectionPolicy.getUserAgentSuffix(); - if (userAgentSuffix != null && userAgentSuffix.length() > 0) { - userAgentContainer.setSuffix(userAgentSuffix); - } - - httpHeaders.set(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); - httpHeaders.set(HttpConstants.HttpHeaders.API_TYPE, Constants.Properties.SQL_API_TYPE); - - String xDate = Utils.nowAsRFC1123(); - httpHeaders.set(HttpConstants.HttpHeaders.X_DATE, xDate); - - String authorizationToken; - if (this.hasAuthKeyResourceToken || baseAuthorizationTokenProvider == null) { - authorizationToken = HttpUtils.urlEncode(this.authKeyResourceToken); - } else { - // Retrieve the document service properties. - Map header = new HashMap<>(); - header.put(HttpConstants.HttpHeaders.X_DATE, xDate); - authorizationToken = baseAuthorizationTokenProvider - .generateKeyAuthorizationSignature(RequestVerb.GET, serviceEndpoint, header); - } - httpHeaders.set(HttpConstants.HttpHeaders.AUTHORIZATION, authorizationToken); - - HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, serviceEndpoint, serviceEndpoint.getPort(), httpHeaders); - Mono httpResponse = httpClient.send(httpRequest); - return toDatabaseAccountObservable(httpResponse, httpRequest); - } - - public Mono initializeReaderAsync() { - return GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(this.serviceEndpoint, - - new ArrayList<>(this.connectionPolicy.getPreferredLocations()), url -> { - return getDatabaseAccountAsync(url); - - }).doOnSuccess(databaseAccount -> { - userReplicationPolicy = BridgeInternal.getReplicationPolicy(databaseAccount); - systemReplicationPolicy = BridgeInternal.getSystemReplicationPolicy(databaseAccount); - queryEngineConfiguration = BridgeInternal.getQueryEngineConfiuration(databaseAccount); - consistencyLevel = BridgeInternal.getConsistencyPolicy(databaseAccount).getDefaultConsistencyLevel(); - initialized = true; - }); - } - - private Mono toDatabaseAccountObservable(Mono httpResponse, HttpRequest httpRequest) { - - return HttpClientUtils.parseResponseAsync(httpResponse, httpRequest) - .map(rxDocumentServiceResponse -> rxDocumentServiceResponse.getResource(DatabaseAccount.class)); - } - - private void throwIfNotInitialized() { - if (!this.initialized) { - throw new IllegalArgumentException(GATEWAY_READER_NOT_INITIALIZED); - } + return BridgeInternal.getQueryEngineConfiuration(this.globalEndpointManager.getLatestDatabaseAccount()); } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java index 3eee10ca6cfce..fa2cc58780834 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java @@ -5,11 +5,17 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConnectionPolicy; +import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosKeyCredential; import com.azure.cosmos.DatabaseAccount; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.AsyncDocumentClient.Builder; import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider; +import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.DatabaseAccountManagerInternal; +import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.SpyClientUnderTestFactory; import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.TestSuiteBase; @@ -21,6 +27,7 @@ import io.netty.buffer.ByteBufUtil; import io.reactivex.subscribers.TestSubscriber; import org.apache.commons.io.IOUtils; +import org.mockito.Matchers; import org.mockito.Mockito; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -51,57 +58,98 @@ public GatewayServiceConfigurationReaderTest(Builder clientBuilder) { super(clientBuilder); } - @BeforeClass(groups = "simple") - public void before_GatewayServiceConfigurationReaderTest() throws Exception { - client = clientBuilder().build(); - SpyClientUnderTestFactory.ClientUnderTest clientUnderTest = SpyClientUnderTestFactory.createClientUnderTest(this.clientBuilder()); - HttpClient httpClient = clientUnderTest.getSpyHttpClient(); - baseAuthorizationTokenProvider = new BaseAuthorizationTokenProvider(new CosmosKeyCredential(TestConfigurations.MASTER_KEY)); - connectionPolicy = ConnectionPolicy.getDefaultPolicy(); - mockHttpClient = Mockito.mock(HttpClient.class); - mockGatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), - false, TestConfigurations.MASTER_KEY, connectionPolicy, baseAuthorizationTokenProvider, mockHttpClient); - - gatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), - false, - TestConfigurations.MASTER_KEY, - connectionPolicy, - baseAuthorizationTokenProvider, - httpClient); - databaseAccountJson = IOUtils - .toString(getClass().getClassLoader().getResourceAsStream("databaseAccount.json"), "UTF-8"); - expectedDatabaseAccount = new DatabaseAccount(databaseAccountJson); - HttpResponse mockResponse = getMockResponse(databaseAccountJson); - Mockito.when(mockHttpClient.send(Mockito.any(HttpRequest.class))).thenReturn(Mono.just(mockResponse)); - } - @AfterClass(groups = { "simple" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { safeClose(client); } @Test(groups = "simple") - public void mockInitializeReaderAsync() { - Mono databaseAccount = mockGatewayServiceConfigurationReader.initializeReaderAsync(); - validateSuccess(databaseAccount, expectedDatabaseAccount); + public void clientInitialization() throws Exception { + client = this.clientBuilder().build(); + RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) client; + GatewayServiceConfigurationReader serviceConfigurationReader = ReflectionUtils.getServiceConfigurationReader(rxDocumentClient); + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(serviceConfigurationReader); + Mono databaseAccountMono = globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)); + validateSuccess(databaseAccountMono); + assertThat(serviceConfigurationReader.getDefaultConsistencyLevel()).isNotNull(); + assertThat(serviceConfigurationReader.getQueryEngineConfiguration()).isNotNull(); + assertThat(serviceConfigurationReader.getSystemReplicationPolicy()).isNotNull(); + assertThat(serviceConfigurationReader.getSystemReplicationPolicy()).isNotNull(); } @Test(groups = "simple") - public void mockInitializeReaderAsyncWithResourceToken() throws Exception { - HttpResponse mockResponse = getMockResponse(databaseAccountJson); - Mockito.when(mockHttpClient.send(Mockito.any(HttpRequest.class))).thenReturn(Mono.just(mockResponse)); - - mockGatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), - true, "SampleResourceToken", connectionPolicy, baseAuthorizationTokenProvider, mockHttpClient); - - Mono databaseAccount = mockGatewayServiceConfigurationReader.initializeReaderAsync(); - validateSuccess(databaseAccount, expectedDatabaseAccount); + public void configurationPropertyReads() throws Exception { + DatabaseAccountManagerInternal databaseAccountManagerInternal = Mockito.mock(DatabaseAccountManagerInternal.class); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson1))); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI(TestConfigurations.HOST)); + GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(databaseAccountManagerInternal, new ConnectionPolicy(), new Configs()); + ReflectionUtils.setBackgroundRefreshLocationTimeIntervalInMS(globalEndpointManager, 1000); + globalEndpointManager.init(); + + GatewayServiceConfigurationReader configurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), globalEndpointManager); + assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isTrue(); + assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); + assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); + + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson2))); + Thread.sleep(2000); + assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.EVENTUAL); + assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isFalse(); + assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(5); + assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(5); + + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson3))); + Thread.sleep(2000); + assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isTrue(); + assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); + assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); + + //Testing scenario of scheduled cache refresh with error + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.error(BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.FORBIDDEN))); + Thread.sleep(2000); + assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isTrue(); + assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); + assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); } @Test(groups = "simple") - public void initializeReaderAsync() { - Mono databaseAccount = gatewayServiceConfigurationReader.initializeReaderAsync(); - validateSuccess(databaseAccount); + public void configurationPropertyReadsViaCache() throws Exception { + DatabaseAccountManagerInternal databaseAccountManagerInternal = Mockito.mock(DatabaseAccountManagerInternal.class); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson1))); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI(TestConfigurations.HOST)); + GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(databaseAccountManagerInternal, new ConnectionPolicy(), new Configs()); + ReflectionUtils.setBackgroundRefreshLocationTimeIntervalInMS(globalEndpointManager, 1000); + globalEndpointManager.init(); + + assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isTrue(); + assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson2))); + Thread.sleep(2000); + assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.EVENTUAL); + assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isFalse(); + assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(5); + assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(5); + + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson3))); + Thread.sleep(2000); + assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isTrue(); + assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + + //Testing scenario of scheduled cache refresh with error + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.error(BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.FORBIDDEN))); + Thread.sleep(2000); + assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isTrue(); + assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); } public static void validateSuccess(Mono observable) { @@ -117,38 +165,4 @@ public static void validateSuccess(Mono observable) { assertThat(BridgeInternal.getReplicationPolicy(databaseAccount)).isNotNull(); assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount)).isNotNull(); } - - public static void validateSuccess(Mono observable, DatabaseAccount expectedDatabaseAccount) { - TestSubscriber testSubscriber = new TestSubscriber(); - - observable.subscribe(testSubscriber); - testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS); - testSubscriber.assertNoErrors(); - testSubscriber.assertComplete(); - testSubscriber.assertValueCount(1); - DatabaseAccount databaseAccount = testSubscriber.values().get(0); - assertThat(databaseAccount.getId()).isEqualTo(expectedDatabaseAccount.getId()); - assertThat(BridgeInternal.getAddressesLink(databaseAccount)) - .isEqualTo(BridgeInternal.getAddressesLink(expectedDatabaseAccount)); - assertThat(databaseAccount.getWritableLocations().iterator().next().getEndpoint()) - .isEqualTo(expectedDatabaseAccount.getWritableLocations().iterator().next().getEndpoint()); - assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount).getMaxReplicaSetSize()) - .isEqualTo(BridgeInternal.getSystemReplicationPolicy(expectedDatabaseAccount).getMaxReplicaSetSize()); - assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount).getMaxReplicaSetSize()) - .isEqualTo(BridgeInternal.getSystemReplicationPolicy(expectedDatabaseAccount).getMaxReplicaSetSize()); - assertThat(BridgeInternal.getQueryEngineConfiuration(databaseAccount)) - .isEqualTo(BridgeInternal.getQueryEngineConfiuration(expectedDatabaseAccount)); - } - - private HttpResponse getMockResponse(String databaseAccountJson) { - HttpResponse httpResponse = Mockito.mock(HttpResponse.class); - Mockito.doReturn(200).when(httpResponse).statusCode(); - Mockito.doReturn(Flux.just(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, databaseAccountJson))) - .when(httpResponse).body(); - Mockito.doReturn(Mono.just(databaseAccountJson)) - .when(httpResponse).bodyAsString(); - - Mockito.doReturn(new HttpHeaders()).when(httpResponse).headers(); - return httpResponse; - } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java index d7859b4a1b40e..92c18eba01847 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java @@ -47,7 +47,6 @@ public GatewayServiceConfiguratorReaderMock(ReplicationPolicy userReplicationPol ConsistencyLevel defaultConsistencyLevel) { this.gatewayServiceConfigurationReader = Mockito.mock(GatewayServiceConfigurationReader.class); - Mockito.doReturn(Mono.just(Mockito.mock(DatabaseAccount.class))).when(this.gatewayServiceConfigurationReader).initializeReaderAsync(); Mockito.doReturn(defaultConsistencyLevel).when(this.gatewayServiceConfigurationReader).getDefaultConsistencyLevel(); Mockito.doReturn(systemReplicationPolicy).when(this.gatewayServiceConfigurationReader).getSystemReplicationPolicy(); Mockito.doReturn(userReplicationPolicy).when(this.gatewayServiceConfigurationReader).getUserReplicationPolicy(); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java index 92ef88cf54b8b..7fab9e5fd1d05 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java @@ -32,7 +32,7 @@ public class GlobalEndPointManagerTest { protected static final int TIMEOUT = 6000000; DatabaseAccountManagerInternal databaseAccountManagerInternal; - private String dbAccountJson1 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + static String dbAccountJson1 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"}]," + "\"readableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false," + @@ -44,18 +44,18 @@ public class GlobalEndPointManagerTest { "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}\n"; - private String dbAccountJson2 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + static String dbAccountJson2 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure" + ".com:443/\"}],\"readableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}]," + - "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + - "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":5}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Eventual\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":5}," + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + - "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":false}\"}"; - private String dbAccountJson3 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + static String dbAccountJson3 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure" + ".com:443/\"}],\"readableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure.com:443/\"}]," + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + @@ -66,7 +66,7 @@ public class GlobalEndPointManagerTest { "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; - private String dbAccountJson4 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + static String dbAccountJson4 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"readableLocations\":[{\"name\":\"East US\"," + "\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents" + diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java index c78fe6b2b5f1b..d420ea1a4d654 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java @@ -8,6 +8,7 @@ import com.azure.cosmos.CosmosBridgeInternal; import com.azure.cosmos.CosmosClient; import com.azure.cosmos.implementation.AsyncDocumentClient; +import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.http.HttpClient; import org.apache.commons.lang3.reflect.FieldUtils; @@ -82,4 +83,16 @@ public static AsyncDocumentClient getAsyncDocumentClient(CosmosAsyncClient clien public static void setAsyncDocumentClient(CosmosAsyncClient client, RxDocumentClientImpl rxClient) { set(client, rxClient, "asyncDocumentClient"); } + + public static GatewayServiceConfigurationReader getServiceConfigurationReader(RxDocumentClientImpl rxDocumentClient){ + return get(GatewayServiceConfigurationReader.class, rxDocumentClient, "gatewayConfigurationReader"); + } + + public static GlobalEndpointManager getGlobalEndpointManager(GatewayServiceConfigurationReader serviceConfigurationReader){ + return get(GlobalEndpointManager.class, serviceConfigurationReader, "globalEndpointManager"); + } + + public static void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec){ + set(globalEndPointManager, millSec, "backgroundRefreshLocationTimeIntervalInMS"); + } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java index 9911b3cde18a1..8aedb0fdd5702 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java @@ -507,7 +507,9 @@ private Permission getPermission(Resource resource, String permissionId, Permiss private TokenResolver getTokenResolver(PermissionMode permissionMode) { return (RequestVerb requestVerb, String resourceIdOrFullName, CosmosResourceType resourceType, Map properties) -> { - if (permissionMode == null) { + if(resourceType.equals(CosmosResourceType.System)) { + return readPermission.getToken(); + } if (permissionMode == null) { return "invalid"; } else if (permissionMode.equals(PermissionMode.READ)) { return readPermission.getToken(); From 390f8b2ff26e1e49884b860a3d35fdd216442eed Mon Sep 17 00:00:00 2001 From: Naveen Kumar Singh Date: Mon, 24 Feb 2020 16:39:51 -0500 Subject: [PATCH 2/6] Revert "making databaseAccount update dynamic" This reverts commit 1d8856a1dd10b9afa60d3da9baba7096c8256ed6. --- .../implementation/GlobalEndpointManager.java | 50 +----- .../implementation/RxDocumentClientImpl.java | 21 ++- .../GatewayServiceConfigurationReader.java | 124 +++++++++++++- ...GatewayServiceConfigurationReaderTest.java | 160 ++++++++---------- .../GatewayServiceConfiguratorReaderMock.java | 1 + .../GlobalEndPointManagerTest.java | 14 +- .../directconnectivity/ReflectionUtils.java | 13 -- .../azure/cosmos/rx/TokenResolverTest.java | 4 +- 8 files changed, 222 insertions(+), 165 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index 0e630b8f84b4a..dcc3762eaf3db 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -3,15 +3,12 @@ package com.azure.cosmos.implementation; +import com.azure.cosmos.implementation.routing.LocationCache; +import com.azure.cosmos.implementation.routing.LocationHelper; import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConnectionPolicy; import com.azure.cosmos.DatabaseAccount; -import com.azure.cosmos.DatabaseAccountLocation; -import com.azure.cosmos.implementation.caches.AsyncCache; -import com.azure.cosmos.implementation.routing.LocationCache; -import com.azure.cosmos.implementation.routing.LocationHelper; import org.apache.commons.collections4.list.UnmodifiableList; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -19,13 +16,13 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import java.net.URISyntaxException; import java.net.URI; import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,13 +45,9 @@ public class GlobalEndpointManager implements AutoCloseable { private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final Scheduler scheduler = Schedulers.fromExecutor(executor); private volatile boolean isClosed; - private final AsyncCache databaseAccountAsyncCache; - private AtomicBoolean firstTimeDatabaseAccountInitialization = new AtomicBoolean(true); - private volatile DatabaseAccount latestDatabaseAccount; public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPolicy connectionPolicy, Configs configs) { this.backgroundRefreshLocationTimeIntervalInMS = configs.getUnavailableLocationsExpirationTimeInSeconds() * 1000; - this.databaseAccountAsyncCache = new AsyncCache<>(); try { this.locationCache = new LocationCache( new ArrayList<>(connectionPolicy.getPreferredLocations() != null ? @@ -166,20 +159,6 @@ public Mono refreshLocationAsync(DatabaseAccount databaseAccount, boolean }); } - public Mono getDatabaseAccountFromCache(URI defaultEndpoint) { - return this.databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, () -> this.owner.getDatabaseAccountFromEndpoint(defaultEndpoint).single().doOnSuccess(databaseAccount -> { - if(databaseAccount != null) { - this.latestDatabaseAccount = databaseAccount; - } - - this.refreshLocationAsync(databaseAccount, false); - })); - } - - public DatabaseAccount getLatestDatabaseAccount() { - return this.latestDatabaseAccount; - } - private Mono refreshLocationPrivateAsync(DatabaseAccount databaseAccount) { return Mono.defer(() -> { logger.debug("refreshLocationPrivateAsync() refreshing locations"); @@ -274,27 +253,8 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) { } private Mono getDatabaseAccountAsync(URI serviceEndpoint) { - final GlobalEndpointManager that = this; - Callable> fetchDatabaseAccount = () -> { - return that.owner.getDatabaseAccountFromEndpoint(serviceEndpoint).doOnNext(databaseAccount -> { - if(databaseAccount != null) { - this.latestDatabaseAccount = databaseAccount; - } - - logger.debug("account retrieved: {}", databaseAccount); - }).single(); - }; - - Mono obsoleteValueMono = databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, fetchDatabaseAccount); - return obsoleteValueMono.flatMap(obsoleteValue -> { - if (firstTimeDatabaseAccountInitialization.compareAndSet(true, false)) { - return Mono.just(obsoleteValue); - } - return databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, obsoleteValue, fetchDatabaseAccount).doOnError(t -> { - //Putting back the old value in cache, this will avoid cache corruption - databaseAccountAsyncCache.set(StringUtils.EMPTY, obsoleteValue); - }); - }); + return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint) + .doOnNext(i -> logger.debug("account retrieved: {}", i)).single(); } public boolean isClosed() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index e09fb1680f970..a5e22fe5b5c19 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -250,12 +250,29 @@ private RxDocumentClientImpl(URI serviceEndpoint, } private void initializeGatewayConfigurationReader() { - this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint, this.globalEndpointManager); - DatabaseAccount databaseAccount = this.globalEndpointManager.getLatestDatabaseAccount(); + String resourceToken; + if(this.tokenResolver != null) { + resourceToken = this.tokenResolver.getAuthorizationToken(RequestVerb.GET, "", CosmosResourceType.System, null); + } else if(!this.hasAuthKeyResourceToken && this.authorizationTokenProvider == null) { + resourceToken = this.firstResourceTokenFromPermissionFeed; + } else { + assert this.masterKeyOrResourceToken != null || this.cosmosKeyCredential != null; + resourceToken = this.masterKeyOrResourceToken; + } + + this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint, + this.hasAuthKeyResourceToken, + resourceToken, + this.connectionPolicy, + this.authorizationTokenProvider, + this.reactorHttpClient); + + DatabaseAccount databaseAccount = this.gatewayConfigurationReader.initializeReaderAsync().block(); this.useMultipleWriteLocations = this.connectionPolicy.getUsingMultipleWriteLocations() && BridgeInternal.isEnableMultipleWriteLocations(databaseAccount); // TODO: add support for openAsync // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589 + this.globalEndpointManager.refreshLocationAsync(databaseAccount, false).block(); } public void init() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java index a9af7d876f30b..d9f562885cac4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java @@ -4,11 +4,29 @@ package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.ConnectionPolicy; import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.RequestVerb; +import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider; +import com.azure.cosmos.implementation.Constants; +import com.azure.cosmos.DatabaseAccount; import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ReplicationPolicy; +import com.azure.cosmos.implementation.UserAgentContainer; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.http.HttpClient; +import com.azure.cosmos.implementation.http.HttpHeaders; +import com.azure.cosmos.implementation.http.HttpRequest; +import com.azure.cosmos.implementation.http.HttpResponse; +import io.netty.handler.codec.http.HttpMethod; +import reactor.core.publisher.Mono; +import java.net.MalformedURLException; import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.Map; /** @@ -24,28 +42,118 @@ */ public class GatewayServiceConfigurationReader { + public static final String GATEWAY_READER_NOT_INITIALIZED = "GatewayServiceConfigurationReader has not been initialized"; + + public ReplicationPolicy userReplicationPolicy; + private ReplicationPolicy systemReplicationPolicy; + private ConsistencyLevel consistencyLevel; + private volatile boolean initialized; private URI serviceEndpoint; - private GlobalEndpointManager globalEndpointManager; + private final ConnectionPolicy connectionPolicy; + private Map queryEngineConfiguration; + private final BaseAuthorizationTokenProvider baseAuthorizationTokenProvider; + private final boolean hasAuthKeyResourceToken; + private final String authKeyResourceToken; + private HttpClient httpClient; - public GatewayServiceConfigurationReader(URI serviceEndpoint, GlobalEndpointManager globalEndpointManager) { + public GatewayServiceConfigurationReader(URI serviceEndpoint, boolean hasResourceToken, String resourceToken, + ConnectionPolicy connectionPolicy, BaseAuthorizationTokenProvider baseAuthorizationTokenProvider, + HttpClient httpClient) { this.serviceEndpoint = serviceEndpoint; - this.globalEndpointManager = globalEndpointManager; - this.globalEndpointManager.getDatabaseAccountFromCache(this.serviceEndpoint).block(); + this.baseAuthorizationTokenProvider = baseAuthorizationTokenProvider; + this.hasAuthKeyResourceToken = hasResourceToken; + this.authKeyResourceToken = resourceToken; + this.connectionPolicy = connectionPolicy; + this.httpClient = httpClient; } public ReplicationPolicy getUserReplicationPolicy() { - return BridgeInternal.getReplicationPolicy(this.globalEndpointManager.getLatestDatabaseAccount()); + this.throwIfNotInitialized(); + return this.userReplicationPolicy; } public ReplicationPolicy getSystemReplicationPolicy() { - return BridgeInternal.getSystemReplicationPolicy(this.globalEndpointManager.getLatestDatabaseAccount()); + this.throwIfNotInitialized(); + return this.systemReplicationPolicy; + } + + public boolean enableAuthorization() { + return true; } public ConsistencyLevel getDefaultConsistencyLevel() { - return BridgeInternal.getConsistencyPolicy(this.globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel(); + this.throwIfNotInitialized(); + return this.consistencyLevel; + } + + public void setDefaultConsistencyLevel(ConsistencyLevel value) { + this.throwIfNotInitialized(); + this.consistencyLevel = value; } public Map getQueryEngineConfiguration() { - return BridgeInternal.getQueryEngineConfiuration(this.globalEndpointManager.getLatestDatabaseAccount()); + this.throwIfNotInitialized(); + return this.queryEngineConfiguration; + } + + private Mono getDatabaseAccountAsync(URI serviceEndpoint) { + + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.set(HttpConstants.HttpHeaders.VERSION, HttpConstants.Versions.CURRENT_VERSION); + + UserAgentContainer userAgentContainer = new UserAgentContainer(); + String userAgentSuffix = this.connectionPolicy.getUserAgentSuffix(); + if (userAgentSuffix != null && userAgentSuffix.length() > 0) { + userAgentContainer.setSuffix(userAgentSuffix); + } + + httpHeaders.set(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); + httpHeaders.set(HttpConstants.HttpHeaders.API_TYPE, Constants.Properties.SQL_API_TYPE); + + String xDate = Utils.nowAsRFC1123(); + httpHeaders.set(HttpConstants.HttpHeaders.X_DATE, xDate); + + String authorizationToken; + if (this.hasAuthKeyResourceToken || baseAuthorizationTokenProvider == null) { + authorizationToken = HttpUtils.urlEncode(this.authKeyResourceToken); + } else { + // Retrieve the document service properties. + Map header = new HashMap<>(); + header.put(HttpConstants.HttpHeaders.X_DATE, xDate); + authorizationToken = baseAuthorizationTokenProvider + .generateKeyAuthorizationSignature(RequestVerb.GET, serviceEndpoint, header); + } + httpHeaders.set(HttpConstants.HttpHeaders.AUTHORIZATION, authorizationToken); + + HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, serviceEndpoint, serviceEndpoint.getPort(), httpHeaders); + Mono httpResponse = httpClient.send(httpRequest); + return toDatabaseAccountObservable(httpResponse, httpRequest); + } + + public Mono initializeReaderAsync() { + return GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(this.serviceEndpoint, + + new ArrayList<>(this.connectionPolicy.getPreferredLocations()), url -> { + return getDatabaseAccountAsync(url); + + }).doOnSuccess(databaseAccount -> { + userReplicationPolicy = BridgeInternal.getReplicationPolicy(databaseAccount); + systemReplicationPolicy = BridgeInternal.getSystemReplicationPolicy(databaseAccount); + queryEngineConfiguration = BridgeInternal.getQueryEngineConfiuration(databaseAccount); + consistencyLevel = BridgeInternal.getConsistencyPolicy(databaseAccount).getDefaultConsistencyLevel(); + initialized = true; + }); + } + + private Mono toDatabaseAccountObservable(Mono httpResponse, HttpRequest httpRequest) { + + return HttpClientUtils.parseResponseAsync(httpResponse, httpRequest) + .map(rxDocumentServiceResponse -> rxDocumentServiceResponse.getResource(DatabaseAccount.class)); + } + + private void throwIfNotInitialized() { + if (!this.initialized) { + throw new IllegalArgumentException(GATEWAY_READER_NOT_INITIALIZED); + } } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java index fa2cc58780834..3eee10ca6cfce 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java @@ -5,17 +5,11 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConnectionPolicy; -import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosKeyCredential; import com.azure.cosmos.DatabaseAccount; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.AsyncDocumentClient.Builder; import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider; -import com.azure.cosmos.implementation.Configs; -import com.azure.cosmos.implementation.DatabaseAccountManagerInternal; -import com.azure.cosmos.implementation.GlobalEndpointManager; -import com.azure.cosmos.implementation.HttpConstants; -import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.SpyClientUnderTestFactory; import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.TestSuiteBase; @@ -27,7 +21,6 @@ import io.netty.buffer.ByteBufUtil; import io.reactivex.subscribers.TestSubscriber; import org.apache.commons.io.IOUtils; -import org.mockito.Matchers; import org.mockito.Mockito; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -58,98 +51,57 @@ public GatewayServiceConfigurationReaderTest(Builder clientBuilder) { super(clientBuilder); } + @BeforeClass(groups = "simple") + public void before_GatewayServiceConfigurationReaderTest() throws Exception { + client = clientBuilder().build(); + SpyClientUnderTestFactory.ClientUnderTest clientUnderTest = SpyClientUnderTestFactory.createClientUnderTest(this.clientBuilder()); + HttpClient httpClient = clientUnderTest.getSpyHttpClient(); + baseAuthorizationTokenProvider = new BaseAuthorizationTokenProvider(new CosmosKeyCredential(TestConfigurations.MASTER_KEY)); + connectionPolicy = ConnectionPolicy.getDefaultPolicy(); + mockHttpClient = Mockito.mock(HttpClient.class); + mockGatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), + false, TestConfigurations.MASTER_KEY, connectionPolicy, baseAuthorizationTokenProvider, mockHttpClient); + + gatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), + false, + TestConfigurations.MASTER_KEY, + connectionPolicy, + baseAuthorizationTokenProvider, + httpClient); + databaseAccountJson = IOUtils + .toString(getClass().getClassLoader().getResourceAsStream("databaseAccount.json"), "UTF-8"); + expectedDatabaseAccount = new DatabaseAccount(databaseAccountJson); + HttpResponse mockResponse = getMockResponse(databaseAccountJson); + Mockito.when(mockHttpClient.send(Mockito.any(HttpRequest.class))).thenReturn(Mono.just(mockResponse)); + } + @AfterClass(groups = { "simple" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { safeClose(client); } @Test(groups = "simple") - public void clientInitialization() throws Exception { - client = this.clientBuilder().build(); - RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) client; - GatewayServiceConfigurationReader serviceConfigurationReader = ReflectionUtils.getServiceConfigurationReader(rxDocumentClient); - GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(serviceConfigurationReader); - Mono databaseAccountMono = globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)); - validateSuccess(databaseAccountMono); - assertThat(serviceConfigurationReader.getDefaultConsistencyLevel()).isNotNull(); - assertThat(serviceConfigurationReader.getQueryEngineConfiguration()).isNotNull(); - assertThat(serviceConfigurationReader.getSystemReplicationPolicy()).isNotNull(); - assertThat(serviceConfigurationReader.getSystemReplicationPolicy()).isNotNull(); + public void mockInitializeReaderAsync() { + Mono databaseAccount = mockGatewayServiceConfigurationReader.initializeReaderAsync(); + validateSuccess(databaseAccount, expectedDatabaseAccount); } @Test(groups = "simple") - public void configurationPropertyReads() throws Exception { - DatabaseAccountManagerInternal databaseAccountManagerInternal = Mockito.mock(DatabaseAccountManagerInternal.class); - Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson1))); - Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI(TestConfigurations.HOST)); - GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(databaseAccountManagerInternal, new ConnectionPolicy(), new Configs()); - ReflectionUtils.setBackgroundRefreshLocationTimeIntervalInMS(globalEndpointManager, 1000); - globalEndpointManager.init(); - - GatewayServiceConfigurationReader configurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), globalEndpointManager); - assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); - assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isTrue(); - assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); - assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); - - Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson2))); - Thread.sleep(2000); - assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.EVENTUAL); - assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isFalse(); - assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(5); - assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(5); - - Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson3))); - Thread.sleep(2000); - assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); - assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isTrue(); - assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); - assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); - - //Testing scenario of scheduled cache refresh with error - Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.error(BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.FORBIDDEN))); - Thread.sleep(2000); - assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); - assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isTrue(); - assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); - assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); + public void mockInitializeReaderAsyncWithResourceToken() throws Exception { + HttpResponse mockResponse = getMockResponse(databaseAccountJson); + Mockito.when(mockHttpClient.send(Mockito.any(HttpRequest.class))).thenReturn(Mono.just(mockResponse)); + + mockGatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), + true, "SampleResourceToken", connectionPolicy, baseAuthorizationTokenProvider, mockHttpClient); + + Mono databaseAccount = mockGatewayServiceConfigurationReader.initializeReaderAsync(); + validateSuccess(databaseAccount, expectedDatabaseAccount); } @Test(groups = "simple") - public void configurationPropertyReadsViaCache() throws Exception { - DatabaseAccountManagerInternal databaseAccountManagerInternal = Mockito.mock(DatabaseAccountManagerInternal.class); - Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson1))); - Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI(TestConfigurations.HOST)); - GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(databaseAccountManagerInternal, new ConnectionPolicy(), new Configs()); - ReflectionUtils.setBackgroundRefreshLocationTimeIntervalInMS(globalEndpointManager, 1000); - globalEndpointManager.init(); - - assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); - assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isTrue(); - assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); - assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); - - Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson2))); - Thread.sleep(2000); - assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.EVENTUAL); - assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isFalse(); - assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(5); - assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(5); - - Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson3))); - Thread.sleep(2000); - assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); - assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isTrue(); - assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); - assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); - - //Testing scenario of scheduled cache refresh with error - Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.error(BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.FORBIDDEN))); - Thread.sleep(2000); - assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); - assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isTrue(); - assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); - assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + public void initializeReaderAsync() { + Mono databaseAccount = gatewayServiceConfigurationReader.initializeReaderAsync(); + validateSuccess(databaseAccount); } public static void validateSuccess(Mono observable) { @@ -165,4 +117,38 @@ public static void validateSuccess(Mono observable) { assertThat(BridgeInternal.getReplicationPolicy(databaseAccount)).isNotNull(); assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount)).isNotNull(); } + + public static void validateSuccess(Mono observable, DatabaseAccount expectedDatabaseAccount) { + TestSubscriber testSubscriber = new TestSubscriber(); + + observable.subscribe(testSubscriber); + testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS); + testSubscriber.assertNoErrors(); + testSubscriber.assertComplete(); + testSubscriber.assertValueCount(1); + DatabaseAccount databaseAccount = testSubscriber.values().get(0); + assertThat(databaseAccount.getId()).isEqualTo(expectedDatabaseAccount.getId()); + assertThat(BridgeInternal.getAddressesLink(databaseAccount)) + .isEqualTo(BridgeInternal.getAddressesLink(expectedDatabaseAccount)); + assertThat(databaseAccount.getWritableLocations().iterator().next().getEndpoint()) + .isEqualTo(expectedDatabaseAccount.getWritableLocations().iterator().next().getEndpoint()); + assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount).getMaxReplicaSetSize()) + .isEqualTo(BridgeInternal.getSystemReplicationPolicy(expectedDatabaseAccount).getMaxReplicaSetSize()); + assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount).getMaxReplicaSetSize()) + .isEqualTo(BridgeInternal.getSystemReplicationPolicy(expectedDatabaseAccount).getMaxReplicaSetSize()); + assertThat(BridgeInternal.getQueryEngineConfiuration(databaseAccount)) + .isEqualTo(BridgeInternal.getQueryEngineConfiuration(expectedDatabaseAccount)); + } + + private HttpResponse getMockResponse(String databaseAccountJson) { + HttpResponse httpResponse = Mockito.mock(HttpResponse.class); + Mockito.doReturn(200).when(httpResponse).statusCode(); + Mockito.doReturn(Flux.just(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, databaseAccountJson))) + .when(httpResponse).body(); + Mockito.doReturn(Mono.just(databaseAccountJson)) + .when(httpResponse).bodyAsString(); + + Mockito.doReturn(new HttpHeaders()).when(httpResponse).headers(); + return httpResponse; + } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java index 92c18eba01847..d7859b4a1b40e 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java @@ -47,6 +47,7 @@ public GatewayServiceConfiguratorReaderMock(ReplicationPolicy userReplicationPol ConsistencyLevel defaultConsistencyLevel) { this.gatewayServiceConfigurationReader = Mockito.mock(GatewayServiceConfigurationReader.class); + Mockito.doReturn(Mono.just(Mockito.mock(DatabaseAccount.class))).when(this.gatewayServiceConfigurationReader).initializeReaderAsync(); Mockito.doReturn(defaultConsistencyLevel).when(this.gatewayServiceConfigurationReader).getDefaultConsistencyLevel(); Mockito.doReturn(systemReplicationPolicy).when(this.gatewayServiceConfigurationReader).getSystemReplicationPolicy(); Mockito.doReturn(userReplicationPolicy).when(this.gatewayServiceConfigurationReader).getUserReplicationPolicy(); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java index 7fab9e5fd1d05..92ef88cf54b8b 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java @@ -32,7 +32,7 @@ public class GlobalEndPointManagerTest { protected static final int TIMEOUT = 6000000; DatabaseAccountManagerInternal databaseAccountManagerInternal; - static String dbAccountJson1 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + private String dbAccountJson1 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"}]," + "\"readableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false," + @@ -44,18 +44,18 @@ public class GlobalEndPointManagerTest { "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}\n"; - static String dbAccountJson2 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + private String dbAccountJson2 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure" + ".com:443/\"}],\"readableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}]," + - "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":5}," + - "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Eventual\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":5}," + + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + - "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":false}\"}"; + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; - static String dbAccountJson3 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + private String dbAccountJson3 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure" + ".com:443/\"}],\"readableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure.com:443/\"}]," + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + @@ -66,7 +66,7 @@ public class GlobalEndPointManagerTest { "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; - static String dbAccountJson4 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + private String dbAccountJson4 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"readableLocations\":[{\"name\":\"East US\"," + "\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents" + diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java index d420ea1a4d654..c78fe6b2b5f1b 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java @@ -8,7 +8,6 @@ import com.azure.cosmos.CosmosBridgeInternal; import com.azure.cosmos.CosmosClient; import com.azure.cosmos.implementation.AsyncDocumentClient; -import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.http.HttpClient; import org.apache.commons.lang3.reflect.FieldUtils; @@ -83,16 +82,4 @@ public static AsyncDocumentClient getAsyncDocumentClient(CosmosAsyncClient clien public static void setAsyncDocumentClient(CosmosAsyncClient client, RxDocumentClientImpl rxClient) { set(client, rxClient, "asyncDocumentClient"); } - - public static GatewayServiceConfigurationReader getServiceConfigurationReader(RxDocumentClientImpl rxDocumentClient){ - return get(GatewayServiceConfigurationReader.class, rxDocumentClient, "gatewayConfigurationReader"); - } - - public static GlobalEndpointManager getGlobalEndpointManager(GatewayServiceConfigurationReader serviceConfigurationReader){ - return get(GlobalEndpointManager.class, serviceConfigurationReader, "globalEndpointManager"); - } - - public static void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec){ - set(globalEndPointManager, millSec, "backgroundRefreshLocationTimeIntervalInMS"); - } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java index 8aedb0fdd5702..9911b3cde18a1 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java @@ -507,9 +507,7 @@ private Permission getPermission(Resource resource, String permissionId, Permiss private TokenResolver getTokenResolver(PermissionMode permissionMode) { return (RequestVerb requestVerb, String resourceIdOrFullName, CosmosResourceType resourceType, Map properties) -> { - if(resourceType.equals(CosmosResourceType.System)) { - return readPermission.getToken(); - } if (permissionMode == null) { + if (permissionMode == null) { return "invalid"; } else if (permissionMode.equals(PermissionMode.READ)) { return readPermission.getToken(); From 3864034a967a964eae0768fcac3d37d3fdc38276 Mon Sep 17 00:00:00 2001 From: Naveen Kumar Singh Date: Mon, 24 Feb 2020 16:42:31 -0500 Subject: [PATCH 3/6] making databaseAccount update dynamic --- .../implementation/GlobalEndpointManager.java | 50 +++++- .../implementation/RxDocumentClientImpl.java | 21 +-- .../GatewayServiceConfigurationReader.java | 124 +------------- ...GatewayServiceConfigurationReaderTest.java | 160 ++++++++++-------- .../GatewayServiceConfiguratorReaderMock.java | 1 - .../GlobalEndPointManagerTest.java | 14 +- .../directconnectivity/ReflectionUtils.java | 13 ++ .../azure/cosmos/rx/TokenResolverTest.java | 4 +- 8 files changed, 165 insertions(+), 222 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index dcc3762eaf3db..0e630b8f84b4a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -3,12 +3,15 @@ package com.azure.cosmos.implementation; -import com.azure.cosmos.implementation.routing.LocationCache; -import com.azure.cosmos.implementation.routing.LocationHelper; import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConnectionPolicy; import com.azure.cosmos.DatabaseAccount; +import com.azure.cosmos.DatabaseAccountLocation; +import com.azure.cosmos.implementation.caches.AsyncCache; +import com.azure.cosmos.implementation.routing.LocationCache; +import com.azure.cosmos.implementation.routing.LocationHelper; import org.apache.commons.collections4.list.UnmodifiableList; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -16,13 +19,13 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import java.net.URISyntaxException; import java.net.URI; import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,9 +48,13 @@ public class GlobalEndpointManager implements AutoCloseable { private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final Scheduler scheduler = Schedulers.fromExecutor(executor); private volatile boolean isClosed; + private final AsyncCache databaseAccountAsyncCache; + private AtomicBoolean firstTimeDatabaseAccountInitialization = new AtomicBoolean(true); + private volatile DatabaseAccount latestDatabaseAccount; public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPolicy connectionPolicy, Configs configs) { this.backgroundRefreshLocationTimeIntervalInMS = configs.getUnavailableLocationsExpirationTimeInSeconds() * 1000; + this.databaseAccountAsyncCache = new AsyncCache<>(); try { this.locationCache = new LocationCache( new ArrayList<>(connectionPolicy.getPreferredLocations() != null ? @@ -159,6 +166,20 @@ public Mono refreshLocationAsync(DatabaseAccount databaseAccount, boolean }); } + public Mono getDatabaseAccountFromCache(URI defaultEndpoint) { + return this.databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, () -> this.owner.getDatabaseAccountFromEndpoint(defaultEndpoint).single().doOnSuccess(databaseAccount -> { + if(databaseAccount != null) { + this.latestDatabaseAccount = databaseAccount; + } + + this.refreshLocationAsync(databaseAccount, false); + })); + } + + public DatabaseAccount getLatestDatabaseAccount() { + return this.latestDatabaseAccount; + } + private Mono refreshLocationPrivateAsync(DatabaseAccount databaseAccount) { return Mono.defer(() -> { logger.debug("refreshLocationPrivateAsync() refreshing locations"); @@ -253,8 +274,27 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) { } private Mono getDatabaseAccountAsync(URI serviceEndpoint) { - return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint) - .doOnNext(i -> logger.debug("account retrieved: {}", i)).single(); + final GlobalEndpointManager that = this; + Callable> fetchDatabaseAccount = () -> { + return that.owner.getDatabaseAccountFromEndpoint(serviceEndpoint).doOnNext(databaseAccount -> { + if(databaseAccount != null) { + this.latestDatabaseAccount = databaseAccount; + } + + logger.debug("account retrieved: {}", databaseAccount); + }).single(); + }; + + Mono obsoleteValueMono = databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, fetchDatabaseAccount); + return obsoleteValueMono.flatMap(obsoleteValue -> { + if (firstTimeDatabaseAccountInitialization.compareAndSet(true, false)) { + return Mono.just(obsoleteValue); + } + return databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, obsoleteValue, fetchDatabaseAccount).doOnError(t -> { + //Putting back the old value in cache, this will avoid cache corruption + databaseAccountAsyncCache.set(StringUtils.EMPTY, obsoleteValue); + }); + }); } public boolean isClosed() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index a5e22fe5b5c19..e09fb1680f970 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -250,29 +250,12 @@ private RxDocumentClientImpl(URI serviceEndpoint, } private void initializeGatewayConfigurationReader() { - String resourceToken; - if(this.tokenResolver != null) { - resourceToken = this.tokenResolver.getAuthorizationToken(RequestVerb.GET, "", CosmosResourceType.System, null); - } else if(!this.hasAuthKeyResourceToken && this.authorizationTokenProvider == null) { - resourceToken = this.firstResourceTokenFromPermissionFeed; - } else { - assert this.masterKeyOrResourceToken != null || this.cosmosKeyCredential != null; - resourceToken = this.masterKeyOrResourceToken; - } - - this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint, - this.hasAuthKeyResourceToken, - resourceToken, - this.connectionPolicy, - this.authorizationTokenProvider, - this.reactorHttpClient); - - DatabaseAccount databaseAccount = this.gatewayConfigurationReader.initializeReaderAsync().block(); + this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint, this.globalEndpointManager); + DatabaseAccount databaseAccount = this.globalEndpointManager.getLatestDatabaseAccount(); this.useMultipleWriteLocations = this.connectionPolicy.getUsingMultipleWriteLocations() && BridgeInternal.isEnableMultipleWriteLocations(databaseAccount); // TODO: add support for openAsync // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589 - this.globalEndpointManager.refreshLocationAsync(databaseAccount, false).block(); } public void init() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java index d9f562885cac4..a9af7d876f30b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java @@ -4,29 +4,11 @@ package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.BridgeInternal; -import com.azure.cosmos.ConnectionPolicy; import com.azure.cosmos.ConsistencyLevel; -import com.azure.cosmos.RequestVerb; -import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider; -import com.azure.cosmos.implementation.Constants; -import com.azure.cosmos.DatabaseAccount; import com.azure.cosmos.implementation.GlobalEndpointManager; -import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ReplicationPolicy; -import com.azure.cosmos.implementation.UserAgentContainer; -import com.azure.cosmos.implementation.Utils; -import com.azure.cosmos.implementation.http.HttpClient; -import com.azure.cosmos.implementation.http.HttpHeaders; -import com.azure.cosmos.implementation.http.HttpRequest; -import com.azure.cosmos.implementation.http.HttpResponse; -import io.netty.handler.codec.http.HttpMethod; -import reactor.core.publisher.Mono; -import java.net.MalformedURLException; import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.Map; /** @@ -42,118 +24,28 @@ */ public class GatewayServiceConfigurationReader { - public static final String GATEWAY_READER_NOT_INITIALIZED = "GatewayServiceConfigurationReader has not been initialized"; - - public ReplicationPolicy userReplicationPolicy; - private ReplicationPolicy systemReplicationPolicy; - private ConsistencyLevel consistencyLevel; - private volatile boolean initialized; private URI serviceEndpoint; - private final ConnectionPolicy connectionPolicy; - private Map queryEngineConfiguration; - private final BaseAuthorizationTokenProvider baseAuthorizationTokenProvider; - private final boolean hasAuthKeyResourceToken; - private final String authKeyResourceToken; - private HttpClient httpClient; + private GlobalEndpointManager globalEndpointManager; - public GatewayServiceConfigurationReader(URI serviceEndpoint, boolean hasResourceToken, String resourceToken, - ConnectionPolicy connectionPolicy, BaseAuthorizationTokenProvider baseAuthorizationTokenProvider, - HttpClient httpClient) { + public GatewayServiceConfigurationReader(URI serviceEndpoint, GlobalEndpointManager globalEndpointManager) { this.serviceEndpoint = serviceEndpoint; - this.baseAuthorizationTokenProvider = baseAuthorizationTokenProvider; - this.hasAuthKeyResourceToken = hasResourceToken; - this.authKeyResourceToken = resourceToken; - this.connectionPolicy = connectionPolicy; - this.httpClient = httpClient; + this.globalEndpointManager = globalEndpointManager; + this.globalEndpointManager.getDatabaseAccountFromCache(this.serviceEndpoint).block(); } public ReplicationPolicy getUserReplicationPolicy() { - this.throwIfNotInitialized(); - return this.userReplicationPolicy; + return BridgeInternal.getReplicationPolicy(this.globalEndpointManager.getLatestDatabaseAccount()); } public ReplicationPolicy getSystemReplicationPolicy() { - this.throwIfNotInitialized(); - return this.systemReplicationPolicy; - } - - public boolean enableAuthorization() { - return true; + return BridgeInternal.getSystemReplicationPolicy(this.globalEndpointManager.getLatestDatabaseAccount()); } public ConsistencyLevel getDefaultConsistencyLevel() { - this.throwIfNotInitialized(); - return this.consistencyLevel; - } - - public void setDefaultConsistencyLevel(ConsistencyLevel value) { - this.throwIfNotInitialized(); - this.consistencyLevel = value; + return BridgeInternal.getConsistencyPolicy(this.globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel(); } public Map getQueryEngineConfiguration() { - this.throwIfNotInitialized(); - return this.queryEngineConfiguration; - } - - private Mono getDatabaseAccountAsync(URI serviceEndpoint) { - - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.set(HttpConstants.HttpHeaders.VERSION, HttpConstants.Versions.CURRENT_VERSION); - - UserAgentContainer userAgentContainer = new UserAgentContainer(); - String userAgentSuffix = this.connectionPolicy.getUserAgentSuffix(); - if (userAgentSuffix != null && userAgentSuffix.length() > 0) { - userAgentContainer.setSuffix(userAgentSuffix); - } - - httpHeaders.set(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); - httpHeaders.set(HttpConstants.HttpHeaders.API_TYPE, Constants.Properties.SQL_API_TYPE); - - String xDate = Utils.nowAsRFC1123(); - httpHeaders.set(HttpConstants.HttpHeaders.X_DATE, xDate); - - String authorizationToken; - if (this.hasAuthKeyResourceToken || baseAuthorizationTokenProvider == null) { - authorizationToken = HttpUtils.urlEncode(this.authKeyResourceToken); - } else { - // Retrieve the document service properties. - Map header = new HashMap<>(); - header.put(HttpConstants.HttpHeaders.X_DATE, xDate); - authorizationToken = baseAuthorizationTokenProvider - .generateKeyAuthorizationSignature(RequestVerb.GET, serviceEndpoint, header); - } - httpHeaders.set(HttpConstants.HttpHeaders.AUTHORIZATION, authorizationToken); - - HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, serviceEndpoint, serviceEndpoint.getPort(), httpHeaders); - Mono httpResponse = httpClient.send(httpRequest); - return toDatabaseAccountObservable(httpResponse, httpRequest); - } - - public Mono initializeReaderAsync() { - return GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(this.serviceEndpoint, - - new ArrayList<>(this.connectionPolicy.getPreferredLocations()), url -> { - return getDatabaseAccountAsync(url); - - }).doOnSuccess(databaseAccount -> { - userReplicationPolicy = BridgeInternal.getReplicationPolicy(databaseAccount); - systemReplicationPolicy = BridgeInternal.getSystemReplicationPolicy(databaseAccount); - queryEngineConfiguration = BridgeInternal.getQueryEngineConfiuration(databaseAccount); - consistencyLevel = BridgeInternal.getConsistencyPolicy(databaseAccount).getDefaultConsistencyLevel(); - initialized = true; - }); - } - - private Mono toDatabaseAccountObservable(Mono httpResponse, HttpRequest httpRequest) { - - return HttpClientUtils.parseResponseAsync(httpResponse, httpRequest) - .map(rxDocumentServiceResponse -> rxDocumentServiceResponse.getResource(DatabaseAccount.class)); - } - - private void throwIfNotInitialized() { - if (!this.initialized) { - throw new IllegalArgumentException(GATEWAY_READER_NOT_INITIALIZED); - } + return BridgeInternal.getQueryEngineConfiuration(this.globalEndpointManager.getLatestDatabaseAccount()); } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java index 3eee10ca6cfce..fa2cc58780834 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java @@ -5,11 +5,17 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConnectionPolicy; +import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosKeyCredential; import com.azure.cosmos.DatabaseAccount; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.AsyncDocumentClient.Builder; import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider; +import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.DatabaseAccountManagerInternal; +import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.SpyClientUnderTestFactory; import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.TestSuiteBase; @@ -21,6 +27,7 @@ import io.netty.buffer.ByteBufUtil; import io.reactivex.subscribers.TestSubscriber; import org.apache.commons.io.IOUtils; +import org.mockito.Matchers; import org.mockito.Mockito; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -51,57 +58,98 @@ public GatewayServiceConfigurationReaderTest(Builder clientBuilder) { super(clientBuilder); } - @BeforeClass(groups = "simple") - public void before_GatewayServiceConfigurationReaderTest() throws Exception { - client = clientBuilder().build(); - SpyClientUnderTestFactory.ClientUnderTest clientUnderTest = SpyClientUnderTestFactory.createClientUnderTest(this.clientBuilder()); - HttpClient httpClient = clientUnderTest.getSpyHttpClient(); - baseAuthorizationTokenProvider = new BaseAuthorizationTokenProvider(new CosmosKeyCredential(TestConfigurations.MASTER_KEY)); - connectionPolicy = ConnectionPolicy.getDefaultPolicy(); - mockHttpClient = Mockito.mock(HttpClient.class); - mockGatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), - false, TestConfigurations.MASTER_KEY, connectionPolicy, baseAuthorizationTokenProvider, mockHttpClient); - - gatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), - false, - TestConfigurations.MASTER_KEY, - connectionPolicy, - baseAuthorizationTokenProvider, - httpClient); - databaseAccountJson = IOUtils - .toString(getClass().getClassLoader().getResourceAsStream("databaseAccount.json"), "UTF-8"); - expectedDatabaseAccount = new DatabaseAccount(databaseAccountJson); - HttpResponse mockResponse = getMockResponse(databaseAccountJson); - Mockito.when(mockHttpClient.send(Mockito.any(HttpRequest.class))).thenReturn(Mono.just(mockResponse)); - } - @AfterClass(groups = { "simple" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { safeClose(client); } @Test(groups = "simple") - public void mockInitializeReaderAsync() { - Mono databaseAccount = mockGatewayServiceConfigurationReader.initializeReaderAsync(); - validateSuccess(databaseAccount, expectedDatabaseAccount); + public void clientInitialization() throws Exception { + client = this.clientBuilder().build(); + RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) client; + GatewayServiceConfigurationReader serviceConfigurationReader = ReflectionUtils.getServiceConfigurationReader(rxDocumentClient); + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(serviceConfigurationReader); + Mono databaseAccountMono = globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)); + validateSuccess(databaseAccountMono); + assertThat(serviceConfigurationReader.getDefaultConsistencyLevel()).isNotNull(); + assertThat(serviceConfigurationReader.getQueryEngineConfiguration()).isNotNull(); + assertThat(serviceConfigurationReader.getSystemReplicationPolicy()).isNotNull(); + assertThat(serviceConfigurationReader.getSystemReplicationPolicy()).isNotNull(); } @Test(groups = "simple") - public void mockInitializeReaderAsyncWithResourceToken() throws Exception { - HttpResponse mockResponse = getMockResponse(databaseAccountJson); - Mockito.when(mockHttpClient.send(Mockito.any(HttpRequest.class))).thenReturn(Mono.just(mockResponse)); - - mockGatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), - true, "SampleResourceToken", connectionPolicy, baseAuthorizationTokenProvider, mockHttpClient); - - Mono databaseAccount = mockGatewayServiceConfigurationReader.initializeReaderAsync(); - validateSuccess(databaseAccount, expectedDatabaseAccount); + public void configurationPropertyReads() throws Exception { + DatabaseAccountManagerInternal databaseAccountManagerInternal = Mockito.mock(DatabaseAccountManagerInternal.class); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson1))); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI(TestConfigurations.HOST)); + GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(databaseAccountManagerInternal, new ConnectionPolicy(), new Configs()); + ReflectionUtils.setBackgroundRefreshLocationTimeIntervalInMS(globalEndpointManager, 1000); + globalEndpointManager.init(); + + GatewayServiceConfigurationReader configurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), globalEndpointManager); + assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isTrue(); + assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); + assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); + + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson2))); + Thread.sleep(2000); + assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.EVENTUAL); + assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isFalse(); + assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(5); + assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(5); + + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson3))); + Thread.sleep(2000); + assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isTrue(); + assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); + assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); + + //Testing scenario of scheduled cache refresh with error + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.error(BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.FORBIDDEN))); + Thread.sleep(2000); + assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isTrue(); + assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); + assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); } @Test(groups = "simple") - public void initializeReaderAsync() { - Mono databaseAccount = gatewayServiceConfigurationReader.initializeReaderAsync(); - validateSuccess(databaseAccount); + public void configurationPropertyReadsViaCache() throws Exception { + DatabaseAccountManagerInternal databaseAccountManagerInternal = Mockito.mock(DatabaseAccountManagerInternal.class); + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson1))); + Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI(TestConfigurations.HOST)); + GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(databaseAccountManagerInternal, new ConnectionPolicy(), new Configs()); + ReflectionUtils.setBackgroundRefreshLocationTimeIntervalInMS(globalEndpointManager, 1000); + globalEndpointManager.init(); + + assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isTrue(); + assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson2))); + Thread.sleep(2000); + assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.EVENTUAL); + assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isFalse(); + assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(5); + assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(5); + + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson3))); + Thread.sleep(2000); + assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isTrue(); + assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + + //Testing scenario of scheduled cache refresh with error + Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.error(BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.FORBIDDEN))); + Thread.sleep(2000); + assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isTrue(); + assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); } public static void validateSuccess(Mono observable) { @@ -117,38 +165,4 @@ public static void validateSuccess(Mono observable) { assertThat(BridgeInternal.getReplicationPolicy(databaseAccount)).isNotNull(); assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount)).isNotNull(); } - - public static void validateSuccess(Mono observable, DatabaseAccount expectedDatabaseAccount) { - TestSubscriber testSubscriber = new TestSubscriber(); - - observable.subscribe(testSubscriber); - testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS); - testSubscriber.assertNoErrors(); - testSubscriber.assertComplete(); - testSubscriber.assertValueCount(1); - DatabaseAccount databaseAccount = testSubscriber.values().get(0); - assertThat(databaseAccount.getId()).isEqualTo(expectedDatabaseAccount.getId()); - assertThat(BridgeInternal.getAddressesLink(databaseAccount)) - .isEqualTo(BridgeInternal.getAddressesLink(expectedDatabaseAccount)); - assertThat(databaseAccount.getWritableLocations().iterator().next().getEndpoint()) - .isEqualTo(expectedDatabaseAccount.getWritableLocations().iterator().next().getEndpoint()); - assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount).getMaxReplicaSetSize()) - .isEqualTo(BridgeInternal.getSystemReplicationPolicy(expectedDatabaseAccount).getMaxReplicaSetSize()); - assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount).getMaxReplicaSetSize()) - .isEqualTo(BridgeInternal.getSystemReplicationPolicy(expectedDatabaseAccount).getMaxReplicaSetSize()); - assertThat(BridgeInternal.getQueryEngineConfiuration(databaseAccount)) - .isEqualTo(BridgeInternal.getQueryEngineConfiuration(expectedDatabaseAccount)); - } - - private HttpResponse getMockResponse(String databaseAccountJson) { - HttpResponse httpResponse = Mockito.mock(HttpResponse.class); - Mockito.doReturn(200).when(httpResponse).statusCode(); - Mockito.doReturn(Flux.just(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, databaseAccountJson))) - .when(httpResponse).body(); - Mockito.doReturn(Mono.just(databaseAccountJson)) - .when(httpResponse).bodyAsString(); - - Mockito.doReturn(new HttpHeaders()).when(httpResponse).headers(); - return httpResponse; - } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java index d7859b4a1b40e..92c18eba01847 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfiguratorReaderMock.java @@ -47,7 +47,6 @@ public GatewayServiceConfiguratorReaderMock(ReplicationPolicy userReplicationPol ConsistencyLevel defaultConsistencyLevel) { this.gatewayServiceConfigurationReader = Mockito.mock(GatewayServiceConfigurationReader.class); - Mockito.doReturn(Mono.just(Mockito.mock(DatabaseAccount.class))).when(this.gatewayServiceConfigurationReader).initializeReaderAsync(); Mockito.doReturn(defaultConsistencyLevel).when(this.gatewayServiceConfigurationReader).getDefaultConsistencyLevel(); Mockito.doReturn(systemReplicationPolicy).when(this.gatewayServiceConfigurationReader).getSystemReplicationPolicy(); Mockito.doReturn(userReplicationPolicy).when(this.gatewayServiceConfigurationReader).getUserReplicationPolicy(); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java index 92ef88cf54b8b..7fab9e5fd1d05 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalEndPointManagerTest.java @@ -32,7 +32,7 @@ public class GlobalEndPointManagerTest { protected static final int TIMEOUT = 6000000; DatabaseAccountManagerInternal databaseAccountManagerInternal; - private String dbAccountJson1 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + static String dbAccountJson1 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"}]," + "\"readableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false," + @@ -44,18 +44,18 @@ public class GlobalEndPointManagerTest { "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}\n"; - private String dbAccountJson2 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + static String dbAccountJson2 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure" + ".com:443/\"}],\"readableLocations\":[{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}]," + - "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + - "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Session\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":5}," + + "\"userConsistencyPolicy\":{\"defaultConsistencyLevel\":\"Eventual\"},\"systemReplicationPolicy\":{\"minReplicaSetSize\":3,\"maxReplicasetSize\":5}," + "\"readPolicy\":{\"primaryReadCoefficient\":1,\"secondaryReadCoefficient\":1},\"queryEngineConfiguration\":\"{\\\"maxSqlQueryInputLength\\\":262144,\\\"maxJoinsPerSqlQuery\\\":5," + "\\\"maxLogicalAndPerSqlQuery\\\":500,\\\"maxLogicalOrPerSqlQuery\\\":500,\\\"maxUdfRefPerSqlQuery\\\":10,\\\"maxInExpressionItemsCount\\\":16000," + "\\\"queryMaxInMemorySortDocumentCount\\\":500,\\\"maxQueryRequestTimeoutFraction\\\":0.9,\\\"sqlAllowNonFiniteNumbers\\\":false,\\\"sqlAllowAggregateFunctions\\\":true," + "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + - "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":false}\"}"; - private String dbAccountJson3 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + + static String dbAccountJson3 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\"," + "\"addresses\":\"//addresses/\",\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure" + ".com:443/\"}],\"readableLocations\":[{\"name\":\"West US\",\"databaseAccountEndpoint\":\"https://testaccount-westus.documents.azure.com:443/\"}]," + "\"enableMultipleWriteLocations\":false,\"userReplicationPolicy\":{\"asyncReplication\":false,\"minReplicaSetSize\":3,\"maxReplicasetSize\":4}," + @@ -66,7 +66,7 @@ public class GlobalEndPointManagerTest { "\\\"sqlAllowSubQuery\\\":true,\\\"sqlAllowScalarSubQuery\\\":true,\\\"allowNewKeywords\\\":true,\\\"sqlAllowLike\\\":false,\\\"sqlAllowGroupByClause\\\":true," + "\\\"maxSpatialQueryCells\\\":12,\\\"spatialMaxGeometryPointCount\\\":256,\\\"sqlAllowTop\\\":true,\\\"enableSpatialIndexing\\\":true}\"}"; - private String dbAccountJson4 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + + static String dbAccountJson4 = "{\"_self\":\"\",\"id\":\"testaccount\",\"_rid\":\"testaccount.documents.azure.com\",\"media\":\"//media/\",\"addresses\":\"//addresses/\"," + "\"_dbs\":\"//dbs/\",\"writableLocations\":[{\"name\":\"East US\",\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\"," + "\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents.azure.com:443/\"}],\"readableLocations\":[{\"name\":\"East US\"," + "\"databaseAccountEndpoint\":\"https://testaccount-eastus.documents.azure.com:443/\"},{\"name\":\"East Asia\",\"databaseAccountEndpoint\":\"https://testaccount-eastasia.documents" + diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java index c78fe6b2b5f1b..d420ea1a4d654 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java @@ -8,6 +8,7 @@ import com.azure.cosmos.CosmosBridgeInternal; import com.azure.cosmos.CosmosClient; import com.azure.cosmos.implementation.AsyncDocumentClient; +import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.http.HttpClient; import org.apache.commons.lang3.reflect.FieldUtils; @@ -82,4 +83,16 @@ public static AsyncDocumentClient getAsyncDocumentClient(CosmosAsyncClient clien public static void setAsyncDocumentClient(CosmosAsyncClient client, RxDocumentClientImpl rxClient) { set(client, rxClient, "asyncDocumentClient"); } + + public static GatewayServiceConfigurationReader getServiceConfigurationReader(RxDocumentClientImpl rxDocumentClient){ + return get(GatewayServiceConfigurationReader.class, rxDocumentClient, "gatewayConfigurationReader"); + } + + public static GlobalEndpointManager getGlobalEndpointManager(GatewayServiceConfigurationReader serviceConfigurationReader){ + return get(GlobalEndpointManager.class, serviceConfigurationReader, "globalEndpointManager"); + } + + public static void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec){ + set(globalEndPointManager, millSec, "backgroundRefreshLocationTimeIntervalInMS"); + } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java index 9911b3cde18a1..8aedb0fdd5702 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TokenResolverTest.java @@ -507,7 +507,9 @@ private Permission getPermission(Resource resource, String permissionId, Permiss private TokenResolver getTokenResolver(PermissionMode permissionMode) { return (RequestVerb requestVerb, String resourceIdOrFullName, CosmosResourceType resourceType, Map properties) -> { - if (permissionMode == null) { + if(resourceType.equals(CosmosResourceType.System)) { + return readPermission.getToken(); + } if (permissionMode == null) { return "invalid"; } else if (permissionMode.equals(PermissionMode.READ)) { return readPermission.getToken(); From 39fa9221dd222332b74dce47d08ca5e7e4eface3 Mon Sep 17 00:00:00 2001 From: Naveen Kumar Singh Date: Wed, 26 Feb 2020 10:09:20 -0500 Subject: [PATCH 4/6] resolving comments --- .../implementation/GlobalEndpointManager.java | 10 ++-- ...GatewayServiceConfigurationReaderTest.java | 49 +++++++++++-------- .../directconnectivity/ReflectionUtils.java | 12 ++++- 3 files changed, 44 insertions(+), 27 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index 0e630b8f84b4a..ce6231af5b3b6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -6,7 +6,6 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConnectionPolicy; import com.azure.cosmos.DatabaseAccount; -import com.azure.cosmos.DatabaseAccountLocation; import com.azure.cosmos.implementation.caches.AsyncCache; import com.azure.cosmos.implementation.routing.LocationCache; import com.azure.cosmos.implementation.routing.LocationHelper; @@ -167,13 +166,14 @@ public Mono refreshLocationAsync(DatabaseAccount databaseAccount, boolean } public Mono getDatabaseAccountFromCache(URI defaultEndpoint) { - return this.databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, () -> this.owner.getDatabaseAccountFromEndpoint(defaultEndpoint).single().doOnSuccess(databaseAccount -> { - if(databaseAccount != null) { + return this.databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, () -> this.owner.getDatabaseAccountFromEndpoint(defaultEndpoint).flatMap(databaseAccount -> { + if (databaseAccount != null) { this.latestDatabaseAccount = databaseAccount; } - this.refreshLocationAsync(databaseAccount, false); - })); + Mono refreshLocationCompletable = this.refreshLocationAsync(databaseAccount, false); + return refreshLocationCompletable.then(Mono.just(databaseAccount)); + }).single()); } public DatabaseAccount getLatestDatabaseAccount() { diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java index fa2cc58780834..370213ca2349f 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java @@ -19,10 +19,12 @@ import com.azure.cosmos.implementation.SpyClientUnderTestFactory; import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.TestSuiteBase; +import com.azure.cosmos.implementation.caches.AsyncCache; import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.http.HttpHeaders; import com.azure.cosmos.implementation.http.HttpRequest; import com.azure.cosmos.implementation.http.HttpResponse; +import com.azure.cosmos.implementation.routing.LocationCache; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.reactivex.subscribers.TestSubscriber; @@ -121,35 +123,40 @@ public void configurationPropertyReadsViaCache() throws Exception { Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson1))); Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI(TestConfigurations.HOST)); GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(databaseAccountManagerInternal, new ConnectionPolicy(), new Configs()); - ReflectionUtils.setBackgroundRefreshLocationTimeIntervalInMS(globalEndpointManager, 1000); - globalEndpointManager.init(); + LocationCache locationCache = ReflectionUtils.getLocationCache(globalEndpointManager); + + assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).get("enableSpatialIndexing")).isTrue(); + assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(locationCache.getWriteEndpoints().get(0).toString()).contains("eastus"); - assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); - assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isTrue(); - assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); - assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson2))); - Thread.sleep(2000); - assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.EVENTUAL); - assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isFalse(); - assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(5); - assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(5); + AsyncCache databaseAccountAsyncCache = ReflectionUtils.getDatabaseAccountAsyncCache(globalEndpointManager); + databaseAccountAsyncCache.clear(); + + assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.EVENTUAL); + assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).get("enableSpatialIndexing")).isFalse(); + assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(5); + assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(5); + assertThat(locationCache.getWriteEndpoints().get(0).toString()).contains("eastasia"); + databaseAccountAsyncCache.clear(); Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson3))); - Thread.sleep(2000); - assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); - assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isTrue(); - assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); - assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).get("enableSpatialIndexing")).isTrue(); + assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(locationCache.getWriteEndpoints().get(0).toString()).contains("westus"); //Testing scenario of scheduled cache refresh with error Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.error(BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.FORBIDDEN))); - Thread.sleep(2000); - assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getLatestDatabaseAccount()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); - assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getLatestDatabaseAccount()).get("enableSpatialIndexing")).isTrue(); - assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); - assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getLatestDatabaseAccount()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); + assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).get("enableSpatialIndexing")).isTrue(); + assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(4); + assertThat(locationCache.getWriteEndpoints().get(0).toString()).contains("westus"); } public static void validateSuccess(Mono observable) { diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java index d420ea1a4d654..41291ed0fc203 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java @@ -3,14 +3,16 @@ package com.azure.cosmos.implementation.directconnectivity; -import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosBridgeInternal; import com.azure.cosmos.CosmosClient; +import com.azure.cosmos.DatabaseAccount; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.RxDocumentClientImpl; +import com.azure.cosmos.implementation.caches.AsyncCache; import com.azure.cosmos.implementation.http.HttpClient; +import com.azure.cosmos.implementation.routing.LocationCache; import org.apache.commons.lang3.reflect.FieldUtils; /** @@ -92,6 +94,14 @@ public static GlobalEndpointManager getGlobalEndpointManager(GatewayServiceConfi return get(GlobalEndpointManager.class, serviceConfigurationReader, "globalEndpointManager"); } + public static LocationCache getLocationCache(GlobalEndpointManager globalEndpointManager){ + return get(LocationCache.class, globalEndpointManager, "locationCache"); + } + + public static AsyncCache getDatabaseAccountAsyncCache(GlobalEndpointManager globalEndpointManager){ + return get(AsyncCache.class, globalEndpointManager, "databaseAccountAsyncCache"); + }; + public static void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec){ set(globalEndPointManager, millSec, "backgroundRefreshLocationTimeIntervalInMS"); } From d27cde4738dbe5db8cfa725c732b34bf817f6fe2 Mon Sep 17 00:00:00 2001 From: Naveen Kumar Singh Date: Wed, 26 Feb 2020 16:06:59 -0500 Subject: [PATCH 5/6] removing cahce logic --- .../implementation/GlobalEndpointManager.java | 36 ++++---------- .../implementation/RxDocumentClientImpl.java | 6 ++- .../GatewayServiceConfigurationReader.java | 5 +- ...GatewayServiceConfigurationReaderTest.java | 47 +------------------ .../directconnectivity/ReflectionUtils.java | 15 ------ 5 files changed, 15 insertions(+), 94 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index ce6231af5b3b6..2bd3e9a6c0d9f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -47,13 +47,11 @@ public class GlobalEndpointManager implements AutoCloseable { private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final Scheduler scheduler = Schedulers.fromExecutor(executor); private volatile boolean isClosed; - private final AsyncCache databaseAccountAsyncCache; private AtomicBoolean firstTimeDatabaseAccountInitialization = new AtomicBoolean(true); private volatile DatabaseAccount latestDatabaseAccount; public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPolicy connectionPolicy, Configs configs) { this.backgroundRefreshLocationTimeIntervalInMS = configs.getUnavailableLocationsExpirationTimeInSeconds() * 1000; - this.databaseAccountAsyncCache = new AsyncCache<>(); try { this.locationCache = new LocationCache( new ArrayList<>(connectionPolicy.getPreferredLocations() != null ? @@ -165,17 +163,12 @@ public Mono refreshLocationAsync(DatabaseAccount databaseAccount, boolean }); } - public Mono getDatabaseAccountFromCache(URI defaultEndpoint) { - return this.databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, () -> this.owner.getDatabaseAccountFromEndpoint(defaultEndpoint).flatMap(databaseAccount -> { - if (databaseAccount != null) { - this.latestDatabaseAccount = databaseAccount; - } - - Mono refreshLocationCompletable = this.refreshLocationAsync(databaseAccount, false); - return refreshLocationCompletable.then(Mono.just(databaseAccount)); - }).single()); - } - + /** + * This will provide the latest databaseAccount. + * If due to some reason last databaseAccount update was null, + * this method will return previous valid value + * @return DatabaseAccount + */ public DatabaseAccount getLatestDatabaseAccount() { return this.latestDatabaseAccount; } @@ -274,27 +267,14 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) { } private Mono getDatabaseAccountAsync(URI serviceEndpoint) { - final GlobalEndpointManager that = this; - Callable> fetchDatabaseAccount = () -> { - return that.owner.getDatabaseAccountFromEndpoint(serviceEndpoint).doOnNext(databaseAccount -> { + return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint) + .doOnNext(databaseAccount -> { if(databaseAccount != null) { this.latestDatabaseAccount = databaseAccount; } logger.debug("account retrieved: {}", databaseAccount); }).single(); - }; - - Mono obsoleteValueMono = databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, null, fetchDatabaseAccount); - return obsoleteValueMono.flatMap(obsoleteValue -> { - if (firstTimeDatabaseAccountInitialization.compareAndSet(true, false)) { - return Mono.just(obsoleteValue); - } - return databaseAccountAsyncCache.getAsync(StringUtils.EMPTY, obsoleteValue, fetchDatabaseAccount).doOnError(t -> { - //Putting back the old value in cache, this will avoid cache corruption - databaseAccountAsyncCache.set(StringUtils.EMPTY, obsoleteValue); - }); - }); } public boolean isClosed() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index e09fb1680f970..7a9b340e4f7e2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -250,8 +250,12 @@ private RxDocumentClientImpl(URI serviceEndpoint, } private void initializeGatewayConfigurationReader() { - this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.serviceEndpoint, this.globalEndpointManager); + this.gatewayConfigurationReader = new GatewayServiceConfigurationReader(this.globalEndpointManager); DatabaseAccount databaseAccount = this.globalEndpointManager.getLatestDatabaseAccount(); + //Database account should not be null here, + // this.globalEndpointManager.init() must have been already called + // hence asserting it + assert(databaseAccount != null); this.useMultipleWriteLocations = this.connectionPolicy.getUsingMultipleWriteLocations() && BridgeInternal.isEnableMultipleWriteLocations(databaseAccount); // TODO: add support for openAsync diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java index a9af7d876f30b..adb678c0788bb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReader.java @@ -24,13 +24,10 @@ */ public class GatewayServiceConfigurationReader { - private URI serviceEndpoint; private GlobalEndpointManager globalEndpointManager; - public GatewayServiceConfigurationReader(URI serviceEndpoint, GlobalEndpointManager globalEndpointManager) { - this.serviceEndpoint = serviceEndpoint; + public GatewayServiceConfigurationReader(GlobalEndpointManager globalEndpointManager) { this.globalEndpointManager = globalEndpointManager; - this.globalEndpointManager.getDatabaseAccountFromCache(this.serviceEndpoint).block(); } public ReplicationPolicy getUserReplicationPolicy() { diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java index 370213ca2349f..db08961e98241 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java @@ -70,9 +70,6 @@ public void clientInitialization() throws Exception { client = this.clientBuilder().build(); RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) client; GatewayServiceConfigurationReader serviceConfigurationReader = ReflectionUtils.getServiceConfigurationReader(rxDocumentClient); - GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(serviceConfigurationReader); - Mono databaseAccountMono = globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)); - validateSuccess(databaseAccountMono); assertThat(serviceConfigurationReader.getDefaultConsistencyLevel()).isNotNull(); assertThat(serviceConfigurationReader.getQueryEngineConfiguration()).isNotNull(); assertThat(serviceConfigurationReader.getSystemReplicationPolicy()).isNotNull(); @@ -88,7 +85,7 @@ public void configurationPropertyReads() throws Exception { ReflectionUtils.setBackgroundRefreshLocationTimeIntervalInMS(globalEndpointManager, 1000); globalEndpointManager.init(); - GatewayServiceConfigurationReader configurationReader = new GatewayServiceConfigurationReader(new URI(TestConfigurations.HOST), globalEndpointManager); + GatewayServiceConfigurationReader configurationReader = new GatewayServiceConfigurationReader(globalEndpointManager); assertThat(configurationReader.getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); assertThat((boolean) configurationReader.getQueryEngineConfiguration().get("enableSpatialIndexing")).isTrue(); assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); @@ -117,48 +114,6 @@ public void configurationPropertyReads() throws Exception { assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); } - @Test(groups = "simple") - public void configurationPropertyReadsViaCache() throws Exception { - DatabaseAccountManagerInternal databaseAccountManagerInternal = Mockito.mock(DatabaseAccountManagerInternal.class); - Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson1))); - Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI(TestConfigurations.HOST)); - GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(databaseAccountManagerInternal, new ConnectionPolicy(), new Configs()); - LocationCache locationCache = ReflectionUtils.getLocationCache(globalEndpointManager); - - assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); - assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).get("enableSpatialIndexing")).isTrue(); - assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(4); - assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(4); - assertThat(locationCache.getWriteEndpoints().get(0).toString()).contains("eastus"); - - - Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson2))); - AsyncCache databaseAccountAsyncCache = ReflectionUtils.getDatabaseAccountAsyncCache(globalEndpointManager); - databaseAccountAsyncCache.clear(); - - assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.EVENTUAL); - assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).get("enableSpatialIndexing")).isFalse(); - assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(5); - assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(5); - assertThat(locationCache.getWriteEndpoints().get(0).toString()).contains("eastasia"); - databaseAccountAsyncCache.clear(); - - Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.just(new DatabaseAccount(GlobalEndPointManagerTest.dbAccountJson3))); - assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); - assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).get("enableSpatialIndexing")).isTrue(); - assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(4); - assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(4); - assertThat(locationCache.getWriteEndpoints().get(0).toString()).contains("westus"); - - //Testing scenario of scheduled cache refresh with error - Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(Matchers.any())).thenReturn(Flux.error(BridgeInternal.createCosmosClientException(HttpConstants.StatusCodes.FORBIDDEN))); - assertThat(BridgeInternal.getConsistencyPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getDefaultConsistencyLevel()).isEqualTo(ConsistencyLevel.SESSION); - assertThat((boolean) BridgeInternal.getQueryEngineConfiuration(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).get("enableSpatialIndexing")).isTrue(); - assertThat(BridgeInternal.getSystemReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(4); - assertThat(BridgeInternal.getReplicationPolicy(globalEndpointManager.getDatabaseAccountFromCache(new URI(TestConfigurations.HOST)).block()).getMaxReplicaSetSize()).isEqualTo(4); - assertThat(locationCache.getWriteEndpoints().get(0).toString()).contains("westus"); - } - public static void validateSuccess(Mono observable) { TestSubscriber testSubscriber = new TestSubscriber(); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java index 41291ed0fc203..a27dc5b76d593 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java @@ -6,13 +6,10 @@ import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosBridgeInternal; import com.azure.cosmos.CosmosClient; -import com.azure.cosmos.DatabaseAccount; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.RxDocumentClientImpl; -import com.azure.cosmos.implementation.caches.AsyncCache; import com.azure.cosmos.implementation.http.HttpClient; -import com.azure.cosmos.implementation.routing.LocationCache; import org.apache.commons.lang3.reflect.FieldUtils; /** @@ -90,18 +87,6 @@ public static GatewayServiceConfigurationReader getServiceConfigurationReader(Rx return get(GatewayServiceConfigurationReader.class, rxDocumentClient, "gatewayConfigurationReader"); } - public static GlobalEndpointManager getGlobalEndpointManager(GatewayServiceConfigurationReader serviceConfigurationReader){ - return get(GlobalEndpointManager.class, serviceConfigurationReader, "globalEndpointManager"); - } - - public static LocationCache getLocationCache(GlobalEndpointManager globalEndpointManager){ - return get(LocationCache.class, globalEndpointManager, "locationCache"); - } - - public static AsyncCache getDatabaseAccountAsyncCache(GlobalEndpointManager globalEndpointManager){ - return get(AsyncCache.class, globalEndpointManager, "databaseAccountAsyncCache"); - }; - public static void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec){ set(globalEndPointManager, millSec, "backgroundRefreshLocationTimeIntervalInMS"); } From 9b8c10c3c63f73108ecc5c1f17adc7d698ed2968 Mon Sep 17 00:00:00 2001 From: Naveen Kumar Singh Date: Thu, 27 Feb 2020 13:01:57 -0500 Subject: [PATCH 6/6] resolving conflicts --- ...GatewayServiceConfigurationReaderTest.java | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java index db08961e98241..a915106147e7a 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayServiceConfigurationReaderTest.java @@ -6,7 +6,6 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConnectionPolicy; import com.azure.cosmos.ConsistencyLevel; -import com.azure.cosmos.CosmosKeyCredential; import com.azure.cosmos.DatabaseAccount; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.AsyncDocumentClient.Builder; @@ -16,30 +15,17 @@ import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.RxDocumentClientImpl; -import com.azure.cosmos.implementation.SpyClientUnderTestFactory; import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.TestSuiteBase; -import com.azure.cosmos.implementation.caches.AsyncCache; import com.azure.cosmos.implementation.http.HttpClient; -import com.azure.cosmos.implementation.http.HttpHeaders; -import com.azure.cosmos.implementation.http.HttpRequest; -import com.azure.cosmos.implementation.http.HttpResponse; -import com.azure.cosmos.implementation.routing.LocationCache; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufUtil; -import io.reactivex.subscribers.TestSubscriber; -import org.apache.commons.io.IOUtils; import org.mockito.Matchers; import org.mockito.Mockito; import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Factory; import org.testng.annotations.Test; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import java.net.URI; -import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; @@ -113,18 +99,4 @@ public void configurationPropertyReads() throws Exception { assertThat(configurationReader.getSystemReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); assertThat(configurationReader.getUserReplicationPolicy().getMaxReplicaSetSize()).isEqualTo(4); } - - public static void validateSuccess(Mono observable) { - TestSubscriber testSubscriber = new TestSubscriber(); - - observable.subscribe(testSubscriber); - testSubscriber.awaitTerminalEvent(TIMEOUT, TimeUnit.MILLISECONDS); - testSubscriber.assertNoErrors(); - testSubscriber.assertComplete(); - testSubscriber.assertValueCount(1); - DatabaseAccount databaseAccount = testSubscriber.values().get(0); - assertThat(BridgeInternal.getQueryEngineConfiuration(databaseAccount).size() > 0).isTrue(); - assertThat(BridgeInternal.getReplicationPolicy(databaseAccount)).isNotNull(); - assertThat(BridgeInternal.getSystemReplicationPolicy(databaseAccount)).isNotNull(); - } }