diff --git a/.gitignore b/.gitignore
index bdb262af0bac6..19547d6a1616a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,3 +19,6 @@ hs_err_pid*
doc
docs
+target
+bin
+test-output
diff --git a/commons-test-utils/pom.xml b/commons-test-utils/pom.xml
index 977a5a5822a00..857a9edeb3d54 100644
--- a/commons-test-utils/pom.xml
+++ b/commons-test-utils/pom.xml
@@ -126,6 +126,10 @@ SOFTWARE.
log4j
${log4j.version}
+
+ io.projectreactor
+ reactor-core
+
com.google.guava
guava
diff --git a/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/rx/FeedResponseListValidator.java b/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/rx/FeedResponseListValidator.java
index 6356f1363fff9..ccf6bdd3df98e 100644
--- a/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/rx/FeedResponseListValidator.java
+++ b/commons-test-utils/src/main/java/com/microsoft/azure/cosmosdb/rx/FeedResponseListValidator.java
@@ -32,6 +32,7 @@
import java.util.Map;
import java.util.stream.Collectors;
+import com.microsoft.azure.cosmos.CosmosItemSettings;
import com.microsoft.azure.cosmosdb.BridgeInternal;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.microsoft.azure.cosmosdb.CompositePath;
@@ -72,7 +73,7 @@ public void validate(List> feedList) {
return this;
}
- public Builder containsExactly(List expectedIds) {
+ public Builder containsExactly(List expectedRids) {
validators.add(new FeedResponseListValidator() {
@Override
public void validate(List> feedList) {
@@ -83,6 +84,23 @@ public void validate(List> feedList) {
.collect(Collectors.toList());
assertThat(actualIds)
.describedAs("Resource IDs of results")
+ .containsExactlyElementsOf(expectedRids);
+ }
+ });
+ return this;
+ }
+
+ public Builder containsExactlyIds(List expectedIds) {
+ validators.add(new FeedResponseListValidator() {
+ @Override
+ public void validate(List> feedList) {
+ List actualIds = feedList
+ .stream()
+ .flatMap(f -> f.getResults().stream())
+ .map(r -> r.getId())
+ .collect(Collectors.toList());
+ assertThat(actualIds)
+ .describedAs("IDs of results")
.containsExactlyElementsOf(expectedIds);
}
});
@@ -186,11 +204,11 @@ public void validate(List> feedList) {
}
public Builder withAggregateValue(Object value) {
- validators.add(new FeedResponseListValidator() {
+ validators.add(new FeedResponseListValidator() {
@Override
- public void validate(List> feedList) {
- List list = feedList.get(0).getResults();
- Document result = list.size() > 0 ? list.get(0) : null;
+ public void validate(List> feedList) {
+ List list = feedList.get(0).getResults();
+ CosmosItemSettings result = list.size() > 0 ? list.get(0) : null;
if (result != null) {
if (value instanceof Double) {
@@ -223,13 +241,13 @@ public void validate(List> feedList) {
return this;
}
- public Builder withOrderedResults(ArrayList expectedOrderedList,
+ public Builder withOrderedResults(ArrayList expectedOrderedList,
ArrayList compositeIndex) {
- validators.add(new FeedResponseListValidator() {
+ validators.add(new FeedResponseListValidator() {
@Override
- public void validate(List> feedList) {
+ public void validate(List> feedList) {
- List resultOrderedList = feedList.stream()
+ List resultOrderedList = feedList.stream()
.flatMap(f -> f.getResults().stream())
.collect(Collectors.toList());
assertThat(expectedOrderedList.size()).isEqualTo(resultOrderedList.size());
diff --git a/commons/pom.xml b/commons/pom.xml
index 2db549d7278de..293faef19f94d 100644
--- a/commons/pom.xml
+++ b/commons/pom.xml
@@ -194,6 +194,31 @@ SOFTWARE.
${log4j.version}
test
+
+ io.projectreactor
+ reactor-core
+
+
+ io.projectreactor.addons
+ reactor-adapter
+ ${reactor-addons.version}
+ test
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+ com.github.akarnokd
+ rxjava2-interop
+ ${rxjava2interop.verison}
+
+
+ io.reactivex.rxjava2
+ rxjava
+ ${rxjava2.version}
+
com.google.guava
guava
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosContainerRequestOptions.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosContainerRequestOptions.java
similarity index 99%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosContainerRequestOptions.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosContainerRequestOptions.java
index 1ba2413c293bd..cbed66e3bc830 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosContainerRequestOptions.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosContainerRequestOptions.java
@@ -127,4 +127,4 @@ protected RequestOptions toRequestOptions() {
requestOptions.setConsistencyLevel(consistencyLevel);
return requestOptions;
}
-}
+}
\ No newline at end of file
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosContainerSettings.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosContainerSettings.java
similarity index 87%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosContainerSettings.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosContainerSettings.java
index 62989947a4f91..b5256f1915b71 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosContainerSettings.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosContainerSettings.java
@@ -22,6 +22,7 @@
*/
package com.microsoft.azure.cosmos;
+import com.microsoft.azure.cosmosdb.ConflictResolutionPolicy;
import com.microsoft.azure.cosmosdb.DocumentCollection;
import com.microsoft.azure.cosmosdb.IndexingPolicy;
import com.microsoft.azure.cosmosdb.PartitionKeyDefinition;
@@ -179,6 +180,30 @@ public void setPartitionKey(PartitionKeyDefinition partitionKeyDefinition) {
this.partitionKeyDefinition = partitionKeyDefinition;
}
+ /**
+ * Gets the conflictResolutionPolicy that is used for resolving conflicting writes
+ * on documents in different regions, in a collection in the Azure Cosmos DB service.
+ *
+ * @return ConflictResolutionPolicy
+ */
+ public ConflictResolutionPolicy getConflictResolutionPolicy() {
+ return super.getObject(Constants.Properties.CONFLICT_RESOLUTION_POLICY, ConflictResolutionPolicy.class);
+ }
+
+ /**
+ * Sets the conflictResolutionPolicy that is used for resolving conflicting writes
+ * on documents in different regions, in a collection in the Azure Cosmos DB service.
+ *
+ * @param value ConflictResolutionPolicy to be used.
+ */
+ public void setConflictResolutionPolicy(ConflictResolutionPolicy value) {
+ if (value == null) {
+ throw new IllegalArgumentException("CONFLICT_RESOLUTION_POLICY cannot be null.");
+ }
+
+ super.set(Constants.Properties.CONFLICT_RESOLUTION_POLICY, value);
+ }
+
DocumentCollection getV2Collection(){
DocumentCollection collection = new DocumentCollection(this.toJson());
collection.setPartitionKey(this.getPartitionKey());
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosDatabaseRequestOptions.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosDatabaseRequestOptions.java
similarity index 99%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosDatabaseRequestOptions.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosDatabaseRequestOptions.java
index 77a60f70d98f0..8047375213272 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosDatabaseRequestOptions.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosDatabaseRequestOptions.java
@@ -56,4 +56,4 @@ protected RequestOptions toRequestOptions() {
requestOptions.setOfferThroughput(offerThroughput);
return requestOptions;
}
-}
+}
\ No newline at end of file
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosDatabaseSettings.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosDatabaseSettings.java
similarity index 99%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosDatabaseSettings.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosDatabaseSettings.java
index c53a386f7601c..6415bd16753b2 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosDatabaseSettings.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosDatabaseSettings.java
@@ -59,4 +59,4 @@ public CosmosDatabaseSettings(String id) {
static List getFromV2Results(List results){
return results.stream().map(CosmosDatabaseSettings::new).collect(Collectors.toList());
}
-}
+}
\ No newline at end of file
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosFeedResponse.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosFeedResponse.java
similarity index 100%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosFeedResponse.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosFeedResponse.java
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItemRequestOptions.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosItemRequestOptions.java
similarity index 99%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItemRequestOptions.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosItemRequestOptions.java
index 62f017ab337dc..bce7b56e5fcff 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItemRequestOptions.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosItemRequestOptions.java
@@ -179,4 +179,4 @@ protected RequestOptions toRequestOptions() {
requestOptions.setPartitionKey(partitionKey);
return requestOptions;
}
-}
+}
\ No newline at end of file
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItemSettings.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosItemSettings.java
similarity index 89%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItemSettings.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosItemSettings.java
index 9000ed4ed3ca0..6bee41456d56e 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItemSettings.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosItemSettings.java
@@ -49,6 +49,17 @@ public CosmosItemSettings(String jsonString) {
super(jsonString);
}
+
+ /**
+ * Initialize an CosmosItemSettings object from json string.
+ *
+ * @param jsonString the json string that represents the item object.
+ * @param objectMapper the custom object mapper
+ */
+ public CosmosItemSettings(String jsonString, ObjectMapper objectMapper) {
+ super(jsonString, objectMapper);
+ }
+
/**
* fromObject returns Document for compatibility with V2 sdk
*
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosRequestOptions.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosRequestOptions.java
similarity index 99%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosRequestOptions.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosRequestOptions.java
index 30a5085021ccd..fb9df401c432f 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosRequestOptions.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosRequestOptions.java
@@ -56,4 +56,4 @@ protected RequestOptions toRequestOptions(){
requestOptions.setAccessCondition(accessCondition);
return requestOptions;
}
-}
+}
\ No newline at end of file
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosResource.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosResource.java
similarity index 100%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosResource.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosResource.java
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosResponse.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosResponse.java
similarity index 84%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosResponse.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosResponse.java
index 4154283e16584..8a74e286daacd 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosResponse.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosResponse.java
@@ -22,12 +22,16 @@
*/
package com.microsoft.azure.cosmos;
+import com.microsoft.azure.cosmosdb.ClientSideRequestStatistics;
import com.microsoft.azure.cosmosdb.Resource;
import com.microsoft.azure.cosmosdb.ResourceResponse;
import com.microsoft.azure.cosmosdb.StoredProcedureResponse;
+import java.time.Duration;
import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+
public class CosmosResponse {
private T resourceSettings;
protected ResourceResponse resourceResponseWrapper;
@@ -134,5 +138,21 @@ public Map getResponseHeaders() {
return resourceResponseWrapper.getResponseHeaders();
}
-
+ /**
+ * Gets the diagnostics information for the current request to Azure Cosmos DB service.
+ *
+ * @return diagnostics information for the current request to Azure Cosmos DB service.
+ */
+ public String getRequestDiagnosticsString() {
+ return resourceResponseWrapper.getRequestDiagnosticsString();
+ }
+
+ /**
+ * Gets the end-to-end request latency for the current request to Azure Cosmos DB service.
+ *
+ * @return end-to-end request latency for the current request to Azure Cosmos DB service.
+ */
+ public Duration getRequestLatency() {
+ return resourceResponseWrapper.getRequestLatency();
+ }
}
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosStoredProcedureRequestOptions.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosStoredProcedureRequestOptions.java
similarity index 100%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosStoredProcedureRequestOptions.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosStoredProcedureRequestOptions.java
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosStoredProcedureSettings.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosStoredProcedureSettings.java
similarity index 100%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosStoredProcedureSettings.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosStoredProcedureSettings.java
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosTriggerSettings.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosTriggerSettings.java
similarity index 100%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosTriggerSettings.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosTriggerSettings.java
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUserDefinedFunctionSettings.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosUserDefinedFunctionSettings.java
similarity index 100%
rename from sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUserDefinedFunctionSettings.java
rename to commons/src/main/java/com/microsoft/azure/cosmos/CosmosUserDefinedFunctionSettings.java
diff --git a/commons/src/main/java/com/microsoft/azure/cosmos/CosmosUserSettings.java b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosUserSettings.java
new file mode 100644
index 0000000000000..cc81ad0d96e6d
--- /dev/null
+++ b/commons/src/main/java/com/microsoft/azure/cosmos/CosmosUserSettings.java
@@ -0,0 +1,59 @@
+package com.microsoft.azure.cosmos;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.microsoft.azure.cosmosdb.DocumentCollection;
+import com.microsoft.azure.cosmosdb.Resource;
+import com.microsoft.azure.cosmosdb.ResourceResponse;
+import com.microsoft.azure.cosmosdb.User;
+import com.microsoft.azure.cosmosdb.internal.Constants;
+
+public class CosmosUserSettings extends Resource {
+ /**
+ * Initialize a user object.
+ */
+ public CosmosUserSettings() {
+ super();
+ }
+
+ /**
+ * Initialize a user object from json string.
+ *
+ * @param jsonString the json string that represents the database user.
+ */
+ public CosmosUserSettings(String jsonString) {
+ super(jsonString);
+ }
+
+ CosmosUserSettings(ResourceResponse response) {
+ super(response.getResource().toJson());
+ }
+
+ // Converting document collection to CosmosContainerSettings
+ CosmosUserSettings(User user){
+ super(user.toJson());
+ }
+
+ /**
+ * Gets the self-link of the permissions associated with the user.
+ *
+ * @return the permissions link.
+ */
+ public String getPermissionsLink() {
+ String selfLink = this.getSelfLink();
+ if (selfLink.endsWith("/")) {
+ return selfLink + super.getString(Constants.Properties.PERMISSIONS_LINK);
+ } else {
+ return selfLink + "/" + super.getString(Constants.Properties.PERMISSIONS_LINK);
+ }
+ }
+
+ public User getV2User() {
+ return new User(this.toJson());
+ }
+
+ static List getFromV2Results(List results) {
+ return results.stream().map(CosmosUserSettings::new).collect(Collectors.toList());
+ }
+}
\ No newline at end of file
diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/BridgeInternal.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/BridgeInternal.java
index e0bc791440772..f7c90034da6c1 100644
--- a/commons/src/main/java/com/microsoft/azure/cosmosdb/BridgeInternal.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/BridgeInternal.java
@@ -27,6 +27,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
import com.microsoft.azure.cosmosdb.internal.query.metrics.ClientSideMetrics;
+import com.microsoft.azure.cosmosdb.internal.routing.PartitionKeyInternal;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceResponse;
import com.microsoft.azure.cosmosdb.rx.internal.Strings;
@@ -282,4 +283,12 @@ public static String getInnerErrorMessage(DocumentClientException documentClient
}
return documentClientException.getInnerErrorMessage();
}
+
+ public static PartitionKeyInternal getNonePartitionKey(PartitionKeyDefinition partitionKeyDefinition) {
+ return partitionKeyDefinition.getNonePartitionKeyValue();
+ }
+
+ public static PartitionKey getPartitionKey(PartitionKeyInternal partitionKeyInternal) {
+ return new PartitionKey(partitionKeyInternal);
+ }
}
diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/PartitionKey.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/PartitionKey.java
index 8defae8237b43..2f25785475d25 100644
--- a/commons/src/main/java/com/microsoft/azure/cosmosdb/PartitionKey.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/PartitionKey.java
@@ -58,6 +58,8 @@ public static PartitionKey FromJsonString(String jsonString) {
return new PartitionKey(PartitionKeyInternal.fromJsonString(jsonString));
}
+ public static PartitionKey None = new PartitionKey(PartitionKeyInternal.None);
+
/**
* Serialize the PartitionKey object to a JSON string.
*
diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/PartitionKeyDefinition.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/PartitionKeyDefinition.java
index e9f71e9928b0a..bc6fda9090410 100644
--- a/commons/src/main/java/com/microsoft/azure/cosmosdb/PartitionKeyDefinition.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/PartitionKeyDefinition.java
@@ -24,6 +24,7 @@
package com.microsoft.azure.cosmosdb;
import com.microsoft.azure.cosmosdb.internal.Constants;
+import com.microsoft.azure.cosmosdb.internal.routing.PartitionKeyInternal;
import com.microsoft.azure.cosmosdb.rx.internal.Strings;
import java.util.ArrayList;
@@ -40,6 +41,7 @@ public final class PartitionKeyDefinition extends JsonSerializable {
private List paths;
private PartitionKind kind;
private PartitionKeyDefinitionVersion version;
+ private Boolean systemKey;
/**
* Constructor. Creates a new instance of the PartitionKeyDefinition object.
@@ -134,6 +136,31 @@ public void setPaths(List paths) {
this.paths = paths;
}
+ /**
+ * Indicates if the partition key is generated by the system.
+ *
+ * @return the boolean indicating is it is a system key.
+ */
+ Boolean isSystemKey() {
+ if (this.systemKey == null) {
+ if (super.has(Constants.Properties.SYSTEM_KEY)) {
+ this.systemKey = super.getBoolean(Constants.Properties.SYSTEM_KEY);
+ } else {
+ this.systemKey = false;
+ }
+ }
+
+ return this.systemKey;
+ }
+
+ PartitionKeyInternal getNonePartitionKeyValue() {
+ if (this.getPaths().size() == 0 || this.isSystemKey()) {
+ return PartitionKeyInternal.Empty;
+ } else {
+ return PartitionKeyInternal.UndefinedPartitionKey;
+ }
+ }
+
@Override
void populatePropertyBag() {
if (this.kind != null) {
diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/Constants.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/Constants.java
index 6ae4ca51597a2..c3d11aa753d36 100644
--- a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/Constants.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/Constants.java
@@ -157,6 +157,7 @@ public static final class Properties {
public static final String PARTITION_KEY_PATHS = "paths";
public static final String PARTITION_KIND = "kind";
public static final String PARTITION_KEY_DEFINITION_VERSION = "version";
+ public static final String SYSTEM_KEY = "systemKey";
public static final String RESOURCE_PARTITION_KEY = "resourcePartitionKey";
public static final String PARTITION_KEY_RANGE_ID = "partitionKeyRangeId";
diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/HttpConstants.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/HttpConstants.java
index 2a90ef59c029c..78c0bf4e23ffa 100644
--- a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/HttpConstants.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/HttpConstants.java
@@ -265,7 +265,7 @@ public static class A_IMHeaderValues {
}
public static class Versions {
- public static final String CURRENT_VERSION = "2018-09-17";
+ public static final String CURRENT_VERSION = "2018-12-31";
// TODO: FIXME we can use maven plugin for generating a version file
// @see https://stackoverflow.com/questions/2469922/generate-a-version-java-file-in-maven
diff --git a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternal.java b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternal.java
index 1051193584ae6..35bcc575ec2bb 100644
--- a/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternal.java
+++ b/commons/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternal.java
@@ -62,6 +62,9 @@ public class PartitionKeyInternal implements Comparable {
private static final String MAX_STRING = "MaxString";
private static final String INFINITY = "Infinity";
+ public static final PartitionKeyInternal NonePartitionKey =
+ new PartitionKeyInternal();
+
public static final PartitionKeyInternal EmptyPartitionKey =
new PartitionKeyInternal(new ArrayList<>());
@@ -71,9 +74,16 @@ public class PartitionKeyInternal implements Comparable {
add(new InfinityPartitionKeyComponent());
}});
+ @SuppressWarnings("serial")
+ public static final PartitionKeyInternal UndefinedPartitionKey =
+ new PartitionKeyInternal(new ArrayList() {{
+ add(new UndefinedPartitionKeyComponent());
+ }});
+
public static final PartitionKeyInternal InclusiveMinimum = PartitionKeyInternal.EmptyPartitionKey;
public static final PartitionKeyInternal ExclusiveMaximum = PartitionKeyInternal.InfinityPartitionKey;
public static final PartitionKeyInternal Empty = PartitionKeyInternal.EmptyPartitionKey;
+ public static final PartitionKeyInternal None = PartitionKeyInternal.NonePartitionKey;
final List components;
@@ -85,6 +95,10 @@ public PartitionKeyInternal(List values) {
this.components = values;
}
+ public PartitionKeyInternal() {
+ this.components = null;
+ }
+
public static PartitionKeyInternal fromJsonString(String partitionKey) {
if (Strings.isNullOrEmpty(partitionKey)) {
throw new IllegalArgumentException(String.format(RMResources.UnableToDeserializePartitionKeyValue, partitionKey));
@@ -178,6 +192,10 @@ public boolean equals(Object obj) {
public int compareTo(PartitionKeyInternal other) {
if (other == null) {
throw new IllegalArgumentException("other");
+ } else if (other.components == null || this.components == null) {
+ int otherComponentsCount = other.components == null ? 0 : other.components.size();
+ int thisComponentsCount = this.components == null ? 0 : this.components.size();
+ return (int) Math.signum(thisComponentsCount - otherComponentsCount);
}
for (int i = 0; i < Math.min(this.components.size(), other.components.size()); i++) {
diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/AddressResolver.java b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/AddressResolver.java
index db65feef97d5d..37527ab7d9ca0 100644
--- a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/AddressResolver.java
+++ b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/AddressResolver.java
@@ -655,7 +655,7 @@ private PartitionKeyRange tryResolveServerPartitionByPartitionKey(
throw new InternalServerErrorException(String.format("partition key is null '%s'", partitionKeyString));
}
- if (partitionKey.getComponents().size() == collection.getPartitionKey().getPaths().size()) {
+ if (partitionKey.equals(PartitionKeyInternal.Empty) || partitionKey.getComponents().size() == collection.getPartitionKey().getPaths().size()) {
// Although we can compute effective partition key here, in general case this Gateway can have outdated
// partition key definition cached - like if collection with same name but with Range partitioning is created.
// In this case server will not pass x-ms-documentdb-collection-rid check and will return back InvalidPartitionException.
diff --git a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternalHelper.java b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternalHelper.java
index 316f6e87dc353..d0c81ddf8c194 100644
--- a/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternalHelper.java
+++ b/direct-impl/src/main/java/com/microsoft/azure/cosmosdb/internal/routing/PartitionKeyInternalHelper.java
@@ -140,6 +140,10 @@ public static String getEffectivePartitionKeyString(PartitionKeyInternal partiti
}
public static String getEffectivePartitionKeyString(PartitionKeyInternal partitionKeyInternal, PartitionKeyDefinition partitionKeyDefinition, boolean strict) {
+ if (partitionKeyInternal.components == null) {
+ throw new IllegalArgumentException(RMResources.TooFewPartitionKeyComponents);
+ }
+
if (partitionKeyInternal.equals(PartitionKeyInternal.EmptyPartitionKey)) {
return MinimumInclusiveEffectivePartitionKey;
}
diff --git a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/CollectionCRUDAsyncAPITest.java b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/CollectionCRUDAsyncAPITest.java
index e9d0c4d336042..4ed8e550f35ed 100644
--- a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/CollectionCRUDAsyncAPITest.java
+++ b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/CollectionCRUDAsyncAPITest.java
@@ -106,6 +106,11 @@ public void setUp() {
public void before() {
collectionDefinition = new DocumentCollection();
collectionDefinition.setId(UUID.randomUUID().toString());
+ PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
+ ArrayList paths = new ArrayList();
+ paths.add("/mypk");
+ partitionKeyDef.setPaths(paths);
+ collectionDefinition.setPartitionKey(partitionKeyDef);
}
@AfterClass(groups = "samples", timeOut = TIMEOUT)
diff --git a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/ConflictAPITest.java b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/ConflictAPITest.java
index 8aad1f45b8e93..d82c4aa07b9dd 100644
--- a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/ConflictAPITest.java
+++ b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/ConflictAPITest.java
@@ -32,6 +32,7 @@
import com.microsoft.azure.cosmosdb.DocumentCollection;
import com.microsoft.azure.cosmosdb.FeedOptions;
import com.microsoft.azure.cosmosdb.FeedResponse;
+import com.microsoft.azure.cosmosdb.PartitionKeyDefinition;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import org.testng.annotations.AfterClass;
@@ -40,6 +41,7 @@
import rx.Observable;
import rx.observable.ListenableFutureObservable;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -77,6 +79,11 @@ public void setUp() {
DocumentCollection collectionDefinition = new DocumentCollection();
collectionDefinition.setId(UUID.randomUUID().toString());
+ PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
+ ArrayList paths = new ArrayList();
+ paths.add("/mypk");
+ partitionKeyDef.setPaths(paths);
+ collectionDefinition.setPartitionKey(partitionKeyDef);
// Create database
createdDatabase = Utils.createDatabaseForTest(client);
diff --git a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/DocumentCRUDAsyncAPITest.java b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/DocumentCRUDAsyncAPITest.java
index a9df3991e84c8..c68157454ca46 100644
--- a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/DocumentCRUDAsyncAPITest.java
+++ b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/DocumentCRUDAsyncAPITest.java
@@ -221,9 +221,11 @@ public void createDocumentWithProgrammableDocumentDefinition() throws Exception
.createDocument(getCollectionLink(), documentDefinition, null, false).toBlocking().single()
.getResource();
+ RequestOptions options = new RequestOptions();
+ options.setPartitionKey(PartitionKey.None);
// Read the created document
Observable> readDocumentObservable = asyncClient
- .readDocument(getDocumentLink(createdDocument), null);
+ .readDocument(getDocumentLink(createdDocument), options);
final CountDownLatch completionLatch = new CountDownLatch(1);
diff --git a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/DocumentQueryAsyncAPITest.java b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/DocumentQueryAsyncAPITest.java
index 35d652c3efdf5..0cb6ec4013996 100644
--- a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/DocumentQueryAsyncAPITest.java
+++ b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/DocumentQueryAsyncAPITest.java
@@ -83,7 +83,7 @@
* {@link #transformObservableToGoogleGuavaListenableFuture()}
*/
public class DocumentQueryAsyncAPITest {
- private final static int TIMEOUT = 60000;
+ private final static int TIMEOUT = 3 * 60000;
private AsyncDocumentClient asyncClient;
private DocumentCollection createdCollection;
private Database createdDatabase;
@@ -102,6 +102,11 @@ public void setUp() {
DocumentCollection collectionDefinition = new DocumentCollection();
collectionDefinition.setId(UUID.randomUUID().toString());
+ PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
+ ArrayList paths = new ArrayList();
+ paths.add("/mypk");
+ partitionKeyDef.setPaths(paths);
+ collectionDefinition.setPartitionKey(partitionKeyDef);
// Create database
@@ -136,6 +141,7 @@ public void queryDocuments_Async() throws Exception {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.setMaxItemCount(requestPageSize);
+ options.setEnableCrossPartitionQuery(true);
Observable> documentQueryObservable = asyncClient
.queryDocuments(getCollectionLink(), "SELECT * FROM root", options);
@@ -182,6 +188,7 @@ public void queryDocuments_Async_withoutLambda() throws Exception {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.setMaxItemCount(requestPageSize);
+ options.setEnableCrossPartitionQuery(true);
Observable> documentQueryObservable = asyncClient
.queryDocuments(getCollectionLink(), "SELECT * FROM root", options);
@@ -231,6 +238,7 @@ public void queryDocuments_findTotalRequestCharge() throws Exception {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.setMaxItemCount(requestPageSize);
+ options.setEnableCrossPartitionQuery(true);
Observable totalChargeObservable = asyncClient
.queryDocuments(getCollectionLink(), "SELECT * FROM root", options)
@@ -256,6 +264,7 @@ public void queryDocuments_unsubscribeAfterFirstPage() throws Exception {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.setMaxItemCount(requestPageSize);
+ options.setEnableCrossPartitionQuery(true);
Observable> requestChargeObservable = asyncClient
.queryDocuments(getCollectionLink(), "SELECT * FROM root", options);
@@ -300,6 +309,7 @@ public void queryDocuments_filterFetchedResults() throws Exception {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.setMaxItemCount(requestPageSize);
+ options.setEnableCrossPartitionQuery(true);
Func1 isPrimeNumber = new Func1() {
@@ -359,6 +369,7 @@ public void queryDocuments_toBlocking_toIterator() {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.setMaxItemCount(requestPageSize);
+ options.setEnableCrossPartitionQuery(true);
Observable> documentQueryObservable = asyncClient
.queryDocuments(getCollectionLink(), "SELECT * FROM root", options);
@@ -448,6 +459,7 @@ public void transformObservableToGoogleGuavaListenableFuture() throws Exception
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.setMaxItemCount(requestPageSize);
+ options.setEnableCrossPartitionQuery(true);
Observable> documentQueryObservable = asyncClient
.queryDocuments(getCollectionLink(), "SELECT * FROM root", options);
diff --git a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/InMemoryGroupbyTest.java b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/InMemoryGroupbyTest.java
index baeac40b2a029..23c97a7dadba0 100644
--- a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/InMemoryGroupbyTest.java
+++ b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/InMemoryGroupbyTest.java
@@ -29,6 +29,7 @@
import com.microsoft.azure.cosmosdb.Document;
import com.microsoft.azure.cosmosdb.DocumentCollection;
import com.microsoft.azure.cosmosdb.FeedOptions;
+import com.microsoft.azure.cosmosdb.PartitionKeyDefinition;
import com.microsoft.azure.cosmosdb.SqlParameter;
import com.microsoft.azure.cosmosdb.SqlParameterCollection;
import com.microsoft.azure.cosmosdb.SqlQuerySpec;
@@ -40,6 +41,7 @@
import rx.observables.GroupedObservable;
import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -66,6 +68,11 @@ public static void setUp() throws Exception {
DocumentCollection collectionDefinition = new DocumentCollection();
collectionDefinition.setId(UUID.randomUUID().toString());
+ PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
+ ArrayList paths = new ArrayList();
+ paths.add("/mypk");
+ partitionKeyDef.setPaths(paths);
+ collectionDefinition.setPartitionKey(partitionKeyDef);
// Create collection
createdCollection = asyncClient
@@ -112,6 +119,7 @@ public void groupByInMemory() {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.setMaxItemCount(requestPageSize);
+ options.setEnableCrossPartitionQuery(true);
Observable documentsObservable = asyncClient
.queryDocuments(getCollectionLink(),
@@ -144,7 +152,7 @@ public void groupByInMemory_MoreDetail() {
int requestPageSize = 3;
FeedOptions options = new FeedOptions();
options.setMaxItemCount(requestPageSize);
-
+ options.setEnableCrossPartitionQuery(true);
Observable documentsObservable = asyncClient
.queryDocuments(getCollectionLink(),
diff --git a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/TokenResolverTest.java b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/TokenResolverTest.java
index c33f14fdc2b38..4c56170eb4972 100644
--- a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/TokenResolverTest.java
+++ b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/TokenResolverTest.java
@@ -31,6 +31,8 @@
import com.microsoft.azure.cosmosdb.Database;
import com.microsoft.azure.cosmosdb.Document;
import com.microsoft.azure.cosmosdb.DocumentCollection;
+import com.microsoft.azure.cosmosdb.PartitionKey;
+import com.microsoft.azure.cosmosdb.PartitionKeyDefinition;
import com.microsoft.azure.cosmosdb.Permission;
import com.microsoft.azure.cosmosdb.PermissionMode;
import com.microsoft.azure.cosmosdb.RequestOptions;
@@ -86,6 +88,11 @@ public void setUp() {
DocumentCollection collectionDefinition = new DocumentCollection();
collectionDefinition.setId(UUID.randomUUID().toString());
+ PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
+ ArrayList paths = new ArrayList();
+ paths.add("/mypk");
+ partitionKeyDef.setPaths(paths);
+ collectionDefinition.setPartitionKey(partitionKeyDef);
// Create database
createdDatabase = Utils.createDatabaseForTest(asyncClient);
@@ -163,6 +170,7 @@ public void readDocumentThroughTokenResolver() throws Exception {
.build();
RequestOptions requestOptions = new RequestOptions();
requestOptions.setProperties(properties);
+ requestOptions.setPartitionKey(PartitionKey.None);
Observable> readDocumentObservable = asyncClientWithTokenResolver
.readDocument(documentLink, requestOptions);
readDocumentObservable.subscribe(resourceResponse -> {
@@ -204,6 +212,7 @@ public void deleteDocumentThroughTokenResolver() throws Exception {
RequestOptions requestOptions = new RequestOptions();
requestOptions.setProperties(properties);
+ requestOptions.setPartitionKey(PartitionKey.None);
Observable> readDocumentObservable = asyncClientWithTokenResolver
.deleteDocument(documentLink, requestOptions);
readDocumentObservable.subscribe(resourceResponse -> {
diff --git a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/UniqueIndexAsyncAPITest.java b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/UniqueIndexAsyncAPITest.java
index e253729f1ee4b..a1e8e3cdf4972 100644
--- a/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/UniqueIndexAsyncAPITest.java
+++ b/examples/src/test/java/com/microsoft/azure/cosmosdb/rx/examples/UniqueIndexAsyncAPITest.java
@@ -30,6 +30,7 @@
import com.microsoft.azure.cosmosdb.Document;
import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.DocumentCollection;
+import com.microsoft.azure.cosmosdb.PartitionKeyDefinition;
import com.microsoft.azure.cosmosdb.ResourceResponse;
import com.microsoft.azure.cosmosdb.UniqueKey;
import com.microsoft.azure.cosmosdb.UniqueKeyPolicy;
@@ -40,6 +41,7 @@
import rx.Observable;
import rx.observers.TestSubscriber;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.UUID;
@@ -62,6 +64,11 @@ public void uniqueIndex() {
uniqueKey.setPaths(ImmutableList.of("/name", "/field"));
uniqueKeyPolicy.setUniqueKeys(Collections.singleton(uniqueKey));
collectionDefinition.setUniqueKeyPolicy(uniqueKeyPolicy);
+ PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
+ ArrayList paths = new ArrayList();
+ paths.add("/mypk");
+ partitionKeyDef.setPaths(paths);
+ collectionDefinition.setPartitionKey(partitionKeyDef);
DocumentCollection collection = client.createCollection(getDatabaseLink(), collectionDefinition, null).toBlocking().single().getResource();
diff --git a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentQueryExecutionContextFactory.java b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentQueryExecutionContextFactory.java
index 4b4b2891dfe58..32a99af6e715a 100644
--- a/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentQueryExecutionContextFactory.java
+++ b/gateway/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentQueryExecutionContextFactory.java
@@ -25,8 +25,10 @@
import java.util.List;
import java.util.UUID;
+import com.microsoft.azure.cosmosdb.BridgeInternal;
import com.microsoft.azure.cosmosdb.DocumentCollection;
import com.microsoft.azure.cosmosdb.FeedOptions;
+import com.microsoft.azure.cosmosdb.PartitionKey;
import com.microsoft.azure.cosmosdb.PartitionKeyRange;
import com.microsoft.azure.cosmosdb.Resource;
import com.microsoft.azure.cosmosdb.SqlQuerySpec;
@@ -86,17 +88,21 @@ public static Observable extends IDocumentQueryExecutionC
// PipelinedDocumentQueryExecutionContext by providing the partition query execution info that's needed(which we get from the exception returned from Gateway).
Observable> proxyQueryExecutionContext =
- collectionObs.flatMap(collection ->
- ProxyDocumentQueryExecutionContext.createAsync(
- client,
- resourceTypeEnum,
- resourceType,
- query,
- feedOptions,
- resourceLink,
- collection,
- isContinuationExpected,
- correlatedActivityId));
+ collectionObs.flatMap(collection -> {
+ if (feedOptions != null && feedOptions.getPartitionKey() != null && feedOptions.getPartitionKey().equals(PartitionKey.None)) {
+ feedOptions.setPartitionKey(BridgeInternal.getPartitionKey(BridgeInternal.getNonePartitionKey(collection.getPartitionKey())));
+ }
+ return ProxyDocumentQueryExecutionContext.createAsync(
+ client,
+ resourceTypeEnum,
+ resourceType,
+ query,
+ feedOptions,
+ resourceLink,
+ collection,
+ isContinuationExpected,
+ correlatedActivityId);
+ });
return proxyQueryExecutionContext;
}
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosClient.java b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosClient.java
index 920df75cb493c..5ce31ded234ba 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosClient.java
+++ b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosClient.java
@@ -26,13 +26,17 @@
import com.microsoft.azure.cosmosdb.ConnectionPolicy;
import com.microsoft.azure.cosmosdb.ConsistencyLevel;
import com.microsoft.azure.cosmosdb.Database;
+import com.microsoft.azure.cosmosdb.DatabaseAccount;
import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.FeedOptions;
import com.microsoft.azure.cosmosdb.FeedResponse;
import com.microsoft.azure.cosmosdb.Permission;
import com.microsoft.azure.cosmosdb.SqlQuerySpec;
+import com.microsoft.azure.cosmosdb.TokenResolver;
import com.microsoft.azure.cosmosdb.internal.HttpConstants;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
+import com.microsoft.azure.cosmosdb.rx.internal.Configs;
+
import hu.akarnokd.rxjava.interop.RxJavaInterop;
import reactor.adapter.rxjava.RxJava2Adapter;
import reactor.core.publisher.Flux;
@@ -48,25 +52,31 @@
public class CosmosClient {
//Document client wrapper
+ private final Configs configs;
private final AsyncDocumentClient asyncDocumentClient;
private final String serviceEndpoint;
private final String keyOrResourceToken;
private final ConnectionPolicy connectionPolicy;
private final ConsistencyLevel desiredConsistencyLevel;
private final List permissions;
+ private final TokenResolver tokenResolver;
CosmosClient(CosmosClientBuilder builder) {
- this.serviceEndpoint = builder.getServiceEndpoint();
+ this.configs = builder.getConfigs();
+ this.serviceEndpoint = builder.getServiceEndpoint();
this.keyOrResourceToken = builder.getKeyOrResourceToken();
this.connectionPolicy = builder.getConnectionPolicy();
this.desiredConsistencyLevel = builder.getDesiredConsistencyLevel();
this.permissions = builder.getPermissions();
+ this.tokenResolver = builder.getTokenResolver();
this.asyncDocumentClient = new AsyncDocumentClient.Builder()
.withServiceEndpoint(this.serviceEndpoint)
.withMasterKeyOrResourceToken(this.keyOrResourceToken)
.withConnectionPolicy(this.connectionPolicy)
.withConsistencyLevel(this.desiredConsistencyLevel)
+ .withConfigs(this.configs)
+ .withTokenResolver(this.tokenResolver)
.build();
}
@@ -130,6 +140,22 @@ AsyncDocumentClient getDocClientWrapper(){
return asyncDocumentClient;
}
+ /**
+ * Gets the configs
+ * @return the configs
+ */
+ public Configs getConfigs() {
+ return configs;
+ }
+
+ /**
+ * Gets the token resolver
+ * @return the token resolver
+ */
+ public TokenResolver getTokenResolver() {
+ return tokenResolver;
+ }
+
/**
* Create a Database if it does not already exist on the service
*
@@ -283,6 +309,10 @@ public Flux> queryDatabases(SqlQuerySpec qu
response.getResponseHeaders()))));
}
+ Mono getDatabaseAccount() {
+ return RxJava2Adapter.singleToMono(RxJavaInterop.toV2Single(asyncDocumentClient.getDatabaseAccount().toSingle()));
+ }
+
/**
* Gets a database object without making a service call.
*
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosClientBuilder.java b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosClientBuilder.java
index eb0266bc6ecf8..34143019f2f3d 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosClientBuilder.java
+++ b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosClientBuilder.java
@@ -25,6 +25,8 @@
import com.microsoft.azure.cosmosdb.ConnectionPolicy;
import com.microsoft.azure.cosmosdb.ConsistencyLevel;
import com.microsoft.azure.cosmosdb.Permission;
+import com.microsoft.azure.cosmosdb.TokenResolver;
+import com.microsoft.azure.cosmosdb.rx.internal.Configs;
import java.util.List;
@@ -47,15 +49,37 @@
*/
public class CosmosClientBuilder {
+ private Configs configs = new Configs();
private String serviceEndpoint;
private String keyOrResourceToken;
private ConnectionPolicy connectionPolicy;
private ConsistencyLevel desiredConsistencyLevel;
private List permissions;
+ private TokenResolver tokenResolver;
CosmosClientBuilder() {
}
+ /**
+ * Configs
+ * @param configs
+ * @return current builder
+ */
+ public CosmosClientBuilder configs(Configs configs) {
+ this.configs = configs;
+ return this;
+ }
+
+ /**
+ * Token Resolver
+ * @param tokenResolver
+ * @return current builder
+ */
+ public CosmosClientBuilder tokenResolver(TokenResolver tokenResolver) {
+ this.tokenResolver = tokenResolver;
+ return this;
+ }
+
/**
* The service endpoint url
* @param serviceEndpoint the service endpoint
@@ -138,15 +162,23 @@ String getKeyOrResourceToken() {
return keyOrResourceToken;
}
- ConnectionPolicy getConnectionPolicy() {
+ public ConnectionPolicy getConnectionPolicy() {
return connectionPolicy;
}
- ConsistencyLevel getDesiredConsistencyLevel() {
+ public ConsistencyLevel getDesiredConsistencyLevel() {
return desiredConsistencyLevel;
}
List getPermissions() {
return permissions;
}
+
+ public Configs getConfigs() {
+ return configs;
+ }
+
+ public TokenResolver getTokenResolver() {
+ return tokenResolver;
+ }
}
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosContainer.java b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosContainer.java
index e7d31c656818d..9f168dbd83d34 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosContainer.java
+++ b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosContainer.java
@@ -31,8 +31,11 @@
import com.microsoft.azure.cosmosdb.StoredProcedure;
import com.microsoft.azure.cosmosdb.Trigger;
import com.microsoft.azure.cosmosdb.UserDefinedFunction;
+import com.microsoft.azure.cosmosdb.internal.Constants;
import com.microsoft.azure.cosmosdb.internal.Paths;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
+import com.microsoft.azure.cosmosdb.internal.routing.PartitionKeyInternal;
+
import hu.akarnokd.rxjava.interop.RxJavaInterop;
import reactor.adapter.rxjava.RxJava2Adapter;
import reactor.core.publisher.Flux;
@@ -87,11 +90,11 @@ public static String getSelfLink(CosmosContainer cosmosContainer) {
* Reads the document container
*
* After subscription the operation will be performed.
- * The {@link Mono} upon successful completion will contain a single cossmos container response with the read
+ * The {@link Mono} upon successful completion will contain a single cosmos container response with the read
* container.
* In case of failure the {@link Mono} will error.
*
- * @return an {@link Mono} containing the single cossmos container response with the read container or an error.
+ * @return an {@link Mono} containing the single cosmos container response with the read container or an error.
*/
public Mono read() {
return read(new CosmosContainerRequestOptions());
@@ -101,10 +104,11 @@ public Mono read() {
* Reads the document container by the container link.
*
* After subscription the operation will be performed.
- * The {@link Mono} upon successful completion will contain a single cossmos container response with the read container.
+ * The {@link Mono} upon successful completion will contain a single cosmos container response with the read container.
* In case of failure the {@link Mono} will error.
- * @param options the cosmos container request options
- * @return an {@link Mono} containing the single cossmos container response with the read container or an error.
+ *
+ * @param options The cosmos container request options.
+ * @return an {@link Mono} containing the single cosmos container response with the read container or an error.
*/
public Mono read(CosmosContainerRequestOptions options) {
if (options == null) {
@@ -119,11 +123,11 @@ public Mono read(CosmosContainerRequestOptions options)
* Deletes the item container
*
* After subscription the operation will be performed.
- * The {@link Mono} upon successful completion will contain a single cossmos container response for the deleted database.
+ * The {@link Mono} upon successful completion will contain a single cosmos container response for the deleted database.
* In case of failure the {@link Mono} will error.
*
* @param options the request options.
- * @return an {@link Mono} containing the single cossmos container response for the deleted database or an error.
+ * @return an {@link Mono} containing the single cosmos container response for the deleted database or an error.
*/
public Mono delete(CosmosContainerRequestOptions options) {
if (options == null) {
@@ -143,7 +147,7 @@ public Mono delete(CosmosContainerRequestOptions option
* The {@link Mono} upon successful completion will contain a single cosmos container response for the deleted container.
* In case of failure the {@link Mono} will error.
*
- * @return an {@link Mono} containing the single cossmos container response for the deleted container or an error.
+ * @return an {@link Mono} containing the single cosmos container response for the deleted container or an error.
*/
public Mono delete() {
return delete(new CosmosContainerRequestOptions());
@@ -153,12 +157,12 @@ public Mono delete() {
* Replaces a document container.
*
* After subscription the operation will be performed.
- * The {@link Mono} upon successful completion will contain a single cossmos container response with the replaced document container.
+ * The {@link Mono} upon successful completion will contain a single cosmos container response with the replaced document container.
* In case of failure the {@link Mono} will error.
*
* @param containerSettings the item container settings
* @param options the cosmos container request options.
- * @return an {@link Mono} containing the single cossmos container response with the replaced document container or an error.
+ * @return an {@link Mono} containing the single cosmos container response with the replaced document container or an error.
*/
public Mono replace(CosmosContainerSettings containerSettings,
CosmosContainerRequestOptions options) {
@@ -186,7 +190,7 @@ public Mono replace(CosmosContainerSettings containerSe
* @return an {@link Mono} containing the single resource response with the created cosmos item or an error.
*/
public Mono createItem(Object item){
- return createItem(item, null);
+ return createItem(item, new CosmosItemRequestOptions());
}
/**
@@ -232,6 +236,35 @@ public Mono createItem(Object item, CosmosItemRequestOptions
.toSingle()));
}
+ /**
+ * Upserts an item.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Mono} upon successful completion will contain a single resource response with the upserted item.
+ * In case of failure the {@link Mono} will error.
+ *
+ * @param item the item represented as a POJO or Item object to upsert.
+ * @return an {@link Mono} containing the single resource response with the upserted document or an error.
+ */
+ public Mono upsertItem(Object item) {
+ return upsertItem(item, new CosmosItemRequestOptions());
+ }
+
+ /**
+ * Upserts an item.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Mono} upon successful completion will contain a single resource response with the upserted item.
+ * In case of failure the {@link Mono} will error.
+ *
+ * @param item the item represented as a POJO or Item object to upsert.
+ * @param partitionKey the partitionKey to be used.
+ * @return an {@link Mono} containing the single resource response with the upserted document or an error.
+ */
+ public Mono upsertItem(Object item, Object partitionKey) {
+ return upsertItem(item, new CosmosItemRequestOptions(partitionKey));
+ }
+
/**
* Upserts a cosmos item.
*
@@ -362,6 +395,21 @@ public CosmosItem getItem(String id, Object partitionKey){
/* CosmosStoredProcedure operations */
+ /**
+ * Creates a cosmos stored procedure.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Mono} upon successful completion will contain a single cosmos stored procedure response with the
+ * created cosmos stored procedure.
+ * In case of failure the {@link Mono} will error.
+ *
+ * @param settings the cosmos stored procedure settings.
+ * @return an {@link Mono} containing the single cosmos stored procedure resource response or an error.
+ */
+ public Mono createStoredProcedure(CosmosStoredProcedureSettings settings){
+ return this.createStoredProcedure(settings, new CosmosStoredProcedureRequestOptions());
+ }
+
/**
* Creates a cosmos stored procedure.
*
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosDatabase.java b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosDatabase.java
index e8306ff8bc7f0..75fb16970a13e 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosDatabase.java
+++ b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosDatabase.java
@@ -282,15 +282,19 @@ public Flux> queryContainers(SqlQuerySpec
public CosmosContainer getContainer(String id) {
return new CosmosContainer(id, this);
}
-
+
/** User operations **/
+ public Mono createUser(CosmosUserSettings settings) {
+ return this.createUser(settings, null);
+ }
+
/**
* Creates a user
* After subscription the operation will be performed.
* The {@link Mono} upon successful completion will contain a single resource response with the created user.
* In case of failure the {@link Mono} will error.
- *
+ *
* @param settings the cosmos user settings
* @param options the request options
* @return an {@link Mono} containing the single resource response with the created cosmos user or an error.
@@ -298,7 +302,11 @@ public CosmosContainer getContainer(String id) {
public Mono createUser(CosmosUserSettings settings, RequestOptions options){
return RxJava2Adapter.singleToMono(RxJavaInterop.toV2Single(getDocClientWrapper().createUser(this.getLink(),
settings.getV2User(), options).map(response ->
- new CosmosUserResponse(response, this)).toSingle()));
+ new CosmosUserResponse(response, this)).toSingle()));
+ }
+
+ public Mono upsertUser(CosmosUserSettings settings) {
+ return this.upsertUser(settings, null);
}
/**
@@ -306,7 +314,7 @@ public Mono createUser(CosmosUserSettings settings, RequestO
* After subscription the operation will be performed.
* The {@link Mono} upon successful completion will contain a single resource response with the created user.
* In case of failure the {@link Mono} will error.
- *
+ *
* @param settings the cosmos user settings
* @param options the request options
* @return an {@link Mono} containing the single resource response with the upserted user or an error.
@@ -317,6 +325,10 @@ public Mono upsertUser(CosmosUserSettings settings, RequestO
new CosmosUserResponse(response, this)).toSingle()));
}
+ public Flux> listUsers() {
+ return listUsers(new FeedOptions());
+ }
+
/**
* Reads all cosmos users in a database.
*
@@ -335,10 +347,14 @@ public Flux> listUsers(FeedOptions options){
response.getResponseHeaders()))));
}
+ public Flux> queryUsers(String query, FeedOptions options){
+ return queryUsers(new SqlQuerySpec(query), options);
+ }
+
/**
* Query for cosmos users in a database.
*
- * After subscription the operation will be performed.
+ * After subscription the operation will be performed.
* The {@link Flux} will contain one or several feed response of the obtained users.
* In case of failure the {@link Flux} will error.
*
@@ -355,6 +371,10 @@ public Flux> queryUsers(SqlQuerySpec querySpec,
response.getResponseHeaders(), response.getQueryMetrics()))));
}
+ public CosmosUser getUser(String id) {
+ return new CosmosUser(id, this);
+ }
+
CosmosClient getClient() {
return client;
}
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItem.java b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItem.java
index fca34f5d8480c..883abecd108f8 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItem.java
+++ b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItem.java
@@ -1,163 +1,162 @@
-/*
- * The MIT License (MIT)
- * Copyright (c) 2018 Microsoft Corporation
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-package com.microsoft.azure.cosmos;
-
-import com.microsoft.azure.cosmosdb.Document;
-import com.microsoft.azure.cosmosdb.RequestOptions;
-import com.microsoft.azure.cosmosdb.internal.Paths;
-import hu.akarnokd.rxjava.interop.RxJavaInterop;
-import reactor.adapter.rxjava.RxJava2Adapter;
-import reactor.core.publisher.Mono;
-
-public class CosmosItem extends CosmosResource{
- private Object partitionKey;
- private CosmosContainer container;
-
- CosmosItem(String id, Object partitionKey, CosmosContainer container) {
- super(id);
- this.partitionKey = partitionKey;
- this.container = container;
- }
-
- /**
- * Reads an item.
- *
- * After subscription the operation will be performed.
- * The {@link Mono} upon successful completion will contain a cosmos item response with the read item
- * In case of failure the {@link Mono} will error.
- *
- * @return an {@link Mono} containing the cosmos item response with the read item or an error
- */
- public Mono read() {
- return read(new CosmosItemRequestOptions(partitionKey));
- }
-
- /**
- * Reads an item.
- *
- * After subscription the operation will be performed.
- * The {@link Mono} upon successful completion will contain a cosmos item response with the read item
- * In case of failure the {@link Mono} will error.
- *
- * @param options the request comosItemRequestOptions
- * @return an {@link Mono} containing the cosmos item response with the read item or an error
- */
- public Mono read(CosmosItemRequestOptions options) {
- if (options == null) {
- options = new CosmosItemRequestOptions();
- }
- RequestOptions requestOptions = options.toRequestOptions();
- return RxJava2Adapter.singleToMono(RxJavaInterop.toV2Single(container.getDatabase().getDocClientWrapper()
- .readDocument(getLink(), requestOptions)
- .map(response -> new CosmosItemResponse(response, requestOptions.getPartitionKey(), container))
- .toSingle()));
- }
-
- /**
- * Replaces an item with the passed in item.
- *
- * After subscription the operation will be performed.
- * The {@link Mono} upon successful completion will contain a single cosmos item response with the replaced item.
- * In case of failure the {@link Mono} will error.
- *
- * @param item the item to replace (containing the document id).
- * @return an {@link Mono} containing the cosmos item resource response with the replaced item or an error.
- */
- public Mono replace(Object item){
- return replace(item, new CosmosItemRequestOptions(partitionKey));
- }
-
- /**
- * Replaces an item with the passed in item.
- *
- * After subscription the operation will be performed.
- * The {@link Mono} upon successful completion will contain a single cosmos item response with the replaced item.
- * In case of failure the {@link Mono} will error.
- *
- * @param item the item to replace (containing the document id).
- * @param options the request comosItemRequestOptions
- * @return an {@link Mono} containing the cosmos item resource response with the replaced item or an error.
- */
- public Mono replace(Object item, CosmosItemRequestOptions options){
- Document doc = CosmosItemSettings.fromObject(item);
- if (options == null) {
- options = new CosmosItemRequestOptions();
- }
- RequestOptions requestOptions = options.toRequestOptions();
- return RxJava2Adapter.singleToMono(RxJavaInterop.toV2Single(container.getDatabase()
- .getDocClientWrapper()
- .replaceDocument(getLink(), doc, requestOptions)
- .map(response -> new CosmosItemResponse(response, requestOptions.getPartitionKey(), container))
- .toSingle()));
- }
-
- /**
- * Deletes the item.
- *
- * After subscription the operation will be performed.
- * The {@link Mono} upon successful completion will contain a single cosmos item response with the replaced item.
- * In case of failure the {@link Mono} will error.
- * @return an {@link Mono} containing the cosmos item resource response.
- */
- public Mono delete() {
- return delete(new CosmosItemRequestOptions(partitionKey));
- }
-
- /**
- * Deletes the item.
- *
- * After subscription the operation will be performed.
- * The {@link Mono} upon successful completion will contain a single cosmos item response with the replaced item.
- * In case of failure the {@link Mono} will error.
- *
- * @param options the request options
- * @return an {@link Mono} containing the cosmos item resource response.
- */
- public Mono delete(CosmosItemRequestOptions options){
- if (options == null) {
- options = new CosmosItemRequestOptions();
- }
- RequestOptions requestOptions = options.toRequestOptions();
- return RxJava2Adapter.singleToMono(
- RxJavaInterop.toV2Single(container.getDatabase()
- .getDocClientWrapper()
- .deleteDocument(getLink(), requestOptions)
- .map(response -> new CosmosItemResponse(response, requestOptions.getPartitionKey(), container))
- .toSingle()));
- }
-
- void setContainer(CosmosContainer container) {
- this.container = container;
- }
-
- @Override
- protected String getURIPathSegment() {
- return Paths.DOCUMENTS_PATH_SEGMENT;
- }
-
- @Override
- protected String getParentLink() {
- return this.container.getLink();
- }
-
-}
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2018 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.cosmos;
+
+import com.microsoft.azure.cosmosdb.Document;
+import com.microsoft.azure.cosmosdb.RequestOptions;
+import com.microsoft.azure.cosmosdb.internal.Paths;
+import hu.akarnokd.rxjava.interop.RxJavaInterop;
+import reactor.adapter.rxjava.RxJava2Adapter;
+import reactor.core.publisher.Mono;
+
+public class CosmosItem extends CosmosResource{
+ private Object partitionKey;
+ private CosmosContainer container;
+
+ CosmosItem(String id, Object partitionKey, CosmosContainer container) {
+ super(id);
+ this.partitionKey = partitionKey;
+ this.container = container;
+ }
+
+ /**
+ * Reads an item.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Mono} upon successful completion will contain a cosmos item response with the read item
+ * In case of failure the {@link Mono} will error.
+ *
+ * @return an {@link Mono} containing the cosmos item response with the read item or an error
+ */
+ public Mono read() {
+ return read(new CosmosItemRequestOptions(partitionKey));
+ }
+
+ /**
+ * Reads an item.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Mono} upon successful completion will contain a cosmos item response with the read item
+ * In case of failure the {@link Mono} will error.
+ *
+ * @param options the request comosItemRequestOptions
+ * @return an {@link Mono} containing the cosmos item response with the read item or an error
+ */
+ public Mono read(CosmosItemRequestOptions options) {
+ if (options == null) {
+ options = new CosmosItemRequestOptions();
+ }
+ RequestOptions requestOptions = options.toRequestOptions();
+ return RxJava2Adapter.singleToMono(RxJavaInterop.toV2Single(container.getDatabase().getDocClientWrapper()
+ .readDocument(getLink(), requestOptions)
+ .map(response -> new CosmosItemResponse(response, requestOptions.getPartitionKey(), container))
+ .toSingle()));
+ }
+
+ /**
+ * Replaces an item with the passed in item.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Mono} upon successful completion will contain a single cosmos item response with the replaced item.
+ * In case of failure the {@link Mono} will error.
+ *
+ * @param item the item to replace (containing the document id).
+ * @return an {@link Mono} containing the cosmos item resource response with the replaced item or an error.
+ */
+ public Mono replace(Object item){
+ return replace(item, new CosmosItemRequestOptions(partitionKey));
+ }
+
+ /**
+ * Replaces an item with the passed in item.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Mono} upon successful completion will contain a single cosmos item response with the replaced item.
+ * In case of failure the {@link Mono} will error.
+ *
+ * @param item the item to replace (containing the document id).
+ * @param options the request comosItemRequestOptions
+ * @return an {@link Mono} containing the cosmos item resource response with the replaced item or an error.
+ */
+ public Mono replace(Object item, CosmosItemRequestOptions options){
+ Document doc = CosmosItemSettings.fromObject(item);
+ if (options == null) {
+ options = new CosmosItemRequestOptions();
+ }
+ RequestOptions requestOptions = options.toRequestOptions();
+ return RxJava2Adapter.singleToMono(RxJavaInterop.toV2Single(container.getDatabase()
+ .getDocClientWrapper()
+ .replaceDocument(getLink(), doc, requestOptions)
+ .map(response -> new CosmosItemResponse(response, requestOptions.getPartitionKey(), container))
+ .toSingle()));
+ }
+
+ /**
+ * Deletes the item.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Mono} upon successful completion will contain a single cosmos item response with the replaced item.
+ * In case of failure the {@link Mono} will error.
+ * @return an {@link Mono} containing the cosmos item resource response.
+ */
+ public Mono delete() {
+ return delete(new CosmosItemRequestOptions(partitionKey));
+ }
+
+ /**
+ * Deletes the item.
+ *
+ * After subscription the operation will be performed.
+ * The {@link Mono} upon successful completion will contain a single cosmos item response with the replaced item.
+ * In case of failure the {@link Mono} will error.
+ *
+ * @param options the request options
+ * @return an {@link Mono} containing the cosmos item resource response.
+ */
+ public Mono delete(CosmosItemRequestOptions options){
+ if (options == null) {
+ options = new CosmosItemRequestOptions();
+ }
+ RequestOptions requestOptions = options.toRequestOptions();
+ return RxJava2Adapter.singleToMono(
+ RxJavaInterop.toV2Single(container.getDatabase()
+ .getDocClientWrapper()
+ .deleteDocument(getLink(), requestOptions)
+ .map(response -> new CosmosItemResponse(response, requestOptions.getPartitionKey(), container))
+ .toSingle()));
+ }
+
+ void setContainer(CosmosContainer container) {
+ this.container = container;
+ }
+
+ @Override
+ protected String getURIPathSegment() {
+ return Paths.DOCUMENTS_PATH_SEGMENT;
+ }
+
+ @Override
+ protected String getParentLink() {
+ return this.container.getLink();
+ }
+}
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItemResponse.java b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItemResponse.java
index f150206113cf8..5d4024d6bc004 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItemResponse.java
+++ b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosItemResponse.java
@@ -1,57 +1,65 @@
-/*
- * The MIT License (MIT)
- * Copyright (c) 2018 Microsoft Corporation
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-package com.microsoft.azure.cosmos;
-
-import com.microsoft.azure.cosmosdb.Document;
-import com.microsoft.azure.cosmosdb.PartitionKey;
-import com.microsoft.azure.cosmosdb.ResourceResponse;
-
-public class CosmosItemResponse extends CosmosResponse{
- private CosmosItem itemClient;
-
- CosmosItemResponse(ResourceResponse response, PartitionKey partitionKey, CosmosContainer container) {
- super(response);
- if(response.getResource() == null){
- super.setResourceSettings(null);
- }else{
- super.setResourceSettings(new CosmosItemSettings(response.getResource().toJson()));
- itemClient = new CosmosItem(response.getResource().getId(),partitionKey, container);
- }
- }
-
- /**
- * Gets the itemSettings
- * @return the itemSettings
- */
- public CosmosItemSettings getCosmosItemSettings() {
- return getResourceSettings();
- }
-
- /**
- * Gets the CosmosItem
- * @return the cosmos item
- */
- public CosmosItem getItem() {
- return itemClient;
- }
-}
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2018 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.cosmos;
+
+import com.microsoft.azure.cosmosdb.Document;
+import com.microsoft.azure.cosmosdb.PartitionKey;
+import com.microsoft.azure.cosmosdb.ResourceResponse;
+
+public class CosmosItemResponse extends CosmosResponse{
+ private CosmosItem itemClient;
+
+ CosmosItemResponse(ResourceResponse response, PartitionKey partitionKey, CosmosContainer container) {
+ super(response);
+ if(response.getResource() == null){
+ super.setResourceSettings(null);
+ }else{
+ super.setResourceSettings(new CosmosItemSettings(response.getResource().toJson()));
+ itemClient = new CosmosItem(response.getResource().getId(),partitionKey, container);
+ }
+ }
+
+ /**
+ * Gets the itemSettings
+ * @return the itemSettings
+ */
+ public CosmosItemSettings getCosmosItemSettings() {
+ return getResourceSettings();
+ }
+
+ /**
+ * Gets the CosmosItem
+ * @return the cosmos item
+ */
+ public CosmosItem getItem() {
+ return itemClient;
+ }
+
+ /**
+ * Gets the CosmosItem
+ * @return the cosmos item
+ */
+ public CosmosItem getCosmosItem() {
+ return itemClient;
+ }
+}
\ No newline at end of file
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosTriggerResponse.java b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosTriggerResponse.java
index 6dd0b4593f36c..cbdaa7130016b 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosTriggerResponse.java
+++ b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosTriggerResponse.java
@@ -33,6 +33,7 @@ public class CosmosTriggerResponse extends CosmosResponse
CosmosTriggerResponse(ResourceResponse response, CosmosContainer container) {
super(response);
if(response.getResource() != null) {
+ super.setResourceSettings(new CosmosTriggerSettings(response));
cosmosTriggerSettings = new CosmosTriggerSettings(response);
cosmosTrigger = new CosmosTrigger(cosmosTriggerSettings.getId(), container);
}
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUser.java b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUser.java
index 511f1654e5626..fc8b3cc7f8f1a 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUser.java
+++ b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUser.java
@@ -15,7 +15,14 @@ public CosmosUser(String id, CosmosDatabase database) {
/**
* Reads a cosmos user
- *
+ * @return an {@link Mono} containing the single cosmos user response with the read user or an error.
+ */
+ public Mono read() {
+ return this.read(null);
+ }
+
+ /**
+ * Reads a cosmos user
* @param options the request options
* @return a {@link Mono} containing the single resource response with the read user or an error.
*/
@@ -59,4 +66,4 @@ protected String getURIPathSegment() {
protected String getParentLink() {
return database.getLink() ;
}
-}
+}
\ No newline at end of file
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUserDefinedFunctionResponse.java b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUserDefinedFunctionResponse.java
index 2b4f301b9a45d..59251a9f93c12 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUserDefinedFunctionResponse.java
+++ b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUserDefinedFunctionResponse.java
@@ -1,56 +1,57 @@
-/*
- * The MIT License (MIT)
- * Copyright (c) 2018 Microsoft Corporation
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-package com.microsoft.azure.cosmos;
-
-import com.microsoft.azure.cosmosdb.ResourceResponse;
-import com.microsoft.azure.cosmosdb.UserDefinedFunction;
-
-public class CosmosUserDefinedFunctionResponse extends CosmosResponse {
-
- private CosmosUserDefinedFunctionSettings cosmosUserDefinedFunctionSettings;
- private CosmosUserDefinedFunction cosmosUserDefinedFunction;
-
- CosmosUserDefinedFunctionResponse(ResourceResponse response, CosmosContainer container) {
- super(response);
- if(response.getResource() != null) {
- cosmosUserDefinedFunctionSettings = new CosmosUserDefinedFunctionSettings(response);
- cosmosUserDefinedFunction = new CosmosUserDefinedFunction(cosmosUserDefinedFunctionSettings.getId(), container);
- }
- }
-
- /**
- * Gets the cosmos user defined function settings
- * @return the cosmos user defined function settings
- */
- public CosmosUserDefinedFunctionSettings getCosmosUserDefinedFunctionSettings() {
- return cosmosUserDefinedFunctionSettings;
- }
-
- /**
- * Gets the cosmos user defined function object
- * @return the cosmos user defined function object
- */
- public CosmosUserDefinedFunction getCosmosUserDefinedFunction() {
- return cosmosUserDefinedFunction;
- }
-}
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2018 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.cosmos;
+
+import com.microsoft.azure.cosmosdb.ResourceResponse;
+import com.microsoft.azure.cosmosdb.UserDefinedFunction;
+
+public class CosmosUserDefinedFunctionResponse extends CosmosResponse {
+
+ private CosmosUserDefinedFunctionSettings cosmosUserDefinedFunctionSettings;
+ private CosmosUserDefinedFunction cosmosUserDefinedFunction;
+
+ CosmosUserDefinedFunctionResponse(ResourceResponse response, CosmosContainer container) {
+ super(response);
+ if(response.getResource() != null) {
+ super.setResourceSettings(new CosmosUserDefinedFunctionSettings(response));
+ cosmosUserDefinedFunctionSettings = new CosmosUserDefinedFunctionSettings(response);
+ cosmosUserDefinedFunction = new CosmosUserDefinedFunction(cosmosUserDefinedFunctionSettings.getId(), container);
+ }
+ }
+
+ /**
+ * Gets the cosmos user defined function settings
+ * @return the cosmos user defined function settings
+ */
+ public CosmosUserDefinedFunctionSettings getCosmosUserDefinedFunctionSettings() {
+ return cosmosUserDefinedFunctionSettings;
+ }
+
+ /**
+ * Gets the cosmos user defined function object
+ * @return the cosmos user defined function object
+ */
+ public CosmosUserDefinedFunction getCosmosUserDefinedFunction() {
+ return cosmosUserDefinedFunction;
+ }
+}
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUserResponse.java b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUserResponse.java
index a4b170353c182..6b9f8b9a36ef8 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUserResponse.java
+++ b/sdk/src/main/java/com/microsoft/azure/cosmos/CosmosUserResponse.java
@@ -34,5 +34,5 @@ public CosmosUserSettings getCosmosUserSettings(){
return getResourceSettings();
}
-
+
}
diff --git a/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxDocumentClientImpl.java b/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxDocumentClientImpl.java
index 40c9ba72a7fe5..e79e27b859c4a 100644
--- a/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxDocumentClientImpl.java
+++ b/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/RxDocumentClientImpl.java
@@ -909,10 +909,6 @@ private Map getRequestHeaders(RequestOptions options) {
headers.put(HttpConstants.HttpHeaders.OFFER_TYPE, options.getOfferType());
}
- if (options.getPartitionKey() != null) {
- headers.put(HttpConstants.HttpHeaders.PARTITION_KEY, options.getPartitionKey().toString());
- }
-
if (options.isPopulateQuotaInfo()) {
headers.put(HttpConstants.HttpHeaders.POPULATE_QUOTA_INFO, String.valueOf(true));
}
@@ -972,7 +968,9 @@ private void addPartitionKeyInformation(RxDocumentServiceRequest request, Docume
PartitionKeyDefinition partitionKeyDefinition = collection.getPartitionKey();
PartitionKeyInternal partitionKeyInternal = null;
- if (options != null && options.getPartitionKey() != null) {
+ if (options != null && options.getPartitionKey() != null && options.getPartitionKey().equals(PartitionKey.None)){
+ partitionKeyInternal = BridgeInternal.getNonePartitionKey(partitionKeyDefinition);
+ } else if (options != null && options.getPartitionKey() != null) {
partitionKeyInternal = options.getPartitionKey().getInternalPartitionKey();
} else if (partitionKeyDefinition == null || partitionKeyDefinition.getPaths().size() == 0) {
// For backward compatibility, if collection doesn't have partition key defined, we assume all documents
@@ -1009,10 +1007,14 @@ private static PartitionKeyInternal extractPartitionKeyValueFromDocument(
if (parts.size() >= 1) {
Object value = document.getObjectByPath(parts);
if (value == null || value.getClass() == ObjectNode.class) {
- value = Undefined.Value();
+ value = BridgeInternal.getNonePartitionKey(partitionKeyDefinition);
}
- return PartitionKeyInternal.fromObjectArray(Collections.singletonList(value), false);
+ if (value instanceof PartitionKeyInternal) {
+ return (PartitionKeyInternal) value;
+ } else {
+ return PartitionKeyInternal.fromObjectArray(Collections.singletonList(value), false);
+ }
}
}
diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/ClientUnderTestBuilder.java b/sdk/src/test/java/com/microsoft/azure/cosmos/ClientUnderTestBuilder.java
similarity index 50%
rename from sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/ClientUnderTestBuilder.java
rename to sdk/src/test/java/com/microsoft/azure/cosmos/ClientUnderTestBuilder.java
index 914014c2bb2c7..d7fcc8954e705 100644
--- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/ClientUnderTestBuilder.java
+++ b/sdk/src/test/java/com/microsoft/azure/cosmos/ClientUnderTestBuilder.java
@@ -20,28 +20,39 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
-package com.microsoft.azure.cosmosdb.rx;
+package com.microsoft.azure.cosmos;
+
+import java.net.URI;
+import java.net.URISyntaxException;
-import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient.Builder;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentClientUnderTest;
+import com.microsoft.azure.cosmosdb.rx.internal.directconnectivity.ReflectionUtils;
-public class ClientUnderTestBuilder extends Builder {
+public class ClientUnderTestBuilder extends CosmosClientBuilder {
- public ClientUnderTestBuilder(Builder builder) {
- this.configs = builder.configs;
- this.connectionPolicy = builder.connectionPolicy;
- this.desiredConsistencyLevel = builder.desiredConsistencyLevel;
- this.masterKeyOrResourceToken = builder.masterKeyOrResourceToken;
- this.serviceEndpoint = builder.serviceEndpoint;
+ public ClientUnderTestBuilder(CosmosClientBuilder builder) {
+ this.configs(builder.getConfigs());
+ this.connectionPolicy(builder.getConnectionPolicy());
+ this.consistencyLevel(builder.getDesiredConsistencyLevel());
+ this.key(builder.getKeyOrResourceToken());
+ this.endpoint(builder.getServiceEndpoint());
}
@Override
- public RxDocumentClientUnderTest build() {
- return new RxDocumentClientUnderTest(
- this.serviceEndpoint,
- this.masterKeyOrResourceToken,
- this.connectionPolicy,
- this.desiredConsistencyLevel,
- this.configs);
+ public CosmosClient build() {
+ RxDocumentClientUnderTest rxClient;
+ try {
+ rxClient = new RxDocumentClientUnderTest(
+ new URI(this.getServiceEndpoint()),
+ this.getKeyOrResourceToken(),
+ this.getConnectionPolicy(),
+ this.getDesiredConsistencyLevel(),
+ this.getConfigs());
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ CosmosClient cosmosClient = super.build();
+ ReflectionUtils.setAsyncDocumentClient(cosmosClient, rxClient);
+ return cosmosClient;
}
}
diff --git a/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosBridgeInternal.java b/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosBridgeInternal.java
new file mode 100644
index 0000000000000..9e12488d5a0c6
--- /dev/null
+++ b/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosBridgeInternal.java
@@ -0,0 +1,34 @@
+package com.microsoft.azure.cosmos;
+
+import com.microsoft.azure.cosmosdb.DatabaseAccount;
+import com.microsoft.azure.cosmosdb.DocumentCollection;
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
+
+import reactor.core.publisher.Mono;
+
+public class CosmosBridgeInternal {
+
+ public static String getLink(CosmosResource resource) {
+ return resource.getLink();
+ }
+
+ public static DocumentCollection toDocumentCollection(CosmosContainerSettings cosmosContainerSettings) {
+ return new DocumentCollection(cosmosContainerSettings.toJson());
+ }
+
+ public static AsyncDocumentClient getAsyncDocumentClient(CosmosClient client) {
+ return client.getDocClientWrapper();
+ }
+
+ public static CosmosDatabase getCosmosDatabaseWithNewClient(CosmosDatabase cosmosDatabase, CosmosClient client) {
+ return new CosmosDatabase(cosmosDatabase.getId(), client);
+ }
+
+ public static CosmosContainer getCosmosContainerWithNewClient(CosmosContainer cosmosContainer, CosmosDatabase cosmosDatabase, CosmosClient client) {
+ return new CosmosContainer(cosmosContainer.getId(), CosmosBridgeInternal.getCosmosDatabaseWithNewClient(cosmosDatabase, client));
+ }
+
+ public static Mono getDatabaseAccount(CosmosClient client) {
+ return client.getDatabaseAccount();
+ }
+}
diff --git a/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosDatabaseForTest.java b/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosDatabaseForTest.java
new file mode 100644
index 0000000000000..925f1836878f3
--- /dev/null
+++ b/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosDatabaseForTest.java
@@ -0,0 +1,137 @@
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2018 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.microsoft.azure.cosmos;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.cosmos.CosmosDatabaseSettings;
+import com.microsoft.azure.cosmosdb.FeedResponse;
+import com.microsoft.azure.cosmosdb.SqlParameter;
+import com.microsoft.azure.cosmosdb.SqlParameterCollection;
+import com.microsoft.azure.cosmosdb.SqlQuerySpec;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CosmosDatabaseForTest {
+ private static Logger logger = LoggerFactory.getLogger(CosmosDatabaseForTest.class);
+ public static final String SHARED_DB_ID_PREFIX = "RxJava.SDKTest.SharedDatabase";
+ private static final Duration CLEANUP_THRESHOLD_DURATION = Duration.ofHours(2);
+ private static final String DELIMITER = "_";
+ private static DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss");
+
+ public LocalDateTime createdTime;
+ public CosmosDatabase createdDatabase;
+
+ private CosmosDatabaseForTest(CosmosDatabase db, LocalDateTime createdTime) {
+ this.createdDatabase = db;
+ this.createdTime = createdTime;
+ }
+
+ private boolean isStale() {
+ return isOlderThan(CLEANUP_THRESHOLD_DURATION);
+ }
+
+ private boolean isOlderThan(Duration dur) {
+ return createdTime.isBefore(LocalDateTime.now().minus(dur));
+ }
+
+ public static String generateId() {
+ return SHARED_DB_ID_PREFIX + DELIMITER + TIME_FORMATTER.format(LocalDateTime.now()) + DELIMITER + RandomStringUtils.randomAlphabetic(3);
+ }
+
+ private static CosmosDatabaseForTest from(CosmosDatabase db) {
+ if (db == null || db.getId() == null || db.getLink() == null) {
+ return null;
+ }
+
+ String id = db.getId();
+ if (id == null) {
+ return null;
+ }
+
+ String[] parts = StringUtils.split(id, DELIMITER);
+ if (parts.length != 3) {
+ return null;
+ }
+ if (!StringUtils.equals(parts[0], SHARED_DB_ID_PREFIX)) {
+ return null;
+ }
+
+ try {
+ LocalDateTime parsedTime = LocalDateTime.parse(parts[1], TIME_FORMATTER);
+ return new CosmosDatabaseForTest(db, parsedTime);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ public static CosmosDatabaseForTest create(DatabaseManager client) {
+ CosmosDatabaseSettings dbDef = new CosmosDatabaseSettings(generateId());
+
+ CosmosDatabase db = client.createDatabase(dbDef).block().getDatabase();
+ CosmosDatabaseForTest dbForTest = CosmosDatabaseForTest.from(db);
+ assertThat(dbForTest).isNotNull();
+ return dbForTest;
+ }
+
+ public static void cleanupStaleTestDatabases(DatabaseManager client) {
+ logger.info("Cleaning stale test databases ...");
+ List dbs = client.queryDatabases(
+ new SqlQuerySpec("SELECT * FROM c WHERE STARTSWITH(c.id, @PREFIX)",
+ new SqlParameterCollection(new SqlParameter("@PREFIX", CosmosDatabaseForTest.SHARED_DB_ID_PREFIX))))
+ .flatMap(page -> Flux.fromIterable(page.getResults())).collectList().block();
+
+ for (CosmosDatabaseSettings db : dbs) {
+ assertThat(db.getId()).startsWith(CosmosDatabaseForTest.SHARED_DB_ID_PREFIX);
+
+ CosmosDatabaseForTest dbForTest = CosmosDatabaseForTest.from(client.getDatabase(db.getId()));
+
+ if (db != null && dbForTest.isStale()) {
+ logger.info("Deleting database {}", db.getId());
+ dbForTest.deleteDatabase(db.getId());
+ }
+ }
+ }
+
+ private void deleteDatabase(String id) {
+ this.createdDatabase.delete().block();
+ }
+
+ public interface DatabaseManager {
+ Flux> queryDatabases(SqlQuerySpec query);
+ Mono createDatabase(CosmosDatabaseSettings databaseDefinition);
+ CosmosDatabase getDatabase(String id);
+ }
+}
diff --git a/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosPartitionKeyTests.java b/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosPartitionKeyTests.java
new file mode 100644
index 0000000000000..eff5aa465293d
--- /dev/null
+++ b/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosPartitionKeyTests.java
@@ -0,0 +1,331 @@
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2018 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.cosmos;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+import com.microsoft.azure.cosmosdb.ConnectionPolicy;
+import com.microsoft.azure.cosmosdb.Document;
+import com.microsoft.azure.cosmosdb.DocumentCollection;
+import com.microsoft.azure.cosmosdb.FeedOptions;
+import com.microsoft.azure.cosmosdb.FeedResponse;
+import com.microsoft.azure.cosmosdb.PartitionKey;
+import com.microsoft.azure.cosmosdb.RequestOptions;
+import com.microsoft.azure.cosmosdb.internal.BaseAuthorizationTokenProvider;
+import com.microsoft.azure.cosmosdb.internal.HttpConstants;
+import com.microsoft.azure.cosmosdb.internal.OperationType;
+import com.microsoft.azure.cosmosdb.internal.Paths;
+import com.microsoft.azure.cosmosdb.internal.ResourceType;
+import com.microsoft.azure.cosmosdb.internal.Utils;
+import com.microsoft.azure.cosmosdb.rx.FeedResponseListValidator;
+import com.microsoft.azure.cosmosdb.rx.TestConfigurations;
+import com.microsoft.azure.cosmosdb.rx.TestSuiteBase;
+import com.microsoft.azure.cosmosdb.rx.internal.Configs;
+import com.microsoft.azure.cosmosdb.rx.internal.HttpClientFactory;
+import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http.HttpMethod;
+import io.reactivex.netty.client.RxClient;
+import io.reactivex.netty.protocol.http.client.CompositeHttpClient;
+import io.reactivex.netty.protocol.http.client.HttpClientRequest;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import rx.Observable;
+
+public class CosmosPartitionKeyTests extends TestSuiteBase {
+
+ private final static String NON_PARTITIONED_CONTAINER_ID = "NonPartitionContainer" + UUID.randomUUID().toString();
+ private final static String NON_PARTITIONED_CONTAINER_DOCUEMNT_ID = "NonPartitionContainer_Document" + UUID.randomUUID().toString();
+
+ private CosmosClient client;
+ private CosmosDatabase createdDatabase;
+ private CosmosClientBuilder clientBuilder;
+
+ @Factory(dataProvider = "clientBuilders")
+ public CosmosPartitionKeyTests(CosmosClientBuilder clientBuilder) {
+ this.clientBuilder = clientBuilder;
+ }
+
+ @BeforeClass(groups = { "simple" }, timeOut = SETUP_TIMEOUT)
+ public void beforeClass() throws URISyntaxException, IOException {
+ client = clientBuilder.build();
+ createdDatabase = getSharedCosmosDatabase(client);
+ }
+
+ @AfterClass(groups = { "simple" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
+ public void afterClass() {
+ safeDeleteDatabase(createdDatabase);
+ safeClose(client);
+ }
+
+ private void createContainerWithoutPk() throws URISyntaxException, IOException {
+ ConnectionPolicy connectionPolicy = new ConnectionPolicy();
+ HttpClientFactory factory = new HttpClientFactory(new Configs())
+ .withMaxIdleConnectionTimeoutInMillis(connectionPolicy.getIdleConnectionTimeoutInMillis())
+ .withPoolSize(connectionPolicy.getMaxPoolSize())
+ .withHttpProxy(connectionPolicy.getProxy())
+ .withRequestTimeoutInMillis(connectionPolicy.getRequestTimeoutInMillis());
+
+ CompositeHttpClient httpClient = factory.toHttpClientBuilder().build();
+
+ // Create a non partitioned collection using the rest API and older version
+ String resourceId = Paths.DATABASES_PATH_SEGMENT + "/" + createdDatabase.getId();
+ String path = Paths.DATABASES_PATH_SEGMENT + "/" + createdDatabase.getId() + "/" + Paths.COLLECTIONS_PATH_SEGMENT + "/";
+ DocumentCollection collection = new DocumentCollection();
+ collection.setId(NON_PARTITIONED_CONTAINER_ID);
+
+ HashMap headers = new HashMap();
+ headers.put(HttpConstants.HttpHeaders.X_DATE, Utils.nowAsRFC1123());
+ headers.put(HttpConstants.HttpHeaders.VERSION, "2018-09-17");
+ BaseAuthorizationTokenProvider base = new BaseAuthorizationTokenProvider(TestConfigurations.MASTER_KEY);
+ String authorization = base.generateKeyAuthorizationSignature(HttpConstants.HttpMethods.POST, resourceId, Paths.COLLECTIONS_PATH_SEGMENT, headers);
+ headers.put(HttpConstants.HttpHeaders.AUTHORIZATION, URLEncoder.encode(authorization, "UTF-8"));
+ RxDocumentServiceRequest request = RxDocumentServiceRequest.create(OperationType.Create,
+ ResourceType.DocumentCollection, path, collection, headers, new RequestOptions());
+
+ String[] baseUrlSplit = TestConfigurations.HOST.split(":");
+ String resourceUri = baseUrlSplit[0] + ":" + baseUrlSplit[1] + ":" + baseUrlSplit[2].split("/")[
+ 0] + "//" + Paths.DATABASES_PATH_SEGMENT + "/" + createdDatabase.getId() + "/" + Paths.COLLECTIONS_PATH_SEGMENT + "/";
+ URI uri = new URI(resourceUri);
+
+ HttpClientRequest httpRequest = HttpClientRequest.create(HttpMethod.POST, uri.toString());
+
+ for (Map.Entry entry : headers.entrySet()) {
+ httpRequest.withHeader(entry.getKey(), entry.getValue());
+ }
+
+ httpRequest.withContent(request.getContent());
+
+ RxClient.ServerInfo serverInfo = new RxClient.ServerInfo(uri.getHost(), uri.getPort());
+
+ InputStream responseStream = httpClient.submit(serverInfo, httpRequest).flatMap(clientResponse -> {
+ return toInputStream(clientResponse.getContent());
+ })
+ .toBlocking().single();
+ String createdContainerAsString = IOUtils.readLines(responseStream, "UTF-8").get(0);
+ assertThat(createdContainerAsString).contains("\"id\":\"" + NON_PARTITIONED_CONTAINER_ID + "\"");
+
+ // Create a document in the non partitioned collection using the rest API and older version
+ resourceId = Paths.DATABASES_PATH_SEGMENT + "/" + createdDatabase.getId() + "/" + Paths.COLLECTIONS_PATH_SEGMENT + "/" + collection.getId();
+ path = Paths.DATABASES_PATH_SEGMENT + "/" + createdDatabase.getId() + "/" + Paths.COLLECTIONS_PATH_SEGMENT
+ + "/" + collection.getId() + "/" + Paths.DOCUMENTS_PATH_SEGMENT + "/";
+ Document document = new Document();
+ document.setId(NON_PARTITIONED_CONTAINER_DOCUEMNT_ID);
+
+ authorization = base.generateKeyAuthorizationSignature(HttpConstants.HttpMethods.POST, resourceId, Paths.DOCUMENTS_PATH_SEGMENT, headers);
+ headers.put(HttpConstants.HttpHeaders.AUTHORIZATION, URLEncoder.encode(authorization, "UTF-8"));
+ request = RxDocumentServiceRequest.create(OperationType.Create, ResourceType.Document, path,
+ document, headers, new RequestOptions());
+
+ resourceUri = baseUrlSplit[0] + ":" + baseUrlSplit[1] + ":" + baseUrlSplit[2].split("/")[0] + "//" + Paths.DATABASES_PATH_SEGMENT + "/"
+ + createdDatabase.getId() + "/" + Paths.COLLECTIONS_PATH_SEGMENT + "/" + collection.getId() + "/" + Paths.DOCUMENTS_PATH_SEGMENT + "/";
+ uri = new URI(resourceUri);
+
+ httpRequest = HttpClientRequest.create(HttpMethod.POST, uri.toString());
+
+ for (Map.Entry entry : headers.entrySet()) {
+ httpRequest.withHeader(entry.getKey(), entry.getValue());
+ }
+
+ httpRequest.withContent(request.getContent());
+
+ serverInfo = new RxClient.ServerInfo(uri.getHost(), uri.getPort());
+
+ responseStream = httpClient.submit(serverInfo, httpRequest).flatMap(clientResponse -> {
+ return toInputStream(clientResponse.getContent());
+ }).toBlocking().single();
+ String createdItemAsString = IOUtils.readLines(responseStream, "UTF-8").get(0);
+ assertThat(createdItemAsString).contains("\"id\":\"" + NON_PARTITIONED_CONTAINER_DOCUEMNT_ID + "\"");
+ }
+
+ @Test(groups = { "simple" }, timeOut = 10 * TIMEOUT)
+ public void testNonPartitionedCollectionOperations() throws Exception {
+ createContainerWithoutPk();
+ CosmosContainer createdContainer = createdDatabase.getContainer(NON_PARTITIONED_CONTAINER_ID);
+
+ Mono readMono = createdContainer.getItem(NON_PARTITIONED_CONTAINER_DOCUEMNT_ID, PartitionKey.None).read();
+ CosmosResponseValidator validator = new CosmosResponseValidator.Builder()
+ .withId(NON_PARTITIONED_CONTAINER_DOCUEMNT_ID).build();
+ validateSuccess(readMono, validator);
+
+ String createdItemId = UUID.randomUUID().toString();
+ Mono createMono = createdContainer.createItem(new CosmosItemSettings("{'id':'" + createdItemId + "'}"));
+ validator = new CosmosResponseValidator.Builder()
+ .withId(createdItemId).build();
+ validateSuccess(createMono, validator);
+
+ readMono = createdContainer.getItem(createdItemId, PartitionKey.None).read();
+ validator = new CosmosResponseValidator.Builder()
+ .withId(createdItemId).build();
+ validateSuccess(readMono, validator);
+
+ CosmosItem itemToReplace = createdContainer.getItem(createdItemId, PartitionKey.None).read().block().getCosmosItem();
+ CosmosItemSettings itemSettingsToReplace = itemToReplace.read().block().getCosmosItemSettings();
+ String replacedItemId = UUID.randomUUID().toString();
+ itemSettingsToReplace.setId(replacedItemId);
+ Mono replaceMono = itemToReplace.replace(itemSettingsToReplace);
+ validator = new CosmosResponseValidator.Builder()
+ .withId(replacedItemId).build();
+ validateSuccess(replaceMono, validator);
+
+ String upsertedItemId = UUID.randomUUID().toString();
+
+ Mono upsertMono = createdContainer.upsertItem(new CosmosItemSettings("{'id':'" + upsertedItemId + "'}"));
+ validator = new CosmosResponseValidator.Builder()
+ .withId(upsertedItemId).build();
+ validateSuccess(upsertMono, validator);
+
+ // one document was created during setup, one with create (which was replaced) and one with upsert
+ FeedOptions feedOptions = new FeedOptions();
+ feedOptions.setPartitionKey(PartitionKey.None);
+ ArrayList expectedIds = new ArrayList();
+ expectedIds.add(NON_PARTITIONED_CONTAINER_DOCUEMNT_ID);
+ expectedIds.add(replacedItemId);
+ expectedIds.add(upsertedItemId);
+ Flux> queryFlux = createdContainer.queryItems("SELECT * from c", feedOptions);
+ FeedResponseListValidator queryValidator = new FeedResponseListValidator.Builder()
+ .totalSize(3)
+ .numberOfPages(1)
+ .containsExactlyIds(expectedIds)
+ .build();
+ validateQuerySuccess(queryFlux, queryValidator);
+
+ queryFlux = createdContainer.listItems(feedOptions);
+ queryValidator = new FeedResponseListValidator.Builder()
+ .totalSize(3)
+ .numberOfPages(1)
+ .containsExactlyIds(expectedIds)
+ .build();
+ validateQuerySuccess(queryFlux, queryValidator);
+
+ String documentCreatedBySprocId = "testDoc";
+ CosmosStoredProcedureSettings sproc = new CosmosStoredProcedureSettings(
+ "{" +
+ " 'id': '" +UUID.randomUUID().toString() + "'," +
+ " 'body':'" +
+ " function() {" +
+ " var client = getContext().getCollection();" +
+ " var doc = client.createDocument(client.getSelfLink(), { \\'id\\': \\'" + documentCreatedBySprocId + "\\'}, {}, function(err, docCreated, options) { " +
+ " if(err) throw new Error(\\'Error while creating document: \\' + err.message);" +
+ " else {" +
+ " getContext().getResponse().setBody(1);" +
+ " }" +
+ " });" +
+ "}'" +
+ "}");
+ CosmosStoredProcedure createdSproc = createdContainer.createStoredProcedure(sproc).block().getStoredProcedure();
+
+ // Partiton Key value same as what is specified in the stored procedure body
+ RequestOptions options = new RequestOptions();
+ options.setPartitionKey(PartitionKey.None);
+ int result = Integer.parseInt(createdSproc.execute(null, options).block().getResponseAsString());
+ assertThat(result).isEqualTo(1);
+
+ // 3 previous items + 1 created from the sproc
+ expectedIds.add(documentCreatedBySprocId);
+ queryFlux = createdContainer.listItems(feedOptions);
+ queryValidator = new FeedResponseListValidator.Builder()
+ .totalSize(4)
+ .numberOfPages(1)
+ .containsExactlyIds(expectedIds)
+ .build();
+ validateQuerySuccess(queryFlux, queryValidator);
+
+ Mono deleteMono = createdContainer.getItem(upsertedItemId, PartitionKey.None).delete();
+ validator = new CosmosResponseValidator.Builder()
+ .nullResource().build();
+ validateSuccess(deleteMono, validator);
+
+ deleteMono = createdContainer.getItem(replacedItemId, PartitionKey.None).delete();
+ validator = new CosmosResponseValidator.Builder()
+ .nullResource().build();
+ validateSuccess(deleteMono, validator);
+
+ deleteMono = createdContainer.getItem(NON_PARTITIONED_CONTAINER_DOCUEMNT_ID, PartitionKey.None).delete();
+ validator = new CosmosResponseValidator.Builder()
+ .nullResource().build();
+ validateSuccess(deleteMono, validator);
+
+ deleteMono = createdContainer.getItem(documentCreatedBySprocId, PartitionKey.None).delete();
+ validator = new CosmosResponseValidator.Builder()
+ .nullResource().build();
+ validateSuccess(deleteMono, validator);
+
+ queryFlux = createdContainer.listItems(feedOptions);
+ queryValidator = new FeedResponseListValidator.Builder()
+ .totalSize(0)
+ .numberOfPages(1)
+ .build();
+ validateQuerySuccess(queryFlux, queryValidator);
+ }
+
+ @Test(groups = { "simple" }, timeOut = TIMEOUT*100)
+ public void testMultiPartitionCollectionReadDocumentWithNoPk() throws InterruptedException {
+ String partitionedCollectionId = "PartitionedCollection" + UUID.randomUUID().toString();
+ String IdOfDocumentWithNoPk = UUID.randomUUID().toString();
+ CosmosContainerSettings containerSettings = new CosmosContainerSettings(partitionedCollectionId, "/mypk");
+ CosmosContainer createdContainer = createdDatabase.createContainer(containerSettings).block().getContainer();
+ CosmosItemSettings cosmosItemSettings = new CosmosItemSettings();
+ cosmosItemSettings.setId(IdOfDocumentWithNoPk);
+ CosmosItem createdItem = createdContainer.createItem(cosmosItemSettings).block().getCosmosItem();
+ CosmosItemRequestOptions options = new CosmosItemRequestOptions();
+ options.setPartitionKey(PartitionKey.None);
+ Mono readMono = createdItem.read(options);
+ CosmosResponseValidator validator = new CosmosResponseValidator.Builder()
+ .withId(IdOfDocumentWithNoPk).build();
+ validateSuccess(readMono, validator);
+ }
+
+ private Observable toInputStream(Observable contentObservable) {
+ return contentObservable.reduce(new ByteArrayOutputStream(), (out, bb) -> {
+ try {
+ bb.readBytes(out, bb.readableBytes());
+ return out;
+ } catch (java.io.IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).map(out -> {
+ return new ByteArrayInputStream(out.toByteArray());
+ });
+ }
+
+}
diff --git a/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosResponseValidator.java b/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosResponseValidator.java
index 1dba279227cc7..91a319718fb18 100644
--- a/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosResponseValidator.java
+++ b/sdk/src/test/java/com/microsoft/azure/cosmos/CosmosResponseValidator.java
@@ -1,109 +1,269 @@
-/*
- * The MIT License (MIT)
- * Copyright (c) 2018 Microsoft Corporation
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-package com.microsoft.azure.cosmos;
-
-import com.microsoft.azure.cosmosdb.IndexingMode;
-import com.microsoft.azure.cosmosdb.Resource;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public interface CosmosResponseValidator {
- void validate(T cosmosResponse);
-
- class Builder {
- private List> validators = new ArrayList<>();
-
- public CosmosResponseValidator build() {
- return new CosmosResponseValidator() {
- @SuppressWarnings({"rawtypes", "unchecked"})
- @Override
- public void validate(T resourceResponse) {
- for (CosmosResponseValidator validator : validators) {
- validator.validate(resourceResponse);
- }
- }
- };
- }
-
- public Builder withId(final String resourceId) {
- validators.add(new CosmosResponseValidator() {
-
- @Override
- public void validate(T resourceResponse) {
- assertThat(getResource(resourceResponse)).isNotNull();
- assertThat(getResource(resourceResponse).getId()).as("check Resource Id").isEqualTo(resourceId);
- }
- });
- return this;
- }
-
- private Resource getResource(T resourceResponse) {
- if(resourceResponse instanceof CosmosDatabaseResponse){
- return ((CosmosDatabaseResponse)resourceResponse).getCosmosDatabaseSettings();
- }else if(resourceResponse instanceof CosmosContainerResponse){
- return ((CosmosContainerResponse)resourceResponse).getCosmosContainerSettings();
- }else if(resourceResponse instanceof CosmosItemResponse){
- return ((CosmosItemResponse)resourceResponse).getCosmosItemSettings();
- }
- return null;
- }
-
- public Builder nullResource() {
- validators.add(new CosmosResponseValidator() {
-
- @Override
- public void validate(T resourceResponse) {
- assertThat(getResource(resourceResponse)).isNull();
- }
- });
- return this;
- }
-
- public Builder indexingMode(IndexingMode mode) {
- validators.add(new CosmosResponseValidator() {
-
- @Override
- public void validate(CosmosContainerResponse resourceResponse) {
- assertThat(resourceResponse.getCosmosContainerSettings()).isNotNull();
- assertThat(resourceResponse.getCosmosContainerSettings().getIndexingPolicy()).isNotNull();
- assertThat(resourceResponse.getCosmosContainerSettings().getIndexingPolicy().getIndexingMode()).isEqualTo(mode);
- }
- });
- return this;
- }
-
- public Builder withProperty(String propertyName, String value) {
- validators.add(new CosmosResponseValidator() {
- @Override
- public void validate(T cosmosResponse) {
- assertThat(getResource(cosmosResponse)).isNotNull();
- assertThat(getResource(cosmosResponse).get(propertyName)).isEqualTo(value);
- }
- });
- return this;
- }
- }
-}
+/*
+ * The MIT License (MIT)
+ * Copyright (c) 2018 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.cosmos;
+
+import com.microsoft.azure.cosmosdb.CompositePath;
+import com.microsoft.azure.cosmosdb.DocumentCollection;
+import com.microsoft.azure.cosmosdb.IndexingMode;
+import com.microsoft.azure.cosmosdb.Resource;
+import com.microsoft.azure.cosmosdb.ResourceResponse;
+import com.microsoft.azure.cosmosdb.SpatialSpec;
+import com.microsoft.azure.cosmosdb.SpatialType;
+import com.microsoft.azure.cosmosdb.StoredProcedure;
+import com.microsoft.azure.cosmosdb.Trigger;
+import com.microsoft.azure.cosmosdb.TriggerOperation;
+import com.microsoft.azure.cosmosdb.TriggerType;
+import com.microsoft.azure.cosmosdb.UserDefinedFunction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public interface CosmosResponseValidator {
+ void validate(T cosmosResponse);
+
+ class Builder {
+ private List> validators = new ArrayList<>();
+
+ public CosmosResponseValidator build() {
+ return new CosmosResponseValidator() {
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public void validate(T resourceResponse) {
+ for (CosmosResponseValidator validator : validators) {
+ validator.validate(resourceResponse);
+ }
+ }
+ };
+ }
+
+ public Builder withId(final String resourceId) {
+ validators.add(new CosmosResponseValidator() {
+
+ @Override
+ public void validate(T resourceResponse) {
+ assertThat(getResource(resourceResponse)).isNotNull();
+ assertThat(getResource(resourceResponse).getId()).as("check Resource Id").isEqualTo(resourceId);
+ }
+ });
+ return this;
+ }
+
+ private Resource getResource(T resourceResponse) {
+ if (resourceResponse instanceof CosmosDatabaseResponse) {
+ return ((CosmosDatabaseResponse)resourceResponse).getCosmosDatabaseSettings();
+ } else if (resourceResponse instanceof CosmosContainerResponse) {
+ return ((CosmosContainerResponse)resourceResponse).getCosmosContainerSettings();
+ } else if (resourceResponse instanceof CosmosItemResponse) {
+ return ((CosmosItemResponse)resourceResponse).getCosmosItemSettings();
+ } else if (resourceResponse instanceof CosmosStoredProcedureResponse) {
+ return ((CosmosStoredProcedureResponse)resourceResponse).getStoredProcedureSettings();
+ } else if (resourceResponse instanceof CosmosTriggerResponse) {
+ return ((CosmosTriggerResponse)resourceResponse).getCosmosTriggerSettings();
+ } else if (resourceResponse instanceof CosmosUserDefinedFunctionResponse) {
+ return ((CosmosUserDefinedFunctionResponse)resourceResponse).getCosmosUserDefinedFunctionSettings();
+ } else if (resourceResponse instanceof CosmosUserResponse) {
+ return ((CosmosUserResponse)resourceResponse).getCosmosUserSettings();
+ }
+ return null;
+ }
+
+ public Builder nullResource() {
+ validators.add(new CosmosResponseValidator() {
+
+ @Override
+ public void validate(T resourceResponse) {
+ assertThat(getResource(resourceResponse)).isNull();
+ }
+ });
+ return this;
+ }
+
+ public Builder indexingMode(IndexingMode mode) {
+ validators.add(new CosmosResponseValidator() {
+
+ @Override
+ public void validate(CosmosContainerResponse resourceResponse) {
+ assertThat(resourceResponse.getCosmosContainerSettings()).isNotNull();
+ assertThat(resourceResponse.getCosmosContainerSettings().getIndexingPolicy()).isNotNull();
+ assertThat(resourceResponse.getCosmosContainerSettings().getIndexingPolicy().getIndexingMode()).isEqualTo(mode);
+ }
+ });
+ return this;
+ }
+
+ public Builder withProperty(String propertyName, String value) {
+ validators.add(new CosmosResponseValidator() {
+ @Override
+ public void validate(T cosmosResponse) {
+ assertThat(getResource(cosmosResponse)).isNotNull();
+ assertThat(getResource(cosmosResponse).get(propertyName)).isEqualTo(value);
+ }
+ });
+ return this;
+ }
+
+ public Builder withCompositeIndexes(Collection> compositeIndexesWritten) {
+ validators.add(new CosmosResponseValidator() {
+
+ @Override
+ public void validate(CosmosContainerResponse resourceResponse) {
+ Iterator> compositeIndexesReadIterator = resourceResponse.getCosmosContainerSettings()
+ .getIndexingPolicy().getCompositeIndexes().iterator();
+ Iterator> compositeIndexesWrittenIterator = compositeIndexesWritten.iterator();
+
+ ArrayList readIndexesStrings = new ArrayList();
+ ArrayList writtenIndexesStrings = new ArrayList();
+
+ while (compositeIndexesReadIterator.hasNext() && compositeIndexesWrittenIterator.hasNext()) {
+ Iterator compositeIndexReadIterator = compositeIndexesReadIterator.next().iterator();
+ Iterator compositeIndexWrittenIterator = compositeIndexesWrittenIterator.next().iterator();
+
+ StringBuilder readIndexesString = new StringBuilder();
+ StringBuilder writtenIndexesString = new StringBuilder();
+
+ while (compositeIndexReadIterator.hasNext() && compositeIndexWrittenIterator.hasNext()) {
+ CompositePath compositePathRead = compositeIndexReadIterator.next();
+ CompositePath compositePathWritten = compositeIndexWrittenIterator.next();
+
+ readIndexesString.append(compositePathRead.getPath() + ":" + compositePathRead.getOrder() + ";");
+ writtenIndexesString.append(compositePathWritten.getPath() + ":" + compositePathRead.getOrder() + ";");
+ }
+
+ readIndexesStrings.add(readIndexesString.toString());
+ writtenIndexesStrings.add(writtenIndexesString.toString());
+ }
+
+ assertThat(readIndexesStrings).containsExactlyInAnyOrderElementsOf(writtenIndexesStrings);
+ }
+
+ });
+ return this;
+ }
+
+ public Builder withSpatialIndexes(Collection spatialIndexes) {
+ validators.add(new CosmosResponseValidator() {
+
+ @Override
+ public void validate(CosmosContainerResponse resourceResponse) {
+ Iterator spatialIndexesReadIterator = resourceResponse.getCosmosContainerSettings()
+ .getIndexingPolicy().getSpatialIndexes().iterator();
+ Iterator spatialIndexesWrittenIterator = spatialIndexes.iterator();
+
+ HashMap> readIndexMap = new HashMap>();
+ HashMap> writtenIndexMap = new HashMap>();
+
+ while (spatialIndexesReadIterator.hasNext() && spatialIndexesWrittenIterator.hasNext()) {
+ SpatialSpec spatialSpecRead = spatialIndexesReadIterator.next();
+ SpatialSpec spatialSpecWritten = spatialIndexesWrittenIterator.next();
+
+ String readPath = spatialSpecRead.getPath() + ":";
+ String writtenPath = spatialSpecWritten.getPath() + ":";
+
+ ArrayList readSpatialTypes = new ArrayList();
+ ArrayList writtenSpatialTypes = new ArrayList();
+
+ Iterator spatialTypesReadIterator = spatialSpecRead.getSpatialTypes().iterator();
+ Iterator spatialTypesWrittenIterator = spatialSpecWritten.getSpatialTypes().iterator();
+
+ while (spatialTypesReadIterator.hasNext() && spatialTypesWrittenIterator.hasNext()) {
+ readSpatialTypes.add(spatialTypesReadIterator.next());
+ writtenSpatialTypes.add(spatialTypesWrittenIterator.next());
+ }
+
+ readIndexMap.put(readPath, readSpatialTypes);
+ writtenIndexMap.put(writtenPath, writtenSpatialTypes);
+ }
+
+ for (Entry> entry : readIndexMap.entrySet()) {
+ assertThat(entry.getValue())
+ .containsExactlyInAnyOrderElementsOf(writtenIndexMap.get(entry.getKey()));
+ }
+ }
+ });
+ return this;
+ }
+
+ public Builder withStoredProcedureBody(String storedProcedureBody) {
+ validators.add(new CosmosResponseValidator() {
+
+ @Override
+ public void validate(CosmosStoredProcedureResponse resourceResponse) {
+ assertThat(resourceResponse.getStoredProcedureSettings().getBody()).isEqualTo(storedProcedureBody);
+ }
+ });
+ return this;
+ }
+
+ public Builder notNullEtag() {
+ validators.add(new CosmosResponseValidator() {
+
+ @Override
+ public void validate(T resourceResponse) {
+ assertThat(resourceResponse.getResourceSettings()).isNotNull();
+ assertThat(resourceResponse.getResourceSettings().getETag()).isNotNull();
+ }
+ });
+ return this;
+ }
+
+ public Builder withTriggerBody(String functionBody) {
+ validators.add(new CosmosResponseValidator() {
+
+ @Override
+ public void validate(CosmosTriggerResponse resourceResponse) {
+ assertThat(resourceResponse.getCosmosTriggerSettings().getBody()).isEqualTo(functionBody);
+ }
+ });
+ return this;
+ }
+
+ public Builder withTriggerInternals(TriggerType type, TriggerOperation op) {
+ validators.add(new CosmosResponseValidator() {
+
+ @Override
+ public void validate(CosmosTriggerResponse resourceResponse) {
+ assertThat(resourceResponse.getCosmosTriggerSettings().getTriggerType()).isEqualTo(type);
+ assertThat(resourceResponse.getCosmosTriggerSettings().getTriggerOperation()).isEqualTo(op);
+ }
+ });
+ return this;
+ }
+
+ public Builder withUserDefinedFunctionBody(String functionBody) {
+ validators.add(new CosmosResponseValidator() {
+
+ @Override
+ public void validate(CosmosUserDefinedFunctionResponse resourceResponse) {
+ assertThat(resourceResponse.getCosmosUserDefinedFunctionSettings().getBody()).isEqualTo(functionBody);
+ }
+ });
+ return this;
+ }
+ }
+}
diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/DCDocumentCrudTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/DCDocumentCrudTest.java
similarity index 96%
rename from sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/DCDocumentCrudTest.java
rename to sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/DCDocumentCrudTest.java
index 7a73fa0e04bde..a6d5286aab9d5 100644
--- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/DCDocumentCrudTest.java
+++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/DCDocumentCrudTest.java
@@ -20,7 +20,7 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
-package com.microsoft.azure.cosmosdb.rx;
+package com.microsoft.azure.cosmosdb.internal.directconnectivity;
import com.microsoft.azure.cosmosdb.ConnectionMode;
import com.microsoft.azure.cosmosdb.ConnectionPolicy;
@@ -38,10 +38,15 @@
import com.microsoft.azure.cosmosdb.internal.OperationType;
import com.microsoft.azure.cosmosdb.internal.ResourceType;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.Protocol;
+import com.microsoft.azure.cosmosdb.rx.DocumentServiceRequestValidator;
+import com.microsoft.azure.cosmosdb.rx.FeedResponseListValidator;
+import com.microsoft.azure.cosmosdb.rx.ResourceResponseValidator;
+import com.microsoft.azure.cosmosdb.rx.TestConfigurations;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient.Builder;
import com.microsoft.azure.cosmosdb.rx.internal.Configs;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
import com.microsoft.azure.cosmosdb.rx.internal.SpyClientUnderTestFactory;
+import com.microsoft.azure.cosmosdb.rx.internal.TestSuiteBase;
import org.mockito.stubbing.Answer;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
@@ -67,6 +72,7 @@
* The tests in other test classes validate the actual behaviour and different scenarios.
*/
public class DCDocumentCrudTest extends TestSuiteBase {
+
private final static int QUERY_TIMEOUT = 40000;
private final static String PARTITION_KEY_FIELD_NAME = "mypk";
@@ -249,8 +255,8 @@ public void crossPartitionQuery() {
// validates only the first query for fetching query plan goes to gateway.
assertThat(client.getCapturedRequests().stream().filter(r -> r.getResourceType() == ResourceType.Document)).hasSize(1);
} catch (Throwable error) {
- if (clientBuilder.configs.getProtocol() == Protocol.Tcp) {
- String message = String.format("Direct TCP test failure ignored: desiredConsistencyLevel=%s", this.clientBuilder.desiredConsistencyLevel);
+ if (clientBuilder.getConfigs().getProtocol() == Protocol.Tcp) {
+ String message = String.format("Direct TCP test failure ignored: desiredConsistencyLevel=%s", this.clientBuilder.getDesiredConsistencyLevel());
logger.info(message, error);
throw new SkipException(message, error);
}
@@ -340,4 +346,5 @@ private Document getDocumentDefinition() {
doc.set("name", "Hafez");
return doc;
}
+
}
diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayAddressCacheTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayAddressCacheTest.java
index 54d581f42615c..9357869954706 100644
--- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayAddressCacheTest.java
+++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayAddressCacheTest.java
@@ -36,12 +36,12 @@
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient.Builder;
import com.microsoft.azure.cosmosdb.rx.TestConfigurations;
-import com.microsoft.azure.cosmosdb.rx.TestSuiteBase;
import com.microsoft.azure.cosmosdb.rx.internal.Configs;
import com.microsoft.azure.cosmosdb.rx.internal.HttpClientFactory;
import com.microsoft.azure.cosmosdb.rx.internal.IAuthorizationTokenProvider;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentClientImpl;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
+import com.microsoft.azure.cosmosdb.rx.internal.TestSuiteBase;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.CompositeHttpClient;
import org.mockito.Matchers;
@@ -877,4 +877,4 @@ private Document getDocumentDefinition() {
, uuid, uuid));
return doc;
}
-}
+}
\ No newline at end of file
diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayServiceConfigurationReaderTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayServiceConfigurationReaderTest.java
index 691139545995a..0f28160d1f3da 100644
--- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayServiceConfigurationReaderTest.java
+++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/internal/directconnectivity/GatewayServiceConfigurationReaderTest.java
@@ -45,7 +45,7 @@
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient.Builder;
import com.microsoft.azure.cosmosdb.rx.TestConfigurations;
-import com.microsoft.azure.cosmosdb.rx.TestSuiteBase;
+import com.microsoft.azure.cosmosdb.rx.internal.TestSuiteBase;
import com.microsoft.azure.cosmosdb.rx.internal.SpyClientUnderTestFactory;
import com.microsoft.azure.cosmosdb.rx.internal.SpyClientUnderTestFactory.ClientUnderTest;
@@ -202,4 +202,4 @@ private HttpClientResponse getMockResponse(String databaseAccountJson)
}
return resp;
}
-}
+}
\ No newline at end of file
diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AggregateQueryTests.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AggregateQueryTests.java
index 3a0fb11fb207f..2a3c7696d3f69 100644
--- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AggregateQueryTests.java
+++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AggregateQueryTests.java
@@ -23,23 +23,25 @@
package com.microsoft.azure.cosmosdb.rx;
import java.util.ArrayList;
+import java.util.UUID;
+import com.microsoft.azure.cosmos.CosmosClientBuilder;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.Protocol;
+
+import reactor.core.publisher.Flux;
+
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
-import com.microsoft.azure.cosmosdb.Database;
+import com.microsoft.azure.cosmos.CosmosClient;
+import com.microsoft.azure.cosmos.CosmosContainer;
+import com.microsoft.azure.cosmos.CosmosItemSettings;
import com.microsoft.azure.cosmosdb.Document;
-import com.microsoft.azure.cosmosdb.DocumentCollection;
import com.microsoft.azure.cosmosdb.FeedOptions;
import com.microsoft.azure.cosmosdb.FeedResponse;
-import com.microsoft.azure.cosmosdb.ResourceResponse;
-import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient.Builder;
-
-import rx.Observable;
public class AggregateQueryTests extends TestSuiteBase {
@@ -67,9 +69,8 @@ public AggregateConfig(String operator, Object expected, String condition) {
}
}
- private Database createdDatabase;
- private DocumentCollection createdCollection;
- private ArrayList docs = new ArrayList();
+ private CosmosContainer createdCollection;
+ private ArrayList docs = new ArrayList();
private ArrayList queryConfigs = new ArrayList();
private String partitionKey = "mypk";
@@ -80,10 +81,10 @@ public AggregateConfig(String operator, Object expected, String condition) {
private int numberOfDocumentsWithNumericId;
private int numberOfDocsWithSamePartitionKey = 400;
- private AsyncDocumentClient client;
+ private CosmosClient client;
@Factory(dataProvider = "clientBuildersWithDirect")
- public AggregateQueryTests(Builder clientBuilder) {
+ public AggregateQueryTests(CosmosClientBuilder clientBuilder) {
this.clientBuilder = clientBuilder;
}
@@ -105,10 +106,9 @@ public void queryDocumentsWithAggregates(boolean qmEnabled) throws Exception {
for (QueryConfig queryConfig : queryConfigs) {
- Observable> queryObservable = client
- .queryDocuments(createdCollection.getSelfLink(), queryConfig.query, options);
+ Flux> queryObservable = createdCollection.queryItems(queryConfig.query, options);
- FeedResponseListValidator validator = new FeedResponseListValidator.Builder()
+ FeedResponseListValidator validator = new FeedResponseListValidator.Builder()
.withAggregateValue(queryConfig.expected)
.numberOfPages(1)
.hasValidQueryMetrics(qmEnabled)
@@ -117,8 +117,8 @@ public void queryDocumentsWithAggregates(boolean qmEnabled) throws Exception {
try {
validateQuerySuccess(queryObservable, validator);
} catch (Throwable error) {
- if (this.clientBuilder.configs.getProtocol() == Protocol.Tcp) {
- String message = String.format("Direct TCP test failure ignored: desiredConsistencyLevel=%s", this.clientBuilder.desiredConsistencyLevel);
+ if (this.clientBuilder.getConfigs().getProtocol() == Protocol.Tcp) {
+ String message = String.format("Direct TCP test failure ignored: desiredConsistencyLevel=%s", this.clientBuilder.getDesiredConsistencyLevel());
logger.info(message, error);
throw new SkipException(message, error);
}
@@ -127,38 +127,35 @@ public void queryDocumentsWithAggregates(boolean qmEnabled) throws Exception {
}
}
- public void bulkInsert(AsyncDocumentClient client) {
+ public void bulkInsert() {
generateTestData();
-
- ArrayList>> result = new ArrayList>>();
- for (int i = 0; i < docs.size(); i++) {
- result.add(client.createDocument("dbs/" + createdDatabase.getId() + "/colls/" + createdCollection.getId(), docs.get(i), null, false));
- }
-
- Observable.merge(result, 100).toList().toBlocking().single();
+ bulkInsertBlocking(createdCollection, docs);
}
public void generateTestData() {
Object[] values = new Object[]{null, false, true, "abc", "cdfg", "opqrs", "ttttttt", "xyz", "oo", "ppp"};
for (int i = 0; i < values.length; i++) {
- Document d = new Document();
+ CosmosItemSettings d = new CosmosItemSettings();
+ d.setId(UUID.randomUUID().toString());
d.set(partitionKey, values[i]);
docs.add(d);
}
for (int i = 0; i < numberOfDocsWithSamePartitionKey; i++) {
- Document d = new Document();
+ CosmosItemSettings d = new CosmosItemSettings();
d.set(partitionKey, uniquePartitionKey);
d.set("resourceId", Integer.toString(i));
d.set(field, i + 1);
+ d.setId(UUID.randomUUID().toString());
docs.add(d);
}
numberOfDocumentsWithNumericId = numberOfDocuments - values.length - numberOfDocsWithSamePartitionKey;
for (int i = 0; i < numberOfDocumentsWithNumericId; i++) {
- Document d = new Document();
+ CosmosItemSettings d = new CosmosItemSettings();
d.set(partitionKey, i + 1);
+ d.setId(UUID.randomUUID().toString());
docs.add(d);
}
@@ -223,14 +220,13 @@ public void afterClass() {
safeClose(client);
}
- @BeforeClass(groups = { "simple" }, timeOut = SETUP_TIMEOUT)
+ @BeforeClass(groups = { "simple" }, timeOut = SETUP_TIMEOUT * 100)
public void beforeClass() throws Exception {
client = clientBuilder.build();
- createdDatabase = SHARED_DATABASE;
- createdCollection = SHARED_MULTI_PARTITION_COLLECTION;
- truncateCollection(SHARED_MULTI_PARTITION_COLLECTION);
+ createdCollection = getSharedMultiPartitionCosmosContainer(client);
+ truncateCollection(createdCollection);
- bulkInsert(client);
+ bulkInsert();
generateTestConfigs();
waitIfNeededForReplicasToCatchUp(clientBuilder);
diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AttachmentCrudTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AttachmentCrudTest.java
deleted file mode 100644
index a229e251a2a5d..0000000000000
--- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AttachmentCrudTest.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * The MIT License (MIT)
- * Copyright (c) 2018 Microsoft Corporation
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-package com.microsoft.azure.cosmosdb.rx;
-
-import java.util.UUID;
-
-import com.microsoft.azure.cosmosdb.internal.directconnectivity.Protocol;
-import org.testng.SkipException;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Factory;
-import org.testng.annotations.Test;
-
-import com.microsoft.azure.cosmosdb.Attachment;
-import com.microsoft.azure.cosmosdb.Database;
-import com.microsoft.azure.cosmosdb.Document;
-import com.microsoft.azure.cosmosdb.DocumentCollection;
-import com.microsoft.azure.cosmosdb.PartitionKey;
-import com.microsoft.azure.cosmosdb.RequestOptions;
-import com.microsoft.azure.cosmosdb.ResourceResponse;
-import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
-
-import rx.Observable;
-
-import javax.net.ssl.SSLException;
-
-public class AttachmentCrudTest extends TestSuiteBase {
-
- private Database createdDatabase;
- private DocumentCollection createdCollection;
- private Document createdDocument;
-
- private AsyncDocumentClient client;
-
- @Factory(dataProvider = "clientBuildersWithDirectHttps") // Direct TCP mode does not support attachments
- public AttachmentCrudTest(AsyncDocumentClient.Builder clientBuilder) {
- this.clientBuilder = clientBuilder;
- }
-
- @Test(groups = { "simple" }, timeOut = TIMEOUT)
- public void createAttachment() throws Exception {
- // create an Attachment
- String uuid = UUID.randomUUID().toString();
- Attachment attachment = getAttachmentDefinition(uuid, "application/text");
-
- RequestOptions options = new RequestOptions();
- options.setPartitionKey(new PartitionKey(createdDocument.getId()));
- Observable> createObservable = client.createAttachment(getDocumentLink(), attachment, options);
-
- // validate attachment creation
- ResourceResponseValidator validator = new ResourceResponseValidator.Builder()
- .withId(attachment.getId())
- .withContentType("application/text")
- .notNullEtag()
- .build();
- validateSuccess(createObservable, validator);
- }
-
- @Test(groups = { "simple" }, timeOut = TIMEOUT)
- public void readAttachment() throws Exception {
- // create an Attachment
- String uuid = UUID.randomUUID().toString();
- Attachment attachment = getAttachmentDefinition(uuid, "application/text");
-
- RequestOptions options = new RequestOptions();
- options.setPartitionKey(new PartitionKey(createdDocument.getId()));
- Attachment readBackAttachment = client.createAttachment(getDocumentLink(), attachment, options).toBlocking().single().getResource();
-
- waitIfNeededForReplicasToCatchUp(clientBuilder);
-
- // read attachment
- Observable> readObservable = client.readAttachment(readBackAttachment.getSelfLink(), options);
-
- // validate attachment read
- ResourceResponseValidator validator = new ResourceResponseValidator.Builder()
- .withId(attachment.getId())
- .withContentType("application/text")
- .notNullEtag()
- .build();
- validateSuccess(readObservable, validator);
- }
-
- @Test(groups = { "simple" }, timeOut = TIMEOUT)
- public void deleteAttachment() throws Exception {
- // create an Attachment
- String uuid = UUID.randomUUID().toString();
- Attachment attachment = getAttachmentDefinition(uuid, "application/text");
-
- RequestOptions options = new RequestOptions();
- options.setPartitionKey(new PartitionKey(createdDocument.getId()));
- Attachment readBackAttachment = client.createAttachment(getDocumentLink(), attachment, options).toBlocking().single().getResource();
-
- // delete attachment
- Observable> deleteObservable = client.deleteAttachment(readBackAttachment.getSelfLink(), options);
-
- // validate attachment delete
- ResourceResponseValidator validator = new ResourceResponseValidator.Builder()
- .nullResource()
- .build();
- validateSuccess(deleteObservable, validator);
- }
-
- @Test(groups = { "simple" }, timeOut = TIMEOUT)
- public void upsertAttachment() throws Exception {
- // create an Attachment
- String uuid = UUID.randomUUID().toString();
- Attachment attachment = getAttachmentDefinition(uuid, "application/text");
-
- RequestOptions options = new RequestOptions();
- options.setPartitionKey(new PartitionKey(createdDocument.getId()));
- Attachment readBackAttachment = client.upsertAttachment(getDocumentLink(), attachment, options).toBlocking().single().getResource();
-
- // read attachment
- waitIfNeededForReplicasToCatchUp(clientBuilder);
- Observable> readObservable = client.readAttachment(readBackAttachment.getSelfLink(), options);
-
- // validate attachment read
- ResourceResponseValidator validator = new ResourceResponseValidator.Builder()
- .withId(attachment.getId())
- .withContentType("application/text")
- .notNullEtag()
- .build();
- validateSuccess(readObservable, validator);
-
- //update attachment
- readBackAttachment.setContentType("application/json");
-
- Observable> updateObservable = client.upsertAttachment(getDocumentLink(), readBackAttachment, options);
-
- // validate attachment update
- ResourceResponseValidator validatorForUpdate = new ResourceResponseValidator.Builder()
- .withId(readBackAttachment.getId())
- .withContentType("application/json")
- .notNullEtag()
- .build();
- validateSuccess(updateObservable, validatorForUpdate);
- }
-
- @Test(groups = { "simple" }, timeOut = TIMEOUT)
- public void replaceAttachment() throws Exception {
- // create an Attachment
- String uuid = UUID.randomUUID().toString();
- Attachment attachment = getAttachmentDefinition(uuid, "application/text");
-
- RequestOptions options = new RequestOptions();
- options.setPartitionKey(new PartitionKey(createdDocument.getId()));
- Attachment readBackAttachment = client.createAttachment(getDocumentLink(), attachment, options).toBlocking().single().getResource();
-
-
- // read attachment
- waitIfNeededForReplicasToCatchUp(clientBuilder);
- Observable> readObservable = client.readAttachment(readBackAttachment.getSelfLink(), options);
-
- // validate attachment read
- ResourceResponseValidator validator = new ResourceResponseValidator.Builder()
- .withId(attachment.getId())
- .withContentType("application/text")
- .notNullEtag()
- .build();
- validateSuccess(readObservable, validator);
-
- //update attachment
- readBackAttachment.setContentType("application/json");
-
- Observable> updateObservable = client.replaceAttachment(readBackAttachment, options);
-
- // validate attachment update
- ResourceResponseValidator validatorForUpdate = new ResourceResponseValidator.Builder()
- .withId(readBackAttachment.getId())
- .withContentType("application/json")
- .notNullEtag()
- .build();
- validateSuccess(updateObservable, validatorForUpdate);
- }
-
- @BeforeClass(groups = { "simple" }, timeOut = SETUP_TIMEOUT)
- public void beforeClass() {
- client = clientBuilder.build();
- createdDatabase = SHARED_DATABASE;
- createdCollection = SHARED_MULTI_PARTITION_COLLECTION;
- createdDocument = createDocument(client, createdDatabase.getId(), createdCollection.getId(), getDocumentDefinition());
- }
-
- @AfterClass(groups = { "simple" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
- public void afterClass() {
- safeClose(client);
- }
-
- private String getDocumentLink() {
- return createdDocument.getSelfLink();
- }
-
- private static Document getDocumentDefinition() {
- String uuid = UUID.randomUUID().toString();
- Document doc = new Document(String.format("{ "
- + "\"id\": \"%s\", "
- + "\"mypk\": \"%s\", "
- + "\"sgmts\": [[6519456, 1471916863], [2498434, 1455671440]]"
- + "}"
- , uuid, uuid));
- return doc;
- }
-
- private static Attachment getAttachmentDefinition(String uuid, String type) {
- return new Attachment(String.format(
- "{" +
- " 'id': '%s'," +
- " 'media': 'http://xstore.'," +
- " 'MediaType': 'Book'," +
- " 'Author': 'My Book Author'," +
- " 'Title': 'My Book Title'," +
- " 'contentType': '%s'" +
- "}", uuid, type));
- }
-}
diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AttachmentQueryTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AttachmentQueryTest.java
deleted file mode 100644
index 1a353241e4b95..0000000000000
--- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/AttachmentQueryTest.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * The MIT License (MIT)
- * Copyright (c) 2018 Microsoft Corporation
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-package com.microsoft.azure.cosmosdb.rx;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Factory;
-import org.testng.annotations.Test;
-
-import com.microsoft.azure.cosmosdb.Attachment;
-import com.microsoft.azure.cosmosdb.Database;
-import com.microsoft.azure.cosmosdb.Document;
-import com.microsoft.azure.cosmosdb.DocumentClientException;
-import com.microsoft.azure.cosmosdb.DocumentCollection;
-import com.microsoft.azure.cosmosdb.FeedOptions;
-import com.microsoft.azure.cosmosdb.FeedResponse;
-import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
-import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient.Builder;
-
-import rx.Observable;
-
-public class AttachmentQueryTest extends TestSuiteBase {
-
- private Database createdDatabase;
- private DocumentCollection createdCollection;
- private List createdAttachments = new ArrayList<>();
-
- private Document createdDocument;
-
- private AsyncDocumentClient client;
-
- public String getCollectionLink() {
- return Utils.getCollectionNameLink(createdDatabase.getId(), createdCollection.getId());
- }
-
- public String getDocumentLink() {
- return createdDocument.getSelfLink();
- }
-
- @Factory(dataProvider = "clientBuilders")
- public AttachmentQueryTest(Builder clientBuilder) {
- this.clientBuilder = clientBuilder;
- }
-
- @Test(groups = { "simple" }, timeOut = TIMEOUT)
- public void queryWithFilter() throws Exception {
-
- String filterId = createdAttachments.get(0).getId();
- String query = String.format("SELECT * from c where c.id = '%s'", filterId);
-
- FeedOptions options = new FeedOptions();
- options.setMaxItemCount(2);
- Observable> queryObservable = client
- .queryAttachments(getDocumentLink(), query, options);
-
- List expectedDocs = createdAttachments.stream().filter(sp -> filterId.equals(sp.getId()) ).collect(Collectors.toList());
- assertThat(expectedDocs).isNotEmpty();
-
- int expectedPageSize = (expectedDocs.size() + options.getMaxItemCount() - 1) / options.getMaxItemCount();
-
- FeedResponseListValidator validator = new FeedResponseListValidator.Builder()
- .totalSize(expectedDocs.size())
- .exactlyContainsInAnyOrder(expectedDocs.stream().map(d -> d.getResourceId()).collect(Collectors.toList()))
- .numberOfPages(expectedPageSize)
- .pageSatisfy(0, new FeedResponseValidator.Builder()
- .requestChargeGreaterThanOrEqualTo(1.0).build())
- .build();
-
- validateQuerySuccess(queryObservable, validator, 10000);
- }
-
- @Test(groups = { "simple" }, timeOut = TIMEOUT)
- public void query_NoResults() throws Exception {
-
- String query = "SELECT * from root r where r.id = '2'";
- FeedOptions options = new FeedOptions();
- Observable> queryObservable = client
- .queryAttachments(getDocumentLink(), query, options);
-
- FeedResponseListValidator validator = new FeedResponseListValidator.Builder()
- .containsExactly(new ArrayList<>())
- .numberOfPages(1)
- .pageSatisfy(0, new FeedResponseValidator.Builder()
- .requestChargeGreaterThanOrEqualTo(1.0).build())
- .build();
- validateQuerySuccess(queryObservable, validator);
- }
-
- @Test(groups = { "simple" }, timeOut = TIMEOUT)
- public void queryAll() throws Exception {
-
- String query = "SELECT * from root";
- FeedOptions options = new FeedOptions();
- options.setMaxItemCount(2);
- options.setEnableCrossPartitionQuery(true);
- Observable> queryObservable = client
- .queryAttachments(getDocumentLink(), query, options);
-
- List expectedDocs = createdAttachments;
-
- int expectedPageSize = (expectedDocs.size() + options.getMaxItemCount() - 1) / options.getMaxItemCount();
-
- FeedResponseListValidator validator = new FeedResponseListValidator
- .Builder()
- .exactlyContainsInAnyOrder(expectedDocs
- .stream()
- .map(d -> d.getResourceId())
- .collect(Collectors.toList()))
- .numberOfPages(expectedPageSize)
- .allPagesSatisfy(new FeedResponseValidator.Builder()
- .requestChargeGreaterThanOrEqualTo(1.0).build())
- .build();
- validateQuerySuccess(queryObservable, validator);
- }
-
- @Test(groups = { "simple" }, timeOut = TIMEOUT)
- public void invalidQuerySytax() throws Exception {
- String query = "I am an invalid query";
- FeedOptions options = new FeedOptions();
- options.setEnableCrossPartitionQuery(true);
- Observable> queryObservable = client
- .queryDocuments(getCollectionLink(), query, options);
-
- FailureValidator validator = new FailureValidator.Builder()
- .instanceOf(DocumentClientException.class)
- .statusCode(400)
- .notNullActivityId()
- .build();
- validateQueryFailure(queryObservable, validator);
- }
-
- public Attachment createAttachment(AsyncDocumentClient client) {
- Attachment attachment = getAttachmentDefinition();
- return client.createAttachment(getDocumentLink(), attachment, null).toBlocking().single().getResource();
- }
-
- @AfterClass(groups = { "simple" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
- public void afterClass() {
- safeClose(client);
- }
-
- @BeforeClass(groups = { "simple" }, timeOut = SETUP_TIMEOUT)
- public void beforeClass() throws Exception {
- client = clientBuilder.build();
- createdDatabase = SHARED_DATABASE;
- createdCollection = SHARED_SINGLE_PARTITION_COLLECTION_WITHOUT_PARTITION_KEY;
- truncateCollection(SHARED_SINGLE_PARTITION_COLLECTION_WITHOUT_PARTITION_KEY);
-
- Document docDef = new Document();
- docDef.setId(UUID.randomUUID().toString());
-
- createdDocument = createDocument(client, createdDatabase.getId(), createdCollection.getId(), docDef);
-
- for(int i = 0; i < 5; i++) {
- createdAttachments.add(createAttachment(client));
- }
-
- waitIfNeededForReplicasToCatchUp(clientBuilder);
- }
-
- private static Attachment getAttachmentDefinition() {
- return new Attachment(String.format(
- "{" +
- " 'id': '%s'," +
- " 'media': 'http://xstore.'," +
- " 'MediaType': 'Book'," +
- " 'Author': 'My Book Author'," +
- " 'Title': 'My Book Title'," +
- " 'contentType': '%s'" +
- "}", UUID.randomUUID().toString(), "application/text"));
- }
-}
diff --git a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/BackPressureCrossPartitionTest.java b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/BackPressureCrossPartitionTest.java
index 2a40e985d409b..a8fc9f00ab9c2 100644
--- a/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/BackPressureCrossPartitionTest.java
+++ b/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/BackPressureCrossPartitionTest.java
@@ -22,21 +22,29 @@
*/
package com.microsoft.azure.cosmosdb.rx;
+import com.microsoft.azure.cosmos.ClientUnderTestBuilder;
+import com.microsoft.azure.cosmos.CosmosBridgeInternal;
+import com.microsoft.azure.cosmos.CosmosClient;
+import com.microsoft.azure.cosmos.CosmosClientBuilder;
+import com.microsoft.azure.cosmos.CosmosContainer;
+import com.microsoft.azure.cosmos.CosmosContainerRequestOptions;
+import com.microsoft.azure.cosmos.CosmosContainerSettings;
+import com.microsoft.azure.cosmos.CosmosDatabase;
+import com.microsoft.azure.cosmos.CosmosItemSettings;
import com.microsoft.azure.cosmosdb.DataType;
-import com.microsoft.azure.cosmosdb.Database;
-import com.microsoft.azure.cosmosdb.Document;
-import com.microsoft.azure.cosmosdb.DocumentCollection;
import com.microsoft.azure.cosmosdb.FeedOptions;
import com.microsoft.azure.cosmosdb.FeedResponse;
import com.microsoft.azure.cosmosdb.IncludedPath;
import com.microsoft.azure.cosmosdb.Index;
import com.microsoft.azure.cosmosdb.IndexingPolicy;
import com.microsoft.azure.cosmosdb.PartitionKeyDefinition;
-import com.microsoft.azure.cosmosdb.RequestOptions;
-import com.microsoft.azure.cosmosdb.ResourceResponse;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.Protocol;
-import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient.Builder;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentClientUnderTest;
+
+import io.reactivex.subscribers.TestSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.SkipException;
@@ -45,10 +53,8 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
+import reactor.util.concurrent.Queues;
import rx.Observable;
-import rx.internal.util.RxRingBuffer;
-import rx.observers.TestSubscriber;
-import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.Collection;
@@ -65,18 +71,18 @@ public class BackPressureCrossPartitionTest extends TestSuiteBase {
private static final int SETUP_TIMEOUT = 60000;
private int numberOfDocs = 4000;
- private Database createdDatabase;
- private DocumentCollection createdCollection;
- private List createdDocuments;
+ private CosmosDatabase createdDatabase;
+ private CosmosContainer createdCollection;
+ private List createdDocuments;
- private RxDocumentClientUnderTest client;
+ private CosmosClient client;
private int numberOfPartitions;
public String getCollectionLink() {
return Utils.getCollectionNameLink(createdDatabase.getId(), createdCollection.getId());
}
- static protected DocumentCollection getCollectionDefinition() {
+ static protected CosmosContainerSettings getCollectionDefinition() {
PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
ArrayList paths = new ArrayList<>();
paths.add("/mypk");
@@ -98,16 +104,16 @@ static protected DocumentCollection getCollectionDefinition() {
includedPaths.add(includedPath);
indexingPolicy.setIncludedPaths(includedPaths);
- DocumentCollection collectionDefinition = new DocumentCollection();
- collectionDefinition.setId(UUID.randomUUID().toString());
- collectionDefinition.setPartitionKey(partitionKeyDef);
+ CosmosContainerSettings collectionDefinition = new CosmosContainerSettings(
+ UUID.randomUUID().toString(),
+ partitionKeyDef);
collectionDefinition.setIndexingPolicy(indexingPolicy);
return collectionDefinition;
}
@Factory(dataProvider = "simpleClientBuildersWithDirectHttps")
- public BackPressureCrossPartitionTest(Builder clientBuilder) {
+ public BackPressureCrossPartitionTest(CosmosClientBuilder clientBuilder) {
this.clientBuilder = clientBuilder;
}
@@ -115,19 +121,19 @@ private void warmUp() {
FeedOptions options = new FeedOptions();
options.setEnableCrossPartitionQuery(true);
// ensure collection is cached
- client.queryDocuments(getCollectionLink(), "SELECT * FROM r", options).first().toBlocking().single();
+ createdCollection.queryItems("SELECT * FROM r", options).blockFirst();
}
@DataProvider(name = "queryProvider")
public Object[][] queryProvider() {
return new Object[][] {
// query, maxItemCount, max expected back pressure buffered, total number of expected query results
- { "SELECT * FROM r", 1, 2 * RxRingBuffer.SIZE, numberOfDocs},
- { "SELECT * FROM r", 100, 2 * RxRingBuffer.SIZE, numberOfDocs},
- { "SELECT * FROM r ORDER BY r.prop", 100, 2 * RxRingBuffer.SIZE + 3 * numberOfPartitions, numberOfDocs},
- { "SELECT TOP 1000 * FROM r", 1, 2 * RxRingBuffer.SIZE, 1000},
- { "SELECT TOP 1000 * FROM r", 100, 2 * RxRingBuffer.SIZE, 1000},
- { "SELECT TOP 1000 * FROM r ORDER BY r.prop", 100, 2 * RxRingBuffer.SIZE + 3 * numberOfPartitions , 1000},
+ { "SELECT * FROM r", 1, 2 * Queues.SMALL_BUFFER_SIZE, numberOfDocs},
+ { "SELECT * FROM r", 100, 2 * Queues.SMALL_BUFFER_SIZE, numberOfDocs},
+ { "SELECT * FROM r ORDER BY r.prop", 100, 2 * Queues.SMALL_BUFFER_SIZE + 3 * numberOfPartitions, numberOfDocs},
+ { "SELECT TOP 1000 * FROM r", 1, 2 * Queues.SMALL_BUFFER_SIZE, 1000},
+ { "SELECT TOP 1000 * FROM r", 100, 2 * Queues.SMALL_BUFFER_SIZE, 1000},
+ { "SELECT TOP 1000 * FROM r ORDER BY r.prop", 100, 2 * Queues.SMALL_BUFFER_SIZE + 3 * numberOfPartitions , 1000},
};
}
@@ -140,20 +146,20 @@ public void query(String query, int maxItemCount, int maxExpectedBufferedCountFo
options.setEnableCrossPartitionQuery(true);
options.setMaxItemCount(maxItemCount);
options.setMaxDegreeOfParallelism(2);
- Observable> queryObservable = client
- .queryDocuments(getCollectionLink(), query, options);
+ Flux> queryObservable = createdCollection.queryItems(query, options);
- client.httpRequests.clear();
+ RxDocumentClientUnderTest rxClient = (RxDocumentClientUnderTest)CosmosBridgeInternal.getAsyncDocumentClient(client);
+ rxClient.httpRequests.clear();
log.info("instantiating subscriber ...");
- TestSubscriber