Skip to content

Commit

Permalink
Multiple fixes (Azure#16326)
Browse files Browse the repository at this point in the history
* This PR fixes multiple issues
- Singlepartition queries going to gateway instead of directMode in directMode
- readMany not working in gateway Mode
- QueryMetricsUtils failing to parse request charge in a non US locale
* Fixing failing test. Since one of the test changes the locale, that might have caused the string format in ParallelDocumentQueryTest to use different locale and fail parsing.
  • Loading branch information
mbhaskar authored Oct 16, 2020
1 parent 236ea87 commit fb85cab
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1757,11 +1757,12 @@ public <T> Mono<FeedResponse<T>> readMany(
CosmosQueryRequestOptions options,
Class<T> klass) {

String resourceLink = parentResourceLinkToQueryLink(collectionLink, ResourceType.Document);
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
OperationType.Query,
ResourceType.Document,
collectionLink, null
);
);

// This should not got to backend
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs =
Expand Down Expand Up @@ -1828,7 +1829,7 @@ public <T> Mono<FeedResponse<T>> readMany(

// create the executable query
return createReadManyQuery(
collectionLink,
resourceLink,
new SqlQuerySpec(DUMMY_SQL_QUERY),
options,
Document.class,
Expand Down Expand Up @@ -1963,7 +1964,7 @@ private SqlQuerySpec createReadManyQuerySpec(List<CosmosItemIdentity> itemIdenti
return new SqlQuerySpec(queryStringBuilder.toString(), parameters);
}



private String createPkSelector(PartitionKeyDefinition partitionKeyDefinition) {
return partitionKeyDefinition.getPaths()
Expand Down Expand Up @@ -3566,7 +3567,9 @@ private RxStoreModel getStoreProxy(RxDocumentServiceRequest request) {
} else {
if ((request.getOperationType() == OperationType.Query || request.getOperationType() == OperationType.SqlQuery) &&
Utils.isCollectionChild(request.getResourceType())) {
if (request.getPartitionKeyRangeIdentity() == null) {
// Go to gateway only when partition key range and partition key are not set. This should be very rare
if (request.getPartitionKeyRangeIdentity() == null &&
request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY) == null) {
return this.gatewayProxy;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -65,7 +66,10 @@ class DocumentProducerFeedResponse {
void populatePartitionedQueryMetrics() {
String queryMetricsDelimitedString = pageResult.getResponseHeaders().get(HttpConstants.HttpHeaders.QUERY_METRICS);
if (!StringUtils.isEmpty(queryMetricsDelimitedString)) {
queryMetricsDelimitedString += String.format(";%s=%.2f", QueryMetricsConstants.RequestCharge, pageResult.getRequestCharge());
queryMetricsDelimitedString += String.format(Locale.ROOT,
";%s=%.2f",
QueryMetricsConstants.RequestCharge,
pageResult.getRequestCharge());
ImmutablePair<String, SchedulingTimeSpan> schedulingTimeSpanMap =
new ImmutablePair<>(targetRange.getId(), fetchSchedulingMetrics.getElapsedTime());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;

Expand Down Expand Up @@ -309,6 +310,25 @@ public void queryMetrics(String query, Boolean qmEnabled) {
}
}

@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void queryMetricsWithADifferentLocale() {

Locale.setDefault(Locale.GERMAN);
String query = "select * from root where root.id= \"someid\"";
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
Iterator<FeedResponse<InternalObjectNode>> iterator = this.container.queryItems(query, options,
InternalObjectNode.class)
.iterableByPage().iterator();
double requestCharge = 0;
while (iterator.hasNext()) {
FeedResponse<InternalObjectNode> feedResponse = iterator.next();
requestCharge += feedResponse.getRequestCharge();
}
assertThat(requestCharge).isGreaterThan(0);
// resetting locale
Locale.setDefault(Locale.ROOT);
}

private static void validateQueryDiagnostics(
String queryDiagnostics,
Boolean qmEnabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxStoreModel;
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.Utils;
Expand Down Expand Up @@ -168,4 +169,22 @@ public static Future<?> getFuture() {
public static List<WeakReference<CpuListener>> getListeners() {
return getStaticField(CpuMonitor.class, "cpuListeners");
}

public static RxStoreModel getGatewayProxy(RxDocumentClientImpl rxDocumentClient){
return get(RxStoreModel.class, rxDocumentClient, "gatewayProxy");
}

public static RxStoreModel getRxServerStoreModel(RxDocumentClientImpl rxDocumentClient){
return get(RxStoreModel.class, rxDocumentClient, "storeModel");
}


public static void setGatewayProxy(RxDocumentClientImpl client, RxStoreModel storeModel) {
set(client, storeModel, "gatewayProxy");
}

public static void setServerStoreModel (RxDocumentClientImpl client, RxStoreModel storeModel) {
set(client, storeModel, "storeModel");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
Expand Down Expand Up @@ -506,7 +507,7 @@ public void afterClass() {
private static InternalObjectNode getDocumentDefinition(int cnt) {
String uuid = UUID.randomUUID().toString();
boolean boolVal = cnt % 2 == 0;
InternalObjectNode doc = new InternalObjectNode(String.format("{ "
InternalObjectNode doc = new InternalObjectNode(String.format(Locale.ROOT, "{ "
+ "\"id\": \"%s\", "
+ "\"prop\" : %d, "
+ "\"_value\" : %f, "
Expand Down Expand Up @@ -677,9 +678,6 @@ private List<InternalObjectNode> queryWithContinuationTokens(String query, int p
//TODO: Fix the test for GW mode
@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void readManyWithItemOperations() throws Exception {
if (this.getConnectionPolicy().getConnectionMode() == ConnectionMode.GATEWAY) {
throw new SkipException("Skipping gateway mode. This needs to be fixed");
}

List<Pair<String, PartitionKey>> pairList = new ArrayList<>();
for (int i = 0; i < createdDocuments.size(); i = i + 3) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxStoreModel;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.guava25.collect.Lists;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.models.CosmosItemRequestOptions;
Expand All @@ -22,6 +26,7 @@
import com.azure.cosmos.implementation.FeedResponseValidator;
import com.azure.cosmos.implementation.TestUtils;
import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
Expand Down Expand Up @@ -83,6 +88,41 @@ public void queryDocuments(Boolean qmEnabled) throws Exception {
validateQuerySuccess(queryObservable.byPage(maxItemCount), validator, 10000);
}

@Test(groups = {"simple"})
public void querySinglePartitionDocuments() throws Exception {
// Test to make sure single partition queries go to DirectMode when DirectMode is set
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
options.setPartitionKey(new PartitionKey("mypk"));
CosmosAsyncContainer container = client.getDatabase(createdCollection.getDatabase().getId())
.getContainer(createdCollection.getId());
RxDocumentClientImpl asyncDocumentClient = (RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(client);
RxStoreModel serverStoreModel = ReflectionUtils.getRxServerStoreModel(asyncDocumentClient);
RxStoreModel gatewayProxy = ReflectionUtils.getGatewayProxy(asyncDocumentClient);



RxStoreModel spyServerStoreModel = Mockito.spy(serverStoreModel);
RxStoreModel spyGatewayProxy = Mockito.spy(gatewayProxy);

ReflectionUtils.setServerStoreModel(asyncDocumentClient, spyServerStoreModel);
ReflectionUtils.setGatewayProxy(asyncDocumentClient, spyGatewayProxy);

CosmosPagedFlux<InternalObjectNode> queryFlux = container
.queryItems("select * from root", options,
InternalObjectNode.class);


queryFlux.byPage().blockLast();

// Validation:
// In gateway mode, serverstoremodel is GatewayStoreModel so below passes
// In direct mode, serverStoreModel is ServerStoreModel. So queryPlan goes through gatewayProxy and the query
// goes through the serverStoreModel
Mockito.verify(spyGatewayProxy, Mockito.times(1)).processMessage(Mockito.any());
Mockito.verify(spyServerStoreModel, Mockito.times(1)).processMessage(Mockito.any());

}

@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void queryDocuments_ParameterizedQueryWithInClause() throws Exception {
String query = "SELECT * from c where c.prop IN (@param1, @param2)";
Expand Down

0 comments on commit fb85cab

Please sign in to comment.