From c9eb9253d0516e96f6931310bf3e381d2a1126cf Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Mon, 2 Oct 2023 16:54:27 +0200 Subject: [PATCH] feat: PolicyMonitorStore SQL implementation --- DEPENDENCIES | 4 +- .../store/InMemoryStatefulEntityStore.java | 84 ++++--- .../ContractNegotiationIntegrationTest.java | 6 +- .../ControlPlaneDefaultServicesExtension.java | 9 +- .../InMemoryContractNegotiationStore.java | 48 +--- .../InMemoryTransferProcessStore.java | 45 +--- .../InMemoryContractNegotiationStoreTest.java | 23 +- .../InMemoryTransferProcessStoreTest.java | 27 +-- ...sferProcessManagerImplIntegrationTest.java | 4 +- .../DataPlaneDefaultServicesExtension.java | 8 +- .../store/InMemoryDataPlaneStore.java | 40 +--- .../store/InMemoryDataPlaneStoreTest.java | 21 +- .../policy-monitor-core/build.gradle.kts | 1 + ...PolicyMonitorDefaultServicesExtension.java | 45 ++++ .../monitor/PolicyMonitorExtension.java | 7 +- .../store/InMemoryPolicyMonitorStore.java | 60 ----- .../store/sql/InMemoryPolicyMonitorStore.java | 36 +++ ...rg.eclipse.edc.spi.system.ServiceExtension | 1 + .../sql/InMemoryPolicyMonitorStoreTest.java | 42 ++++ .../edc/sql/lease/StatefulEntityMapping.java | 34 +++ .../sql/lease/StatefulEntityStatements.java | 48 ++++ .../schema/ContractNegotiationStatements.java | 27 +-- .../postgres/ContractNegotiationMapping.java | 16 +- .../TransferProcessStoreStatements.java | 35 +-- .../postgres/TransferProcessMapping.java | 13 +- .../store/sql/SqlDataPlaneStore.java | 10 +- .../schema/BaseSqlDataPlaneStatements.java | 5 - .../store/sql/schema/DataPlaneStatements.java | 33 +-- .../sql/schema/postgres/DataPlaneMapping.java | 17 +- .../policy-monitor-store-sql/build.gradle.kts | 35 +++ .../policy-monitor-store-sql/docs/schema.sql | 32 +++ .../store/sql/SqlPolicyMonitorStore.java | 181 +++++++++++++++ .../sql/SqlPolicyMonitorStoreExtension.java | 71 ++++++ .../BaseSqlPolicyMonitorStatements.java | 89 ++++++++ .../sql/schema/PolicyMonitorMapping.java | 30 +++ .../sql/schema/PolicyMonitorStatements.java | 43 ++++ .../PostgresPolicyMonitorStatements.java | 26 +++ ...rg.eclipse.edc.spi.system.ServiceExtension | 16 ++ .../sql/PostgresPolicyMonitorStoreTest.java | 82 +++++++ settings.gradle.kts | 2 + .../ContractNegotiationStoreTestBase.java | 3 +- .../policy-monitor-spi/build.gradle.kts | 6 + .../store/PolicyMonitorStoreTestBase.java | 208 ++++++++++++++++++ .../control-plane-postgresql/build.gradle.kts | 3 +- .../eclipse/edc/test/e2e/PostgresUtil.java | 26 +-- 45 files changed, 1176 insertions(+), 426 deletions(-) create mode 100644 core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/PolicyMonitorDefaultServicesExtension.java delete mode 100644 core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/store/InMemoryPolicyMonitorStore.java create mode 100644 core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/InMemoryPolicyMonitorStore.java create mode 100644 core/policy-monitor/policy-monitor-core/src/test/java/org/eclipse/edc/connector/policy/monitor/store/sql/InMemoryPolicyMonitorStoreTest.java create mode 100644 extensions/common/sql/sql-lease/src/main/java/org/eclipse/edc/sql/lease/StatefulEntityMapping.java create mode 100644 extensions/common/sql/sql-lease/src/main/java/org/eclipse/edc/sql/lease/StatefulEntityStatements.java create mode 100644 extensions/policy-monitor/store/sql/policy-monitor-store-sql/build.gradle.kts create mode 100644 extensions/policy-monitor/store/sql/policy-monitor-store-sql/docs/schema.sql create mode 100644 extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/SqlPolicyMonitorStore.java create mode 100644 extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/SqlPolicyMonitorStoreExtension.java create mode 100644 extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/BaseSqlPolicyMonitorStatements.java create mode 100644 extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/PolicyMonitorMapping.java create mode 100644 extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/PolicyMonitorStatements.java create mode 100644 extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/PostgresPolicyMonitorStatements.java create mode 100644 extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension create mode 100644 extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/test/java/org/eclipse/edc/connector/policy/monitor/store/sql/PostgresPolicyMonitorStoreTest.java create mode 100644 spi/policy-monitor/policy-monitor-spi/src/testFixtures/java/org/eclipse/edc/connector/policy/monitor/spi/testfixtures/store/PolicyMonitorStoreTestBase.java diff --git a/DEPENDENCIES b/DEPENDENCIES index bc02712e19a..578f93b8d6c 100644 --- a/DEPENDENCIES +++ b/DEPENDENCIES @@ -75,7 +75,7 @@ maven/mavencentral/com.jcraft/jzlib/1.1.3, BSD-2-Clause, approved, CQ6218 maven/mavencentral/com.lmax/disruptor/3.4.4, Apache-2.0, approved, clearlydefined maven/mavencentral/com.networknt/json-schema-validator/1.0.76, Apache-2.0, approved, CQ22638 maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.28, Apache-2.0, approved, clearlydefined -maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.32, Apache-2.0, approved, #10561 +maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.35, Apache-2.0, approved, #10851 maven/mavencentral/com.puppycrawl.tools/checkstyle/10.0, LGPL-2.1-or-later, approved, #7936 maven/mavencentral/com.samskivert/jmustache/1.15, BSD-2-Clause, approved, clearlydefined maven/mavencentral/com.squareup.okhttp3/okhttp-dnsoverhttps/4.11.0, Apache-2.0, approved, clearlydefined @@ -190,7 +190,7 @@ maven/mavencentral/org.apache.commons/commons-digester3/3.2, Apache-2.0, approve maven/mavencentral/org.apache.commons/commons-lang3/3.10, Apache-2.0, approved, clearlydefined maven/mavencentral/org.apache.commons/commons-lang3/3.11, Apache-2.0, approved, CQ22642 maven/mavencentral/org.apache.commons/commons-lang3/3.12.0, Apache-2.0, approved, clearlydefined -maven/mavencentral/org.apache.commons/commons-pool2/2.11.1, Apache-2.0, approved, CQ23795 +maven/mavencentral/org.apache.commons/commons-pool2/2.12.0, Apache-2.0 AND LicenseRef-Public-Domain, approved, #10843 maven/mavencentral/org.apache.commons/commons-text/1.10.0, Apache-2.0, approved, clearlydefined maven/mavencentral/org.apache.groovy/groovy-bom/4.0.11, Apache-2.0, approved, #9266 maven/mavencentral/org.apache.groovy/groovy-json/4.0.11, Apache-2.0, approved, #7411 diff --git a/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/store/InMemoryStatefulEntityStore.java b/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/store/InMemoryStatefulEntityStore.java index 8a305b34168..0cf24f1f2a6 100644 --- a/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/store/InMemoryStatefulEntityStore.java +++ b/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/store/InMemoryStatefulEntityStore.java @@ -16,6 +16,7 @@ import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.persistence.Lease; +import org.eclipse.edc.spi.persistence.StateEntityStore; import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.query.CriterionToPredicateConverter; import org.eclipse.edc.spi.query.QueryResolver; @@ -23,9 +24,12 @@ import org.eclipse.edc.spi.result.StoreResult; import org.eclipse.edc.util.concurrency.LockManager; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.time.Clock; +import java.time.Duration; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -41,24 +45,24 @@ * An in-memory, threadsafe entity store for a {@link StatefulEntity}. This implementation is intended for testing * purposes only. */ -public class InMemoryStatefulEntityStore> { - private static final long DEFAULT_LEASE_TIME_MILLIS = 60_000; +public class InMemoryStatefulEntityStore> implements StateEntityStore { + private static final Duration DEFAULT_LEASE_TIME = Duration.ofSeconds(60); private final Map entitiesById = new ConcurrentHashMap<>(); private final QueryResolver queryResolver; private final LockManager lockManager = new LockManager(new ReentrantReadWriteLock()); private final String lockId; private final Clock clock; - private final Map leases; + private final Map leases = new HashMap<>(); private final CriterionToPredicateConverter criterionConverter = new CriterionToPredicateConverterImpl(); - public InMemoryStatefulEntityStore(Class clazz, String lockId, Clock clock, Map leases) { + public InMemoryStatefulEntityStore(Class clazz, String lockId, Clock clock) { queryResolver = new ReflectionBasedQueryResolver<>(clazz); this.lockId = lockId; this.clock = clock; - this.leases = leases; } - public T find(String id) { + @Override + public @Nullable T findById(String id) { var t = entitiesById.get(id); if (t == null) { return null; @@ -66,24 +70,8 @@ public T find(String id) { return t.copy(); } - public void upsert(T entity) { - acquireLease(entity.getId(), lockId); - entitiesById.put(entity.getId(), entity.copy()); - freeLease(entity.getId()); - } - - public void delete(String id) { - if (isLeased(id)) { - throw new IllegalStateException("Entity is leased and cannot be deleted!"); - } - entitiesById.remove(id); - } - - public Stream findAll(QuerySpec querySpec) { - return queryResolver.query(findAll(), querySpec); - } - - public @NotNull List leaseAndGet(int max, Criterion... criteria) { + @Override + public @NotNull List nextNotLeased(int max, Criterion... criteria) { return lockManager.writeLock(() -> { var filterPredicate = Arrays.stream(criteria).map(criterionConverter::convert).reduce(x -> true, Predicate::and); var entities = entitiesById.values().stream() @@ -92,12 +80,13 @@ public Stream findAll(QuerySpec querySpec) { .sorted(comparingLong(StatefulEntity::getStateTimestamp)) //order by state timestamp, oldest first .limit(max) .toList(); - entities.forEach(i -> acquireLease(i.getId(), lockId)); + entities.forEach(i -> acquireLease(i.getId())); return entities.stream().map(StatefulEntity::copy).collect(toList()); }); } - public StoreResult leaseAndGet(String id) { + @Override + public StoreResult findByIdAndLease(String id) { return lockManager.writeLock(() -> { var entity = entitiesById.get(id); if (entity == null) { @@ -105,7 +94,7 @@ public StoreResult leaseAndGet(String id) { } try { - acquireLease(id, lockId); + acquireLease(id); return StoreResult.success(entity); } catch (IllegalStateException e) { return StoreResult.alreadyLeased(format("Entity %s is already leased: %s", id, e.getMessage())); @@ -113,27 +102,50 @@ public StoreResult leaseAndGet(String id) { }); } - public Stream findAll() { - return entitiesById.values().stream(); + @Override + public void save(T entity) { + acquireLease(entity.getId()); + entitiesById.put(entity.getId(), entity.copy()); + freeLease(entity.getId()); } - private void freeLease(String id) { - leases.remove(id); + public void delete(String id) { + if (isLeased(id)) { + throw new IllegalStateException("Entity is leased and cannot be deleted!"); + } + entitiesById.remove(id); + } + + public Stream findAll(QuerySpec querySpec) { + return queryResolver.query(findAll(), querySpec); + } + + public Stream findAll() { + return entitiesById.values().stream(); } - private void acquireLease(String id, String lockId) { + public void acquireLease(String id, String lockId, Duration leaseTime) { if (!isLeased(id) || isLeasedBy(id, lockId)) { - leases.put(id, new Lease(lockId, clock.millis(), DEFAULT_LEASE_TIME_MILLIS)); + leases.put(id, new Lease(lockId, clock.millis(), leaseTime.toMillis())); } else { throw new IllegalStateException("Cannot acquire lease, is already leased by someone else!"); } } + public boolean isLeasedBy(String id, String lockId) { + return isLeased(id) && leases.get(id).getLeasedBy().equals(lockId); + } + + private void freeLease(String id) { + leases.remove(id); + } + + private void acquireLease(String id) { + acquireLease(id, lockId, DEFAULT_LEASE_TIME); + } + private boolean isLeased(String id) { return leases.containsKey(id) && !leases.get(id).isExpired(clock.millis()); } - private boolean isLeasedBy(String id, String lockId) { - return isLeased(id) && leases.get(id).getLeasedBy().equals(lockId); - } } diff --git a/core/control-plane/contract-core/src/test/java/org/eclipse/edc/connector/contract/negotiation/ContractNegotiationIntegrationTest.java b/core/control-plane/contract-core/src/test/java/org/eclipse/edc/connector/contract/negotiation/ContractNegotiationIntegrationTest.java index 954f3527b5b..a3a06c623d7 100644 --- a/core/control-plane/contract-core/src/test/java/org/eclipse/edc/connector/contract/negotiation/ContractNegotiationIntegrationTest.java +++ b/core/control-plane/contract-core/src/test/java/org/eclipse/edc/connector/contract/negotiation/ContractNegotiationIntegrationTest.java @@ -54,6 +54,7 @@ import org.junit.jupiter.api.Test; import org.mockito.stubbing.Answer; +import java.time.Clock; import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -78,8 +79,9 @@ class ContractNegotiationIntegrationTest { private static final Duration DEFAULT_TEST_TIMEOUT = Duration.ofSeconds(15); private static final Duration DEFAULT_POLL_INTERVAL = Duration.ofMillis(100); - private final InMemoryContractNegotiationStore providerStore = new InMemoryContractNegotiationStore(); - private final InMemoryContractNegotiationStore consumerStore = new InMemoryContractNegotiationStore(); + private final Clock clock = Clock.systemUTC(); + private final InMemoryContractNegotiationStore providerStore = new InMemoryContractNegotiationStore(clock); + private final InMemoryContractNegotiationStore consumerStore = new InMemoryContractNegotiationStore(clock); private final ContractValidationService validationService = mock(ContractValidationService.class); private final RemoteMessageDispatcherRegistry providerDispatcherRegistry = mock(RemoteMessageDispatcherRegistry.class); private final RemoteMessageDispatcherRegistry consumerDispatcherRegistry = mock(RemoteMessageDispatcherRegistry.class); diff --git a/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/ControlPlaneDefaultServicesExtension.java b/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/ControlPlaneDefaultServicesExtension.java index 7176956b21f..fb52b523795 100644 --- a/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/ControlPlaneDefaultServicesExtension.java +++ b/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/ControlPlaneDefaultServicesExtension.java @@ -27,6 +27,7 @@ import org.eclipse.edc.connector.spi.callback.CallbackRegistry; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Provider; import org.eclipse.edc.spi.asset.AssetIndex; import org.eclipse.edc.spi.asset.DataAddressResolver; @@ -34,6 +35,7 @@ import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.util.concurrency.LockManager; +import java.time.Clock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -51,6 +53,9 @@ public String name() { return NAME; } + @Inject + private Clock clock; + @Provider(isDefault = true) public AssetIndex defaultAssetIndex() { return getAssetIndex(); @@ -68,12 +73,12 @@ public ContractDefinitionStore defaultContractDefinitionStore() { @Provider(isDefault = true) public ContractNegotiationStore defaultContractNegotiationStore() { - return new InMemoryContractNegotiationStore(); + return new InMemoryContractNegotiationStore(clock); } @Provider(isDefault = true) public TransferProcessStore defaultTransferProcessStore() { - return new InMemoryTransferProcessStore(); + return new InMemoryTransferProcessStore(clock); } @Provider(isDefault = true) diff --git a/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/defaults/storage/contractnegotiation/InMemoryContractNegotiationStore.java b/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/defaults/storage/contractnegotiation/InMemoryContractNegotiationStore.java index fb1fce78e48..e3e5ca0a76e 100644 --- a/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/defaults/storage/contractnegotiation/InMemoryContractNegotiationStore.java +++ b/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/defaults/storage/contractnegotiation/InMemoryContractNegotiationStore.java @@ -20,8 +20,6 @@ import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation; import org.eclipse.edc.connector.core.store.InMemoryStatefulEntityStore; import org.eclipse.edc.connector.core.store.ReflectionBasedQueryResolver; -import org.eclipse.edc.spi.persistence.Lease; -import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.query.QueryResolver; import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.result.StoreResult; @@ -29,9 +27,6 @@ import org.jetbrains.annotations.Nullable; import java.time.Clock; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.stream.Stream; @@ -41,33 +36,27 @@ /** * An in-memory, threadsafe process store. This implementation is intended for testing purposes only. */ -public class InMemoryContractNegotiationStore implements ContractNegotiationStore { +public class InMemoryContractNegotiationStore extends InMemoryStatefulEntityStore implements ContractNegotiationStore { private final QueryResolver negotiationQueryResolver = new ReflectionBasedQueryResolver<>(ContractNegotiation.class); private final QueryResolver agreementQueryResolver = new ReflectionBasedQueryResolver<>(ContractAgreement.class); - private final InMemoryStatefulEntityStore store; - public InMemoryContractNegotiationStore() { - this(UUID.randomUUID().toString(), Clock.systemUTC(), new HashMap<>()); + public InMemoryContractNegotiationStore(Clock clock) { + this(UUID.randomUUID().toString(), clock); } - public InMemoryContractNegotiationStore(String leaseHolder, Clock clock, Map leases) { - store = new InMemoryStatefulEntityStore<>(ContractNegotiation.class, leaseHolder, clock, leases); - } - - @Override - public @Nullable ContractNegotiation findById(String negotiationId) { - return store.find(negotiationId); + public InMemoryContractNegotiationStore(String leaseHolder, Clock clock) { + super(ContractNegotiation.class, leaseHolder, clock); } @Override public @Nullable ContractNegotiation findForCorrelationId(String correlationId) { - return store.findAll().filter(p -> correlationId.equals(p.getCorrelationId())).findFirst().orElse(null); + return super.findAll().filter(p -> correlationId.equals(p.getCorrelationId())).findFirst().orElse(null); } @Override public @Nullable ContractAgreement findContractAgreement(String contractId) { - return store.findAll() + return super.findAll() .map(ContractNegotiation::getContractAgreement) .filter(Objects::nonNull) .filter(a -> Objects.equals(contractId, a.getId())) @@ -75,23 +64,18 @@ public InMemoryContractNegotiationStore(String leaseHolder, Clock clock, Map queryNegotiations(QuerySpec querySpec) { - return negotiationQueryResolver.query(store.findAll(), querySpec); + return negotiationQueryResolver.query(super.findAll(), querySpec); } @Override @@ -99,16 +83,6 @@ public void delete(String negotiationId) { return agreementQueryResolver.query(getAgreements(), querySpec); } - @Override - public @NotNull List nextNotLeased(int max, Criterion... criteria) { - return store.leaseAndGet(max, criteria); - } - - @Override - public StoreResult findByIdAndLease(String id) { - return store.leaseAndGet(id); - } - @Override public StoreResult findByCorrelationIdAndLease(String correlationId) { var negotiation = findForCorrelationId(correlationId); @@ -121,7 +95,7 @@ public StoreResult findByCorrelationIdAndLease(String corre @NotNull private Stream getAgreements() { - return store.findAll() + return super.findAll() .map(ContractNegotiation::getContractAgreement) .filter(Objects::nonNull); } diff --git a/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/defaults/storage/transferprocess/InMemoryTransferProcessStore.java b/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/defaults/storage/transferprocess/InMemoryTransferProcessStore.java index a1c882a1b7f..8c1c4071085 100644 --- a/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/defaults/storage/transferprocess/InMemoryTransferProcessStore.java +++ b/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/defaults/storage/transferprocess/InMemoryTransferProcessStore.java @@ -17,17 +17,11 @@ import org.eclipse.edc.connector.core.store.InMemoryStatefulEntityStore; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; -import org.eclipse.edc.spi.persistence.Lease; -import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.result.StoreResult; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.time.Clock; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.stream.Stream; @@ -37,49 +31,31 @@ /** * An in-memory, threadsafe process store. This implementation is intended for testing purposes only. */ -public class InMemoryTransferProcessStore implements TransferProcessStore { +public class InMemoryTransferProcessStore extends InMemoryStatefulEntityStore implements TransferProcessStore { - private final InMemoryStatefulEntityStore store; - - public InMemoryTransferProcessStore() { - this(UUID.randomUUID().toString(), Clock.systemUTC(), new HashMap<>()); - } - - public InMemoryTransferProcessStore(String leaserId, Clock clock, Map leases) { - store = new InMemoryStatefulEntityStore<>(TransferProcess.class, leaserId, clock, leases); + public InMemoryTransferProcessStore(Clock clock) { + this(UUID.randomUUID().toString(), clock); } - @Nullable - @Override - public TransferProcess findById(String id) { - return store.find(id); + public InMemoryTransferProcessStore(String leaserId, Clock clock) { + super(TransferProcess.class, leaserId, clock); } @Override public @Nullable TransferProcess findForCorrelationId(String correlationId) { var querySpec = QuerySpec.Builder.newInstance().filter(criterion("dataRequest.id", "=", correlationId)).build(); - return store.findAll(querySpec).findFirst().orElse(null); + return super.findAll(querySpec).findFirst().orElse(null); } @Override public void delete(String id) { - store.delete(id); + super.delete(id); } @Override public Stream findAll(QuerySpec querySpec) { - return store.findAll(querySpec); - } - - @Override - public @NotNull List nextNotLeased(int max, Criterion... criteria) { - return store.leaseAndGet(max, criteria); - } - - @Override - public StoreResult findByIdAndLease(String id) { - return store.leaseAndGet(id); + return super.findAll(querySpec); } @Override @@ -92,9 +68,4 @@ public StoreResult findByCorrelationIdAndLease(String correlati return findByIdAndLease(transferProcess.getId()); } - @Override - public void save(TransferProcess entity) { - store.upsert(entity); - } - } diff --git a/core/control-plane/control-plane-core/src/test/java/org/eclipse/edc/connector/defaults/storage/contractnegotiation/InMemoryContractNegotiationStoreTest.java b/core/control-plane/control-plane-core/src/test/java/org/eclipse/edc/connector/defaults/storage/contractnegotiation/InMemoryContractNegotiationStoreTest.java index f9dfb70a933..f3fd9c81a7b 100644 --- a/core/control-plane/control-plane-core/src/test/java/org/eclipse/edc/connector/defaults/storage/contractnegotiation/InMemoryContractNegotiationStoreTest.java +++ b/core/control-plane/control-plane-core/src/test/java/org/eclipse/edc/connector/defaults/storage/contractnegotiation/InMemoryContractNegotiationStoreTest.java @@ -17,23 +17,12 @@ import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; import org.eclipse.edc.connector.contract.spi.testfixtures.negotiation.store.ContractNegotiationStoreTestBase; -import org.eclipse.edc.spi.persistence.Lease; -import org.junit.jupiter.api.BeforeEach; -import java.time.Clock; import java.time.Duration; -import java.util.HashMap; -import java.util.Map; class InMemoryContractNegotiationStoreTest extends ContractNegotiationStoreTestBase { - private final Map leases = new HashMap<>(); - private InMemoryContractNegotiationStore store; - - @BeforeEach - void setUp() { - store = new InMemoryContractNegotiationStore(CONNECTOR_NAME, Clock.systemUTC(), leases); - } + private final InMemoryContractNegotiationStore store = new InMemoryContractNegotiationStore(CONNECTOR_NAME, clock); @Override protected ContractNegotiationStore getContractNegotiationStore() { @@ -42,18 +31,12 @@ protected ContractNegotiationStore getContractNegotiationStore() { @Override protected void leaseEntity(String negotiationId, String owner, Duration duration) { - leases.put(negotiationId, new Lease(owner, Clock.systemUTC().millis(), duration.toMillis())); + store.acquireLease(negotiationId, owner, duration); } @Override protected boolean isLeasedBy(String negotiationId, String owner) { - return leases.entrySet().stream().anyMatch(e -> e.getKey().equals(negotiationId) && - e.getValue().getLeasedBy().equals(owner) && - !isExpired(e.getValue())); - } - - private boolean isExpired(Lease e) { - return e.getLeasedAt() + e.getLeaseDuration() < Clock.systemUTC().millis(); + return store.isLeasedBy(negotiationId, owner); } } diff --git a/core/control-plane/control-plane-core/src/test/java/org/eclipse/edc/connector/defaults/storage/transferprocess/InMemoryTransferProcessStoreTest.java b/core/control-plane/control-plane-core/src/test/java/org/eclipse/edc/connector/defaults/storage/transferprocess/InMemoryTransferProcessStoreTest.java index 484265e5ff6..c10a8c07ae2 100644 --- a/core/control-plane/control-plane-core/src/test/java/org/eclipse/edc/connector/defaults/storage/transferprocess/InMemoryTransferProcessStoreTest.java +++ b/core/control-plane/control-plane-core/src/test/java/org/eclipse/edc/connector/defaults/storage/transferprocess/InMemoryTransferProcessStoreTest.java @@ -16,23 +16,13 @@ import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; import org.eclipse.edc.connector.transfer.spi.testfixtures.store.TransferProcessStoreTestBase; -import org.eclipse.edc.spi.persistence.Lease; -import org.junit.jupiter.api.BeforeEach; import java.time.Clock; import java.time.Duration; -import java.util.HashMap; -import java.util.Map; class InMemoryTransferProcessStoreTest extends TransferProcessStoreTestBase { - private final Map leases = new HashMap<>(); - private InMemoryTransferProcessStore store; - - @BeforeEach - void setUp() { - store = new InMemoryTransferProcessStore(CONNECTOR_NAME, Clock.systemUTC(), leases); - } + private final InMemoryTransferProcessStore store = new InMemoryTransferProcessStore(CONNECTOR_NAME, Clock.systemUTC()); @Override protected TransferProcessStore getTransferProcessStore() { @@ -40,20 +30,13 @@ protected TransferProcessStore getTransferProcessStore() { } @Override - protected void leaseEntity(String negotiationId, String owner, Duration duration) { - leases.put(negotiationId, new Lease(owner, Clock.systemUTC().millis(), duration.toMillis())); + protected void leaseEntity(String entityId, String owner, Duration duration) { + store.acquireLease(entityId, owner, duration); } @Override - protected boolean isLeasedBy(String negotiationId, String owner) { - return leases.entrySet().stream().anyMatch(e -> e.getKey().equals(negotiationId) && - e.getValue().getLeasedBy().equals(owner) && - !isExpired(e.getValue())); + protected boolean isLeasedBy(String entityId, String owner) { + return store.isLeasedBy(entityId, owner); } - private boolean isExpired(Lease e) { - return e.getLeasedAt() + e.getLeaseDuration() < Clock.systemUTC().millis(); - } - - } diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplIntegrationTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplIntegrationTest.java index 60efdd48fe2..11877800149 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplIntegrationTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplIntegrationTest.java @@ -63,7 +63,8 @@ class TransferProcessManagerImplIntegrationTest { private static final int TRANSFER_MANAGER_BATCH_SIZE = 10; private final ProvisionManager provisionManager = mock(ProvisionManager.class); private final ResourceManifestGenerator manifestGenerator = mock(ResourceManifestGenerator.class); - private final TransferProcessStore store = new InMemoryTransferProcessStore(); + private final Clock clock = Clock.systemUTC(); + private final TransferProcessStore store = new InMemoryTransferProcessStore(clock); private TransferProcessManagerImpl transferProcessManager; @BeforeEach @@ -76,7 +77,6 @@ void setup() { var monitor = mock(Monitor.class); var waitStrategy = mock(ExponentialWaitStrategy.class); - var clock = Clock.systemUTC(); transferProcessManager = TransferProcessManagerImpl.Builder.newInstance() .provisionManager(provisionManager) .dataFlowManager(mock()) diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneDefaultServicesExtension.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneDefaultServicesExtension.java index cab92df8839..42a6b1da4ef 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneDefaultServicesExtension.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneDefaultServicesExtension.java @@ -18,9 +18,12 @@ import org.eclipse.edc.connector.dataplane.framework.store.InMemoryDataPlaneStore; import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Provider; import org.eclipse.edc.spi.system.ServiceExtension; +import java.time.Clock; + @Extension(value = DataPlaneDefaultServicesExtension.NAME) public class DataPlaneDefaultServicesExtension implements ServiceExtension { @@ -31,6 +34,9 @@ public String name() { return NAME; } + @Inject + private Clock clock; + @Provider(isDefault = true) public TransferServiceSelectionStrategy transferServiceSelectionStrategy() { return TransferServiceSelectionStrategy.selectFirst(); @@ -38,6 +44,6 @@ public TransferServiceSelectionStrategy transferServiceSelectionStrategy() { @Provider(isDefault = true) public DataPlaneStore dataPlaneStore() { - return new InMemoryDataPlaneStore(); + return new InMemoryDataPlaneStore(clock); } } diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryDataPlaneStore.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryDataPlaneStore.java index b740c8539f9..22e5b430b3f 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryDataPlaneStore.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryDataPlaneStore.java @@ -17,50 +17,20 @@ import org.eclipse.edc.connector.core.store.InMemoryStatefulEntityStore; import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; -import org.eclipse.edc.spi.persistence.Lease; -import org.eclipse.edc.spi.query.Criterion; -import org.eclipse.edc.spi.result.StoreResult; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import java.time.Clock; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.UUID; /** * Implements an in-memory, ephemeral store with a maximum capacity. If the store grows beyond capacity, the oldest entry will be evicted. */ -public class InMemoryDataPlaneStore implements DataPlaneStore { +public class InMemoryDataPlaneStore extends InMemoryStatefulEntityStore implements DataPlaneStore { - private final InMemoryStatefulEntityStore store; - - public InMemoryDataPlaneStore() { - this(UUID.randomUUID().toString(), new HashMap<>()); - } - - public InMemoryDataPlaneStore(String connectorName, Map leases) { - store = new InMemoryStatefulEntityStore<>(DataFlow.class, connectorName, Clock.systemUTC(), leases); - } - - @Override - public @Nullable DataFlow findById(String id) { - return store.find(id); - } - - @Override - public @NotNull List nextNotLeased(int max, Criterion... criteria) { - return store.leaseAndGet(max, criteria); - } - - @Override - public StoreResult findByIdAndLease(String id) { - return store.leaseAndGet(id); + public InMemoryDataPlaneStore(Clock clock) { + this(UUID.randomUUID().toString(), clock); } - @Override - public void save(DataFlow entity) { - store.upsert(entity); + public InMemoryDataPlaneStore(String connectorName, Clock clock) { + super(DataFlow.class, connectorName, clock); } } diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryDataPlaneStoreTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryDataPlaneStoreTest.java index 28059e595f6..b3573992041 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryDataPlaneStoreTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryDataPlaneStoreTest.java @@ -16,23 +16,13 @@ import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; import org.eclipse.edc.connector.dataplane.spi.testfixtures.store.DataPlaneStoreTestBase; -import org.eclipse.edc.spi.persistence.Lease; -import org.junit.jupiter.api.BeforeEach; import java.time.Clock; import java.time.Duration; -import java.util.HashMap; -import java.util.Map; class InMemoryDataPlaneStoreTest extends DataPlaneStoreTestBase { - private final Map leases = new HashMap<>(); - private InMemoryDataPlaneStore store; - - @BeforeEach - void setUp() { - store = new InMemoryDataPlaneStore(CONNECTOR_NAME, leases); - } + private final InMemoryDataPlaneStore store = new InMemoryDataPlaneStore(CONNECTOR_NAME, Clock.systemUTC()); @Override protected DataPlaneStore getStore() { @@ -41,17 +31,12 @@ protected DataPlaneStore getStore() { @Override protected void leaseEntity(String entityId, String owner, Duration duration) { - leases.put(entityId, new Lease(owner, Clock.systemUTC().millis(), duration.toMillis())); + store.acquireLease(entityId, owner, duration); } @Override protected boolean isLeasedBy(String entityId, String owner) { - return leases.entrySet().stream().anyMatch(e -> e.getKey().equals(entityId) && - e.getValue().getLeasedBy().equals(owner) && - !isExpired(e.getValue())); + return store.isLeasedBy(entityId, owner); } - private boolean isExpired(Lease e) { - return e.getLeasedAt() + e.getLeaseDuration() < Clock.systemUTC().millis(); - } } diff --git a/core/policy-monitor/policy-monitor-core/build.gradle.kts b/core/policy-monitor/policy-monitor-core/build.gradle.kts index 63028506f7a..04e6c0d7022 100644 --- a/core/policy-monitor/policy-monitor-core/build.gradle.kts +++ b/core/policy-monitor/policy-monitor-core/build.gradle.kts @@ -26,6 +26,7 @@ dependencies { testImplementation(project(":core:common:junit")) testImplementation(libs.awaitility) + testImplementation(testFixtures(project(":spi:policy-monitor:policy-monitor-spi"))) } diff --git a/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/PolicyMonitorDefaultServicesExtension.java b/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/PolicyMonitorDefaultServicesExtension.java new file mode 100644 index 00000000000..045fabd74b5 --- /dev/null +++ b/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/PolicyMonitorDefaultServicesExtension.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.policy.monitor; + +import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorStore; +import org.eclipse.edc.connector.policy.monitor.store.sql.InMemoryPolicyMonitorStore; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Provider; +import org.eclipse.edc.spi.system.ServiceExtension; + +import java.time.Clock; + +import static org.eclipse.edc.connector.policy.monitor.PolicyMonitorDefaultServicesExtension.NAME; + +@Extension(value = NAME) +public class PolicyMonitorDefaultServicesExtension implements ServiceExtension { + + public static final String NAME = "PolicyMonitor Default Services"; + + @Override + public String name() { + return NAME; + } + + @Inject + private Clock clock; + + @Provider + public PolicyMonitorStore policyMonitorStore() { + return new InMemoryPolicyMonitorStore(clock); + } +} diff --git a/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/PolicyMonitorExtension.java b/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/PolicyMonitorExtension.java index c8c1c9bdcb4..a0b610f9c42 100644 --- a/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/PolicyMonitorExtension.java +++ b/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/PolicyMonitorExtension.java @@ -16,7 +16,7 @@ import org.eclipse.edc.connector.policy.monitor.manager.PolicyMonitorManagerImpl; import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorManager; -import org.eclipse.edc.connector.policy.monitor.store.InMemoryPolicyMonitorStore; +import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorStore; import org.eclipse.edc.connector.policy.monitor.subscriber.StartMonitoring; import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService; import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService; @@ -72,6 +72,9 @@ public class PolicyMonitorExtension implements ServiceExtension { @Inject private TransferProcessService transferProcessService; + @Inject + private PolicyMonitorStore policyMonitorStore; + private PolicyMonitorManager manager; @Override @@ -89,7 +92,7 @@ public void initialize(ServiceExtensionContext context) { .contractAgreementService(contractAgreementService) .policyEngine(policyEngine) .transferProcessService(transferProcessService) - .store(new InMemoryPolicyMonitorStore()) + .store(policyMonitorStore) .build(); context.registerService(PolicyMonitorManager.class, manager); diff --git a/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/store/InMemoryPolicyMonitorStore.java b/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/store/InMemoryPolicyMonitorStore.java deleted file mode 100644 index b2599111f92..00000000000 --- a/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/store/InMemoryPolicyMonitorStore.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.connector.policy.monitor.store; - -import org.eclipse.edc.connector.core.store.InMemoryStatefulEntityStore; -import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorEntry; -import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorStore; -import org.eclipse.edc.spi.query.Criterion; -import org.eclipse.edc.spi.result.StoreResult; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -import java.time.Clock; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; - -/** - * In-memory implementation of the {@link PolicyMonitorStore} - */ -public class InMemoryPolicyMonitorStore implements PolicyMonitorStore { - - private final InMemoryStatefulEntityStore store; - - public InMemoryPolicyMonitorStore() { - store = new InMemoryStatefulEntityStore<>(PolicyMonitorEntry.class, UUID.randomUUID().toString(), Clock.systemUTC(), new HashMap<>()); - } - - @Override - public @Nullable PolicyMonitorEntry findById(String id) { - return store.find(id); - } - - @Override - public @NotNull List nextNotLeased(int max, Criterion... criteria) { - return store.leaseAndGet(max, criteria); - } - - @Override - public StoreResult findByIdAndLease(String id) { - return store.leaseAndGet(id); - } - - @Override - public void save(PolicyMonitorEntry entity) { - store.upsert(entity); - } -} diff --git a/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/InMemoryPolicyMonitorStore.java b/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/InMemoryPolicyMonitorStore.java new file mode 100644 index 00000000000..4590eeddcc2 --- /dev/null +++ b/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/InMemoryPolicyMonitorStore.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.policy.monitor.store.sql; + +import org.eclipse.edc.connector.core.store.InMemoryStatefulEntityStore; +import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorEntry; +import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorStore; + +import java.time.Clock; +import java.util.UUID; + +/** + * In-memory implementation of the {@link PolicyMonitorStore} + */ +public class InMemoryPolicyMonitorStore extends InMemoryStatefulEntityStore implements PolicyMonitorStore { + + public InMemoryPolicyMonitorStore(Clock clock) { + this(UUID.randomUUID().toString(), clock); + } + + public InMemoryPolicyMonitorStore(String owner, Clock clock) { + super(PolicyMonitorEntry.class, owner, clock); + } +} diff --git a/core/policy-monitor/policy-monitor-core/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/core/policy-monitor/policy-monitor-core/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension index 38b989d3673..311520a34b8 100644 --- a/core/policy-monitor/policy-monitor-core/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension +++ b/core/policy-monitor/policy-monitor-core/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -12,4 +12,5 @@ # # +org.eclipse.edc.connector.policy.monitor.PolicyMonitorDefaultServicesExtension org.eclipse.edc.connector.policy.monitor.PolicyMonitorExtension diff --git a/core/policy-monitor/policy-monitor-core/src/test/java/org/eclipse/edc/connector/policy/monitor/store/sql/InMemoryPolicyMonitorStoreTest.java b/core/policy-monitor/policy-monitor-core/src/test/java/org/eclipse/edc/connector/policy/monitor/store/sql/InMemoryPolicyMonitorStoreTest.java new file mode 100644 index 00000000000..542be0d7251 --- /dev/null +++ b/core/policy-monitor/policy-monitor-core/src/test/java/org/eclipse/edc/connector/policy/monitor/store/sql/InMemoryPolicyMonitorStoreTest.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.policy.monitor.store.sql; + +import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorStore; +import org.eclipse.edc.connector.policy.monitor.spi.testfixtures.store.PolicyMonitorStoreTestBase; + +import java.time.Clock; +import java.time.Duration; + +class InMemoryPolicyMonitorStoreTest extends PolicyMonitorStoreTestBase { + + private final InMemoryPolicyMonitorStore store = new InMemoryPolicyMonitorStore(CONNECTOR_NAME, Clock.systemUTC()); + + @Override + protected PolicyMonitorStore getStore() { + return store; + } + + @Override + protected void leaseEntity(String entityId, String owner, Duration duration) { + store.acquireLease(entityId, owner, duration); + } + + @Override + protected boolean isLeasedBy(String entityId, String owner) { + return store.isLeasedBy(entityId, owner); + } + +} diff --git a/extensions/common/sql/sql-lease/src/main/java/org/eclipse/edc/sql/lease/StatefulEntityMapping.java b/extensions/common/sql/sql-lease/src/main/java/org/eclipse/edc/sql/lease/StatefulEntityMapping.java new file mode 100644 index 00000000000..e41f596a7c6 --- /dev/null +++ b/extensions/common/sql/sql-lease/src/main/java/org/eclipse/edc/sql/lease/StatefulEntityMapping.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.sql.lease; + +import org.eclipse.edc.sql.translation.JsonFieldMapping; +import org.eclipse.edc.sql.translation.TranslationMapping; + +/** + * Maps fields of a {@link org.eclipse.edc.spi.entity.StatefulEntity} onto the + * corresponding SQL schema (= column names) enabling access through Postgres JSON operators where applicable + */ +public class StatefulEntityMapping extends TranslationMapping { + + protected StatefulEntityMapping(StatefulEntityStatements statements) { + add("id", statements.getIdColumn()); + add("state", statements.getStateColumn()); + add("stateCount", statements.getStateCountColumn()); + add("createdAt", statements.getCreatedAtColumn()); + add("traceContext", new JsonFieldMapping(statements.getTraceContextColumn())); + add("errorDetail", statements.getErrorDetailColumn()); + } +} diff --git a/extensions/common/sql/sql-lease/src/main/java/org/eclipse/edc/sql/lease/StatefulEntityStatements.java b/extensions/common/sql/sql-lease/src/main/java/org/eclipse/edc/sql/lease/StatefulEntityStatements.java new file mode 100644 index 00000000000..b881c994b19 --- /dev/null +++ b/extensions/common/sql/sql-lease/src/main/java/org/eclipse/edc/sql/lease/StatefulEntityStatements.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.sql.lease; + +public interface StatefulEntityStatements { + + String getIdColumn(); + + default String getStateColumn() { + return "state"; + } + + default String getStateTimestampColumn() { + return "state_time_stamp"; + } + + default String getStateCountColumn() { + return "state_count"; + } + + default String getTraceContextColumn() { + return "trace_context"; + } + + default String getErrorDetailColumn() { + return "error_detail"; + } + + default String getCreatedAtColumn() { + return "created_at"; + } + + default String getUpdatedAtColumn() { + return "updated_at"; + } +} diff --git a/extensions/control-plane/store/sql/contract-negotiation-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/contractnegotiation/store/schema/ContractNegotiationStatements.java b/extensions/control-plane/store/sql/contract-negotiation-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/contractnegotiation/store/schema/ContractNegotiationStatements.java index cffb0239c7b..cc7a5e04faf 100644 --- a/extensions/control-plane/store/sql/contract-negotiation-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/contractnegotiation/store/schema/ContractNegotiationStatements.java +++ b/extensions/control-plane/store/sql/contract-negotiation-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/contractnegotiation/store/schema/ContractNegotiationStatements.java @@ -16,13 +16,14 @@ import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.sql.lease.LeaseStatements; +import org.eclipse.edc.sql.lease.StatefulEntityStatements; import org.eclipse.edc.sql.translation.SqlQueryStatement; /** * Provides database-related constants, such as column names, table names and statement templates. Methods to compose * statements must be overridden by implementors. */ -public interface ContractNegotiationStatements extends LeaseStatements { +public interface ContractNegotiationStatements extends StatefulEntityStatements, LeaseStatements { String getFindTemplate(); String getFindContractAgreementTemplate(); @@ -97,14 +98,6 @@ default String getContractAgreementIdFkColumn() { return "agreement_id"; } - default String getStateColumn() { - return "state"; - } - - default String getStateCountColumn() { - return "state_count"; - } - default String getStateTimestampColumn() { return "state_timestamp"; } @@ -117,26 +110,10 @@ default String getCallbackAddressesColumn() { return "callback_addresses"; } - default String getErrorDetailColumn() { - return "error_detail"; - } - - default String getTraceContextColumn() { - return "trace_context"; - } - default String getTypeColumn() { return "type"; } - default String getCreatedAtColumn() { - return "created_at"; - } - - default String getUpdatedAtColumn() { - return "updated_at"; - } - default String getPendingColumn() { return "pending"; } diff --git a/extensions/control-plane/store/sql/contract-negotiation-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/contractnegotiation/store/schema/postgres/ContractNegotiationMapping.java b/extensions/control-plane/store/sql/contract-negotiation-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/contractnegotiation/store/schema/postgres/ContractNegotiationMapping.java index dee4db397fe..193c67d2ef8 100644 --- a/extensions/control-plane/store/sql/contract-negotiation-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/contractnegotiation/store/schema/postgres/ContractNegotiationMapping.java +++ b/extensions/control-plane/store/sql/contract-negotiation-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/contractnegotiation/store/schema/postgres/ContractNegotiationMapping.java @@ -16,40 +16,30 @@ import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation; import org.eclipse.edc.connector.store.sql.contractnegotiation.store.schema.ContractNegotiationStatements; -import org.eclipse.edc.sql.translation.TranslationMapping; +import org.eclipse.edc.sql.lease.StatefulEntityMapping; /** * Maps fields of a {@link ContractNegotiation} * onto the corresponding SQL schema (= column names) */ -class ContractNegotiationMapping extends TranslationMapping { - private static final String FIELD_ID = "id"; +class ContractNegotiationMapping extends StatefulEntityMapping { private static final String FIELD_CORRELATION_ID = "correlationId"; private static final String FIELD_COUNTER_PARTY_ID = "counterPartyId"; private static final String FIELD_COUNTERPARTY_ADDRESS = "counterPartyAddress"; private static final String FIELD_PROTOCOL = "protocol"; private static final String FIELD_TYPE = "type"; - private static final String FIELD_STATE = "state"; - private static final String FIELD_STATECOUNT = "stateCount"; - private static final String FIELD_STATETIMESTAMP = "stateTimestamp"; - private static final String FIELD_ERRORDETAIL = "errorDetail"; private static final String FIELD_CONTRACT_AGREEMENT = "contractAgreement"; private static final String FIELD_TRACECONTEXT = "traceContext"; private static final String FIELD_PENDING = "pending"; ContractNegotiationMapping(ContractNegotiationStatements statements) { - // cannot use Map.of(), because that only accepts 10 pairs - add(FIELD_ID, statements.getIdColumn()); + super(statements); add(FIELD_CORRELATION_ID, statements.getCorrelationIdColumn()); add(FIELD_COUNTER_PARTY_ID, statements.getCounterPartyIdColumn()); add(FIELD_COUNTERPARTY_ADDRESS, statements.getCounterPartyAddressColumn()); add(FIELD_PROTOCOL, statements.getProtocolColumn()); add(FIELD_TYPE, statements.getTypeColumn()); - add(FIELD_STATE, statements.getStateColumn()); - add(FIELD_STATECOUNT, statements.getStateCountColumn()); - add(FIELD_STATETIMESTAMP, statements.getStateTimestampColumn()); - add(FIELD_ERRORDETAIL, statements.getErrorDetailColumn()); add(FIELD_PENDING, statements.getPendingColumn()); fieldMap.put(FIELD_CONTRACT_AGREEMENT, new ContractAgreementMapping(statements)); diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/TransferProcessStoreStatements.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/TransferProcessStoreStatements.java index b542cdc747a..4498baae0c3 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/TransferProcessStoreStatements.java +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/TransferProcessStoreStatements.java @@ -17,13 +17,14 @@ import org.eclipse.edc.runtime.metamodel.annotation.ExtensionPoint; import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.sql.lease.LeaseStatements; +import org.eclipse.edc.sql.lease.StatefulEntityStatements; import org.eclipse.edc.sql.translation.SqlQueryStatement; /** * Statement templates and SQL table+column names required for the TransferProcessStore */ @ExtensionPoint -public interface TransferProcessStoreStatements extends LeaseStatements { +public interface TransferProcessStoreStatements extends StatefulEntityStatements, LeaseStatements { String getInsertStatement(); @@ -37,28 +38,12 @@ public interface TransferProcessStoreStatements extends LeaseStatements { String getUpdateDataRequestTemplate(); - default String getIdColumn() { - return "transferprocess_id"; - } - default String getTransferProcessTableName() { return "edc_transfer_process"; } - default String getStateColumn() { - return "state"; - } - - default String getStateTimestampColumn() { - return "state_time_stamp"; - } - - default String getTraceContextColumn() { - return "trace_context"; - } - - default String getErrorDetailColumn() { - return "error_detail"; + default String getIdColumn() { + return "transferprocess_id"; } default String getResourceManifestColumn() { @@ -73,14 +58,6 @@ default String getTypeColumn() { return "type"; } - default String getCreatedAtColumn() { - return "created_at"; - } - - default String getUpdatedAtColumn() { - return "updated_at"; - } - default String getContentDataAddressColumn() { return "content_data_address"; } @@ -125,10 +102,6 @@ default String getDataDestinationColumn() { return "data_destination"; } - default String getStateCountColumn() { - return "state_count"; - } - default String getDataRequestIdColumn() { return "datarequest_id"; } diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/TransferProcessMapping.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/TransferProcessMapping.java index 480cf7c820f..d65aaf4f596 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/TransferProcessMapping.java +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/TransferProcessMapping.java @@ -16,21 +16,17 @@ import org.eclipse.edc.connector.store.sql.transferprocess.store.schema.TransferProcessStoreStatements; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.sql.lease.StatefulEntityMapping; import org.eclipse.edc.sql.translation.JsonFieldMapping; -import org.eclipse.edc.sql.translation.TranslationMapping; /** * Maps fields of a {@link TransferProcess} onto the * corresponding SQL schema (= column names) enabling access through Postgres JSON operators where applicable */ -public class TransferProcessMapping extends TranslationMapping { +public class TransferProcessMapping extends StatefulEntityMapping { - private static final String FIELD_ID = "id"; private static final String FIELD_TYPE = "type"; - private static final String FIELD_STATE = "state"; private static final String FIELD_CREATED_TIMESTAMP = "createdAt"; - private static final String FIELD_TRACECONTEXT = "traceContext"; - private static final String FIELD_ERRORDETAIL = "errorDetail"; private static final String FIELD_DATAREQUEST = "dataRequest"; private static final String FIELD_DATAADDRESS = "dataAddress"; // this actually an alias for "dataAddress": @@ -44,12 +40,9 @@ public class TransferProcessMapping extends TranslationMapping { public TransferProcessMapping(TransferProcessStoreStatements statements) { - add(FIELD_ID, statements.getIdColumn()); + super(statements); add(FIELD_TYPE, statements.getTypeColumn()); - add(FIELD_STATE, statements.getStateColumn()); add(FIELD_CREATED_TIMESTAMP, statements.getCreatedAtColumn()); - add(FIELD_TRACECONTEXT, new JsonFieldMapping(statements.getTraceContextColumn())); - add(FIELD_ERRORDETAIL, statements.getErrorDetailColumn()); add(FIELD_DATAREQUEST, new DataRequestMapping(statements)); add(FIELD_DATAADDRESS, new JsonFieldMapping(statements.getContentDataAddressColumn())); add(FIELD_CONTENTDATAADDRESS, new JsonFieldMapping(statements.getContentDataAddressColumn())); diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java index 2344e6badc4..e2e6cc0591b 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java @@ -89,9 +89,9 @@ public SqlDataPlaneStore(DataSourceRegistry dataSourceRegistry, String dataSourc var connection = getConnection(); var stream = queryExecutor.query(connection, true, this::mapDataFlow, statement.getQueryAsString(), statement.getParameters()) ) { - var transferProcesses = stream.collect(Collectors.toList()); - transferProcesses.forEach(transferProcess -> leaseContext.withConnection(connection).acquireLease(transferProcess.getId())); - return transferProcesses; + var entries = stream.collect(Collectors.toList()); + entries.forEach(entry -> leaseContext.withConnection(connection).acquireLease(entry.getId())); + return entries; } catch (SQLException e) { throw new EdcPersistenceException(e); } @@ -104,13 +104,13 @@ public StoreResult findByIdAndLease(String id) { try (var connection = getConnection()) { var entity = findByIdInternal(connection, id); if (entity == null) { - return StoreResult.notFound(format("TransferProcess %s not found", id)); + return StoreResult.notFound(format("DataFlow %s not found", id)); } leaseContext.withConnection(connection).acquireLease(entity.getId()); return StoreResult.success(entity); } catch (IllegalStateException e) { - return StoreResult.alreadyLeased(format("TransferProcess %s is already leased", id)); + return StoreResult.alreadyLeased(format("DataFlow %s is already leased", id)); } catch (SQLException e) { throw new EdcPersistenceException(e); } diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java index 4cd69b2ad80..3a606a0cda4 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java @@ -22,11 +22,6 @@ public class BaseSqlDataPlaneStatements implements DataPlaneStatements { - @Override - public String getFindByIdTemplate() { - return String.format("SELECT * FROM %s WHERE %s = ?", getDataPlaneTable(), getIdColumn()); - } - @Override public String getInsertTemplate() { return executeStatement() diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java index fbe4ed6924f..2d75f3126fd 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java @@ -16,12 +16,13 @@ import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.sql.lease.LeaseStatements; +import org.eclipse.edc.sql.lease.StatefulEntityStatements; import org.eclipse.edc.sql.translation.SqlQueryStatement; /** * Sql Statements for DataPlane Store */ -public interface DataPlaneStatements extends LeaseStatements { +public interface DataPlaneStatements extends StatefulEntityStatements, LeaseStatements { default String getIdColumn() { return "process_id"; @@ -31,34 +32,6 @@ default String getDataPlaneTable() { return "edc_data_plane"; } - default String getCreatedAtColumn() { - return "created_at"; - } - - default String getUpdatedAtColumn() { - return "updated_at"; - } - - default String getStateColumn() { - return "state"; - } - - default String getTraceContextColumn() { - return "trace_context"; - } - - default String getStateCountColumn() { - return "state_count"; - } - - default String getStateTimestampColumn() { - return "state_time_stamp"; - } - - default String getErrorDetailColumn() { - return "error_detail"; - } - default String getCallbackAddressColumn() { return "callback_address"; } @@ -79,8 +52,6 @@ default String getPropertiesColumn() { return "properties"; } - String getFindByIdTemplate(); - String getInsertTemplate(); String getUpdateTemplate(); diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataPlaneMapping.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataPlaneMapping.java index a0cd01cc99c..f923f59e9cd 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataPlaneMapping.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataPlaneMapping.java @@ -16,26 +16,15 @@ import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.connector.dataplane.store.sql.schema.DataPlaneStatements; -import org.eclipse.edc.sql.translation.JsonFieldMapping; -import org.eclipse.edc.sql.translation.TranslationMapping; +import org.eclipse.edc.sql.lease.StatefulEntityMapping; /** * Maps fields of a {@link DataFlow} onto the * corresponding SQL schema (= column names) enabling access through Postgres JSON operators where applicable */ -public class DataPlaneMapping extends TranslationMapping { - - private static final String FIELD_ID = "id"; - private static final String FIELD_STATE = "state"; - private static final String FIELD_CREATED_TIMESTAMP = "createdAt"; - private static final String FIELD_TRACE_CONTEXT = "traceContext"; - private static final String FIELD_ERROR_DETAIL = "errorDetail"; +public class DataPlaneMapping extends StatefulEntityMapping { public DataPlaneMapping(DataPlaneStatements statements) { - add(FIELD_ID, statements.getIdColumn()); - add(FIELD_STATE, statements.getStateColumn()); - add(FIELD_CREATED_TIMESTAMP, statements.getCreatedAtColumn()); - add(FIELD_TRACE_CONTEXT, new JsonFieldMapping(statements.getTraceContextColumn())); - add(FIELD_ERROR_DETAIL, statements.getErrorDetailColumn()); + super(statements); } } diff --git a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/build.gradle.kts b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/build.gradle.kts new file mode 100644 index 00000000000..fb4aa827ca8 --- /dev/null +++ b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/build.gradle.kts @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +plugins { + `java-library` + `maven-publish` +} + +dependencies { + api(project(":spi:common:transaction-spi")) + api(project(":spi:policy-monitor:policy-monitor-spi")) + + implementation(project(":spi:common:transaction-datasource-spi")) + implementation(project(":extensions:common:sql:sql-core")) + implementation(project(":extensions:common:sql:sql-lease")) + + testImplementation(project(":core:common:junit")) + testImplementation(testFixtures(project(":spi:policy-monitor:policy-monitor-spi"))) + testImplementation(testFixtures(project(":extensions:common:sql:sql-lease"))) + testImplementation(testFixtures(project(":extensions:common:sql:sql-core"))) + +} + + diff --git a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/docs/schema.sql b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/docs/schema.sql new file mode 100644 index 00000000000..9400e939df9 --- /dev/null +++ b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/docs/schema.sql @@ -0,0 +1,32 @@ +-- Statements are designed for and tested with Postgres only! + +CREATE TABLE IF NOT EXISTS edc_lease +( + leased_by VARCHAR NOT NULL, + leased_at BIGINT, + lease_duration INTEGER NOT NULL, + lease_id VARCHAR NOT NULL + CONSTRAINT lease_pk + PRIMARY KEY +); + +COMMENT ON COLUMN edc_lease.leased_at IS 'posix timestamp of lease'; +COMMENT ON COLUMN edc_lease.lease_duration IS 'duration of lease in milliseconds'; + +CREATE TABLE IF NOT EXISTS edc_policy_monitor +( + entry_id VARCHAR NOT NULL PRIMARY KEY, + state INTEGER NOT NULL , + created_at BIGINT NOT NULL , + updated_at BIGINT NOT NULL , + state_count INTEGER DEFAULT 0 NOT NULL, + state_time_stamp BIGINT, + trace_context JSON, + error_detail VARCHAR, + lease_id VARCHAR + CONSTRAINT policy_monitor_lease_lease_id_fk + REFERENCES edc_lease + ON DELETE SET NULL, + properties JSON, + contract_id VARCHAR +); diff --git a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/SqlPolicyMonitorStore.java b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/SqlPolicyMonitorStore.java new file mode 100644 index 00000000000..eca5eb33f54 --- /dev/null +++ b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/SqlPolicyMonitorStore.java @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.policy.monitor.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorEntry; +import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorStore; +import org.eclipse.edc.connector.policy.monitor.store.sql.schema.PolicyMonitorStatements; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.spi.query.Criterion; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.spi.result.StoreResult; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.lease.SqlLeaseContextBuilder; +import org.eclipse.edc.sql.store.AbstractSqlStore; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Clock; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static java.util.stream.Collectors.toList; +import static org.eclipse.edc.spi.query.Criterion.criterion; + +public class SqlPolicyMonitorStore extends AbstractSqlStore implements PolicyMonitorStore { + + private final PolicyMonitorStatements statements; + private final SqlLeaseContextBuilder leaseContext; + private final Clock clock; + private final String leaseHolderName; + + public SqlPolicyMonitorStore(DataSourceRegistry dataSourceRegistry, String dataSourceName, TransactionContext transactionContext, + PolicyMonitorStatements statements, ObjectMapper objectMapper, Clock clock, + QueryExecutor queryExecutor, String leaseHolderName) { + super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor); + this.statements = statements; + this.clock = clock; + this.leaseHolderName = leaseHolderName; + leaseContext = SqlLeaseContextBuilder.with(transactionContext, leaseHolderName, statements, clock, queryExecutor); + } + + @Override + public @Nullable PolicyMonitorEntry findById(String id) { + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + return findByIdInternal(connection, id); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + + @Override + public @NotNull List nextNotLeased(int max, Criterion... criteria) { + return transactionContext.execute(() -> { + var filter = Arrays.stream(criteria).collect(toList()); + var querySpec = QuerySpec.Builder.newInstance().filter(filter).limit(max).build(); + var statement = statements.createQuery(querySpec); + statement.addWhereClause(statements.getNotLeasedFilter()); + statement.addParameter(clock.millis()); + + try ( + var connection = getConnection(); + var stream = queryExecutor.query(connection, true, this::mapEntry, statement.getQueryAsString(), statement.getParameters()) + ) { + var entries = stream.collect(Collectors.toList()); + entries.forEach(entry -> leaseContext.withConnection(connection).acquireLease(entry.getId())); + return entries; + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + + @Override + public StoreResult findByIdAndLease(String id) { + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + var entity = findByIdInternal(connection, id); + if (entity == null) { + return StoreResult.notFound(format("DataFlow %s not found", id)); + } + + leaseContext.withConnection(connection).acquireLease(entity.getId()); + return StoreResult.success(entity); + } catch (IllegalStateException e) { + return StoreResult.alreadyLeased(format("DataFlow %s is already leased", id)); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + + @Override + public void save(PolicyMonitorEntry entity) { + transactionContext.execute(() -> { + try (var connection = getConnection()) { + var existing = findByIdInternal(connection, entity.getId()); + if (existing != null) { + leaseContext.by(leaseHolderName).withConnection(connection).breakLease(entity.getId()); + update(connection, entity); + } else { + insert(connection, entity); + } + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + + private @Nullable PolicyMonitorEntry findByIdInternal(Connection conn, String id) { + return transactionContext.execute(() -> { + var querySpec = QuerySpec.Builder.newInstance().filter(criterion("id", "=", id)).build(); + var statement = statements.createQuery(querySpec); + return queryExecutor.query(conn, true, this::mapEntry, statement.getQueryAsString(), statement.getParameters()) + .findFirst().orElse(null); + }); + } + + private void insert(Connection connection, PolicyMonitorEntry entry) { + var sql = statements.getInsertTemplate(); + queryExecutor.execute(connection, sql, + entry.getId(), + entry.getState(), + entry.getCreatedAt(), + entry.getUpdatedAt(), + entry.getStateCount(), + entry.getStateTimestamp(), + toJson(entry.getTraceContext()), + entry.getErrorDetail(), + entry.getContractId() + ); + } + + private void update(Connection connection, PolicyMonitorEntry entry) { + var sql = statements.getUpdateTemplate(); + queryExecutor.execute(connection, sql, + entry.getState(), + entry.getUpdatedAt(), + entry.getStateCount(), + entry.getStateTimestamp(), + toJson(entry.getTraceContext()), + entry.getErrorDetail(), + entry.getContractId(), + entry.getId()); + } + + private PolicyMonitorEntry mapEntry(ResultSet resultSet) throws SQLException { + return PolicyMonitorEntry.Builder.newInstance() + .id(resultSet.getString(statements.getIdColumn())) + .createdAt(resultSet.getLong(statements.getCreatedAtColumn())) + .updatedAt(resultSet.getLong(statements.getUpdatedAtColumn())) + .state(resultSet.getInt(statements.getStateColumn())) + .stateTimestamp(resultSet.getLong(statements.getStateTimestampColumn())) + .stateCount(resultSet.getInt(statements.getStateCountColumn())) + .traceContext(fromJson(resultSet.getString(statements.getTraceContextColumn()), getTypeRef())) + .errorDetail(resultSet.getString(statements.getErrorDetailColumn())) + .contractId(resultSet.getString(statements.getContractIdColumn())) + .build(); + } +} diff --git a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/SqlPolicyMonitorStoreExtension.java b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/SqlPolicyMonitorStoreExtension.java new file mode 100644 index 00000000000..e0179380840 --- /dev/null +++ b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/SqlPolicyMonitorStoreExtension.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.policy.monitor.store.sql; + +import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorStore; +import org.eclipse.edc.connector.policy.monitor.store.sql.schema.PolicyMonitorStatements; +import org.eclipse.edc.connector.policy.monitor.store.sql.schema.PostgresPolicyMonitorStatements; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Provider; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; + +import java.time.Clock; + +import static org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry.DEFAULT_DATASOURCE; + +public class SqlPolicyMonitorStoreExtension implements ServiceExtension { + + @Setting(value = "Name of the datasource to use for accessing policy monitor store", defaultValue = DEFAULT_DATASOURCE) + private static final String DATASOURCE_SETTING_NAME = "edc.datasource.policy-monitor.name"; + + @Inject + private DataSourceRegistry dataSourceRegistry; + + @Inject + private TransactionContext transactionContext; + + @Inject(required = false) + private PolicyMonitorStatements statements; + + @Inject + private Clock clock; + + @Inject + private TypeManager typeManager; + + @Inject + private QueryExecutor queryExecutor; + + @Provider + public PolicyMonitorStore policyMonitorStore(ServiceExtensionContext context) { + var dataSourceName = context.getConfig().getString(DATASOURCE_SETTING_NAME, DEFAULT_DATASOURCE); + return new SqlPolicyMonitorStore(dataSourceRegistry, dataSourceName, transactionContext, + getStatementImpl(), typeManager.getMapper(), clock, queryExecutor, context.getConnectorId()); + } + + /** + * returns an externally-provided sql statement dialect, or postgres as a default + */ + private PolicyMonitorStatements getStatementImpl() { + return statements != null ? statements : new PostgresPolicyMonitorStatements(); + } + +} diff --git a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/BaseSqlPolicyMonitorStatements.java b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/BaseSqlPolicyMonitorStatements.java new file mode 100644 index 00000000000..328d36d1f09 --- /dev/null +++ b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/BaseSqlPolicyMonitorStatements.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.policy.monitor.store.sql.schema; + +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.sql.translation.SqlQueryStatement; + +import static java.lang.String.format; + +public class BaseSqlPolicyMonitorStatements implements PolicyMonitorStatements { + + @Override + public String getInsertTemplate() { + return executeStatement() + .column(getIdColumn()) + .column(getStateColumn()) + .column(getCreatedAtColumn()) + .column(getUpdatedAtColumn()) + .column(getStateCountColumn()) + .column(getStateTimestampColumn()) + .jsonColumn(getTraceContextColumn()) + .column(getErrorDetailColumn()) + .column(getContractIdColumn()) + .insertInto(getPolicyMonitorTable()); + } + + @Override + public String getUpdateTemplate() { + return executeStatement() + .column(getStateColumn()) + .column(getUpdatedAtColumn()) + .column(getStateCountColumn()) + .column(getStateTimestampColumn()) + .jsonColumn(getTraceContextColumn()) + .column(getErrorDetailColumn()) + .column(getContractIdColumn()) + .update(getPolicyMonitorTable(), getIdColumn()); + } + + @Override + public String getSelectTemplate() { + return "SELECT * FROM %s".formatted(getPolicyMonitorTable()); + } + + @Override + public SqlQueryStatement createQuery(QuerySpec querySpec) { + return new SqlQueryStatement(getSelectTemplate(), querySpec, new PolicyMonitorMapping(this)); + } + + @Override + public String getDeleteLeaseTemplate() { + return executeStatement().delete(getLeaseTableName(), getLeaseIdColumn()); + } + + @Override + public String getInsertLeaseTemplate() { + return executeStatement() + .column(getLeaseIdColumn()) + .column(getLeasedByColumn()) + .column(getLeasedAtColumn()) + .column(getLeaseDurationColumn()) + .insertInto(getLeaseTableName()); + } + + @Override + public String getUpdateLeaseTemplate() { + return executeStatement() + .column(getLeaseIdColumn()) + .update(getPolicyMonitorTable(), getIdColumn()); + } + + @Override + public String getFindLeaseByEntityTemplate() { + return format("SELECT * FROM %s WHERE %s = (SELECT lease_id FROM %s WHERE %s=? )", + getLeaseTableName(), getLeaseIdColumn(), getPolicyMonitorTable(), getIdColumn()); + } +} diff --git a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/PolicyMonitorMapping.java b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/PolicyMonitorMapping.java new file mode 100644 index 00000000000..7bda0962c6c --- /dev/null +++ b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/PolicyMonitorMapping.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.policy.monitor.store.sql.schema; + +import org.eclipse.edc.sql.lease.StatefulEntityMapping; + +/** + * Maps fields of a {@link org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorEntry} onto the + * corresponding SQL schema (= column names) enabling access through Postgres JSON operators where applicable + */ +public class PolicyMonitorMapping extends StatefulEntityMapping { + + public PolicyMonitorMapping(PolicyMonitorStatements statements) { + super(statements); + add("contractId", statements.getContractIdColumn()); + } + +} diff --git a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/PolicyMonitorStatements.java b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/PolicyMonitorStatements.java new file mode 100644 index 00000000000..0b998aa71dc --- /dev/null +++ b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/PolicyMonitorStatements.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.policy.monitor.store.sql.schema; + +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.sql.lease.LeaseStatements; +import org.eclipse.edc.sql.lease.StatefulEntityStatements; +import org.eclipse.edc.sql.translation.SqlQueryStatement; + +public interface PolicyMonitorStatements extends StatefulEntityStatements, LeaseStatements { + + default String getPolicyMonitorTable() { + return "edc_policy_monitor"; + } + + default String getIdColumn() { + return "entry_id"; + } + + default String getContractIdColumn() { + return "contract_id"; + } + + String getInsertTemplate(); + + String getUpdateTemplate(); + + String getSelectTemplate(); + + SqlQueryStatement createQuery(QuerySpec querySpec); +} diff --git a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/PostgresPolicyMonitorStatements.java b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/PostgresPolicyMonitorStatements.java new file mode 100644 index 00000000000..167eab65852 --- /dev/null +++ b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/schema/PostgresPolicyMonitorStatements.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.policy.monitor.store.sql.schema; + +import org.eclipse.edc.sql.dialect.PostgresDialect; + +public class PostgresPolicyMonitorStatements extends BaseSqlPolicyMonitorStatements { + + @Override + public String getFormatAsJsonOperator() { + return PostgresDialect.getJsonCastOperator(); + } + +} diff --git a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 00000000000..0b4089aebbd --- /dev/null +++ b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1,16 @@ +# +# Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# +# Contributors: +# Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation +# +# + +org.eclipse.edc.connector.policy.monitor.store.sql.SqlPolicyMonitorStoreExtension + diff --git a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/test/java/org/eclipse/edc/connector/policy/monitor/store/sql/PostgresPolicyMonitorStoreTest.java b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/test/java/org/eclipse/edc/connector/policy/monitor/store/sql/PostgresPolicyMonitorStoreTest.java new file mode 100644 index 00000000000..bf595834c64 --- /dev/null +++ b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/test/java/org/eclipse/edc/connector/policy/monitor/store/sql/PostgresPolicyMonitorStoreTest.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2020 - 2022 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - initial API and implementation + * + */ + +package org.eclipse.edc.connector.policy.monitor.store.sql; + +import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorStore; +import org.eclipse.edc.connector.policy.monitor.spi.testfixtures.store.PolicyMonitorStoreTestBase; +import org.eclipse.edc.connector.policy.monitor.store.sql.schema.PolicyMonitorStatements; +import org.eclipse.edc.connector.policy.monitor.store.sql.schema.PostgresPolicyMonitorStatements; +import org.eclipse.edc.junit.annotations.ComponentTest; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.lease.testfixtures.LeaseUtil; +import org.eclipse.edc.sql.testfixtures.PostgresqlStoreSetupExtension; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.Clock; +import java.time.Duration; + + +@ComponentTest +@ExtendWith(PostgresqlStoreSetupExtension.class) +public class PostgresPolicyMonitorStoreTest extends PolicyMonitorStoreTestBase { + + private final PolicyMonitorStatements statements = new PostgresPolicyMonitorStatements(); + private LeaseUtil leaseUtil; + private SqlPolicyMonitorStore store; + + @BeforeEach + void setUp(PostgresqlStoreSetupExtension extension, QueryExecutor queryExecutor) throws IOException { + + var typeManager = new TypeManager(); + + var clock = Clock.systemUTC(); + + leaseUtil = new LeaseUtil(extension.getTransactionContext(), extension::getConnection, statements, clock); + store = new SqlPolicyMonitorStore(extension.getDataSourceRegistry(), extension.getDatasourceName(), extension.getTransactionContext(), + statements, typeManager.getMapper(), clock, queryExecutor, "test-connector"); + var schema = Files.readString(Paths.get("./docs/schema.sql")); + extension.runQuery(schema); + } + + @AfterEach + void tearDown(PostgresqlStoreSetupExtension extension) { + extension.runQuery("DROP TABLE " + statements.getPolicyMonitorTable() + " CASCADE"); + } + + @Override + protected PolicyMonitorStore getStore() { + return store; + } + + @Override + protected void leaseEntity(String negotiationId, String owner, Duration duration) { + getLeaseUtil().leaseEntity(negotiationId, owner, duration); + } + + @Override + protected boolean isLeasedBy(String negotiationId, String owner) { + return getLeaseUtil().isLeased(negotiationId, owner); + } + + protected LeaseUtil getLeaseUtil() { + return leaseUtil; + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 5a4d5760fc4..d5d8a65c51b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -167,6 +167,8 @@ include(":extensions:data-plane-selector:data-plane-selector-api") include(":extensions:data-plane-selector:data-plane-selector-client") include(":extensions:data-plane-selector:store:sql:data-plane-instance-store-sql") +include(":extensions:policy-monitor:store:sql:policy-monitor-store-sql") + // modules for launchers, i.e. runnable compositions of the app ------------------------------------ include(":launchers:data-plane-server") diff --git a/spi/control-plane/contract-spi/src/testFixtures/java/org/eclipse/edc/connector/contract/spi/testfixtures/negotiation/store/ContractNegotiationStoreTestBase.java b/spi/control-plane/contract-spi/src/testFixtures/java/org/eclipse/edc/connector/contract/spi/testfixtures/negotiation/store/ContractNegotiationStoreTestBase.java index ee4adffbdf3..9cf5ebad7a0 100644 --- a/spi/control-plane/contract-spi/src/testFixtures/java/org/eclipse/edc/connector/contract/spi/testfixtures/negotiation/store/ContractNegotiationStoreTestBase.java +++ b/spi/control-plane/contract-spi/src/testFixtures/java/org/eclipse/edc/connector/contract/spi/testfixtures/negotiation/store/ContractNegotiationStoreTestBase.java @@ -63,6 +63,7 @@ public abstract class ContractNegotiationStoreTestBase { protected static final String CONNECTOR_NAME = "test-connector"; + protected final Clock clock = Clock.systemUTC(); private static final String ASSET_ID = "TEST_ASSET_ID"; @Nested @@ -240,7 +241,7 @@ void leasedBySelf_shouldBreakLease() { var newNegotiation = builder .stateCount(420) //modified .state(800) //modified - .updatedAt(Clock.systemUTC().millis()) + .updatedAt(clock.millis()) .build(); // update should break lease diff --git a/spi/policy-monitor/policy-monitor-spi/build.gradle.kts b/spi/policy-monitor/policy-monitor-spi/build.gradle.kts index 010839e163a..d84636a0237 100644 --- a/spi/policy-monitor/policy-monitor-spi/build.gradle.kts +++ b/spi/policy-monitor/policy-monitor-spi/build.gradle.kts @@ -14,10 +14,16 @@ plugins { `java-library` + `java-test-fixtures` } dependencies { api(project(":spi:common:core-spi")) + + testFixturesApi(project(":core:common:junit")) + testFixturesImplementation(libs.bundles.jupiter) + testFixturesImplementation(libs.assertj) + testFixturesImplementation(libs.awaitility) } diff --git a/spi/policy-monitor/policy-monitor-spi/src/testFixtures/java/org/eclipse/edc/connector/policy/monitor/spi/testfixtures/store/PolicyMonitorStoreTestBase.java b/spi/policy-monitor/policy-monitor-spi/src/testFixtures/java/org/eclipse/edc/connector/policy/monitor/spi/testfixtures/store/PolicyMonitorStoreTestBase.java new file mode 100644 index 00000000000..76d7a4dee97 --- /dev/null +++ b/spi/policy-monitor/policy-monitor-spi/src/testFixtures/java/org/eclipse/edc/connector/policy/monitor/spi/testfixtures/store/PolicyMonitorStoreTestBase.java @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.policy.monitor.spi.testfixtures.store; + +import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorEntry; +import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorEntryStates; +import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorStore; +import org.eclipse.edc.spi.entity.Entity; +import org.eclipse.edc.spi.entity.MutableEntity; +import org.eclipse.edc.spi.entity.StatefulEntity; +import org.eclipse.edc.spi.result.StoreFailure; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.UUID; + +import static java.util.stream.IntStream.range; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorEntryStates.COMPLETED; +import static org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorEntryStates.STARTED; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; +import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState; +import static org.eclipse.edc.spi.result.StoreFailure.Reason.ALREADY_LEASED; +import static org.eclipse.edc.spi.result.StoreFailure.Reason.NOT_FOUND; +import static org.hamcrest.Matchers.hasSize; + +public abstract class PolicyMonitorStoreTestBase { + + protected static final String CONNECTOR_NAME = "test-connector"; + + @Nested + class Create { + + @Test + void shouldStoreEntity_whenItDoesNotAlreadyExist() { + var entry = createPolicyMonitorEntry(UUID.randomUUID().toString(), STARTED); + getStore().save(entry); + + var result = getStore().findById(entry.getId()); + + assertThat(result).isNotNull().usingRecursiveComparison().isEqualTo(entry); + assertThat(result.getCreatedAt()).isGreaterThan(0); + } + + @Test + void shouldUpdate_whenEntityAlreadyExist() { + var entry = createPolicyMonitorEntry(UUID.randomUUID().toString(), STARTED); + getStore().save(entry); + + entry.transitionToCompleted(); + getStore().save(entry); + + var result = getStore().findById(entry.getId()); + + assertThat(result).isNotNull(); + assertThat(result.getState()).isEqualTo(COMPLETED.code()); + } + } + + @Nested + class NextNotLeased { + @Test + void shouldReturnNotLeasedItems() { + var state = STARTED; + var all = range(0, 5) + .mapToObj(i -> createPolicyMonitorEntry("id-" + i, state)) + .peek(getStore()::save) + .peek(this::delayByTenMillis) + .toList(); + + var leased = getStore().nextNotLeased(2, hasState(state.code())); + + assertThat(leased).hasSize(2).extracting(PolicyMonitorEntry::getId) + .isSubsetOf(all.stream().map(Entity::getId).toList()) + .allMatch(id -> isLeasedBy(id, CONNECTOR_NAME)); + + assertThat(leased).extracting(MutableEntity::getUpdatedAt).isSorted(); + } + + @Test + void shouldReturnFreeEntities() { + var state = STARTED; + var all = range(0, 5) + .mapToObj(i -> createPolicyMonitorEntry("id-" + i, state)) + .peek(getStore()::save) + .toList(); + + var firstLeased = getStore().nextNotLeased(2, hasState(state.code())); + var leased = getStore().nextNotLeased(2, hasState(state.code())); + + assertThat(leased.stream().map(Entity::getId)).hasSize(2) + .isSubsetOf(all.stream().map(Entity::getId).toList()) + .doesNotContainAnyElementsOf(firstLeased.stream().map(Entity::getId).toList()); + } + + @Test + void shouldReturnFreeItemInTheExpectedState() { + range(0, 5) + .mapToObj(i -> createPolicyMonitorEntry("id-" + i, STARTED)) + .forEach(getStore()::save); + + var leased = getStore().nextNotLeased(2, hasState(COMPLETED.code())); + + assertThat(leased).isEmpty(); + } + + @Test + void shouldLeaseAgainAfterTimePassed() { + var entry = createPolicyMonitorEntry(UUID.randomUUID().toString(), STARTED); + getStore().save(entry); + + leaseEntity(entry.getId(), CONNECTOR_NAME, Duration.ofMillis(100)); + + await().atMost(Duration.ofMillis(500)) + .until(() -> getStore().nextNotLeased(1, hasState(STARTED.code())), hasSize(1)); + } + + @Test + void shouldReturnReleasedEntityByUpdate() { + var entry = createPolicyMonitorEntry(UUID.randomUUID().toString(), STARTED); + getStore().save(entry); + + var firstLeased = getStore().nextNotLeased(1, hasState(STARTED.code())); + assertThat(firstLeased).hasSize(1); + + var secondLeased = getStore().nextNotLeased(1, hasState(STARTED.code())); + assertThat(secondLeased).isEmpty(); + + getStore().save(firstLeased.get(0)); + + var thirdLeased = getStore().nextNotLeased(1, hasState(STARTED.code())); + assertThat(thirdLeased).hasSize(1); + } + + private void delayByTenMillis(StatefulEntity t) { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + // noop + } + t.updateStateTimestamp(); + } + } + + @Nested + class FindByIdAndLease { + @Test + void shouldReturnTheEntityAndLeaseIt() { + var id = UUID.randomUUID().toString(); + getStore().save(createPolicyMonitorEntry(id, STARTED)); + + var result = getStore().findByIdAndLease(id); + + assertThat(result).isSucceeded(); + assertThat(isLeasedBy(id, CONNECTOR_NAME)).isTrue(); + } + + @Test + void shouldReturnNotFound_whenEntityDoesNotExist() { + var result = getStore().findByIdAndLease("unexistent"); + + assertThat(result).isFailed().extracting(StoreFailure::getReason).isEqualTo(NOT_FOUND); + } + + @Test + void shouldReturnAlreadyLeased_whenEntityIsAlreadyLeased() { + var id = UUID.randomUUID().toString(); + getStore().save(createPolicyMonitorEntry(id, STARTED)); + leaseEntity(id, "other owner"); + + var result = getStore().findByIdAndLease(id); + + assertThat(result).isFailed().extracting(StoreFailure::getReason).isEqualTo(ALREADY_LEASED); + } + } + + private PolicyMonitorEntry createPolicyMonitorEntry(String id, PolicyMonitorEntryStates state) { + return PolicyMonitorEntry.Builder.newInstance() + .id(id) + .contractId(UUID.randomUUID().toString()) + .state(state.code()) + .build(); + } + + protected abstract PolicyMonitorStore getStore(); + + protected abstract void leaseEntity(String entityId, String owner, Duration duration); + + protected void leaseEntity(String entityId, String owner) { + leaseEntity(entityId, owner, Duration.ofSeconds(60)); + } + + protected abstract boolean isLeasedBy(String entityId, String owner); +} diff --git a/system-tests/e2e-transfer-test/control-plane-postgresql/build.gradle.kts b/system-tests/e2e-transfer-test/control-plane-postgresql/build.gradle.kts index b7804d7021d..4b618111610 100644 --- a/system-tests/e2e-transfer-test/control-plane-postgresql/build.gradle.kts +++ b/system-tests/e2e-transfer-test/control-plane-postgresql/build.gradle.kts @@ -21,8 +21,9 @@ dependencies { runtimeOnly(project(":extensions:control-plane:store:sql:control-plane-sql")) runtimeOnly(project(":extensions:common:sql:sql-pool:sql-pool-apache-commons")) runtimeOnly(project(":extensions:common:transaction:transaction-local")) - runtimeOnly(libs.postgres) runtimeOnly(project(":extensions:common:api:management-api-configuration")) + runtimeOnly(project(":extensions:policy-monitor:store:sql:policy-monitor-store-sql")) + runtimeOnly(libs.postgres) } edcBuild { diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/PostgresUtil.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/PostgresUtil.java index 674ba745463..4277afd7cca 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/PostgresUtil.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/PostgresUtil.java @@ -37,20 +37,18 @@ public static void createDatabase(EndToEndTransferParticipant consumer) throws C var helper = new PostgresqlLocalInstance(USER, PASSWORD, JDBC_URL_PREFIX, consumer.getName()); helper.createDatabase(consumer.getName()); - var controlPlaneScripts = Stream.of( - "asset-index-sql", - "contract-definition-store-sql", - "contract-negotiation-store-sql", - "policy-definition-store-sql", - "transfer-process-store-sql") - .map(module -> "../../../extensions/control-plane/store/sql/" + module + "/docs/schema.sql") - .map(Paths::get); - - var dataPlaneScripts = Stream.of("data-plane-store-sql") - .map(module -> "../../../extensions/data-plane/store/sql/" + module + "/docs/schema.sql") - .map(Paths::get); - - var scripts = Stream.concat(controlPlaneScripts, dataPlaneScripts).toList(); + var scripts = Stream.of( + "extensions/control-plane/store/sql/asset-index-sql", + "extensions/control-plane/store/sql/contract-definition-store-sql", + "extensions/control-plane/store/sql/contract-negotiation-store-sql", + "extensions/control-plane/store/sql/policy-definition-store-sql", + "extensions/control-plane/store/sql/transfer-process-store-sql", + "extensions/data-plane/store/sql/data-plane-store-sql", + "extensions/policy-monitor/store/sql/policy-monitor-store-sql" + ) + .map("../../../%s/docs/schema.sql"::formatted) + .map(Paths::get) + .toList(); try (var connection = DriverManager.getConnection(consumer.jdbcUrl(), USER, PASSWORD)) { for (var script : scripts) {