From 74de59f9def923035c3e0b5488bb6269caacf6c7 Mon Sep 17 00:00:00 2001 From: Igor Petrov Date: Sat, 10 Dec 2022 18:33:47 +0200 Subject: [PATCH] fix(sns,sqs): remove custom deserialization of AWS msg attributes Due to multiple instances of GSON, in a bundled version that contains all the connectors, there is a conflict between the GSON instances. As such, some deserializers seem not to be registered / working as expected. This commit solves an issue with the SNS Connector. --- .../connector/SnsConnectorFunction.java | 6 +- ...java => SnsAuthenticationRequestData.java} | 4 +- .../connector/model/SnsConnectorRequest.java | 6 +- .../connector/model/SnsMessageAttribute.java | 68 +++++++++++++++++++ .../connector/model/TopicRequestData.java | 31 ++++++++- .../suppliers/GsonComponentSupplier.java | 54 --------------- .../suppliers/SnsGsonComponentSupplier.java | 21 ++++++ .../java/io.camunda.connector/BaseTest.java | 4 +- .../SnsConnectorFunctionParametrizedTest.java | 4 +- .../SnsConnectorRequestTest.java | 17 +++++ .../connector/SqsConnectorFunction.java | 6 +- .../connector/model/QueueRequestData.java | 31 ++++++++- ...java => SqsAuthenticationRequestData.java} | 4 +- .../connector/model/SqsConnectorRequest.java | 6 +- .../connector/model/SqsMessageAttribute.java | 67 ++++++++++++++++++ .../suppliers/GsonComponentSupplier.java | 54 --------------- .../suppliers/SqsGsonComponentSupplier.java | 21 ++++++ .../java/io/camunda/connector/BaseTest.java | 4 +- .../SqsConnectorFunctionParametrizedTest.java | 4 +- .../connector/SqsConnectorRequestTest.java | 17 +++++ 20 files changed, 291 insertions(+), 138 deletions(-) rename connectors/sns/src/main/java/io/camunda/connector/model/{AuthenticationRequestData.java => SnsAuthenticationRequestData.java} (91%) create mode 100644 connectors/sns/src/main/java/io/camunda/connector/model/SnsMessageAttribute.java delete mode 100644 connectors/sns/src/main/java/io/camunda/connector/suppliers/GsonComponentSupplier.java create mode 100644 connectors/sns/src/main/java/io/camunda/connector/suppliers/SnsGsonComponentSupplier.java rename connectors/sqs/src/main/java/io/camunda/connector/model/{AuthenticationRequestData.java => SqsAuthenticationRequestData.java} (91%) create mode 100644 connectors/sqs/src/main/java/io/camunda/connector/model/SqsMessageAttribute.java delete mode 100644 connectors/sqs/src/main/java/io/camunda/connector/suppliers/GsonComponentSupplier.java create mode 100644 connectors/sqs/src/main/java/io/camunda/connector/suppliers/SqsGsonComponentSupplier.java diff --git a/connectors/sns/src/main/java/io/camunda/connector/SnsConnectorFunction.java b/connectors/sns/src/main/java/io/camunda/connector/SnsConnectorFunction.java index 9c09c0f4eb..b89bbd38fd 100644 --- a/connectors/sns/src/main/java/io/camunda/connector/SnsConnectorFunction.java +++ b/connectors/sns/src/main/java/io/camunda/connector/SnsConnectorFunction.java @@ -15,8 +15,8 @@ import io.camunda.connector.api.outbound.OutboundConnectorFunction; import io.camunda.connector.model.SnsConnectorRequest; import io.camunda.connector.model.SnsConnectorResult; -import io.camunda.connector.suppliers.GsonComponentSupplier; import io.camunda.connector.suppliers.SnsClientSupplier; +import io.camunda.connector.suppliers.SnsGsonComponentSupplier; import org.apache.commons.text.StringEscapeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +32,7 @@ public class SnsConnectorFunction implements OutboundConnectorFunction { private final Gson gson; public SnsConnectorFunction() { - this(new SnsClientSupplier(), GsonComponentSupplier.gsonInstance()); + this(new SnsClientSupplier(), SnsGsonComponentSupplier.gsonInstance()); } public SnsConnectorFunction(final SnsClientSupplier snsClientSupplier, final Gson gson) { @@ -65,7 +65,7 @@ private PublishResult sendMsgToSns(SnsConnectorRequest request) { new PublishRequest() .withTopicArn(request.getTopic().getTopicArn()) .withMessage(topicMessage) - .withMessageAttributes(request.getTopic().getMessageAttributes()) + .withMessageAttributes(request.getTopic().getAwsSnsNativeMessageAttributes()) .withSubject(request.getTopic().getSubject()); return snsClient.publish(message); } finally { diff --git a/connectors/sns/src/main/java/io/camunda/connector/model/AuthenticationRequestData.java b/connectors/sns/src/main/java/io/camunda/connector/model/SnsAuthenticationRequestData.java similarity index 91% rename from connectors/sns/src/main/java/io/camunda/connector/model/AuthenticationRequestData.java rename to connectors/sns/src/main/java/io/camunda/connector/model/SnsAuthenticationRequestData.java index 644851663a..cba671dd8b 100644 --- a/connectors/sns/src/main/java/io/camunda/connector/model/AuthenticationRequestData.java +++ b/connectors/sns/src/main/java/io/camunda/connector/model/SnsAuthenticationRequestData.java @@ -10,7 +10,7 @@ import java.util.Objects; import javax.validation.constraints.NotEmpty; -public class AuthenticationRequestData { +public class SnsAuthenticationRequestData { @NotEmpty @Secret private String accessKey; @NotEmpty @Secret private String secretKey; @@ -38,7 +38,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - AuthenticationRequestData that = (AuthenticationRequestData) o; + SnsAuthenticationRequestData that = (SnsAuthenticationRequestData) o; return accessKey.equals(that.accessKey) && secretKey.equals(that.secretKey); } diff --git a/connectors/sns/src/main/java/io/camunda/connector/model/SnsConnectorRequest.java b/connectors/sns/src/main/java/io/camunda/connector/model/SnsConnectorRequest.java index 249870b18f..2ae479d745 100644 --- a/connectors/sns/src/main/java/io/camunda/connector/model/SnsConnectorRequest.java +++ b/connectors/sns/src/main/java/io/camunda/connector/model/SnsConnectorRequest.java @@ -13,14 +13,14 @@ public class SnsConnectorRequest { - @Valid @NotNull @Secret private AuthenticationRequestData authentication; + @Valid @NotNull @Secret private SnsAuthenticationRequestData authentication; @Valid @NotNull @Secret private TopicRequestData topic; - public AuthenticationRequestData getAuthentication() { + public SnsAuthenticationRequestData getAuthentication() { return authentication; } - public void setAuthentication(final AuthenticationRequestData authentication) { + public void setAuthentication(final SnsAuthenticationRequestData authentication) { this.authentication = authentication; } diff --git a/connectors/sns/src/main/java/io/camunda/connector/model/SnsMessageAttribute.java b/connectors/sns/src/main/java/io/camunda/connector/model/SnsMessageAttribute.java new file mode 100644 index 0000000000..60a57deb22 --- /dev/null +++ b/connectors/sns/src/main/java/io/camunda/connector/model/SnsMessageAttribute.java @@ -0,0 +1,68 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. Licensed under a proprietary license. + * See the License.txt file for more information. You may not use this file + * except in compliance with the proprietary license. + */ +package io.camunda.connector.model; + +import com.google.gson.annotations.SerializedName; +import java.util.Objects; +import javax.validation.constraints.NotBlank; + +public class SnsMessageAttribute { + @SerializedName(value = "DataType", alternate = "dataType") + @NotBlank + private String dataType; + + @SerializedName(value = "StringValue", alternate = "stringValue") + @NotBlank + private String stringValue; + + public String getDataType() { + return dataType; + } + + public void setDataType(String dataType) { + this.dataType = dataType; + } + + public String getStringValue() { + return stringValue; + } + + public void setStringValue(String stringValue) { + this.stringValue = stringValue; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + SnsMessageAttribute that = (SnsMessageAttribute) o; + return dataType.equals(that.dataType) && stringValue.equals(that.stringValue); + } + + @Override + public int hashCode() { + return Objects.hash(dataType, stringValue); + } + + @Override + public String toString() { + return "SNSMessageAttribute{" + + "dataType='" + + dataType + + '\'' + + ", stringValue='" + + stringValue + + '\'' + + '}'; + } +} diff --git a/connectors/sns/src/main/java/io/camunda/connector/model/TopicRequestData.java b/connectors/sns/src/main/java/io/camunda/connector/model/TopicRequestData.java index 3f7d7b2f36..de79c47f2f 100644 --- a/connectors/sns/src/main/java/io/camunda/connector/model/TopicRequestData.java +++ b/connectors/sns/src/main/java/io/camunda/connector/model/TopicRequestData.java @@ -8,8 +8,11 @@ import com.amazonaws.services.sns.model.MessageAttributeValue; import io.camunda.connector.api.annotation.Secret; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.function.Function; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; import javax.validation.constraints.Size; @@ -25,7 +28,7 @@ public class TopicRequestData { // we don't need to know the customer message as we will pass it as-is @NotNull private Object message; - private Map messageAttributes; + private Map messageAttributes; public String getTopicArn() { return topicArn; @@ -59,14 +62,36 @@ public void setMessage(Object message) { this.message = message; } - public Map getMessageAttributes() { + public Map getMessageAttributes() { return messageAttributes; } - public void setMessageAttributes(Map messageAttributes) { + public Map getAwsSnsNativeMessageAttributes() { + if (messageAttributes == null) { + return Collections.emptyMap(); + } + + final Map snsNativeMessageAttributes = new HashMap<>(); + messageAttributes.forEach( + (key, value) -> + snsNativeMessageAttributes.put(key, messageAttributeTransformer().apply(value))); + + return snsNativeMessageAttributes; + } + + public void setMessageAttributes(Map messageAttributes) { this.messageAttributes = messageAttributes; } + private Function messageAttributeTransformer() { + return snsMessageAttribute -> { + MessageAttributeValue msgAttr = new MessageAttributeValue(); + msgAttr.setDataType(snsMessageAttribute.getDataType()); + msgAttr.setStringValue(snsMessageAttribute.getStringValue()); + return msgAttr; + }; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/connectors/sns/src/main/java/io/camunda/connector/suppliers/GsonComponentSupplier.java b/connectors/sns/src/main/java/io/camunda/connector/suppliers/GsonComponentSupplier.java deleted file mode 100644 index c82e9175fa..0000000000 --- a/connectors/sns/src/main/java/io/camunda/connector/suppliers/GsonComponentSupplier.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH - * under one or more contributor license agreements. Licensed under a proprietary license. - * See the License.txt file for more information. You may not use this file - * except in compliance with the proprietary license. - */ -package io.camunda.connector.suppliers; - -import com.amazonaws.services.sns.model.MessageAttributeValue; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import java.lang.reflect.Type; -import java.util.Optional; - -public final class GsonComponentSupplier { - - private static final Gson GSON = - new GsonBuilder() - .registerTypeAdapter(MessageAttributeValue.class, new MessageAttributeValueDeserializer()) - .create(); - - private GsonComponentSupplier() {} - - public static Gson gsonInstance() { - return GSON; - } - - protected static class MessageAttributeValueDeserializer - implements JsonDeserializer { - - @Override - public MessageAttributeValue deserialize( - final JsonElement jsonElement, - final Type type, - final JsonDeserializationContext jsonDeserializationContext) - throws JsonParseException { - MessageAttributeValue mav = new MessageAttributeValue(); - JsonObject jObject = jsonElement.getAsJsonObject(); - // While there are currently multiple field types in the MessageAttributeValue, the actual - // support is very limited to DataType, StringValue and BinaryValue. - // At the same time, we do not plan to support custom binary types. - // See more at: - // https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html - Optional.ofNullable(jObject.get("DataType").getAsString()).ifPresent(mav::setDataType); - Optional.ofNullable(jObject.get("StringValue").getAsString()).ifPresent(mav::setStringValue); - return mav; - } - } -} diff --git a/connectors/sns/src/main/java/io/camunda/connector/suppliers/SnsGsonComponentSupplier.java b/connectors/sns/src/main/java/io/camunda/connector/suppliers/SnsGsonComponentSupplier.java new file mode 100644 index 0000000000..d2f858ca51 --- /dev/null +++ b/connectors/sns/src/main/java/io/camunda/connector/suppliers/SnsGsonComponentSupplier.java @@ -0,0 +1,21 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. Licensed under a proprietary license. + * See the License.txt file for more information. You may not use this file + * except in compliance with the proprietary license. + */ +package io.camunda.connector.suppliers; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +public final class SnsGsonComponentSupplier { + + private static final Gson GSON = new GsonBuilder().create(); + + private SnsGsonComponentSupplier() {} + + public static Gson gsonInstance() { + return GSON; + } +} diff --git a/connectors/sns/src/test/java/io.camunda.connector/BaseTest.java b/connectors/sns/src/test/java/io.camunda.connector/BaseTest.java index 7b9ddffcdb..23a0e3deef 100644 --- a/connectors/sns/src/test/java/io.camunda.connector/BaseTest.java +++ b/connectors/sns/src/test/java/io.camunda.connector/BaseTest.java @@ -7,7 +7,7 @@ package io.camunda.connector; import com.google.gson.Gson; -import io.camunda.connector.suppliers.GsonComponentSupplier; +import io.camunda.connector.suppliers.SnsGsonComponentSupplier; public abstract class BaseTest { @@ -48,5 +48,5 @@ public abstract class BaseTest { + " }\n" + "}"; - protected static final Gson GSON = GsonComponentSupplier.gsonInstance(); + protected static final Gson GSON = SnsGsonComponentSupplier.gsonInstance(); } diff --git a/connectors/sns/src/test/java/io.camunda.connector/SnsConnectorFunctionParametrizedTest.java b/connectors/sns/src/test/java/io.camunda.connector/SnsConnectorFunctionParametrizedTest.java index 76c2b97a61..d14c3b6976 100644 --- a/connectors/sns/src/test/java/io.camunda.connector/SnsConnectorFunctionParametrizedTest.java +++ b/connectors/sns/src/test/java/io.camunda.connector/SnsConnectorFunctionParametrizedTest.java @@ -29,8 +29,8 @@ import io.camunda.connector.api.outbound.OutboundConnectorContext; import io.camunda.connector.model.SnsConnectorRequest; import io.camunda.connector.model.SnsConnectorResult; -import io.camunda.connector.suppliers.GsonComponentSupplier; import io.camunda.connector.suppliers.SnsClientSupplier; +import io.camunda.connector.suppliers.SnsGsonComponentSupplier; import io.camunda.connector.test.outbound.OutboundConnectorContextBuilder; import java.io.File; import java.io.IOException; @@ -78,7 +78,7 @@ private static Stream loadRequestCasesFromFile(final String fileName) th @BeforeEach void setup() { - function = new SnsConnectorFunction(snsClientSupplier, GsonComponentSupplier.gsonInstance()); + function = new SnsConnectorFunction(snsClientSupplier, SnsGsonComponentSupplier.gsonInstance()); } @ParameterizedTest diff --git a/connectors/sns/src/test/java/io.camunda.connector/SnsConnectorRequestTest.java b/connectors/sns/src/test/java/io.camunda.connector/SnsConnectorRequestTest.java index 53513a7f14..15a7969905 100644 --- a/connectors/sns/src/test/java/io.camunda.connector/SnsConnectorRequestTest.java +++ b/connectors/sns/src/test/java/io.camunda.connector/SnsConnectorRequestTest.java @@ -9,10 +9,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import com.amazonaws.services.sns.model.MessageAttributeValue; import io.camunda.connector.api.outbound.OutboundConnectorContext; import io.camunda.connector.impl.ConnectorInputException; import io.camunda.connector.model.SnsConnectorRequest; +import io.camunda.connector.model.SnsMessageAttribute; import io.camunda.connector.test.outbound.OutboundConnectorContextBuilder; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -96,4 +99,18 @@ void replaceSecrets_shouldNotReplaceSecretsIfTheyDidNotStartFromSecretsWord() { assertThat(request.getTopic().getTopicArn()).isEqualTo(AWS_TOPIC_ARN); assertThat(request.getTopic().getRegion()).isEqualTo(AWS_TOPIC_REGION); } + + @Test + void execute_messageAttributesParsedCorrectly() { + // Given request is DEFAULT_REQUEST_BODY with message attributes + // When fetching native AWS SNS message attributes, they (native AWS SNS attributes) are mapped + // correctly + Map msgAttrsFromRequest = + request.getTopic().getMessageAttributes(); + Map msgAttrsRemapped = + request.getTopic().getAwsSnsNativeMessageAttributes(); + + assertThat(msgAttrsRemapped.size()).isNotZero(); + assertThat(msgAttrsRemapped).hasSameSizeAs(msgAttrsFromRequest); + } } diff --git a/connectors/sqs/src/main/java/io/camunda/connector/SqsConnectorFunction.java b/connectors/sqs/src/main/java/io/camunda/connector/SqsConnectorFunction.java index 4e5fee6dbb..e63fffa102 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/SqsConnectorFunction.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/SqsConnectorFunction.java @@ -15,8 +15,8 @@ import io.camunda.connector.api.outbound.OutboundConnectorFunction; import io.camunda.connector.model.SqsConnectorRequest; import io.camunda.connector.model.SqsConnectorResult; -import io.camunda.connector.suppliers.GsonComponentSupplier; import io.camunda.connector.suppliers.SqsClientSupplier; +import io.camunda.connector.suppliers.SqsGsonComponentSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +31,7 @@ public class SqsConnectorFunction implements OutboundConnectorFunction { private final Gson gson; public SqsConnectorFunction() { - this(new SqsClientSupplier(), GsonComponentSupplier.gsonInstance()); + this(new SqsClientSupplier(), SqsGsonComponentSupplier.gsonInstance()); } public SqsConnectorFunction(final SqsClientSupplier sqsClientSupplier, final Gson gson) { @@ -61,7 +61,7 @@ private SendMessageResult sendMsgToSqs(SqsConnectorRequest request) { new SendMessageRequest() .withQueueUrl(request.getQueue().getUrl()) .withMessageBody(request.getQueue().getMessageBody().toString()) - .withMessageAttributes(request.getQueue().getMessageAttributes()); + .withMessageAttributes(request.getQueue().getAwsSqsNativeMessageAttributes()); return sqsClient.sendMessage(message); } finally { if (sqsClient != null) { diff --git a/connectors/sqs/src/main/java/io/camunda/connector/model/QueueRequestData.java b/connectors/sqs/src/main/java/io/camunda/connector/model/QueueRequestData.java index fdc26853fc..d687aa8954 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/model/QueueRequestData.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/model/QueueRequestData.java @@ -8,8 +8,11 @@ import com.amazonaws.services.sqs.model.MessageAttributeValue; import io.camunda.connector.api.annotation.Secret; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.function.Function; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; @@ -21,7 +24,7 @@ public class QueueRequestData { @NotNull private Object messageBody; // we don't need to know the customer message as we will pass it as-is - private Map messageAttributes; + private Map messageAttributes; public String getUrl() { return url; @@ -47,14 +50,36 @@ public void setMessageBody(Object messageBody) { this.messageBody = messageBody; } - public Map getMessageAttributes() { + public Map getMessageAttributes() { return messageAttributes; } - public void setMessageAttributes(Map messageAttributes) { + public Map getAwsSqsNativeMessageAttributes() { + if (messageAttributes == null) { + return Collections.emptyMap(); + } + + final Map sqsNativeMessageAttributes = new HashMap<>(); + messageAttributes.forEach( + (key, value) -> + sqsNativeMessageAttributes.put(key, messageAttributeTransformer().apply(value))); + + return sqsNativeMessageAttributes; + } + + public void setMessageAttributes(Map messageAttributes) { this.messageAttributes = messageAttributes; } + private Function messageAttributeTransformer() { + return snsMessageAttribute -> { + MessageAttributeValue msgAttr = new MessageAttributeValue(); + msgAttr.setDataType(snsMessageAttribute.getDataType()); + msgAttr.setStringValue(snsMessageAttribute.getStringValue()); + return msgAttr; + }; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/connectors/sqs/src/main/java/io/camunda/connector/model/AuthenticationRequestData.java b/connectors/sqs/src/main/java/io/camunda/connector/model/SqsAuthenticationRequestData.java similarity index 91% rename from connectors/sqs/src/main/java/io/camunda/connector/model/AuthenticationRequestData.java rename to connectors/sqs/src/main/java/io/camunda/connector/model/SqsAuthenticationRequestData.java index 644851663a..7817d83c1e 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/model/AuthenticationRequestData.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/model/SqsAuthenticationRequestData.java @@ -10,7 +10,7 @@ import java.util.Objects; import javax.validation.constraints.NotEmpty; -public class AuthenticationRequestData { +public class SqsAuthenticationRequestData { @NotEmpty @Secret private String accessKey; @NotEmpty @Secret private String secretKey; @@ -38,7 +38,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - AuthenticationRequestData that = (AuthenticationRequestData) o; + SqsAuthenticationRequestData that = (SqsAuthenticationRequestData) o; return accessKey.equals(that.accessKey) && secretKey.equals(that.secretKey); } diff --git a/connectors/sqs/src/main/java/io/camunda/connector/model/SqsConnectorRequest.java b/connectors/sqs/src/main/java/io/camunda/connector/model/SqsConnectorRequest.java index e69186496e..9962eb2045 100644 --- a/connectors/sqs/src/main/java/io/camunda/connector/model/SqsConnectorRequest.java +++ b/connectors/sqs/src/main/java/io/camunda/connector/model/SqsConnectorRequest.java @@ -13,14 +13,14 @@ public class SqsConnectorRequest { - @Valid @NotNull @Secret private AuthenticationRequestData authentication; + @Valid @NotNull @Secret private SqsAuthenticationRequestData authentication; @Valid @NotNull @Secret private QueueRequestData queue; - public AuthenticationRequestData getAuthentication() { + public SqsAuthenticationRequestData getAuthentication() { return authentication; } - public void setAuthentication(final AuthenticationRequestData authentication) { + public void setAuthentication(final SqsAuthenticationRequestData authentication) { this.authentication = authentication; } diff --git a/connectors/sqs/src/main/java/io/camunda/connector/model/SqsMessageAttribute.java b/connectors/sqs/src/main/java/io/camunda/connector/model/SqsMessageAttribute.java new file mode 100644 index 0000000000..6f7d934c36 --- /dev/null +++ b/connectors/sqs/src/main/java/io/camunda/connector/model/SqsMessageAttribute.java @@ -0,0 +1,67 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. Licensed under a proprietary license. + * See the License.txt file for more information. You may not use this file + * except in compliance with the proprietary license. + */ +package io.camunda.connector.model; + +import com.google.gson.annotations.SerializedName; +import java.util.Objects; +import javax.validation.constraints.NotBlank; + +public class SqsMessageAttribute { + + @SerializedName(value = "DataType", alternate = "dataType") + @NotBlank + private String dataType; + + @SerializedName(value = "StringValue", alternate = "stringValue") + @NotBlank + private String stringValue; + + public String getDataType() { + return dataType; + } + + public void setDataType(String dataType) { + this.dataType = dataType; + } + + public String getStringValue() { + return stringValue; + } + + public void setStringValue(String stringValue) { + this.stringValue = stringValue; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SqsMessageAttribute that = (SqsMessageAttribute) o; + return dataType.equals(that.dataType) && stringValue.equals(that.stringValue); + } + + @Override + public int hashCode() { + return Objects.hash(dataType, stringValue); + } + + @Override + public String toString() { + return "SqsMessageAttribute{" + + "dataType='" + + dataType + + '\'' + + ", stringValue='" + + stringValue + + '\'' + + '}'; + } +} diff --git a/connectors/sqs/src/main/java/io/camunda/connector/suppliers/GsonComponentSupplier.java b/connectors/sqs/src/main/java/io/camunda/connector/suppliers/GsonComponentSupplier.java deleted file mode 100644 index 61e95318c1..0000000000 --- a/connectors/sqs/src/main/java/io/camunda/connector/suppliers/GsonComponentSupplier.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH - * under one or more contributor license agreements. Licensed under a proprietary license. - * See the License.txt file for more information. You may not use this file - * except in compliance with the proprietary license. - */ -package io.camunda.connector.suppliers; - -import com.amazonaws.services.sqs.model.MessageAttributeValue; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import java.lang.reflect.Type; -import java.util.Optional; - -public final class GsonComponentSupplier { - - private static final Gson GSON = - new GsonBuilder() - .registerTypeAdapter(MessageAttributeValue.class, new MessageAttributeValueDeserializer()) - .create(); - - private GsonComponentSupplier() {} - - public static Gson gsonInstance() { - return GSON; - } - - protected static class MessageAttributeValueDeserializer - implements JsonDeserializer { - - @Override - public MessageAttributeValue deserialize( - final JsonElement jsonElement, - final Type type, - final JsonDeserializationContext jsonDeserializationContext) - throws JsonParseException { - MessageAttributeValue mav = new MessageAttributeValue(); - JsonObject jObject = jsonElement.getAsJsonObject(); - // While there are currently multiple field types in the MessageAttributeValue, the actual - // support is very limited to DataType, StringValue and BinaryValue. - // At the same time, we do not plan to support custom binary types. - // See more at: - // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_MessageAttributeValue.html - Optional.ofNullable(jObject.get("DataType").getAsString()).ifPresent(mav::setDataType); - Optional.ofNullable(jObject.get("StringValue").getAsString()).ifPresent(mav::setStringValue); - return mav; - } - } -} diff --git a/connectors/sqs/src/main/java/io/camunda/connector/suppliers/SqsGsonComponentSupplier.java b/connectors/sqs/src/main/java/io/camunda/connector/suppliers/SqsGsonComponentSupplier.java new file mode 100644 index 0000000000..bd56e091fa --- /dev/null +++ b/connectors/sqs/src/main/java/io/camunda/connector/suppliers/SqsGsonComponentSupplier.java @@ -0,0 +1,21 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. Licensed under a proprietary license. + * See the License.txt file for more information. You may not use this file + * except in compliance with the proprietary license. + */ +package io.camunda.connector.suppliers; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +public final class SqsGsonComponentSupplier { + + private static final Gson GSON = new GsonBuilder().create(); + + private SqsGsonComponentSupplier() {} + + public static Gson gsonInstance() { + return GSON; + } +} diff --git a/connectors/sqs/src/test/java/io/camunda/connector/BaseTest.java b/connectors/sqs/src/test/java/io/camunda/connector/BaseTest.java index 0ecc7c5ccc..3ca614d903 100644 --- a/connectors/sqs/src/test/java/io/camunda/connector/BaseTest.java +++ b/connectors/sqs/src/test/java/io/camunda/connector/BaseTest.java @@ -7,7 +7,7 @@ package io.camunda.connector; import com.google.gson.Gson; -import io.camunda.connector.suppliers.GsonComponentSupplier; +import io.camunda.connector.suppliers.SqsGsonComponentSupplier; public abstract class BaseTest { @@ -50,5 +50,5 @@ public abstract class BaseTest { + " }\n" + " }"; - protected static final Gson GSON = GsonComponentSupplier.gsonInstance(); + protected static final Gson GSON = SqsGsonComponentSupplier.gsonInstance(); } diff --git a/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorFunctionParametrizedTest.java b/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorFunctionParametrizedTest.java index deaa45735a..c2f1ab9f45 100644 --- a/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorFunctionParametrizedTest.java +++ b/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorFunctionParametrizedTest.java @@ -28,8 +28,8 @@ import io.camunda.connector.api.outbound.OutboundConnectorContext; import io.camunda.connector.model.SqsConnectorRequest; import io.camunda.connector.model.SqsConnectorResult; -import io.camunda.connector.suppliers.GsonComponentSupplier; import io.camunda.connector.suppliers.SqsClientSupplier; +import io.camunda.connector.suppliers.SqsGsonComponentSupplier; import io.camunda.connector.test.outbound.OutboundConnectorContextBuilder; import java.io.File; import java.io.IOException; @@ -62,7 +62,7 @@ class SqsConnectorFunctionParametrizedTest { @BeforeEach void setup() { - function = new SqsConnectorFunction(sqsClientSupplier, GsonComponentSupplier.gsonInstance()); + function = new SqsConnectorFunction(sqsClientSupplier, SqsGsonComponentSupplier.gsonInstance()); } @ParameterizedTest diff --git a/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorRequestTest.java b/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorRequestTest.java index ab1a0ad076..2c840a8231 100644 --- a/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorRequestTest.java +++ b/connectors/sqs/src/test/java/io/camunda/connector/SqsConnectorRequestTest.java @@ -9,10 +9,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import com.amazonaws.services.sqs.model.MessageAttributeValue; import io.camunda.connector.api.outbound.OutboundConnectorContext; import io.camunda.connector.impl.ConnectorInputException; import io.camunda.connector.model.SqsConnectorRequest; +import io.camunda.connector.model.SqsMessageAttribute; import io.camunda.connector.test.outbound.OutboundConnectorContextBuilder; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -90,4 +93,18 @@ void replaceSecrets_shouldDoNotReplaceSecretsIfTheyDidNotStartFromSecretsWord() assertThat(request.getAuthentication().getAccessKey()).isEqualTo(AWS_ACCESS_KEY); assertThat(request.getQueue().getUrl()).isEqualTo(SQS_QUEUE_URL); } + + @Test + void execute_messageAttributesParsedCorrectly() { + // Given request is DEFAULT_REQUEST_BODY with message attributes + // When fetching native AWS SQS message attributes, they (native AWS SNS attributes) are mapped + // correctly + Map msgAttrsFromRequest = + request.getQueue().getMessageAttributes(); + Map msgAttrsRemapped = + request.getQueue().getAwsSqsNativeMessageAttributes(); + + assertThat(msgAttrsRemapped.size()).isNotZero(); + assertThat(msgAttrsRemapped).hasSameSizeAs(msgAttrsFromRequest); + } }