Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Implementation #36611

Merged
merged 49 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
31e8830
Implementing Bulk support in Spring Data Cosmos, this currently is no…
trande4884 Jun 2, 2023
e6e75c6
Reverting files I shouldn't have committed.
trande4884 Jun 2, 2023
c009d4a
Revert "Reverting files I shouldn't have committed."
trande4884 Jun 2, 2023
6d2a965
Revert "Revert "Reverting files I shouldn't have committed.""
trande4884 Jun 2, 2023
c97a9a6
Current working solution.
trande4884 Jun 8, 2023
8c53c20
Merge branch 'main' of github.com:Azure/azure-sdk-for-java into trand…
trande4884 Aug 31, 2023
470ed9a
Updates to the bulk logic, still hitting errors in the tests.
trande4884 Sep 8, 2023
f92a98f
Fixing bugs in implementation and cleaning up code/tests.
trande4884 Sep 22, 2023
acbf25d
Finalizing implementation and implementing for Reactive.
trande4884 Sep 22, 2023
6e3debe
Adding tests and some more cleanup.
trande4884 Sep 22, 2023
0a0004a
Updating changelog and cleaning up javadocs.
trande4884 Sep 22, 2023
1418f09
Fixing build errors.
trande4884 Sep 25, 2023
d2fc09d
Fixing build errors.
trande4884 Sep 25, 2023
5025134
Fixing build errors.
trande4884 Sep 25, 2023
aef3a17
Fixing build errors.
trande4884 Sep 25, 2023
2e2fcbb
Fixing build errors.
trande4884 Sep 25, 2023
fda19c3
Fixing spotbug errors.
trande4884 Sep 26, 2023
0cdc3cf
Fixing spotbug errors.
trande4884 Sep 26, 2023
4b5dc9a
Fixing revapi errors.
trande4884 Sep 26, 2023
9b6a353
Fixing revapi errors.
trande4884 Sep 26, 2023
c7aa001
Fixing revapi errors.
trande4884 Sep 26, 2023
7a977ca
Merge branch 'main' of github.com:Azure/azure-sdk-for-java into trand…
trande4884 Sep 27, 2023
ec48a90
Fixing revapi errors.
trande4884 Sep 27, 2023
b91f9aa
Fixing revapi errors.
trande4884 Sep 27, 2023
617a1df
Fixing revapi errors.
trande4884 Sep 27, 2023
966fbb1
Fixed revAPI for insertAll API
kushagraThapar Sep 27, 2023
5d843aa
Finalizing revapi.
trande4884 Sep 29, 2023
b8d55ef
Fixing logic related to bulk delete and how we determine the Partitio…
trande4884 Oct 5, 2023
10b6fb5
Fixing unchecked cast error.
trande4884 Oct 12, 2023
df147a8
Removing unused import.
trande4884 Oct 12, 2023
1127782
Fixing linting errors.
trande4884 Oct 12, 2023
ecae9a8
Trying to fix linting error.
trande4884 Oct 12, 2023
f821757
Update sdk/spring/azure-spring-data-cosmos/CHANGELOG.md
trande4884 Oct 18, 2023
3d1b24b
Update sdk/spring/azure-spring-data-cosmos/src/main/java/com/azure/sp…
trande4884 Oct 18, 2023
4fd238f
Changing function logic based on review feedback.
trande4884 Oct 18, 2023
02d9f59
Merge remote-tracking branch 'origin/trande_bulkImplemetation' into t…
trande4884 Oct 18, 2023
66effda
Fixing javadocs.
trande4884 Oct 18, 2023
90fc887
Fixing reactive saveAll logic.
trande4884 Oct 19, 2023
4d74d56
Fixing revapi.json
trande4884 Oct 19, 2023
cd9ca16
Removing unused imports.
trande4884 Oct 19, 2023
eb17711
Merge remote-tracking branch 'origin/trande_bulkImplemetation' into t…
trande4884 Oct 19, 2023
b189cda
Fixing revapi.json
trande4884 Oct 19, 2023
7b38498
Cleaning up code after review. Adding options to both the individual …
trande4884 Oct 20, 2023
3e8de55
Merge remote-tracking branch 'origin/trande_bulkImplemetation' into t…
trande4884 Oct 20, 2023
bf8ec28
Fixing revapi.
trande4884 Oct 20, 2023
25b5474
Updating changelog. Added etag related tests which led to some small …
trande4884 Oct 20, 2023
17208f3
Update sdk/spring/azure-spring-data-cosmos/CHANGELOG.md
trande4884 Oct 31, 2023
84a6eb3
Updates from PR requests.
trande4884 Oct 31, 2023
89397c5
Merge remote-tracking branch 'origin/trande_bulkImplemetation' into t…
trande4884 Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions eng/code-quality-reports/src/main/resources/revapi/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@
"new": "method <S extends T> (reactor\\.core\\.publisher\\.Mono<)?S>? com\\.azure\\.spring\\.data\\.cosmos\\.repository\\.(Reactive)?CosmosRepository<T, ID extends java\\.io\\.Serializable>::save\\(ID, com\\.azure\\.cosmos\\.models\\.PartitionKey, java\\.lang\\.Class<S>, com\\.azure\\.cosmos\\.models\\.CosmosPatchOperations(, com\\.azure\\.cosmos\\.models\\.CosmosPatchItemRequestOptions)?\\)",
"justification": "Spring interfaces are allowed to add methods."
},
{
"code": "java.method.addedToInterface",
"new": "method <S extends T, T> void com.azure.spring.data.cosmos.core.CosmosOperations::deleteEntities(com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation<T, ?>, java.lang.Iterable<S>)",
"justification": "Spring interfaces are allowed to add methods."
},
{
"code": "java.method.addedToInterface",
"new": "method <S extends T, T> java.lang.Iterable<S> com.azure.spring.data.cosmos.core.CosmosOperations::insertAll(com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation<T, ?>, java.lang.Iterable<S>)",
"justification": "Spring interfaces are allowed to add methods."
},
{
"code": "java.method.addedToInterface",
"new": "method <S extends T, T> reactor.core.publisher.Mono<java.lang.Void> com.azure.spring.data.cosmos.core.ReactiveCosmosOperations::deleteEntities(com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation<T, ?>, reactor.core.publisher.Flux<S>)",
"justification": "Spring interfaces are allowed to add methods."
},
{
"code": "java.method.addedToInterface",
"new": "method <S extends T, T> reactor.core.publisher.Flux<S> com.azure.spring.data.cosmos.core.ReactiveCosmosOperations::insertAll(com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation<T, ?>, reactor.core.publisher.Flux<S>)",
"justification": "Spring interfaces are allowed to add methods."
},
{
"regex": true,
"code": "java\\.annotation\\.(added|attributeValueChanged)",
Expand Down
1 change: 1 addition & 0 deletions sdk/spring/azure-spring-data-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 3.39.0-beta.1 (Unreleased)

#### Features Added
* Updated Spring and Reactive Spring repository `saveAll` and `deleteAll` APIs to use bulk functionality implementation. NOTE: `azure-spring-data-cosmos` is currently unable to set throughput control limits at the request level, which will need to be achieved by creating multiple clients. - See [PR 36611](https://github.com/Azure/azure-sdk-for-java/pull/36611).

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public interface CosmosOperations {
* @return the patched item
*/
<T> T patch(Object id, PartitionKey partitionKey, Class<T> domainType, CosmosPatchOperations patchOperations, CosmosPatchItemRequestOptions options);

/**
* Inserts item
*
Expand All @@ -158,6 +159,17 @@ public interface CosmosOperations {
*/
<T> T insert(String containerName, T objectToSave, PartitionKey partitionKey);

/**
* Insert items using bulk operations.
*
* @param information must not be {@literal null}
* @param entities must not be {@literal null}
* @param <T> type class of domain type
* @param <S> type class of domain type
* @return the inserted item
*/
<S extends T, T> Iterable<S> insertAll(CosmosEntityInformation<T, ?> information, Iterable<S> entities);

/**
* Inserts item
* @param containerName must not be {@literal null}
Expand Down Expand Up @@ -213,7 +225,17 @@ public interface CosmosOperations {
<T> void deleteEntity(String containerName, T entity);

/**
* Delete all items in a container
* Delete using a list of entities with bulk
*
* @param <T> type class of domain type
* @param <S> type class of domain type
* @param information must not be {@literal null}
* @param entities must not be {@literal null}
*/
<S extends T, T> void deleteEntities(CosmosEntityInformation<T, ?> information, Iterable<S> entities);

/**
* Delete all items in a container. Uses bulk if possible.
*
* @param containerName the container name
* @param domainType the domainType
Expand All @@ -228,7 +250,7 @@ public interface CosmosOperations {
void deleteContainer(String containerName);

/**
* Delete items matching query
* Delete items matching query. Uses bulk if possible.
*
* @param query the document query
* @param domainType type class
Expand Down Expand Up @@ -386,5 +408,4 @@ public interface CosmosOperations {
* @return the Page
*/
<T> Page<T> runPaginationQuery(SqlQuerySpec querySpec, Pageable pageable, Class<?> domainType, Class<T> returnType);

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkItemRequestOptions;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosPatchItemRequestOptions;
Expand Down Expand Up @@ -239,6 +243,54 @@ public <T> T insert(String containerName, T objectToSave, PartitionKey partition
return toDomainObject(domainType, response.getItem());
}

/**
* Insert all items with bulk.
*
* @param information the CosmosEntityInformation
* @param entities the Iterable entities to be inserted
* @param <T> type class of domain type
* @param <S> type class of domain type
* @return Flux of result
*/
@SuppressWarnings("unchecked")
public <S extends T, T> Iterable<S> insertAll(CosmosEntityInformation<T, ?> information, Iterable<S> entities) {
Assert.notNull(entities, "entities to be inserted should not be null");

String containerName = information.getContainerName();
Class<T> domainType = information.getJavaType();

List<CosmosItemOperation> cosmosItemOperations = new ArrayList<>();
entities.forEach(entity -> {
JsonNode originalItem = mappingCosmosConverter.writeJsonNode(entity);
PartitionKey partitionKey = new PartitionKey(information.getPartitionKeyFieldValue(entity));
final CosmosBulkItemRequestOptions options = new CosmosBulkItemRequestOptions();
applyBulkVersioning(domainType, originalItem, options);
cosmosItemOperations.add(CosmosBulkOperations.getUpsertItemOperation(originalItem,
partitionKey, options));
});

// Default micro batch size is 100 which will be too high for most Spring cases, this configuration
// allows it to start at 1 and increase until it finds the appropriate batch size.
CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
cosmosBulkExecutionOptions.setInitialMicroBatchSize(1);

return (Iterable<S>) this.getCosmosAsyncClient()
.getDatabase(this.getDatabaseName())
.getContainer(containerName)
.executeBulkOperations(Flux.fromIterable(cosmosItemOperations), cosmosBulkExecutionOptions)
.publishOn(Schedulers.parallel())
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to insert item(s)", throwable,
this.responseDiagnosticsProcessor))
.flatMap(r -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
r.getResponse().getCosmosDiagnostics(), null);
JsonNode responseItem = r.getResponse().getItem(JsonNode.class);
return responseItem != null ? Flux.just(toDomainObject(domainType, responseItem)) : Flux.empty();
})
.collectList().block();
}

/**
* Patches item
*
Expand Down Expand Up @@ -681,6 +733,51 @@ public <T> void deleteEntity(String containerName, T entity) {
deleteItem(originalItem, containerName, domainType);
}

/**
* Deletes the entities using bulk
*
* @param information the CosmosEntityInformation
* @param entities the Iterable entities to be inserted
* @param <T> type class of domain type
* @param <S> type class of domain type
*/
@Override
public <S extends T, T> void deleteEntities(CosmosEntityInformation<T, ?> information, Iterable<S> entities) {
Assert.notNull(entities, "entities to be deleted should not be null");

String containerName = information.getContainerName();
Class<T> domainType = information.getJavaType();

List<CosmosItemOperation> cosmosItemOperations = new ArrayList<>();
entities.forEach(entity -> {
JsonNode originalItem = mappingCosmosConverter.writeJsonNode(entity);
PartitionKey partitionKey = new PartitionKey(information.getPartitionKeyFieldValue(entity));
final CosmosBulkItemRequestOptions options = new CosmosBulkItemRequestOptions();
applyBulkVersioning(domainType, originalItem, options);
cosmosItemOperations.add(CosmosBulkOperations.getDeleteItemOperation(String.valueOf(information.getId(entity)),
new PartitionKey(information.getPartitionKeyFieldValue(entity)), options));
});

// Default micro batch size is 100 which will be too high for most Spring cases, this configuration
// allows it to start at 1 and increase until it finds the appropriate batch size.
CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
cosmosBulkExecutionOptions.setInitialMicroBatchSize(1);

this.getCosmosAsyncClient()
.getDatabase(this.getDatabaseName())
.getContainer(containerName)
.executeBulkOperations(Flux.fromIterable(cosmosItemOperations), cosmosBulkExecutionOptions)
.publishOn(Schedulers.parallel())
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to delete item(s)", throwable,
this.responseDiagnosticsProcessor))
.flatMap(response -> {
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
response.getResponse().getCosmosDiagnostics(), null);
return Flux.empty();
}).blockLast();
}

private void deleteById(String containerName, Object id, PartitionKey partitionKey,
CosmosItemRequestOptions options) {
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
Expand Down Expand Up @@ -755,7 +852,7 @@ public <T> Boolean exists(@NonNull CosmosQuery query, @NonNull Class<T> domainTy

/**
* Delete the DocumentQuery, need to query the domains at first, then delete the item from the result. The cosmos db
* Sql API do _NOT_ support DELETE query, we cannot add one DeleteQueryGenerator.
* Sql API do _NOT_ support DELETE query, we cannot add one DeleteQueryGenerator. Uses bulk if possible.
trande4884 marked this conversation as resolved.
Show resolved Hide resolved
*
* @param query The representation for query method.
* @param domainType Class of domain
Expand All @@ -771,11 +868,46 @@ public <T> Iterable<T> delete(@NonNull CosmosQuery query, @NonNull Class<T> doma
Assert.hasText(containerName, "container should not be null, empty or only whitespaces");
String finalContainerName = getContainerNameOverride(containerName);

@SuppressWarnings("unchecked")
CosmosEntityInformation<T, Object> entityInfo = (CosmosEntityInformation<T, Object>) CosmosEntityInformation.getInstance(domainType);

final List<JsonNode> results = findItemsAsFlux(query, finalContainerName, domainType).collectList().block();
assert results != null;
return results.stream()
.map(item -> deleteItem(item, finalContainerName, domainType))
.collect(Collectors.toList());

if (entityInfo.getPartitionKeyFieldName() != null) {
trande4884 marked this conversation as resolved.
Show resolved Hide resolved
Flux<CosmosItemOperation> cosmosItemOperationFlux = Flux.fromIterable(results).map(item -> {
T object = toDomainObject(domainType, item);
Object id = entityInfo.getId(object);
String idString = id != null ? id.toString() : "";
final CosmosBulkItemRequestOptions options = new CosmosBulkItemRequestOptions();
applyBulkVersioning(domainType, item, options);
return CosmosBulkOperations.getDeleteItemOperation(idString,
new PartitionKey(entityInfo.getPartitionKeyFieldValue(object)), options);
});

// Default micro batch size is 100 which will be too high for most Spring cases, this configuration
// allows it to start at 1 and increase until it finds the appropriate batch size.
CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
cosmosBulkExecutionOptions.setInitialMicroBatchSize(1);

this.getCosmosAsyncClient()
.getDatabase(this.getDatabaseName())
.getContainer(containerName)
.executeBulkOperations(cosmosItemOperationFlux, cosmosBulkExecutionOptions)
.publishOn(Schedulers.parallel())
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to delete item(s)",
throwable, this.responseDiagnosticsProcessor))
.collectList().block();

return results.stream()
.map(jsonNode -> toDomainObject(domainType, jsonNode))
.collect(Collectors.toList());
} else {
return results.stream()
.map(item -> deleteItem(item, finalContainerName, domainType))
.collect(Collectors.toList());
}
}

@Override
Expand Down Expand Up @@ -1129,6 +1261,15 @@ private void applyVersioning(Class<?> domainType,
}
}

private void applyBulkVersioning(Class<?> domainType,
JsonNode jsonNode,
CosmosBulkItemRequestOptions options) {
CosmosEntityInformation<?, ?> entityInformation = CosmosEntityInformation.getInstance(domainType);
if (entityInformation.isVersioned()) {
options.setIfMatchETag(jsonNode.get(Constants.ETAG_PROPERTY_DEFAULT_NAME).asText());
}
}

private void maybeEmitEvent(CosmosMappingEvent<?> event) {
if (canPublishEvent()) {
this.applicationContext.publishEvent(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ Mono<CosmosContainerProperties> replaceContainerProperties(String containerName,
*/
<T> Mono<T> insert(String containerName, T objectToSave);

/**
* Insert all items with bulk.
*
* @param entityInformation must not be {@literal null}
* @param entities must not be {@literal null}
* @param <T> type class of domain type
* @param <S> type class of domain type
* @return Flux of result
*/
<S extends T, T> Flux<S> insertAll(CosmosEntityInformation<T, ?> entityInformation, Flux<S> entities);

/**
* patches item
* @param id must not be {@literal null}
Expand Down Expand Up @@ -210,7 +221,18 @@ Mono<CosmosContainerProperties> replaceContainerProperties(String containerName,
<T> Mono<Void> deleteEntity(String containerName, T entity);

/**
* Delete all items in a container
* Delete all items with bulk.
*
* @param entityInformation must not be {@literal null}
* @param entities must not be {@literal null}
* @param <T> type class of domain type
* @param <S> type class of domain type
* @return void Mono
*/
<S extends T, T> Mono<Void> deleteEntities(CosmosEntityInformation<T, ?> entityInformation, Flux<S> entities);

/**
* Delete all items in a container. Uses bulk if possible.
*
* @param containerName the container name
* @param domainType the domainType
Expand Down
Loading
Loading