Skip to content

Commit

Permalink
Fix: Backport Add Persistence for DataplaneSelector. (#1498)
Browse files Browse the repository at this point in the history
* Feat: Backport Add Persistence for dataplaneselector.

* adds dataplane self-registration extension

* Fix ut's

* Fix dataplane-cloud runtime exclude

* deps

* yaml formatting

* Helm charts.

* Update DEPENDENCIES file to latest gen.

* trigger CI

* Change from PR

Co-authored-by: Enrico Risa <[email protected]>

* trigger CI

---------

Co-authored-by: Rafael Magalhaes <[email protected]>
Co-authored-by: Enrico Risa <[email protected]>
  • Loading branch information
3 people authored Aug 22, 2024
1 parent 195e7e5 commit c4bc7a6
Show file tree
Hide file tree
Showing 16 changed files with 411 additions and 7 deletions.
2 changes: 1 addition & 1 deletion DEPENDENCIES
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,12 @@ maven/mavencentral/org.eclipse.edc/data-plane-http-oauth2-core/0.7.2, Apache-2.0
maven/mavencentral/org.eclipse.edc/data-plane-http-oauth2/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-http-spi/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-http/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-instance-store-sql/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-public-api-v2/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-selector-client/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-selector-control-api/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-selector-core/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-selector-spi/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-self-registration/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-signaling-api/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-signaling-client/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-signaling-transform/0.7.2, Apache-2.0, approved, technology.edc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ spec:
- name: "EDC_DATASOURCE_EDR_URL"
value: {{ tpl .Values.postgresql.jdbcUrl . | quote }}

# see extension https://github.com/eclipse-edc/Connector/tree/main/extensions/data-plane-selector/store/sql/data-plane-instance-store-sql
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_NAME"
value: "dataplaneinstance"
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_USER"
value: {{ .Values.postgresql.auth.username | required ".Values.postgresql.auth.username is required" | quote }}
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_PASSWORD"
value: {{ .Values.postgresql.auth.password | required ".Values.postgresql.auth.password is required" | quote }}
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_URL"
value: {{ tpl .Values.postgresql.jdbcUrl . | quote }}

#############################
## IATP / STS / DIM CONFIG ##
#############################
Expand Down
10 changes: 10 additions & 0 deletions charts/tractusx-connector/templates/deployment-controlplane.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,16 @@ spec:
- name: "EDC_DATASOURCE_EDR_URL"
value: {{ tpl .Values.postgresql.jdbcUrl . | quote }}

# see extension https://github.com/eclipse-edc/Connector/tree/main/extensions/data-plane-selector/store/sql/data-plane-instance-store-sql
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_NAME"
value: "dataplaneinstance"
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_USER"
value: {{ .Values.postgresql.auth.username | required ".Values.postgresql.auth.username is required" | quote }}
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_PASSWORD"
value: {{ .Values.postgresql.auth.password | required ".Values.postgresql.auth.password is required" | quote }}
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_URL"
value: {{ tpl .Values.postgresql.jdbcUrl . | quote }}

#############################
## IATP / STS / DIM CONFIG ##
#############################
Expand Down
2 changes: 1 addition & 1 deletion edc-dataplane/edc-dataplane-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {
runtimeOnly(project(":edc-extensions:dataplane:dataplane-proxy:edc-dataplane-proxy-consumer-api"))
runtimeOnly(project(":edc-extensions:dataplane:dataplane-token-refresh:token-refresh-core"))
runtimeOnly(project(":edc-extensions:dataplane:dataplane-token-refresh:token-refresh-api"))
runtimeOnly(project(":edc-extensions:dataplane:dataplane-self-registration"))

runtimeOnly(libs.edc.jsonld) // needed by the DataPlaneSignalingApi
runtimeOnly(libs.edc.core.did) // for the DID Public Key Resolver
Expand All @@ -43,7 +44,6 @@ dependencies {
runtimeOnly(libs.edc.controlplane.apiclient)

runtimeOnly(libs.edc.data.plane.selector.client)
runtimeOnly(libs.edc.data.plane.self.registration)
runtimeOnly(libs.edc.dpf.api.control)
runtimeOnly(libs.edc.dpf.api.signaling)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/


plugins {
`java-library`
}

dependencies {
implementation(libs.edc.spi.web)
implementation(libs.edc.spi.dataplane.selector)

testImplementation(libs.edc.junit)
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.dataplane.registration;

import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.health.HealthCheckResult;
import org.eclipse.edc.spi.system.health.HealthCheckService;
import org.eclipse.edc.spi.system.health.LivenessProvider;
import org.eclipse.edc.spi.system.health.ReadinessProvider;
import org.eclipse.edc.spi.system.health.StartupStatusProvider;
import org.eclipse.edc.spi.types.domain.transfer.FlowType;
import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl;
import org.jetbrains.annotations.NotNull;

import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toSet;
import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PULL;
import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PUSH;
import static org.eclipse.tractusx.edc.dataplane.registration.DataplaneSelfRegistrationExtension.NAME;

@Extension(NAME)
public class DataplaneSelfRegistrationExtension implements ServiceExtension {

private static final boolean DEFAULT_SELF_UNREGISTRATION = false;
public static final String NAME = "Dataplane Self Registration";
@Setting(value = "Enable data-plane un-registration at shutdown (not suggested for clustered environments)", type = "boolean", defaultValue = DEFAULT_SELF_UNREGISTRATION + "")
static final String SELF_UNREGISTRATION = "edc.data.plane.self.unregistration";
private final AtomicBoolean isRegistered = new AtomicBoolean(false);
private final AtomicReference<String> registrationError = new AtomicReference<>("Data plane self registration not complete");
@Inject
private DataPlaneSelectorService dataPlaneSelectorService;
@Inject
private ControlApiUrl controlApiUrl;
@Inject
private PipelineService pipelineService;
@Inject
private PublicEndpointGeneratorService publicEndpointGeneratorService;
@Inject
private HealthCheckService healthCheckService;

private ServiceExtensionContext context;

@Override
public String name() {
return NAME;
}

@Override
public void initialize(ServiceExtensionContext context) {
this.context = context;
}

@Override
public void start() {
var transferTypes = Stream.concat(
toTransferTypes(PULL, publicEndpointGeneratorService.supportedDestinationTypes()),
toTransferTypes(PUSH, pipelineService.supportedSinkTypes())
);

var instance = DataPlaneInstance.Builder.newInstance()
.id(context.getRuntimeId())
.url(controlApiUrl.get().toString() + "/v1/dataflows")
.allowedSourceTypes(pipelineService.supportedSourceTypes())
.allowedDestTypes(pipelineService.supportedSinkTypes())
.allowedTransferType(transferTypes.collect(toSet()))
.build();

var monitor = context.getMonitor().withPrefix("DataPlaneHealthCheck");
var check = new DataPlaneHealthCheck();
healthCheckService.addReadinessProvider(check);
healthCheckService.addLivenessProvider(check);
healthCheckService.addStartupStatusProvider(check);

monitor.debug("Initiate data plane registration.");
dataPlaneSelectorService.addInstance(instance)
.onSuccess(it -> {
monitor.info("data plane registered to control plane");
isRegistered.set(true);
})
.onFailure(f -> registrationError.set(f.getFailureDetail()))
.orElseThrow(f -> new EdcException("Cannot register data plane to the control plane: " + f.getFailureDetail()));
}

@Override
public void shutdown() {
if (context.getConfig().getBoolean(SELF_UNREGISTRATION, DEFAULT_SELF_UNREGISTRATION)) {
dataPlaneSelectorService.unregister(context.getRuntimeId())
.onSuccess(it -> context.getMonitor().info("data plane successfully unregistered"))
.onFailure(failure -> context.getMonitor().severe("error during data plane de-registration. %s: %s"
.formatted(failure.getReason(), failure.getFailureDetail())));
}
}

private @NotNull Stream<String> toTransferTypes(FlowType pull, Set<String> types) {
return types.stream().map(it -> "%s-%s".formatted(it, pull));
}

private class DataPlaneHealthCheck implements LivenessProvider, ReadinessProvider, StartupStatusProvider {

@Override
public HealthCheckResult get() {
return HealthCheckResult.Builder.newInstance()
.component(NAME)
.success(isRegistered.get(), registrationError.get())
.build();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#################################################################################
# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
#################################################################################

org.eclipse.tractusx.edc.dataplane.registration.DataplaneSelfRegistrationExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.dataplane.registration;

import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.ServiceResult;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.eclipse.edc.spi.system.health.HealthCheckService;
import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.eclipse.tractusx.edc.dataplane.registration.DataplaneSelfRegistrationExtension.SELF_UNREGISTRATION;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(DependencyInjectionExtension.class)
class DataplaneSelfRegistrationExtensionTest {

private final DataPlaneSelectorService dataPlaneSelectorService = mock();
private final ControlApiUrl controlApiUrl = mock();
private final PipelineService pipelineService = mock();
private final PublicEndpointGeneratorService publicEndpointGeneratorService = mock();
private final HealthCheckService healthCheckService = mock();

@BeforeEach
void setUp(ServiceExtensionContext context) {
context.registerService(DataPlaneSelectorService.class, dataPlaneSelectorService);
context.registerService(ControlApiUrl.class, controlApiUrl);
context.registerService(PipelineService.class, pipelineService);
context.registerService(PublicEndpointGeneratorService.class, publicEndpointGeneratorService);
var monitor = mock(Monitor.class);
when(monitor.withPrefix(anyString())).thenReturn(monitor);
context.registerService(Monitor.class, monitor);
context.registerService(HealthCheckService.class, healthCheckService);
}

@Test
void shouldRegisterInstanceAtStartup(DataplaneSelfRegistrationExtension extension, ServiceExtensionContext context) throws MalformedURLException {
when(context.getRuntimeId()).thenReturn("runtimeId");
when(controlApiUrl.get()).thenReturn(URI.create("http://control/api/url"));
when(pipelineService.supportedSinkTypes()).thenReturn(Set.of("sinkType", "anotherSinkType"));
when(pipelineService.supportedSourceTypes()).thenReturn(Set.of("sourceType", "anotherSourceType"));
when(publicEndpointGeneratorService.supportedDestinationTypes()).thenReturn(Set.of("pullDestType", "anotherPullDestType"));
when(dataPlaneSelectorService.addInstance(any())).thenReturn(ServiceResult.success());

extension.initialize(context);
extension.start();

var captor = ArgumentCaptor.forClass(DataPlaneInstance.class);
verify(dataPlaneSelectorService).addInstance(captor.capture());
var dataPlaneInstance = captor.getValue();
assertThat(dataPlaneInstance.getId()).isEqualTo("runtimeId");
assertThat(dataPlaneInstance.getUrl()).isEqualTo(new URL("http://control/api/url/v1/dataflows"));
assertThat(dataPlaneInstance.getAllowedSourceTypes()).containsExactlyInAnyOrder("sourceType", "anotherSourceType");
assertThat(dataPlaneInstance.getAllowedDestTypes()).containsExactlyInAnyOrder("sinkType", "anotherSinkType");
assertThat(dataPlaneInstance.getAllowedTransferTypes())
.containsExactlyInAnyOrder("pullDestType-PULL", "anotherPullDestType-PULL", "sinkType-PUSH", "anotherSinkType-PUSH");

verify(healthCheckService).addStartupStatusProvider(any());
verify(healthCheckService).addLivenessProvider(any());
verify(healthCheckService).addReadinessProvider(any());
}

@Test
void shouldNotStart_whenRegistrationFails(DataplaneSelfRegistrationExtension extension, ServiceExtensionContext context) {
when(controlApiUrl.get()).thenReturn(URI.create("http://control/api/url"));
when(dataPlaneSelectorService.addInstance(any())).thenReturn(ServiceResult.conflict("cannot register"));

extension.initialize(context);

assertThatThrownBy(extension::start).isInstanceOf(EdcException.class);
}

@Test
void shouldNotUnregisterInstanceAtShutdown(DataplaneSelfRegistrationExtension extension, ServiceExtensionContext context) {
when(context.getRuntimeId()).thenReturn("runtimeId");
when(dataPlaneSelectorService.unregister(any())).thenReturn(ServiceResult.success());
extension.initialize(context);

extension.shutdown();

verify(dataPlaneSelectorService, never()).unregister(any());
}

@Test
void shouldUnregisterInstanceAtShutdown_whenConfigured(DataplaneSelfRegistrationExtension extension, ServiceExtensionContext context) {
when(context.getRuntimeId()).thenReturn("runtimeId");
when(context.getConfig()).thenReturn(ConfigFactory.fromMap(Map.of(SELF_UNREGISTRATION, "true")));
when(dataPlaneSelectorService.unregister(any())).thenReturn(ServiceResult.success());
extension.initialize(context);

extension.shutdown();

verify(dataPlaneSelectorService).unregister("runtimeId");
}
}
Loading

0 comments on commit c4bc7a6

Please sign in to comment.