Skip to content

Commit

Permalink
fix after review
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed Jul 20, 2023
1 parent aeb1016 commit 79a582c
Show file tree
Hide file tree
Showing 13 changed files with 274 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class InMemoryEndpointDataReferenceCache implements EndpointDataReference
private final LockManager lockManager;
private final EdrCacheEntryPredicateConverter predicateConverter = new EdrCacheEntryPredicateConverter();


private final Map<String, List<EndpointDataReferenceEntry>> entriesByAssetId;

private final Map<String, EndpointDataReferenceEntry> entriesByEdrId;
Expand Down Expand Up @@ -174,7 +175,7 @@ public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String
}


public @NotNull List<EndpointDataReferenceEntry> leaseAndGet(int max, Criterion... criteria) {
private @NotNull List<EndpointDataReferenceEntry> leaseAndGet(int max, Criterion... criteria) {
return lockManager.writeLock(() -> {
var filterPredicate = Arrays.stream(criteria).map(predicateConverter::convert).reduce(x -> true, Predicate::and);
var entities = entriesByEdrId.values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.eclipse.edc.connector.transfer.spi.types.TransferRequest;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.response.ResponseStatus;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.retry.ExponentialWaitStrategy;
Expand Down Expand Up @@ -60,6 +61,7 @@
import static org.eclipse.tractusx.edc.edr.core.EdrCoreExtension.DEFAULT_ITERATION_WAIT;
import static org.eclipse.tractusx.edc.edr.core.EdrCoreExtension.DEFAULT_SEND_RETRY_BASE_DELAY;
import static org.eclipse.tractusx.edc.edr.core.EdrCoreExtension.DEFAULT_SEND_RETRY_LIMIT;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.DELETING;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.EXPIRED;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.NEGOTIATED;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.from;
Expand Down Expand Up @@ -110,6 +112,7 @@ public void start() {
stateMachineManager = StateMachineManager.Builder.newInstance("edr-manager", monitor, executorInstrumentation, waitStrategy)
.processor(processEdrInState(NEGOTIATED, this::processNegotiated))
.processor(processEdrInState(EXPIRED, this::processExpired))
.processor(processDeletingEdr(this::processDeleting))
.build();

stateMachineManager.start();
Expand All @@ -136,17 +139,17 @@ protected void transitionToExpired(EndpointDataReferenceEntry edrEntry) {
update(edrEntry);
}

protected void transitionToDeleted(EndpointDataReferenceEntry edrEntry) {
edrEntry.transitionToDeleted();
update(edrEntry);
}

protected void transitionToError(EndpointDataReferenceEntry edrEntry, String message) {
edrEntry.setErrorDetail(message);
edrEntry.transitionError();
update(edrEntry);
}

protected void transitionToDeleting(EndpointDataReferenceEntry edrEntry) {
edrEntry.transitionToDeleting();
update(edrEntry);
}

private void update(EndpointDataReferenceEntry edrEntry) {
edrCache.update(edrEntry);
monitor.debug(format("Edr entry %s is now in state %s.", edrEntry.getId(), from(edrEntry.getState())));
Expand All @@ -158,6 +161,16 @@ private StateProcessorImpl<EndpointDataReferenceEntry> processEdrInState(Endpoin
return new StateProcessorImpl<>(() -> edrCache.nextNotLeased(batchSize, filter), telemetry.contextPropagationMiddleware(function));
}


private StateProcessorImpl<EndpointDataReferenceEntry> processDeletingEdr(Function<EndpointDataReferenceEntry, Boolean> function) {
var query = QuerySpec.Builder.newInstance()
.filter(hasState(DELETING.code()))
.limit(batchSize)
.build();

return new StateProcessorImpl<>(() -> edrCache.queryForEntries(query).collect(Collectors.toList()), telemetry.contextPropagationMiddleware(function));
}

private ContractRequest createContractRequest(NegotiateEdrRequest request) {
var callbacks = Stream.concat(request.getCallbackAddresses().stream(), Stream.of(LOCAL_CALLBACK)).collect(Collectors.toList());

Expand Down Expand Up @@ -189,23 +202,35 @@ private boolean processNegotiated(EndpointDataReferenceEntry edrEntry) {
}

private boolean processExpired(EndpointDataReferenceEntry edrEntry) {
if (shouldBeRemoved(edrEntry)) {
return entityRetryProcessFactory.doSyncProcess(edrEntry, () -> deleteEntry(edrEntry))
.onDelay(this::breakLease)
.onSuccess((n, result) -> {
})
.onFailure((n, throwable) -> transitionToExpired(n))
.onFatalError((n, failure) -> transitionToError(n, failure.getFailureDetail()))
.onRetryExhausted((n, failure) -> transitionToError(n, format("Failed delete EDR token: %s", failure.getFailureDetail())))
.execute("Start an EDR token deletion");
return entityRetryProcessFactory.doSimpleProcess(edrEntry, () -> checkExpiration(edrEntry))
.onDelay(this::breakLease)
.execute("Start EDR token deletion check");

}


private boolean processDeleting(EndpointDataReferenceEntry edrEntry) {
return entityRetryProcessFactory.doSyncProcess(edrEntry, () -> deleteEntry(edrEntry))
.onDelay(this::breakLease)
.onSuccess((n, result) -> {
})
.onFailure((n, throwable) -> transitionToDeleting(n))
.onFatalError((n, failure) -> transitionToError(n, failure.getFailureDetail()))
.onRetryExhausted((n, failure) -> transitionToError(n, format("Failed deleted EDR token: %s", failure.getFailureDetail())))
.execute("Start EDR token deletion");
}

private boolean checkExpiration(EndpointDataReferenceEntry entry) {
if (shouldBeRemoved(entry)) {
transitionToDeleting(entry);
return true;
} else {
breakLease(edrEntry);
breakLease(entry);
return false;
}
}

private StatusResult<Void> deleteEntry(EndpointDataReferenceEntry entry) {
this.transitionToDeleted(entry);
var result = edrCache.deleteByTransferProcessId(entry.getTransferProcessId());
if (result.succeeded()) {
monitor.debug(format("Deleted EDR cached entry for transfer process id %s", entry.getTransferProcessId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class SqlEndpointDataReferenceCache extends AbstractSqlStore implements E

private final SqlLeaseContextBuilder leaseContext;

private final String leaseHolder;


public SqlEndpointDataReferenceCache(DataSourceRegistry dataSourceRegistry, String dataSourceName,
TransactionContext transactionContext, EdrStatements statements,
Expand All @@ -67,6 +69,7 @@ public SqlEndpointDataReferenceCache(DataSourceRegistry dataSourceRegistry, Stri
this.statements = statements;
this.clock = clock;
this.vault = vault;
this.leaseHolder = connectorId;
leaseContext = SqlLeaseContextBuilder.with(transactionContext, connectorId, statements, clock, queryExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ public void clearEdrCache() {
var edrCache = context.getService(EndpointDataReferenceCache.class);
edrCache.queryForEntries(QuerySpec.max()).forEach(entry -> {
try {
entry.transitionToDeleted();
edrCache.update(entry);
edrCache.deleteByTransferProcessId(entry.getTransferProcessId());
} catch (Exception e) {
context.getMonitor().warning("Failed to clean up the cache", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.tests.edr;

import jakarta.json.Json;
import okhttp3.mockwebserver.MockWebServer;
import org.assertj.core.api.Condition;
import org.eclipse.tractusx.edc.lifecycle.Participant;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.UUID;

import static java.time.Duration.ofSeconds;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.EXPIRED;
import static org.eclipse.tractusx.edc.helpers.PolicyHelperFunctions.businessPartnerNumberPolicy;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.PLATO_BPN;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.PLATO_NAME;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.SOKRATES_BPN;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.SOKRATES_NAME;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.platoConfiguration;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.sokratesConfiguration;

public abstract class AbstractDeleteEdrTest {

protected static final Participant SOKRATES = new Participant(SOKRATES_NAME, SOKRATES_BPN, sokratesConfiguration());
protected static final Participant PLATO = new Participant(PLATO_NAME, PLATO_BPN, platoConfiguration());
private static final Duration ASYNC_TIMEOUT = ofSeconds(45);
MockWebServer server;

@BeforeEach
void setup() {
server = new MockWebServer();
}

@Test
@DisplayName("Verify that expired EDR are deleted")
void negotiateEdr_shouldRemoveExpiredEdrs() throws IOException {

var assetId = UUID.randomUUID().toString();

var authCodeHeaderName = "test-authkey";
var authCode = "test-authcode";
PLATO.createAsset(assetId, Json.createObjectBuilder().build(), Json.createObjectBuilder()
.add(EDC_NAMESPACE + "type", "HttpData")
.add(EDC_NAMESPACE + "contentType", "application/json")
.add(EDC_NAMESPACE + "baseUrl", "http://test:8080")
.add(EDC_NAMESPACE + "authKey", authCodeHeaderName)
.add(EDC_NAMESPACE + "authCode", authCode)
.build());

PLATO.createPolicy(businessPartnerNumberPolicy("policy-1", SOKRATES.getBpn()));
PLATO.createPolicy(businessPartnerNumberPolicy("policy-2", SOKRATES.getBpn()));
PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2");

var callbacks = Json.createArrayBuilder()
.build();

SOKRATES.negotiateEdr(PLATO, assetId, callbacks);

var expired = new ArrayList<String>();

await().atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
var edrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
var localExpired = edrCaches.stream()
.filter(json -> json.asJsonObject().getJsonString("tx:edrState").getString().equals(EXPIRED.name()))
.map(json -> json.asJsonObject().getJsonString("edc:transferProcessId").getString())
.toList();
assertThat(localExpired).hasSizeGreaterThan(0);
expired.add(localExpired.get(0));
});

await().atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> expired.forEach((id) -> SOKRATES.getEdrRequest(id).statusCode(404)));

}

@AfterEach
void teardown() throws IOException {
server.shutdown();
}


private Condition<String> stateCondition(String value, String description) {
return new Condition<>(m -> m.equals(value), description);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static java.time.Duration.ofSeconds;
import static org.assertj.core.api.Assertions.anyOf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
Expand All @@ -55,7 +56,7 @@ public abstract class AbstractRenewalEdrTest {

protected static final Participant SOKRATES = new Participant(SOKRATES_NAME, SOKRATES_BPN, sokratesConfiguration());
protected static final Participant PLATO = new Participant(PLATO_NAME, PLATO_BPN, platoConfiguration());

private static final Duration ASYNC_TIMEOUT = ofSeconds(45);
MockWebServer server;

@BeforeEach
Expand Down Expand Up @@ -105,11 +106,12 @@ void negotiateEdr_shouldRenewTheEdr() throws IOException {

JsonArrayBuilder edrCaches = Json.createArrayBuilder();

await().untilAsserted(() -> {
var localEdrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
assertThat(localEdrCaches).hasSizeGreaterThan(1);
localEdrCaches.forEach(edrCaches::add);
});
await().atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
var localEdrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
assertThat(localEdrCaches).hasSizeGreaterThan(1);
localEdrCaches.forEach(edrCaches::add);
});


assertThat(edrCaches.build())
Expand All @@ -118,62 +120,6 @@ void negotiateEdr_shouldRenewTheEdr() throws IOException {
.areAtLeast(1, stateCondition(EXPIRED.name(), "Expired"));
}

@Test
@DisplayName("Verify that expired EDR are deleted")
void negotiateEdr_shouldRemoveExpiredEdrs() throws IOException {

var expectedEvents = List.of(
createEvent(TransferProcessCompleted.class),
createEvent(TransferProcessCompleted.class));

var assetId = UUID.randomUUID().toString();
var url = server.url("/mock/api");
server.start();

var authCodeHeaderName = "test-authkey";
var authCode = "test-authcode";
PLATO.createAsset(assetId, Json.createObjectBuilder().build(), Json.createObjectBuilder()
.add(EDC_NAMESPACE + "type", "HttpData")
.add(EDC_NAMESPACE + "contentType", "application/json")
.add(EDC_NAMESPACE + "baseUrl", url.toString())
.add(EDC_NAMESPACE + "authKey", authCodeHeaderName)
.add(EDC_NAMESPACE + "authCode", authCode)
.build());

PLATO.createPolicy(businessPartnerNumberPolicy("policy-1", SOKRATES.getBpn()));
PLATO.createPolicy(businessPartnerNumberPolicy("policy-2", SOKRATES.getBpn()));
PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2");

var callbacks = Json.createArrayBuilder()
.add(createCallback(url.toString(), true, Set.of("transfer.process.completed")))
.build();

expectedEvents.forEach(event -> server.enqueue(new MockResponse()));

SOKRATES.negotiateEdr(PLATO, assetId, callbacks);

var events = expectedEvents.stream()
.map(receivedEvent -> waitForEvent(server, receivedEvent))
.collect(Collectors.toList());

assertThat(expectedEvents).usingRecursiveFieldByFieldElementComparator().containsAll(events);


var expired = new ArrayList<String>();

await().untilAsserted(() -> {
var edrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
var localExpired = edrCaches.stream()
.filter(json -> json.asJsonObject().getJsonString("tx:edrState").getString().equals(EXPIRED.name()))
.map(json -> json.asJsonObject().getJsonString("edc:transferProcessId").getString())
.toList();
assertThat(localExpired).hasSizeGreaterThan(0);
expired.addAll(localExpired);
});

await().untilAsserted(() -> expired.forEach((id) -> SOKRATES.getEdrRequest(id).statusCode(404)));

}

@AfterEach
void teardown() throws IOException {
Expand All @@ -184,5 +130,5 @@ void teardown() throws IOException {
private Condition<String> stateCondition(String value, String description) {
return new Condition<>(m -> m.equals(value), description);
}

}
Loading

0 comments on commit 79a582c

Please sign in to comment.