Skip to content

Commit

Permalink
Transactional batch for Java (Azure#15775)
Browse files Browse the repository at this point in the history
* Transactional batch seperate

Signed-off-by: Rakesh Kumar <[email protected]>

* Code review changes

Signed-off-by: Rakesh Kumar <[email protected]>

* Few more fixes

Signed-off-by: Rakesh Kumar <[email protected]>

* Fix

Signed-off-by: Rakesh Kumar <[email protected]>

* fix

Signed-off-by: Rakesh Kumar <[email protected]>

* Added issue link

Signed-off-by: Rakesh Kumar <[email protected]>

* beta version change

Signed-off-by: Rakesh Kumar <[email protected]>

* Add Beta change to public classes

Signed-off-by: Rakesh Kumar <[email protected]>

* Minor fix

Signed-off-by: Rakesh Kumar <[email protected]>

* Code review

Signed-off-by: Rakesh Kumar <[email protected]>

* Fix

Signed-off-by: Rakesh Kumar <[email protected]>

* Minor fix

Signed-off-by: Rakesh Kumar <[email protected]>

* Session token fix and test cases

Signed-off-by: Rakesh Kumar <[email protected]>

* code review changes

Signed-off-by: Rakesh Kumar <[email protected]>

* Removing one test case

Signed-off-by: Rakesh Kumar <[email protected]>

* test

Signed-off-by: Rakesh Kumar <[email protected]>

* Fix

Signed-off-by: Rakesh Kumar <[email protected]>

* API change

Signed-off-by: Rakesh Kumar <[email protected]>

* Adding CosmosItemoperation interface

Signed-off-by: Rakesh Kumar <[email protected]>

* Fixing comment

Signed-off-by: Rakesh Kumar <[email protected]>

* Fix

Signed-off-by: Rakesh Kumar <[email protected]>

* Reverting byte buffer change

Signed-off-by: Rakesh Kumar <[email protected]>
  • Loading branch information
rakkuma authored Oct 12, 2020
1 parent 43b549b commit 962f327
Show file tree
Hide file tree
Showing 37 changed files with 3,268 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.ReplicationPolicy;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.implementation.RequestTimeline;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.ResourceResponse;
Expand Down Expand Up @@ -607,4 +608,58 @@ public static Duration getRequestTimeoutFromDirectConnectionConfig(DirectConnect
public static Duration getRequestTimeoutFromGatewayConnectionConfig(GatewayConnectionConfig gatewayConnectionConfig) {
return gatewayConnectionConfig.getRequestTimeout();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static String getOperationValueForCosmosItemOperationType(CosmosItemOperationType cosmosItemOperationType) {
return cosmosItemOperationType.getOperationValue();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static RequestOptions toRequestOptions(TransactionalBatchRequestOptions transactionalBatchRequestOptions) {
return transactionalBatchRequestOptions.toRequestOptions();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static TransactionalBatchOperationResult createTransactionBatchResult(
String eTag,
double requestCharge,
ObjectNode resourceObject,
int statusCode,
Duration retryAfter,
int subStatusCode,
CosmosItemOperation cosmosItemOperation) {

return new TransactionalBatchOperationResult(
eTag,
requestCharge,
resourceObject,
statusCode,
retryAfter,
subStatusCode,
cosmosItemOperation);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static TransactionalBatchResponse createTransactionBatchResponse(
int responseStatusCode,
int responseSubStatusCode,
String errorMessage,
Map<String, String> responseHeaders,
CosmosDiagnostics cosmosDiagnostics) {

return new TransactionalBatchResponse(
responseStatusCode,
responseSubStatusCode,
errorMessage,
responseHeaders,
cosmosDiagnostics);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static void addTransactionBatchResultInResponse(
TransactionalBatchResponse transactionalBatchResponse,
List<TransactionalBatchOperationResult> transactionalBatchOperationResults) {

transactionalBatchResponse.addAll(transactionalBatchOperationResults);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.batch.BatchExecutor;
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.models.CosmosConflictProperties;
import com.azure.cosmos.models.CosmosContainerProperties;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class CosmosAsyncContainer {
private final String queryItemsSpanName;
private final String readAllConflictsSpanName;
private final String queryConflictsSpanName;
private final String batchSpanName;
private CosmosAsyncScripts scripts;

CosmosAsyncContainer(String id, CosmosAsyncDatabase database) {
Expand All @@ -87,6 +89,7 @@ public class CosmosAsyncContainer {
this.queryItemsSpanName = "queryItems." + this.id;
this.readAllConflictsSpanName = "readAllConflicts." + this.id;
this.queryConflictsSpanName = "queryConflicts." + this.id;
this.batchSpanName = "transactionalBatch." + this.id;
}

/**
Expand Down Expand Up @@ -493,6 +496,95 @@ private <T> T transform(Object object, Class<T> classType) {
return Utils.getSimpleObjectMapper().convertValue(object, classType);
}

/**
* Executes the transactional batch.
*
* @param transactionalBatch Batch having list of operation and partition key which will be executed by this container.
*
* @return A Mono response which contains details of execution of the transactional batch.
* <p>
* If the transactional batch executes successfully, the value returned by {@link
* TransactionalBatchResponse#getStatusCode} on the response returned will be set to 200}.
* <p>
* If an operation within the transactional batch fails during execution, no changes from the batch will be
* committed and the status of the failing operation is made available by {@link
* TransactionalBatchResponse#getStatusCode} or by the exception. To obtain information about the operations
* that failed in case of some user error like conflict, not found etc, the response can be enumerated.
* This returns {@link TransactionalBatchOperationResult} instances corresponding to each operation in the
* transactional batch in the order they were added to the transactional batch.
* For a result corresponding to an operation within the transactional batch, use
* {@link TransactionalBatchOperationResult#getStatusCode}
* to access the status of the operation. If the operation was not executed or it was aborted due to the failure of
* another operation within the transactional batch, the value of this field will be 424;
* for the operation that caused the batch to abort, the value of this field
* will indicate the cause of failure.
* <p>
* If there are issues such as request timeouts, Gone, session not available, network failure
* or if the service somehow returns 5xx then the Mono will return error instead of TransactionalBatchResponse.
* <p>
* Use {@link TransactionalBatchResponse#isSuccessStatusCode} on the response returned to ensure that the
* transactional batch succeeded.
*/
@Beta(Beta.SinceVersion.V4_7_0)
public Mono<TransactionalBatchResponse> executeTransactionalBatch(TransactionalBatch transactionalBatch) {
return executeTransactionalBatch(transactionalBatch, new TransactionalBatchRequestOptions());
}

/**
* Executes the transactional batch.
*
* @param transactionalBatch Batch having list of operation and partition key which will be executed by this container.
* @param requestOptions Options that apply specifically to batch request.
*
* @return A Mono response which contains details of execution of the transactional batch.
* <p>
* If the transactional batch executes successfully, the value returned by {@link
* TransactionalBatchResponse#getStatusCode} on the response returned will be set to 200}.
* <p>
* If an operation within the transactional batch fails during execution, no changes from the batch will be
* committed and the status of the failing operation is made available by {@link
* TransactionalBatchResponse#getStatusCode} or by the exception. To obtain information about the operations
* that failed in case of some user error like conflict, not found etc, the response can be enumerated.
* This returns {@link TransactionalBatchOperationResult} instances corresponding to each operation in the
* transactional batch in the order they were added to the transactional batch.
* For a result corresponding to an operation within the transactional batch, use
* {@link TransactionalBatchOperationResult#getStatusCode}
* to access the status of the operation. If the operation was not executed or it was aborted due to the failure of
* another operation within the transactional batch, the value of this field will be 424;
* for the operation that caused the batch to abort, the value of this field
* will indicate the cause of failure.
* <p>
* If there are issues such as request timeouts, Gone, session not available, network failure
* or if the service somehow returns 5xx then the Mono will return error instead of TransactionalBatchResponse.
* <p>
* Use {@link TransactionalBatchResponse#isSuccessStatusCode} on the response returned to ensure that the
* transactional batch succeeded.
*/
@Beta(Beta.SinceVersion.V4_7_0)
public Mono<TransactionalBatchResponse> executeTransactionalBatch(
TransactionalBatch transactionalBatch,
TransactionalBatchRequestOptions requestOptions) {

if (requestOptions == null) {
requestOptions = new TransactionalBatchRequestOptions();
}

final TransactionalBatchRequestOptions transactionalBatchRequestOptions = requestOptions;

return withContext(context -> {
final BatchExecutor executor = new BatchExecutor(this, transactionalBatch, transactionalBatchRequestOptions);
final Mono<TransactionalBatchResponse> responseMono = executor.executeAsync();

return database.getClient().getTracerProvider().
traceEnabledBatchResponsePublisher(
responseMono,
context,
this.batchSpanName,
database.getId(),
database.getClient().getServiceEndpoint());
});
}

/**
* Reads an item.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,19 @@ private CosmosItemResponse<Object> blockDeleteItemResponse(Mono<CosmosItemRespon
}
}

private TransactionalBatchResponse blockBatchResponse(Mono<TransactionalBatchResponse> batchResponseMono) {
try {
return batchResponseMono.block();
} catch (Exception ex) {
final Throwable throwable = Exceptions.unwrap(ex);
if (throwable instanceof CosmosException) {
throw (CosmosException) throwable;
} else {
throw ex;
}
}
}

/**
* Read all items as {@link CosmosPagedIterable} in the current container.
*
Expand Down Expand Up @@ -453,6 +466,78 @@ public <T> CosmosItemResponse<Object> deleteItem(T item, CosmosItemRequestOption
return this.blockDeleteItemResponse(asyncContainer.deleteItem(item, options));
}

/**
* Executes the transactional batch.
*
* @param transactionalBatch Batch having list of operation and partition key which will be executed by this container.
*
* @return A TransactionalBatchResponse which contains details of execution of the transactional batch.
* <p>
* If the transactional batch executes successfully, the value returned by {@link
* TransactionalBatchResponse#getStatusCode} on the response returned will be set to 200}.
* <p>
* If an operation within the transactional batch fails during execution, no changes from the batch will be
* committed and the status of the failing operation is made available by {@link
* TransactionalBatchResponse#getStatusCode} or by the exception. To obtain information about the operations
* that failed in case of some user error like conflict, not found etc, the response can be enumerated.
* This returns {@link TransactionalBatchOperationResult} instances corresponding to each operation in the
* transactional batch in the order they were added to the transactional batch.
* For a result corresponding to an operation within the transactional batch, use
* {@link TransactionalBatchOperationResult#getStatusCode}
* to access the status of the operation. If the operation was not executed or it was aborted due to the failure of
* another operation within the transactional batch, the value of this field will be 424;
* for the operation that caused the batch to abort, the value of this field
* will indicate the cause of failure.
* <p>
* If there are issues such as request timeouts, Gone, session not available, network failure
* or if the service somehow returns 5xx then this will throw an exception instead of returning a TransactionalBatchResponse.
* <p>
* Use {@link TransactionalBatchResponse#isSuccessStatusCode} on the response returned to ensure that the
* transactional batch succeeded.
*/
@Beta(Beta.SinceVersion.V4_7_0)
public TransactionalBatchResponse executeTransactionalBatch(TransactionalBatch transactionalBatch) {
return this.blockBatchResponse(asyncContainer.executeTransactionalBatch(transactionalBatch));
}

/**
* Executes the transactional batch.
*
* @param transactionalBatch Batch having list of operation and partition key which will be executed by this container.
* @param requestOptions Options that apply specifically to batch request.
*
* @return A TransactionalBatchResponse which contains details of execution of the transactional batch.
* <p>
* If the transactional batch executes successfully, the value returned by {@link
* TransactionalBatchResponse#getStatusCode} on the response returned will be set to 200}.
* <p>
* If an operation within the transactional batch fails during execution, no changes from the batch will be
* committed and the status of the failing operation is made available by {@link
* TransactionalBatchResponse#getStatusCode} or by the exception. To obtain information about the operations
* that failed in case of some user error like conflict, not found etc, the response can be enumerated.
* This returns {@link TransactionalBatchOperationResult} instances corresponding to each operation in the
* transactional batch in the order they were added to the transactional batch.
* For a result corresponding to an operation within the transactional batch, use
* {@link TransactionalBatchOperationResult#getStatusCode}
* to access the status of the operation. If the operation was not executed or it was aborted due to the failure of
* another operation within the transactional batch, the value of this field will be 424;
* for the operation that caused the batch to abort, the value of this field
* will indicate the cause of failure.
* <p>
* If there are issues such as request timeouts, Gone, session not available, network failure
* or if the service somehow returns 5xx then this will throw an exception instead of returning a TransactionalBatchResponse.
* <p>
* Use {@link TransactionalBatchResponse#isSuccessStatusCode} on the response returned to ensure that the
* transactional batch succeeded.
*/
@Beta(Beta.SinceVersion.V4_7_0)
public TransactionalBatchResponse executeTransactionalBatch(
TransactionalBatch transactionalBatch,
TransactionalBatchRequestOptions requestOptions) {

return this.blockBatchResponse(asyncContainer.executeTransactionalBatch(transactionalBatch, requestOptions));
}

/**
* Gets the Cosmos scripts using the current container as context.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos;

import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.util.Beta;

@Beta(Beta.SinceVersion.V4_7_0)
public interface CosmosItemOperation {
String getId();

PartitionKey getPartitionKeyValue();

CosmosItemOperationType getOperationType();

<T> T getItem();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos;

import com.azure.cosmos.implementation.batch.BatchRequestResponseConstant;
import com.azure.cosmos.util.Beta;

@Beta(Beta.SinceVersion.V4_7_0)
public enum CosmosItemOperationType {

CREATE(BatchRequestResponseConstant.OPERATION_CREATE),
DELETE(BatchRequestResponseConstant.OPERATION_DELETE),
READ(BatchRequestResponseConstant.OPERATION_READ),
REPLACE(BatchRequestResponseConstant.OPERATION_REPLACE),
UPSERT(BatchRequestResponseConstant.OPERATION_UPSERT);

CosmosItemOperationType(String operationValue) {
this.operationValue = operationValue;
}

String getOperationValue() {
return operationValue;
}

private final String operationValue;
}
Loading

0 comments on commit 962f327

Please sign in to comment.