Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Funqy Amazon Lambda should support more Amazon events #40396

Merged
merged 1 commit into from
Jul 21, 2024

Conversation

holomekc
Copy link
Contributor

@holomekc holomekc commented May 1, 2024

Why this feature. We like the Funqy extension. Sadly it makes the usage in AWS a bit difficult at the moment, because although Funqy seems to be cloud agnostic this is not really the case when Event driven sources are used as trigger for a lambda function. E.g. SQS. When using SQS it was necessary to use the SQS model in the funq method, which reduces the benefit of a cloud agnostic solution. Furthermore, it prevents the usage of features of AWS batching.

Requirements

To solve that I thought about the following requirements:

  • Check which trigger exists in AWS Lambda and select at least a sub set. I decided for SQS, SNS, DynamoDB and Kinesis.
  • Realized that multiple triggers can be used for a Lambda at the same time
  • Support the batching in case this is possible
  • Support for CloudEvents
  • configurable so that the new "advanced" features can be enabled or disabled

So I checked the following documentation of AWS:
AWS Lambda: https://docs.aws.amazon.com/lambda/latest/dg/lambda-services.html
AWS EventBridge Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-source.html

I checked AWS EventBridge Pipes, because this allows to use CloudEvents in AWS. E.g.:
image
With an input transformer similar to this:
image

So what did I do

  • Add CloudEvents dependency
  • Adjust the deserialization of messages for AWS. Due to the fact that it is not clear which event the logic receives it is a custom deserializer. It checks the content of the records to identify the type and then deserializes it by type.
  • If the type is determined try to deserialize the content of the event to the type of the Funqy method parameter.
  • Wrap the invocation so that I can catch failures and create the batch item failures response
  • Deserialization of CloudEvents

What is tricky and I could need feedback for

  1. Native build. I am not sure how I can see which classes I need to add for reflection. I added the one for the different Events, but I am unsure how to see that. When we tested funqy so far we also received an error, because of io.quarkus.amazon.lambda.runtime.FunctionError, which was missing. But, the code seems to add it for reflection. We were forced to add it manually to reflection, so this made me more unsure how to handle this topic.
  2. The deserialization. I created a copy of the ObjectMapper, which was retrieved via ARC as it seems. It felt wrong to register a deserializer for Object, which might influence more logic. It was strange though, because the amazon-lambda (not funqy) already reconfigures the ObjectMapper. I tried to reuse the ObjectReader and ObjectWriter for the funqy method, which seems like a good idea, but I was not sure, if it was worth it to create a dedicated reader and writer for all the AWS events.
  3. I am unsure about the processing of the messages. E.g. in SQS I receive 10 messages in one event and the funq method is called for all of them. I used parallel execution. I guess this should be configurable what do you think?
  4. I tried to understand why some variables are staticly defined. I think it is because of the different executions. Some static (build time?) and some for run time. This created a bit of a strange approach, where I needed to update the EventProcessor, which was created during build time? I hope this is ok. Maybe you have a better idea

Last words

I know this PR looks huge. There are a lot of json test files though. I would love if you could take a look and tell me what you think. Help would be appreciated.

@quarkus-bot quarkus-bot bot added area/amazon-lambda area/dependencies Pull requests that update a dependency file area/funqy labels May 1, 2024
Copy link

quarkus-bot bot commented May 1, 2024

Thanks for your pull request!

The title of your pull request does not follow our editorial rules. Could you have a look?

  • title should preferably start with an uppercase character (if it makes sense!)
  • title should not start with chore/docs/feat/fix/refactor but be a proper sentence

This message is automatically generated by a bot.

@holomekc holomekc changed the title feat: Funqy Amazon Lambda should support more Amazon events Funqy Amazon Lambda should support more Amazon events May 1, 2024
@holomekc
Copy link
Contributor Author

Hi @cescoffier @geoand

I am not sure, but I think normally the prs get assigned to a person automatically. Does this mean nobody is responsible for Funqy anymore, and does this mean you drop the support for Funqy?

Br,
Chris

@geoand
Copy link
Contributor

geoand commented May 10, 2024

Neither assumption is correct, we simply missed this.

@patriot1burke is responsible for Funqy

@holomekc
Copy link
Contributor Author

@geoand thx for the extremly fast response.

@geoand
Copy link
Contributor

geoand commented May 10, 2024

🙏

@geoand geoand requested a review from patriot1burke May 10, 2024 17:51
@patriot1burke
Copy link
Contributor

AWS supports cloud events now?

@patriot1burke
Copy link
Contributor

This is really good. Let me dive in next week.

@holomekc
Copy link
Contributor Author

Hi @patriot1burke,

This is really good. Let me dive in next week.

Sounds great. Let me know what I messed up and can be improved.

AWS supports cloud events now?

Yes AWS supports CloudEvents. At least kind of. Via AWS EventBridge: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-api-destinations-cloudevents.html

Br,
Chris

@holomekc
Copy link
Contributor Author

Hi @patriot1burke,

I do not really want to change the topic of this pr, but I noticed this log during development in CloudWatch:
ERROR [io.qua.ama.lam.run.AbstractLambdaPollLoop] (Lambda Thread (NORMAL)) Error running lambda (NORMAL) [Error Occurred After Shutdown]: java.net.SocketException: Socket closed

After checking I found this line of code:

Does not this mean that the LambdaPollLoop is also executed in production in case a native build is used? Is this intended? For me this class looks like it should be used for testing (dev / test) purposes only. Am I missing something?

@patriot1burke
Copy link
Contributor

@holomekc The poll loop runs when you deploy a quarkus lambda as a native executable. It will also run in dev and test mode along with a mock event server.

@patriot1burke
Copy link
Contributor

@holomekc Oh, and yes, it is intended to work with native executable. Native executables fall under custom lambdas:

https://docs.aws.amazon.com/lambda/latest/dg/runtimes-custom.html

@holomekc
Copy link
Contributor Author

Hi @patriot1burke,

thx for the information. I totally missed that.

@gsmet
Copy link
Member

gsmet commented Jun 6, 2024

Where are we on this one?

@holomekc
Copy link
Contributor Author

Hi @gsmet ,

I am still waiting for more feedback. Especially for the four things I mentioned in my initial post.

@patriot1burke
Copy link
Contributor

It seems my review comments were in "pending"....not sure how to get them out of "pending"

@patriot1burke
Copy link
Contributor

I wrote this comment weeks ago after looking at your test functions:

If I'm making bad assumptions about your submition, let me know and I'll dive deeper.

So...The WHOLE point of funqy is to be function agnostic. If you are adding support to pull in AWS event types into your application code, what is the point of using funqy? Just use AWS Lambda APIs instead.

What I thought you were doing was allowing different AWS event types behind the scenes so

@FunQ
public void myFunc(MyType message) {
}

I thought you were adding support for different AWS message types that could unpack the AWS message and deserialize it to a application specific type.

}

@Funq("sqs-function")
public Uni<Void> sqsFunction(SQSEvent.SQSMessage msg) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm making bad assumptions about your submition, let me know and I'll dive deeper.

So...The WHOLE point of funqy is to be function agnostic. If you are adding support to pull in AWS event types into your application code, what is the point of using funqy? Just use AWS Lambda APIs instead.

What I thought you were doing was allowing different AWS event types behind the scenes so

@Funq
public void myFunc(MyType message) {
}

I thought you were adding support for different AWS message types that could unpack the AWS message and deserialize it to a application specific type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is exactly that. In this class I have different Funq methods. The "item-function" is the one you are looking for. When you then check SqsFunctionTest (https://github.com/quarkusio/quarkus/pull/40396/files#diff-b5a78b7cdaacb019453a4395d8a8af3644d689f4cdddc512ca5f550d7e9cca70) You can see that this test is only using the "item-function" for testing. So In this case the user only works with the message model, which is "Item" in this case and no cloud specific models are used.

The other methods exists to also test if the user wants to use the actual event models from the AWS SDK. This is then tested in SqsEventFunctionTest (https://github.com/quarkusio/quarkus/pull/40396/files#diff-316ae74a342b75733f0d16d2e40e8acffa8ad938cf431c789117cdb2e6c0d588) There you can see that the funq method is changed for the tests. If this is necessary to test I am not sure.

There is one exception though. The DynamoDB events. Due to its structure (newItem, oldItem) it is not really possible to deserialize it to an internal model. So I am not sure what to do there. But there is still the special treatment for the batching.

I hope this is understandable. If you need further info, please let me know.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some addition. I did it like this, because I saw the same handling in other funqy extensions regarding CloudEvents. It was allows to use the internal message model and it was allowed to use the CloudEvents model. But yeah there is a difference, because CloudEvents is not specific to a cloud provided, so maybe there it makes more sense. But I am not sure if this really hurts if it is possible to use the specific model as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CloudEvents is a special case. The serverless team at red hat needed a function model for cloud events specification.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the removal of the content type in the poll loop I needed to adjust the tests, while I did that I added a short description in jdoc why a test exists and what it covers. Maybe it also provides some insights why I implemented it like it is. If not please let me know if I can help somehow.

@patriot1burke
Copy link
Contributor

I swear I clicked "Submit review" weeks ago....sorry for holding this up.

@holomekc
Copy link
Contributor Author

Hi @patriot1burke ,

thx for the feedback and no worries. I was on vacation and on sick leave anyway so no holding up here.

One comment is doubled. I added my feedback to the inline comment.

I could need some feedback regarding the EventDeserializer though. See: https://github.com/quarkusio/quarkus/pull/40396/files#diff-7c7bb52403a3ef27cd1c40c3e4a3959781175e3d05033a6b4d66663894b15996

@holomekc
Copy link
Contributor Author

holomekc commented Jul 2, 2024

Hi @patriot1burke
Friendly reminder. Did you have the time to check my answers already?

@holomekc
Copy link
Contributor Author

holomekc commented Jul 4, 2024

I am working on some refactoring, so that it is closer to the existing structure. I am close, but still need a little bit of time. I will try to do that asap

@holomekc
Copy link
Contributor Author

holomekc commented Jul 7, 2024

Hi @patriot1burke,

I refactored some parts. It should be less invasive now, and hopefully easier to understand. It would be awesome if you can take a look. The logic did not change though, except for Kinesis, where I had a bug to fix.

@patriot1burke
Copy link
Contributor

Great work! Honestly, really great work.

Any reason why advanced mode wouldn't be on by default? Is this a performance issue? If so, I'd say turn it on by default until somebody has a problem with performance of vanilla Funqy.

@quarkus-bot quarkus-bot bot added the area/devtools Issues/PR related to maven, gradle, platform and cli tooling/plugins label Jul 8, 2024
@quarkus-bot quarkus-bot bot added the area/gradle Gradle label Jul 8, 2024
@holomekc
Copy link
Contributor Author

holomekc commented Jul 8, 2024

Thx for the kind words. I hope you do not hate me now, because I realized I forgot to push :( I am so sorry. Not sure what happened.

To make it easier. I did not change the deserialization much and also not the event handling. I just moved most logic to the AwsEventInputReader, which is implementing the LambdaInputReader interface, which seems to be the interface to use here. This results in less changes in the FunqyLambdaBindingRecorder.

The AwsEventInputReader uses the ObjectMapper to deserialize the event and to look into the json structure to determine the actual type. If the type is found it continues deserialization to this specific event. Then the message is extracted from the event (EventHandler) and the already existing reader and writer from the "old" approach is used to deserialize to the model of the funqy method. Nothing else really changed regarding the logic. Also the batching stays the same.

I did the changes mainly to reduce the complexity in the FunqyLambdaBindingRecorder

Regarding performance. Not 100% sure, but I do not think so. With Jackson I read the json first to figure out the event as described above. So this could in theory influence performance. Should not be much, because it navigates through maps for certain indicators. So yes in theory I could remove the config. I will think about that.

Br and sorry once again,
Chris

Edit: I rebased the branch to be up-to-date. I can squash the commits if you think my refactoring was ok.

@holomekc
Copy link
Contributor Author

holomekc commented Jul 8, 2024

Maybe you can help me here: https://github.com/quarkusio/quarkus/pull/40396/files#diff-025bb7ab865d989d9f60525c861771a6805efa192f5d22a60ace6df5f92769c0

I do not understand how this stuff works. I am afraid of breaking the native builds, so that it does not find the necessary models during Jackson deserialization. I could not figure out how I could write a test for native mode. Maybe I also need to add the cloud events model?

This comment has been minimized.

@holomekc
Copy link
Contributor Author

holomekc commented Jul 9, 2024

I will check why the test are failing...
Edit: I love "works on my machine"
image

I will check if I can reproduce this behavior

Edit: Ok the issue is fixed already in the commit I forgot to push...

@patriot1burke
Copy link
Contributor

Maybe you can help me here: https://github.com/quarkusio/quarkus/pull/40396/files#diff-025bb7ab865d989d9f60525c861771a6805efa192f5d22a60ace6df5f92769c0

I do not understand how this stuff works. I am afraid of breaking the native builds, so that it does not find the necessary models during Jackson deserialization. I could not figure out how I could write a test for native mode. Maybe I also need to add the cloud events model?

Best approach is to investigate the deployment to only add the classes you need as reflective classes. Can you even know how Funqy will be deployed though at build time?

Also, any reason you can't turn on the advanced features by default?

@holomekc
Copy link
Contributor Author

holomekc commented Jul 10, 2024

I will check. I think I can also build one of our native lambdas locally, with this quarkus version, and check how it behaves.
Regarding the advanced feature. I can remove it and use it as the new default.

I will try to do that asap.

@holomekc holomekc force-pushed the main branch 2 times, most recently from f00342b to 8099e82 Compare July 13, 2024 10:19
@holomekc
Copy link
Contributor Author

holomekc commented Jul 13, 2024

Hi @patriot1burke
I hope I covered everything now.
A colleague told me that I probably misunderstood you, you did not want me to remove the advanced event handling config, but to just use it as the default. Is that correct, because I did that now.

Besides that only debug logging added and minor log message changes.

Besides that I think the pr is complete now.

Edit: Not sure about missing documentation though.

This comment has been minimized.

@holomekc
Copy link
Contributor Author

I will take a look.

@holomekc holomekc force-pushed the main branch 2 times, most recently from e1ea745 to 0bc0a66 Compare July 18, 2024 17:23
@holomekc
Copy link
Contributor Author

holomekc commented Jul 18, 2024

Hi @patriot1burke
Ok 42 time's the charm I guess.
I rebased and changed the following:

  1. AwsEventOutputWriter: Use application/json output now as the JacksonOutputWriter does. Sorry for that. I did not see that.
  2. AwsEventInputReader and AwsEventOutputWriter wrap their logic safely and catch and wrap Jackson related exceptions, so that not too many details about the actual issue are provided via the lambda response. Instead, it is visible in the logs and set as caused with a more generic exception.
  3. Adjusted the GreetTestBase, because the body message for the NPE / 500 case is different now.

I hope I covered it now. If something is still not ok, please let me know

This comment has been minimized.

@holomekc
Copy link
Contributor Author

Hi @patriot1burke,
should I worry about PluginCatalogServiceTest.shouldSyncWhenProjectFileIsNewerThanCatalog
Not sure what this means.

This comment has been minimized.

- Support for SQS, SNS, DynamoDB and Kinesis, with focus on
  the batching feature of AWS. Furthermore, add support for
  CloudEvents.
- Add tests for funqy amazon lambda
- Refactor, so that the implementation is less invasive.
  Using the JacksonInputReader and JacksonOutputWriter for
  the advanced event handling.
- Fix issue in Kinesis event handling
- Add logging
- Advanced event handling is the new default
Copy link

quarkus-bot bot commented Jul 20, 2024

Status for workflow Quarkus CI

This is the status report for running Quarkus CI on commit 19c3783.

✅ The latest workflow run for the pull request has completed successfully.

It should be safe to merge provided you have a look at the other checks in the summary.

You can consult the Develocity build scans.


Flaky tests - Develocity

⚙️ JVM Tests - JDK 21

📦 extensions/hibernate-orm/deployment

io.quarkus.hibernate.orm.HibernateHotReloadDevModeTest.testImportSqlWithContinuousTesting - History

  • Failed to wait for test run 3 State{lastRun=1, running=true, inProgress=false, run=1, passed=1, failed=0, skipped=0, isBrokenOnly=false, isTestOutput=false, isInstrumentationBasedReload=false, isLiveReload=true} - org.awaitility.core.ConditionTimeoutException
org.awaitility.core.ConditionTimeoutException: Failed to wait for test run 3 State{lastRun=1, running=true, inProgress=false, run=1, passed=1, failed=0, skipped=0, isBrokenOnly=false, isTestOutput=false, isInstrumentationBasedReload=false, isLiveReload=true}
	at io.quarkus.test.ContinuousTestingTestUtils.waitForNextCompletion(ContinuousTestingTestUtils.java:44)
	at io.quarkus.hibernate.orm.HibernateHotReloadDevModeTest.testImportSqlWithContinuousTesting(HibernateHotReloadDevModeTest.java:89)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
Caused by: org.awaitility.core.ConditionTimeoutException: Condition returned by method "waitForNextCompletion" in class io.quarkus.test.ContinuousTestingTestUtils was not fulfilled within 1 minutes.
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)

@patriot1burke patriot1burke merged commit cd78946 into quarkusio:main Jul 21, 2024
52 checks passed
@patriot1burke
Copy link
Contributor

Nice work. Thanks for sticking with this so long!

@quarkus-bot quarkus-bot bot added this to the 3.14 - main milestone Jul 21, 2024
@holomekc
Copy link
Contributor Author

Nice. Thx for being so patient.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/amazon-lambda area/dependencies Pull requests that update a dependency file area/devtools Issues/PR related to maven, gradle, platform and cli tooling/plugins area/funqy area/gradle Gradle triage/flaky-test
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants