Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reading S3 Event messages from SNS fan-out #2622

Merged
merged 2 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class SqsWorkerIT {
private PluginMetrics pluginMetrics;
private S3ObjectGenerator s3ObjectGenerator;
private String bucket;
private S3EventMessageParser s3EventMessageParser;
private Backoff backoff;
private AcknowledgementSetManager acknowledgementSetManager;

Expand All @@ -58,6 +59,7 @@ void setUp() {
.build();
bucket = System.getProperty("tests.s3source.bucket");
s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket);
s3EventMessageParser = new S3EventMessageParser();

sqsClient = SqsClient.builder()
.region(Region.of(System.getProperty("tests.s3source.region")))
Expand Down Expand Up @@ -88,7 +90,7 @@ void setUp() {
}

private SqsWorker createObjectUnderTest() {
return new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff);
return new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, s3EventMessageParser, backoff);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* Handles parsing of S3 Event messages.
*/
public class S3EventMessageParser {
private static final String SNS_MESSAGE_KEY = "Message";
private final ObjectMapper objectMapper = new ObjectMapper();

/**
* Parses a message body into an {@link S3EventNotification} class.
*
* @param messageBody Input body
* @return The parsed event notification
* @throws JsonProcessingException An exception with parsing the event notification.
*/
S3EventNotification parseMessage(final String messageBody) throws JsonProcessingException {
final JsonNode parsedNode = objectMapper.readTree(messageBody);

final JsonNode eventNode = getS3EventNode(parsedNode);

return objectMapper.treeToValue(eventNode, S3EventNotification.class);
}

private JsonNode getS3EventNode(final JsonNode parsedNode) throws JsonProcessingException {
if(isSnsWrappedMessage(parsedNode)) {
final String messageString = parsedNode.get(SNS_MESSAGE_KEY).asText();
return objectMapper.readValue(messageString, JsonNode.class);
}

return parsedNode;
}

private boolean isSnsWrappedMessage(final JsonNode parsedNode) {
return parsedNode.has(SNS_MESSAGE_KEY);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also validate the type of the message or else we are trying to parse any JOSN node with Message? Looks like the JSON object will have "Type" : "Notification" field.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is fine if the Type doesn't match. If the data comes and there is a Message which is not what we expect, we will still have a JSON parsing exception.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, just looked at the failure scenario looks like we log the following if there is a failure.
LOG.error("SQS message with message ID:{} has invalid body which cannot be parsed into S3EventNotification. {}.", message.messageId(), e.getMessage());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,19 @@

package org.opensearch.dataprepper.plugins.source;

import java.util.List;

import org.joda.time.DateTime;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.joda.time.DateTime;
import software.amazon.awssdk.utils.http.SdkHttpUtils;

import java.util.List;

/**
* A helper class that represents a strongly typed S3 EventNotification item sent
* to SQS, SNS, or Lambda.
*
* <p>
* This class is derived from <code>S3EventNotification</code> in the AWS SDKv1 for Java.
*/
public class S3EventNotification {
Expand All @@ -34,30 +31,6 @@ public S3EventNotification(
this.records = records;
}

/**
* <p>
* Parse the JSON string into a S3EventNotification object.
* </p>
* <p>
* The function will try its best to parse input JSON string as best as it can.
* It will not fail even if the JSON string contains unknown properties.
* The function will throw SdkClientException if the input JSON string is
* not valid JSON.
* </p>
* @param json
* JSON string to parse. Typically this is the body of your SQS
* notification message body.
*
* @return The resulting S3EventNotification object.
*/
public static S3EventNotification parseJson(String json) throws JsonProcessingException {
if (json == null) {
return null;
}
final ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(json, S3EventNotification.class);
}

/**
* @return the records in this notification
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class SqsService {
private final SqsClient sqsClient;
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;
private final S3EventMessageParser s3EventMessageParser;

private Thread sqsWorkerThread;

Expand All @@ -39,12 +40,13 @@ public SqsService(final AcknowledgementSetManager acknowledgementSetManager,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.sqsClient = createSqsClient();
s3EventMessageParser = new S3EventMessageParser();
}

public void start() {
final Backoff backoff = Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
.withMaxAttempts(Integer.MAX_VALUE);
sqsWorkerThread = new Thread(new SqsWorker(acknowledgementSetManager, sqsClient, s3Accessor, s3SourceConfig, pluginMetrics, backoff));
sqsWorkerThread = new Thread(new SqsWorker(acknowledgementSetManager, sqsClient, s3Accessor, s3SourceConfig, pluginMetrics, s3EventMessageParser, backoff));
sqsWorkerThread.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class SqsWorker implements Runnable {
private final Counter sqsMessagesDeleteFailedCounter;
private final Counter acknowledgementSetCallbackCounter;
private final Timer sqsMessageDelayTimer;
private final S3EventMessageParser s3EventMessageParser;
private final Backoff standardBackoff;
private int failedAttemptCount;
private boolean endToEndAcknowledgementsEnabled;
Expand All @@ -71,11 +72,13 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,
final S3Service s3Service,
final S3SourceConfig s3SourceConfig,
final PluginMetrics pluginMetrics,
final S3EventMessageParser s3EventMessageParser,
final Backoff backoff) {
this.sqsClient = sqsClient;
this.s3Service = s3Service;
this.s3SourceConfig = s3SourceConfig;
this.acknowledgementSetManager = acknowledgementSetManager;
this.s3EventMessageParser = s3EventMessageParser;
this.standardBackoff = backoff;
this.endToEndAcknowledgementsEnabled = s3SourceConfig.getAcknowledgements();
sqsOptions = s3SourceConfig.getSqsOptions();
Expand Down Expand Up @@ -178,7 +181,7 @@ private Collection<ParsedMessage> getS3MessageEventNotificationRecords(final Lis

private ParsedMessage convertS3EventMessages(final Message message) {
try {
final S3EventNotification s3EventNotification = S3EventNotification.parseJson(message.body());
final S3EventNotification s3EventNotification = s3EventMessageParser.parseMessage(message.body());
if (s3EventNotification.getRecords() != null)
return new ParsedMessage(message, s3EventNotification.getRecords());
else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertThrows;

class S3EventMessageParserTest {
private static final String DIRECT_SQS_MESSAGE =
"{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventTime\":\"2023-04-28T16:00:11.324Z\"," +
"\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:xyz\"},\"requestParameters\":{\"sourceIPAddress\":\"127.0.0.1\"}," +
"\"responseElements\":{\"x-amz-request-id\":\"xyz\",\"x-amz-id-2\":\"xyz\"},\"s3\":{\"s3SchemaVersion\":\"1.0\"," +
"\"configurationId\":\"xyz\",\"bucket\":{\"name\":\"my-bucket\",\"ownerIdentity\":{\"principalId\":\"ABC\"}," +
"\"arn\":\"arn:aws:s3:::my-bucket\"},\"object\":{\"key\":\"path/to/myfile.log.gz\",\"size\":3159112,\"eTag\":\"abcd123\"," +
"\"sequencer\":\"000\"}}}]}";

private static final String SNS_BASED_MESSAGE = "{\n" +
" \"Type\" : \"Notification\",\n" +
" \"MessageId\" : \"4e01e115-5b91-5096-8a74-bee95ed1e123\",\n" +
" \"TopicArn\" : \"arn:aws:sns:us-east-1:123456789012:notifications\",\n" +
" \"Subject\" : \"Amazon S3 Notification\",\n" +
" \"Message\" : \"{\\\"Records\\\":[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us-east-1\\\",\\\"eventTime\\\":\\\"2023-05-02T13:37:03.502Z\\\",\\\"eventName\\\":\\\"ObjectCreated:Put\\\",\\\"userIdentity\\\":{\\\"principalId\\\":\\\"AWS:ABC\\\"},\\\"requestParameters\\\":{\\\"sourceIPAddress\\\":\\\"127.0.0.1\\\"},\\\"responseElements\\\":{\\\"x-amz-request-id\\\":\\\"ABC\\\",\\\"x-amz-id-2\\\":\\\"ABC\\\"},\\\"s3\\\":{\\\"s3SchemaVersion\\\":\\\"1.0\\\",\\\"configurationId\\\":\\\"S3ToSnsTest\\\",\\\"bucket\\\":{\\\"name\\\":\\\"my-sns-bucket\\\",\\\"ownerIdentity\\\":{\\\"principalId\\\":\\\"ABC\\\"},\\\"arn\\\":\\\"arn:aws:s3:::my-sns-bucket\\\"},\\\"object\\\":{\\\"key\\\":\\\"path/to/testlogs.log.gz\\\",\\\"size\\\":25,\\\"eTag\\\":\\\"abc\\\",\\\"sequencer\\\":\\\"ABC\\\"}}}]}\",\n" +
" \"Timestamp\" : \"2023-05-02T13:37:04.554Z\",\n" +
" \"SignatureVersion\" : \"1\",\n" +
" \"Signature\" : \"x//abcde==\",\n" +
" \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService.pem\",\n" +
" \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:notifications:abc\"\n" +
"}";

private S3EventMessageParser createObjectUnderTest() {
return new S3EventMessageParser();
}

@Test
void parseMessage_returns_expected_S3EventNotification_from_SQS_message() throws JsonProcessingException {
final S3EventNotification s3EventNotification = createObjectUnderTest().parseMessage(DIRECT_SQS_MESSAGE);

assertThat(s3EventNotification, notNullValue());
assertThat(s3EventNotification.getRecords(), notNullValue());
assertThat(s3EventNotification.getRecords(), hasSize(1));
final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord = s3EventNotification.getRecords().get(0);
assertThat(s3EventNotificationRecord, notNullValue());
assertThat(s3EventNotificationRecord.getEventName(), equalTo("ObjectCreated:Put"));
assertThat(s3EventNotificationRecord.getS3(), notNullValue());
assertThat(s3EventNotificationRecord.getS3().getBucket(), notNullValue());
assertThat(s3EventNotificationRecord.getS3().getBucket().getName(), equalTo("my-bucket"));
assertThat(s3EventNotificationRecord.getS3().getObject(), notNullValue());
assertThat(s3EventNotificationRecord.getS3().getObject().getKey(), equalTo("path/to/myfile.log.gz"));
}

@Test
void parseMessage_returns_expected_S3EventNotification_from_SNS_to_SQS_message() throws JsonProcessingException {
final S3EventNotification s3EventNotification = createObjectUnderTest().parseMessage(SNS_BASED_MESSAGE);

assertThat(s3EventNotification, notNullValue());
assertThat(s3EventNotification.getRecords(), notNullValue());
assertThat(s3EventNotification.getRecords(), hasSize(1));
final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord = s3EventNotification.getRecords().get(0);
assertThat(s3EventNotificationRecord, notNullValue());
assertThat(s3EventNotificationRecord.getEventName(), equalTo("ObjectCreated:Put"));
assertThat(s3EventNotificationRecord.getS3(), notNullValue());
assertThat(s3EventNotificationRecord.getS3().getBucket(), notNullValue());
assertThat(s3EventNotificationRecord.getS3().getBucket().getName(), equalTo("my-sns-bucket"));
assertThat(s3EventNotificationRecord.getS3().getObject(), notNullValue());
assertThat(s3EventNotificationRecord.getS3().getObject().getKey(), equalTo("path/to/testlogs.log.gz"));
}

@Test
void parseMessage_throws_for_TestEvent() {
final String testEventMessage = "{\"Service\":\"Amazon S3\",\"Event\":\"s3:TestEvent\",\"Time\":\"2022-10-15T16:36:25.510Z\"," +
"\"Bucket\":\"bucketname\",\"RequestId\":\"abcdefg\",\"HostId\":\"hijklm\"}";

final S3EventMessageParser objectUnderTest = createObjectUnderTest();

assertThrows(JsonProcessingException.class, () -> objectUnderTest.parseMessage(testEventMessage));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class SqsWorkerTest {
private Timer sqsMessageDelayTimer;
private AcknowledgementSetManager acknowledgementSetManager;
private AcknowledgementSet acknowledgementSet;
private S3EventMessageParser s3EventMessageParser;

@BeforeEach
void setUp() {
Expand All @@ -104,6 +105,8 @@ void setUp() {
AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);

s3EventMessageParser = new S3EventMessageParser();

SqsOptions sqsOptions = mock(SqsOptions.class);
when(sqsOptions.getSqsUrl()).thenReturn("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue");

Expand All @@ -124,7 +127,7 @@ void setUp() {
when(pluginMetrics.counter(SQS_MESSAGES_DELETE_FAILED_METRIC_NAME)).thenReturn(sqsMessagesDeleteFailedCounter);
when(pluginMetrics.timer(SQS_MESSAGE_DELAY_METRIC_NAME)).thenReturn(sqsMessageDelayTimer);

sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff);
sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, s3EventMessageParser, backoff);
}

@AfterEach
Expand Down Expand Up @@ -188,7 +191,7 @@ void processSqsMessages_should_return_number_of_messages_processed(final String
void processSqsMessages_should_return_number_of_messages_processed_with_acknowledgements(final String eventName) throws IOException {
when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet);
when(s3SourceConfig.getAcknowledgements()).thenReturn(true);
sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff);
sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Service, s3SourceConfig, pluginMetrics, s3EventMessageParser, backoff);
Instant startTime = Instant.now().minus(1, ChronoUnit.HOURS);
final Message message = mock(Message.class);
when(message.body()).thenReturn(createEventNotification(eventName, startTime));
Expand Down