Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(DataPlaneConsumerProxy): adds support for data plane provider url #643

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions edc-dataplane/edc-dataplane-base/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ dependencies {
runtimeOnly(project(":edc-extensions:dataplane-proxy:edc-dataplane-proxy-provider-core"))

runtimeOnly(libs.edc.config.filesystem)
runtimeOnly(libs.edc.auth.tokenbased)
runtimeOnly(libs.edc.dpf.awss3)
runtimeOnly(libs.edc.dpf.oauth2)
runtimeOnly(libs.edc.dpf.http)
Original file line number Diff line number Diff line change
@@ -17,6 +17,27 @@ The path is `<proxyContext>/aas/request` and the body is something like this exa
The body should contain the `assetId` or the `transferProcessId` which identify the data that we want to fetch
and an `endpointUrl` which is the provider gateway on which the data is available. More info [here](../edc-dataplane-proxy-provider-api/README.md) on the gateway.

Alternatively if the `endpointUrl` is not known or the gateway on the provider side is not configured, it can be omitted and the `Edr#endpointUrl`
will be used. In this scenario if needed users can provide additional properties to the request for composing the final
url:

- `pathSegments` sub path to append to the base url
- `queryParams` query parameters to add to the url

Example with base url `http://localhost:8080/test`

```json
{
"assetId": "1",
"pathSegments": "/sub",
"queryParams": "foo=bar"
}
```

The final url will look like `http://localhost:8080/test/sub?foo=bar` composed by the DataPlane manager with the Http request flow,

> Note: the endpoint is not protected with configured `AuthenticationService`, which most likely will be the token based (auth key) one.

## Configuration

| Key | Required | Default | Description |
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ dependencies {
implementation(libs.edc.dpf.framework)
implementation(libs.edc.dpf.util)
implementation(libs.edc.ext.http)
implementation(libs.edc.spi.auth)

implementation(project(":spi:edr-spi"))

Original file line number Diff line number Diff line change
@@ -14,6 +14,8 @@

package org.eclipse.tractusx.edc.dataplane.proxy.consumer.api;

import org.eclipse.edc.api.auth.spi.AuthenticationRequestFilter;
import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
@@ -63,6 +65,9 @@ public class DataPlaneProxyConsumerApiExtension implements ServiceExtension {
@Inject
private WebServiceConfigurer configurer;

@Inject
private AuthenticationService authenticationService;

@Inject
private Monitor monitor;

@@ -80,6 +85,7 @@ public void initialize(ServiceExtensionContext context) {

executorService = newFixedThreadPool(context.getSetting(THREAD_POOL_SIZE, DEFAULT_THREAD_POOL));

webService.registerResource(CONSUMER_API_ALIAS, new AuthenticationRequestFilter(authenticationService));
webService.registerResource(CONSUMER_API_ALIAS, new ClientErrorExceptionMapper());
webService.registerResource(CONSUMER_API_ALIAS, new ConsumerAssetRequestController(edrCache, dataPlaneManager, executorService, monitor));
}
Original file line number Diff line number Diff line change
@@ -25,12 +25,15 @@
import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.HttpDataAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.model.AssetRequest;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
@@ -41,15 +44,17 @@
import static jakarta.ws.rs.core.Response.status;
import static java.lang.String.format;
import static java.util.UUID.randomUUID;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.PATH;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.QUERY_PARAMS;

/**
* Implements the HTTP proxy API.
*/
@Path("/aas")
public class ConsumerAssetRequestController implements ConsumerAssetRequestApi {
public static final String BASE_URL = "baseUrl";
private static final String HTTP_DATA = "HttpData";
private static final String ASYNC_TYPE = "async";
private static final String BASE_URL = "baseUrl";
private static final String HEADER_AUTHORIZATION = "header:authorization";
private static final String BEARER_PREFIX = "Bearer ";

@@ -76,22 +81,27 @@ public void requestAsset(AssetRequest request, @Suspended AsyncResponse response
// resolve the EDR and add it to the request
var edr = resolveEdr(request);

var sourceAddress = DataAddress.Builder.newInstance()
.type(HTTP_DATA)
.property(BASE_URL, request.getEndpointUrl())
.property(HEADER_AUTHORIZATION, BEARER_PREFIX + edr.getAuthCode())
.build();
var sourceAddress = Optional.ofNullable(request.getEndpointUrl())
.map(url -> gatewayAddress(url, edr))
.orElseGet(() -> dataPlaneAddress(edr));


var destinationAddress = DataAddress.Builder.newInstance()
.type(ASYNC_TYPE)
.build();


var properties = Optional.ofNullable(request.getEndpointUrl())
.map((url) -> Map.<String, String>of())
.orElseGet(() -> dataPlaneProperties(request));

var flowRequest = DataFlowRequest.Builder.newInstance()
.processId(randomUUID().toString())
.trackable(false)
.sourceDataAddress(sourceAddress)
.destinationDataAddress(destinationAddress)
.traceContext(Map.of())
.properties(properties)
.build();

// transfer the data asynchronously
@@ -104,6 +114,30 @@ public void requestAsset(AssetRequest request, @Suspended AsyncResponse response
}
}


private Map<String, String> dataPlaneProperties(AssetRequest request) {
var props = new HashMap<String, String>();
Optional.ofNullable(request.getQueryParams()).ifPresent((queryParams) -> props.put(QUERY_PARAMS, queryParams));
Optional.ofNullable(request.getPathSegments()).ifPresent((path) -> props.put(PATH, path));
return props;
}

private DataAddress gatewayAddress(String url, EndpointDataReference edr) {
return HttpDataAddress.Builder.newInstance()
.baseUrl(url)
.property(HEADER_AUTHORIZATION, BEARER_PREFIX + edr.getAuthCode())
.build();
}

private DataAddress dataPlaneAddress(EndpointDataReference edr) {
return HttpDataAddress.Builder.newInstance()
.baseUrl(edr.getEndpoint())
.proxyQueryParams("true")
.proxyPath("true")
.property(HEADER_AUTHORIZATION, edr.getAuthCode())
.build();
}

private EndpointDataReference resolveEdr(AssetRequest request) {
if (request.getTransferProcessId() != null) {
var edr = edrCache.resolveReference(request.getTransferProcessId());
Original file line number Diff line number Diff line change
@@ -19,8 +19,6 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;

import static java.util.Objects.requireNonNull;

/**
* A request for asset data. The request may contain a transfer process ID or asset ID and must specify an endpoint for retrieving the data.
*/
@@ -33,6 +31,10 @@ public class AssetRequest {
private String providerId;
private String endpointUrl;

private String queryParams;

private String pathSegments;

private AssetRequest() {
}

@@ -52,6 +54,14 @@ public String getProviderId() {
return providerId;
}

public String getQueryParams() {
return queryParams;
}

public String getPathSegments() {
return pathSegments;
}

@JsonPOJOBuilder(withPrefix = "")
public static class Builder {
private final AssetRequest request;
@@ -85,11 +95,20 @@ public Builder providerId(String providerId) {
return this;
}

public Builder queryParams(String queryParams) {
request.queryParams = queryParams;
return this;
}

public Builder pathSegments(String pathSegments) {
request.pathSegments = pathSegments;
return this;
}

public AssetRequest build() {
if (request.assetId == null && request.transferProcessId == null) {
throw new NullPointerException("An assetId or endpointReferenceId must be set");
}
requireNonNull(request.endpointUrl, "endpointUrl");
return request;
}
}
Original file line number Diff line number Diff line change
@@ -32,8 +32,10 @@ void verify_SerializeDeserialize() throws JsonProcessingException {
.endpointUrl("https://test.com")
.providerId("providerId")
.transferProcessId("tp1")
.queryParams("params")
.pathSegments("path")
.build();

var serialized = mapper.writeValueAsString(request);

var deserialized = mapper.readValue(serialized, AssetRequest.class);
@@ -42,13 +44,14 @@ void verify_SerializeDeserialize() throws JsonProcessingException {
assertThat(deserialized.getTransferProcessId()).isEqualTo(request.getTransferProcessId());
assertThat(deserialized.getEndpointUrl()).isEqualTo(request.getEndpointUrl());
assertThat(deserialized.getProviderId()).isEqualTo(request.getProviderId());
assertThat(deserialized.getPathSegments()).isEqualTo(request.getPathSegments());
assertThat(deserialized.getQueryParams()).isEqualTo(request.getQueryParams());

}

@Test
void verify_NullArguments() {
assertThatThrownBy(() -> AssetRequest.Builder.newInstance().endpointUrl("https://test.com").build()).isInstanceOf(NullPointerException.class);
assertThatThrownBy(() -> AssetRequest.Builder.newInstance().assetId("asset1").build()).isInstanceOf(NullPointerException.class);
assertThatThrownBy(() -> AssetRequest.Builder.newInstance().build()).isInstanceOf(NullPointerException.class);
}

@Test
Original file line number Diff line number Diff line change
@@ -26,13 +26,15 @@
import org.eclipse.edc.junit.annotations.ApiTest;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase;
import org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.ConsumerAssetRequestController;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;

import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -48,8 +50,12 @@
import static jakarta.ws.rs.core.Response.Status.NOT_FOUND;
import static jakarta.ws.rs.core.Response.Status.UNAUTHORIZED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.PATH;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.QUERY_PARAMS;
import static org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.ConsumerAssetRequestController.BASE_URL;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ApiTest
@@ -243,6 +249,61 @@ void requestAsset_shouldReturnError_whenEdrByTransferProcessIdNotFound() {

}

@Test
void requestAsset_shouldReturnData_withDataPlaneUrl() throws IOException {

var transferProcessId = "tp";
var url = "http://localhost:8080/test";
var request = Map.of("transferProcessId", transferProcessId, PATH, "/path", QUERY_PARAMS, "test=10&foo=bar");
var edr = EndpointDataReference.Builder.newInstance()
.id(transferProcessId)
.authKey("authKey")
.authCode("authCode")
.endpoint(url)
.build();

var response = Map.of("response", "ok");
var responseBytes = mapper.writeValueAsBytes(response);

var datasource = mock(DataSource.class);
var partStream = mock(DataSource.Part.class);

when(datasource.openPartStream()).thenReturn(StreamResult.success(Stream.of(partStream)));
when(partStream.openStream()).thenReturn(new ByteArrayInputStream(responseBytes));

when(cache.resolveReference(transferProcessId)).thenReturn(edr);
when(dataPlaneManager.transfer(any(DataSink.class), any()))
.thenAnswer(a -> {
AsyncStreamingDataSink sink = a.getArgument(0);
return sink.transfer(datasource);
});

var proxyResponseBytes = baseRequest()
.contentType(MediaType.APPLICATION_JSON)
.body(request)
.post(ASSET_REQUEST_PATH)
.then()
.statusCode(200)
.extract().body().asByteArray();

var proxyResponse = mapper.readValue(proxyResponseBytes, new TypeReference<Map<String, String>>() {
});

assertThat(proxyResponse).containsAllEntriesOf(response);

var captor = ArgumentCaptor.forClass(DataFlowRequest.class);
verify(dataPlaneManager).transfer(any(DataSink.class), captor.capture());


var flowRequest = captor.getValue();

assertThat(flowRequest.getSourceDataAddress().getProperty(BASE_URL)).isEqualTo(edr.getEndpoint());

assertThat(flowRequest.getProperties().get(QUERY_PARAMS)).isEqualTo(request.get(QUERY_PARAMS));
assertThat(flowRequest.getProperties().get(PATH)).isEqualTo(request.get(PATH));

}

@Override
protected Object controller() {
return new ConsumerAssetRequestController(cache, dataPlaneManager, Executors.newSingleThreadExecutor(), mock(Monitor.class));
Original file line number Diff line number Diff line change
@@ -331,6 +331,17 @@ public String pullProxyDataByAssetId(Participant provider, String assetId) {
return getProxyData(body);
}


public String pullProviderDataPlaneDataByAssetId(Participant provider, String assetId) {
var body = Map.of("assetId", assetId);
return getProxyData(body);
}

public String pullProviderDataPlaneDataByAssetIdAndCustomProperties(Participant provider, String assetId, String path, String params) {
var body = Map.of("assetId", assetId, "pathSegments", path, "queryParams", params);
return getProxyData(body);
}

public Response pullProxyDataResponseByAssetId(Participant provider, String assetId) {
var body = Map.of("assetId", assetId,
"endpointUrl", format("%s/aas/test", provider.gatewayEndpoint),
@@ -345,6 +356,12 @@ public String pullProxyDataByTransferProcessId(Participant provider, String tran

}

public String pullProviderDataPlaneDataByTransferProcessId(Participant provider, String transferProcessId) {
var body = Map.of("transferProcessId", transferProcessId);
return getProxyData(body);

}

public JsonObject getDatasetForAsset(Participant provider, String assetId) {
var datasets = getCatalogDatasets(provider);
return datasets.stream()
@@ -374,6 +391,7 @@ private String getProxyData(Map<String, String> body) {
private Response proxyRequest(Map<String, String> body) {
return given()
.baseUri(proxyUrl)
.header("x-api-key", apiKey)
.contentType("application/json")
.body(body)
.post(PROXY_SUBPATH);
Loading