Skip to content

Commit

Permalink
feat: Funqy Amazon Lambda support more Amazon events
Browse files Browse the repository at this point in the history
- Support for SQS, SNS, DynamoDB and Kinesis, with focus on
  the batching feature of AWS. Furthermore, add support for
  CloudEvents.
- Add tests for funqy amazon lambda
- Refactor, so that the implementation is less invasive.
  Using the JacksonInputReader and JacksonOutputWriter for
  the advanced event handling.
- Fix issue in Kinesis event handling
- Add logging
- Advanced event handling is the new default
  • Loading branch information
holomekc committed Jul 18, 2024
1 parent 96e2144 commit 0bc0a66
Show file tree
Hide file tree
Showing 74 changed files with 3,264 additions and 12 deletions.
7 changes: 7 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
<aws-lambda-java.version>1.2.3</aws-lambda-java.version>
<aws-lambda-java-events.version>3.11.6</aws-lambda-java-events.version>
<aws-xray.version>2.17.0</aws-xray.version>
<cloudevents-api.version>3.0.0</cloudevents-api.version>
<azure-functions-java-library.version>3.1.0</azure-functions-java-library.version>
<azure-functions-java-spi.version>1.0.0</azure-functions-java-spi.version>
<kotlin.version>2.0.0</kotlin.version>
Expand Down Expand Up @@ -5817,6 +5818,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-api</artifactId>
<version>${cloudevents-api.version}</version>
</dependency>

<dependency>
<groupId>com.microsoft.azure.functions</groupId>
<artifactId>azure-functions-java-library</artifactId>
Expand Down
10 changes: 10 additions & 0 deletions extensions/funqy/funqy-amazon-lambda/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.quarkus.funqy.deployment.bindings;

import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.kinesis.Record;

import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.pkg.steps.NativeBuild;
import io.quarkus.funqy.lambda.model.cloudevents.CloudEventDataV1;
import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1;

public class FunqyAmazonLambdaProcessor {

@BuildStep(onlyIf = NativeBuild.class)
public void process(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
reflectiveClass.produce(ReflectiveClassBuildItem.builder(
// io CloudEvents
CloudEventV1.class.getName(),
CloudEventDataV1.class.getName(),
// SQS
SQSEvent.class.getName(),
SQSEvent.SQSMessage.class.getName(),
SQSEvent.MessageAttribute.class.getName(),
SQSBatchResponse.class.getName(),
SQSBatchResponse.BatchItemFailure.class.getName(),
// SNS
SNSEvent.class.getName(),
SNSEvent.SNSRecord.class.getName(),
SNSEvent.SNS.class.getName(),
// Kinesis
KinesisEvent.class.getName(),
KinesisEvent.KinesisEventRecord.class.getName(),
Record.class.getName(),
StreamsEventResponse.class.getName(),
StreamsEventResponse.BatchItemFailure.class.getName(),
// DynamoDB
DynamodbEvent.class.getName(),
DynamodbEvent.DynamodbStreamRecord.class.getName()).constructors().methods().fields().build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.quarkus.funqy.deployment.FunctionBuildItem;
import io.quarkus.funqy.deployment.FunctionInitializedBuildItem;
import io.quarkus.funqy.lambda.FunqyLambdaBindingRecorder;
import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig;
import io.quarkus.funqy.lambda.config.FunqyAmazonConfig;
import io.quarkus.funqy.runtime.FunqyConfig;
import io.quarkus.runtime.LaunchMode;

Expand All @@ -37,17 +39,20 @@ public void init(List<FunctionBuildItem> functions,
BuildProducer<FeatureBuildItem> feature,
Optional<FunctionInitializedBuildItem> hasFunctions,
LambdaObjectMapperInitializedBuildItem mapperDependency,
BeanContainerBuildItem beanContainer) throws Exception {
BeanContainerBuildItem beanContainer,
FunqyAmazonBuildTimeConfig buildTimeConfig) throws Exception {
if (!hasFunctions.isPresent() || hasFunctions.get() == null)
return;
feature.produce(new FeatureBuildItem(FUNQY_AMAZON_LAMBDA));
recorder.init(beanContainer.getValue());
recorder.init(beanContainer.getValue(), buildTimeConfig);
}

@BuildStep
@Record(RUNTIME_INIT)
public RuntimeComplete choose(FunqyConfig config, FunqyLambdaBindingRecorder recorder) {
recorder.chooseInvoker(config);
public RuntimeComplete choose(FunqyConfig config,
FunqyAmazonConfig amazonConfig,
FunqyLambdaBindingRecorder recorder) {
recorder.chooseInvoker(config, amazonConfig);
return new RuntimeComplete();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.quarkus.funqy.test;

import static io.quarkus.funqy.test.util.EventDataProvider.getData;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.funqy.test.util.EventDataProvider;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

/**
* Testing that the item-function can handle an event, which just represents the item itself. So no special aws event
* is used as envelope
*/
public class AnyFunctionTest {
@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addAsResource("any-function.properties", "application.properties")
.addAsResource("events/any", "events")
.addClasses(TestFunctions.class, Item.class,
EventDataProvider.class));

@Test
public void should_return_no_failures_if_processing_is_ok() {
// given
var body = getData("ok.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
response.then().statusCode(204);
}

@Test
public void should_return_one_failure_if_processing_fails() {
// given
var body = getData("fail.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
response.then().statusCode(500);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.quarkus.funqy.test;

import static io.quarkus.funqy.test.util.EventDataProvider.getData;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.funqy.test.model.BatchItemFailures;
import io.quarkus.funqy.test.model.ItemFailure;
import io.quarkus.funqy.test.util.BodyDeserializer;
import io.quarkus.funqy.test.util.EventDataProvider;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

/**
* Testing that the cloudevents-function with a cloud events specific model can handle events.
*/
public class CloudEventsEventFunctionTest {
@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.overrideRuntimeConfigKey("quarkus.funqy.export", "cloudevents-function")
.withApplicationRoot((jar) -> jar
.addAsResource("item-function.properties", "application.properties")
.addAsResource("events/cloudevents", "events")
.addClasses(TestFunctions.class, Item.class,
BatchItemFailures.class, ItemFailure.class,
EventDataProvider.class, BodyDeserializer.class));

@Inject
BodyDeserializer deserializer;

@Test
public void should_return_no_failures_if_processing_is_ok() {
// given
var body = getData("ok.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
assertThat(respBody.batchItemFailures(), is(empty()));
}

@Test
public void should_return_one_failure_if_processing_fails() {
// given
var body = getData("fail.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
assertThat(respBody.batchItemFailures(), hasSize(1));
assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.quarkus.funqy.test;

import static io.quarkus.funqy.test.util.EventDataProvider.getData;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.funqy.test.model.BatchItemFailures;
import io.quarkus.funqy.test.model.ItemFailure;
import io.quarkus.funqy.test.util.BodyDeserializer;
import io.quarkus.funqy.test.util.EventDataProvider;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

/**
* Testing that the item-function with a customer model can handle cloud events.
*/
public class CloudEventsFunctionTest {
@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addAsResource("item-function.properties", "application.properties")
.addAsResource("events/cloudevents", "events")
.addClasses(TestFunctions.class, Item.class,
BatchItemFailures.class, ItemFailure.class,
EventDataProvider.class, BodyDeserializer.class));

@Inject
BodyDeserializer deserializer;

@Test
public void should_return_no_failures_if_processing_is_ok() {
// given
var body = getData("ok.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
assertThat(respBody.batchItemFailures(), is(empty()));
}

@Test
public void should_return_one_failure_if_processing_fails() {
// given
var body = getData("fail.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class);
assertThat(respBody.batchItemFailures(), hasSize(1));
assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
}
}
Loading

0 comments on commit 0bc0a66

Please sign in to comment.