Skip to content

Commit

Permalink
feat: PolicyMonitorStore SQL implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Oct 3, 2023
1 parent 00e50d2 commit 915c7be
Show file tree
Hide file tree
Showing 44 changed files with 1,174 additions and 424 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@

import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.persistence.Lease;
import org.eclipse.edc.spi.persistence.StateEntityStore;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.CriterionToPredicateConverter;
import org.eclipse.edc.spi.query.QueryResolver;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.util.concurrency.LockManager;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Clock;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -41,49 +45,33 @@
* An in-memory, threadsafe entity store for a {@link StatefulEntity}. This implementation is intended for testing
* purposes only.
*/
public class InMemoryStatefulEntityStore<T extends StatefulEntity<T>> {
private static final long DEFAULT_LEASE_TIME_MILLIS = 60_000;
public class InMemoryStatefulEntityStore<T extends StatefulEntity<T>> implements StateEntityStore<T> {
private static final Duration DEFAULT_LEASE_TIME = Duration.ofSeconds(60);
private final Map<String, T> entitiesById = new ConcurrentHashMap<>();
private final QueryResolver<T> queryResolver;
private final LockManager lockManager = new LockManager(new ReentrantReadWriteLock());
private final String lockId;
private final Clock clock;
private final Map<String, Lease> leases;
private final Map<String, Lease> leases = new HashMap<>();
private final CriterionToPredicateConverter criterionConverter = new CriterionToPredicateConverterImpl();

public InMemoryStatefulEntityStore(Class<T> clazz, String lockId, Clock clock, Map<String, Lease> leases) {
public InMemoryStatefulEntityStore(Class<T> clazz, String lockId, Clock clock) {
queryResolver = new ReflectionBasedQueryResolver<>(clazz);
this.lockId = lockId;
this.clock = clock;
this.leases = leases;
}

public T find(String id) {
@Override
public @Nullable T findById(String id) {
var t = entitiesById.get(id);
if (t == null) {
return null;
}
return t.copy();
}

public void upsert(T entity) {
acquireLease(entity.getId(), lockId);
entitiesById.put(entity.getId(), entity.copy());
freeLease(entity.getId());
}

public void delete(String id) {
if (isLeased(id)) {
throw new IllegalStateException("Entity is leased and cannot be deleted!");
}
entitiesById.remove(id);
}

public Stream<T> findAll(QuerySpec querySpec) {
return queryResolver.query(findAll(), querySpec);
}

public @NotNull List<T> leaseAndGet(int max, Criterion... criteria) {
@Override
public @NotNull List<T> nextNotLeased(int max, Criterion... criteria) {
return lockManager.writeLock(() -> {
var filterPredicate = Arrays.stream(criteria).map(criterionConverter::convert).reduce(x -> true, Predicate::and);
var entities = entitiesById.values().stream()
Expand All @@ -92,48 +80,72 @@ public Stream<T> findAll(QuerySpec querySpec) {
.sorted(comparingLong(StatefulEntity::getStateTimestamp)) //order by state timestamp, oldest first
.limit(max)
.toList();
entities.forEach(i -> acquireLease(i.getId(), lockId));
entities.forEach(i -> acquireLease(i.getId()));
return entities.stream().map(StatefulEntity::copy).collect(toList());
});
}

public StoreResult<T> leaseAndGet(String id) {
@Override
public StoreResult<T> findByIdAndLease(String id) {
return lockManager.writeLock(() -> {
var entity = entitiesById.get(id);
if (entity == null) {
return StoreResult.notFound(format("Entity %s not found", id));
}

try {
acquireLease(id, lockId);
acquireLease(id);
return StoreResult.success(entity);
} catch (IllegalStateException e) {
return StoreResult.alreadyLeased(format("Entity %s is already leased: %s", id, e.getMessage()));
}
});
}

public Stream<T> findAll() {
return entitiesById.values().stream();
@Override
public void save(T entity) {
acquireLease(entity.getId());
entitiesById.put(entity.getId(), entity.copy());
freeLease(entity.getId());
}

private void freeLease(String id) {
leases.remove(id);
public void delete(String id) {
if (isLeased(id)) {
throw new IllegalStateException("Entity is leased and cannot be deleted!");
}
entitiesById.remove(id);
}

public Stream<T> findAll(QuerySpec querySpec) {
return queryResolver.query(findAll(), querySpec);
}

public Stream<T> findAll() {
return entitiesById.values().stream();
}

private void acquireLease(String id, String lockId) {
public void acquireLease(String id, String lockId, Duration leaseTime) {
if (!isLeased(id) || isLeasedBy(id, lockId)) {
leases.put(id, new Lease(lockId, clock.millis(), DEFAULT_LEASE_TIME_MILLIS));
leases.put(id, new Lease(lockId, clock.millis(), leaseTime.toMillis()));
} else {
throw new IllegalStateException("Cannot acquire lease, is already leased by someone else!");
}
}

public boolean isLeasedBy(String id, String lockId) {
return isLeased(id) && leases.get(id).getLeasedBy().equals(lockId);
}

private void freeLease(String id) {
leases.remove(id);
}

private void acquireLease(String id) {
acquireLease(id, lockId, DEFAULT_LEASE_TIME);
}

private boolean isLeased(String id) {
return leases.containsKey(id) && !leases.get(id).isExpired(clock.millis());
}

private boolean isLeasedBy(String id, String lockId) {
return isLeased(id) && leases.get(id).getLeasedBy().equals(lockId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;

import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -78,8 +79,9 @@ class ContractNegotiationIntegrationTest {

private static final Duration DEFAULT_TEST_TIMEOUT = Duration.ofSeconds(15);
private static final Duration DEFAULT_POLL_INTERVAL = Duration.ofMillis(100);
private final InMemoryContractNegotiationStore providerStore = new InMemoryContractNegotiationStore();
private final InMemoryContractNegotiationStore consumerStore = new InMemoryContractNegotiationStore();
private final Clock clock = Clock.systemUTC();
private final InMemoryContractNegotiationStore providerStore = new InMemoryContractNegotiationStore(clock);
private final InMemoryContractNegotiationStore consumerStore = new InMemoryContractNegotiationStore(clock);
private final ContractValidationService validationService = mock(ContractValidationService.class);
private final RemoteMessageDispatcherRegistry providerDispatcherRegistry = mock(RemoteMessageDispatcherRegistry.class);
private final RemoteMessageDispatcherRegistry consumerDispatcherRegistry = mock(RemoteMessageDispatcherRegistry.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import org.eclipse.edc.connector.spi.callback.CallbackRegistry;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.asset.DataAddressResolver;
import org.eclipse.edc.spi.query.CriterionToAssetPredicateConverter;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.util.concurrency.LockManager;

import java.time.Clock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
Expand All @@ -51,6 +53,9 @@ public String name() {
return NAME;
}

@Inject
private Clock clock;

@Provider(isDefault = true)
public AssetIndex defaultAssetIndex() {
return getAssetIndex();
Expand All @@ -68,12 +73,12 @@ public ContractDefinitionStore defaultContractDefinitionStore() {

@Provider(isDefault = true)
public ContractNegotiationStore defaultContractNegotiationStore() {
return new InMemoryContractNegotiationStore();
return new InMemoryContractNegotiationStore(clock);
}

@Provider(isDefault = true)
public TransferProcessStore defaultTransferProcessStore() {
return new InMemoryTransferProcessStore();
return new InMemoryTransferProcessStore(clock);
}

@Provider(isDefault = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,13 @@
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.core.store.InMemoryStatefulEntityStore;
import org.eclipse.edc.connector.core.store.ReflectionBasedQueryResolver;
import org.eclipse.edc.spi.persistence.Lease;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QueryResolver;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Stream;
Expand All @@ -41,74 +36,53 @@
/**
* An in-memory, threadsafe process store. This implementation is intended for testing purposes only.
*/
public class InMemoryContractNegotiationStore implements ContractNegotiationStore {
public class InMemoryContractNegotiationStore extends InMemoryStatefulEntityStore<ContractNegotiation> implements ContractNegotiationStore {

private final QueryResolver<ContractNegotiation> negotiationQueryResolver = new ReflectionBasedQueryResolver<>(ContractNegotiation.class);
private final QueryResolver<ContractAgreement> agreementQueryResolver = new ReflectionBasedQueryResolver<>(ContractAgreement.class);
private final InMemoryStatefulEntityStore<ContractNegotiation> store;

public InMemoryContractNegotiationStore() {
this(UUID.randomUUID().toString(), Clock.systemUTC(), new HashMap<>());
public InMemoryContractNegotiationStore(Clock clock) {
this(UUID.randomUUID().toString(), clock);
}

public InMemoryContractNegotiationStore(String leaseHolder, Clock clock, Map<String, Lease> leases) {
store = new InMemoryStatefulEntityStore<>(ContractNegotiation.class, leaseHolder, clock, leases);
}

@Override
public @Nullable ContractNegotiation findById(String negotiationId) {
return store.find(negotiationId);
public InMemoryContractNegotiationStore(String leaseHolder, Clock clock) {
super(ContractNegotiation.class, leaseHolder, clock);
}

@Override
public @Nullable ContractNegotiation findForCorrelationId(String correlationId) {
return store.findAll().filter(p -> correlationId.equals(p.getCorrelationId())).findFirst().orElse(null);
return super.findAll().filter(p -> correlationId.equals(p.getCorrelationId())).findFirst().orElse(null);
}

@Override
public @Nullable ContractAgreement findContractAgreement(String contractId) {
return store.findAll()
return super.findAll()
.map(ContractNegotiation::getContractAgreement)
.filter(Objects::nonNull)
.filter(a -> Objects.equals(contractId, a.getId()))
.findFirst()
.orElse(null);
}

@Override
public void save(ContractNegotiation negotiation) {
store.upsert(negotiation);
}

@Override
public void delete(String negotiationId) {
var negotiation = store.find(negotiationId);
var negotiation = findById(negotiationId);
if (negotiation != null && negotiation.getContractAgreement() != null) {
throw new IllegalStateException(format("Cannot delete ContractNegotiation [%s]: ContractAgreement already created.", negotiationId));
}
store.delete(negotiationId);
super.delete(negotiationId);
}

@Override
public @NotNull Stream<ContractNegotiation> queryNegotiations(QuerySpec querySpec) {
return negotiationQueryResolver.query(store.findAll(), querySpec);
return negotiationQueryResolver.query(super.findAll(), querySpec);
}

@Override
public @NotNull Stream<ContractAgreement> queryAgreements(QuerySpec querySpec) {
return agreementQueryResolver.query(getAgreements(), querySpec);
}

@Override
public @NotNull List<ContractNegotiation> nextNotLeased(int max, Criterion... criteria) {
return store.leaseAndGet(max, criteria);
}

@Override
public StoreResult<ContractNegotiation> findByIdAndLease(String id) {
return store.leaseAndGet(id);
}

@Override
public StoreResult<ContractNegotiation> findByCorrelationIdAndLease(String correlationId) {
var negotiation = findForCorrelationId(correlationId);
Expand All @@ -121,7 +95,7 @@ public StoreResult<ContractNegotiation> findByCorrelationIdAndLease(String corre

@NotNull
private Stream<ContractAgreement> getAgreements() {
return store.findAll()
return super.findAll()
.map(ContractNegotiation::getContractAgreement)
.filter(Objects::nonNull);
}
Expand Down
Loading

0 comments on commit 915c7be

Please sign in to comment.