Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Data Plane extensions that implement DSP/AAS integration #357

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions edc-dataplane/edc-dataplane-proxy-consumer-api/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

plugins {
`java-library`
id("io.swagger.core.v3.swagger-gradle-plugin")
}

dependencies {

implementation(libs.jakarta.rsApi)

implementation(libs.edc.spi.http)
implementation(libs.edc.util)
implementation(libs.edc.dpf.framework)
implementation(libs.edc.api.observability)
implementation(libs.edc.dpf.util)
implementation(libs.edc.ext.http)

implementation(project(":spi:edr-cache-spi"))

testImplementation(libs.edc.junit)
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.dataplane.proxy.consumer.api;

import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.web.spi.WebServer;
import org.eclipse.edc.web.spi.WebService;
import org.eclipse.edc.web.spi.configuration.WebServiceConfigurer;
import org.eclipse.edc.web.spi.configuration.WebServiceSettings;
import org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.ConsumerAssetRequestController;
import org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.ClientErrorExceptionMapper;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;

import java.util.concurrent.ExecutorService;

import static java.util.concurrent.Executors.newFixedThreadPool;

/**
* Instantiates the Proxy Data API for the consumer-side data plane.
*/
@Extension(value = DataPlaneProxyConsumerApiExtension.NAME)
public class DataPlaneProxyConsumerApiExtension implements ServiceExtension {
static final String NAME = "Data Plane Proxy Consumer API";

private static final int DEFAULT_PROXY_PORT = 8186;
private static final String CONSUMER_API_ALIAS = "consumer.api";
private static final String CONSUMER_CONTEXT_PATH = "/proxy";
private static final String CONSUMER_CONFIG_KEY = "web.http.proxy";

@Setting(value = "Data plane proxy API consumer port", type = "int")
private static final String CONSUMER_PORT = "tx.dpf.consumer.proxy.port";

@Setting(value = "Thread pool size for the consumer data plane proxy gateway", type = "int")
private static final String THREAD_POOL_SIZE = "tx.dpf.consumer.proxy.thread.pool";

public static final int DEFAULT_THREAD_POOL = 10;

@Inject
private WebService webService;

@Inject
private WebServer webServer;

@Inject
private DataPlaneManager dataPlaneManager;

@Inject
private EndpointDataReferenceCache edrCache;

@Inject
private WebServiceConfigurer configurer;

@Inject
private Monitor monitor;

private ExecutorService executorService;

@Override
public String name() {
return NAME;
}

@Override
public void initialize(ServiceExtensionContext context) {
var port = context.getSetting(CONSUMER_PORT, DEFAULT_PROXY_PORT);
configurer.configure(context, webServer, createApiContext(port));

executorService = newFixedThreadPool(context.getSetting(THREAD_POOL_SIZE, DEFAULT_THREAD_POOL));

webService.registerResource(CONSUMER_API_ALIAS, new ClientErrorExceptionMapper());
webService.registerResource(CONSUMER_API_ALIAS, new ConsumerAssetRequestController(edrCache, dataPlaneManager, executorService, monitor));
}

@Override
public void shutdown() {
if (executorService != null) {
executorService.shutdown();
}
}

private WebServiceSettings createApiContext(int port) {
return WebServiceSettings.Builder.newInstance()
.apiConfigKey(CONSUMER_CONFIG_KEY)
.contextAlias(CONSUMER_API_ALIAS)
.defaultPath(CONSUMER_CONTEXT_PATH)
.defaultPort(port)
.name(NAME)
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset;

import jakarta.ws.rs.ClientErrorException;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.ExceptionMapper;
import jakarta.ws.rs.ext.Provider;

/**
* Maps client errors to return the associated status.
*/
@Provider
public class ClientErrorExceptionMapper implements ExceptionMapper<ClientErrorException> {

public ClientErrorExceptionMapper() {
}

@Override
public Response toResponse(ClientErrorException exception) {
return Response.status(exception.getResponse().getStatus()).build();
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset;

import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.container.Suspended;
import org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.model.AssetRequest;

/**
* Defines the API for receiving asset requests on a consumer.
*/
@OpenAPIDefinition
@Tag(name = "Data Plane Proxy API")
public interface ConsumerAssetRequestApi {

@Operation(responses = {
@ApiResponse(content = @Content(mediaType = "application/json", schema = @Schema(implementation = AssetRequest.class)), description = "Requests asset data")
})
void requestAsset(AssetRequest request, @Suspended AsyncResponse response);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset;

import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.container.Suspended;
import jakarta.ws.rs.core.StreamingOutput;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.model.AssetRequest;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;

import java.util.Map;
import java.util.concurrent.ExecutorService;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
import static jakarta.ws.rs.core.Response.Status.BAD_GATEWAY;
import static jakarta.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static jakarta.ws.rs.core.Response.Status.NOT_FOUND;
import static jakarta.ws.rs.core.Response.Status.UNAUTHORIZED;
import static jakarta.ws.rs.core.Response.status;
import static java.lang.String.format;
import static java.util.UUID.randomUUID;

/**
* Implements the HTTP proxy API.
*/
@Path("/aas")
public class ConsumerAssetRequestController implements ConsumerAssetRequestApi {
private static final String HTTP_DATA = "HttpData";
private static final String ASYNC_TYPE = "async";
private static final String BASE_URL = "baseUrl";
private static final String HEADER_AUTHORIZATION = "header:authorization";
private static final String BEARER_PREFIX = "Bearer ";

private final EndpointDataReferenceCache edrCache;
private final DataPlaneManager dataPlaneManager;
private final Monitor monitor;

private final ExecutorService executorService;

public ConsumerAssetRequestController(EndpointDataReferenceCache edrCache,
DataPlaneManager dataPlaneManager,
ExecutorService executorService,
Monitor monitor) {
this.edrCache = edrCache;
this.dataPlaneManager = dataPlaneManager;
this.executorService = executorService;
this.monitor = monitor;
}

@POST
@Path("/request")
@Override
public void requestAsset(AssetRequest request, @Suspended AsyncResponse response) {
// resolve the EDR and add it to the request
var edr = resolveEdr(request);

var sourceAddress = DataAddress.Builder.newInstance()
.type(HTTP_DATA)
.property(BASE_URL, request.getEndpointUrl())
.property(HEADER_AUTHORIZATION, BEARER_PREFIX + edr.getAuthCode())
.build();

var destinationAddress = DataAddress.Builder.newInstance()
.type(ASYNC_TYPE)
.build();

var flowRequest = DataFlowRequest.Builder.newInstance()
.processId(randomUUID().toString())
.trackable(false)
.sourceDataAddress(sourceAddress)
.destinationDataAddress(destinationAddress)
.traceContext(Map.of())
.build();

// transfer the data asynchronously
var sink = new AsyncStreamingDataSink(consumer -> response.resume((StreamingOutput) consumer::accept), executorService, monitor);

try {
dataPlaneManager.transfer(sink, flowRequest).whenComplete((result, throwable) -> handleCompletion(response, result, throwable));
} catch (Exception e) {
reportError(response, e);
}
}

private EndpointDataReference resolveEdr(AssetRequest request) {
if (request.getTransferProcessId() != null) {
var edr = edrCache.resolveReference(request.getTransferProcessId());
if (edr == null) {
throw new BadRequestException("No EDR for transfer process: " + request.getTransferProcessId());
}
return edr;
} else {
var resolvedEdrs = edrCache.referencesForAsset(request.getAssetId());
if (resolvedEdrs.isEmpty()) {
throw new BadRequestException("No EDR for asset: " + request.getAssetId());
} else if (resolvedEdrs.size() > 1) {
throw new PreconditionFailedException("More than one EDR for asset: " + request.getAssetId());
}
return resolvedEdrs.get(0);
}
}

/**
* Handles a request completion, checking for errors. If no errors are present, nothing needs to be done as the response will have already been written to the client.
*/
private void handleCompletion(AsyncResponse response, StreamResult<Void> result, Throwable throwable) {
if (result != null && result.failed()) {
switch (result.reason()) {
case NOT_FOUND:
response.resume(status(NOT_FOUND).type(APPLICATION_JSON).build());
break;
case NOT_AUTHORIZED:
response.resume(status(UNAUTHORIZED).type(APPLICATION_JSON).build());
break;
case GENERAL_ERROR:
response.resume(status(INTERNAL_SERVER_ERROR).type(APPLICATION_JSON).build());
break;
}
} else if (throwable != null) {
reportError(response, throwable);
}
}

/**
* Reports an error to the client. On the consumer side, the error is reported as a {@code BAD_GATEWAY} since the consumer data plane acts as proxy.
*/
private void reportError(AsyncResponse response, Throwable throwable) {
monitor.severe("Error processing gateway request", throwable);
var entity = status(BAD_GATEWAY).entity(format("'%s'", throwable.getMessage())).type(APPLICATION_JSON).build();
response.resume(entity);
}

}
Loading