diff --git a/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/AsyncBenchmark.java b/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/AsyncBenchmark.java index 477118e3d51cc..6905b29cd27be 100644 --- a/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/AsyncBenchmark.java +++ b/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/AsyncBenchmark.java @@ -47,17 +47,17 @@ import rx.Subscriber; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; abstract class AsyncBenchmark { private final MetricRegistry metricsRegistry = new MetricRegistry(); private final ScheduledReporter reporter; - private final CountDownLatch operationCounterLatch; private final String nameCollectionLink; private Meter successMeter; @@ -87,7 +87,6 @@ abstract class AsyncBenchmark { nameCollectionLink = String.format("dbs/%s/colls/%s", database.getId(), collection.getId()); partitionKey = collection.getPartitionKey().getPaths().iterator().next().split("/")[1]; concurrencyControlSemaphore = new Semaphore(cfg.getConcurrency()); - operationCounterLatch = new CountDownLatch(cfg.getNumberOfOperations()); configuration = cfg; ArrayList> createDocumentObservables = new ArrayList<>(); @@ -166,6 +165,24 @@ protected String getDocumentLink(Document doc) { protected abstract void performWorkload(Subscriber subs, long i) throws Exception; + private boolean shouldContinue(long startTimeMillis, long iterationCount) { + Duration maxDurationTime = configuration.getMaxRunningTimeDuration(); + int maxNumberOfOperations = configuration.getNumberOfOperations(); + if (maxDurationTime == null) { + return iterationCount < maxNumberOfOperations; + } + + if (startTimeMillis + maxDurationTime.toMillis() < System.currentTimeMillis()) { + return false; + } + + if (maxNumberOfOperations < 0) { + return true; + } + + return iterationCount < maxNumberOfOperations; + } + void run() throws Exception { successMeter = metricsRegistry.meter("#Successful Operations"); @@ -178,7 +195,9 @@ void run() throws Exception { long startTime = System.currentTimeMillis(); - for (long i = 1; i <= configuration.getNumberOfOperations(); i++) { + AtomicLong count = new AtomicLong(0); + long i; + for ( i = 0; shouldContinue(startTime, i); i++) { Subscriber subs = new Subscriber() { @@ -190,8 +209,12 @@ public void onStart() { public void onCompleted() { successMeter.mark(); concurrencyControlSemaphore.release(); - operationCounterLatch.countDown(); AsyncBenchmark.this.onSuccess(); + + synchronized (count) { + count.incrementAndGet(); + count.notify(); + } } @Override @@ -200,8 +223,12 @@ public void onError(Throwable e) { logger.error("Encountered failure {} on thread {}" , e.getMessage(), Thread.currentThread().getName(), e); concurrencyControlSemaphore.release(); - operationCounterLatch.countDown(); AsyncBenchmark.this.onError(e); + + synchronized (count) { + count.incrementAndGet(); + count.notify(); + } } @Override @@ -212,7 +239,12 @@ public void onNext(T value) { performWorkload(subs, i); } - operationCounterLatch.await(); + synchronized (count) { + while (count.get() < i) { + count.wait(); + } + } + long endTime = System.currentTimeMillis(); logger.info("[{}] operations performed in [{}] seconds.", configuration.getNumberOfOperations(), (int) ((endTime - startTime) / 1000)); diff --git a/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/Configuration.java b/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/Configuration.java index 915591f8b7c9f..2b1e350afcf1f 100644 --- a/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/Configuration.java +++ b/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/Configuration.java @@ -23,6 +23,7 @@ package com.microsoft.azure.cosmosdb.benchmark; +import java.time.Duration; import java.util.Arrays; import org.apache.commons.lang3.StringUtils; @@ -99,6 +100,20 @@ class Configuration { @Parameter(names = "-numberOfOperations", description = "Total Number Of Documents To Insert") private int numberOfOperations = 100000; + static class DurationConverter implements IStringConverter { + @Override + public Duration convert(String value) { + if (value == null) { + return null; + } + + return Duration.parse(value); + } + } + + @Parameter(names = "-maxRunningTimeDuration", description = "Max Running Time Duration", converter = DurationConverter.class) + private Duration maxRunningTimeDuration; + @Parameter(names = "-printingInterval", description = "Interval of time after which Metrics should be printed (seconds)") private int printingInterval = 10; @@ -181,6 +196,10 @@ public ConsistencyLevel convert(String value) { } } + Duration getMaxRunningTimeDuration() { + return maxRunningTimeDuration; + } + Operation getOperationType() { return operation; } diff --git a/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/Main.java b/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/Main.java index 18c20c619fafc..ecfc9c5823ca0 100644 --- a/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/Main.java +++ b/benchmark/src/main/java/com/microsoft/azure/cosmosdb/benchmark/Main.java @@ -93,7 +93,7 @@ public static void main(String[] args) throws Exception { // if any error in parsing the cmd-line options print out the usage help System.err.println("Invalid Usage: " + e.getMessage()); System.err.println("Try '-help' for more information."); - System.exit(1); + throw e; } } } diff --git a/benchmark/src/test/java/com/microsoft/azure/cosmosdb/benchmark/ReadMyWritesConsistencyTest.java b/benchmark/src/test/java/com/microsoft/azure/cosmosdb/benchmark/ReadMyWritesConsistencyTest.java new file mode 100644 index 0000000000000..8970087944837 --- /dev/null +++ b/benchmark/src/test/java/com/microsoft/azure/cosmosdb/benchmark/ReadMyWritesConsistencyTest.java @@ -0,0 +1,224 @@ +/* + * The MIT License (MIT) + * Copyright (c) 2018 Microsoft Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.microsoft.azure.cosmosdb.benchmark; + +import com.beust.jcommander.JCommander; +import com.google.common.base.Strings; +import com.microsoft.azure.cosmosdb.DataType; +import com.microsoft.azure.cosmosdb.Database; +import com.microsoft.azure.cosmosdb.DocumentCollection; +import com.microsoft.azure.cosmosdb.IncludedPath; +import com.microsoft.azure.cosmosdb.Index; +import com.microsoft.azure.cosmosdb.IndexingPolicy; +import com.microsoft.azure.cosmosdb.PartitionKeyDefinition; +import com.microsoft.azure.cosmosdb.RequestOptions; +import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient; +import com.microsoft.azure.cosmosdb.rx.TestConfigurations; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import rx.Observable; +import rx.schedulers.Schedulers; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ReadMyWritesConsistencyTest { + private final static Logger logger = LoggerFactory.getLogger(ReadMyWritesConsistencyTest.class); + private final int initialCollectionThroughput = 10_000; + private final int newCollectionThroughput = 100_000; + private final int delayForInitiationCollectionScaleUpInSeconds = 60; + private final Duration defaultMaxRunningTimeInSeconds = Duration.ofMinutes(45); + + private final String maxRunningTime = + System.getProperty("MAX_RUNNING_TIME", StringUtils.defaultString(Strings.emptyToNull( + System.getenv().get("MAX_RUNNING_TIME")), defaultMaxRunningTimeInSeconds.toString())); + + private final AtomicBoolean collectionScaleUpFailed = new AtomicBoolean(false); + private final String desiredConsistency = + System.getProperty("DESIRED_CONSISTENCY", + StringUtils.defaultString(Strings.emptyToNull( + System.getenv().get("DESIRED_CONSISTENCY")), "Session")); + + private final String numberOfOperationsAsString = + System.getProperty("NUMBER_OF_OPERATIONS", + StringUtils.defaultString(Strings.emptyToNull( + System.getenv().get("NUMBER_OF_OPERATIONS")), "-1")); + + private Database database; + private DocumentCollection collection; + + @Test(dataProvider = "collectionLinkTypeArgProvider", groups = "e2e") + public void readMyWrites(boolean useNameLink) throws Exception { + int concurrency = 5; + String cmdFormat = "-serviceEndpoint %s -masterKey %s" + + " -databaseId %s -collectionId %s" + + " -consistencyLevel %s -concurrency %d" + + " -numberOfOperations %s" + + " -maxRunningTimeDuration %s" + + " -operation ReadMyWrites -connectionMode Direct -numberOfPreCreatedDocuments 100 " + + " -printingInterval 60"; + + String cmd = String.format(cmdFormat, + TestConfigurations.HOST, + TestConfigurations.MASTER_KEY, + database.getId(), + collection.getId(), + desiredConsistency, + concurrency, + numberOfOperationsAsString, + maxRunningTime) + + (useNameLink ? " -useNameLink" : ""); + + Configuration cfg = new Configuration(); + new JCommander(cfg, StringUtils.split(cmd)); + + AtomicInteger success = new AtomicInteger(); + AtomicInteger error = new AtomicInteger(); + + ReadMyWriteWorkflow wf = new ReadMyWriteWorkflow(cfg) { + @Override + protected void onError(Throwable throwable) { + error.incrementAndGet(); + } + + @Override + protected void onSuccess() { + success.incrementAndGet(); + } + }; + + // schedules a collection scale up after a delay + scheduleScaleUp(delayForInitiationCollectionScaleUpInSeconds, newCollectionThroughput); + + wf.run(); + wf.shutdown(); + + int numberOfOperations = Integer.parseInt(numberOfOperationsAsString); + + assertThat(error).hasValue(0); + assertThat(collectionScaleUpFailed).isFalse(); + + if (numberOfOperations > 0) { + assertThat(success).hasValue(numberOfOperations); + } + } + + @BeforeClass(groups = "e2e") + public void beforeClass() { + RequestOptions options = new RequestOptions(); + options.setOfferThroughput(initialCollectionThroughput); + AsyncDocumentClient housekeepingClient = Utils.housekeepingClient(); + database = Utils.createDatabaseForTest(housekeepingClient); + collection = housekeepingClient.createCollection("dbs/" + database.getId(), + getCollectionDefinitionWithRangeRangeIndex(), + options) + .toBlocking().single().getResource(); + housekeepingClient.close(); + } + + @DataProvider(name = "collectionLinkTypeArgProvider") + public Object[][] collectionLinkTypeArgProvider() { + return new Object[][]{ + // is namebased + {true}, + }; + } + + @AfterClass(groups = "e2e") + public void afterClass() { + AsyncDocumentClient housekeepingClient = Utils.housekeepingClient(); + Utils.safeCleanDatabases(housekeepingClient); + Utils.safeClean(housekeepingClient, database); + Utils.safeClose(housekeepingClient); + } + + DocumentCollection getCollectionDefinitionWithRangeRangeIndex() { + PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition(); + ArrayList paths = new ArrayList<>(); + paths.add("/mypk"); + partitionKeyDef.setPaths(paths); + IndexingPolicy indexingPolicy = new IndexingPolicy(); + Collection includedPaths = new ArrayList<>(); + IncludedPath includedPath = new IncludedPath(); + includedPath.setPath("/*"); + Collection indexes = new ArrayList<>(); + Index stringIndex = Index.Range(DataType.String); + stringIndex.set("precision", -1); + indexes.add(stringIndex); + + Index numberIndex = Index.Range(DataType.Number); + numberIndex.set("precision", -1); + indexes.add(numberIndex); + includedPath.setIndexes(indexes); + includedPaths.add(includedPath); + indexingPolicy.setIncludedPaths(includedPaths); + + DocumentCollection collectionDefinition = new DocumentCollection(); + collectionDefinition.setIndexingPolicy(indexingPolicy); + collectionDefinition.setId(UUID.randomUUID().toString()); + collectionDefinition.setPartitionKey(partitionKeyDef); + + return collectionDefinition; + } + + private void scheduleScaleUp(int delayStartInSeconds, int newThroughput) { + AsyncDocumentClient housekeepingClient = Utils.housekeepingClient(); + Observable.timer(delayStartInSeconds, TimeUnit.SECONDS, Schedulers.newThread()).flatMap(aVoid -> { + + // increase throughput to max for a single partition collection to avoid throttling + // for bulk insert and later queries. + return housekeepingClient.queryOffers( + String.format("SELECT * FROM r WHERE r.offerResourceId = '%s'", + collection.getResourceId()) + , null).flatMap(page -> Observable.from(page.getResults())) + .first().flatMap(offer -> { + logger.info("going to scale up collection, newThroughput {}", newThroughput); + offer.setThroughput(newThroughput); + return housekeepingClient.replaceOffer(offer); + }); + }).doOnTerminate(() -> housekeepingClient.close()) + .subscribe(aVoid -> { + }, e -> { + logger.error("collectionScaleUpFailed to scale up collection", e); + collectionScaleUpFailed.set(true); + }, + () -> { + logger.info("Collection Scale up request sent to the service"); + + } + ); + } +} \ No newline at end of file diff --git a/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/DatabaseForTest.java b/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/DatabaseForTest.java index 1e646d58e6eb3..5711d2fa817e7 100644 --- a/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/DatabaseForTest.java +++ b/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/DatabaseForTest.java @@ -39,7 +39,7 @@ public class DatabaseForTest { private static Logger logger = LoggerFactory.getLogger(DatabaseForTest.class); public static final String SHARED_DB_ID_PREFIX = "RxJava.SDKTest.SharedDatabase"; - private static final Duration CLEANUP_THRESHOLD_DURATION = Duration.ofHours(1); + private static final Duration CLEANUP_THRESHOLD_DURATION = Duration.ofHours(2); private static final String DELIMITER = "_"; private static DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss"); diff --git a/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/RetryAnalyzer.java b/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/RetryAnalyzer.java new file mode 100644 index 0000000000000..36766d50c9bd4 --- /dev/null +++ b/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/RetryAnalyzer.java @@ -0,0 +1,53 @@ +/* + * The MIT License (MIT) + * Copyright (c) 2018 Microsoft Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + + +package com.microsoft.azure.cosmosdb; + +import com.microsoft.azure.cosmosdb.rx.TestConfigurations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.ITestResult; +import org.testng.util.RetryAnalyzerCount; + +import java.util.concurrent.TimeUnit; + +public class RetryAnalyzer extends RetryAnalyzerCount { + private final Logger logger = LoggerFactory.getLogger(RetryAnalyzer.class); + private final int waitBetweenRetriesInSeconds = 120; + + public RetryAnalyzer() { + this.setCount(Integer.parseInt(TestConfigurations.MAX_RETRY_LIMIT)); + } + + @Override + public boolean retryMethod(ITestResult result) { + try { + TimeUnit.SECONDS.sleep(waitBetweenRetriesInSeconds); + } catch (InterruptedException e) { + return false; + } + + return true; + } +} diff --git a/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/rx/TestConfigurations.java b/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/rx/TestConfigurations.java index a6f115439e9e8..a6847021992e5 100644 --- a/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/rx/TestConfigurations.java +++ b/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/rx/TestConfigurations.java @@ -64,4 +64,22 @@ public final class TestConfigurations { System.getProperty("PREFERRED_LOCATIONS", StringUtils.defaultString(Strings.emptyToNull( System.getenv().get("PREFERRED_LOCATIONS")), null)); + + public static String MAX_RETRY_LIMIT = + System.getProperty("MAX_RETRY_LIMIT", + StringUtils.defaultString(Strings.emptyToNull( + System.getenv().get("MAX_RETRY_LIMIT")), + "2")); + + public static String DESIRED_CONSISTENCIES = + System.getProperty("DESIRED_CONSISTENCIES", + StringUtils.defaultString(Strings.emptyToNull( + System.getenv().get("DESIRED_CONSISTENCIES")), + null)); + + public static String PROTOCOLS = + System.getProperty("PROTOCOLS", + StringUtils.defaultString(Strings.emptyToNull( + System.getenv().get("PROTOCOLS")), + null)); } diff --git a/direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/RntbdTransportClientTest.java b/direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/RntbdTransportClientTest.java index b2ab400ad3841..44ec29a004e1f 100644 --- a/direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/RntbdTransportClientTest.java +++ b/direct-impl/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/RntbdTransportClientTest.java @@ -692,7 +692,7 @@ public void verifyRequestFailures( RntbdResponse response ) { final UserAgentContainer userAgent = new UserAgentContainer(); - final Duration timeout = Duration.ofMillis(100); + final Duration timeout = Duration.ofMillis(1000); try (final RntbdTransportClient client = getRntbdTransportClientUnderTest(userAgent, timeout, response)) { diff --git a/pom.xml b/pom.xml index 840ec341e97b2..66119ea309925 100644 --- a/pom.xml +++ b/pom.xml @@ -207,6 +207,21 @@ + + + e2e + + e2e + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + + diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/CollectionCrudTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/CollectionCrudTest.java index 3aa4b54519cf7..7f157cf677bc3 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/CollectionCrudTest.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/CollectionCrudTest.java @@ -1,17 +1,17 @@ /* * The MIT License (MIT) * Copyright (c) 2018 Microsoft Corporation - * + * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: - * + * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. - * + * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -30,6 +30,7 @@ import com.microsoft.azure.cosmosdb.DatabaseForTest; import com.microsoft.azure.cosmosdb.PartitionKeyDefinition; +import com.microsoft.azure.cosmosdb.RetryAnalyzer; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -50,7 +51,7 @@ import rx.Observable; public class CollectionCrudTest extends TestSuiteBase { - private static final int TIMEOUT = 30000; + private static final int TIMEOUT = 50000; private static final int SETUP_TIMEOUT = 20000; private static final int SHUTDOWN_TIMEOUT = 20000; private final String databaseId = DatabaseForTest.generateId(); @@ -92,13 +93,13 @@ private DocumentCollection getCollectionDefinition(String collectionName) { @Test(groups = { "emulator" }, timeOut = TIMEOUT, dataProvider = "collectionCrudArgProvider") public void createCollection(String collectionName, boolean isNameBased) { DocumentCollection collectionDefinition = getCollectionDefinition(collectionName); - + Observable> createObservable = client .createCollection(getDatabaseLink(database, isNameBased), collectionDefinition, null); ResourceResponseValidator validator = new ResourceResponseValidator.Builder() .withId(collectionDefinition.getId()).build(); - + validateSuccess(createObservable, validator); safeDeleteAllCollections(client, database); } @@ -124,17 +125,17 @@ public void createCollectionWithCompositeIndexAndSpatialSpec() { compositePath5.setOrder(CompositePathSortOrder.Descending); CompositePath compositePath6 = new CompositePath(); compositePath6.setPath("/path6"); - + ArrayList compositeIndex1 = new ArrayList(); compositeIndex1.add(compositePath1); compositeIndex1.add(compositePath2); compositeIndex1.add(compositePath3); - + ArrayList compositeIndex2 = new ArrayList(); compositeIndex2.add(compositePath4); compositeIndex2.add(compositePath5); compositeIndex2.add(compositePath6); - + Collection> compositeIndexes = new ArrayList>(); compositeIndexes.add(compositeIndex1); compositeIndexes.add(compositeIndex2); @@ -145,7 +146,7 @@ public void createCollectionWithCompositeIndexAndSpatialSpec() { SpatialType.LineString, SpatialType.Polygon, SpatialType.MultiPolygon - }; + }; Collection spatialIndexes = new ArrayList(); for (int index = 0; index < 2; index++) { Collection collectionOfSpatialTypes = new ArrayList(); @@ -159,12 +160,12 @@ public void createCollectionWithCompositeIndexAndSpatialSpec() { spec.setSpatialTypes(collectionOfSpatialTypes); spatialIndexes.add(spec); } - + indexingPolicy.setSpatialIndexes(spatialIndexes); collection.setId(UUID.randomUUID().toString()); collection.setIndexingPolicy(indexingPolicy); - + Observable> createObservable = client .createCollection(database.getSelfLink(), collection, null); @@ -173,7 +174,7 @@ public void createCollectionWithCompositeIndexAndSpatialSpec() { .withCompositeIndexes(compositeIndexes) .withSpatialIndexes(spatialIndexes) .build(); - + validateSuccess(createObservable, validator); safeDeleteAllCollections(client, database); } @@ -181,9 +182,9 @@ public void createCollectionWithCompositeIndexAndSpatialSpec() { @Test(groups = { "emulator" }, timeOut = TIMEOUT, dataProvider = "collectionCrudArgProvider") public void readCollection(String collectionName, boolean isNameBased) { DocumentCollection collectionDefinition = getCollectionDefinition(collectionName); - + Observable> createObservable = client.createCollection(getDatabaseLink(database, isNameBased), collectionDefinition, - null); + null); DocumentCollection collection = createObservable.toBlocking().single().getResource(); Observable> readObservable = client.readCollection(getCollectionLink(database, collection, isNameBased), null); @@ -197,11 +198,11 @@ public void readCollection(String collectionName, boolean isNameBased) { @Test(groups = { "emulator" }, timeOut = TIMEOUT, dataProvider = "collectionCrudArgProvider") public void readCollection_NameBase(String collectionName, boolean isNameBased) { DocumentCollection collectionDefinition = getCollectionDefinition(collectionName); - + Observable> createObservable = client.createCollection(getDatabaseLink(database, isNameBased), collectionDefinition, - null); + null); DocumentCollection collection = createObservable.toBlocking().single().getResource(); - + Observable> readObservable = client.readCollection( getCollectionLink(database, collection, isNameBased), null); @@ -224,12 +225,12 @@ public void readCollection_DoesntExist(String collectionName, boolean isNameBase @Test(groups = { "emulator" }, timeOut = TIMEOUT, dataProvider = "collectionCrudArgProvider") public void deleteCollection(String collectionName, boolean isNameBased) { DocumentCollection collectionDefinition = getCollectionDefinition(collectionName); - + Observable> createObservable = client.createCollection(getDatabaseLink(database, isNameBased), collectionDefinition, null); DocumentCollection collection = createObservable.toBlocking().single().getResource(); Observable> deleteObservable = client.deleteCollection(getCollectionLink(database, collection, isNameBased), - null); + null); ResourceResponseValidator validator = new ResourceResponseValidator.Builder() .nullResource().build(); @@ -244,13 +245,13 @@ public void replaceCollection(String collectionName, boolean isNameBased) { DocumentCollection collection = createObservable.toBlocking().single().getResource(); // sanity check assertThat(collection.getIndexingPolicy().getIndexingMode()).isEqualTo(IndexingMode.Consistent); - + // replace indexing mode IndexingPolicy indexingMode = new IndexingPolicy(); indexingMode.setIndexingMode(IndexingMode.Lazy); collection.setIndexingPolicy(indexingMode); Observable> readObservable = client.replaceCollection(collection, null); - + // validate ResourceResponseValidator validator = new ResourceResponseValidator.Builder() .indexingMode(IndexingMode.Lazy).build(); @@ -258,12 +259,12 @@ public void replaceCollection(String collectionName, boolean isNameBased) { safeDeleteAllCollections(client, database); } - @Test(groups = { "emulator" }, timeOut = TIMEOUT) + @Test(groups = { "emulator" }, timeOut = 10 * TIMEOUT, retryAnalyzer = RetryAnalyzer.class) public void sessionTokenConsistencyCollectionDeleteCreateSameName() { AsyncDocumentClient client1 = clientBuilder.build(); AsyncDocumentClient client2 = clientBuilder.build(); - String dbId = "db"; + String dbId = DatabaseForTest.generateId(); String collectionId = "coll"; try { Database databaseDefinition = new Database(); @@ -284,7 +285,7 @@ public void sessionTokenConsistencyCollectionDeleteCreateSameName() { document.set("name", "New Updated Document"); ResourceResponse upsertDocumentResponse = client1.upsertDocument(collection.getSelfLink(), document, null, - true).toBlocking().single(); + true).toBlocking().single(); logger.info("Client 1 Upsert Document Client Side Request Statistics {}", upsertDocumentResponse.getRequestDiagnosticsString()); logger.info("Client 1 Upsert Document Latency {}", upsertDocumentResponse.getRequestLatency()); diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/DCDocumentCrudTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/DCDocumentCrudTest.java index fc8b79637e954..7a73fa0e04bde 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/DCDocumentCrudTest.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/DCDocumentCrudTest.java @@ -67,7 +67,7 @@ * The tests in other test classes validate the actual behaviour and different scenarios. */ public class DCDocumentCrudTest extends TestSuiteBase { - private final static int QUERY_TIMEOUT = 30000; + private final static int QUERY_TIMEOUT = 40000; private final static String PARTITION_KEY_FIELD_NAME = "mypk"; private static Database createdDatabase; diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/OrderbyDocumentQueryTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/OrderbyDocumentQueryTest.java index cca649efba67a..21829fd366f87 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/OrderbyDocumentQueryTest.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/OrderbyDocumentQueryTest.java @@ -35,11 +35,13 @@ import java.util.function.Function; import java.util.stream.Collectors; +import com.microsoft.azure.cosmosdb.RetryAnalyzer; import org.apache.commons.lang3.StringUtils; import com.fasterxml.jackson.core.JsonProcessingException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; import org.testng.annotations.Test; @@ -372,8 +374,8 @@ public void orderByContinuationTokenRoundTrip() throws Exception { assertThat(OrderByContinuationToken.tryParse("{\"property\" : \"Not a valid Order By Token\"}", outOrderByContinuationToken)).isFalse(); } } - - @Test(groups = { "simple" }, timeOut = TIMEOUT * 10, dataProvider = "sortOrder") + @Test(groups = { "simple" }, timeOut = TIMEOUT * 10, dataProvider = "sortOrder", + retryAnalyzer = RetryAnalyzer.class) public void queryDocumentsWithOrderByContinuationTokensInteger(String sortOrder) throws Exception { // Get Actual String query = String.format("SELECT * FROM c ORDER BY c.propInt %s", sortOrder); @@ -438,6 +440,12 @@ public List bulkInsert(AsyncDocumentClient client, List preferredLocations; + private static final ImmutableList desiredConsistencies; + private static final ImmutableList protocols; + protected int subscriberValidationTimeout = TIMEOUT; protected Builder clientBuilder; protected static Database SHARED_DATABASE; @@ -110,6 +114,15 @@ public class TestSuiteBase { protected static DocumentCollection SHARED_SINGLE_PARTITION_COLLECTION_WITHOUT_PARTITION_KEY; protected static DocumentCollection SHARED_MULTI_PARTITION_COLLECTION_WITH_COMPOSITE_AND_SPATIAL_INDEXES; + static { + accountConsistency = parseConsistency(TestConfigurations.CONSISTENCY); + desiredConsistencies = immutableListOrNull( + ObjectUtils.defaultIfNull(parseDesiredConsistencies(TestConfigurations.DESIRED_CONSISTENCIES), + allEqualOrLowerConsistencies(accountConsistency))); + preferredLocations = immutableListOrNull(parsePreferredLocation(TestConfigurations.PREFERRED_LOCATIONS)); + protocols = ObjectUtils.defaultIfNull(immutableListOrNull(parseProtocols(TestConfigurations.PROTOCOLS)), + ImmutableList.of(Protocol.Https, Protocol.Tcp)); + } protected TestSuiteBase() { objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); @@ -119,7 +132,11 @@ protected TestSuiteBase() { logger.debug("Initializing {} ...", this.getClass().getSimpleName()); } - @BeforeMethod(groups = { "simple", "long", "direct", "multi-master", "emulator", "non-emulator" }) + private static ImmutableList immutableListOrNull(List list) { + return list != null ? ImmutableList.copyOf(list) : null; + } + + @BeforeMethod(groups = {"simple", "long", "direct", "multi-master", "emulator", "non-emulator"}) public void beforeMethod(Method method) { if (this.clientBuilder != null) { logger.info("Starting {}::{} using {} {} mode with {} consistency", @@ -132,7 +149,7 @@ public void beforeMethod(Method method) { logger.info("Starting {}::{}", method.getDeclaringClass().getSimpleName(), method.getName()); } - @AfterMethod(groups = { "simple", "long", "direct", "multi-master", "emulator", "non-emulator" }) + @AfterMethod(groups = {"simple", "long", "direct", "multi-master", "emulator", "non-emulator"}) public void afterMethod(Method m) { Test t = m.getAnnotation(Test.class); logger.info("Finished {}:{}.", m.getDeclaringClass().getSimpleName(), m.getName()); @@ -166,7 +183,7 @@ public Observable> deleteDatabase(String id) { } } - @BeforeSuite(groups = { "simple", "long", "direct", "multi-master", "emulator", "non-emulator" }, timeOut = SUITE_SETUP_TIMEOUT) + @BeforeSuite(groups = {"simple", "long", "direct", "multi-master", "emulator", "non-emulator"}, timeOut = SUITE_SETUP_TIMEOUT) public static void beforeSuite() { logger.info("beforeSuite Started"); AsyncDocumentClient houseKeepingClient = createGatewayHouseKeepingDocumentClient().build(); @@ -184,7 +201,7 @@ public static void beforeSuite() { } } - @AfterSuite(groups = { "simple", "long", "direct", "multi-master", "emulator", "non-emulator" }, timeOut = SUITE_SHUTDOWN_TIMEOUT) + @AfterSuite(groups = {"simple", "long", "direct", "multi-master", "emulator", "non-emulator"}, timeOut = SUITE_SHUTDOWN_TIMEOUT) public static void afterSuite() { logger.info("afterSuite Started"); AsyncDocumentClient houseKeepingClient = createGatewayHouseKeepingDocumentClient().build(); @@ -653,7 +670,7 @@ static protected void safeDeleteAllCollections(AsyncDocumentClient client, Datab .toBlocking() .single(); - for(DocumentCollection collection: collections) { + for (DocumentCollection collection : collections) { client.deleteCollection(collection.getSelfLink(), null).toBlocking().single().getResource(); } } @@ -677,12 +694,24 @@ static protected void safeDeleteCollection(AsyncDocumentClient client, String da } } + static protected void safeCloseAsync(AsyncDocumentClient client) { + if (client != null) { + new Thread(() -> { + try { + client.close(); + } catch (Exception e) { + logger.error("failed to close client", e); + } + }).start(); + } + } + static protected void safeClose(AsyncDocumentClient client) { if (client != null) { try { client.close(); } catch (Exception e) { - e.printStackTrace(); + logger.error("failed to close client", e); } } } @@ -760,15 +789,15 @@ public static void validateQueryFailure(Observable parsePreferredLocation(String preferredLocations) { + static List parsePreferredLocation(String preferredLocations) { if (StringUtils.isEmpty(preferredLocations)) { return null; } try { - return objectMapper.readValue(preferredLocations, new TypeReference>(){}); + return objectMapper.readValue(preferredLocations, new TypeReference>() { + }); } catch (Exception e) { logger.error("Invalid configured test preferredLocations [{}].", preferredLocations); throw new IllegalStateException("Invalid configured test preferredLocations " + preferredLocations); } } + static List parseProtocols(String protocols) { + if (StringUtils.isEmpty(protocols)) { + return null; + } + + try { + return objectMapper.readValue(protocols, new TypeReference>() { + }); + } catch (Exception e) { + logger.error("Invalid configured test protocols [{}].", protocols); + throw new IllegalStateException("Invalid configured test protocols " + protocols); + } + } + @DataProvider public static Object[][] simpleClientBuildersWithDirect() { - return simpleClientBuildersWithDirect(Protocol.Https, Protocol.Tcp); + return simpleClientBuildersWithDirect(toArray(protocols)); } @DataProvider @@ -808,31 +852,11 @@ public static Object[][] simpleClientBuildersWithDirectHttps() { return simpleClientBuildersWithDirect(Protocol.Https); } - @DataProvider - public static Object[][] simpleClientBuildersWithDirectTcp() { - return simpleClientBuildersWithDirect(Protocol.Tcp); - } - private static Object[][] simpleClientBuildersWithDirect(Protocol... protocols) { - - accountConsistency = parseConsistency(TestConfigurations.CONSISTENCY); logger.info("Max test consistency to use is [{}]", accountConsistency); - List testConsistencies = new ArrayList<>(); - - switch (accountConsistency) { - case Strong: - case BoundedStaleness: - case Session: - case ConsistentPrefix: - case Eventual: - testConsistencies.add(ConsistencyLevel.Eventual); - break; - default: - throw new IllegalStateException("Invalid configured test consistency " + accountConsistency); - } - - List preferredLocation = parsePreferredLocation(TestConfigurations.PREFERRED_LOCATIONS); - boolean isMultiMasterEnabled = preferredLocation != null && accountConsistency == ConsistencyLevel.Session; + List testConsistencies = ImmutableList.of(ConsistencyLevel.Eventual); + + boolean isMultiMasterEnabled = preferredLocations != null && accountConsistency == ConsistencyLevel.Session; List builders = new ArrayList<>(); builders.add(createGatewayRxDocumentClient(ConsistencyLevel.Session, false, null)); @@ -841,7 +865,7 @@ private static Object[][] simpleClientBuildersWithDirect(Protocol... protocols) testConsistencies.forEach(consistencyLevel -> builders.add(createDirectRxDocumentClient(consistencyLevel, protocol, isMultiMasterEnabled, - preferredLocation))); + preferredLocations))); } builders.forEach(b -> logger.info("Will Use ConnectionMode [{}], Consistency [{}], Protocol [{}]", @@ -855,7 +879,7 @@ private static Object[][] simpleClientBuildersWithDirect(Protocol... protocols) @DataProvider public static Object[][] clientBuildersWithDirect() { - return clientBuildersWithDirectAllConsistencies(Protocol.Https, Protocol.Tcp); + return clientBuildersWithDirectAllConsistencies(toArray(protocols)); } @DataProvider @@ -864,24 +888,41 @@ public static Object[][] clientBuildersWithDirectHttps() { } @DataProvider - public static Object[][] clientBuildersWithDirectTcp() { - return clientBuildersWithDirectAllConsistencies(Protocol.Tcp); + public static Object[][] clientBuildersWithDirectSession() { + return clientBuildersWithDirectSession(toArray(protocols)); } - @DataProvider - public static Object[][] clientBuildersWithDirectSession() { - return clientBuildersWithDirectSession(Protocol.Https, Protocol.Tcp); + static Protocol[] toArray(List protocols) { + return protocols.toArray(new Protocol[protocols.size()]); } - + private static Object[][] clientBuildersWithDirectSession(Protocol... protocols) { - return clientBuildersWithDirect(new ArrayList(){{add(ConsistencyLevel.Session);}} , protocols); + return clientBuildersWithDirect(new ArrayList() {{ + add(ConsistencyLevel.Session); + }}, protocols); } private static Object[][] clientBuildersWithDirectAllConsistencies(Protocol... protocols) { - accountConsistency = parseConsistency(TestConfigurations.CONSISTENCY); logger.info("Max test consistency to use is [{}]", accountConsistency); - List testConsistencies = new ArrayList<>(); + return clientBuildersWithDirect(desiredConsistencies, protocols); + } + + static List parseDesiredConsistencies(String consistencies) { + if (StringUtils.isEmpty(consistencies)) { + return null; + } + + try { + return objectMapper.readValue(consistencies, new TypeReference>() { + }); + } catch (Exception e) { + logger.error("Invalid consistency test desiredConsistencies [{}].", consistencies); + throw new IllegalStateException("Invalid configured test desiredConsistencies " + consistencies); + } + } + static List allEqualOrLowerConsistencies(ConsistencyLevel accountConsistency) { + List testConsistencies = new ArrayList<>(); switch (accountConsistency) { case Strong: testConsistencies.add(ConsistencyLevel.Strong); @@ -897,22 +938,20 @@ private static Object[][] clientBuildersWithDirectAllConsistencies(Protocol... p default: throw new IllegalStateException("Invalid configured test consistency " + accountConsistency); } - return clientBuildersWithDirect(testConsistencies, protocols); + return testConsistencies; } - - private static Object[][] clientBuildersWithDirect(List testConsistencies, Protocol... protocols) { - List preferredLocation = parsePreferredLocation(TestConfigurations.PREFERRED_LOCATIONS); - boolean isMultiMasterEnabled = preferredLocation != null && accountConsistency == ConsistencyLevel.Session; + private static Object[][] clientBuildersWithDirect(List testConsistencies, Protocol... protocols) { + boolean isMultiMasterEnabled = preferredLocations != null && accountConsistency == ConsistencyLevel.Session; List builders = new ArrayList<>(); - builders.add(createGatewayRxDocumentClient(ConsistencyLevel.Session, isMultiMasterEnabled, preferredLocation)); + builders.add(createGatewayRxDocumentClient(ConsistencyLevel.Session, isMultiMasterEnabled, preferredLocations)); for (Protocol protocol : protocols) { testConsistencies.forEach(consistencyLevel -> builders.add(createDirectRxDocumentClient(consistencyLevel, protocol, isMultiMasterEnabled, - preferredLocation))); + preferredLocations))); } builders.forEach(b -> logger.info("Will Use ConnectionMode [{}], Consistency [{}], Protocol [{}]", diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/TopQueryTests.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/TopQueryTests.java index f2b1cbb7a163a..b40b22609e7ea 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/TopQueryTests.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/TopQueryTests.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import com.microsoft.azure.cosmosdb.RetryAnalyzer; import com.microsoft.azure.cosmosdb.internal.directconnectivity.Protocol; import org.testng.SkipException; import org.testng.annotations.AfterClass; @@ -43,7 +44,6 @@ import com.microsoft.azure.cosmosdb.FeedOptions; import com.microsoft.azure.cosmosdb.FeedResponse; import com.microsoft.azure.cosmosdb.PartitionKey; -import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient; import com.microsoft.azure.cosmosdb.rx.internal.Utils.ValueHolder; import com.microsoft.azure.cosmosdb.rx.internal.query.TakeContinuationToken; @@ -66,7 +66,8 @@ public TopQueryTests(AsyncDocumentClient.Builder clientBuilder) { this.clientBuilder = clientBuilder; } - @Test(groups = { "simple" }, timeOut = TIMEOUT, dataProvider = "queryMetricsArgProvider") + @Test(groups = { "simple" }, timeOut = TIMEOUT, dataProvider = "queryMetricsArgProvider", retryAnalyzer = RetryAnalyzer.class + ) public void queryDocumentsWithTop(boolean qmEnabled) throws Exception { FeedOptions options = new FeedOptions(); @@ -151,7 +152,7 @@ public void topContinuationTokenRoundTrips() throws Exception { } } - @Test(groups = { "simple" }, timeOut = TIMEOUT * 10) + @Test(groups = { "simple" }, timeOut = TIMEOUT * 10, retryAnalyzer = RetryAnalyzer.class) public void queryDocumentsWithTopContinuationTokens() throws Exception { String query = "SELECT TOP 8 * FROM c"; this.queryWithContinuationTokensAndPageSizes(query, new int[] { 1, 5, 10 }, 8); diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/VeryLargeDocumentQueryTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/VeryLargeDocumentQueryTest.java index b196a888e376d..bbb9072868dbf 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/VeryLargeDocumentQueryTest.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/VeryLargeDocumentQueryTest.java @@ -26,6 +26,7 @@ import com.microsoft.azure.cosmosdb.Document; import com.microsoft.azure.cosmosdb.DocumentCollection; import com.microsoft.azure.cosmosdb.ResourceResponse; +import com.microsoft.azure.cosmosdb.RetryAnalyzer; import com.microsoft.azure.cosmosdb.internal.directconnectivity.Protocol; import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient.Builder; import org.apache.commons.lang3.StringUtils; @@ -53,7 +54,7 @@ public VeryLargeDocumentQueryTest(Builder clientBuilder) { this.clientBuilder = clientBuilder; } - @Test(groups = { "emulator" }, timeOut = TIMEOUT) + @Test(groups = { "emulator" }, timeOut = TIMEOUT, retryAnalyzer = RetryAnalyzer.class) public void queryLargeDocuments() { int cnt = 5; for(int i = 0; i < cnt; i++) { diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/RetryThrottleTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/RetryThrottleTest.java index 2f438b764604c..63498109452c4 100644 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/RetryThrottleTest.java +++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/RetryThrottleTest.java @@ -70,7 +70,7 @@ public class RetryThrottleTest extends TestSuiteBase { private Database database; private DocumentCollection collection; - @Test(groups = { "long" }, timeOut = LARGE_TIMEOUT ) + @Test(groups = { "long" }, timeOut = LARGE_TIMEOUT, enabled = false) public void retryCreateDocumentsOnSpike() throws Exception { ConnectionPolicy policy = new ConnectionPolicy(); RetryOptions retryOptions = new RetryOptions(); @@ -118,7 +118,7 @@ public Observable answer(InvocationOnMock invocation) System.out.println("total count is " + totalCount.get()); } - @Test(groups = { "long" }, timeOut = TIMEOUT) + @Test(groups = { "long" }, timeOut = TIMEOUT, enabled = false) public void retryDocumentCreate() throws Exception { client = SpyClientUnderTestFactory.createClientWithGatewaySpy(createGatewayRxDocumentClient()); @@ -153,12 +153,12 @@ public Observable answer(InvocationOnMock invocation) validateSuccess(createObservable, validator, TIMEOUT); } - @AfterMethod(groups = { "long" }) + @AfterMethod(groups = { "long" }, enabled = false) private void afterMethod() { safeClose(client); } - @BeforeClass(groups = { "long" }, timeOut = SETUP_TIMEOUT) + @BeforeClass(groups = { "long" }, timeOut = SETUP_TIMEOUT, enabled = false) public void beforeClass() { // set up the client database = SHARED_DATABASE; @@ -176,7 +176,7 @@ private Document getDocumentDefinition() { return doc; } - @AfterClass(groups = { "long" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + @AfterClass(groups = { "long" }, timeOut = SHUTDOWN_TIMEOUT, enabled = false) public void afterClass() { } } diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/TestSuiteBase.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/TestSuiteBase.java deleted file mode 100644 index fa92ee2af6342..0000000000000 --- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/TestSuiteBase.java +++ /dev/null @@ -1,1012 +0,0 @@ -/* - * The MIT License (MIT) - * Copyright (c) 2018 Microsoft Corporation - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ -package com.microsoft.azure.cosmosdb.rx.internal; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.microsoft.azure.cosmosdb.CompositePath; -import com.microsoft.azure.cosmosdb.CompositePathSortOrder; -import com.microsoft.azure.cosmosdb.ConnectionMode; -import com.microsoft.azure.cosmosdb.ConnectionPolicy; -import com.microsoft.azure.cosmosdb.ConsistencyLevel; -import com.microsoft.azure.cosmosdb.DataType; -import com.microsoft.azure.cosmosdb.Database; -import com.microsoft.azure.cosmosdb.DatabaseForTest; -import com.microsoft.azure.cosmosdb.Document; -import com.microsoft.azure.cosmosdb.DocumentClientException; -import com.microsoft.azure.cosmosdb.DocumentCollection; -import com.microsoft.azure.cosmosdb.FeedOptions; -import com.microsoft.azure.cosmosdb.FeedResponse; -import com.microsoft.azure.cosmosdb.IncludedPath; -import com.microsoft.azure.cosmosdb.Index; -import com.microsoft.azure.cosmosdb.IndexingPolicy; -import com.microsoft.azure.cosmosdb.PartitionKey; -import com.microsoft.azure.cosmosdb.PartitionKeyDefinition; -import com.microsoft.azure.cosmosdb.RequestOptions; -import com.microsoft.azure.cosmosdb.Resource; -import com.microsoft.azure.cosmosdb.ResourceResponse; -import com.microsoft.azure.cosmosdb.RetryOptions; -import com.microsoft.azure.cosmosdb.SqlQuerySpec; -import com.microsoft.azure.cosmosdb.Undefined; -import com.microsoft.azure.cosmosdb.User; -import com.microsoft.azure.cosmosdb.internal.PathParser; -import com.microsoft.azure.cosmosdb.internal.directconnectivity.Protocol; -import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient; -import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient.Builder; -import com.microsoft.azure.cosmosdb.rx.FailureValidator; -import com.microsoft.azure.cosmosdb.rx.FeedResponseListValidator; -import com.microsoft.azure.cosmosdb.rx.ResourceResponseValidator; -import com.microsoft.azure.cosmosdb.rx.TestConfigurations; -import org.apache.commons.lang3.StringUtils; -import org.mockito.stubbing.Answer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.AfterSuite; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.BeforeSuite; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; -import rx.Observable; -import rx.observers.TestSubscriber; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; - -public class TestSuiteBase { - private static final int DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL = 500; - private static final ObjectMapper objectMapper = new ObjectMapper(); - protected static Logger logger = LoggerFactory.getLogger(TestSuiteBase.class.getSimpleName()); - protected static final int TIMEOUT = 8000; - protected static final int FEED_TIMEOUT = 12000; - protected static final int SETUP_TIMEOUT = 30000; - protected static final int SHUTDOWN_TIMEOUT = 12000; - - protected static final int SUITE_SETUP_TIMEOUT = 120000; - protected static final int SUITE_SHUTDOWN_TIMEOUT = 60000; - - protected static final int WAIT_REPLICA_CATCH_UP_IN_MILLIS = 4000; - - protected int subscriberValidationTimeout = TIMEOUT; - - protected static ConsistencyLevel accountConsistency; - - protected Builder clientBuilder; - - protected static Database SHARED_DATABASE; - protected static DocumentCollection SHARED_MULTI_PARTITION_COLLECTION; - protected static DocumentCollection SHARED_SINGLE_PARTITION_COLLECTION; - protected static DocumentCollection SHARED_SINGLE_PARTITION_COLLECTION_WITHOUT_PARTITION_KEY; - protected static DocumentCollection SHARED_MULTI_PARTITION_COLLECTION_WITH_COMPOSITE_AND_SPATIAL_INDEXES; - - - protected TestSuiteBase() { - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - objectMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); - objectMapper.configure(JsonParser.Feature.ALLOW_TRAILING_COMMA, true); - objectMapper.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, true); - logger.debug("Initializing {} ...", this.getClass().getSimpleName()); - } - - @BeforeMethod(groups = { "simple", "long", "direct", "multi-master", "emulator" }) - public void beforeMethod(Method method) { - if (this.clientBuilder != null) { - logger.info("Starting {}::{} using {} {} mode with {} consistency", - method.getDeclaringClass().getSimpleName(), method.getName(), - this.clientBuilder.getConnectionPolicy().getConnectionMode(), - this.clientBuilder.getConfigs().getProtocol(), - this.clientBuilder.getDesiredConsistencyLevel()); - return; - } - logger.info("Starting {}::{}", method.getDeclaringClass().getSimpleName(), method.getName()); - } - - @AfterMethod(groups = { "simple", "long", "direct", "multi-master", "emulator" }) - public void afterMethod(Method m) { - Test t = m.getAnnotation(Test.class); - logger.info("Finished {}:{}.", m.getDeclaringClass().getSimpleName(), m.getName()); - } - - private static class DatabaseManagerImpl implements DatabaseForTest.DatabaseManager { - public static DatabaseManagerImpl getInstance(AsyncDocumentClient client) { - return new DatabaseManagerImpl(client); - } - - private final AsyncDocumentClient client; - - private DatabaseManagerImpl(AsyncDocumentClient client) { - this.client = client; - } - - @Override - public Observable> queryDatabases(SqlQuerySpec query) { - return client.queryDatabases(query, null); - } - - @Override - public Observable> createDatabase(Database databaseDefinition) { - return client.createDatabase(databaseDefinition, null); - } - - @Override - public Observable> deleteDatabase(String id) { - - return client.deleteDatabase("dbs/" + id, null); - } - } - - @BeforeSuite(groups = { "simple", "long", "direct", "multi-master", "emulator" }, timeOut = SUITE_SETUP_TIMEOUT) - public static void beforeSuite() { - logger.info("beforeSuite Started"); - AsyncDocumentClient houseKeepingClient = createGatewayHouseKeepingDocumentClient().build(); - try { - DatabaseForTest dbForTest = DatabaseForTest.create(DatabaseManagerImpl.getInstance(houseKeepingClient)); - SHARED_DATABASE = dbForTest.createdDatabase; - RequestOptions options = new RequestOptions(); - options.setOfferThroughput(10100); - SHARED_MULTI_PARTITION_COLLECTION = createCollection(houseKeepingClient, SHARED_DATABASE.getId(), getCollectionDefinitionWithRangeRangeIndex(), options); - SHARED_SINGLE_PARTITION_COLLECTION = createCollection(houseKeepingClient, SHARED_DATABASE.getId(), getCollectionDefinition(), null); - SHARED_SINGLE_PARTITION_COLLECTION_WITHOUT_PARTITION_KEY = createCollection(houseKeepingClient, SHARED_DATABASE.getId(), getCollectionDefinitionSinglePartitionWithoutPartitionKey()); - SHARED_MULTI_PARTITION_COLLECTION_WITH_COMPOSITE_AND_SPATIAL_INDEXES = createCollection(houseKeepingClient, SHARED_DATABASE.getId(), getCollectionDefinitionMultiPartitionWithCompositeAndSpatialIndexes(), options); - } finally { - houseKeepingClient.close(); - } - } - - @AfterSuite(groups = { "simple", "long", "direct", "multi-master", "emulator" }, timeOut = SUITE_SHUTDOWN_TIMEOUT) - public static void afterSuite() { - logger.info("afterSuite Started"); - AsyncDocumentClient houseKeepingClient = createGatewayHouseKeepingDocumentClient().build(); - try { - safeDeleteDatabase(houseKeepingClient, SHARED_DATABASE); - DatabaseForTest.cleanupStaleTestDatabases(DatabaseManagerImpl.getInstance(houseKeepingClient)); - } finally { - safeClose(houseKeepingClient); - } - } - - protected static void truncateCollection(DocumentCollection collection) { - logger.info("Truncating collection {} ...", collection.getId()); - AsyncDocumentClient houseKeepingClient = createGatewayHouseKeepingDocumentClient().build(); - try { - List paths = collection.getPartitionKey().getPaths(); - - FeedOptions options = new FeedOptions(); - options.setMaxDegreeOfParallelism(-1); - options.setEnableCrossPartitionQuery(true); - options.setMaxItemCount(100); - - logger.info("Truncating collection {} documents ...", collection.getId()); - - houseKeepingClient.queryDocuments(collection.getSelfLink(), "SELECT * FROM root", options) - .flatMap(page -> Observable.from(page.getResults())) - .flatMap(doc -> { - RequestOptions requestOptions = new RequestOptions(); - - if (paths != null && !paths.isEmpty()) { - List pkPath = PathParser.getPathParts(paths.get(0)); - Object propertyValue = doc.getObjectByPath(pkPath); - if (propertyValue == null) { - propertyValue = Undefined.Value(); - } - - requestOptions.setPartitionKey(new PartitionKey(propertyValue)); - } - - return houseKeepingClient.deleteDocument(doc.getSelfLink(), requestOptions); - }).toCompletable().await(); - - logger.info("Truncating collection {} triggers ...", collection.getId()); - - houseKeepingClient.queryTriggers(collection.getSelfLink(), "SELECT * FROM root", options) - .flatMap(page -> Observable.from(page.getResults())) - .flatMap(trigger -> { - RequestOptions requestOptions = new RequestOptions(); - -// if (paths != null && !paths.isEmpty()) { -// Object propertyValue = trigger.getObjectByPath(PathParser.getPathParts(paths.get(0))); -// requestOptions.setPartitionKey(new PartitionKey(propertyValue)); -// } - - return houseKeepingClient.deleteTrigger(trigger.getSelfLink(), requestOptions); - }).toCompletable().await(); - - logger.info("Truncating collection {} storedProcedures ...", collection.getId()); - - houseKeepingClient.queryStoredProcedures(collection.getSelfLink(), "SELECT * FROM root", options) - .flatMap(page -> Observable.from(page.getResults())) - .flatMap(storedProcedure -> { - RequestOptions requestOptions = new RequestOptions(); - -// if (paths != null && !paths.isEmpty()) { -// Object propertyValue = storedProcedure.getObjectByPath(PathParser.getPathParts(paths.get(0))); -// requestOptions.setPartitionKey(new PartitionKey(propertyValue)); -// } - - return houseKeepingClient.deleteStoredProcedure(storedProcedure.getSelfLink(), requestOptions); - }).toCompletable().await(); - - logger.info("Truncating collection {} udfs ...", collection.getId()); - - houseKeepingClient.queryUserDefinedFunctions(collection.getSelfLink(), "SELECT * FROM root", options) - .flatMap(page -> Observable.from(page.getResults())) - .flatMap(udf -> { - RequestOptions requestOptions = new RequestOptions(); - -// if (paths != null && !paths.isEmpty()) { -// Object propertyValue = udf.getObjectByPath(PathParser.getPathParts(paths.get(0))); -// requestOptions.setPartitionKey(new PartitionKey(propertyValue)); -// } - - return houseKeepingClient.deleteUserDefinedFunction(udf.getSelfLink(), requestOptions); - }).toCompletable().await(); - - } finally { - houseKeepingClient.close(); - } - - logger.info("Finished truncating collection {}.", collection.getId()); - } - - protected static void waitIfNeededForReplicasToCatchUp(AsyncDocumentClient.Builder clientBuilder) { - switch (clientBuilder.getDesiredConsistencyLevel()) { - case Eventual: - case ConsistentPrefix: - logger.info(" additional wait in Eventual mode so the replica catch up"); - // give times to replicas to catch up after a write - try { - TimeUnit.MILLISECONDS.sleep(WAIT_REPLICA_CATCH_UP_IN_MILLIS); - } catch (Exception e) { - logger.error("unexpected failure", e); - } - - case Session: - case BoundedStaleness: - case Strong: - default: - break; - } - } - - private static DocumentCollection getCollectionDefinitionSinglePartitionWithoutPartitionKey() { - DocumentCollection collectionDefinition = new DocumentCollection(); - collectionDefinition.setId(UUID.randomUUID().toString()); - - return collectionDefinition; - } - - - public static DocumentCollection createCollection(String databaseId, - DocumentCollection collection, - RequestOptions options) { - AsyncDocumentClient client = createGatewayHouseKeepingDocumentClient().build(); - try { - return client.createCollection("dbs/" + databaseId, collection, options).toBlocking().single().getResource(); - } finally { - client.close(); - } - } - - public static DocumentCollection createCollection(AsyncDocumentClient client, String databaseId, - DocumentCollection collection, RequestOptions options) { - return client.createCollection("dbs/" + databaseId, collection, options).toBlocking().single().getResource(); - } - - public static DocumentCollection createCollection(AsyncDocumentClient client, String databaseId, - DocumentCollection collection) { - return client.createCollection("dbs/" + databaseId, collection, null).toBlocking().single().getResource(); - } - - private static DocumentCollection getCollectionDefinitionMultiPartitionWithCompositeAndSpatialIndexes() { - final String NUMBER_FIELD = "numberField"; - final String STRING_FIELD = "stringField"; - final String NUMBER_FIELD_2 = "numberField2"; - final String STRING_FIELD_2 = "stringField2"; - final String BOOL_FIELD = "boolField"; - final String NULL_FIELD = "nullField"; - final String OBJECT_FIELD = "objectField"; - final String ARRAY_FIELD = "arrayField"; - final String SHORT_STRING_FIELD = "shortStringField"; - final String MEDIUM_STRING_FIELD = "mediumStringField"; - final String LONG_STRING_FIELD = "longStringField"; - final String PARTITION_KEY = "pk"; - - DocumentCollection documentCollection = new DocumentCollection(); - - IndexingPolicy indexingPolicy = new IndexingPolicy(); - Collection> compositeIndexes = new ArrayList>(); - - //Simple - ArrayList compositeIndexSimple = new ArrayList(); - CompositePath compositePath1 = new CompositePath(); - compositePath1.setPath("/" + NUMBER_FIELD); - compositePath1.setOrder(CompositePathSortOrder.Ascending); - - CompositePath compositePath2 = new CompositePath(); - compositePath2.setPath("/" + STRING_FIELD); - compositePath2.setOrder(CompositePathSortOrder.Descending); - - compositeIndexSimple.add(compositePath1); - compositeIndexSimple.add(compositePath2); - - //Max Columns - ArrayList compositeIndexMaxColumns = new ArrayList(); - CompositePath compositePath3 = new CompositePath(); - compositePath3.setPath("/" + NUMBER_FIELD); - compositePath3.setOrder(CompositePathSortOrder.Descending); - - CompositePath compositePath4 = new CompositePath(); - compositePath4.setPath("/" + STRING_FIELD); - compositePath4.setOrder(CompositePathSortOrder.Ascending); - - CompositePath compositePath5 = new CompositePath(); - compositePath5.setPath("/" + NUMBER_FIELD_2); - compositePath5.setOrder(CompositePathSortOrder.Descending); - - CompositePath compositePath6 = new CompositePath(); - compositePath6.setPath("/" + STRING_FIELD_2); - compositePath6.setOrder(CompositePathSortOrder.Ascending); - - compositeIndexMaxColumns.add(compositePath3); - compositeIndexMaxColumns.add(compositePath4); - compositeIndexMaxColumns.add(compositePath5); - compositeIndexMaxColumns.add(compositePath6); - - //Primitive Values - ArrayList compositeIndexPrimitiveValues = new ArrayList(); - CompositePath compositePath7 = new CompositePath(); - compositePath7.setPath("/" + NUMBER_FIELD); - compositePath7.setOrder(CompositePathSortOrder.Descending); - - CompositePath compositePath8 = new CompositePath(); - compositePath8.setPath("/" + STRING_FIELD); - compositePath8.setOrder(CompositePathSortOrder.Ascending); - - CompositePath compositePath9 = new CompositePath(); - compositePath9.setPath("/" + BOOL_FIELD); - compositePath9.setOrder(CompositePathSortOrder.Descending); - - CompositePath compositePath10 = new CompositePath(); - compositePath10.setPath("/" + NULL_FIELD); - compositePath10.setOrder(CompositePathSortOrder.Ascending); - - compositeIndexPrimitiveValues.add(compositePath7); - compositeIndexPrimitiveValues.add(compositePath8); - compositeIndexPrimitiveValues.add(compositePath9); - compositeIndexPrimitiveValues.add(compositePath10); - - //Long Strings - ArrayList compositeIndexLongStrings = new ArrayList(); - CompositePath compositePath11 = new CompositePath(); - compositePath11.setPath("/" + STRING_FIELD); - - CompositePath compositePath12 = new CompositePath(); - compositePath12.setPath("/" + SHORT_STRING_FIELD); - - CompositePath compositePath13 = new CompositePath(); - compositePath13.setPath("/" + MEDIUM_STRING_FIELD); - - CompositePath compositePath14 = new CompositePath(); - compositePath14.setPath("/" + LONG_STRING_FIELD); - - compositeIndexLongStrings.add(compositePath11); - compositeIndexLongStrings.add(compositePath12); - compositeIndexLongStrings.add(compositePath13); - compositeIndexLongStrings.add(compositePath14); - - compositeIndexes.add(compositeIndexSimple); - compositeIndexes.add(compositeIndexMaxColumns); - compositeIndexes.add(compositeIndexPrimitiveValues); - compositeIndexes.add(compositeIndexLongStrings); - - indexingPolicy.setCompositeIndexes(compositeIndexes); - documentCollection.setIndexingPolicy(indexingPolicy); - - PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition(); - ArrayList partitionKeyPaths = new ArrayList(); - partitionKeyPaths.add("/" + PARTITION_KEY); - partitionKeyDefinition.setPaths(partitionKeyPaths); - documentCollection.setPartitionKey(partitionKeyDefinition); - - documentCollection.setId(UUID.randomUUID().toString()); - - return documentCollection; - } - - public static Document createDocument(AsyncDocumentClient client, String databaseId, String collectionId, Document document) { - return createDocument(client, databaseId, collectionId, document, null); - } - - public static Document createDocument(AsyncDocumentClient client, String databaseId, String collectionId, Document document, RequestOptions options) { - return client.createDocument(com.microsoft.azure.cosmosdb.rx.Utils.getCollectionNameLink(databaseId, collectionId), document, options, false).toBlocking().single().getResource(); - } - - public Observable> bulkInsert(AsyncDocumentClient client, - String collectionLink, - List documentDefinitionList, - int concurrencyLevel) { - ArrayList>> result = new ArrayList>>(documentDefinitionList.size()); - for (Document docDef : documentDefinitionList) { - result.add(client.createDocument(collectionLink, docDef, null, false)); - } - - return Observable.merge(result, concurrencyLevel); - } - - public Observable> bulkInsert(AsyncDocumentClient client, - String collectionLink, - List documentDefinitionList) { - return bulkInsert(client, collectionLink, documentDefinitionList, DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL); - } - - public List bulkInsertBlocking(AsyncDocumentClient client, - String collectionLink, - List documentDefinitionList) { - return bulkInsert(client, collectionLink, documentDefinitionList, DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL) - .map(ResourceResponse::getResource) - .toList() - .toBlocking() - .single(); - } - - public static ConsistencyLevel getAccountDefaultConsistencyLevel(AsyncDocumentClient client) { - return client.getDatabaseAccount().toBlocking().single().getConsistencyPolicy().getDefaultConsistencyLevel(); - } - - public static User createUser(AsyncDocumentClient client, String databaseId, User user) { - return client.createUser("dbs/" + databaseId, user, null).toBlocking().single().getResource(); - } - - public static User safeCreateUser(AsyncDocumentClient client, String databaseId, User user) { - deleteUserIfExists(client, databaseId, user.getId()); - return createUser(client, databaseId, user); - } - - private static DocumentCollection safeCreateCollection(AsyncDocumentClient client, String databaseId, DocumentCollection collection, RequestOptions options) { - deleteCollectionIfExists(client, databaseId, collection.getId()); - return createCollection(client, databaseId, collection, options); - } - - public static String getCollectionLink(DocumentCollection collection) { - return collection.getSelfLink(); - } - - static protected DocumentCollection getCollectionDefinition() { - PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition(); - ArrayList paths = new ArrayList(); - paths.add("/mypk"); - partitionKeyDef.setPaths(paths); - - DocumentCollection collectionDefinition = new DocumentCollection(); - collectionDefinition.setId(UUID.randomUUID().toString()); - collectionDefinition.setPartitionKey(partitionKeyDef); - - return collectionDefinition; - } - - static protected DocumentCollection getCollectionDefinitionWithRangeRangeIndex() { - PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition(); - ArrayList paths = new ArrayList<>(); - paths.add("/mypk"); - partitionKeyDef.setPaths(paths); - IndexingPolicy indexingPolicy = new IndexingPolicy(); - Collection includedPaths = new ArrayList<>(); - IncludedPath includedPath = new IncludedPath(); - includedPath.setPath("/*"); - Collection indexes = new ArrayList<>(); - Index stringIndex = Index.Range(DataType.String); - stringIndex.set("precision", -1); - indexes.add(stringIndex); - - Index numberIndex = Index.Range(DataType.Number); - numberIndex.set("precision", -1); - indexes.add(numberIndex); - includedPath.setIndexes(indexes); - includedPaths.add(includedPath); - indexingPolicy.setIncludedPaths(includedPaths); - - DocumentCollection collectionDefinition = new DocumentCollection(); - collectionDefinition.setIndexingPolicy(indexingPolicy); - collectionDefinition.setId(UUID.randomUUID().toString()); - collectionDefinition.setPartitionKey(partitionKeyDef); - - return collectionDefinition; - } - - public static void deleteCollectionIfExists(AsyncDocumentClient client, String databaseId, String collectionId) { - List res = client.queryCollections("dbs/" + databaseId, - String.format("SELECT * FROM root r where r.id = '%s'", collectionId), null).toBlocking().single() - .getResults(); - if (!res.isEmpty()) { - deleteCollection(client, com.microsoft.azure.cosmosdb.rx.Utils.getCollectionNameLink(databaseId, collectionId)); - } - } - - public static void deleteCollection(AsyncDocumentClient client, String collectionLink) { - client.deleteCollection(collectionLink, null).toBlocking().single(); - } - - public static void deleteDocumentIfExists(AsyncDocumentClient client, String databaseId, String collectionId, String docId) { - FeedOptions options = new FeedOptions(); - options.setPartitionKey(new PartitionKey(docId)); - List res = client - .queryDocuments(com.microsoft.azure.cosmosdb.rx.Utils.getCollectionNameLink(databaseId, collectionId), String.format("SELECT * FROM root r where r.id = '%s'", docId), options) - .toBlocking().single().getResults(); - if (!res.isEmpty()) { - deleteDocument(client, com.microsoft.azure.cosmosdb.rx.Utils.getDocumentNameLink(databaseId, collectionId, docId)); - } - } - - public static void safeDeleteDocument(AsyncDocumentClient client, String documentLink, RequestOptions options) { - if (client != null && documentLink != null) { - try { - client.deleteDocument(documentLink, options).toBlocking().single(); - } catch (Exception e) { - DocumentClientException dce = com.microsoft.azure.cosmosdb.rx.internal.Utils.as(e, DocumentClientException.class); - if (dce == null || dce.getStatusCode() != 404) { - throw e; - } - } - } - } - - public static void deleteDocument(AsyncDocumentClient client, String documentLink) { - client.deleteDocument(documentLink, null).toBlocking().single(); - } - - public static void deleteUserIfExists(AsyncDocumentClient client, String databaseId, String userId) { - List res = client - .queryUsers("dbs/" + databaseId, String.format("SELECT * FROM root r where r.id = '%s'", userId), null) - .toBlocking().single().getResults(); - if (!res.isEmpty()) { - deleteUser(client, com.microsoft.azure.cosmosdb.rx.Utils.getUserNameLink(databaseId, userId)); - } - } - - public static void deleteUser(AsyncDocumentClient client, String userLink) { - client.deleteUser(userLink, null).toBlocking().single(); - } - - public static String getDatabaseLink(Database database) { - return database.getSelfLink(); - } - - static private Database safeCreateDatabase(AsyncDocumentClient client, Database database) { - safeDeleteDatabase(client, database.getId()); - return createDatabase(client, database); - } - - static protected Database createDatabase(AsyncDocumentClient client, Database database) { - Observable> databaseObservable = client.createDatabase(database, null); - return databaseObservable.toBlocking().single().getResource(); - } - - static protected Database createDatabase(AsyncDocumentClient client, String databaseId) { - Database databaseDefinition = new Database(); - databaseDefinition.setId(databaseId); - return createDatabase(client, databaseDefinition); - } - - static protected Database createDatabaseIfNotExists(AsyncDocumentClient client, String databaseId) { - return client.queryDatabases(String.format("SELECT * FROM r where r.id = '%s'", databaseId), null).flatMap(p -> Observable.from(p.getResults())).switchIfEmpty( - Observable.defer(() -> { - - Database databaseDefinition = new Database(); - databaseDefinition.setId(databaseId); - - return client.createDatabase(databaseDefinition, null).map(ResourceResponse::getResource); - }) - ).toBlocking().single(); - } - - static protected void safeDeleteDatabase(AsyncDocumentClient client, Database database) { - if (database != null) { - safeDeleteDatabase(client, database.getId()); - } - } - - static protected void safeDeleteDatabase(AsyncDocumentClient client, String databaseId) { - if (client != null) { - try { - client.deleteDatabase(com.microsoft.azure.cosmosdb.rx.Utils.getDatabaseNameLink(databaseId), null).toBlocking().single(); - } catch (Exception e) { - } - } - } - - static protected void safeDeleteAllCollections(AsyncDocumentClient client, Database database) { - if (database != null) { - List collections = client.readCollections(database.getSelfLink(), null) - .flatMap(p -> Observable.from(p.getResults())) - .toList() - .toBlocking() - .single(); - - for(DocumentCollection collection: collections) { - client.deleteCollection(collection.getSelfLink(), null).toBlocking().single().getResource(); - } - } - } - - static protected void safeDeleteCollection(AsyncDocumentClient client, DocumentCollection collection) { - if (client != null && collection != null) { - try { - client.deleteCollection(collection.getSelfLink(), null).toBlocking().single(); - } catch (Exception e) { - } - } - } - - static protected void safeDeleteCollection(AsyncDocumentClient client, String databaseId, String collectionId) { - if (client != null && databaseId != null && collectionId != null) { - try { - client.deleteCollection("/dbs/" + databaseId + "/colls/" + collectionId, null).toBlocking().single(); - } catch (Exception e) { - } - } - } - - static protected void safeClose(AsyncDocumentClient client) { - if (client != null) { - try { - client.close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - public void validateSuccess(Observable> observable, - ResourceResponseValidator validator) { - validateSuccess(observable, validator, subscriberValidationTimeout); - } - - public static void validateSuccess(Observable> observable, - ResourceResponseValidator validator, long timeout) { - - VerboseTestSubscriber> testSubscriber = new VerboseTestSubscriber<>(); - - observable.subscribe(testSubscriber); - testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); - testSubscriber.assertNoErrors(); - testSubscriber.assertCompleted(); - testSubscriber.assertValueCount(1); - validator.validate(testSubscriber.getOnNextEvents().get(0)); - } - - public void validateFailure(Observable> observable, - FailureValidator validator) { - validateFailure(observable, validator, subscriberValidationTimeout); - } - - public static void validateFailure(Observable> observable, - FailureValidator validator, long timeout) { - - VerboseTestSubscriber> testSubscriber = new VerboseTestSubscriber<>(); - - observable.subscribe(testSubscriber); - testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); - testSubscriber.assertNotCompleted(); - testSubscriber.assertTerminalEvent(); - assertThat(testSubscriber.getOnErrorEvents()).hasSize(1); - validator.validate(testSubscriber.getOnErrorEvents().get(0)); - } - - public void validateQuerySuccess(Observable> observable, - FeedResponseListValidator validator) { - validateQuerySuccess(observable, validator, subscriberValidationTimeout); - } - - public static void validateQuerySuccess(Observable> observable, - FeedResponseListValidator validator, long timeout) { - - VerboseTestSubscriber> testSubscriber = new VerboseTestSubscriber<>(); - - observable.subscribe(testSubscriber); - testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); - testSubscriber.assertNoErrors(); - testSubscriber.assertCompleted(); - validator.validate(testSubscriber.getOnNextEvents()); - } - - public void validateQueryFailure(Observable> observable, - FailureValidator validator) { - validateQueryFailure(observable, validator, subscriberValidationTimeout); - } - - public static void validateQueryFailure(Observable> observable, - FailureValidator validator, long timeout) { - - VerboseTestSubscriber> testSubscriber = new VerboseTestSubscriber<>(); - - observable.subscribe(testSubscriber); - testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); - testSubscriber.assertNotCompleted(); - testSubscriber.assertTerminalEvent(); - assertThat(testSubscriber.getOnErrorEvents()).hasSize(1); - validator.validate(testSubscriber.getOnErrorEvents().get(0)); - } - - @DataProvider - public static Object[][] clientBuilders() { - return new Object[][] { { createGatewayRxDocumentClient(ConsistencyLevel.Session, false, null) } }; - } - - @DataProvider - public static Object[][] clientBuildersWithSessionConsistency() { - return new Object[][] { - { createGatewayRxDocumentClient(ConsistencyLevel.Session, false, null) }, - { createDirectRxDocumentClient(ConsistencyLevel.Session, Protocol.Https, false, null) }, - { createDirectRxDocumentClient(ConsistencyLevel.Session, Protocol.Tcp, false, null) } - }; - } - - private static ConsistencyLevel parseConsistency(String consistency) { - if (consistency != null) { - for (ConsistencyLevel consistencyLevel : ConsistencyLevel.values()) { - if (consistencyLevel.name().toLowerCase().equals(consistency.toLowerCase())) { - return consistencyLevel; - } - } - } - - logger.error("Invalid configured test consistency [{}].", consistency); - throw new IllegalStateException("Invalid configured test consistency " + consistency); - } - - private static List parsePreferredLocation(String preferredLocations) { - if (StringUtils.isEmpty(preferredLocations)) { - return null; - } - - try { - return objectMapper.readValue(preferredLocations, new TypeReference>(){}); - } catch (Exception e) { - logger.error("Invalid configured test preferredLocations [{}].", preferredLocations); - throw new IllegalStateException("Invalid configured test preferredLocations " + preferredLocations); - } - } - - @DataProvider - public static Object[][] simpleClientBuildersWithDirect() { - return simpleClientBuildersWithDirect(Protocol.Https, Protocol.Tcp); - } - - @DataProvider - public static Object[][] simpleClientBuildersWithDirectHttps() { - return simpleClientBuildersWithDirect(Protocol.Https); - } - - @DataProvider - public static Object[][] simpleClientBuildersWithDirectTcp() { - return simpleClientBuildersWithDirect(Protocol.Tcp); - } - - private static Object[][] simpleClientBuildersWithDirect(Protocol... protocols) { - - accountConsistency = parseConsistency(TestConfigurations.CONSISTENCY); - logger.info("Max test consistency to use is [{}]", accountConsistency); - List testConsistencies = new ArrayList<>(); - - switch (accountConsistency) { - case Strong: - case BoundedStaleness: - case Session: - case ConsistentPrefix: - case Eventual: - testConsistencies.add(ConsistencyLevel.Eventual); - break; - default: - throw new IllegalStateException("Invalid configured test consistency " + accountConsistency); - } - - List preferredLocation = parsePreferredLocation(TestConfigurations.PREFERRED_LOCATIONS); - boolean isMultiMasterEnabled = preferredLocation != null && accountConsistency == ConsistencyLevel.Session; - - List builders = new ArrayList<>(); - builders.add(createGatewayRxDocumentClient(ConsistencyLevel.Session, false, null)); - - for (Protocol protocol : protocols) { - testConsistencies.forEach(consistencyLevel -> builders.add(createDirectRxDocumentClient(consistencyLevel, - protocol, - isMultiMasterEnabled, - preferredLocation))); - } - - builders.forEach(b -> logger.info("Will Use ConnectionMode [{}], Consistency [{}], Protocol [{}]", - b.getConnectionPolicy().getConnectionMode(), - b.getDesiredConsistencyLevel(), - b.getConfigs().getProtocol() - )); - - return builders.stream().map(b -> new Object[]{b}).collect(Collectors.toList()).toArray(new Object[0][]); - } - - @DataProvider - public static Object[][] clientBuildersWithDirect() { - return clientBuildersWithDirectAllConsistencies(Protocol.Https, Protocol.Tcp); - } - - @DataProvider - public static Object[][] clientBuildersWithDirectHttps() { - return clientBuildersWithDirectAllConsistencies(Protocol.Https); - } - - @DataProvider - public static Object[][] clientBuildersWithDirectTcp() { - return clientBuildersWithDirectAllConsistencies(Protocol.Tcp); - } - - @DataProvider - public static Object[][] clientBuildersWithDirectSession() { - return clientBuildersWithDirectSession(Protocol.Https, Protocol.Tcp); - } - - private static Object[][] clientBuildersWithDirectSession(Protocol... protocols) { - return clientBuildersWithDirect(new ArrayList(){{add(ConsistencyLevel.Session);}} , protocols); - } - - private static Object[][] clientBuildersWithDirectAllConsistencies(Protocol... protocols) { - accountConsistency = parseConsistency(TestConfigurations.CONSISTENCY); - logger.info("Max test consistency to use is [{}]", accountConsistency); - List testConsistencies = new ArrayList<>(); - - switch (accountConsistency) { - case Strong: - testConsistencies.add(ConsistencyLevel.Strong); - case BoundedStaleness: - testConsistencies.add(ConsistencyLevel.BoundedStaleness); - case Session: - testConsistencies.add(ConsistencyLevel.Session); - case ConsistentPrefix: - testConsistencies.add(ConsistencyLevel.ConsistentPrefix); - case Eventual: - testConsistencies.add(ConsistencyLevel.Eventual); - break; - default: - throw new IllegalStateException("Invalid configured test consistency " + accountConsistency); - } - return clientBuildersWithDirect(testConsistencies, protocols); - } - - private static Object[][] clientBuildersWithDirect(List testConsistencies, Protocol... protocols) { - - List preferredLocation = parsePreferredLocation(TestConfigurations.PREFERRED_LOCATIONS); - boolean isMultiMasterEnabled = preferredLocation != null && accountConsistency == ConsistencyLevel.Session; - - List builders = new ArrayList<>(); - builders.add(createGatewayRxDocumentClient(ConsistencyLevel.Session, isMultiMasterEnabled, preferredLocation)); - - for (Protocol protocol : protocols) { - testConsistencies.forEach(consistencyLevel -> builders.add(createDirectRxDocumentClient(consistencyLevel, - protocol, - isMultiMasterEnabled, - preferredLocation))); - } - - builders.forEach(b -> logger.info("Will Use ConnectionMode [{}], Consistency [{}], Protocol [{}]", - b.getConnectionPolicy().getConnectionMode(), - b.getDesiredConsistencyLevel(), - b.getConfigs().getProtocol() - )); - - return builders.stream().map(b -> new Object[]{b}).collect(Collectors.toList()).toArray(new Object[0][]); - } - - static protected AsyncDocumentClient.Builder createGatewayHouseKeepingDocumentClient() { - ConnectionPolicy connectionPolicy = new ConnectionPolicy(); - connectionPolicy.setConnectionMode(ConnectionMode.Gateway); - RetryOptions options = new RetryOptions(); - options.setMaxRetryWaitTimeInSeconds(SUITE_SETUP_TIMEOUT); - connectionPolicy.setRetryOptions(options); - return new AsyncDocumentClient.Builder().withServiceEndpoint(TestConfigurations.HOST) - .withMasterKeyOrResourceToken(TestConfigurations.MASTER_KEY) - .withConnectionPolicy(connectionPolicy) - .withConsistencyLevel(ConsistencyLevel.Session); - } - - static protected AsyncDocumentClient.Builder createGatewayRxDocumentClient(ConsistencyLevel consistencyLevel, boolean multiMasterEnabled, List preferredLocations) { - ConnectionPolicy connectionPolicy = new ConnectionPolicy(); - connectionPolicy.setConnectionMode(ConnectionMode.Gateway); - connectionPolicy.setUsingMultipleWriteLocations(multiMasterEnabled); - connectionPolicy.setPreferredLocations(preferredLocations); - return new AsyncDocumentClient.Builder().withServiceEndpoint(TestConfigurations.HOST) - .withMasterKeyOrResourceToken(TestConfigurations.MASTER_KEY) - .withConnectionPolicy(connectionPolicy) - .withConsistencyLevel(consistencyLevel); - } - - static protected AsyncDocumentClient.Builder createGatewayRxDocumentClient() { - return createGatewayRxDocumentClient(ConsistencyLevel.Session, false, null); - } - - static protected AsyncDocumentClient.Builder createDirectRxDocumentClient(ConsistencyLevel consistencyLevel, - Protocol protocol, - boolean multiMasterEnabled, - List preferredLocations) { - ConnectionPolicy connectionPolicy = new ConnectionPolicy(); - connectionPolicy.setConnectionMode(ConnectionMode.Direct); - - if (preferredLocations != null) { - connectionPolicy.setPreferredLocations(preferredLocations); - } - - if (multiMasterEnabled && consistencyLevel == ConsistencyLevel.Session) { - connectionPolicy.setUsingMultipleWriteLocations(true); - } - - Configs configs = spy(new Configs()); - doAnswer((Answer) invocation -> protocol).when(configs).getProtocol(); - - return new AsyncDocumentClient.Builder().withServiceEndpoint(TestConfigurations.HOST) - .withMasterKeyOrResourceToken(TestConfigurations.MASTER_KEY) - .withConnectionPolicy(connectionPolicy) - .withConsistencyLevel(consistencyLevel) - .withConfigs(configs); - } - - protected int expectedNumberOfPages(int totalExpectedResult, int maxPageSize) { - return Math.max((totalExpectedResult + maxPageSize - 1 ) / maxPageSize, 1); - } - - @DataProvider(name = "queryMetricsArgProvider") - public Object[][] queryMetricsArgProvider() { - return new Object[][]{ - {true}, - {false}, - }; - } - - public static class VerboseTestSubscriber extends TestSubscriber { - @Override - public void assertNoErrors() { - List onErrorEvents = getOnErrorEvents(); - StringBuilder errorMessageBuilder = new StringBuilder(); - if (!onErrorEvents.isEmpty()) { - for(Throwable throwable : onErrorEvents) { - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - throwable.printStackTrace(pw); - String sStackTrace = sw.toString(); // stack trace as a string - errorMessageBuilder.append(sStackTrace); - } - - AssertionError ae = new AssertionError(errorMessageBuilder.toString()); - throw ae; - } - } - } -}