diff --git a/aws-lambda-java-events/README.md b/aws-lambda-java-events/README.md index 8c60b3a2..aaa2a38f 100644 --- a/aws-lambda-java-events/README.md +++ b/aws-lambda-java-events/README.md @@ -44,6 +44,8 @@ * `KinesisFirehoseEvent` * `LambdaDestinationEvent` * `LexEvent` +* `MSKFirehoseEvent` +* `MSKFirehoseResponse` * `RabbitMQEvent` * `S3BatchEvent` * `S3BatchResponse` diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java new file mode 100644 index 00000000..1af40ce4 --- /dev/null +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java @@ -0,0 +1,51 @@ +/* +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package com.amazonaws.services.lambda.runtime.events; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder(setterPrefix = "with") +@NoArgsConstructor +@AllArgsConstructor + +public class MSKFirehoseEvent { + + private String invocationId; + + private String deliveryStreamArn; + + private String sourceMSKArn; + + private String region; + + private List records; + + @Data + @Builder(setterPrefix = "with") + @NoArgsConstructor + @AllArgsConstructor + public static class Record { + + private ByteBuffer kafkaRecordValue; + + private String recordId; + + private Long approximateArrivalEpoch; + + private Long approximateArrivalTimestamp; + + private Map mskRecordMetadata; + + } +} diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java new file mode 100644 index 00000000..18b5aa13 --- /dev/null +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java @@ -0,0 +1,61 @@ +/* +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package com.amazonaws.services.lambda.runtime.events; + +import java.nio.ByteBuffer; +import java.util.List; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Response model for Amazon Data Firehose Lambda transformation with MSK as a source. + * [+] Amazon Data Firehose Data Transformation - Data Transformation and Status Model - ... + * OK : Indicates that processing of this item succeeded. + * ProcessingFailed : Indicate that the processing of this item failed. + * Dropped : Indicates that this item should be silently dropped + */ + +@Data +@Builder(setterPrefix = "with") +@NoArgsConstructor +@AllArgsConstructor + +public class MSKFirehoseResponse { + + public enum Result { + + /** + * Indicates that processing of this item succeeded. + */ + Ok, + + /** + * Indicate that the processing of this item failed + */ + ProcessingFailed, + + /** + * Indicates that this item should be silently dropped + */ + Dropped + } + public List records; + + @Data + @NoArgsConstructor + @Builder(setterPrefix = "with") + @AllArgsConstructor + + public static class Record { + public String recordId; + public Result result; + public ByteBuffer kafkaRecordValue; + + } +} diff --git a/aws-lambda-java-tests/src/main/java/com/amazonaws/services/lambda/runtime/tests/EventLoader.java b/aws-lambda-java-tests/src/main/java/com/amazonaws/services/lambda/runtime/tests/EventLoader.java index aa600749..601d2f3f 100644 --- a/aws-lambda-java-tests/src/main/java/com/amazonaws/services/lambda/runtime/tests/EventLoader.java +++ b/aws-lambda-java-tests/src/main/java/com/amazonaws/services/lambda/runtime/tests/EventLoader.java @@ -89,6 +89,10 @@ public static LexEvent loadLexEvent(String filename) { return loadEvent(filename, LexEvent.class); } + public static MSKFirehoseEvent loadMSKFirehoseEvent(String filename) { + return loadEvent(filename, MSKFirehoseEvent.class); + } + public static S3Event loadS3Event(String filename) { return loadEvent(filename, S3Event.class); } diff --git a/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java b/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java index 1c9d17e1..12dc436c 100644 --- a/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java +++ b/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java @@ -118,6 +118,19 @@ public void testLoadKinesisFirehoseEvent() { assertThat(event.getRecords().get(0).getData().array()).asString().isEqualTo("Hello, this is a test 123."); } + @Test + public void testLoadMSKFirehoseEvent() { + MSKFirehoseEvent event = EventLoader.loadMSKFirehoseEvent("msk_firehose_event.json"); + + assertThat(event).isNotNull(); + assertThat(event.getSourceMSKArn()).isEqualTo("arn:aws:kafka:EXAMPLE"); + assertThat(event.getDeliveryStreamArn()).isEqualTo("arn:aws:firehose:EXAMPLE"); + assertThat(event.getRecords()).hasSize(1); + assertThat(event.getRecords().get(0).getKafkaRecordValue().array()).asString().isEqualTo("{\"Name\":\"Hello World\"}"); + assertThat(event.getRecords().get(0).getApproximateArrivalTimestamp()).asString().isEqualTo("1716369573887"); + assertThat(event.getRecords().get(0).getMskRecordMetadata()).asString().isEqualTo("{offset=0, partitionId=1, approximateArrivalTimestamp=1716369573887}"); + } + @Test public void testLoadS3Event() { S3Event event = EventLoader.loadS3Event("s3_event.json"); diff --git a/aws-lambda-java-tests/src/test/resources/msk_firehose_event.json b/aws-lambda-java-tests/src/test/resources/msk_firehose_event.json new file mode 100644 index 00000000..6b839912 --- /dev/null +++ b/aws-lambda-java-tests/src/test/resources/msk_firehose_event.json @@ -0,0 +1,18 @@ +{ + "invocationId": "12345621-4787-0000-a418-36e56Example", + "sourceMSKArn": "arn:aws:kafka:EXAMPLE", + "deliveryStreamArn": "arn:aws:firehose:EXAMPLE", + "region": "us-east-1", + "records": [ + { + "recordId": "00000000000000000000000000000000000000000000000000000000000000", + "approximateArrivalTimestamp": 1716369573887, + "mskRecordMetadata": { + "offset": "0", + "partitionId": "1", + "approximateArrivalTimestamp": 1716369573887 + }, + "kafkaRecordValue": "eyJOYW1lIjoiSGVsbG8gV29ybGQifQ==" + } + ] +} diff --git a/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java b/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java new file mode 100644 index 00000000..f5e51349 --- /dev/null +++ b/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java @@ -0,0 +1,39 @@ +/* +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package example; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.MSKFirehoseResponse; +import com.amazonaws.services.lambda.runtime.events.MSKFirehoseEvent; +import org.json.JSONObject; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * A sample MSKFirehoseEvent handler + * For more information see the developer guide - ... + */ +public class MSKFirehoseEventHandler implements RequestHandler { + + @Override + public MSKFirehoseResponse handleRequest(MSKFirehoseEvent MSKFirehoseEvent, Context context) { + List records = new ArrayList<>(); + + for (MSKFirehoseEvent.Record record : MSKFirehoseEvent.getRecords()) { + String recordData = new String(record.getKafkaRecordValue().array()); + // Your business logic + JSONObject jsonObject = new JSONObject(recordData); + records.add(new MSKFirehoseResponse.Record(record.getRecordId(), MSKFirehoseResponse.Result.Ok, encode(jsonObject.toString()))); + } + return new MSKFirehoseResponse(records); + } + private ByteBuffer encode(String content) { + return ByteBuffer.wrap(content.getBytes()); + } +} diff --git a/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java b/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java new file mode 100644 index 00000000..77223e51 --- /dev/null +++ b/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java @@ -0,0 +1,32 @@ +/* +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package example; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.tests.annotations.Event; +import com.amazonaws.services.lambda.runtime.events.MSKFirehoseEvent; +import com.amazonaws.services.lambda.runtime.events.MSKFirehoseResponse; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class MSKFirehoseEventHandlerTest { + + private Context context; // intentionally null as it's not used in the test + + @ParameterizedTest + @Event(value = "event.json", type = MSKFirehoseEvent.class) + public void testEventHandler(MSKFirehoseEvent event) { + MSKFirehoseEventHandler Sample = new MSKFirehoseEventHandler(); + MSKFirehoseResponse response = Sample.handleRequest(event, context); + + String expectedString = "{\"Name\":\"Hello World\"}"; + MSKFirehoseResponse.Record firstRecord = response.getRecords().get(0); + Assertions.assertEquals(expectedString, UTF_8.decode(firstRecord.getKafkaRecordValue()).toString()); + Assertions.assertEquals(MSKFirehoseResponse.Result.Ok, firstRecord.getResult()); + } +} diff --git a/samples/msk-firehose-event-handler/src/test/resources/event.json b/samples/msk-firehose-event-handler/src/test/resources/event.json new file mode 100644 index 00000000..91c4b420 --- /dev/null +++ b/samples/msk-firehose-event-handler/src/test/resources/event.json @@ -0,0 +1,18 @@ +{ + "invocationId": "12345621-4787-0000-a418-36e56Example", + "sourceMSKArn": "", + "deliveryStreamArn": "", + "region": "us-east-1", + "records": [ + { + "recordId": "00000000000000000000000000000000000000000000000000000000000000", + "approximateArrivalTimestamp": 1716369573887, + "mskRecordMetadata": { + "offset": "0", + "partitionId": "1", + "approximateArrivalTimestamp": 1716369573887 + }, + "kafkaRecordValue": "eyJOYW1lIjoiSGVsbG8gV29ybGQifQ==" + } + ] +}