Skip to content

Commit

Permalink
Handle duplicate context path's (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
berndruecker authored and chillleader committed Jul 31, 2023
1 parent 6db4b44 commit e757b09
Show file tree
Hide file tree
Showing 14 changed files with 613 additions and 121 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.camunda.connector.inbound.feel;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FeelConfiguration {

/**
* Provides a {@link FeelEngineWrapper} unless already present in the Spring Context
* (as also used by other applications - as soon as we switch to use the one from util
*/
@Bean
//@ConditionalOnMissingBean(FeelEngineWrapper.class)
public FeelEngineWrapper feelEngine() {
return new FeelEngineWrapper();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.camunda.connector.inbound.feel;

import org.camunda.feel.FeelEngine;
import org.camunda.feel.impl.JavaValueMapper;
import org.camunda.feel.impl.SpiServiceLoader;
import org.springframework.stereotype.Service;
import scala.jdk.javaapi.CollectionConverters;
Expand All @@ -10,16 +11,17 @@
import java.util.Objects;
import java.util.Optional;

@Service
/**
* Wait for https://github.com/camunda/connector-sdk/issues/178 to be solved - then delete this here.
*/
public class FeelEngineWrapper {

private final FeelEngine feelEngine;

public FeelEngineWrapper() {
feelEngine =
new FeelEngine.Builder()
.valueMapper(SpiServiceLoader.loadValueMapper())
.functionProvider(SpiServiceLoader.loadFunctionProvider())
.customValueMapper(new JavaValueMapper())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

Expand All @@ -35,13 +36,11 @@ public class ProcessDefinitionImporter {
private InboundConnectorRegistry registry;

@Autowired
private OperateClientFactory operateClientFactory;
private CamundaOperateClient camundaOperateClient;

@Scheduled(fixedDelay = 5000)
public void scheduleImport() throws OperateException {
LOG.trace("Query process deployments...");
// Lazy initialize the client - could be replaced by some Spring tricks later
CamundaOperateClient camundaOperateClient = operateClientFactory.camundaOperateClient();

// TODO: Think about pagination if we really have more process definitions
SearchQuery processDefinitionQuery = new SearchQuery.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
Expand Down Expand Up @@ -49,10 +50,6 @@ public class OperateClientFactory {
@Value("${camunda.operate.client.keycloak-realm:#{null}}")
private String operateKeycloakRealm;

// Cached client, which is lazy initialized
// TODO: Move lazy initialization to the Spring level
private CamundaOperateClient client;

private String getOperateUrl() {
if (clusterId!=null) {
String url = "https://" + region + ".operate.camunda.io/" + clusterId + "/";
Expand Down Expand Up @@ -92,16 +89,10 @@ public AuthInterface getAuthentication(String operateUrl) {
}

public CamundaOperateClient camundaOperateClient() throws OperateException {
if (client==null) {
client = createCamundaOperateClient();
}
return client;
}

private CamundaOperateClient createCamundaOperateClient() throws OperateException {
String operateUrl = getOperateUrl();
return new CamundaOperateClient.Builder()
.operateUrl(operateUrl)
.authentication(getAuthentication(operateUrl)).build();
.authentication(getAuthentication(operateUrl))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package io.camunda.connector.inbound.operate;

import io.camunda.operate.CamundaOperateClient;
import io.camunda.operate.dto.*;
import io.camunda.operate.exception.OperateException;
import io.camunda.operate.search.SearchQuery;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.Header;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.function.Supplier;

/**
* Lifecycle implementation that also directly acts as a CamundaOperateClient by delegating all methods to the
* CamundaOperateClient that is controlled (and kept in the delegate field)
*
*/
@Component
public class OperateClientLifecycle extends CamundaOperateClient implements SmartLifecycle, Supplier<CamundaOperateClient> {

public static final int PHASE = 22222;
protected boolean autoStartup = true;
protected boolean running = false;
protected boolean runningInTestContext = false;

protected final OperateClientFactory factory;
protected CamundaOperateClient delegate;

@Autowired
public OperateClientLifecycle(final OperateClientFactory factory) {
this.factory = factory;
}

/**
* Allows to set the delegate being used manually, helpful for test cases
*/
public OperateClientLifecycle(final CamundaOperateClient delegate) {
this.factory = null;
this.delegate = delegate;
}

@Override
public void start() {
if (factory!=null) {
try {
delegate = factory.camundaOperateClient();
} catch (OperateException e) {
throw new RuntimeException("Could not start Camunda Operate Client: "+ e.getMessage(), e);
}
this.running = true;
} else {
// in test cases we have injected a delegate already
runningInTestContext = true;
}
}

@Override
public void stop() {
try {
delegate = null;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
running = false;
}
}


@Override
public CamundaOperateClient get() {
if (!isRunning()) {
throw new IllegalStateException("CamundaOperateClient is not yet created!");
}
return delegate;
}

@Override
public boolean isAutoStartup() {
return autoStartup;
}


@Override
public boolean isRunning() {
return running;
}

@Override
public int getPhase() {
return PHASE;
}

@Override
public ProcessDefinition getProcessDefinition(Long key) throws OperateException {
return delegate.getProcessDefinition(key);
}

@Override
public List<ProcessDefinition> searchProcessDefinitions(SearchQuery query) throws OperateException {
return delegate.searchProcessDefinitions(query);
}

@Override
public String getProcessDefinitionXml(Long key) throws OperateException {
return delegate.getProcessDefinitionXml(key);
}

@Override
public BpmnModelInstance getProcessDefinitionModel(Long key) throws OperateException {
return delegate.getProcessDefinitionModel(key);
}

@Override
public ProcessInstance getProcessInstance(Long key) throws OperateException {
return delegate.getProcessInstance(key);
}

@Override
public List<ProcessInstance> searchProcessInstances(SearchQuery query) throws OperateException {
return delegate.searchProcessInstances(query);
}

@Override
public FlownodeInstance getFlownodeInstance(Long key) throws OperateException {
return delegate.getFlownodeInstance(key);
}

@Override
public List<FlownodeInstance> searchFlownodeInstances(SearchQuery query) throws OperateException {
return delegate.searchFlownodeInstances(query);
}

@Override
public Incident getIncident(Long key) throws OperateException {
return delegate.getIncident(key);
}

@Override
public List<Incident> searchIncidents(SearchQuery query) throws OperateException {
return delegate.searchIncidents(query);
}

@Override
public Variable getVariable(Long key) throws OperateException {
return delegate.getVariable(key);
}

@Override
public List<Variable> searchVariables(SearchQuery query) throws OperateException {
return delegate.searchVariables(query);
}

@Override
public String getOperateUrl() {
return delegate.getOperateUrl();
}

@Override
public void setOperateUrl(String operateUrl) {
delegate.setOperateUrl(operateUrl);
}

@Override
public Header getAuthHeader() {
return delegate.getAuthHeader();
}

@Override
public void setAuthHeader(Header authHeader) {
delegate.setAuthHeader(authHeader);
}

@Override
public void setTokenExpiration(int tokenExpiration) {
delegate.setTokenExpiration(tokenExpiration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ public InboundConnectorProperties(String bpmnProcessId, int version, long proces
this.processDefinitionKey = processDefinitionKey;
}

/**
* @return a string identifying this connector in log message or responses
*/
public String getConnectorIdentifier() {
return type + "-" + bpmnProcessId + "-" + version;
}

public String getBpmnProcessId() {
return bpmnProcessId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
import io.camunda.connector.inbound.webhook.WebhookConnectorProperties;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.*;

@Service
public class InboundConnectorRegistry {

private Set<Long> registeredProcessDefinitionKeys = new HashSet<>();
private Map<String, WebhookConnectorProperties> registeredWebhookConnectorsByContextPath = new HashMap<>();
private Map<String, List<WebhookConnectorProperties>> registeredWebhookConnectorsByContextPath = new HashMap<>();
//private List<InboundConnectorProperties> registeredInboundConnectors = new ArrayList<>();

/**
* Reset registry and forget about all connectors, especially useful in tests when the context needs to get cleared
*/
public void reset() {
registeredProcessDefinitionKeys = new HashSet<>();
registeredWebhookConnectorsByContextPath = new HashMap<>();
}

public boolean processDefinitionChecked(long processDefinitionKey) {
return registeredProcessDefinitionKeys.contains(processDefinitionKey);
}
Expand All @@ -26,14 +31,19 @@ public void markProcessDefinitionChecked(long processDefinitionKey) {
public void registerWebhookConnector(InboundConnectorProperties properties) {
registeredProcessDefinitionKeys.add(properties.getProcessDefinitionKey());
WebhookConnectorProperties webhookConnectorProperties = new WebhookConnectorProperties(properties);
registeredWebhookConnectorsByContextPath.put(webhookConnectorProperties.getContext(), webhookConnectorProperties);
String context = webhookConnectorProperties.getContext();

if (!registeredWebhookConnectorsByContextPath.containsKey(context)) {
registeredWebhookConnectorsByContextPath.put(context, new ArrayList<>());
}
registeredWebhookConnectorsByContextPath.get(context).add(webhookConnectorProperties);
}

public boolean containsContextPath(String context) {
return registeredWebhookConnectorsByContextPath.containsKey(context);
}

public WebhookConnectorProperties getWebhookConnectorByContextPath(String context) {
public Collection<WebhookConnectorProperties> getWebhookConnectorByContextPath(String context) {
return registeredWebhookConnectorsByContextPath.get(context);
}

Expand Down
Loading

0 comments on commit e757b09

Please sign in to comment.