diff --git a/bundle/mvn/default-bundle/src/test/resources/application.properties b/bundle/mvn/default-bundle/src/test/resources/application.properties
index e8255a47c2..1df173c076 100644
--- a/bundle/mvn/default-bundle/src/test/resources/application.properties
+++ b/bundle/mvn/default-bundle/src/test/resources/application.properties
@@ -10,9 +10,10 @@ management.endpoints.web.exposure.include=metrics,health,prometheus
camunda.connector.polling.enabled=true
camunda.connector.polling.interval=5000
-camunda.connector.webhook.enabled=true
-
+camunda.operate.client.url=http://localhost:8081
camunda.operate.client.username=demo
camunda.operate.client.password=demo
-camunda.operate.client.url=http://localhost:8081
-logging.level.io.camunda.connector=trace
\ No newline at end of file
+
+logging.level.io.camunda.connectors=TRACE
+
+operate.client.enabled=true
diff --git a/connectors/kafka/element-templates/kafka-inbound-connector.json b/connectors/kafka/element-templates/kafka-inbound-connector.json
new file mode 100644
index 0000000000..fbf0e83584
--- /dev/null
+++ b/connectors/kafka/element-templates/kafka-inbound-connector.json
@@ -0,0 +1,226 @@
+{
+ "$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json",
+ "name": "Kafka Consumer Connector",
+ "id": "io.camunda.connectors.inbound.KAFKA.v1",
+ "version": 1,
+ "description": "Consume Kafka Message",
+ "documentationRef": "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka/",
+ "category": {
+ "id": "connectors",
+ "name": "Connectors"
+ },
+ "appliesTo": [
+ "bpmn:StartEvent"
+ ],
+ "elementType": {
+ "value": "bpmn:StartEvent"
+ },
+ "groups": [
+ {
+ "id": "authentication",
+ "label": "Authentication"
+ },
+ {
+ "id": "kafka",
+ "label": "Kafka"
+ },
+ {
+ "id": "activation",
+ "label": "Activation"
+ },
+ {
+ "id": "variable-mapping",
+ "label": "Variable Mapping"
+ }
+ ],
+ "properties": [
+ {
+ "type": "Hidden",
+ "value": "io.camunda:connector-kafka-inbound:1",
+ "binding": {
+ "type": "zeebe:property",
+ "name": "inbound.type"
+ }
+ },
+ {
+ "label": "Authentication type",
+ "description": "Username/password or custom",
+ "id": "authenticationType",
+ "group": "authentication",
+ "type": "Dropdown",
+ "value": "credentials",
+ "choices": [
+ {
+ "name": "Credentials",
+ "value": "credentials"
+ },
+ {
+ "name": "Custom",
+ "value": "custom"
+ }
+ ],
+ "binding": {
+ "type": "zeebe:property",
+ "name": "authenticationType"
+ }
+ },
+ {
+ "label": "Username",
+ "description": "Provide the username (must have permissions to produce message to the topic)",
+ "group": "authentication",
+ "type": "String",
+ "feel":"optional",
+ "optional": true,
+ "binding": {
+ "type": "zeebe:property",
+ "name": "authentication.username"
+ },
+ "condition": {
+ "property": "authenticationType",
+ "equals": "credentials"
+ }
+ },
+ {
+ "label": "Password",
+ "description": "Provide a password for the user",
+ "group": "authentication",
+ "type": "String",
+ "feel":"optional",
+ "optional": true,
+ "binding": {
+ "type": "zeebe:property",
+ "name": "authentication.password"
+ },
+ "condition": {
+ "property": "authenticationType",
+ "equals": "credentials"
+ }
+ },
+ {
+ "label": "Bootstrap servers",
+ "description": "Provide bootstrap server(s), comma-delimited if there are multiple",
+ "group": "kafka",
+ "type": "String",
+ "feel":"optional",
+ "binding": {
+ "type": "zeebe:property",
+ "name": "topic.bootstrapServers"
+ },
+ "constraints": {
+ "notEmpty": true
+ }
+ },
+ {
+ "label": "Topic",
+ "description": "Provide the topic name",
+ "group": "kafka",
+ "type": "String",
+ "feel":"optional",
+ "binding": {
+ "type": "zeebe:property",
+ "name": "topic.topicName"
+ },
+ "constraints": {
+ "notEmpty": true
+ }
+ },
+ {
+ "label": "Additional properties",
+ "description": "Provide additional Kafka consumer properties in JSON",
+ "group": "kafka",
+ "type": "String",
+ "optional": true,
+ "feel":"required",
+ "binding": {
+ "type": "zeebe:property",
+ "name": "additionalProperties"
+ }
+ },
+ {
+ "label": "Offsets",
+ "description": "Comma-separated list of offsets, e.g., '10,23' or '=[ \"10\", \"23\"]'. If specified, it has to have the same number of values as the number of partitions",
+ "group": "kafka",
+ "type": "String",
+ "feel": "optional",
+ "optional": true,
+ "binding": {
+ "type": "zeebe:property",
+ "name": "offsets"
+ }
+ },
+ {
+ "label": "Auto offset reset",
+ "description": "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. You should only select none if you specified the offsets",
+ "id": "autoOffsetReset",
+ "group": "kafka",
+ "type": "Dropdown",
+ "value": "latest",
+ "choices": [
+ {
+ "name": "Latest",
+ "value": "latest"
+ },
+ {
+ "name": "Earliest",
+ "value": "earliest"
+ },
+ {
+ "name": "None",
+ "value": "none"
+ }
+ ],
+ "binding": {
+ "type": "zeebe:property",
+ "name": "autoOffsetReset"
+ }
+ },
+ {
+ "label": "Activation condition",
+ "type": "String",
+ "group": "activation",
+ "feel": "required",
+ "optional": true,
+ "binding": {
+ "type": "zeebe:property",
+ "name": "activationCondition"
+ },
+ "description": "Condition under which the Connector triggers. Leave empty to catch all events"
+ },
+ {
+ "label": "Result variable",
+ "type": "String",
+ "group": "variable-mapping",
+ "optional": true,
+ "binding": {
+ "type": "zeebe:property",
+ "name": "resultVariable"
+ },
+ "description": "Name of variable to store the result of the Connector in"
+ },
+ {
+ "label": "Result expression",
+ "description": "Expression to map the inbound payload to process variables",
+ "group": "variable-mapping",
+ "type": "Text",
+ "feel": "required",
+ "binding": {
+ "type": "zeebe:taskHeader",
+ "key": "resultExpression"
+ }
+ },
+ {
+ "label": "Error expression",
+ "description": "Expression to handle errors. Details in the documentation",
+ "group": "errors",
+ "type": "Text",
+ "feel": "required",
+ "binding": {
+ "type": "zeebe:taskHeader",
+ "key": "errorExpression"
+ }
+ }
+ ],
+ "icon": {
+ "contents": "data:image/svg+xml;utf8,%3Csvg width='18' height='18' viewBox='0 0 256 416' xmlns='http://www.w3.org/2000/svg' preserveAspectRatio='xMidYMid'%3E%3Cpath d='M201.816 230.216c-16.186 0-30.697 7.171-40.634 18.461l-25.463-18.026c2.703-7.442 4.255-15.433 4.255-23.797 0-8.219-1.498-16.076-4.112-23.408l25.406-17.835c9.936 11.233 24.409 18.365 40.548 18.365 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184-29.875 0-54.184 24.305-54.184 54.184 0 5.348.808 10.505 2.258 15.389l-25.423 17.844c-10.62-13.175-25.911-22.374-43.333-25.182v-30.64c24.544-5.155 43.037-26.962 43.037-53.019C124.171 24.305 99.862 0 69.987 0 40.112 0 15.803 24.305 15.803 54.184c0 25.708 18.014 47.246 42.067 52.769v31.038C25.044 143.753 0 172.401 0 206.854c0 34.621 25.292 63.374 58.355 68.94v32.774c-24.299 5.341-42.552 27.011-42.552 52.894 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-25.883-18.253-47.553-42.552-52.894v-32.775a69.965 69.965 0 0 0 42.6-24.776l25.633 18.143c-1.423 4.84-2.22 9.946-2.22 15.24 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184zm0-126.695c14.487 0 26.27 11.788 26.27 26.271s-11.783 26.27-26.27 26.27-26.27-11.787-26.27-26.27c0-14.483 11.783-26.271 26.27-26.271zm-158.1-49.337c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27zm52.541 307.278c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27zm-26.272-117.97c-20.205 0-36.642-16.434-36.642-36.638 0-20.205 16.437-36.642 36.642-36.642 20.204 0 36.641 16.437 36.641 36.642 0 20.204-16.437 36.638-36.641 36.638zm131.831 67.179c-14.487 0-26.27-11.788-26.27-26.271s11.783-26.27 26.27-26.27 26.27 11.787 26.27 26.27c0 14.483-11.783 26.271-26.27 26.271z' style='fill:%23231f20'/%3E%3C/svg%3E"
+ }
+}
\ No newline at end of file
diff --git a/connectors/kafka/pom.xml b/connectors/kafka/pom.xml
index 450c2f77af..854180f070 100644
--- a/connectors/kafka/pom.xml
+++ b/connectors/kafka/pom.xml
@@ -31,16 +31,30 @@ except in compliance with the proprietary license.
+
+ com.google.code.gson
+ gson
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
org.apache.kafka
kafka-clients
-
- com.google.code.gson
- gson
- test
+ org.apache.commons
+ commons-text
+
+
+
+ org.testcontainers
+ kafka
+ ${version.testcontainers}
diff --git a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorConsumer.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorConsumer.java
new file mode 100644
index 0000000000..415c8d8b2b
--- /dev/null
+++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorConsumer.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
+ * under one or more contributor license agreements. Licensed under a proprietary license.
+ * See the License.txt file for more information. You may not use this file
+ * except in compliance with the proprietary license.
+ */
+package io.camunda.connector.kafka.inbound;
+
+import static io.camunda.connector.kafka.inbound.KafkaPropertyTransformer.convertConsumerRecordToKafkaInboundMessage;
+import static io.camunda.connector.kafka.inbound.KafkaPropertyTransformer.getKafkaProperties;
+import static io.camunda.connector.kafka.inbound.KafkaPropertyTransformer.getOffsets;
+
+import io.camunda.connector.api.inbound.InboundConnectorContext;
+import io.camunda.connector.api.inbound.InboundConnectorResult;
+import io.camunda.connector.impl.ConnectorInputException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConnectorConsumer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectorConsumer.class);
+
+ private final InboundConnectorContext context;
+
+ public CompletableFuture> future;
+
+ Consumer consumer;
+
+ boolean shouldLoop = true;
+
+ private final KafkaConnectorProperties elementProps;
+
+ private final Function> consumerCreatorFunction;
+
+ public KafkaConnectorConsumer(
+ final Function> consumerCreatorFunction,
+ final InboundConnectorContext connectorContext,
+ final KafkaConnectorProperties elementProps) {
+ this.consumerCreatorFunction = consumerCreatorFunction;
+ this.context = connectorContext;
+ this.elementProps = elementProps;
+ }
+
+ public void startConsumer() {
+ this.consumer =
+ createConsumer(
+ this.consumerCreatorFunction,
+ getKafkaProperties(elementProps, this.context),
+ elementProps,
+ getOffsets(elementProps.getOffsets()));
+ LOG.info("Kafka inbound connector initialized");
+ this.future =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ while (shouldLoop) {
+ ConsumerRecords records =
+ this.consumer.poll(Duration.ofMillis(500));
+ for (ConsumerRecord record : records) {
+ handleMessage(record);
+ }
+ if (!records.isEmpty()) {
+ this.consumer.commitSync();
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error("Failed to execute connector: {}", ex.getMessage());
+ throw ex;
+ }
+ LOG.debug("Kafka inbound loop finished");
+ return null;
+ });
+ }
+
+ private void handleMessage(ConsumerRecord record) {
+ LOG.trace("Kafka message received: key = %s, value = %s%n", record.key(), record.value());
+ InboundConnectorResult> result =
+ this.context.correlate(convertConsumerRecordToKafkaInboundMessage(record));
+ if (result.isActivated()) {
+ LOG.debug("Inbound event correlated successfully: {}", result.getResponseData());
+ } else {
+ LOG.debug("Inbound event not correlated: {}", result.getErrorData());
+ }
+ }
+
+ public void stopConsumer() throws ExecutionException, InterruptedException {
+ this.shouldLoop = false;
+ if (this.future != null && !this.future.isDone()) {
+ this.future.get();
+ }
+ this.consumer.close();
+ }
+
+ private Consumer createConsumer(
+ final Function> consumerCreatorFunction,
+ final Properties kafkaProps,
+ final KafkaConnectorProperties elementProps,
+ final List offsets) {
+ // init
+ Consumer consumer = consumerCreatorFunction.apply(kafkaProps);
+
+ // dynamically assign partitions to be able to handle offsets
+ List partitions = consumer.partitionsFor(elementProps.getTopic().getTopicName());
+ List topicPartitions =
+ partitions.stream()
+ .map(partition -> new TopicPartition(partition.topic(), partition.partition()))
+ .collect(Collectors.toList());
+ consumer.assign(topicPartitions);
+
+ // set partition offsets if necessary
+ if (offsets != null) {
+ if (partitions.size() != offsets.size()) {
+ throw new ConnectorInputException(
+ new IllegalArgumentException(
+ "Number of offsets provided is not equal the number of partitions!"));
+ }
+ for (int i = 0; i < offsets.size(); i++) {
+ consumer.seek(topicPartitions.get(i), offsets.get(i));
+ }
+ }
+ return consumer;
+ }
+}
diff --git a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorProperties.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorProperties.java
new file mode 100644
index 0000000000..ace49102f8
--- /dev/null
+++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaConnectorProperties.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
+ * under one or more contributor license agreements. Licensed under a proprietary license.
+ * See the License.txt file for more information. You may not use this file
+ * except in compliance with the proprietary license.
+ */
+package io.camunda.connector.kafka.inbound;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.camunda.connector.api.annotation.Secret;
+import io.camunda.connector.kafka.outbound.model.KafkaAuthentication;
+import io.camunda.connector.kafka.outbound.model.KafkaTopic;
+import java.util.HashMap;
+import java.util.Map;
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+
+public class KafkaConnectorProperties {
+
+ @NotNull private String authenticationType;
+
+ @Valid @Secret private KafkaAuthentication authentication;
+
+ @Valid @NotNull @Secret private KafkaTopic topic;
+
+ private Map additionalProperties = new HashMap<>();
+
+ @Secret private String activationCondition;
+
+ private Object offsets;
+
+ @NotNull private AutoOffsetReset autoOffsetReset = AutoOffsetReset.NONE;
+
+ public enum AutoOffsetReset {
+ @JsonProperty("none")
+ NONE("none"),
+ @JsonProperty("latest")
+ LATEST("latest"),
+ @JsonProperty("earliest")
+ EARLIEST("earliest");
+
+ public final String label;
+
+ private AutoOffsetReset(String label) {
+ this.label = label;
+ }
+
+ @Override
+ public String toString() {
+ return this.label;
+ }
+ }
+
+ public String getAuthenticationType() {
+ return authenticationType;
+ }
+
+ public void setAuthenticationType(String authenticationType) {
+ this.authenticationType = authenticationType;
+ }
+
+ public AutoOffsetReset getAutoOffsetReset() {
+ return autoOffsetReset;
+ }
+
+ public Object getOffsets() {
+ return offsets;
+ }
+
+ public void setOffsets(Object offsets) {
+ this.offsets = offsets;
+ }
+
+ public void setAutoOffsetReset(AutoOffsetReset autoOffsetReset) {
+ this.autoOffsetReset = autoOffsetReset;
+ }
+
+ public String getActivationCondition() {
+ return activationCondition;
+ }
+
+ public void setActivationCondition(String activationCondition) {
+ this.activationCondition = activationCondition;
+ }
+
+ public KafkaAuthentication getAuthentication() {
+ return authentication;
+ }
+
+ public void setAuthentication(KafkaAuthentication authentication) {
+ this.authentication = authentication;
+ }
+
+ public KafkaTopic getTopic() {
+ return topic;
+ }
+
+ public void setTopic(KafkaTopic topic) {
+ this.topic = topic;
+ }
+
+ public Map getAdditionalProperties() {
+ return additionalProperties;
+ }
+
+ public void setAdditionalProperties(Map additionalProperties) {
+ this.additionalProperties = additionalProperties;
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaConnectorProperties{"
+ + "authenticationType='"
+ + authenticationType
+ + '\''
+ + ", authentication="
+ + authentication
+ + ", topic="
+ + topic
+ + ", additionalProperties="
+ + additionalProperties
+ + ", activationCondition='"
+ + activationCondition
+ + '\''
+ + ", offsets="
+ + offsets
+ + ", autoOffsetReset='"
+ + autoOffsetReset
+ + '\''
+ + '}';
+ }
+}
diff --git a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaExecutable.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaExecutable.java
new file mode 100644
index 0000000000..5f2e278922
--- /dev/null
+++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaExecutable.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
+ * under one or more contributor license agreements. Licensed under a proprietary license.
+ * See the License.txt file for more information. You may not use this file
+ * except in compliance with the proprietary license.
+ */
+package io.camunda.connector.kafka.inbound;
+
+import io.camunda.connector.api.annotation.InboundConnector;
+import io.camunda.connector.api.inbound.InboundConnectorContext;
+import io.camunda.connector.api.inbound.InboundConnectorExecutable;
+import java.util.Properties;
+import java.util.function.Function;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InboundConnector(name = "KAFKA_INBOUND", type = "io.camunda:connector-kafka-inbound:1")
+public class KafkaExecutable implements InboundConnectorExecutable {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaExecutable.class);
+
+ private final Function> consumerCreatorFunction;
+
+ public KafkaConnectorConsumer kafkaConnectorConsumer;
+
+ public KafkaExecutable(
+ final Function> consumerCreatorFunction) {
+ this.consumerCreatorFunction = consumerCreatorFunction;
+ }
+
+ public KafkaExecutable() {
+ this(KafkaConsumer::new);
+ }
+
+ @Override
+ public void activate(InboundConnectorContext connectorContext) {
+ KafkaConnectorProperties elementProps =
+ connectorContext.getPropertiesAsType(KafkaConnectorProperties.class);
+ LOG.info("Subscription activation requested by the Connector runtime: {}", elementProps);
+
+ connectorContext.replaceSecrets(elementProps);
+ connectorContext.validate(elementProps);
+
+ this.kafkaConnectorConsumer =
+ new KafkaConnectorConsumer(consumerCreatorFunction, connectorContext, elementProps);
+ this.kafkaConnectorConsumer.startConsumer();
+ }
+
+ @Override
+ public void deactivate() {
+ LOG.info("Subscription deactivation requested by the Connector runtime");
+ try {
+ this.kafkaConnectorConsumer.stopConsumer();
+ } catch (Exception e) {
+ LOG.error("Failed to cancel Connector execution: {}", e.getMessage());
+ }
+ }
+}
diff --git a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaInboundMessage.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaInboundMessage.java
new file mode 100644
index 0000000000..45a864ffb1
--- /dev/null
+++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaInboundMessage.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
+ * under one or more contributor license agreements. Licensed under a proprietary license.
+ * See the License.txt file for more information. You may not use this file
+ * except in compliance with the proprietary license.
+ */
+package io.camunda.connector.kafka.inbound;
+
+import java.util.Objects;
+
+public class KafkaInboundMessage {
+
+ private String key;
+
+ private String rawValue;
+
+ private Object value;
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public String getRawValue() {
+ return rawValue;
+ }
+
+ public void setRawValue(String stringValue) {
+ this.rawValue = stringValue;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public void setValue(Object value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaInboundMessage{"
+ + "key='"
+ + key
+ + '\''
+ + ", rawValue='"
+ + rawValue
+ + '\''
+ + ", value="
+ + value
+ + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ KafkaInboundMessage that = (KafkaInboundMessage) o;
+ return Objects.equals(key, that.key)
+ && Objects.equals(rawValue, that.rawValue)
+ && Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, rawValue, value);
+ }
+}
diff --git a/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaPropertyTransformer.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaPropertyTransformer.java
new file mode 100644
index 0000000000..576f65dea1
--- /dev/null
+++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/inbound/KafkaPropertyTransformer.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
+ * under one or more contributor license agreements. Licensed under a proprietary license.
+ * See the License.txt file for more information. You may not use this file
+ * except in compliance with the proprietary license.
+ */
+package io.camunda.connector.kafka.inbound;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonSyntaxException;
+import io.camunda.connector.api.inbound.InboundConnectorContext;
+import io.camunda.connector.kafka.outbound.model.KafkaConnectorRequest;
+import io.camunda.connector.kafka.supplier.GsonSupplier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.text.StringEscapeUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.config.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaPropertyTransformer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaPropertyTransformer.class);
+
+ static final String DEFAULT_GROUP_ID_PREFIX = "kafka-inbound-connector";
+
+ protected static final String DEFAULT_KEY_DESERIALIZER =
+ "org.apache.kafka.common.serialization.StringDeserializer";
+
+ public static List getOffsets(Object offsets) {
+ if (offsets == null) {
+ return null;
+ }
+ List offsetCollection = null;
+ if (offsets instanceof Collection>) {
+ offsetCollection = (List) offsets;
+ } else if (offsets instanceof String) {
+ offsetCollection = convertStringToList((String) offsets);
+ } else {
+ // We accept only List or String input for offsets
+ throw new IllegalArgumentException(
+ "Invalid input type for offsets. Supported types are: List and String. Got "
+ + offsets.getClass()
+ + " instead.");
+ }
+ return offsetCollection;
+ }
+
+ public static List convertStringToList(String string) {
+ if (StringUtils.isBlank(string)) {
+ return new ArrayList<>();
+ }
+ return Arrays.stream(string.split(","))
+ .map(s -> Long.parseLong(s.trim()))
+ .collect(Collectors.toList());
+ }
+
+ public static Properties getKafkaProperties(
+ KafkaConnectorProperties props, InboundConnectorContext context) {
+ KafkaConnectorRequest connectorRequest = new KafkaConnectorRequest();
+ connectorRequest.setTopic(props.getTopic());
+ connectorRequest.setAuthentication(props.getAuthentication());
+ connectorRequest.setAdditionalProperties(props.getAdditionalProperties());
+ final Properties kafkaProps = connectorRequest.assembleKafkaClientProperties();
+ if (kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+ kafkaProps.put(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ DEFAULT_GROUP_ID_PREFIX
+ + "-"
+ + context
+ .getProperties()
+ .getBpmnProcessId()); // GROUP_ID_CONFIG is mandatory. It will be used to assign a
+ // clint id
+ }
+ kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, props.getAutoOffsetReset().toString());
+ kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ kafkaProps.put(TopicConfig.RETENTION_MS_CONFIG, -1);
+ kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
+ kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
+ return kafkaProps;
+ }
+
+ public static KafkaInboundMessage convertConsumerRecordToKafkaInboundMessage(
+ ConsumerRecord consumerRecord) {
+ KafkaInboundMessage kafkaInboundMessage = new KafkaInboundMessage();
+ kafkaInboundMessage.setKey(consumerRecord.key());
+ kafkaInboundMessage.setRawValue(consumerRecord.value());
+ try {
+ JsonElement bodyAsJsonElement =
+ GsonSupplier.gson()
+ .fromJson(StringEscapeUtils.unescapeJson(consumerRecord.value()), JsonElement.class);
+ kafkaInboundMessage.setValue(GsonSupplier.gson().fromJson(bodyAsJsonElement, Object.class));
+ } catch (JsonSyntaxException e) {
+ LOG.debug("Cannot parse value to json object -> use the raw value");
+ kafkaInboundMessage.setValue(kafkaInboundMessage.getRawValue());
+ }
+ return kafkaInboundMessage;
+ }
+}
diff --git a/connectors/kafka/src/main/java/io/camunda/connector/KafkaConnectorFunction.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/KafkaConnectorFunction.java
similarity index 94%
rename from connectors/kafka/src/main/java/io/camunda/connector/KafkaConnectorFunction.java
rename to connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/KafkaConnectorFunction.java
index bebb0d0b5c..5861f50c4d 100644
--- a/connectors/kafka/src/main/java/io/camunda/connector/KafkaConnectorFunction.java
+++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/KafkaConnectorFunction.java
@@ -4,14 +4,14 @@
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
-package io.camunda.connector;
+package io.camunda.connector.kafka.outbound;
import io.camunda.connector.api.annotation.OutboundConnector;
import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.api.outbound.OutboundConnectorFunction;
-import io.camunda.connector.model.KafkaConnectorRequest;
-import io.camunda.connector.model.KafkaConnectorResponse;
+import io.camunda.connector.kafka.outbound.model.KafkaConnectorRequest;
+import io.camunda.connector.kafka.outbound.model.KafkaConnectorResponse;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
diff --git a/connectors/kafka/src/main/java/io/camunda/connector/model/KafkaAuthentication.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaAuthentication.java
similarity index 98%
rename from connectors/kafka/src/main/java/io/camunda/connector/model/KafkaAuthentication.java
rename to connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaAuthentication.java
index cbe5bc56bc..9a99d2b00e 100644
--- a/connectors/kafka/src/main/java/io/camunda/connector/model/KafkaAuthentication.java
+++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaAuthentication.java
@@ -4,7 +4,7 @@
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
-package io.camunda.connector.model;
+package io.camunda.connector.kafka.outbound.model;
import io.camunda.connector.api.annotation.Secret;
import io.camunda.connector.impl.ConnectorInputException;
diff --git a/connectors/kafka/src/main/java/io/camunda/connector/model/KafkaConnectorRequest.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorRequest.java
similarity index 94%
rename from connectors/kafka/src/main/java/io/camunda/connector/model/KafkaConnectorRequest.java
rename to connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorRequest.java
index e1ebee2724..23a4c0ab18 100644
--- a/connectors/kafka/src/main/java/io/camunda/connector/model/KafkaConnectorRequest.java
+++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorRequest.java
@@ -4,7 +4,7 @@
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
-package io.camunda.connector.model;
+package io.camunda.connector.kafka.outbound.model;
import io.camunda.connector.api.annotation.Secret;
import java.util.HashMap;
@@ -76,8 +76,10 @@ public Properties assembleKafkaClientProperties() {
Properties topicProps = this.topic.produceTopicProperties();
props.putAll(topicProps);
- Properties messageProps = this.message.produceMessageProperties();
- props.putAll(messageProps);
+ if (this.message != null) { // can be valid in case of inbound
+ Properties messageProps = this.message.produceMessageProperties();
+ props.putAll(messageProps);
+ }
// Step 2: set default recommended properties
// See the list of possible values at org.apache.kafka.clients.producer.ProducerConfig
diff --git a/connectors/kafka/src/main/java/io/camunda/connector/model/KafkaConnectorResponse.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorResponse.java
similarity index 97%
rename from connectors/kafka/src/main/java/io/camunda/connector/model/KafkaConnectorResponse.java
rename to connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorResponse.java
index 7078d1bb8c..04339b93d6 100644
--- a/connectors/kafka/src/main/java/io/camunda/connector/model/KafkaConnectorResponse.java
+++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorResponse.java
@@ -4,7 +4,7 @@
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
-package io.camunda.connector.model;
+package io.camunda.connector.kafka.outbound.model;
import java.util.Objects;
diff --git a/connectors/kafka/src/main/java/io/camunda/connector/model/KafkaMessage.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaMessage.java
similarity index 97%
rename from connectors/kafka/src/main/java/io/camunda/connector/model/KafkaMessage.java
rename to connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaMessage.java
index 54c1521899..d0203d6ea3 100644
--- a/connectors/kafka/src/main/java/io/camunda/connector/model/KafkaMessage.java
+++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaMessage.java
@@ -4,7 +4,7 @@
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
-package io.camunda.connector.model;
+package io.camunda.connector.kafka.outbound.model;
import java.util.Objects;
import java.util.Properties;
diff --git a/connectors/kafka/src/main/java/io/camunda/connector/model/KafkaTopic.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaTopic.java
similarity index 97%
rename from connectors/kafka/src/main/java/io/camunda/connector/model/KafkaTopic.java
rename to connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaTopic.java
index 377f587148..bd551f8c76 100644
--- a/connectors/kafka/src/main/java/io/camunda/connector/model/KafkaTopic.java
+++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/outbound/model/KafkaTopic.java
@@ -4,7 +4,7 @@
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
-package io.camunda.connector.model;
+package io.camunda.connector.kafka.outbound.model;
import io.camunda.connector.api.annotation.Secret;
import java.util.Objects;
diff --git a/connectors/kafka/src/main/java/io/camunda/connector/kafka/supplier/GsonSupplier.java b/connectors/kafka/src/main/java/io/camunda/connector/kafka/supplier/GsonSupplier.java
new file mode 100644
index 0000000000..1c8bd9d7b5
--- /dev/null
+++ b/connectors/kafka/src/main/java/io/camunda/connector/kafka/supplier/GsonSupplier.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
+ * under one or more contributor license agreements. Licensed under a proprietary license.
+ * See the License.txt file for more information. You may not use this file
+ * except in compliance with the proprietary license.
+ */
+package io.camunda.connector.kafka.supplier;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public final class GsonSupplier {
+
+ private static final Gson gson = new GsonBuilder().create();
+
+ private GsonSupplier() {}
+
+ public static Gson gson() {
+ return gson;
+ }
+}
diff --git a/connectors/kafka/src/main/resources/META-INF/services/io.camunda.connector.api.inbound.InboundConnectorExecutable b/connectors/kafka/src/main/resources/META-INF/services/io.camunda.connector.api.inbound.InboundConnectorExecutable
new file mode 100644
index 0000000000..8b09375da3
--- /dev/null
+++ b/connectors/kafka/src/main/resources/META-INF/services/io.camunda.connector.api.inbound.InboundConnectorExecutable
@@ -0,0 +1 @@
+io.camunda.connector.kafka.inbound.KafkaExecutable
\ No newline at end of file
diff --git a/connectors/kafka/src/main/resources/META-INF/services/io.camunda.connector.api.outbound.OutboundConnectorFunction b/connectors/kafka/src/main/resources/META-INF/services/io.camunda.connector.api.outbound.OutboundConnectorFunction
index 93bb3e7a97..d315b680f6 100644
--- a/connectors/kafka/src/main/resources/META-INF/services/io.camunda.connector.api.outbound.OutboundConnectorFunction
+++ b/connectors/kafka/src/main/resources/META-INF/services/io.camunda.connector.api.outbound.OutboundConnectorFunction
@@ -1 +1 @@
-io.camunda.connector.KafkaConnectorFunction
\ No newline at end of file
+io.camunda.connector.kafka.outbound.KafkaConnectorFunction
\ No newline at end of file
diff --git a/connectors/kafka/src/test/java/io/camunda/connector/kafka/inbound/KafkaExecutableTest.java b/connectors/kafka/src/test/java/io/camunda/connector/kafka/inbound/KafkaExecutableTest.java
new file mode 100644
index 0000000000..d1bfadd1bc
--- /dev/null
+++ b/connectors/kafka/src/test/java/io/camunda/connector/kafka/inbound/KafkaExecutableTest.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
+ * under one or more contributor license agreements. Licensed under a proprietary license.
+ * See the License.txt file for more information. You may not use this file
+ * except in compliance with the proprietary license.
+ */
+package io.camunda.connector.kafka.inbound;
+
+import static io.camunda.connector.kafka.inbound.KafkaPropertyTransformer.DEFAULT_KEY_DESERIALIZER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.gson.Gson;
+import io.camunda.connector.kafka.outbound.model.KafkaTopic;
+import io.camunda.connector.test.inbound.InboundConnectorContextBuilder;
+import io.camunda.connector.test.inbound.InboundConnectorPropertiesBuilder;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import org.mockito.internal.stubbing.answers.AnswersWithDelay;
+import org.mockito.internal.stubbing.answers.Returns;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaExecutableTest {
+ private InboundConnectorContextBuilder.TestInboundConnectorContext context;
+ private InboundConnectorContextBuilder.TestInboundConnectorContext originalContext;
+ private List topicPartitions;
+ private KafkaConnectorProperties kafkaConnectorProperties;
+ @Mock private KafkaConsumer mockConsumer;
+
+ private String topic;
+
+ private final String processId = "Process_id";
+
+ @BeforeEach
+ public void setUp() {
+ topic = "my-topic";
+ topicPartitions =
+ Arrays.asList(
+ new PartitionInfo(topic, 0, null, null, null),
+ new PartitionInfo(topic, 1, null, null, null));
+ KafkaTopic kafkaTopic = new KafkaTopic();
+ kafkaTopic.setTopicName(topic);
+ kafkaTopic.setBootstrapServers("localhost:9092");
+ kafkaConnectorProperties = new KafkaConnectorProperties();
+ kafkaConnectorProperties.setAutoOffsetReset(KafkaConnectorProperties.AutoOffsetReset.NONE);
+ kafkaConnectorProperties.setAuthenticationType("custom");
+ kafkaConnectorProperties.setTopic(kafkaTopic);
+ String jsonString =
+ "{'authenticationType':'custom', "
+ + "'topic.topicName':'"
+ + topic
+ + "',"
+ + "'topic.bootstrapServers':'localhost:9092',"
+ + "'autoOffsetReset':'none'}";
+ Gson gson = new Gson();
+ Map propertiesMap = gson.fromJson(jsonString, Map.class);
+ context =
+ InboundConnectorContextBuilder.create()
+ .secret("test", "test")
+ .propertiesAsType(kafkaConnectorProperties)
+ .properties(
+ InboundConnectorPropertiesBuilder.create()
+ .properties(propertiesMap)
+ .bpmnProcessId(processId))
+ .build();
+ originalContext = context;
+ }
+
+ @Test
+ public void testActivateMainFunctionality() throws Exception {
+ // Given
+ when(mockConsumer.partitionsFor(topic)).thenReturn(topicPartitions);
+ doNothing().when(mockConsumer).assign(any());
+ doAnswer(new AnswersWithDelay(100, new Returns(new ConsumerRecords<>(new HashMap<>()))))
+ .when(mockConsumer)
+ .poll(any());
+ KafkaExecutable kafkaExecutable = getConsumerMock();
+
+ // When
+ kafkaExecutable.activate(context);
+
+ // Then
+ assertNotNull(kafkaExecutable.kafkaConnectorConsumer.consumer);
+ assertEquals(mockConsumer, kafkaExecutable.kafkaConnectorConsumer.consumer);
+ assertEquals(originalContext, context);
+ verify(mockConsumer, times(1)).partitionsFor(topic);
+ verify(mockConsumer, times(1)).assign(argThat(list -> list.size() == topicPartitions.size()));
+ verify(mockConsumer, timeout(100)).poll(any());
+ assertNotNull(kafkaExecutable.kafkaConnectorConsumer.future);
+ kafkaExecutable.kafkaConnectorConsumer.shouldLoop = false;
+ kafkaExecutable.kafkaConnectorConsumer.future.get(3, TimeUnit.SECONDS);
+ }
+
+ @Test
+ void testActivateAndDeactivate() throws Exception {
+ // Given
+ when(mockConsumer.partitionsFor(topic)).thenReturn(topicPartitions);
+ doNothing().when(mockConsumer).assign(any());
+ KafkaExecutable kafkaExecutable = getConsumerMock();
+
+ // When
+ kafkaExecutable.activate(context);
+ kafkaExecutable.deactivate();
+
+ // Then
+ assertEquals(originalContext, context);
+ assertNotNull(kafkaExecutable.kafkaConnectorConsumer.consumer);
+ assertFalse(kafkaExecutable.kafkaConnectorConsumer.shouldLoop);
+ kafkaExecutable.kafkaConnectorConsumer.future.get(3, TimeUnit.SECONDS);
+ }
+
+ @Test
+ void testGetKafkaProperties() throws Exception {
+ // When
+ Properties properties =
+ KafkaPropertyTransformer.getKafkaProperties(kafkaConnectorProperties, context);
+
+ // Then
+ assertEquals("localhost:9092", properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ assertEquals(
+ DEFAULT_KEY_DESERIALIZER, properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+ assertEquals(
+ DEFAULT_KEY_DESERIALIZER, properties.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+ assertEquals(false, properties.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+ assertEquals(
+ "kafka-inbound-connector-" + processId, properties.get(ConsumerConfig.GROUP_ID_CONFIG));
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideStringsForGetOffsets")
+ public void testGetOffsets(Object input, List expected) {
+ // When
+ var result = KafkaPropertyTransformer.getOffsets(input);
+
+ // Then
+ assertEquals(expected, result);
+ }
+
+ private static Stream provideStringsForGetOffsets() {
+ return Stream.of(
+ Arguments.of("10", Arrays.asList(10L)),
+ Arguments.of("10,12", Arrays.asList(10L, 12L)),
+ Arguments.of(Arrays.asList(10L, 12L), Arrays.asList(10L, 12L)),
+ Arguments.of("1,2,3,4,5", Arrays.asList(1L, 2L, 3L, 4L, 5L)));
+ }
+
+ @Test
+ public void testConvertConsumerRecordToKafkaInboundMessage() {
+ // When
+ KafkaInboundMessage kafkaInboundMessage =
+ KafkaPropertyTransformer.convertConsumerRecordToKafkaInboundMessage(
+ new ConsumerRecord<>("my-topic", 0, 0, "my-key", "{\"foo\": \"bar\"}"));
+
+ // Then
+ assertEquals("my-key", kafkaInboundMessage.getKey());
+ assertEquals("{\"foo\": \"bar\"}", kafkaInboundMessage.getRawValue());
+ Map expectedValue = new HashMap<>();
+ expectedValue.put("foo", "bar");
+ assertEquals(expectedValue, kafkaInboundMessage.getValue());
+ }
+
+ public KafkaExecutable getConsumerMock() {
+ return new KafkaExecutable(properties -> mockConsumer);
+ }
+}
diff --git a/connectors/kafka/src/test/java/io/camunda/connector/kafka/integration/KafkaIntegrationTest.java b/connectors/kafka/src/test/java/io/camunda/connector/kafka/integration/KafkaIntegrationTest.java
new file mode 100644
index 0000000000..64618cc2b1
--- /dev/null
+++ b/connectors/kafka/src/test/java/io/camunda/connector/kafka/integration/KafkaIntegrationTest.java
@@ -0,0 +1,272 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
+ * under one or more contributor license agreements. Licensed under a proprietary license.
+ * See the License.txt file for more information. You may not use this file
+ * except in compliance with the proprietary license.
+ */
+package io.camunda.connector.kafka.integration;
+
+import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+import com.google.gson.Gson;
+import io.camunda.connector.api.outbound.OutboundConnectorContext;
+import io.camunda.connector.api.outbound.OutboundConnectorFunction;
+import io.camunda.connector.impl.inbound.result.MessageCorrelationResult;
+import io.camunda.connector.kafka.inbound.KafkaConnectorProperties;
+import io.camunda.connector.kafka.inbound.KafkaExecutable;
+import io.camunda.connector.kafka.inbound.KafkaInboundMessage;
+import io.camunda.connector.kafka.outbound.KafkaConnectorFunction;
+import io.camunda.connector.kafka.outbound.model.KafkaAuthentication;
+import io.camunda.connector.kafka.outbound.model.KafkaConnectorRequest;
+import io.camunda.connector.kafka.outbound.model.KafkaConnectorResponse;
+import io.camunda.connector.kafka.outbound.model.KafkaMessage;
+import io.camunda.connector.kafka.outbound.model.KafkaTopic;
+import io.camunda.connector.test.inbound.InboundConnectorContextBuilder;
+import io.camunda.connector.test.inbound.InboundConnectorPropertiesBuilder;
+import io.camunda.connector.test.outbound.OutboundConnectorContextBuilder;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.junit.ClassRule;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@Disabled // to be run manually
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class KafkaIntegrationTest {
+
+ private static final String TOPIC = "my-topic";
+ private static String BOOTSTRAP_SERVERS;
+
+ private final String processId = "Process_id";
+
+ @ClassRule
+ private static final KafkaContainer kafkaContainer =
+ new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
+
+ @BeforeAll
+ public static void init() {
+ kafkaContainer.start();
+ createTopics(TOPIC);
+ BOOTSTRAP_SERVERS = kafkaContainer.getBootstrapServers().replace("PLAINTEXT://", "");
+ }
+
+ private static void createTopics(String... topics) {
+ var newTopics =
+ Arrays.stream(topics)
+ .map(topic -> new NewTopic(topic, 1, (short) 1))
+ .collect(Collectors.toList());
+ try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, getKafkaBrokers()))) {
+ admin.createTopics(newTopics);
+ admin.createPartitions(Map.of(TOPIC, NewPartitions.increaseTo(2)));
+ }
+ }
+
+ private static String getKafkaBrokers() {
+ Integer mappedPort = kafkaContainer.getFirstMappedPort();
+ return String.format("%s:%d", "localhost", mappedPort);
+ }
+
+ @Test
+ @Order(1)
+ void publishMessageWithOutboundConnector() throws Exception {
+ // Given
+ OutboundConnectorFunction function = new KafkaConnectorFunction();
+
+ KafkaConnectorRequest request = new KafkaConnectorRequest();
+ KafkaMessage kafkaMessage = new KafkaMessage();
+ kafkaMessage.setKey("1");
+ kafkaMessage.setValue("{'message': 'Test message'}");
+ KafkaTopic kafkaTopic = new KafkaTopic();
+ kafkaTopic.setTopicName(TOPIC);
+ kafkaTopic.setBootstrapServers(BOOTSTRAP_SERVERS);
+ KafkaAuthentication kafkaAuthentication = new KafkaAuthentication();
+ request.setMessage(kafkaMessage);
+ request.setTopic(kafkaTopic);
+ request.setAuthentication(kafkaAuthentication);
+
+ OutboundConnectorContext context =
+ OutboundConnectorContextBuilder.create().variables(request).build();
+
+ // When
+ var result = function.execute(context);
+
+ // Then
+ assertInstanceOf(KafkaConnectorResponse.class, result);
+ KafkaConnectorResponse castedResult = (KafkaConnectorResponse) result;
+ assertEquals(TOPIC, castedResult.getTopic());
+ }
+
+ @Test
+ @Order(2)
+ void setInvalidOffsetForInboundConnectorWhenAutoOffsetResetIsNone() throws Exception {
+ // Given
+ KafkaTopic kafkaTopic = new KafkaTopic();
+ kafkaTopic.setTopicName(TOPIC);
+ kafkaTopic.setBootstrapServers(BOOTSTRAP_SERVERS);
+ KafkaConnectorProperties kafkaConnectorProperties = new KafkaConnectorProperties();
+ kafkaConnectorProperties.setAutoOffsetReset(KafkaConnectorProperties.AutoOffsetReset.NONE);
+ kafkaConnectorProperties.setAuthenticationType("custom");
+ kafkaConnectorProperties.setOffsets("9999,8888");
+ kafkaConnectorProperties.setTopic(kafkaTopic);
+ String jsonString =
+ "{'authenticationType':'custom', "
+ + "'topic.topicName':'"
+ + TOPIC
+ + "',"
+ + "'topic.bootstrapServers':'"
+ + BOOTSTRAP_SERVERS
+ + "',"
+ + "'autoOffsetReset':'none',"
+ + "'offsets':'9999,8888'}";
+ Gson gson = new Gson();
+ Map propertiesMap = gson.fromJson(jsonString, Map.class);
+ InboundConnectorContextBuilder.TestInboundConnectorContext context =
+ InboundConnectorContextBuilder.create()
+ .result(new MessageCorrelationResult("", 0))
+ .propertiesAsType(kafkaConnectorProperties)
+ .properties(
+ InboundConnectorPropertiesBuilder.create()
+ .properties(propertiesMap)
+ .bpmnProcessId(processId))
+ .build();
+ KafkaExecutable executable = new KafkaExecutable();
+
+ // When
+ OffsetOutOfRangeException thrown =
+ assertThrows(
+ OffsetOutOfRangeException.class,
+ () -> {
+ try {
+ executable.activate(context);
+ executable.kafkaConnectorConsumer.future.get();
+ } catch (Exception ex) {
+ throw ex.getCause();
+ }
+ },
+ "OffsetOutOfRangeException was expected");
+
+ // Then we except exception with message
+ assertThat(thrown.getMessage()).contains("Fetch position FetchPosition");
+ assertThat(thrown.getMessage()).contains("is out of range for partition " + TOPIC + "-");
+ assertEquals(0, context.getCorrelations().size());
+ }
+
+ @Test
+ @Order(3)
+ void consumeMessageWithInboundConnector() throws Exception {
+ // Given
+ KafkaTopic kafkaTopic = new KafkaTopic();
+ kafkaTopic.setTopicName(TOPIC);
+ kafkaTopic.setBootstrapServers(BOOTSTRAP_SERVERS);
+ KafkaConnectorProperties kafkaConnectorProperties = new KafkaConnectorProperties();
+ kafkaConnectorProperties.setAutoOffsetReset(KafkaConnectorProperties.AutoOffsetReset.EARLIEST);
+ kafkaConnectorProperties.setAuthenticationType("custom");
+ kafkaConnectorProperties.setTopic(kafkaTopic);
+ String jsonString =
+ "{'authenticationType':'custom', "
+ + "'topic.topicName':'"
+ + TOPIC
+ + "',"
+ + "'topic.bootstrapServers':'"
+ + BOOTSTRAP_SERVERS
+ + "',"
+ + "'autoOffsetReset':'earliest'}";
+ Gson gson = new Gson();
+ Map propertiesMap = gson.fromJson(jsonString, Map.class);
+ InboundConnectorContextBuilder.TestInboundConnectorContext context =
+ InboundConnectorContextBuilder.create()
+ .result(new MessageCorrelationResult("", 0))
+ .propertiesAsType(kafkaConnectorProperties)
+ .properties(
+ InboundConnectorPropertiesBuilder.create()
+ .properties(propertiesMap)
+ .bpmnProcessId(processId))
+ .build();
+ KafkaExecutable executable = new KafkaExecutable();
+
+ // When
+ executable.activate(context);
+ await().atMost(Duration.ofSeconds(5)).until(() -> context.getCorrelations().size() > 0);
+ executable.deactivate();
+
+ // Then
+ assertEquals(1, context.getCorrelations().size());
+ assertInstanceOf(KafkaInboundMessage.class, context.getCorrelations().get(0));
+ KafkaInboundMessage castedResult = (KafkaInboundMessage) context.getCorrelations().get(0);
+ String rawValue = castedResult.getRawValue();
+ assertInstanceOf(String.class, rawValue);
+ assertEquals("{'message': 'Test message'}", rawValue);
+ Object value = castedResult.getValue();
+ assertInstanceOf(Map.class, value);
+ assertEquals("Test message", ((Map) value).get("message"));
+ }
+
+ @Test
+ @Order(4)
+ void consumeSameMessageWithInboundConnectorAgainWithOffsets() throws Exception {
+ // Given
+ KafkaTopic kafkaTopic = new KafkaTopic();
+ kafkaTopic.setTopicName(TOPIC);
+ kafkaTopic.setBootstrapServers(BOOTSTRAP_SERVERS);
+ KafkaConnectorProperties kafkaConnectorProperties = new KafkaConnectorProperties();
+ kafkaConnectorProperties.setAutoOffsetReset(KafkaConnectorProperties.AutoOffsetReset.EARLIEST);
+ kafkaConnectorProperties.setAuthenticationType("custom");
+ kafkaConnectorProperties.setOffsets("0,0");
+ kafkaConnectorProperties.setTopic(kafkaTopic);
+ String jsonString =
+ "{'authenticationType':'custom', "
+ + "'topic.topicName':'"
+ + TOPIC
+ + "',"
+ + "'topic.bootstrapServers':'"
+ + BOOTSTRAP_SERVERS
+ + "',"
+ + "'autoOffsetReset':'earliest',"
+ + "'offsets':'0,0'}";
+ Gson gson = new Gson();
+ Map propertiesMap = gson.fromJson(jsonString, Map.class);
+ InboundConnectorContextBuilder.TestInboundConnectorContext context =
+ InboundConnectorContextBuilder.create()
+ .result(new MessageCorrelationResult("", 0))
+ .propertiesAsType(kafkaConnectorProperties)
+ .properties(
+ InboundConnectorPropertiesBuilder.create()
+ .properties(propertiesMap)
+ .bpmnProcessId(processId))
+ .build();
+ KafkaExecutable executable = new KafkaExecutable();
+
+ // When
+ executable.activate(context);
+ await().atMost(Duration.ofSeconds(5)).until(() -> context.getCorrelations().size() > 0);
+ executable.deactivate();
+
+ // Then
+ assertEquals(1, context.getCorrelations().size());
+ assertInstanceOf(KafkaInboundMessage.class, context.getCorrelations().get(0));
+ KafkaInboundMessage castedResult = (KafkaInboundMessage) context.getCorrelations().get(0);
+ String rawValue = castedResult.getRawValue();
+ assertInstanceOf(String.class, rawValue);
+ assertEquals("{'message': 'Test message'}", rawValue);
+ Object value = castedResult.getValue();
+ assertInstanceOf(Map.class, value);
+ assertEquals("Test message", ((Map) value).get("message"));
+ }
+}
diff --git a/connectors/kafka/src/test/java/io/camunda/connector/KafkaConnectorFunctionTest.java b/connectors/kafka/src/test/java/io/camunda/connector/kafka/outbound/KafkaConnectorFunctionTest.java
similarity index 98%
rename from connectors/kafka/src/test/java/io/camunda/connector/KafkaConnectorFunctionTest.java
rename to connectors/kafka/src/test/java/io/camunda/connector/kafka/outbound/KafkaConnectorFunctionTest.java
index 89638f34a3..741c10247c 100644
--- a/connectors/kafka/src/test/java/io/camunda/connector/KafkaConnectorFunctionTest.java
+++ b/connectors/kafka/src/test/java/io/camunda/connector/kafka/outbound/KafkaConnectorFunctionTest.java
@@ -4,14 +4,14 @@
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
-package io.camunda.connector;
+package io.camunda.connector.kafka.outbound;
import static org.assertj.core.api.Assertions.assertThat;
import com.google.gson.Gson;
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.impl.ConnectorInputException;
-import io.camunda.connector.model.KafkaConnectorRequest;
+import io.camunda.connector.kafka.outbound.model.KafkaConnectorRequest;
import io.camunda.connector.test.outbound.OutboundConnectorContextBuilder;
import java.io.File;
import java.io.IOException;
diff --git a/connectors/kafka/src/test/java/io/camunda/connector/model/KafkaConnectorRequestTest.java b/connectors/kafka/src/test/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorRequestTest.java
similarity index 98%
rename from connectors/kafka/src/test/java/io/camunda/connector/model/KafkaConnectorRequestTest.java
rename to connectors/kafka/src/test/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorRequestTest.java
index 8ec4cb4119..945140b7e0 100644
--- a/connectors/kafka/src/test/java/io/camunda/connector/model/KafkaConnectorRequestTest.java
+++ b/connectors/kafka/src/test/java/io/camunda/connector/kafka/outbound/model/KafkaConnectorRequestTest.java
@@ -4,7 +4,7 @@
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
-package io.camunda.connector.model;
+package io.camunda.connector.kafka.outbound.model;
import static org.apache.kafka.clients.CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;