diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessServiceImpl.java index b38fb8eb849..c5d2b52115b 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessServiceImpl.java @@ -21,6 +21,7 @@ import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService; import org.eclipse.edc.connector.transfer.spi.TransferProcessManager; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; +import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.DeprovisionedResource; import org.eclipse.edc.connector.transfer.spi.types.ProvisionResponse; import org.eclipse.edc.connector.transfer.spi.types.ProvisionedContentResource; @@ -33,13 +34,23 @@ import org.eclipse.edc.connector.transfer.spi.types.command.AddProvisionedResourceCommand; import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionCompleteCommand; import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionRequest; -import org.eclipse.edc.connector.transfer.spi.types.command.NotifyStartedTransferCommand; +import org.eclipse.edc.connector.transfer.spi.types.command.NotifyCompletedTransfer; +import org.eclipse.edc.connector.transfer.spi.types.command.NotifyStartedTransfer; +import org.eclipse.edc.connector.transfer.spi.types.command.NotifyTerminatedTransfer; +import org.eclipse.edc.connector.transfer.spi.types.command.SingleTransferProcessCommand; import org.eclipse.edc.connector.transfer.spi.types.command.TerminateTransferCommand; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRemoteMessage; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRequestMessage; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferTerminationMessage; import org.eclipse.edc.service.spi.result.ServiceResult; import org.eclipse.edc.spi.dataaddress.DataAddressValidator; import org.eclipse.edc.spi.iam.ClaimToken; import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.result.AbstractResult; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.message.RemoteMessage; import org.eclipse.edc.spi.types.domain.transfer.command.CompleteTransferCommand; import org.eclipse.edc.transaction.spi.TransactionContext; import org.jetbrains.annotations.NotNull; @@ -48,6 +59,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Stream; @@ -98,28 +110,19 @@ public ServiceResult> query(QuerySpec query) { }); } - @Override - public ServiceResult notifyStarted(String dataRequestId) { - return transactionContext.execute(() -> Optional.of(dataRequestId) - .map(transferProcessStore::processIdForDataRequestId) - .map(id -> apply(id, this::startedImpl)) - .orElse(ServiceResult.notFound(format("TransferProcess with DataRequest id %s not found", dataRequestId))) - ); - } - @Override public @NotNull ServiceResult terminate(String transferProcessId, String reason) { - return apply(transferProcessId, transferProcess -> terminateImpl(transferProcess, reason)); + return transactionContext.execute(() -> runAsync(new TerminateTransferCommand(transferProcessId, reason))); } @Override public @NotNull ServiceResult complete(String transferProcessId) { - return apply(transferProcessId, this::completeImpl); + return transactionContext.execute(() -> runAsync(new CompleteTransferCommand(transferProcessId))); } @Override public @NotNull ServiceResult deprovision(String transferProcessId) { - return apply(transferProcessId, this::deprovisionImpl); + return transactionContext.execute(() -> runAsync(new DeprovisionRequest(transferProcessId))); } @Override @@ -140,16 +143,39 @@ public ServiceResult notifyStarted(String dataRequestId) { } @Override - public @NotNull ServiceResult initiateTransfer(TransferRequest request, ClaimToken claimToken) { - var validDestination = dataAddressValidator.validate(request.getDataRequest().getDataDestination()); + public ServiceResult completeDeprovision(String transferProcessId, DeprovisionedResource resource) { + return transactionContext.execute(() -> runAsync(new DeprovisionCompleteCommand(transferProcessId, resource))); + } + + @Override + public ServiceResult addProvisionedResource(String transferProcessId, ProvisionResponse response) { + return transactionContext.execute(() -> runAsync(new AddProvisionedResourceCommand(transferProcessId, response))); + } + + @Override + public @NotNull ServiceResult notifyRequested(TransferRequestMessage message, ClaimToken claimToken) { + var validDestination = dataAddressValidator.validate(message.getDataDestination()); if (validDestination.failed()) { return ServiceResult.badRequest(validDestination.getFailureMessages().toArray(new String[]{})); } + var dataRequest = DataRequest.Builder.newInstance() + .id(message.getId()) + .protocol(message.getProtocol()) + .connectorId(message.getConnectorId()) + .connectorAddress(message.getConnectorAddress()) + .dataDestination(message.getDataDestination()) + .properties(message.getProperties()) + .assetId(message.getAssetId()) + .contractId(message.getContractId()) + .build(); + + var transferRequest = TransferRequest.Builder.newInstance().dataRequest(dataRequest).build(); + return transactionContext.execute(() -> - Optional.ofNullable(negotiationStore.findContractAgreement(request.getDataRequest().getContractId())) + Optional.ofNullable(negotiationStore.findContractAgreement(message.getContractId())) .filter(agreement -> contractValidationService.validateAgreement(claimToken, agreement).succeeded()) - .map(agreement -> manager.initiateProviderRequest(request)) + .map(agreement -> manager.initiateProviderRequest(transferRequest)) .filter(AbstractResult::succeeded) .map(AbstractResult::getContent) .map(ServiceResult::success) @@ -157,13 +183,57 @@ public ServiceResult notifyStarted(String dataRequestId) { } @Override - public ServiceResult completeDeprovision(String transferProcessId, DeprovisionedResource resource) { - return apply(transferProcessId, completeDeprovisionImpl(resource)); + public ServiceResult notifyStarted(TransferStartMessage message, ClaimToken claimToken) { + return transactionContext.execute(() -> handleRemoteMessage(message)); } @Override - public ServiceResult addProvisionedResource(String transferProcessId, ProvisionResponse response) { - return apply(transferProcessId, addProvisionedResourceImpl(response)); + public ServiceResult notifyCompleted(TransferCompletionMessage message, ClaimToken claimToken) { + return transactionContext.execute(() -> handleRemoteMessage(message)); + } + + @Override + public ServiceResult notifyTerminated(TransferTerminationMessage message, ClaimToken claimToken) { + return transactionContext.execute(() -> handleRemoteMessage(message)); + } + + @NotNull + private ServiceResult handleRemoteMessage(TransferRemoteMessage message) { + return Optional.of(message.getProcessId()) + .map(transferProcessStore::processIdForDataRequestId) + .map(id -> messageToCommandFactories.get(message.getClass()).apply(id)) + .map(this::runSync) + .orElse(ServiceResult.notFound(format("TransferProcess with DataRequest id %s not found", message.getProcessId()))); + } + + private ServiceResult runAsync(SingleTransferProcessCommand command) { + return Optional.of(command.getTransferProcessId()) + .map(transferProcessStore::find) + .map(transferProcess -> { + var validator = asyncCommandValidators.get(command.getClass()); + var validationResult = validator.apply(command, transferProcess); + if (validationResult.failed()) { + return ServiceResult.conflict(format("Cannot %s because %s", command.getClass().getSimpleName(), validationResult.getFailureDetail())); + } + + manager.enqueueCommand(command); + return ServiceResult.success(transferProcess); + }) + .orElse(ServiceResult.notFound(format("TransferProcess with id %s not found", command.getTransferProcessId()))); + } + + private ServiceResult runSync(SingleTransferProcessCommand command) { + return Optional.of(command.getTransferProcessId()) + .map(transferProcessStore::find) + .map(transferProcess -> { + var commandResult = manager.runCommand(command); + if (commandResult.succeeded()) { + return ServiceResult.success(transferProcess); + } else { + return ServiceResult.conflict(format("Cannot %s because %s", command.getClass().getSimpleName(), commandResult.getFailureDetail())); + } + }) + .orElse(ServiceResult.notFound(format("TransferProcess with id %s not found", command.getTransferProcessId()))); } private Map, List>> getSubtypes() { @@ -173,63 +243,34 @@ private Map, List>> getSubtypes() { ); } - private ServiceResult apply(String transferProcessId, Function> function) { - return transactionContext.execute(() -> { - var transferProcess = transferProcessStore.find(transferProcessId); - return Optional.ofNullable(transferProcess) - .map(function) - .orElse(ServiceResult.notFound(format("TransferProcess %s does not exist", transferProcessId))); - }); - } + private final Map, MessageToCommandFactory> messageToCommandFactories = Map.of( + TransferStartMessage.class, NotifyStartedTransfer::new, + TransferCompletionMessage.class, NotifyCompletedTransfer::new, + TransferTerminationMessage.class, NotifyTerminatedTransfer::new + ); - private ServiceResult terminateImpl(TransferProcess transferProcess, String reason) { - if (transferProcess.canBeTerminated()) { - manager.enqueueCommand(new TerminateTransferCommand(transferProcess.getId(), reason)); + private final Map, CommandValidator> asyncCommandValidators = Map.of( + TerminateTransferCommand.class, + (command, transferProcess) -> transferProcess.canBeTerminated() + ? Result.success() + : Result.failure(format("TransferProcess %s is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))), - return ServiceResult.success(transferProcess); - } else { - return ServiceResult.conflict(format("TransferProcess %s cannot be terminated as it is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))); - } - } + org.eclipse.edc.spi.types.domain.transfer.command.CompleteTransferCommand.class, + (command, transferProcess) -> transferProcess.canBeCompleted() + ? Result.success() + : Result.failure(format("TransferProcess %s is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))), - private ServiceResult deprovisionImpl(TransferProcess transferProcess) { - if (transferProcess.canBeDeprovisioned()) { - manager.enqueueCommand(new DeprovisionRequest(transferProcess.getId())); - return ServiceResult.success(transferProcess); - } else { - return ServiceResult.conflict(format("TransferProcess %s cannot be deprovisioned as it is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))); - } - } + DeprovisionRequest.class, + (command, transferProcess) -> transferProcess.canBeDeprovisioned() + ? Result.success() + : Result.failure(format("TransferProcess %s is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))), - private ServiceResult completeImpl(TransferProcess transferProcess) { - if (transferProcess.canBeCompleted()) { - manager.enqueueCommand(new CompleteTransferCommand(transferProcess.getId())); - return ServiceResult.success(transferProcess); - } else { - return ServiceResult.conflict(format("TransferProcess %s cannot be completed as it is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))); - } - } + DeprovisionCompleteCommand.class, (command, transferProcess) -> Result.success(), - private ServiceResult startedImpl(TransferProcess transferProcess) { - if (transferProcess.canBeStartedConsumer()) { - manager.enqueueCommand(new NotifyStartedTransferCommand(transferProcess.getId())); - return ServiceResult.success(transferProcess); - } else { - return ServiceResult.conflict(format("TransferProcess %s cannot be started as it is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))); - } - } + AddProvisionedResourceCommand.class, (command, transferProcess) -> Result.success() + ); - private Function> completeDeprovisionImpl(DeprovisionedResource resource) { - return (transferProcess -> { - manager.enqueueCommand(new DeprovisionCompleteCommand(transferProcess.getId(), resource)); - return ServiceResult.success(transferProcess); - }); - } + private interface MessageToCommandFactory extends Function { } - private Function> addProvisionedResourceImpl(ProvisionResponse response) { - return (transferProcess -> { - manager.enqueueCommand(new AddProvisionedResourceCommand(transferProcess.getId(), response)); - return ServiceResult.success(transferProcess); - }); - } + private interface CommandValidator extends BiFunction> {} } diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessEventDispatchTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessEventDispatchTest.java index b2b6122019d..7ad840499d6 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessEventDispatchTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessEventDispatchTest.java @@ -20,6 +20,7 @@ import org.eclipse.edc.connector.transfer.spi.retry.TransferWaitStrategy; import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferRequest; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage; import org.eclipse.edc.junit.extensions.EdcExtension; import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.event.EventSubscriber; @@ -31,6 +32,7 @@ import org.eclipse.edc.spi.event.transferprocess.TransferProcessRequested; import org.eclipse.edc.spi.event.transferprocess.TransferProcessStarted; import org.eclipse.edc.spi.event.transferprocess.TransferProcessTerminated; +import org.eclipse.edc.spi.iam.ClaimToken; import org.eclipse.edc.spi.message.RemoteMessageDispatcher; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.junit.jupiter.api.BeforeEach; @@ -101,7 +103,8 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se verify(eventSubscriber).on(argThat(isEnvelopeOf(TransferProcessRequested.class))); }); - service.notifyStarted("dataRequestId"); + var startMessage = TransferStartMessage.Builder.newInstance().processId("dataRequestId").protocol("any").connectorAddress("http://any").build(); + service.notifyStarted(startMessage, ClaimToken.Builder.newInstance().build()); await().untilAsserted(() -> { verify(eventSubscriber).on(argThat(isEnvelopeOf(TransferProcessStarted.class))); diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessServiceImplTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessServiceImplTest.java index 1025e98eb62..e18da23c903 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessServiceImplTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessServiceImplTest.java @@ -26,8 +26,14 @@ import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.connector.transfer.spi.types.TransferRequest; import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionRequest; -import org.eclipse.edc.connector.transfer.spi.types.command.NotifyStartedTransferCommand; +import org.eclipse.edc.connector.transfer.spi.types.command.NotifyCompletedTransfer; +import org.eclipse.edc.connector.transfer.spi.types.command.NotifyStartedTransfer; +import org.eclipse.edc.connector.transfer.spi.types.command.NotifyTerminatedTransfer; import org.eclipse.edc.connector.transfer.spi.types.command.SingleTransferProcessCommand; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRequestMessage; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferTerminationMessage; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.service.spi.result.ServiceResult; import org.eclipse.edc.spi.dataaddress.DataAddressValidator; @@ -35,6 +41,7 @@ import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.transaction.spi.NoopTransactionContext; import org.eclipse.edc.transaction.spi.TransactionContext; import org.junit.jupiter.api.Test; @@ -49,7 +56,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETING; import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.REQUESTED; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATED; import static org.eclipse.edc.service.spi.result.ServiceFailure.Reason.BAD_REQUEST; import static org.junit.jupiter.params.provider.EnumSource.Mode.EXCLUDE; import static org.junit.jupiter.params.provider.EnumSource.Mode.INCLUDE; @@ -164,9 +174,52 @@ void initiateTransfer_consumer_invalidDestination_shouldNotInitiateTransfer() { verifyNoInteractions(manager); } + @ParameterizedTest + @EnumSource(value = TransferProcessStates.class, mode = INCLUDE, names = { "COMPLETED", "DEPROVISIONING", "TERMINATED" }) + void deprovision(TransferProcessStates state) { + var process = transferProcess(state, id); + when(store.find(id)).thenReturn(process); + + var result = service.deprovision(id); + + assertThat(result.succeeded()).isTrue(); + verify(manager).enqueueCommand(commandCaptor.capture()); + assertThat(commandCaptor.getValue()).isInstanceOf(DeprovisionRequest.class); + assertThat(commandCaptor.getValue().getTransferProcessId()) + .isEqualTo(id); + verify(transactionContext).execute(any(TransactionContext.ResultTransactionBlock.class)); + } + + @ParameterizedTest + @EnumSource(value = TransferProcessStates.class, mode = EXCLUDE, names = { "COMPLETED", "DEPROVISIONING", "DEPROVISIONED", "DEPROVISIONING_REQUESTED", "TERMINATED" }) + void deprovision_whenNonDeprovisionable(TransferProcessStates state) { + var process = transferProcess(state, id); + when(store.find(id)).thenReturn(process); + + var result = service.deprovision(id); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailureMessages()).containsExactly("Cannot DeprovisionRequest because TransferProcess " + process.getId() + " is in state " + state); + verifyNoInteractions(manager); + verify(transactionContext).execute(any(TransactionContext.ResultTransactionBlock.class)); + } + @Test - void initiateTransfer_provider_validAgreement_shouldInitiateTransfer() { - var transferRequest = transferRequest(); + void deprovision_whenNotFound() { + var result = service.deprovision(id); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailureMessages()).containsExactly("TransferProcess with id " + id + " not found"); + verify(transactionContext).execute(any(TransactionContext.ResultTransactionBlock.class)); + } + + @Test + void notifyRequested_validAgreement_shouldInitiateTransfer() { + var message = TransferRequestMessage.Builder.newInstance() + .protocol("protocol") + .connectorAddress("http://any") + .dataDestination(DataAddress.Builder.newInstance().type("any").build()) + .build(); var claimToken = claimToken(); var processId = "processId"; @@ -175,32 +228,40 @@ void initiateTransfer_provider_validAgreement_shouldInitiateTransfer() { when(manager.initiateProviderRequest(any())).thenReturn(StatusResult.success(processId)); when(dataAddressValidator.validate(any())).thenReturn(Result.success()); - var result = service.initiateTransfer(transferRequest, claimToken); + var result = service.notifyRequested(message, claimToken); assertThat(result.succeeded()).isTrue(); assertThat(result.getContent()).isEqualTo(processId); - verify(manager).initiateProviderRequest(transferRequest); + verify(manager).initiateProviderRequest(any()); } @Test - void initiateTransfer_provider_invalidAgreement_shouldNotInitiateTransfer() { - var transferRequest = transferRequest(); + void notifyRequested_invalidAgreement_shouldNotInitiateTransfer() { + var message = TransferRequestMessage.Builder.newInstance() + .protocol("protocol") + .connectorAddress("http://any") + .dataDestination(DataAddress.Builder.newInstance().type("any").build()) + .build(); var claimToken = claimToken(); when(negotiationStore.findContractAgreement(any())).thenReturn(contractAgreement()); when(validationService.validateAgreement(any(), any())).thenReturn(Result.failure("error")); when(dataAddressValidator.validate(any())).thenReturn(Result.success()); - var result = service.initiateTransfer(transferRequest, claimToken); + var result = service.notifyRequested(message, claimToken); assertThat(result.succeeded()).isFalse(); verifyNoInteractions(manager); } @Test - void initiateTransfer_provider_invalidDestination_shouldNotInitiateTransfer() { + void notifyRequested_invalidDestination_shouldNotInitiateTransfer() { when(dataAddressValidator.validate(any())).thenReturn(Result.failure("invalid data address")); - var result = service.initiateTransfer(transferRequest(), claimToken()); + var result = service.notifyRequested(TransferRequestMessage.Builder.newInstance() + .protocol("protocol") + .connectorAddress("http://any") + .dataDestination(DataAddress.Builder.newInstance().type("any").build()) + .build(), claimToken()); assertThat(result).satisfies(ServiceResult::failed) .extracting(ServiceResult::reason) @@ -208,78 +269,171 @@ void initiateTransfer_provider_invalidDestination_shouldNotInitiateTransfer() { verifyNoInteractions(manager); } - @ParameterizedTest - @EnumSource(value = TransferProcessStates.class, mode = INCLUDE, names = { "COMPLETED", "DEPROVISIONING", "TERMINATED" }) - void deprovision(TransferProcessStates state) { - var process = transferProcess(state, id); - when(store.find(id)).thenReturn(process); + @Test + void notifyStarted_shouldSucceed_whenCommandSucceeds() { + when(store.processIdForDataRequestId("dataRequestId")).thenReturn("processId"); + when(store.find("processId")).thenReturn(transferProcess(REQUESTED, "processId")); + when(manager.runCommand(isA(NotifyStartedTransfer.class))).thenReturn(Result.success()); + var message = TransferStartMessage.Builder.newInstance() + .protocol("protocol") + .connectorAddress("http://any") + .processId("dataRequestId") + .build(); + var token = ClaimToken.Builder.newInstance().build(); - var result = service.deprovision(id); + var result = service.notifyStarted(message, token); - assertThat(result.succeeded()).isTrue(); - verify(manager).enqueueCommand(commandCaptor.capture()); - assertThat(commandCaptor.getValue()).isInstanceOf(DeprovisionRequest.class); - assertThat(commandCaptor.getValue().getTransferProcessId()) - .isEqualTo(id); - verify(transactionContext).execute(any(TransactionContext.ResultTransactionBlock.class)); + assertThat(result).matches(ServiceResult::succeeded); + verify(manager).runCommand(isA(NotifyStartedTransfer.class)); + verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); } - @ParameterizedTest - @EnumSource(value = TransferProcessStates.class, mode = EXCLUDE, names = { "COMPLETED", "DEPROVISIONING", "DEPROVISIONED", "DEPROVISIONING_REQUESTED", "TERMINATED" }) - void deprovision_whenNonDeprovisionable(TransferProcessStates state) { - var process = transferProcess(state, id); - when(store.find(id)).thenReturn(process); + @Test + void notifyStarted_shouldFail_whenTransferProcessNotFound() { + when(store.processIdForDataRequestId("dataRequestId")).thenReturn("processId"); + when(store.find("processId")).thenReturn(null); + var message = TransferStartMessage.Builder.newInstance() + .protocol("protocol") + .connectorAddress("http://any") + .processId("dataRequestId") + .build(); + var token = ClaimToken.Builder.newInstance().build(); - var result = service.deprovision(id); + var result = service.notifyStarted(message, token); - assertThat(result.failed()).isTrue(); - assertThat(result.getFailureMessages()).containsExactly("TransferProcess " + process.getId() + " cannot be deprovisioned as it is in state " + state); - verifyNoInteractions(manager); - verify(transactionContext).execute(any(TransactionContext.ResultTransactionBlock.class)); + assertThat(result).matches(ServiceResult::failed); + verify(manager, never()).runCommand(any()); + verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); } @Test - void deprovision_whenNotFound() { - var result = service.deprovision(id); + void notifyStarted_shouldFail_whenCommandFails() { + when(store.processIdForDataRequestId("dataRequestId")).thenReturn("processId"); + when(store.find("processId")).thenReturn(TransferProcess.Builder.newInstance().id(UUID.randomUUID().toString()).state(COMPLETED.code()).build()); + when(manager.runCommand(isA(NotifyStartedTransfer.class))).thenReturn(Result.failure("an error")); + var message = TransferStartMessage.Builder.newInstance() + .protocol("protocol") + .connectorAddress("http://any") + .processId("dataRequestId") + .build(); + var token = ClaimToken.Builder.newInstance().build(); - assertThat(result.failed()).isTrue(); - assertThat(result.getFailureMessages()).containsExactly("TransferProcess " + id + " does not exist"); - verify(transactionContext).execute(any(TransactionContext.ResultTransactionBlock.class)); + var result = service.notifyStarted(message, token); + + assertThat(result).matches(ServiceResult::failed); + verify(manager).runCommand(isA(NotifyStartedTransfer.class)); + verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); + } + + @Test + void notifyCompleted_shouldSucceed_whenCommandSucceeds() { + when(store.processIdForDataRequestId("dataRequestId")).thenReturn("transferProcessId"); + when(store.find("transferProcessId")).thenReturn(transferProcess(STARTED, "transferProcessId")); + when(manager.runCommand(isA(NotifyCompletedTransfer.class))).thenReturn(Result.success()); + var message = TransferCompletionMessage.Builder.newInstance() + .protocol("protocol") + .connectorAddress("http://any") + .processId("dataRequestId") + .build(); + var token = ClaimToken.Builder.newInstance().build(); + + var result = service.notifyCompleted(message, token); + + assertThat(result).matches(ServiceResult::succeeded); + verify(manager).runCommand(isA(NotifyCompletedTransfer.class)); + verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); } @Test - void started_shouldEnqueueCommand() { + void notifyCompleted_shouldFail_whenTransferProcessNotFound() { when(store.processIdForDataRequestId("dataRequestId")).thenReturn("processId"); - when(store.find("processId")).thenReturn(transferProcess(REQUESTED, "processId")); + when(store.find("processId")).thenReturn(null); + var message = TransferCompletionMessage.Builder.newInstance() + .protocol("protocol") + .connectorAddress("http://any") + .processId("dataRequestId") + .build(); + var token = ClaimToken.Builder.newInstance().build(); - var result = service.notifyStarted("dataRequestId"); + var result = service.notifyCompleted(message, token); + + assertThat(result).matches(ServiceResult::failed); + verifyNoInteractions(manager); + verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); + } + + @Test + void notifyCompleted_shouldFail_whenCommandFails() { + when(store.processIdForDataRequestId("dataRequestId")).thenReturn("processId"); + when(store.find("processId")).thenReturn(TransferProcess.Builder.newInstance().id(UUID.randomUUID().toString()).state(REQUESTED.code()).build()); + when(manager.runCommand(isA(NotifyCompletedTransfer.class))).thenReturn(Result.failure("an error")); + var message = TransferCompletionMessage.Builder.newInstance() + .protocol("protocol") + .connectorAddress("http://any") + .processId("dataRequestId") + .build(); + var token = ClaimToken.Builder.newInstance().build(); + + var result = service.notifyCompleted(message, token); + + assertThat(result).matches(ServiceResult::failed); + verify(manager).runCommand(isA(NotifyCompletedTransfer.class)); + verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); + } + + @Test + void notifyTerminated_shouldSucceed_whenCommandSucceeds() { + when(store.processIdForDataRequestId("dataRequestId")).thenReturn("transferProcessId"); + when(store.find("transferProcessId")).thenReturn(transferProcess(COMPLETING, "transferProcessId")); + when(manager.runCommand(isA(NotifyTerminatedTransfer.class))).thenReturn(Result.success()); + var message = TransferTerminationMessage.Builder.newInstance() + .protocol("protocol") + .connectorAddress("http://any") + .processId("dataRequestId") + .build(); + var token = ClaimToken.Builder.newInstance().build(); + + var result = service.notifyTerminated(message, token); assertThat(result).matches(ServiceResult::succeeded); - verify(manager).enqueueCommand(isA(NotifyStartedTransferCommand.class)); + verify(manager).runCommand(isA(NotifyTerminatedTransfer.class)); verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); } @Test - void started_shouldNotEnqueueCommandIfProcessIsNotFound() { + void notifyTerminated_shouldFail_whenTransferProcessNotFound() { when(store.processIdForDataRequestId("dataRequestId")).thenReturn("processId"); when(store.find("processId")).thenReturn(null); + var message = TransferTerminationMessage.Builder.newInstance() + .protocol("protocol") + .connectorAddress("http://any") + .processId("dataRequestId") + .build(); + var token = ClaimToken.Builder.newInstance().build(); - var result = service.notifyStarted("dataRequestId"); + var result = service.notifyTerminated(message, token); assertThat(result).matches(ServiceResult::failed); - verify(manager, never()).enqueueCommand(any()); + verifyNoInteractions(manager); verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); } @Test - void started_shouldNotEnqueueCommandIfIsNotConsumerAndStateIsNotValid() { + void notifyTerminated_shouldFail_whenCommandFails() { when(store.processIdForDataRequestId("dataRequestId")).thenReturn("processId"); - when(store.find("processId")).thenReturn(TransferProcess.Builder.newInstance().id(UUID.randomUUID().toString()).state(COMPLETED.code()).build()); + when(store.find("processId")).thenReturn(TransferProcess.Builder.newInstance().id(UUID.randomUUID().toString()).state(TERMINATED.code()).build()); + when(manager.runCommand(isA(NotifyTerminatedTransfer.class))).thenReturn(Result.failure("an error")); + var message = TransferTerminationMessage.Builder.newInstance() + .protocol("protocol") + .connectorAddress("http://any") + .processId("dataRequestId") + .build(); + var token = ClaimToken.Builder.newInstance().build(); - var result = service.notifyStarted("dataRequestId"); + var result = service.notifyTerminated(message, token); assertThat(result).matches(ServiceResult::failed); - verify(manager, never()).enqueueCommand(any()); + verify(manager).runCommand(isA(NotifyTerminatedTransfer.class)); verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); } diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/TransferProcessCommandExtension.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/TransferProcessCommandExtension.java index b0e0a1aacd2..1f0f87d5eaa 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/TransferProcessCommandExtension.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/TransferProcessCommandExtension.java @@ -17,7 +17,9 @@ import org.eclipse.edc.connector.transfer.command.handlers.CancelTransferCommandHandler; import org.eclipse.edc.connector.transfer.command.handlers.CompleteTransferCommandHandler; import org.eclipse.edc.connector.transfer.command.handlers.DeprovisionRequestHandler; +import org.eclipse.edc.connector.transfer.command.handlers.NotifyCompletedTransferCommandHandler; import org.eclipse.edc.connector.transfer.command.handlers.NotifyStartedTransferCommandHandler; +import org.eclipse.edc.connector.transfer.command.handlers.NotifyTerminatedTransferCommandHandler; import org.eclipse.edc.connector.transfer.command.handlers.TerminateTransferCommandHandler; import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; @@ -48,6 +50,8 @@ public void initialize(ServiceExtensionContext context) { registry.register(new DeprovisionRequestHandler(store)); registry.register(new CompleteTransferCommandHandler(store)); registry.register(new NotifyStartedTransferCommandHandler(store, observable)); + registry.register(new NotifyCompletedTransferCommandHandler(store, observable)); + registry.register(new NotifyTerminatedTransferCommandHandler(store, observable)); } } diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyCompletedTransferCommandHandler.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyCompletedTransferCommandHandler.java new file mode 100644 index 00000000000..d9f38e4441e --- /dev/null +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyCompletedTransferCommandHandler.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.command.handlers; + +import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable; +import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.connector.transfer.spi.types.command.NotifyCompletedTransfer; + +/** + * Puts a TransferProcess in the COMPLETED state as the counter-party actually completed the transfer. + */ +public class NotifyCompletedTransferCommandHandler extends SingleTransferProcessCommandHandler { + + private final TransferProcessObservable observable; + + public NotifyCompletedTransferCommandHandler(TransferProcessStore store, TransferProcessObservable observable) { + super(store); + this.observable = observable; + } + + @Override + public Class getType() { + return NotifyCompletedTransfer.class; + } + + @Override + protected boolean modify(TransferProcess process, NotifyCompletedTransfer command) { + if (process.canBeCompleted()) { + observable.invokeForEach(l -> l.preCompleted(process)); + process.transitionCompleted(); + return true; + } + + return false; + } + + @Override + protected void postAction(TransferProcess process) { + observable.invokeForEach(l -> l.completed(process)); + } +} diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyStartedTransferCommandHandler.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyStartedTransferCommandHandler.java index 95a1eeae405..0cd02b00a0b 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyStartedTransferCommandHandler.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyStartedTransferCommandHandler.java @@ -17,14 +17,14 @@ import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; -import org.eclipse.edc.connector.transfer.spi.types.command.NotifyStartedTransferCommand; +import org.eclipse.edc.connector.transfer.spi.types.command.NotifyStartedTransfer; import static org.eclipse.edc.connector.transfer.spi.types.TransferProcess.Type.CONSUMER; /** * Puts a TransferProcess in the STARTED state as the counter-party actually started the transfer. */ -public class NotifyStartedTransferCommandHandler extends SingleTransferProcessCommandHandler { +public class NotifyStartedTransferCommandHandler extends SingleTransferProcessCommandHandler { private final TransferProcessObservable observable; @@ -34,12 +34,12 @@ public NotifyStartedTransferCommandHandler(TransferProcessStore store, TransferP } @Override - public Class getType() { - return NotifyStartedTransferCommand.class; + public Class getType() { + return NotifyStartedTransfer.class; } @Override - protected boolean modify(TransferProcess process, NotifyStartedTransferCommand command) { + protected boolean modify(TransferProcess process, NotifyStartedTransfer command) { if (process.getType() == CONSUMER && process.canBeStartedConsumer()) { observable.invokeForEach(l -> l.preStarted(process)); process.transitionStarted(); diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyTerminatedTransferCommandHandler.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyTerminatedTransferCommandHandler.java new file mode 100644 index 00000000000..65ec734e46d --- /dev/null +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyTerminatedTransferCommandHandler.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.command.handlers; + +import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable; +import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.connector.transfer.spi.types.command.NotifyTerminatedTransfer; + +/** + * Puts a TransferProcess in the TERMINATED state as the counter-party actually completed the transfer. + */ +public class NotifyTerminatedTransferCommandHandler extends SingleTransferProcessCommandHandler { + + private final TransferProcessObservable observable; + + public NotifyTerminatedTransferCommandHandler(TransferProcessStore store, TransferProcessObservable observable) { + super(store); + this.observable = observable; + } + + @Override + public Class getType() { + return NotifyTerminatedTransfer.class; + } + + @Override + protected boolean modify(TransferProcess process, NotifyTerminatedTransfer command) { + if (process.canBeTerminated()) { + observable.invokeForEach(l -> l.preTerminated(process)); + process.transitionTerminated(); + return true; + } + + return false; + } + + @Override + protected void postAction(TransferProcess process) { + observable.invokeForEach(l -> l.terminated(process)); + } +} diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java index 34582244796..9fefed0af01 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java @@ -185,6 +185,11 @@ public void enqueueCommand(TransferProcessCommand command) { commandQueue.enqueue(command); } + @Override + public Result runCommand(TransferProcessCommand command) { + return commandRunner.runCommand(command); + } + @Override public void handleProvisionResult(String processId, List> responses) { var transferProcess = transferProcessStore.find(processId); @@ -384,8 +389,8 @@ private boolean processRequesting(TransferProcess process) { var message = TransferRequestMessage.Builder.newInstance() .id(dataRequest.getId()) .protocol(dataRequest.getProtocol()) - .connectorAddress(dataRequest.getConnectorAddress()) .connectorId(dataRequest.getConnectorId()) + .connectorAddress(dataRequest.getConnectorAddress()) .dataDestination(dataRequest.getDataDestination()) .properties(dataRequest.getProperties()) .assetId(dataRequest.getAssetId()) @@ -461,6 +466,7 @@ private void sendTransferStartMessage(TransferProcess process) { var message = TransferStartMessage.Builder.newInstance() .protocol(dataRequest.getProtocol()) .connectorAddress(dataRequest.getConnectorAddress()) // TODO: is this correct? it shouldn't be for provider. + .processId(dataRequest.getId()) .build(); var description = format("Send %s to %s", dataRequest.getClass().getSimpleName(), dataRequest.getConnectorAddress()); @@ -529,6 +535,7 @@ private boolean processCompleting(TransferProcess process) { var message = TransferCompletionMessage.Builder.newInstance() .protocol(dataRequest.getProtocol()) .connectorAddress(dataRequest.getConnectorAddress()) + .processId(dataRequest.getId()) .build(); var description = format("Send %s to %s", dataRequest.getClass().getSimpleName(), dataRequest.getConnectorAddress()); @@ -559,6 +566,7 @@ private boolean processTerminating(TransferProcess process) { var message = TransferTerminationMessage.Builder.newInstance() .connectorAddress(dataRequest.getConnectorAddress()) .protocol(dataRequest.getProtocol()) + .processId(dataRequest.getId()) .build(); var description = format("Send %s to %s", dataRequest.getClass().getSimpleName(), dataRequest.getConnectorAddress()); diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyCompletedTransferCommandHandlerTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyCompletedTransferCommandHandlerTest.java new file mode 100644 index 00000000000..5efafaaa7cd --- /dev/null +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyCompletedTransferCommandHandlerTest.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.command.handlers; + +import org.eclipse.edc.connector.transfer.observe.TransferProcessObservableImpl; +import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessListener; +import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.connector.transfer.spi.types.command.NotifyCompletedTransfer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcess.Type.CONSUMER; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.REQUESTED; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +class NotifyCompletedTransferCommandHandlerTest { + + private final TransferProcessStore store = mock(TransferProcessStore.class); + private final TransferProcessListener listener = mock(TransferProcessListener.class); + + private NotifyCompletedTransferCommandHandler handler; + + @BeforeEach + void setup() { + var observable = new TransferProcessObservableImpl(); + observable.registerListener(listener); + handler = new NotifyCompletedTransferCommandHandler(store, observable); + } + + @Test + void type() { + assertThat(handler.getType()).isEqualTo(NotifyCompletedTransfer.class); + } + + @Test + void shouldTransitionStateToCompleted() { + var process = TransferProcess.Builder.newInstance().id("processId").type(CONSUMER).state(STARTED.code()).build(); + when(store.find(process.getId())).thenReturn(process); + + handler.handle(new NotifyCompletedTransfer("processId")); + + verify(store).save(argThat(p -> p.currentStateIsOneOf(COMPLETED) && p.getId().equals("processId"))); + verify(listener).completed(isA(TransferProcess.class)); + } + + @Test + void shouldNotTransitionToCompleted_whenItInAnInvalidState() { + var process = TransferProcess.Builder.newInstance().id("processId").type(CONSUMER).state(REQUESTED.code()).build(); + when(store.find(process.getId())).thenReturn(process); + + handler.handle(new NotifyCompletedTransfer("processId")); + + verify(store, never()).save(any()); + verifyNoInteractions(listener); + } +} diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyStartedTransferCommandHandlerTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyStartedTransferCommandHandlerTest.java index b78f394d4c0..f544c018490 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyStartedTransferCommandHandlerTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyStartedTransferCommandHandlerTest.java @@ -18,7 +18,7 @@ import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessListener; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; -import org.eclipse.edc.connector.transfer.spi.types.command.NotifyStartedTransferCommand; +import org.eclipse.edc.connector.transfer.spi.types.command.NotifyStartedTransfer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -52,26 +52,26 @@ void setup() { @Test void type() { - assertThat(handler.getType()).isEqualTo(NotifyStartedTransferCommand.class); + assertThat(handler.getType()).isEqualTo(NotifyStartedTransfer.class); } @Test - void shouldTransitStateToStarted() { + void shouldTransitionStateToStarted() { var process = TransferProcess.Builder.newInstance().id("processId").type(CONSUMER).state(REQUESTED.code()).build(); when(store.find(process.getId())).thenReturn(process); - handler.handle(new NotifyStartedTransferCommand("processId")); + handler.handle(new NotifyStartedTransfer("processId")); verify(store).save(argThat(p -> p.currentStateIsOneOf(STARTED) && p.getId().equals("processId"))); verify(listener).started(isA(TransferProcess.class)); } @Test - void shouldNotTransitToStarted_whenItInAnInvalidState() { + void shouldNotTransitionToStarted_whenItInAnInvalidState() { var process = TransferProcess.Builder.newInstance().id("processId").type(CONSUMER).state(COMPLETED.code()).build(); when(store.find(process.getId())).thenReturn(process); - handler.handle(new NotifyStartedTransferCommand("processId")); + handler.handle(new NotifyStartedTransfer("processId")); verify(store, never()).save(any()); verifyNoInteractions(listener); diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyTerminatedTransferCommandHandlerTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyTerminatedTransferCommandHandlerTest.java new file mode 100644 index 00000000000..1151de77d69 --- /dev/null +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/command/handlers/NotifyTerminatedTransferCommandHandlerTest.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.command.handlers; + +import org.eclipse.edc.connector.transfer.observe.TransferProcessObservableImpl; +import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessListener; +import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.connector.transfer.spi.types.command.NotifyTerminatedTransfer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcess.Type.CONSUMER; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATED; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +class NotifyTerminatedTransferCommandHandlerTest { + + private final TransferProcessStore store = mock(TransferProcessStore.class); + private final TransferProcessListener listener = mock(TransferProcessListener.class); + + private NotifyTerminatedTransferCommandHandler handler; + + @BeforeEach + void setup() { + var observable = new TransferProcessObservableImpl(); + observable.registerListener(listener); + handler = new NotifyTerminatedTransferCommandHandler(store, observable); + } + + @Test + void type() { + assertThat(handler.getType()).isEqualTo(NotifyTerminatedTransfer.class); + } + + @Test + void shouldTransitionStateToTerminated() { + var process = TransferProcess.Builder.newInstance().id("processId").type(CONSUMER).state(STARTED.code()).build(); + when(store.find(process.getId())).thenReturn(process); + + handler.handle(new NotifyTerminatedTransfer("processId")); + + verify(store).save(argThat(p -> p.currentStateIsOneOf(TERMINATED) && p.getId().equals("processId"))); + verify(listener).terminated(isA(TransferProcess.class)); + } + + @Test + void shouldNotTransitionToTerminated_whenItInAnInvalidState() { + var process = TransferProcess.Builder.newInstance().id("processId").type(CONSUMER).state(TERMINATED.code()).build(); + when(store.find(process.getId())).thenReturn(process); + + handler.handle(new NotifyTerminatedTransfer("processId")); + + verify(store, never()).save(any()); + verifyNoInteractions(listener); + } +} diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/transfer/TransferProcessManagerImplTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/transfer/TransferProcessManagerImplTest.java index 0a25159cf5f..0369d31797f 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/transfer/TransferProcessManagerImplTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/transfer/TransferProcessManagerImplTest.java @@ -42,6 +42,7 @@ import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.connector.transfer.spi.types.TransferRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferType; +import org.eclipse.edc.connector.transfer.spi.types.command.TransferProcessCommand; import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage; import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRequestMessage; import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage; @@ -79,6 +80,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.eclipse.edc.connector.transfer.spi.types.TransferProcess.Type.CONSUMER; import static org.eclipse.edc.connector.transfer.spi.types.TransferProcess.Type.PROVIDER; @@ -131,6 +133,8 @@ class TransferProcessManagerImplTest { private final Vault vault = mock(Vault.class); private final Clock clock = Clock.systemUTC(); private final TransferProcessListener listener = mock(TransferProcessListener.class); + private final CommandQueue commandQueue = mock(CommandQueue.class); + private final CommandRunner commandRunner = mock(CommandRunner.class); private TransferProcessManagerImpl manager; @@ -148,8 +152,8 @@ void setup() { .dispatcherRegistry(dispatcherRegistry) .manifestGenerator(manifestGenerator) .monitor(mock(Monitor.class)) - .commandQueue(mock(CommandQueue.class)) - .commandRunner(mock(CommandRunner.class)) + .commandQueue(commandQueue) + .commandRunner(commandRunner) .typeManager(new TypeManager()) .clock(clock) .statusCheckerRegistry(statusCheckerRegistry) @@ -837,6 +841,26 @@ void dispatchFailure(TransferProcessStates starting, TransferProcessStates endin }); } + @Test + void enqueueCommand_willEnqueueCommandOnCommandQueue() { + var command = new TransferProcessCommand() {}; + + manager.enqueueCommand(command); + + verify(commandQueue).enqueue(command); + } + + @Test + void runCommand_willRunCommandAndReturnResult() { + var command = new TransferProcessCommand() {}; + when(commandRunner.runCommand(command)).thenReturn(Result.success()); + + var result = manager.runCommand(command); + + assertThat(result).matches(Result::succeeded); + verify(commandRunner).runCommand(command); + } + private static class DispatchFailureArguments implements ArgumentsProvider { private static final int RETRIES_NOT_EXHAUSTED = RETRY_LIMIT; diff --git a/data-protocols/ids/ids-api-multipart-dispatcher-v1/src/test/java/org/eclipse/edc/protocol/ids/api/multipart/dispatcher/IdsMultipartRemoteMessageDispatcherTest.java b/data-protocols/ids/ids-api-multipart-dispatcher-v1/src/test/java/org/eclipse/edc/protocol/ids/api/multipart/dispatcher/IdsMultipartRemoteMessageDispatcherTest.java index 006f993a734..6d4804bb06c 100644 --- a/data-protocols/ids/ids-api-multipart-dispatcher-v1/src/test/java/org/eclipse/edc/protocol/ids/api/multipart/dispatcher/IdsMultipartRemoteMessageDispatcherTest.java +++ b/data-protocols/ids/ids-api-multipart-dispatcher-v1/src/test/java/org/eclipse/edc/protocol/ids/api/multipart/dispatcher/IdsMultipartRemoteMessageDispatcherTest.java @@ -35,6 +35,7 @@ void shouldNotSendTransferStartMessage() { var message = TransferStartMessage.Builder.newInstance() .protocol("ids-multipart") .connectorAddress("http://an/address") + .processId("processId") .build(); var future = dispatcher.send(Object.class, message); @@ -48,6 +49,7 @@ void shouldNotSendTransferCompletionMessage() { var message = TransferCompletionMessage.Builder.newInstance() .protocol("ids-multipart") .connectorAddress("http://an/address") + .processId("processId") .build(); var future = dispatcher.send(Object.class, message); diff --git a/data-protocols/ids/ids-api-multipart-endpoint-v1/src/main/java/org/eclipse/edc/protocol/ids/api/multipart/handler/ArtifactRequestHandler.java b/data-protocols/ids/ids-api-multipart-endpoint-v1/src/main/java/org/eclipse/edc/protocol/ids/api/multipart/handler/ArtifactRequestHandler.java index ddaaaee0258..ccf28ffa5a5 100644 --- a/data-protocols/ids/ids-api-multipart-endpoint-v1/src/main/java/org/eclipse/edc/protocol/ids/api/multipart/handler/ArtifactRequestHandler.java +++ b/data-protocols/ids/ids-api-multipart-endpoint-v1/src/main/java/org/eclipse/edc/protocol/ids/api/multipart/handler/ArtifactRequestHandler.java @@ -19,8 +19,7 @@ import de.fraunhofer.iais.eis.ArtifactRequestMessage; import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; -import org.eclipse.edc.connector.transfer.spi.types.TransferRequest; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRequestMessage; import org.eclipse.edc.protocol.ids.api.multipart.message.MultipartRequest; import org.eclipse.edc.protocol.ids.api.multipart.message.MultipartResponse; import org.eclipse.edc.protocol.ids.spi.types.IdsId; @@ -144,24 +143,19 @@ public boolean canHandle(@NotNull MultipartRequest multipartRequest) { return createMultipartResponse(badParameters(multipartRequest.getHeader(), connectorId)); } - // NB: DO NOT use the asset id provided by the client as that can open aan attack vector where a client references an artifact that - // is different from the one specified by the contract - - var dataRequest = DataRequest.Builder.newInstance() + var requestMessage = TransferRequestMessage.Builder.newInstance() .id(message.getId().toString()) .protocol(MessageProtocol.IDS_MULTIPART) - .dataDestination(dataDestination) .connectorId(connectorId.toString()) + .connectorAddress(idsWebhookAddress) + .dataDestination(dataDestination) + .properties(props) .assetId(contractAgreement.getAssetId()) .contractId(contractAgreement.getId()) - .properties(props) - .connectorAddress(idsWebhookAddress) .build(); - var transferRequest = TransferRequest.Builder.newInstance().dataRequest(dataRequest).build(); - // Initiate a transfer process for the request - var transferInitiateResult = transferProcessService.initiateTransfer(transferRequest, claimToken); + var transferInitiateResult = transferProcessService.notifyRequested(requestMessage, claimToken); // Store secret if process initiated successfully if (transferInitiateResult.succeeded() && artifactRequestMessagePayload.getSecret() != null) { diff --git a/data-protocols/ids/ids-api-multipart-endpoint-v1/src/test/java/org/eclipse/edc/protocol/ids/api/multipart/handler/ArtifactRequestHandlerTest.java b/data-protocols/ids/ids-api-multipart-endpoint-v1/src/test/java/org/eclipse/edc/protocol/ids/api/multipart/handler/ArtifactRequestHandlerTest.java index 87e093a9d10..6ce37608dc0 100644 --- a/data-protocols/ids/ids-api-multipart-endpoint-v1/src/test/java/org/eclipse/edc/protocol/ids/api/multipart/handler/ArtifactRequestHandlerTest.java +++ b/data-protocols/ids/ids-api-multipart-endpoint-v1/src/test/java/org/eclipse/edc/protocol/ids/api/multipart/handler/ArtifactRequestHandlerTest.java @@ -24,7 +24,7 @@ import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService; -import org.eclipse.edc.connector.transfer.spi.types.TransferRequest; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRequestMessage; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.protocol.ids.api.multipart.message.MultipartRequest; import org.eclipse.edc.protocol.ids.serialization.IdsTypeManagerUtil; @@ -99,23 +99,23 @@ void handleRequestOkTest() throws JsonProcessingException { var multipartRequest = createMultipartRequest(destination, artifactRequestId, assetId, contractId, claimToken); var header = (ArtifactRequestMessage) multipartRequest.getHeader(); - var trCapture = ArgumentCaptor.forClass(TransferRequest.class); - when(transferProcessService.initiateTransfer(trCapture.capture(), eq(claimToken))).thenReturn(ServiceResult.success("Transfer success")); + var trCapture = ArgumentCaptor.forClass(TransferRequestMessage.class); + when(transferProcessService.notifyRequested(trCapture.capture(), eq(claimToken))).thenReturn(ServiceResult.success("Transfer success")); when(contractNegotiationStore.findContractAgreement(contractId)).thenReturn(agreement); handler.handleRequest(multipartRequest); - verify(transferProcessService).initiateTransfer(trCapture.capture(), eq(claimToken)); + verify(transferProcessService).notifyRequested(trCapture.capture(), eq(claimToken)); - var dataRequest = trCapture.getValue().getDataRequest(); + var requestMessage = trCapture.getValue(); - assertThat(dataRequest.getId()).hasToString(artifactRequestId); - assertThat(dataRequest.getDataDestination().getKeyName()).isEqualTo(destination.getKeyName()); - assertThat(dataRequest.getConnectorId()).isEqualTo(connectorId.toString()); - assertThat(dataRequest.getAssetId()).isEqualTo(agreement.getAssetId()); - assertThat(dataRequest.getContractId()).isEqualTo(agreement.getId()); - assertThat(dataRequest.getConnectorAddress()).isEqualTo(header.getProperties().get(IDS_WEBHOOK_ADDRESS_PROPERTY).toString()); - assertThat(dataRequest.getProperties()).containsExactlyEntriesOf(Map.of("foo", "bar")); + assertThat(requestMessage.getId()).hasToString(artifactRequestId); + assertThat(requestMessage.getDataDestination().getKeyName()).isEqualTo(destination.getKeyName()); + assertThat(requestMessage.getConnectorId()).isEqualTo(connectorId.toString()); + assertThat(requestMessage.getAssetId()).isEqualTo(agreement.getAssetId()); + assertThat(requestMessage.getContractId()).isEqualTo(agreement.getId()); + assertThat(requestMessage.getConnectorAddress()).isEqualTo(header.getProperties().get(IDS_WEBHOOK_ADDRESS_PROPERTY).toString()); + assertThat(requestMessage.getProperties()).containsExactlyEntriesOf(Map.of("foo", "bar")); } @Test diff --git a/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java b/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java index bdc4d0f2ed0..ed9720ef353 100644 --- a/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java +++ b/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java @@ -40,9 +40,9 @@ import java.net.URL; import java.util.Map; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED; @@ -80,7 +80,9 @@ void shouldCallTransferProcessApiWithComplete(TransferProcessStore store, DataPl when(service.transfer(any())).thenReturn(completedFuture(StatusResult.success())); var id = "tp-id"; store.save(createTransferProcess(id)); - manager.initiateTransfer(createDataFlowRequest(id, callbackUrl.get())); + var dataFlowRequest = createDataFlowRequest(id, callbackUrl.get()); + + manager.initiateTransfer(dataFlowRequest); await().untilAsserted(() -> { var transferProcess = store.find("tp-id"); @@ -94,7 +96,9 @@ void shouldCallTransferProcessApiWithFailed(TransferProcessStore store, DataPlan when(service.transfer(any())).thenReturn(completedFuture(StatusResult.failure(ResponseStatus.FATAL_ERROR, "error"))); var id = "tp-id"; store.save(createTransferProcess(id)); - manager.initiateTransfer(createDataFlowRequest(id, callbackUrl.get())); + var dataFlowRequest = createDataFlowRequest(id, callbackUrl.get()); + + manager.initiateTransfer(dataFlowRequest); await().untilAsserted(() -> { var transferProcess = store.find("tp-id"); @@ -107,10 +111,12 @@ void shouldCallTransferProcessApiWithFailed(TransferProcessStore store, DataPlan @Test void shouldCallTransferProcessApiWithException(TransferProcessStore store, DataPlaneManager manager, ControlPlaneApiUrl callbackUrl) { - when(service.transfer(any())).thenReturn(CompletableFuture.failedFuture(new EdcException("error"))); + when(service.transfer(any())).thenReturn(failedFuture(new EdcException("error"))); var id = "tp-id"; store.save(createTransferProcess(id)); - manager.initiateTransfer(createDataFlowRequest(id, callbackUrl.get())); + var dataFlowRequest = createDataFlowRequest(id, callbackUrl.get()); + + manager.initiateTransfer(dataFlowRequest); await().untilAsserted(() -> { var transferProcess = store.find("tp-id"); @@ -127,6 +133,7 @@ private TransferProcess createTransferProcess(String id) { .state(TransferProcessStates.STARTED.code()) .type(TransferProcess.Type.PROVIDER) .dataRequest(DataRequest.Builder.newInstance() + .id(UUID.randomUUID().toString()) .destinationType("file") .protocol("any") .connectorAddress("http://an/address") @@ -147,6 +154,7 @@ private DataFlowRequest createDataFlowRequest(String processId, URL callbackAddr private static class TransferServiceMockExtension implements ServiceExtension { private final TransferService transferService; + @Inject private TransferServiceRegistry registry; @@ -159,4 +167,5 @@ public void initialize(ServiceExtensionContext context) { registry.registerTransferService(transferService); } } + } diff --git a/extensions/control-plane/api/control-plane-api/src/test/java/org/eclipse/edc/connector/api/TransferProcessControlApiControllerIntegrationTest.java b/extensions/control-plane/api/control-plane-api/src/test/java/org/eclipse/edc/connector/api/TransferProcessControlApiControllerIntegrationTest.java index 37c88e12a17..e6e4bd316ed 100644 --- a/extensions/control-plane/api/control-plane-api/src/test/java/org/eclipse/edc/connector/api/TransferProcessControlApiControllerIntegrationTest.java +++ b/extensions/control-plane/api/control-plane-api/src/test/java/org/eclipse/edc/connector/api/TransferProcessControlApiControllerIntegrationTest.java @@ -29,6 +29,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import static io.restassured.RestAssured.given; @@ -175,6 +176,7 @@ private TransferProcess.Builder createTransferProcessBuilder() { .state(TransferProcessStates.STARTED.code()) .type(TransferProcess.Type.PROVIDER) .dataRequest(DataRequest.Builder.newInstance() + .id(UUID.randomUUID().toString()) .destinationType("file") .protocol("protocol") .connectorAddress("http://an/address") diff --git a/extensions/control-plane/api/management-api/transfer-process-api/src/test/java/org/eclipse/edc/connector/api/management/transferprocess/TransferProcessApiControllerIntegrationTest.java b/extensions/control-plane/api/management-api/transfer-process-api/src/test/java/org/eclipse/edc/connector/api/management/transferprocess/TransferProcessApiControllerIntegrationTest.java index 0276c159f48..2d48015adb9 100644 --- a/extensions/control-plane/api/management-api/transfer-process-api/src/test/java/org/eclipse/edc/connector/api/management/transferprocess/TransferProcessApiControllerIntegrationTest.java +++ b/extensions/control-plane/api/management-api/transfer-process-api/src/test/java/org/eclipse/edc/connector/api/management/transferprocess/TransferProcessApiControllerIntegrationTest.java @@ -44,10 +44,10 @@ import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; import static org.hamcrest.Matchers.emptyString; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.startsWith; @ApiTest @ExtendWith(EdcExtension.class) @@ -204,7 +204,7 @@ void cancel_conflict(TransferProcessStore store) { .post("/transferprocess/" + PROCESS_ID + "/cancel") .then() .statusCode(409) - .body("[0].message", startsWith(format("TransferProcess %s cannot be terminated as it is in state", PROCESS_ID))); + .body("[0].message", endsWith(format("because TransferProcess %s is in state COMPLETED", PROCESS_ID))); } @Test @@ -252,7 +252,7 @@ void terminate_conflict(TransferProcessStore store) { .post("/transferprocess/" + PROCESS_ID + "/terminate") .then() .statusCode(409) - .body("[0].message", startsWith(format("TransferProcess %s cannot be terminated as it is in state", PROCESS_ID))); + .body("[0].message", endsWith(format("because TransferProcess %s is in state COMPLETED", PROCESS_ID))); } @Test @@ -284,7 +284,7 @@ void deprovision_conflict(TransferProcessStore store) { .post("/transferprocess/" + PROCESS_ID + "/deprovision") .then() .statusCode(409) - .body("[0].message", startsWith(format("TransferProcess %s cannot be deprovisioned as it is in state", PROCESS_ID))); + .body("[0].message", endsWith(format("because TransferProcess %s is in state INITIAL", PROCESS_ID))); } @Test diff --git a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/transferprocess/TransferProcessService.java b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/transferprocess/TransferProcessService.java index 8bc12cb16d5..8d254b2d9db 100644 --- a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/transferprocess/TransferProcessService.java +++ b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/transferprocess/TransferProcessService.java @@ -19,6 +19,10 @@ import org.eclipse.edc.connector.transfer.spi.types.ProvisionResponse; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.transfer.spi.types.TransferRequest; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRequestMessage; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage; +import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferTerminationMessage; import org.eclipse.edc.service.spi.result.ServiceResult; import org.eclipse.edc.spi.iam.ClaimToken; import org.eclipse.edc.spi.query.QuerySpec; @@ -58,15 +62,6 @@ public interface TransferProcessService { @Nullable String getState(String transferProcessId); - /** - * Notifies the TransferProcess that it has been STARTED by the counter-part. - * Only callable on CONSUMER TransferProcess - * - * @param dataRequestId the dataRequestId - * @return a succeeded result if the operation was successful, a failed one otherwise - */ - ServiceResult notifyStarted(String dataRequestId); - /** * Asynchronously requests cancellation of the transfer process. *

@@ -142,17 +137,6 @@ default ServiceResult fail(String transferProcessId, String err @NotNull ServiceResult initiateTransfer(TransferRequest request); - /** - * Initiate transfer request for type provider. - * - * @param transferRequest for the transfer. - * @param claimToken of the requesting participant. - * @return a result that is successful if the transfer process was initiated with id of created transferProcess. - */ - @NotNull - ServiceResult initiateTransfer(TransferRequest transferRequest, ClaimToken claimToken); - - /** * Asynchronously informs the system that the {@link DeprovisionedResource} has been provisioned * @@ -171,4 +155,41 @@ default ServiceResult fail(String transferProcessId, String err */ ServiceResult addProvisionedResource(String transferProcessId, ProvisionResponse response); + /** + * Notifies the TransferProcess that it has been requested by the counter-part. + * + * @param message the incoming message + * @param claimToken the counter-party claim token + * @return a succeeded result if the operation was successful, a failed one otherwise + */ + @NotNull + ServiceResult notifyRequested(TransferRequestMessage message, ClaimToken claimToken); + + /** + * Notifies the TransferProcess that it has been started by the counter-part. + * + * @param message the incoming message + * @param claimToken the counter-party claim token + * @return a succeeded result if the operation was successful, a failed one otherwise + */ + ServiceResult notifyStarted(TransferStartMessage message, ClaimToken claimToken); + + /** + * Notifies the TransferProcess that it has been completed by the counter-part. + * + * @param message the incoming message + * @param claimToken the counter-party claim token + * @return a succeeded result if the operation was successful, a failed one otherwise + */ + ServiceResult notifyCompleted(TransferCompletionMessage message, ClaimToken claimToken); + + /** + * Notifies the TransferProcess that it has been terminated by the counter-part. + * + * @param message the incoming message + * @param claimToken the counter-party claim token + * @return a succeeded result if the operation was successful, a failed one otherwise + */ + ServiceResult notifyTerminated(TransferTerminationMessage message, ClaimToken claimToken); + } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/TransferProcessManager.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/TransferProcessManager.java index fd076e1f43a..f3602debd00 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/TransferProcessManager.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/TransferProcessManager.java @@ -18,6 +18,7 @@ import org.eclipse.edc.connector.transfer.spi.types.command.TransferProcessCommand; import org.eclipse.edc.runtime.metamodel.annotation.ExtensionPoint; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.result.Result; /** * Manages data transfer processes. Currently synchronous and asynchronous data transfers are supported. @@ -40,4 +41,9 @@ public interface TransferProcessManager { */ void enqueueCommand(TransferProcessCommand command); + /** + * Run a command syncronously + */ + Result runCommand(TransferProcessCommand command); + } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/command/NotifyStartedTransferCommand.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/command/NotifyCompletedTransfer.java similarity index 71% rename from spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/command/NotifyStartedTransferCommand.java rename to spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/command/NotifyCompletedTransfer.java index 685436848b7..cf0a60586c3 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/command/NotifyStartedTransferCommand.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/command/NotifyCompletedTransfer.java @@ -15,11 +15,11 @@ package org.eclipse.edc.connector.transfer.spi.types.command; /** - * Notifies that a Transfer has been started on the counter-party side + * Notifies that a TransferProcess has been completed by the counter-party */ -public class NotifyStartedTransferCommand extends SingleTransferProcessCommand { +public class NotifyCompletedTransfer extends SingleTransferProcessCommand { - public NotifyStartedTransferCommand(String transferProcessId) { + public NotifyCompletedTransfer(String transferProcessId) { super(transferProcessId); } } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/command/NotifyStartedTransfer.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/command/NotifyStartedTransfer.java new file mode 100644 index 00000000000..ff4b405e9f3 --- /dev/null +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/command/NotifyStartedTransfer.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.spi.types.command; + +/** + * Notifies that a TransferProcess has been started by the counter-party + */ +public class NotifyStartedTransfer extends SingleTransferProcessCommand { + + public NotifyStartedTransfer(String transferProcessId) { + super(transferProcessId); + } +} diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/command/NotifyTerminatedTransfer.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/command/NotifyTerminatedTransfer.java new file mode 100644 index 00000000000..8d734f3073a --- /dev/null +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/command/NotifyTerminatedTransfer.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.spi.types.command; + +/** + * Notifies that a TransferProcess has been terminated by the counter-party + */ +public class NotifyTerminatedTransfer extends SingleTransferProcessCommand { + + public NotifyTerminatedTransfer(String transferProcessId) { + super(transferProcessId); + } +} diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferCompletionMessage.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferCompletionMessage.java index 17f2101565b..83a7685653f 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferCompletionMessage.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferCompletionMessage.java @@ -16,7 +16,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; -import org.eclipse.edc.spi.types.domain.message.RemoteMessage; import java.util.Objects; @@ -25,10 +24,11 @@ * that some data plane implementations may optimize completion notification by performing it as part of its wire * protocol. In those cases, a {@link TransferCompletionMessage} message does not need to be sent. */ -public class TransferCompletionMessage implements RemoteMessage { +public class TransferCompletionMessage implements TransferRemoteMessage { private String connectorAddress; private String protocol; + private String processId; @Override public String getProtocol() { @@ -40,6 +40,10 @@ public String getConnectorAddress() { return connectorAddress; } + public String getProcessId() { + return processId; + } + @JsonPOJOBuilder(withPrefix = "") public static class Builder { private final TransferCompletionMessage message; @@ -63,9 +67,15 @@ public Builder protocol(String protocol) { return this; } + public Builder processId(String processId) { + message.processId = processId; + return this; + } + public TransferCompletionMessage build() { Objects.requireNonNull(message.protocol, "The protocol must be specified"); Objects.requireNonNull(message.connectorAddress, "The connectorAddress must be specified"); + Objects.requireNonNull(message.processId, "The processId must be specified"); return message; } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferRemoteMessage.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferRemoteMessage.java new file mode 100644 index 00000000000..cce95417f29 --- /dev/null +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferRemoteMessage.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.spi.types.protocol; + +import org.eclipse.edc.spi.types.domain.message.RemoteMessage; + +/** + * A remote message related to the TransferProcess context + */ +public interface TransferRemoteMessage extends RemoteMessage { + + /** + * Returns the process id. + * + * @return the processId property. + */ + String getProcessId(); +} diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferRequestMessage.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferRequestMessage.java index 726b615ae3c..85d5d22f297 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferRequestMessage.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferRequestMessage.java @@ -17,7 +17,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; import org.eclipse.edc.spi.types.domain.DataAddress; -import org.eclipse.edc.spi.types.domain.message.RemoteMessage; import java.util.HashMap; import java.util.Map; @@ -26,7 +25,7 @@ /** * The {@link TransferRequestMessage} is sent by a consumer to initiate a transfer process. */ -public class TransferRequestMessage implements RemoteMessage { +public class TransferRequestMessage implements TransferRemoteMessage { private String connectorAddress; private String protocol; @@ -47,6 +46,11 @@ public String getConnectorAddress() { return connectorAddress; } + @Override + public String getProcessId() { + return id; + } + public String getAssetId() { return assetId; } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferStartMessage.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferStartMessage.java index 1ad5f664b4f..2bc4e8dc836 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferStartMessage.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferStartMessage.java @@ -16,17 +16,17 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; -import org.eclipse.edc.spi.types.domain.message.RemoteMessage; import java.util.Objects; /** * The {@link TransferStartMessage} is sent by the provider to indicate the asset transfer has been initiated. */ -public class TransferStartMessage implements RemoteMessage { +public class TransferStartMessage implements TransferRemoteMessage { private String connectorAddress; private String protocol; + private String processId; @Override public String getProtocol() { @@ -38,6 +38,11 @@ public String getConnectorAddress() { return connectorAddress; } + @Override + public String getProcessId() { + return processId; + } + @JsonPOJOBuilder(withPrefix = "") public static class Builder { private final TransferStartMessage message; @@ -61,9 +66,15 @@ public Builder protocol(String protocol) { return this; } + public Builder processId(String processId) { + message.processId = processId; + return this; + } + public TransferStartMessage build() { Objects.requireNonNull(message.protocol, "The protocol must be specified"); Objects.requireNonNull(message.connectorAddress, "The connectorAddress must be specified"); + Objects.requireNonNull(message.processId, "The processId must be specified"); return message; } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferTerminationMessage.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferTerminationMessage.java index 511bfb3cc85..a6d51e96b0c 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferTerminationMessage.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/protocol/TransferTerminationMessage.java @@ -16,7 +16,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; -import org.eclipse.edc.spi.types.domain.message.RemoteMessage; import java.util.Objects; @@ -25,10 +24,11 @@ * indicate the data transfer process should stop and be placed in a terminal state. If the termination was due to an * error, the sender may include error information. */ -public class TransferTerminationMessage implements RemoteMessage { +public class TransferTerminationMessage implements TransferRemoteMessage { private String connectorAddress; private String protocol; + private String processId; @Override public String getProtocol() { @@ -40,6 +40,11 @@ public String getConnectorAddress() { return connectorAddress; } + @Override + public String getProcessId() { + return processId; + } + @JsonPOJOBuilder(withPrefix = "") public static class Builder { private final TransferTerminationMessage message; @@ -63,11 +68,15 @@ public Builder protocol(String protocol) { return this; } + public Builder processId(String processId) { + message.processId = processId; + return this; + } + public TransferTerminationMessage build() { Objects.requireNonNull(message.protocol, "The protocol must be specified"); Objects.requireNonNull(message.connectorAddress, "The connectorAddress must be specified"); return message; } - } }