Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: auto-upload SPROCs to CosmosDB #1339

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ in the detailed section referring to by linking pull requests or issues.
* Make all the services injectable (#1285)
* Fix CosmosDB Integration tests (#1313)
* Remove ContractDef from Cosmos DB cache when deleting (#1330)
* Auto-upload of Cosmos stored procedures (#1338)

## [milestone-3] - 2022-04-08

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ protected CosmosContractNegotiationStoreConfig(ServiceExtensionContext context)
super(context);
}

/**
* Boolean setting to allow or disallow auto-uploading any stored procedures that this extension needs.
* Disable to reduce startup times.
*
* @return the key of the setting
*/
public String allowSprocAutoUploadSetting() {
return "edc.contractnegotiationstore.cosmos.allow.sproc.autoupload";
}

@Override
protected String getAccountNameSetting() {
return "edc.contractnegotiationstore.cosmos.account-name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.dataspaceconnector.spi.system.ServiceExtensionContext;
import org.eclipse.dataspaceconnector.spi.system.health.HealthCheckService;


@Provides({ ContractNegotiationStore.class })
public class CosmosContractNegotiationStoreExtension implements ServiceExtension {

Expand All @@ -49,6 +50,10 @@ public void initialize(ServiceExtensionContext context) {

context.getService(HealthCheckService.class).addReadinessProvider(() -> cosmosDbApi.get().forComponent(name()));

if (context.getSetting(configuration.allowSprocAutoUploadSetting(), true)) {
cosmosDbApi.uploadStoredProcedure("nextForState");
cosmosDbApi.uploadStoredProcedure("lease");
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,11 @@ public interface CosmosDbApi extends ReadinessProvider {
Stream<Object> queryItems(String query);

String invokeStoredProcedure(String procedureName, String partitionKey, Object... args);

/**
* Uploads stored procedure into a container.
*
* @param name of stored procedure js file
*/
void uploadStoredProcedure(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.CosmosStoredProcedureProperties;
import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions;
import com.azure.cosmos.models.CosmosStoredProcedureResponse;
import com.azure.cosmos.models.PartitionKey;
Expand All @@ -42,6 +43,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Scanner;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -216,6 +218,27 @@ public String invokeStoredProcedure(String procedureName, String partitionKey, O
return response.getResponseAsString();
}

@Override
public void uploadStoredProcedure(String name) {
// upload stored procedure
var is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name + ".js");
if (is == null) {
throw new AssertionError("The input stream referring to the " + name + " file cannot be null!");
}

var s = new Scanner(is).useDelimiter("\\A");
if (!s.hasNext()) {
throw new IllegalArgumentException("Error loading resource with name " + ".js");
}
var body = s.next();
var props = new CosmosStoredProcedureProperties(name, body);

var scripts = container.getScripts();
if (scripts.readAllStoredProcedures().stream().noneMatch(sp -> sp.getId().equals(name))) {
scripts.createStoredProcedure(props);
}
}

@Override
public HealthCheckResult get() {

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.dataspaceconnector.spi.transfer.store.TransferProcessStore;
import org.eclipse.dataspaceconnector.transfer.store.cosmos.model.TransferProcessDocument;


/**
* Provides an in-memory implementation of the {@link org.eclipse.dataspaceconnector.spi.transfer.store.TransferProcessStore} for testing.
*/
Expand Down Expand Up @@ -59,6 +60,10 @@ public void initialize(ServiceExtensionContext context) {

healthService.addReadinessProvider(() -> cosmosDbApi.get().forComponent(name()));

if (context.getSetting(configuration.allowSprocAutoUploadSetting(), true)) {
cosmosDbApi.uploadStoredProcedure("nextForState");
cosmosDbApi.uploadStoredProcedure("lease");
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ protected TransferProcessStoreCosmosConfig(ServiceExtensionContext context) {
super(context);
}

/**
* Boolean setting to allow or disallow auto-uploading any stored procedures that this extension needs.
* Disable to reduce startup times.
*
* @return the key of the setting
*/
public String allowSprocAutoUploadSetting() {
return "edc.transfer-process-store.cosmos.allow.sproc.autoupload";
}

@Override
protected String getAccountNameSetting() {
return COSMOS_ACCOUNTNAME_SETTING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.eclipse.dataspaceconnector.azure.cosmos.util.StoredProcedureTestUtils.uploadStoredProcedure;
import static org.eclipse.dataspaceconnector.transfer.store.cosmos.TestHelper.createTransferProcess;
import static org.eclipse.dataspaceconnector.transfer.store.cosmos.TestHelper.createTransferProcessDocument;

Expand Down Expand Up @@ -98,10 +97,11 @@ void setUp() {
var containerName = CONTAINER_PREFIX + UUID.randomUUID();
var containerIfNotExists = database.createContainerIfNotExists(containerName, "/partitionKey");
container = database.getContainer(containerIfNotExists.getProperties().getId());
uploadStoredProcedure(container, "nextForState");
uploadStoredProcedure(container, "lease");

var retryPolicy = new RetryPolicy<>().withMaxRetries(5).withBackoff(1, 3, ChronoUnit.SECONDS);
var cosmosDbApi = new CosmosDbApiImpl(container, false);
cosmosDbApi.uploadStoredProcedure("nextForState");
cosmosDbApi.uploadStoredProcedure("lease");
store = new CosmosTransferProcessStore(cosmosDbApi, typeManager, partitionKey, connectorId, retryPolicy);
}

Expand Down