diff --git a/core/common/boot/src/main/java/org/eclipse/edc/boot/system/DefaultServiceExtensionContext.java b/core/common/boot/src/main/java/org/eclipse/edc/boot/system/DefaultServiceExtensionContext.java index 2ea8deaa856..5826755ae32 100644 --- a/core/common/boot/src/main/java/org/eclipse/edc/boot/system/DefaultServiceExtensionContext.java +++ b/core/common/boot/src/main/java/org/eclipse/edc/boot/system/DefaultServiceExtensionContext.java @@ -117,6 +117,9 @@ public void initialize() { runtimeId = getSetting(RUNTIME_ID, null); if (runtimeId != null) { getMonitor().warning("A configuration value for '%s' was found. Explicitly configuring this is not supported anymore and may get removed in the future. A random value will be assigned.".formatted(RUNTIME_ID)); + } else { + // runtime-id should always be randomized to guarantee a working lease mechanism + runtimeId = UUID.randomUUID().toString(); } componentId = getSetting(COMPONENT_ID, null); @@ -128,8 +131,6 @@ public void initialize() { }); } - // runtime-id should always be randomized to guarantee a working lease mechanism - runtimeId = UUID.randomUUID().toString(); } } diff --git a/core/common/boot/src/main/java/org/eclipse/edc/boot/system/runtime/BaseRuntime.java b/core/common/boot/src/main/java/org/eclipse/edc/boot/system/runtime/BaseRuntime.java index b7dd73793a3..7a52d79857e 100644 --- a/core/common/boot/src/main/java/org/eclipse/edc/boot/system/runtime/BaseRuntime.java +++ b/core/common/boot/src/main/java/org/eclipse/edc/boot/system/runtime/BaseRuntime.java @@ -112,7 +112,7 @@ public void boot(boolean addShutdownHook) { healthCheckService.addStartupStatusProvider(startupStatusRef::get); } - } catch (Exception e) { + } catch (Throwable e) { onError(e); } @@ -147,7 +147,7 @@ protected ServiceExtensionContext createServiceExtensionContext(Config config) { /** * Callback for any error that happened during runtime initialization */ - protected void onError(Exception e) { + protected void onError(Throwable e) { monitor.severe(String.format("Error booting runtime: %s", e.getMessage()), e); throw new EdcException(e); } diff --git a/core/common/boot/src/test/java/org/eclipse/edc/boot/system/DefaultServiceExtensionContextTest.java b/core/common/boot/src/test/java/org/eclipse/edc/boot/system/DefaultServiceExtensionContextTest.java index b9e860170d6..034dccc4829 100644 --- a/core/common/boot/src/test/java/org/eclipse/edc/boot/system/DefaultServiceExtensionContextTest.java +++ b/core/common/boot/src/test/java/org/eclipse/edc/boot/system/DefaultServiceExtensionContextTest.java @@ -85,25 +85,24 @@ void registerService_throwsWhenFrozen() { @Nested class GetRuntimeId { @Test - void shouldReturnRandomUuid_whenNotConfigured_andComponentIdNull() { + void shouldReturnRandomUuid_whenNotConfigured() { when(config.getConfig(any())).thenReturn(ConfigFactory.empty()); context.initialize(); var runtimeId = context.getRuntimeId(); assertThat(UUID.fromString(runtimeId)).isNotNull(); - verify(monitor, times(2)).warning(and(isA(String.class), argThat(message -> !message.contains(RUNTIME_ID)))); + verify(monitor, times(1)).warning(and(isA(String.class), argThat(message -> !message.contains(RUNTIME_ID)))); } - @Test - void shouldReturnRandomId_whenComponentIdNotConfigured() { + void shouldReturnConfiguredRuntimeId() { when(config.getConfig(any())).thenReturn(ConfigFactory.fromMap(Map.of(RUNTIME_ID, "runtime-id"))); context.initialize(); var runtimeId = context.getRuntimeId(); - assertThat(UUID.fromString(runtimeId)).isNotNull(); + assertThat(runtimeId).isEqualTo("runtime-id"); } } @@ -125,7 +124,9 @@ void shouldReturnRandomUuid_whenNotConfigured() { void shouldUseRuntimeId_whenComponentIdNotConfigured() { when(config.getConfig(any())).thenReturn(ConfigFactory.fromMap(Map.of(RUNTIME_ID, "runtime-id"))); context.initialize(); + var componentId = context.getComponentId(); + assertThat(componentId).isEqualTo("runtime-id"); } @@ -138,7 +139,7 @@ void shouldUseConfiguredValue_whenBothAreConfigured() { var componentId = context.getComponentId(); var runtimeId = context.getRuntimeId(); - assertThat(UUID.fromString(runtimeId)).isNotNull(); + assertThat(runtimeId).isEqualTo("runtime-id"); assertThat(componentId).isEqualTo("component-id"); verify(monitor).warning(and(isA(String.class), argThat(message -> !message.contains(RUNTIME_ID)))); } diff --git a/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java b/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java index 9e87744a61a..6484072d183 100644 --- a/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java +++ b/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.SECONDS; @@ -48,6 +49,7 @@ public class EmbeddedRuntime extends BaseRuntime { private final MultiSourceServiceLocator serviceLocator; private final URL[] classPathEntries; private Future runtimeThread; + private final AtomicBoolean isRunning = new AtomicBoolean(false); public EmbeddedRuntime(String name, Map properties, String... additionalModules) { this(new MultiSourceServiceLocator(), name, properties, ClasspathReader.classpathFor(additionalModules)); @@ -87,6 +89,8 @@ public void boot(boolean addShutdownHook) { super.boot(false); latch.countDown(); + + isRunning.set(true); } catch (Exception e) { runtimeException.set(e); throw new EdcException(e); @@ -113,6 +117,7 @@ public void shutdown() { if (runtimeThread != null && !runtimeThread.isDone()) { runtimeThread.cancel(true); } + isRunning.set(false); } @Override @@ -148,4 +153,8 @@ public T getService(Class clazz) { public ServiceExtensionContext getContext() { return context; } + + public boolean isRunning() { + return isRunning.get(); + } } diff --git a/core/common/junit/src/main/java/org/eclipse/edc/junit/matchers/ArrayContainsMatcher.java b/core/common/junit/src/main/java/org/eclipse/edc/junit/matchers/ArrayContainsMatcher.java new file mode 100644 index 00000000000..f7a78298472 --- /dev/null +++ b/core/common/junit/src/main/java/org/eclipse/edc/junit/matchers/ArrayContainsMatcher.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.junit.matchers; + +import org.mockito.internal.matchers.Equals; + +import java.util.Arrays; +import java.util.HashSet; + +import static org.mockito.ArgumentMatchers.argThat; + +/** + * Matches if the items contained in the "wanted" array are contained in the actual one. + * Heavily inspired on {@link org.mockito.internal.matchers.ArrayEquals} + */ +public class ArrayContainsMatcher extends Equals { + + @SuppressWarnings("unchecked") + public static T[] arrayContains(T[] items) { + return (T[]) argThat(new ArrayContainsMatcher(items)); + } + + public ArrayContainsMatcher(Object[] wanted) { + super(wanted); + } + + @Override + public boolean matches(Object actual) { + var wanted = getWanted(); + if (wanted instanceof Object[] wantedArray && actual instanceof Object[] actualArray) { + return new HashSet<>(Arrays.asList(actualArray)).containsAll(Arrays.asList(wantedArray)); + } + + return super.matches(actual); + } +} diff --git a/core/common/junit/src/main/java/org/eclipse/edc/junit/matchers/PredicateMatcher.java b/core/common/junit/src/main/java/org/eclipse/edc/junit/matchers/PredicateMatcher.java index a55b692ca9e..ffbdd82bfc2 100644 --- a/core/common/junit/src/main/java/org/eclipse/edc/junit/matchers/PredicateMatcher.java +++ b/core/common/junit/src/main/java/org/eclipse/edc/junit/matchers/PredicateMatcher.java @@ -18,6 +18,7 @@ import java.util.function.Predicate; +@Deprecated(since = "0.11.0") // as not used anywhere public class PredicateMatcher implements ArgumentMatcher { private final Predicate predicate; diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java index 59cf9fc31c4..34e32273a14 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java @@ -24,11 +24,13 @@ import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry; import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; +import org.eclipse.edc.runtime.metamodel.annotation.Configuration; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Provider; import org.eclipse.edc.runtime.metamodel.annotation.Provides; import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.runtime.metamodel.annotation.Settings; import org.eclipse.edc.spi.retry.ExponentialWaitStrategy; import org.eclipse.edc.spi.system.ExecutorInstrumentation; import org.eclipse.edc.spi.system.ServiceExtension; @@ -40,6 +42,8 @@ import java.time.Clock; import java.util.concurrent.Executors; +import static org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager.DEFAULT_FLOW_LEASE_FACTOR; +import static org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager.DEFAULT_FLOW_LEASE_TIME; import static org.eclipse.edc.statemachine.AbstractStateEntityManager.DEFAULT_BATCH_SIZE; import static org.eclipse.edc.statemachine.AbstractStateEntityManager.DEFAULT_ITERATION_WAIT; import static org.eclipse.edc.statemachine.AbstractStateEntityManager.DEFAULT_SEND_RETRY_BASE_DELAY; @@ -89,6 +93,9 @@ public class DataPlaneFrameworkExtension implements ServiceExtension { ) private int numThreads; + @Configuration + private FlowLeaseConfiguration flowLeaseConfiguration; + private DataPlaneManagerImpl dataPlaneManager; @Inject @@ -135,6 +142,8 @@ public void initialize(ServiceExtensionContext context) { .transferProcessClient(transferProcessApiClient) .monitor(monitor) .telemetry(telemetry) + .runtimeId(context.getRuntimeId()) + .flowLeaseConfiguration(flowLeaseConfiguration) .build(); context.registerService(DataPlaneManager.class, dataPlaneManager); @@ -165,4 +174,32 @@ public DataTransferExecutorServiceContainer dataTransferExecutorServiceContainer private EntityRetryProcessConfiguration getEntityRetryProcessConfiguration() { return new EntityRetryProcessConfiguration(sendRetryLimit, () -> new ExponentialWaitStrategy(sendRetryBaseDelay)); } + + @Settings + public record FlowLeaseConfiguration( + @Setting( + key = "edc.dataplane.state-machine.flow.lease.time", + description = "The time in milliseconds after which a runtime renews its ownership on a started data flow.", + defaultValue = DEFAULT_FLOW_LEASE_TIME + "") + long time, + @Setting( + key = "edc.dataplane.state-machine.flow.lease.factor", + description = "After flow lease time * factor a started data flow will be considered abandoned by the owner and so another runtime can caught it up and start it again.", + defaultValue = DEFAULT_FLOW_LEASE_FACTOR + "") + int factor + ) { + + public FlowLeaseConfiguration() { + this(DEFAULT_FLOW_LEASE_TIME, DEFAULT_FLOW_LEASE_FACTOR); + } + + /** + * After this time has passed, a DataFlow can be considered "abandoned" and it can be picked up by another runtime. + * + * @return the abandoned time. + */ + public long abandonTime() { + return time * factor; + } + } } diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java index c762bc99153..6cc25c91d5c 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java @@ -15,6 +15,7 @@ package org.eclipse.edc.connector.dataplane.framework.manager; import org.eclipse.edc.connector.controlplane.api.client.spi.transferprocess.TransferProcessApiClient; +import org.eclipse.edc.connector.dataplane.framework.DataPlaneFrameworkExtension.FlowLeaseConfiguration; import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.connector.dataplane.spi.DataFlowStates; import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; @@ -35,10 +36,14 @@ import org.eclipse.edc.statemachine.StateMachineManager; import org.jetbrains.annotations.Nullable; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Stream; import static java.lang.String.format; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED; @@ -59,6 +64,8 @@ public class DataPlaneManagerImpl extends AbstractStateEntityManager start(DataFlowStartMessage startMessage) .callbackAddress(startMessage.getCallbackAddress()) .traceContext(telemetry.getCurrentTraceContext()) .properties(startMessage.getProperties()) - .transferType(startMessage.getTransferType()); + .transferType(startMessage.getTransferType()) + .runtimeId(runtimeId); var response = switch (startMessage.getFlowType()) { case PULL -> handlePull(startMessage, dataFlowBuilder); @@ -135,23 +143,41 @@ public StatusResult restartFlows() { new Criterion("transferType.flowType", "=", PUSH.toString()) ); - toBeRestarted.forEach(dataFlow -> { - dataFlow.transitToReceived(); - processReceived(dataFlow); - }); + toBeRestarted.forEach(this::restartFlow); } while (!toBeRestarted.isEmpty()); return StatusResult.success(); } - + @Override protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) { + Supplier ownedByThisRuntime = () -> new Criterion("runtimeId", "=", runtimeId); + Supplier ownedByAnotherRuntime = () -> new Criterion("runtimeId", "!=", runtimeId); + Supplier flowLeaseNeedsToBeUpdated = () -> new Criterion("updatedAt", "<", clock.millis() - flowLeaseConfiguration.time()); + Supplier danglingTransfer = () -> new Criterion("updatedAt", "<", clock.millis() - flowLeaseConfiguration.abandonTime()); + return builder + .processor(processDataFlowInState(STARTED, this::updateFlowLease, ownedByThisRuntime, flowLeaseNeedsToBeUpdated)) + .processor(processDataFlowInState(STARTED, this::restartFlow, ownedByAnotherRuntime, danglingTransfer)) .processor(processDataFlowInState(RECEIVED, this::processReceived)) .processor(processDataFlowInState(COMPLETED, this::processCompleted)) .processor(processDataFlowInState(FAILED, this::processFailed)); } + private boolean updateFlowLease(DataFlow dataFlow) { + dataFlow.transitToReceived(); + dataFlow.transitionToStarted(runtimeId); + store.save(dataFlow); + return true; + } + + private boolean restartFlow(DataFlow dataFlow) { + monitor.debug("Restarting interrupted flow %s, it was owned by runtime %s".formatted(dataFlow.getId(), dataFlow.getRuntimeId())); + dataFlow.transitToReceived(); + processReceived(dataFlow); + return true; + } + private StatusResult stop(String dataFlowId) { return stop(dataFlowId, null); } @@ -225,8 +251,9 @@ private boolean processReceived(DataFlow dataFlow) { return true; } - dataFlow.transitionToStarted(); - store.save(dataFlow); + dataFlow.transitionToStarted(runtimeId); + monitor.info("UPDATE dataflow %s. RuntimeId %s, UpdatedAt %s".formatted(dataFlow.getId(), dataFlow.getRuntimeId(), dataFlow.getUpdatedAt())); + update(dataFlow); return entityRetryProcessFactory.doAsyncProcess(dataFlow, () -> transferService.transfer(request)) .entityRetrieve(id -> store.findByIdAndLease(id).orElse(f -> null)) @@ -277,9 +304,16 @@ private boolean processFailed(DataFlow dataFlow) { return true; } - private Processor processDataFlowInState(DataFlowStates state, Function function) { - var filter = new Criterion[]{ hasState(state.code()) }; - return ProcessorImpl.Builder.newInstance(() -> store.nextNotLeased(batchSize, filter)) + @SafeVarargs + private Processor processDataFlowInState(DataFlowStates state, Function function, Supplier... additionalCriteria) { + Supplier> entitiesSupplier = () -> { + var additional = Arrays.stream(additionalCriteria).map(Supplier::get); + var filter = Stream.concat(Stream.of(new Criterion[]{ hasState(state.code()) }), additional) + .toArray(Criterion[]::new); + return store.nextNotLeased(batchSize, filter); + }; + + return ProcessorImpl.Builder.newInstance(entitiesSupplier) .process(telemetry.contextPropagationMiddleware(function)) .onNotProcessed(this::breakLease) .build(); @@ -322,6 +356,15 @@ public Builder authorizationService(DataPlaneAuthorizationService authorizationS return this; } + public Builder runtimeId(String runtimeId) { + manager.runtimeId = runtimeId; + return this; + } + + public Builder flowLeaseConfiguration(FlowLeaseConfiguration flowLeaseConfiguration) { + manager.flowLeaseConfiguration = flowLeaseConfiguration; + return this; + } } } diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java index c2a536e2333..358e90811b3 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java @@ -40,6 +40,7 @@ import java.net.URI; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import static java.util.Collections.emptyList; @@ -56,6 +57,7 @@ import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.SUSPENDED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.TERMINATED; import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; +import static org.eclipse.edc.junit.matchers.ArrayContainsMatcher.arrayContains; import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState; import static org.eclipse.edc.spi.response.ResponseStatus.ERROR_RETRY; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; @@ -84,6 +86,7 @@ class DataPlaneManagerImplTest { private final DataFlowStartMessage request = createRequest(); private final TransferServiceRegistry registry = mock(); private final DataPlaneAuthorizationService authorizationService = mock(); + private final String runtimeId = UUID.randomUUID().toString(); private DataPlaneManager manager; @BeforeEach @@ -96,6 +99,7 @@ public void setUp() { .transferProcessClient(transferProcessApiClient) .authorizationService(authorizationService) .monitor(mock()) + .runtimeId(runtimeId) .build(); } @@ -368,7 +372,11 @@ void shouldStartTransferTransitionAndTransitionToStarted() { await().untilAsserted(() -> { verify(transferService).transfer(isA(DataFlowStartMessage.class)); - verify(store).save(argThat(it -> it.getState() == STARTED.code())); + var captor = ArgumentCaptor.forClass(DataFlow.class); + verify(store).save(captor.capture()); + var storedDataFlow = captor.getValue(); + assertThat(storedDataFlow.getState()).isEqualTo(STARTED.code()); + assertThat(storedDataFlow.getRuntimeId()).isEqualTo(runtimeId); }); } @@ -594,6 +602,52 @@ void shouldRestartFlows() { } } + @Nested + class UpdateFlowLease { + + @Test + void shouldUpdateFlow_whenFlowStartedAfterFlowLease() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + when(store.nextNotLeased(anyInt(), startedFlowOwnedByThisRuntime())) + .thenReturn(List.of(dataFlow)).thenReturn(emptyList()); + + manager.start(); + + await().untilAsserted(() -> { + var captor = ArgumentCaptor.forClass(DataFlow.class); + verify(store).save(captor.capture()); + var storedDataFlow = captor.getValue(); + assertThat(storedDataFlow.getState()).isEqualTo(STARTED.code()); + assertThat(storedDataFlow.getRuntimeId()).isEqualTo(runtimeId); + assertThat(storedDataFlow.getStateCount()).isEqualTo(1); + }); + } + } + + @Nested + class RestartFlowOwnedByAnotherRuntime { + @Test + void shouldRestartFlow_whenAnotherRuntimeAbandonedIt() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + when(store.nextNotLeased(anyInt(), startedFlowOwnedByAnotherRuntime())) + .thenReturn(List.of(dataFlow)).thenReturn(emptyList()); + when(registry.resolveTransferService(any())).thenReturn(transferService); + when(transferService.canHandle(any())).thenReturn(true); + when(transferService.transfer(any())).thenReturn(new CompletableFuture<>()); + + manager.start(); + + await().untilAsserted(() -> { + verify(transferService).transfer(isA(DataFlowStartMessage.class)); + var captor = ArgumentCaptor.forClass(DataFlow.class); + verify(store).save(captor.capture()); + var storedDataFlow = captor.getValue(); + assertThat(storedDataFlow.getState()).isEqualTo(STARTED.code()); + assertThat(storedDataFlow.getRuntimeId()).isEqualTo(runtimeId); + }); + } + } + private DataFlow.Builder dataFlowBuilder() { return DataFlow.Builder.newInstance() .source(DataAddress.Builder.newInstance().type("source").build()) @@ -604,7 +658,15 @@ private DataFlow.Builder dataFlowBuilder() { } private Criterion[] stateIs(int state) { - return aryEq(new Criterion[]{ hasState(state) }); + return aryEq(new Criterion[]{hasState(state)}); + } + + private Criterion[] startedFlowOwnedByThisRuntime() { + return arrayContains(new Criterion[] { hasState(STARTED.code()), new Criterion("runtimeId", "=", runtimeId) }); + } + + private Criterion[] startedFlowOwnedByAnotherRuntime() { + return arrayContains(new Criterion[] { hasState(STARTED.code()), new Criterion("runtimeId", "!=", runtimeId) }); } private DataFlowStartMessage createRequest() { diff --git a/dist/bom/dataplane-base-bom/build.gradle.kts b/dist/bom/dataplane-base-bom/build.gradle.kts index e5a8485c951..a2c92666cc4 100644 --- a/dist/bom/dataplane-base-bom/build.gradle.kts +++ b/dist/bom/dataplane-base-bom/build.gradle.kts @@ -41,4 +41,4 @@ dependencies { api(project(":extensions:data-plane-selector:data-plane-selector-client")) api(project(":extensions:common:api:api-observability")) api(project(":extensions:common:http")) -} \ No newline at end of file +} diff --git a/extensions/common/sql/sql-bootstrapper/src/main/java/org/eclipse/edc/sql/bootstrapper/SqlSchemaBootstrapperExtension.java b/extensions/common/sql/sql-bootstrapper/src/main/java/org/eclipse/edc/sql/bootstrapper/SqlSchemaBootstrapperExtension.java index 6bc5ef581d1..28186a97410 100644 --- a/extensions/common/sql/sql-bootstrapper/src/main/java/org/eclipse/edc/sql/bootstrapper/SqlSchemaBootstrapperExtension.java +++ b/extensions/common/sql/sql-bootstrapper/src/main/java/org/eclipse/edc/sql/bootstrapper/SqlSchemaBootstrapperExtension.java @@ -17,10 +17,10 @@ import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Provider; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.persistence.EdcPersistenceException; import org.eclipse.edc.spi.system.ServiceExtension; -import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.sql.QueryExecutor; import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; import org.eclipse.edc.transaction.spi.TransactionContext; @@ -33,6 +33,12 @@ public class SqlSchemaBootstrapperExtension implements ServiceExtension { public static final String SCHEMA_AUTOCREATE_PROPERTY = "edc.sql.schema.autocreate"; public static final boolean SCHEMA_AUTOCREATE_DEFAULT = false; + @Setting( + description = "When true, the schema for the sql stores will be created automatically on the configured datasource", + key = "edc.sql.schema.autocreate", defaultValue = SCHEMA_AUTOCREATE_DEFAULT + "", required = false + ) + private boolean shouldAutoCreate; + @Inject private TransactionContext transactionContext; @Inject @@ -43,12 +49,6 @@ public class SqlSchemaBootstrapperExtension implements ServiceExtension { private Monitor monitor; private SqlSchemaBootstrapperImpl bootstrapper; - private Boolean shouldAutoCreate; - - @Override - public void initialize(ServiceExtensionContext context) { - shouldAutoCreate = context.getConfig().getBoolean(SCHEMA_AUTOCREATE_PROPERTY, SCHEMA_AUTOCREATE_DEFAULT); - } @Override public void prepare() { diff --git a/extensions/common/sql/sql-lease/src/main/java/org/eclipse/edc/sql/lease/StatefulEntityMapping.java b/extensions/common/sql/sql-lease/src/main/java/org/eclipse/edc/sql/lease/StatefulEntityMapping.java index 02206fadd4a..6472c9d9516 100644 --- a/extensions/common/sql/sql-lease/src/main/java/org/eclipse/edc/sql/lease/StatefulEntityMapping.java +++ b/extensions/common/sql/sql-lease/src/main/java/org/eclipse/edc/sql/lease/StatefulEntityMapping.java @@ -29,6 +29,7 @@ protected StatefulEntityMapping(StatefulEntityStatements statements) { add("stateCount", statements.getStateCountColumn()); add("stateTimestamp", statements.getStateTimestampColumn()); add("createdAt", statements.getCreatedAtColumn()); + add("updatedAt", statements.getUpdatedAtColumn()); add("traceContext", new JsonFieldTranslator(statements.getTraceContextColumn())); add("errorDetail", statements.getErrorDetailColumn()); } diff --git a/extensions/common/sql/sql-lease/src/test/java/org/eclipse/edc/sql/lease/PostgresLeaseContextTest.java b/extensions/common/sql/sql-lease/src/test/java/org/eclipse/edc/sql/lease/PostgresLeaseContextTest.java index ac05ad6f4ad..6d17359adc5 100644 --- a/extensions/common/sql/sql-lease/src/test/java/org/eclipse/edc/sql/lease/PostgresLeaseContextTest.java +++ b/extensions/common/sql/sql-lease/src/test/java/org/eclipse/edc/sql/lease/PostgresLeaseContextTest.java @@ -18,12 +18,10 @@ import org.eclipse.edc.junit.annotations.ComponentTest; import org.eclipse.edc.sql.ResultSetMapper; import org.eclipse.edc.sql.SqlQueryExecutor; -import org.eclipse.edc.sql.testfixtures.PostgresqlLocalInstance; import org.eclipse.edc.sql.testfixtures.PostgresqlStoreSetupExtension; import org.eclipse.edc.transaction.spi.NoopTransactionContext; import org.eclipse.edc.transaction.spi.TransactionContext; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -54,11 +52,6 @@ class PostgresLeaseContextTest { private SqlLeaseContextBuilder builder; private SqlLeaseContext leaseContext; - @BeforeAll - static void prepare(PostgresqlLocalInstance postgres) { - postgres.createDatabase(); - } - @BeforeEach void setup(PostgresqlStoreSetupExtension setupExtension, Connection connection) throws IOException { var schema = Files.readString(Paths.get("./src/test/resources/schema.sql")); diff --git a/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlEndToEndInstance.java b/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlEndToEndInstance.java index 88245633267..d4deaf9dc73 100644 --- a/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlEndToEndInstance.java +++ b/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlEndToEndInstance.java @@ -32,8 +32,8 @@ static void createDatabase(String participantName) { throw new EdcPersistenceException(e); } - var postgres = new PostgresqlLocalInstance(USER, PASSWORD, JDBC_URL_PREFIX, participantName); - postgres.createDatabase(); + var postgres = new PostgresqlLocalInstance(USER, PASSWORD, JDBC_URL_PREFIX); + postgres.createDatabase(participantName); } static Map defaultDatasourceConfiguration(String name) { diff --git a/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlLocalInstance.java b/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlLocalInstance.java index 76607198b91..4406a5c633c 100644 --- a/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlLocalInstance.java +++ b/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlLocalInstance.java @@ -17,25 +17,25 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; import static java.lang.String.format; public final class PostgresqlLocalInstance { - private final String password; private final String jdbcUrlPrefix; private final String username; - private final String databaseName; + private final String password; - public PostgresqlLocalInstance(String username, String password, String jdbcUrlPrefix, String databaseName) { + public PostgresqlLocalInstance(String username, String password, String jdbcUrlPrefix) { this.username = username; this.password = password; this.jdbcUrlPrefix = jdbcUrlPrefix; - this.databaseName = databaseName; } - public void createDatabase() { + public void createDatabase(String name) { try (var connection = getConnection("postgres")) { - connection.createStatement().execute(format("create database %s;", databaseName)); + connection.createStatement().execute(format("create database %s;", name)); } catch (SQLException e) { // database could already exist } @@ -49,4 +49,14 @@ public Connection getConnection(String databaseName) { } } + public Map createDefaultDatasourceConfiguration(String databaseName) { + return new HashMap<>() { + { + put("edc.datasource.default.url", jdbcUrlPrefix + databaseName); + put("edc.datasource.default.user", username); + put("edc.datasource.default.password", password); + } + }; + } + } diff --git a/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlStoreSetupExtension.java b/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlStoreSetupExtension.java index 68cd2302410..f2ef93d6b75 100644 --- a/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlStoreSetupExtension.java +++ b/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlStoreSetupExtension.java @@ -34,10 +34,12 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.UUID; import static java.lang.String.format; +import static org.eclipse.edc.util.io.Ports.getFreePort; /** * Extension for running PG SQL store implementation. It automatically creates a test database and provided all the base data structure @@ -46,17 +48,24 @@ public class PostgresqlStoreSetupExtension implements BeforeEachCallback, BeforeAllCallback, AfterAllCallback, ParameterResolver { public static final String POSTGRES_IMAGE_NAME = "postgres:16.1"; - public static PostgreSQLContainer postgreSQLContainer = new PostgreSQLContainer<>(POSTGRES_IMAGE_NAME) + private final PostgreSQLContainer postgreSqlContainer = new PostgreSQLContainer<>(POSTGRES_IMAGE_NAME) .withExposedPorts(5432) .withUsername("postgres") .withPassword("password") .withDatabaseName("itest"); - private static PostgresqlLocalInstance postgres; + private final PostgresqlLocalInstance postgres; private final QueryExecutor queryExecutor = new SqlQueryExecutor(); private final TransactionContext transactionContext = new NoopTransactionContext(); private final DataSourceRegistry dataSourceRegistry = new DefaultDataSourceRegistry(); private final String datasourceName = UUID.randomUUID().toString(); - private String jdbcUrlPrefix; + private final String jdbcUrlPrefix; + + public PostgresqlStoreSetupExtension() { + var exposedPort = getFreePort(); + postgreSqlContainer.setPortBindings(List.of("%s:5432".formatted(exposedPort))); + jdbcUrlPrefix = format("jdbc:postgresql://%s:%s/", postgreSqlContainer.getHost(), exposedPort); + postgres = new PostgresqlLocalInstance(postgreSqlContainer.getUsername(), postgreSqlContainer.getPassword(), jdbcUrlPrefix); + } public String getDatasourceName() { return datasourceName; @@ -84,27 +93,25 @@ public DataSourceRegistry getDataSourceRegistry() { @Override public void beforeAll(ExtensionContext context) { - postgreSQLContainer.start(); - jdbcUrlPrefix = format("jdbc:postgresql://%s:%s/", postgreSQLContainer.getHost(), postgreSQLContainer.getFirstMappedPort()); - postgres = new PostgresqlLocalInstance(postgreSQLContainer.getUsername(), postgreSQLContainer.getPassword(), jdbcUrlPrefix, postgreSQLContainer.getDatabaseName()); - postgres.createDatabase(); + postgreSqlContainer.start(); + postgres.createDatabase(postgreSqlContainer.getDatabaseName()); } @Override public void beforeEach(ExtensionContext context) { var properties = new Properties(); - properties.put("user", postgreSQLContainer.getUsername()); - properties.put("password", postgreSQLContainer.getPassword()); + properties.put("user", postgreSqlContainer.getUsername()); + properties.put("password", postgreSqlContainer.getPassword()); var connectionFactory = new DriverManagerConnectionFactory(); - var jdbcUrl = jdbcUrlPrefix + postgreSQLContainer.getDatabaseName(); + var jdbcUrl = jdbcUrlPrefix + postgreSqlContainer.getDatabaseName(); var dataSource = new ConnectionFactoryDataSource(connectionFactory, jdbcUrl, properties); dataSourceRegistry.register(datasourceName, dataSource); } @Override public void afterAll(ExtensionContext context) { - postgreSQLContainer.stop(); - postgreSQLContainer.close(); + postgreSqlContainer.stop(); + postgreSqlContainer.close(); } @Override @@ -128,4 +135,7 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte return null; } + public Map getDatasourceConfiguration() { + return postgres.createDefaultDatasourceConfiguration(postgreSqlContainer.getDatabaseName()); + } } diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java index 1989571ce12..3cd4cf2286c 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java @@ -17,7 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; -import org.eclipse.edc.connector.dataplane.store.sql.schema.DataPlaneStatements; +import org.eclipse.edc.connector.dataplane.store.sql.schema.DataFlowStatements; import org.eclipse.edc.spi.persistence.EdcPersistenceException; import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.query.QuerySpec; @@ -52,13 +52,13 @@ */ public class SqlDataPlaneStore extends AbstractSqlStore implements DataPlaneStore { - private final DataPlaneStatements statements; + private final DataFlowStatements statements; private final SqlLeaseContextBuilder leaseContext; private final Clock clock; private final String leaseHolderName; public SqlDataPlaneStore(DataSourceRegistry dataSourceRegistry, String dataSourceName, TransactionContext transactionContext, - DataPlaneStatements statements, ObjectMapper objectMapper, Clock clock, QueryExecutor queryExecutor, + DataFlowStatements statements, ObjectMapper objectMapper, Clock clock, QueryExecutor queryExecutor, String leaseHolderName) { super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor); this.statements = statements; @@ -151,7 +151,8 @@ private void insert(Connection connection, DataFlow dataFlow) { toJson(dataFlow.getDestination()), toJson(dataFlow.getProperties()), dataFlow.getTransferType().flowType().toString(), - dataFlow.getTransferType().destinationType() + dataFlow.getTransferType().destinationType(), + dataFlow.getRuntimeId() ); } @@ -170,6 +171,7 @@ private void update(Connection connection, DataFlow dataFlow) { toJson(dataFlow.getProperties()), dataFlow.getTransferType().flowType().toString(), dataFlow.getTransferType().destinationType(), + dataFlow.getRuntimeId(), dataFlow.getId()); } @@ -191,6 +193,7 @@ private DataFlow mapDataFlow(ResultSet resultSet) throws SQLException { resultSet.getString(statements.getTransferTypeDestinationColumn()), FlowType.valueOf(resultSet.getString(statements.getFlowTypeColumn())) )) + .runtimeId(resultSet.getString(statements.getRuntimeIdColumn())) .build(); } diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStoreExtension.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStoreExtension.java index c66a4836fc5..12e68679f55 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStoreExtension.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStoreExtension.java @@ -15,8 +15,8 @@ package org.eclipse.edc.connector.dataplane.store.sql; import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; -import org.eclipse.edc.connector.dataplane.store.sql.schema.DataPlaneStatements; -import org.eclipse.edc.connector.dataplane.store.sql.schema.postgres.PostgresDataPlaneStatements; +import org.eclipse.edc.connector.dataplane.store.sql.schema.DataFlowStatements; +import org.eclipse.edc.connector.dataplane.store.sql.schema.postgres.PostgresDataFlowStatements; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Provider; @@ -49,7 +49,7 @@ public class SqlDataPlaneStoreExtension implements ServiceExtension { private TransactionContext transactionContext; @Inject(required = false) - private DataPlaneStatements statements; + private DataFlowStatements statements; @Inject private Clock clock; @@ -77,7 +77,7 @@ public DataPlaneStore dataPlaneStore(ServiceExtensionContext context) { /** * returns an externally-provided sql statement dialect, or postgres as a default */ - private DataPlaneStatements getStatementImpl() { - return statements != null ? statements : new PostgresDataPlaneStatements(); + private DataFlowStatements getStatementImpl() { + return statements != null ? statements : new PostgresDataFlowStatements(); } } diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataFlowStatements.java similarity index 93% rename from extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java rename to extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataFlowStatements.java index 816d64e6a1d..d6ce8d8b146 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataFlowStatements.java @@ -21,11 +21,11 @@ import static java.lang.String.format; -public class BaseSqlDataPlaneStatements implements DataPlaneStatements { +public class BaseSqlDataFlowStatements implements DataFlowStatements { protected final SqlOperatorTranslator operatorTranslator; - public BaseSqlDataPlaneStatements(SqlOperatorTranslator operatorTranslator) { + public BaseSqlDataFlowStatements(SqlOperatorTranslator operatorTranslator) { this.operatorTranslator = operatorTranslator; } @@ -46,6 +46,7 @@ public String getInsertTemplate() { .jsonColumn(getPropertiesColumn()) .column(getFlowTypeColumn()) .column(getTransferTypeDestinationColumn()) + .column(getRuntimeIdColumn()) .insertInto(getDataPlaneTable()); } @@ -64,6 +65,7 @@ public String getUpdateTemplate() { .jsonColumn(getPropertiesColumn()) .column(getFlowTypeColumn()) .column(getTransferTypeDestinationColumn()) + .column(getRuntimeIdColumn()) .update(getDataPlaneTable(), getIdColumn()); } diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataFlowStatements.java similarity index 90% rename from extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java rename to extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataFlowStatements.java index d9f849a9947..5abc06a9ec8 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataFlowStatements.java @@ -22,7 +22,7 @@ /** * Sql Statements for DataPlane Store */ -public interface DataPlaneStatements extends StatefulEntityStatements, LeaseStatements { +public interface DataFlowStatements extends StatefulEntityStatements, LeaseStatements { default String getIdColumn() { return "process_id"; @@ -56,6 +56,10 @@ default String getTransferTypeDestinationColumn() { return "transfer_type_destination"; } + default String getRuntimeIdColumn() { + return "runtime_id"; + } + String getInsertTemplate(); String getUpdateTemplate(); @@ -63,6 +67,5 @@ default String getTransferTypeDestinationColumn() { String getSelectTemplate(); SqlQueryStatement createQuery(QuerySpec querySpec); - } diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataFlowMapping.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataFlowMapping.java index f3fe2fd98ef..ea5b63c6ddc 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataFlowMapping.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/DataFlowMapping.java @@ -15,7 +15,7 @@ package org.eclipse.edc.connector.dataplane.store.sql.schema.postgres; import org.eclipse.edc.connector.dataplane.spi.DataFlow; -import org.eclipse.edc.connector.dataplane.store.sql.schema.DataPlaneStatements; +import org.eclipse.edc.connector.dataplane.store.sql.schema.DataFlowStatements; import org.eclipse.edc.sql.lease.StatefulEntityMapping; import org.eclipse.edc.sql.translation.TranslationMapping; @@ -25,14 +25,15 @@ */ public class DataFlowMapping extends StatefulEntityMapping { - public DataFlowMapping(DataPlaneStatements statements) { + public DataFlowMapping(DataFlowStatements statements) { super(statements); add("transferType", new TransferTypeMapping(statements)); + add("runtimeId", statements.getRuntimeIdColumn()); } private static class TransferTypeMapping extends TranslationMapping { - TransferTypeMapping(DataPlaneStatements statements) { + TransferTypeMapping(DataFlowStatements statements) { add("flowType", statements.getFlowTypeColumn()); } } diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/PostgresDataPlaneStatements.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/PostgresDataFlowStatements.java similarity index 85% rename from extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/PostgresDataPlaneStatements.java rename to extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/PostgresDataFlowStatements.java index 47ce616ec19..9f40ab7abb2 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/PostgresDataPlaneStatements.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/PostgresDataFlowStatements.java @@ -14,13 +14,13 @@ package org.eclipse.edc.connector.dataplane.store.sql.schema.postgres; -import org.eclipse.edc.connector.dataplane.store.sql.schema.BaseSqlDataPlaneStatements; +import org.eclipse.edc.connector.dataplane.store.sql.schema.BaseSqlDataFlowStatements; import org.eclipse.edc.sql.dialect.PostgresDialect; import org.eclipse.edc.sql.translation.PostgresqlOperatorTranslator; -public class PostgresDataPlaneStatements extends BaseSqlDataPlaneStatements { +public class PostgresDataFlowStatements extends BaseSqlDataFlowStatements { - public PostgresDataPlaneStatements() { + public PostgresDataFlowStatements() { super(new PostgresqlOperatorTranslator()); } diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/resources/dataplane-schema.sql b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/resources/dataplane-schema.sql index 35ad86047d8..90d6b0be103 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/resources/dataplane-schema.sql +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/resources/dataplane-schema.sql @@ -32,7 +32,8 @@ CREATE TABLE IF NOT EXISTS edc_data_plane destination JSON, properties JSON, flow_type VARCHAR, - transfer_type_destination VARCHAR + transfer_type_destination VARCHAR, + runtime_id VARCHAR ); COMMENT ON COLUMN edc_data_plane.trace_context IS 'Java Map serialized as JSON'; diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/test/java/org/eclipse/edc/connector/dataplane/store/sql/PostgresDataPlaneStoreTest.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/test/java/org/eclipse/edc/connector/dataplane/store/sql/PostgresDataPlaneStoreTest.java index 488ba890059..be7e9ea7505 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/test/java/org/eclipse/edc/connector/dataplane/store/sql/PostgresDataPlaneStoreTest.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/test/java/org/eclipse/edc/connector/dataplane/store/sql/PostgresDataPlaneStoreTest.java @@ -16,8 +16,8 @@ import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; import org.eclipse.edc.connector.dataplane.spi.testfixtures.store.DataPlaneStoreTestBase; -import org.eclipse.edc.connector.dataplane.store.sql.schema.DataPlaneStatements; -import org.eclipse.edc.connector.dataplane.store.sql.schema.postgres.PostgresDataPlaneStatements; +import org.eclipse.edc.connector.dataplane.store.sql.schema.DataFlowStatements; +import org.eclipse.edc.connector.dataplane.store.sql.schema.postgres.PostgresDataFlowStatements; import org.eclipse.edc.json.JacksonTypeManager; import org.eclipse.edc.junit.annotations.ComponentTest; import org.eclipse.edc.junit.testfixtures.TestUtils; @@ -37,7 +37,7 @@ @ExtendWith(PostgresqlStoreSetupExtension.class) public class PostgresDataPlaneStoreTest extends DataPlaneStoreTestBase { - private final DataPlaneStatements statements = new PostgresDataPlaneStatements(); + private final DataFlowStatements statements = new PostgresDataFlowStatements(); private LeaseUtil leaseUtil; private SqlDataPlaneStore store; diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java index bb93049f4de..c803f6bdb03 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java @@ -47,8 +47,8 @@ public class DataFlow extends StatefulEntity { private DataAddress destination; private URI callbackAddress; private Map properties = new HashMap<>(); - private TransferType transferType; + private String runtimeId; @Override public DataFlow copy() { @@ -57,7 +57,8 @@ public DataFlow copy() { .destination(destination) .callbackAddress(callbackAddress) .properties(properties) - .transferType(getTransferType()); + .transferType(getTransferType()) + .runtimeId(runtimeId); return copy(builder); } @@ -87,6 +88,10 @@ public TransferType getTransferType() { return transferType; } + public String getRuntimeId() { + return runtimeId; + } + public DataFlowStartMessage toRequest() { return DataFlowStartMessage.Builder.newInstance() .id(getId()) @@ -124,7 +129,8 @@ public void transitToTerminated(@Nullable String reason) { } } - public void transitionToStarted() { + public void transitionToStarted(String runtimeId) { + this.runtimeId = runtimeId; transitionTo(STARTED.code()); } @@ -185,5 +191,9 @@ public Builder properties(Map properties) { return this; } + public Builder runtimeId(String runtimeId) { + entity.runtimeId = runtimeId; + return this; + } } } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java index a5adda87231..41897de289c 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java @@ -29,6 +29,9 @@ @ExtensionPoint public interface DataPlaneManager extends StateEntityManager { + long DEFAULT_FLOW_LEASE_TIME = 500; + int DEFAULT_FLOW_LEASE_FACTOR = 5; + /** * Determines if the data flow request is valid and can be processed by this runtime. */ diff --git a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java index 98945eb6a11..1a193b54c1d 100644 --- a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java +++ b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java @@ -202,6 +202,19 @@ void shouldFilterByFlowType() { .isEqualTo(push.getId()); } + @Test + void shouldFilterByRuntimeId() { + var runtime1 = createDataFlowBuilder().runtimeId("runtime1").build(); + var runtime2 = createDataFlowBuilder().runtimeId("runtime2").build(); + getStore().save(runtime1); + getStore().save(runtime2); + + var leased = getStore().nextNotLeased(2, new Criterion("runtimeId", "=", "runtime2")); + + assertThat(leased).hasSize(1).first().extracting(DataFlow::getId) + .isEqualTo(runtime2.getId()); + } + private void delayByTenMillis(StatefulEntity t) { try { Thread.sleep(10); @@ -257,7 +270,8 @@ private DataFlow.Builder createDataFlowBuilder() { .source(DataAddress.Builder.newInstance().type("src-type").build()) .destination(DataAddress.Builder.newInstance().type("dest-type").build()) .state(STARTED.code()) - .transferType(new TransferType("transferType", PUSH)); + .transferType(new TransferType("transferType", PUSH)) + .runtimeId("runtimeId"); } } diff --git a/system-tests/e2e-dataplane-tests/runtimes/data-plane/src/main/java/org/eclipse/edc/test/e2e/runtime/dataplane/PollingHttpExtension.java b/system-tests/e2e-dataplane-tests/runtimes/data-plane/src/main/java/org/eclipse/edc/test/e2e/runtime/dataplane/PollingHttpExtension.java new file mode 100644 index 00000000000..a0c78a5e9a7 --- /dev/null +++ b/system-tests/e2e-dataplane-tests/runtimes/data-plane/src/main/java/org/eclipse/edc/test/e2e/runtime/dataplane/PollingHttpExtension.java @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2024 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e.runtime.dataplane; + +import okhttp3.Request; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; +import org.eclipse.edc.http.spi.EdcHttpClient; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.InputStream; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.stream.Stream; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/** + * Testing extension that registers a PollingHttp DataSourceFactory. + * + * Used to test long-running push transfers. + */ +public class PollingHttpExtension implements ServiceExtension { + + @Inject + private PipelineService pipelineService; + + @Inject + private EdcHttpClient edcHttpClient; + + @Override + public void initialize(ServiceExtensionContext context) { + pipelineService.registerFactory(new PollingHttpDataSourceFactory(edcHttpClient)); + } + + private record PollingHttpDataSourceFactory(EdcHttpClient edcHttpClient) implements DataSourceFactory { + + @Override + public String supportedType() { + return "PollingHttp"; + } + + @Override + public DataSource createSource(DataFlowStartMessage request) { + return new PollingHttpDataSource(edcHttpClient, request.getSourceDataAddress()); + } + + @Override + public @NotNull Result validateRequest(DataFlowStartMessage request) { + return Result.success(); + } + } + + private static class PollingHttpDataSource implements DataSource { + + private final EdcHttpClient edcHttpClient; + private final DataAddress dataAddress; + private final ScheduledExecutorService executor; + private final BlockingQueue requestQueue = new ArrayBlockingQueue<>(10); + private ScheduledFuture future; + + PollingHttpDataSource(EdcHttpClient edcHttpClient, DataAddress dataAddress) { + this.edcHttpClient = edcHttpClient; + this.dataAddress = dataAddress; + this.executor = Executors.newScheduledThreadPool(1); + } + + @Override + public StreamResult> openPartStream() { + future = executor.scheduleAtFixedRate(() -> { + var request = new Request.Builder().url(dataAddress.getStringProperty("baseUrl")).get().build(); + try { + var responseBody = edcHttpClient.execute(request).body(); + requestQueue.add(new PollingHttpPart(UUID.randomUUID().toString(), responseBody.byteStream(), "application/ octet-stream")); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, 0L, 100L, MILLISECONDS); + + return StreamResult.success(Stream.generate(() -> { + try { + return requestQueue.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + })); + } + + @Override + public void close() { + if (future != null) { + future.cancel(true); + } + } + } + + public record PollingHttpPart(String name, InputStream content, String mediaType) implements DataSource.Part { + + @Override + public long size() { + return SIZE_UNKNOWN; + } + + @Override + public InputStream openStream() { + return content; + } + + @Override + public String mediaType() { + return mediaType; + } + } +} diff --git a/system-tests/e2e-dataplane-tests/runtimes/data-plane/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/system-tests/e2e-dataplane-tests/runtimes/data-plane/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 00000000000..c759cdfe16e --- /dev/null +++ b/system-tests/e2e-dataplane-tests/runtimes/data-plane/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1 @@ +org.eclipse.edc.test.e2e.runtime.dataplane.PollingHttpExtension diff --git a/system-tests/e2e-dataplane-tests/tests/build.gradle.kts b/system-tests/e2e-dataplane-tests/tests/build.gradle.kts index 6a8ab2566e4..94223ccba0f 100644 --- a/system-tests/e2e-dataplane-tests/tests/build.gradle.kts +++ b/system-tests/e2e-dataplane-tests/tests/build.gradle.kts @@ -17,14 +17,15 @@ plugins { } dependencies { - testImplementation(project(":spi:data-plane:data-plane-spi")) - testImplementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-transform")) - - testImplementation(testFixtures(project(":extensions:control-plane:api:management-api:management-api-test-fixtures"))) testImplementation(project(":core:common:junit")) testImplementation(project(":core:common:lib:keys-lib")) + testImplementation(project(":core:common:lib:transform-lib")) // for the transformer registry impl + testImplementation(project(":core:common:lib:crypto-common-lib")) testImplementation(project(":extensions:common:json-ld")) + testImplementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-transform")) + testImplementation(testFixtures(project(":extensions:common:sql:sql-test-fixtures"))) + testImplementation(testFixtures(project(":extensions:control-plane:api:management-api:management-api-test-fixtures"))) testImplementation(libs.restAssured) testImplementation(libs.assertj) @@ -32,8 +33,7 @@ dependencies { testImplementation(libs.junit.jupiter.api) testImplementation(libs.mockserver.netty) testImplementation(libs.mockserver.client) - testImplementation(project(":core:common:lib:transform-lib")) // for the transformer registry impl - testImplementation(project(":core:common:lib:crypto-common-lib")) + testImplementation(libs.testcontainers.postgres) testCompileOnly(project(":system-tests:e2e-dataplane-tests:runtimes:data-plane")) } diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/ClusteredDataPlaneEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/ClusteredDataPlaneEndToEndTest.java new file mode 100644 index 00000000000..67281facbe3 --- /dev/null +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/ClusteredDataPlaneEndToEndTest.java @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2024 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e; + +import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; +import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; +import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.junit.extensions.EmbeddedRuntime; +import org.eclipse.edc.junit.extensions.RuntimeExtension; +import org.eclipse.edc.junit.extensions.RuntimePerClassExtension; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; +import org.eclipse.edc.sql.testfixtures.PostgresqlStoreSetupExtension; +import org.eclipse.edc.test.e2e.participant.DataPlaneParticipant; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockserver.integration.ClientAndServer; + +import java.util.Map; +import java.util.UUID; +import java.util.function.BiFunction; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; +import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.util.io.Ports.getFreePort; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; +import static org.mockserver.verify.VerificationTimes.atLeast; +import static org.mockserver.verify.VerificationTimes.never; + +@EndToEndTest +public class ClusteredDataPlaneEndToEndTest { + + @RegisterExtension + @Order(0) + private static final PostgresqlStoreSetupExtension POSTGRESQL = new PostgresqlStoreSetupExtension(); + + private static final DataPlaneParticipant FOO_DATAPLANE = DataPlaneParticipant.Builder.newInstance() + .name("provider") + .id("urn:connector:provider") + .build(); + + private static final DataPlaneParticipant BAR_DATAPLANE = DataPlaneParticipant.Builder.newInstance() + .name("provider") + .id("urn:connector:provider") + .build(); + + private static final BiFunction RUNTIME_SUPPLIER = (name, dataPlaneParticipant) -> { + var config = dataPlaneParticipant.dataPlaneConfiguration(); + config.put("edc.runtime.id", name); + config.put("edc.sql.schema.autocreate", "true"); + config.put("edc.core.retry.retries.max", "0"); + config.putAll(POSTGRESQL.getDatasourceConfiguration()); + return new EmbeddedRuntime( + name, + config, + ":system-tests:e2e-dataplane-tests:runtimes:data-plane", + ":dist:bom:dataplane-feature-sql-bom" + ); + }; + + private static final EmbeddedRuntime FOO_RUNTIME = RUNTIME_SUPPLIER.apply("foo", FOO_DATAPLANE); + private static final EmbeddedRuntime BAR_RUNTIME = RUNTIME_SUPPLIER.apply("bar", BAR_DATAPLANE); + private static final Map RUNTIMES = Map.of( + "foo", FOO_RUNTIME, + "bar", BAR_RUNTIME + ); + + @RegisterExtension + private static final RuntimeExtension FOO = new RuntimePerClassExtension(FOO_RUNTIME); + + @RegisterExtension + private static final RuntimeExtension BAR = new RuntimePerClassExtension(BAR_RUNTIME); + + @Test + void shouldRestartTransferOwnedByAnotherInstance_whenFlowLeaseExpires() { + var source = ClientAndServer.startClientAndServer(getFreePort()); + source.when(request()).respond(response("data")); + var destination = ClientAndServer.startClientAndServer(getFreePort()); + destination.when(request()).respond(response("ok")); + + var sourceAddress = DataAddress.Builder.newInstance().type("PollingHttp") + .property(EDC_NAMESPACE + "baseUrl", "http://localhost:" + source.getPort()) + .build(); + + var destinationAddress = DataAddress.Builder.newInstance().type("HttpData") + .property(EDC_NAMESPACE + "baseUrl", "http://localhost:" + destination.getPort()) + .build(); + + var startMessage = DataFlowStartMessage.Builder.newInstance() + .processId(UUID.randomUUID().toString()) + .sourceDataAddress(sourceAddress) + .destinationDataAddress(destinationAddress) + .transferType(new TransferType("HttpData", FlowType.PUSH)) + .build(); + + var start = runtime().getService(DataPlaneManager.class).start(startMessage); + + assertThat(start).isSucceeded(); + + await().untilAsserted(() -> { + destination.verify(request(), atLeast(1)); + }); + + var firstOwnerRuntimeId = runtime().getService(DataPlaneStore.class) + .findById(startMessage.getProcessId()).getRuntimeId(); + var firstOwner = RUNTIMES.get(firstOwnerRuntimeId); + firstOwner.shutdown(); + + destination.reset(); + destination.verify(request(), never()); + + await().untilAsserted(() -> { + var dataFlow = runtime().getService(DataPlaneStore.class).findById(startMessage.getProcessId()); + assertThat(dataFlow.getRuntimeId()).isNotEqualTo(firstOwner); + destination.verify(request(), atLeast(1)); + }); + } + + /** + * Returns a runtime, it does not matter which one. + * + * @return a running runtime. + */ + private @NotNull EmbeddedRuntime runtime() { + return RUNTIMES.values().stream().filter(EmbeddedRuntime::isRunning).findAny().get(); + } + +} diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java index fb8a59ccdb2..edecb88edee 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import static java.io.File.separator; import static org.eclipse.edc.util.io.Ports.getFreePort; @@ -32,6 +33,7 @@ public class DataPlaneParticipant extends Participant { private final URI dataPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); private final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public"); private final URI dataplanePublicResponse = dataPlanePublic.resolve("/public/responseChannel"); + private final String componentId = UUID.randomUUID().toString(); private DataPlaneParticipant() { super(); @@ -48,6 +50,7 @@ public Endpoint getDataPlanePublicEndpoint() { public Map dataPlaneConfiguration() { return new HashMap<>() { { + put("edc.component.id", componentId); put("web.http.port", String.valueOf(dataPlaneDefault.getPort())); put("web.http.path", "/api"); put("web.http.public.port", String.valueOf(dataPlanePublic.getPort())); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java index e169dfb6093..d4c48988aab 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java @@ -38,9 +38,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.RegisterExtension; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; import org.testcontainers.utility.DockerImageName; import java.time.Duration; @@ -156,7 +156,7 @@ void shouldResumeTransfer_whenDataPlaneRestarts() { abstract static class Tests extends TransferEndToEndTestBase { @Container - private static final KafkaContainer KAFKA = new KafkaContainer(KAFKA_CONTAINER_VERSION); + private static final ConfluentKafkaContainer KAFKA = new ConfluentKafkaContainer(KAFKA_CONTAINER_VERSION); private final String sourceTopic = "source_topic_" + UUID.randomUUID(); protected final String sinkTopic = "sink_topic_" + UUID.randomUUID();