stream: made it easier to run a batch consumer and added methods to convert between normal and batch consumer callbacks #1180

Open · ajscholl wants to merge 4 commits into main from make-batch-consumer-available
Conversation

@ajscholl (Contributor) commented Nov 21, 2024

This allows you to use application.RunBatchConsumer similarly to how you would use application.RunConsumer to run a batch consumer. It also adds stream.ConsumerToBatchConsumer and stream.ConsumerToParallelBatchConsumer to adapt single consumers to batch consumers.

You should set stream.consumer.<name>.batch_size to take advantage of batching, as it defaults to 1.
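For example, assuming a consumer registered under the name `myConsumer` (the name and the exact YAML nesting are illustrative; the config key path `stream.consumer.<name>.batch_size` is the one stated above), enabling batching in the config file might look like this:

```yml
stream:
  consumer:
    myConsumer:
      batch_size: 10   # defaults to 1, so batching is effectively off unless raised
```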

TODO:

  • Add tests

This exposes additional options for AWS clients to the config files.
These include:

- Disabling request compression and setting the minimum request size for compression for CloudWatch
- Disabling response checksum validation and accepting gzip-compressed responses for DynamoDB
- The usePathStyle setting is now documented for S3 (it was already exposed before)

These settings are needed when talking to services that implement the AWS
API, such as localstack or ScyllaDB, since these services sometimes can't
map all features exactly.
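A config fragment exercising these options might look like the sketch below. The key names and nesting are illustrative assumptions based on the option descriptions above, not gosoline's actual config paths; consult the project's config reference for the real keys.

```yml
cloud:
  aws:
    cloudwatch:
      # illustrative keys: turn off request compression for services
      # (e.g. localstack) that don't handle compressed payloads
      disable_request_compression: true
      request_min_compress_size_bytes: 10240
    dynamodb:
      # illustrative keys: needed e.g. against ScyllaDB's Alternator API
      disable_validate_response_checksum: true
      enable_accept_encoding_gzip: false
    s3:
      use_path_style: true
```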
…onvert between normal and batch consumer callbacks;

This allows you to use `application.RunBatchConsumer` similarly to how
you would use `application.RunConsumer` to run a batch consumer. It also
adds `stream.ConsumerToBatchConsumer` and `stream.ConsumerToParallelBatchConsumer`
to adapt single consumers to batch consumers.

You should set stream.consumer.<name>.batch_size to take advantage of batching as it defaults to 1.
@ajscholl force-pushed the make-batch-consumer-available branch from 201943f to d113b65 on November 21, 2024 08:29
…unner;

Right now, `stream.consumer.<consumer>.runner_count` must be equal to 1
when using the batch consumer, as the batch consumer is not thread-safe.
This commit fixes that and thus allows processing and acknowledging more
than one batch at once.
This commit allows the batch consumer to acknowledge aggregate messages.
Until now, the consumer would struggle to find the correct message ids
to delete the messages from something like SQS, making it impossible to
actually acknowledge an aggregate message in a batch. We now preserve
the original message and add delayed context cancels for the case where
a message needs to be written to a retry queue (otherwise, the normal
consumer would just lose the message if the queue doesn't have any
native retry functionality).