Skip to content

Commit

Permalink
auto-upload SPROCs to CosmosDB (#1339)
Browse files Browse the repository at this point in the history
  • Loading branch information
paullatzelsperger authored and ndr-brt committed May 20, 2022
1 parent dc8ff6d commit acd0233
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ in the detailed section referring to by linking pull requests or issues.
* Make all the services injectable (#1285)
* Fix CosmosDB Integration tests (#1313)
* Remove ContractDef from Cosmos DB cache when deleting (#1330)
* Auto-upload of Cosmos stored procedures (#1338)

## [milestone-3] - 2022-04-08

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ protected CosmosContractNegotiationStoreConfig(ServiceExtensionContext context)
super(context);
}

/**
* Boolean setting to allow or disallow auto-uploading any stored procedures that this extension needs.
* Disable to reduce startup times.
*
* @return the key of the setting
*/
public String allowSprocAutoUploadSetting() {
return "edc.contractnegotiationstore.cosmos.allow.sproc.autoupload";
}

@Override
protected String getAccountNameSetting() {
return "edc.contractnegotiationstore.cosmos.account-name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.dataspaceconnector.spi.system.ServiceExtensionContext;
import org.eclipse.dataspaceconnector.spi.system.health.HealthCheckService;


@Provides({ ContractNegotiationStore.class })
public class CosmosContractNegotiationStoreExtension implements ServiceExtension {

Expand All @@ -46,6 +47,10 @@ public void initialize(ServiceExtensionContext context) {

context.getService(HealthCheckService.class).addReadinessProvider(() -> cosmosDbApi.get().forComponent(name()));

if (context.getSetting(configuration.allowSprocAutoUploadSetting(), true)) {
cosmosDbApi.uploadStoredProcedure("nextForState");
cosmosDbApi.uploadStoredProcedure("lease");
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,11 @@ public interface CosmosDbApi extends ReadinessProvider {
Stream<Object> queryItems(String query);

String invokeStoredProcedure(String procedureName, String partitionKey, Object... args);

/**
* Uploads stored procedure into a container.
*
* @param name of stored procedure js file
*/
void uploadStoredProcedure(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.CosmosStoredProcedureProperties;
import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions;
import com.azure.cosmos.models.CosmosStoredProcedureResponse;
import com.azure.cosmos.models.PartitionKey;
Expand All @@ -42,6 +43,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Scanner;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -216,6 +218,27 @@ public String invokeStoredProcedure(String procedureName, String partitionKey, O
return response.getResponseAsString();
}

@Override
public void uploadStoredProcedure(String name) {
// upload stored procedure
var is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name + ".js");
if (is == null) {
throw new AssertionError("The input stream referring to the " + name + " file cannot be null!");
}

var s = new Scanner(is).useDelimiter("\\A");
if (!s.hasNext()) {
throw new IllegalArgumentException("Error loading resource with name " + ".js");
}
var body = s.next();
var props = new CosmosStoredProcedureProperties(name, body);

var scripts = container.getScripts();
if (scripts.readAllStoredProcedures().stream().noneMatch(sp -> sp.getId().equals(name))) {
scripts.createStoredProcedure(props);
}
}

@Override
public HealthCheckResult get() {

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.dataspaceconnector.spi.transfer.store.TransferProcessStore;
import org.eclipse.dataspaceconnector.transfer.store.cosmos.model.TransferProcessDocument;


/**
* Provides an in-memory implementation of the {@link org.eclipse.dataspaceconnector.spi.transfer.store.TransferProcessStore} for testing.
*/
Expand Down Expand Up @@ -59,6 +60,10 @@ public void initialize(ServiceExtensionContext context) {

healthService.addReadinessProvider(() -> cosmosDbApi.get().forComponent(name()));

if (context.getSetting(configuration.allowSprocAutoUploadSetting(), true)) {
cosmosDbApi.uploadStoredProcedure("nextForState");
cosmosDbApi.uploadStoredProcedure("lease");
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ protected TransferProcessStoreCosmosConfig(ServiceExtensionContext context) {
super(context);
}

/**
* Boolean setting to allow or disallow auto-uploading any stored procedures that this extension needs.
* Disable to reduce startup times.
*
* @return the key of the setting
*/
public String allowSprocAutoUploadSetting() {
return "edc.transfer-process-store.cosmos.allow.sproc.autoupload";
}

@Override
protected String getAccountNameSetting() {
return COSMOS_ACCOUNTNAME_SETTING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.eclipse.dataspaceconnector.azure.cosmos.util.StoredProcedureTestUtils.uploadStoredProcedure;
import static org.eclipse.dataspaceconnector.transfer.store.cosmos.TestHelper.createTransferProcess;
import static org.eclipse.dataspaceconnector.transfer.store.cosmos.TestHelper.createTransferProcessDocument;

Expand Down Expand Up @@ -98,10 +97,11 @@ void setUp() {
var containerName = CONTAINER_PREFIX + UUID.randomUUID();
var containerIfNotExists = database.createContainerIfNotExists(containerName, "/partitionKey");
container = database.getContainer(containerIfNotExists.getProperties().getId());
uploadStoredProcedure(container, "nextForState");
uploadStoredProcedure(container, "lease");

var retryPolicy = new RetryPolicy<>().withMaxRetries(5).withBackoff(1, 3, ChronoUnit.SECONDS);
var cosmosDbApi = new CosmosDbApiImpl(container, false);
cosmosDbApi.uploadStoredProcedure("nextForState");
cosmosDbApi.uploadStoredProcedure("lease");
store = new CosmosTransferProcessStore(cosmosDbApi, typeManager, partitionKey, connectorId, retryPolicy);
}

Expand Down

0 comments on commit acd0233

Please sign in to comment.