-
Notifications
You must be signed in to change notification settings - Fork 39
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(sqs): implement inbound connector (#504)
* feat(sqs): implement inbound connector
- Loading branch information
1 parent
198f2d1
commit 4d95e4f
Showing
21 changed files
with
586 additions
and
32 deletions.
There are no files selected for viewing
190 changes: 190 additions & 0 deletions
190
connectors/sqs/element-templates/aws-sqs-start-event-connector.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
{ | ||
"$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json", | ||
"name": "Amazon SQS connector", | ||
"id": "io.camunda.connectors.AWSSQS.StartEvent.v1", | ||
"version": 1, | ||
"description": "Receive message from a queue", | ||
"documentationRef": "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/aws-sqs/", | ||
"appliesTo": [ | ||
"bpmn:StartEvent" | ||
], | ||
"elementType": { | ||
"value": "bpmn:StartEvent" | ||
}, | ||
"category": { | ||
"id": "connectors", | ||
"name": "Connectors" | ||
}, | ||
"groups": [ | ||
{ | ||
"id": "authentication", | ||
"label": "Authentication" | ||
}, | ||
{ | ||
"id": "queueProperties", | ||
"label": "Queue properties" | ||
}, | ||
{ | ||
"id": "messagePollingProperties", | ||
"label": "Message polling properties" | ||
}, | ||
{ | ||
"id": "input", | ||
"label": "Use next attribute names for activation condition" | ||
}, | ||
{ | ||
"id": "activation", | ||
"label": "Activation" | ||
}, | ||
{ | ||
"id": "variable-mapping", | ||
"label": "Variable Mapping" | ||
} | ||
], | ||
"properties": [ | ||
{ | ||
"type": "Hidden", | ||
"value": "io.camunda:connector-aws-sqs-polling:1", | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "inbound.type" | ||
} | ||
}, | ||
{ | ||
"label": "Access key", | ||
"description": "Provide AWS IAM access key that has permission to send to desired SQS", | ||
"group": "authentication", | ||
"type": "String", | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "authentication.accessKey" | ||
}, | ||
"constraints": { | ||
"notEmpty": true | ||
} | ||
}, | ||
{ | ||
"label": "Secret key", | ||
"description": "Provide AWS IAM secret key that has permission to send to desired SQS", | ||
"group": "authentication", | ||
"type": "String", | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "authentication.secretKey" | ||
}, | ||
"constraints": { | ||
"notEmpty": true | ||
} | ||
}, | ||
{ | ||
"label": "Queue URL", | ||
"description": "Specify the URL of the SQS queue where you would like to send message to", | ||
"group": "queueProperties", | ||
"type": "String", | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "queue.url" | ||
}, | ||
"constraints": { | ||
"notEmpty": true, | ||
"pattern": { | ||
"value": "(^https?://.+)|(^arn:.+)(^secrets\\..+)", | ||
"message": "Must be an queue URL or ARN or a secret.XYZ" | ||
} | ||
} | ||
}, | ||
{ | ||
"label": "Region", | ||
"description": "Specify the AWS region of your queue", | ||
"group": "queueProperties", | ||
"type": "String", | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "queue.region" | ||
}, | ||
"constraints": { | ||
"notEmpty": true | ||
} | ||
}, | ||
{ | ||
"label": "Polling wait time", | ||
"description": "The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. See <a href=\"https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/aws-sqs/\" target=\"_blank\">documentation</a> for details", | ||
"group": "messagePollingProperties", | ||
"type": "String", | ||
"value": "1", | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "queue.pollingWaitTime" | ||
}, | ||
"constraints": { | ||
"notEmpty": true, | ||
"pattern": { | ||
"value": "(^[0-9]?$|^1[0-9]$|^20$)|(^secrets\\..+)", | ||
"message": "Must be >= 0 and <= 20 or a secret.XYZ" | ||
} | ||
} | ||
}, | ||
{ | ||
"label": "Attribute names", | ||
"description": "Queue attribute names. See <a href=\"https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/aws-sqs/\" target=\"_blank\">documentation</a> for details", | ||
"group": "input", | ||
"type": "String", | ||
"optional": true, | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "queue.attributeNames" | ||
}, | ||
"feel": "required" | ||
}, | ||
{ | ||
"label": "Message attribute names", | ||
"description": "Message attribute names. See <a href=\"https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/aws-sqs/\" target=\"_blank\">documentation</a> for details", | ||
"group": "input", | ||
"type": "String", | ||
"optional": true, | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "queue.messageAttributeNames" | ||
}, | ||
"feel": "required" | ||
}, | ||
{ | ||
"label": "Activation condition", | ||
"type": "String", | ||
"group": "activation", | ||
"feel": "required", | ||
"optional": true, | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "activationCondition" | ||
}, | ||
"description": "Condition under which the Connector triggers. Leave empty to catch all events" | ||
}, | ||
{ | ||
"label": "Result variable", | ||
"type": "String", | ||
"group": "variable-mapping", | ||
"optional": true, | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "resultVariable" | ||
}, | ||
"description": "Name of variable to store the result of the Connector in" | ||
}, | ||
{ | ||
"label": "Result expression", | ||
"type": "String", | ||
"group": "variable-mapping", | ||
"feel": "required", | ||
"optional": true, | ||
"binding": { | ||
"type": "zeebe:property", | ||
"name": "resultExpression" | ||
}, | ||
"description": "Expression to map the inbound payload to process variables" | ||
} | ||
], | ||
"icon": { | ||
"contents": "data:image/svg+xml,%3Csvg width='18' height='18' viewBox='0 0 40 40' version='1.1' xmlns='http://www.w3.org/2000/svg' xmlns:xlink='http://www.w3.org/1999/xlink'%3E%3C!-- Generator: Sketch 64 (93537) - https://sketch.com --%3E%3Ctitle%3EIcon-Architecture/32/Arch_AWS-Simple-Queue-Service_32%3C/title%3E%3Cdesc%3ECreated with Sketch.%3C/desc%3E%3Cdefs%3E%3ClinearGradient x1='0%25' y1='100%25' x2='100%25' y2='0%25' id='linearGradient-1'%3E%3Cstop stop-color='%23B0084D' offset='0%25'%3E%3C/stop%3E%3Cstop stop-color='%23FF4F8B' offset='100%25'%3E%3C/stop%3E%3C/linearGradient%3E%3C/defs%3E%3Cg id='Icon-Architecture/32/Arch_AWS-Simple-Queue-Service_32' stroke='none' stroke-width='1' fill='none' fill-rule='evenodd'%3E%3Cg id='Icon-Architecture-BG/32/Application-Integration' fill='url(%23linearGradient-1)'%3E%3Crect id='Rectangle' x='0' y='0' width='40' height='40'%3E%3C/rect%3E%3C/g%3E%3Cpath d='M14.3422051,22.3493786 L15.8466767,20.9061074 C15.9428347,20.8141539 15.9969235,20.687218 15.9999285,20.5552846 C16.0019317,20.4223517 15.9518495,20.2934168 15.8596981,20.1984648 L14.3552264,18.6432502 L13.6350433,19.3378994 L14.311154,20.037546 L11.9913429,20.037546 L11.9913429,21.0370413 L14.2650783,21.0370413 L13.6480647,21.6287425 L14.3422051,22.3493786 Z M26.3579452,22.3533765 L27.9074909,20.9001104 C28.0066538,20.8081569 28.0627459,20.679222 28.0647492,20.5442901 C28.0667525,20.4093583 28.0136653,20.2784244 27.918509,20.1834724 L26.3689633,18.6372532 L25.6607999,19.3438963 L26.3549403,20.037546 L24.0110896,20.037546 L24.0110896,21.0370413 L26.2988481,21.0370413 L25.671818,21.6247445 L26.3579452,22.3533765 Z M17.5875367,23.3608678 C18.3387708,23.0570212 19.1621235,22.8941035 20.0045074,22.8941035 C20.8468913,22.8941035 21.670244,23.0570212 22.4214781,23.3608678 C21.7523789,21.5897622 21.7523789,19.3898731 22.4214781,17.6187675 C20.9190098,18.2264606 19.090005,18.2264606 17.5875367,17.6187675 C18.2566359,19.3898731 18.2566359,21.5897622 17.5875367,23.3608678 L17.5875367,23.3608678 Z M15.6443443,25.3408679 C15.546183,25.2439168 15.4971024,25.1159814 15.4971024,24.988046 C15.4971024,24.8601106 15.546183,24.7321753 15.6443443,24.6342247 C17.5845317,22.6982024 17.5845317,18.2824324 15.6443443,16.3454106 C15.546183,16.2484595 15.4971024,16.1205241 15.4971024,15.9925912 C15.4971024,15.8646534 15.546183,15.736718 15.6443443,15.6387674 C15.8396652,15.4438659 16.1571868,15.4438659 16.3525077,15.6387674 C17.2740216,16.5583031 18.6052086,17.0860366 20.0045074,17.0860366 C21.4048079,17.0860366 22.7359948,16.5583031 23.6575088,15.6387674 C23.8528296,15.4438659 24.1703513,15.4438659 24.3656722,15.6387674 C24.4628318,15.736718 24.5119124,15.8646534 24.5119124,15.9925912 C24.5119124,16.1205241 24.4628318,16.2484595 24.3656722,16.3454106 C22.4244831,18.2824324 22.4244831,22.6982024 24.3656722,24.6342247 C24.4628318,24.7321753 24.5119124,24.8601106 24.5119124,24.988046 C24.5119124,25.1159814 24.4628318,25.2439168 24.3656722,25.3408679 C24.2675109,25.4388184 24.1393003,25.4877937 24.0110896,25.4877937 C23.882879,25.4877937 23.7546684,25.4388184 23.6575088,25.3408679 C22.7359948,24.4213322 21.4048079,23.8935987 20.0045074,23.8935987 C18.6052086,23.8935987 17.2740216,24.4213322 16.3525077,25.3408679 C16.1571868,25.5357694 15.8396652,25.5357694 15.6443443,25.3408679 L15.6443443,25.3408679 Z M32.5421049,19.4358499 C32.236603,19.1320033 31.8369464,18.9800801 31.4362882,18.9800801 C31.0366316,18.9800801 30.636975,19.1320033 30.3314731,19.4358499 C29.721471,20.0445425 29.721471,21.0340428 30.3314731,21.6417359 C30.9414753,22.2504285 31.9321027,22.2504285 32.5421049,21.6417359 C33.1511054,21.0340428 33.1511054,20.0445425 32.5421049,19.4358499 L32.5421049,19.4358499 Z M33.2502683,22.3493786 C32.7504472,22.8481267 32.0933677,23.0980005 31.4362882,23.0980005 C30.7802103,23.0980005 30.1231309,22.8481267 29.6233097,22.3493786 C28.6236675,21.3508828 28.6236675,19.7277025 29.6233097,18.7292068 C30.622952,17.7317105 32.250626,17.7317105 33.2502683,18.7292068 C34.2499106,19.7277025 34.2499106,21.3508828 33.2502683,22.3493786 L33.2502683,22.3493786 Z M9.66852687,19.4468443 C9.36302497,19.1429978 8.96336839,18.9910745 8.56271017,18.9910745 C8.16305359,18.9910745 7.76339701,19.1429978 7.45789511,19.4468443 C6.84889461,20.055537 6.84889461,21.0450373 7.45789511,21.6527304 C8.06789726,22.261423 9.05852472,22.261423 9.66852687,21.6527304 C10.2775274,21.0450373 10.2775274,20.055537 9.66852687,19.4468443 L9.66852687,19.4468443 Z M10.3766903,22.3593735 C9.87686914,22.8581217 9.21978965,23.1079955 8.56271017,23.1079955 C7.90663232,23.1079955 7.24955284,22.8581217 6.7497317,22.3593735 C5.75008943,21.3618773 5.75008943,19.738697 6.7497317,18.7402012 C7.74937397,17.7427049 9.37704801,17.7427049 10.3766903,18.7402012 C11.3763325,19.738697 11.3763325,21.3618773 10.3766903,22.3593735 L10.3766903,22.3593735 Z M27.4337125,28.9100654 C25.4364313,30.903059 22.7820705,32.0005047 19.9574301,32.0005047 C17.1327896,32.0005047 14.4784288,30.903059 12.4821492,28.9100654 C11.165987,27.5977281 10.4077413,26.469298 9.94498104,25.1359713 L8.99842599,25.4628063 C9.50726193,26.9290658 10.3626672,28.2104187 11.7739858,29.6167086 C13.9585748,31.7986067 16.8663519,33 19.9574301,33 C23.0495099,33 25.9562853,31.7986067 28.1418759,29.6167086 C29.2827502,28.4782835 30.4206196,27.1869356 31.0115905,25.4608073 L30.0640338,25.1379703 C29.5391715,26.6701966 28.4894469,27.8565974 27.4337125,28.9100654 L27.4337125,28.9100654 Z M9.94498104,15.8596559 L8.99842599,15.5318214 C9.51026687,14.0645624 10.3656722,12.7832095 11.7759891,11.3759202 C16.2863991,6.87519304 23.6264578,6.87419354 28.1378694,11.3759202 C29.2186449,12.4533761 30.4035916,13.7897012 31.0115905,15.5318214 L30.0640338,15.8596559 C29.5241468,14.3094387 28.4293482,13.0800596 27.4297059,12.0825633 C25.434428,10.0915688 22.7810689,8.99612197 19.9574301,8.99612197 C17.1337912,8.99612197 14.4804321,10.0915688 12.4851542,12.0825633 C11.1870215,13.3779092 10.4037347,14.5423211 9.94498104,15.8596559 L9.94498104,15.8596559 Z' id='AWS-Simple-Queue-Service_Icon_32_Squid' fill='%23FFFFFF'%3E%3C/path%3E%3C/g%3E%3C/svg%3E" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
13 changes: 13 additions & 0 deletions
13
...tors/sqs/src/main/java/io/camunda/connector/common/suppliers/AmazonSQSClientSupplier.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
/* | ||
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH | ||
* under one or more contributor license agreements. Licensed under a proprietary license. | ||
* See the License.txt file for more information. You may not use this file | ||
* except in compliance with the proprietary license. | ||
*/ | ||
package io.camunda.connector.common.suppliers; | ||
|
||
import com.amazonaws.services.sqs.AmazonSQS; | ||
|
||
public interface AmazonSQSClientSupplier { | ||
AmazonSQS sqsClient(String accessKey, String secretKey, String region); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
92 changes: 92 additions & 0 deletions
92
connectors/sqs/src/main/java/io/camunda/connector/inbound/SqsExecutable.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
/* | ||
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH | ||
* under one or more contributor license agreements. Licensed under a proprietary license. | ||
* See the License.txt file for more information. You may not use this file | ||
* except in compliance with the proprietary license. | ||
*/ | ||
package io.camunda.connector.inbound; | ||
|
||
import com.amazonaws.services.sqs.AmazonSQS; | ||
import io.camunda.connector.api.annotation.InboundConnector; | ||
import io.camunda.connector.api.inbound.InboundConnectorContext; | ||
import io.camunda.connector.api.inbound.InboundConnectorExecutable; | ||
import io.camunda.connector.common.suppliers.AmazonSQSClientSupplier; | ||
import io.camunda.connector.common.suppliers.DefaultAmazonSQSClientSupplier; | ||
import io.camunda.connector.inbound.model.SqsInboundProperties; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
@InboundConnector(name = "AWSSQS_POLLING", type = "io.camunda:connector-aws-sqs-polling:1") | ||
public class SqsExecutable implements InboundConnectorExecutable { | ||
private static final Logger LOGGER = LoggerFactory.getLogger(SqsExecutable.class); | ||
|
||
private final AmazonSQSClientSupplier sqsClientSupplier; | ||
private final ExecutorService executorService; | ||
private AmazonSQS amazonSQS; | ||
private final AtomicBoolean isQueueConsumerActive; | ||
|
||
public SqsExecutable() { | ||
this.sqsClientSupplier = new DefaultAmazonSQSClientSupplier(); | ||
this.executorService = Executors.newSingleThreadExecutor(); | ||
this.isQueueConsumerActive = new AtomicBoolean(false); | ||
} | ||
|
||
public SqsExecutable( | ||
final AmazonSQSClientSupplier sqsClientSupplier, | ||
final ExecutorService executorService, | ||
final AtomicBoolean isQueueConsumerActive) { | ||
this.sqsClientSupplier = sqsClientSupplier; | ||
this.executorService = executorService; | ||
this.isQueueConsumerActive = isQueueConsumerActive; | ||
} | ||
|
||
@Override | ||
public void activate(final InboundConnectorContext context) { | ||
SqsInboundProperties properties = context.getPropertiesAsType(SqsInboundProperties.class); | ||
LOGGER.info("Subscription activation requested by the Connector runtime: {}", properties); | ||
|
||
context.replaceSecrets(properties); | ||
context.validate(properties); | ||
|
||
amazonSQS = | ||
sqsClientSupplier.sqsClient( | ||
properties.getAuthentication().getAccessKey(), | ||
properties.getAuthentication().getSecretKey(), | ||
properties.getQueue().getRegion()); | ||
LOGGER.debug("SQS client created successfully"); | ||
|
||
isQueueConsumerActive.set(true); | ||
executorService.execute( | ||
new SqsQueueConsumer(amazonSQS, properties, context, isQueueConsumerActive)); | ||
LOGGER.debug("SQS queue consumer started successfully"); | ||
} | ||
|
||
@Override | ||
public void deactivate() { | ||
isQueueConsumerActive.set(false); | ||
LOGGER.debug("Deactivating subscription"); | ||
if (executorService != null) { | ||
LOGGER.debug("Shutting down executor service"); | ||
executorService.shutdown(); | ||
try { | ||
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) { | ||
LOGGER.debug("Executor service did not terminate gracefully, forcing shutdown"); | ||
executorService.shutdownNow(); | ||
} | ||
} catch (InterruptedException e) { | ||
LOGGER.debug( | ||
"Interrupted while waiting for executor service to terminate, forcing shutdown"); | ||
executorService.shutdownNow(); | ||
} | ||
} | ||
if (amazonSQS != null) { | ||
LOGGER.debug("Shutting down SQS client"); | ||
amazonSQS.shutdown(); | ||
LOGGER.debug("SQS client shut down successfully"); | ||
} | ||
} | ||
} |
Oops, something went wrong.