Skip to content

Commit

Permalink
feat: Funqy Amazon Lambda support more Amazon events
Browse files Browse the repository at this point in the history
Support for SQS, SNS, DynamoDB and Kinesis, with focus on
the batching feature of AWS. Furthermore, add support for
CloudEvents.
Add tests for funqy amazon lambda
  • Loading branch information
holomekc committed May 1, 2024
1 parent c02cafb commit 2463297
Show file tree
Hide file tree
Showing 72 changed files with 3,001 additions and 41 deletions.
7 changes: 7 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
<aws-lambda-java.version>1.2.3</aws-lambda-java.version>
<aws-lambda-java-events.version>3.11.4</aws-lambda-java-events.version>
<aws-xray.version>2.15.2</aws-xray.version>
<cloudevents-api.version>3.0.0</cloudevents-api.version>
<azure-functions-java-library.version>3.1.0</azure-functions-java-library.version>
<azure-functions-java-spi.version>1.0.0</azure-functions-java-spi.version>
<kotlin.version>1.9.23</kotlin.version>
Expand Down Expand Up @@ -5678,6 +5679,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-api</artifactId>
<version>${cloudevents-api.version}</version>
</dependency>

<dependency>
<groupId>com.microsoft.azure.functions</groupId>
<artifactId>azure-functions-java-library</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public void run() {
URL url = AmazonLambdaApi.invocationResponse(baseUrl, requestId);
if (isStream()) {
HttpURLConnection responseConnection = responseStream(url);
responseConnection.setRequestProperty("Content-Type", "application/json");
if (running.get()) {
processRequest(requestConnection.getInputStream(), responseConnection.getOutputStream(),
createContext(requestConnection));
Expand Down
10 changes: 10 additions & 0 deletions extensions/funqy/funqy-amazon-lambda/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.quarkus.funqy.deployment.bindings;

import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.kinesis.Record;

import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.pkg.steps.NativeBuild;

public class FunqyAmazonLambdaProcessor {

@BuildStep(onlyIf = NativeBuild.class)
public void process(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
reflectiveClass.produce(ReflectiveClassBuildItem.builder(
// SQS
SQSEvent.class.getName(),
SQSEvent.SQSMessage.class.getName(),
SQSEvent.MessageAttribute.class.getName(),
SQSBatchResponse.class.getName(),
SQSBatchResponse.BatchItemFailure.class.getName(),
// SNS
SNSEvent.class.getName(),
SNSEvent.SNSRecord.class.getName(),
SNSEvent.SNS.class.getName(),
// Kinesis
KinesisEvent.class.getName(),
KinesisEvent.KinesisEventRecord.class.getName(),
Record.class.getName(),
StreamsEventResponse.class.getName(),
StreamsEventResponse.BatchItemFailure.class.getName(),
// DynamoDB
DynamodbEvent.class.getName(),
DynamodbEvent.DynamodbStreamRecord.class.getName()
).constructors().methods().fields().build()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.quarkus.funqy.deployment.FunctionBuildItem;
import io.quarkus.funqy.deployment.FunctionInitializedBuildItem;
import io.quarkus.funqy.lambda.FunqyLambdaBindingRecorder;
import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig;
import io.quarkus.funqy.lambda.config.FunqyAmazonConfig;
import io.quarkus.funqy.runtime.FunqyConfig;
import io.quarkus.runtime.LaunchMode;

Expand All @@ -37,17 +39,19 @@ public void init(List<FunctionBuildItem> functions,
BuildProducer<FeatureBuildItem> feature,
Optional<FunctionInitializedBuildItem> hasFunctions,
LambdaObjectMapperInitializedBuildItem mapperDependency,
BeanContainerBuildItem beanContainer) throws Exception {
BeanContainerBuildItem beanContainer,
FunqyAmazonBuildTimeConfig buildTimeConfig) throws Exception {
if (!hasFunctions.isPresent() || hasFunctions.get() == null)
return;
feature.produce(new FeatureBuildItem(FUNQY_AMAZON_LAMBDA));
recorder.init(beanContainer.getValue());
recorder.init(beanContainer.getValue(), buildTimeConfig);
}

@BuildStep
@Record(RUNTIME_INIT)
public RuntimeComplete choose(FunqyConfig config, FunqyLambdaBindingRecorder recorder) {
recorder.chooseInvoker(config);
public RuntimeComplete choose(FunqyConfig config, FunqyAmazonConfig amazonConfig,
FunqyLambdaBindingRecorder recorder) {
recorder.chooseInvoker(config, amazonConfig);
return new RuntimeComplete();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.quarkus.funqy.test;

import static io.quarkus.funqy.test.util.EventDataProvider.getData;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.funqy.test.util.EventDataProvider;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

public class AnyFunctionTest {
@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addAsResource("any-function.properties", "application.properties")
.addAsResource("events/any", "events")
.addClasses(TestFunctions.class, Item.class,
EventDataProvider.class));

@Test
public void should_return_no_failures_if_processing_is_ok() {
// given
var body = getData("ok.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
response.then().statusCode(204);
}

@Test
public void should_return_one_failure_if_processing_fails() {
// given
var body = getData("fail.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
response.then().statusCode(500);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.quarkus.funqy.test;

import static io.quarkus.funqy.test.util.EventDataProvider.getData;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.funqy.test.model.BatchItemFailures;
import io.quarkus.funqy.test.model.ItemFailure;
import io.quarkus.funqy.test.util.EventDataProvider;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

public class CloudEventsEventFunctionTest {
@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.overrideRuntimeConfigKey("quarkus.funqy.export", "cloudevents-function")
.withApplicationRoot((jar) -> jar
.addAsResource("item-function.properties", "application.properties")
.addAsResource("events/cloudevents", "events")
.addClasses(TestFunctions.class, Item.class,
BatchItemFailures.class, ItemFailure.class,
EventDataProvider.class));

@Test
public void should_return_no_failures_if_processing_is_ok() {
// given
var body = getData("ok.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
var respBody = response.then().statusCode(200)
.extract().body().as(BatchItemFailures.class);
assertThat(respBody.batchItemFailures(), is(empty()));
}

@Test
public void should_return_one_failure_if_processing_fails() {
// given
var body = getData("fail.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
var respBody = response.then().statusCode(200)
.extract().body().as(BatchItemFailures.class);
assertThat(respBody.batchItemFailures(), hasSize(1));
assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.quarkus.funqy.test;

import static io.quarkus.funqy.test.util.EventDataProvider.getData;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.funqy.test.model.BatchItemFailures;
import io.quarkus.funqy.test.model.ItemFailure;
import io.quarkus.funqy.test.util.EventDataProvider;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

public class CloudEventsFunctionTest {
@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addAsResource("item-function.properties", "application.properties")
.addAsResource("events/cloudevents", "events")
.addClasses(TestFunctions.class, Item.class,
BatchItemFailures.class, ItemFailure.class,
EventDataProvider.class));

@Test
public void should_return_no_failures_if_processing_is_ok() {
// given
var body = getData("ok.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
var respBody = response.then().statusCode(200)
.extract().body().as(BatchItemFailures.class);
assertThat(respBody.batchItemFailures(), is(empty()));
}

@Test
public void should_return_one_failure_if_processing_fails() {
// given
var body = getData("fail.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
var respBody = response.then().statusCode(200)
.extract().body().as(BatchItemFailures.class);
assertThat(respBody.batchItemFailures(), hasSize(1));
assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.quarkus.funqy.test;

import static io.quarkus.funqy.test.util.EventDataProvider.getData;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.funqy.test.model.BatchItemFailures;
import io.quarkus.funqy.test.model.ItemFailure;
import io.quarkus.funqy.test.util.EventDataProvider;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

public class DynamoDbEventFunctionTest {
@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.overrideRuntimeConfigKey("quarkus.funqy.export", "dynamodb-function")
.withApplicationRoot((jar) -> jar
.addAsResource("item-function.properties", "application.properties")
.addAsResource("events/dynamodb", "events")
.addClasses(TestFunctions.class, Item.class,
BatchItemFailures.class, ItemFailure.class,
EventDataProvider.class));

@Test
public void should_return_no_failures_if_processing_is_ok() {
// given
var body = getData("ok.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
var respBody = response.then().statusCode(200)
.extract().body().as(BatchItemFailures.class);
assertThat(respBody.batchItemFailures(), is(empty()));
}

@Test
public void should_return_one_failure_if_processing_fails() {
// given
var body = getData("fail.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
var respBody = response.then().statusCode(200)
.extract().body().as(BatchItemFailures.class);
assertThat(respBody.batchItemFailures(), hasSize(1));
assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
}

@Test
public void should_return_no_failures_if_processing_pipes_is_ok() {
// given
var body = getData("pipes-ok.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
var respBody = response.then().statusCode(200)
.extract().body().as(BatchItemFailures.class);
assertThat(respBody.batchItemFailures(), is(empty()));
}

@Test
public void should_return_one_failure_if_processing_pipes_fails() {
// given
var body = getData("pipes-fail.json");

// when
var response = RestAssured.given().contentType("application/json")
.body(body)
.post("/");

// then
var respBody = response.then().statusCode(200)
.extract().body().as(BatchItemFailures.class);
assertThat(respBody.batchItemFailures(), hasSize(1));
assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1"));
}
}
Loading

0 comments on commit 2463297

Please sign in to comment.