Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dsp): Add missing notify methods on TransferProcessService #2661

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.DeprovisionedResource;
import org.eclipse.edc.connector.transfer.spi.types.ProvisionResponse;
import org.eclipse.edc.connector.transfer.spi.types.ProvisionedContentResource;
Expand All @@ -33,13 +34,23 @@
import org.eclipse.edc.connector.transfer.spi.types.command.AddProvisionedResourceCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionCompleteCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionRequest;
import org.eclipse.edc.connector.transfer.spi.types.command.NotifyStartedTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.NotifyCompletedTransfer;
import org.eclipse.edc.connector.transfer.spi.types.command.NotifyStartedTransfer;
import org.eclipse.edc.connector.transfer.spi.types.command.NotifyTerminatedTransfer;
import org.eclipse.edc.connector.transfer.spi.types.command.SingleTransferProcessCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.TerminateTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRemoteMessage;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRequestMessage;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferTerminationMessage;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.spi.dataaddress.DataAddressValidator;
import org.eclipse.edc.spi.iam.ClaimToken;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.AbstractResult;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.message.RemoteMessage;
import org.eclipse.edc.spi.types.domain.transfer.command.CompleteTransferCommand;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.jetbrains.annotations.NotNull;
Expand All @@ -48,6 +59,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

Expand Down Expand Up @@ -98,28 +110,19 @@ public ServiceResult<Stream<TransferProcess>> query(QuerySpec query) {
});
}

@Override
public ServiceResult<TransferProcess> notifyStarted(String dataRequestId) {
return transactionContext.execute(() -> Optional.of(dataRequestId)
.map(transferProcessStore::processIdForDataRequestId)
.map(id -> apply(id, this::startedImpl))
.orElse(ServiceResult.notFound(format("TransferProcess with DataRequest id %s not found", dataRequestId)))
);
}

@Override
public @NotNull ServiceResult<TransferProcess> terminate(String transferProcessId, String reason) {
return apply(transferProcessId, transferProcess -> terminateImpl(transferProcess, reason));
return transactionContext.execute(() -> runAsync(new TerminateTransferCommand(transferProcessId, reason)));
}

@Override
public @NotNull ServiceResult<TransferProcess> complete(String transferProcessId) {
return apply(transferProcessId, this::completeImpl);
return transactionContext.execute(() -> runAsync(new CompleteTransferCommand(transferProcessId)));
}

@Override
public @NotNull ServiceResult<TransferProcess> deprovision(String transferProcessId) {
return apply(transferProcessId, this::deprovisionImpl);
return transactionContext.execute(() -> runAsync(new DeprovisionRequest(transferProcessId)));
}

@Override
Expand All @@ -140,30 +143,97 @@ public ServiceResult<TransferProcess> notifyStarted(String dataRequestId) {
}

@Override
public @NotNull ServiceResult<String> initiateTransfer(TransferRequest request, ClaimToken claimToken) {
var validDestination = dataAddressValidator.validate(request.getDataRequest().getDataDestination());
public ServiceResult<TransferProcess> completeDeprovision(String transferProcessId, DeprovisionedResource resource) {
return transactionContext.execute(() -> runAsync(new DeprovisionCompleteCommand(transferProcessId, resource)));
}

@Override
public ServiceResult<TransferProcess> addProvisionedResource(String transferProcessId, ProvisionResponse response) {
return transactionContext.execute(() -> runAsync(new AddProvisionedResourceCommand(transferProcessId, response)));
}

@Override
public @NotNull ServiceResult<String> notifyRequested(TransferRequestMessage message, ClaimToken claimToken) {
var validDestination = dataAddressValidator.validate(message.getDataDestination());
if (validDestination.failed()) {
return ServiceResult.badRequest(validDestination.getFailureMessages().toArray(new String[]{}));
}

var dataRequest = DataRequest.Builder.newInstance()
.id(message.getId())
.protocol(message.getProtocol())
.connectorId(message.getConnectorId())
.connectorAddress(message.getConnectorAddress())
.dataDestination(message.getDataDestination())
.properties(message.getProperties())
.assetId(message.getAssetId())
.contractId(message.getContractId())
.build();

var transferRequest = TransferRequest.Builder.newInstance().dataRequest(dataRequest).build();

return transactionContext.execute(() ->
Optional.ofNullable(negotiationStore.findContractAgreement(request.getDataRequest().getContractId()))
Optional.ofNullable(negotiationStore.findContractAgreement(message.getContractId()))
.filter(agreement -> contractValidationService.validateAgreement(claimToken, agreement).succeeded())
.map(agreement -> manager.initiateProviderRequest(request))
.map(agreement -> manager.initiateProviderRequest(transferRequest))
.filter(AbstractResult::succeeded)
.map(AbstractResult::getContent)
.map(ServiceResult::success)
.orElse(ServiceResult.conflict("Request couldn't be initialised.")));
}

@Override
public ServiceResult<TransferProcess> completeDeprovision(String transferProcessId, DeprovisionedResource resource) {
return apply(transferProcessId, completeDeprovisionImpl(resource));
public ServiceResult<TransferProcess> notifyStarted(TransferStartMessage message, ClaimToken claimToken) {
return transactionContext.execute(() -> handleRemoteMessage(message));
}

@Override
public ServiceResult<TransferProcess> addProvisionedResource(String transferProcessId, ProvisionResponse response) {
return apply(transferProcessId, addProvisionedResourceImpl(response));
public ServiceResult<TransferProcess> notifyCompleted(TransferCompletionMessage message, ClaimToken claimToken) {
return transactionContext.execute(() -> handleRemoteMessage(message));
}

@Override
public ServiceResult<TransferProcess> notifyTerminated(TransferTerminationMessage message, ClaimToken claimToken) {
return transactionContext.execute(() -> handleRemoteMessage(message));
}

@NotNull
private ServiceResult<TransferProcess> handleRemoteMessage(TransferRemoteMessage message) {
return Optional.of(message.getProcessId())
.map(transferProcessStore::processIdForDataRequestId)
.map(id -> messageToCommandFactories.get(message.getClass()).apply(id))
.map(this::runSync)
.orElse(ServiceResult.notFound(format("TransferProcess with DataRequest id %s not found", message.getProcessId())));
}

private ServiceResult<TransferProcess> runAsync(SingleTransferProcessCommand command) {
return Optional.of(command.getTransferProcessId())
.map(transferProcessStore::find)
.map(transferProcess -> {
var validator = asyncCommandValidators.get(command.getClass());
var validationResult = validator.apply(command, transferProcess);
if (validationResult.failed()) {
return ServiceResult.<TransferProcess>conflict(format("Cannot %s because %s", command.getClass().getSimpleName(), validationResult.getFailureDetail()));
}

manager.enqueueCommand(command);
return ServiceResult.success(transferProcess);
})
.orElse(ServiceResult.notFound(format("TransferProcess with id %s not found", command.getTransferProcessId())));
}

private ServiceResult<TransferProcess> runSync(SingleTransferProcessCommand command) {
return Optional.of(command.getTransferProcessId())
.map(transferProcessStore::find)
.map(transferProcess -> {
var commandResult = manager.runCommand(command);
if (commandResult.succeeded()) {
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.<TransferProcess>conflict(format("Cannot %s because %s", command.getClass().getSimpleName(), commandResult.getFailureDetail()));
}
})
.orElse(ServiceResult.notFound(format("TransferProcess with id %s not found", command.getTransferProcessId())));
}

private Map<Class<?>, List<Class<?>>> getSubtypes() {
Expand All @@ -173,63 +243,34 @@ private Map<Class<?>, List<Class<?>>> getSubtypes() {
);
}

private ServiceResult<TransferProcess> apply(String transferProcessId, Function<TransferProcess, ServiceResult<TransferProcess>> function) {
return transactionContext.execute(() -> {
var transferProcess = transferProcessStore.find(transferProcessId);
return Optional.ofNullable(transferProcess)
.map(function)
.orElse(ServiceResult.notFound(format("TransferProcess %s does not exist", transferProcessId)));
});
}
private final Map<Class<? extends RemoteMessage>, MessageToCommandFactory> messageToCommandFactories = Map.of(
TransferStartMessage.class, NotifyStartedTransfer::new,
TransferCompletionMessage.class, NotifyCompletedTransfer::new,
TransferTerminationMessage.class, NotifyTerminatedTransfer::new
);

private ServiceResult<TransferProcess> terminateImpl(TransferProcess transferProcess, String reason) {
if (transferProcess.canBeTerminated()) {
manager.enqueueCommand(new TerminateTransferCommand(transferProcess.getId(), reason));
private final Map<Class<? extends SingleTransferProcessCommand>, CommandValidator> asyncCommandValidators = Map.of(
TerminateTransferCommand.class,
(command, transferProcess) -> transferProcess.canBeTerminated()
? Result.success()
: Result.failure(format("TransferProcess %s is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))),

return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("TransferProcess %s cannot be terminated as it is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState())));
}
}
org.eclipse.edc.spi.types.domain.transfer.command.CompleteTransferCommand.class,
(command, transferProcess) -> transferProcess.canBeCompleted()
? Result.success()
: Result.failure(format("TransferProcess %s is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))),

private ServiceResult<TransferProcess> deprovisionImpl(TransferProcess transferProcess) {
if (transferProcess.canBeDeprovisioned()) {
manager.enqueueCommand(new DeprovisionRequest(transferProcess.getId()));
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("TransferProcess %s cannot be deprovisioned as it is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState())));
}
}
DeprovisionRequest.class,
(command, transferProcess) -> transferProcess.canBeDeprovisioned()
? Result.success()
: Result.failure(format("TransferProcess %s is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))),

private ServiceResult<TransferProcess> completeImpl(TransferProcess transferProcess) {
if (transferProcess.canBeCompleted()) {
manager.enqueueCommand(new CompleteTransferCommand(transferProcess.getId()));
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("TransferProcess %s cannot be completed as it is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState())));
}
}
DeprovisionCompleteCommand.class, (command, transferProcess) -> Result.success(),

private ServiceResult<TransferProcess> startedImpl(TransferProcess transferProcess) {
if (transferProcess.canBeStartedConsumer()) {
manager.enqueueCommand(new NotifyStartedTransferCommand(transferProcess.getId()));
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("TransferProcess %s cannot be started as it is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState())));
}
}
AddProvisionedResourceCommand.class, (command, transferProcess) -> Result.success()
);

private Function<TransferProcess, ServiceResult<TransferProcess>> completeDeprovisionImpl(DeprovisionedResource resource) {
return (transferProcess -> {
manager.enqueueCommand(new DeprovisionCompleteCommand(transferProcess.getId(), resource));
return ServiceResult.success(transferProcess);
});
}
private interface MessageToCommandFactory extends Function<String, SingleTransferProcessCommand> { }

private Function<TransferProcess, ServiceResult<TransferProcess>> addProvisionedResourceImpl(ProvisionResponse response) {
return (transferProcess -> {
manager.enqueueCommand(new AddProvisionedResourceCommand(transferProcess.getId(), response));
return ServiceResult.success(transferProcess);
});
}
private interface CommandValidator extends BiFunction<SingleTransferProcessCommand, TransferProcess, Result<Void>> {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.edc.connector.transfer.spi.retry.TransferWaitStrategy;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.TransferRequest;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.spi.event.EventRouter;
import org.eclipse.edc.spi.event.EventSubscriber;
Expand All @@ -31,6 +32,7 @@
import org.eclipse.edc.spi.event.transferprocess.TransferProcessRequested;
import org.eclipse.edc.spi.event.transferprocess.TransferProcessStarted;
import org.eclipse.edc.spi.event.transferprocess.TransferProcessTerminated;
import org.eclipse.edc.spi.iam.ClaimToken;
import org.eclipse.edc.spi.message.RemoteMessageDispatcher;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -101,7 +103,8 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se
verify(eventSubscriber).on(argThat(isEnvelopeOf(TransferProcessRequested.class)));
});

service.notifyStarted("dataRequestId");
var startMessage = TransferStartMessage.Builder.newInstance().processId("dataRequestId").protocol("any").connectorAddress("http://any").build();
service.notifyStarted(startMessage, ClaimToken.Builder.newInstance().build());

await().untilAsserted(() -> {
verify(eventSubscriber).on(argThat(isEnvelopeOf(TransferProcessStarted.class)));
Expand Down
Loading