Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: inbound metrics #678

Merged
merged 2 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ camunda.operate.client.url=http://localhost:8081
camunda.operate.client.username=demo
camunda.operate.client.password=demo
camunda.connector.webhook.enabled=true

management.context-path=/actuator
management.endpoints.web.exposure.include=metrics,health,prometheus
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public InboundConnectorResult<?> correlate(
+ " is not supported by Runtime");
}

private InboundConnectorResult<ProcessInstance> triggerStartEvent(
protected InboundConnectorResult<ProcessInstance> triggerStartEvent(
InboundConnectorProperties properties, Object variables) {
StartEventCorrelationPoint correlationPoint =
(StartEventCorrelationPoint) properties.getCorrelationPoint();
Expand Down Expand Up @@ -106,7 +106,7 @@ private InboundConnectorResult<ProcessInstance> triggerStartEvent(
}
}

private InboundConnectorResult<CorrelatedMessage> triggerMessage(
protected InboundConnectorResult<CorrelatedMessage> triggerMessage(
InboundConnectorProperties properties, Object variables) {

MessageCorrelationPoint correlationPoint =
Expand Down Expand Up @@ -142,7 +142,8 @@ private InboundConnectorResult<CorrelatedMessage> triggerMessage(
}
}

private boolean isActivationConditionMet(InboundConnectorProperties properties, Object context) {
protected boolean isActivationConditionMet(
InboundConnectorProperties properties, Object context) {

String activationCondition = properties.getProperty(ACTIVATION_CONDITION_KEYWORD);
if (activationCondition == null || activationCondition.trim().length() == 0) {
Expand All @@ -157,7 +158,7 @@ private boolean isActivationConditionMet(InboundConnectorProperties properties,
}
}

private String extractCorrelationKey(InboundConnectorProperties properties, Object context) {
protected String extractCorrelationKey(InboundConnectorProperties properties, Object context) {
String correlationKeyExpression =
properties.getRequiredProperty(CORRELATION_KEY_EXPRESSION_KEYWORD);
try {
Expand All @@ -167,7 +168,7 @@ private String extractCorrelationKey(InboundConnectorProperties properties, Obje
}
}

private Object extractVariables(Object rawVariables, InboundConnectorProperties properties) {
protected Object extractVariables(Object rawVariables, InboundConnectorProperties properties) {
if (properties.getProperty(LEGACY_VARIABLE_MAPPING_KEYWORD) != null) {
// if legacy variable mapping is used, we don't need to extract variables
// because they are already extracted by the webhook connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler;
import io.camunda.connector.runtime.inbound.importer.ProcessDefinitionImportConfiguration;
import io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorLifecycleConfiguration;
import io.camunda.connector.runtime.inbound.lifecycle.MeteredInboundCorrelationHandler;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
Expand All @@ -31,7 +33,9 @@ public class InboundConnectorRuntimeConfiguration {

@Bean
public InboundCorrelationHandler inboundCorrelationHandler(
final ZeebeClient zeebeClient, final FeelEngineWrapper feelEngine) {
return new InboundCorrelationHandler(zeebeClient, feelEngine);
final ZeebeClient zeebeClient,
final FeelEngineWrapper feelEngine,
final MetricsRecorder metricsRecorder) {
return new MeteredInboundCorrelationHandler(zeebeClient, feelEngine, metricsRecorder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorManager;
import io.camunda.operate.CamundaOperateClient;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -26,8 +27,10 @@ public class ProcessDefinitionImportConfiguration {

@Bean
public ProcessDefinitionImporter processDefinitionImporter(
CamundaOperateClient client, InboundConnectorManager manager) {
return new ProcessDefinitionImporter(client, manager);
CamundaOperateClient client,
InboundConnectorManager manager,
MetricsRecorder metricsRecorder) {
return new ProcessDefinitionImporter(client, manager, metricsRecorder);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package io.camunda.connector.runtime.inbound.importer;

import io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorManager;
import io.camunda.connector.runtime.metrics.ConnectorMetrics.Inbound;
import io.camunda.operate.CamundaOperateClient;
import io.camunda.operate.dto.ProcessDefinition;
import io.camunda.operate.dto.SearchResult;
import io.camunda.operate.exception.OperateException;
import io.camunda.operate.search.SearchQuery;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,14 +37,18 @@ public class ProcessDefinitionImporter {

private final CamundaOperateClient camundaOperateClient;
private final InboundConnectorManager inboundManager;
private final MetricsRecorder metricsRecorder;

private List<Object> paginationIndex;

@Autowired
public ProcessDefinitionImporter(
CamundaOperateClient camundaOperateClient, InboundConnectorManager inboundManager) {
CamundaOperateClient camundaOperateClient,
InboundConnectorManager inboundManager,
@Autowired(required = false) MetricsRecorder metricsRecorder) {
this.camundaOperateClient = camundaOperateClient;
this.inboundManager = inboundManager;
this.metricsRecorder = metricsRecorder;
}

@Scheduled(fixedDelayString = "${camunda.connector.polling.interval:5000}")
Expand All @@ -67,15 +73,22 @@ public synchronized void scheduleImport() throws OperateException {
} while (result.getItems().size() > 0);
}

private void handleImportedDefinitions(List<ProcessDefinition> processDefinitions)
throws OperateException {
private void handleImportedDefinitions(List<ProcessDefinition> processDefinitions) {

if (processDefinitions == null) {
if (processDefinitions == null || processDefinitions.isEmpty()) {
LOG.trace("... returned no process definitions.");
return;
}
LOG.trace("... returned " + processDefinitions.size() + " process definitions.");
meter(processDefinitions.size());

inboundManager.registerProcessDefinitions(processDefinitions);
}

private void meter(int count) {
if (metricsRecorder != null) {
metricsRecorder.increase(
Inbound.METRIC_NAME_INBOUND_PROCESS_DEFINITIONS_CHECKED, null, null, count);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.connector.runtime.inbound.importer.ProcessDefinitionInspector;
import io.camunda.connector.runtime.inbound.webhook.WebhookConnectorRegistry;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -42,12 +43,14 @@ public InboundConnectorManager inboundConnectorManager(
InboundCorrelationHandler correlationHandler,
ProcessDefinitionInspector processDefinitionInspector,
SecretProviderAggregator secretProviderAggregator,
MetricsRecorder metricsRecorder,
@Autowired(required = false) WebhookConnectorRegistry webhookConnectorRegistry) {
return new InboundConnectorManager(
connectorFactory,
correlationHandler,
processDefinitionInspector,
secretProviderAggregator,
metricsRecorder,
webhookConnectorRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.connector.runtime.inbound.importer.ProcessDefinitionInspector;
import io.camunda.connector.runtime.inbound.webhook.WebhookConnectorRegistry;
import io.camunda.connector.runtime.metrics.ConnectorMetrics.Inbound;
import io.camunda.operate.dto.ProcessDefinition;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -44,6 +46,7 @@
import org.springframework.beans.factory.annotation.Autowired;

public class InboundConnectorManager {

private static final Logger LOG = LoggerFactory.getLogger(InboundConnectorManager.class);

public static final String WEBHOOK_CONTEXT_BPMN_FIELD = "inbound.context";
Expand All @@ -53,6 +56,7 @@ public class InboundConnectorManager {
private final ProcessDefinitionInspector processDefinitionInspector;
private final SecretProviderAggregator secretProviderAggregator;
private final WebhookConnectorRegistry webhookConnectorRegistry;
private final MetricsRecorder metricsRecorder;

// TODO: consider using external storage instead of these collections to allow multi-instance
// setup
Expand All @@ -65,11 +69,13 @@ public InboundConnectorManager(
InboundCorrelationHandler correlationHandler,
ProcessDefinitionInspector processDefinitionInspector,
SecretProviderAggregator secretProviderAggregator,
MetricsRecorder metricsRecorder,
@Autowired(required = false) WebhookConnectorRegistry webhookConnectorRegistry) {
this.connectorFactory = connectorFactory;
this.correlationHandler = correlationHandler;
this.processDefinitionInspector = processDefinitionInspector;
this.secretProviderAggregator = secretProviderAggregator;
this.metricsRecorder = metricsRecorder;
this.webhookConnectorRegistry = webhookConnectorRegistry;
}

Expand Down Expand Up @@ -144,10 +150,16 @@ private void activateConnector(InboundConnectorProperties newProperties) {
LOG.trace("Registering webhook: " + newProperties.getType());
}
inboundContext.reportHealth(Health.up());
metricsRecorder.increase(
Inbound.METRIC_NAME_ACTIVATIONS, Inbound.ACTION_ACTIVATED, newProperties.getType());
} catch (Exception e) {
inboundContext.reportHealth(Health.down(e));
// log and continue with other connectors anyway
LOG.error("Failed to activate inbound connector " + newProperties, e);
metricsRecorder.increase(
Inbound.METRIC_NAME_ACTIVATIONS,
Inbound.ACTION_ACTIVATION_FAILED,
newProperties.getType());
}
}

Expand All @@ -167,6 +179,8 @@ private void addActiveConnector(ActiveInboundConnector connector) {

private void deactivateConnector(InboundConnectorProperties properties) {
findActiveConnector(properties).ifPresent(this::deactivateConnector);
metricsRecorder.increase(
Inbound.METRIC_NAME_ACTIVATIONS, Inbound.ACTION_DEACTIVATED, properties.getType());
}

private void deactivateConnector(ActiveInboundConnector connector) {
Expand All @@ -178,6 +192,10 @@ private void deactivateConnector(ActiveInboundConnector connector) {
webhookConnectorRegistry.deregister(connector);
LOG.trace("Unregistering webhook: " + connector.properties().getType());
}
metricsRecorder.increase(
Inbound.METRIC_NAME_ACTIVATIONS,
Inbound.ACTION_DEACTIVATED,
connector.properties().getType());
} catch (Exception e) {
// log and continue with other connectors anyway
LOG.error("Failed to deactivate inbound connector " + connector, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.runtime.inbound.lifecycle;

import io.camunda.connector.api.inbound.InboundConnectorResult;
import io.camunda.connector.impl.inbound.InboundConnectorProperties;
import io.camunda.connector.runtime.core.feel.FeelEngineWrapper;
import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler;
import io.camunda.connector.runtime.metrics.ConnectorMetrics.Inbound;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;

public class MeteredInboundCorrelationHandler extends InboundCorrelationHandler {

private final MetricsRecorder metricsRecorder;

public MeteredInboundCorrelationHandler(
ZeebeClient zeebeClient, FeelEngineWrapper feelEngine, MetricsRecorder metricsRecorder) {
super(zeebeClient, feelEngine);
this.metricsRecorder = metricsRecorder;
}

@Override
protected boolean isActivationConditionMet(
InboundConnectorProperties properties, Object context) {
boolean isConditionMet = super.isActivationConditionMet(properties, context);
if (!isConditionMet) {
metricsRecorder.increase(
Inbound.METRIC_NAME_TRIGGERS,
Inbound.ACTION_ACTIVATION_CONDITION_FAILED,
properties.getType());
}
return isConditionMet;
}

@Override
public InboundConnectorResult<?> correlate(
InboundConnectorProperties properties, Object variables) {
metricsRecorder.increase(
Inbound.METRIC_NAME_TRIGGERS, Inbound.ACTION_TRIGGERED, properties.getType());

try {
var result = super.correlate(properties, variables);
if (result.isActivated()) {
metricsRecorder.increase(
Inbound.METRIC_NAME_TRIGGERS, Inbound.ACTION_CORRELATED, properties.getType());
}
return result;
} catch (Exception e) {
metricsRecorder.increase(
Inbound.METRIC_NAME_TRIGGERS, Inbound.ACTION_CORRELATION_FAILED, properties.getType());
throw e;
}
}
}
Loading