diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java
index 85b7ca8f1b..294c41ea2b 100644
--- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java
+++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java
@@ -47,6 +47,7 @@ class SqsWorkerIT {
private PluginMetrics pluginMetrics;
private S3ObjectGenerator s3ObjectGenerator;
private String bucket;
+ private S3EventMessageParser s3EventMessageParser;
private Backoff backoff;
private AcknowledgementSetManager acknowledgementSetManager;
@@ -58,6 +59,7 @@ void setUp() {
.build();
bucket = System.getProperty("tests.s3source.bucket");
s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket);
+ s3EventMessageParser = new S3EventMessageParser();
sqsClient = SqsClient.builder()
.region(Region.of(System.getProperty("tests.s3source.region")))
@@ -88,7 +90,7 @@ void setUp() {
}
private SqsWorker createObjectUnderTest() {
- return new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff);
+ return new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, s3EventMessageParser, backoff);
}
@AfterEach
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParser.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParser.java
new file mode 100644
index 0000000000..a7b3d72190
--- /dev/null
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParser.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Handles parsing of S3 Event messages.
+ */
+public class S3EventMessageParser {
+ private static final String SNS_MESSAGE_KEY = "Message";
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ /**
+ * Parses a message body into an {@link S3EventNotification} class.
+ *
+ * @param messageBody Input body
+ * @return The parsed event notification
+ * @throws JsonProcessingException An exception with parsing the event notification.
+ */
+ S3EventNotification parseMessage(final String messageBody) throws JsonProcessingException {
+ final JsonNode parsedNode = objectMapper.readTree(messageBody);
+
+ final JsonNode eventNode = getS3EventNode(parsedNode);
+
+ return objectMapper.treeToValue(eventNode, S3EventNotification.class);
+ }
+
+ private JsonNode getS3EventNode(final JsonNode parsedNode) throws JsonProcessingException {
+ if(isSnsWrappedMessage(parsedNode)) {
+ final String messageString = parsedNode.get(SNS_MESSAGE_KEY).asText();
+ return objectMapper.readValue(messageString, JsonNode.class);
+ }
+
+ return parsedNode;
+ }
+
+ private boolean isSnsWrappedMessage(final JsonNode parsedNode) {
+ return parsedNode.has(SNS_MESSAGE_KEY);
+ }
+}
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventNotification.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventNotification.java
index 09670f4b39..0e81f0f2dc 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventNotification.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventNotification.java
@@ -5,22 +5,19 @@
package org.opensearch.dataprepper.plugins.source;
-import java.util.List;
-
-import org.joda.time.DateTime;
-
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.joda.time.DateTime;
import software.amazon.awssdk.utils.http.SdkHttpUtils;
+import java.util.List;
+
/**
* A helper class that represents a strongly typed S3 EventNotification item sent
* to SQS, SNS, or Lambda.
- *
+ *
* This class is derived from S3EventNotification
in the AWS SDKv1 for Java.
*/
public class S3EventNotification {
@@ -34,30 +31,6 @@ public S3EventNotification(
this.records = records;
}
- /**
- *
- * Parse the JSON string into a S3EventNotification object.
- *
- *
- * The function will try its best to parse input JSON string as best as it can.
- * It will not fail even if the JSON string contains unknown properties.
- * The function will throw SdkClientException if the input JSON string is
- * not valid JSON.
- *
- * @param json
- * JSON string to parse. Typically this is the body of your SQS
- * notification message body.
- *
- * @return The resulting S3EventNotification object.
- */
- public static S3EventNotification parseJson(String json) throws JsonProcessingException {
- if (json == null) {
- return null;
- }
- final ObjectMapper objectMapper = new ObjectMapper();
- return objectMapper.readValue(json, S3EventNotification.class);
- }
-
/**
* @return the records in this notification
*/
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsService.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsService.java
index 371c353d5c..f083249c13 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsService.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsService.java
@@ -27,6 +27,7 @@ public class SqsService {
private final SqsClient sqsClient;
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;
+ private final S3EventMessageParser s3EventMessageParser;
private Thread sqsWorkerThread;
@@ -39,12 +40,13 @@ public SqsService(final AcknowledgementSetManager acknowledgementSetManager,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.sqsClient = createSqsClient();
+ s3EventMessageParser = new S3EventMessageParser();
}
public void start() {
final Backoff backoff = Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
.withMaxAttempts(Integer.MAX_VALUE);
- sqsWorkerThread = new Thread(new SqsWorker(acknowledgementSetManager, sqsClient, s3Accessor, s3SourceConfig, pluginMetrics, backoff));
+ sqsWorkerThread = new Thread(new SqsWorker(acknowledgementSetManager, sqsClient, s3Accessor, s3SourceConfig, pluginMetrics, s3EventMessageParser, backoff));
sqsWorkerThread.start();
}
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java
index 93e6a0931b..9ee5d95068 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java
@@ -61,6 +61,7 @@ public class SqsWorker implements Runnable {
private final Counter sqsMessagesDeleteFailedCounter;
private final Counter acknowledgementSetCallbackCounter;
private final Timer sqsMessageDelayTimer;
+ private final S3EventMessageParser s3EventMessageParser;
private final Backoff standardBackoff;
private int failedAttemptCount;
private boolean endToEndAcknowledgementsEnabled;
@@ -71,11 +72,13 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,
final S3Service s3Service,
final S3SourceConfig s3SourceConfig,
final PluginMetrics pluginMetrics,
+ final S3EventMessageParser s3EventMessageParser,
final Backoff backoff) {
this.sqsClient = sqsClient;
this.s3Service = s3Service;
this.s3SourceConfig = s3SourceConfig;
this.acknowledgementSetManager = acknowledgementSetManager;
+ this.s3EventMessageParser = s3EventMessageParser;
this.standardBackoff = backoff;
this.endToEndAcknowledgementsEnabled = s3SourceConfig.getAcknowledgements();
sqsOptions = s3SourceConfig.getSqsOptions();
@@ -178,7 +181,7 @@ private Collection getS3MessageEventNotificationRecords(final Lis
private ParsedMessage convertS3EventMessages(final Message message) {
try {
- final S3EventNotification s3EventNotification = S3EventNotification.parseJson(message.body());
+ final S3EventNotification s3EventNotification = s3EventMessageParser.parseMessage(message.body());
if (s3EventNotification.getRecords() != null)
return new ParsedMessage(message, s3EventNotification.getRecords());
else {
diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParserTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParserTest.java
new file mode 100644
index 0000000000..b14c8ac7f8
--- /dev/null
+++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParserTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class S3EventMessageParserTest {
+ private static final String DIRECT_SQS_MESSAGE =
+ "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventTime\":\"2023-04-28T16:00:11.324Z\"," +
+ "\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:xyz\"},\"requestParameters\":{\"sourceIPAddress\":\"127.0.0.1\"}," +
+ "\"responseElements\":{\"x-amz-request-id\":\"xyz\",\"x-amz-id-2\":\"xyz\"},\"s3\":{\"s3SchemaVersion\":\"1.0\"," +
+ "\"configurationId\":\"xyz\",\"bucket\":{\"name\":\"my-bucket\",\"ownerIdentity\":{\"principalId\":\"ABC\"}," +
+ "\"arn\":\"arn:aws:s3:::my-bucket\"},\"object\":{\"key\":\"path/to/myfile.log.gz\",\"size\":3159112,\"eTag\":\"abcd123\"," +
+ "\"sequencer\":\"000\"}}}]}";
+
+ private static final String SNS_BASED_MESSAGE = "{\n" +
+ " \"Type\" : \"Notification\",\n" +
+ " \"MessageId\" : \"4e01e115-5b91-5096-8a74-bee95ed1e123\",\n" +
+ " \"TopicArn\" : \"arn:aws:sns:us-east-1:123456789012:notifications\",\n" +
+ " \"Subject\" : \"Amazon S3 Notification\",\n" +
+ " \"Message\" : \"{\\\"Records\\\":[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us-east-1\\\",\\\"eventTime\\\":\\\"2023-05-02T13:37:03.502Z\\\",\\\"eventName\\\":\\\"ObjectCreated:Put\\\",\\\"userIdentity\\\":{\\\"principalId\\\":\\\"AWS:ABC\\\"},\\\"requestParameters\\\":{\\\"sourceIPAddress\\\":\\\"127.0.0.1\\\"},\\\"responseElements\\\":{\\\"x-amz-request-id\\\":\\\"ABC\\\",\\\"x-amz-id-2\\\":\\\"ABC\\\"},\\\"s3\\\":{\\\"s3SchemaVersion\\\":\\\"1.0\\\",\\\"configurationId\\\":\\\"S3ToSnsTest\\\",\\\"bucket\\\":{\\\"name\\\":\\\"my-sns-bucket\\\",\\\"ownerIdentity\\\":{\\\"principalId\\\":\\\"ABC\\\"},\\\"arn\\\":\\\"arn:aws:s3:::my-sns-bucket\\\"},\\\"object\\\":{\\\"key\\\":\\\"path/to/testlogs.log.gz\\\",\\\"size\\\":25,\\\"eTag\\\":\\\"abc\\\",\\\"sequencer\\\":\\\"ABC\\\"}}}]}\",\n" +
+ " \"Timestamp\" : \"2023-05-02T13:37:04.554Z\",\n" +
+ " \"SignatureVersion\" : \"1\",\n" +
+ " \"Signature\" : \"x//abcde==\",\n" +
+ " \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService.pem\",\n" +
+ " \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:notifications:abc\"\n" +
+ "}";
+
+ private S3EventMessageParser createObjectUnderTest() {
+ return new S3EventMessageParser();
+ }
+
+ @Test
+ void parseMessage_returns_expected_S3EventNotification_from_SQS_message() throws JsonProcessingException {
+ final S3EventNotification s3EventNotification = createObjectUnderTest().parseMessage(DIRECT_SQS_MESSAGE);
+
+ assertThat(s3EventNotification, notNullValue());
+ assertThat(s3EventNotification.getRecords(), notNullValue());
+ assertThat(s3EventNotification.getRecords(), hasSize(1));
+ final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord = s3EventNotification.getRecords().get(0);
+ assertThat(s3EventNotificationRecord, notNullValue());
+ assertThat(s3EventNotificationRecord.getEventName(), equalTo("ObjectCreated:Put"));
+ assertThat(s3EventNotificationRecord.getS3(), notNullValue());
+ assertThat(s3EventNotificationRecord.getS3().getBucket(), notNullValue());
+ assertThat(s3EventNotificationRecord.getS3().getBucket().getName(), equalTo("my-bucket"));
+ assertThat(s3EventNotificationRecord.getS3().getObject(), notNullValue());
+ assertThat(s3EventNotificationRecord.getS3().getObject().getKey(), equalTo("path/to/myfile.log.gz"));
+ }
+
+ @Test
+ void parseMessage_returns_expected_S3EventNotification_from_SNS_to_SQS_message() throws JsonProcessingException {
+ final S3EventNotification s3EventNotification = createObjectUnderTest().parseMessage(SNS_BASED_MESSAGE);
+
+ assertThat(s3EventNotification, notNullValue());
+ assertThat(s3EventNotification.getRecords(), notNullValue());
+ assertThat(s3EventNotification.getRecords(), hasSize(1));
+ final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord = s3EventNotification.getRecords().get(0);
+ assertThat(s3EventNotificationRecord, notNullValue());
+ assertThat(s3EventNotificationRecord.getEventName(), equalTo("ObjectCreated:Put"));
+ assertThat(s3EventNotificationRecord.getS3(), notNullValue());
+ assertThat(s3EventNotificationRecord.getS3().getBucket(), notNullValue());
+ assertThat(s3EventNotificationRecord.getS3().getBucket().getName(), equalTo("my-sns-bucket"));
+ assertThat(s3EventNotificationRecord.getS3().getObject(), notNullValue());
+ assertThat(s3EventNotificationRecord.getS3().getObject().getKey(), equalTo("path/to/testlogs.log.gz"));
+ }
+
+ @Test
+ void parseMessage_throws_for_TestEvent() {
+ final String testEventMessage = "{\"Service\":\"Amazon S3\",\"Event\":\"s3:TestEvent\",\"Time\":\"2022-10-15T16:36:25.510Z\"," +
+ "\"Bucket\":\"bucketname\",\"RequestId\":\"abcdefg\",\"HostId\":\"hijklm\"}";
+
+ final S3EventMessageParser objectUnderTest = createObjectUnderTest();
+
+ assertThrows(JsonProcessingException.class, () -> objectUnderTest.parseMessage(testEventMessage));
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/SqsWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/SqsWorkerTest.java
index 52e12f4b65..3230e62a80 100644
--- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/SqsWorkerTest.java
+++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/SqsWorkerTest.java
@@ -90,6 +90,7 @@ class SqsWorkerTest {
private Timer sqsMessageDelayTimer;
private AcknowledgementSetManager acknowledgementSetManager;
private AcknowledgementSet acknowledgementSet;
+ private S3EventMessageParser s3EventMessageParser;
@BeforeEach
void setUp() {
@@ -104,6 +105,8 @@ void setUp() {
AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);
+ s3EventMessageParser = new S3EventMessageParser();
+
SqsOptions sqsOptions = mock(SqsOptions.class);
when(sqsOptions.getSqsUrl()).thenReturn("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue");
@@ -124,7 +127,7 @@ void setUp() {
when(pluginMetrics.counter(SQS_MESSAGES_DELETE_FAILED_METRIC_NAME)).thenReturn(sqsMessagesDeleteFailedCounter);
when(pluginMetrics.timer(SQS_MESSAGE_DELAY_METRIC_NAME)).thenReturn(sqsMessageDelayTimer);
- sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff);
+ sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, s3EventMessageParser, backoff);
}
@AfterEach
@@ -188,7 +191,7 @@ void processSqsMessages_should_return_number_of_messages_processed(final String
void processSqsMessages_should_return_number_of_messages_processed_with_acknowledgements(final String eventName) throws IOException {
when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet);
when(s3SourceConfig.getAcknowledgements()).thenReturn(true);
- sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff);
+ sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, s3EventMessageParser, backoff);
Instant startTime = Instant.now().minus(1, ChronoUnit.HOURS);
final Message message = mock(Message.class);
when(message.body()).thenReturn(createEventNotification(eventName, startTime));