Skip to content

Commit

Permalink
Add missing notify methods on service
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Apr 3, 2023
1 parent dd272c3 commit 3b3bb3d
Show file tree
Hide file tree
Showing 28 changed files with 848 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.DeprovisionedResource;
import org.eclipse.edc.connector.transfer.spi.types.ProvisionResponse;
import org.eclipse.edc.connector.transfer.spi.types.ProvisionedContentResource;
Expand All @@ -33,13 +34,23 @@
import org.eclipse.edc.connector.transfer.spi.types.command.AddProvisionedResourceCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionCompleteCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionRequest;
import org.eclipse.edc.connector.transfer.spi.types.command.NotifyStartedTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.NotifyCompletedTransfer;
import org.eclipse.edc.connector.transfer.spi.types.command.NotifyStartedTransfer;
import org.eclipse.edc.connector.transfer.spi.types.command.NotifyTerminatedTransfer;
import org.eclipse.edc.connector.transfer.spi.types.command.SingleTransferProcessCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.TerminateTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRemoteMessage;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRequestMessage;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferTerminationMessage;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.spi.dataaddress.DataAddressValidator;
import org.eclipse.edc.spi.iam.ClaimToken;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.AbstractResult;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.message.RemoteMessage;
import org.eclipse.edc.spi.types.domain.transfer.command.CompleteTransferCommand;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.jetbrains.annotations.NotNull;
Expand All @@ -48,6 +59,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

Expand Down Expand Up @@ -98,28 +110,19 @@ public ServiceResult<Stream<TransferProcess>> query(QuerySpec query) {
});
}

@Override
public ServiceResult<TransferProcess> notifyStarted(String dataRequestId) {
return transactionContext.execute(() -> Optional.of(dataRequestId)
.map(transferProcessStore::processIdForDataRequestId)
.map(id -> apply(id, this::startedImpl))
.orElse(ServiceResult.notFound(format("TransferProcess with DataRequest id %s not found", dataRequestId)))
);
}

@Override
public @NotNull ServiceResult<TransferProcess> terminate(String transferProcessId, String reason) {
return apply(transferProcessId, transferProcess -> terminateImpl(transferProcess, reason));
return transactionContext.execute(() -> runAsync(new TerminateTransferCommand(transferProcessId, reason)));
}

@Override
public @NotNull ServiceResult<TransferProcess> complete(String transferProcessId) {
return apply(transferProcessId, this::completeImpl);
return transactionContext.execute(() -> runAsync(new CompleteTransferCommand(transferProcessId)));
}

@Override
public @NotNull ServiceResult<TransferProcess> deprovision(String transferProcessId) {
return apply(transferProcessId, this::deprovisionImpl);
return transactionContext.execute(() -> runAsync(new DeprovisionRequest(transferProcessId)));
}

@Override
Expand All @@ -140,30 +143,97 @@ public ServiceResult<TransferProcess> notifyStarted(String dataRequestId) {
}

@Override
public @NotNull ServiceResult<String> initiateTransfer(TransferRequest request, ClaimToken claimToken) {
var validDestination = dataAddressValidator.validate(request.getDataRequest().getDataDestination());
public ServiceResult<TransferProcess> completeDeprovision(String transferProcessId, DeprovisionedResource resource) {
return transactionContext.execute(() -> runAsync(new DeprovisionCompleteCommand(transferProcessId, resource)));
}

@Override
public ServiceResult<TransferProcess> addProvisionedResource(String transferProcessId, ProvisionResponse response) {
return transactionContext.execute(() -> runAsync(new AddProvisionedResourceCommand(transferProcessId, response)));
}

@Override
public @NotNull ServiceResult<String> notifyRequested(TransferRequestMessage message, ClaimToken claimToken) {
var validDestination = dataAddressValidator.validate(message.getDataDestination());
if (validDestination.failed()) {
return ServiceResult.badRequest(validDestination.getFailureMessages().toArray(new String[]{}));
}

var dataRequest = DataRequest.Builder.newInstance()
.id(message.getId())
.protocol(message.getProtocol())
.connectorId(message.getConnectorId())
.connectorAddress(message.getConnectorAddress())
.dataDestination(message.getDataDestination())
.properties(message.getProperties())
.assetId(message.getAssetId())
.contractId(message.getContractId())
.build();

var transferRequest = TransferRequest.Builder.newInstance().dataRequest(dataRequest).build();

return transactionContext.execute(() ->
Optional.ofNullable(negotiationStore.findContractAgreement(request.getDataRequest().getContractId()))
Optional.ofNullable(negotiationStore.findContractAgreement(message.getContractId()))
.filter(agreement -> contractValidationService.validateAgreement(claimToken, agreement).succeeded())
.map(agreement -> manager.initiateProviderRequest(request))
.map(agreement -> manager.initiateProviderRequest(transferRequest))
.filter(AbstractResult::succeeded)
.map(AbstractResult::getContent)
.map(ServiceResult::success)
.orElse(ServiceResult.conflict("Request couldn't be initialised.")));
}

@Override
public ServiceResult<TransferProcess> completeDeprovision(String transferProcessId, DeprovisionedResource resource) {
return apply(transferProcessId, completeDeprovisionImpl(resource));
public ServiceResult<TransferProcess> notifyStarted(TransferStartMessage message, ClaimToken claimToken) {
return transactionContext.execute(() -> handleRemoteMessage(message));
}

@Override
public ServiceResult<TransferProcess> addProvisionedResource(String transferProcessId, ProvisionResponse response) {
return apply(transferProcessId, addProvisionedResourceImpl(response));
public ServiceResult<TransferProcess> notifyCompleted(TransferCompletionMessage message, ClaimToken claimToken) {
return transactionContext.execute(() -> handleRemoteMessage(message));
}

@Override
public ServiceResult<TransferProcess> notifyTerminated(TransferTerminationMessage message, ClaimToken claimToken) {
return transactionContext.execute(() -> handleRemoteMessage(message));
}

@NotNull
private ServiceResult<TransferProcess> handleRemoteMessage(TransferRemoteMessage message) {
return Optional.of(message.getProcessId())
.map(transferProcessStore::processIdForDataRequestId)
.map(id -> messageToCommandFactories.get(message.getClass()).apply(id))
.map(this::runSync)
.orElse(ServiceResult.notFound(format("TransferProcess with DataRequest id %s not found", message.getProcessId())));
}

private ServiceResult<TransferProcess> runAsync(SingleTransferProcessCommand command) {
return Optional.of(command.getTransferProcessId())
.map(transferProcessStore::find)
.map(transferProcess -> {
var validator = asyncCommandValidators.get(command.getClass());
var validationResult = validator.apply(command, transferProcess);
if (validationResult.failed()) {
return ServiceResult.<TransferProcess>conflict(format("Cannot %s because %s", command.getClass().getSimpleName(), validationResult.getFailureDetail()));
}

manager.enqueueCommand(command);
return ServiceResult.success(transferProcess);
})
.orElse(ServiceResult.notFound(format("TransferProcess with id %s not found", command.getTransferProcessId())));
}

private ServiceResult<TransferProcess> runSync(SingleTransferProcessCommand command) {
return Optional.of(command.getTransferProcessId())
.map(transferProcessStore::find)
.map(transferProcess -> {
var commandResult = manager.runCommand(command);
if (commandResult.succeeded()) {
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.<TransferProcess>conflict(format("Cannot %s because %s", command.getClass().getSimpleName(), commandResult.getFailureDetail()));
}
})
.orElse(ServiceResult.notFound(format("TransferProcess with id %s not found", command.getTransferProcessId())));
}

private Map<Class<?>, List<Class<?>>> getSubtypes() {
Expand All @@ -173,63 +243,34 @@ private Map<Class<?>, List<Class<?>>> getSubtypes() {
);
}

private ServiceResult<TransferProcess> apply(String transferProcessId, Function<TransferProcess, ServiceResult<TransferProcess>> function) {
return transactionContext.execute(() -> {
var transferProcess = transferProcessStore.find(transferProcessId);
return Optional.ofNullable(transferProcess)
.map(function)
.orElse(ServiceResult.notFound(format("TransferProcess %s does not exist", transferProcessId)));
});
}
private final Map<Class<? extends RemoteMessage>, MessageToCommandFactory> messageToCommandFactories = Map.of(
TransferStartMessage.class, NotifyStartedTransfer::new,
TransferCompletionMessage.class, NotifyCompletedTransfer::new,
TransferTerminationMessage.class, NotifyTerminatedTransfer::new
);

private ServiceResult<TransferProcess> terminateImpl(TransferProcess transferProcess, String reason) {
if (transferProcess.canBeTerminated()) {
manager.enqueueCommand(new TerminateTransferCommand(transferProcess.getId(), reason));
private final Map<Class<? extends SingleTransferProcessCommand>, CommandValidator> asyncCommandValidators = Map.of(
TerminateTransferCommand.class,
(command, transferProcess) -> transferProcess.canBeTerminated()
? Result.success()
: Result.failure(format("TransferProcess %s is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))),

return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("TransferProcess %s cannot be terminated as it is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState())));
}
}
org.eclipse.edc.spi.types.domain.transfer.command.CompleteTransferCommand.class,
(command, transferProcess) -> transferProcess.canBeCompleted()
? Result.success()
: Result.failure(format("TransferProcess %s is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))),

private ServiceResult<TransferProcess> deprovisionImpl(TransferProcess transferProcess) {
if (transferProcess.canBeDeprovisioned()) {
manager.enqueueCommand(new DeprovisionRequest(transferProcess.getId()));
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("TransferProcess %s cannot be deprovisioned as it is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState())));
}
}
DeprovisionRequest.class,
(command, transferProcess) -> transferProcess.canBeDeprovisioned()
? Result.success()
: Result.failure(format("TransferProcess %s is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState()))),

private ServiceResult<TransferProcess> completeImpl(TransferProcess transferProcess) {
if (transferProcess.canBeCompleted()) {
manager.enqueueCommand(new CompleteTransferCommand(transferProcess.getId()));
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("TransferProcess %s cannot be completed as it is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState())));
}
}
DeprovisionCompleteCommand.class, (command, transferProcess) -> Result.success(),

private ServiceResult<TransferProcess> startedImpl(TransferProcess transferProcess) {
if (transferProcess.canBeStartedConsumer()) {
manager.enqueueCommand(new NotifyStartedTransferCommand(transferProcess.getId()));
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("TransferProcess %s cannot be started as it is in state %s", transferProcess.getId(), TransferProcessStates.from(transferProcess.getState())));
}
}
AddProvisionedResourceCommand.class, (command, transferProcess) -> Result.success()
);

private Function<TransferProcess, ServiceResult<TransferProcess>> completeDeprovisionImpl(DeprovisionedResource resource) {
return (transferProcess -> {
manager.enqueueCommand(new DeprovisionCompleteCommand(transferProcess.getId(), resource));
return ServiceResult.success(transferProcess);
});
}
private interface MessageToCommandFactory extends Function<String, SingleTransferProcessCommand> { }

private Function<TransferProcess, ServiceResult<TransferProcess>> addProvisionedResourceImpl(ProvisionResponse response) {
return (transferProcess -> {
manager.enqueueCommand(new AddProvisionedResourceCommand(transferProcess.getId(), response));
return ServiceResult.success(transferProcess);
});
}
private interface CommandValidator extends BiFunction<SingleTransferProcessCommand, TransferProcess, Result<Void>> {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.edc.connector.transfer.spi.retry.TransferWaitStrategy;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.TransferRequest;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.spi.event.EventRouter;
import org.eclipse.edc.spi.event.EventSubscriber;
Expand All @@ -31,6 +32,7 @@
import org.eclipse.edc.spi.event.transferprocess.TransferProcessRequested;
import org.eclipse.edc.spi.event.transferprocess.TransferProcessStarted;
import org.eclipse.edc.spi.event.transferprocess.TransferProcessTerminated;
import org.eclipse.edc.spi.iam.ClaimToken;
import org.eclipse.edc.spi.message.RemoteMessageDispatcher;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -101,7 +103,8 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se
verify(eventSubscriber).on(argThat(isEnvelopeOf(TransferProcessRequested.class)));
});

service.notifyStarted("dataRequestId");
var startMessage = TransferStartMessage.Builder.newInstance().processId("dataRequestId").protocol("any").connectorAddress("http://any").build();
service.notifyStarted(startMessage, ClaimToken.Builder.newInstance().build());

await().untilAsserted(() -> {
verify(eventSubscriber).on(argThat(isEnvelopeOf(TransferProcessStarted.class)));
Expand Down
Loading

0 comments on commit 3b3bb3d

Please sign in to comment.