Skip to content

Commit

Permalink
Add STOPPING state on TransferProcess
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Sep 28, 2023
1 parent 90856ac commit 5ebe8cd
Show file tree
Hide file tree
Showing 15 changed files with 339 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.eclipse.edc.connector.transfer.spi.types.command.CompleteTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionCompleteCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionRequest;
import org.eclipse.edc.connector.transfer.spi.types.command.StopTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.TerminateTransferCommand;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.spi.command.CommandHandlerRegistry;
Expand Down Expand Up @@ -91,6 +92,11 @@ public ServiceResult<Stream<TransferProcess>> query(QuerySpec query) {
});
}

@Override
public ServiceResult<Void> stop(StopTransferCommand command) {
return transactionContext.execute(() -> commandHandlerRegistry.execute(command).flatMap(ServiceResult::from));
}

@Override
public @NotNull ServiceResult<Void> complete(String transferProcessId) {
var command = new CompleteTransferCommand(transferProcessId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.connector.transfer.spi.types.TransferRequest;
import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionRequest;
import org.eclipse.edc.connector.transfer.spi.types.command.StopTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.TerminateTransferCommand;
import org.eclipse.edc.service.spi.result.ServiceFailure;
import org.eclipse.edc.service.spi.result.ServiceResult;
Expand All @@ -49,6 +50,7 @@
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETING;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.service.spi.result.ServiceFailure.Reason.BAD_REQUEST;
import static org.eclipse.edc.service.spi.result.ServiceFailure.Reason.NOT_FOUND;
Expand Down Expand Up @@ -173,6 +175,27 @@ void terminate_shouldFailWhenCommandHandlerFails() {
assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND);
}

@Test
void stop_shouldExecuteCommandAndReturnResult() {
when(commandHandlerRegistry.execute(any())).thenReturn(CommandResult.success());
var command = new StopTransferCommand("id", "reason", COMPLETING);

var result = service.stop(command);

assertThat(result).isSucceeded();
verify(commandHandlerRegistry).execute(command);
}

@Test
void stop_shouldFailWhenCommandHandlerFails() {
when(commandHandlerRegistry.execute(any())).thenReturn(CommandResult.notFound("not found"));
var command = new StopTransferCommand("id", "reason", COMPLETING);

var result = service.stop(command);

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND);
}

@Test
void deprovision() {
when(commandHandlerRegistry.execute(any())).thenReturn(CommandResult.success());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.connector.transfer.command.handlers.CompleteTransferCommandHandler;
import org.eclipse.edc.connector.transfer.command.handlers.DeprovisionRequestCommandHandler;
import org.eclipse.edc.connector.transfer.command.handlers.StopTransferCommandHandler;
import org.eclipse.edc.connector.transfer.command.handlers.TerminateTransferCommandHandler;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.runtime.metamodel.annotation.CoreExtension;
Expand All @@ -40,6 +41,7 @@ public void initialize(ServiceExtensionContext context) {
registry.register(new TerminateTransferCommandHandler(store));
registry.register(new DeprovisionRequestCommandHandler(store));
registry.register(new CompleteTransferCommandHandler(store));
registry.register(new StopTransferCommandHandler(store));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.transfer.command.handlers;

import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.connector.transfer.spi.types.command.StopTransferCommand;
import org.eclipse.edc.spi.command.EntityCommandHandler;

import java.util.List;

import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.SUSPENDING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATING;

/**
* Transition a TransferProcess to the {@link TransferProcessStates#STOPPING} state.
* Only possible on PROVIDER side.
*/
public class StopTransferCommandHandler extends EntityCommandHandler<StopTransferCommand, TransferProcess> {

public StopTransferCommandHandler(TransferProcessStore store) {
super(store);
}

@Override
public Class<StopTransferCommand> getType() {
return StopTransferCommand.class;
}

@Override
protected boolean modify(TransferProcess process, StopTransferCommand command) {
if (process.canBeStopped() && List.of(COMPLETING, TERMINATING, SUSPENDING).contains(command.getSubsequentState())) {
process.transitionStopping(command.getReason(), command.getSubsequentState());
return true;
}
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferTerminationMessage;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.asset.DataAddressResolver;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.protocol.ProtocolWebhook;
Expand Down Expand Up @@ -70,6 +71,7 @@
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.REQUESTED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.REQUESTING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STOPPING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATING;
import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState;
import static org.eclipse.edc.spi.persistence.StateEntityStore.isNotPending;
Expand Down Expand Up @@ -164,6 +166,7 @@ protected StateMachineManager.Builder configureStateMachineManager(StateMachineM
.processor(processTransfersInState(PROVISIONED, this::processProvisioned))
.processor(processConsumerTransfersInState(REQUESTING, this::processRequesting))
.processor(processProviderTransfersInState(STARTING, this::processStarting))
.processor(processProviderTransfersInState(STOPPING, this::processStopping))
.processor(processTransfersInState(COMPLETING, this::processCompleting))
.processor(processTransfersInState(TERMINATING, this::processTerminating))
.processor(processTransfersInState(DEPROVISIONING, this::processDeprovisioning));
Expand Down Expand Up @@ -319,25 +322,25 @@ private boolean processStarting(TransferProcess process) {
.execute(description);
}

/**
* Process STOPPING transfer<p>
* Terminate data transfer and transition to the pre-set subsequent state, that could be one of:
* - COMPLETING
* - TERMINATING
* - SUSPENDING
* Can never happen for CONSUMER TransferProcess.
*
* @param process the TransferProcess.
* @return if the transfer has been processed or not
*/
@WithSpan
private void sendTransferStartMessage(TransferProcess process, DataFlowResponse dataFlowResponse, Policy policy) {
var message = TransferStartMessage.Builder.newInstance()
.processId(process.getCorrelationId())
.protocol(process.getProtocol())
.dataAddress(dataFlowResponse.getDataAddress())
.counterPartyAddress(process.getConnectorAddress())
.policy(policy)
.build();

var description = format("Send %s to %s", message.getClass().getSimpleName(), process.getConnectorAddress());

entityRetryProcessFactory.doAsyncStatusResultProcess(process, () -> dispatcherRegistry.dispatch(Object.class, message))
.entityRetrieve(id -> store.findById(id))
.onSuccess((t, content) -> transitionToStarted(t))
.onFailure((t, throwable) -> transitionToStarting(t))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable))
.execute(description);
private boolean processStopping(TransferProcess process) {
return entityRetryProcessFactory.doSyncProcess(process, () -> dataFlowManager.terminate(process))
.onSuccess((p, dataFlowResponse) -> transitionTo(p, process.stoppingSubsequentState()))
.onFatalError((p, failure) -> transitionToTerminating(p, failure.getFailureDetail()))
.onFailure((t, failure) -> transitionToStopping(t))
.onRetryExhausted((p, failure) -> transitionToTerminating(p, failure.getFailureDetail()))
.execute("Terminate data flow");
}

/**
Expand Down Expand Up @@ -419,6 +422,27 @@ private boolean processDeprovisioning(TransferProcess process) {
.execute("deprovisioning");
}

@WithSpan
private void sendTransferStartMessage(TransferProcess process, DataFlowResponse dataFlowResponse, Policy policy) {
var message = TransferStartMessage.Builder.newInstance()
.processId(process.getCorrelationId())
.protocol(process.getProtocol())
.dataAddress(dataFlowResponse.getDataAddress())
.counterPartyAddress(process.getConnectorAddress())
.policy(policy)
.build();

var description = format("Send %s to %s", message.getClass().getSimpleName(), process.getConnectorAddress());

entityRetryProcessFactory.doAsyncStatusResultProcess(process, () -> dispatcherRegistry.dispatch(Object.class, message))
.entityRetrieve(id -> store.findById(id))
.onSuccess((t, content) -> transitionToStarted(t))
.onFailure((t, throwable) -> transitionToStarting(t))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable))
.execute(description);
}

private <T> void handleResult(TransferProcess transferProcess, List<StatusResult<T>> responses, ResponsesHandler<StatusResult<T>> handler) {
if (handler.handle(transferProcess, responses)) {
update(transferProcess);
Expand Down Expand Up @@ -506,6 +530,21 @@ private void transitionToTerminating(TransferProcess process, String message, Th
update(process);
}

private void transitionToStopping(TransferProcess transferProcess) {
transferProcess.transitionStopping(transferProcess.getErrorDetail(), transferProcess.stoppingSubsequentState());
update(transferProcess);
}

private void transitionTo(TransferProcess transferProcess, TransferProcessStates state) {
switch (state) {
case COMPLETING -> transferProcess.transitionCompleting();
case TERMINATING -> transferProcess.transitionTerminating();
case SUSPENDING -> transferProcess.transitionSuspending();
default -> throw new EdcException("this case should never happen");
}
update(transferProcess);
}

private void transitionToTerminated(TransferProcess process, Throwable throwable) {
transitionToTerminated(process, throwable.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - Initial implementation
*
*/

package org.eclipse.edc.connector.transfer.command.handlers;

import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.transfer.spi.types.command.StopTransferCommand;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcess.Type.CONSUMER;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcess.Type.PROVIDER;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STOPPING;
import static org.mockito.Mockito.mock;

class StopTransferCommandHandlerTest {

private final TransferProcessStore store = mock();
private final StopTransferCommandHandler handler = new StopTransferCommandHandler(store);

@Test
void verifyCorrectType() {
assertThat(handler.getType()).isEqualTo(StopTransferCommand.class);
}

@Test
void shouldModify_ifItCanBeStopped() {
var command = new StopTransferCommand("test-id", "a reason", COMPLETING);
var entity = TransferProcess.Builder.newInstance().type(PROVIDER).state(STARTED.code()).build();

var result = handler.modify(entity, command);

assertThat(result).isTrue();
assertThat(entity.getState()).isEqualTo(STOPPING.code());
assertThat(entity.getErrorDetail()).isEqualTo("a reason");
assertThat(entity.stoppingSubsequentState()).isEqualTo(COMPLETING);
}

@Test
void shouldNotModify_ifItCannotBeStopped() {
var command = new StopTransferCommand("test-id", "a reason", STARTING);
var entity = TransferProcess.Builder.newInstance().type(PROVIDER).state(COMPLETED.code()).build();

var result = handler.modify(entity, command);

assertThat(result).isFalse();
assertThat(entity.getState()).isEqualTo(COMPLETED.code());
assertThat(entity.getErrorDetail()).isNull();
}

@Test
void shouldNotModify_ifItIsConsumer() {
var command = new StopTransferCommand("test-id", "a reason", COMPLETING);
var entity = TransferProcess.Builder.newInstance().type(CONSUMER).state(STARTED.code()).build();

var result = handler.modify(entity, command);

assertThat(result).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.REQUESTING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STOPPING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATING;
import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState;
Expand Down Expand Up @@ -556,6 +557,22 @@ void starting_whenShouldWait_updatesStateCount() {
verify(transferProcessStore).save(argThat(p -> p.getState() == STARTING.code()));
});
}

@Test
void stopping_shouldStopDataTransferAndTransitionToSubsequentState() {
var process = createTransferProcess(STARTED).toBuilder().type(PROVIDER).build();
process.transitionStopping(null, COMPLETING);
when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build());
when(transferProcessStore.nextNotLeased(anyInt(), providerStateIs(STOPPING.code()))).thenReturn(List.of(process)).thenReturn(emptyList());
when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success());

manager.start();

await().untilAsserted(() -> {
verify(dataFlowManager).terminate(process);
verify(transferProcessStore).save(argThat(p -> p.getState() == COMPLETING.code()));
});
}

@Test
void completing_shouldTransitionToCompleted_whenSendingMessageSucceed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorStore;
import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService;
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.transfer.spi.types.command.StopTransferCommand;
import org.eclipse.edc.policy.engine.spi.PolicyContextImpl;
import org.eclipse.edc.policy.engine.spi.PolicyEngine;
import org.eclipse.edc.spi.query.Criterion;
Expand All @@ -33,6 +34,7 @@
import java.util.function.Function;

import static org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorEntryStates.STARTED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETING;
import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState;

/**
Expand Down Expand Up @@ -85,7 +87,8 @@ private boolean processMonitoring(PolicyMonitorEntry entry) {
var result = policyEngine.evaluate("transfer.process", policy, policyContext);
if (result.failed()) {
monitor.debug(() -> "[policy-monitor] Policy evaluation for TP %s failed: %s".formatted(entry.getId(), result.getFailureDetail()));
var completeResult = transferProcessService.complete(entry.getId());
var command = new StopTransferCommand(entry.getId(), result.getFailureDetail(), COMPLETING);
var completeResult = transferProcessService.stop(command);
if (completeResult.succeeded()) {
entry.transitionToCompleted();
update(entry);
Expand Down
Loading

0 comments on commit 5ebe8cd

Please sign in to comment.