Skip to content

Commit

Permalink
feat(EdrManagementApi): implements first EDR management APIs (#331)
Browse files Browse the repository at this point in the history
* feat(CPA): adds EDR api for querying the cache and getting the EDR by ID

* open api

* adds more tests on InMemoryEndpointDataReferenceCache

* pr remarks

* pr remarks
  • Loading branch information
wolf4ood authored May 23, 2023
1 parent 5a61b03 commit 7f24bcb
Show file tree
Hide file tree
Showing 31 changed files with 699 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static java.util.Collections.emptyList;
Expand All @@ -37,12 +41,16 @@ public class InMemoryEndpointDataReferenceCache implements EndpointDataReference
private final LockManager lockManager;

private final Map<String, List<EndpointDataReferenceEntry>> entriesByAssetId;

private final Map<String, List<EndpointDataReferenceEntry>> entriesByAgreementId;

private final Map<String, EndpointDataReferenceEntry> entriesByEdrId;
private final Map<String, EndpointDataReference> edrsByTransferProcessId;

public InMemoryEndpointDataReferenceCache() {
lockManager = new LockManager(new ReentrantReadWriteLock());
entriesByAssetId = new HashMap<>();
entriesByAgreementId = new HashMap<>();
entriesByEdrId = new HashMap<>();
edrsByTransferProcessId = new HashMap<>();
}
Expand All @@ -68,13 +76,21 @@ public List<EndpointDataReferenceEntry> entriesForAsset(String assetId) {
return lockManager.readLock(() -> entriesByAssetId.getOrDefault(assetId, emptyList()));
}

@Override
public @NotNull List<EndpointDataReferenceEntry> entriesForAgreement(String agreementId) {
return lockManager.readLock(() -> entriesByAgreementId.getOrDefault(agreementId, emptyList()));
}

@Override
public void save(EndpointDataReferenceEntry entry, EndpointDataReference edr) {
lockManager.writeLock(() -> {
entriesByEdrId.put(edr.getId(), entry);
var list = entriesByAssetId.computeIfAbsent(entry.getAssetId(), k -> new ArrayList<>());
list.add(entry);

var agreementList = entriesByAgreementId.computeIfAbsent(entry.getAgreementId(), k -> new ArrayList<>());
agreementList.add(entry);

edrsByTransferProcessId.put(entry.getTransferProcessId(), edr);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.junit.jupiter.api.Test;

import static java.util.UUID.randomUUID;
import static org.assertj.core.api.Assertions.assertThat;

class InMemoryEndpointDataReferenceCacheTest {
private static final String TRANSFER_PROCESS_ID = "tp1";
private static final String ASSET_ID = "asset1";
private static final String AGREEMENT_ID = "agreement1";

private static final String EDR_ID = "edr1";

private InMemoryEndpointDataReferenceCache cache = new InMemoryEndpointDataReferenceCache();
Expand All @@ -39,7 +40,7 @@ void verify_operations() {

var entry = EndpointDataReferenceEntry.Builder.newInstance()
.assetId(ASSET_ID)
.agreementId(randomUUID().toString())
.agreementId(AGREEMENT_ID)
.transferProcessId(TRANSFER_PROCESS_ID)
.build();

Expand All @@ -55,6 +56,10 @@ void verify_operations() {
assertThat(entries.size()).isEqualTo(1);
assertThat(entries.get((0)).getAssetId()).isEqualTo(ASSET_ID);

entries = cache.entriesForAgreement(AGREEMENT_ID);
assertThat(entries.size()).isEqualTo(1);
assertThat(entries.get((0)).getAgreementId()).isEqualTo(AGREEMENT_ID);

assertThat(cache.deleteByTransferProcessId(TRANSFER_PROCESS_ID).succeeded()).isTrue();

assertThat(cache.entriesForAsset(ASSET_ID)).isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ plugins {
}

dependencies {
implementation(project(":spi:core-spi"))
api(libs.edc.spi.core)
implementation(libs.edc.spi.policy)
implementation(libs.edc.spi.contract)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.eclipse.tractusx.edc.validation.businesspartner.functions.BusinessPartnerProhibitionFunction;

import static org.eclipse.edc.policy.engine.spi.PolicyEngine.ALL_SCOPES;
import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;

public class BusinessPartnerValidationExtension implements ServiceExtension {

Expand All @@ -55,8 +54,7 @@ public class BusinessPartnerValidationExtension implements ServiceExtension {
* }
* </pre>
*/
// TODO replace with TX namespace
public static final String BUSINESS_PARTNER_CONSTRAINT_KEY = EDC_NAMESPACE + "BusinessPartnerNumber";
public static final String BUSINESS_PARTNER_CONSTRAINT_KEY = "BusinessPartnerNumber";

public static final String DEFAULT_LOG_AGREEMENT_EVALUATION = "true";

Expand Down
3 changes: 3 additions & 0 deletions edc-extensions/control-plane-adapter-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ plugins {

dependencies {
implementation(project(":spi:control-plane-adapter-spi"))
implementation(project(":spi:edr-cache-spi"))
implementation(project(":spi:core-spi"))

implementation(libs.edc.api.management)
implementation(libs.edc.spi.aggregateservices)
implementation(libs.jakarta.rsApi)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.web.spi.WebService;
import org.eclipse.tractusx.edc.api.cp.adapter.transform.JsonObjectFromEndpointDataReferenceEntryTransformer;
import org.eclipse.tractusx.edc.api.cp.adapter.transform.JsonObjectToNegotiateEdrRequestDtoTransformer;
import org.eclipse.tractusx.edc.api.cp.adapter.transform.NegotiateEdrRequestDtoToNegotiatedEdrRequestTransformer;
import org.eclipse.tractusx.edc.spi.cp.adapter.service.AdapterTransferProcessService;

import static org.eclipse.tractusx.edc.edr.spi.CoreConstants.TX_NAMESPACE;
import static org.eclipse.tractusx.edc.edr.spi.CoreConstants.TX_PREFIX;

public class AdapterApiExtension implements ServiceExtension {

@Inject
Expand All @@ -43,8 +47,10 @@ public class AdapterApiExtension implements ServiceExtension {

@Override
public void initialize(ServiceExtensionContext context) {
jsonLdService.registerNamespace(TX_PREFIX, TX_NAMESPACE);
transformerRegistry.register(new NegotiateEdrRequestDtoToNegotiatedEdrRequestTransformer());
transformerRegistry.register(new JsonObjectToNegotiateEdrRequestDtoTransformer());
transformerRegistry.register(new JsonObjectFromEndpointDataReferenceEntryTransformer());
webService.registerResource(apiConfig.getContextAlias(), new AdapterEdrController(adapterTransferProcessService, jsonLdService, transformerRegistry));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.json.JsonObject;
import org.eclipse.edc.api.model.DataAddressDto;
import org.eclipse.edc.api.model.IdResponseDto;
import org.eclipse.edc.web.spi.ApiErrorDetail;
import org.eclipse.tractusx.edc.api.cp.adapter.dto.NegotiateEdrRequestDto;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;

import java.util.List;

@OpenAPIDefinition
@Tag(name = "Control Plane Adapter EDR Api")
Expand All @@ -39,4 +43,25 @@ public interface AdapterEdrApi {
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiErrorDetail.class)))),
})
JsonObject initiateEdrNegotiation(@Schema(implementation = NegotiateEdrRequestDto.class) JsonObject dto);

@Operation(description = "Returns all EndpointDataReference entry according to a query",
responses = {
@ApiResponse(responseCode = "200",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = EndpointDataReferenceEntry.class)))),
@ApiResponse(responseCode = "400", description = "Request was malformed",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiErrorDetail.class))))}
)
List<JsonObject> queryEdrs(String assetId, String agreementId);

@Operation(description = "Gets an EDR with the given transfer process ID)",
responses = {
@ApiResponse(responseCode = "200", description = "The EDR cached",
content = @Content(schema = @Schema(implementation = DataAddressDto.class))),
@ApiResponse(responseCode = "400", description = "Request was malformed, e.g. id was null",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiErrorDetail.class)))),
@ApiResponse(responseCode = "404", description = "An EDR with the given ID does not exist",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiErrorDetail.class))))
}
)
JsonObject getEdr(String transferProcessId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,30 @@

import jakarta.json.JsonObject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.edc.api.model.IdResponseDto;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataAddressConstants;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.web.spi.exception.InvalidRequestException;
import org.eclipse.tractusx.edc.api.cp.adapter.dto.NegotiateEdrRequestDto;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.spi.cp.adapter.model.NegotiateEdrRequest;
import org.eclipse.tractusx.edc.spi.cp.adapter.service.AdapterTransferProcessService;

import java.util.List;
import java.util.stream.Collectors;

import static org.eclipse.edc.web.spi.exception.ServiceResultHandler.exceptionMapper;

@Consumes({MediaType.APPLICATION_JSON})
Expand All @@ -40,6 +51,8 @@ public class AdapterEdrController implements AdapterEdrApi {
private final TypeTransformerRegistry transformerRegistry;
private final JsonLd jsonLdService;

private Monitor monitor;

public AdapterEdrController(AdapterTransferProcessService adapterTransferProcessService, JsonLd jsonLdService, TypeTransformerRegistry transformerRegistry) {
this.adapterTransferProcessService = adapterTransferProcessService;
this.jsonLdService = jsonLdService;
Expand All @@ -65,4 +78,36 @@ public JsonObject initiateEdrNegotiation(JsonObject requestObject) {
.compose(jsonLdService::compact)
.orElseThrow(f -> new EdcException("Error creating response body: " + f.getFailureDetail()));
}

@GET
@Path("/{id}")
@Override
public JsonObject getEdr(@PathParam("id") String transferProcessId) {
var edr = adapterTransferProcessService.findByTransferProcessId(transferProcessId).orElseThrow(exceptionMapper(EndpointDataReference.class, transferProcessId));

return transformerRegistry.transform(EndpointDataAddressConstants.from(edr), JsonObject.class)
.compose(jsonLdService::compact)
.orElseThrow(f -> new EdcException("Error creating response body: " + f.getFailureDetail()));
}

@GET
@Override
public List<JsonObject> queryEdrs(@QueryParam("assetId") String assetId, @QueryParam("agreementId") String agreementId) {
if (assetId == null && agreementId == null) {
throw new InvalidRequestException("At least one of this query parameter is required [assetId,agreementId]");
}
return adapterTransferProcessService.findByAssetAndAgreement(assetId, agreementId)
.orElseThrow(exceptionMapper(EndpointDataReferenceEntry.class))
.stream()
.map(edrCached -> transformerRegistry.transform(edrCached, JsonObject.class)
.compose(jsonLdService::compact))
.peek(this::logIfError)
.filter(Result::succeeded)
.map(Result::getContent)
.collect(Collectors.toList());
}

private void logIfError(Result<?> result) {
result.onFailure(f -> monitor.warning(f.getFailureDetail()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@
import java.util.List;

import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.tractusx.edc.edr.spi.CoreConstants.TX_NAMESPACE;

public class NegotiateEdrRequestDto {
public static final String TYPE = EDC_NAMESPACE + "NegotiateEdrRequestDto";
public static final String CONNECTOR_ADDRESS = EDC_NAMESPACE + "connectorAddress";
public static final String PROTOCOL = EDC_NAMESPACE + "protocol";
public static final String CONNECTOR_ID = EDC_NAMESPACE + "connectorId";
public static final String PROVIDER_ID = EDC_NAMESPACE + "providerId";
public static final String OFFER = EDC_NAMESPACE + "offer";
public static final String CALLBACK_ADDRESSES = EDC_NAMESPACE + "callbackAddresses";

public static final String EDR_REQUEST_DTO_TYPE = TX_NAMESPACE + "NegotiateEdrRequestDto";
public static final String EDR_REQUEST_DTO_CONNECTOR_ADDRESS = EDC_NAMESPACE + "connectorAddress";
public static final String EDR_REQUEST_DTO_PROTOCOL = EDC_NAMESPACE + "protocol";
public static final String EDR_REQUEST_DTO_CONNECTOR_ID = EDC_NAMESPACE + "connectorId";
public static final String EDR_REQUEST_DTO_PROVIDER_ID = EDC_NAMESPACE + "providerId";
public static final String EDR_REQUEST_DTO_OFFER = EDC_NAMESPACE + "offer";
public static final String EDR_REQUEST_DTO_CALLBACK_ADDRESSES = EDC_NAMESPACE + "callbackAddresses";

@NotBlank(message = "connectorAddress is mandatory")
private String connectorAddress;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.api.cp.adapter.transform;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import org.eclipse.edc.jsonld.spi.transformer.AbstractJsonLdTransformer;
import org.eclipse.edc.transform.spi.TransformerContext;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE;
import static org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry.EDR_ENTRY_AGREEMENT_ID;
import static org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry.EDR_ENTRY_ASSET_ID;
import static org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry.EDR_ENTRY_TRANSFER_PROCESS_ID;
import static org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry.EDR_ENTRY_TYPE;


public class JsonObjectFromEndpointDataReferenceEntryTransformer extends AbstractJsonLdTransformer<EndpointDataReferenceEntry, JsonObject> {

public JsonObjectFromEndpointDataReferenceEntryTransformer() {
super(EndpointDataReferenceEntry.class, JsonObject.class);
}

@Override
public @Nullable JsonObject transform(@NotNull EndpointDataReferenceEntry dto, @NotNull TransformerContext context) {
return Json.createObjectBuilder()
.add(TYPE, EDR_ENTRY_TYPE)
.add(EDR_ENTRY_AGREEMENT_ID, dto.getAgreementId())
.add(EDR_ENTRY_TRANSFER_PROCESS_ID, dto.getTransferProcessId())
.add(EDR_ENTRY_ASSET_ID, dto.getAssetId())
.build();
}


}
Loading

0 comments on commit 7f24bcb

Please sign in to comment.