Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Backport Add Persistence for DataplaneSelector. #1498

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DEPENDENCIES
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,12 @@ maven/mavencentral/org.eclipse.edc/data-plane-http-oauth2-core/0.7.2, Apache-2.0
maven/mavencentral/org.eclipse.edc/data-plane-http-oauth2/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-http-spi/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-http/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-instance-store-sql/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-public-api-v2/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-selector-client/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-selector-control-api/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-selector-core/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-selector-spi/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-self-registration/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-signaling-api/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-signaling-client/0.7.2, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/data-plane-signaling-transform/0.7.2, Apache-2.0, approved, technology.edc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ spec:
- name: "EDC_DATASOURCE_EDR_URL"
value: {{ tpl .Values.postgresql.jdbcUrl . | quote }}

# see extension https://github.com/eclipse-edc/Connector/tree/main/extensions/data-plane-selector/store/sql/data-plane-instance-store-sql
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_NAME"
value: "dataplaneinstance"
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_USER"
value: {{ .Values.postgresql.auth.username | required ".Values.postgresql.auth.username is required" | quote }}
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_PASSWORD"
value: {{ .Values.postgresql.auth.password | required ".Values.postgresql.auth.password is required" | quote }}
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_URL"
value: {{ tpl .Values.postgresql.jdbcUrl . | quote }}

#############################
## IATP / STS / DIM CONFIG ##
#############################
Expand Down
10 changes: 10 additions & 0 deletions charts/tractusx-connector/templates/deployment-controlplane.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,16 @@ spec:
- name: "EDC_DATASOURCE_EDR_URL"
value: {{ tpl .Values.postgresql.jdbcUrl . | quote }}

# see extension https://github.com/eclipse-edc/Connector/tree/main/extensions/data-plane-selector/store/sql/data-plane-instance-store-sql
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_NAME"
value: "dataplaneinstance"
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_USER"
value: {{ .Values.postgresql.auth.username | required ".Values.postgresql.auth.username is required" | quote }}
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_PASSWORD"
value: {{ .Values.postgresql.auth.password | required ".Values.postgresql.auth.password is required" | quote }}
- name: "EDC_DATASOURCE_DATAPLANEINSTANCE_URL"
value: {{ tpl .Values.postgresql.jdbcUrl . | quote }}

#############################
## IATP / STS / DIM CONFIG ##
#############################
Expand Down
2 changes: 1 addition & 1 deletion edc-dataplane/edc-dataplane-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {
runtimeOnly(project(":edc-extensions:dataplane:dataplane-proxy:edc-dataplane-proxy-consumer-api"))
runtimeOnly(project(":edc-extensions:dataplane:dataplane-token-refresh:token-refresh-core"))
runtimeOnly(project(":edc-extensions:dataplane:dataplane-token-refresh:token-refresh-api"))
runtimeOnly(project(":edc-extensions:dataplane:dataplane-self-registration"))

runtimeOnly(libs.edc.jsonld) // needed by the DataPlaneSignalingApi
runtimeOnly(libs.edc.core.did) // for the DID Public Key Resolver
Expand All @@ -43,7 +44,6 @@ dependencies {
runtimeOnly(libs.edc.controlplane.apiclient)

runtimeOnly(libs.edc.data.plane.selector.client)
runtimeOnly(libs.edc.data.plane.self.registration)
runtimeOnly(libs.edc.dpf.api.control)
runtimeOnly(libs.edc.dpf.api.signaling)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/


plugins {
`java-library`
}

dependencies {
implementation(libs.edc.spi.web)
implementation(libs.edc.spi.dataplane.selector)

testImplementation(libs.edc.junit)
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.dataplane.registration;

import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.health.HealthCheckResult;
import org.eclipse.edc.spi.system.health.HealthCheckService;
import org.eclipse.edc.spi.system.health.LivenessProvider;
import org.eclipse.edc.spi.system.health.ReadinessProvider;
import org.eclipse.edc.spi.system.health.StartupStatusProvider;
import org.eclipse.edc.spi.types.domain.transfer.FlowType;
import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl;
import org.jetbrains.annotations.NotNull;

import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toSet;
import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PULL;
import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PUSH;
import static org.eclipse.tractusx.edc.dataplane.registration.DataplaneSelfRegistrationExtension.NAME;

@Extension(NAME)
public class DataplaneSelfRegistrationExtension implements ServiceExtension {

private static final boolean DEFAULT_SELF_UNREGISTRATION = false;
public static final String NAME = "Dataplane Self Registration";
@Setting(value = "Enable data-plane un-registration at shutdown (not suggested for clustered environments)", type = "boolean", defaultValue = DEFAULT_SELF_UNREGISTRATION + "")
static final String SELF_UNREGISTRATION = "edc.data.plane.self.unregistration";
private final AtomicBoolean isRegistered = new AtomicBoolean(false);
private final AtomicReference<String> registrationError = new AtomicReference<>("Data plane self registration not complete");
@Inject
private DataPlaneSelectorService dataPlaneSelectorService;
@Inject
private ControlApiUrl controlApiUrl;
@Inject
private PipelineService pipelineService;
@Inject
private PublicEndpointGeneratorService publicEndpointGeneratorService;
@Inject
private HealthCheckService healthCheckService;

private ServiceExtensionContext context;

@Override
public String name() {
return NAME;
}

@Override
public void initialize(ServiceExtensionContext context) {
this.context = context;
}

@Override
public void start() {
var transferTypes = Stream.concat(
toTransferTypes(PULL, publicEndpointGeneratorService.supportedDestinationTypes()),
toTransferTypes(PUSH, pipelineService.supportedSinkTypes())
);

var instance = DataPlaneInstance.Builder.newInstance()
.id(context.getRuntimeId())
.url(controlApiUrl.get().toString() + "/v1/dataflows")
.allowedSourceTypes(pipelineService.supportedSourceTypes())
.allowedDestTypes(pipelineService.supportedSinkTypes())
.allowedTransferType(transferTypes.collect(toSet()))
.build();

var monitor = context.getMonitor().withPrefix("DataPlaneHealthCheck");
var check = new DataPlaneHealthCheck();
healthCheckService.addReadinessProvider(check);
healthCheckService.addLivenessProvider(check);
healthCheckService.addStartupStatusProvider(check);

monitor.debug("Initiate data plane registration.");
dataPlaneSelectorService.addInstance(instance)
.onSuccess(it -> {
monitor.info("data plane registered to control plane");
isRegistered.set(true);
})
.onFailure(f -> registrationError.set(f.getFailureDetail()))
.orElseThrow(f -> new EdcException("Cannot register data plane to the control plane: " + f.getFailureDetail()));
}

@Override
public void shutdown() {
if (context.getConfig().getBoolean(SELF_UNREGISTRATION, DEFAULT_SELF_UNREGISTRATION)) {
dataPlaneSelectorService.unregister(context.getRuntimeId())
.onSuccess(it -> context.getMonitor().info("data plane successfully unregistered"))
.onFailure(failure -> context.getMonitor().severe("error during data plane de-registration. %s: %s"
.formatted(failure.getReason(), failure.getFailureDetail())));
}
}

private @NotNull Stream<String> toTransferTypes(FlowType pull, Set<String> types) {
return types.stream().map(it -> "%s-%s".formatted(it, pull));
}

private class DataPlaneHealthCheck implements LivenessProvider, ReadinessProvider, StartupStatusProvider {

@Override
public HealthCheckResult get() {
return HealthCheckResult.Builder.newInstance()
.component(NAME)
.success(isRegistered.get(), registrationError.get())
.build();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#################################################################################
# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
#################################################################################

org.eclipse.tractusx.edc.dataplane.registration.DataplaneSelfRegistrationExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.dataplane.registration;

import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.ServiceResult;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.eclipse.edc.spi.system.health.HealthCheckService;
import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.eclipse.tractusx.edc.dataplane.registration.DataplaneSelfRegistrationExtension.SELF_UNREGISTRATION;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(DependencyInjectionExtension.class)
class DataplaneSelfRegistrationExtensionTest {

private final DataPlaneSelectorService dataPlaneSelectorService = mock();
private final ControlApiUrl controlApiUrl = mock();
private final PipelineService pipelineService = mock();
private final PublicEndpointGeneratorService publicEndpointGeneratorService = mock();
private final HealthCheckService healthCheckService = mock();

@BeforeEach
void setUp(ServiceExtensionContext context) {
context.registerService(DataPlaneSelectorService.class, dataPlaneSelectorService);
context.registerService(ControlApiUrl.class, controlApiUrl);
context.registerService(PipelineService.class, pipelineService);
context.registerService(PublicEndpointGeneratorService.class, publicEndpointGeneratorService);
var monitor = mock(Monitor.class);
when(monitor.withPrefix(anyString())).thenReturn(monitor);
context.registerService(Monitor.class, monitor);
context.registerService(HealthCheckService.class, healthCheckService);
}

@Test
void shouldRegisterInstanceAtStartup(DataplaneSelfRegistrationExtension extension, ServiceExtensionContext context) throws MalformedURLException {
when(context.getRuntimeId()).thenReturn("runtimeId");
when(controlApiUrl.get()).thenReturn(URI.create("http://control/api/url"));
when(pipelineService.supportedSinkTypes()).thenReturn(Set.of("sinkType", "anotherSinkType"));
when(pipelineService.supportedSourceTypes()).thenReturn(Set.of("sourceType", "anotherSourceType"));
when(publicEndpointGeneratorService.supportedDestinationTypes()).thenReturn(Set.of("pullDestType", "anotherPullDestType"));
when(dataPlaneSelectorService.addInstance(any())).thenReturn(ServiceResult.success());

extension.initialize(context);
extension.start();

var captor = ArgumentCaptor.forClass(DataPlaneInstance.class);
verify(dataPlaneSelectorService).addInstance(captor.capture());
var dataPlaneInstance = captor.getValue();
assertThat(dataPlaneInstance.getId()).isEqualTo("runtimeId");
assertThat(dataPlaneInstance.getUrl()).isEqualTo(new URL("http://control/api/url/v1/dataflows"));
assertThat(dataPlaneInstance.getAllowedSourceTypes()).containsExactlyInAnyOrder("sourceType", "anotherSourceType");
assertThat(dataPlaneInstance.getAllowedDestTypes()).containsExactlyInAnyOrder("sinkType", "anotherSinkType");
assertThat(dataPlaneInstance.getAllowedTransferTypes())
.containsExactlyInAnyOrder("pullDestType-PULL", "anotherPullDestType-PULL", "sinkType-PUSH", "anotherSinkType-PUSH");

verify(healthCheckService).addStartupStatusProvider(any());
verify(healthCheckService).addLivenessProvider(any());
verify(healthCheckService).addReadinessProvider(any());
}

@Test
void shouldNotStart_whenRegistrationFails(DataplaneSelfRegistrationExtension extension, ServiceExtensionContext context) {
when(controlApiUrl.get()).thenReturn(URI.create("http://control/api/url"));
when(dataPlaneSelectorService.addInstance(any())).thenReturn(ServiceResult.conflict("cannot register"));

extension.initialize(context);

assertThatThrownBy(extension::start).isInstanceOf(EdcException.class);
}

@Test
void shouldNotUnregisterInstanceAtShutdown(DataplaneSelfRegistrationExtension extension, ServiceExtensionContext context) {
when(context.getRuntimeId()).thenReturn("runtimeId");
when(dataPlaneSelectorService.unregister(any())).thenReturn(ServiceResult.success());
extension.initialize(context);

extension.shutdown();

verify(dataPlaneSelectorService, never()).unregister(any());
}

@Test
void shouldUnregisterInstanceAtShutdown_whenConfigured(DataplaneSelfRegistrationExtension extension, ServiceExtensionContext context) {
when(context.getRuntimeId()).thenReturn("runtimeId");
when(context.getConfig()).thenReturn(ConfigFactory.fromMap(Map.of(SELF_UNREGISTRATION, "true")));
when(dataPlaneSelectorService.unregister(any())).thenReturn(ServiceResult.success());
extension.initialize(context);

extension.shutdown();

verify(dataPlaneSelectorService).unregister("runtimeId");
}
}
Loading
Loading