Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed synchronized lists and maps for order by query race condition #1

Closed
wants to merge 8 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class FeedResponseDiagnostics {
private static final Logger LOGGER = LoggerFactory.getLogger(FeedResponseDiagnostics.class);
private Map<String, QueryMetrics> queryMetricsMap;
private QueryInfo.QueryPlanDiagnosticsContext diagnosticsContext;
private List<ClientSideRequestStatistics> clientSideRequestStatisticsList;
private final List<ClientSideRequestStatistics> clientSideRequestStatisticsList;

public FeedResponseDiagnostics(Map<String, QueryMetrics> queryMetricsMap) {
this.queryMetricsMap = queryMetricsMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected Flux<DocumentProducerFeedResponse> produceOnSplit(Flux<DocumentProduce
return replacementProducers.collectList().flux().flatMap(documentProducers -> {
RequestChargeTracker tracker = new RequestChargeTracker();
Map<String, QueryMetrics> queryMetricsMap = new HashMap<>();
List<ClientSideRequestStatistics> clientSideRequestStatisticsList = new ArrayList<>();
List<ClientSideRequestStatistics> clientSideRequestStatisticsList = Collections.synchronizedList(new ArrayList<>());
return OrderByUtils.orderedMerge(resourceType, consumeComparer, tracker, documentProducers, queryMetricsMap,
targetRangeToOrderByContinuationTokenMap, clientSideRequestStatisticsList)
.map(orderByQueryResult -> resultPageFrom(tracker, orderByQueryResult));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class OrderByDocumentQueryExecutionContext<T extends Resource>
private final OrderbyRowComparer<T> consumeComparer;
private final RequestChargeTracker tracker;
private final ConcurrentMap<String, QueryMetrics> queryMetricMap;
List<ClientSideRequestStatistics> clientSideRequestStatisticsList;
private final List<ClientSideRequestStatistics> clientSideRequestStatisticsList;
private Flux<OrderByRowResult<T>> orderByObservable;
private final Map<FeedRangeEpkImpl, OrderByContinuationToken> targetRangeToOrderByContinuationTokenMap;

Expand All @@ -84,8 +84,8 @@ private OrderByDocumentQueryExecutionContext(
this.consumeComparer = consumeComparer;
this.tracker = new RequestChargeTracker();
this.queryMetricMap = new ConcurrentHashMap<>();
this.clientSideRequestStatisticsList = new ArrayList<>();
targetRangeToOrderByContinuationTokenMap = new HashMap<>();
this.clientSideRequestStatisticsList = Collections.synchronizedList(new ArrayList<>());
targetRangeToOrderByContinuationTokenMap = new ConcurrentHashMap<>();
}

public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
Expand Down Expand Up @@ -204,7 +204,10 @@ private void initializeWithTokenAndFilter(Map<FeedRangeEpkImpl, OrderByContinuat
String filter) {
for (Map.Entry<FeedRangeEpkImpl, OrderByContinuationToken> entry :
rangeToTokenMapping.entrySet()) {
targetRangeToOrderByContinuationTokenMap.put(entry.getKey(), entry.getValue());
// only put the entry if the value is not null
if (entry.getValue() != null) {
targetRangeToOrderByContinuationTokenMap.put(entry.getKey(), entry.getValue());
}
Map<FeedRangeEpkImpl, String> partitionKeyRangeToContinuationToken = new HashMap<FeedRangeEpkImpl, String>();
partitionKeyRangeToContinuationToken.put(entry.getKey(), null);
super.initialize(collectionRid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.rx.TestSuiteBase;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -48,6 +50,7 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
Expand All @@ -58,11 +61,15 @@
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -73,6 +80,7 @@ public class CosmosDiagnosticsTest extends TestSuiteBase {
private static final DateTimeFormatter RESPONSE_TIME_FORMATTER = DateTimeFormatter.ISO_INSTANT;
private CosmosClient gatewayClient;
private CosmosClient directClient;
private CosmosAsyncDatabase cosmosAsyncDatabase;
private CosmosContainer container;
private CosmosAsyncContainer cosmosAsyncContainer;

Expand All @@ -92,6 +100,7 @@ public void beforeClass() {
.directMode()
.buildClient();
cosmosAsyncContainer = getSharedMultiPartitionCosmosContainer(this.gatewayClient.asyncClient());
cosmosAsyncDatabase = directClient.asyncClient().getDatabase(cosmosAsyncContainer.getDatabase().getId());
container = gatewayClient.getDatabase(cosmosAsyncContainer.getDatabase().getId()).getContainer(cosmosAsyncContainer.getId());
}

Expand Down Expand Up @@ -397,6 +406,50 @@ public void queryMetrics(String query, Boolean qmEnabled) {
}
}

@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void queryDiagnosticsOnOrderBy() {
// create container with more than 4 physical partitions
String containerId = "testcontainer";
cosmosAsyncDatabase.createContainer(containerId, "/mypk",
ThroughputProperties.createManualThroughput(40000)).block();
CosmosAsyncContainer testcontainer = cosmosAsyncDatabase.getContainer(containerId);
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
options.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
testcontainer.createItem(getInternalObjectNode()).block();
options.setMaxDegreeOfParallelism(-1);
String query = "SELECT * from c ORDER BY c._ts DESC";
CosmosPagedFlux<InternalObjectNode> cosmosPagedFlux = testcontainer.queryItems(query, options,
InternalObjectNode.class);
Set<String> partitionKeyRangeIds = new HashSet<>();
Set<String> pkRids = new HashSet<>();
cosmosPagedFlux.byPage().flatMap(feedResponse -> {
String cosmosDiagnosticsString = feedResponse.getCosmosDiagnostics().toString();
// find all partition key range ids in cosmos diagnostics
Pattern pattern = Pattern.compile("(\"partitionKeyRangeId\":\")(\\d)");
Matcher matcher = pattern.matcher(cosmosDiagnosticsString);
while (matcher.find()) {
// get the partition key range id from cosmos diagnostics
String group = matcher.group(2);
partitionKeyRangeIds.add(group);
}
// find all partition key range ids in query metrics
pattern = Pattern.compile("(pkrId:)(\\d)");
matcher = pattern.matcher(cosmosDiagnosticsString);
while (matcher.find()) {
// get the partition key range id from query metrics
String group = matcher.group(2);
pkRids.add(group);
}
return Flux.just(feedResponse);
}).blockLast();

// assert that cosmos diagnostics has diagnostics information for all partitions ids same as query metrics
assertThat(pkRids).isNotEmpty();
assertThat(pkRids).isEqualTo(partitionKeyRangeIds);

deleteCollection(testcontainer);
}

private void validateDirectModeQueryDiagnostics(String diagnostics) {
assertThat(diagnostics).contains("\"connectionMode\":\"DIRECT\"");
assertThat(diagnostics).contains("supplementalResponseStatisticsList");
Expand Down Expand Up @@ -832,14 +885,14 @@ private void validateRntbdStatistics(CosmosDiagnostics cosmosDiagnostics,
assertThat(Instant.parse(serviceEndpointStatistics.get("createdTime").asText()))
.isAfterOrEqualTo(beforeInitializationThreshold);

// Adding 1 ms to cover for rounding errors (only 3 fractional digits)
Instant afterInitializationThreshold = afterInitializingRntbdServiceEndpoint.plusMillis(1);
// Adding 2 ms to cover for rounding errors (only 3 fractional digits)
Instant afterInitializationThreshold = afterInitializingRntbdServiceEndpoint.plusMillis(2);
assertThat(Instant.parse(serviceEndpointStatistics.get("createdTime").asText()))
.isBeforeOrEqualTo(afterInitializationThreshold);

// Adding 1 ms to cover for rounding errors (only 3 fractional digits)
Instant afterOperation2Threshold = afterOperation2.plusMillis(1);
Instant beforeOperation2Threshold = beforeOperation2.minusMillis(1);
// Adding 2 ms to cover for rounding errors (only 3 fractional digits)
Instant afterOperation2Threshold = afterOperation2.plusMillis(2);
Instant beforeOperation2Threshold = beforeOperation2.minusMillis(2);
assertThat(Instant.parse(serviceEndpointStatistics.get("lastRequestTime").asText()))
.isAfterOrEqualTo(beforeOperation2Threshold)
.isBeforeOrEqualTo(afterOperation2Threshold);
Expand Down
Loading