Skip to content

Commit

Permalink
fix after review
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed Jul 19, 2023
1 parent 8194e61 commit ac35761
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class InMemoryEndpointDataReferenceCache implements EndpointDataReference
private final LockManager lockManager;
private final EdrCacheEntryPredicateConverter predicateConverter = new EdrCacheEntryPredicateConverter();


private final Map<String, List<EndpointDataReferenceEntry>> entriesByAssetId;

private final Map<String, EndpointDataReferenceEntry> entriesByEdrId;
Expand Down Expand Up @@ -148,10 +149,13 @@ public void update(EndpointDataReferenceEntry entry) {
}

@Override
public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String id) {
public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String id, boolean allowSelf) {
return lockManager.writeLock(() -> {
if (isLeased(id)) {
throw new IllegalStateException("EndpointDataReferenceEntry is leased and cannot be deleted!");
// if the lockId it's not leasing it throw an exception
if (!allowSelf || !isLeasedBy(id, lockId)) {
throw new IllegalStateException("EndpointDataReferenceEntry is leased and cannot be deleted!");
}
}
var edr = edrsByTransferProcessId.remove(id);
if (edr == null) {
Expand All @@ -174,7 +178,7 @@ public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String
}


public @NotNull List<EndpointDataReferenceEntry> leaseAndGet(int max, Criterion... criteria) {
private @NotNull List<EndpointDataReferenceEntry> leaseAndGet(int max, Criterion... criteria) {
return lockManager.writeLock(() -> {
var filterPredicate = Arrays.stream(criteria).map(predicateConverter::convert).reduce(x -> true, Predicate::and);
var entities = entriesByEdrId.values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,6 @@ protected void transitionToExpired(EndpointDataReferenceEntry edrEntry) {
update(edrEntry);
}

protected void transitionToDeleted(EndpointDataReferenceEntry edrEntry) {
edrEntry.transitionToDeleted();
update(edrEntry);
}

protected void transitionToError(EndpointDataReferenceEntry edrEntry, String message) {
edrEntry.setErrorDetail(message);
edrEntry.transitionError();
Expand Down Expand Up @@ -205,8 +200,7 @@ private boolean processExpired(EndpointDataReferenceEntry edrEntry) {
}

private StatusResult<Void> deleteEntry(EndpointDataReferenceEntry entry) {
this.transitionToDeleted(entry);
var result = edrCache.deleteByTransferProcessId(entry.getTransferProcessId());
var result = edrCache.deleteByTransferProcessId(entry.getTransferProcessId(), true);
if (result.succeeded()) {
monitor.debug(format("Deleted EDR cached entry for transfer process id %s", entry.getTransferProcessId()));
return StatusResult.success();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,12 @@ void initial_shouldDeleteTheEntry_whenTheRetentionPeriodIsOver() {
.thenReturn(List.of(edrEntry))
.thenReturn(emptyList());

when(edrCache.deleteByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(StoreResult.success(edrEntry));
when(edrCache.deleteByTransferProcessId(edrEntry.getTransferProcessId(), true)).thenReturn(StoreResult.success(edrEntry));

edrManager.start();

await().untilAsserted(() -> {
verify(edrCache, times(1)).deleteByTransferProcessId(edrEntry.getTransferProcessId());
verify(edrCache, times(1)).deleteByTransferProcessId(edrEntry.getTransferProcessId(), true);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class SqlEndpointDataReferenceCache extends AbstractSqlStore implements E

private final SqlLeaseContextBuilder leaseContext;

private final String leaseHolder;


public SqlEndpointDataReferenceCache(DataSourceRegistry dataSourceRegistry, String dataSourceName,
TransactionContext transactionContext, EdrStatements statements,
Expand All @@ -67,6 +69,7 @@ public SqlEndpointDataReferenceCache(DataSourceRegistry dataSourceRegistry, Stri
this.statements = statements;
this.clock = clock;
this.vault = vault;
this.leaseHolder = connectorId;
leaseContext = SqlLeaseContextBuilder.with(transactionContext, connectorId, statements, clock, queryExecutor);
}

Expand Down Expand Up @@ -162,12 +165,17 @@ public void update(EndpointDataReferenceEntry entry) {
}

@Override
public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String id) {
public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String id, boolean allowSelf) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
var entryWrapper = findById(connection, id, this::mapToWrapper);
if (entryWrapper != null) {
leaseContext.withConnection(connection).acquireLease(id);
var lease = leaseContext.withConnection(connection).getLease(id);
if (lease == null) {
leaseContext.withConnection(connection).acquireLease(id);
} else if (!allowSelf || !lease.getLeasedBy().equals(leaseHolder)) {
throw new IllegalStateException("Entity is currently leased!");
}
queryExecutor.execute(connection, statements.getDeleteByIdTemplate(), id);
leaseContext.withConnection(connection).breakLease(id);
vault.deleteSecret(VAULT_PREFIX + entryWrapper.getEdrId()).orElseThrow((failure) -> new EdcPersistenceException(failure.getFailureDetail()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ public void clearEdrCache() {
var edrCache = context.getService(EndpointDataReferenceCache.class);
edrCache.queryForEntries(QuerySpec.max()).forEach(entry -> {
try {
entry.transitionToDeleted();
edrCache.update(entry);
edrCache.deleteByTransferProcessId(entry.getTransferProcessId());
edrCache.deleteByTransferProcessId(entry.getTransferProcessId(), true);
} catch (Exception e) {
context.getMonitor().warning("Failed to clean up the cache", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static java.time.Duration.ofSeconds;
import static org.assertj.core.api.Assertions.anyOf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
Expand All @@ -55,7 +57,7 @@ public abstract class AbstractRenewalEdrTest {

protected static final Participant SOKRATES = new Participant(SOKRATES_NAME, SOKRATES_BPN, sokratesConfiguration());
protected static final Participant PLATO = new Participant(PLATO_NAME, PLATO_BPN, platoConfiguration());

private static final Duration ASYNC_TIMEOUT = ofSeconds(45);
MockWebServer server;

@BeforeEach
Expand Down Expand Up @@ -105,11 +107,12 @@ void negotiateEdr_shouldRenewTheEdr() throws IOException {

JsonArrayBuilder edrCaches = Json.createArrayBuilder();

await().untilAsserted(() -> {
var localEdrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
assertThat(localEdrCaches).hasSizeGreaterThan(1);
localEdrCaches.forEach(edrCaches::add);
});
await().atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
var localEdrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
assertThat(localEdrCaches).hasSizeGreaterThan(1);
localEdrCaches.forEach(edrCaches::add);
});


assertThat(edrCaches.build())
Expand Down Expand Up @@ -161,17 +164,19 @@ void negotiateEdr_shouldRemoveExpiredEdrs() throws IOException {

var expired = new ArrayList<String>();

await().untilAsserted(() -> {
var edrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
var localExpired = edrCaches.stream()
.filter(json -> json.asJsonObject().getJsonString("tx:edrState").getString().equals(EXPIRED.name()))
.map(json -> json.asJsonObject().getJsonString("edc:transferProcessId").getString())
.toList();
assertThat(localExpired).hasSizeGreaterThan(0);
expired.addAll(localExpired);
});
await().atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
var edrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
var localExpired = edrCaches.stream()
.filter(json -> json.asJsonObject().getJsonString("tx:edrState").getString().equals(EXPIRED.name()))
.map(json -> json.asJsonObject().getJsonString("edc:transferProcessId").getString())
.toList();
assertThat(localExpired).hasSizeGreaterThan(0);
expired.add(localExpired.get(0));
});

await().untilAsserted(() -> expired.forEach((id) -> SOKRATES.getEdrRequest(id).statusCode(404)));
await().atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> expired.forEach((id) -> SOKRATES.getEdrRequest(id).statusCode(404)));

}

Expand All @@ -184,5 +189,5 @@ void teardown() throws IOException {
private Condition<String> stateCondition(String value, String description) {
return new Condition<>(m -> m.equals(value), description);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public class TestFunctions {
public static Map<String, String> renewalConfiguration(Map<String, String> config) {
var ssiConfiguration = new HashMap<String, String>() {
{
put("edc.edr.state-machine.expiring-duration", "4");
put("edc.edr.state-machine.expired-retention", "1");
put("edc.transfer.proxy.token.validity.seconds", "4");
put("edc.edr.state-machine.expiring-duration", "10");
put("edc.edr.state-machine.expired-retention", "5");
put("edc.transfer.proxy.token.validity.seconds", "15");
}
};
ssiConfiguration.putAll(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ default boolean filterActive(EndpointDataReferenceEntry entry) {
/**
* Deletes stored endpoint reference data associated with the given transfer process.
*/
StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String id);
default StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String id) {
return deleteByTransferProcessId(id, false);
}

/**
* Deletes stored endpoint reference data associated with the given transfer process.
*/
StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String id, boolean allowSelfLeased);

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import static java.util.Objects.requireNonNull;
import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.tractusx.edc.edr.spi.CoreConstants.TX_NAMESPACE;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.DELETED;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.ERROR;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.EXPIRED;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.NEGOTIATED;
Expand Down Expand Up @@ -138,10 +137,6 @@ public void transitionToExpired() {
transition(EXPIRED, EXPIRED, NEGOTIATED, REFRESHING);
}

public void transitionToDeleted() {
transition(DELETED, DELETED, EXPIRED, REFRESHING, NEGOTIATED);
}


private void transition(EndpointDataReferenceEntryStates end, Predicate<EndpointDataReferenceEntryStates> canTransitTo) {
if (!canTransitTo.test(EndpointDataReferenceEntryStates.from(state))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@ public enum EndpointDataReferenceEntryStates {
REFRESHING(100),

EXPIRED(200),
ERROR(300),

DELETED(400);


ERROR(300);

private final int code;

EndpointDataReferenceEntryStates(int code) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,16 @@ void delete_isLeasedBySelf_shouldThrowException() {
assertThatThrownBy(() -> getStore().deleteByTransferProcessId(entry.getTransferProcessId())).isInstanceOf(IllegalStateException.class);
}

@Test
void delete_isLeasedBySelf_shouldNotThrowException_whenForced() {
var entry = edrEntry("assetId", "agreementId", "tpId");
getStore().save(entry, edr("edrId"));

lockEntity(entry.getId(), CONNECTOR_NAME);

getStore().deleteByTransferProcessId(entry.getTransferProcessId(), true);
}

@Test
void delete_isLeasedByOther_shouldThrowException() {
var entry = edrEntry("assetId", "agreementId", "tpId");
Expand All @@ -370,6 +380,16 @@ void delete_isLeasedByOther_shouldThrowException() {
assertThatThrownBy(() -> getStore().deleteByTransferProcessId(entry.getTransferProcessId())).isInstanceOf(IllegalStateException.class);
}

@Test
void delete_isLeasedByOther_shouldThrowException_WhenForced() {
var entry = edrEntry("assetId", "agreementId", "tpId");
getStore().save(entry, edr("edrId"));

lockEntity(entry.getId(), "someone-else");

assertThatThrownBy(() -> getStore().deleteByTransferProcessId(entry.getTransferProcessId(), true)).isInstanceOf(IllegalStateException.class);
}

protected abstract EndpointDataReferenceCache getStore();

protected abstract void lockEntity(String negotiationId, String owner, Duration duration);
Expand Down

0 comments on commit ac35761

Please sign in to comment.