Skip to content

Commit

Permalink
feat: PolicyMonitorStore SQL implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Oct 4, 2023
1 parent 00e50d2 commit 5e0856e
Show file tree
Hide file tree
Showing 45 changed files with 1,175 additions and 425 deletions.
2 changes: 1 addition & 1 deletion DEPENDENCIES
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ maven/mavencentral/org.apache.commons/commons-digester3/3.2, Apache-2.0, approve
maven/mavencentral/org.apache.commons/commons-lang3/3.10, Apache-2.0, approved, clearlydefined
maven/mavencentral/org.apache.commons/commons-lang3/3.11, Apache-2.0, approved, CQ22642
maven/mavencentral/org.apache.commons/commons-lang3/3.12.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/org.apache.commons/commons-pool2/2.11.1, Apache-2.0, approved, CQ23795
maven/mavencentral/org.apache.commons/commons-pool2/2.12.0, Apache-2.0 AND LicenseRef-Public-Domain, approved, #10843
maven/mavencentral/org.apache.commons/commons-text/1.10.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/org.apache.groovy/groovy-bom/4.0.11, Apache-2.0, approved, #9266
maven/mavencentral/org.apache.groovy/groovy-json/4.0.11, Apache-2.0, approved, #7411
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@

import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.persistence.Lease;
import org.eclipse.edc.spi.persistence.StateEntityStore;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.CriterionToPredicateConverter;
import org.eclipse.edc.spi.query.QueryResolver;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.util.concurrency.LockManager;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Clock;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -41,49 +45,33 @@
* An in-memory, threadsafe entity store for a {@link StatefulEntity}. This implementation is intended for testing
* purposes only.
*/
public class InMemoryStatefulEntityStore<T extends StatefulEntity<T>> {
private static final long DEFAULT_LEASE_TIME_MILLIS = 60_000;
public class InMemoryStatefulEntityStore<T extends StatefulEntity<T>> implements StateEntityStore<T> {
private static final Duration DEFAULT_LEASE_TIME = Duration.ofSeconds(60);
private final Map<String, T> entitiesById = new ConcurrentHashMap<>();
private final QueryResolver<T> queryResolver;
private final LockManager lockManager = new LockManager(new ReentrantReadWriteLock());
private final String lockId;
private final Clock clock;
private final Map<String, Lease> leases;
private final Map<String, Lease> leases = new HashMap<>();
private final CriterionToPredicateConverter criterionConverter = new CriterionToPredicateConverterImpl();

public InMemoryStatefulEntityStore(Class<T> clazz, String lockId, Clock clock, Map<String, Lease> leases) {
public InMemoryStatefulEntityStore(Class<T> clazz, String lockId, Clock clock) {
queryResolver = new ReflectionBasedQueryResolver<>(clazz);
this.lockId = lockId;
this.clock = clock;
this.leases = leases;
}

public T find(String id) {
@Override
public @Nullable T findById(String id) {
var t = entitiesById.get(id);
if (t == null) {
return null;
}
return t.copy();
}

public void upsert(T entity) {
acquireLease(entity.getId(), lockId);
entitiesById.put(entity.getId(), entity.copy());
freeLease(entity.getId());
}

public void delete(String id) {
if (isLeased(id)) {
throw new IllegalStateException("Entity is leased and cannot be deleted!");
}
entitiesById.remove(id);
}

public Stream<T> findAll(QuerySpec querySpec) {
return queryResolver.query(findAll(), querySpec);
}

public @NotNull List<T> leaseAndGet(int max, Criterion... criteria) {
@Override
public @NotNull List<T> nextNotLeased(int max, Criterion... criteria) {
return lockManager.writeLock(() -> {
var filterPredicate = Arrays.stream(criteria).map(criterionConverter::convert).reduce(x -> true, Predicate::and);
var entities = entitiesById.values().stream()
Expand All @@ -92,48 +80,72 @@ public Stream<T> findAll(QuerySpec querySpec) {
.sorted(comparingLong(StatefulEntity::getStateTimestamp)) //order by state timestamp, oldest first
.limit(max)
.toList();
entities.forEach(i -> acquireLease(i.getId(), lockId));
entities.forEach(i -> acquireLease(i.getId()));
return entities.stream().map(StatefulEntity::copy).collect(toList());
});
}

public StoreResult<T> leaseAndGet(String id) {
@Override
public StoreResult<T> findByIdAndLease(String id) {
return lockManager.writeLock(() -> {
var entity = entitiesById.get(id);
if (entity == null) {
return StoreResult.notFound(format("Entity %s not found", id));
}

try {
acquireLease(id, lockId);
acquireLease(id);
return StoreResult.success(entity);
} catch (IllegalStateException e) {
return StoreResult.alreadyLeased(format("Entity %s is already leased: %s", id, e.getMessage()));
}
});
}

public Stream<T> findAll() {
return entitiesById.values().stream();
@Override
public void save(T entity) {
acquireLease(entity.getId());
entitiesById.put(entity.getId(), entity.copy());
freeLease(entity.getId());
}

private void freeLease(String id) {
leases.remove(id);
public void delete(String id) {
if (isLeased(id)) {
throw new IllegalStateException("Entity is leased and cannot be deleted!");
}
entitiesById.remove(id);
}

public Stream<T> findAll(QuerySpec querySpec) {
return queryResolver.query(findAll(), querySpec);
}

public Stream<T> findAll() {
return entitiesById.values().stream();
}

private void acquireLease(String id, String lockId) {
public void acquireLease(String id, String lockId, Duration leaseTime) {
if (!isLeased(id) || isLeasedBy(id, lockId)) {
leases.put(id, new Lease(lockId, clock.millis(), DEFAULT_LEASE_TIME_MILLIS));
leases.put(id, new Lease(lockId, clock.millis(), leaseTime.toMillis()));
} else {
throw new IllegalStateException("Cannot acquire lease, is already leased by someone else!");
}
}

public boolean isLeasedBy(String id, String lockId) {
return isLeased(id) && leases.get(id).getLeasedBy().equals(lockId);
}

private void freeLease(String id) {
leases.remove(id);
}

private void acquireLease(String id) {
acquireLease(id, lockId, DEFAULT_LEASE_TIME);
}

private boolean isLeased(String id) {
return leases.containsKey(id) && !leases.get(id).isExpired(clock.millis());
}

private boolean isLeasedBy(String id, String lockId) {
return isLeased(id) && leases.get(id).getLeasedBy().equals(lockId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;

import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -78,8 +79,9 @@ class ContractNegotiationIntegrationTest {

private static final Duration DEFAULT_TEST_TIMEOUT = Duration.ofSeconds(15);
private static final Duration DEFAULT_POLL_INTERVAL = Duration.ofMillis(100);
private final InMemoryContractNegotiationStore providerStore = new InMemoryContractNegotiationStore();
private final InMemoryContractNegotiationStore consumerStore = new InMemoryContractNegotiationStore();
private final Clock clock = Clock.systemUTC();
private final InMemoryContractNegotiationStore providerStore = new InMemoryContractNegotiationStore(clock);
private final InMemoryContractNegotiationStore consumerStore = new InMemoryContractNegotiationStore(clock);
private final ContractValidationService validationService = mock(ContractValidationService.class);
private final RemoteMessageDispatcherRegistry providerDispatcherRegistry = mock(RemoteMessageDispatcherRegistry.class);
private final RemoteMessageDispatcherRegistry consumerDispatcherRegistry = mock(RemoteMessageDispatcherRegistry.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import org.eclipse.edc.connector.spi.callback.CallbackRegistry;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.asset.DataAddressResolver;
import org.eclipse.edc.spi.query.CriterionToAssetPredicateConverter;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.util.concurrency.LockManager;

import java.time.Clock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
Expand All @@ -51,6 +53,9 @@ public String name() {
return NAME;
}

@Inject
private Clock clock;

@Provider(isDefault = true)
public AssetIndex defaultAssetIndex() {
return getAssetIndex();
Expand All @@ -68,12 +73,12 @@ public ContractDefinitionStore defaultContractDefinitionStore() {

@Provider(isDefault = true)
public ContractNegotiationStore defaultContractNegotiationStore() {
return new InMemoryContractNegotiationStore();
return new InMemoryContractNegotiationStore(clock);
}

@Provider(isDefault = true)
public TransferProcessStore defaultTransferProcessStore() {
return new InMemoryTransferProcessStore();
return new InMemoryTransferProcessStore(clock);
}

@Provider(isDefault = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,13 @@
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.core.store.InMemoryStatefulEntityStore;
import org.eclipse.edc.connector.core.store.ReflectionBasedQueryResolver;
import org.eclipse.edc.spi.persistence.Lease;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QueryResolver;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Stream;
Expand All @@ -41,74 +36,53 @@
/**
* An in-memory, threadsafe process store. This implementation is intended for testing purposes only.
*/
public class InMemoryContractNegotiationStore implements ContractNegotiationStore {
public class InMemoryContractNegotiationStore extends InMemoryStatefulEntityStore<ContractNegotiation> implements ContractNegotiationStore {

private final QueryResolver<ContractNegotiation> negotiationQueryResolver = new ReflectionBasedQueryResolver<>(ContractNegotiation.class);
private final QueryResolver<ContractAgreement> agreementQueryResolver = new ReflectionBasedQueryResolver<>(ContractAgreement.class);
private final InMemoryStatefulEntityStore<ContractNegotiation> store;

public InMemoryContractNegotiationStore() {
this(UUID.randomUUID().toString(), Clock.systemUTC(), new HashMap<>());
public InMemoryContractNegotiationStore(Clock clock) {
this(UUID.randomUUID().toString(), clock);
}

public InMemoryContractNegotiationStore(String leaseHolder, Clock clock, Map<String, Lease> leases) {
store = new InMemoryStatefulEntityStore<>(ContractNegotiation.class, leaseHolder, clock, leases);
}

@Override
public @Nullable ContractNegotiation findById(String negotiationId) {
return store.find(negotiationId);
public InMemoryContractNegotiationStore(String leaseHolder, Clock clock) {
super(ContractNegotiation.class, leaseHolder, clock);
}

@Override
public @Nullable ContractNegotiation findForCorrelationId(String correlationId) {
return store.findAll().filter(p -> correlationId.equals(p.getCorrelationId())).findFirst().orElse(null);
return super.findAll().filter(p -> correlationId.equals(p.getCorrelationId())).findFirst().orElse(null);
}

@Override
public @Nullable ContractAgreement findContractAgreement(String contractId) {
return store.findAll()
return super.findAll()
.map(ContractNegotiation::getContractAgreement)
.filter(Objects::nonNull)
.filter(a -> Objects.equals(contractId, a.getId()))
.findFirst()
.orElse(null);
}

@Override
public void save(ContractNegotiation negotiation) {
store.upsert(negotiation);
}

@Override
public void delete(String negotiationId) {
var negotiation = store.find(negotiationId);
var negotiation = findById(negotiationId);
if (negotiation != null && negotiation.getContractAgreement() != null) {
throw new IllegalStateException(format("Cannot delete ContractNegotiation [%s]: ContractAgreement already created.", negotiationId));
}
store.delete(negotiationId);
super.delete(negotiationId);
}

@Override
public @NotNull Stream<ContractNegotiation> queryNegotiations(QuerySpec querySpec) {
return negotiationQueryResolver.query(store.findAll(), querySpec);
return negotiationQueryResolver.query(super.findAll(), querySpec);
}

@Override
public @NotNull Stream<ContractAgreement> queryAgreements(QuerySpec querySpec) {
return agreementQueryResolver.query(getAgreements(), querySpec);
}

@Override
public @NotNull List<ContractNegotiation> nextNotLeased(int max, Criterion... criteria) {
return store.leaseAndGet(max, criteria);
}

@Override
public StoreResult<ContractNegotiation> findByIdAndLease(String id) {
return store.leaseAndGet(id);
}

@Override
public StoreResult<ContractNegotiation> findByCorrelationIdAndLease(String correlationId) {
var negotiation = findForCorrelationId(correlationId);
Expand All @@ -121,7 +95,7 @@ public StoreResult<ContractNegotiation> findByCorrelationIdAndLease(String corre

@NotNull
private Stream<ContractAgreement> getAgreements() {
return store.findAll()
return super.findAll()
.map(ContractNegotiation::getContractAgreement)
.filter(Objects::nonNull);
}
Expand Down
Loading

0 comments on commit 5e0856e

Please sign in to comment.