diff --git a/edc-dataplane/edc-dataplane-base/build.gradle.kts b/edc-dataplane/edc-dataplane-base/build.gradle.kts index 7847c2f9f..03a5ee3db 100644 --- a/edc-dataplane/edc-dataplane-base/build.gradle.kts +++ b/edc-dataplane/edc-dataplane-base/build.gradle.kts @@ -29,6 +29,7 @@ dependencies { runtimeOnly(project(":edc-extensions:dataplane-proxy:edc-dataplane-proxy-provider-core")) runtimeOnly(libs.edc.config.filesystem) + runtimeOnly(libs.edc.auth.tokenbased) runtimeOnly(libs.edc.dpf.awss3) runtimeOnly(libs.edc.dpf.oauth2) runtimeOnly(libs.edc.dpf.http) diff --git a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/README.md b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/README.md index c5a34edb9..ec590c1a2 100644 --- a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/README.md +++ b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/README.md @@ -17,6 +17,27 @@ The path is `/aas/request` and the body is something like this exa The body should contain the `assetId` or the `transferProcessId` which identify the data that we want to fetch and an `endpointUrl` which is the provider gateway on which the data is available. More info [here](../edc-dataplane-proxy-provider-api/README.md) on the gateway. +Alternatively if the `endpointUrl` is not known or the gateway on the provider side is not configured, it can be omitted and the `Edr#endpointUrl` +will be used. In this scenario if needed users can provide additional properties to the request for composing the final +url: + +- `pathSegments` sub path to append to the base url +- `queryParams` query parameters to add to the url + +Example with base url `http://localhost:8080/test` + +```json +{ + "assetId": "1", + "pathSegments": "/sub", + "queryParams": "foo=bar" +} +``` + +The final url will look like `http://localhost:8080/test/sub?foo=bar` composed by the DataPlane manager with the Http request flow, + +> Note: the endpoint is not protected with configured `AuthenticationService`, which most likely will be the token based (auth key) one. + ## Configuration | Key | Required | Default | Description | diff --git a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/build.gradle.kts b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/build.gradle.kts index 4cb349bca..fbe98c1a5 100644 --- a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/build.gradle.kts +++ b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/build.gradle.kts @@ -26,6 +26,7 @@ dependencies { implementation(libs.edc.dpf.framework) implementation(libs.edc.dpf.util) implementation(libs.edc.ext.http) + implementation(libs.edc.spi.auth) implementation(project(":spi:edr-spi")) diff --git a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/main/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/DataPlaneProxyConsumerApiExtension.java b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/main/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/DataPlaneProxyConsumerApiExtension.java index dffbd97a0..3abbecbbc 100644 --- a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/main/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/DataPlaneProxyConsumerApiExtension.java +++ b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/main/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/DataPlaneProxyConsumerApiExtension.java @@ -14,6 +14,8 @@ package org.eclipse.tractusx.edc.dataplane.proxy.consumer.api; +import org.eclipse.edc.api.auth.spi.AuthenticationRequestFilter; +import org.eclipse.edc.api.auth.spi.AuthenticationService; import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; @@ -63,6 +65,9 @@ public class DataPlaneProxyConsumerApiExtension implements ServiceExtension { @Inject private WebServiceConfigurer configurer; + @Inject + private AuthenticationService authenticationService; + @Inject private Monitor monitor; @@ -80,6 +85,7 @@ public void initialize(ServiceExtensionContext context) { executorService = newFixedThreadPool(context.getSetting(THREAD_POOL_SIZE, DEFAULT_THREAD_POOL)); + webService.registerResource(CONSUMER_API_ALIAS, new AuthenticationRequestFilter(authenticationService)); webService.registerResource(CONSUMER_API_ALIAS, new ClientErrorExceptionMapper()); webService.registerResource(CONSUMER_API_ALIAS, new ConsumerAssetRequestController(edrCache, dataPlaneManager, executorService, monitor)); } diff --git a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/main/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/ConsumerAssetRequestController.java b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/main/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/ConsumerAssetRequestController.java index e0201596f..fce090e3a 100644 --- a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/main/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/ConsumerAssetRequestController.java +++ b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/main/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/ConsumerAssetRequestController.java @@ -25,12 +25,15 @@ import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.HttpDataAddress; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; import org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.model.AssetRequest; import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache; +import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutorService; import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; @@ -41,15 +44,17 @@ import static jakarta.ws.rs.core.Response.status; import static java.lang.String.format; import static java.util.UUID.randomUUID; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.PATH; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.QUERY_PARAMS; /** * Implements the HTTP proxy API. */ @Path("/aas") public class ConsumerAssetRequestController implements ConsumerAssetRequestApi { + public static final String BASE_URL = "baseUrl"; private static final String HTTP_DATA = "HttpData"; private static final String ASYNC_TYPE = "async"; - private static final String BASE_URL = "baseUrl"; private static final String HEADER_AUTHORIZATION = "header:authorization"; private static final String BEARER_PREFIX = "Bearer "; @@ -76,22 +81,27 @@ public void requestAsset(AssetRequest request, @Suspended AsyncResponse response // resolve the EDR and add it to the request var edr = resolveEdr(request); - var sourceAddress = DataAddress.Builder.newInstance() - .type(HTTP_DATA) - .property(BASE_URL, request.getEndpointUrl()) - .property(HEADER_AUTHORIZATION, BEARER_PREFIX + edr.getAuthCode()) - .build(); + var sourceAddress = Optional.ofNullable(request.getEndpointUrl()) + .map(url -> gatewayAddress(url, edr)) + .orElseGet(() -> dataPlaneAddress(edr)); + var destinationAddress = DataAddress.Builder.newInstance() .type(ASYNC_TYPE) .build(); + + var properties = Optional.ofNullable(request.getEndpointUrl()) + .map((url) -> Map.of()) + .orElseGet(() -> dataPlaneProperties(request)); + var flowRequest = DataFlowRequest.Builder.newInstance() .processId(randomUUID().toString()) .trackable(false) .sourceDataAddress(sourceAddress) .destinationDataAddress(destinationAddress) .traceContext(Map.of()) + .properties(properties) .build(); // transfer the data asynchronously @@ -104,6 +114,30 @@ public void requestAsset(AssetRequest request, @Suspended AsyncResponse response } } + + private Map dataPlaneProperties(AssetRequest request) { + var props = new HashMap(); + Optional.ofNullable(request.getQueryParams()).ifPresent((queryParams) -> props.put(QUERY_PARAMS, queryParams)); + Optional.ofNullable(request.getPathSegments()).ifPresent((path) -> props.put(PATH, path)); + return props; + } + + private DataAddress gatewayAddress(String url, EndpointDataReference edr) { + return HttpDataAddress.Builder.newInstance() + .baseUrl(url) + .property(HEADER_AUTHORIZATION, BEARER_PREFIX + edr.getAuthCode()) + .build(); + } + + private DataAddress dataPlaneAddress(EndpointDataReference edr) { + return HttpDataAddress.Builder.newInstance() + .baseUrl(edr.getEndpoint()) + .proxyQueryParams("true") + .proxyPath("true") + .property(HEADER_AUTHORIZATION, edr.getAuthCode()) + .build(); + } + private EndpointDataReference resolveEdr(AssetRequest request) { if (request.getTransferProcessId() != null) { var edr = edrCache.resolveReference(request.getTransferProcessId()); diff --git a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/main/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/model/AssetRequest.java b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/main/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/model/AssetRequest.java index ba4b06db1..562b9b612 100644 --- a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/main/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/model/AssetRequest.java +++ b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/main/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/model/AssetRequest.java @@ -19,8 +19,6 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; -import static java.util.Objects.requireNonNull; - /** * A request for asset data. The request may contain a transfer process ID or asset ID and must specify an endpoint for retrieving the data. */ @@ -33,6 +31,10 @@ public class AssetRequest { private String providerId; private String endpointUrl; + private String queryParams; + + private String pathSegments; + private AssetRequest() { } @@ -52,6 +54,14 @@ public String getProviderId() { return providerId; } + public String getQueryParams() { + return queryParams; + } + + public String getPathSegments() { + return pathSegments; + } + @JsonPOJOBuilder(withPrefix = "") public static class Builder { private final AssetRequest request; @@ -85,11 +95,20 @@ public Builder providerId(String providerId) { return this; } + public Builder queryParams(String queryParams) { + request.queryParams = queryParams; + return this; + } + + public Builder pathSegments(String pathSegments) { + request.pathSegments = pathSegments; + return this; + } + public AssetRequest build() { if (request.assetId == null && request.transferProcessId == null) { throw new NullPointerException("An assetId or endpointReferenceId must be set"); } - requireNonNull(request.endpointUrl, "endpointUrl"); return request; } } diff --git a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/test/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/model/AssetRequestTest.java b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/test/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/model/AssetRequestTest.java index 1da34af3a..4a81ff653 100644 --- a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/test/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/model/AssetRequestTest.java +++ b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/test/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/model/AssetRequestTest.java @@ -32,8 +32,10 @@ void verify_SerializeDeserialize() throws JsonProcessingException { .endpointUrl("https://test.com") .providerId("providerId") .transferProcessId("tp1") + .queryParams("params") + .pathSegments("path") .build(); - + var serialized = mapper.writeValueAsString(request); var deserialized = mapper.readValue(serialized, AssetRequest.class); @@ -42,13 +44,14 @@ void verify_SerializeDeserialize() throws JsonProcessingException { assertThat(deserialized.getTransferProcessId()).isEqualTo(request.getTransferProcessId()); assertThat(deserialized.getEndpointUrl()).isEqualTo(request.getEndpointUrl()); assertThat(deserialized.getProviderId()).isEqualTo(request.getProviderId()); + assertThat(deserialized.getPathSegments()).isEqualTo(request.getPathSegments()); + assertThat(deserialized.getQueryParams()).isEqualTo(request.getQueryParams()); } @Test void verify_NullArguments() { - assertThatThrownBy(() -> AssetRequest.Builder.newInstance().endpointUrl("https://test.com").build()).isInstanceOf(NullPointerException.class); - assertThatThrownBy(() -> AssetRequest.Builder.newInstance().assetId("asset1").build()).isInstanceOf(NullPointerException.class); + assertThatThrownBy(() -> AssetRequest.Builder.newInstance().build()).isInstanceOf(NullPointerException.class); } @Test diff --git a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/test/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/model/ConsumerAssetRequestControllerTest.java b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/test/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/model/ConsumerAssetRequestControllerTest.java index 9707cf0b8..8f4528088 100644 --- a/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/test/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/model/ConsumerAssetRequestControllerTest.java +++ b/edc-extensions/dataplane-proxy/edc-dataplane-proxy-consumer-api/src/test/java/org/eclipse/tractusx/edc/dataplane/proxy/consumer/api/asset/model/ConsumerAssetRequestControllerTest.java @@ -26,6 +26,7 @@ import org.eclipse.edc.junit.annotations.ApiTest; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase; import org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.ConsumerAssetRequestController; import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache; @@ -33,6 +34,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -48,8 +50,12 @@ import static jakarta.ws.rs.core.Response.Status.NOT_FOUND; import static jakarta.ws.rs.core.Response.Status.UNAUTHORIZED; import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.PATH; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.QUERY_PARAMS; +import static org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.ConsumerAssetRequestController.BASE_URL; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ApiTest @@ -243,6 +249,61 @@ void requestAsset_shouldReturnError_whenEdrByTransferProcessIdNotFound() { } + @Test + void requestAsset_shouldReturnData_withDataPlaneUrl() throws IOException { + + var transferProcessId = "tp"; + var url = "http://localhost:8080/test"; + var request = Map.of("transferProcessId", transferProcessId, PATH, "/path", QUERY_PARAMS, "test=10&foo=bar"); + var edr = EndpointDataReference.Builder.newInstance() + .id(transferProcessId) + .authKey("authKey") + .authCode("authCode") + .endpoint(url) + .build(); + + var response = Map.of("response", "ok"); + var responseBytes = mapper.writeValueAsBytes(response); + + var datasource = mock(DataSource.class); + var partStream = mock(DataSource.Part.class); + + when(datasource.openPartStream()).thenReturn(StreamResult.success(Stream.of(partStream))); + when(partStream.openStream()).thenReturn(new ByteArrayInputStream(responseBytes)); + + when(cache.resolveReference(transferProcessId)).thenReturn(edr); + when(dataPlaneManager.transfer(any(DataSink.class), any())) + .thenAnswer(a -> { + AsyncStreamingDataSink sink = a.getArgument(0); + return sink.transfer(datasource); + }); + + var proxyResponseBytes = baseRequest() + .contentType(MediaType.APPLICATION_JSON) + .body(request) + .post(ASSET_REQUEST_PATH) + .then() + .statusCode(200) + .extract().body().asByteArray(); + + var proxyResponse = mapper.readValue(proxyResponseBytes, new TypeReference>() { + }); + + assertThat(proxyResponse).containsAllEntriesOf(response); + + var captor = ArgumentCaptor.forClass(DataFlowRequest.class); + verify(dataPlaneManager).transfer(any(DataSink.class), captor.capture()); + + + var flowRequest = captor.getValue(); + + assertThat(flowRequest.getSourceDataAddress().getProperty(BASE_URL)).isEqualTo(edr.getEndpoint()); + + assertThat(flowRequest.getProperties().get(QUERY_PARAMS)).isEqualTo(request.get(QUERY_PARAMS)); + assertThat(flowRequest.getProperties().get(PATH)).isEqualTo(request.get(PATH)); + + } + @Override protected Object controller() { return new ConsumerAssetRequestController(cache, dataPlaneManager, Executors.newSingleThreadExecutor(), mock(Monitor.class)); diff --git a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/Participant.java b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/Participant.java index 93b51560a..8e6b9ac66 100644 --- a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/Participant.java +++ b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/Participant.java @@ -331,6 +331,17 @@ public String pullProxyDataByAssetId(Participant provider, String assetId) { return getProxyData(body); } + + public String pullProviderDataPlaneDataByAssetId(Participant provider, String assetId) { + var body = Map.of("assetId", assetId); + return getProxyData(body); + } + + public String pullProviderDataPlaneDataByAssetIdAndCustomProperties(Participant provider, String assetId, String path, String params) { + var body = Map.of("assetId", assetId, "pathSegments", path, "queryParams", params); + return getProxyData(body); + } + public Response pullProxyDataResponseByAssetId(Participant provider, String assetId) { var body = Map.of("assetId", assetId, "endpointUrl", format("%s/aas/test", provider.gatewayEndpoint), @@ -345,6 +356,12 @@ public String pullProxyDataByTransferProcessId(Participant provider, String tran } + public String pullProviderDataPlaneDataByTransferProcessId(Participant provider, String transferProcessId) { + var body = Map.of("transferProcessId", transferProcessId); + return getProxyData(body); + + } + public JsonObject getDatasetForAsset(Participant provider, String assetId) { var datasets = getCatalogDatasets(provider); return datasets.stream() @@ -374,6 +391,7 @@ private String getProxyData(Map body) { private Response proxyRequest(Map body) { return given() .baseUri(proxyUrl) + .header("x-api-key", apiKey) .contentType("application/json") .body(body) .post(PROXY_SUBPATH); diff --git a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/proxy/AbstractDataPlaneProxyTest.java b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/proxy/AbstractDataPlaneProxyTest.java index 60f02d677..54bdd7e73 100644 --- a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/proxy/AbstractDataPlaneProxyTest.java +++ b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/proxy/AbstractDataPlaneProxyTest.java @@ -17,11 +17,14 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.json.Json; +import okhttp3.mockwebserver.Dispatcher; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted; import org.eclipse.edc.spi.event.EventEnvelope; import org.eclipse.tractusx.edc.lifecycle.Participant; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -49,6 +52,12 @@ public abstract class AbstractDataPlaneProxyTest { protected static final Participant SOKRATES = new Participant(SOKRATES_NAME, SOKRATES_BPN, sokratesConfiguration()); protected static final Participant PLATO = new Participant(PLATO_NAME, PLATO_BPN, platoConfiguration()); + private static final String CUSTOM_BASE_PATH = "/custom"; + private static final String CUSTOM_SUB_PATH = "/sub"; + + private static final String CUSTOM_QUERY_PARAMS = "foo=bar"; + + private static final String CUSTOM_FULL_PATH = CUSTOM_BASE_PATH + CUSTOM_SUB_PATH + "?" + CUSTOM_QUERY_PARAMS; private final ObjectMapper mapper = new ObjectMapper(); private MockWebServer server; @@ -173,6 +182,102 @@ void httpPullDataTransfer_shouldFailForAsset_withTwoEdrAndProxy() throws IOExcep assertThat(data).isEqualTo(body); } + @Test + @DisplayName("Verify E2E flow with Data Plane provider and EDR") + void httpPullDataTransfer_withEdrAndProviderDataPlaneProxy() throws IOException { + + var eventsUrl = server.url(PROXIED_PATH); + + var assetId = UUID.randomUUID().toString(); + var authCodeHeaderName = "test-authkey"; + var authCode = "test-authcode"; + PLATO.createAsset(assetId, Json.createObjectBuilder().build(), Json.createObjectBuilder() + .add(EDC_NAMESPACE + "type", "HttpData") + .add(EDC_NAMESPACE + "contentType", "application/json") + .add(EDC_NAMESPACE + "baseUrl", eventsUrl.toString()) + .add(EDC_NAMESPACE + "authKey", authCodeHeaderName) + .add(EDC_NAMESPACE + "authCode", authCode) + .build()); + + PLATO.createPolicy(businessPartnerNumberPolicy("policy-1", SOKRATES.getBpn())); + PLATO.createPolicy(businessPartnerNumberPolicy("policy-2", SOKRATES.getBpn())); + PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2"); + + var callbacks = Json.createArrayBuilder() + .add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.completed"))) + .build(); + + // response to callback + server.enqueue(new MockResponse()); + + SOKRATES.negotiateEdr(PLATO, assetId, callbacks); + + var transferEvent = waitForTransferCompletion(); + + var body = "{\"response\": \"ok\"}"; + + server.enqueue(new MockResponse().setBody(body)); + var data = SOKRATES.pullProviderDataPlaneDataByAssetId(PLATO, assetId); + assertThat(data).isEqualTo(body); + + server.enqueue(new MockResponse().setBody(body)); + data = SOKRATES.pullProviderDataPlaneDataByTransferProcessId(PLATO, transferEvent.getPayload().getTransferProcessId()); + assertThat(data).isEqualTo(body); + } + + + @Test + @DisplayName("Verify E2E flow with Data Plane provider and EDR") + void httpPullDataTransfer_withEdrAndProviderDataPlaneProxyAndCustomProperties() throws IOException { + + var eventsUrl = server.url(PROXIED_PATH); + + var customUrl = server.url(CUSTOM_BASE_PATH); + + var body = "{\"response\": \"ok\"}"; + + server.setDispatcher(new Dispatcher() { + @NotNull + @Override + public MockResponse dispatch(@NotNull RecordedRequest recordedRequest) throws InterruptedException { + return switch (recordedRequest.getPath()) { + case PROXIED_PATH -> new MockResponse(); + case CUSTOM_FULL_PATH -> new MockResponse().setBody(body); + default -> new MockResponse().setResponseCode(404); + }; + } + }); + + var assetId = UUID.randomUUID().toString(); + var authCodeHeaderName = "test-authkey"; + var authCode = "test-authcode"; + PLATO.createAsset(assetId, Json.createObjectBuilder().build(), Json.createObjectBuilder() + .add(EDC_NAMESPACE + "type", "HttpData") + .add(EDC_NAMESPACE + "contentType", "application/json") + .add(EDC_NAMESPACE + "baseUrl", customUrl.toString()) + .add(EDC_NAMESPACE + "authKey", authCodeHeaderName) + .add(EDC_NAMESPACE + "authCode", authCode) + .add(EDC_NAMESPACE + "proxyPath", "true") + .add(EDC_NAMESPACE + "proxyQueryParams", "true") + .build()); + + PLATO.createPolicy(businessPartnerNumberPolicy("policy-1", SOKRATES.getBpn())); + PLATO.createPolicy(businessPartnerNumberPolicy("policy-2", SOKRATES.getBpn())); + PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2"); + + var callbacks = Json.createArrayBuilder() + .add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.completed"))) + .build(); + + SOKRATES.negotiateEdr(PLATO, assetId, callbacks); + + waitForTransferCompletion(); + + var data = SOKRATES.pullProviderDataPlaneDataByAssetIdAndCustomProperties(PLATO, assetId, CUSTOM_SUB_PATH, CUSTOM_QUERY_PARAMS); + assertThat(data).isEqualTo(body); + + } + @BeforeEach void setup() throws IOException { server = new MockWebServer(); diff --git a/edc-tests/edc-dataplane-proxy-e2e/build.gradle.kts b/edc-tests/edc-dataplane-proxy-e2e/build.gradle.kts index 4dcda6a8f..eeb8473f3 100644 --- a/edc-tests/edc-dataplane-proxy-e2e/build.gradle.kts +++ b/edc-tests/edc-dataplane-proxy-e2e/build.gradle.kts @@ -24,6 +24,7 @@ dependencies { // test runtime config testImplementation(libs.edc.config.filesystem) testImplementation(libs.edc.dpf.http) + testImplementation(libs.edc.auth.tokenbased) testImplementation(project(":spi:edr-spi")) testImplementation(project(":core:edr-cache-core")) testImplementation(project(":edc-extensions:dataplane-proxy:edc-dataplane-proxy-consumer-api")) diff --git a/edc-tests/edc-dataplane-proxy-e2e/src/test/java/org/eclipse/tractusx/edc/dataplane/proxy/e2e/DpfProxyEndToEndTest.java b/edc-tests/edc-dataplane-proxy-e2e/src/test/java/org/eclipse/tractusx/edc/dataplane/proxy/e2e/DpfProxyEndToEndTest.java index 484cb67c2..588f6d698 100644 --- a/edc-tests/edc-dataplane-proxy-e2e/src/test/java/org/eclipse/tractusx/edc/dataplane/proxy/e2e/DpfProxyEndToEndTest.java +++ b/edc-tests/edc-dataplane-proxy-e2e/src/test/java/org/eclipse/tractusx/edc/dataplane/proxy/e2e/DpfProxyEndToEndTest.java @@ -74,6 +74,7 @@ public class DpfProxyEndToEndTest { private static final String REQUEST_TEMPLATE_TP = "{\"transferProcessId\": \"%s\", \"endpointUrl\" : \"http://localhost:%s/api/gateway/aas/test\"}"; private static final String REQUEST_TEMPLATE_ASSET = "{\"assetId\": \"%s\", \"endpointUrl\" : \"http://localhost:%s/api/gateway/aas/test\"}"; private static final String MOCK_ENDPOINT_200_BODY = "{\"message\":\"test\"}"; + private static final String API_KEY = "testkey"; @RegisterExtension static EdcRuntimeExtension consumer = new EdcRuntimeExtension( @@ -81,9 +82,9 @@ public class DpfProxyEndToEndTest { "consumer", baseConfig(Map.of( "web.http.port", valueOf(CONSUMER_HTTP_PORT), + "edc.api.auth.key", API_KEY, "tx.dpf.consumer.proxy.port", valueOf(CONSUMER_PROXY_PORT) ))); - @RegisterExtension static EdcRuntimeExtension provider = new EdcRuntimeExtension( LAUNCHER_MODULE, @@ -184,6 +185,7 @@ private RequestSpecification createSpecification(String body) { return given() .baseUri("http://localhost:" + CONSUMER_PROXY_PORT) .contentType("application/json") + .header("x-api-key", API_KEY) .body(body); }