From 7eb2ef7d895e7bd99c591f583880e30310e716a6 Mon Sep 17 00:00:00 2001 From: andrea bertagnolli Date: Tue, 10 Dec 2024 16:19:49 +0100 Subject: [PATCH] fix: deprovision phase for provision-http (#4660) --- .../test/system/utils/Participant.java | 16 ++++ .../http/impl/HttpProviderProvisioner.java | 11 +-- .../http/impl/HttpProvisionerRequest.java | 4 + .../impl/HttpProviderProvisionerTest.java | 71 +++++++--------- .../test/e2e/TransferPullEndToEndTest.java | 81 ++++++++++--------- 5 files changed, 96 insertions(+), 87 deletions(-) diff --git a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java index d05c6e59bde..480f0de4945 100644 --- a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java +++ b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java @@ -489,6 +489,22 @@ public void resumeTransfer(String id) { .statusCode(204); } + public void terminateTransfer(String id) { + var requestBodyBuilder = createObjectBuilder() + .add(CONTEXT, createObjectBuilder().add(VOCAB, EDC_NAMESPACE)) + .add(TYPE, "TerminateTransfer") + .add("reason", "any reason"); + + managementEndpoint.baseRequest() + .contentType(JSON) + .body(requestBodyBuilder.build()) + .when() + .post("/v3/transferprocesses/{id}/terminate", id) + .then() + .log().ifError() + .statusCode(204); + } + /** * Get current state of a contract negotiation. * diff --git a/extensions/control-plane/provision/provision-http/src/main/java/org/eclipse/edc/connector/controlplane/provision/http/impl/HttpProviderProvisioner.java b/extensions/control-plane/provision/provision-http/src/main/java/org/eclipse/edc/connector/controlplane/provision/http/impl/HttpProviderProvisioner.java index c01825b37e3..bd96444630d 100644 --- a/extensions/control-plane/provision/provision-http/src/main/java/org/eclipse/edc/connector/controlplane/provision/http/impl/HttpProviderProvisioner.java +++ b/extensions/control-plane/provision/provision-http/src/main/java/org/eclipse/edc/connector/controlplane/provision/http/impl/HttpProviderProvisioner.java @@ -75,13 +75,12 @@ public HttpProviderProvisioner(ProvisionerConfiguration configuration, @Override public boolean canProvision(ResourceDefinition resourceDefinition) { - return resourceDefinition instanceof HttpProviderResourceDefinition && dataAddressType.equals(((HttpProviderResourceDefinition) resourceDefinition).getDataAddressType()); + return resourceDefinition instanceof HttpProviderResourceDefinition definition && dataAddressType.equals(definition.getDataAddressType()); } @Override public boolean canDeprovision(ProvisionedResource provisionedResource) { - return provisionedResource instanceof HttpProvisionedContentResource && - dataAddressType.equals(((HttpProvisionedContentResource) provisionedResource).getDataAddress().getType()); + return provisionedResource instanceof HttpProvisionedContentResource; } @Override @@ -100,10 +99,8 @@ public CompletableFuture> provision(HttpProvider if (response.isSuccessful()) { return completedFuture(StatusResult.success(ProvisionResponse.Builder.newInstance().inProcess(true).build())); } else if (response.code() >= 500 && response.code() <= 504) { - // retry return completedFuture(StatusResult.failure(ResponseStatus.ERROR_RETRY, "HttpProviderProvisioner: received error code: " + response.code())); } else { - // fatal error return completedFuture(StatusResult.failure(ResponseStatus.FATAL_ERROR, "HttpProviderProvisioner: received fatal error code: " + response.code())); } } catch (IOException e) { @@ -128,15 +125,13 @@ public CompletableFuture> deprovision(HttpPr try (var response = httpClient.execute(request)) { if (response.code() == 200) { var deprovisionedResource = DeprovisionedResource.Builder.newInstance() - .provisionedResourceId(provisionedResource.getTransferProcessId()) + .provisionedResourceId(provisionedResource.getId()) .inProcess(true) .build(); return completedFuture(StatusResult.success(deprovisionedResource)); } else if (response.code() >= 500 && response.code() <= 504) { - // retry return completedFuture(StatusResult.failure(ResponseStatus.ERROR_RETRY, "Received error code: " + response.code())); } else { - // fatal error return completedFuture(StatusResult.failure(ResponseStatus.FATAL_ERROR, "Received fatal error code: " + response.code())); } } catch (IOException e) { diff --git a/extensions/control-plane/provision/provision-http/src/main/java/org/eclipse/edc/connector/controlplane/provision/http/impl/HttpProvisionerRequest.java b/extensions/control-plane/provision/provision-http/src/main/java/org/eclipse/edc/connector/controlplane/provision/http/impl/HttpProvisionerRequest.java index 589213c0525..40ab0aa7eea 100644 --- a/extensions/control-plane/provision/provision-http/src/main/java/org/eclipse/edc/connector/controlplane/provision/http/impl/HttpProvisionerRequest.java +++ b/extensions/control-plane/provision/provision-http/src/main/java/org/eclipse/edc/connector/controlplane/provision/http/impl/HttpProvisionerRequest.java @@ -58,6 +58,10 @@ public String getResourceDefinitionId() { return resourceDefinitionId; } + public Type getType() { + return type; + } + public enum Type { @JsonProperty("provision") PROVISION, diff --git a/extensions/control-plane/provision/provision-http/src/test/java/org/eclipse/edc/connector/controlplane/provision/http/impl/HttpProviderProvisionerTest.java b/extensions/control-plane/provision/provision-http/src/test/java/org/eclipse/edc/connector/controlplane/provision/http/impl/HttpProviderProvisionerTest.java index 0cb1d2ce19e..223f02b2151 100644 --- a/extensions/control-plane/provision/provision-http/src/test/java/org/eclipse/edc/connector/controlplane/provision/http/impl/HttpProviderProvisionerTest.java +++ b/extensions/control-plane/provision/provision-http/src/test/java/org/eclipse/edc/connector/controlplane/provision/http/impl/HttpProviderProvisionerTest.java @@ -31,6 +31,7 @@ import java.net.MalformedURLException; import java.net.URL; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.connector.controlplane.provision.http.config.ProvisionerConfiguration.ProvisionerType.PROVIDER; import static org.eclipse.edc.http.client.testfixtures.HttpTestUtils.testHttpClient; @@ -39,8 +40,9 @@ import static org.mockito.Mockito.when; class HttpProviderProvisionerTest { + + private final Interceptor delegate = mock(); private HttpProviderProvisioner provisioner; - private Interceptor delegate; @BeforeEach void setUp() throws MalformedURLException { @@ -52,7 +54,6 @@ void setUp() throws MalformedURLException { .endpoint(new URL("http://bar.com")) .build(); - delegate = mock(Interceptor.class); provisioner = new HttpProviderProvisioner(configuration, new URL("http://foo.com"), mock(PolicyEngine.class), testHttpClient(delegate), new ObjectMapper(), mock(Monitor.class)); } @@ -75,18 +76,6 @@ void verifyCanProvision() { void verifyCanDeprovision() { assertThat(provisioner.canDeprovision(createProvisionedResource())).isTrue(); assertThat(provisioner.canDeprovision(new TestProvisionedResource())).isFalse(); - - var dataAddress = DataAddress.Builder.newInstance().type("another-type").build(); - var differentType = HttpProvisionedContentResource.Builder.newInstance() - .assetId("1") - .transferProcessId("2") - .resourceName("test") - .dataAddress(dataAddress) - .resourceDefinitionId("3") - .id("3") - .build(); - - assertThat(provisioner.canDeprovision(differentType)).isFalse(); } @Test @@ -97,10 +86,10 @@ void verifyProvisionOkAndInProcess() throws Exception { var definition = createResourceDefinition(); - var result = provisioner.provision(definition, policy).get(); - - assertThat(result.succeeded()).isTrue(); - assertThat(result.getContent().isInProcess()).isTrue(); + assertThat(provisioner.provision(definition, policy)).succeedsWithin(5, SECONDS).satisfies(result -> { + assertThat(result.succeeded()).isTrue(); + assertThat(result.getContent().isInProcess()).isTrue(); + }); } @Test @@ -111,10 +100,10 @@ void verifyProvision404Response() throws Exception { var definition = createResourceDefinition(); - var result = provisioner.provision(definition, policy).get(); - - assertThat(result.failed()).isTrue(); - assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + assertThat(provisioner.provision(definition, policy)).succeedsWithin(5, SECONDS).satisfies(result -> { + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + }); } @Test @@ -125,10 +114,10 @@ void verifyProvisionRetryResponse() throws Exception { var definition = createResourceDefinition(); - var result = provisioner.provision(definition, policy).get(); - - assertThat(result.failed()).isTrue(); - assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.ERROR_RETRY); + assertThat(provisioner.provision(definition, policy)).succeedsWithin(5, SECONDS).satisfies(result -> { + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.ERROR_RETRY); + }); } @Test @@ -139,28 +128,26 @@ void verifyDeprovisionOkAndInProcess() throws Exception { var definition = createProvisionedResource(); - var result = provisioner.deprovision(definition, policy).get(); - - assertThat(result.succeeded()).isTrue(); - assertThat(result.getContent().isInProcess()).isTrue(); + assertThat(provisioner.deprovision(definition, policy)).succeedsWithin(5, SECONDS).satisfies(result -> { + assertThat(result.succeeded()).isTrue(); + assertThat(result.getContent().getProvisionedResourceId()).isEqualTo(definition.getId()); + assertThat(result.getContent().isInProcess()).isTrue(); + }); } - @Test void verifyDeprovision404Response() throws Exception { when(delegate.intercept(any())).thenAnswer((invocation -> HttpProvisionerFixtures.createResponse(404, invocation))); var policy = Policy.Builder.newInstance().build(); - var definition = createProvisionedResource(); - var result = provisioner.deprovision(definition, policy).get(); - - assertThat(result.failed()).isTrue(); - assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + assertThat(provisioner.deprovision(definition, policy)).succeedsWithin(5, SECONDS).satisfies(result -> { + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + }); } - @Test void verifyDeprovisionRetryResponse() throws Exception { when(delegate.intercept(any())).thenAnswer((invocation -> HttpProvisionerFixtures.createResponse(503, invocation))); @@ -169,13 +156,12 @@ void verifyDeprovisionRetryResponse() throws Exception { var definition = createProvisionedResource(); - var result = provisioner.deprovision(definition, policy).get(); - - assertThat(result.failed()).isTrue(); - assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.ERROR_RETRY); + assertThat(provisioner.deprovision(definition, policy)).succeedsWithin(5, SECONDS).satisfies(result -> { + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.ERROR_RETRY); + }); } - private HttpProviderResourceDefinition createResourceDefinition() { return HttpProviderResourceDefinition.Builder.newInstance() .assetId("1") @@ -198,7 +184,6 @@ private HttpProvisionedContentResource createProvisionedResource() { .build(); } - private static class TestResourceDefinition extends ResourceDefinition { @Override public > B toBuilder() { diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index 413bf892066..ad6a6629402 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -63,6 +63,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.inForceDatePolicy; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.DEPROVISIONED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATED; @@ -87,6 +88,7 @@ abstract static class Tests extends TransferEndToEndTestBase { @BeforeAll static void beforeAll() { providerDataSource = startClientAndServer(getFreePort()); + providerDataSource.when(request()).respond(HttpResponse.response().withBody("data")); } @AfterAll @@ -105,7 +107,6 @@ static void afterAll() { @Test void httpPull_dataTransfer_withCallbacks() { - providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data")); var callbacksEndpoint = startClientAndServer(getFreePort()); var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, httpSourceDataAddress()); @@ -135,13 +136,12 @@ void httpPull_dataTransfer_withCallbacks() { var msg = UUID.randomUUID().toString(); await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(event.getDataAddress(), Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); - providerDataSource.verify(HttpRequest.request("/source").withMethod("GET")); + providerDataSource.verify(request("/source").withMethod("GET")); stopQuietly(callbacksEndpoint); } @Test void httpPull_dataTransfer_withEdrCache() { - providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data")); var assetId = UUID.randomUUID().toString(); var sourceDataAddress = httpSourceDataAddress(); createResourcesOnProvider(assetId, PolicyFixtures.contractExpiresIn("10s"), sourceDataAddress); @@ -164,12 +164,11 @@ void httpPull_dataTransfer_withEdrCache() { // checks that transfer fails await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")))); - providerDataSource.verify(HttpRequest.request("/source").withMethod("GET")); + providerDataSource.verify(request("/source").withMethod("GET")); } @Test void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { - providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data")); var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, httpSourceDataAddress()); @@ -201,12 +200,11 @@ void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { var secondMessage = UUID.randomUUID().toString(); await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); - providerDataSource.verify(HttpRequest.request("/source").withMethod("GET")); + providerDataSource.verify(request("/source").withMethod("GET")); } @Test void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { - providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data")); var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, httpSourceDataAddress()); @@ -242,14 +240,13 @@ void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { var secondMessage = UUID.randomUUID().toString(); await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); - providerDataSource.verify(HttpRequest.request("/source").withMethod("GET")); + providerDataSource.verify(request("/source").withMethod("GET")); } @Test void pullFromHttp_httpProvision() { - providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data")); var provisionServer = startClientAndServer(PROVIDER.getHttpProvisionerPort()); - provisionServer.when(HttpRequest.request()).respond(new HttpProvisionerCallback()); + provisionServer.when(request("/provision")).respond(new HttpProvisionerCallback()); var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, Map.of( @@ -259,18 +256,28 @@ void pullFromHttp_httpProvision() { "proxyQueryParams", "true" )); - var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) + var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) .withTransferType("HttpData-PULL") .execute(); - CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); await().atMost(timeout).untilAsserted(() -> { - var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); + var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); CONSUMER.pullData(edr, Map.of("message", "some information"), body -> assertThat(body).isEqualTo("data")); }); - provisionServer.verify(HttpRequest.request("/provision")); + provisionServer.verify(request("/provision")); + provisionServer.clear(request("provision")); + + var providerTransferProcessId = PROVIDER.getTransferProcesses().stream() + .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) + .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); + + PROVIDER.terminateTransfer(providerTransferProcessId); + PROVIDER.awaitTransferToBeInState(providerTransferProcessId, DEPROVISIONED); + + provisionServer.verify(request("/provision")); stopQuietly(provisionServer); } @@ -342,28 +349,30 @@ private static class HttpProvisionerCallback implements ExpectationResponseCallb public HttpResponse handle(HttpRequest httpRequest) throws Exception { var requestBody = MAPPER.readValue(httpRequest.getBodyAsString(), Map.class); - var callbackRequestBody = Map.of( - "edctype", "dataspaceconnector:provisioner-callback-request", - "resourceDefinitionId", requestBody.get("resourceDefinitionId"), - "assetId", requestBody.get("assetId"), - "resourceName", "aName", - "contentDataAddress", Map.of("properties", httpSourceDataAddress()), - "apiKeyJwt", "unused", - "hasToken", false - ); - - Executors.newScheduledThreadPool(1).schedule(() -> { - try { - var request = new Request.Builder() - .url("%s/%s/provision".formatted(requestBody.get("callbackAddress"), requestBody.get("transferProcessId"))) - .post(RequestBody.create(MAPPER.writeValueAsString(callbackRequestBody), get("application/json"))) - .build(); - - testHttpClient().execute(request).close(); - } catch (Exception e) { - throw new EdcException(e); - } - }, 1, SECONDS); + if ("provision".equals(requestBody.get("type"))) { + var callbackRequestBody = Map.of( + "edctype", "dataspaceconnector:provisioner-callback-request", + "resourceDefinitionId", requestBody.get("resourceDefinitionId"), + "assetId", requestBody.get("assetId"), + "resourceName", "aName", + "contentDataAddress", Map.of("properties", httpSourceDataAddress()), + "apiKeyJwt", "unused", + "hasToken", false + ); + + Executors.newScheduledThreadPool(1).schedule(() -> { + try { + var request = new Request.Builder() + .url("%s/%s/provision".formatted(requestBody.get("callbackAddress"), requestBody.get("transferProcessId"))) + .post(RequestBody.create(MAPPER.writeValueAsString(callbackRequestBody), get("application/json"))) + .build(); + + testHttpClient().execute(request).close(); + } catch (Exception e) { + throw new EdcException(e); + } + }, 1, SECONDS); + } return response(); }