diff --git a/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/ClasspathReader.java b/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/ClasspathReader.java new file mode 100644 index 00000000000..97e6b72fbfc --- /dev/null +++ b/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/ClasspathReader.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.junit.extensions; + +import org.eclipse.edc.junit.testfixtures.TestUtils; +import org.eclipse.edc.spi.EdcException; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.Arrays; +import java.util.Locale; +import java.util.stream.Stream; + +/** + * Read the classpath entries of the gradle modules. + */ +public class ClasspathReader { + + private static final File PROJECT_ROOT = TestUtils.findBuildRoot(); + private static final File GRADLE_WRAPPER = new File(PROJECT_ROOT, TestUtils.GRADLE_WRAPPER); + + /** + * Get classpath entries for the passed modules + * + * @param modules the modules. + * @return the classpath entries. + */ + public static URL[] classpathFor(String... modules) { + try { + // Run a Gradle custom task to determine the runtime classpath of the module to run + var printClasspath = Arrays.stream(modules).map(it -> it + ":printClasspath"); + var commandStream = Stream.of(GRADLE_WRAPPER.getCanonicalPath(), "-q"); + var command = Stream.concat(commandStream, printClasspath).toArray(String[]::new); + + var exec = Runtime.getRuntime().exec(command); + + try (var reader = exec.inputReader()) { + return reader.lines().map(line -> line.split(":|\\s")) + .flatMap(Arrays::stream) + .filter(s -> !s.isBlank()) + .map(File::new) + .flatMap(ClasspathReader::resolveClasspathEntry) + .toArray(URL[]::new); + } + + } catch (IOException e) { + throw new EdcException(e); + } + } + + /** + * Replace Gradle subproject JAR entries with subproject build directories in classpath. This ensures modified + * classes are picked up without needing to rebuild dependent JARs. + * + * @param classpathFile classpath entry file. + * @return resolved classpath entries for the input argument. + */ + private static Stream resolveClasspathEntry(File classpathFile) { + try { + if (isJar(classpathFile) && isUnderRoot(classpathFile)) { + var buildDir = classpathFile.getCanonicalFile().getParentFile().getParentFile(); + return Stream.of( + new File(buildDir, "/classes/java/main").toURI().toURL(), + new File(buildDir, "../src/main/resources").toURI().toURL() + ); + } else { + var sanitizedClassPathEntry = classpathFile.getCanonicalPath().replace("build/resources/main", "src/main/resources"); + return Stream.of(new File(sanitizedClassPathEntry).toURI().toURL()); + } + + } catch (IOException e) { + throw new EdcException(e); + } + } + + private static boolean isUnderRoot(File classPathFile) throws IOException { + return classPathFile.getCanonicalPath().startsWith(ClasspathReader.PROJECT_ROOT.getCanonicalPath() + File.separator); + } + + private static boolean isJar(File classPathFile) throws IOException { + return classPathFile.getCanonicalPath().toLowerCase(Locale.ROOT).endsWith(".jar"); + } + +} diff --git a/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java b/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java index 8beca2b679f..b3795579ccd 100644 --- a/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java +++ b/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java @@ -15,7 +15,6 @@ package org.eclipse.edc.junit.extensions; import org.eclipse.edc.boot.system.runtime.BaseRuntime; -import org.eclipse.edc.junit.testfixtures.TestUtils; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.monitor.ConsoleMonitor; import org.eclipse.edc.spi.monitor.Monitor; @@ -24,22 +23,16 @@ import org.eclipse.edc.spi.system.configuration.Config; import org.jetbrains.annotations.NotNull; -import java.io.File; -import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; -import java.util.Arrays; import java.util.LinkedHashMap; -import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; -import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; import static org.eclipse.edc.boot.system.ExtensionLoader.loadMonitor; @@ -52,52 +45,31 @@ public class EmbeddedRuntime extends BaseRuntime { private final String name; private final Map properties; - private final String[] additionalModules; private final LinkedHashMap, Object> serviceMocks = new LinkedHashMap<>(); private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final MultiSourceServiceLocator serviceLocator; + private final URL[] classPathEntries; public EmbeddedRuntime(String name, Map properties, String... additionalModules) { - this(new MultiSourceServiceLocator(), name, properties, additionalModules); + this(new MultiSourceServiceLocator(), name, properties, ClasspathReader.classpathFor(additionalModules)); } - private EmbeddedRuntime(MultiSourceServiceLocator serviceLocator, String name, Map properties, String... additionalModules) { + public EmbeddedRuntime(String name, Map properties, URL[] classpathEntries) { + this(new MultiSourceServiceLocator(), name, properties, classpathEntries); + } + + private EmbeddedRuntime(MultiSourceServiceLocator serviceLocator, String name, Map properties, URL[] classPathEntries) { super(serviceLocator); this.serviceLocator = serviceLocator; this.name = name; this.properties = properties; - this.additionalModules = additionalModules; + this.classPathEntries = classPathEntries; } @Override public void boot(boolean addShutdownHook) { try { - // Find the project root directory, moving up the directory tree - var root = TestUtils.findBuildRoot(); - - // Run a Gradle custom task to determine the runtime classpath of the module to run - var printClasspath = Arrays.stream(additionalModules).map(it -> it + ":printClasspath"); - var commandStream = Stream.of(new File(root, TestUtils.GRADLE_WRAPPER).getCanonicalPath(), "-q"); - var command = Stream.concat(commandStream, printClasspath).toArray(String[]::new); - - var exec = Runtime.getRuntime().exec(command); - var classpathString = new String(exec.getInputStream().readAllBytes()); - var errorOutput = new String(exec.getErrorStream().readAllBytes()); - if (exec.waitFor() != 0) { - throw new EdcException(format("Failed to run gradle command: [%s]. Output: %s %s", - String.join(" ", command), classpathString, errorOutput)); - } - - // Replace subproject JAR entries with subproject build directories in classpath. - // This ensures modified classes are picked up without needing to rebuild dependent JARs. - var classPathEntries = Arrays.stream(classpathString.split(":|\\s")) - .filter(s -> !s.isBlank()) - .flatMap(p -> resolveClassPathEntry(root, p)) - .toArray(URL[]::new); - - // Create a ClassLoader that only has the target module class path, and is not - // parented with the current ClassLoader. - var classLoader = URLClassLoader.newInstance(classPathEntries); + MONITOR.info("Starting runtime %s".formatted(name)); // Temporarily inject system properties. var savedProperties = (Properties) System.getProperties().clone(); @@ -106,10 +78,10 @@ public void boot(boolean addShutdownHook) { var runtimeException = new AtomicReference(); var latch = new CountDownLatch(1); - MONITOR.info("Starting runtime %s with additional modules: [%s]".formatted(name, String.join(",", additionalModules))); - executorService.execute(() -> { try { + var classLoader = URLClassLoader.newInstance(classPathEntries); + Thread.currentThread().setContextClassLoader(classLoader); super.boot(false); @@ -173,34 +145,4 @@ public T getService(Class clazz) { public ServiceExtensionContext getContext() { return context; } - - /** - * Replace Gradle subproject JAR entries with subproject build directories in classpath. This ensures modified - * classes are picked up without needing to rebuild dependent JARs. - * - * @param root project root directory. - * @param classPathEntry class path entry to resolve. - * @return resolved class path entries for the input argument. - */ - private Stream resolveClassPathEntry(File root, String classPathEntry) { - try { - var f = new File(classPathEntry).getCanonicalFile(); - - // If class path entry is not a JAR under the root (i.e. a sub-project), do not transform it - var isUnderRoot = f.getCanonicalPath().startsWith(root.getCanonicalPath() + File.separator); - if (!classPathEntry.toLowerCase(Locale.ROOT).endsWith(".jar") || !isUnderRoot) { - var sanitizedClassPathEntry = classPathEntry.replace("build/resources/main", "src/main/resources"); - return Stream.of(new File(sanitizedClassPathEntry).toURI().toURL()); - } - - // Replace JAR entry with the resolved classes and resources folder - var buildDir = f.getParentFile().getParentFile(); - return Stream.of( - new File(buildDir, "/classes/java/main").toURI().toURL(), - new File(buildDir, "../src/main/resources").toURI().toURL() - ); - } catch (IOException e) { - throw new EdcException(e); - } - } } diff --git a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java index 32423b51011..e175f2b64cc 100644 --- a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java +++ b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java @@ -14,6 +14,7 @@ package org.eclipse.edc.connector.controlplane.test.system.utils; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.restassured.specification.RequestSpecification; import jakarta.json.Json; @@ -243,7 +244,6 @@ public JsonArray getCatalogDatasets(Participant provider, JsonObject querySpec) * @return dataset. */ public JsonObject getDatasetForAsset(Participant provider, String assetId) { - var datasetReference = new AtomicReference(); var requestBody = createObjectBuilder() .add(CONTEXT, createObjectBuilder().add(VOCAB, EDC_NAMESPACE)) .add(TYPE, "DatasetRequest") @@ -253,25 +253,24 @@ public JsonObject getDatasetForAsset(Participant provider, String assetId) { .add("protocol", protocol) .build(); - await().atMost(timeout).untilAsserted(() -> { - var response = managementEndpoint.baseRequest() - .contentType(JSON) - .when() - .body(requestBody) - .post("/v3/catalog/dataset/request") - .then() - .log().ifError() - .statusCode(200) - .extract().body().asString(); - - var compacted = objectMapper.readValue(response, JsonObject.class); - - var dataset = jsonLd.expand(compacted).orElseThrow(f -> new EdcException(f.getFailureDetail())); - - datasetReference.set(dataset); - }); - - return datasetReference.get(); + var response = managementEndpoint.baseRequest() + .contentType(JSON) + .when() + .body(requestBody) + .post("/v3/catalog/dataset/request") + .then() + .statusCode(200) + .contentType(JSON) + .log().ifValidationFails() + .extract(); + + try { + var responseBody = response.body().asString(); + var compacted = objectMapper.readValue(responseBody, JsonObject.class); + return jsonLd.expand(compacted).orElseThrow(f -> new EdcException(f.getFailureDetail())); + } catch (JsonProcessingException e) { + throw new EdcException("Cannot deserialize dataset", e); + } } /** diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/build.gradle.kts b/extensions/control-plane/transfer/transfer-data-plane-signaling/build.gradle.kts index 2f8660e3361..06684457562 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/build.gradle.kts +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/build.gradle.kts @@ -29,11 +29,4 @@ dependencies { testImplementation(project(":core:common:junit")) } -edcBuild { - swagger { - apiGroup.set("control-api") - } -} - - diff --git a/system-tests/e2e-transfer-test/control-plane/build.gradle.kts b/system-tests/e2e-transfer-test/control-plane/build.gradle.kts index 848496621f0..d31d264eeb5 100644 --- a/system-tests/e2e-transfer-test/control-plane/build.gradle.kts +++ b/system-tests/e2e-transfer-test/control-plane/build.gradle.kts @@ -17,15 +17,22 @@ plugins { } dependencies { + implementation(project(":core:common:edr-store-core")) implementation(project(":core:common:token-core")) implementation(project(":core:control-plane:control-plane-core")) implementation(project(":data-protocols:dsp")) implementation(project(":extensions:common:http")) + implementation(project(":extensions:common:api:control-api-configuration")) + implementation(project(":extensions:common:api:management-api-configuration")) implementation(project(":extensions:common:iam:iam-mock")) + implementation(project(":extensions:control-plane:api:control-plane-api")) implementation(project(":extensions:control-plane:api:management-api")) - implementation(project(":extensions:common:api:control-api-configuration")) - implementation(project(":extensions:common:api:management-api-configuration")) + implementation(project(":extensions:control-plane:api:management-api:edr-cache-api")) + implementation(project(":extensions:control-plane:callback:callback-event-dispatcher")) + implementation(project(":extensions:control-plane:callback:callback-http-dispatcher")) + implementation(project(":extensions:control-plane:edr:edr-store-receiver")) + implementation(project(":extensions:control-plane:transfer:transfer-data-plane-signaling")) implementation(project(":core:data-plane-selector:data-plane-selector-core")) implementation(project(":extensions:data-plane-selector:data-plane-selector-api")) diff --git a/system-tests/e2e-transfer-test/data-plane/build.gradle.kts b/system-tests/e2e-transfer-test/data-plane/build.gradle.kts index 43fcfd089c7..12526456dfa 100644 --- a/system-tests/e2e-transfer-test/data-plane/build.gradle.kts +++ b/system-tests/e2e-transfer-test/data-plane/build.gradle.kts @@ -25,6 +25,7 @@ dependencies { implementation(project(":extensions:data-plane:data-plane-kafka")) implementation(project(":extensions:data-plane:data-plane-http-oauth2")) implementation(project(":extensions:data-plane:data-plane-control-api")) + implementation(project(":extensions:data-plane:data-plane-public-api-v2")) implementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-api")) } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java index 5c6c405d01c..d01b8c8de0e 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java @@ -14,81 +14,66 @@ package org.eclipse.edc.test.e2e; +import org.eclipse.edc.junit.extensions.ClasspathReader; import org.eclipse.edc.junit.extensions.EmbeddedRuntime; +import java.net.URL; import java.util.Map; -public interface Runtimes { +/** + * Runtimes for E2E transfer test. + * The usage of this pattern permits to initialize the classpath once for runtime in a lazy manner. + * Tests will be quite faster this way because classpath loading (that requires interaction with gradlew) is a pretty slow activity. + */ +public enum Runtimes { - interface InMemory { + BACKEND_SERVICE( + ":system-tests:e2e-transfer-test:backend-service" + ), - static EmbeddedRuntime controlPlane(String name, Map configuration) { - return new EmbeddedRuntime(name, configuration, - ":system-tests:e2e-transfer-test:control-plane", - ":core:common:edr-store-core", - ":extensions:control-plane:transfer:transfer-data-plane-signaling", - ":extensions:control-plane:api:management-api:edr-cache-api", - ":extensions:control-plane:edr:edr-store-receiver", - ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client", - ":extensions:control-plane:callback:callback-event-dispatcher", - ":extensions:control-plane:callback:callback-http-dispatcher" - ); - } + IN_MEMORY_CONTROL_PLANE( + ":system-tests:e2e-transfer-test:control-plane", + ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client" + ), - static EmbeddedRuntime controlPlaneEmbeddedDataPlane(String name, Map configuration) { - return new EmbeddedRuntime(name, configuration, - ":system-tests:e2e-transfer-test:control-plane", - ":system-tests:e2e-transfer-test:data-plane", - ":extensions:control-plane:transfer:transfer-data-plane-signaling", - ":extensions:data-plane:data-plane-self-registration", - ":extensions:data-plane:data-plane-public-api-v2" - ); - } + IN_MEMORY_CONTROL_PLANE_EMBEDDED_DATA_PLANE( + ":system-tests:e2e-transfer-test:control-plane", + ":system-tests:e2e-transfer-test:data-plane" + ), - static EmbeddedRuntime dataPlane(String name, Map configuration) { - return new EmbeddedRuntime(name, configuration, - ":system-tests:e2e-transfer-test:data-plane", - ":extensions:data-plane:data-plane-public-api-v2", - ":extensions:data-plane-selector:data-plane-selector-client" - ); - } - } + IN_MEMORY_DATA_PLANE( + ":system-tests:e2e-transfer-test:data-plane", + ":extensions:data-plane-selector:data-plane-selector-client" + ), - interface Postgres { + POSTGRES_CONTROL_PLANE( + ":system-tests:e2e-transfer-test:control-plane", + ":extensions:common:store:sql:edr-index-sql", + ":extensions:common:sql:sql-pool:sql-pool-apache-commons", + ":extensions:common:transaction:transaction-local", + ":extensions:control-plane:store:sql:control-plane-sql", + ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client" + ), - static EmbeddedRuntime controlPlane(String name, Map configuration) { - return new EmbeddedRuntime(name, configuration, - ":system-tests:e2e-transfer-test:control-plane", - ":core:common:edr-store-core", - ":extensions:common:store:sql:edr-index-sql", - ":extensions:common:sql:sql-pool:sql-pool-apache-commons", - ":extensions:common:transaction:transaction-local", - ":extensions:control-plane:transfer:transfer-data-plane-signaling", - ":extensions:control-plane:api:management-api:edr-cache-api", - ":extensions:control-plane:edr:edr-store-receiver", - ":extensions:control-plane:store:sql:control-plane-sql", - ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client", - ":extensions:control-plane:callback:callback-event-dispatcher", - ":extensions:control-plane:callback:callback-http-dispatcher" - ); - } + POSTGRES_DATA_PLANE( + ":system-tests:e2e-transfer-test:data-plane", + ":extensions:data-plane:store:sql:data-plane-store-sql", + ":extensions:common:sql:sql-pool:sql-pool-apache-commons", + ":extensions:common:transaction:transaction-local", + ":extensions:data-plane-selector:data-plane-selector-client" + ); - static EmbeddedRuntime dataPlane(String name, Map configuration) { - return new EmbeddedRuntime(name, configuration, - ":system-tests:e2e-transfer-test:data-plane", - ":extensions:data-plane:store:sql:data-plane-store-sql", - ":extensions:common:sql:sql-pool:sql-pool-apache-commons", - ":extensions:common:transaction:transaction-local", - ":extensions:data-plane:data-plane-public-api-v2", - ":extensions:data-plane-selector:data-plane-selector-client" - ); - } + private URL[] classpathEntries; + private final String[] modules; + Runtimes(String... modules) { + this.modules = modules; } - static EmbeddedRuntime backendService(String name, Map configuration) { - return new EmbeddedRuntime(name, configuration, - ":system-tests:e2e-transfer-test:backend-service" - ); + public EmbeddedRuntime create(String name, Map configuration) { + if (classpathEntries == null) { + classpathEntries = ClasspathReader.classpathFor(modules); + } + return new EmbeddedRuntime(name, configuration, classpathEntries); } } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java index 971e832d1d9..b28df03cdb5 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java @@ -15,13 +15,16 @@ package org.eclipse.edc.test.e2e; import jakarta.json.JsonObject; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.junit.extensions.RuntimeExtension; import org.eclipse.edc.spi.security.Vault; import java.time.Duration; import java.util.Map; +import java.util.Objects; import java.util.UUID; +import static org.awaitility.Awaitility.await; import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy; import static org.eclipse.edc.junit.testfixtures.TestUtils.getResourceFileContentAsString; @@ -56,4 +59,11 @@ protected void createResourcesOnProvider(String assetId, JsonObject contractPoli PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), accessPolicyId, contractPolicyId); } + protected void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) { + await().atMost(timeout).until( + () -> CONSUMER.getTransferProcessState(transferProcessId), + it -> Objects.equals(it, state.name()) + ); + } + } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index 131b83ebdb7..c3485ab0c3f 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -24,7 +24,6 @@ import jakarta.json.JsonObject; import org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessStarted; -import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.annotations.PostgresqlIntegrationTest; import org.eclipse.edc.junit.extensions.RuntimeExtension; @@ -62,7 +61,6 @@ import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.createDatabase; -import static org.eclipse.edc.test.e2e.Runtimes.backendService; import static org.eclipse.edc.util.io.Ports.getFreePort; import static org.hamcrest.CoreMatchers.equalTo; import static org.mockserver.integration.ClientAndServer.startClientAndServer; @@ -113,17 +111,13 @@ void httpPull_dataTransfer_withCallbacks() { .withCallbacks(callbacks) .execute(); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(STARTED.name()); - }); + awaitTransferToBeInState(transferProcessId, STARTED); await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull()); var event = events.get(transferProcessId); var msg = UUID.randomUUID().toString(); await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(event.getDataAddress(), Map.of("message", msg), equalTo(msg))); - } @Test @@ -272,13 +266,6 @@ public JsonObject createCallback(String url, boolean transactional, Set protected abstract void seedVaults(); - private void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) { - await().atMost(timeout).until( - () -> CONSUMER.getTransferProcessState(transferProcessId), - it -> Objects.equals(it, state.name()) - ); - } - @NotNull private Map httpDataAddressProperties() { return Map.of( @@ -318,19 +305,23 @@ class InMemory extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.controlPlane("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension CONSUMER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.controlPlane("provider-control-plane", PROVIDER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.dataPlane("provider-data-plane", PROVIDER.dataPlaneConfiguration())); + Runtimes.IN_MEMORY_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("provider-backend-service", PROVIDER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("provider-backend-service", PROVIDER.backendServiceConfiguration())); @Override protected void seedVaults() { @@ -346,16 +337,19 @@ class EmbeddedDataPlane extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.controlPlane("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE_EMBEDDED_DATA_PLANE.create("consumer-control-plane", CONSUMER.controlPlaneEmbeddedDataPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension CONSUMER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.controlPlaneEmbeddedDataPlane("provider-control-plane", PROVIDER.controlPlaneEmbeddedDataPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE_EMBEDDED_DATA_PLANE.create("provider-control-plane", PROVIDER.controlPlaneEmbeddedDataPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("provider-backend-service", PROVIDER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("provider-backend-service", PROVIDER.backendServiceConfiguration())); @Override protected void seedVaults() { @@ -376,19 +370,23 @@ class Postgres extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.Postgres.controlPlane("consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration())); + Runtimes.POSTGRES_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration())); + @RegisterExtension static final RuntimeExtension CONSUMER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.Postgres.controlPlane("provider-control-plane", PROVIDER.controlPlanePostgresConfiguration())); + Runtimes.POSTGRES_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlanePostgresConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension( - Runtimes.Postgres.dataPlane("provider-data-plane", PROVIDER.dataPlanePostgresConfiguration())); + Runtimes.POSTGRES_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlanePostgresConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("provider-backend-service", PROVIDER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("provider-backend-service", PROVIDER.backendServiceConfiguration())); @Override protected void seedVaults() { diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java index 13146afc0de..09e6cc0d1c4 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java @@ -14,7 +14,6 @@ package org.eclipse.edc.test.e2e; -import jakarta.json.Json; import jakarta.json.JsonObject; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.annotations.PostgresqlIntegrationTest; @@ -31,14 +30,11 @@ import static io.restassured.RestAssured.given; import static jakarta.json.Json.createObjectBuilder; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETED; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.createDatabase; -import static org.eclipse.edc.test.e2e.Runtimes.backendService; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -55,19 +51,18 @@ void httpPushDataTransfer() { createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination, "HttpData-PUSH"); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(COMPLETED.name()); - - given() - .baseUri(CONSUMER.backendService().toString()) - .when() - .get("/api/consumer/data") - .then() - .statusCode(anyOf(is(200), is(204))) - .body(is(notNullValue())); - }); + var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) + .withDestination(destination).withTransferType("HttpData-PUSH").execute(); + + awaitTransferToBeInState(transferProcessId, COMPLETED); + + given() + .baseUri(CONSUMER.backendService().toString()) + .when() + .get("/api/consumer/data") + .then() + .statusCode(anyOf(is(200), is(204))) + .body(is(notNullValue())); } @Test @@ -85,20 +80,18 @@ void httpToHttp_oauth2Provisioning() { createResourcesOnProvider(assetId, noConstraintPolicy(), sourceDataAddressProperties); var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination, "HttpData-PUSH"); + var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) + .withDestination(destination).withTransferType("HttpData-PUSH").execute(); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(COMPLETED.name()); + awaitTransferToBeInState(transferProcessId, COMPLETED); - given() - .baseUri(CONSUMER.backendService().toString()) - .when() - .get("/api/consumer/data") - .then() - .statusCode(anyOf(is(200), is(204))) - .body(is(notNullValue())); - }); + given() + .baseUri(CONSUMER.backendService().toString()) + .when() + .get("/api/consumer/data") + .then() + .statusCode(anyOf(is(200), is(204))) + .body(is(notNullValue())); } protected abstract void seedVaults(); @@ -120,10 +113,6 @@ private Map httpDataAddressProperties() { "proxyQueryParams", "true" ); } - - private JsonObject noPrivateProperty() { - return Json.createObjectBuilder().build(); - } } @Nested @@ -132,19 +121,23 @@ class InMemory extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.controlPlane("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension CONSUMER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.controlPlane("provider-control-plane", PROVIDER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.dataPlane("provider-data-plane", PROVIDER.dataPlaneConfiguration())); + Runtimes.IN_MEMORY_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("provider-backend-service", PROVIDER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("provider-backend-service", PROVIDER.backendServiceConfiguration())); @Override protected void seedVaults() { @@ -160,16 +153,19 @@ class EmbeddedDataPlane extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.controlPlane("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE_EMBEDDED_DATA_PLANE.create("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension CONSUMER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.controlPlaneEmbeddedDataPlane("provider-control-plane", PROVIDER.controlPlaneEmbeddedDataPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE_EMBEDDED_DATA_PLANE.create("provider-control-plane", PROVIDER.controlPlaneEmbeddedDataPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("provider-backend-service", PROVIDER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("provider-backend-service", PROVIDER.backendServiceConfiguration())); @Override protected void seedVaults() { @@ -190,19 +186,23 @@ class Postgres extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.Postgres.controlPlane("consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration())); + Runtimes.POSTGRES_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration())); + @RegisterExtension static final RuntimeExtension CONSUMER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.Postgres.controlPlane("provider-control-plane", PROVIDER.controlPlanePostgresConfiguration())); + Runtimes.POSTGRES_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlanePostgresConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension( - Runtimes.Postgres.dataPlane("provider-data-plane", PROVIDER.dataPlanePostgresConfiguration())); + Runtimes.POSTGRES_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlanePostgresConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("provider-backend-service", PROVIDER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("provider-backend-service", PROVIDER.backendServiceConfiguration())); @Override protected void seedVaults() { diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java index 8cb08c4ec72..cbf453df81d 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.RuntimeExtension; import org.eclipse.edc.junit.extensions.RuntimePerClassExtension; @@ -42,7 +41,6 @@ import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; import java.util.UUID; import javax.validation.constraints.NotNull; @@ -61,7 +59,6 @@ import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATED; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; -import static org.eclipse.edc.test.e2e.Runtimes.backendService; import static org.eclipse.edc.util.io.Ports.getFreePort; import static org.mockserver.integration.ClientAndServer.startClientAndServer; import static org.mockserver.model.HttpRequest.request; @@ -78,19 +75,23 @@ class InMemory extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.controlPlane("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension CONSUMER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("consumer-backend-service", CONSUMER.backendServiceConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.controlPlane("provider-control-plane", PROVIDER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension( - Runtimes.InMemory.dataPlane("provider-data-plane", PROVIDER.dataPlaneConfiguration())); + Runtimes.IN_MEMORY_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlaneConfiguration())); + @RegisterExtension static final RuntimeExtension PROVIDER_BACKEND_SERVICE = new RuntimePerClassExtension( - backendService("provider-backend-service", PROVIDER.backendServiceConfiguration())); + Runtimes.BACKEND_SERVICE.create("provider-backend-service", PROVIDER.backendServiceConfiguration())); } @@ -124,16 +125,14 @@ void kafkaToHttpTransfer() { createResourcesOnProvider(assetId, contractExpiresIn("10s"), kafkaSourceProperty()); var destination = httpSink(destinationServer.getLocalPort(), "/api/service"); - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination, "HttpData-PUSH"); + var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) + .withDestination(destination).withTransferType("HttpData-PUSH").execute(); await().atMost(timeout).untilAsserted(() -> { destinationServer.verify(request, atLeast(1)); }); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(TransferProcessStates.valueOf(state)).isGreaterThanOrEqualTo(TERMINATED); - }); + awaitTransferToBeInState(transferProcessId, TERMINATED); destinationServer.clear(request) .when(request).respond(response()); @@ -158,7 +157,8 @@ void kafkaToKafkaTransfer() { var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, contractExpiresIn("10s"), kafkaSourceProperty()); - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), kafkaSink(), "Kafka-PUSH"); + var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) + .withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute(); assertMessagesAreSentTo(consumer); awaitTransferToBeInState(transferProcessId, TERMINATED); @@ -174,7 +174,8 @@ void shouldSuspendAndResumeTransfer() { var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, noConstraintPolicy(), kafkaSourceProperty()); - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), kafkaSink(), "Kafka-PUSH"); + var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) + .withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute(); assertMessagesAreSentTo(consumer); CONSUMER.suspendTransfer(transferProcessId, "any kind of reason"); @@ -203,13 +204,6 @@ private void assertNoMoreMessagesAreSentTo(Consumer consumer) { }); } - private void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) { - await().atMost(timeout).until( - () -> CONSUMER.getTransferProcessState(transferProcessId), - it -> Objects.equals(it, state.name()) - ); - } - private JsonObject httpSink(Integer port, String path) { return Json.createObjectBuilder() .add(TYPE, EDC_NAMESPACE + "DataAddress") @@ -266,10 +260,6 @@ private String kafkaProperty(String property) { return "kafka." + property; } - private JsonObject noPrivateProperty() { - return Json.createObjectBuilder().build(); - } - private String sampleMessage() { return "sampleMessage"; }