Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PolicyMonitorStore SQL implementation #3505

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DEPENDENCIES
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ maven/mavencentral/com.jcraft/jzlib/1.1.3, BSD-2-Clause, approved, CQ6218
maven/mavencentral/com.lmax/disruptor/3.4.4, Apache-2.0, approved, clearlydefined
maven/mavencentral/com.networknt/json-schema-validator/1.0.76, Apache-2.0, approved, CQ22638
maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.28, Apache-2.0, approved, clearlydefined
maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.35, , restricted, clearlydefined
maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.35, Apache-2.0, approved, #10851
maven/mavencentral/com.puppycrawl.tools/checkstyle/10.0, LGPL-2.1-or-later, approved, #7936
maven/mavencentral/com.samskivert/jmustache/1.15, BSD-2-Clause, approved, clearlydefined
maven/mavencentral/com.squareup.okhttp3/okhttp-dnsoverhttps/4.11.0, Apache-2.0, approved, clearlydefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@

import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.persistence.Lease;
import org.eclipse.edc.spi.persistence.StateEntityStore;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.CriterionToPredicateConverter;
import org.eclipse.edc.spi.query.QueryResolver;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.util.concurrency.LockManager;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Clock;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -41,49 +45,33 @@
* An in-memory, threadsafe entity store for a {@link StatefulEntity}. This implementation is intended for testing
* purposes only.
*/
public class InMemoryStatefulEntityStore<T extends StatefulEntity<T>> {
private static final long DEFAULT_LEASE_TIME_MILLIS = 60_000;
public class InMemoryStatefulEntityStore<T extends StatefulEntity<T>> implements StateEntityStore<T> {
private static final Duration DEFAULT_LEASE_TIME = Duration.ofSeconds(60);
private final Map<String, T> entitiesById = new ConcurrentHashMap<>();
private final QueryResolver<T> queryResolver;
private final LockManager lockManager = new LockManager(new ReentrantReadWriteLock());
private final String lockId;
private final Clock clock;
private final Map<String, Lease> leases;
private final Map<String, Lease> leases = new HashMap<>();
private final CriterionToPredicateConverter criterionConverter = new CriterionToPredicateConverterImpl();

public InMemoryStatefulEntityStore(Class<T> clazz, String lockId, Clock clock, Map<String, Lease> leases) {
public InMemoryStatefulEntityStore(Class<T> clazz, String lockId, Clock clock) {
queryResolver = new ReflectionBasedQueryResolver<>(clazz);
this.lockId = lockId;
this.clock = clock;
this.leases = leases;
}

public T find(String id) {
@Override
public @Nullable T findById(String id) {
var t = entitiesById.get(id);
if (t == null) {
return null;
}
return t.copy();
}

public void upsert(T entity) {
acquireLease(entity.getId(), lockId);
entitiesById.put(entity.getId(), entity.copy());
freeLease(entity.getId());
}

public void delete(String id) {
if (isLeased(id)) {
throw new IllegalStateException("Entity is leased and cannot be deleted!");
}
entitiesById.remove(id);
}

public Stream<T> findAll(QuerySpec querySpec) {
return queryResolver.query(findAll(), querySpec);
}

public @NotNull List<T> leaseAndGet(int max, Criterion... criteria) {
@Override
public @NotNull List<T> nextNotLeased(int max, Criterion... criteria) {
return lockManager.writeLock(() -> {
var filterPredicate = Arrays.stream(criteria).map(criterionConverter::convert).reduce(x -> true, Predicate::and);
var entities = entitiesById.values().stream()
Expand All @@ -92,48 +80,72 @@ public Stream<T> findAll(QuerySpec querySpec) {
.sorted(comparingLong(StatefulEntity::getStateTimestamp)) //order by state timestamp, oldest first
.limit(max)
.toList();
entities.forEach(i -> acquireLease(i.getId(), lockId));
entities.forEach(i -> acquireLease(i.getId()));
return entities.stream().map(StatefulEntity::copy).collect(toList());
});
}

public StoreResult<T> leaseAndGet(String id) {
@Override
public StoreResult<T> findByIdAndLease(String id) {
return lockManager.writeLock(() -> {
var entity = entitiesById.get(id);
if (entity == null) {
return StoreResult.notFound(format("Entity %s not found", id));
}

try {
acquireLease(id, lockId);
acquireLease(id);
return StoreResult.success(entity);
} catch (IllegalStateException e) {
return StoreResult.alreadyLeased(format("Entity %s is already leased: %s", id, e.getMessage()));
}
});
}

public Stream<T> findAll() {
return entitiesById.values().stream();
@Override
public void save(T entity) {
acquireLease(entity.getId());
entitiesById.put(entity.getId(), entity.copy());
freeLease(entity.getId());
}

private void freeLease(String id) {
leases.remove(id);
public void delete(String id) {
if (isLeased(id)) {
throw new IllegalStateException("Entity is leased and cannot be deleted!");
}
entitiesById.remove(id);
}

public Stream<T> findAll(QuerySpec querySpec) {
return queryResolver.query(findAll(), querySpec);
}

public Stream<T> findAll() {
return entitiesById.values().stream();
}

private void acquireLease(String id, String lockId) {
public void acquireLease(String id, String lockId, Duration leaseTime) {
if (!isLeased(id) || isLeasedBy(id, lockId)) {
leases.put(id, new Lease(lockId, clock.millis(), DEFAULT_LEASE_TIME_MILLIS));
leases.put(id, new Lease(lockId, clock.millis(), leaseTime.toMillis()));
} else {
throw new IllegalStateException("Cannot acquire lease, is already leased by someone else!");
}
}

public boolean isLeasedBy(String id, String lockId) {
return isLeased(id) && leases.get(id).getLeasedBy().equals(lockId);
}

private void freeLease(String id) {
leases.remove(id);
}

private void acquireLease(String id) {
acquireLease(id, lockId, DEFAULT_LEASE_TIME);
}

private boolean isLeased(String id) {
return leases.containsKey(id) && !leases.get(id).isExpired(clock.millis());
}

private boolean isLeasedBy(String id, String lockId) {
return isLeased(id) && leases.get(id).getLeasedBy().equals(lockId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;

import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -78,8 +79,9 @@ class ContractNegotiationIntegrationTest {

private static final Duration DEFAULT_TEST_TIMEOUT = Duration.ofSeconds(15);
private static final Duration DEFAULT_POLL_INTERVAL = Duration.ofMillis(100);
private final InMemoryContractNegotiationStore providerStore = new InMemoryContractNegotiationStore();
private final InMemoryContractNegotiationStore consumerStore = new InMemoryContractNegotiationStore();
private final Clock clock = Clock.systemUTC();
private final InMemoryContractNegotiationStore providerStore = new InMemoryContractNegotiationStore(clock);
private final InMemoryContractNegotiationStore consumerStore = new InMemoryContractNegotiationStore(clock);
private final ContractValidationService validationService = mock(ContractValidationService.class);
private final RemoteMessageDispatcherRegistry providerDispatcherRegistry = mock(RemoteMessageDispatcherRegistry.class);
private final RemoteMessageDispatcherRegistry consumerDispatcherRegistry = mock(RemoteMessageDispatcherRegistry.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import org.eclipse.edc.connector.spi.callback.CallbackRegistry;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.asset.DataAddressResolver;
import org.eclipse.edc.spi.query.CriterionToAssetPredicateConverter;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.util.concurrency.LockManager;

import java.time.Clock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
Expand All @@ -51,6 +53,9 @@ public String name() {
return NAME;
}

@Inject
private Clock clock;

@Provider(isDefault = true)
public AssetIndex defaultAssetIndex() {
return getAssetIndex();
Expand All @@ -68,12 +73,12 @@ public ContractDefinitionStore defaultContractDefinitionStore() {

@Provider(isDefault = true)
public ContractNegotiationStore defaultContractNegotiationStore() {
return new InMemoryContractNegotiationStore();
return new InMemoryContractNegotiationStore(clock);
}

@Provider(isDefault = true)
public TransferProcessStore defaultTransferProcessStore() {
return new InMemoryTransferProcessStore();
return new InMemoryTransferProcessStore(clock);
}

@Provider(isDefault = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,13 @@
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.core.store.InMemoryStatefulEntityStore;
import org.eclipse.edc.connector.core.store.ReflectionBasedQueryResolver;
import org.eclipse.edc.spi.persistence.Lease;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QueryResolver;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Stream;
Expand All @@ -41,74 +36,53 @@
/**
* An in-memory, threadsafe process store. This implementation is intended for testing purposes only.
*/
public class InMemoryContractNegotiationStore implements ContractNegotiationStore {
public class InMemoryContractNegotiationStore extends InMemoryStatefulEntityStore<ContractNegotiation> implements ContractNegotiationStore {

private final QueryResolver<ContractNegotiation> negotiationQueryResolver = new ReflectionBasedQueryResolver<>(ContractNegotiation.class);
private final QueryResolver<ContractAgreement> agreementQueryResolver = new ReflectionBasedQueryResolver<>(ContractAgreement.class);
private final InMemoryStatefulEntityStore<ContractNegotiation> store;

public InMemoryContractNegotiationStore() {
this(UUID.randomUUID().toString(), Clock.systemUTC(), new HashMap<>());
public InMemoryContractNegotiationStore(Clock clock) {
this(UUID.randomUUID().toString(), clock);
}

public InMemoryContractNegotiationStore(String leaseHolder, Clock clock, Map<String, Lease> leases) {
store = new InMemoryStatefulEntityStore<>(ContractNegotiation.class, leaseHolder, clock, leases);
}

@Override
public @Nullable ContractNegotiation findById(String negotiationId) {
return store.find(negotiationId);
public InMemoryContractNegotiationStore(String leaseHolder, Clock clock) {
super(ContractNegotiation.class, leaseHolder, clock);
}

@Override
public @Nullable ContractNegotiation findForCorrelationId(String correlationId) {
return store.findAll().filter(p -> correlationId.equals(p.getCorrelationId())).findFirst().orElse(null);
return super.findAll().filter(p -> correlationId.equals(p.getCorrelationId())).findFirst().orElse(null);
}

@Override
public @Nullable ContractAgreement findContractAgreement(String contractId) {
return store.findAll()
return super.findAll()
.map(ContractNegotiation::getContractAgreement)
.filter(Objects::nonNull)
.filter(a -> Objects.equals(contractId, a.getId()))
.findFirst()
.orElse(null);
}

@Override
public void save(ContractNegotiation negotiation) {
store.upsert(negotiation);
}

@Override
public void delete(String negotiationId) {
var negotiation = store.find(negotiationId);
var negotiation = findById(negotiationId);
if (negotiation != null && negotiation.getContractAgreement() != null) {
throw new IllegalStateException(format("Cannot delete ContractNegotiation [%s]: ContractAgreement already created.", negotiationId));
}
store.delete(negotiationId);
super.delete(negotiationId);
}

@Override
public @NotNull Stream<ContractNegotiation> queryNegotiations(QuerySpec querySpec) {
return negotiationQueryResolver.query(store.findAll(), querySpec);
return negotiationQueryResolver.query(super.findAll(), querySpec);
}

@Override
public @NotNull Stream<ContractAgreement> queryAgreements(QuerySpec querySpec) {
return agreementQueryResolver.query(getAgreements(), querySpec);
}

@Override
public @NotNull List<ContractNegotiation> nextNotLeased(int max, Criterion... criteria) {
return store.leaseAndGet(max, criteria);
}

@Override
public StoreResult<ContractNegotiation> findByIdAndLease(String id) {
return store.leaseAndGet(id);
}

@Override
public StoreResult<ContractNegotiation> findByCorrelationIdAndLease(String correlationId) {
var negotiation = findForCorrelationId(correlationId);
Expand All @@ -121,7 +95,7 @@ public StoreResult<ContractNegotiation> findByCorrelationIdAndLease(String corre

@NotNull
private Stream<ContractAgreement> getAgreements() {
return store.findAll()
return super.findAll()
.map(ContractNegotiation::getContractAgreement)
.filter(Objects::nonNull);
}
Expand Down
Loading