diff --git a/connectors/sqs/element-templates/aws-sqs-start-event-connector.json b/connectors/sqs/element-templates/aws-sqs-start-event-connector.json new file mode 100644 index 0000000000..68628d72ef --- /dev/null +++ b/connectors/sqs/element-templates/aws-sqs-start-event-connector.json @@ -0,0 +1,190 @@ +{ + "$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json", + "name": "Amazon SQS connector", + "id": "io.camunda.connectors.AWSSQS.StartEvent.v1", + "version": 1, + "description": "Receive message from a queue", + "documentationRef": "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/aws-sqs/", + "appliesTo": [ + "bpmn:StartEvent" + ], + "elementType": { + "value": "bpmn:StartEvent" + }, + "category": { + "id": "connectors", + "name": "Connectors" + }, + "groups": [ + { + "id": "authentication", + "label": "Authentication" + }, + { + "id": "queueProperties", + "label": "Queue properties" + }, + { + "id": "messagePollingProperties", + "label": "Message polling properties" + }, + { + "id": "input", + "label": "Use next attribute names for activation condition" + }, + { + "id": "activation", + "label": "Activation" + }, + { + "id": "variable-mapping", + "label": "Variable Mapping" + } + ], + "properties": [ + { + "type": "Hidden", + "value": "io.camunda:connector-aws-sqs-polling:1", + "binding": { + "type": "zeebe:property", + "name": "inbound.type" + } + }, + { + "label": "Access key", + "description": "Provide AWS IAM access key that has permission to send to desired SQS", + "group": "authentication", + "type": "String", + "binding": { + "type": "zeebe:property", + "name": "authentication.accessKey" + }, + "constraints": { + "notEmpty": true + } + }, + { + "label": "Secret key", + "description": "Provide AWS IAM secret key that has permission to send to desired SQS", + "group": "authentication", + "type": "String", + "binding": { + "type": "zeebe:property", + "name": "authentication.secretKey" + }, + "constraints": { + "notEmpty": true + } + }, + { + "label": "Queue URL", + "description": "Specify the URL of the SQS queue where you would like to send message to", + "group": "queueProperties", + "type": "String", + "binding": { + "type": "zeebe:property", + "name": "queue.url" + }, + "constraints": { + "notEmpty": true, + "pattern": { + "value": "(^https?://.+)|(^arn:.+)(^secrets\\..+)", + "message": "Must be an queue URL or ARN or a secret.XYZ" + } + } + }, + { + "label": "Region", + "description": "Specify the AWS region of your queue", + "group": "queueProperties", + "type": "String", + "binding": { + "type": "zeebe:property", + "name": "queue.region" + }, + "constraints": { + "notEmpty": true + } + }, + { + "label": "Polling wait time", + "description": "The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. See documentation for details", + "group": "messagePollingProperties", + "type": "String", + "value": "1", + "binding": { + "type": "zeebe:property", + "name": "queue.pollingWaitTime" + }, + "constraints": { + "notEmpty": true, + "pattern": { + "value": "(^[0-9]?$|^1[0-9]$|^20$)|(^secrets\\..+)", + "message": "Must be >= 0 and <= 20 or a secret.XYZ" + } + } + }, + { + "label": "Attribute names", + "description": "Queue attribute names. See documentation for details", + "group": "input", + "type": "String", + "optional": true, + "binding": { + "type": "zeebe:property", + "name": "queue.attributeNames" + }, + "feel": "required" + }, + { + "label": "Message attribute names", + "description": "Message attribute names. See documentation for details", + "group": "input", + "type": "String", + "optional": true, + "binding": { + "type": "zeebe:property", + "name": "queue.messageAttributeNames" + }, + "feel": "required" + }, + { + "label": "Activation condition", + "type": "String", + "group": "activation", + "feel": "required", + "optional": true, + "binding": { + "type": "zeebe:property", + "name": "activationCondition" + }, + "description": "Condition under which the Connector triggers. Leave empty to catch all events" + }, + { + "label": "Result variable", + "type": "String", + "group": "variable-mapping", + "optional": true, + "binding": { + "type": "zeebe:property", + "name": "resultVariable" + }, + "description": "Name of variable to store the result of the Connector in" + }, + { + "label": "Result expression", + "type": "String", + "group": "variable-mapping", + "feel": "required", + "optional": true, + "binding": { + "type": "zeebe:property", + "name": "resultExpression" + }, + "description": "Expression to map the inbound payload to process variables" + } + ], + "icon": { + "contents": "data:image/svg+xml,%3Csvg width='18' height='18' viewBox='0 0 40 40' version='1.1' xmlns='http://www.w3.org/2000/svg' xmlns:xlink='http://www.w3.org/1999/xlink'%3E%3C!-- Generator: Sketch 64 (93537) - https://sketch.com --%3E%3Ctitle%3EIcon-Architecture/32/Arch_AWS-Simple-Queue-Service_32%3C/title%3E%3Cdesc%3ECreated with Sketch.%3C/desc%3E%3Cdefs%3E%3ClinearGradient x1='0%25' y1='100%25' x2='100%25' y2='0%25' id='linearGradient-1'%3E%3Cstop stop-color='%23B0084D' offset='0%25'%3E%3C/stop%3E%3Cstop stop-color='%23FF4F8B' offset='100%25'%3E%3C/stop%3E%3C/linearGradient%3E%3C/defs%3E%3Cg id='Icon-Architecture/32/Arch_AWS-Simple-Queue-Service_32' stroke='none' stroke-width='1' fill='none' fill-rule='evenodd'%3E%3Cg id='Icon-Architecture-BG/32/Application-Integration' fill='url(%23linearGradient-1)'%3E%3Crect id='Rectangle' x='0' y='0' width='40' height='40'%3E%3C/rect%3E%3C/g%3E%3Cpath d='M14.3422051,22.3493786 L15.8466767,20.9061074 C15.9428347,20.8141539 15.9969235,20.687218 15.9999285,20.5552846 C16.0019317,20.4223517 15.9518495,20.2934168 15.8596981,20.1984648 L14.3552264,18.6432502 L13.6350433,19.3378994 L14.311154,20.037546 L11.9913429,20.037546 L11.9913429,21.0370413 L14.2650783,21.0370413 L13.6480647,21.6287425 L14.3422051,22.3493786 Z M26.3579452,22.3533765 L27.9074909,20.9001104 C28.0066538,20.8081569 28.0627459,20.679222 28.0647492,20.5442901 C28.0667525,20.4093583 28.0136653,20.2784244 27.918509,20.1834724 L26.3689633,18.6372532 L25.6607999,19.3438963 L26.3549403,20.037546 L24.0110896,20.037546 L24.0110896,21.0370413 L26.2988481,21.0370413 L25.671818,21.6247445 L26.3579452,22.3533765 Z M17.5875367,23.3608678 C18.3387708,23.0570212 19.1621235,22.8941035 20.0045074,22.8941035 C20.8468913,22.8941035 21.670244,23.0570212 22.4214781,23.3608678 C21.7523789,21.5897622 21.7523789,19.3898731 22.4214781,17.6187675 C20.9190098,18.2264606 19.090005,18.2264606 17.5875367,17.6187675 C18.2566359,19.3898731 18.2566359,21.5897622 17.5875367,23.3608678 L17.5875367,23.3608678 Z M15.6443443,25.3408679 C15.546183,25.2439168 15.4971024,25.1159814 15.4971024,24.988046 C15.4971024,24.8601106 15.546183,24.7321753 15.6443443,24.6342247 C17.5845317,22.6982024 17.5845317,18.2824324 15.6443443,16.3454106 C15.546183,16.2484595 15.4971024,16.1205241 15.4971024,15.9925912 C15.4971024,15.8646534 15.546183,15.736718 15.6443443,15.6387674 C15.8396652,15.4438659 16.1571868,15.4438659 16.3525077,15.6387674 C17.2740216,16.5583031 18.6052086,17.0860366 20.0045074,17.0860366 C21.4048079,17.0860366 22.7359948,16.5583031 23.6575088,15.6387674 C23.8528296,15.4438659 24.1703513,15.4438659 24.3656722,15.6387674 C24.4628318,15.736718 24.5119124,15.8646534 24.5119124,15.9925912 C24.5119124,16.1205241 24.4628318,16.2484595 24.3656722,16.3454106 C22.4244831,18.2824324 22.4244831,22.6982024 24.3656722,24.6342247 C24.4628318,24.7321753 24.5119124,24.8601106 24.5119124,24.988046 C24.5119124,25.1159814 24.4628318,25.2439168 24.3656722,25.3408679 C24.2675109,25.4388184 24.1393003,25.4877937 24.0110896,25.4877937 C23.882879,25.4877937 23.7546684,25.4388184 23.6575088,25.3408679 C22.7359948,24.4213322 21.4048079,23.8935987 20.0045074,23.8935987 C18.6052086,23.8935987 17.2740216,24.4213322 16.3525077,25.3408679 C16.1571868,25.5357694 15.8396652,25.5357694 15.6443443,25.3408679 L15.6443443,25.3408679 Z M32.5421049,19.4358499 C32.236603,19.1320033 31.8369464,18.9800801 31.4362882,18.9800801 C31.0366316,18.9800801 30.636975,19.1320033 30.3314731,19.4358499 C29.721471,20.0445425 29.721471,21.0340428 30.3314731,21.6417359 C30.9414753,22.2504285 31.9321027,22.2504285 32.5421049,21.6417359 C33.1511054,21.0340428 33.1511054,20.0445425 32.5421049,19.4358499 L32.5421049,19.4358499 Z M33.2502683,22.3493786 C32.7504472,22.8481267 32.0933677,23.0980005 31.4362882,23.0980005 C30.7802103,23.0980005 30.1231309,22.8481267 29.6233097,22.3493786 C28.6236675,21.3508828 28.6236675,19.7277025 29.6233097,18.7292068 C30.622952,17.7317105 32.250626,17.7317105 33.2502683,18.7292068 C34.2499106,19.7277025 34.2499106,21.3508828 33.2502683,22.3493786 L33.2502683,22.3493786 Z M9.66852687,19.4468443 C9.36302497,19.1429978 8.96336839,18.9910745 8.56271017,18.9910745 C8.16305359,18.9910745 7.76339701,19.1429978 7.45789511,19.4468443 C6.84889461,20.055537 6.84889461,21.0450373 7.45789511,21.6527304 C8.06789726,22.261423 9.05852472,22.261423 9.66852687,21.6527304 C10.2775274,21.0450373 10.2775274,20.055537 9.66852687,19.4468443 L9.66852687,19.4468443 Z M10.3766903,22.3593735 C9.87686914,22.8581217 9.21978965,23.1079955 8.56271017,23.1079955 C7.90663232,23.1079955 7.24955284,22.8581217 6.7497317,22.3593735 C5.75008943,21.3618773 5.75008943,19.738697 6.7497317,18.7402012 C7.74937397,17.7427049 9.37704801,17.7427049 10.3766903,18.7402012 C11.3763325,19.738697 11.3763325,21.3618773 10.3766903,22.3593735 L10.3766903,22.3593735 Z M27.4337125,28.9100654 C25.4364313,30.903059 22.7820705,32.0005047 19.9574301,32.0005047 C17.1327896,32.0005047 14.4784288,30.903059 12.4821492,28.9100654 C11.165987,27.5977281 10.4077413,26.469298 9.94498104,25.1359713 L8.99842599,25.4628063 C9.50726193,26.9290658 10.3626672,28.2104187 11.7739858,29.6167086 C13.9585748,31.7986067 16.8663519,33 19.9574301,33 C23.0495099,33 25.9562853,31.7986067 28.1418759,29.6167086 C29.2827502,28.4782835 30.4206196,27.1869356 31.0115905,25.4608073 L30.0640338,25.1379703 C29.5391715,26.6701966 28.4894469,27.8565974 27.4337125,28.9100654 L27.4337125,28.9100654 Z M9.94498104,15.8596559 L8.99842599,15.5318214 C9.51026687,14.0645624 10.3656722,12.7832095 11.7759891,11.3759202 C16.2863991,6.87519304 23.6264578,6.87419354 28.1378694,11.3759202 C29.2186449,12.4533761 30.4035916,13.7897012 31.0115905,15.5318214 L30.0640338,15.8596559 C29.5241468,14.3094387 28.4293482,13.0800596 27.4297059,12.0825633 C25.434428,10.0915688 22.7810689,8.99612197 19.9574301,8.99612197 C17.1337912,8.99612197 14.4804321,10.0915688 12.4851542,12.0825633 C11.1870215,13.3779092 10.4037347,14.5423211 9.94498104,15.8596559 L9.94498104,15.8596559 Z' id='AWS-Simple-Queue-Service_Icon_32_Squid' fill='%23FFFFFF'%3E%3C/path%3E%3C/g%3E%3C/svg%3E" + } +} \ No newline at end of file diff --git a/connectors/sqs/src/main/java/io/camunda/connector/model/SqsAuthenticationRequestData.java b/connectors/sqs/src/main/java/io/camunda/connector/common/model/SqsAuthenticationRequestData.java similarity index 97% rename from connectors/sqs/src/main/java/io/camunda/connector/model/SqsAuthenticationRequestData.java rename to connectors/sqs/src/main/java/io/camunda/connector/common/model/SqsAuthenticationRequestData.java index 7817d83c1e..e63da8c91b 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/model/SqsAuthenticationRequestData.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/common/model/SqsAuthenticationRequestData.java @@ -4,7 +4,7 @@ * See the License.txt file for more information. You may not use this file * except in compliance with the proprietary license. */ -package io.camunda.connector.model; +package io.camunda.connector.common.model; import io.camunda.connector.api.annotation.Secret; import java.util.Objects; diff --git a/connectors/sqs/src/main/java/io/camunda/connector/common/suppliers/AmazonSQSClientSupplier.java b/connectors/sqs/src/main/java/io/camunda/connector/common/suppliers/AmazonSQSClientSupplier.java new file mode 100644 index 0000000000..4e0c741a4e --- /dev/null +++ b/connectors/sqs/src/main/java/io/camunda/connector/common/suppliers/AmazonSQSClientSupplier.java @@ -0,0 +1,13 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. Licensed under a proprietary license. + * See the License.txt file for more information. You may not use this file + * except in compliance with the proprietary license. + */ +package io.camunda.connector.common.suppliers; + +import com.amazonaws.services.sqs.AmazonSQS; + +public interface AmazonSQSClientSupplier { + AmazonSQS sqsClient(String accessKey, String secretKey, String region); +} diff --git a/connectors/sqs/src/main/java/io/camunda/connector/suppliers/SqsClientSupplier.java b/connectors/sqs/src/main/java/io/camunda/connector/common/suppliers/DefaultAmazonSQSClientSupplier.java similarity index 86% rename from connectors/sqs/src/main/java/io/camunda/connector/suppliers/SqsClientSupplier.java rename to connectors/sqs/src/main/java/io/camunda/connector/common/suppliers/DefaultAmazonSQSClientSupplier.java index 25d9cb9a5b..a9c79f6ee1 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/suppliers/SqsClientSupplier.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/common/suppliers/DefaultAmazonSQSClientSupplier.java @@ -4,14 +4,14 @@ * See the License.txt file for more information. You may not use this file * except in compliance with the proprietary license. */ -package io.camunda.connector.suppliers; +package io.camunda.connector.common.suppliers; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -public class SqsClientSupplier { +public class DefaultAmazonSQSClientSupplier implements AmazonSQSClientSupplier { public AmazonSQS sqsClient(final String accessKey, final String secretKey, final String region) { return AmazonSQSClientBuilder.standard() diff --git a/connectors/sqs/src/main/java/io/camunda/connector/suppliers/SqsGsonComponentSupplier.java b/connectors/sqs/src/main/java/io/camunda/connector/common/suppliers/SqsGsonComponentSupplier.java similarity index 92% rename from connectors/sqs/src/main/java/io/camunda/connector/suppliers/SqsGsonComponentSupplier.java rename to connectors/sqs/src/main/java/io/camunda/connector/common/suppliers/SqsGsonComponentSupplier.java index bd56e091fa..9c17d07ee1 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/suppliers/SqsGsonComponentSupplier.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/common/suppliers/SqsGsonComponentSupplier.java @@ -4,7 +4,7 @@ * See the License.txt file for more information. You may not use this file * except in compliance with the proprietary license. */ -package io.camunda.connector.suppliers; +package io.camunda.connector.common.suppliers; import com.google.gson.Gson; import com.google.gson.GsonBuilder; diff --git a/connectors/sqs/src/main/java/io/camunda/connector/inbound/SqsExecutable.java b/connectors/sqs/src/main/java/io/camunda/connector/inbound/SqsExecutable.java new file mode 100644 index 0000000000..1781ac1b79 --- /dev/null +++ b/connectors/sqs/src/main/java/io/camunda/connector/inbound/SqsExecutable.java @@ -0,0 +1,92 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. Licensed under a proprietary license. + * See the License.txt file for more information. You may not use this file + * except in compliance with the proprietary license. + */ +package io.camunda.connector.inbound; + +import com.amazonaws.services.sqs.AmazonSQS; +import io.camunda.connector.api.annotation.InboundConnector; +import io.camunda.connector.api.inbound.InboundConnectorContext; +import io.camunda.connector.api.inbound.InboundConnectorExecutable; +import io.camunda.connector.common.suppliers.AmazonSQSClientSupplier; +import io.camunda.connector.common.suppliers.DefaultAmazonSQSClientSupplier; +import io.camunda.connector.inbound.model.SqsInboundProperties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InboundConnector(name = "AWSSQS_POLLING", type = "io.camunda:connector-aws-sqs-polling:1") +public class SqsExecutable implements InboundConnectorExecutable { + private static final Logger LOGGER = LoggerFactory.getLogger(SqsExecutable.class); + + private final AmazonSQSClientSupplier sqsClientSupplier; + private final ExecutorService executorService; + private AmazonSQS amazonSQS; + private final AtomicBoolean isQueueConsumerActive; + + public SqsExecutable() { + this.sqsClientSupplier = new DefaultAmazonSQSClientSupplier(); + this.executorService = Executors.newSingleThreadExecutor(); + this.isQueueConsumerActive = new AtomicBoolean(false); + } + + public SqsExecutable( + final AmazonSQSClientSupplier sqsClientSupplier, + final ExecutorService executorService, + final AtomicBoolean isQueueConsumerActive) { + this.sqsClientSupplier = sqsClientSupplier; + this.executorService = executorService; + this.isQueueConsumerActive = isQueueConsumerActive; + } + + @Override + public void activate(final InboundConnectorContext context) { + SqsInboundProperties properties = context.getPropertiesAsType(SqsInboundProperties.class); + LOGGER.info("Subscription activation requested by the Connector runtime: {}", properties); + + context.replaceSecrets(properties); + context.validate(properties); + + amazonSQS = + sqsClientSupplier.sqsClient( + properties.getAuthentication().getAccessKey(), + properties.getAuthentication().getSecretKey(), + properties.getQueue().getRegion()); + LOGGER.debug("SQS client created successfully"); + + isQueueConsumerActive.set(true); + executorService.execute( + new SqsQueueConsumer(amazonSQS, properties, context, isQueueConsumerActive)); + LOGGER.debug("SQS queue consumer started successfully"); + } + + @Override + public void deactivate() { + isQueueConsumerActive.set(false); + LOGGER.debug("Deactivating subscription"); + if (executorService != null) { + LOGGER.debug("Shutting down executor service"); + executorService.shutdown(); + try { + if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) { + LOGGER.debug("Executor service did not terminate gracefully, forcing shutdown"); + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + LOGGER.debug( + "Interrupted while waiting for executor service to terminate, forcing shutdown"); + executorService.shutdownNow(); + } + } + if (amazonSQS != null) { + LOGGER.debug("Shutting down SQS client"); + amazonSQS.shutdown(); + LOGGER.debug("SQS client shut down successfully"); + } + } +} diff --git a/connectors/sqs/src/main/java/io/camunda/connector/inbound/SqsQueueConsumer.java b/connectors/sqs/src/main/java/io/camunda/connector/inbound/SqsQueueConsumer.java new file mode 100644 index 0000000000..1ab75e62e4 --- /dev/null +++ b/connectors/sqs/src/main/java/io/camunda/connector/inbound/SqsQueueConsumer.java @@ -0,0 +1,81 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. Licensed under a proprietary license. + * See the License.txt file for more information. You may not use this file + * except in compliance with the proprietary license. + */ +package io.camunda.connector.inbound; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import io.camunda.connector.api.inbound.InboundConnectorContext; +import io.camunda.connector.api.inbound.InboundConnectorResult; +import io.camunda.connector.inbound.model.SqsInboundProperties; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SqsQueueConsumer implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(SqsQueueConsumer.class); + + private final AmazonSQS sqsClient; + private final SqsInboundProperties properties; + private final InboundConnectorContext context; + private final AtomicBoolean isQueueConsumerActive; + + public SqsQueueConsumer( + AmazonSQS sqsClient, + SqsInboundProperties properties, + InboundConnectorContext context, + AtomicBoolean isQueueConsumerActive) { + this.sqsClient = sqsClient; + this.properties = properties; + this.context = context; + this.isQueueConsumerActive = isQueueConsumerActive; + } + + @Override + public void run() { + LOGGER.info("Started SQS consumer for queue {}", properties.getQueue().getUrl()); + final ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest(); + ReceiveMessageResult receiveMessageResult; + do { + receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest); + List messages = receiveMessageResult.getMessages(); + for (Message message : messages) { + InboundConnectorResult correlate = context.correlate(message); + if (correlate.isActivated()) { + sqsClient.deleteMessage(properties.getQueue().getUrl(), message.getReceiptHandle()); + LOGGER.debug( + "Inbound event correlated successfully: {}, and message with ID {} was deleted", + correlate.getResponseData(), + message.getMessageId()); + } else { + LOGGER.debug("Inbound event not correlated: {}", correlate.getErrorData()); + } + } + } while (isQueueConsumerActive.get()); + LOGGER.info("Stopping SQS consumer for queue {}", properties.getQueue().getUrl()); + } + + private ReceiveMessageRequest createReceiveMessageRequest() { + ReceiveMessageRequest receiveMessageRequest = + new ReceiveMessageRequest() + .withWaitTimeSeconds(Integer.valueOf(properties.getQueue().getPollingWaitTime())) + .withQueueUrl(properties.getQueue().getUrl()); + + if (properties.getQueue().isContainAttributeNames()) { + receiveMessageRequest.withAttributeNames(properties.getQueue().getAttributeNames()); + } + if (properties.getQueue().isContainMessageAttributeNames()) { + receiveMessageRequest.withMessageAttributeNames( + properties.getQueue().getMessageAttributeNames()); + } + + return receiveMessageRequest; + } +} diff --git a/connectors/sqs/src/main/java/io/camunda/connector/inbound/model/SqsInboundProperties.java b/connectors/sqs/src/main/java/io/camunda/connector/inbound/model/SqsInboundProperties.java new file mode 100644 index 0000000000..fe79ed39d6 --- /dev/null +++ b/connectors/sqs/src/main/java/io/camunda/connector/inbound/model/SqsInboundProperties.java @@ -0,0 +1,56 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. Licensed under a proprietary license. + * See the License.txt file for more information. You may not use this file + * except in compliance with the proprietary license. + */ +package io.camunda.connector.inbound.model; + +import io.camunda.connector.api.annotation.Secret; +import io.camunda.connector.common.model.SqsAuthenticationRequestData; +import java.util.Objects; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; + +public class SqsInboundProperties { + @Valid @NotNull @Secret private SqsAuthenticationRequestData authentication; + @Valid @NotNull @Secret private SqsInboundQueueProperties queue; + + public SqsAuthenticationRequestData getAuthentication() { + return authentication; + } + + public void setAuthentication(final SqsAuthenticationRequestData authentication) { + this.authentication = authentication; + } + + public SqsInboundQueueProperties getQueue() { + return queue; + } + + public void setQueue(final SqsInboundQueueProperties queue) { + this.queue = queue; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final SqsInboundProperties that = (SqsInboundProperties) o; + return Objects.equals(authentication, that.authentication) && Objects.equals(queue, that.queue); + } + + @Override + public int hashCode() { + return Objects.hash(authentication, queue); + } + + @Override + public String toString() { + return "SqsInboundProperties{" + "authentication=" + authentication + ", queue=" + queue + "}"; + } +} diff --git a/connectors/sqs/src/main/java/io/camunda/connector/inbound/model/SqsInboundQueueProperties.java b/connectors/sqs/src/main/java/io/camunda/connector/inbound/model/SqsInboundQueueProperties.java new file mode 100644 index 0000000000..d613f0d9a8 --- /dev/null +++ b/connectors/sqs/src/main/java/io/camunda/connector/inbound/model/SqsInboundQueueProperties.java @@ -0,0 +1,117 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. Licensed under a proprietary license. + * See the License.txt file for more information. You may not use this file + * except in compliance with the proprietary license. + */ +package io.camunda.connector.inbound.model; + +import io.camunda.connector.api.annotation.Secret; +import java.util.List; +import java.util.Objects; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.Pattern; + +public class SqsInboundQueueProperties { + @NotEmpty @Secret private String region; + @NotEmpty @Secret private String url; + @Secret private List attributeNames; + @Secret private List messageAttributeNames; + private boolean deleteAfterReceipt; + + @Pattern(regexp = "(^[0-9]?$|^1[0-9]$|^20$)|(^secrets\\\\..+)") + @Secret + private String pollingWaitTime; + + public String getRegion() { + return region; + } + + public void setRegion(final String region) { + this.region = region; + } + + public String getUrl() { + return url; + } + + public void setUrl(final String url) { + this.url = url; + } + + public boolean isContainAttributeNames() { + return attributeNames != null && !attributeNames.isEmpty(); + } + + public boolean isContainMessageAttributeNames() { + return messageAttributeNames != null && !messageAttributeNames.isEmpty(); + } + + public List getAttributeNames() { + return attributeNames; + } + + public void setAttributeNames(final List attributeNames) { + this.attributeNames = attributeNames; + } + + public List getMessageAttributeNames() { + return messageAttributeNames; + } + + public void setMessageAttributeNames(final List messageAttributeNames) { + this.messageAttributeNames = messageAttributeNames; + } + + public String getPollingWaitTime() { + return pollingWaitTime; + } + + public void setPollingWaitTime(final String pollingWaitTime) { + this.pollingWaitTime = pollingWaitTime; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final SqsInboundQueueProperties that = (SqsInboundQueueProperties) o; + return deleteAfterReceipt == that.deleteAfterReceipt + && Objects.equals(region, that.region) + && Objects.equals(url, that.url) + && Objects.equals(attributeNames, that.attributeNames) + && Objects.equals(messageAttributeNames, that.messageAttributeNames) + && Objects.equals(pollingWaitTime, that.pollingWaitTime); + } + + @Override + public int hashCode() { + return Objects.hash( + region, url, attributeNames, messageAttributeNames, deleteAfterReceipt, pollingWaitTime); + } + + @Override + public String toString() { + return "SqsInboundQueueProperties{" + + "region='" + + region + + "'" + + ", url='" + + url + + "'" + + ", attributeNames=" + + attributeNames + + ", messageAttributeNames=" + + messageAttributeNames + + ", deleteAfterReceipt=" + + deleteAfterReceipt + + ", pollingWaitTime='" + + pollingWaitTime + + "'" + + "}"; + } +} diff --git a/connectors/sqs/src/main/java/io/camunda/connector/SqsConnectorFunction.java b/connectors/sqs/src/main/java/io/camunda/connector/outbound/SqsConnectorFunction.java similarity index 81% rename from connectors/sqs/src/main/java/io/camunda/connector/SqsConnectorFunction.java rename to connectors/sqs/src/main/java/io/camunda/connector/outbound/SqsConnectorFunction.java index 8f36d6de96..4455f1d197 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/SqsConnectorFunction.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/outbound/SqsConnectorFunction.java @@ -4,7 +4,7 @@ * See the License.txt file for more information. You may not use this file * except in compliance with the proprietary license. */ -package io.camunda.connector; +package io.camunda.connector.outbound; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.SendMessageRequest; @@ -13,10 +13,11 @@ import io.camunda.connector.api.annotation.OutboundConnector; import io.camunda.connector.api.outbound.OutboundConnectorContext; import io.camunda.connector.api.outbound.OutboundConnectorFunction; -import io.camunda.connector.model.SqsConnectorRequest; -import io.camunda.connector.model.SqsConnectorResult; -import io.camunda.connector.suppliers.SqsClientSupplier; -import io.camunda.connector.suppliers.SqsGsonComponentSupplier; +import io.camunda.connector.common.suppliers.AmazonSQSClientSupplier; +import io.camunda.connector.common.suppliers.DefaultAmazonSQSClientSupplier; +import io.camunda.connector.common.suppliers.SqsGsonComponentSupplier; +import io.camunda.connector.outbound.model.SqsConnectorRequest; +import io.camunda.connector.outbound.model.SqsConnectorResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,14 +28,14 @@ public class SqsConnectorFunction implements OutboundConnectorFunction { private static final Logger LOGGER = LoggerFactory.getLogger(SqsConnectorFunction.class); - private final SqsClientSupplier sqsClientSupplier; + private final AmazonSQSClientSupplier sqsClientSupplier; private final Gson gson; public SqsConnectorFunction() { - this(new SqsClientSupplier(), SqsGsonComponentSupplier.gsonInstance()); + this(new DefaultAmazonSQSClientSupplier(), SqsGsonComponentSupplier.gsonInstance()); } - public SqsConnectorFunction(final SqsClientSupplier sqsClientSupplier, final Gson gson) { + public SqsConnectorFunction(final AmazonSQSClientSupplier sqsClientSupplier, final Gson gson) { this.sqsClientSupplier = sqsClientSupplier; this.gson = gson; } diff --git a/connectors/sqs/src/main/java/io/camunda/connector/model/QueueRequestData.java b/connectors/sqs/src/main/java/io/camunda/connector/outbound/model/QueueRequestData.java similarity index 99% rename from connectors/sqs/src/main/java/io/camunda/connector/model/QueueRequestData.java rename to connectors/sqs/src/main/java/io/camunda/connector/outbound/model/QueueRequestData.java index ebbeeb6891..b07c1879e1 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/model/QueueRequestData.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/outbound/model/QueueRequestData.java @@ -4,7 +4,7 @@ * See the License.txt file for more information. You may not use this file * except in compliance with the proprietary license. */ -package io.camunda.connector.model; +package io.camunda.connector.outbound.model; import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.util.StringUtils; diff --git a/connectors/sqs/src/main/java/io/camunda/connector/model/QueueType.java b/connectors/sqs/src/main/java/io/camunda/connector/outbound/model/QueueType.java similarity index 93% rename from connectors/sqs/src/main/java/io/camunda/connector/model/QueueType.java rename to connectors/sqs/src/main/java/io/camunda/connector/outbound/model/QueueType.java index 1889d6a7e4..399e60b784 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/model/QueueType.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/outbound/model/QueueType.java @@ -4,7 +4,7 @@ * See the License.txt file for more information. You may not use this file * except in compliance with the proprietary license. */ -package io.camunda.connector.model; +package io.camunda.connector.outbound.model; public enum QueueType { /** diff --git a/connectors/sqs/src/main/java/io/camunda/connector/model/SqsConnectorRequest.java b/connectors/sqs/src/main/java/io/camunda/connector/outbound/model/SqsConnectorRequest.java similarity index 93% rename from connectors/sqs/src/main/java/io/camunda/connector/model/SqsConnectorRequest.java rename to connectors/sqs/src/main/java/io/camunda/connector/outbound/model/SqsConnectorRequest.java index 9962eb2045..3f08bb6d5a 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/model/SqsConnectorRequest.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/outbound/model/SqsConnectorRequest.java @@ -4,9 +4,10 @@ * See the License.txt file for more information. You may not use this file * except in compliance with the proprietary license. */ -package io.camunda.connector.model; +package io.camunda.connector.outbound.model; import io.camunda.connector.api.annotation.Secret; +import io.camunda.connector.common.model.SqsAuthenticationRequestData; import java.util.Objects; import javax.validation.Valid; import javax.validation.constraints.NotNull; diff --git a/connectors/sqs/src/main/java/io/camunda/connector/model/SqsConnectorResult.java b/connectors/sqs/src/main/java/io/camunda/connector/outbound/model/SqsConnectorResult.java similarity index 93% rename from connectors/sqs/src/main/java/io/camunda/connector/model/SqsConnectorResult.java rename to connectors/sqs/src/main/java/io/camunda/connector/outbound/model/SqsConnectorResult.java index 91633982ed..6cc1dbe128 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/model/SqsConnectorResult.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/outbound/model/SqsConnectorResult.java @@ -4,7 +4,7 @@ * See the License.txt file for more information. You may not use this file * except in compliance with the proprietary license. */ -package io.camunda.connector.model; +package io.camunda.connector.outbound.model; public class SqsConnectorResult { diff --git a/connectors/sqs/src/main/java/io/camunda/connector/model/SqsMessageAttribute.java b/connectors/sqs/src/main/java/io/camunda/connector/outbound/model/SqsMessageAttribute.java similarity index 97% rename from connectors/sqs/src/main/java/io/camunda/connector/model/SqsMessageAttribute.java rename to connectors/sqs/src/main/java/io/camunda/connector/outbound/model/SqsMessageAttribute.java index 6f7d934c36..15b021c719 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/model/SqsMessageAttribute.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/outbound/model/SqsMessageAttribute.java @@ -4,7 +4,7 @@ * See the License.txt file for more information. You may not use this file * except in compliance with the proprietary license. */ -package io.camunda.connector.model; +package io.camunda.connector.outbound.model; import com.google.gson.annotations.SerializedName; import java.util.Objects; diff --git a/connectors/sqs/src/main/resources/META-INF/services/io.camunda.connector.api.inbound.InboundConnectorExecutable b/connectors/sqs/src/main/resources/META-INF/services/io.camunda.connector.api.inbound.InboundConnectorExecutable new file mode 100644 index 0000000000..2c15c5ca4c --- /dev/null +++ b/connectors/sqs/src/main/resources/META-INF/services/io.camunda.connector.api.inbound.InboundConnectorExecutable @@ -0,0 +1 @@ +io.camunda.connector.inbound.SqsExecutable \ No newline at end of file diff --git a/connectors/sqs/src/main/resources/META-INF/services/io.camunda.connector.api.outbound.OutboundConnectorFunction b/connectors/sqs/src/main/resources/META-INF/services/io.camunda.connector.api.outbound.OutboundConnectorFunction index c7d3316c24..a709384961 100644 --- a/connectors/sqs/src/main/resources/META-INF/services/io.camunda.connector.api.outbound.OutboundConnectorFunction +++ b/connectors/sqs/src/main/resources/META-INF/services/io.camunda.connector.api.outbound.OutboundConnectorFunction @@ -1 +1 @@ -io.camunda.connector.SqsConnectorFunction \ No newline at end of file +io.camunda.connector.outbound.SqsConnectorFunction \ No newline at end of file diff --git a/connectors/sqs/src/test/java/io/camunda/connector/BaseTest.java b/connectors/sqs/src/test/java/io/camunda/connector/BaseTest.java index 7f9ecfff2e..d2be2c48d7 100644 --- a/connectors/sqs/src/test/java/io/camunda/connector/BaseTest.java +++ b/connectors/sqs/src/test/java/io/camunda/connector/BaseTest.java @@ -7,7 +7,7 @@ package io.camunda.connector; import com.google.gson.Gson; -import io.camunda.connector.suppliers.SqsGsonComponentSupplier; +import io.camunda.connector.common.suppliers.SqsGsonComponentSupplier; public abstract class BaseTest { diff --git a/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorFunctionParametrizedTest.java b/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorFunctionParametrizedTest.java index f91eedec3d..922ed7b9fa 100644 --- a/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorFunctionParametrizedTest.java +++ b/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorFunctionParametrizedTest.java @@ -18,7 +18,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.file.Files.readString; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -26,10 +26,11 @@ import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; import io.camunda.connector.api.outbound.OutboundConnectorContext; -import io.camunda.connector.model.SqsConnectorRequest; -import io.camunda.connector.model.SqsConnectorResult; -import io.camunda.connector.suppliers.SqsClientSupplier; -import io.camunda.connector.suppliers.SqsGsonComponentSupplier; +import io.camunda.connector.common.suppliers.AmazonSQSClientSupplier; +import io.camunda.connector.common.suppliers.SqsGsonComponentSupplier; +import io.camunda.connector.outbound.SqsConnectorFunction; +import io.camunda.connector.outbound.model.SqsConnectorRequest; +import io.camunda.connector.outbound.model.SqsConnectorResult; import io.camunda.connector.test.outbound.OutboundConnectorContextBuilder; import java.io.File; import java.io.IOException; @@ -56,7 +57,7 @@ class SqsConnectorFunctionParametrizedTest { private static final String FAIL_CASES_RESOURCE_PATH = "src/test/resources/requests/fail-test-cases.json"; - @Mock private SqsClientSupplier sqsClientSupplier; + @Mock private AmazonSQSClientSupplier sqsClientSupplier; @Mock private AmazonSQS sqsClient; @Captor private ArgumentCaptor sendMessageRequest; diff --git a/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorFunctionTest.java b/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorFunctionTest.java index 28fc344c81..d80d466fbb 100644 --- a/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorFunctionTest.java +++ b/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorFunctionTest.java @@ -13,8 +13,9 @@ import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; import io.camunda.connector.api.outbound.OutboundConnectorContext; -import io.camunda.connector.model.SqsConnectorResult; -import io.camunda.connector.suppliers.SqsClientSupplier; +import io.camunda.connector.common.suppliers.AmazonSQSClientSupplier; +import io.camunda.connector.outbound.SqsConnectorFunction; +import io.camunda.connector.outbound.model.SqsConnectorResult; import io.camunda.connector.test.outbound.OutboundConnectorContextBuilder; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -59,7 +60,7 @@ public void execute_shouldExecuteRequestAndReturnResultWithMsgId() { AmazonSQS sqsClient = Mockito.mock(AmazonSQS.class); Mockito.when(sqsClient.sendMessage(ArgumentMatchers.any(SendMessageRequest.class))) .thenReturn(sendMessageResult); - SqsClientSupplier sqsClientSupplier = Mockito.mock(SqsClientSupplier.class); + AmazonSQSClientSupplier sqsClientSupplier = Mockito.mock(AmazonSQSClientSupplier.class); Mockito.when( sqsClientSupplier.sqsClient( ArgumentMatchers.anyString(), @@ -85,7 +86,7 @@ public void execute_shouldPassPayloadAsJsonWhenJsonArrivesFromForm() { AmazonSQS sqsClient = Mockito.mock(AmazonSQS.class); Mockito.when(sqsClient.sendMessage(ArgumentMatchers.any(SendMessageRequest.class))) .thenReturn(sendMessageResult); - SqsClientSupplier sqsClientSupplier = Mockito.mock(SqsClientSupplier.class); + AmazonSQSClientSupplier sqsClientSupplier = Mockito.mock(AmazonSQSClientSupplier.class); Mockito.when( sqsClientSupplier.sqsClient( ArgumentMatchers.anyString(), @@ -118,7 +119,7 @@ public void execute_shouldPassPayloadAsStringWhenStringArrivesFromForm() { AmazonSQS sqsClient = Mockito.mock(AmazonSQS.class); Mockito.when(sqsClient.sendMessage(ArgumentMatchers.any(SendMessageRequest.class))) .thenReturn(sendMessageResult); - SqsClientSupplier sqsClientSupplier = Mockito.mock(SqsClientSupplier.class); + AmazonSQSClientSupplier sqsClientSupplier = Mockito.mock(AmazonSQSClientSupplier.class); Mockito.when( sqsClientSupplier.sqsClient( ArgumentMatchers.anyString(), diff --git a/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorRequestTest.java b/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorRequestTest.java index a3b8abc238..2381b88873 100644 --- a/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorRequestTest.java +++ b/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorRequestTest.java @@ -12,8 +12,8 @@ import com.amazonaws.services.sqs.model.MessageAttributeValue; import io.camunda.connector.api.outbound.OutboundConnectorContext; import io.camunda.connector.impl.ConnectorInputException; -import io.camunda.connector.model.SqsConnectorRequest; -import io.camunda.connector.model.SqsMessageAttribute; +import io.camunda.connector.outbound.model.SqsConnectorRequest; +import io.camunda.connector.outbound.model.SqsMessageAttribute; import io.camunda.connector.test.outbound.OutboundConnectorContextBuilder; import java.util.Map; import org.junit.jupiter.api.BeforeEach;