Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(E2E): adds E2E test for CPA + DataPlaneProxy #420

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Stream;
Expand All @@ -45,7 +46,7 @@ public class InMemoryEndpointDataReferenceCache implements EndpointDataReference
private final LockManager lockManager;

private final EdrCacheEntryPredicateConverter predicateConverter = new EdrCacheEntryPredicateConverter();

private final Map<String, List<EndpointDataReferenceEntry>> entriesByAssetId;

private final Map<String, EndpointDataReferenceEntry> entriesByEdrId;
Expand All @@ -54,7 +55,7 @@ public class InMemoryEndpointDataReferenceCache implements EndpointDataReference
public InMemoryEndpointDataReferenceCache() {
lockManager = new LockManager(new ReentrantReadWriteLock());
entriesByAssetId = new HashMap<>();
entriesByEdrId = new HashMap<>();
entriesByEdrId = new ConcurrentHashMap<>();
edrsByTransferProcessId = new HashMap<>();
}

Expand Down
4 changes: 4 additions & 0 deletions edc-dataplane/edc-dataplane-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ plugins {
}

dependencies {
runtimeOnly(project(":core:edr-cache-core"))
runtimeOnly(project(":edc-extensions:observability-api-customization"))
runtimeOnly(project(":edc-dataplane:edc-dataplane-proxy-consumer-api"))
runtimeOnly(project(":edc-dataplane:edc-dataplane-proxy-provider-api"))
runtimeOnly(project(":edc-dataplane:edc-dataplane-proxy-provider-core"))

runtimeOnly(libs.edc.config.filesystem)
runtimeOnly(libs.edc.dpf.awss3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ plugins {
dependencies {
implementation(project(":edc-dataplane:edc-dataplane-base"))
implementation(project(":edc-extensions:hashicorp-vault"))
runtimeOnly(project(":edc-extensions:edr-cache-sql"))
}

tasks.withType<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ dependencies {
implementation(libs.edc.spi.http)
implementation(libs.edc.util)
implementation(libs.edc.dpf.framework)
implementation(libs.edc.api.observability)
implementation(libs.edc.dpf.util)
implementation(libs.edc.ext.http)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ dependencies {
implementation(libs.edc.spi.http)
implementation(libs.edc.util)
implementation(libs.edc.dpf.framework)
implementation(libs.edc.api.observability)
implementation(libs.edc.dpf.util)
implementation(libs.edc.ext.http)
implementation(libs.edc.spi.jwt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ dependencies {

implementation(libs.edc.util)
implementation(libs.edc.dpf.framework)
implementation(libs.edc.api.observability)
implementation(libs.edc.dpf.util)
implementation(libs.edc.jwt.core)
implementation(libs.edc.ext.http)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import jakarta.json.JsonObject;
import org.eclipse.edc.jsonld.TitaniumJsonLd;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.spi.event.Event;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.tractusx.edc.api.cp.adapter.dto.NegotiateEdrRequestDto;

Expand Down Expand Up @@ -62,4 +63,7 @@ public static JsonObject createCallback(String url, boolean transactional, Set<S
.build();
}

public static <E extends Event> ReceivedEvent createEvent(Class<E> klass) {
return ReceivedEvent.Builder.newInstance().type(klass.getSimpleName()).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.helpers;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;


@JsonIgnoreProperties(ignoreUnknown = true)
public class ReceivedEvent {
private String type;

public String getType() {
return type;
}

@Override
public String toString() {
return "ReceivedEvent{" +
"type='" + type + '\'' +
'}';
}

public static class Builder {
private final ReceivedEvent event;

private Builder(ReceivedEvent event) {
this.event = event;
}

public static Builder newInstance() {
return new Builder(new ReceivedEvent());
}

public Builder type(String type) {
this.event.type = type;
return this;
}

public ReceivedEvent build() {
return event;
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.tractusx.edc.lifecycle;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.restassured.response.Response;
import io.restassured.specification.RequestSpecification;
import jakarta.json.Json;
import jakarta.json.JsonArray;
Expand Down Expand Up @@ -53,28 +54,35 @@

public class Participant {

private static final String PROXY_SUBPATH = "proxy/aas/request";

private final String managementUrl;
private final String apiKey;
private final String dspEndpoint;

private final String gatewayEndpoint;

private final String runtimeName;
private final String bpn;
private final String backend;
private final JsonLd jsonLd;
private final Duration timeout = Duration.ofSeconds(30);

private final ObjectMapper objectMapper = JacksonJsonLd.createObjectMapper();
private final String proxyUrl;

public Participant(String runtimeName, String bpn, Map<String, String> properties) {
this.managementUrl = URI.create(format("http://localhost:%s%s", properties.get("web.http.management.port"), properties.get("web.http.management.path"))).toString();
this.dspEndpoint = URI.create(format("http://localhost:%s%s", properties.get("web.http.protocol.port"), properties.get("web.http.protocol.path"))).toString();
this.apiKey = properties.get("edc.api.auth.key");
this.gatewayEndpoint = URI.create(format("http://localhost:%s/api/gateway", properties.get("web.http.port"))).toString();
this.proxyUrl = URI.create(format("http://localhost:%s", properties.get("tx.dpf.consumer.proxy.port"))).toString();
this.bpn = bpn;
this.runtimeName = runtimeName;
this.backend = properties.get("edc.receiver.http.dynamic.endpoint");
jsonLd = new TitaniumJsonLd(mock(Monitor.class));
}


/**
* Creates an asset with the given ID and props using the participant's Data Management API
*/
Expand Down Expand Up @@ -150,7 +158,7 @@ public String negotiateContract(Participant other, String assetId) {
return response.extract().jsonPath().getString(ID);
}

public void negotiateEdr(Participant other, String assetId, JsonArray callbacks) {
public String negotiateEdr(Participant other, String assetId, JsonArray callbacks) {
var dataset = getDatasetForAsset(other, assetId);
assertThat(dataset).withFailMessage("Catalog received from " + other.runtimeName + " was empty!").isNotEmpty();

Expand All @@ -169,6 +177,7 @@ public void negotiateEdr(Participant other, String assetId, JsonArray callbacks)
var body = response.extract().body().asString();
assertThat(response.extract().statusCode()).withFailMessage(body).isBetween(200, 299);

return response.extract().jsonPath().getString(ID);
}

public String getNegotiationState(String negotiationId) {
Expand All @@ -180,7 +189,6 @@ public String getNegotiationState(String negotiationId) {
.extract().body().jsonPath().getString("'edc:state'");
}


public String getContractAgreementId(String negotiationId) {
return getContractNegotiationField(negotiationId, "contractAgreementId");
}
Expand All @@ -206,7 +214,7 @@ public JsonObject getEdr(String transferProcessId) {
.as(JsonObject.class);
}

public JsonArray getEdrEntries(String assetId) {
public JsonArray getEdrEntriesByAssetId(String assetId) {
return baseRequest()
.when()
.get("/adapter/edrs?assetId={assetId}", assetId)
Expand All @@ -217,6 +225,17 @@ public JsonArray getEdrEntries(String assetId) {
.as(JsonArray.class);
}

public JsonArray getEdrEntriesByAgreementId(String agreementId) {
return baseRequest()
.when()
.get("/adapter/edrs?agreementId={agreementId}", agreementId)
.then()
.statusCode(200)
.extract()
.body()
.as(JsonArray.class);
}


/**
* Returns this participant's BusinessPartnerNumber (=BPN). This is constructed of the runtime name plus "-BPN"
Expand Down Expand Up @@ -309,6 +328,38 @@ public JsonArray getCatalogDatasets(Participant provider, JsonObject querySpec)
return datasetReference.get();
}

public String pullProxyDataByAssetId(Participant provider, String assetId) {
var body = Map.of("assetId", assetId, "endpointUrl", format("%s/aas/test", provider.gatewayEndpoint));
return getProxyData(body);
}

public Response pullProxyDataResponseByAssetId(Participant provider, String assetId) {
var body = Map.of("assetId", assetId, "endpointUrl", format("%s/aas/test", provider.gatewayEndpoint));
return proxyRequest(body);
}

public String pullProxyDataByTransferProcessId(Participant provider, String transferProcessId) {
var body = Map.of("transferProcessId", transferProcessId,
"endpointUrl", format("%s/aas/test", provider.gatewayEndpoint));
return getProxyData(body);

}

private String getProxyData(Map<String, String> body) {
return proxyRequest(body)
.then()
.assertThat().statusCode(200)
.extract().body().asString();
}

private Response proxyRequest(Map<String, String> body) {
return given()
.baseUri(proxyUrl)
.contentType("application/json")
.body(body)
.post(PROXY_SUBPATH);
}

public JsonObject getDatasetForAsset(Participant provider, String assetId) {
var datasets = getCatalogDatasets(provider);
return datasets.stream()
Expand All @@ -318,7 +369,6 @@ public JsonObject getDatasetForAsset(Participant provider, String assetId) {
.orElseThrow(() -> new EdcException(format("No dataset for asset %s in the catalog", assetId)));
}


private RequestSpecification baseRequest() {
return given()
.baseUri(managementUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class TestRuntimeConfiguration {
public static final String SOKRATES_BPN = SOKRATES_NAME + BPN_SUFFIX;
public static final String PLATO_NAME = "PLATO";
public static final String PLATO_BPN = PLATO_NAME + BPN_SUFFIX;
public static final Integer PLATO_PROXIED_AAS_BACKEND_PORT = getFreePort();
static final String DSP_PATH = "/api/v1/dsp";
static final int PLATO_CONNECTOR_PORT = getFreePort();
static final int PLATO_MANAGEMENT_PORT = getFreePort();
Expand All @@ -46,8 +47,12 @@ public class TestRuntimeConfiguration {
static final String SOKRATES_PUBLIC_API_PORT = String.valueOf(getFreePort());
static final String PLATO_PUBLIC_API_PORT = String.valueOf(getFreePort());
static final String PLATO_DATAPLANE_CONTROL_PORT = String.valueOf(getFreePort());
static final String PLATO_DATAPLANE_PROXY_PORT = String.valueOf(getFreePort());
static final String SOKRATES_DATAPLANE_CONTROL_PORT = String.valueOf(getFreePort());

static final String SOKRATES_DATAPLANE_PROXY_PORT = String.valueOf(getFreePort());


public static Map<String, String> sokratesPostgresqlConfiguration() {
var baseConfiguration = sokratesConfiguration();
var postgresConfiguration = postgresqlConfiguration(SOKRATES_NAME.toLowerCase());
Expand All @@ -64,7 +69,7 @@ public static Map<String, String> platoPostgresqlConfiguration() {

public static Map<String, String> postgresqlConfiguration(String name) {
var jdbcUrl = jdbcUrl(name);
return new HashMap<String, String>() {
return new HashMap<>() {
{
put("edc.datasource.asset.name", "asset");
put("edc.datasource.asset.url", jdbcUrl);
Expand Down Expand Up @@ -113,6 +118,7 @@ public static Map<String, String> sokratesConfiguration() {
// embedded dataplane config
put("web.http.control.path", "/api/dataplane/control");
put("web.http.control.port", SOKRATES_DATAPLANE_CONTROL_PORT);
put("tx.dpf.consumer.proxy.port", SOKRATES_DATAPLANE_PROXY_PORT);
put("edc.dataplane.token.validation.endpoint", "http://localhost:" + SOKRATES_DATAPLANE_CONTROL_PORT + "/api/dataplane/control/token");
put("edc.dataplane.selector.httpplane.url", "http://localhost:" + SOKRATES_DATAPLANE_CONTROL_PORT + "/api/dataplane/control");
put("edc.dataplane.selector.httpplane.sourcetypes", "HttpData");
Expand All @@ -124,7 +130,7 @@ public static Map<String, String> sokratesConfiguration() {
}
};
}

public static Map<String, String> platoConfiguration() {
return new HashMap<>() {
{
Expand All @@ -143,13 +149,16 @@ public static Map<String, String> platoConfiguration() {
// embedded dataplane config
put("web.http.control.path", "/api/dataplane/control");
put("web.http.control.port", PLATO_DATAPLANE_CONTROL_PORT);
put("tx.dpf.consumer.proxy.port", PLATO_DATAPLANE_PROXY_PORT);
put("edc.dataplane.token.validation.endpoint", "http://localhost:" + PLATO_DATAPLANE_CONTROL_PORT + "/api/dataplane/control/token");
put("edc.dataplane.selector.httpplane.url", "http://localhost:" + PLATO_DATAPLANE_CONTROL_PORT + "/api/dataplane/control");
put("edc.dataplane.selector.httpplane.sourcetypes", "HttpData");
put("edc.dataplane.selector.httpplane.destinationtypes", "HttpProxy");
put("edc.dataplane.selector.httpplane.properties", "{\"publicApiUrl\":\"http://localhost:" + PLATO_PUBLIC_API_PORT + "/api/public\"}");
put("tractusx.businesspartnervalidation.log.agreement.validation", "true");
put("edc.agent.identity.key", "BusinessPartnerNumber");
put("tx.dpf.proxy.gateway.aas.proxied.path", "http://localhost:" + PLATO_PROXIED_AAS_BACKEND_PORT);
put("tx.dpf.proxy.gateway.aas.authorization.type", "none");
}
};
}
Expand Down
Loading