From 505119931a9ae6cd6a07d303774d0bb5829045d9 Mon Sep 17 00:00:00 2001 From: sbuettner Date: Mon, 5 Jun 2023 11:57:35 +0200 Subject: [PATCH] feat(webhook): Unique Webhook Id support --- .../lifecycle/InboundConnectorManager.java | 11 +- .../webhook/InboundWebhookRestController.java | 70 ++++---- .../webhook/WebhookConnectorRegistry.java | 71 +++----- .../WebhookControllerPlainJavaTests.java | 161 ++++++------------ .../WebhookControllerTestZeebeTests.java | 151 ---------------- .../InboundConnectorManagerTest.java | 71 +++++--- 6 files changed, 158 insertions(+), 377 deletions(-) delete mode 100644 connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerTestZeebeTests.java diff --git a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/lifecycle/InboundConnectorManager.java b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/lifecycle/InboundConnectorManager.java index 709fa2eb6f..0f51e6355c 100644 --- a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/lifecycle/InboundConnectorManager.java +++ b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/lifecycle/InboundConnectorManager.java @@ -138,9 +138,9 @@ private void activateConnector(InboundConnectorProperties newProperties) { + "Check whether property camunda.connector.webhook.enabled is set to true."); } executable.activate(inboundContext); - if (webhookConnectorRegistry != null && executable instanceof WebhookConnectorExecutable wh) { - webhookConnectorRegistry.registerWebhookFunction(newProperties.getType(), wh); - webhookConnectorRegistry.activateEndpoint(inboundContext); + if (webhookConnectorRegistry != null + && connector.executable() instanceof WebhookConnectorExecutable) { + webhookConnectorRegistry.register(connector); LOG.trace("Registering webhook: " + newProperties.getType()); } inboundContext.reportHealth(Health.up()); @@ -173,6 +173,11 @@ private void deactivateConnector(ActiveInboundConnector connector) { try { connector.executable().deactivate(); activeConnectorsByBpmnId.get(connector.properties().getBpmnProcessId()).remove(connector); + if (webhookConnectorRegistry != null + && connector.executable() instanceof WebhookConnectorExecutable) { + webhookConnectorRegistry.deregister(connector); + LOG.trace("Unregistering webhook: " + connector.properties().getType()); + } } catch (Exception e) { // log and continue with other connectors anyway LOG.error("Failed to deactivate inbound connector " + connector, e); diff --git a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/webhook/InboundWebhookRestController.java b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/webhook/InboundWebhookRestController.java index 8228670a49..d88a71f469 100644 --- a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/webhook/InboundWebhookRestController.java +++ b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/webhook/InboundWebhookRestController.java @@ -25,22 +25,20 @@ import static org.springframework.web.bind.annotation.RequestMethod.POST; import static org.springframework.web.bind.annotation.RequestMethod.PUT; -import io.camunda.connector.api.inbound.InboundConnectorContext; import io.camunda.connector.api.inbound.InboundConnectorResult; import io.camunda.connector.api.inbound.webhook.WebhookConnectorExecutable; import io.camunda.connector.api.inbound.webhook.WebhookProcessingPayload; import io.camunda.connector.api.inbound.webhook.WebhookProcessingResult; +import io.camunda.connector.runtime.inbound.lifecycle.ActiveInboundConnector; import io.camunda.connector.runtime.inbound.webhook.model.HttpServletRequestWebhookProcessingPayload; import io.camunda.zeebe.spring.client.metrics.MetricsRecorder; import jakarta.servlet.http.HttpServletRequest; import java.io.IOException; -import java.util.Collection; import java.util.Map; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; @@ -48,7 +46,6 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.server.ResponseStatusException; @RestController public class InboundWebhookRestController { @@ -77,50 +74,51 @@ public InboundWebhookRestController( @RequestMapping( method = {GET, POST, PUT, DELETE}, path = "/inbound/{context}") - public ResponseEntity inbound( + public ResponseEntity> inbound( @PathVariable String context, @RequestHeader Map headers, @RequestBody(required = false) byte[] bodyAsByteArray, @RequestParam Map params, HttpServletRequest httpServletRequest) throws IOException { - LOG.trace("Received inbound hook on {}", context); - if (!webhookConnectorRegistry.containsContextPath(context)) { - throw new ResponseStatusException( - HttpStatus.NOT_FOUND.value(), "No webhook found for context: " + context, null); - } - incrementMetric(ACTION_ACTIVATED); - - WebhookProcessingPayload payload = - new HttpServletRequestWebhookProcessingPayload( - httpServletRequest, params, headers, bodyAsByteArray); - - WebhookResponse response = new WebhookResponse(); - Collection connectors = + Optional connectorByContextPath = webhookConnectorRegistry.getWebhookConnectorByContextPath(context); - for (InboundConnectorContext connectorContext : connectors) { - WebhookConnectorExecutable executable = - webhookConnectorRegistry.getByType(connectorContext.getProperties().getType()); - try { - var webhookResult = executable.triggerWebhook(payload); - Map ctxData = toConnectorVariablesContext(webhookResult); - InboundConnectorResult result = connectorContext.correlate(ctxData); - if (result != null && result.isActivated()) { - response.addExecutedConnector(connectorContext.getProperties(), result); - } else { - response.addUnactivatedConnector(connectorContext.getProperties()); - } - } catch (Exception e) { - response.addException(connectorContext.getProperties(), e); - incrementMetric(METRIC_NAME_INBOUND_CONNECTOR); - } + ResponseEntity> response = null; + if (connectorByContextPath.isEmpty()) { + response = ResponseEntity.notFound().build(); + } else { + var connector = connectorByContextPath.get(); + incrementMetric(ACTION_ACTIVATED); + WebhookProcessingPayload payload = + new HttpServletRequestWebhookProcessingPayload( + httpServletRequest, params, headers, bodyAsByteArray); + response = processWebhook(connector, payload); + incrementMetric(ACTION_COMPLETED); } + return response; + } - incrementMetric(ACTION_COMPLETED); - return ResponseEntity.ok(response); + private ResponseEntity> processWebhook( + ActiveInboundConnector connector, WebhookProcessingPayload payload) { + ResponseEntity> connectorResponse; + try { + incrementMetric(METRIC_NAME_INBOUND_CONNECTOR); + var webhookResult = + ((WebhookConnectorExecutable) connector.executable()).triggerWebhook(payload); + Map ctxData = toConnectorVariablesContext(webhookResult); + InboundConnectorResult result = connector.context().correlate(ctxData); + if (result != null && result.isActivated()) { + connectorResponse = ResponseEntity.ok(result); + } else { + connectorResponse = ResponseEntity.unprocessableEntity().build(); + } + } catch (Exception e) { + connectorResponse = ResponseEntity.internalServerError().build(); + } + return connectorResponse; } private void incrementMetric(final String action) { diff --git a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/webhook/WebhookConnectorRegistry.java b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/webhook/WebhookConnectorRegistry.java index 074445e699..95d912063d 100644 --- a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/webhook/WebhookConnectorRegistry.java +++ b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/webhook/WebhookConnectorRegistry.java @@ -18,13 +18,11 @@ import static io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorManager.WEBHOOK_CONTEXT_BPMN_FIELD; -import io.camunda.connector.api.inbound.InboundConnectorContext; -import io.camunda.connector.api.inbound.webhook.WebhookConnectorExecutable; import io.camunda.connector.impl.inbound.InboundConnectorProperties; -import java.util.ArrayList; +import io.camunda.connector.runtime.inbound.lifecycle.ActiveInboundConnector; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,57 +30,28 @@ public class WebhookConnectorRegistry { private final Logger LOG = LoggerFactory.getLogger(WebhookConnectorRegistry.class); - // active endpoints grouped by context path (additionally indexed by correlationPointId for faster - // lookup) - private final Map> activeEndpointsByContext = - new HashMap<>(); - private final Map webhookExecsByType = new HashMap<>(); + private final Map activeEndpointsByContext = new HashMap<>(); - public boolean containsContextPath(String context) { - return activeEndpointsByContext.containsKey(context) - && !activeEndpointsByContext.get(context).isEmpty(); + public Optional getWebhookConnectorByContextPath(String context) { + return Optional.ofNullable(activeEndpointsByContext.get(context)); } - public List getWebhookConnectorByContextPath(String context) { - return new ArrayList<>(activeEndpointsByContext.get(context).values()); + public void register(ActiveInboundConnector connector) { + InboundConnectorProperties properties = connector.properties(); + var context = properties.getRequiredProperty(WEBHOOK_CONTEXT_BPMN_FIELD); + var existingEndpoint = activeEndpointsByContext.putIfAbsent(context, connector); + if (existingEndpoint != null) { + var bpmnProcessId = existingEndpoint.properties().getBpmnProcessId(); + var elementId = existingEndpoint.properties().getElementId(); + var logMessage = + "Context: " + context + " already in use by " + bpmnProcessId + "/" + elementId + "."; + LOG.debug(logMessage); + throw new RuntimeException(logMessage); + } } - public void activateEndpoint(InboundConnectorContext connectorContext) { - InboundConnectorProperties properties = connectorContext.getProperties(); - activeEndpointsByContext.compute( - properties.getRequiredProperty(WEBHOOK_CONTEXT_BPMN_FIELD), - (context, endpoints) -> { - if (endpoints == null) { - Map newEndpoints = new HashMap<>(); - newEndpoints.put(properties.getCorrelationPointId(), connectorContext); - return newEndpoints; - } - endpoints.put(properties.getCorrelationPointId(), connectorContext); - return endpoints; - }); - } - - public void deactivateEndpoint(InboundConnectorProperties inboundConnectorProperties) { - activeEndpointsByContext.compute( - inboundConnectorProperties.getRequiredProperty(WEBHOOK_CONTEXT_BPMN_FIELD), - (context, endpoints) -> { - if (endpoints == null - || !endpoints.containsKey(inboundConnectorProperties.getCorrelationPointId())) { - LOG.warn( - "Attempted to disable non-existing webhook endpoint. " - + "This indicates a potential error in the connector lifecycle."); - return endpoints; - } - endpoints.remove(inboundConnectorProperties.getCorrelationPointId()); - return endpoints; - }); - } - - public WebhookConnectorExecutable getByType(String type) { - return webhookExecsByType.get(type); - } - - public void registerWebhookFunction(String type, WebhookConnectorExecutable function) { - webhookExecsByType.put(type, function); + public void deregister(ActiveInboundConnector connector) { + var context = connector.properties().getRequiredProperty(WEBHOOK_CONTEXT_BPMN_FIELD); + activeEndpointsByContext.remove(context); } } diff --git a/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerPlainJavaTests.java b/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerPlainJavaTests.java index cda00fb990..68e827289e 100644 --- a/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerPlainJavaTests.java +++ b/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerPlainJavaTests.java @@ -19,35 +19,32 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import io.camunda.connector.api.inbound.InboundConnectorContext; import io.camunda.connector.api.inbound.webhook.WebhookConnectorExecutable; import io.camunda.connector.api.inbound.webhook.WebhookProcessingPayload; import io.camunda.connector.api.inbound.webhook.WebhookProcessingResult; +import io.camunda.connector.api.secret.SecretProvider; import io.camunda.connector.impl.inbound.InboundConnectorProperties; import io.camunda.connector.impl.inbound.correlation.StartEventCorrelationPoint; -import io.camunda.connector.impl.inbound.result.MessageCorrelationResult; -import io.camunda.connector.runtime.inbound.webhook.InboundWebhookRestController; +import io.camunda.connector.runtime.core.inbound.InboundConnectorContextImpl; +import io.camunda.connector.runtime.inbound.lifecycle.ActiveInboundConnector; import io.camunda.connector.runtime.inbound.webhook.WebhookConnectorRegistry; -import io.camunda.connector.runtime.inbound.webhook.WebhookResponse; -import io.camunda.connector.test.inbound.InboundConnectorContextBuilder; -import io.camunda.zeebe.spring.client.metrics.MetricsRecorder; import io.camunda.zeebe.spring.client.metrics.SimpleMetricsRecorder; -import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.Set; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import org.springframework.http.ResponseEntity; -import org.springframework.mock.web.MockHttpServletRequest; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; @ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) public class WebhookControllerPlainJavaTests { private static final String CONNECTOR_SECRET_NAME = "DUMMY_SECRET"; @@ -61,110 +58,39 @@ public void setupMetrics() { } @Test - public void multipleWebhooksOnSameContextPath() throws Exception { - WebhookConnectorRegistry webhook = new WebhookConnectorRegistry(); - - WebhookConnectorExecutable executable = Mockito.mock(WebhookConnectorExecutable.class); - Mockito.when(executable.triggerWebhook(any(WebhookProcessingPayload.class))) - .thenReturn(Mockito.mock(WebhookProcessingResult.class)); - - // Register webhook function 'implementation' - webhook.registerWebhookFunction("webhook", executable); - - InboundConnectorContext webhookA = - new InboundConnectorContextBuilder() - .properties(webhookProperties("processA", 1, "myPath")) - .secret(CONNECTOR_SECRET_NAME, CONNECTOR_SECRET_VALUE) - .result(new MessageCorrelationResult("", 0)) - .build(); - - InboundConnectorContext webhookB = - new InboundConnectorContextBuilder() - .properties(webhookProperties("processB", 1, "myPath")) - .secret(CONNECTOR_SECRET_NAME, CONNECTOR_SECRET_VALUE) - .result(new MessageCorrelationResult("", 0)) - .build(); - - InboundWebhookRestController controller = new InboundWebhookRestController(webhook, metrics); - - // Register processes - webhook.activateEndpoint(webhookA); - webhook.activateEndpoint(webhookB); - - ResponseEntity responseEntity = - controller.inbound( - "myPath", - new HashMap<>(), - "{}".getBytes(), - new HashMap<>(), - new MockHttpServletRequest()); - - assertEquals(200, responseEntity.getStatusCode().value()); - assertTrue(responseEntity.getBody().getUnactivatedConnectors().isEmpty()); - assertEquals(2, responseEntity.getBody().getExecutedConnectors().size()); - assertEquals( - Set.of("myPath-processA-1", "myPath-processB-1"), - responseEntity.getBody().getExecutedConnectors().keySet()); - assertEquals( - 1, - metrics.getCount( - MetricsRecorder.METRIC_NAME_INBOUND_CONNECTOR, - MetricsRecorder.ACTION_ACTIVATED, - InboundWebhookRestController.METRIC_WEBHOOK_VALUE)); - assertEquals( - 1, - metrics.getCount( - MetricsRecorder.METRIC_NAME_INBOUND_CONNECTOR, - MetricsRecorder.ACTION_COMPLETED, - InboundWebhookRestController.METRIC_WEBHOOK_VALUE)); - assertEquals( - 0, - metrics.getCount( - MetricsRecorder.METRIC_NAME_INBOUND_CONNECTOR, - MetricsRecorder.ACTION_FAILED, - InboundWebhookRestController.METRIC_WEBHOOK_VALUE)); + public void multipleWebhooksOnSameContextPathAreNotSupported() throws Exception { + WebhookConnectorRegistry webhookConnectorRegistry = new WebhookConnectorRegistry(); + var connectorA = buildConnector(webhookProperties("processA", 1, "myPath")); + webhookConnectorRegistry.register(connectorA); + + var connectorB = buildConnector(webhookProperties("processA", 1, "myPath")); + assertThrowsExactly( + RuntimeException.class, () -> webhookConnectorRegistry.register(connectorB)); } @Test - public void webhookMultipleVersionsDisableWebhook() { + public void webhookMultipleVersionsDisableWebhook() throws Exception { WebhookConnectorRegistry webhook = new WebhookConnectorRegistry(); - var processA1 = - new InboundConnectorContextBuilder() - .properties(webhookProperties("processA", 1, "myPath")) - .secret(CONNECTOR_SECRET_NAME, CONNECTOR_SECRET_VALUE) - .build(); - - var processA2 = - new InboundConnectorContextBuilder() - .properties(webhookProperties("processA", 2, "myPath")) - .secret(CONNECTOR_SECRET_NAME, CONNECTOR_SECRET_VALUE) - .build(); - - var processB1 = - new InboundConnectorContextBuilder() - .properties(webhookProperties("processB", 1, "myPath2")) - .secret(CONNECTOR_SECRET_NAME, CONNECTOR_SECRET_VALUE) - .build(); + var processA1 = buildConnector(webhookProperties("processA", 1, "myPath")); + var processA2 = buildConnector(webhookProperties("processA", 2, "myPath")); + var processB1 = buildConnector(webhookProperties("processB", 1, "myPath2")); - webhook.activateEndpoint(processA1); - webhook.deactivateEndpoint(processA1.getProperties()); + webhook.register(processA1); + webhook.deregister(processA1); - webhook.activateEndpoint(processA2); + webhook.register(processA2); - webhook.activateEndpoint(processB1); - webhook.deactivateEndpoint(processB1.getProperties()); + webhook.register(processB1); + webhook.deregister(processB1); - Collection connectors1 = - webhook.getWebhookConnectorByContextPath("myPath"); + var connectorForPath1 = webhook.getWebhookConnectorByContextPath("myPath"); - assertEquals(1, connectors1.size()); // only one - assertEquals( - 2, connectors1.iterator().next().getProperties().getVersion()); // And the newest one + assertTrue(connectorForPath1.isPresent(), "Connector is present"); + assertEquals(2, connectorForPath1.get().properties().getVersion(), "The newest one"); - Collection connectors2 = - webhook.getWebhookConnectorByContextPath("myPath2"); - assertEquals(0, connectors2.size()); // No one - as it was disabled + var connectorForPath2 = webhook.getWebhookConnectorByContextPath("myPath2"); + assertTrue(connectorForPath2.isEmpty(), "No one - as it was deleted."); } @Test @@ -172,22 +98,35 @@ public void webhookDeactivation_shouldReturnNotFound() { WebhookConnectorRegistry webhook = new WebhookConnectorRegistry(); // given - var processA1 = - new InboundConnectorContextBuilder() - .properties(webhookProperties("processA", 1, "myPath")) - .secret(CONNECTOR_SECRET_NAME, CONNECTOR_SECRET_VALUE) - .build(); + var processA1 = buildConnector(webhookProperties("processA", 1, "myPath")); // when - webhook.activateEndpoint(processA1); - webhook.deactivateEndpoint(processA1.getProperties()); + webhook.register(processA1); + webhook.deregister(processA1); // then - assertFalse(webhook.containsContextPath("myPath")); + assertFalse(webhook.getWebhookConnectorByContextPath("myPath").isPresent()); } private static long nextProcessDefinitionKey = 0L; + public static ActiveInboundConnector buildConnector(InboundConnectorProperties properties) { + WebhookConnectorExecutable executable = Mockito.mock(WebhookConnectorExecutable.class); + try { + Mockito.when(executable.triggerWebhook(any(WebhookProcessingPayload.class))) + .thenReturn(Mockito.mock(WebhookProcessingResult.class)); + } catch (Exception e) { + throw new RuntimeException(e); + } + return new ActiveInboundConnector(executable, properties, buildContext(properties)); + } + + public static InboundConnectorContextImpl buildContext(InboundConnectorProperties properties) { + final Map secrets = new HashMap<>(); + SecretProvider secretProvider = secrets::get; + return new InboundConnectorContextImpl(secretProvider, properties, null, (x) -> {}); + } + public static InboundConnectorProperties webhookProperties( String bpmnProcessId, int version, String contextPath) { return webhookProperties(++nextProcessDefinitionKey, bpmnProcessId, version, contextPath); diff --git a/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerTestZeebeTests.java b/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerTestZeebeTests.java deleted file mode 100644 index 18c131d860..0000000000 --- a/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerTestZeebeTests.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH - * under one or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information regarding copyright - * ownership. Camunda licenses this file to you under the Apache License, - * Version 2.0; you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.camunda.connector.runtime.inbound; - -import static io.camunda.connector.runtime.inbound.WebhookControllerPlainJavaTests.webhookProperties; -import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat; -import static io.camunda.zeebe.spring.test.ZeebeTestThreadSupport.waitForProcessInstanceCompleted; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; - -import io.camunda.connector.api.inbound.InboundConnectorResult; -import io.camunda.connector.api.inbound.webhook.WebhookConnectorExecutable; -import io.camunda.connector.api.inbound.webhook.WebhookProcessingPayload; -import io.camunda.connector.api.inbound.webhook.WebhookProcessingResult; -import io.camunda.connector.api.secret.SecretProvider; -import io.camunda.connector.impl.inbound.result.ProcessInstance; -import io.camunda.connector.impl.inbound.result.StartEventCorrelationResult; -import io.camunda.connector.runtime.app.TestConnectorRuntimeApplication; -import io.camunda.connector.runtime.core.inbound.InboundConnectorContextImpl; -import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler; -import io.camunda.connector.runtime.inbound.webhook.InboundWebhookRestController; -import io.camunda.connector.runtime.inbound.webhook.WebhookConnectorRegistry; -import io.camunda.connector.runtime.inbound.webhook.WebhookResponse; -import io.camunda.zeebe.client.ZeebeClient; -import io.camunda.zeebe.model.bpmn.Bpmn; -import io.camunda.zeebe.process.test.inspections.model.InspectedProcessInstance; -import io.camunda.zeebe.spring.test.ZeebeSpringTest; -import java.util.HashMap; -import java.util.Set; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.http.ResponseEntity; -import org.springframework.mock.web.MockHttpServletRequest; - -@SpringBootTest( - classes = TestConnectorRuntimeApplication.class, - properties = { - "spring.main.allow-bean-definition-overriding=true", - "camunda.connector.webhook.enabled=true" - }) -@ZeebeSpringTest -@ExtendWith(MockitoExtension.class) -class WebhookControllerTestZeebeTests { - - @Test - public void contextLoaded() {} - - @Autowired private WebhookConnectorRegistry webhook; - - @Autowired private ZeebeClient zeebeClient; - - @Autowired private SecretProvider secretProvider; - - @Autowired private InboundCorrelationHandler correlationHandler; - - @Autowired @InjectMocks private InboundWebhookRestController controller; - - // This test is wired by Spring - but this is not really giving us any advantage - // Better move to plain Java as shown in InboundWebhookRestControllerTests - @Test - public void multipleWebhooksOnSameContextPath() throws Exception { - WebhookConnectorExecutable executable = Mockito.mock(WebhookConnectorExecutable.class); - Mockito.when(executable.triggerWebhook(any(WebhookProcessingPayload.class))) - .thenReturn(Mockito.mock(WebhookProcessingResult.class)); - - // Register webhook function 'implementation' - webhook.registerWebhookFunction("webhook", executable); - - deployProcess("processA"); - deployProcess("processB"); - - var webhookA = - new InboundConnectorContextImpl( - secretProvider, - webhookProperties("processA", 1, "myPath"), - correlationHandler, - (e) -> {}); - - var webhookB = - new InboundConnectorContextImpl( - secretProvider, - webhookProperties("processB", 1, "myPath"), - correlationHandler, - (e) -> {}); - - webhook.activateEndpoint(webhookA); - webhook.activateEndpoint(webhookB); - - ResponseEntity responseEntity = - controller.inbound( - "myPath", - new HashMap<>(), - "{}".getBytes(), - new HashMap<>(), - new MockHttpServletRequest()); - - assertEquals(200, responseEntity.getStatusCode().value()); - assertTrue(responseEntity.getBody().getUnactivatedConnectors().isEmpty()); - assertEquals(2, responseEntity.getBody().getExecutedConnectors().size()); - assertEquals( - Set.of("myPath-processA-1", "myPath-processB-1"), - responseEntity.getBody().getExecutedConnectors().keySet()); - - InboundConnectorResult piA = - responseEntity.getBody().getExecutedConnectors().get("myPath-processA-1"); - assertInstanceOf(StartEventCorrelationResult.class, piA); - ProcessInstance piEventA = ((StartEventCorrelationResult) piA).getResponseData().get(); - - waitForProcessInstanceCompleted(piEventA.getProcessInstanceKey()); - assertThat(new InspectedProcessInstance(piEventA.getProcessInstanceKey())).isCompleted(); - - InboundConnectorResult piB = - responseEntity.getBody().getExecutedConnectors().get("myPath-processB-1"); - assertInstanceOf(StartEventCorrelationResult.class, piB); - ProcessInstance piEventB = ((StartEventCorrelationResult) piB).getResponseData().get(); - - waitForProcessInstanceCompleted(piEventB.getProcessInstanceKey()); - assertThat(new InspectedProcessInstance(piEventB.getProcessInstanceKey())).isCompleted(); - } - - public void deployProcess(String bpmnProcessId) { - zeebeClient - .newDeployResourceCommand() - .addProcessModel( - Bpmn.createExecutableProcess(bpmnProcessId).startEvent().endEvent().done(), - bpmnProcessId + ".bpmn") - .send() - .join(); - } -} diff --git a/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/lifecycle/InboundConnectorManagerTest.java b/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/lifecycle/InboundConnectorManagerTest.java index 49bca27c42..2c2412ba5d 100644 --- a/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/lifecycle/InboundConnectorManagerTest.java +++ b/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/lifecycle/InboundConnectorManagerTest.java @@ -61,8 +61,8 @@ public class InboundConnectorManagerTest { private InboundConnectorManager manager; private ProcessDefinitionTestUtil procDefUtil; private InboundConnectorFactory factory; - private InboundConnectorExecutable mockExecutable; - private WebhookConnectorExecutable mockWebhook; + private InboundConnectorExecutable inboundConnectorExecutable; + private WebhookConnectorExecutable webhookConnectorExecutable; private WebhookConnectorRegistry webhookRegistry; private SecretProviderAggregator secretProviderAggregator; private InboundCorrelationHandler correlationHandler; @@ -71,11 +71,11 @@ public class InboundConnectorManagerTest { void resetMocks() { correlationHandler = mock(InboundCorrelationHandler.class); - mockExecutable = spy(new TestInboundConnector()); - mockWebhook = spy(new TestWebhookConnector()); - webhookRegistry = mock(WebhookConnectorRegistry.class); + inboundConnectorExecutable = spy(new TestInboundConnector()); + webhookConnectorExecutable = spy(new TestWebhookConnector()); + webhookRegistry = new WebhookConnectorRegistry(); factory = mock(InboundConnectorFactory.class); - when(factory.getInstance(any())).thenReturn(mockExecutable); + when(factory.getInstance(any())).thenReturn(inboundConnectorExecutable); secretProviderAggregator = mock(SecretProviderAggregator.class); @@ -99,7 +99,7 @@ void shouldActivateConnector_NewBpmnDeployed_SingleConnector() throws Exception // then assertTrue(manager.isProcessDefinitionRegistered(process.getKey())); verify(factory, times(1)).getInstance(connector.getType()); - verify(mockExecutable, times(1)).activate(eq(inboundContext(connector))); + verify(inboundConnectorExecutable, times(1)).activate(eq(inboundContext(connector))); } @Test @@ -115,7 +115,7 @@ void shouldActivateConnector_NewBpmnDeployed_MultipleConnectors() throws Excepti assertTrue(manager.isProcessDefinitionRegistered(process.getKey())); verify(factory, times(2)).getInstance(connectors.get(0).getType()); for (var connector : connectors) { - verify(mockExecutable, times(1)).activate(eq(inboundContext(connector))); + verify(inboundConnectorExecutable, times(1)).activate(eq(inboundContext(connector))); } } @@ -137,10 +137,10 @@ void shouldReplaceConnector_NewVersionDeployed() throws Exception { verify(factory, times(2)).getInstance(connector1.getType()); - verify(mockExecutable, times(1)).activate(eq(inboundContext(connector1))); - verify(mockExecutable, times(1)).deactivate(); - verify(mockExecutable, times(1)).activate(eq(inboundContext(connector2))); - verifyNoMoreInteractions(mockExecutable); + verify(inboundConnectorExecutable, times(1)).activate(eq(inboundContext(connector1))); + verify(inboundConnectorExecutable, times(1)).deactivate(); + verify(inboundConnectorExecutable, times(1)).activate(eq(inboundContext(connector2))); + verifyNoMoreInteractions(inboundConnectorExecutable); } @Test @@ -154,7 +154,7 @@ void shouldNotActivate_NewBpmnDeployed_NoConnectors() throws Exception { // then assertTrue(manager.isProcessDefinitionRegistered(process.getKey())); verifyNoInteractions(factory); - verifyNoInteractions(mockExecutable); + verifyNoInteractions(inboundConnectorExecutable); } @Test @@ -178,10 +178,10 @@ void shouldOnlyActivateLatestConnectors_BulkImport() throws Exception { assertTrue(manager.isProcessDefinitionRegistered(process2.getKey())); verify(factory, times(1)).getInstance(connector2.getType()); - verify(mockExecutable, times(1)).activate(inboundContext(connector2)); + verify(inboundConnectorExecutable, times(1)).activate(inboundContext(connector2)); verifyNoMoreInteractions(factory); - verifyNoMoreInteractions(mockExecutable); + verifyNoMoreInteractions(inboundConnectorExecutable); } @Test @@ -192,7 +192,7 @@ void shouldHandleCancellationCallback() throws Exception { // when procDefUtil.deployProcessDefinition(process, connector); - var context = ((TestInboundConnector) mockExecutable).getProvidedContext(); + var context = ((TestInboundConnector) inboundConnectorExecutable).getProvidedContext(); context.cancel(new RuntimeException("subscription interrupted")); // then @@ -202,21 +202,42 @@ void shouldHandleCancellationCallback() throws Exception { .query(new ActiveInboundConnectorQuery(process.getBpmnProcessId(), null, null)) .isEmpty()); - verify(mockExecutable, times(1)).activate(eq(inboundContext(connector))); - verify(mockExecutable, times(1)).deactivate(); + verify(inboundConnectorExecutable, times(1)).activate(eq(inboundContext(connector))); + verify(inboundConnectorExecutable, times(1)).deactivate(); } @Test void shouldActivateAndRegisterWebhook() throws Exception { - when(factory.getInstance("io.camunda:test-webhook:1")).thenReturn(mockWebhook); + when(factory.getInstance("io.camunda:test-webhook:1")).thenReturn(webhookConnectorExecutable); var process = processDefinition("webhook1", 1); var webhook = webhookConnector(process); - procDefUtil.deployProcessDefinition(process, webhook); + verify(webhookConnectorExecutable, times(1)).activate(eq(inboundContext(webhook))); + } - verify(mockWebhook, times(1)).activate(eq(inboundContext(webhook))); - verify(webhookRegistry, times(1)).registerWebhookFunction(webhookConfig.getType(), mockWebhook); - verify(webhookRegistry, times(1)).activateEndpoint(eq(inboundContext(webhook))); + @Test + void shouldActivateAndRegisterWebhookWithANewVersion() throws Exception { + when(factory.getInstance(webhookConfig.getType())).thenReturn(webhookConnectorExecutable); + + // Deploy one process with a webhook + var pv1 = processDefinition("webhook1", 1); + var wh1 = webhookConnector(pv1); + procDefUtil.deployProcessDefinition(pv1, wh1); + + // Deploy a new version of the process + var pv2 = processDefinition("webhook1", 2); + var wh2 = webhookConnector(pv2); + procDefUtil.deployProcessDefinition(pv2, wh2); + + verify(factory, times(2)).getInstance(webhookConfig.getType()); + verify(webhookConnectorExecutable, times(1)).activate(eq(inboundContext(wh1))); + verify(webhookConnectorExecutable).deactivate(); + verify(webhookConnectorExecutable, times(1)).activate(eq(inboundContext(wh2))); + verifyNoMoreInteractions(inboundConnectorExecutable); + + // New version should be active + var connector = webhookRegistry.getWebhookConnectorByContextPath("myWebhookEndpoint"); + assertEquals(2, connector.get().properties().getVersion()); } @Test @@ -230,7 +251,7 @@ void shouldNotActivateWebhookWhenDisabled() throws Exception { factory, correlationHandler, inspector, secretProviderAggregator, null); procDefUtil = new ProcessDefinitionTestUtil(manager, inspector); - when(factory.getInstance("io.camunda:test-webhook:1")).thenReturn(mockWebhook); + when(factory.getInstance("io.camunda:test-webhook:1")).thenReturn(webhookConnectorExecutable); var process = processDefinition("webhook1", 1); var webhook = webhookConnector(process); @@ -238,7 +259,7 @@ void shouldNotActivateWebhookWhenDisabled() throws Exception { procDefUtil.deployProcessDefinition(process, webhook); // Then - verify(mockWebhook, times(0)).activate(eq(inboundContext(webhook))); + verify(webhookConnectorExecutable, times(0)).activate(eq(inboundContext(webhook))); var query = new ActiveInboundConnectorQuery("webhook1", null, null); var activeInboundConnectors = manager.query(query);