Skip to content

Commit

Permalink
feat(webhook): Unique Webhook Id support
Browse files Browse the repository at this point in the history
  • Loading branch information
sbuettner committed Jun 5, 2023
1 parent 4663339 commit 5051199
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 377 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ private void activateConnector(InboundConnectorProperties newProperties) {
+ "Check whether property camunda.connector.webhook.enabled is set to true.");
}
executable.activate(inboundContext);
if (webhookConnectorRegistry != null && executable instanceof WebhookConnectorExecutable wh) {
webhookConnectorRegistry.registerWebhookFunction(newProperties.getType(), wh);
webhookConnectorRegistry.activateEndpoint(inboundContext);
if (webhookConnectorRegistry != null
&& connector.executable() instanceof WebhookConnectorExecutable) {
webhookConnectorRegistry.register(connector);
LOG.trace("Registering webhook: " + newProperties.getType());
}
inboundContext.reportHealth(Health.up());
Expand Down Expand Up @@ -173,6 +173,11 @@ private void deactivateConnector(ActiveInboundConnector connector) {
try {
connector.executable().deactivate();
activeConnectorsByBpmnId.get(connector.properties().getBpmnProcessId()).remove(connector);
if (webhookConnectorRegistry != null
&& connector.executable() instanceof WebhookConnectorExecutable) {
webhookConnectorRegistry.deregister(connector);
LOG.trace("Unregistering webhook: " + connector.properties().getType());
}
} catch (Exception e) {
// log and continue with other connectors anyway
LOG.error("Failed to deactivate inbound connector " + connector, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,27 @@
import static org.springframework.web.bind.annotation.RequestMethod.POST;
import static org.springframework.web.bind.annotation.RequestMethod.PUT;

import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorResult;
import io.camunda.connector.api.inbound.webhook.WebhookConnectorExecutable;
import io.camunda.connector.api.inbound.webhook.WebhookProcessingPayload;
import io.camunda.connector.api.inbound.webhook.WebhookProcessingResult;
import io.camunda.connector.runtime.inbound.lifecycle.ActiveInboundConnector;
import io.camunda.connector.runtime.inbound.webhook.model.HttpServletRequestWebhookProcessingPayload;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
import jakarta.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;

@RestController
public class InboundWebhookRestController {
Expand Down Expand Up @@ -77,50 +74,51 @@ public InboundWebhookRestController(
@RequestMapping(
method = {GET, POST, PUT, DELETE},
path = "/inbound/{context}")
public ResponseEntity<WebhookResponse> inbound(
public ResponseEntity<InboundConnectorResult<?>> inbound(
@PathVariable String context,
@RequestHeader Map<String, String> headers,
@RequestBody(required = false) byte[] bodyAsByteArray,
@RequestParam Map<String, String> params,
HttpServletRequest httpServletRequest)
throws IOException {

LOG.trace("Received inbound hook on {}", context);

if (!webhookConnectorRegistry.containsContextPath(context)) {
throw new ResponseStatusException(
HttpStatus.NOT_FOUND.value(), "No webhook found for context: " + context, null);
}
incrementMetric(ACTION_ACTIVATED);

WebhookProcessingPayload payload =
new HttpServletRequestWebhookProcessingPayload(
httpServletRequest, params, headers, bodyAsByteArray);

WebhookResponse response = new WebhookResponse();
Collection<InboundConnectorContext> connectors =
Optional<ActiveInboundConnector> connectorByContextPath =
webhookConnectorRegistry.getWebhookConnectorByContextPath(context);

for (InboundConnectorContext connectorContext : connectors) {
WebhookConnectorExecutable executable =
webhookConnectorRegistry.getByType(connectorContext.getProperties().getType());
try {
var webhookResult = executable.triggerWebhook(payload);
Map<String, Object> ctxData = toConnectorVariablesContext(webhookResult);
InboundConnectorResult<?> result = connectorContext.correlate(ctxData);
if (result != null && result.isActivated()) {
response.addExecutedConnector(connectorContext.getProperties(), result);
} else {
response.addUnactivatedConnector(connectorContext.getProperties());
}
} catch (Exception e) {
response.addException(connectorContext.getProperties(), e);
incrementMetric(METRIC_NAME_INBOUND_CONNECTOR);
}
ResponseEntity<InboundConnectorResult<?>> response = null;
if (connectorByContextPath.isEmpty()) {
response = ResponseEntity.notFound().build();
} else {
var connector = connectorByContextPath.get();
incrementMetric(ACTION_ACTIVATED);
WebhookProcessingPayload payload =
new HttpServletRequestWebhookProcessingPayload(
httpServletRequest, params, headers, bodyAsByteArray);
response = processWebhook(connector, payload);
incrementMetric(ACTION_COMPLETED);
}
return response;
}

incrementMetric(ACTION_COMPLETED);
return ResponseEntity.ok(response);
private ResponseEntity<InboundConnectorResult<?>> processWebhook(
ActiveInboundConnector connector, WebhookProcessingPayload payload) {
ResponseEntity<InboundConnectorResult<?>> connectorResponse;
try {
incrementMetric(METRIC_NAME_INBOUND_CONNECTOR);
var webhookResult =
((WebhookConnectorExecutable) connector.executable()).triggerWebhook(payload);
Map<String, Object> ctxData = toConnectorVariablesContext(webhookResult);
InboundConnectorResult<?> result = connector.context().correlate(ctxData);
if (result != null && result.isActivated()) {
connectorResponse = ResponseEntity.ok(result);
} else {
connectorResponse = ResponseEntity.unprocessableEntity().build();
}
} catch (Exception e) {
connectorResponse = ResponseEntity.internalServerError().build();
}
return connectorResponse;
}

private void incrementMetric(final String action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,71 +18,40 @@

import static io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorManager.WEBHOOK_CONTEXT_BPMN_FIELD;

import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.webhook.WebhookConnectorExecutable;
import io.camunda.connector.impl.inbound.InboundConnectorProperties;
import java.util.ArrayList;
import io.camunda.connector.runtime.inbound.lifecycle.ActiveInboundConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebhookConnectorRegistry {

private final Logger LOG = LoggerFactory.getLogger(WebhookConnectorRegistry.class);

// active endpoints grouped by context path (additionally indexed by correlationPointId for faster
// lookup)
private final Map<String, Map<String, InboundConnectorContext>> activeEndpointsByContext =
new HashMap<>();
private final Map<String, WebhookConnectorExecutable> webhookExecsByType = new HashMap<>();
private final Map<String, ActiveInboundConnector> activeEndpointsByContext = new HashMap<>();

public boolean containsContextPath(String context) {
return activeEndpointsByContext.containsKey(context)
&& !activeEndpointsByContext.get(context).isEmpty();
public Optional<ActiveInboundConnector> getWebhookConnectorByContextPath(String context) {
return Optional.ofNullable(activeEndpointsByContext.get(context));
}

public List<InboundConnectorContext> getWebhookConnectorByContextPath(String context) {
return new ArrayList<>(activeEndpointsByContext.get(context).values());
public void register(ActiveInboundConnector connector) {
InboundConnectorProperties properties = connector.properties();
var context = properties.getRequiredProperty(WEBHOOK_CONTEXT_BPMN_FIELD);
var existingEndpoint = activeEndpointsByContext.putIfAbsent(context, connector);
if (existingEndpoint != null) {
var bpmnProcessId = existingEndpoint.properties().getBpmnProcessId();
var elementId = existingEndpoint.properties().getElementId();
var logMessage =
"Context: " + context + " already in use by " + bpmnProcessId + "/" + elementId + ".";
LOG.debug(logMessage);
throw new RuntimeException(logMessage);
}
}

public void activateEndpoint(InboundConnectorContext connectorContext) {
InboundConnectorProperties properties = connectorContext.getProperties();
activeEndpointsByContext.compute(
properties.getRequiredProperty(WEBHOOK_CONTEXT_BPMN_FIELD),
(context, endpoints) -> {
if (endpoints == null) {
Map<String, InboundConnectorContext> newEndpoints = new HashMap<>();
newEndpoints.put(properties.getCorrelationPointId(), connectorContext);
return newEndpoints;
}
endpoints.put(properties.getCorrelationPointId(), connectorContext);
return endpoints;
});
}

public void deactivateEndpoint(InboundConnectorProperties inboundConnectorProperties) {
activeEndpointsByContext.compute(
inboundConnectorProperties.getRequiredProperty(WEBHOOK_CONTEXT_BPMN_FIELD),
(context, endpoints) -> {
if (endpoints == null
|| !endpoints.containsKey(inboundConnectorProperties.getCorrelationPointId())) {
LOG.warn(
"Attempted to disable non-existing webhook endpoint. "
+ "This indicates a potential error in the connector lifecycle.");
return endpoints;
}
endpoints.remove(inboundConnectorProperties.getCorrelationPointId());
return endpoints;
});
}

public WebhookConnectorExecutable getByType(String type) {
return webhookExecsByType.get(type);
}

public void registerWebhookFunction(String type, WebhookConnectorExecutable function) {
webhookExecsByType.put(type, function);
public void deregister(ActiveInboundConnector connector) {
var context = connector.properties().getRequiredProperty(WEBHOOK_CONTEXT_BPMN_FIELD);
activeEndpointsByContext.remove(context);
}
}
Loading

0 comments on commit 5051199

Please sign in to comment.