-
Notifications
You must be signed in to change notification settings - Fork 41
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add inbound kafka connector (#373)
* feat: add inbound kafka connector * feat: refactor kafka inbound connector * feat: close previous Kafka consumer before activating an other one * feat: fix kafka message deserialization * feat: add unit tests; set manual commit * feat: add authenticationType, offset, and autoOffsetReset to kafka inbound * fix: set partition offsets, add integration tests * fix: formatting, make AutoOffsetReset enum, delete unnecessary dependency * fix: add delay simulation to kafka poll in test * fix: AutoOffsetReset enum deserialization * chore: refactor Kafka inbound connector, use bpmn process id in kafka group id * fix: add missing jackson dependency * fix: minor refactoring * fix: code styling with mvn spotless * fix: minor code refactor
- Loading branch information
1 parent
bfaff85
commit 10bf9a7
Showing
21 changed files
with
1,255 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
226 changes: 226 additions & 0 deletions
226
connectors/kafka/element-templates/kafka-inbound-connector.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,226 @@ | ||
{ | ||
"$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json", | ||
"name": "Kafka Consumer Connector", | ||
"id": "io.camunda.connectors.inbound.KAFKA.v1", | ||
"version": 1, | ||
"description": "Consume Kafka Message", | ||
"documentationRef": "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka/", | ||
"category": { | ||
"id": "connectors", | ||
"name": "Connectors" | ||
}, | ||
"appliesTo": [ | ||
"bpmn:StartEvent" | ||
], | ||
"elementType": { | ||
"value": "bpmn:StartEvent" | ||
}, | ||
"groups": [ | ||
{ | ||
"id": "authentication", | ||
"label": "Authentication" | ||
}, | ||
{ | ||
"id": "kafka", | ||
"label": "Kafka" | ||
}, | ||
{ | ||
"id": "activation", | ||
"label": "Activation" | ||
}, | ||
{ | ||
"id": "variable-mapping", | ||
"label": "Variable Mapping" | ||
} | ||
], | ||
"properties": [ | ||
{ | ||
"type": "Hidden", | ||
"value": "io.camunda:connector-kafka-inbound:1", | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "inbound.type" | ||
} | ||
}, | ||
{ | ||
"label": "Authentication type", | ||
"description": "Username/password or custom", | ||
"id": "authenticationType", | ||
"group": "authentication", | ||
"type": "Dropdown", | ||
"value": "credentials", | ||
"choices": [ | ||
{ | ||
"name": "Credentials", | ||
"value": "credentials" | ||
}, | ||
{ | ||
"name": "Custom", | ||
"value": "custom" | ||
} | ||
], | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "authenticationType" | ||
} | ||
}, | ||
{ | ||
"label": "Username", | ||
"description": "Provide the username (must have permissions to produce message to the topic)", | ||
"group": "authentication", | ||
"type": "String", | ||
"feel":"optional", | ||
"optional": true, | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "authentication.username" | ||
}, | ||
"condition": { | ||
"property": "authenticationType", | ||
"equals": "credentials" | ||
} | ||
}, | ||
{ | ||
"label": "Password", | ||
"description": "Provide a password for the user", | ||
"group": "authentication", | ||
"type": "String", | ||
"feel":"optional", | ||
"optional": true, | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "authentication.password" | ||
}, | ||
"condition": { | ||
"property": "authenticationType", | ||
"equals": "credentials" | ||
} | ||
}, | ||
{ | ||
"label": "Bootstrap servers", | ||
"description": "Provide bootstrap server(s), comma-delimited if there are multiple", | ||
"group": "kafka", | ||
"type": "String", | ||
"feel":"optional", | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "topic.bootstrapServers" | ||
}, | ||
"constraints": { | ||
"notEmpty": true | ||
} | ||
}, | ||
{ | ||
"label": "Topic", | ||
"description": "Provide the topic name", | ||
"group": "kafka", | ||
"type": "String", | ||
"feel":"optional", | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "topic.topicName" | ||
}, | ||
"constraints": { | ||
"notEmpty": true | ||
} | ||
}, | ||
{ | ||
"label": "Additional properties", | ||
"description": "Provide additional Kafka consumer properties in JSON", | ||
"group": "kafka", | ||
"type": "String", | ||
"optional": true, | ||
"feel":"required", | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "additionalProperties" | ||
} | ||
}, | ||
{ | ||
"label": "Offsets", | ||
"description": "Comma-separated list of offsets, e.g., '10,23' or '=[ \"10\", \"23\"]'. If specified, it has to have the same number of values as the number of partitions", | ||
"group": "kafka", | ||
"type": "String", | ||
"feel": "optional", | ||
"optional": true, | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "offsets" | ||
} | ||
}, | ||
{ | ||
"label": "Auto offset reset", | ||
"description": "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. You should only select none if you specified the offsets", | ||
"id": "autoOffsetReset", | ||
"group": "kafka", | ||
"type": "Dropdown", | ||
"value": "latest", | ||
"choices": [ | ||
{ | ||
"name": "Latest", | ||
"value": "latest" | ||
}, | ||
{ | ||
"name": "Earliest", | ||
"value": "earliest" | ||
}, | ||
{ | ||
"name": "None", | ||
"value": "none" | ||
} | ||
], | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "autoOffsetReset" | ||
} | ||
}, | ||
{ | ||
"label": "Activation condition", | ||
"type": "String", | ||
"group": "activation", | ||
"feel": "required", | ||
"optional": true, | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "activationCondition" | ||
}, | ||
"description": "Condition under which the Connector triggers. Leave empty to catch all events" | ||
}, | ||
{ | ||
"label": "Result variable", | ||
"type": "String", | ||
"group": "variable-mapping", | ||
"optional": true, | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "resultVariable" | ||
}, | ||
"description": "Name of variable to store the result of the Connector in" | ||
}, | ||
{ | ||
"label": "Result expression", | ||
"description": "Expression to map the inbound payload to process variables", | ||
"group": "variable-mapping", | ||
"type": "Text", | ||
"feel": "required", | ||
"binding": { | ||
"type": "zeebe:taskHeader", | ||
"key": "resultExpression" | ||
} | ||
}, | ||
{ | ||
"label": "Error expression", | ||
"description": "Expression to handle errors. Details in the <a href=\"https://docs.camunda.io/docs/components/connectors/use-connectors/#bpmn-errors\" target=\"_blank\">documentation</a>", | ||
"group": "errors", | ||
"type": "Text", | ||
"feel": "required", | ||
"binding": { | ||
"type": "zeebe:taskHeader", | ||
"key": "errorExpression" | ||
} | ||
} | ||
], | ||
"icon": { | ||
"contents": "data:image/svg+xml;utf8,%3Csvg width='18' height='18' viewBox='0 0 256 416' xmlns='http://www.w3.org/2000/svg' preserveAspectRatio='xMidYMid'%3E%3Cpath d='M201.816 230.216c-16.186 0-30.697 7.171-40.634 18.461l-25.463-18.026c2.703-7.442 4.255-15.433 4.255-23.797 0-8.219-1.498-16.076-4.112-23.408l25.406-17.835c9.936 11.233 24.409 18.365 40.548 18.365 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184-29.875 0-54.184 24.305-54.184 54.184 0 5.348.808 10.505 2.258 15.389l-25.423 17.844c-10.62-13.175-25.911-22.374-43.333-25.182v-30.64c24.544-5.155 43.037-26.962 43.037-53.019C124.171 24.305 99.862 0 69.987 0 40.112 0 15.803 24.305 15.803 54.184c0 25.708 18.014 47.246 42.067 52.769v31.038C25.044 143.753 0 172.401 0 206.854c0 34.621 25.292 63.374 58.355 68.94v32.774c-24.299 5.341-42.552 27.011-42.552 52.894 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-25.883-18.253-47.553-42.552-52.894v-32.775a69.965 69.965 0 0 0 42.6-24.776l25.633 18.143c-1.423 4.84-2.22 9.946-2.22 15.24 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184zm0-126.695c14.487 0 26.27 11.788 26.27 26.271s-11.783 26.27-26.27 26.27-26.27-11.787-26.27-26.27c0-14.483 11.783-26.271 26.27-26.271zm-158.1-49.337c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27zm52.541 307.278c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27zm-26.272-117.97c-20.205 0-36.642-16.434-36.642-36.638 0-20.205 16.437-36.642 36.642-36.642 20.204 0 36.641 16.437 36.641 36.642 0 20.204-16.437 36.638-36.641 36.638zm131.831 67.179c-14.487 0-26.27-11.788-26.27-26.271s11.783-26.27 26.27-26.27 26.27 11.787 26.27 26.27c0 14.483-11.783 26.271-26.27 26.271z' style='fill:%23231f20'/%3E%3C/svg%3E" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.