diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImpl.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImpl.java index 9597b17aaad..8a4908c599d 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImpl.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImpl.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.function.Function; import static java.lang.String.format; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; @@ -51,18 +52,28 @@ public void register(int priority, DataFlowController controller) { @Override public @NotNull StatusResult initiate(TransferProcess transferProcess, Policy policy) { try { - return controllers.stream() - .sorted(Comparator.comparingInt(a -> -a.priority)) - .map(PrioritizedDataFlowController::controller) - .filter(controller -> controller.canHandle(transferProcess)) - .findFirst() - .map(controller -> controller.initiateFlow(transferProcess, policy)) - .orElseGet(() -> StatusResult.failure(FATAL_ERROR, controllerNotFound(transferProcess.getId()))); + return chooseControllerAndApply(transferProcess, controller -> controller.initiateFlow(transferProcess, policy)); } catch (Exception e) { return StatusResult.failure(FATAL_ERROR, runtimeException(transferProcess.getId(), e.getLocalizedMessage())); } } + @Override + public @NotNull StatusResult terminate(TransferProcess transferProcess) { + return chooseControllerAndApply(transferProcess, controller -> controller.terminate(transferProcess)); + } + + @NotNull + private StatusResult chooseControllerAndApply(TransferProcess transferProcess, Function> function) { + return controllers.stream() + .sorted(Comparator.comparingInt(a -> -a.priority)) + .map(PrioritizedDataFlowController::controller) + .filter(controller -> controller.canHandle(transferProcess)) + .findFirst() + .map(function) + .orElseGet(() -> StatusResult.failure(FATAL_ERROR, controllerNotFound(transferProcess.getId()))); + } + private String runtimeException(String id, String message) { return format("Unable to process transfer %s. Data flow controller throws an exception: %s", id, message); } diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java index 7e7aa6b09bb..3350eec92d5 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java @@ -127,7 +127,8 @@ protected StateMachineManager.Builder configureStateMachineManager(StateMachineM .processor(processConsumerTransfersInState(REQUESTING, this::processRequesting)) .processor(processProviderTransfersInState(STARTING, this::processStarting)) .processor(processConsumerTransfersInState(STARTED, this::processStarted)) - .processor(processTransfersInState(COMPLETING, this::processCompleting)) + .processor(processProviderTransfersInState(COMPLETING, this::processProviderCompleting)) + .processor(processConsumerTransfersInState(COMPLETING, this::processConsumerCompleting)) .processor(processTransfersInState(TERMINATING, this::processTerminating)) .processor(processTransfersInState(DEPROVISIONING, this::processDeprovisioning)); } @@ -324,27 +325,6 @@ private boolean processStarting(TransferProcess process) { .execute(description); } - @WithSpan - private void sendTransferStartMessage(TransferProcess process, DataFlowResponse dataFlowResponse, Policy policy) { - var message = TransferStartMessage.Builder.newInstance() - .processId(process.getCorrelationId()) - .protocol(process.getProtocol()) - .dataAddress(dataFlowResponse.getDataAddress()) - .counterPartyAddress(process.getConnectorAddress()) - .policy(policy) - .build(); - - var description = format("Send %s to %s", message.getClass().getSimpleName(), process.getConnectorAddress()); - - entityRetryProcessFactory.doAsyncStatusResultProcess(process, () -> dispatcherRegistry.dispatch(Object.class, message)) - .entityRetrieve(id -> store.findById(id)) - .onSuccess((t, content) -> transitionToStarted(t)) - .onFailure((t, throwable) -> transitionToStarting(t)) - .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) - .onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) - .execute(description); - } - /** * Process STARTED transfer

if is completed or there's no checker and it's not managed, set to COMPLETE, * nothing otherwise. @@ -377,28 +357,30 @@ private Boolean checkCompletion(TransferProcess transferProcess) { } /** - * Process COMPLETING transfer

Send COMPLETED message to counter-part + * Process COMPLETING transfer

. Terminate data flow and send COMPLETED message to counter-part * * @param process the COMPLETING transfer fetched * @return if the transfer has been processed or not */ @WithSpan - private boolean processCompleting(TransferProcess process) { - var message = TransferCompletionMessage.Builder.newInstance() - .protocol(process.getProtocol()) - .counterPartyAddress(process.getConnectorAddress()) - .processId(process.getCorrelationId()) - .policy(policyArchive.findPolicyForContract(process.getContractId())) - .build(); + private boolean processProviderCompleting(TransferProcess process) { + return entityRetryProcessFactory.doSyncProcess(process, () -> dataFlowManager.terminate(process)) + .onSuccess((p, c) -> sendTransferCompletionMessage(p)) + .onFatalError((p, failure) -> transitionToTerminating(p, failure.getFailureDetail())) + .onFailure((t, failure) -> transitionToCompleting(t)) + .onRetryExhausted((p, failure) -> transitionToTerminating(p, failure.getFailureDetail())) + .execute("Terminate data flow"); + } - var description = format("Send %s to %s", message.getClass().getSimpleName(), process.getConnectorAddress()); - return entityRetryProcessFactory.doAsyncStatusResultProcess(process, () -> dispatcherRegistry.dispatch(Object.class, message)) - .entityRetrieve(id -> store.findById(id)) - .onSuccess((t, content) -> transitionToCompleted(t)) - .onFailure((t, throwable) -> transitionToCompleting(t)) - .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) - .onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) - .execute(description); + /** + * Process COMPLETING transfer

Send COMPLETED message to counter-part + * + * @param process the COMPLETING transfer fetched + * @return if the transfer has been processed or not + */ + @WithSpan + private boolean processConsumerCompleting(TransferProcess process) { + return sendTransferCompletionMessage(process); } /** @@ -455,6 +437,47 @@ private boolean processDeprovisioning(TransferProcess process) { .execute("deprovisioning"); } + @WithSpan + private void sendTransferStartMessage(TransferProcess process, DataFlowResponse dataFlowResponse, Policy policy) { + var message = TransferStartMessage.Builder.newInstance() + .processId(process.getCorrelationId()) + .protocol(process.getProtocol()) + .dataAddress(dataFlowResponse.getDataAddress()) + .counterPartyAddress(process.getConnectorAddress()) + .policy(policy) + .build(); + + var description = format("Send %s to %s", message.getClass().getSimpleName(), process.getConnectorAddress()); + + entityRetryProcessFactory.doAsyncStatusResultProcess(process, () -> dispatcherRegistry.dispatch(Object.class, message)) + .entityRetrieve(id -> store.findById(id)) + .onSuccess((t, content) -> transitionToStarted(t)) + .onFailure((t, throwable) -> transitionToStarting(t)) + .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) + .onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) + .execute(description); + } + + @WithSpan + private boolean sendTransferCompletionMessage(TransferProcess process) { + var message = TransferCompletionMessage.Builder.newInstance() + .protocol(process.getProtocol()) + .counterPartyAddress(process.getConnectorAddress()) + .processId(process.getCorrelationId()) + .policy(policyArchive.findPolicyForContract(process.getContractId())) + .build(); + + var description = format("Send %s to %s", message.getClass().getSimpleName(), process.getConnectorAddress()); + + return entityRetryProcessFactory.doAsyncStatusResultProcess(process, () -> dispatcherRegistry.dispatch(Object.class, message)) + .entityRetrieve(id -> store.findById(id)) + .onSuccess((t, content) -> transitionToCompleted(t)) + .onFailure((t, throwable) -> transitionToCompleting(t)) + .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) + .onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) + .execute(description); + } + private void handleResult(TransferProcess transferProcess, List> responses, ResponsesHandler> handler) { if (handler.handle(transferProcess, responses)) { update(transferProcess); diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java index b44ce501510..1c4115e11de 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -37,7 +38,7 @@ class DataFlowManagerImplTest { private final DataFlowManagerImpl manager = new DataFlowManagerImpl(); @Test - void should_initiate_flow_on_correct_controller() { + void initiate_shouldInitiateFlowOnCorrectController() { var controller = mock(DataFlowController.class); var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); var policy = Policy.Builder.newInstance().build(); @@ -54,7 +55,7 @@ void should_initiate_flow_on_correct_controller() { } @Test - void should_return_fatal_error_if_no_controller_can_handle_the_request() { + void initiate_shouldReturnFatalError_whenNoControllerCanHandleTheRequest() { var controller = mock(DataFlowController.class); var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); @@ -71,7 +72,7 @@ void should_return_fatal_error_if_no_controller_can_handle_the_request() { } @Test - void should_catch_exceptions_and_return_fatal_error() { + void initiate_shouldCatchExceptionsAndReturnFatalError() { var controller = mock(DataFlowController.class); var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); @@ -91,7 +92,7 @@ void should_catch_exceptions_and_return_fatal_error() { } @Test - void shouldChooseHighestPriorityController() { + void initiate_shouldChooseHighestPriorityController() { var highPriority = createDataFlowController(); var lowPriority = createDataFlowController(); manager.register(1, lowPriority); @@ -103,6 +104,23 @@ void shouldChooseHighestPriorityController() { verifyNoInteractions(lowPriority); } + @Test + void terminate_shouldChooseControllerAndTerminate() { + var controller = mock(DataFlowController.class); + var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); + var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); + var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); + + when(controller.canHandle(any())).thenReturn(true); + when(controller.terminate(any())).thenReturn(StatusResult.success()); + manager.register(controller); + + var result = manager.terminate(transferProcess); + + assertThat(result).isSucceeded(); + verify(controller).terminate(transferProcess); + } + private DataFlowController createDataFlowController() { var dataFlowController = mock(DataFlowController.class); when(dataFlowController.canHandle(any())).thenReturn(true); diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java index 485ec96f108..34f58086b21 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java @@ -148,7 +148,6 @@ class TransferProcessManagerImplTest { @BeforeEach void setup() { when(protocolWebhook.url()).thenReturn(protocolWebhookUrl); - when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.success(createDataFlowResponse())); var observable = new TransferProcessObservableImpl(); observable.registerListener(listener); var entityRetryProcessConfiguration = new EntityRetryProcessConfiguration(RETRY_LIMIT, () -> new ExponentialWaitStrategy(0L)); @@ -625,15 +624,52 @@ void started_shouldBreakLeaseIfNotConsumer() { } @Test - void completing_shouldTransitionToCompleted_whenSendingMessageSucceed() { + void completing_provider_shouldTerminateDataTransferAndTransitionToCompleted() { var process = createTransferProcessBuilder(COMPLETING).dataRequest(createDataRequestBuilder().id("correlationId").build()).build(); - when(transferProcessStore.nextNotLeased(anyInt(), stateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build()); + when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success()); when(dispatcherRegistry.dispatch(any(), isA(TransferCompletionMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); manager.start(); await().untilAsserted(() -> { + verify(dataFlowManager).terminate(process); + var captor = ArgumentCaptor.forClass(TransferCompletionMessage.class); + verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture()); + var message = captor.getValue(); + assertThat(message.getProcessId()).isEqualTo("correlationId"); + verify(transferProcessStore, times(RETRY_LIMIT)).save(argThat(p -> p.getState() == COMPLETED.code())); + verify(listener).completed(process); + }); + } + + @Test + void completing_provider_whenDataFlowTerminationFailsAndRetriesNotExhausted_updatesStateCountForRetry() { + var process = createTransferProcess(COMPLETING).toBuilder().type(PROVIDER).build(); + when(dataFlowManager.terminate(any())).thenReturn(StatusResult.failure(ResponseStatus.ERROR_RETRY)); + when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build()); + + manager.start(); + + await().untilAsserted(() -> { + verify(transferProcessStore, times(RETRY_LIMIT)).save(argThat(p -> p.getState() == COMPLETING.code())); + }); + } + + @Test + void completing_consumer_shouldTransitionToCompletedAndNotTerminateDataTransfer() { + var process = createTransferProcessBuilder(COMPLETING).dataRequest(createDataRequestBuilder().id("correlationId").build()).build(); + when(transferProcessStore.nextNotLeased(anyInt(), consumerStateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build()); + when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success()); + when(dispatcherRegistry.dispatch(any(), isA(TransferCompletionMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); + + manager.start(); + + await().untilAsserted(() -> { + verifyNoInteractions(dataFlowManager); var captor = ArgumentCaptor.forClass(TransferCompletionMessage.class); verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture()); var message = captor.getValue(); @@ -764,6 +800,8 @@ void dispatchFailure(TransferProcessStates starting, TransferProcessStates endin .thenReturn(List.of(transferProcess)).thenReturn(emptyList()); when(dispatcherRegistry.dispatch(any(), any())).thenReturn(result); when(transferProcessStore.findById(transferProcess.getId())).thenReturn(transferProcess); + when(dataFlowManager.initiate(any(), any())).thenReturn(StatusResult.success(createDataFlowResponse())); + when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success()); manager.start(); diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java index 4c9ed1fc2ac..a71defa5bc1 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java @@ -17,7 +17,6 @@ import org.eclipse.edc.connector.api.client.spi.transferprocess.TransferProcessApiClient; import org.eclipse.edc.connector.dataplane.framework.manager.DataPlaneManagerImpl; import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceImpl; -import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceTransferServiceImpl; import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceRegistryImpl; import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy; import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; @@ -100,10 +99,9 @@ public void initialize(ServiceExtensionContext context) { var pipelineService = new PipelineServiceImpl(monitor); pipelineService.registerFactory(new OutputStreamDataSinkFactory()); // Added by default to support synchronous data transfer, i.e. pull data context.registerService(PipelineService.class, pipelineService); - var transferService = new PipelineServiceTransferServiceImpl(pipelineService); var transferServiceRegistry = new TransferServiceRegistryImpl(transferServiceSelectionStrategy); - transferServiceRegistry.registerTransferService(transferService); + transferServiceRegistry.registerTransferService(pipelineService); context.registerService(TransferServiceRegistry.class, transferServiceRegistry); var numThreads = context.getSetting(TRANSFER_THREADS, DEFAULT_TRANSFER_THREADS); diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java index 13f5d8565f8..50ec1ebdf8a 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java @@ -26,6 +26,7 @@ import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.query.Criterion; +import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; import org.eclipse.edc.statemachine.Processor; @@ -42,6 +43,7 @@ import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.RECEIVED; import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState; +import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; /** * Default data manager implementation. @@ -100,6 +102,29 @@ public DataFlowStates transferState(String processId) { .map(DataFlowStates::from).orElse(null); } + @Override + public StatusResult terminate(String dataFlowId) { + var result = store.findByIdAndLease(dataFlowId); + if (result.succeeded()) { + var dataFlow = result.getContent(); + var transferService = transferServiceRegistry.resolveTransferService(dataFlow.toRequest()); + + if (transferService == null) { + return StatusResult.failure(FATAL_ERROR, "TransferService cannot be resolved for DataFlow %s".formatted(dataFlowId)); + } + + var terminateResult = transferService.terminate(dataFlow); + if (terminateResult.failed()) { + return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, terminateResult.getFailureDetail())); + } + dataFlow.transitToCompleted(); + store.save(dataFlow); + return StatusResult.success(); + } else { + return StatusResult.from(result).map(it -> null); + } + } + private boolean processReceived(DataFlow dataFlow) { var request = dataFlow.toRequest(); var transferService = transferServiceRegistry.resolveTransferService(request); diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java index c3d80c7d8b3..96b15f205da 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImpl.java @@ -15,6 +15,7 @@ package org.eclipse.edc.connector.dataplane.framework.pipeline; import io.opentelemetry.instrumentation.annotations.WithSpan; +import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSinkFactory; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; @@ -28,7 +29,9 @@ import org.jetbrains.annotations.Nullable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import static java.lang.String.format; @@ -40,6 +43,7 @@ public class PipelineServiceImpl implements PipelineService { private final List sourceFactories = new ArrayList<>(); private final List sinkFactories = new ArrayList<>(); + private final Map sources = new HashMap<>(); private final Monitor monitor; public PipelineServiceImpl(Monitor monitor) { @@ -91,17 +95,8 @@ public CompletableFuture> transfer(DataFlowRequest request) { return noSinkFactory(request); } var source = sourceFactory.createSource(request); - var sink = sinkFactory.createSink(request); - monitor.debug(() -> format("Transferring from %s to %s.", request.getSourceDataAddress().getType(), request.getDestinationDataAddress().getType())); - return sink.transfer(source); - } + sources.put(request.getProcessId(), source); - @Override - public CompletableFuture> transfer(DataSource source, DataFlowRequest request) { - var sinkFactory = getSinkFactory(request); - if (sinkFactory == null) { - return noSinkFactory(request); - } var sink = sinkFactory.createSink(request); monitor.debug(() -> format("Transferring from %s to %s.", request.getSourceDataAddress().getType(), request.getDestinationDataAddress().getType())); return sink.transfer(source); @@ -114,10 +109,28 @@ public CompletableFuture> transfer(DataSink sink, DataFlowReq return noSourceFactory(request); } var source = sourceFactory.createSource(request); + sources.put(request.getProcessId(), source); + monitor.debug(() -> format("Transferring from %s to %s.", request.getSourceDataAddress().getType(), request.getDestinationDataAddress().getType())); return sink.transfer(source); } + @Override + public StreamResult terminate(DataFlow dataFlow) { + var source = sources.get(dataFlow.getId()); + if (source == null) { + return StreamResult.notFound(); + } else { + try { + source.close(); + sources.remove(dataFlow.getId()); + return StreamResult.success(); + } catch (Exception e) { + return StreamResult.error("Cannot terminate DataFlow %s: %s".formatted(dataFlow.getId(), e.getMessage())); + } + } + } + @Override public void registerFactory(DataSourceFactory factory) { sourceFactories.add(factory); diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceTransferServiceImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceTransferServiceImpl.java deleted file mode 100644 index 2144f7be50f..00000000000 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceTransferServiceImpl.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2022 Microsoft Corporation - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Microsoft Corporation - initial API and implementation - * - */ - -package org.eclipse.edc.connector.dataplane.framework.pipeline; - -import io.opentelemetry.instrumentation.annotations.WithSpan; -import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; -import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; -import org.eclipse.edc.connector.dataplane.spi.pipeline.TransferService; -import org.eclipse.edc.spi.result.Result; -import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; - -import java.util.concurrent.CompletableFuture; - -/** - * Implementation of {@link TransferService} that performs transfers using a {@link PipelineService}. - */ -public class PipelineServiceTransferServiceImpl implements TransferService { - private final PipelineService pipelineService; - - public PipelineServiceTransferServiceImpl(PipelineService pipelineService) { - this.pipelineService = pipelineService; - } - - @Override - public boolean canHandle(DataFlowRequest request) { - return pipelineService.canHandle(request); - } - - @Override - public Result validate(DataFlowRequest request) { - return pipelineService.validate(request); - } - - @WithSpan - @Override - public CompletableFuture> transfer(DataFlowRequest request) { - return pipelineService.transfer(request); - } -} diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java index 3cbdeb543b3..19f637c7a12 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java @@ -21,6 +21,8 @@ import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry; import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; import org.eclipse.edc.spi.query.Criterion; +import org.eclipse.edc.spi.response.ResponseFailure; +import org.eclipse.edc.spi.result.StoreResult; import org.eclipse.edc.spi.system.ExecutorInstrumentation; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; @@ -41,7 +43,10 @@ import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.NOTIFIED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.RECEIVED; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState; +import static org.eclipse.edc.spi.response.ResponseStatus.ERROR_RETRY; +import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; import static org.mockito.AdditionalMatchers.aryEq; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -49,6 +54,7 @@ import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -102,6 +108,68 @@ void initiateDataFlow() { assertThat(dataFlow.getState()).isEqualTo(RECEIVED.code()); } + @Test + void terminate_shouldTerminateDataFlow() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); + when(registry.resolveTransferService(any())).thenReturn(transferService); + when(transferService.terminate(any())).thenReturn(StreamResult.success()); + + var result = manager.terminate("dataFlowId"); + + assertThat(result).isSucceeded(); + verify(store).save(argThat(d -> d.getState() == COMPLETED.code())); + verify(transferService).terminate(dataFlow); + } + + @Test + void terminate_shouldReturnFatalError_whenDataFlowDoesNotExist() { + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.notFound("not found")); + + var result = manager.terminate("dataFlowId"); + + assertThat(result).isFailed().extracting(ResponseFailure::status).isEqualTo(FATAL_ERROR); + verify(store, never()).save(any()); + verifyNoInteractions(transferService); + } + + @Test + void terminate_shouldReturnRetryError_whenEntityCannotBeLeased() { + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.alreadyLeased("already leased")); + + var result = manager.terminate("dataFlowId"); + + assertThat(result).isFailed().extracting(ResponseFailure::status).isEqualTo(ERROR_RETRY); + verify(store, never()).save(any()); + verifyNoInteractions(transferService); + } + + @Test + void terminate_shouldReturnFatalError_whenTransferServiceNotFound() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); + when(registry.resolveTransferService(any())).thenReturn(null); + + var result = manager.terminate("dataFlowId"); + + assertThat(result).isFailed().extracting(ResponseFailure::status).isEqualTo(FATAL_ERROR); + verify(store, never()).save(any()); + verifyNoInteractions(transferService); + } + + @Test + void terminate_shouldReturnFatalError_whenDataFlowCannotBeTerminated() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); + when(registry.resolveTransferService(any())).thenReturn(transferService); + when(transferService.terminate(any())).thenReturn(StreamResult.error("cannot be terminated")); + + var result = manager.terminate("dataFlowId"); + + assertThat(result).isFailed().extracting(ResponseFailure::status).isEqualTo(FATAL_ERROR); + verify(store, never()).save(any()); + } + @Test void received_shouldStartTransferAndTransitionToCompleted_whenTransferSucceeds() { var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java index 2c24258bade..4a369aa52e6 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceImplTest.java @@ -14,31 +14,45 @@ package org.eclipse.edc.connector.dataplane.framework.pipeline; +import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSinkFactory; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure; import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import java.io.IOException; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR; +import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.NOT_FOUND; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; class PipelineServiceImplTest { - Monitor monitor = mock(Monitor.class); + + Monitor monitor = mock(); PipelineServiceImpl service = new PipelineServiceImpl(monitor); DataFlowRequest request = DataFlowRequest.Builder.newInstance() .id("1") @@ -51,7 +65,8 @@ class PipelineServiceImplTest { DataSource source = mock(DataSource.class); DataSink sink = mock(DataSink.class); - { + @BeforeEach + void setUp() { service.registerFactory(sourceFactory); service.registerFactory(sinkFactory); } @@ -69,8 +84,61 @@ void transfer_invokesSink() { verify(sink).transfer(eq(source)); } + @Test + void terminate_shouldCloseDataSource() throws Exception { + var dataFlow = DataFlow.Builder.newInstance().id("dataFlowId") + .source(DataAddress.Builder.newInstance().type("source").build()) + .destination(DataAddress.Builder.newInstance().type("destination").build()) + .build(); + when(sourceFactory.canHandle(any())).thenReturn(true); + when(sourceFactory.createSource(any())).thenReturn(source); + when(sinkFactory.canHandle(any())).thenReturn(true); + when(sinkFactory.createSink(any())).thenReturn(sink); + when(sink.transfer(any())).thenReturn(completedFuture(StreamResult.success())); + + var future = service.transfer(dataFlow.toRequest()).thenApply(result -> service.terminate(dataFlow)); + + assertThat(future).succeedsWithin(5, TimeUnit.SECONDS).satisfies(result -> { + assertThat(result).isSucceeded(); + }); + verify(source).close(); + } + + @Test + void terminate_shouldFail_whenSourceClosureFails() throws Exception { + var dataFlow = DataFlow.Builder.newInstance().id("dataFlowId") + .source(DataAddress.Builder.newInstance().type("source").build()) + .destination(DataAddress.Builder.newInstance().type("destination").build()) + .build(); + when(sourceFactory.canHandle(any())).thenReturn(true); + when(sourceFactory.createSource(any())).thenReturn(source); + when(sinkFactory.canHandle(any())).thenReturn(true); + when(sinkFactory.createSink(any())).thenReturn(sink); + when(sink.transfer(any())).thenReturn(completedFuture(StreamResult.success())); + doThrow(IOException.class).when(source).close(); + + var future = service.transfer(dataFlow.toRequest()).thenApply(result -> service.terminate(dataFlow)); + + assertThat(future).succeedsWithin(5, TimeUnit.SECONDS).satisfies(result -> { + assertThat(result).isFailed().extracting(StreamFailure::getReason).isEqualTo(GENERAL_ERROR); + }); + } + + @Test + void terminate_shouldFail_whenTransferDoesNotExist() { + var dataFlow = DataFlow.Builder.newInstance().id("dataFlowId") + .source(DataAddress.Builder.newInstance().type("source").build()) + .destination(DataAddress.Builder.newInstance().type("destination").build()) + .build(); + + var result = service.terminate(dataFlow); + + assertThat(result).isFailed().extracting(StreamFailure::getReason).isEqualTo(NOT_FOUND); + verifyNoInteractions(source); + } + @ParameterizedTest - @MethodSource("canHandleArguments") + @ArgumentsSource(CanHandleArguments.class) void canHandle_returnsTrue_onlyIfSourceAndSinkCanHandle( boolean sourceFactoryResponse, boolean sinkFactoryResponse, @@ -83,12 +151,16 @@ void canHandle_returnsTrue_onlyIfSourceAndSinkCanHandle( .isEqualTo(expectedResult); } - private static Stream canHandleArguments() { - return Stream.of( - arguments(true, true, true), - arguments(true, false, false), - arguments(false, true, false), - arguments(false, false, false) - ); + private static class CanHandleArguments implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext extensionContext) { + return Stream.of( + arguments(true, true, true), + arguments(true, false, false), + arguments(false, true, false), + arguments(false, false, false) + ); + } } } diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceIntegrationTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceIntegrationTest.java index d6a8f725153..c2f45cfadc4 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceIntegrationTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/pipeline/PipelineServiceIntegrationTest.java @@ -16,6 +16,8 @@ import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSinkFactory; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory; import org.eclipse.edc.connector.dataplane.spi.pipeline.InputStreamDataSource; import org.eclipse.edc.connector.dataplane.util.sink.OutputStreamDataSink; import org.eclipse.edc.spi.monitor.Monitor; @@ -43,8 +45,9 @@ void transferData() { var pipelineService = new PipelineServiceImpl(monitor); var endpoint = new FixedEndpoint(monitor); pipelineService.registerFactory(endpoint); + pipelineService.registerFactory(new InputStreamDataFactory()); - var result = pipelineService.transfer(new InputStreamDataSource("test", new ByteArrayInputStream("bytes".getBytes())), createRequest().build()); + var result = pipelineService.transfer(createRequest().build()); assertThat(result).succeedsWithin(5, TimeUnit.SECONDS); assertThat(endpoint.stream.size()).isEqualTo("bytes".getBytes().length); @@ -83,4 +86,20 @@ public DataSink createSink(DataFlowRequest request) { } } + private static class InputStreamDataFactory implements DataSourceFactory { + @Override + public boolean canHandle(DataFlowRequest request) { + return true; + } + + @Override + public DataSource createSource(DataFlowRequest request) { + return new InputStreamDataSource("test", new ByteArrayInputStream("bytes".getBytes())); + } + + @Override + public @NotNull Result validateRequest(DataFlowRequest request) { + return Result.success(); + } + } } diff --git a/core/data-plane/data-plane-util/src/test/java/org/eclipse/edc/connector/dataplane/util/sink/OutputStreamDataSinkFactoryTest.java b/core/data-plane/data-plane-util/src/test/java/org/eclipse/edc/connector/dataplane/util/sink/OutputStreamDataSinkFactoryTest.java index b84aebf826f..568f393a940 100644 --- a/core/data-plane/data-plane-util/src/test/java/org/eclipse/edc/connector/dataplane/util/sink/OutputStreamDataSinkFactoryTest.java +++ b/core/data-plane/data-plane-util/src/test/java/org/eclipse/edc/connector/dataplane/util/sink/OutputStreamDataSinkFactoryTest.java @@ -14,6 +14,7 @@ package org.eclipse.edc.connector.dataplane.util.sink; +import org.eclipse.edc.spi.result.Failure; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; import org.junit.jupiter.api.BeforeEach; @@ -23,6 +24,7 @@ import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; class OutputStreamDataSinkFactoryTest { @@ -35,29 +37,33 @@ void setUp() { @Test void verifyCanHandle() { - assertThat(factory.canHandle(createDataFlowRequest(OutputStreamDataSinkFactory.TYPE))) - .isTrue(); + assertThat(factory.canHandle(createDataFlowRequest(OutputStreamDataSinkFactory.TYPE))).isTrue(); + assertThat(factory.canHandle(createDataFlowRequest("dummy"))).isFalse(); + } + + @Test + void validate_shouldSucceed_whenRequestIsManageable() { + var request = createDataFlowRequest(OutputStreamDataSinkFactory.TYPE); + + var result = factory.validateRequest(request); - assertThat(factory.canHandle(createDataFlowRequest("dummy"))) - .isFalse(); + assertThat(result).isSucceeded(); } @Test - void validate() { - assertThat(factory.validateRequest(createDataFlowRequest(OutputStreamDataSinkFactory.TYPE))) - .satisfies(result -> assertThat(result.succeeded()).isTrue()); - - assertThat(factory.validateRequest(createDataFlowRequest("dummy"))) - .satisfies(result -> { - assertThat(result.failed()).isTrue(); - assertThat(result.getFailureMessages()) - .containsExactly("OutputStreamDataSinkFactory: Cannot handle destination data address with type: dummy"); - }); + void validate_shouldFail_whenRequestIsNotManageable() { + var request = createDataFlowRequest("dummy"); + + var result = factory.validateRequest(request); + + assertThat(result).isFailed().extracting(Failure::getMessages).asList() + .containsExactly("OutputStreamDataSinkFactory: Cannot handle destination data address with type: dummy"); } @Test void verifyCreateSinkReturnCompletedFuture() { var sink = factory.createSink(null); + assertThat(sink.transfer(null)).succeedsWithin(500L, TimeUnit.MILLISECONDS); } diff --git a/extensions/control-plane/api/control-plane-api/src/main/java/org/eclipse/edc/connector/api/transferprocess/TransferProcessControlApiController.java b/extensions/control-plane/api/control-plane-api/src/main/java/org/eclipse/edc/connector/api/transferprocess/TransferProcessControlApiController.java index e93d6885984..ac5f325652e 100644 --- a/extensions/control-plane/api/control-plane-api/src/main/java/org/eclipse/edc/connector/api/transferprocess/TransferProcessControlApiController.java +++ b/extensions/control-plane/api/control-plane-api/src/main/java/org/eclipse/edc/connector/api/transferprocess/TransferProcessControlApiController.java @@ -45,7 +45,6 @@ public TransferProcessControlApiController(TransferProcessService transferProces this.transferProcessService = transferProcessService; } - @POST @Path("/{processId}/complete") @Override diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java index dc908c128a9..ccc5810621e 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java @@ -58,6 +58,11 @@ public boolean canHandle(TransferProcess transferProcess) { .orElse(failure(FATAL_ERROR, format("Failed to find DataPlaneInstance for source/destination: %s/%s", contentAddress.getType(), HTTP_PROXY))); } + @Override + public StatusResult terminate(TransferProcess transferProcess) { + return StatusResult.success(); + } + private DataFlowResponse toResponse(DataAddress address) { return DataFlowResponse.Builder.newInstance().dataAddress(address).build(); } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java index 3647bb40f5e..7b21b06d491 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java @@ -57,4 +57,9 @@ public boolean canHandle(TransferProcess transferProcess) { return dataPlaneClient.transfer(dataFlowRequest).map(it -> DataFlowResponse.Builder.newInstance().build()); } + @Override + public StatusResult terminate(TransferProcess transferProcess) { + return dataPlaneClient.terminate(transferProcess.getId()); + } + } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java index 4ca08d4e897..7fd28552ff0 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java @@ -20,6 +20,7 @@ import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.spi.result.Failure; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.DataAddress; import org.junit.jupiter.api.Test; @@ -28,6 +29,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.connector.transfer.dataplane.spi.TransferDataPlaneConstants.HTTP_PROXY; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; @@ -47,7 +49,7 @@ void verifyCanHandle() { } @Test - void verifyInitiateFlowSuccess() { + void initiateFlow_success() { var proxyAddress = dataAddress(); var instance = mock(DataPlaneInstance.class); var transferProcess = TransferProcess.Builder.newInstance() @@ -60,13 +62,13 @@ void verifyInitiateFlowSuccess() { var result = flowController.initiateFlow(transferProcess, null); - assertThat(result.succeeded()).isTrue(); - var response = result.getContent(); - assertThat(response.getDataAddress()).isEqualTo(proxyAddress); + assertThat(result).isSucceeded().satisfies(response -> { + assertThat(response.getDataAddress()).isEqualTo(proxyAddress); + }); } @Test - void verifyInitiateFlowReturnsFailureIfNoDataPlaneInstance() { + void initiateFlow_returnsFailureIfNoDataPlaneInstance() { var transferProcess = TransferProcess.Builder.newInstance() .dataRequest(dataRequest()) .contentDataAddress(dataAddress()) @@ -74,13 +76,13 @@ void verifyInitiateFlowReturnsFailureIfNoDataPlaneInstance() { var result = flowController.initiateFlow(transferProcess, null); - assertThat(result.failed()).isTrue(); - assertThat(result.getFailureDetail()) + + assertThat(result).isFailed().extracting(Failure::getFailureDetail).asString() .isEqualTo(String.format("Failed to find DataPlaneInstance for source/destination: %s/%s", transferProcess.getContentDataAddress().getType(), HTTP_PROXY)); } @Test - void verifyInitiateFlowReturnsFailureIfAddressResolutionFails() { + void initiateFlow_returnsFailureIfAddressResolutionFails() { var errorMsg = "Test Error Message"; var instance = mock(DataPlaneInstance.class); var transferProcess = TransferProcess.Builder.newInstance() @@ -93,8 +95,19 @@ void verifyInitiateFlowReturnsFailureIfAddressResolutionFails() { var result = flowController.initiateFlow(transferProcess, Policy.Builder.newInstance().build()); - assertThat(result.failed()).isTrue(); - assertThat(result.getFailureDetail()).contains(errorMsg); + assertThat(result).isFailed().extracting(Failure::getFailureDetail).asString().contains(errorMsg); + } + + @Test + void terminate_shouldAlwaysReturnSuccess() { + var transferProcess = TransferProcess.Builder.newInstance() + .dataRequest(dataRequest()) + .contentDataAddress(dataAddress()) + .build(); + + var result = flowController.terminate(transferProcess); + + assertThat(result).isSucceeded(); } private TransferProcess transferProcess(String destinationType) { diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java index 6a2e147533a..651903b6035 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java @@ -15,7 +15,6 @@ package org.eclipse.edc.connector.transfer.dataplane.flow; import org.eclipse.edc.connector.dataplane.spi.client.DataPlaneClient; -import org.eclipse.edc.connector.transfer.spi.callback.ControlApiUrl; import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; @@ -23,7 +22,6 @@ import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -32,6 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.connector.transfer.dataplane.spi.TransferDataPlaneConstants.HTTP_PROXY; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -40,24 +39,17 @@ class ProviderPushTransferDataFlowControllerTest { private final DataPlaneClient dataPlaneClient = mock(); - private ProviderPushTransferDataFlowController flowController; - - @BeforeEach - void setUp() { - var callbackUrlMock = mock(ControlApiUrl.class); - var url = URI.create("http://localhost"); - when(callbackUrlMock.get()).thenReturn(url); - flowController = new ProviderPushTransferDataFlowController(callbackUrlMock, dataPlaneClient); - } + private final ProviderPushTransferDataFlowController flowController = + new ProviderPushTransferDataFlowController(() -> URI.create("http://localhost"), dataPlaneClient); @Test - void verifyCanHandle() { + void canHandle() { assertThat(flowController.canHandle(transferProcess(HTTP_PROXY))).isFalse(); assertThat(flowController.canHandle(transferProcess("not-http-proxy"))).isTrue(); } @Test - void verifyReturnFailedResultIfTransferFails() { + void initiateFlow_returnFailedResultIfTransferFails() { var errorMsg = "error"; var transferProcess = TransferProcess.Builder.newInstance() .dataRequest(createDataRequest()) @@ -75,7 +67,7 @@ void verifyReturnFailedResultIfTransferFails() { } @Test - void verifyTransferSuccess() { + void initiateFlow_transferSuccess() { var request = createDataRequest(); var source = testDataAddress(); var transferProcess = TransferProcess.Builder.newInstance() @@ -100,7 +92,7 @@ void verifyTransferSuccess() { } @Test - void verifyTransferSuccessWithAdditionalProperties() { + void initiateFlow_transferSuccessWithAdditionalProperties() { var request = createDataRequest("test"); var source = testDataAddress(); var transferProcess = TransferProcess.Builder.newInstance() @@ -123,6 +115,21 @@ void verifyTransferSuccessWithAdditionalProperties() { assertThat(captured.getCallbackAddress()).isNotNull(); } + @Test + void terminate_shouldCallTerminate() { + var transferProcess = TransferProcess.Builder.newInstance() + .id("transferProcessId") + .dataRequest(createDataRequest()) + .contentDataAddress(testDataAddress()) + .build(); + when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success()); + + var result = flowController.terminate(transferProcess); + + assertThat(result).isSucceeded(); + verify(dataPlaneClient).terminate("transferProcessId"); + } + private DataAddress testDataAddress() { return DataAddress.Builder.newInstance().type("test-type").build(); } diff --git a/extensions/data-plane-selector/data-plane-selector-api/src/main/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectFromDataPlaneInstanceTransformer.java b/extensions/data-plane-selector/data-plane-selector-api/src/main/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectFromDataPlaneInstanceTransformer.java index 49f4adc8d6d..b5489c17054 100644 --- a/extensions/data-plane-selector/data-plane-selector-api/src/main/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectFromDataPlaneInstanceTransformer.java +++ b/extensions/data-plane-selector/data-plane-selector-api/src/main/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectFromDataPlaneInstanceTransformer.java @@ -27,7 +27,7 @@ import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.ALLOWED_SOURCE_TYPES; import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.LAST_ACTIVE; import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.PROPERTIES; -import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.TURNCOUNT; +import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.TURN_COUNT; import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.URL; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.ID; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; @@ -49,7 +49,7 @@ public JsonObjectFromDataPlaneInstanceTransformer(JsonBuilderFactory jsonFactory .add(TYPE, DataPlaneInstance.DATAPLANE_INSTANCE_TYPE) .add(URL, dataPlaneInstance.getUrl().toString()) .add(LAST_ACTIVE, dataPlaneInstance.getLastActive()) - .add(TURNCOUNT, dataPlaneInstance.getTurnCount()); + .add(TURN_COUNT, dataPlaneInstance.getTurnCount()); //properties if (dataPlaneInstance.getProperties() != null && !dataPlaneInstance.getProperties().isEmpty()) { diff --git a/extensions/data-plane-selector/data-plane-selector-api/src/main/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectToDataPlaneInstanceTransformer.java b/extensions/data-plane-selector/data-plane-selector-api/src/main/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectToDataPlaneInstanceTransformer.java index b77e9200f3c..18609878546 100644 --- a/extensions/data-plane-selector/data-plane-selector-api/src/main/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectToDataPlaneInstanceTransformer.java +++ b/extensions/data-plane-selector/data-plane-selector-api/src/main/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectToDataPlaneInstanceTransformer.java @@ -35,7 +35,7 @@ import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.Builder; import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.LAST_ACTIVE; import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.PROPERTIES; -import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.TURNCOUNT; +import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.TURN_COUNT; import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.URL; public class JsonObjectToDataPlaneInstanceTransformer extends AbstractJsonLdTransformer { @@ -62,7 +62,7 @@ private void transformProperties(String key, JsonValue jsonValue, DataPlaneInsta } } case LAST_ACTIVE -> transformLong(context, jsonValue, builder::lastActive); - case TURNCOUNT -> builder.turnCount(transformInt(jsonValue, context)); + case TURN_COUNT -> builder.turnCount(transformInt(jsonValue, context)); case ALLOWED_DEST_TYPES -> { var set = jsonValue.asJsonArray().stream().map(jv -> transformString(jv, context)).collect(Collectors.toSet()); builder.allowedDestTypes(set); diff --git a/extensions/data-plane-selector/data-plane-selector-api/src/test/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectFromDataPlaneInstanceTransformerTest.java b/extensions/data-plane-selector/data-plane-selector-api/src/test/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectFromDataPlaneInstanceTransformerTest.java index fea587bfd5d..3cd8ae7ecbf 100644 --- a/extensions/data-plane-selector/data-plane-selector-api/src/test/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectFromDataPlaneInstanceTransformerTest.java +++ b/extensions/data-plane-selector/data-plane-selector-api/src/test/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectFromDataPlaneInstanceTransformerTest.java @@ -28,7 +28,7 @@ import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.ALLOWED_SOURCE_TYPES; import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.LAST_ACTIVE; import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.PROPERTIES; -import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.TURNCOUNT; +import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.TURN_COUNT; import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.URL; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.ID; import static org.eclipse.edc.jsonld.util.JacksonJsonLd.createObjectMapper; @@ -64,8 +64,8 @@ void transform() { assertThat(jsonObject.getJsonArray(ALLOWED_SOURCE_TYPES)).hasSize(1).allMatch(v -> ((JsonString) v).getString().equals("test-source-type")); assertThat(jsonObject.getJsonArray(ALLOWED_DEST_TYPES)).hasSize(1).allMatch(v -> ((JsonString) v).getString().equals("test-dest-type")); assertThat(jsonObject.getJsonNumber(LAST_ACTIVE).intValue()).isEqualTo(15); - assertThat(jsonObject.getJsonNumber(TURNCOUNT).intValue()).isEqualTo(42); + assertThat(jsonObject.getJsonNumber(TURN_COUNT).intValue()).isEqualTo(42); assertThat(jsonObject.getJsonObject(PROPERTIES).getJsonString("foo").getString()).isEqualTo("bar"); } -} \ No newline at end of file +} diff --git a/extensions/data-plane-selector/data-plane-selector-api/src/test/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectToDataPlaneInstanceTransformerTest.java b/extensions/data-plane-selector/data-plane-selector-api/src/test/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectToDataPlaneInstanceTransformerTest.java index 8d380a70c82..9116f125835 100644 --- a/extensions/data-plane-selector/data-plane-selector-api/src/test/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectToDataPlaneInstanceTransformerTest.java +++ b/extensions/data-plane-selector/data-plane-selector-api/src/test/java/org/eclipse/edc/connector/dataplane/selector/transformer/JsonObjectToDataPlaneInstanceTransformerTest.java @@ -33,7 +33,7 @@ import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.ALLOWED_DEST_TYPES; import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.ALLOWED_SOURCE_TYPES; import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.LAST_ACTIVE; -import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.TURNCOUNT; +import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.TURN_COUNT; import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance.URL; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.CONTEXT; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.ID; @@ -66,7 +66,7 @@ void transform() { .add(URL, "http://somewhere.com:1234/api/v1") .add(ALLOWED_SOURCE_TYPES, createArrayBuilder(Set.of("source1", "source2"))) .add(LAST_ACTIVE, 234L) - .add(TURNCOUNT, 42) + .add(TURN_COUNT, 42) .add(ALLOWED_DEST_TYPES, createArrayBuilder(Set.of("dest1", "dest2"))) .build(); @@ -113,4 +113,4 @@ void transform_malformedUrl() { private JsonObject expand(JsonObject jsonObject) { return jsonLd.expand(jsonObject).orElseThrow(f -> new AssertionError(f.getFailureDetail())); } -} \ No newline at end of file +} diff --git a/extensions/data-plane/data-plane-api/build.gradle.kts b/extensions/data-plane/data-plane-api/build.gradle.kts index d5ce31e5230..84f4509a722 100644 --- a/extensions/data-plane/data-plane-api/build.gradle.kts +++ b/extensions/data-plane/data-plane-api/build.gradle.kts @@ -34,6 +34,7 @@ dependencies { testImplementation(libs.restAssured) testImplementation(libs.mockserver.netty) testImplementation(libs.mockserver.client) + testImplementation(testFixtures(project(":extensions:common:http:jersey-core"))) } edcBuild { swagger { diff --git a/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApi.java b/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApi.java index 691ee1d2f0a..3b5f7dfa754 100644 --- a/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApi.java +++ b/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApi.java @@ -39,5 +39,14 @@ public interface DataPlaneControlApi { @Operation(description = "Get the current state of a data transfer.", responses = @ApiResponse(responseCode = "200", description = "Missing access token") ) - DataFlowStates getTransferState(String processId); + DataFlowStates getTransferState(String transferProcessId); + + @Operation(description = "Terminates a data transfer.", + responses = { + @ApiResponse(responseCode = "204", description = "Data transfer terminated"), + @ApiResponse(responseCode = "404", description = "Data transfer not handled by the data plane"), + @ApiResponse(responseCode = "409", description = "Cannot terminate the transfer"), + } + ) + void terminateTransfer(String transferProcessId, AsyncResponse response); } diff --git a/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiController.java b/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiController.java index ae28f9c76e8..6f80e8184bf 100644 --- a/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiController.java +++ b/extensions/data-plane/data-plane-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiController.java @@ -15,6 +15,7 @@ package org.eclipse.edc.connector.dataplane.api.controller; import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; import jakarta.ws.rs.GET; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; @@ -33,8 +34,8 @@ import static org.eclipse.edc.connector.dataplane.api.response.ResponseFunctions.validationErrors; @Path("/transfer") -@Consumes({ MediaType.APPLICATION_JSON }) -@Produces({ MediaType.APPLICATION_JSON }) +@Consumes({MediaType.APPLICATION_JSON}) +@Produces({MediaType.APPLICATION_JSON}) public class DataPlaneControlApiController implements DataPlaneControlApi { private final DataPlaneManager dataPlaneManager; @@ -60,8 +61,18 @@ public void initiateTransfer(DataFlowRequest request, @Suspended AsyncResponse r @GET @Override - @Path("/{processId}") - public DataFlowStates getTransferState(@PathParam("processId") String processId) { - return dataPlaneManager.transferState(processId); + @Path("/{transferProcessId}") + public DataFlowStates getTransferState(@PathParam("transferProcessId") String transferProcessId) { + return dataPlaneManager.transferState(transferProcessId); } + + @DELETE + @Path("/{transferProcessId}") + @Override + public void terminateTransfer(@PathParam("transferProcessId") String transferProcessId, @Suspended AsyncResponse response) { + dataPlaneManager.terminate(transferProcessId) + .onSuccess(r -> response.resume(Response.noContent().build())) + .onFailure(f -> response.resume(validationError("Cannot terminate transfer: " + f.getFailureDetail()))); + } + } diff --git a/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiControllerTest.java b/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiControllerTest.java new file mode 100644 index 00000000000..66a0634b5f2 --- /dev/null +++ b/extensions/data-plane/data-plane-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiControllerTest.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import io.restassured.specification.RequestSpecification; +import jakarta.ws.rs.core.HttpHeaders; +import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; +import org.eclipse.edc.junit.annotations.ApiTest; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase; +import org.junit.jupiter.api.Test; + +import static io.restassured.RestAssured.given; +import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ApiTest +class DataPlaneControlApiControllerTest extends RestControllerTestBase { + + private final DataPlaneManager manager = mock(); + + @Test + void delete_shouldReturnOk_whenTerminationSucceeds() { + when(manager.terminate(any())).thenReturn(StatusResult.success()); + + baseRequest() + .delete("/transfer/transferId") + .then() + .statusCode(204); + + verify(manager).terminate("transferId"); + } + + @Test + void delete_shouldReturnError_whenTerminationFails() { + when(manager.terminate(any())).thenReturn(StatusResult.failure(FATAL_ERROR)); + + baseRequest() + .delete("/transfer/transferId") + .then() + .statusCode(400); + + verify(manager).terminate("transferId"); + } + + @Override + protected Object controller() { + return new DataPlaneControlApiController(manager); + } + + private RequestSpecification baseRequest() { + return given() + .baseUri("http://localhost:" + port) + .header(HttpHeaders.AUTHORIZATION, "auth") + .when(); + } +} diff --git a/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java b/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java index 4654c7ec0e5..d27e06934f4 100644 --- a/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java +++ b/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java @@ -45,4 +45,9 @@ public StatusResult transfer(DataFlowRequest request) { dataPlaneManager.initiate(request); return StatusResult.success(); } + + @Override + public StatusResult terminate(String transferProcessId) { + return dataPlaneManager.terminate(transferProcessId); + } } diff --git a/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClient.java b/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClient.java index 078db106265..e672a7ffd8f 100644 --- a/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClient.java +++ b/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClient.java @@ -28,7 +28,6 @@ import org.eclipse.edc.connector.dataplane.spi.response.TransferErrorResponse; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.http.EdcHttpClient; -import org.eclipse.edc.spi.response.ResponseStatus; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; @@ -37,6 +36,7 @@ import java.util.Optional; import static java.lang.String.format; +import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; /** * Implementation of a {@link DataPlaneClient} that uses a remote {@link DataPlaneManager} accessible from a REST API. @@ -57,27 +57,43 @@ public RemoteDataPlaneClient(EdcHttpClient httpClient, DataPlaneSelectorClient s @WithSpan @Override - public StatusResult transfer(DataFlowRequest request) { - var instance = selectorClient.find(request.getSourceDataAddress(), request.getDestinationDataAddress(), selectorStrategy); + public StatusResult transfer(DataFlowRequest dataFlowRequest) { + var instance = selectorClient.find(dataFlowRequest.getSourceDataAddress(), dataFlowRequest.getDestinationDataAddress(), selectorStrategy); if (instance == null) { - return StatusResult.failure(ResponseStatus.FATAL_ERROR, "Failed to find data plane instance supporting request: " + request.getId()); + return StatusResult.failure(FATAL_ERROR, "Failed to find data plane instance supporting request: " + dataFlowRequest.getId()); } RequestBody body; try { - body = RequestBody.create(mapper.writeValueAsString(request), TYPE_JSON); + body = RequestBody.create(mapper.writeValueAsString(dataFlowRequest), TYPE_JSON); } catch (JsonProcessingException e) { throw new EdcException(e); } var rq = new Request.Builder().post(body).url(instance.getUrl()).build(); try (var response = httpClient.execute(rq)) { - return handleResponse(response, request.getId()); + return handleResponse(response, dataFlowRequest.getId()); } catch (IOException e) { - return StatusResult.failure(ResponseStatus.FATAL_ERROR, e.getMessage()); + return StatusResult.failure(FATAL_ERROR, e.getMessage()); } } + @Override + public StatusResult terminate(String transferProcessId) { + return selectorClient.getAll().stream() + .map(dataPlane -> { + var request = new Request.Builder().delete().url(dataPlane.getUrl() + "/" + transferProcessId).build(); + + try (var response = httpClient.execute(request)) { // TODO: should retry when status is 409 (???) + return handleResponse(response, transferProcessId); + } catch (IOException e) { + return StatusResult.failure(FATAL_ERROR, e.getMessage()); + } + }) + .findAny() + .orElse(StatusResult.success()); + } + private StatusResult handleResponse(Response response, String requestId) { if (response.isSuccessful()) { return StatusResult.success(); @@ -90,7 +106,7 @@ private StatusResult handleError(Response response, String requestId) { var errorMsg = Optional.ofNullable(response.body()) .map(this::formatErrorMessage) .orElse("null response body"); - return StatusResult.failure(ResponseStatus.FATAL_ERROR, format("Transfer request failed with status code %s for request %s: %s", response.code(), requestId, errorMsg)); + return StatusResult.failure(FATAL_ERROR, format("Transfer request failed with status code %s for request %s: %s", response.code(), requestId, errorMsg)); } private String formatErrorMessage(ResponseBody body) { diff --git a/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java b/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java index f6e5a443fca..259c4b9418e 100644 --- a/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java +++ b/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java @@ -16,14 +16,14 @@ import org.eclipse.edc.connector.dataplane.spi.client.DataPlaneClient; import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; +import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatNullPointerException; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -33,50 +33,46 @@ class EmbeddedDataPlaneClientTest { - private DataPlaneManager dataPlaneManagerMock; - private DataPlaneClient client; - - @BeforeEach - public void setUp() { - dataPlaneManagerMock = mock(DataPlaneManager.class); - client = new EmbeddedDataPlaneClient(dataPlaneManagerMock); - } + private final DataPlaneManager dataPlaneManager = mock(); + private final DataPlaneClient client = new EmbeddedDataPlaneClient(dataPlaneManager); @Test - void verifyDataPlaneManagerMandatory() { - assertThatNullPointerException().isThrownBy(() -> new EmbeddedDataPlaneClient(null)); + void transfer_shouldSucceed_whenTransferInitiatedCorrectly() { + var request = createDataFlowRequest(); + when(dataPlaneManager.validate(any())).thenReturn(Result.success(true)); + doNothing().when(dataPlaneManager).initiate(any()); + + var result = client.transfer(request); + + verify(dataPlaneManager).validate(request); + verify(dataPlaneManager).initiate(request); + + assertThat(result).isSucceeded(); } @Test - void verifyReturnFailedResultIfValidationFailure() { + void transfer_shouldReturnFailedResult_whenValidationFailure() { var errorMsg = "error"; var request = createDataFlowRequest(); - when(dataPlaneManagerMock.validate(any())).thenReturn(Result.failure(errorMsg)); - doNothing().when(dataPlaneManagerMock).initiate(any()); + when(dataPlaneManager.validate(any())).thenReturn(Result.failure(errorMsg)); + doNothing().when(dataPlaneManager).initiate(any()); var result = client.transfer(request); - verify(dataPlaneManagerMock).validate(request); - verify(dataPlaneManagerMock, never()).initiate(any()); + verify(dataPlaneManager).validate(request); + verify(dataPlaneManager, never()).initiate(any()); - assertThat(result.failed()).isTrue(); - assertThat(result.getFailureMessages()) - .hasSize(1) - .allSatisfy(s -> assertThat(s).contains(errorMsg)); + assertThat(result).isFailed().messages().hasSize(1).allSatisfy(s -> assertThat(s).contains(errorMsg)); } @Test - void verifyTransferSuccess() { - var request = createDataFlowRequest(); - when(dataPlaneManagerMock.validate(any())).thenReturn(Result.success(true)); - doNothing().when(dataPlaneManagerMock).initiate(any()); - - var result = client.transfer(request); + void terminate_shouldProxyCallToManager() { + when(dataPlaneManager.terminate(any())).thenReturn(StatusResult.success()); - verify(dataPlaneManagerMock).validate(request); - verify(dataPlaneManagerMock).initiate(request); + var result = client.terminate("dataFlowId"); - assertThat(result.succeeded()).isTrue(); + assertThat(result).isSucceeded(); + verify(dataPlaneManager).terminate("dataFlowId"); } private static DataFlowRequest createDataFlowRequest() { diff --git a/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClientTest.java b/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClientTest.java index 09792c5cb6f..6f812ca069b 100644 --- a/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClientTest.java +++ b/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/RemoteDataPlaneClientTest.java @@ -20,7 +20,6 @@ import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; import org.eclipse.edc.connector.dataplane.spi.client.DataPlaneClient; import org.eclipse.edc.connector.dataplane.spi.response.TransferErrorResponse; -import org.eclipse.edc.spi.http.EdcHttpClient; import org.eclipse.edc.spi.response.ResponseStatus; import org.eclipse.edc.spi.types.TypeManager; import org.eclipse.edc.spi.types.domain.DataAddress; @@ -28,7 +27,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockserver.integration.ClientAndServer; import org.mockserver.model.HttpRequest; @@ -37,22 +35,23 @@ import org.mockserver.model.MediaType; import org.mockserver.verify.VerificationTimes; -import java.net.MalformedURLException; -import java.net.URL; import java.util.List; import java.util.UUID; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatNullPointerException; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; import static org.eclipse.edc.junit.testfixtures.TestUtils.testHttpClient; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockserver.integration.ClientAndServer.startClientAndServer; import static org.mockserver.matchers.Times.once; import static org.mockserver.model.HttpResponse.response; +import static org.mockserver.model.HttpStatusCode.CONFLICT_409; +import static org.mockserver.model.HttpStatusCode.NO_CONTENT_204; import static org.mockserver.stop.Stop.stopQuietly; class RemoteDataPlaneClientTest { @@ -62,55 +61,29 @@ class RemoteDataPlaneClientTest { private static final int DATA_PLANE_API_PORT = getFreePort(); private static final String DATA_PLANE_PATH = "/transfer"; private static final String DATA_PLANE_API_URI = "http://localhost:" + DATA_PLANE_API_PORT + DATA_PLANE_PATH; - - /** - * Data plane mock server. - */ - private static ClientAndServer dataPlaneClientAndServer; - private DataPlaneSelectorClient selectorClientMock; - private DataPlaneClient dataPlaneClient; + private static ClientAndServer dataPlane; + private final DataPlaneSelectorClient selectorClient = mock(); + private final DataPlaneClient dataPlaneClient = new RemoteDataPlaneClient(testHttpClient(), selectorClient, "test", MAPPER); @BeforeAll public static void setUp() { - dataPlaneClientAndServer = startClientAndServer(DATA_PLANE_API_PORT); + dataPlane = startClientAndServer(DATA_PLANE_API_PORT); } @AfterAll public static void tearDown() { - stopQuietly(dataPlaneClientAndServer); + stopQuietly(dataPlane); } - /** - * Reset mock server internal state after every test. - */ @AfterEach public void resetMockServer() { - dataPlaneClientAndServer.reset(); - } - - @BeforeEach - public void init() { - selectorClientMock = mock(DataPlaneSelectorClient.class); - var selectionStrategy = "test"; - dataPlaneClient = new RemoteDataPlaneClient(testHttpClient(), selectorClientMock, selectionStrategy, MAPPER); - } - - @Test - void verifyCtor() { - assertThatNullPointerException().isThrownBy(() -> new RemoteDataPlaneClient(null, selectorClientMock, "test", MAPPER)) - .withMessageContaining("Http client"); - assertThatNullPointerException().isThrownBy(() -> new RemoteDataPlaneClient(mock(EdcHttpClient.class), null, "test", MAPPER)) - .withMessageContaining("Data plane selector client"); - assertThatNullPointerException().isThrownBy(() -> new RemoteDataPlaneClient(mock(EdcHttpClient.class), selectorClientMock, null, MAPPER)) - .withMessageContaining("Selector strategy"); - assertThatNullPointerException().isThrownBy(() -> new RemoteDataPlaneClient(mock(EdcHttpClient.class), selectorClientMock, "test", null)) - .withMessageContaining("Object mapper"); + dataPlane.reset(); } @Test - void verifyReturnsFatalErrorIfNoDataPlaneInstanceFound() { + void transfer_verifyReturnsFatalErrorIfNoDataPlaneInstanceFound() { var flowRequest = createDataFlowRequest(); - when(selectorClientMock.find(any(), any(), any())).thenReturn(null); + when(selectorClient.find(any(), any(), any())).thenReturn(null); var result = dataPlaneClient.transfer(flowRequest); @@ -121,21 +94,18 @@ void verifyReturnsFatalErrorIfNoDataPlaneInstanceFound() { } @Test - void verifyReturnFatalErrorIfReceiveResponseWithNullBody() throws MalformedURLException, JsonProcessingException { + void transfer_verifyReturnFatalErrorIfReceiveResponseWithNullBody() throws JsonProcessingException { var flowRequest = createDataFlowRequest(); - // mock data plane selector - var instance = mock(DataPlaneInstance.class); - when(instance.getUrl()).thenReturn(new URL(DATA_PLANE_API_URI)); - when(selectorClientMock.find(any(), any(), any())).thenReturn(instance); + var instance = DataPlaneInstance.Builder.newInstance().url(DATA_PLANE_API_URI).build(); + when(selectorClient.find(any(), any(), any())).thenReturn(instance); - // config data plane mock server var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(flowRequest)); - dataPlaneClientAndServer.when(httpRequest, once()).respond(response().withStatusCode(HttpStatusCode.BAD_REQUEST_400.code())); + dataPlane.when(httpRequest, once()).respond(response().withStatusCode(HttpStatusCode.BAD_REQUEST_400.code())); var result = dataPlaneClient.transfer(flowRequest); - dataPlaneClientAndServer.verify(httpRequest, VerificationTimes.once()); + dataPlane.verify(httpRequest, VerificationTimes.once()); assertThat(result.failed()).isTrue(); assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); @@ -146,22 +116,19 @@ void verifyReturnFatalErrorIfReceiveResponseWithNullBody() throws MalformedURLEx } @Test - void verifyReturnFatalErrorIfReceiveErrrorInResponse() throws MalformedURLException, JsonProcessingException { + void transfer_verifyReturnFatalErrorIfReceiveErrorInResponse() throws JsonProcessingException { var flowRequest = createDataFlowRequest(); - // mock data plane selector - var instance = mock(DataPlaneInstance.class); - when(instance.getUrl()).thenReturn(new URL(DATA_PLANE_API_URI)); - when(selectorClientMock.find(any(), any(), any())).thenReturn(instance); + var instance = DataPlaneInstance.Builder.newInstance().url(DATA_PLANE_API_URI).build(); + when(selectorClient.find(any(), any(), any())).thenReturn(instance); - // config data plane mock server var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(flowRequest)); var errorMsg = UUID.randomUUID().toString(); - dataPlaneClientAndServer.when(httpRequest, once()).respond(withResponse(errorMsg)); + dataPlane.when(httpRequest, once()).respond(withResponse(errorMsg)); var result = dataPlaneClient.transfer(flowRequest); - dataPlaneClientAndServer.verify(httpRequest, VerificationTimes.once()); + dataPlane.verify(httpRequest, VerificationTimes.once()); assertThat(result.failed()).isTrue(); assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); @@ -172,25 +139,48 @@ void verifyReturnFatalErrorIfReceiveErrrorInResponse() throws MalformedURLExcept } @Test - void verifyTransferSucess() throws JsonProcessingException, MalformedURLException { + void transfer_verifyTransferSuccess() throws JsonProcessingException { var flowRequest = createDataFlowRequest(); - // mock data plane selector - var instance = mock(DataPlaneInstance.class); - when(instance.getUrl()).thenReturn(new URL(DATA_PLANE_API_URI)); - when(selectorClientMock.find(any(), any(), any())).thenReturn(instance); + var instance = DataPlaneInstance.Builder.newInstance().url(DATA_PLANE_API_URI).build(); + when(selectorClient.find(any(), any(), any())).thenReturn(instance); - // config data plane mock server var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(flowRequest)); - dataPlaneClientAndServer.when(httpRequest, once()).respond(response().withStatusCode(HttpStatusCode.OK_200.code())); + dataPlane.when(httpRequest, once()).respond(response().withStatusCode(HttpStatusCode.OK_200.code())); var result = dataPlaneClient.transfer(flowRequest); - dataPlaneClientAndServer.verify(httpRequest, VerificationTimes.once()); + dataPlane.verify(httpRequest, VerificationTimes.once()); assertThat(result.succeeded()).isTrue(); } + @Test + void terminate_shouldCallTerminateOnAllTheAvailableDataPlanes() { + var instance = DataPlaneInstance.Builder.newInstance().url(DATA_PLANE_API_URI).build(); + when(selectorClient.getAll()).thenReturn(List.of(instance)); + var httpRequest = new HttpRequest().withMethod("DELETE").withPath(DATA_PLANE_PATH + "/processId"); + dataPlane.when(httpRequest, once()).respond(response().withStatusCode(NO_CONTENT_204.code())); + + var result = dataPlaneClient.terminate("processId"); + + assertThat(result).isSucceeded(); + verify(selectorClient).getAll(); + dataPlane.verify(httpRequest, VerificationTimes.once()); + } + + @Test + void terminate_shouldFail_whenConflictResponse() { + var instance = DataPlaneInstance.Builder.newInstance().url(DATA_PLANE_API_URI).build(); + when(selectorClient.getAll()).thenReturn(List.of(instance)); + var httpRequest = new HttpRequest().withMethod("DELETE").withPath(DATA_PLANE_PATH + "/processId"); + dataPlane.when(httpRequest, once()).respond(response().withStatusCode(CONFLICT_409.code())); + + var result = dataPlaneClient.terminate("processId"); + + assertThat(result).isFailed(); + } + private static HttpResponse withResponse(String errorMsg) throws JsonProcessingException { return response().withStatusCode(HttpStatusCode.BAD_REQUEST_400.code()) .withBody(MAPPER.writeValueAsString(new TransferErrorResponse(List.of(errorMsg))), MediaType.APPLICATION_JSON); diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSource.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSource.java index 056a5da01e7..ed32d0d4919 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSource.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSource.java @@ -83,6 +83,11 @@ public StreamResult> openPartStream() { private HttpDataSource() { } + @Override + public void close() { + + } + public static class Builder { private final HttpDataSource dataSource; diff --git a/extensions/data-plane/data-plane-kafka/src/main/java/org/eclipse/edc/dataplane/kafka/pipeline/KafkaDataSource.java b/extensions/data-plane/data-plane-kafka/src/main/java/org/eclipse/edc/dataplane/kafka/pipeline/KafkaDataSource.java index d07734ea4bb..213b66b51c3 100644 --- a/extensions/data-plane/data-plane-kafka/src/main/java/org/eclipse/edc/dataplane/kafka/pipeline/KafkaDataSource.java +++ b/extensions/data-plane/data-plane-kafka/src/main/java/org/eclipse/edc/dataplane/kafka/pipeline/KafkaDataSource.java @@ -24,7 +24,6 @@ import org.jetbrains.annotations.NotNull; import java.io.ByteArrayInputStream; -import java.io.Closeable; import java.io.InputStream; import java.time.Clock; import java.time.Duration; @@ -39,7 +38,7 @@ import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.success; -class KafkaDataSource implements DataSource, Closeable { +class KafkaDataSource implements DataSource { private String name; private Monitor monitor; @@ -54,6 +53,7 @@ private KafkaDataSource() { @Override public void close() { if (consumer != null) { + // TODO: should be the iterator closed as well? consumer.close(); } } diff --git a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/response/StatusResult.java b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/response/StatusResult.java index 639932e4078..72b79d279a9 100644 --- a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/response/StatusResult.java +++ b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/response/StatusResult.java @@ -15,12 +15,15 @@ package org.eclipse.edc.spi.response; import org.eclipse.edc.spi.result.AbstractResult; +import org.eclipse.edc.spi.result.StoreFailure; +import org.eclipse.edc.spi.result.StoreResult; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.Collections; import java.util.List; +import static org.eclipse.edc.spi.response.ResponseStatus.ERROR_RETRY; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; @@ -56,4 +59,16 @@ public boolean fatalError() { protected , C1> R1 newInstance(@Nullable C1 content, @Nullable ResponseFailure failure) { return (R1) new StatusResult<>(content, failure); } + + public static StatusResult from(StoreResult storeResult) { + if (storeResult.succeeded()) { + return success(storeResult.getContent()); + } + + if (storeResult.reason() == StoreFailure.Reason.ALREADY_LEASED) { + return StatusResult.failure(ERROR_RETRY, storeResult.getFailureDetail()); + } else { + return StatusResult.failure(FATAL_ERROR, storeResult.getFailureDetail()); + } + } } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowController.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowController.java index 1d2838cc2f5..5407fc5bc31 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowController.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowController.java @@ -48,6 +48,14 @@ public interface DataFlowController { @NotNull StatusResult initiateFlow(TransferProcess transferProcess, Policy policy); + /** + * Terminate a data flow. + * + * @param transferProcess the transfer process. + * @return success if the flow is terminated correctly, failure otherwise; + */ + StatusResult terminate(TransferProcess transferProcess); + /** * Returns true if the manager can handle the data type. * diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowManager.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowManager.java index 2dd703eac71..baf34749c37 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowManager.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowManager.java @@ -54,6 +54,15 @@ public interface DataFlowManager { @NotNull StatusResult initiate(TransferProcess transferProcess, Policy policy); + /** + * Terminates a data flow. + * + * @param transferProcess the transfer process. + * @return success if the flow has been stopped correctly, failed otherwise. + */ + @NotNull + StatusResult terminate(TransferProcess transferProcess); + /** * Initiates a data flow. * diff --git a/spi/data-plane-selector/data-plane-selector-spi/src/main/java/org/eclipse/edc/connector/dataplane/selector/spi/instance/DataPlaneInstance.java b/spi/data-plane-selector/data-plane-selector-spi/src/main/java/org/eclipse/edc/connector/dataplane/selector/spi/instance/DataPlaneInstance.java index 3fbf43193e2..5e4cd349be4 100644 --- a/spi/data-plane-selector/data-plane-selector-spi/src/main/java/org/eclipse/edc/connector/dataplane/selector/spi/instance/DataPlaneInstance.java +++ b/spi/data-plane-selector/data-plane-selector-spi/src/main/java/org/eclipse/edc/connector/dataplane/selector/spi/instance/DataPlaneInstance.java @@ -39,35 +39,22 @@ public class DataPlaneInstance { public static final String DATAPLANE_INSTANCE_TYPE = EDC_NAMESPACE + "DataPlaneInstance"; - public static final String TURNCOUNT = EDC_NAMESPACE + "turnCount"; + public static final String TURN_COUNT = EDC_NAMESPACE + "turnCount"; public static final String LAST_ACTIVE = EDC_NAMESPACE + "lastActive"; public static final String URL = EDC_NAMESPACE + "url"; public static final String PROPERTIES = EDC_NAMESPACE + "properties"; public static final String ALLOWED_SOURCE_TYPES = EDC_NAMESPACE + "allowedSourceTypes"; public static final String ALLOWED_DEST_TYPES = EDC_NAMESPACE + "allowedDestTypes"; - private Map properties; - - private Set allowedSourceTypes; - - private Set allowedDestTypes; - - private int turnCount; - - private long lastActive; - + private Map properties = new HashMap<>(); + private Set allowedSourceTypes = new HashSet<>(); + private Set allowedDestTypes = new HashSet<>(); + private int turnCount = 0; + private long lastActive = Instant.now().toEpochMilli(); private URL url; - private String id; - protected DataPlaneInstance() { - turnCount = 0; - lastActive = Instant.now().toEpochMilli(); - properties = new HashMap<>(); - url = null; - - allowedSourceTypes = new HashSet<>(); - allowedDestTypes = new HashSet<>(); + private DataPlaneInstance() { } public String getId() { @@ -112,7 +99,6 @@ public Set getAllowedDestTypes() { return Collections.unmodifiableSet(allowedDestTypes); } - @JsonPOJOBuilder(withPrefix = "") public static final class Builder { private final DataPlaneInstance instance; @@ -122,41 +108,41 @@ private Builder() { } @JsonCreator - public static DataPlaneInstance.Builder newInstance() { - return new DataPlaneInstance.Builder(); + public static Builder newInstance() { + return new Builder(); } - public DataPlaneInstance.Builder turnCount(int turnCount) { + public Builder turnCount(int turnCount) { instance.turnCount = turnCount; return this; } - public DataPlaneInstance.Builder lastActive(long lastActive) { + public Builder lastActive(long lastActive) { instance.lastActive = lastActive; return this; } - public DataPlaneInstance.Builder id(String id) { + public Builder id(String id) { instance.id = id; return this; } - public DataPlaneInstance.Builder allowedSourceType(String type) { + public Builder allowedSourceType(String type) { instance.allowedSourceTypes.add(type); return this; } - public DataPlaneInstance.Builder allowedDestType(String type) { + public Builder allowedDestType(String type) { instance.allowedDestTypes.add(type); return this; } - public DataPlaneInstance.Builder url(URL url) { + public Builder url(URL url) { instance.url = url; return this; } - public DataPlaneInstance.Builder url(String url) { + public Builder url(String url) { try { instance.url = new URL(url); } catch (MalformedURLException e) { @@ -165,35 +151,35 @@ public DataPlaneInstance.Builder url(String url) { return this; } - public DataPlaneInstance build() { - if (instance.id == null) { - instance.id = UUID.randomUUID().toString(); - } - Objects.requireNonNull(instance.url, "DataPlaneInstance must have an URL"); - - return instance; - } - - public DataPlaneInstance.Builder property(String key, Object value) { + public Builder property(String key, Object value) { instance.properties.put(key, value); return this; } - public DataPlaneInstance.Builder allowedDestTypes(Set types) { + public Builder allowedDestTypes(Set types) { instance.allowedDestTypes = types; return this; } - public DataPlaneInstance.Builder allowedSourceTypes(Set types) { + public Builder allowedSourceTypes(Set types) { if (types != null) { instance.allowedSourceTypes = types; } return this; } - public DataPlaneInstance.Builder properties(Map properties) { + public Builder properties(Map properties) { instance.properties = properties; return this; } + + public DataPlaneInstance build() { + if (instance.id == null) { + instance.id = UUID.randomUUID().toString(); + } + Objects.requireNonNull(instance.url, "DataPlaneInstance must have an URL"); + + return instance; + } } } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/client/DataPlaneClient.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/client/DataPlaneClient.java index 45f019c2549..6f8838eb597 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/client/DataPlaneClient.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/client/DataPlaneClient.java @@ -28,4 +28,12 @@ public interface DataPlaneClient { * Delegates data transfer to the Data Plane. */ StatusResult transfer(DataFlowRequest request); + + /** + * Terminate the transfer. + * + * @param transferProcessId the transfer process id. + * @return success if the transfer has been terminated, failure otherwise. + */ + StatusResult terminate(String transferProcessId); } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java index e687e5a9cd4..e73cd7ff597 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java @@ -19,6 +19,7 @@ import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; import org.eclipse.edc.runtime.metamodel.annotation.ExtensionPoint; import org.eclipse.edc.spi.entity.StateEntityManager; +import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; @@ -70,4 +71,12 @@ public interface DataPlaneManager extends StateEntityManager { * Returns the transfer state for the process. */ DataFlowStates transferState(String processId); + + /** + * Terminate the data flow. + * + * @param dataFlowId the data flow id. + * @return success if data flow is terminated, failed otherwise. + */ + StatusResult terminate(String dataFlowId); } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/DataSource.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/DataSource.java index d26c23efc62..9a75a09078d 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/DataSource.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/DataSource.java @@ -18,10 +18,11 @@ import java.util.stream.Stream; /** - * Implements pull semantics for accessing a data source. A data source is composed of one or more named parts. Some implementations may support random access of the underlying - * part content so that large content transfers can be parallelized. + * Implements pull semantics for accessing a data source. A data source is composed of one or more named parts. + * Some implementations may support random access of the underlying part content so that large content transfers can be + * parallelized. */ -public interface DataSource { +public interface DataSource extends AutoCloseable { /** * Opens a stream to the source parts. diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/PipelineService.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/PipelineService.java index 55a9035878a..ec348ec36cb 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/PipelineService.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/PipelineService.java @@ -15,7 +15,6 @@ package org.eclipse.edc.connector.dataplane.spi.pipeline; import org.eclipse.edc.runtime.metamodel.annotation.ExtensionPoint; -import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; import java.util.concurrent.CompletableFuture; @@ -24,27 +23,7 @@ * Transfers data from a source to a sink. */ @ExtensionPoint -public interface PipelineService { - - /** - * Returns true if this service can transfer the request. - */ - boolean canHandle(DataFlowRequest request); - - /** - * Returns true if the request is valid. - */ - Result validate(DataFlowRequest request); - - /** - * Transfers data from source to destination. - */ - CompletableFuture> transfer(DataFlowRequest request); - - /** - * Transfers data using the supplied data source. - */ - CompletableFuture> transfer(DataSource source, DataFlowRequest request); +public interface PipelineService extends TransferService { /** * Transfers data using the supplied data sink. diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/TransferService.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/TransferService.java index 88e63699326..10b0f0b9a16 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/TransferService.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/pipeline/TransferService.java @@ -14,6 +14,7 @@ package org.eclipse.edc.connector.dataplane.spi.pipeline; +import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; @@ -38,4 +39,11 @@ public interface TransferService { * Transfers data from source to destination. */ CompletableFuture> transfer(DataFlowRequest request); + + /** + * Terminate a data flow. + * + * @param dataFlow the data flow. + */ + StreamResult terminate(DataFlow dataFlow); } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java index bab847bd8b8..304ce1379e3 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java @@ -133,12 +133,7 @@ void kafkaToHttpTransfer() { eventDestination.reset(); }); - /* TODO: this should be enabled to ensure that the transfer gets actually stopped: - * https://github.com/eclipse-edc/Connector/issues/3453 - * await().atLeast(5, TimeUnit.SECONDS).untilAsserted(() -> { - * eventDestination.verifyZeroInteractions(); - * }); - */ + await().atLeast(5, TimeUnit.SECONDS).atMost(TIMEOUT).untilAsserted(eventDestination::verifyZeroInteractions); stopQuietly(eventDestination); }