From ec8dca0514a35d9b1472dffdb3fbf0c0d215b3c3 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Mon, 1 Jul 2024 13:23:42 +0530 Subject: [PATCH 01/28] Create MskFirehoseEvent.java --- .../runtime/events/MskFirehoseEvent.java | 452 ++++++++++++++++++ 1 file changed, 452 insertions(+) create mode 100644 aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java new file mode 100644 index 00000000..e5820fe9 --- /dev/null +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java @@ -0,0 +1,452 @@ +/* + * Copyright 2012-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with + * the License. A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package events; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +/** + * Created by vermshas on 6/28/24. + */ +public class MskFirehoseEvent implements Serializable, Cloneable { + + private static final long serialVersionUID = -2890373471008001695L; + + private String invocationId; + + private String deliveryStreamArn; + + private String region; + + private List records; + + public static class Record implements Serializable, Cloneable { + + private static final long serialVersionUID = -7231161900431910379L; + + /** + *

+ * The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before + * base64-encoding, is 1,000 KB. + *

+ */ + private ByteBuffer kafkaRecordValue; + + private String recordId; + + private Long approximateArrivalEpoch; + + private Long approximateArrivalTimestamp; + + private Map mskRecordMetadata; + + /** + * default constructor + */ + public Record() {} + + /** + *

+ * The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before + * base64-encoding, is 1,000 KB. + *

+ *

+ * The AWS SDK for Java performs a Base64 encoding on this field before sending this request to the AWS service. + * Users of the SDK should not perform Base64 encoding on this field. + *

+ *

+ * Warning: ByteBuffers returned by the SDK are mutable. Changes to the content or position of the byte buffer will + * be seen by all objects that have a reference to this object. It is recommended to call ByteBuffer.duplicate() or + * ByteBuffer.asReadOnlyBuffer() before using or reading from the buffer. This behavior will be changed in a future + * major version of the SDK. + *

+ * + * @param kafkaRecordValue + * The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, + * before base64-encoding, is 1,000 KB. + */ + public void setKafkaRecordValue(ByteBuffer kafkaRecordValue) { + this.kafkaRecordValue = kafkaRecordValue; + } + + /** + *

+ * The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before + * base64-encoding, is 1,000 KB. + *

+ *

+ * {@code ByteBuffer}s are stateful. Calling their {@code get} methods changes their {@code position}. We recommend + * using {@link java.nio.ByteBuffer#asReadOnlyBuffer()} to create a read-only view of the buffer with an independent + * {@code position}, and calling {@code get} methods on this rather than directly on the returned {@code ByteBuffer}. + * Doing so will ensure that anyone else using the {@code ByteBuffer} will not be affected by changes to the + * {@code position}. + *

+ * + * @return The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, + * before base64-encoding, is 1,000 KB. + */ + public ByteBuffer getKafkaRecordValue() { + return this.kafkaRecordValue; + } + + /** + * @return record id + */ + public String getRecordId() { + return this.recordId; + } + + /** + * @param recordId record id + */ + public void setRecordId(String recordId) { + this.recordId = recordId; + } + + /** + * @param recordId record id + * @return Record + */ + public Record withRecordId(String recordId) { + setRecordId(recordId); + return this; + } + + /** + * @return approximate arrival epoch + */ + public Long getApproximateArrivalEpoch() { + return this.approximateArrivalEpoch; + } + + /** + * @param approximateArrivalEpoch Long epoch + */ + public void setApproximateArrivalEpoch(Long approximateArrivalEpoch) { + this.approximateArrivalEpoch = approximateArrivalEpoch; + } + + /** + * @param approximateArrivalEpoch Long epoch + * @return Record + */ + public Record withApproximateArrivalEpoch(Long approximateArrivalEpoch) { + setApproximateArrivalEpoch(approximateArrivalEpoch); + return this; + } + + /** + * @return approximate arrival timestamp + */ + public Long getApproximateArrivalTimestamp() { + return this.approximateArrivalTimestamp; + } + + /** + * @param approximateArrivalTimestamp approximate arrival timestamp + */ + public void setApproximateArrivalTimestamp(Long approximateArrivalTimestamp) { + this.approximateArrivalTimestamp = approximateArrivalTimestamp; + } + + /** + * @param approximateArrivalTimestamp approximate arrival timestamp + * @return Record + */ + public Record withApproximateArrivalTimestamp(Long approximateArrivalTimestamp) { + setApproximateArrivalTimestamp(approximateArrivalTimestamp); + return this; + } + + /** + * @return Msk record meta data + */ + public Map getMskRecordMetadata() { + return this.mskRecordMetadata; + } + + /** + * @param mskRecordMetadata Msk record metadata + */ + public void setMskRecordMetadata(Map mskRecordMetadata) { + this.mskRecordMetadata = mskRecordMetadata; + } + + /** + * @param mskRecordMetadata Msk record metadata + * @return Record + */ + public Record withMskRecordMetadata(Map mskRecordMetadata) { + setMskRecordMetadata(mskRecordMetadata); + return this; + } + + /** + * Returns a string representation of this object; useful for testing and debugging. + * + * @return A string representation of this object. + * + * @see Object#toString() + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + if (getKafkaRecordValue() != null) + sb.append("data: ").append(getKafkaRecordValue().toString()).append(","); + if (getRecordId() != null) + sb.append("recordId: ").append(getRecordId()).append(","); + if (getApproximateArrivalEpoch() != null) + sb.append("approximateArrivalEpoch: ").append(getApproximateArrivalEpoch().toString()).append(","); + if (getApproximateArrivalTimestamp() != null) + sb.append("approximateArrivalTimestamp: ").append(getApproximateArrivalTimestamp().toString()).append(","); + if (getMskRecordMetadata() != null) + sb.append("mskRecordMetadata: ").append(getMskRecordMetadata().toString()); + sb.append("}"); + return sb.toString(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + + if (obj instanceof Record == false) + return false; + Record other = (Record) obj; + if (other.getKafkaRecordValue() == null ^ this.getKafkaRecordValue() == null) + return false; + if (other.getKafkaRecordValue() != null && other.getKafkaRecordValue().equals(this.getKafkaRecordValue()) == false) + return false; + if (other.getRecordId() == null ^ this.getRecordId() == null) + return false; + if (other.getRecordId() != null && other.getRecordId().equals(this.getRecordId()) == false) + return false; + if (other.getApproximateArrivalEpoch() == null ^ this.getApproximateArrivalEpoch() == null) + return false; + if (other.getApproximateArrivalEpoch() != null && other.getApproximateArrivalEpoch().equals(this.getApproximateArrivalEpoch()) == false) + return false; + if (other.getApproximateArrivalTimestamp() == null ^ this.getApproximateArrivalTimestamp() == null) + return false; + if (other.getApproximateArrivalTimestamp() != null && other.getApproximateArrivalTimestamp().equals(this.getApproximateArrivalTimestamp()) == false) + return false; + if (other.getMskRecordMetadata() == null ^ this.getMskRecordMetadata() == null) + return false; + if (other.getMskRecordMetadata() != null && other.getMskRecordMetadata().equals(this.getMskRecordMetadata()) == false) + return false; + return true; + } + + @Override + public int hashCode() { + final int prime = 31; + int hashCode = 1; + + hashCode = prime * hashCode + ((getKafkaRecordValue() == null) ? 0 : getKafkaRecordValue().hashCode()); + hashCode = prime * hashCode + ((getRecordId() == null) ? 0 : getRecordId().hashCode()); + hashCode = prime * hashCode + ((getApproximateArrivalEpoch() == null) ? 0 : getApproximateArrivalEpoch().hashCode()); + hashCode = prime * hashCode + ((getApproximateArrivalTimestamp() == null) ? 0 : getApproximateArrivalTimestamp().hashCode()); + hashCode = prime * hashCode + ((getMskRecordMetadata() == null) ? 0 : getMskRecordMetadata().hashCode()); + + return hashCode; + } + + @Override + public Record clone() { + try { + return (Record) super.clone(); + } catch (CloneNotSupportedException e) { + throw new IllegalStateException("Got a CloneNotSupportedException from Object.clone()", e); + } + } + + } + + /** + * default constructor + */ + public MskFirehoseEvent() {} + + /** + * @return invocation id + */ + public String getInvocationId() { + return this.invocationId; + } + + /** + * @param invocationId invocation id + */ + public void setInvocationId(String invocationId) { + this.invocationId = invocationId; + } + + /** + * @param invocationId invocation id + * @return MskFirehoseEvent + */ + public MskFirehoseEvent withInvocationId(String invocationId) { + setInvocationId(invocationId); + return this; + } + + /** + * @return delivery stream arn + */ + public String getDeliveryStreamArn() { + return this.deliveryStreamArn; + } + + /** + * @param deliveryStreamArn delivery stream arn + */ + public void setDeliveryStreamArn(String deliveryStreamArn) { + this.deliveryStreamArn = deliveryStreamArn; + } + + /**] + * @param deliveryStreamArn delivery stream arn + * @return MskFirehoseEvent + */ + public MskFirehoseEvent withDeliveryStreamArn(String deliveryStreamArn) { + setDeliveryStreamArn(deliveryStreamArn); + return this; + } + + /** + * @return region + */ + public String getRegion() { + return this.region; + } + + /** + * @param region aws region + */ + public void setRegion(String region) { + this.region = region; + } + + /** + * @param region aws region + * @return MskFirehoseEvent + */ + public MskFirehoseEvent withRegion(String region) { + setRegion(region); + return this; + } + + /** + * Gets the list of MSK event records + * + */ + public List getRecords() { + return records; + } + + /** + * Sets the list of MSK event records + * @param records a list of MSK event records + */ + public void setRecords(List records) { + this.records = records; + } + + /** + * @param records a list of MSK event records + * @return MskFirehoseEvent + */ + public MskFirehoseEvent withRecords(List records) { + setRecords(records); + return this; + } + + /** + * Returns a string representation of this object; useful for testing and debugging. + * + * @return A string representation of this object. + * + * @see Object#toString() + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + if (getInvocationId() != null) + sb.append("invocationId: ").append(getInvocationId()).append(","); + if (getDeliveryStreamArn() != null) + sb.append("deliveryStreamArn: ").append(getDeliveryStreamArn()).append(","); + if (getRegion() != null) + sb.append("region: ").append(getRegion()).append(","); + if (getRecords() != null) + sb.append("records: ").append(getRecords().toString()); + sb.append("}"); + return sb.toString(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + + if (obj instanceof MskFirehoseEvent == false) + return false; + MskFirehoseEvent other = (MskFirehoseEvent) obj; + if (other.getInvocationId() == null ^ this.getInvocationId() == null) + return false; + if (other.getInvocationId() != null && other.getInvocationId().equals(this.getInvocationId()) == false) + return false; + if (other.getDeliveryStreamArn() == null ^ this.getDeliveryStreamArn() == null) + return false; + if (other.getDeliveryStreamArn() != null && other.getDeliveryStreamArn().equals(this.getDeliveryStreamArn()) == false) + return false; + if (other.getRegion() == null ^ this.getRegion() == null) + return false; + if (other.getRegion() != null && other.getRegion().equals(this.getRegion()) == false) + return false; + if (other.getRecords() == null ^ this.getRecords() == null) + return false; + if (other.getRecords() != null && other.getRecords().equals(this.getRecords()) == false) + return false; + return true; + } + + @Override + public int hashCode() { + final int prime = 31; + int hashCode = 1; + + hashCode = prime * hashCode + ((getInvocationId() == null) ? 0 : getInvocationId().hashCode()); + hashCode = prime * hashCode + ((getDeliveryStreamArn() == null) ? 0 : getDeliveryStreamArn().hashCode()); + hashCode = prime * hashCode + ((getRegion() == null) ? 0 : getRegion().hashCode()); + hashCode = prime * hashCode + ((getRecords() == null) ? 0 : getRecords().hashCode()); + return hashCode; + } + + + @Override + public MskFirehoseEvent clone() { + try { + return (MskFirehoseEvent) super.clone(); + } catch (CloneNotSupportedException e) { + throw new IllegalStateException("Got a CloneNotSupportedException from Object.clone()", e); + } + } +} From 7df36454e631432243b6ae98c04f600a76a8aebd Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Mon, 1 Jul 2024 15:37:56 +0530 Subject: [PATCH 02/28] Update MskFirehoseEvent.java --- .../services/lambda/runtime/events/MskFirehoseEvent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java index e5820fe9..23e7dcc5 100644 --- a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java @@ -11,7 +11,7 @@ * and limitations under the License. */ -package events; +package com.amazonaws.services.lambda.runtime.events; import java.io.Serializable; import java.nio.ByteBuffer; From f6f4a60186f8e3df1eb1f9c94882af82210ebfed Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Mon, 1 Jul 2024 19:40:49 +0530 Subject: [PATCH 03/28] Update MskFirehoseEvent.java Changed packaging and used Lombok. --- .../runtime/events/MskFirehoseEvent.java | 393 ++---------------- 1 file changed, 35 insertions(+), 358 deletions(-) diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java index 23e7dcc5..cc36f087 100644 --- a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java @@ -1,26 +1,43 @@ -/* - * Copyright 2012-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with - * the License. A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package com.amazonaws.services.lambda.runtime.events; +package com.amazonaws.services.lambda.runtime.events;; import java.io.Serializable; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + /** * Created by vermshas on 6/28/24. + * Event format is below: + * { + * "invocationId": "", + * "sourceMSKArn": "", + * "deliveryStreamArn": "", + * "region": "us-east-1", + * "records": [ + * { + * "recordId": "00000000000000000000000000000000000000000000000000000000000000", + * "approximateArrivalTimestamp": 1716369573887, + * "mskRecordMetadata": { + * "offset": "0", + * "partitionId": "1", + * "approximateArrivalTimestamp": 1716369573887 + * }, + * "kafkaRecordValue": "" + * } + * ] + * } */ + +@Getter +@Setter +@ToString +@EqualsAndHashCode + public class MskFirehoseEvent implements Serializable, Cloneable { private static final long serialVersionUID = -2890373471008001695L; @@ -33,16 +50,14 @@ public class MskFirehoseEvent implements Serializable, Cloneable { private List records; + @Getter + @Setter + @ToString + @EqualsAndHashCode public static class Record implements Serializable, Cloneable { private static final long serialVersionUID = -7231161900431910379L; - /** - *

- * The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before - * base64-encoding, is 1,000 KB. - *

- */ private ByteBuffer kafkaRecordValue; private String recordId; @@ -53,219 +68,28 @@ public static class Record implements Serializable, Cloneable { private Map mskRecordMetadata; - /** - * default constructor - */ public Record() {} - /** - *

- * The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before - * base64-encoding, is 1,000 KB. - *

- *

- * The AWS SDK for Java performs a Base64 encoding on this field before sending this request to the AWS service. - * Users of the SDK should not perform Base64 encoding on this field. - *

- *

- * Warning: ByteBuffers returned by the SDK are mutable. Changes to the content or position of the byte buffer will - * be seen by all objects that have a reference to this object. It is recommended to call ByteBuffer.duplicate() or - * ByteBuffer.asReadOnlyBuffer() before using or reading from the buffer. This behavior will be changed in a future - * major version of the SDK. - *

- * - * @param kafkaRecordValue - * The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, - * before base64-encoding, is 1,000 KB. - */ - public void setKafkaRecordValue(ByteBuffer kafkaRecordValue) { - this.kafkaRecordValue = kafkaRecordValue; - } - - /** - *

- * The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before - * base64-encoding, is 1,000 KB. - *

- *

- * {@code ByteBuffer}s are stateful. Calling their {@code get} methods changes their {@code position}. We recommend - * using {@link java.nio.ByteBuffer#asReadOnlyBuffer()} to create a read-only view of the buffer with an independent - * {@code position}, and calling {@code get} methods on this rather than directly on the returned {@code ByteBuffer}. - * Doing so will ensure that anyone else using the {@code ByteBuffer} will not be affected by changes to the - * {@code position}. - *

- * - * @return The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, - * before base64-encoding, is 1,000 KB. - */ - public ByteBuffer getKafkaRecordValue() { - return this.kafkaRecordValue; - } - - /** - * @return record id - */ - public String getRecordId() { - return this.recordId; - } - - /** - * @param recordId record id - */ - public void setRecordId(String recordId) { - this.recordId = recordId; - } - - /** - * @param recordId record id - * @return Record - */ public Record withRecordId(String recordId) { setRecordId(recordId); return this; } - /** - * @return approximate arrival epoch - */ - public Long getApproximateArrivalEpoch() { - return this.approximateArrivalEpoch; - } - - /** - * @param approximateArrivalEpoch Long epoch - */ - public void setApproximateArrivalEpoch(Long approximateArrivalEpoch) { - this.approximateArrivalEpoch = approximateArrivalEpoch; - } - - /** - * @param approximateArrivalEpoch Long epoch - * @return Record - */ public Record withApproximateArrivalEpoch(Long approximateArrivalEpoch) { setApproximateArrivalEpoch(approximateArrivalEpoch); return this; } - /** - * @return approximate arrival timestamp - */ - public Long getApproximateArrivalTimestamp() { - return this.approximateArrivalTimestamp; - } - - /** - * @param approximateArrivalTimestamp approximate arrival timestamp - */ - public void setApproximateArrivalTimestamp(Long approximateArrivalTimestamp) { - this.approximateArrivalTimestamp = approximateArrivalTimestamp; - } - - /** - * @param approximateArrivalTimestamp approximate arrival timestamp - * @return Record - */ public Record withApproximateArrivalTimestamp(Long approximateArrivalTimestamp) { setApproximateArrivalTimestamp(approximateArrivalTimestamp); return this; } - /** - * @return Msk record meta data - */ - public Map getMskRecordMetadata() { - return this.mskRecordMetadata; - } - - /** - * @param mskRecordMetadata Msk record metadata - */ - public void setMskRecordMetadata(Map mskRecordMetadata) { - this.mskRecordMetadata = mskRecordMetadata; - } - - /** - * @param mskRecordMetadata Msk record metadata - * @return Record - */ public Record withMskRecordMetadata(Map mskRecordMetadata) { setMskRecordMetadata(mskRecordMetadata); return this; } - /** - * Returns a string representation of this object; useful for testing and debugging. - * - * @return A string representation of this object. - * - * @see Object#toString() - */ - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("{"); - if (getKafkaRecordValue() != null) - sb.append("data: ").append(getKafkaRecordValue().toString()).append(","); - if (getRecordId() != null) - sb.append("recordId: ").append(getRecordId()).append(","); - if (getApproximateArrivalEpoch() != null) - sb.append("approximateArrivalEpoch: ").append(getApproximateArrivalEpoch().toString()).append(","); - if (getApproximateArrivalTimestamp() != null) - sb.append("approximateArrivalTimestamp: ").append(getApproximateArrivalTimestamp().toString()).append(","); - if (getMskRecordMetadata() != null) - sb.append("mskRecordMetadata: ").append(getMskRecordMetadata().toString()); - sb.append("}"); - return sb.toString(); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - - if (obj instanceof Record == false) - return false; - Record other = (Record) obj; - if (other.getKafkaRecordValue() == null ^ this.getKafkaRecordValue() == null) - return false; - if (other.getKafkaRecordValue() != null && other.getKafkaRecordValue().equals(this.getKafkaRecordValue()) == false) - return false; - if (other.getRecordId() == null ^ this.getRecordId() == null) - return false; - if (other.getRecordId() != null && other.getRecordId().equals(this.getRecordId()) == false) - return false; - if (other.getApproximateArrivalEpoch() == null ^ this.getApproximateArrivalEpoch() == null) - return false; - if (other.getApproximateArrivalEpoch() != null && other.getApproximateArrivalEpoch().equals(this.getApproximateArrivalEpoch()) == false) - return false; - if (other.getApproximateArrivalTimestamp() == null ^ this.getApproximateArrivalTimestamp() == null) - return false; - if (other.getApproximateArrivalTimestamp() != null && other.getApproximateArrivalTimestamp().equals(this.getApproximateArrivalTimestamp()) == false) - return false; - if (other.getMskRecordMetadata() == null ^ this.getMskRecordMetadata() == null) - return false; - if (other.getMskRecordMetadata() != null && other.getMskRecordMetadata().equals(this.getMskRecordMetadata()) == false) - return false; - return true; - } - - @Override - public int hashCode() { - final int prime = 31; - int hashCode = 1; - - hashCode = prime * hashCode + ((getKafkaRecordValue() == null) ? 0 : getKafkaRecordValue().hashCode()); - hashCode = prime * hashCode + ((getRecordId() == null) ? 0 : getRecordId().hashCode()); - hashCode = prime * hashCode + ((getApproximateArrivalEpoch() == null) ? 0 : getApproximateArrivalEpoch().hashCode()); - hashCode = prime * hashCode + ((getApproximateArrivalTimestamp() == null) ? 0 : getApproximateArrivalTimestamp().hashCode()); - hashCode = prime * hashCode + ((getMskRecordMetadata() == null) ? 0 : getMskRecordMetadata().hashCode()); - - return hashCode; - } - @Override public Record clone() { try { @@ -276,171 +100,24 @@ public Record clone() { } } - - /** - * default constructor - */ public MskFirehoseEvent() {} - - /** - * @return invocation id - */ - public String getInvocationId() { - return this.invocationId; - } - - /** - * @param invocationId invocation id - */ - public void setInvocationId(String invocationId) { - this.invocationId = invocationId; - } - - /** - * @param invocationId invocation id - * @return MskFirehoseEvent - */ public MskFirehoseEvent withInvocationId(String invocationId) { setInvocationId(invocationId); return this; } - - /** - * @return delivery stream arn - */ - public String getDeliveryStreamArn() { - return this.deliveryStreamArn; - } - - /** - * @param deliveryStreamArn delivery stream arn - */ - public void setDeliveryStreamArn(String deliveryStreamArn) { - this.deliveryStreamArn = deliveryStreamArn; - } - - /**] - * @param deliveryStreamArn delivery stream arn - * @return MskFirehoseEvent - */ public MskFirehoseEvent withDeliveryStreamArn(String deliveryStreamArn) { setDeliveryStreamArn(deliveryStreamArn); return this; } - - /** - * @return region - */ - public String getRegion() { - return this.region; - } - - /** - * @param region aws region - */ - public void setRegion(String region) { - this.region = region; - } - - /** - * @param region aws region - * @return MskFirehoseEvent - */ public MskFirehoseEvent withRegion(String region) { setRegion(region); return this; } - - /** - * Gets the list of MSK event records - * - */ - public List getRecords() { - return records; - } - - /** - * Sets the list of MSK event records - * @param records a list of MSK event records - */ - public void setRecords(List records) { - this.records = records; - } - - /** - * @param records a list of MSK event records - * @return MskFirehoseEvent - */ public MskFirehoseEvent withRecords(List records) { setRecords(records); return this; } - /** - * Returns a string representation of this object; useful for testing and debugging. - * - * @return A string representation of this object. - * - * @see Object#toString() - */ - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("{"); - if (getInvocationId() != null) - sb.append("invocationId: ").append(getInvocationId()).append(","); - if (getDeliveryStreamArn() != null) - sb.append("deliveryStreamArn: ").append(getDeliveryStreamArn()).append(","); - if (getRegion() != null) - sb.append("region: ").append(getRegion()).append(","); - if (getRecords() != null) - sb.append("records: ").append(getRecords().toString()); - sb.append("}"); - return sb.toString(); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - - if (obj instanceof MskFirehoseEvent == false) - return false; - MskFirehoseEvent other = (MskFirehoseEvent) obj; - if (other.getInvocationId() == null ^ this.getInvocationId() == null) - return false; - if (other.getInvocationId() != null && other.getInvocationId().equals(this.getInvocationId()) == false) - return false; - if (other.getDeliveryStreamArn() == null ^ this.getDeliveryStreamArn() == null) - return false; - if (other.getDeliveryStreamArn() != null && other.getDeliveryStreamArn().equals(this.getDeliveryStreamArn()) == false) - return false; - if (other.getRegion() == null ^ this.getRegion() == null) - return false; - if (other.getRegion() != null && other.getRegion().equals(this.getRegion()) == false) - return false; - if (other.getRecords() == null ^ this.getRecords() == null) - return false; - if (other.getRecords() != null && other.getRecords().equals(this.getRecords()) == false) - return false; - return true; - } - - @Override - public int hashCode() { - final int prime = 31; - int hashCode = 1; - - hashCode = prime * hashCode + ((getInvocationId() == null) ? 0 : getInvocationId().hashCode()); - hashCode = prime * hashCode + ((getDeliveryStreamArn() == null) ? 0 : getDeliveryStreamArn().hashCode()); - hashCode = prime * hashCode + ((getRegion() == null) ? 0 : getRegion().hashCode()); - hashCode = prime * hashCode + ((getRecords() == null) ? 0 : getRecords().hashCode()); - return hashCode; - } - - @Override public MskFirehoseEvent clone() { try { From 51b2640e7163ff565a14f7b36b27066e9c71bec1 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Mon, 1 Jul 2024 19:50:44 +0530 Subject: [PATCH 04/28] Update MskFirehoseEvent.java --- .../services/lambda/runtime/events/MskFirehoseEvent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java index cc36f087..71dadd94 100644 --- a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java @@ -1,4 +1,4 @@ -package com.amazonaws.services.lambda.runtime.events;; +package com.amazonaws.services.lambda.runtime.events; import java.io.Serializable; import java.nio.ByteBuffer; From 7d689bbb011f5c2e18981e830f892facd0f91165 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Mon, 1 Jul 2024 20:16:25 +0530 Subject: [PATCH 05/28] Update README.md Added MSKFirehoseEvent --- aws-lambda-java-events/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/aws-lambda-java-events/README.md b/aws-lambda-java-events/README.md index 8c60b3a2..b3c145c1 100644 --- a/aws-lambda-java-events/README.md +++ b/aws-lambda-java-events/README.md @@ -44,6 +44,7 @@ * `KinesisFirehoseEvent` * `LambdaDestinationEvent` * `LexEvent` +* `MSKFirehoseEvent` * `RabbitMQEvent` * `S3BatchEvent` * `S3BatchResponse` From eb08bbeb20772d641ee72d5ea34cb244e11438b6 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Mon, 1 Jul 2024 20:19:04 +0530 Subject: [PATCH 06/28] Update and rename MskFirehoseEvent.java to MSKFirehoseEvent.java --- .../runtime/events/MSKFirehoseEvent.java | 68 +++++++++ .../runtime/events/MskFirehoseEvent.java | 129 ------------------ 2 files changed, 68 insertions(+), 129 deletions(-) create mode 100644 aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java delete mode 100644 aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java new file mode 100644 index 00000000..70d0796e --- /dev/null +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java @@ -0,0 +1,68 @@ +package com.amazonaws.services.lambda.runtime.events; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Created by vermshas on 6/28/24. + * { + * "invocationId": "", + * "sourceMSKArn": "", + * "deliveryStreamArn": "", + * "region": "us-east-1", + * "records": [ + * { + * "recordId": "00000000000000000000000000000000000000000000000000000000000000", + * "approximateArrivalTimestamp": 1716369573887, + * "mskRecordMetadata": { + * "offset": "0", + * "partitionId": "1", + * "approximateArrivalTimestamp": 1716369573887 + * }, + * "kafkaRecordValue": "" + * } + * ] + * } + */ + +@Data +@Builder(setterPrefix = "with") +@NoArgsConstructor +@AllArgsConstructor + +public class MSKFirehoseEvent { + + private String invocationId; + + private String deliveryStreamArn; + + private String sourceMSKArn; + + private String region; + + private List records; + + @Data + @Builder(setterPrefix = "with") + @NoArgsConstructor + @AllArgsConstructor + public static class Record { + + private ByteBuffer kafkaRecordValue; + + private String recordId; + + private Long approximateArrivalEpoch; + + private Long approximateArrivalTimestamp; + + private Map mskRecordMetadata; + + } +} diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java deleted file mode 100644 index 71dadd94..00000000 --- a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MskFirehoseEvent.java +++ /dev/null @@ -1,129 +0,0 @@ -package com.amazonaws.services.lambda.runtime.events; - -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; - -/** - * Created by vermshas on 6/28/24. - * Event format is below: - * { - * "invocationId": "", - * "sourceMSKArn": "", - * "deliveryStreamArn": "", - * "region": "us-east-1", - * "records": [ - * { - * "recordId": "00000000000000000000000000000000000000000000000000000000000000", - * "approximateArrivalTimestamp": 1716369573887, - * "mskRecordMetadata": { - * "offset": "0", - * "partitionId": "1", - * "approximateArrivalTimestamp": 1716369573887 - * }, - * "kafkaRecordValue": "" - * } - * ] - * } - */ - -@Getter -@Setter -@ToString -@EqualsAndHashCode - -public class MskFirehoseEvent implements Serializable, Cloneable { - - private static final long serialVersionUID = -2890373471008001695L; - - private String invocationId; - - private String deliveryStreamArn; - - private String region; - - private List records; - - @Getter - @Setter - @ToString - @EqualsAndHashCode - public static class Record implements Serializable, Cloneable { - - private static final long serialVersionUID = -7231161900431910379L; - - private ByteBuffer kafkaRecordValue; - - private String recordId; - - private Long approximateArrivalEpoch; - - private Long approximateArrivalTimestamp; - - private Map mskRecordMetadata; - - public Record() {} - - public Record withRecordId(String recordId) { - setRecordId(recordId); - return this; - } - - public Record withApproximateArrivalEpoch(Long approximateArrivalEpoch) { - setApproximateArrivalEpoch(approximateArrivalEpoch); - return this; - } - - public Record withApproximateArrivalTimestamp(Long approximateArrivalTimestamp) { - setApproximateArrivalTimestamp(approximateArrivalTimestamp); - return this; - } - - public Record withMskRecordMetadata(Map mskRecordMetadata) { - setMskRecordMetadata(mskRecordMetadata); - return this; - } - - @Override - public Record clone() { - try { - return (Record) super.clone(); - } catch (CloneNotSupportedException e) { - throw new IllegalStateException("Got a CloneNotSupportedException from Object.clone()", e); - } - } - - } - public MskFirehoseEvent() {} - public MskFirehoseEvent withInvocationId(String invocationId) { - setInvocationId(invocationId); - return this; - } - public MskFirehoseEvent withDeliveryStreamArn(String deliveryStreamArn) { - setDeliveryStreamArn(deliveryStreamArn); - return this; - } - public MskFirehoseEvent withRegion(String region) { - setRegion(region); - return this; - } - public MskFirehoseEvent withRecords(List records) { - setRecords(records); - return this; - } - - @Override - public MskFirehoseEvent clone() { - try { - return (MskFirehoseEvent) super.clone(); - } catch (CloneNotSupportedException e) { - throw new IllegalStateException("Got a CloneNotSupportedException from Object.clone()", e); - } - } -} From 5eb704218775f3f755ff17fb5098abcf19e2cdef Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Wed, 3 Jul 2024 00:29:06 +0530 Subject: [PATCH 07/28] Create MSKFirehoseResponse.java This is a response class for the Lambda transformation function. --- .../runtime/events/MSKFirehoseResponse.java | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java new file mode 100644 index 00000000..b923eef6 --- /dev/null +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java @@ -0,0 +1,54 @@ +package com.amazonaws.services.lambda.runtime.events; + +import java.nio.ByteBuffer; +import java.util.List; + +import lombok.*; + +/** + * Response model for Amazon Data Firehose Lambda transformation with MSK as a source. + */ + +@Data +@Builder(setterPrefix = "with") +@NoArgsConstructor +@AllArgsConstructor + +public class MSKFirehoseResponse { + + public enum Result { + + /** + * Indicates that processing of this item succeeded. + */ + Ok, + + /** + * Indicate that the processing of this item failed + */ + ProcessingFailed, + + /** + * Indicates that this item should be silently dropped + */ + Dropped + } + public List records; + + @Data + @Builder(setterPrefix = "with") + @NoArgsConstructor + + public static class Record { + public String recordId; + public Result result; + public ByteBuffer kafkaRecordValue; + + public Record(String recordId, Result result, ByteBuffer kafkaRecordValue) { + super(); + this.recordId = recordId; + this.result = result; + this.kafkaRecordValue = kafkaRecordValue; + } + } +} From a720f87840dd5496317d88d6edb6292f0a47a3be Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Wed, 3 Jul 2024 00:32:58 +0530 Subject: [PATCH 08/28] Create msk_firehose_event.json --- .../src/test/resources/msk_firehose_event.json | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 aws-lambda-java-tests/src/test/resources/msk_firehose_event.json diff --git a/aws-lambda-java-tests/src/test/resources/msk_firehose_event.json b/aws-lambda-java-tests/src/test/resources/msk_firehose_event.json new file mode 100644 index 00000000..91c4b420 --- /dev/null +++ b/aws-lambda-java-tests/src/test/resources/msk_firehose_event.json @@ -0,0 +1,18 @@ +{ + "invocationId": "12345621-4787-0000-a418-36e56Example", + "sourceMSKArn": "", + "deliveryStreamArn": "", + "region": "us-east-1", + "records": [ + { + "recordId": "00000000000000000000000000000000000000000000000000000000000000", + "approximateArrivalTimestamp": 1716369573887, + "mskRecordMetadata": { + "offset": "0", + "partitionId": "1", + "approximateArrivalTimestamp": 1716369573887 + }, + "kafkaRecordValue": "eyJOYW1lIjoiSGVsbG8gV29ybGQifQ==" + } + ] +} From df1e0e43524218f266eadae1b45bacf23eae5015 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Wed, 3 Jul 2024 00:35:19 +0530 Subject: [PATCH 09/28] Create MSKFirehoseEventHandler.java --- .../java/example/MSKFirehoseEventHandler.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java diff --git a/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java b/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java new file mode 100644 index 00000000..91bfd38e --- /dev/null +++ b/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java @@ -0,0 +1,34 @@ +package example; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.MSKFirehoseResponse; +import com.amazonaws.services.lambda.runtime.events.MSKFirehoseEvent; +import org.json.JSONObject; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * A sample MSKFirehoseEvent handler + * For more information see the developer guide - ... + */ +public class MSKFirehoseEventHandler implements RequestHandler { + + @Override + public MSKFirehoseResponse handleRequest(MSKFirehoseEvent kinesisFirehoseEvent, Context context) { + List records = new ArrayList<>(); + + for (MSKFirehoseEvent.Record record : kinesisFirehoseEvent.getRecords()) { + String recordData = new String(record.getKafkaRecordValue().array()); + // Your business logic + JSONObject jsonObject = new JSONObject(recordData); + records.add(new MSKFirehoseResponse.Record(record.getRecordId(), MSKFirehoseResponse.Result.Ok, encode(jsonObject.toString()))); + } + return new MSKFirehoseResponse(records); + } + private ByteBuffer encode(String content) { + return ByteBuffer.wrap(content.getBytes()); + } +} From 57a9a813c04eda8e9c97fa885d6c9701fa3a302a Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Wed, 3 Jul 2024 00:37:56 +0530 Subject: [PATCH 10/28] Create MSKFirehoseEventHandlerTest.java Adding test class. --- .../example/MSKFirehoseEventHandlerTest.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java diff --git a/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java b/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java new file mode 100644 index 00000000..0dc0af58 --- /dev/null +++ b/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java @@ -0,0 +1,27 @@ +package example; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.tests.annotations.Event; +import com.amazonaws.services.lambda.runtime.events.MSKFirehoseEvent; +import com.amazonaws.services.lambda.runtime.events.MSKFirehoseResponse; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class MSKFirehoseEventHandlerTest { + + private Context context; // intentionally null as it's not used in the test + + @ParameterizedTest + @Event(value = "event.json", type = MSKFirehoseEvent.class) + public void testEventHandler(MSKFirehoseEvent event) { + Sample Sample = new Sample(); + MSKFirehoseResponse response = Sample.handleRequest(event, context); + + String expectedString = "{\"Name\":\"Hello World\"}"; + MSKFirehoseResponse.Record firstRecord = response.getRecords().get(0); + Assertions.assertEquals(expectedString, UTF_8.decode(firstRecord.getKafkaRecordValue()).toString()); + Assertions.assertEquals(MSKFirehoseResponse.Result.Ok, firstRecord.getResult()); + } +} From 5ba689e66cff46e77c73bba2f80b1e4150df4295 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Wed, 3 Jul 2024 00:38:52 +0530 Subject: [PATCH 11/28] Create event.json Creating test event. --- .../src/test/resources/event.json | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 samples/msk-firehose-event-handler/src/test/resources/event.json diff --git a/samples/msk-firehose-event-handler/src/test/resources/event.json b/samples/msk-firehose-event-handler/src/test/resources/event.json new file mode 100644 index 00000000..91c4b420 --- /dev/null +++ b/samples/msk-firehose-event-handler/src/test/resources/event.json @@ -0,0 +1,18 @@ +{ + "invocationId": "12345621-4787-0000-a418-36e56Example", + "sourceMSKArn": "", + "deliveryStreamArn": "", + "region": "us-east-1", + "records": [ + { + "recordId": "00000000000000000000000000000000000000000000000000000000000000", + "approximateArrivalTimestamp": 1716369573887, + "mskRecordMetadata": { + "offset": "0", + "partitionId": "1", + "approximateArrivalTimestamp": 1716369573887 + }, + "kafkaRecordValue": "eyJOYW1lIjoiSGVsbG8gV29ybGQifQ==" + } + ] +} From f240a629b85ee4d97e40da1fb925a05788fec8fb Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Wed, 3 Jul 2024 00:39:48 +0530 Subject: [PATCH 12/28] Update README.md added MSKFirehoseResponse --- aws-lambda-java-events/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/aws-lambda-java-events/README.md b/aws-lambda-java-events/README.md index b3c145c1..aaa2a38f 100644 --- a/aws-lambda-java-events/README.md +++ b/aws-lambda-java-events/README.md @@ -45,6 +45,7 @@ * `LambdaDestinationEvent` * `LexEvent` * `MSKFirehoseEvent` +* `MSKFirehoseResponse` * `RabbitMQEvent` * `S3BatchEvent` * `S3BatchResponse` From 5abcb06999b11d5ba0e76d6118319595f5b3be6f Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Wed, 3 Jul 2024 20:05:03 +0530 Subject: [PATCH 13/28] Update MSKFirehoseEventHandler.java Changed naming conventions. --- .../src/main/java/example/MSKFirehoseEventHandler.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java b/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java index 91bfd38e..f4a35658 100644 --- a/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java +++ b/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java @@ -2,8 +2,8 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; -import com.amazonaws.services.lambda.runtime.events.MSKFirehoseResponse; -import com.amazonaws.services.lambda.runtime.events.MSKFirehoseEvent; +import model.MSKFirehoseResponse; +import events.MSKFirehoseEvent; import org.json.JSONObject; import java.nio.ByteBuffer; @@ -14,13 +14,13 @@ * A sample MSKFirehoseEvent handler * For more information see the developer guide - ... */ -public class MSKFirehoseEventHandler implements RequestHandler { +public class Sample implements RequestHandler { @Override - public MSKFirehoseResponse handleRequest(MSKFirehoseEvent kinesisFirehoseEvent, Context context) { + public MSKFirehoseResponse handleRequest(MSKFirehoseEvent MSKFirehoseEvent, Context context) { List records = new ArrayList<>(); - for (MSKFirehoseEvent.Record record : kinesisFirehoseEvent.getRecords()) { + for (MSKFirehoseEvent.Record record : MSKFirehoseEvent.getRecords()) { String recordData = new String(record.getKafkaRecordValue().array()); // Your business logic JSONObject jsonObject = new JSONObject(recordData); From f29f105e0584beee230e181dce4e21ec1bc20f5e Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Wed, 3 Jul 2024 20:20:27 +0530 Subject: [PATCH 14/28] Update MSKFirehoseEventHandler.java Changed packaging for event handler. --- .../src/main/java/example/MSKFirehoseEventHandler.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java b/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java index f4a35658..928cd141 100644 --- a/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java +++ b/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java @@ -2,8 +2,8 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; -import model.MSKFirehoseResponse; -import events.MSKFirehoseEvent; +import com.amazonaws.services.lambda.runtime.events.MSKFirehoseResponse; +import com.amazonaws.services.lambda.runtime.events.MSKFirehoseEvent; import org.json.JSONObject; import java.nio.ByteBuffer; @@ -14,7 +14,7 @@ * A sample MSKFirehoseEvent handler * For more information see the developer guide - ... */ -public class Sample implements RequestHandler { +public class MSKFirehoseEventHandler implements RequestHandler { @Override public MSKFirehoseResponse handleRequest(MSKFirehoseEvent MSKFirehoseEvent, Context context) { From 24e1b10dbc5e79b914d0b764a280c4d88fb1484d Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Wed, 3 Jul 2024 20:21:33 +0530 Subject: [PATCH 15/28] Update MSKFirehoseEventHandlerTest.java --- .../src/test/java/example/MSKFirehoseEventHandlerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java b/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java index 0dc0af58..3de55d54 100644 --- a/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java +++ b/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java @@ -16,7 +16,7 @@ public class MSKFirehoseEventHandlerTest { @ParameterizedTest @Event(value = "event.json", type = MSKFirehoseEvent.class) public void testEventHandler(MSKFirehoseEvent event) { - Sample Sample = new Sample(); + MSKFirehoseEventHandler Sample = new MSKFirehoseEventHandler(); MSKFirehoseResponse response = Sample.handleRequest(event, context); String expectedString = "{\"Name\":\"Hello World\"}"; From feec28f06333e48195e6d8bc66121c6d83877c9d Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Wed, 3 Jul 2024 20:40:16 +0530 Subject: [PATCH 16/28] Update MSKFirehoseEvent.java Removed the event structure, we do not have documents stating the event structure for this case. --- .../runtime/events/MSKFirehoseEvent.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java index 70d0796e..620c743b 100644 --- a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java @@ -11,24 +11,6 @@ /** * Created by vermshas on 6/28/24. - * { - * "invocationId": "", - * "sourceMSKArn": "", - * "deliveryStreamArn": "", - * "region": "us-east-1", - * "records": [ - * { - * "recordId": "00000000000000000000000000000000000000000000000000000000000000", - * "approximateArrivalTimestamp": 1716369573887, - * "mskRecordMetadata": { - * "offset": "0", - * "partitionId": "1", - * "approximateArrivalTimestamp": 1716369573887 - * }, - * "kafkaRecordValue": "" - * } - * ] - * } */ @Data From 6fa478eda52b51b4437a9acb086a1b71bf93cc02 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Thu, 4 Jul 2024 03:03:30 +0530 Subject: [PATCH 17/28] Update MSKFirehoseResponse.java Expanded the Lombok import, updated java doc for the class and removed the record constructor. --- .../runtime/events/MSKFirehoseResponse.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java index b923eef6..b34d7c08 100644 --- a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java @@ -3,10 +3,17 @@ import java.nio.ByteBuffer; import java.util.List; -import lombok.*; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; /** * Response model for Amazon Data Firehose Lambda transformation with MSK as a source. + * [+] Amazon Data Firehose Data Transformation - Data Transformation and Status Model - ... + * OK : Indicates that processing of this item succeeded. + * ProcessingFailed : Indicate that the processing of this item failed. + * Dropped : Indicates that this item should be silently dropped */ @Data @@ -17,12 +24,12 @@ public class MSKFirehoseResponse { public enum Result { - + /** * Indicates that processing of this item succeeded. */ Ok, - + /** * Indicate that the processing of this item failed */ @@ -36,19 +43,14 @@ public enum Result { public List records; @Data - @Builder(setterPrefix = "with") @NoArgsConstructor + @Builder(setterPrefix = "with") + @AllArgsConstructor public static class Record { public String recordId; public Result result; public ByteBuffer kafkaRecordValue; - public Record(String recordId, Result result, ByteBuffer kafkaRecordValue) { - super(); - this.recordId = recordId; - this.result = result; - this.kafkaRecordValue = kafkaRecordValue; - } } } From 243253bc82d4cd39f6fdcce3424a8f09c17cf24d Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Thu, 4 Jul 2024 03:38:14 +0530 Subject: [PATCH 18/28] Update EventLoader.java Added new test. --- .../amazonaws/services/lambda/runtime/tests/EventLoader.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/aws-lambda-java-tests/src/main/java/com/amazonaws/services/lambda/runtime/tests/EventLoader.java b/aws-lambda-java-tests/src/main/java/com/amazonaws/services/lambda/runtime/tests/EventLoader.java index aa600749..601d2f3f 100644 --- a/aws-lambda-java-tests/src/main/java/com/amazonaws/services/lambda/runtime/tests/EventLoader.java +++ b/aws-lambda-java-tests/src/main/java/com/amazonaws/services/lambda/runtime/tests/EventLoader.java @@ -89,6 +89,10 @@ public static LexEvent loadLexEvent(String filename) { return loadEvent(filename, LexEvent.class); } + public static MSKFirehoseEvent loadMSKFirehoseEvent(String filename) { + return loadEvent(filename, MSKFirehoseEvent.class); + } + public static S3Event loadS3Event(String filename) { return loadEvent(filename, S3Event.class); } From 4505f6d1e3468e2cc31a5db446806eb285dac10c Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Thu, 4 Jul 2024 03:41:24 +0530 Subject: [PATCH 19/28] Update EventLoaderTest.java --- .../services/lambda/runtime/tests/EventLoaderTest.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java b/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java index 1c9d17e1..44d6be21 100644 --- a/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java +++ b/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java @@ -118,6 +118,16 @@ public void testLoadKinesisFirehoseEvent() { assertThat(event.getRecords().get(0).getData().array()).asString().isEqualTo("Hello, this is a test 123."); } + @Test + public void testLoadMSKFirehoseEvent() { + MSKFirehoseEvent event = EventLoader.loadMSKFirehoseEvent("msk_firehose_event.json"); + + assertThat(event).isNotNull(); + assertThat(event.getDeliveryStreamArn()).isEqualTo("arn:aws:kinesis:EXAMPLE"); + assertThat(event.getRecords()).hasSize(1); + assertThat(event.getRecords().get(0).getKafkaRecordValue().array()).asString().isEqualTo("{"Name":"Hello World"}"); + } + @Test public void testLoadS3Event() { S3Event event = EventLoader.loadS3Event("s3_event.json"); From cb370720c5d75d59f784858de4cd34469143b7b9 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Thu, 4 Jul 2024 04:01:58 +0530 Subject: [PATCH 20/28] Update EventLoaderTest.java escaped quotes in JSON --- .../services/lambda/runtime/tests/EventLoaderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java b/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java index 44d6be21..63dbacaf 100644 --- a/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java +++ b/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java @@ -125,7 +125,7 @@ public void testLoadMSKFirehoseEvent() { assertThat(event).isNotNull(); assertThat(event.getDeliveryStreamArn()).isEqualTo("arn:aws:kinesis:EXAMPLE"); assertThat(event.getRecords()).hasSize(1); - assertThat(event.getRecords().get(0).getKafkaRecordValue().array()).asString().isEqualTo("{"Name":"Hello World"}"); + assertThat(event.getRecords().get(0).getKafkaRecordValue().array()).asString().isEqualTo("{\"Name\":\"Hello World\"}"); } @Test From 8239c4aa8b5549566a6f2290a3d44604aa4e915e Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Thu, 4 Jul 2024 04:14:32 +0530 Subject: [PATCH 21/28] Update msk_firehose_event.json Added values to ARN fields. --- .../src/test/resources/msk_firehose_event.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aws-lambda-java-tests/src/test/resources/msk_firehose_event.json b/aws-lambda-java-tests/src/test/resources/msk_firehose_event.json index 91c4b420..6b839912 100644 --- a/aws-lambda-java-tests/src/test/resources/msk_firehose_event.json +++ b/aws-lambda-java-tests/src/test/resources/msk_firehose_event.json @@ -1,7 +1,7 @@ { "invocationId": "12345621-4787-0000-a418-36e56Example", - "sourceMSKArn": "", - "deliveryStreamArn": "", + "sourceMSKArn": "arn:aws:kafka:EXAMPLE", + "deliveryStreamArn": "arn:aws:firehose:EXAMPLE", "region": "us-east-1", "records": [ { From 5769d769ff939c343141489d05040ff9e4bebc58 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Thu, 4 Jul 2024 04:46:11 +0530 Subject: [PATCH 22/28] Update EventLoaderTest.java --- .../services/lambda/runtime/tests/EventLoaderTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java b/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java index 63dbacaf..d0e92ecc 100644 --- a/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java +++ b/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java @@ -121,9 +121,10 @@ public void testLoadKinesisFirehoseEvent() { @Test public void testLoadMSKFirehoseEvent() { MSKFirehoseEvent event = EventLoader.loadMSKFirehoseEvent("msk_firehose_event.json"); - + assertThat(event).isNotNull(); - assertThat(event.getDeliveryStreamArn()).isEqualTo("arn:aws:kinesis:EXAMPLE"); + assertThat(event.getSourceMSKArn()).isEqualTo("arn:aws:kafka:EXAMPLE"); + assertThat(event.getDeliveryStreamArn()).isEqualTo("arn:aws:firehose:EXAMPLE"); assertThat(event.getRecords()).hasSize(1); assertThat(event.getRecords().get(0).getKafkaRecordValue().array()).asString().isEqualTo("{\"Name\":\"Hello World\"}"); } From e12da265f4ddda0c89129be5d9ff9e0d939a7820 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Tue, 9 Jul 2024 19:19:38 +0530 Subject: [PATCH 23/28] Update EventLoaderTest.java added MSKMetadata check and Approximate arrivaltimestamp. --- .../services/lambda/runtime/tests/EventLoaderTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java b/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java index d0e92ecc..9e5f8db4 100644 --- a/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java +++ b/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java @@ -127,6 +127,13 @@ public void testLoadMSKFirehoseEvent() { assertThat(event.getDeliveryStreamArn()).isEqualTo("arn:aws:firehose:EXAMPLE"); assertThat(event.getRecords()).hasSize(1); assertThat(event.getRecords().get(0).getKafkaRecordValue().array()).asString().isEqualTo("{\"Name\":\"Hello World\"}"); + assertThat(event.getRecords().get(0).getApproximateArrivalTimestamp()).asString().isEqualTo("1716369573887"); + assertThat(event.getRecords().get(0).getMskRecordMetadata()).asString().isEqualTo("{\n" + + " \"offset\": \"0\",\n" + + " \"partitionId\": \"1\",\n" + + " \"approximateArrivalTimestamp\": 1716369573887\n" + + " }"); + } @Test From 6fd7282ebc001ec0d24b2638d4f31e63b1400521 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Tue, 9 Jul 2024 19:43:00 +0530 Subject: [PATCH 24/28] Update EventLoaderTest.java --- .../services/lambda/runtime/tests/EventLoaderTest.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java b/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java index 9e5f8db4..12dc436c 100644 --- a/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java +++ b/aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java @@ -128,12 +128,7 @@ public void testLoadMSKFirehoseEvent() { assertThat(event.getRecords()).hasSize(1); assertThat(event.getRecords().get(0).getKafkaRecordValue().array()).asString().isEqualTo("{\"Name\":\"Hello World\"}"); assertThat(event.getRecords().get(0).getApproximateArrivalTimestamp()).asString().isEqualTo("1716369573887"); - assertThat(event.getRecords().get(0).getMskRecordMetadata()).asString().isEqualTo("{\n" + - " \"offset\": \"0\",\n" + - " \"partitionId\": \"1\",\n" + - " \"approximateArrivalTimestamp\": 1716369573887\n" + - " }"); - + assertThat(event.getRecords().get(0).getMskRecordMetadata()).asString().isEqualTo("{offset=0, partitionId=1, approximateArrivalTimestamp=1716369573887}"); } @Test From 4c14fec2013701c87f33ca61775ca2ddd95c9961 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Tue, 9 Jul 2024 21:45:58 +0530 Subject: [PATCH 25/28] Update MSKFirehoseEvent.java added copyright text. --- .../services/lambda/runtime/events/MSKFirehoseEvent.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java index 620c743b..1af40ce4 100644 --- a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseEvent.java @@ -1,3 +1,8 @@ +/* +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + package com.amazonaws.services.lambda.runtime.events; import java.nio.ByteBuffer; @@ -9,10 +14,6 @@ import lombok.Data; import lombok.NoArgsConstructor; -/** - * Created by vermshas on 6/28/24. - */ - @Data @Builder(setterPrefix = "with") @NoArgsConstructor From b3b5f6101dc80f26d224e8ed11eb4c72fe6fd5df Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Tue, 9 Jul 2024 21:46:16 +0530 Subject: [PATCH 26/28] Update MSKFirehoseResponse.java Added Copyright text. --- .../services/lambda/runtime/events/MSKFirehoseResponse.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java index b34d7c08..18b5aa13 100644 --- a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/MSKFirehoseResponse.java @@ -1,3 +1,8 @@ +/* +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + package com.amazonaws.services.lambda.runtime.events; import java.nio.ByteBuffer; From 62d58c1f842f20a254b72b692f93bdc5babdb9e1 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Wed, 10 Jul 2024 18:52:05 +0530 Subject: [PATCH 27/28] Update MSKFirehoseEventHandler.java added license text. --- .../src/main/java/example/MSKFirehoseEventHandler.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java b/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java index 928cd141..f5e51349 100644 --- a/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java +++ b/samples/msk-firehose-event-handler/src/main/java/example/MSKFirehoseEventHandler.java @@ -1,3 +1,8 @@ +/* +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + package example; import com.amazonaws.services.lambda.runtime.Context; From 641b3f498af2da56c9dc0cf9dcd6d00561310304 Mon Sep 17 00:00:00 2001 From: Shashank <48707265+ShashankAWS@users.noreply.github.com> Date: Wed, 10 Jul 2024 18:52:43 +0530 Subject: [PATCH 28/28] Update MSKFirehoseEventHandlerTest.java Added license text. --- .../src/test/java/example/MSKFirehoseEventHandlerTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java b/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java index 3de55d54..77223e51 100644 --- a/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java +++ b/samples/msk-firehose-event-handler/src/test/java/example/MSKFirehoseEventHandlerTest.java @@ -1,3 +1,8 @@ +/* +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + package example; import com.amazonaws.services.lambda.runtime.Context;