Feature request: Support Sequential Async Processing of Records for SqsFifoPartialProcessor #3140
Comments
Thanks for opening your first issue here! We'll come back to you as soon as we can. |
Hi @amaral-ng, thank you for opening this feature request. I think it makes total sense to add this, as long as we process the items one by one and await each promise before moving onto the next one. I've also added the |
Ah yes, I wanted to plus 1 this as I also spent a fair bit of time on this one. I eventually came to the same conclusion - that it's not currently supported, so I ended up dropping powertools for my use case and writing my own boilerplate here. In my use case, I wanted to not only use |
Hi @bml1g12, thanks for the added context, this is very helpful. May I ask how you'd be doing the rate limiting part? Do you maintain a separate persistence layer? How do you identify a request/operation? We're considering a rate limiting feature since we've had some other customers requesting it, and this info would be valuable. |
This is the approach I'm using currently, i.e. rate limiting within the handler:

import pThrottle from "p-throttle"
...
const handler: SQSHandler = async (event: SQSEvent, context: Context) => {
  ...
  const throttle = pThrottle({
    limit: config.CallsPerSecondLimit,
    interval: 1000,
    strict: true,
  })
  const throttled = throttle(async (record: SqsRecord) => {
    log.debug("Processing a record from SQS", { local_context: { record } })
    await processRecord(record, sqsClient, "start")
  })
  for (const record of result.data.Records) {
    await throttled(record)
  }
}

It would be even better if there were a convenience tool for global rate limiting across all Lambdas, as it's a common problem we face when different Lambda execution contexts are running and hitting AWS-imposed API rate limits. I appreciate that a better way might be to use DynamoDB to store the number of calls in the last minute and use that instead, to provide persistence between Lambda handlers, but that is also a lot more complex to implement and maintain. |
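For readers who want to see the idea behind in-handler rate limiting without a dependency, here is a minimal sketch. It is not how p-throttle works internally: it simply spaces calls at least `minIntervalMs` apart, which is a simpler technique than a sliding-window limiter. The name `spaceCalls` and the injectable `now` and `sleep` parameters are hypothetical, introduced only for illustration. Like the snippet above, it only limits calls within a single execution environment.

```typescript
// Hypothetical helper: wraps an async function so that successive calls
// start at least `minIntervalMs` apart. `now` and `sleep` are injectable
// so the behaviour can be checked with a fake clock.
function spaceCalls<T, R>(
  fn: (arg: T) => Promise<R>,
  minIntervalMs: number,
  now: () => number = () => Date.now(),
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms))
): (arg: T) => Promise<R> {
  // Earliest time at which the next call is allowed to start.
  let nextAllowed = 0;
  return async (arg: T): Promise<R> => {
    const current = now();
    const wait = Math.max(0, nextAllowed - current);
    // Reserve the slot for this call before sleeping.
    nextAllowed = Math.max(current, nextAllowed) + minIntervalMs;
    if (wait > 0) {
      await sleep(wait);
    }
    return fn(arg);
  };
}
```

To approximate a limit of N calls per second, one could pass `1000 / N` as `minIntervalMs` and await the wrapped function inside the `for` loop, one record at a time.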
I am interested in this. Currently, There may be other solutions worth exploring, but this is the one that comes to mind. Let me know your thoughts, @dreamorosi. |
Hey @arnabrahman, ideally that would be the way to go, but unfortunately I think it would constitute a breaking change - even though I doubt many people use the I think adding this now will mean we have to do the opposite:
We'll then add to the v3 backlog an action item to swap the two in the next major version. In v3, Regarding the order of processing, yes, we'll need to always keep them sequential to avoid ordering issues. What do you think? |
Why not extend to Since |
Hey @arnabrahman, I'm not familiar with mixins but I'm open to try. I'd say let's move forward and continue the discussion on the PR. I'm sure it'll be easier to talk once we have the code. Thanks for the ideas! |
This issue is now closed. Please be mindful that future comments are hard for our team to see. If you need more assistance, please either tag a team member or open a new issue that references this one. If you wish to keep having a conversation with other community members under this issue feel free to do so. |
This is now released in v2.11.0! |
Use case
I am working with a FIFO SQS queue that requires processing batch records in an asynchronous manner. However, to maintain the order of messages, the SqsFifoPartialProcessor currently only supports sequential synchronous processing. This limitation prevents me from using asynchronous processing in my FIFO queue handler, which is essential for my use case.
Solution/User Experience
I propose enhancing the SqsFifoPartialProcessor to support sequential asynchronous processing while maintaining message ordering. This approach would be similar to the solution implemented here, but tailored to work with the SqsFifoPartialProcessor. This would allow users to leverage asynchronous processing within FIFO queues without sacrificing the ordering guarantees.
Alternative solutions
No response
Acknowledgment
Future readers
Please react with 👍 and your use case to help us understand customer demand.
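To make the request concrete, the sequential asynchronous processing discussed in this issue can be sketched roughly as follows. This is an illustration only, not the Powertools implementation or API: `FifoRecord`, `BatchResponse`, and `processSequentially` are hypothetical names, and marking every record after the first failure as failed is an assumption about how a FIFO processor should preserve ordering.

```typescript
// Hypothetical minimal record shape; real SQS records carry more fields.
type FifoRecord = { messageId: string; body: string };
type BatchResponse = { batchItemFailures: { itemIdentifier: string }[] };

// Process records strictly one at a time: each async handler call is
// awaited before the next record is touched, so ordering is preserved.
async function processSequentially(
  records: FifoRecord[],
  recordHandler: (record: FifoRecord) => Promise<void>
): Promise<BatchResponse> {
  const batchItemFailures: { itemIdentifier: string }[] = [];
  let failed = false;
  for (const record of records) {
    if (failed) {
      // Assumption: once one record fails, later records are reported as
      // failed without being processed, so they are retried in order.
      batchItemFailures.push({ itemIdentifier: record.messageId });
      continue;
    }
    try {
      await recordHandler(record);
    } catch {
      failed = true;
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }
  return { batchItemFailures };
}
```

The key difference from a parallel approach such as `Promise.all` over all records is that no handler starts until the previous one has settled, which is what keeps FIFO semantics intact.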