Skip to content

Commit

Permalink
feat(EDR): adds EDR state machine for handling EDR renewal (#620)
Browse files Browse the repository at this point in the history
* feat(EDR): adds EDR state machine for handling EDR renewal

* add testcontainers pg for E2E tests

* fix after review

* fix depedencies file
  • Loading branch information
wolf4ood authored Jul 20, 2023
1 parent 4110d5e commit 313b90b
Show file tree
Hide file tree
Showing 66 changed files with 2,842 additions and 413 deletions.
11 changes: 7 additions & 4 deletions DEPENDENCIES
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ maven/mavencentral/com.azure/azure-core-http-netty/1.13.5, MIT AND Apache-2.0, a
maven/mavencentral/com.azure/azure-core/1.39.0, MIT, approved, clearlydefined
maven/mavencentral/com.azure/azure-core/1.40.0, MIT, approved, clearlydefined
maven/mavencentral/com.azure/azure-core/1.41.0, MIT AND Apache-2.0, approved, #9648
maven/mavencentral/com.azure/azure-identity/1.9.0, MIT, approved, clearlydefined
maven/mavencentral/com.azure/azure-identity/1.9.2, , restricted, clearlydefined
maven/mavencentral/com.azure/azure-identity/1.9.0, MIT AND Apache-2.0, approved, #9686
maven/mavencentral/com.azure/azure-identity/1.9.2, MIT AND Apache-2.0, approved, #9686
maven/mavencentral/com.azure/azure-json/1.0.1, MIT AND Apache-2.0, approved, #7933
maven/mavencentral/com.azure/azure-security-keyvault-secrets/4.6.2, MIT, approved, #7940
maven/mavencentral/com.azure/azure-security-keyvault-secrets/4.6.3, MIT, approved, #7940
Expand Down Expand Up @@ -162,8 +162,8 @@ maven/mavencentral/io.netty/netty-transport/4.1.94.Final, Apache-2.0 AND BSD-3-C
maven/mavencentral/io.opentelemetry.instrumentation/opentelemetry-instrumentation-annotations/1.27.0, Apache-2.0, approved, #9270
maven/mavencentral/io.opentelemetry/opentelemetry-api/1.27.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.opentelemetry/opentelemetry-context/1.27.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.projectreactor.netty/reactor-netty-core/1.0.28, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.projectreactor.netty/reactor-netty-core/1.0.33, , restricted, clearlydefined
maven/mavencentral/io.projectreactor.netty/reactor-netty-core/1.0.28, Apache-2.0, approved, #9687
maven/mavencentral/io.projectreactor.netty/reactor-netty-core/1.0.33, Apache-2.0, approved, #9687
maven/mavencentral/io.projectreactor.netty/reactor-netty-http/1.0.28, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.projectreactor.netty/reactor-netty-http/1.0.33, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.projectreactor/reactor-core/3.4.27, Apache-2.0, approved, #7517
Expand Down Expand Up @@ -444,7 +444,10 @@ maven/mavencentral/org.slf4j/slf4j-api/1.7.36, MIT, approved, CQ13368
maven/mavencentral/org.slf4j/slf4j-api/1.7.7, MIT, approved, CQ9827
maven/mavencentral/org.slf4j/slf4j-api/2.0.5, MIT, approved, #5915
maven/mavencentral/org.slf4j/slf4j-api/2.0.7, MIT, approved, #5915
maven/mavencentral/org.testcontainers/database-commons/1.18.3, MIT, approved, clearlydefined
maven/mavencentral/org.testcontainers/jdbc/1.18.3, MIT, approved, clearlydefined
maven/mavencentral/org.testcontainers/junit-jupiter/1.18.3, MIT, approved, #7941
maven/mavencentral/org.testcontainers/postgresql/1.18.3, MIT, approved, #9332
maven/mavencentral/org.testcontainers/testcontainers/1.18.3, MIT, approved, #7938
maven/mavencentral/org.testcontainers/vault/1.18.3, MIT, approved, #7927
maven/mavencentral/org.yaml/snakeyaml/1.33, Apache-2.0, approved, clearlydefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ protected Object property(String key, Object object) {
case "assetId" -> entry.getAssetId();
case "agreementId" -> entry.getAgreementId();
case "providerId" -> entry.getProviderId();
case "state" -> entry.getState();
default -> null;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.persistence.Lease;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
Expand All @@ -24,18 +26,22 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;
import static java.util.Comparator.comparingLong;
import static java.util.stream.Collectors.toList;
import static org.eclipse.edc.spi.result.StoreResult.notFound;
import static org.eclipse.edc.spi.result.StoreResult.success;
Expand All @@ -44,45 +50,71 @@
* An in-memory, threadsafe implementation of the cache.
*/
public class InMemoryEndpointDataReferenceCache implements EndpointDataReferenceCache {
private static final long DEFAULT_LEASE_TIME_MILLIS = 60_000;
private final LockManager lockManager;

private final EdrCacheEntryPredicateConverter predicateConverter = new EdrCacheEntryPredicateConverter();


private final Map<String, List<EndpointDataReferenceEntry>> entriesByAssetId;

private final Map<String, EndpointDataReferenceEntry> entriesByEdrId;

private final Map<String, EndpointDataReference> edrsByTransferProcessId;
private final String lockId;

private final Map<String, Lease> leases;

private final Clock clock;

public InMemoryEndpointDataReferenceCache() {
this(UUID.randomUUID().toString(), Clock.systemUTC(), new ConcurrentHashMap<>());
}

public InMemoryEndpointDataReferenceCache(String lockId, Clock clock, Map<String, Lease> leases) {
this.lockId = lockId;
lockManager = new LockManager(new ReentrantReadWriteLock());
entriesByAssetId = new HashMap<>();
entriesByEdrId = new ConcurrentHashMap<>();
edrsByTransferProcessId = new HashMap<>();
this.leases = leases;
this.clock = clock;
}

@Override
public @Nullable EndpointDataReference resolveReference(String transferProcessId) {
return lockManager.readLock(() -> edrsByTransferProcessId.get(transferProcessId));
}

@Override
public @Nullable EndpointDataReferenceEntry findByTransferProcessId(String transferProcessId) {
return lockManager.readLock(() -> {
var edr = edrsByTransferProcessId.get(transferProcessId);
return entriesByEdrId.get(edr.getId());
});
}

@Override
@NotNull
public List<EndpointDataReference> referencesForAsset(String assetId, String providerId) {
var entries = entriesByAssetId.get(assetId);
return lockManager.readLock(() -> {
var entries = entriesByAssetId.get(assetId);

Predicate<EndpointDataReferenceEntry> providerIdFilter = (cached) ->
Optional.ofNullable(providerId)
.map(id -> id.equals(cached.getProviderId()))
.orElse(true);
Predicate<EndpointDataReferenceEntry> providerIdFilter = (cached) ->
Optional.ofNullable(providerId)
.map(id -> id.equals(cached.getProviderId()))
.orElse(true);

if (entries == null) {
return emptyList();
}
return entries.stream()
.filter(providerIdFilter)
.map(e -> resolveReference(e.getTransferProcessId()))
.filter(Objects::nonNull)
.collect(toList());
if (entries == null) {
return emptyList();
}
return entries.stream()
.filter(providerIdFilter)
.filter(this::filterActive)
.map(e -> resolveReference(e.getTransferProcessId()))
.filter(Objects::nonNull)
.collect(toList());

});
}

@Override
Expand All @@ -102,9 +134,26 @@ public void save(EndpointDataReferenceEntry entry, EndpointDataReference edr) {
});
}

@Override
public void update(EndpointDataReferenceEntry entry) {
lockManager.writeLock(() -> {
acquireLease(entry.getTransferProcessId(), lockId);
var edr = edrsByTransferProcessId.get(entry.getTransferProcessId());
entriesByEdrId.put(edr.getId(), entry);
var list = entriesByAssetId.computeIfAbsent(entry.getAssetId(), k -> new ArrayList<>());
list.removeIf((edrEntry) -> edrEntry.getTransferProcessId().equals(entry.getTransferProcessId()));
list.add(entry);
freeLease(entry.getTransferProcessId());
return null;
});
}

@Override
public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String id) {
return lockManager.writeLock(() -> {
if (isLeased(id)) {
throw new IllegalStateException("EndpointDataReferenceEntry is leased and cannot be deleted!");
}
var edr = edrsByTransferProcessId.remove(id);
if (edr == null) {
return notFound("EDR entry not found for id: " + id);
Expand All @@ -120,12 +169,55 @@ public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String
});
}

@Override
public @NotNull List<EndpointDataReferenceEntry> nextNotLeased(int max, Criterion... criteria) {
return leaseAndGet(max, criteria);
}


private @NotNull List<EndpointDataReferenceEntry> leaseAndGet(int max, Criterion... criteria) {
return lockManager.writeLock(() -> {
var filterPredicate = Arrays.stream(criteria).map(predicateConverter::convert).reduce(x -> true, Predicate::and);
var entities = entriesByEdrId.values().stream()
.filter(filterPredicate)
.filter(e -> !isLeased(e.getId()))
.sorted(comparingLong(StatefulEntity::getStateTimestamp)) //order by state timestamp, oldest first
.limit(max)
.toList();
entities.forEach(i -> acquireLease(i.getId(), lockId));
return entities.stream().map(StatefulEntity::copy).collect(toList());
});
}

private Stream<EndpointDataReferenceEntry> filterBy(List<Criterion> criteria) {
var predicate = criteria.stream()
.map(predicateConverter::convert)
.reduce(x -> true, Predicate::and);
return lockManager.readLock(() -> {
var predicate = criteria.stream()
.map(predicateConverter::convert)
.reduce(x -> true, Predicate::and);

return entriesByEdrId.values().stream()
.filter(predicate);
});

}

private void freeLease(String id) {
leases.remove(id);
}

private void acquireLease(String id, String lockId) {
if (!isLeased(id) || isLeasedBy(id, lockId)) {
leases.put(id, new Lease(lockId, clock.millis(), DEFAULT_LEASE_TIME_MILLIS));
} else {
throw new IllegalStateException("Cannot acquire lease, is already leased by someone else!");
}
}

private boolean isLeased(String id) {
return leases.containsKey(id) && !leases.get(id).isExpired(clock.millis());
}

return entriesByEdrId.values().stream()
.filter(predicate);
private boolean isLeasedBy(String id, String lockId) {
return isLeased(id) && leases.get(id).getLeasedBy().equals(lockId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,42 @@

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheBaseTest;
import org.eclipse.edc.spi.persistence.Lease;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheTestBase;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.junit.jupiter.api.BeforeEach;

class InMemoryEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheBaseTest {
private final InMemoryEndpointDataReferenceCache cache = new InMemoryEndpointDataReferenceCache();
import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;

class InMemoryEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheTestBase {
private final HashMap<String, Lease> leases = new HashMap<>();
private InMemoryEndpointDataReferenceCache cache;

@BeforeEach
void setUp() {
cache = new InMemoryEndpointDataReferenceCache(CONNECTOR_NAME, Clock.systemUTC(), leases);
}

@Override
protected EndpointDataReferenceCache getStore() {
return cache;
}

@Override
protected void lockEntity(String negotiationId, String owner, Duration duration) {
leases.put(negotiationId, new Lease(owner, Clock.systemUTC().millis(), duration.toMillis()));
}

@Override
protected boolean isLockedBy(String negotiationId, String owner) {
return leases.entrySet().stream().anyMatch(e -> e.getKey().equals(negotiationId) &&
e.getValue().getLeasedBy().equals(owner) &&
!isExpired(e.getValue()));
}

private boolean isExpired(Lease e) {
return e.getLeasedAt() + e.getLeaseDuration() < Clock.systemUTC().millis();
}
}
26 changes: 26 additions & 0 deletions core/edr-core/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# EDR core extension

This extension provide a base implementation of `EdrManager` and `EdrService` both
required for interacting with the EDR APIs and state machine

The EDR state machine handle the lifecycle of a negotiated EDR. The negotiation request can be submitted
via EDR APIs, and it will go through two phases:

- Contract Negotiation
- Transfer Request

Once the latter has completed the EDR entry will be saved with the associated EDR in the primordial state `NEGOTIATED`
The state machine will also manage the lifecycle and the renewal of the `EDR`. If a token is about to expire it will
transition to the `REFRESHING` state and fire off another transfer process with the same parameter of the expiring
one. Once completed the new `EDR` will be cached and the old ones, with same `assetId` and `agreementId` will transition
into the `EXPIRED` state. Then the state machine will also monitor the `EXPIRED` ones, and will delete them according to the
retention configuration.

## 1. EDR state machine Configuration

| Key | Description | Mandatory | Default |
|:--------------------------------------------|:----------------------------------------------------------------------------------------------------|-----------|---------|
| edc.edr.state-machine.iteration-wait-millis | The iteration wait time in milliseconds in the edr state machine | | 1000 |
| edc.edr.state-machine.batch-size | The batch size in the edr negotiation state machine | | 20 |
| edc.edr.state-machine.expiring-duration | The minimum duration on which the EDR token can be eligible for renewal (seconds) | | 60 |
| edc.edr.state-machine.expired-retention | The minimum duration on with the EDR token can be eligible for deletion when it's expired (seconds) | | 60 |
6 changes: 5 additions & 1 deletion core/edr-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ dependencies {
implementation(libs.edc.spi.aggregateservices)
implementation(libs.edc.spi.contract)
implementation(libs.edc.spi.controlplane)
implementation(libs.edc.statemachine)

implementation(project(":spi:edr-spi"))

implementation(project(":spi:core-spi"))


testImplementation(libs.edc.junit)
testImplementation(libs.awaitility)
testImplementation(testFixtures(project(":spi:edr-spi")))

}
Expand Down
Loading

0 comments on commit 313b90b

Please sign in to comment.