From 0bc0a660af511cee46569c1d2e72fd8f22621fa2 Mon Sep 17 00:00:00 2001
From: holomekc <30546982+holomekc@users.noreply.github.com>
Date: Wed, 1 May 2024 19:18:51 +0200
Subject: [PATCH] feat: Funqy Amazon Lambda support more Amazon events
- Support for SQS, SNS, DynamoDB and Kinesis, with focus on
the batching feature of AWS. Furthermore, add support for
CloudEvents.
- Add tests for funqy amazon lambda
- Refactor, so that the implementation is less invasive.
Using the JacksonInputReader and JacksonOutputWriter for
the advanced event handling.
- Fix issue in Kinesis event handling
- Add logging
- Advanced event handling is the new default
---
bom/application/pom.xml | 7 +
.../funqy-amazon-lambda/deployment/pom.xml | 10 +
.../bindings/FunqyAmazonLambdaProcessor.java | 46 ++++
.../bindings/FunqyLambdaBuildStep.java | 13 +-
.../quarkus/funqy/test/AnyFunctionTest.java | 52 +++++
.../test/CloudEventsEventFunctionTest.java | 69 ++++++
.../funqy/test/CloudEventsFunctionTest.java | 68 ++++++
.../funqy/test/DynamoDbEventFunctionTest.java | 100 +++++++++
.../funqy/test/DynamoDbFunctionTest.java | 72 +++++++
.../test/java/io/quarkus/funqy/test/Item.java | 24 +++
.../funqy/test/KinesisEventFunctionTest.java | 100 +++++++++
.../funqy/test/KinesisFunctionTest.java | 99 +++++++++
.../funqy/test/SnsEventFunctionTest.java | 53 +++++
.../quarkus/funqy/test/SnsFunctionTest.java | 52 +++++
.../funqy/test/SqsEventFunctionTest.java | 100 +++++++++
.../SqsFunctionNoBatchItemFailuresTest.java | 48 +++++
.../quarkus/funqy/test/SqsFunctionTest.java | 99 +++++++++
.../io/quarkus/funqy/test/TestFunctions.java | 65 ++++++
.../funqy/test/model/BatchItemFailures.java | 6 +
.../quarkus/funqy/test/model/ItemFailure.java | 4 +
.../funqy/test/util/BodyDeserializer.java | 32 +++
.../funqy/test/util/EventDataProvider.java | 19 ++
.../test/resources/any-function.properties | 1 +
.../src/test/resources/events/any/fail.json | 4 +
.../src/test/resources/events/any/ok.json | 4 +
.../resources/events/cloudevents/fail.json | 26 +++
.../test/resources/events/cloudevents/ok.json | 41 ++++
.../test/resources/events/dynamodb/fail.json | 62 ++++++
.../test/resources/events/dynamodb/ok.json | 64 ++++++
.../resources/events/dynamodb/pipes-fail.json | 60 ++++++
.../resources/events/dynamodb/pipes-ok.json | 60 ++++++
.../test/resources/events/kinesis/fail.json | 36 ++++
.../src/test/resources/events/kinesis/ok.json | 36 ++++
.../resources/events/kinesis/pipes-fail.json | 30 +++
.../resources/events/kinesis/pipes-ok.json | 30 +++
.../src/test/resources/events/sns/fail.json | 31 +++
.../src/test/resources/events/sns/ok.json | 31 +++
.../src/test/resources/events/sqs/fail.json | 36 ++++
.../src/test/resources/events/sqs/ok.json | 36 ++++
.../test/resources/events/sqs/pipes-fail.json | 34 +++
.../test/resources/events/sqs/pipes-ok.json | 34 +++
.../test/resources/item-function.properties | 2 +
.../resources/no-batch-function.properties | 3 +
.../funqy/funqy-amazon-lambda/runtime/pom.xml | 14 ++
.../lambda/FunqyLambdaBindingRecorder.java | 55 ++++-
.../AdvancedEventHandlingBuildTimeConfig.java | 17 ++
.../config/AdvancedEventHandlingConfig.java | 30 +++
.../quarkus/funqy/lambda/config/DynamoDb.java | 17 ++
.../config/FunqyAmazonBuildTimeConfig.java | 17 ++
.../lambda/config/FunqyAmazonConfig.java | 17 ++
.../quarkus/funqy/lambda/config/Kinesis.java | 17 ++
.../io/quarkus/funqy/lambda/config/Sns.java | 11 +
.../io/quarkus/funqy/lambda/config/Sqs.java | 17 ++
.../lambda/event/AwsEventInputReader.java | 201 ++++++++++++++++++
.../lambda/event/AwsEventOutputWriter.java | 39 ++++
.../funqy/lambda/event/DateDeserializer.java | 22 ++
.../funqy/lambda/event/EventErrorHandler.java | 29 +++
.../funqy/lambda/event/EventHandler.java | 61 ++++++
.../funqy/lambda/event/EventProcessor.java | 153 +++++++++++++
.../event/cloudevents/CloudEventsHandler.java | 46 ++++
.../event/dynamodb/DynamoDbEventHandler.java | 53 +++++
.../dynamodb/PipesDynamoDbEventHandler.java | 53 +++++
.../event/kinesis/KinesisEventHandler.java | 54 +++++
.../kinesis/PipesKinesisEventHandler.java | 57 +++++
.../lambda/event/sns/SnsEventHandler.java | 49 +++++
.../event/sqs/PipesSqsEventHandler.java | 49 +++++
.../lambda/event/sqs/SqsEventHandler.java | 54 +++++
.../funqy/lambda/model/FunqyMethod.java | 51 +++++
.../model/cloudevents/CloudEventDataV1.java | 27 +++
.../model/cloudevents/CloudEventV1.java | 172 +++++++++++++++
.../model/kinesis/PipesKinesisEvent.java | 63 ++++++
.../lambda/model/pipes/BatchItemFailures.java | 14 ++
.../funqy/lambda/model/pipes/Response.java | 16 ++
.../io/quarkus/funqy/test/GreetTestBase.java | 2 +-
74 files changed, 3264 insertions(+), 12 deletions(-)
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisEventFunctionTest.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/BodyDeserializer.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties
create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventInputReader.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventOutputWriter.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/DateDeserializer.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventErrorHandler.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/cloudevents/CloudEventsHandler.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/DynamoDbEventHandler.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/PipesDynamoDbEventHandler.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sns/SnsEventHandler.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/PipesSqsEventHandler.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/SqsEventHandler.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventDataV1.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventV1.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/BatchItemFailures.java
create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/Response.java
diff --git a/bom/application/pom.xml b/bom/application/pom.xml
index 28bcbfe8b791e..85c28f0054ff3 100644
--- a/bom/application/pom.xml
+++ b/bom/application/pom.xml
@@ -149,6 +149,7 @@
1.2.3
3.11.6
2.17.0
+ 3.0.0
3.1.0
1.0.0
2.0.0
@@ -5817,6 +5818,12 @@
+
+ io.cloudevents
+ cloudevents-api
+ ${cloudevents-api.version}
+
+
com.microsoft.azure.functions
azure-functions-java-library
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml b/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml
index 2a03684b9a675..159c1b235e5ad 100644
--- a/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml
@@ -29,6 +29,16 @@
io.quarkus
quarkus-arc-deployment
+
+ io.quarkus
+ quarkus-junit5-internal
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java
new file mode 100644
index 0000000000000..298f90d74d0e7
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java
@@ -0,0 +1,46 @@
+package io.quarkus.funqy.deployment.bindings;
+
+import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
+import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
+import com.amazonaws.services.lambda.runtime.events.SNSEvent;
+import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+import com.amazonaws.services.lambda.runtime.events.models.kinesis.Record;
+
+import io.quarkus.deployment.annotations.BuildProducer;
+import io.quarkus.deployment.annotations.BuildStep;
+import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
+import io.quarkus.deployment.pkg.steps.NativeBuild;
+import io.quarkus.funqy.lambda.model.cloudevents.CloudEventDataV1;
+import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1;
+
+public class FunqyAmazonLambdaProcessor {
+
+ @BuildStep(onlyIf = NativeBuild.class)
+ public void process(BuildProducer reflectiveClass) {
+ reflectiveClass.produce(ReflectiveClassBuildItem.builder(
+ // io CloudEvents
+ CloudEventV1.class.getName(),
+ CloudEventDataV1.class.getName(),
+ // SQS
+ SQSEvent.class.getName(),
+ SQSEvent.SQSMessage.class.getName(),
+ SQSEvent.MessageAttribute.class.getName(),
+ SQSBatchResponse.class.getName(),
+ SQSBatchResponse.BatchItemFailure.class.getName(),
+ // SNS
+ SNSEvent.class.getName(),
+ SNSEvent.SNSRecord.class.getName(),
+ SNSEvent.SNS.class.getName(),
+ // Kinesis
+ KinesisEvent.class.getName(),
+ KinesisEvent.KinesisEventRecord.class.getName(),
+ Record.class.getName(),
+ StreamsEventResponse.class.getName(),
+ StreamsEventResponse.BatchItemFailure.class.getName(),
+ // DynamoDB
+ DynamodbEvent.class.getName(),
+ DynamodbEvent.DynamodbStreamRecord.class.getName()).constructors().methods().fields().build());
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java
index 6a8bcbdc213d9..9633e155a1d49 100644
--- a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java
@@ -20,6 +20,8 @@
import io.quarkus.funqy.deployment.FunctionBuildItem;
import io.quarkus.funqy.deployment.FunctionInitializedBuildItem;
import io.quarkus.funqy.lambda.FunqyLambdaBindingRecorder;
+import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig;
+import io.quarkus.funqy.lambda.config.FunqyAmazonConfig;
import io.quarkus.funqy.runtime.FunqyConfig;
import io.quarkus.runtime.LaunchMode;
@@ -37,17 +39,20 @@ public void init(List functions,
BuildProducer feature,
Optional hasFunctions,
LambdaObjectMapperInitializedBuildItem mapperDependency,
- BeanContainerBuildItem beanContainer) throws Exception {
+ BeanContainerBuildItem beanContainer,
+ FunqyAmazonBuildTimeConfig buildTimeConfig) throws Exception {
if (!hasFunctions.isPresent() || hasFunctions.get() == null)
return;
feature.produce(new FeatureBuildItem(FUNQY_AMAZON_LAMBDA));
- recorder.init(beanContainer.getValue());
+ recorder.init(beanContainer.getValue(), buildTimeConfig);
}
@BuildStep
@Record(RUNTIME_INIT)
- public RuntimeComplete choose(FunqyConfig config, FunqyLambdaBindingRecorder recorder) {
- recorder.chooseInvoker(config);
+ public RuntimeComplete choose(FunqyConfig config,
+ FunqyAmazonConfig amazonConfig,
+ FunqyLambdaBindingRecorder recorder) {
+ recorder.chooseInvoker(config, amazonConfig);
return new RuntimeComplete();
}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java
new file mode 100644
index 0000000000000..b80bcdb19343a
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java
@@ -0,0 +1,52 @@
+package io.quarkus.funqy.test;
+
+import static io.quarkus.funqy.test.util.EventDataProvider.getData;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.funqy.test.util.EventDataProvider;
+import io.quarkus.test.QuarkusUnitTest;
+import io.restassured.RestAssured;
+
+/**
+ * Testing that the item-function can handle an event, which just represents the item itself. So no special aws event
+ * is used as envelope
+ */
+public class AnyFunctionTest {
+ @RegisterExtension
+ static QuarkusUnitTest test = new QuarkusUnitTest()
+ .withApplicationRoot((jar) -> jar
+ .addAsResource("any-function.properties", "application.properties")
+ .addAsResource("events/any", "events")
+ .addClasses(TestFunctions.class, Item.class,
+ EventDataProvider.class));
+
+ @Test
+ public void should_return_no_failures_if_processing_is_ok() {
+ // given
+ var body = getData("ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ response.then().statusCode(204);
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_fails() {
+ // given
+ var body = getData("fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ response.then().statusCode(500);
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java
new file mode 100644
index 0000000000000..97f6c877a05e0
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java
@@ -0,0 +1,69 @@
+package io.quarkus.funqy.test;
+
+import static io.quarkus.funqy.test.util.EventDataProvider.getData;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import jakarta.inject.Inject;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.funqy.test.model.BatchItemFailures;
+import io.quarkus.funqy.test.model.ItemFailure;
+import io.quarkus.funqy.test.util.BodyDeserializer;
+import io.quarkus.funqy.test.util.EventDataProvider;
+import io.quarkus.test.QuarkusUnitTest;
+import io.restassured.RestAssured;
+
+/**
+ * Testing that the cloudevents-function with a cloud events specific model can handle events.
+ */
+public class CloudEventsEventFunctionTest {
+ @RegisterExtension
+ static QuarkusUnitTest test = new QuarkusUnitTest()
+ .overrideRuntimeConfigKey("quarkus.funqy.export", "cloudevents-function")
+ .withApplicationRoot((jar) -> jar
+ .addAsResource("item-function.properties", "application.properties")
+ .addAsResource("events/cloudevents", "events")
+ .addClasses(TestFunctions.class, Item.class,
+ BatchItemFailures.class, ItemFailure.class,
+ EventDataProvider.class, BodyDeserializer.class));
+
+ @Inject
+ BodyDeserializer deserializer;
+
+ @Test
+ public void should_return_no_failures_if_processing_is_ok() {
+ // given
+ var body = getData("ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), is(empty()));
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_fails() {
+ // given
+ var body = getData("fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(1));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java
new file mode 100644
index 0000000000000..793729fbd1fbf
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java
@@ -0,0 +1,68 @@
+package io.quarkus.funqy.test;
+
+import static io.quarkus.funqy.test.util.EventDataProvider.getData;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import jakarta.inject.Inject;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.funqy.test.model.BatchItemFailures;
+import io.quarkus.funqy.test.model.ItemFailure;
+import io.quarkus.funqy.test.util.BodyDeserializer;
+import io.quarkus.funqy.test.util.EventDataProvider;
+import io.quarkus.test.QuarkusUnitTest;
+import io.restassured.RestAssured;
+
+/**
+ * Testing that the item-function with a customer model can handle cloud events.
+ */
+public class CloudEventsFunctionTest {
+ @RegisterExtension
+ static QuarkusUnitTest test = new QuarkusUnitTest()
+ .withApplicationRoot((jar) -> jar
+ .addAsResource("item-function.properties", "application.properties")
+ .addAsResource("events/cloudevents", "events")
+ .addClasses(TestFunctions.class, Item.class,
+ BatchItemFailures.class, ItemFailure.class,
+ EventDataProvider.class, BodyDeserializer.class));
+
+ @Inject
+ BodyDeserializer deserializer;
+
+ @Test
+ public void should_return_no_failures_if_processing_is_ok() {
+ // given
+ var body = getData("ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), is(empty()));
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_fails() {
+ // given
+ var body = getData("fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(1));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java
new file mode 100644
index 0000000000000..28f735375f9ab
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java
@@ -0,0 +1,100 @@
+package io.quarkus.funqy.test;
+
+import static io.quarkus.funqy.test.util.EventDataProvider.getData;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import jakarta.inject.Inject;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.funqy.test.model.BatchItemFailures;
+import io.quarkus.funqy.test.model.ItemFailure;
+import io.quarkus.funqy.test.util.BodyDeserializer;
+import io.quarkus.funqy.test.util.EventDataProvider;
+import io.quarkus.test.QuarkusUnitTest;
+import io.restassured.RestAssured;
+
+/**
+ * Testing that the dynamodb-function with a customer model can handle cloud events.
+ */
+public class DynamoDbEventFunctionTest {
+ @RegisterExtension
+ static QuarkusUnitTest test = new QuarkusUnitTest()
+ .overrideRuntimeConfigKey("quarkus.funqy.export", "dynamodb-function")
+ .withApplicationRoot((jar) -> jar
+ .addAsResource("item-function.properties", "application.properties")
+ .addAsResource("events/dynamodb", "events")
+ .addClasses(TestFunctions.class, Item.class,
+ BatchItemFailures.class, ItemFailure.class,
+ EventDataProvider.class, BodyDeserializer.class));
+
+ @Inject
+ BodyDeserializer deserializer;
+
+ @Test
+ public void should_return_no_failures_if_processing_is_ok() {
+ // given
+ var body = getData("ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), is(empty()));
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_fails() {
+ // given
+ var body = getData("fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(1));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
+ }
+
+ @Test
+ public void should_return_no_failures_if_processing_pipes_is_ok() {
+ // given
+ var body = getData("pipes-ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), is(empty()));
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_pipes_fails() {
+ // given
+ var body = getData("pipes-fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(1));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java
new file mode 100644
index 0000000000000..9278289417046
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java
@@ -0,0 +1,72 @@
+package io.quarkus.funqy.test;
+
+import static io.quarkus.funqy.test.util.EventDataProvider.getData;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasSize;
+
+import jakarta.inject.Inject;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.funqy.test.model.BatchItemFailures;
+import io.quarkus.funqy.test.model.ItemFailure;
+import io.quarkus.funqy.test.util.BodyDeserializer;
+import io.quarkus.funqy.test.util.EventDataProvider;
+import io.quarkus.test.QuarkusUnitTest;
+import io.restassured.RestAssured;
+
+/**
+ * Testing that the item-function with a customer model cannot handle dynamodb events. Due to the structure we cannot
+ * really guess which data is relevant for the customer. But the impl will allow to use batching.
+ */
+public class DynamoDbFunctionTest {
+ @RegisterExtension
+ static QuarkusUnitTest test = new QuarkusUnitTest()
+ .withApplicationRoot((jar) -> jar
+ .addAsResource("item-function.properties", "application.properties")
+ .addAsResource("events/dynamodb", "events")
+ .addClasses(TestFunctions.class, Item.class,
+ BatchItemFailures.class, ItemFailure.class,
+ EventDataProvider.class, BodyDeserializer.class));
+
+ @Inject
+ BodyDeserializer deserializer;
+
+ @Test
+ public void should_fail_on_dynamodb_event_without_dynamodb_event_type() {
+ // given
+ var body = getData("ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ // It is not supported to transform the DynamoDB event record to an internal model. Therefore, if somebody
+ // would try this, the lambda would return every message as failure in batch item failures and log an error.
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(2));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItems("1", "2"));
+ }
+
+ @Test
+ public void should_fail_on_dynamodb_event_via_pipes_without_dynamodb_event_type() {
+ // given
+ var body = getData("pipes-ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ // It is not supported to transform the DynamoDB event record to an internal model. Therefore, if somebody
+ // would try this, the lambda would return every message as failure in batch item failures and log an error.
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(2));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItems("1", "2"));
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java
new file mode 100644
index 0000000000000..26975491bde6d
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java
@@ -0,0 +1,24 @@
+package io.quarkus.funqy.test;
+
+public class Item {
+
+ String message;
+
+ boolean throwError;
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(final String message) {
+ this.message = message;
+ }
+
+ public boolean isThrowError() {
+ return throwError;
+ }
+
+ public void setThrowError(final boolean throwError) {
+ this.throwError = throwError;
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisEventFunctionTest.java
new file mode 100644
index 0000000000000..cfc283165bf2c
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisEventFunctionTest.java
@@ -0,0 +1,100 @@
+package io.quarkus.funqy.test;
+
+import static io.quarkus.funqy.test.util.EventDataProvider.getData;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import jakarta.inject.Inject;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.funqy.test.model.BatchItemFailures;
+import io.quarkus.funqy.test.model.ItemFailure;
+import io.quarkus.funqy.test.util.BodyDeserializer;
+import io.quarkus.funqy.test.util.EventDataProvider;
+import io.quarkus.test.QuarkusUnitTest;
+import io.restassured.RestAssured;
+
+/**
+ * Testing that the kinesis-function with a kinesis specific model can handle events.
+ */
+public class KinesisEventFunctionTest {
+ @RegisterExtension
+ static QuarkusUnitTest test = new QuarkusUnitTest()
+ .overrideRuntimeConfigKey("quarkus.funqy.export", "kinesis-function")
+ .withApplicationRoot((jar) -> jar
+ .addAsResource("item-function.properties", "application.properties")
+ .addAsResource("events/kinesis", "events")
+ .addClasses(TestFunctions.class, Item.class,
+ BatchItemFailures.class, ItemFailure.class,
+ EventDataProvider.class, BodyDeserializer.class));
+
+ @Inject
+ BodyDeserializer deserializer;
+
+ @Test
+ public void should_return_no_failures_if_processing_is_ok() {
+ // given
+ var body = getData("ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), is(empty()));
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_fails() {
+ // given
+ var body = getData("fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(1));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
+ }
+
+ @Test
+ public void should_return_no_failures_if_processing_pipes_is_ok() {
+ // given
+ var body = getData("pipes-ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), is(empty()));
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_pipes_fails() {
+ // given
+ var body = getData("pipes-fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(1));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java
new file mode 100644
index 0000000000000..40d7ad0f216a0
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java
@@ -0,0 +1,99 @@
+package io.quarkus.funqy.test;
+
+import static io.quarkus.funqy.test.util.EventDataProvider.getData;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import jakarta.inject.Inject;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.funqy.test.model.BatchItemFailures;
+import io.quarkus.funqy.test.model.ItemFailure;
+import io.quarkus.funqy.test.util.BodyDeserializer;
+import io.quarkus.funqy.test.util.EventDataProvider;
+import io.quarkus.test.QuarkusUnitTest;
+import io.restassured.RestAssured;
+
+/**
+ * Testing that the item-function with a customer model can handle kinesis events.
+ */
+public class KinesisFunctionTest {
+ @RegisterExtension
+ static QuarkusUnitTest test = new QuarkusUnitTest()
+ .withApplicationRoot((jar) -> jar
+ .addAsResource("item-function.properties", "application.properties")
+ .addAsResource("events/kinesis", "events")
+ .addClasses(TestFunctions.class, Item.class,
+ BatchItemFailures.class, ItemFailure.class,
+ EventDataProvider.class, BodyDeserializer.class));
+
+ @Inject
+ BodyDeserializer deserializer;
+
+ @Test
+ public void should_return_no_failures_if_processing_is_ok() {
+ // given
+ var body = getData("ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), is(empty()));
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_fails() {
+ // given
+ var body = getData("fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(1));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
+ }
+
+ @Test
+ public void should_return_no_failures_if_processing_pipes_is_ok() {
+ // given
+ var body = getData("pipes-ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), is(empty()));
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_pipes_fails() {
+ // given
+ var body = getData("pipes-fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(1));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java
new file mode 100644
index 0000000000000..d95c66fd727b6
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java
@@ -0,0 +1,53 @@
+package io.quarkus.funqy.test;
+
+import static io.quarkus.funqy.test.util.EventDataProvider.getData;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.funqy.test.util.EventDataProvider;
+import io.quarkus.test.QuarkusUnitTest;
+import io.restassured.RestAssured;
+
+/**
+ * Testing that the sns-function with an sns specific model can handle events.
+ */
+public class SnsEventFunctionTest {
+ @RegisterExtension
+ static QuarkusUnitTest test = new QuarkusUnitTest()
+ .overrideRuntimeConfigKey("quarkus.funqy.export", "sns-function")
+ .withApplicationRoot((jar) -> jar
+ .addAsResource("item-function.properties", "application.properties")
+ .addAsResource("events/sns", "events")
+ .addClasses(TestFunctions.class, Item.class,
+ EventDataProvider.class));
+
+ @Test
+ public void should_return_no_failures_if_processing_is_ok() {
+ // given
+ var body = getData("ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ response.then().statusCode(204);
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_fails() {
+ // given
+ var body = getData("fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ // SNS triggers have no error handling.
+ response.then().statusCode(204);
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java
new file mode 100644
index 0000000000000..7297a7452b6c6
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java
@@ -0,0 +1,52 @@
+package io.quarkus.funqy.test;
+
+import static io.quarkus.funqy.test.util.EventDataProvider.getData;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.funqy.test.util.EventDataProvider;
+import io.quarkus.test.QuarkusUnitTest;
+import io.restassured.RestAssured;
+
+/**
+ * Testing that the item-function with a customer model can handle sns events.
+ */
+public class SnsFunctionTest {
+ @RegisterExtension
+ static QuarkusUnitTest test = new QuarkusUnitTest()
+ .withApplicationRoot((jar) -> jar
+ .addAsResource("item-function.properties", "application.properties")
+ .addAsResource("events/sns", "events")
+ .addClasses(TestFunctions.class, Item.class,
+ EventDataProvider.class));
+
+ @Test
+ public void should_return_no_failures_if_processing_is_ok() {
+ // given
+ var body = getData("ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ response.then().statusCode(204);
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_fails() {
+ // given
+ var body = getData("fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ // SNS triggers have no error handling.
+ response.then().statusCode(204);
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java
new file mode 100644
index 0000000000000..88b6852b52158
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java
@@ -0,0 +1,100 @@
+package io.quarkus.funqy.test;
+
+import static io.quarkus.funqy.test.util.EventDataProvider.getData;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import jakarta.inject.Inject;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.funqy.test.model.BatchItemFailures;
+import io.quarkus.funqy.test.model.ItemFailure;
+import io.quarkus.funqy.test.util.BodyDeserializer;
+import io.quarkus.funqy.test.util.EventDataProvider;
+import io.quarkus.test.QuarkusUnitTest;
+import io.restassured.RestAssured;
+
+/**
+ * Testing that the sqs-function with an sqs specific model can handle events.
+ */
+public class SqsEventFunctionTest {
+ @RegisterExtension
+ static QuarkusUnitTest test = new QuarkusUnitTest()
+ .overrideRuntimeConfigKey("quarkus.funqy.export", "sqs-function")
+ .withApplicationRoot((jar) -> jar
+ .addAsResource("item-function.properties", "application.properties")
+ .addAsResource("events/sqs", "events")
+ .addClasses(TestFunctions.class, Item.class,
+ BatchItemFailures.class, ItemFailure.class,
+ EventDataProvider.class, BodyDeserializer.class));
+
+ @Inject
+ BodyDeserializer deserializer;
+
+ @Test
+ public void should_return_no_failures_if_processing_is_ok() {
+ // given
+ var body = getData("ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), is(empty()));
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_fails() {
+ // given
+ var body = getData("fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(1));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
+ }
+
+ @Test
+ public void should_return_no_failures_if_processing_pipes_is_ok() {
+ // given
+ var body = getData("pipes-ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), is(empty()));
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_pipes_fails() {
+ // given
+ var body = getData("pipes-fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(1));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java
new file mode 100644
index 0000000000000..89bf3e585587d
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java
@@ -0,0 +1,48 @@
+package io.quarkus.funqy.test;
+
+import static io.quarkus.funqy.test.util.EventDataProvider.getData;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.funqy.test.util.EventDataProvider;
+import io.quarkus.test.QuarkusUnitTest;
+import io.restassured.RestAssured;
+
+public class SqsFunctionNoBatchItemFailuresTest {
+ @RegisterExtension
+ static QuarkusUnitTest test = new QuarkusUnitTest()
+ .withApplicationRoot((jar) -> jar
+ .addAsResource("no-batch-function.properties", "application.properties")
+ .addAsResource("events/sqs", "events")
+ .addClasses(TestFunctions.class, Item.class,
+ EventDataProvider.class));
+
+ @Test
+ public void should_return_no_failures_if_processing_is_ok() {
+ // given
+ var body = getData("ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ response.then().statusCode(204);
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_fails() {
+ // given
+ var body = getData("fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ response.then().statusCode(204);
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java
new file mode 100644
index 0000000000000..a8ec58a725dea
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java
@@ -0,0 +1,99 @@
+package io.quarkus.funqy.test;
+
+import static io.quarkus.funqy.test.util.EventDataProvider.getData;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import jakarta.inject.Inject;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.funqy.test.model.BatchItemFailures;
+import io.quarkus.funqy.test.model.ItemFailure;
+import io.quarkus.funqy.test.util.BodyDeserializer;
+import io.quarkus.funqy.test.util.EventDataProvider;
+import io.quarkus.test.QuarkusUnitTest;
+import io.restassured.RestAssured;
+
+/**
+ * Testing that the item-function with a customer model can handle sqs events.
+ */
+public class SqsFunctionTest {
+ @RegisterExtension
+ static QuarkusUnitTest test = new QuarkusUnitTest()
+ .withApplicationRoot((jar) -> jar
+ .addAsResource("item-function.properties", "application.properties")
+ .addAsResource("events/sqs", "events")
+ .addClasses(TestFunctions.class, Item.class,
+ BatchItemFailures.class, ItemFailure.class,
+ EventDataProvider.class, BodyDeserializer.class));
+
+ @Inject
+ BodyDeserializer deserializer;
+
+ @Test
+ public void should_return_no_failures_if_processing_is_ok() {
+ // given
+ var body = getData("ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), is(empty()));
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_fails() {
+ // given
+ var body = getData("fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(1));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
+ }
+
+ @Test
+ public void should_return_no_failures_if_processing_pipes_is_ok() {
+ // given
+ var body = getData("pipes-ok.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), is(empty()));
+ }
+
+ @Test
+ public void should_return_one_failure_if_processing_pipes_fails() {
+ // given
+ var body = getData("pipes-fail.json");
+
+ // when
+ var response = RestAssured.given().contentType("application/json")
+ .body(body)
+ .post("/");
+
+ // then
+ var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
+ assertThat(respBody.batchItemFailures(), hasSize(1));
+ assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java
new file mode 100644
index 0000000000000..18a47c49c37ce
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java
@@ -0,0 +1,65 @@
+package io.quarkus.funqy.test;
+
+import java.nio.charset.StandardCharsets;
+
+import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
+import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
+import com.amazonaws.services.lambda.runtime.events.SNSEvent;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+
+import io.cloudevents.CloudEvent;
+import io.quarkus.funqy.Funq;
+import io.smallrye.mutiny.Uni;
+
+public class TestFunctions {
+
+ @Funq("item-function")
+ public Uni itemFunction(Item item) {
+ if (item.isThrowError()) {
+ return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error."));
+ }
+ return Uni.createFrom().voidItem();
+ }
+
+ @Funq("sqs-function")
+ public Uni sqsFunction(SQSEvent.SQSMessage msg) {
+ if (msg.getBody().contains("true")) {
+ return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error."));
+ }
+ return Uni.createFrom().voidItem();
+ }
+
+ @Funq("sns-function")
+ public Uni snsFunction(SNSEvent.SNSRecord msg) {
+ if (msg.getSNS().getMessage().contains("true")) {
+ return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error."));
+ }
+ return Uni.createFrom().voidItem();
+ }
+
+ @Funq("cloudevents-function")
+ public Uni cloudEventsFunction(CloudEvent msg) {
+ // Due to jackson deserialization the base64 decoding already happened.
+ if (new String(msg.getData().toBytes(), StandardCharsets.UTF_8).contains("true")) {
+ return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error."));
+ }
+ return Uni.createFrom().voidItem();
+ }
+
+ @Funq("kinesis-function")
+ public Uni kinesisFunction(KinesisEvent.Record msg) {
+ // Due to jackson deserialization the base64 decoding already happened.
+ if (StandardCharsets.UTF_8.decode(msg.getData()).toString().contains("true")) {
+ return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error."));
+ }
+ return Uni.createFrom().voidItem();
+ }
+
+ @Funq("dynamodb-function")
+ public Uni dynamodbFunction(DynamodbEvent.DynamodbStreamRecord msg) {
+ if (msg.getDynamodb().getNewImage().get("ThrowError").getBOOL()) {
+ return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error."));
+ }
+ return Uni.createFrom().voidItem();
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java
new file mode 100644
index 0000000000000..4914a34a5f801
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java
@@ -0,0 +1,6 @@
+package io.quarkus.funqy.test.model;
+
+import java.util.List;
+
+public record BatchItemFailures(List batchItemFailures) {
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java
new file mode 100644
index 0000000000000..bd08b653841e2
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java
@@ -0,0 +1,4 @@
+package io.quarkus.funqy.test.model;
+
+public record ItemFailure(String itemIdentifier) {
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/BodyDeserializer.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/BodyDeserializer.java
new file mode 100644
index 0000000000000..c69081d484c72
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/BodyDeserializer.java
@@ -0,0 +1,32 @@
+package io.quarkus.funqy.test.util;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.restassured.response.ValidatableResponse;
+
+@ApplicationScoped
+public class BodyDeserializer {
+
+ @Inject
+ ObjectMapper objectMapper;
+
+ /**
+ * Allows to deserialize the response provided by RestAssured to the specified class.
+ *
+ * @param response RestAssured response
+ * @param clazz class to deserialize to
+ * @return the deserialized class
+ * @param type of the class to deserialize to
+ */
+ public T getBodyAs(ValidatableResponse response, Class clazz) {
+ try {
+ return objectMapper.readValue(response.extract().body().asString(), clazz);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java
new file mode 100644
index 0000000000000..17e1efadc5fe1
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java
@@ -0,0 +1,19 @@
+package io.quarkus.funqy.test.util;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.commons.io.IOUtils;
+
+public class EventDataProvider {
+
+ public static String getData(String path) {
+ try {
+ return IOUtils.toString(
+ EventDataProvider.class.getClassLoader().getResourceAsStream("events/" + path),
+ Charset.defaultCharset());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties
new file mode 100644
index 0000000000000..e9be9dfb08bdd
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties
@@ -0,0 +1 @@
+quarkus.funqy.export=item-function
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json
new file mode 100644
index 0000000000000..2ec250d2e81a1
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json
@@ -0,0 +1,4 @@
+{
+ "message": "hello",
+ "throwError": true
+}
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json
new file mode 100644
index 0000000000000..47ae6aedbde1d
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json
@@ -0,0 +1,4 @@
+{
+ "message": "hello",
+ "throwError": false
+}
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json
new file mode 100644
index 0000000000000..62f32e16345e3
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json
@@ -0,0 +1,26 @@
+[
+ {
+ "specversion" : "1.0",
+ "type" : "com.github.pull_request.opened",
+ "source" : "https://github.com/cloudevents/spec/pull",
+ "subject" : "123",
+ "id" : "1",
+ "time" : "2018-04-05T17:31:00Z",
+ "comexampleextension1" : "value",
+ "comexampleothervalue" : 5,
+ "datacontenttype" : "text/plain",
+ "data" : "{\"message\":\"hello\",\"throwError\":true}"
+ },
+ {
+ "specversion" : "1.0",
+ "type" : "com.github.pull_request.opened",
+ "source" : "https://github.com/cloudevents/spec/pull",
+ "subject" : "123",
+ "id" : "2",
+ "time" : "2018-04-05T17:31:00Z",
+ "comexampleextension1" : "value",
+ "comexampleothervalue" : 5,
+ "datacontenttype" : "text/plain",
+ "data" : "{\"message\":\"fail\",\"throwError\":false}"
+ }
+]
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json
new file mode 100644
index 0000000000000..e49c9ddcece01
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json
@@ -0,0 +1,41 @@
+[
+ {
+ "specversion" : "1.0",
+ "type" : "com.github.pull_request.opened",
+ "source" : "https://github.com/cloudevents/spec/pull",
+ "subject" : "123",
+ "id" : "1",
+ "time" : "2018-04-05T17:31:00Z",
+ "comexampleextension1" : "value",
+ "comexampleothervalue" : 5,
+ "datacontenttype" : "text/plain",
+ "data" : "{\"message\":\"hello\",\"throwError\":false}"
+ },
+ {
+ "specversion" : "1.0",
+ "type" : "com.github.pull_request.opened",
+ "source" : "https://github.com/cloudevents/spec/pull",
+ "subject" : "123",
+ "id" : "2",
+ "time" : "2018-04-05T17:31:00Z",
+ "comexampleextension1" : "value",
+ "comexampleothervalue" : 5,
+ "datacontenttype" : "application/json",
+ "data" : {
+ "message": "ok",
+ "throwError": false
+ }
+ },
+ {
+ "specversion" : "1.0",
+ "type" : "com.github.pull_request.opened",
+ "source" : "https://github.com/cloudevents/spec/pull",
+ "subject" : "123",
+ "id" : "3",
+ "time" : "2018-04-05T17:31:00Z",
+ "comexampleextension1" : "value",
+ "comexampleothervalue" : 5,
+ "datacontenttype" : "text/xml",
+ "data_base64" : "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ=="
+ }
+]
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json
new file mode 100644
index 0000000000000..9bb03b7bc1489
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json
@@ -0,0 +1,62 @@
+{
+ "Records": [
+ {
+ "eventID": "1",
+ "eventVersion": "1.0",
+ "dynamodb": {
+ "Keys": {
+ "Id": {
+ "N": "1"
+ }
+ },
+ "NewImage": {
+ "Message": {
+ "S": "hello"
+ },
+ "ThrowError": {
+ "BOOL": true
+ },
+ "Id": {
+ "N": "1"
+ }
+ },
+ "StreamViewType": "NEW_AND_OLD_IMAGES",
+ "SequenceNumber": "1",
+ "SizeBytes": 26
+ },
+ "awsRegion": "us-west-2",
+ "eventName": "INSERT",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525",
+ "eventSource": "aws:dynamodb"
+ },
+ {
+ "eventID": "2",
+ "eventVersion": "1.0",
+ "dynamodb": {
+ "NewImage": {
+ "Message": {
+ "S": "fail"
+ },
+ "ThrowError": {
+ "BOOL": false
+ },
+ "Id": {
+ "N": "2"
+ }
+ },
+ "SequenceNumber": "2",
+ "Keys": {
+ "Id": {
+ "N": "101"
+ }
+ },
+ "SizeBytes": 59,
+ "StreamViewType": "NEW_AND_OLD_IMAGES"
+ },
+ "awsRegion": "us-west-2",
+ "eventName": "INSERT",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525",
+ "eventSource": "aws:dynamodb"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json
new file mode 100644
index 0000000000000..722e5aa15662c
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json
@@ -0,0 +1,64 @@
+{
+ "Records": [
+ {
+ "eventID": "1",
+ "eventVersion": "1.0",
+ "dynamodb": {
+ "ApproximateCreationDateTime": 1719318377.0,
+ "Keys": {
+ "Id": {
+ "N": "1"
+ }
+ },
+ "NewImage": {
+ "Message": {
+ "S": "hello"
+ },
+ "ThrowError": {
+ "BOOL": false
+ },
+ "Id": {
+ "N": "1"
+ }
+ },
+ "StreamViewType": "NEW_AND_OLD_IMAGES",
+ "SequenceNumber": "1",
+ "SizeBytes": 26
+ },
+ "awsRegion": "us-west-2",
+ "eventName": "INSERT",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525",
+ "eventSource": "aws:dynamodb"
+ },
+ {
+ "eventID": "2",
+ "eventVersion": "1.0",
+ "dynamodb": {
+ "ApproximateCreationDateTime": 1719318377.0,
+ "NewImage": {
+ "Message": {
+ "S": "fail"
+ },
+ "ThrowError": {
+ "BOOL": false
+ },
+ "Id": {
+ "N": "2"
+ }
+ },
+ "SequenceNumber": "2",
+ "Keys": {
+ "Id": {
+ "N": "101"
+ }
+ },
+ "SizeBytes": 59,
+ "StreamViewType": "NEW_AND_OLD_IMAGES"
+ },
+ "awsRegion": "us-west-2",
+ "eventName": "INSERT",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525",
+ "eventSource": "aws:dynamodb"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json
new file mode 100644
index 0000000000000..98d791b29e8b5
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json
@@ -0,0 +1,60 @@
+[
+ {
+ "eventID": "1",
+ "eventVersion": "1.0",
+ "dynamodb": {
+ "Keys": {
+ "Id": {
+ "N": "1"
+ }
+ },
+ "NewImage": {
+ "Message": {
+ "S": "hello"
+ },
+ "ThrowError": {
+ "BOOL": true
+ },
+ "Id": {
+ "N": "1"
+ }
+ },
+ "StreamViewType": "NEW_AND_OLD_IMAGES",
+ "SequenceNumber": "1",
+ "SizeBytes": 26
+ },
+ "awsRegion": "us-west-2",
+ "eventName": "INSERT",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525",
+ "eventSource": "aws:dynamodb"
+ },
+ {
+ "eventID": "2",
+ "eventVersion": "1.0",
+ "dynamodb": {
+ "NewImage": {
+ "Message": {
+ "S": "fail"
+ },
+ "ThrowError": {
+ "BOOL": false
+ },
+ "Id": {
+ "N": "2"
+ }
+ },
+ "SequenceNumber": "2",
+ "Keys": {
+ "Id": {
+ "N": "101"
+ }
+ },
+ "SizeBytes": 59,
+ "StreamViewType": "NEW_AND_OLD_IMAGES"
+ },
+ "awsRegion": "us-west-2",
+ "eventName": "INSERT",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525",
+ "eventSource": "aws:dynamodb"
+ }
+]
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json
new file mode 100644
index 0000000000000..d0d3816b1496c
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json
@@ -0,0 +1,60 @@
+[
+ {
+ "eventID": "1",
+ "eventVersion": "1.0",
+ "dynamodb": {
+ "Keys": {
+ "Id": {
+ "N": "1"
+ }
+ },
+ "NewImage": {
+ "Message": {
+ "S": "hello"
+ },
+ "ThrowError": {
+ "BOOL": false
+ },
+ "Id": {
+ "N": "1"
+ }
+ },
+ "StreamViewType": "NEW_AND_OLD_IMAGES",
+ "SequenceNumber": "1",
+ "SizeBytes": 26
+ },
+ "awsRegion": "us-west-2",
+ "eventName": "INSERT",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525",
+ "eventSource": "aws:dynamodb"
+ },
+ {
+ "eventID": "2",
+ "eventVersion": "1.0",
+ "dynamodb": {
+ "NewImage": {
+ "Message": {
+ "S": "fail"
+ },
+ "ThrowError": {
+ "BOOL": false
+ },
+ "Id": {
+ "N": "2"
+ }
+ },
+ "SequenceNumber": "2",
+ "Keys": {
+ "Id": {
+ "N": "101"
+ }
+ },
+ "SizeBytes": 59,
+ "StreamViewType": "NEW_AND_OLD_IMAGES"
+ },
+ "awsRegion": "us-west-2",
+ "eventName": "INSERT",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525",
+ "eventSource": "aws:dynamodb"
+ }
+]
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json
new file mode 100644
index 0000000000000..061f8dd6b880b
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json
@@ -0,0 +1,36 @@
+{
+ "Records": [
+ {
+ "kinesis": {
+ "kinesisSchemaVersion": "1.0",
+ "partitionKey": "1",
+ "sequenceNumber": "1",
+ "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjp0cnVlfQ==",
+ "approximateArrivalTimestamp": 1545084650.987
+ },
+ "eventSource": "aws:kinesis",
+ "eventVersion": "1.0",
+ "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
+ "eventName": "aws:kinesis:record",
+ "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+ "awsRegion": "us-east-2",
+ "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+ },
+ {
+ "kinesis": {
+ "kinesisSchemaVersion": "1.0",
+ "partitionKey": "1",
+ "sequenceNumber": "2",
+ "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==",
+ "approximateArrivalTimestamp": 1545084711.166
+ },
+ "eventSource": "aws:kinesis",
+ "eventVersion": "1.0",
+ "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
+ "eventName": "aws:kinesis:record",
+ "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+ "awsRegion": "us-east-2",
+ "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json
new file mode 100644
index 0000000000000..c8550929724b1
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json
@@ -0,0 +1,36 @@
+{
+ "Records": [
+ {
+ "kinesis": {
+ "kinesisSchemaVersion": "1.0",
+ "partitionKey": "1",
+ "sequenceNumber": "1",
+ "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjpmYWxzZX0=",
+ "approximateArrivalTimestamp": 1545084650.987
+ },
+ "eventSource": "aws:kinesis",
+ "eventVersion": "1.0",
+ "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
+ "eventName": "aws:kinesis:record",
+ "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+ "awsRegion": "us-east-2",
+ "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+ },
+ {
+ "kinesis": {
+ "kinesisSchemaVersion": "1.0",
+ "partitionKey": "1",
+ "sequenceNumber": "2",
+ "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==",
+ "approximateArrivalTimestamp": 1545084711.166
+ },
+ "eventSource": "aws:kinesis",
+ "eventVersion": "1.0",
+ "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
+ "eventName": "aws:kinesis:record",
+ "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+ "awsRegion": "us-east-2",
+ "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json
new file mode 100644
index 0000000000000..3cc956e1cee0d
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json
@@ -0,0 +1,30 @@
+[
+ {
+ "kinesisSchemaVersion": "1.0",
+ "partitionKey": "1",
+ "sequenceNumber": "1",
+ "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjp0cnVlfQ==",
+ "approximateArrivalTimestamp": 1545084650.987,
+ "eventSource": "aws:kinesis",
+ "eventVersion": "1.0",
+ "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
+ "eventName": "aws:kinesis:record",
+ "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+ "awsRegion": "us-east-2",
+ "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+ },
+ {
+ "kinesisSchemaVersion": "1.0",
+ "partitionKey": "1",
+ "sequenceNumber": "2",
+ "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==",
+ "approximateArrivalTimestamp": 1545084711.166,
+ "eventSource": "aws:kinesis",
+ "eventVersion": "1.0",
+ "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
+ "eventName": "aws:kinesis:record",
+ "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+ "awsRegion": "us-east-2",
+ "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+ }
+]
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json
new file mode 100644
index 0000000000000..1a40ebe10175d
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json
@@ -0,0 +1,30 @@
+[
+ {
+ "kinesisSchemaVersion": "1.0",
+ "partitionKey": "1",
+ "sequenceNumber": "1",
+ "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjpmYWxzZX0=",
+ "approximateArrivalTimestamp": 1545084650.987,
+ "eventSource": "aws:kinesis",
+ "eventVersion": "1.0",
+ "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
+ "eventName": "aws:kinesis:record",
+ "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+ "awsRegion": "us-east-2",
+ "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+ },
+ {
+ "kinesisSchemaVersion": "1.0",
+ "partitionKey": "1",
+ "sequenceNumber": "2",
+ "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==",
+ "approximateArrivalTimestamp": 1545084711.166,
+ "eventSource": "aws:kinesis",
+ "eventVersion": "1.0",
+ "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
+ "eventName": "aws:kinesis:record",
+ "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+ "awsRegion": "us-east-2",
+ "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+ }
+]
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json
new file mode 100644
index 0000000000000..4643568fdfe33
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json
@@ -0,0 +1,31 @@
+{
+ "Records": [
+ {
+ "EventVersion": "1.0",
+ "EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
+ "EventSource": "aws:sns",
+ "Sns": {
+ "SignatureVersion": "1",
+ "Timestamp": "2019-01-02T12:45:07.000Z",
+ "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==",
+ "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem",
+ "MessageId": "1",
+ "Message": "{\"message\":\"hello\",\"throwError\":true}",
+ "MessageAttributes": {
+ "Test": {
+ "Type": "String",
+ "Value": "TestString"
+ },
+ "TestBinary": {
+ "Type": "Binary",
+ "Value": "TestBinary"
+ }
+ },
+ "Type": "Notification",
+ "UnsubscribeURL": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
+ "TopicArn":"arn:aws:sns:us-east-1:123456789012:sns-lambda",
+ "Subject": "TestInvoke"
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json
new file mode 100644
index 0000000000000..c97c6511cbab2
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json
@@ -0,0 +1,31 @@
+{
+ "Records": [
+ {
+ "EventVersion": "1.0",
+ "EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
+ "EventSource": "aws:sns",
+ "Sns": {
+ "SignatureVersion": "1",
+ "Timestamp": "2019-01-02T12:45:07.000Z",
+ "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==",
+ "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem",
+ "MessageId": "1",
+ "Message": "{\"message\":\"hello\",\"throwError\":false}",
+ "MessageAttributes": {
+ "Test": {
+ "Type": "String",
+ "Value": "TestString"
+ },
+ "TestBinary": {
+ "Type": "Binary",
+ "Value": "TestBinary"
+ }
+ },
+ "Type": "Notification",
+ "UnsubscribeURL": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
+ "TopicArn":"arn:aws:sns:us-east-1:123456789012:sns-lambda",
+ "Subject": "TestInvoke"
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json
new file mode 100644
index 0000000000000..b6b52c29187ec
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json
@@ -0,0 +1,36 @@
+{
+ "Records": [
+ {
+ "messageId": "1",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
+ "body": "{\"message\":\"hello\",\"throwError\":true}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-2"
+ },
+ {
+ "messageId": "2",
+ "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
+ "body": "{\"message\":\"fail\",\"throwError\":false}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082650636",
+ "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+ "ApproximateFirstReceiveTimestamp": "1545082650649"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-2"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json
new file mode 100644
index 0000000000000..a5b3b93714505
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json
@@ -0,0 +1,36 @@
+{
+ "Records": [
+ {
+ "messageId": "1",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
+ "body": "{\"message\":\"hello\",\"throwError\":false}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-2"
+ },
+ {
+ "messageId": "2",
+ "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
+ "body": "{\"message\":\"fail\",\"throwError\":false}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082650636",
+ "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+ "ApproximateFirstReceiveTimestamp": "1545082650649"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-2"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json
new file mode 100644
index 0000000000000..76e02d9b27aaa
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json
@@ -0,0 +1,34 @@
+[
+ {
+ "messageId": "1",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
+ "body": "{\"message\":\"hello\",\"throwError\":true}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-2"
+ },
+ {
+ "messageId": "2",
+ "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
+ "body": "{\"message\":\"fail\",\"throwError\":false}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082650636",
+ "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+ "ApproximateFirstReceiveTimestamp": "1545082650649"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-2"
+ }
+]
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json
new file mode 100644
index 0000000000000..83f6fb1449950
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json
@@ -0,0 +1,34 @@
+[
+ {
+ "messageId": "1",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
+ "body": "{\"message\":\"hello\",\"throwError\":false}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-2"
+ },
+ {
+ "messageId": "2",
+ "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
+ "body": "{\"message\":\"fail\",\"throwError\":false}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082650636",
+ "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+ "ApproximateFirstReceiveTimestamp": "1545082650649"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-2"
+ }
+]
\ No newline at end of file
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties
new file mode 100644
index 0000000000000..3e2957b10e52c
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties
@@ -0,0 +1,2 @@
+quarkus.funqy.export=item-function
+quarkus.funqy.amazon-lambda.advanced-event-handling.enabled=true
diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties
new file mode 100644
index 0000000000000..ca2d666955c8f
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties
@@ -0,0 +1,3 @@
+quarkus.funqy.export=item-function
+quarkus.funqy.amazon-lambda.advanced-event-handling.enabled=true
+quarkus.funqy.amazon-lambda.advanced-event-handling.sqs.report-batch-item-failures=false
diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml b/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml
index 4366bd5e63b3d..ccf8a40910d24 100644
--- a/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml
+++ b/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml
@@ -26,6 +26,20 @@
io.quarkus
quarkus-jackson
+
+ com.amazonaws
+ aws-lambda-java-events
+
+
+ joda-time
+ joda-time
+
+
+
+
+ io.cloudevents
+ cloudevents-api
+
diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java
index d2726940abd85..29ae0fd378bb6 100644
--- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java
+++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java
@@ -21,6 +21,11 @@
import io.quarkus.amazon.lambda.runtime.LambdaOutputWriter;
import io.quarkus.arc.ManagedContext;
import io.quarkus.arc.runtime.BeanContainer;
+import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig;
+import io.quarkus.funqy.lambda.config.FunqyAmazonConfig;
+import io.quarkus.funqy.lambda.event.AwsEventInputReader;
+import io.quarkus.funqy.lambda.event.AwsEventOutputWriter;
+import io.quarkus.funqy.lambda.event.EventProcessor;
import io.quarkus.funqy.runtime.FunctionConstructor;
import io.quarkus.funqy.runtime.FunctionInvoker;
import io.quarkus.funqy.runtime.FunctionRecorder;
@@ -42,11 +47,15 @@ public class FunqyLambdaBindingRecorder {
private static BeanContainer beanContainer;
private static LambdaInputReader reader;
private static LambdaOutputWriter writer;
+ private static EventProcessor eventProcessor;
+ private static FunqyAmazonBuildTimeConfig amazonBuildTimeConfig;
- public void init(BeanContainer bc) {
+ public void init(BeanContainer bc, FunqyAmazonBuildTimeConfig buildTimeConfig) {
beanContainer = bc;
FunctionConstructor.CONTAINER = bc;
+ amazonBuildTimeConfig = buildTimeConfig;
ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper;
+
for (FunctionInvoker invoker : FunctionRecorder.registry.invokers()) {
if (invoker.hasInput()) {
JavaType javaInputType = objectMapper.constructType(invoker.getInputType());
@@ -61,7 +70,7 @@ public void init(BeanContainer bc) {
}
}
- public void chooseInvoker(FunqyConfig config) {
+ public void chooseInvoker(FunqyConfig config, FunqyAmazonConfig amazonConfig) {
// this is done at Runtime so that we can change it with an environment variable.
if (config.export.isPresent()) {
invoker = FunctionRecorder.registry.matchInvoker(config.export.get());
@@ -76,35 +85,59 @@ public void chooseInvoker(FunqyConfig config) {
} else {
invoker = FunctionRecorder.registry.invokers().iterator().next();
}
+
+ ObjectReader objectReader = null;
if (invoker.hasInput()) {
- reader = new JacksonInputReader((ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName()));
+ objectReader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName());
+
+ if (amazonBuildTimeConfig.advancedEventHandling().enabled()) {
+ // We create a copy, because the mapper will be reconfigured for the advanced event handling,
+ // and we do not want to adjust the ObjectMapper, which is available in arc context.
+ ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy();
+ reader = new AwsEventInputReader(objectMapper, objectReader, amazonBuildTimeConfig);
+ } else {
+ reader = new JacksonInputReader(objectReader);
+ }
+
}
if (invoker.hasOutput()) {
- writer = new JacksonOutputWriter((ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName()));
+ ObjectWriter objectWriter = (ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName());
+
+ if (!amazonBuildTimeConfig.advancedEventHandling().enabled()) {
+ writer = new JacksonOutputWriter(objectWriter);
+ }
}
+ if (amazonBuildTimeConfig.advancedEventHandling().enabled()) {
+ ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy();
+ writer = new AwsEventOutputWriter(objectMapper);
+ eventProcessor = new EventProcessor(objectReader, amazonBuildTimeConfig, amazonConfig);
+ }
}
/**
* Called by JVM handler wrapper
*
* @param inputStream
+ * {@link InputStream} of the AWS SDK {@link com.amazonaws.services.lambda.runtime.RequestStreamHandler}
* @param outputStream
+ * {@link OutputStream} of the AWS SDK {@link com.amazonaws.services.lambda.runtime.RequestStreamHandler}
* @param context
+ * AWS context information provided to the Lambda
* @throws IOException
+ * Is thrown in case the (de)serialization fails
*/
public static void handle(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
Object input = null;
if (invoker.hasInput()) {
input = reader.readValue(inputStream);
}
- FunqyServerResponse response = dispatch(input);
+ FunqyServerResponse response = dispatch(input, context);
Object value = response.getOutput().await().indefinitely();
if (value != null) {
writer.writeValue(outputStream, value);
}
-
}
@SuppressWarnings("rawtypes")
@@ -114,7 +147,7 @@ public void startPollLoop(ShutdownContext context, LaunchMode launchMode) {
@Override
protected Object processRequest(Object input, AmazonLambdaContext context) throws Exception {
- FunqyServerResponse response = dispatch(input);
+ FunqyServerResponse response = dispatch(input, context);
return response.getOutput().await().indefinitely();
}
@@ -143,6 +176,14 @@ protected void processRequest(InputStream input, OutputStream output, AmazonLamb
}
+ private static FunqyServerResponse dispatch(Object input, Context context) throws IOException {
+ if (eventProcessor != null) {
+ return eventProcessor.handle(input, FunqyLambdaBindingRecorder::dispatch, context);
+ } else {
+ return dispatch(input);
+ }
+ }
+
private static FunqyServerResponse dispatch(Object input) {
ManagedContext requestContext = beanContainer.requestContext();
requestContext.activate();
diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java
new file mode 100644
index 0000000000000..af4f294027748
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java
@@ -0,0 +1,17 @@
+package io.quarkus.funqy.lambda.config;
+
+import io.quarkus.runtime.annotations.ConfigGroup;
+import io.smallrye.config.WithDefault;
+
+/**
+ * Advanced event handling build time configuration
+ */
+@ConfigGroup
+public interface AdvancedEventHandlingBuildTimeConfig {
+
+ /**
+ * If advanced event handling should be enabled
+ */
+ @WithDefault("true")
+ boolean enabled();
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java
new file mode 100644
index 0000000000000..e5ca3b0afacac
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java
@@ -0,0 +1,30 @@
+package io.quarkus.funqy.lambda.config;
+
+import io.quarkus.runtime.annotations.ConfigGroup;
+
+/**
+ * Advanced event handling configuration
+ */
+@ConfigGroup
+public interface AdvancedEventHandlingConfig {
+
+ /**
+ * Sqs related config.
+ */
+ Sqs sqs();
+
+ /**
+ * Sns related config.
+ */
+ Sns sns();
+
+ /**
+ * Kinesis related config.
+ */
+ Kinesis kinesis();
+
+ /**
+ * DynamoDb related config.
+ */
+ DynamoDb dynamoDb();
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java
new file mode 100644
index 0000000000000..6d182d725670b
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java
@@ -0,0 +1,17 @@
+package io.quarkus.funqy.lambda.config;
+
+import io.quarkus.runtime.annotations.ConfigGroup;
+import io.smallrye.config.WithDefault;
+
+/**
+ * Kinesis event config
+ */
+@ConfigGroup
+public interface DynamoDb {
+
+ /**
+ * Allows functions to return partially successful responses for a batch of event records.
+ */
+ @WithDefault("true")
+ boolean reportBatchItemFailures();
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java
new file mode 100644
index 0000000000000..94d8c2da860bf
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java
@@ -0,0 +1,17 @@
+package io.quarkus.funqy.lambda.config;
+
+import io.quarkus.runtime.annotations.ConfigPhase;
+import io.quarkus.runtime.annotations.ConfigRoot;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithName;
+
+@ConfigMapping(prefix = "quarkus.funqy.amazon-lambda")
+@ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED)
+public interface FunqyAmazonBuildTimeConfig {
+
+ /**
+ * The advanced event handling config
+ */
+ @WithName("advanced-event-handling")
+ AdvancedEventHandlingBuildTimeConfig advancedEventHandling();
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java
new file mode 100644
index 0000000000000..ede409c6e90d1
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java
@@ -0,0 +1,17 @@
+package io.quarkus.funqy.lambda.config;
+
+import io.quarkus.runtime.annotations.ConfigPhase;
+import io.quarkus.runtime.annotations.ConfigRoot;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithName;
+
+@ConfigMapping(prefix = "quarkus.funqy.amazon-lambda")
+@ConfigRoot(phase = ConfigPhase.RUN_TIME)
+public interface FunqyAmazonConfig {
+
+ /**
+ * The advanced event handling config
+ */
+ @WithName("advanced-event-handling")
+ AdvancedEventHandlingConfig advancedEventHandling();
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java
new file mode 100644
index 0000000000000..b05562d93573a
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java
@@ -0,0 +1,17 @@
+package io.quarkus.funqy.lambda.config;
+
+import io.quarkus.runtime.annotations.ConfigGroup;
+import io.smallrye.config.WithDefault;
+
+/**
+ * Kinesis event config
+ */
+@ConfigGroup
+public interface Kinesis {
+
+ /**
+ * Allows functions to return partially successful responses for a batch of event records.
+ */
+ @WithDefault("true")
+ boolean reportBatchItemFailures();
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java
new file mode 100644
index 0000000000000..cfb273a73550f
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java
@@ -0,0 +1,11 @@
+package io.quarkus.funqy.lambda.config;
+
+import io.quarkus.runtime.annotations.ConfigGroup;
+
+/**
+ * Sns event config
+ */
+@ConfigGroup
+public interface Sns {
+
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java
new file mode 100644
index 0000000000000..75346e96e5865
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java
@@ -0,0 +1,17 @@
+package io.quarkus.funqy.lambda.config;
+
+import io.quarkus.runtime.annotations.ConfigGroup;
+import io.smallrye.config.WithDefault;
+
+/**
+ * Sqs event config
+ */
+@ConfigGroup
+public interface Sqs {
+
+ /**
+ * Allows functions to return partially successful responses for a batch of event records.
+ */
+ @WithDefault("true")
+ boolean reportBatchItemFailures();
+}
diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventInputReader.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventInputReader.java
new file mode 100644
index 0000000000000..0aae721182342
--- /dev/null
+++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventInputReader.java
@@ -0,0 +1,201 @@
+package io.quarkus.funqy.lambda.event;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Date;
+import java.util.List;
+
+import org.jboss.logging.Logger;
+
+import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
+import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
+import com.amazonaws.services.lambda.runtime.events.SNSEvent;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.cloudevents.SpecVersion;
+import io.quarkus.amazon.lambda.runtime.LambdaInputReader;
+import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig;
+import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1;
+import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent;
+
+public class AwsEventInputReader implements LambdaInputReader