From 3ff2ee910c47189212fc58dd23bf5e6a7f047804 Mon Sep 17 00:00:00 2001 From: David Venable Date: Tue, 2 May 2023 09:10:07 -0500 Subject: [PATCH 1/2] Support reading S3 Event messages which can from SNS to SQS if the message is wrapped in the Message key. Signed-off-by: David Venable --- .../plugins/source/SqsWorkerIT.java | 4 +- .../plugins/source/S3EventMessageParser.java | 46 ++++++++++ .../plugins/source/S3EventNotification.java | 26 +----- .../plugins/source/SqsService.java | 4 +- .../dataprepper/plugins/source/SqsWorker.java | 5 +- .../source/S3EventMessageParserTest.java | 86 +++++++++++++++++++ .../plugins/source/SqsWorkerTest.java | 7 +- 7 files changed, 148 insertions(+), 30 deletions(-) create mode 100644 data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParser.java create mode 100644 data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParserTest.java diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java index 85b7ca8f1b..294c41ea2b 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java @@ -47,6 +47,7 @@ class SqsWorkerIT { private PluginMetrics pluginMetrics; private S3ObjectGenerator s3ObjectGenerator; private String bucket; + private S3EventMessageParser s3EventMessageParser; private Backoff backoff; private AcknowledgementSetManager acknowledgementSetManager; @@ -58,6 +59,7 @@ void setUp() { .build(); bucket = System.getProperty("tests.s3source.bucket"); s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket); + s3EventMessageParser = new S3EventMessageParser(); sqsClient = SqsClient.builder() .region(Region.of(System.getProperty("tests.s3source.region"))) @@ -88,7 +90,7 @@ void setUp() { } private SqsWorker createObjectUnderTest() { - return new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff); + return new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, s3EventMessageParser, backoff); } @AfterEach diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParser.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParser.java new file mode 100644 index 0000000000..a7b3d72190 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParser.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Handles parsing of S3 Event messages. + */ +public class S3EventMessageParser { + private static final String SNS_MESSAGE_KEY = "Message"; + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Parses a message body into an {@link S3EventNotification} class. + * + * @param messageBody Input body + * @return The parsed event notification + * @throws JsonProcessingException An exception with parsing the event notification. + */ + S3EventNotification parseMessage(final String messageBody) throws JsonProcessingException { + final JsonNode parsedNode = objectMapper.readTree(messageBody); + + final JsonNode eventNode = getS3EventNode(parsedNode); + + return objectMapper.treeToValue(eventNode, S3EventNotification.class); + } + + private JsonNode getS3EventNode(final JsonNode parsedNode) throws JsonProcessingException { + if(isSnsWrappedMessage(parsedNode)) { + final String messageString = parsedNode.get(SNS_MESSAGE_KEY).asText(); + return objectMapper.readValue(messageString, JsonNode.class); + } + + return parsedNode; + } + + private boolean isSnsWrappedMessage(final JsonNode parsedNode) { + return parsedNode.has(SNS_MESSAGE_KEY); + } +} diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventNotification.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventNotification.java index 09670f4b39..75334fe40a 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventNotification.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventNotification.java @@ -20,7 +20,7 @@ /** * A helper class that represents a strongly typed S3 EventNotification item sent * to SQS, SNS, or Lambda. - * + *

* This class is derived from S3EventNotification in the AWS SDKv1 for Java. */ public class S3EventNotification { @@ -34,30 +34,6 @@ public S3EventNotification( this.records = records; } - /** - *

- * Parse the JSON string into a S3EventNotification object. - *

- *

- * The function will try its best to parse input JSON string as best as it can. - * It will not fail even if the JSON string contains unknown properties. - * The function will throw SdkClientException if the input JSON string is - * not valid JSON. - *

- * @param json - * JSON string to parse. Typically this is the body of your SQS - * notification message body. - * - * @return The resulting S3EventNotification object. - */ - public static S3EventNotification parseJson(String json) throws JsonProcessingException { - if (json == null) { - return null; - } - final ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readValue(json, S3EventNotification.class); - } - /** * @return the records in this notification */ diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsService.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsService.java index 371c353d5c..f083249c13 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsService.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsService.java @@ -27,6 +27,7 @@ public class SqsService { private final SqsClient sqsClient; private final PluginMetrics pluginMetrics; private final AcknowledgementSetManager acknowledgementSetManager; + private final S3EventMessageParser s3EventMessageParser; private Thread sqsWorkerThread; @@ -39,12 +40,13 @@ public SqsService(final AcknowledgementSetManager acknowledgementSetManager, this.pluginMetrics = pluginMetrics; this.acknowledgementSetManager = acknowledgementSetManager; this.sqsClient = createSqsClient(); + s3EventMessageParser = new S3EventMessageParser(); } public void start() { final Backoff backoff = Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE) .withMaxAttempts(Integer.MAX_VALUE); - sqsWorkerThread = new Thread(new SqsWorker(acknowledgementSetManager, sqsClient, s3Accessor, s3SourceConfig, pluginMetrics, backoff)); + sqsWorkerThread = new Thread(new SqsWorker(acknowledgementSetManager, sqsClient, s3Accessor, s3SourceConfig, pluginMetrics, s3EventMessageParser, backoff)); sqsWorkerThread.start(); } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java index 93e6a0931b..9ee5d95068 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java @@ -61,6 +61,7 @@ public class SqsWorker implements Runnable { private final Counter sqsMessagesDeleteFailedCounter; private final Counter acknowledgementSetCallbackCounter; private final Timer sqsMessageDelayTimer; + private final S3EventMessageParser s3EventMessageParser; private final Backoff standardBackoff; private int failedAttemptCount; private boolean endToEndAcknowledgementsEnabled; @@ -71,11 +72,13 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager, final S3Service s3Service, final S3SourceConfig s3SourceConfig, final PluginMetrics pluginMetrics, + final S3EventMessageParser s3EventMessageParser, final Backoff backoff) { this.sqsClient = sqsClient; this.s3Service = s3Service; this.s3SourceConfig = s3SourceConfig; this.acknowledgementSetManager = acknowledgementSetManager; + this.s3EventMessageParser = s3EventMessageParser; this.standardBackoff = backoff; this.endToEndAcknowledgementsEnabled = s3SourceConfig.getAcknowledgements(); sqsOptions = s3SourceConfig.getSqsOptions(); @@ -178,7 +181,7 @@ private Collection getS3MessageEventNotificationRecords(final Lis private ParsedMessage convertS3EventMessages(final Message message) { try { - final S3EventNotification s3EventNotification = S3EventNotification.parseJson(message.body()); + final S3EventNotification s3EventNotification = s3EventMessageParser.parseMessage(message.body()); if (s3EventNotification.getRecords() != null) return new ParsedMessage(message, s3EventNotification.getRecords()); else { diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParserTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParserTest.java new file mode 100644 index 0000000000..b14c8ac7f8 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3EventMessageParserTest.java @@ -0,0 +1,86 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class S3EventMessageParserTest { + private static final String DIRECT_SQS_MESSAGE = + "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventTime\":\"2023-04-28T16:00:11.324Z\"," + + "\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:xyz\"},\"requestParameters\":{\"sourceIPAddress\":\"127.0.0.1\"}," + + "\"responseElements\":{\"x-amz-request-id\":\"xyz\",\"x-amz-id-2\":\"xyz\"},\"s3\":{\"s3SchemaVersion\":\"1.0\"," + + "\"configurationId\":\"xyz\",\"bucket\":{\"name\":\"my-bucket\",\"ownerIdentity\":{\"principalId\":\"ABC\"}," + + "\"arn\":\"arn:aws:s3:::my-bucket\"},\"object\":{\"key\":\"path/to/myfile.log.gz\",\"size\":3159112,\"eTag\":\"abcd123\"," + + "\"sequencer\":\"000\"}}}]}"; + + private static final String SNS_BASED_MESSAGE = "{\n" + + " \"Type\" : \"Notification\",\n" + + " \"MessageId\" : \"4e01e115-5b91-5096-8a74-bee95ed1e123\",\n" + + " \"TopicArn\" : \"arn:aws:sns:us-east-1:123456789012:notifications\",\n" + + " \"Subject\" : \"Amazon S3 Notification\",\n" + + " \"Message\" : \"{\\\"Records\\\":[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us-east-1\\\",\\\"eventTime\\\":\\\"2023-05-02T13:37:03.502Z\\\",\\\"eventName\\\":\\\"ObjectCreated:Put\\\",\\\"userIdentity\\\":{\\\"principalId\\\":\\\"AWS:ABC\\\"},\\\"requestParameters\\\":{\\\"sourceIPAddress\\\":\\\"127.0.0.1\\\"},\\\"responseElements\\\":{\\\"x-amz-request-id\\\":\\\"ABC\\\",\\\"x-amz-id-2\\\":\\\"ABC\\\"},\\\"s3\\\":{\\\"s3SchemaVersion\\\":\\\"1.0\\\",\\\"configurationId\\\":\\\"S3ToSnsTest\\\",\\\"bucket\\\":{\\\"name\\\":\\\"my-sns-bucket\\\",\\\"ownerIdentity\\\":{\\\"principalId\\\":\\\"ABC\\\"},\\\"arn\\\":\\\"arn:aws:s3:::my-sns-bucket\\\"},\\\"object\\\":{\\\"key\\\":\\\"path/to/testlogs.log.gz\\\",\\\"size\\\":25,\\\"eTag\\\":\\\"abc\\\",\\\"sequencer\\\":\\\"ABC\\\"}}}]}\",\n" + + " \"Timestamp\" : \"2023-05-02T13:37:04.554Z\",\n" + + " \"SignatureVersion\" : \"1\",\n" + + " \"Signature\" : \"x//abcde==\",\n" + + " \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService.pem\",\n" + + " \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:notifications:abc\"\n" + + "}"; + + private S3EventMessageParser createObjectUnderTest() { + return new S3EventMessageParser(); + } + + @Test + void parseMessage_returns_expected_S3EventNotification_from_SQS_message() throws JsonProcessingException { + final S3EventNotification s3EventNotification = createObjectUnderTest().parseMessage(DIRECT_SQS_MESSAGE); + + assertThat(s3EventNotification, notNullValue()); + assertThat(s3EventNotification.getRecords(), notNullValue()); + assertThat(s3EventNotification.getRecords(), hasSize(1)); + final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord = s3EventNotification.getRecords().get(0); + assertThat(s3EventNotificationRecord, notNullValue()); + assertThat(s3EventNotificationRecord.getEventName(), equalTo("ObjectCreated:Put")); + assertThat(s3EventNotificationRecord.getS3(), notNullValue()); + assertThat(s3EventNotificationRecord.getS3().getBucket(), notNullValue()); + assertThat(s3EventNotificationRecord.getS3().getBucket().getName(), equalTo("my-bucket")); + assertThat(s3EventNotificationRecord.getS3().getObject(), notNullValue()); + assertThat(s3EventNotificationRecord.getS3().getObject().getKey(), equalTo("path/to/myfile.log.gz")); + } + + @Test + void parseMessage_returns_expected_S3EventNotification_from_SNS_to_SQS_message() throws JsonProcessingException { + final S3EventNotification s3EventNotification = createObjectUnderTest().parseMessage(SNS_BASED_MESSAGE); + + assertThat(s3EventNotification, notNullValue()); + assertThat(s3EventNotification.getRecords(), notNullValue()); + assertThat(s3EventNotification.getRecords(), hasSize(1)); + final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord = s3EventNotification.getRecords().get(0); + assertThat(s3EventNotificationRecord, notNullValue()); + assertThat(s3EventNotificationRecord.getEventName(), equalTo("ObjectCreated:Put")); + assertThat(s3EventNotificationRecord.getS3(), notNullValue()); + assertThat(s3EventNotificationRecord.getS3().getBucket(), notNullValue()); + assertThat(s3EventNotificationRecord.getS3().getBucket().getName(), equalTo("my-sns-bucket")); + assertThat(s3EventNotificationRecord.getS3().getObject(), notNullValue()); + assertThat(s3EventNotificationRecord.getS3().getObject().getKey(), equalTo("path/to/testlogs.log.gz")); + } + + @Test + void parseMessage_throws_for_TestEvent() { + final String testEventMessage = "{\"Service\":\"Amazon S3\",\"Event\":\"s3:TestEvent\",\"Time\":\"2022-10-15T16:36:25.510Z\"," + + "\"Bucket\":\"bucketname\",\"RequestId\":\"abcdefg\",\"HostId\":\"hijklm\"}"; + + final S3EventMessageParser objectUnderTest = createObjectUnderTest(); + + assertThrows(JsonProcessingException.class, () -> objectUnderTest.parseMessage(testEventMessage)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/SqsWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/SqsWorkerTest.java index 52e12f4b65..3230e62a80 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/SqsWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/SqsWorkerTest.java @@ -90,6 +90,7 @@ class SqsWorkerTest { private Timer sqsMessageDelayTimer; private AcknowledgementSetManager acknowledgementSetManager; private AcknowledgementSet acknowledgementSet; + private S3EventMessageParser s3EventMessageParser; @BeforeEach void setUp() { @@ -104,6 +105,8 @@ void setUp() { AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); + s3EventMessageParser = new S3EventMessageParser(); + SqsOptions sqsOptions = mock(SqsOptions.class); when(sqsOptions.getSqsUrl()).thenReturn("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"); @@ -124,7 +127,7 @@ void setUp() { when(pluginMetrics.counter(SQS_MESSAGES_DELETE_FAILED_METRIC_NAME)).thenReturn(sqsMessagesDeleteFailedCounter); when(pluginMetrics.timer(SQS_MESSAGE_DELAY_METRIC_NAME)).thenReturn(sqsMessageDelayTimer); - sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff); + sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, s3EventMessageParser, backoff); } @AfterEach @@ -188,7 +191,7 @@ void processSqsMessages_should_return_number_of_messages_processed(final String void processSqsMessages_should_return_number_of_messages_processed_with_acknowledgements(final String eventName) throws IOException { when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); when(s3SourceConfig.getAcknowledgements()).thenReturn(true); - sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff); + sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, s3EventMessageParser, backoff); Instant startTime = Instant.now().minus(1, ChronoUnit.HOURS); final Message message = mock(Message.class); when(message.body()).thenReturn(createEventNotification(eventName, startTime)); From 218bf0a6e8267a38afe55b15eac687a90415af9e Mon Sep 17 00:00:00 2001 From: David Venable Date: Sat, 6 May 2023 09:08:02 -0500 Subject: [PATCH 2/2] Optimized imports to remove unused imports. Signed-off-by: David Venable --- .../dataprepper/plugins/source/S3EventNotification.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventNotification.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventNotification.java index 75334fe40a..0e81f0f2dc 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventNotification.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3EventNotification.java @@ -5,18 +5,15 @@ package org.opensearch.dataprepper.plugins.source; -import java.util.List; - -import org.joda.time.DateTime; - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.joda.time.DateTime; import software.amazon.awssdk.utils.http.SdkHttpUtils; +import java.util.List; + /** * A helper class that represents a strongly typed S3 EventNotification item sent * to SQS, SNS, or Lambda.