Adding the QueryExecutor implementation (#19515)
* Adding the QueryExecutor implementation, without using it. Refactored resource management

* Rebased off the latest main branch

* Changing the batch load params, and explicitly setting the PartKey in the queryItems

* Adding a configurable param for the bulkLoad batch size

* Resolving changes from the PR review

Co-authored-by: Amar Athavale <[email protected]>
amarathavale and Amar Athavale authored Mar 5, 2021
1 parent e8505ad commit 7786a39
Showing 16 changed files with 593 additions and 127 deletions.
12 changes: 10 additions & 2 deletions sdk/cosmos/azure-cosmos-benchmark/ctl/linkedin/run_benchmark.sh
@@ -10,6 +10,7 @@
## - ctl_concurrency (optional: default 50)
## - ctl_consistency_level (optional: default Session)
## - ctl_number_of_precreated_documents (optional: default 100,000)
## - ctl_bulk_load_batch_size (optional: default 200,000)
## - ctl_number_of_operations (optional: default 1,000,000)
## - ctl_max_running_time_duration (optional: default 10 minutes)
## - ctl_printing_interval (optional: default 30 seconds)
@@ -63,6 +64,13 @@ else
number_of_precreated_documents=$ctl_number_of_precreated_documents
fi

if [ -z "$ctl_bulk_load_batch_size" ]
then
bulk_load_batch_size=200000
else
bulk_load_batch_size=$ctl_bulk_load_batch_size
fi

if [ -z "$ctl_number_of_operations" ]
then
number_of_operations=-1
@@ -98,9 +106,9 @@ jvm_opt=""

if [ -z "$ctl_graphite_endpoint" ]
then
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$database_name" -collectionId "$collection_name" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -numberOfPreCreatedDocuments $number_of_precreated_documents -printingInterval $printing_interval -manageResources 2>&1 | tee -a "$log_filename"
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$database_name" -collectionId "$collection_name" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -numberOfPreCreatedDocuments $number_of_precreated_documents -bulkloadBatchSize $bulk_load_batch_size -printingInterval $printing_interval 2>&1 | tee -a "$log_filename"
else
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$database_name" -collectionId "$collection_name" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -graphiteEndpoint $ctl_graphite_endpoint -numberOfPreCreatedDocuments $number_of_precreated_documents -printingInterval $printing_interval -manageResources 2>&1 | tee -a "$log_filename"
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$database_name" -collectionId "$collection_name" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -graphiteEndpoint $ctl_graphite_endpoint -numberOfPreCreatedDocuments $number_of_precreated_documents -bulkloadBatchSize $bulk_load_batch_size -printingInterval $printing_interval 2>&1 | tee -a "$log_filename"
fi

end=`date +%s`
Configuration.java
@@ -89,8 +89,8 @@ public class Configuration {
@Parameter(names = "-readWriteQueryPct", description = "Comma separated read write query workload percent")
private String readWriteQueryPct = "90,9,1";

@Parameter(names = "-manageResources", description = "Control switch for creating/deleting underlying test resources")
private boolean manageResources = false;
@Parameter(names = "-manageDatabase", description = "Control switch for creating/deleting underlying database resource")
private boolean manageDatabase = false;

@Parameter(names = "-operation", description = "Type of Workload:\n"
+ "\tReadThroughput- run a READ workload that prints only throughput *\n"
@@ -161,6 +161,9 @@ public Duration convert(String value) {
@Parameter(names = "-contentResponseOnWriteEnabled", description = "if set to false, does not returns content response on document write operations")
private String contentResponseOnWriteEnabled = String.valueOf(true);

@Parameter(names = "-bulkloadBatchSize", description = "Control the number of documents uploaded in each BulkExecutor load iteration (Only supported for the LinkedInCtlWorkload)")
private int bulkloadBatchSize = 200000;

@Parameter(names = {"-h", "-help", "--help"}, description = "Help", help = true)
private boolean help = false;

@@ -393,8 +396,12 @@ public String getReadWriteQueryPct() {
return this.readWriteQueryPct;
}

public boolean shouldManageResources() {
return this.manageResources;
public boolean shouldManageDatabase() {
return this.manageDatabase;
}

public int getBulkloadBatchSize() {
return this.bulkloadBatchSize;
}

public String toString() {
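The two switches above (-manageDatabase replacing -manageResources, plus the new -bulkloadBatchSize) appear to be ordinary JCommander @Parameter fields, so they are parsed the same way as the benchmark's existing flags. A minimal sketch of that parsing, assuming Configuration has a no-arg constructor and can be registered with JCommander directly; the surrounding class and main method are illustrative, not part of this commit:

import com.azure.cosmos.benchmark.Configuration;
import com.beust.jcommander.JCommander;

public final class ParseSketch {
    public static void main(String[] args) {
        // e.g. args = {"-bulkloadBatchSize", "500000", "-manageDatabase"}
        Configuration cfg = new Configuration();
        JCommander.newBuilder()
                .addObject(cfg)   // binds the @Parameter-annotated fields
                .build()
                .parse(args);

        // Fields keep their declared defaults (200000, false) when the flags are omitted.
        int batchSize = cfg.getBulkloadBatchSize();        // 500000 here
        boolean ownsDatabase = cfg.shouldManageDatabase();  // true here
        System.out.println(batchSize + ", " + ownsDatabase);
    }
}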
CollectionResourceManager.java (class renamed from ResourceManagerImpl)
@@ -10,83 +10,69 @@
import com.azure.cosmos.benchmark.Configuration;
import com.azure.cosmos.benchmark.linkedin.data.CollectionAttributes;
import com.azure.cosmos.benchmark.linkedin.data.EntityConfiguration;
import com.azure.cosmos.benchmark.linkedin.impl.Constants;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.ThroughputProperties;
import com.google.common.base.Preconditions;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.azure.cosmos.benchmark.linkedin.impl.Constants.PARTITION_KEY_PATH;
import static com.azure.cosmos.models.ThroughputProperties.createManualThroughput;


public class ResourceManagerImpl implements ResourceManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ResourceManagerImpl.class);
/**
* Implementation for managing only the Collections for this test. This class facilitates
* container creation after the CTL environment has provisioned the database with the
* required throughput
*/
public class CollectionResourceManager implements ResourceManager {
private static final Logger LOGGER = LoggerFactory.getLogger(CollectionResourceManager.class);
private static final Duration RESOURCE_CRUD_WAIT_TIME = Duration.ofSeconds(30);

private final Configuration _configuration;
private final EntityConfiguration _entityConfiguration;
private final CosmosAsyncClient _client;

public ResourceManagerImpl(final Configuration configuration,
public CollectionResourceManager(final Configuration configuration,
final EntityConfiguration entityConfiguration,
final CosmosAsyncClient client) {
Preconditions.checkNotNull(configuration,
"The Workload configuration defining the parameters can not be null");
Preconditions.checkNotNull(entityConfiguration,
"The Test Entity specific configuration can not be null");
Preconditions.checkNotNull(client, "Need a non-null client for "
+ "setting up the Database and containers for the test");
+ "setting up the Database and collections for the test");
_configuration = configuration;
_entityConfiguration = entityConfiguration;
_client = client;
}

@Override
public void createDatabase() throws CosmosException {
try {
LOGGER.info("Creating database {} for the ctl workload if one doesn't exist", _configuration.getDatabaseId());
final ThroughputProperties throughputProperties = createManualThroughput(_configuration.getThroughput());
_client.createDatabaseIfNotExists(_configuration.getDatabaseId(), throughputProperties)
.block(RESOURCE_CRUD_WAIT_TIME);
} catch (CosmosException e) {
LOGGER.error("Exception while creating database {}", _configuration.getDatabaseId(), e);
throw e;
}

deleteExistingContainers();
}

@Override
public void createContainer() throws CosmosException {
public void createResources() throws CosmosException {
final String containerName = _configuration.getCollectionId();
final CosmosAsyncDatabase database = _client.getDatabase(_configuration.getDatabaseId());
final CollectionAttributes collectionAttributes = _entityConfiguration.collectionAttributes();
try {
LOGGER.info("Creating container {} in the database {}", containerName, database.getId());
final CosmosContainerProperties containerProperties =
new CosmosContainerProperties(containerName, PARTITION_KEY_PATH)
new CosmosContainerProperties(containerName, Constants.PARTITION_KEY_PATH)
.setIndexingPolicy(collectionAttributes.indexingPolicy());
database.createContainerIfNotExists(containerProperties)
.block(RESOURCE_CRUD_WAIT_TIME);
} catch (CosmosException e) {
LOGGER.error("Exception while creating container {}", containerName, e);
LOGGER.error("Exception while creating collection {}", containerName, e);
throw e;
}
}

@Override
public void deleteResources() {
// Delete all the containers in the database
deleteExistingContainers();
deleteExistingCollections();

LOGGER.info("Resource cleanup completed");
LOGGER.info("Collection resource cleanup completed");
}

private void deleteExistingContainers() {
private void deleteExistingCollections() {
final CosmosAsyncDatabase database = _client.getDatabase(_configuration.getDatabaseId());
final List<CosmosAsyncContainer> cosmosAsyncContainers = database.readAllContainers()
.byPage()
@@ -98,12 +84,12 @@ private void deleteExistingContainers() {

// Run a best effort attempt to delete all existing containers and data there-in
for (CosmosAsyncContainer cosmosAsyncContainer : cosmosAsyncContainers) {
LOGGER.info("Deleting container {} in the Database {}", cosmosAsyncContainer.getId(), _configuration.getDatabaseId());
LOGGER.info("Deleting collection {} in the Database {}", cosmosAsyncContainer.getId(), _configuration.getDatabaseId());
try {
cosmosAsyncContainer.delete()
.block(RESOURCE_CRUD_WAIT_TIME);
} catch (CosmosException e) {
LOGGER.error("Error deleting container {} in the Database {}",
LOGGER.error("Error deleting collection {} in the Database {}",
cosmosAsyncContainer.getId(), _configuration.getDatabaseId(), e);
}
}
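Both the renamed CollectionResourceManager above and the new DatabaseResourceManager later in this commit implement the ResourceManager interface, which is not included in this diff. Judging from the overridden methods, it presumably amounts to the following two operations; this is an inferred sketch, not the actual source file:

package com.azure.cosmos.benchmark.linkedin;

import com.azure.cosmos.CosmosException;

// Inferred sketch: one implementation manages only the collections (CTL runs,
// where the database is pre-provisioned), the other the database plus collections.
public interface ResourceManager {
    // Create the Cosmos DB resources the workload needs before the test starts.
    void createResources() throws CosmosException;

    // Best-effort cleanup of the resources created for the test run.
    void deleteResources();
}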
DataGenerationIterator.java
@@ -9,29 +9,32 @@
import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;


/**
* This class facilitates generating data in batches.
*/
public class DataGenerationIterator implements Iterator<Map<Key, ObjectNode>> {

public static final int BATCH_SIZE = 200000;

private final DataGenerator _dataGenerator;
private final int _totalRecordCount;
private int _totalDataGenerated;
private final int _batchLoadBatchSize;

/**
* @param dataGenerator The underlying DataGenerator capable of generating a batch of records
* @param recordCount Number of records we want to generate for this test.
*                    Actual data generation happens in batches of a pre-determined size
* @param batchLoadBatchSize The number of documents to generate, and load, in each BulkExecutor iteration
*/
public DataGenerationIterator(final DataGenerator dataGenerator, int recordCount) {
public DataGenerationIterator(final DataGenerator dataGenerator, int recordCount, int batchLoadBatchSize) {
Preconditions.checkArgument(recordCount > 0,
"The number of documents to generate must be greater than 0");
Preconditions.checkArgument(batchLoadBatchSize > 0,
"The number of documents to generate and load on each BulkExecutor load iteration must be greater than 0");
_dataGenerator = Preconditions.checkNotNull(dataGenerator,
"The underlying DataGenerator for this iterator can not be null");
_totalRecordCount = recordCount;
_batchLoadBatchSize = batchLoadBatchSize;
_totalDataGenerated = 0;
}

@@ -42,7 +45,7 @@ public boolean hasNext() {

@Override
public Map<Key, ObjectNode> next() {
final int recordsToGenerate = Math.min(BATCH_SIZE, _totalRecordCount - _totalDataGenerated);
final int recordsToGenerate = Math.min(_batchLoadBatchSize, _totalRecordCount - _totalDataGenerated);

// Filter Keys in case there are duplicates
final Map<Key, ObjectNode> newDocuments = _dataGenerator.generate(recordsToGenerate);
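With the hard-coded BATCH_SIZE constant gone, the Math.min call above sizes every batch from the configured value, and the final batch simply shrinks to whatever remains. A small illustration of the resulting batch sequence, using assumed example values rather than anything from this commit:

// Illustration only: how DataGenerationIterator sizes successive batches.
final class BatchSizingExample {
    public static void main(String[] args) {
        int totalRecordCount = 500_000;    // e.g. -numberOfPreCreatedDocuments 500000
        int batchLoadBatchSize = 200_000;  // e.g. -bulkloadBatchSize 200000 (the default)
        int generated = 0;
        while (generated < totalRecordCount) {
            int recordsToGenerate = Math.min(batchLoadBatchSize, totalRecordCount - generated);
            System.out.println("generating " + recordsToGenerate); // prints 200000, 200000, 100000
            generated += recordsToGenerate;
        }
    }
}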
DataLoader.java
@@ -29,9 +29,9 @@
public class DataLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(DataLoader.class);

private static final int MAX_BATCH_SIZE = 20000;
private static final int BULK_OPERATION_CONCURRENCY = 5;
private static final Duration BULK_LOAD_WAIT_DURATION = Duration.ofSeconds(120);
private static final int MAX_BATCH_SIZE = 10000;
private static final int BULK_OPERATION_CONCURRENCY = 10;
private static final Duration VALIDATE_DATA_WAIT_DURATION = Duration.ofSeconds(120);
private static final String COUNT_ALL_QUERY = "SELECT COUNT(1) FROM c";
private static final String COUNT_ALL_QUERY_RESULT_FIELD = "$1";

@@ -48,11 +48,13 @@ public DataLoader(final Configuration configuration,
_client = Preconditions.checkNotNull(client,
"The CosmosAsyncClient needed for data loading can not be null");
_dataGenerator = new DataGenerationIterator(entityConfiguration.dataGenerator(),
_configuration.getNumberOfPreCreatedDocuments());
_configuration.getNumberOfPreCreatedDocuments(),
_configuration.getBulkloadBatchSize());
}

public void loadData() {
LOGGER.info("Starting batched data loading, loading {} documents in each iteration", DataGenerationIterator.BATCH_SIZE);
LOGGER.info("Starting batched data loading, loading {} documents in each iteration",
_configuration.getBulkloadBatchSize());
while (_dataGenerator.hasNext()) {
final Map<Key, ObjectNode> newDocuments = _dataGenerator.next();
bulkCreateItems(newDocuments);
@@ -71,11 +73,14 @@ private void bulkCreateItems(final Map<Key, ObjectNode> records) {
database.getId(),
containerName);

// We want to wait longer depending on the number of documents in each iteration
final Duration blockingWaitTime = Duration.ofSeconds(120 *
(((_configuration.getBulkloadBatchSize() - 1) / 200000) + 1));
final BulkProcessingOptions<Object> bulkProcessingOptions = new BulkProcessingOptions<>(Object.class);
bulkProcessingOptions.setMaxMicroBatchSize(MAX_BATCH_SIZE)
.setMaxMicroBatchConcurrency(BULK_OPERATION_CONCURRENCY);
container.processBulkOperations(Flux.fromIterable(cosmosItemOperations), bulkProcessingOptions)
.blockLast(BULK_LOAD_WAIT_DURATION);
.blockLast(blockingWaitTime);

LOGGER.info("Completed loading {} documents into [{}:{}]", cosmosItemOperations.size(),
database.getId(),
@@ -93,7 +98,7 @@ private void validateDataCreation(int expectedSize) {
.queryItems(COUNT_ALL_QUERY, ObjectNode.class)
.byPage()
.collectList()
.block(BULK_LOAD_WAIT_DURATION);
.block(VALIDATE_DATA_WAIT_DURATION);
final int resultCount = Optional.ofNullable(queryItemsResponseList)
.map(responseList -> responseList.get(0))
.map(FeedResponse::getResults)
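The fixed 120-second BULK_LOAD_WAIT_DURATION used by bulkCreateItems is replaced by a timeout that grows with the configured batch size: 120 seconds for every started block of 200,000 documents. A worked example of that formula; the helper class and method names are illustrative only:

// Illustration only: the wait time granted to each bulk-load iteration.
final class BlockingWaitTimeExample {
    static java.time.Duration blockingWaitTime(int bulkloadBatchSize) {
        return java.time.Duration.ofSeconds(120L * (((bulkloadBatchSize - 1) / 200_000) + 1));
    }

    public static void main(String[] args) {
        System.out.println(blockingWaitTime(200_000)); // PT2M (120 s)
        System.out.println(blockingWaitTime(200_001)); // PT4M (240 s)
        System.out.println(blockingWaitTime(600_000)); // PT6M (360 s)
    }
}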
DatabaseResourceManager.java (new file)
@@ -0,0 +1,81 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.benchmark.linkedin;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.benchmark.Configuration;
import com.azure.cosmos.benchmark.linkedin.data.EntityConfiguration;
import com.azure.cosmos.models.ThroughputProperties;
import com.google.common.base.Preconditions;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* For local testing, the database creation needs to happen as part of the Test setup. This class
* manages the database AND collection setup, and is useful for ensuring the database and other
* resources are not left behind after local testing
*/
public class DatabaseResourceManager implements ResourceManager {

private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseResourceManager.class);
private static final Duration RESOURCE_CRUD_WAIT_TIME = Duration.ofSeconds(30);

private final Configuration _configuration;
private final CosmosAsyncClient _client;
private final CollectionResourceManager _collectionResourceManager;

public DatabaseResourceManager(final Configuration configuration,
final EntityConfiguration entityConfiguration,
final CosmosAsyncClient client) {
Preconditions.checkNotNull(configuration,
"The Workload configuration defining the parameters can not be null");
Preconditions.checkNotNull(entityConfiguration,
"The Test Entity specific configuration can not be null");
Preconditions.checkNotNull(client, "Need a non-null client for "
+ "setting up the Database and collections for the test");
_configuration = configuration;
_client = client;
_collectionResourceManager = new CollectionResourceManager(_configuration, entityConfiguration, _client);
}

@Override
public void createResources() throws CosmosException {
try {
LOGGER.info("Creating database {} for the ctl workload if one doesn't exist", _configuration.getDatabaseId());
final ThroughputProperties throughputProperties =
ThroughputProperties.createManualThroughput(_configuration.getThroughput());
_client.createDatabaseIfNotExists(_configuration.getDatabaseId(), throughputProperties)
.block(RESOURCE_CRUD_WAIT_TIME);
} catch (CosmosException e) {
LOGGER.error("Exception while creating database {}", _configuration.getDatabaseId(), e);
throw e;
}

// Delete any existing collections/containers in this database
_collectionResourceManager.deleteResources();

// And recreate the collections for this test
_collectionResourceManager.createResources();
}

@Override
public void deleteResources() {
// Delete the main database used for testing
final CosmosAsyncDatabase database = _client.getDatabase(_configuration.getDatabaseId());
try {
LOGGER.info("Deleting the main database {} used in this test. Collection", _configuration.getDatabaseId());
database.delete()
.block(RESOURCE_CRUD_WAIT_TIME);
} catch (CosmosException e) {
LOGGER.error("Exception deleting the database {}", _configuration.getDatabaseId(), e);
throw e;
}

LOGGER.info("Database resource cleanup completed");
}
}
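With -manageResources split across the two resource managers, shouldManageDatabase() presumably decides whether a run owns the whole database lifecycle (local testing) or only the collections (CTL runs against a pre-provisioned database). The selection code is not part of this excerpt; a plausible sketch of the wiring, with the surrounding class and method assumed:

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.benchmark.Configuration;
import com.azure.cosmos.benchmark.linkedin.data.EntityConfiguration;

final class ResourceManagerSelectionSketch {
    // Hypothetical wiring; the actual selection logic is not shown in this commit.
    static void runWorkload(Configuration configuration, EntityConfiguration entityConfiguration,
                            CosmosAsyncClient client) {
        ResourceManager resourceManager = configuration.shouldManageDatabase()
                ? new DatabaseResourceManager(configuration, entityConfiguration, client)    // local runs: owns the database
                : new CollectionResourceManager(configuration, entityConfiguration, client); // CTL runs: database pre-provisioned
        resourceManager.createResources();
        try {
            // ... execute the LinkedIn CTL workload against the created collections ...
        } finally {
            resourceManager.deleteResources();
        }
    }
}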