Skip to content

Commit

Permalink
feat(EDR): adds EDR state machine for handling EDR renewal
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed Jul 18, 2023
1 parent 5adb26d commit ecbb766
Show file tree
Hide file tree
Showing 54 changed files with 2,313 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ protected Object property(String key, Object object) {
case "assetId" -> entry.getAssetId();
case "agreementId" -> entry.getAgreementId();
case "providerId" -> entry.getProviderId();
case "state" -> entry.getState();
default -> null;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.persistence.Lease;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
Expand All @@ -24,18 +26,22 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;
import static java.util.Comparator.comparingLong;
import static java.util.stream.Collectors.toList;
import static org.eclipse.edc.spi.result.StoreResult.notFound;
import static org.eclipse.edc.spi.result.StoreResult.success;
Expand All @@ -44,45 +50,70 @@
* An in-memory, threadsafe implementation of the cache.
*/
public class InMemoryEndpointDataReferenceCache implements EndpointDataReferenceCache {
private static final long DEFAULT_LEASE_TIME_MILLIS = 60_000;
private final LockManager lockManager;

private final EdrCacheEntryPredicateConverter predicateConverter = new EdrCacheEntryPredicateConverter();

private final Map<String, List<EndpointDataReferenceEntry>> entriesByAssetId;

private final Map<String, EndpointDataReferenceEntry> entriesByEdrId;

private final Map<String, EndpointDataReference> edrsByTransferProcessId;
private final String lockId;

private final Map<String, Lease> leases;

private final Clock clock;

public InMemoryEndpointDataReferenceCache() {
this(UUID.randomUUID().toString(), Clock.systemUTC(), new ConcurrentHashMap<>());
}

public InMemoryEndpointDataReferenceCache(String lockId, Clock clock, Map<String, Lease> leases) {
this.lockId = lockId;
lockManager = new LockManager(new ReentrantReadWriteLock());
entriesByAssetId = new HashMap<>();
entriesByEdrId = new ConcurrentHashMap<>();
edrsByTransferProcessId = new HashMap<>();
this.leases = leases;
this.clock = clock;
}

@Override
public @Nullable EndpointDataReference resolveReference(String transferProcessId) {
return lockManager.readLock(() -> edrsByTransferProcessId.get(transferProcessId));
}

@Override
public @Nullable EndpointDataReferenceEntry findByTransferProcessId(String transferProcessId) {
return lockManager.readLock(() -> {
var edr = edrsByTransferProcessId.get(transferProcessId);
return entriesByEdrId.get(edr.getId());
});
}

@Override
@NotNull
public List<EndpointDataReference> referencesForAsset(String assetId, String providerId) {
var entries = entriesByAssetId.get(assetId);
return lockManager.readLock(() -> {
var entries = entriesByAssetId.get(assetId);

Predicate<EndpointDataReferenceEntry> providerIdFilter = (cached) ->
Optional.ofNullable(providerId)
.map(id -> id.equals(cached.getProviderId()))
.orElse(true);
Predicate<EndpointDataReferenceEntry> providerIdFilter = (cached) ->
Optional.ofNullable(providerId)
.map(id -> id.equals(cached.getProviderId()))
.orElse(true);

if (entries == null) {
return emptyList();
}
return entries.stream()
.filter(providerIdFilter)
.map(e -> resolveReference(e.getTransferProcessId()))
.filter(Objects::nonNull)
.collect(toList());
if (entries == null) {
return emptyList();
}
return entries.stream()
.filter(providerIdFilter)
.filter(this::filterActive)
.map(e -> resolveReference(e.getTransferProcessId()))
.filter(Objects::nonNull)
.collect(toList());

});
}

@Override
Expand All @@ -102,9 +133,26 @@ public void save(EndpointDataReferenceEntry entry, EndpointDataReference edr) {
});
}

@Override
public void update(EndpointDataReferenceEntry entry) {
lockManager.writeLock(() -> {
acquireLease(entry.getTransferProcessId(), lockId);
var edr = edrsByTransferProcessId.get(entry.getTransferProcessId());
entriesByEdrId.put(edr.getId(), entry);
var list = entriesByAssetId.computeIfAbsent(entry.getAssetId(), k -> new ArrayList<>());
list.removeIf((edrEntry) -> edrEntry.getTransferProcessId().equals(entry.getTransferProcessId()));
list.add(entry);
freeLease(entry.getTransferProcessId());
return null;
});
}

@Override
public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String id) {
return lockManager.writeLock(() -> {
if (isLeased(id)) {
throw new IllegalStateException("EndpointDataReferenceEntry is leased and cannot be deleted!");
}
var edr = edrsByTransferProcessId.remove(id);
if (edr == null) {
return notFound("EDR entry not found for id: " + id);
Expand All @@ -120,12 +168,55 @@ public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String
});
}

@Override
public @NotNull List<EndpointDataReferenceEntry> nextNotLeased(int max, Criterion... criteria) {
return leaseAndGet(max, criteria);
}


public @NotNull List<EndpointDataReferenceEntry> leaseAndGet(int max, Criterion... criteria) {
return lockManager.writeLock(() -> {
var filterPredicate = Arrays.stream(criteria).map(predicateConverter::convert).reduce(x -> true, Predicate::and);
var entities = entriesByEdrId.values().stream()
.filter(filterPredicate)
.filter(e -> !isLeased(e.getId()))
.sorted(comparingLong(StatefulEntity::getStateTimestamp)) //order by state timestamp, oldest first
.limit(max)
.toList();
entities.forEach(i -> acquireLease(i.getId(), lockId));
return entities.stream().map(StatefulEntity::copy).collect(toList());
});
}

private Stream<EndpointDataReferenceEntry> filterBy(List<Criterion> criteria) {
var predicate = criteria.stream()
.map(predicateConverter::convert)
.reduce(x -> true, Predicate::and);
return lockManager.readLock(() -> {
var predicate = criteria.stream()
.map(predicateConverter::convert)
.reduce(x -> true, Predicate::and);

return entriesByEdrId.values().stream()
.filter(predicate);
});

}

private void freeLease(String id) {
leases.remove(id);
}

private void acquireLease(String id, String lockId) {
if (!isLeased(id) || isLeasedBy(id, lockId)) {
leases.put(id, new Lease(lockId, clock.millis(), DEFAULT_LEASE_TIME_MILLIS));
} else {
throw new IllegalStateException("Cannot acquire lease, is already leased by someone else!");
}
}

private boolean isLeased(String id) {
return leases.containsKey(id) && !leases.get(id).isExpired(clock.millis());
}

return entriesByEdrId.values().stream()
.filter(predicate);
private boolean isLeasedBy(String id, String lockId) {
return isLeased(id) && leases.get(id).getLeasedBy().equals(lockId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,42 @@

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.edc.spi.persistence.Lease;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheBaseTest;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.junit.jupiter.api.BeforeEach;

import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;

class InMemoryEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheBaseTest {
private final InMemoryEndpointDataReferenceCache cache = new InMemoryEndpointDataReferenceCache();
private final HashMap<String, Lease> leases = new HashMap<>();
private InMemoryEndpointDataReferenceCache cache;

@BeforeEach
void setUp() {
cache = new InMemoryEndpointDataReferenceCache(CONNECTOR_NAME, Clock.systemUTC(), leases);
}

@Override
protected EndpointDataReferenceCache getStore() {
return cache;
}

@Override
protected void lockEntity(String negotiationId, String owner, Duration duration) {
leases.put(negotiationId, new Lease(owner, Clock.systemUTC().millis(), duration.toMillis()));
}

@Override
protected boolean isLockedBy(String negotiationId, String owner) {
return leases.entrySet().stream().anyMatch(e -> e.getKey().equals(negotiationId) &&
e.getValue().getLeasedBy().equals(owner) &&
!isExpired(e.getValue()));
}

private boolean isExpired(Lease e) {
return e.getLeasedAt() + e.getLeaseDuration() < Clock.systemUTC().millis();
}
}
26 changes: 26 additions & 0 deletions core/edr-core/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# EDR core extension

This extension provide a base implementation of `EdrManager` and `EdrService` both
required for interacting with the EDR APIs and state machine

The EDR state machine handle the lifecycle of a negotiated EDR. The negotiation request can be submitted
via EDR APIs, and it will go through two phases:

- Contract Negotiation
- Transfer Request

Once the latter has completed the EDR entry will be saved with the associated EDR in the primordial state `NEGOTIATED`
The state machine will also manage the lifecycle and the renewal of the `EDR`. If a token is about to expire it will
transition to the `REFRESHING` state and fire off another transfer process with the same parameter of the expiring
one. Once completed the new `EDR` will be cached and the old ones, with same `assetId` and `agreementId` will transition
into the `EXPIRED` state. Then the state machine will also monitor the `EXPIRED` ones, and will delete them according to the
retention configuration.

## 1. EDR state machine Configuration

| Key | Description | Mandatory | Default |
|:--------------------------------------------|:----------------------------------------------------------------------------------------------------|-----------|---------|
| edc.edr.state-machine.iteration-wait-millis | The iteration wait time in milliseconds in the edr state machine | | 1000 |
| edc.edr.state-machine.batch-size | The batch size in the edr negotiation state machine | | 20 |
| edc.edr.state-machine.expiring-duration | The minimum duration on which the EDR token can be eligible for renewal (seconds) | | 60 |
| edc.edr.state-machine.expired-retention | The minimum duration on with the EDR token can be eligible for deletion when it's expired (seconds) | | 60 |
6 changes: 5 additions & 1 deletion core/edr-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ dependencies {
implementation(libs.edc.spi.aggregateservices)
implementation(libs.edc.spi.contract)
implementation(libs.edc.spi.controlplane)
implementation(libs.edc.statemachine)

implementation(project(":spi:edr-spi"))

implementation(project(":spi:core-spi"))


testImplementation(libs.edc.junit)
testImplementation(libs.awaitility)
testImplementation(testFixtures(project(":spi:edr-spi")))

}
Expand Down
Loading

0 comments on commit ecbb766

Please sign in to comment.