diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java index 7c07ecaf035..a529e32577b 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java @@ -41,6 +41,7 @@ import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.RECEIVED; +import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.TERMINATED; import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; @@ -119,7 +120,7 @@ public StatusResult terminate(String dataFlowId) { if (terminateResult.failed()) { return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, terminateResult.getFailureDetail())); } - dataFlow.transitToCompleted(); + dataFlow.transitToTerminated(); store.save(dataFlow); return StatusResult.success(); } else { @@ -140,6 +141,10 @@ private boolean processReceived(DataFlow dataFlow) { return entityRetryProcessFactory.doAsyncProcess(dataFlow, () -> transferService.transfer(request)) .entityRetrieve(id -> store.findById(id)) .onSuccess((f, r) -> { + if (f.getState() == TERMINATED.code()) { + return; + } + if (r.succeeded()) { f.transitToCompleted(); } else { diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java index 5a2a7a92720..158b1aa706c 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java @@ -46,6 +46,7 @@ import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.NOTIFIED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.RECEIVED; +import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.TERMINATED; import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState; import static org.eclipse.edc.spi.response.ResponseStatus.ERROR_RETRY; @@ -146,7 +147,7 @@ void terminate_shouldTerminateDataFlow() { var result = manager.terminate("dataFlowId"); assertThat(result).isSucceeded(); - verify(store).save(argThat(d -> d.getState() == COMPLETED.code())); + verify(store).save(argThat(d -> d.getState() == TERMINATED.code())); verify(transferService).terminate(dataFlow); } @@ -215,6 +216,24 @@ void received_shouldStartTransferAndTransitionToCompleted_whenTransferSucceeds() }); } + @Test + void received_shouldStartTransferAndNotTransitionToCompleted_whenTransferSucceedsBecauseItsTermination() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + var terminatedDataFlow = dataFlowBuilder().state(TERMINATED.code()).build(); + when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList()); + when(store.findById(any())).thenReturn(terminatedDataFlow); + when(registry.resolveTransferService(any())).thenReturn(transferService); + when(transferService.canHandle(any())).thenReturn(true); + when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.success())); + + manager.start(); + + await().untilAsserted(() -> { + verify(transferService).transfer(isA(DataFlowRequest.class)); + verify(store, never()).save(any()); + }); + } + @Test void received_shouldStartTransferAndTransitionToFailed_whenTransferFails() { var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java index 91bcf7be449..053fe2096b0 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java @@ -28,6 +28,7 @@ import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.NOTIFIED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.RECEIVED; +import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.TERMINATED; /** * Entity that represent a Data Plane Transfer Flow @@ -108,6 +109,10 @@ public void transitToNotified() { transitionTo(NOTIFIED.code()); } + public void transitToTerminated() { + transitionTo(TERMINATED.code()); + } + @JsonPOJOBuilder(withPrefix = "") public static class Builder extends StatefulEntity.Builder { diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlowStates.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlowStates.java index 88c085a444d..6fcfd604bbb 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlowStates.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlowStates.java @@ -24,6 +24,7 @@ public enum DataFlowStates { NOT_TRACKED(0), RECEIVED(100), COMPLETED(200), + TERMINATED(250), FAILED(300), NOTIFIED(400); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java index d3353944e41..4b8fa2f4658 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; import org.eclipse.edc.test.e2e.annotations.KafkaIntegrationTest; import org.eclipse.edc.test.e2e.participant.EndToEndTransferParticipant; @@ -45,13 +46,13 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.validation.constraints.NotNull; import static java.lang.String.format; import static java.time.Duration.ZERO; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATED; @@ -136,12 +137,12 @@ void kafkaToHttpTransfer() throws JsonProcessingException { await().atMost(TIMEOUT).untilAsserted(() -> { var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(TERMINATED.name()); + assertThat(TransferProcessStates.valueOf(state)).isGreaterThanOrEqualTo(TERMINATED); }); destinationServer.clear(request) .when(request).respond(response()); - await().pollDelay(2, TimeUnit.SECONDS).atMost(TIMEOUT).untilAsserted(() -> { + await().pollDelay(2, SECONDS).atMost(TIMEOUT).untilAsserted(() -> { destinationServer.verify(request, never()); }); @@ -167,11 +168,11 @@ void kafkaToKafkaTransfer() { await().atMost(TIMEOUT).untilAsserted(() -> { var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(TERMINATED.name()); + assertThat(TransferProcessStates.valueOf(state)).isGreaterThanOrEqualTo(TERMINATED); }); consumer.poll(ZERO); - await().pollDelay(5, TimeUnit.SECONDS).atMost(TIMEOUT).untilAsserted(() -> { + await().pollDelay(5, SECONDS).atMost(TIMEOUT).untilAsserted(() -> { var recordsFound = consumer.poll(Duration.ofSeconds(1)).records(SINK_TOPIC); assertThat(recordsFound).isEmpty(); });