Skip to content

Commit

Permalink
fix(sns,sqs): remove custom deserialization of AWS msg attributes
Browse files Browse the repository at this point in the history
Due to multiple instances of GSON, in a bundled version that contains all the connectors, there is a conflict between the GSON instances. As such, some deserializers seem not to be registered / working as expected.

This commit solves an issue with the SNS Connector.
  • Loading branch information
igpetrov committed Dec 12, 2022
1 parent 0280b31 commit 74de59f
Show file tree
Hide file tree
Showing 20 changed files with 291 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import io.camunda.connector.api.outbound.OutboundConnectorFunction;
import io.camunda.connector.model.SnsConnectorRequest;
import io.camunda.connector.model.SnsConnectorResult;
import io.camunda.connector.suppliers.GsonComponentSupplier;
import io.camunda.connector.suppliers.SnsClientSupplier;
import io.camunda.connector.suppliers.SnsGsonComponentSupplier;
import org.apache.commons.text.StringEscapeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,7 +32,7 @@ public class SnsConnectorFunction implements OutboundConnectorFunction {
private final Gson gson;

public SnsConnectorFunction() {
this(new SnsClientSupplier(), GsonComponentSupplier.gsonInstance());
this(new SnsClientSupplier(), SnsGsonComponentSupplier.gsonInstance());
}

public SnsConnectorFunction(final SnsClientSupplier snsClientSupplier, final Gson gson) {
Expand Down Expand Up @@ -65,7 +65,7 @@ private PublishResult sendMsgToSns(SnsConnectorRequest request) {
new PublishRequest()
.withTopicArn(request.getTopic().getTopicArn())
.withMessage(topicMessage)
.withMessageAttributes(request.getTopic().getMessageAttributes())
.withMessageAttributes(request.getTopic().getAwsSnsNativeMessageAttributes())
.withSubject(request.getTopic().getSubject());
return snsClient.publish(message);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.Objects;
import javax.validation.constraints.NotEmpty;

public class AuthenticationRequestData {
public class SnsAuthenticationRequestData {
@NotEmpty @Secret private String accessKey;
@NotEmpty @Secret private String secretKey;

Expand Down Expand Up @@ -38,7 +38,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
AuthenticationRequestData that = (AuthenticationRequestData) o;
SnsAuthenticationRequestData that = (SnsAuthenticationRequestData) o;
return accessKey.equals(that.accessKey) && secretKey.equals(that.secretKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

public class SnsConnectorRequest {

@Valid @NotNull @Secret private AuthenticationRequestData authentication;
@Valid @NotNull @Secret private SnsAuthenticationRequestData authentication;
@Valid @NotNull @Secret private TopicRequestData topic;

public AuthenticationRequestData getAuthentication() {
public SnsAuthenticationRequestData getAuthentication() {
return authentication;
}

public void setAuthentication(final AuthenticationRequestData authentication) {
public void setAuthentication(final SnsAuthenticationRequestData authentication) {
this.authentication = authentication;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. Licensed under a proprietary license.
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
package io.camunda.connector.model;

import com.google.gson.annotations.SerializedName;
import java.util.Objects;
import javax.validation.constraints.NotBlank;

public class SnsMessageAttribute {
@SerializedName(value = "DataType", alternate = "dataType")
@NotBlank
private String dataType;

@SerializedName(value = "StringValue", alternate = "stringValue")
@NotBlank
private String stringValue;

public String getDataType() {
return dataType;
}

public void setDataType(String dataType) {
this.dataType = dataType;
}

public String getStringValue() {
return stringValue;
}

public void setStringValue(String stringValue) {
this.stringValue = stringValue;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

SnsMessageAttribute that = (SnsMessageAttribute) o;
return dataType.equals(that.dataType) && stringValue.equals(that.stringValue);
}

@Override
public int hashCode() {
return Objects.hash(dataType, stringValue);
}

@Override
public String toString() {
return "SNSMessageAttribute{"
+ "dataType='"
+ dataType
+ '\''
+ ", stringValue='"
+ stringValue
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

import com.amazonaws.services.sns.model.MessageAttributeValue;
import io.camunda.connector.api.annotation.Secret;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
Expand All @@ -25,7 +28,7 @@ public class TopicRequestData {
// we don't need to know the customer message as we will pass it as-is
@NotNull private Object message;

private Map<String, MessageAttributeValue> messageAttributes;
private Map<String, SnsMessageAttribute> messageAttributes;

public String getTopicArn() {
return topicArn;
Expand Down Expand Up @@ -59,14 +62,36 @@ public void setMessage(Object message) {
this.message = message;
}

public Map<String, MessageAttributeValue> getMessageAttributes() {
public Map<String, SnsMessageAttribute> getMessageAttributes() {
return messageAttributes;
}

public void setMessageAttributes(Map<String, MessageAttributeValue> messageAttributes) {
public Map<String, MessageAttributeValue> getAwsSnsNativeMessageAttributes() {
if (messageAttributes == null) {
return Collections.emptyMap();
}

final Map<String, MessageAttributeValue> snsNativeMessageAttributes = new HashMap<>();
messageAttributes.forEach(
(key, value) ->
snsNativeMessageAttributes.put(key, messageAttributeTransformer().apply(value)));

return snsNativeMessageAttributes;
}

public void setMessageAttributes(Map<String, SnsMessageAttribute> messageAttributes) {
this.messageAttributes = messageAttributes;
}

private Function<SnsMessageAttribute, MessageAttributeValue> messageAttributeTransformer() {
return snsMessageAttribute -> {
MessageAttributeValue msgAttr = new MessageAttributeValue();
msgAttr.setDataType(snsMessageAttribute.getDataType());
msgAttr.setStringValue(snsMessageAttribute.getStringValue());
return msgAttr;
};
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. Licensed under a proprietary license.
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
package io.camunda.connector.suppliers;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public final class SnsGsonComponentSupplier {

private static final Gson GSON = new GsonBuilder().create();

private SnsGsonComponentSupplier() {}

public static Gson gsonInstance() {
return GSON;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
package io.camunda.connector;

import com.google.gson.Gson;
import io.camunda.connector.suppliers.GsonComponentSupplier;
import io.camunda.connector.suppliers.SnsGsonComponentSupplier;

public abstract class BaseTest {

Expand Down Expand Up @@ -48,5 +48,5 @@ public abstract class BaseTest {
+ " }\n"
+ "}";

protected static final Gson GSON = GsonComponentSupplier.gsonInstance();
protected static final Gson GSON = SnsGsonComponentSupplier.gsonInstance();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.model.SnsConnectorRequest;
import io.camunda.connector.model.SnsConnectorResult;
import io.camunda.connector.suppliers.GsonComponentSupplier;
import io.camunda.connector.suppliers.SnsClientSupplier;
import io.camunda.connector.suppliers.SnsGsonComponentSupplier;
import io.camunda.connector.test.outbound.OutboundConnectorContextBuilder;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -78,7 +78,7 @@ private static Stream<String> loadRequestCasesFromFile(final String fileName) th

@BeforeEach
void setup() {
function = new SnsConnectorFunction(snsClientSupplier, GsonComponentSupplier.gsonInstance());
function = new SnsConnectorFunction(snsClientSupplier, SnsGsonComponentSupplier.gsonInstance());
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.amazonaws.services.sns.model.MessageAttributeValue;
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.impl.ConnectorInputException;
import io.camunda.connector.model.SnsConnectorRequest;
import io.camunda.connector.model.SnsMessageAttribute;
import io.camunda.connector.test.outbound.OutboundConnectorContextBuilder;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -96,4 +99,18 @@ void replaceSecrets_shouldNotReplaceSecretsIfTheyDidNotStartFromSecretsWord() {
assertThat(request.getTopic().getTopicArn()).isEqualTo(AWS_TOPIC_ARN);
assertThat(request.getTopic().getRegion()).isEqualTo(AWS_TOPIC_REGION);
}

@Test
void execute_messageAttributesParsedCorrectly() {
// Given request is DEFAULT_REQUEST_BODY with message attributes
// When fetching native AWS SNS message attributes, they (native AWS SNS attributes) are mapped
// correctly
Map<String, SnsMessageAttribute> msgAttrsFromRequest =
request.getTopic().getMessageAttributes();
Map<String, MessageAttributeValue> msgAttrsRemapped =
request.getTopic().getAwsSnsNativeMessageAttributes();

assertThat(msgAttrsRemapped.size()).isNotZero();
assertThat(msgAttrsRemapped).hasSameSizeAs(msgAttrsFromRequest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import io.camunda.connector.api.outbound.OutboundConnectorFunction;
import io.camunda.connector.model.SqsConnectorRequest;
import io.camunda.connector.model.SqsConnectorResult;
import io.camunda.connector.suppliers.GsonComponentSupplier;
import io.camunda.connector.suppliers.SqsClientSupplier;
import io.camunda.connector.suppliers.SqsGsonComponentSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,7 +31,7 @@ public class SqsConnectorFunction implements OutboundConnectorFunction {
private final Gson gson;

public SqsConnectorFunction() {
this(new SqsClientSupplier(), GsonComponentSupplier.gsonInstance());
this(new SqsClientSupplier(), SqsGsonComponentSupplier.gsonInstance());
}

public SqsConnectorFunction(final SqsClientSupplier sqsClientSupplier, final Gson gson) {
Expand Down Expand Up @@ -61,7 +61,7 @@ private SendMessageResult sendMsgToSqs(SqsConnectorRequest request) {
new SendMessageRequest()
.withQueueUrl(request.getQueue().getUrl())
.withMessageBody(request.getQueue().getMessageBody().toString())
.withMessageAttributes(request.getQueue().getMessageAttributes());
.withMessageAttributes(request.getQueue().getAwsSqsNativeMessageAttributes());
return sqsClient.sendMessage(message);
} finally {
if (sqsClient != null) {
Expand Down
Loading

0 comments on commit 74de59f

Please sign in to comment.